tessl/pypi-apache-airflow-providers-apache-beam

Provider package for Apache Beam integration with Apache Airflow, supporting Python, Java, and Go pipeline execution.

docs/triggers.md

Triggers and Deferrable Execution

Asynchronous pipeline triggers that enable efficient resource utilization by yielding control during long-running pipeline execution. Triggers provide deferrable execution for Apache Beam pipelines: an operator defers itself, hands monitoring over to the triggerer process, and frees its worker slot until the pipeline completes.
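The control flow can be sketched as a plain async generator: the trigger awaits between status checks, so the triggerer's event loop can service many pipelines concurrently. This is a minimal illustration of the idea, not the provider's actual implementation; `poll_until_done` and `check` are hypothetical names.

```python
import asyncio

async def poll_until_done(check, interval: float = 0.01):
    # Yield control to the event loop between status checks; a real
    # trigger yields a TriggerEvent when the pipeline finishes.
    while (status := check()) is None:
        await asyncio.sleep(interval)
    yield {"status": status}
```

An operator that defers hands a generator like this to the triggerer; the worker slot stays free until the generator yields an event.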

Capabilities

BeamPythonPipelineTrigger

Deferrable trigger for monitoring Python pipeline execution until completion with asynchronous status checking and resource-efficient waiting.

class BeamPythonPipelineTrigger(BeamPipelineBaseTrigger):
    def __init__(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        runner: str = "DirectRunner",
        gcp_conn_id: str = "google_cloud_default",
    ):
        """
        Initialize Python pipeline trigger.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - py_file (str): Path to Python pipeline file, supports gs:// URLs
        - py_options (list[str]): Additional Python command-line options
        - py_interpreter (str): Python interpreter version
        - py_requirements (list[str]): Python packages for virtual environment
        - py_system_site_packages (bool): Include system packages in venv
        - runner (str): Runner type for pipeline execution
        - gcp_conn_id (str): Google Cloud connection ID for GCS access
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize trigger arguments and classpath for persistence."""

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Execute pipeline and yield completion events."""

BeamJavaPipelineTrigger

Deferrable trigger for monitoring Java pipeline execution with JAR file support and asynchronous job lifecycle management.

class BeamJavaPipelineTrigger(BeamPipelineBaseTrigger):
    def __init__(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        runner: str = "DirectRunner",
        gcp_conn_id: str = "google_cloud_default",
    ):
        """
        Initialize Java pipeline trigger.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - jar (str): Path to JAR file, supports gs:// URLs
        - job_class (str): Java class name for pipeline execution
        - runner (str): Runner type for pipeline execution
        - gcp_conn_id (str): Google Cloud connection ID for GCS access
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize trigger arguments and classpath for persistence."""

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Execute pipeline and yield completion events."""

BeamPipelineBaseTrigger

Base trigger class providing common functionality for all Beam pipeline triggers including GCS file handling and hook management.

class BeamPipelineBaseTrigger(BaseTrigger):
    """Base class for Beam Pipeline Triggers."""

    def __init__(
        self,
        variables: dict,
        runner: str = "DirectRunner",
        gcp_conn_id: str = "google_cloud_default",
    ) -> None:
        """
        Initialize base trigger.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - runner (str): Runner type for pipeline execution
        - gcp_conn_id (str): Google Cloud connection ID for GCS access
        """

    @staticmethod
    def _get_async_hook(*args, **kwargs) -> BeamAsyncHook:
        """Get asynchronous Beam hook instance."""

    @staticmethod
    def file_has_gcs_path(file_path: str) -> bool:
        """Check if file path is a GCS URL."""

    @staticmethod
    async def provide_gcs_tempfile(gcs_file: str, gcp_conn_id: str) -> NamedTemporaryFile:
        """Download GCS file to temporary local file."""

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize trigger arguments and classpath for persistence."""

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Execute pipeline and yield completion events."""

Usage Examples

Basic Deferrable Python Pipeline

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Operator automatically uses trigger when deferrable=True
deferrable_python_task = BeamRunPythonPipelineOperator(
    task_id='deferrable_python_beam',
    py_file='gs://my-bucket/long_pipeline.py',
    runner='DirectRunner',
    deferrable=True,  # Enables deferrable execution
    pipeline_options={
        'output': '/tmp/output',
        'temp_location': '/tmp/beam_temp',
    },
)

Manual Trigger Usage

from airflow.providers.apache.beam.triggers.beam import BeamPythonPipelineTrigger
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator

class CustomBeamOperator(BaseOperator):
    def execute(self, context):
        # Defer to trigger for long-running pipeline
        self.defer(
            trigger=BeamPythonPipelineTrigger(
                variables={
                    'output': '/tmp/output',
                    'temp_location': '/tmp/beam_temp',
                },
                py_file='/path/to/pipeline.py',
                py_requirements=['apache-beam>=2.60.0'],
                runner='DirectRunner',
            ),
            method_name='execute_complete',
        )
    
    def execute_complete(self, context, event):
        """Handle trigger completion."""
        if event['status'] == 'success':
            self.log.info("Pipeline completed successfully")
            return event
        else:
            raise AirflowException(f"Pipeline failed: {event['message']}")

Java Pipeline with Deferrable Execution

from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

deferrable_java_task = BeamRunJavaPipelineOperator(
    task_id='deferrable_java_beam',
    jar='gs://my-bucket/pipeline.jar',
    job_class='com.company.LongRunningPipeline',
    runner='DataflowRunner',
    deferrable=True,
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
        'streaming': 'true',  # Long-running streaming job
    },
    dataflow_config={
        'job_name': 'streaming-java-pipeline',
        'wait_until_finished': True,
    },
)

GCS File Handling

# Trigger automatically handles GCS file downloads
gcs_trigger = BeamPythonPipelineTrigger(
    variables={
        'project': 'my-gcp-project',
        'temp_location': 'gs://my-bucket/temp',
    },
    py_file='gs://my-bucket/pipelines/data_processor.py',  # Automatically downloaded
    py_requirements=['apache-beam[gcp]>=2.60.0'],
    runner='DataflowRunner',
    gcp_conn_id='my_gcp_connection',
)

Trigger Events and Status

Event Types

Triggers yield TriggerEvent objects with status information:

# Success event
{
    'status': 'success',
    'message': 'Pipeline has finished SUCCESSFULLY'
}

# Error event  
{
    'status': 'error',
    'message': 'Operation failed'
}

# Error with exception details
{
    'status': 'error', 
    'message': 'Apache Beam process failed with return code 1'
}

Event Handling in Operators

def execute_complete(self, context, event):
    """Handle trigger completion events."""
    if event['status'] == 'error':
        raise AirflowException(event['message'])
    
    self.log.info(f"Pipeline completed: {event['message']}")
    return {'status': event['status'], 'message': event['message']}

Resource Management

Virtual Environment Handling

Python triggers automatically manage virtual environments:

python_trigger = BeamPythonPipelineTrigger(
    variables=pipeline_options,
    py_file='pipeline.py',
    py_requirements=[
        'apache-beam[gcp]==2.60.0',
        'pandas==2.1.0',
        'google-cloud-bigquery==3.11.0',
    ],
    py_system_site_packages=False,  # Isolated environment
    py_interpreter='python3.10',
)

Temporary File Cleanup

Triggers handle cleanup of temporary files:

  • GCS downloads are automatically cleaned up
  • Virtual environments are removed after execution
  • Temporary directories are properly disposed
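The download-then-clean-up pattern behind `provide_gcs_tempfile` can be illustrated with the standard library alone. This is a hedged sketch: `run_with_local_copy` is a hypothetical helper, and the real trigger streams the object through a GCS hook rather than taking bytes directly.

```python
import os
import tempfile

def run_with_local_copy(remote_bytes: bytes, run):
    # Materialize the remote object as a named temp file, hand the
    # local path to the pipeline, then remove the file afterwards.
    tmp = tempfile.NamedTemporaryFile(suffix=".py", delete=False)
    try:
        tmp.write(remote_bytes)
        tmp.close()
        return run(tmp.name)
    finally:
        os.unlink(tmp.name)
```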

Serialization and Persistence

Trigger Serialization

Triggers must be serializable for persistence across worker restarts:

# Trigger serialization format
def serialize(self) -> tuple[str, dict[str, Any]]:
    return (
        "airflow.providers.apache.beam.triggers.beam.BeamPythonPipelineTrigger",
        {
            "variables": self.variables,
            "py_file": self.py_file,
            "py_options": self.py_options,
            "py_interpreter": self.py_interpreter,
            "py_requirements": self.py_requirements,
            "py_system_site_packages": self.py_system_site_packages,
            "runner": self.runner,
            "gcp_conn_id": self.gcp_conn_id,
        },
    )
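On the other side, the triggerer reconstructs the trigger from the dotted classpath and the serialized keyword arguments. Roughly (a sketch; `rehydrate` is a hypothetical name for what Airflow does internally):

```python
import importlib

def rehydrate(classpath: str, kwargs: dict):
    # Import the module, look up the class by name, and
    # re-instantiate it with the serialized keyword arguments.
    module_path, _, class_name = classpath.rpartition(".")
    cls = getattr(importlib.import_module(module_path), class_name)
    return cls(**kwargs)
```

This is why every constructor argument must survive a round trip through the serialized dict: anything not captured by `serialize()` is lost across a restart.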

Error Handling and Recovery

Exception Management

Triggers handle various error conditions:

async def run(self) -> AsyncIterator[TriggerEvent]:
    try:
        # Handle GCS file download
        if self.file_has_gcs_path(self.py_file):
            tmp_file = await self.provide_gcs_tempfile(self.py_file, self.gcp_conn_id)
            self.py_file = tmp_file.name

        # Execute pipeline via the async hook
        hook = self._get_async_hook(...)
        return_code = await hook.start_python_pipeline_async(...)
        
        if return_code == 0:
            yield TriggerEvent({
                "status": "success",
                "message": "Pipeline has finished SUCCESSFULLY",
            })
        else:
            yield TriggerEvent({
                "status": "error", 
                "message": "Operation failed"
            })
            
    except Exception as e:
        self.log.exception("Exception occurred while checking pipeline state")
        yield TriggerEvent({
            "status": "error",
            "message": str(e)
        })

Best Practices

When to Use Deferrable Execution

Use deferrable execution for:

  • Long-running batch pipelines (>5 minutes)
  • Streaming pipelines with indefinite runtime
  • Cloud-based pipelines with variable execution time
  • High-concurrency environments with limited workers

Resource Optimization

# Optimize for resource usage
BeamRunPythonPipelineOperator(
    task_id='optimized_pipeline',
    py_file='gs://bucket/pipeline.py',
    deferrable=True,  # Yield worker during execution
    runner='DataflowRunner',  # Offload to cloud
    dataflow_config={
        'wait_until_finished': True,  # Wait for completion
        'poll_sleep': 30,  # Reduce polling frequency
    },
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-beam
