or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

go-pipelines.mdhooks-monitoring.mdindex.mdjava-pipelines.mdpython-pipelines.mdtriggers.md
tile.json

tessl/pypi-apache-airflow-providers-apache-beam

Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/apache-airflow-providers-apache-beam@6.1.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-beam@6.1.0

index.mddocs/

Apache Airflow Providers Apache Beam

An Apache Airflow provider package that enables workflow orchestration and data processing capabilities by offering operators, hooks, and triggers for Apache Beam pipeline execution. Supports running Beam pipelines written in Python, Java, and Go across various runners including DirectRunner, DataflowRunner, SparkRunner, and FlinkRunner.

Package Information

  • Package Name: apache-airflow-providers-apache-beam
  • Package Type: Python Package (PyPI)
  • Language: Python
  • Installation: pip install apache-airflow-providers-apache-beam
  • Version: 6.1.3
  • Requires: Apache Airflow >=2.10.0, Apache Beam >=2.60.0

Core Imports

from airflow.providers.apache.beam import __version__

Operators:

from airflow.providers.apache.beam.operators.beam import (
    BeamRunPythonPipelineOperator,
    BeamRunJavaPipelineOperator,
    BeamRunGoPipelineOperator,
    BeamBasePipelineOperator
)

Hooks:

from airflow.providers.apache.beam.hooks.beam import (
    BeamHook, 
    BeamAsyncHook, 
    BeamRunnerType,
    beam_options_to_args,
    run_beam_command
)

Triggers:

from airflow.providers.apache.beam.triggers.beam import (
    BeamPythonPipelineTrigger,
    BeamJavaPipelineTrigger,
    BeamPipelineBaseTrigger
)

Version compatibility:

from airflow.providers.apache.beam.version_compat import (
    AIRFLOW_V_3_1_PLUS,
    BaseHook,
    BaseOperator
)

Basic Usage

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Define default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
dag = DAG(
    'beam_pipeline_example',
    default_args=default_args,
    description='Run Apache Beam pipeline with Airflow',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

# Define Beam pipeline task
run_beam_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_beam_pipeline',
    py_file='gs://my-bucket/beam_pipeline.py',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'staging_location': 'gs://my-bucket/staging',
    },
    dataflow_config={
        'job_name': 'my-beam-pipeline',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
    },
    dag=dag,
)

Architecture

The provider follows Airflow's standard architecture pattern with three main component types:

  • Operators: Execute Beam pipelines as Airflow tasks, supporting Python, Java, and Go implementations
  • Hooks: Provide low-level interface to Apache Beam, handling pipeline execution and monitoring
  • Triggers: Enable deferrable execution for long-running pipelines with asynchronous monitoring

The provider integrates with Google Cloud Dataflow when the google provider is available, enabling cloud-scale pipeline execution with monitoring and job management capabilities.

Capabilities

Python Pipeline Execution

Execute Apache Beam pipelines written in Python with support for virtual environments, custom requirements, and various runners including local DirectRunner and cloud DataflowRunner.

class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        py_file: str,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        py_interpreter: str = "python3",
        py_options: list[str] | None = None,
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...

Python Pipelines

Java Pipeline Execution

Execute Apache Beam pipelines written in Java using self-executing JAR files with support for various runners and Dataflow integration.

class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        jar: str,
        runner: str = "DirectRunner",
        job_class: str | None = None,
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...

Java Pipelines

Go Pipeline Execution

Execute Apache Beam pipelines written in Go from source files or pre-compiled binaries with support for cross-platform execution.

class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        go_file: str = "",
        launcher_binary: str = "",
        worker_binary: str = "",
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        **kwargs,
    ) -> None: ...

Go Pipelines

Pipeline Monitoring and Hooks

Low-level interface for executing and monitoring Apache Beam pipelines with both synchronous and asynchronous execution modes.

class BeamHook(BaseHook):
    def __init__(self, runner: str) -> None: ...
    def start_python_pipeline(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str],
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None: ...
    def start_java_pipeline(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None: ...
    def start_go_pipeline(
        self,
        variables: dict,
        go_file: str,
        process_line_callback: Callable[[str], None] | None = None,
        should_init_module: bool = False,
    ) -> None: ...

class BeamAsyncHook(BeamHook):
    def __init__(self, runner: str) -> None: ...
    async def start_python_pipeline_async(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int: ...
    async def start_java_pipeline_async(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int: ...

Hooks and Monitoring

Asynchronous Pipeline Triggers

Deferrable execution triggers for long-running pipelines that enable efficient resource utilization by yielding control during pipeline execution.

class BeamPythonPipelineTrigger(BeamPipelineBaseTrigger):
    def __init__(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        runner: str = "DirectRunner",
        gcp_conn_id: str = "google_cloud_default",
    ) -> None: ...
    def serialize(self) -> tuple[str, dict[str, Any]]: ...
    async def run(self) -> AsyncIterator[TriggerEvent]: ...

class BeamJavaPipelineTrigger(BeamPipelineBaseTrigger):
    def __init__(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        runner: str = "DirectRunner",
        gcp_conn_id: str = "google_cloud_default",
    ) -> None: ...
    def serialize(self) -> tuple[str, dict[str, Any]]: ...
    async def run(self) -> AsyncIterator[TriggerEvent]: ...

Triggers and Deferrable Execution

Version Compatibility

Cross-version compatibility components that provide stable imports and version detection for different Airflow releases.

AIRFLOW_V_3_1_PLUS: bool
"""Boolean flag indicating Airflow version 3.1+ compatibility."""

# Version-compatible base classes (imported from appropriate Airflow version)
BaseHook: type
"""Version-compatible BaseHook class."""

BaseOperator: type  
"""Version-compatible BaseOperator class."""

Types

class BeamBasePipelineOperator(BaseOperator):
    """
    Abstract base class for all Apache Beam pipeline operators.
    
    Provides common functionality including pipeline option handling,
    Dataflow integration, and error management.
    """
    template_fields = ["runner", "pipeline_options", "default_pipeline_options", "dataflow_config"]
    
    def __init__(
        self,
        *,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...
    
    def execute(self, context: Context) -> dict: ...
    def execute_on_dataflow(self, context: Context) -> dict: ...
    def on_kill(self) -> None: ...

class BeamRunnerType:
    """Helper class for listing available runner types."""
    DataflowRunner = "DataflowRunner"
    DirectRunner = "DirectRunner"
    SparkRunner = "SparkRunner"
    FlinkRunner = "FlinkRunner"
    SamzaRunner = "SamzaRunner"
    NemoRunner = "NemoRunner"
    JetRunner = "JetRunner"
    Twister2Runner = "Twister2Runner"

class DataflowConfiguration:
    """
    Configuration object for Dataflow-specific options.
    
    Used to configure Google Cloud Dataflow execution parameters
    including job naming, project settings, and execution behavior.
    """
    job_name: str
    project_id: str
    location: str
    gcp_conn_id: str = "google_cloud_default"
    wait_until_finished: bool = True
    poll_sleep: int = 10
    cancel_timeout: int = 300
    drain_pipeline: bool = False
    service_account: str | None = None
    impersonation_chain: list[str] | None = None
    check_if_running: str = "WaitForRun"
    multiple_jobs: bool = False

class Context:
    """Airflow execution context for task instances."""

def beam_options_to_args(options: dict) -> list[str]:
    """
    Convert pipeline options dictionary to command line arguments.
    
    Args:
        options: Dictionary with pipeline options
        
    Returns:
        List of formatted command line arguments
    """

def run_beam_command(
    cmd: list[str],
    log: logging.Logger,
    process_line_callback: Callable[[str], None] | None = None,
    working_directory: str | None = None,
    is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
    """
    Execute Beam pipeline command in subprocess with monitoring.
    
    Args:
        cmd: Command parts to execute
        log: Logger for output
        process_line_callback: Optional output processor
        working_directory: Execution directory
        is_dataflow_job_id_exist_callback: Optional job ID detector
    """

# Version compatibility support
AIRFLOW_V_3_1_PLUS: bool
"""Boolean flag indicating Airflow version 3.1+ compatibility."""

# Trigger types
class TriggerEvent:
    """Event yielded by triggers to indicate status changes."""
    def __init__(self, payload: dict[str, Any]) -> None: ...

class AsyncIterator[T]:
    """Async iterator type for trigger events."""

class NamedTemporaryFile:
    """Temporary file with a visible name in the file system."""
    name: str

# Type aliases
Any = typing.Any
Callable = typing.Callable