tessl/pypi-apache-airflow-providers-apache-beam

Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution

Go Pipeline Execution

Execute Apache Beam pipelines written in Go from source files or pre-compiled binaries, with support for cross-platform execution, module management, and cloud integration. The Go pipeline operator provides flexible execution options for Go-based data processing workflows.

Capabilities

BeamRunGoPipelineOperator

Launch Apache Beam pipelines written in Go using either source files with automatic compilation or pre-compiled binaries with cross-platform support.

class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        go_file: str = "",
        launcher_binary: str = "",
        worker_binary: str = "",
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize Go pipeline operator.

        Parameters:
        - go_file (str): Path to Go source file; exactly one of go_file or launcher_binary must be provided
        - launcher_binary (str): Path to pre-compiled Go binary for launcher platform
        - worker_binary (str): Path to Go binary for worker platform, defaults to launcher_binary
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override defaults
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration|dict): Dataflow-specific configuration
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Go Pipeline."""

    def on_kill(self) -> None:
        """Cancel the pipeline when task is killed."""

Usage Examples

Source File Execution (Local)

from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator

run_go_source_pipeline = BeamRunGoPipelineOperator(
    task_id='run_go_source_pipeline',
    go_file='/path/to/pipeline.go',
    runner='DirectRunner',
    pipeline_options={
        'output': '/tmp/output',
        'temp_location': '/tmp/beam_temp',
    },
)

Source File with GCS (Dataflow)

run_gcs_go_pipeline = BeamRunGoPipelineOperator(
    task_id='run_gcs_go_pipeline',
    go_file='gs://my-bucket/pipeline.go',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'staging_location': 'gs://my-bucket/staging',
        'job_name': 'go-pipeline-job',
    },
    dataflow_config={
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
    },
)

Pre-compiled Binary Execution

run_go_binary_pipeline = BeamRunGoPipelineOperator(
    task_id='run_go_binary_pipeline',
    launcher_binary='/path/to/pipeline-launcher',
    worker_binary='/path/to/pipeline-worker',  # Optional, different binary for workers
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
    },
)

Cross-Platform Binary Execution

run_cross_platform_pipeline = BeamRunGoPipelineOperator(
    task_id='run_cross_platform_pipeline',
    launcher_binary='gs://my-bucket/binaries/pipeline-linux-amd64',
    worker_binary='gs://my-bucket/binaries/pipeline-linux-arm64',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'machine_type': 't2a-standard-4',  # ARM-based machine type
        'temp_location': 'gs://my-bucket/temp',
    },
)

Same Binary for Launcher and Worker

run_unified_binary_pipeline = BeamRunGoPipelineOperator(
    task_id='run_unified_binary_pipeline', 
    launcher_binary='gs://my-bucket/pipeline-universal',
    # worker_binary will default to launcher_binary value
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'temp_location': 'gs://my-bucket/temp',
    },
)

Go Module Management

When using Go source files, the operator automatically handles module initialization and dependency management:

Automatic Module Initialization

For source files downloaded from GCS, the operator will:

  1. Initialize a Go module with go mod init
  2. Install dependencies with go mod tidy
  3. Compile and execute the pipeline

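The three steps above correspond roughly to the following commands. This sketch only builds the argument lists; the operator's exact invocation may differ, and no Go toolchain is needed to run it:

```python
# Sketch of the shell commands behind the three module-management steps.
# Command shapes are illustrative; the operator's actual invocation may differ.

def go_module_commands(module_name: str, go_file: str) -> list[list[str]]:
    return [
        ["go", "mod", "init", module_name],   # 1. initialize the Go module
        ["go", "mod", "tidy"],                # 2. resolve and install dependencies
        ["go", "run", go_file],               # 3. compile and execute the pipeline
    ]

for cmd in go_module_commands("main", "pipeline.go"):
    print(" ".join(cmd))
```
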
Local Development

For local Go files, ensure your Go module is properly set up:

# In your pipeline directory
go mod init my-pipeline
go mod tidy

Configuration Options

Go-Specific Considerations

Go pipelines have unique configuration characteristics:

  • No Impersonation Support: Service account impersonation is not supported and will be ignored
  • Binary Execution: Pre-compiled binaries provide faster startup and deployment
  • Cross-Compilation: Support for different architectures (amd64, arm64)
  • Module Dependencies: Automatic dependency resolution for source files

Pipeline Options Format

Go pipelines use snake_case option formatting:

pipeline_options = {
    'project': 'my-gcp-project',
    'region': 'us-central1',
    'temp_location': 'gs://my-bucket/temp',
    'num_workers': '4',
    'max_num_workers': '10',
    'machine_type': 'n1-standard-4',
    'job_name': 'go-pipeline-job',
}
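
Dictionaries like the one above are ultimately rendered as --key=value command-line flags for the Go binary. A minimal sketch of that conversion (the provider's actual formatting logic may handle more cases, such as list values):

```python
def options_to_flags(options: dict) -> list[str]:
    """Render a pipeline-options dict as Beam-style command-line flags."""
    return [f"--{key}={value}" for key, value in options.items()]

pipeline_options = {
    "project": "my-gcp-project",
    "num_workers": "4",
}
print(options_to_flags(pipeline_options))
# ['--project=my-gcp-project', '--num_workers=4']
```
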

Binary Compilation

Building Cross-Platform Binaries

Example commands for building Go binaries for different platforms:

# Linux AMD64 (common for most cloud environments)
GOOS=linux GOARCH=amd64 go build -o pipeline-linux-amd64 pipeline.go

# Linux ARM64 (for ARM-based cloud instances)
GOOS=linux GOARCH=arm64 go build -o pipeline-linux-arm64 pipeline.go

# Local development (current platform)
go build -o pipeline-local pipeline.go

Binary Requirements

Pre-compiled binaries must:

  • Include all dependencies (static linking is recommended)
  • Be executable on the target platform
  • Include the Apache Beam Go SDK
  • Handle command-line arguments properly
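
A quick local pre-flight check can catch the most common problems before a binary is uploaded to GCS. This is a hypothetical helper, not part of the provider:

```python
import os

def check_binary(path: str) -> list[str]:
    """Return a list of problems found with a candidate pipeline binary."""
    problems = []
    if not os.path.isfile(path):
        problems.append("file does not exist")
    elif not os.access(path, os.X_OK):
        problems.append("file is not executable (try chmod +x)")
    return problems
```

An empty result means the file at least exists and carries an execute bit; it does not prove the binary targets the right platform or embeds the Beam Go SDK.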

Error Handling

The operator handles Go-specific error conditions:

  • Go Installation: Validates Go runtime availability
  • Module Resolution: Handles Go module initialization and dependency errors
  • Binary Access: Downloads and validates binary files from GCS
  • Cross-Platform Issues: Manages architecture and platform compatibility
  • Compilation Errors: Reports Go compilation issues for source files

Dataflow Integration

Go pipelines integrate with Dataflow but have specific limitations:

dataflow_config = {
    'job_name': 'go-pipeline-job',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'wait_until_finished': True,
    # Note: impersonation_chain is not supported for Go pipelines
}
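
Because impersonation_chain is ignored for Go pipelines, one defensive option is to strip it before building the config. This helper is hypothetical, not part of the provider:

```python
def go_safe_dataflow_config(config: dict) -> dict:
    """Drop options the Go operator does not support (here: impersonation_chain)."""
    unsupported = {"impersonation_chain"}
    return {k: v for k, v in config.items() if k not in unsupported}

cfg = go_safe_dataflow_config({
    "project_id": "gcp-project-id",
    "impersonation_chain": "sa@project.iam.gserviceaccount.com",
})
print(cfg)
# {'project_id': 'gcp-project-id'}
```
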

Performance Considerations

Source vs Binary Execution

  • Source Files: Require compilation time, automatic dependency management
  • Binary Files: Faster startup, explicit dependency management, cross-platform support

Binary Optimization

For production deployments, consider:

  • Static linking to reduce dependencies
  • Binary size optimization with build flags
  • Platform-specific optimizations

Templating Support

The operator supports Airflow templating for dynamic Go pipeline configuration:

run_templated_go_pipeline = BeamRunGoPipelineOperator(
    task_id='run_templated_go_pipeline',
    go_file='gs://{{ var.value.source_bucket }}/{{ var.value.pipeline_name }}.go',
    runner='{{ var.value.runner }}',
    pipeline_options={
        'project': '{{ var.value.gcp_project }}',
        'input': 'gs://{{ var.value.input_bucket }}/{{ ds }}/*',
        'output': 'gs://{{ var.value.output_bucket }}/{{ ds }}/',
        'job_name': 'go-pipeline-{{ ds_nodash }}',
    },
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-beam
