tessl/pypi-apache-airflow-providers-apache-beam

Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution


docs/python-pipelines.md

Python Pipeline Execution

Execute Apache Beam pipelines written in Python with comprehensive support for virtual environments, custom package requirements, multiple runners, and cloud integration. The Python pipeline operator provides the most flexible execution environment with automatic dependency management.

Capabilities

BeamRunPythonPipelineOperator

Launch Apache Beam pipelines written in Python with support for custom environments, package requirements, and various execution runners.

class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        py_file: str,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        py_interpreter: str = "python3",
        py_options: list[str] | None = None,
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize Python pipeline operator.

        Parameters:
        - py_file (str): Path to Python pipeline file, supports local paths and gs:// URLs
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override defaults  
        - py_interpreter (str): Python interpreter version, defaults to "python3"
        - py_options (list[str]): Additional Python command-line options
        - py_requirements (list[str]): Python packages to install in virtual environment
        - py_system_site_packages (bool): Include system packages in virtual environment
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration|dict): Dataflow-specific configuration
        - deferrable (bool): Enable deferrable execution mode
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Python Pipeline."""

    def execute_on_dataflow(self, context: Context):
        """Execute the Apache Beam Pipeline on Dataflow runner."""

    def on_kill(self) -> None:
        """Cancel the pipeline when task is killed."""

Usage Examples

Basic Local Execution

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

run_local_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_local_beam_pipeline',
    py_file='/path/to/pipeline.py',
    runner='DirectRunner',
    pipeline_options={
        'output': '/tmp/output',
        'temp_location': '/tmp/beam_temp',
    },
)

Cloud Dataflow Execution

run_dataflow_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_dataflow_pipeline',
    py_file='gs://my-bucket/pipeline.py',
    runner='DataflowRunner', 
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'staging_location': 'gs://my-bucket/staging',
        'num_workers': 4,
        'max_num_workers': 10,
        'machine_type': 'n1-standard-4',
    },
    dataflow_config={
        'job_name': 'my-dataflow-job',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
    },
)

Custom Python Environment

run_pipeline_custom_env = BeamRunPythonPipelineOperator(
    task_id='run_pipeline_custom_env',
    py_file='/path/to/pipeline.py',
    py_requirements=[
        'apache-beam[gcp]==2.50.0',
        'pandas==2.0.0',
        'numpy==1.24.0',
    ],
    py_system_site_packages=True,
    py_interpreter='python3.10',
    py_options=['-u', '-W', 'ignore'],
    runner='DirectRunner',
)

Deferrable Execution

run_deferrable_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_deferrable_pipeline',
    py_file='gs://my-bucket/long_pipeline.py',
    runner='DataflowRunner',
    deferrable=True,  # Enable deferrable mode
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
    },
    dataflow_config={
        'job_name': 'long-running-job',
        'wait_until_finished': True,
    },
)

Configuration Options

Pipeline Options

Pipeline options control the behavior and configuration of the Beam pipeline execution:

  • Basic Options: runner, project, region, temp_location, staging_location
  • Scaling Options: num_workers, max_num_workers, machine_type, disk_size_gb
  • Processing Options: streaming, windowing, max_bundle_size
  • Output Options: Various output sinks and formatting options
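The two options dicts are merged (task-level `pipeline_options` win over `default_pipeline_options`) and the result is rendered as command-line flags for the pipeline. A minimal sketch of that merge-and-render step, where `options_to_args` is a hypothetical helper approximating what the provider's Beam hook does internally:

```python
# Illustrative sketch only, not the provider's actual code:
# task-level pipeline_options override default_pipeline_options,
# and the merged dict becomes --key=value flags.
def options_to_args(options: dict) -> list[str]:
    args = []
    for key, value in options.items():
        if value is True:
            args.append(f"--{key}")  # bare flag for boolean options
        elif isinstance(value, list):
            args.extend(f"--{key}={v}" for v in value)  # repeated flag
        else:
            args.append(f"--{key}={value}")
    return args

default_pipeline_options = {"project": "my-gcp-project", "num_workers": 2}
pipeline_options = {"num_workers": 4, "streaming": True}  # overrides default

merged = {**default_pipeline_options, **pipeline_options}
print(options_to_args(merged))
# ['--project=my-gcp-project', '--num_workers=4', '--streaming']
```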

Dataflow Configuration

When using DataflowRunner, additional configuration options are available:

dataflow_config = {
    'job_name': 'unique-job-name',
    'project_id': 'gcp-project-id', 
    'location': 'us-central1',
    'gcp_conn_id': 'google_cloud_default',
    'wait_until_finished': True,
    'poll_sleep': 10,
    'cancel_timeout': 300,
    'drain_pipeline': False,
    'service_account': 'pipeline-service-account@project.iam.gserviceaccount.com',
    'impersonation_chain': ['service-account@project.iam.gserviceaccount.com'],
}
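Instead of a plain dict, `dataflow_config` can also be passed as a `DataflowConfiguration` object. A sketch, assuming the class is importable from the Google provider package as in recent `apache-airflow-providers-google` releases:

```python
# Sketch: passing dataflow_config as an object instead of a dict.
# Assumes DataflowConfiguration lives in the Google provider package.
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

dataflow_config = DataflowConfiguration(
    job_name="unique-job-name",
    project_id="gcp-project-id",
    location="us-central1",
    wait_until_finished=True,
)
```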

Error Handling

The operator handles various error conditions:

  • File Not Found: Validates pipeline file existence before execution
  • Environment Setup: Manages virtual environment creation and package installation
  • Pipeline Failures: Captures and reports Beam pipeline execution errors
  • Dataflow Errors: Handles Dataflow-specific errors and job cancellation
  • Resource Cleanup: Ensures temporary files and resources are cleaned up
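The cancellation behavior follows a cancel-on-kill pattern: the operator records the job it launched so that `on_kill` can cancel it when the Airflow task is terminated. A minimal illustrative sketch (not the provider's actual code; names are hypothetical):

```python
# Illustrative cancel-on-kill pattern: keep a handle to the launched job
# so on_kill() can cancel it if the Airflow task is killed mid-run.
class PipelineTask:
    def __init__(self):
        self.job_id = None
        self.cancelled = False

    def execute(self):
        self.job_id = "2024-01-01_job"  # stand-in for a real Dataflow job id
        # ... pipeline runs (or is deferred) here ...

    def on_kill(self):
        if self.job_id is not None:
            self.cancelled = True  # stand-in for a Dataflow cancel API call

task = PipelineTask()
task.execute()
task.on_kill()
print(task.cancelled)  # True
```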

Templating Support

The operator supports Airflow templating for dynamic configuration:

run_templated_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_templated_pipeline',
    py_file='gs://{{ var.value.bucket }}/pipeline.py',
    runner='{{ var.value.runner }}',
    pipeline_options={
        'project': '{{ var.value.gcp_project }}',
        'input': 'gs://{{ var.value.input_bucket }}/{{ ds }}/*',
        'output': 'gs://{{ var.value.output_bucket }}/{{ ds }}/',
    },
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-beam
