Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution
Execute Apache Beam pipelines written in Go from source files or pre-compiled binaries, with support for cross-platform execution, Go module management, and cloud integration. Source files are compiled automatically, while pre-compiled binaries allow separate launcher and worker builds.
class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        go_file: str = "",
        launcher_binary: str = "",
        worker_binary: str = "",
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize the Go pipeline operator.

        Parameters:
        - go_file (str): Path to a Go source file; exactly one of go_file or launcher_binary is required
        - launcher_binary (str): Path to a pre-compiled Go binary for the launcher platform
        - worker_binary (str): Path to a Go binary for the worker platform; defaults to launcher_binary
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override the defaults
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration | dict): Dataflow-specific configuration
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Go Pipeline."""

    def on_kill(self) -> None:
        """Cancel the pipeline when the task is killed."""

Running a local Go source file with the DirectRunner:

from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator

run_go_source_pipeline = BeamRunGoPipelineOperator(
task_id='run_go_source_pipeline',
go_file='/path/to/pipeline.go',
runner='DirectRunner',
pipeline_options={
'output': '/tmp/output',
'temp_location': '/tmp/beam_temp',
},
)

Running a Go source file from GCS on Dataflow:

run_gcs_go_pipeline = BeamRunGoPipelineOperator(
task_id='run_gcs_go_pipeline',
go_file='gs://my-bucket/pipeline.go',
runner='DataflowRunner',
pipeline_options={
'project': 'my-gcp-project',
'region': 'us-central1',
'temp_location': 'gs://my-bucket/temp',
'staging_location': 'gs://my-bucket/staging',
'job_name': 'go-pipeline-job',
},
dataflow_config={
'project_id': 'my-gcp-project',
'location': 'us-central1',
'wait_until_finished': True,
},
)

Running a pre-compiled binary:

run_go_binary_pipeline = BeamRunGoPipelineOperator(
task_id='run_go_binary_pipeline',
launcher_binary='/path/to/pipeline-launcher',
worker_binary='/path/to/pipeline-worker', # Optional, different binary for workers
runner='DataflowRunner',
pipeline_options={
'project': 'my-gcp-project',
'region': 'us-central1',
'temp_location': 'gs://my-bucket/temp',
},
)

Cross-platform execution with different launcher and worker binaries:

run_cross_platform_pipeline = BeamRunGoPipelineOperator(
task_id='run_cross_platform_pipeline',
launcher_binary='gs://my-bucket/binaries/pipeline-linux-amd64',
worker_binary='gs://my-bucket/binaries/pipeline-linux-arm64',
runner='DataflowRunner',
pipeline_options={
'project': 'my-gcp-project',
'region': 'us-central1',
'machine_type': 't2a-standard-4', # ARM-based machine type
'temp_location': 'gs://my-bucket/temp',
},
)

Using a single binary for both launcher and workers:

run_unified_binary_pipeline = BeamRunGoPipelineOperator(
task_id='run_unified_binary_pipeline',
launcher_binary='gs://my-bucket/pipeline-universal',
# worker_binary will default to launcher_binary value
runner='DataflowRunner',
pipeline_options={
'project': 'my-gcp-project',
'temp_location': 'gs://my-bucket/temp',
},
)

When using Go source files, the operator automatically handles module initialization and dependency management. For source files downloaded from GCS, the operator runs:

go mod init
go mod tidy

For local Go files, ensure your Go module is properly set up:

# In your pipeline directory
go mod init my-pipeline
go mod tidy

Go pipelines have a few configuration characteristics of their own. They use snake_case option formatting:

pipeline_options = {
    'project': 'my-gcp-project',
    'region': 'us-central1',
    'temp_location': 'gs://my-bucket/temp',
    'num_workers': '4',
    'max_num_workers': '10',
    'machine_type': 'n1-standard-4',
    'job_name': 'go-pipeline-job',
}

Example commands for building Go binaries for different platforms:

# Linux AMD64 (common for most cloud environments)
GOOS=linux GOARCH=amd64 go build -o pipeline-linux-amd64 pipeline.go

# Linux ARM64 (for ARM-based cloud instances)
GOOS=linux GOARCH=arm64 go build -o pipeline-linux-arm64 pipeline.go

# Local development (current platform)
go build -o pipeline-local pipeline.go

Pre-compiled binaries must be built for the platform on which they will run: the launcher binary for the machine that submits the pipeline, and the worker binary for the machines that execute it (they may differ, as in the cross-platform example above).
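To illustrate the snake_case option formatting described above, the following sketch shows how such a dict maps onto the --flag=value form a Beam Go binary receives on its command line. The helper name is hypothetical, not the provider's actual code:

```python
# Hypothetical helper (not the provider's implementation) showing how a
# snake_case pipeline_options dict could be rendered as Go command-line flags.
def options_to_go_flags(pipeline_options: dict) -> list:
    # Each key/value pair becomes a --key=value flag; sorted for stable output.
    return [f"--{key}={value}" for key, value in sorted(pipeline_options.items())]

print(options_to_go_flags({
    "project": "my-gcp-project",
    "temp_location": "gs://my-bucket/temp",
}))
# → ['--project=my-gcp-project', '--temp_location=gs://my-bucket/temp']
```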
The operator raises errors for Go-specific misconfigurations, for example when neither or both of go_file and launcher_binary are provided (exactly one is required).
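A simplified sketch of that mutual-exclusion rule (hypothetical code, not the operator's actual validation):

```python
# Exactly one of go_file or launcher_binary must be provided; passing
# neither or both is a configuration error.
def validate_go_source(go_file: str = "", launcher_binary: str = "") -> None:
    if bool(go_file) == bool(launcher_binary):
        raise ValueError("Exactly one of go_file or launcher_binary must be provided")

validate_go_source(go_file="/path/to/pipeline.go")  # accepted
```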
Go pipelines integrate with Dataflow but have specific limitations:

dataflow_config = {
    'job_name': 'go-pipeline-job',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'wait_until_finished': True,
    # Note: impersonation_chain is not supported for Go pipelines
}

For production deployments, consider using pre-compiled binaries stored in GCS (avoiding compilation at task runtime), keeping settings shared across tasks in default_pipeline_options, and setting wait_until_finished so the task's state reflects the pipeline's outcome.
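The operator documents that task-level pipeline_options override default_pipeline_options. A minimal sketch of that precedence (the merge helper is hypothetical, not the provider's code):

```python
# Task-specific options win over defaults shared across tasks, mirroring the
# documented relationship between pipeline_options and default_pipeline_options.
def merge_options(defaults, overrides):
    merged = dict(defaults or {})
    merged.update(overrides or {})
    return merged

print(merge_options(
    {"project": "my-gcp-project", "temp_location": "gs://my-bucket/temp"},
    {"temp_location": "gs://my-bucket/override-temp"},
))
# → {'project': 'my-gcp-project', 'temp_location': 'gs://my-bucket/override-temp'}
```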
The operator supports Airflow templating for dynamic Go pipeline configuration:
run_templated_go_pipeline = BeamRunGoPipelineOperator(
task_id='run_templated_go_pipeline',
go_file='gs://{{ var.value.source_bucket }}/{{ var.value.pipeline_name }}.go',
runner='{{ var.value.runner }}',
pipeline_options={
'project': '{{ var.value.gcp_project }}',
'input': 'gs://{{ var.value.input_bucket }}/{{ ds }}/*',
'output': 'gs://{{ var.value.output_bucket }}/{{ ds }}/',
'job_name': 'go-pipeline-{{ ds_nodash }}',
},
)

Install with the Tessl CLI:
npx tessl i tessl/pypi-apache-airflow-providers-apache-beam