CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-apache-beam

Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/hooks-monitoring.md

Hooks and Monitoring

Low-level interface for executing and monitoring Apache Beam pipelines with comprehensive support for synchronous and asynchronous execution modes, custom callback handling, and pipeline lifecycle management.

Capabilities

BeamHook

Synchronous hook providing direct interface to Apache Beam pipeline execution with process monitoring and callback support.

class BeamHook(BaseHook):
    def __init__(self, runner: str) -> None:
        """
        Initialize Beam hook.
        
        Parameters:
        - runner (str): Runner type for pipeline execution
        """

    def start_python_pipeline(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str],
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ):
        """
        Start Apache Beam Python pipeline.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - py_file (str): Path to Python pipeline file
        - py_options (list[str]): Additional Python command-line options
        - py_interpreter (str): Python interpreter version
        - py_requirements (list[str]): Python packages for virtual environment
        - py_system_site_packages (bool): Include system packages in venv
        - process_line_callback (Callable): Optional callback for output processing
        - is_dataflow_job_id_exist_callback (Callable): Optional callback for job ID detection
        """

    def start_java_pipeline(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None:
        """
        Start Apache Beam Java pipeline.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - jar (str): Path to JAR file containing pipeline
        - job_class (str): Java class name for pipeline execution
        - process_line_callback (Callable): Optional callback for output processing  
        - is_dataflow_job_id_exist_callback (Callable): Optional callback for job ID detection
        """

    def start_go_pipeline(
        self,
        variables: dict,
        go_file: str,
        process_line_callback: Callable[[str], None] | None = None,
        should_init_module: bool = False,
    ) -> None:
        """
        Start Apache Beam Go pipeline with source file.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - go_file (str): Path to Go source file
        - process_line_callback (Callable): Optional callback for output processing
        - should_init_module (bool): Initialize Go module and dependencies
        """

    def start_go_pipeline_with_binary(
        self,
        variables: dict,
        launcher_binary: str,
        worker_binary: str,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> None:
        """
        Start Apache Beam Go pipeline with pre-compiled binary.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - launcher_binary (str): Path to launcher binary
        - worker_binary (str): Path to worker binary
        - process_line_callback (Callable): Optional callback for output processing
        """

BeamAsyncHook

Asynchronous hook providing non-blocking interface to Apache Beam pipeline execution with concurrent operation support.

class BeamAsyncHook(BeamHook):
    def __init__(self, runner: str) -> None:
        """
        Initialize asynchronous Beam hook.
        
        Parameters:
        - runner (str): Runner type for pipeline execution
        """

    async def start_python_pipeline_async(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
    ):
        """
        Start Apache Beam Python pipeline asynchronously.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - py_file (str): Path to Python pipeline file
        - py_options (list[str]): Additional Python command-line options
        - py_interpreter (str): Python interpreter version
        - py_requirements (list[str]): Python packages for virtual environment
        - py_system_site_packages (bool): Include system packages in venv
        - process_line_callback (Callable): Optional callback for output processing
        
        Returns:
        - int: Pipeline execution return code
        """

    async def start_java_pipeline_async(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int:
        """
        Start Apache Beam Java pipeline asynchronously.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - jar (str): Path to JAR file containing pipeline
        - job_class (str): Java class name for pipeline execution
        - process_line_callback (Callable): Optional callback for output processing
        
        Returns:
        - int: Pipeline execution return code
        """

    async def start_pipeline_async(
        self,
        variables: dict,
        command_prefix: list[str],
        working_directory: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int:
        """
        Start Apache Beam pipeline with custom command asynchronously.
        
        Parameters:
        - variables (dict): Pipeline execution variables and options
        - command_prefix (list[str]): Command prefix for pipeline execution
        - working_directory (str): Directory for command execution
        - process_line_callback (Callable): Optional callback for output processing
        
        Returns:
        - int: Pipeline execution return code
        """

Runner Types and Utilities

class BeamRunnerType:
    """Helper class for listing available runner types."""
    DataflowRunner = "DataflowRunner"
    DirectRunner = "DirectRunner" 
    SparkRunner = "SparkRunner"
    FlinkRunner = "FlinkRunner"
    SamzaRunner = "SamzaRunner"
    NemoRunner = "NemoRunner"
    JetRunner = "JetRunner"
    Twister2Runner = "Twister2Runner"

def beam_options_to_args(options: dict) -> list[str]:
    """
    Convert pipeline options dictionary to command line arguments.
    
    Parameters:
    - options (dict): Dictionary with pipeline options
    
    Returns:
    - list[str]: List of formatted command line arguments
    """

def run_beam_command(
    cmd: list[str],
    log: logging.Logger,
    process_line_callback: Callable[[str], None] | None = None,
    working_directory: str | None = None,
    is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
    """
    Run pipeline command in subprocess with monitoring.
    
    Parameters:
    - cmd (list[str]): Command parts to execute
    - log (logging.Logger): Logger for output
    - process_line_callback (Callable): Optional output processor
    - working_directory (str): Execution directory
    - is_dataflow_job_id_exist_callback (Callable): Optional job ID detector
    """

Usage Examples

Basic Hook Usage

from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

# Initialize hook
beam_hook = BeamHook(runner=BeamRunnerType.DirectRunner)

# Execute Python pipeline
beam_hook.start_python_pipeline(
    variables={
        'output': '/tmp/beam_output',
        'temp_location': '/tmp/beam_temp',
    },
    py_file='/path/to/pipeline.py',
    py_options=['-u'],
    py_interpreter='python3',
)

Custom Process Monitoring

def log_processor(line: str) -> None:
    """Custom processor for pipeline output."""
    if 'ERROR' in line:
        logger.error(f"Pipeline error: {line}")
    elif 'INFO' in line:
        logger.info(f"Pipeline info: {line}")

def job_id_detector() -> bool:
    """Check if a Dataflow job ID has been extracted."""
    return bool(getattr(beam_hook, 'dataflow_job_id', None))

beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='gs://bucket/pipeline.py',
    process_line_callback=log_processor,
    is_dataflow_job_id_exist_callback=job_id_detector,
)

Asynchronous Pipeline Execution

import asyncio
from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook

async def run_async_pipeline():
    """Execute pipeline asynchronously."""
    async_hook = BeamAsyncHook(runner='DataflowRunner')
    
    return_code = await async_hook.start_python_pipeline_async(
        variables={
            'project': 'my-gcp-project',
            'region': 'us-central1',
            'temp_location': 'gs://my-bucket/temp',
        },
        py_file='gs://my-bucket/pipeline.py',
        py_requirements=['apache-beam[gcp]>=2.60.0'],
    )
    
    if return_code == 0:
        print("Pipeline completed successfully")
    else:
        print(f"Pipeline failed with return code: {return_code}")

# Run the async pipeline
asyncio.run(run_async_pipeline())

Virtual Environment Management

# Custom Python environment with specific packages
beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='/path/to/pipeline.py',
    py_requirements=[
        'apache-beam[gcp]==2.60.0',
        'pandas==2.1.0',
        'numpy==1.24.0',
        'google-cloud-bigquery==3.11.0',
    ],
    py_system_site_packages=False,  # Isolated environment
    py_interpreter='python3.10',
)

Java Pipeline with Custom Classpath

beam_hook.start_java_pipeline(
    variables={
        'runner': 'DataflowRunner',
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
    },
    jar='/path/to/pipeline.jar',
    job_class='com.company.DataProcessingPipeline',
)

Pipeline Options Processing

Option Format Handling

The beam_options_to_args function handles various option types:

from airflow.providers.apache.beam.hooks.beam import beam_options_to_args

options = {
    'runner': 'DataflowRunner',
    'project': 'my-project',
    'streaming': True,  # Boolean flag
    'labels': {'env': 'prod', 'team': 'data'},  # Dictionary
    'experiments': ['enable_google_cloud_profiler', 'enable_streaming_engine'],  # List
    'numWorkers': 4,  # Numeric value
    'skipValidation': False,  # False boolean (skipped)
    'tempLocation': None,  # None value (skipped)
}

args = beam_options_to_args(options)
# Results in: ['--runner=DataflowRunner', '--project=my-project', '--streaming', 
#              '--labels={"env":"prod","team":"data"}', 
#              '--experiments=enable_google_cloud_profiler', '--experiments=enable_streaming_engine',
#              '--numWorkers=4']

Error Handling and Monitoring

Process Monitoring

def comprehensive_monitor(line: str) -> None:
    """Comprehensive pipeline output monitoring."""
    import re
    
    # Extract job ID from Dataflow output
    job_id_match = re.search(r'Submitted job: ([a-zA-Z0-9\-]+)', line)
    if job_id_match:
        job_id = job_id_match.group(1)
        print(f"Extracted Dataflow job ID: {job_id}")
    
    # Monitor for errors
    if any(keyword in line.lower() for keyword in ['error', 'exception', 'failed']):
        logger.error(f"Pipeline error detected: {line}")
    
    # Track progress indicators
    if 'Processing bundle' in line:
        logger.info(f"Pipeline progress: {line}")

beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='pipeline.py',
    process_line_callback=comprehensive_monitor,
)

Exception Handling

from airflow.exceptions import AirflowException

try:
    beam_hook.start_python_pipeline(
        variables=pipeline_options,
        py_file='pipeline.py',
    )
except AirflowException as e:
    if "Apache Beam process failed" in str(e):
        logger.error(f"Beam pipeline execution failed: {e}")
        # Handle pipeline failure
    else:
        logger.error(f"Hook error: {e}")
        # Handle other errors

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-beam

docs

go-pipelines.md

hooks-monitoring.md

index.md

java-pipelines.md

python-pipelines.md

triggers.md

tile.json