Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution
---
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run
Low-level interface for executing and monitoring Apache Beam pipelines with comprehensive support for synchronous and asynchronous execution modes, custom callback handling, and pipeline lifecycle management.
Synchronous hook providing direct interface to Apache Beam pipeline execution with process monitoring and callback support.
class BeamHook(BaseHook):
def __init__(self, runner: str) -> None:
"""
Initialize Beam hook.
Parameters:
- runner (str): Runner type for pipeline execution
"""
def start_python_pipeline(
self,
variables: dict,
py_file: str,
py_options: list[str],
py_interpreter: str = "python3",
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
process_line_callback: Callable[[str], None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
):
"""
Start Apache Beam python pipeline.
Parameters:
- variables (dict): Pipeline execution variables and options
- py_file (str): Path to Python pipeline file
- py_options (list[str]): Additional Python command-line options
- py_interpreter (str): Python interpreter version
- py_requirements (list[str]): Python packages for virtual environment
- py_system_site_packages (bool): Include system packages in venv
- process_line_callback (Callable): Optional callback for output processing
- is_dataflow_job_id_exist_callback (Callable): Optional callback for job ID detection
"""
def start_java_pipeline(
self,
variables: dict,
jar: str,
job_class: str | None = None,
process_line_callback: Callable[[str], None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
"""
Start Apache Beam Java pipeline.
Parameters:
- variables (dict): Pipeline execution variables and options
- jar (str): Path to JAR file containing pipeline
- job_class (str): Java class name for pipeline execution
- process_line_callback (Callable): Optional callback for output processing
- is_dataflow_job_id_exist_callback (Callable): Optional callback for job ID detection
"""
def start_go_pipeline(
self,
variables: dict,
go_file: str,
process_line_callback: Callable[[str], None] | None = None,
should_init_module: bool = False,
) -> None:
"""
Start Apache Beam Go pipeline with source file.
Parameters:
- variables (dict): Pipeline execution variables and options
- go_file (str): Path to Go source file
- process_line_callback (Callable): Optional callback for output processing
- should_init_module (bool): Initialize Go module and dependencies
"""
def start_go_pipeline_with_binary(
self,
variables: dict,
launcher_binary: str,
worker_binary: str,
process_line_callback: Callable[[str], None] | None = None,
) -> None:
"""
Start Apache Beam Go pipeline with pre-compiled binary.
Parameters:
- variables (dict): Pipeline execution variables and options
- launcher_binary (str): Path to launcher binary
- worker_binary (str): Path to worker binary
- process_line_callback (Callable): Optional callback for output processing
"""

Asynchronous hook providing non-blocking interface to Apache Beam pipeline execution with concurrent operation support.
class BeamAsyncHook(BeamHook):
def __init__(self, runner: str) -> None:
"""
Initialize asynchronous Beam hook.
Parameters:
- runner (str): Runner type for pipeline execution
"""
async def start_python_pipeline_async(
self,
variables: dict,
py_file: str,
py_options: list[str] | None = None,
py_interpreter: str = "python3",
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
process_line_callback: Callable[[str], None] | None = None,
):
"""
Start Apache Beam python pipeline asynchronously.
Parameters:
- variables (dict): Pipeline execution variables and options
- py_file (str): Path to Python pipeline file
- py_options (list[str]): Additional Python command-line options
- py_interpreter (str): Python interpreter version
- py_requirements (list[str]): Python packages for virtual environment
- py_system_site_packages (bool): Include system packages in venv
- process_line_callback (Callable): Optional callback for output processing
Returns:
- int: Pipeline execution return code
"""
async def start_java_pipeline_async(
self,
variables: dict,
jar: str,
job_class: str | None = None,
process_line_callback: Callable[[str], None] | None = None,
) -> int:
"""
Start Apache Beam Java pipeline asynchronously.
Parameters:
- variables (dict): Pipeline execution variables and options
- jar (str): Path to JAR file containing pipeline
- job_class (str): Java class name for pipeline execution
- process_line_callback (Callable): Optional callback for output processing
Returns:
- int: Pipeline execution return code
"""
async def start_pipeline_async(
self,
variables: dict,
command_prefix: list[str],
working_directory: str | None = None,
process_line_callback: Callable[[str], None] | None = None,
) -> int:
"""
Start Apache Beam pipeline with custom command asynchronously.
Parameters:
- variables (dict): Pipeline execution variables and options
- command_prefix (list[str]): Command prefix for pipeline execution
- working_directory (str): Directory for command execution
- process_line_callback (Callable): Optional callback for output processing
Returns:
- int: Pipeline execution return code
"""

class BeamRunnerType:
"""Helper class for listing available runner types."""
DataflowRunner = "DataflowRunner"
DirectRunner = "DirectRunner"
SparkRunner = "SparkRunner"
FlinkRunner = "FlinkRunner"
SamzaRunner = "SamzaRunner"
NemoRunner = "NemoRunner"
JetRunner = "JetRunner"
Twister2Runner = "Twister2Runner"
def beam_options_to_args(options: dict) -> list[str]:
"""
Convert pipeline options dictionary to command line arguments.
Parameters:
- options (dict): Dictionary with pipeline options
Returns:
- list[str]: List of formatted command line arguments
"""
def run_beam_command(
cmd: list[str],
log: logging.Logger,
process_line_callback: Callable[[str], None] | None = None,
working_directory: str | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
"""
Run pipeline command in subprocess with monitoring.
Parameters:
- cmd (list[str]): Command parts to execute
- log (logging.Logger): Logger for output
- process_line_callback (Callable): Optional output processor
- working_directory (str): Execution directory
- is_dataflow_job_id_exist_callback (Callable): Optional job ID detector
"""

from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType
# Initialize hook
beam_hook = BeamHook(runner=BeamRunnerType.DirectRunner)
# Execute Python pipeline
beam_hook.start_python_pipeline(
variables={
'output': '/tmp/beam_output',
'temp_location': '/tmp/beam_temp',
},
py_file='/path/to/pipeline.py',
py_options=['-u'],
py_interpreter='python3',
)

def log_processor(line: str) -> None:
"""Custom processor for pipeline output."""
if 'ERROR' in line:
logger.error(f"Pipeline error: {line}")
elif 'INFO' in line:
logger.info(f"Pipeline info: {line}")
def job_id_detector() -> bool:
"""Check if Dataflow job ID has been extracted."""
return bool(getattr(beam_hook, 'dataflow_job_id', None))
beam_hook.start_python_pipeline(
variables=pipeline_options,
py_file='gs://bucket/pipeline.py',
process_line_callback=log_processor,
is_dataflow_job_id_exist_callback=job_id_detector,
)

import asyncio
from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook
async def run_async_pipeline():
"""Execute pipeline asynchronously."""
async_hook = BeamAsyncHook(runner='DataflowRunner')
return_code = await async_hook.start_python_pipeline_async(
variables={
'project': 'my-gcp-project',
'region': 'us-central1',
'temp_location': 'gs://my-bucket/temp',
},
py_file='gs://my-bucket/pipeline.py',
py_requirements=['apache-beam[gcp]>=2.60.0'],
)
if return_code == 0:
print("Pipeline completed successfully")
else:
print(f"Pipeline failed with return code: {return_code}")
# Run the async pipeline
asyncio.run(run_async_pipeline())

# Custom Python environment with specific packages
beam_hook.start_python_pipeline(
variables=pipeline_options,
py_file='/path/to/pipeline.py',
py_requirements=[
'apache-beam[gcp]==2.60.0',
'pandas==2.1.0',
'numpy==1.24.0',
'google-cloud-bigquery==3.11.0',
],
py_system_site_packages=False, # Isolated environment
py_interpreter='python3.10',
)

beam_hook.start_java_pipeline(
variables={
'runner': 'DataflowRunner',
'project': 'my-gcp-project',
'region': 'us-central1',
'tempLocation': 'gs://my-bucket/temp',
},
jar='/path/to/pipeline.jar',
job_class='com.company.DataProcessingPipeline',
)

The beam_options_to_args function handles various option types:
from airflow.providers.apache.beam.hooks.beam import beam_options_to_args
options = {
'runner': 'DataflowRunner',
'project': 'my-project',
'streaming': True, # Boolean flag
'labels': {'env': 'prod', 'team': 'data'}, # Dictionary
'experiments': ['enable_google_cloud_profiler', 'enable_streaming_engine'], # List
'numWorkers': 4, # Numeric value
'skipValidation': False, # False boolean (skipped)
'tempLocation': None, # None value (skipped)
}
args = beam_options_to_args(options)
# Results in: ['--runner=DataflowRunner', '--project=my-project', '--streaming',
# '--labels={"env":"prod","team":"data"}',
# '--experiments=enable_google_cloud_profiler', '--experiments=enable_streaming_engine',
# '--numWorkers=4']

def comprehensive_monitor(line: str) -> None:
"""Comprehensive pipeline output monitoring."""
import re
# Extract job ID from Dataflow output
job_id_match = re.search(r'Submitted job: ([a-zA-Z0-9\-]+)', line)
if job_id_match:
job_id = job_id_match.group(1)
print(f"Extracted Dataflow job ID: {job_id}")
# Monitor for errors
if any(keyword in line.lower() for keyword in ['error', 'exception', 'failed']):
logger.error(f"Pipeline error detected: {line}")
# Track progress indicators
if 'Processing bundle' in line:
logger.info(f"Pipeline progress: {line}")
beam_hook.start_python_pipeline(
variables=pipeline_options,
py_file='pipeline.py',
process_line_callback=comprehensive_monitor,
)

from airflow.exceptions import AirflowException
try:
beam_hook.start_python_pipeline(
variables=pipeline_options,
py_file='pipeline.py',
)
except AirflowException as e:
if "Apache Beam process failed" in str(e):
logger.error(f"Beam pipeline execution failed: {e}")
# Handle pipeline failure
else:
logger.error(f"Hook error: {e}")
# Handle other errors

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-beam