Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Asynchronous pipeline triggers that enable efficient resource utilization by yielding control during long-running pipeline execution. Triggers provide deferrable execution capabilities for Apache Beam pipelines, allowing Airflow workers to handle other tasks while pipelines run.
Deferrable trigger for monitoring Python pipeline execution until completion with asynchronous status checking and resource-efficient waiting.
class BeamPythonPipelineTrigger(BeamPipelineBaseTrigger):
def __init__(
self,
variables: dict,
py_file: str,
py_options: list[str] | None = None,
py_interpreter: str = "python3",
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
runner: str = "DirectRunner",
gcp_conn_id: str = "google_cloud_default",
):
"""
Initialize Python pipeline trigger.
Parameters:
- variables (dict): Pipeline execution variables and options
- py_file (str): Path to Python pipeline file, supports gs:// URLs
- py_options (list[str]): Additional Python command-line options
- py_interpreter (str): Python interpreter version
- py_requirements (list[str]): Python packages for virtual environment
- py_system_site_packages (bool): Include system packages in venv
- runner (str): Runner type for pipeline execution
- gcp_conn_id (str): Google Cloud connection ID for GCS access
"""
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize trigger arguments and classpath for persistence."""
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Execute pipeline and yield completion events."""
Deferrable trigger for monitoring Java pipeline execution with JAR file support and asynchronous job lifecycle management.
class BeamJavaPipelineTrigger(BeamPipelineBaseTrigger):
def __init__(
self,
variables: dict,
jar: str,
job_class: str | None = None,
runner: str = "DirectRunner",
gcp_conn_id: str = "google_cloud_default",
):
"""
Initialize Java pipeline trigger.
Parameters:
- variables (dict): Pipeline execution variables and options
- jar (str): Path to JAR file, supports gs:// URLs
- job_class (str): Java class name for pipeline execution
- runner (str): Runner type for pipeline execution
- gcp_conn_id (str): Google Cloud connection ID for GCS access
"""
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize trigger arguments and classpath for persistence."""
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Execute pipeline and yield completion events."""
Base trigger class providing common functionality for all Beam pipeline triggers including GCS file handling and hook management.
class BeamPipelineBaseTrigger(BaseTrigger):
"""Base class for Beam Pipeline Triggers."""
def __init__(
self,
variables: dict,
runner: str = "DirectRunner",
gcp_conn_id: str = "google_cloud_default",
) -> None:
"""
Initialize base trigger.
Parameters:
- variables (dict): Pipeline execution variables and options
- runner (str): Runner type for pipeline execution
- gcp_conn_id (str): Google Cloud connection ID for GCS access
"""
@staticmethod
def _get_async_hook(*args, **kwargs) -> BeamAsyncHook:
"""Get asynchronous Beam hook instance."""
@staticmethod
def file_has_gcs_path(file_path: str) -> bool:
"""Check if file path is a GCS URL."""
@staticmethod
async def provide_gcs_tempfile(gcs_file: str, gcp_conn_id: str) -> NamedTemporaryFile:
"""Download GCS file to temporary local file."""
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize trigger arguments and classpath for persistence."""
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Execute pipeline and yield completion events."""
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
# Operator automatically uses trigger when deferrable=True
deferrable_python_task = BeamRunPythonPipelineOperator(
task_id='deferrable_python_beam',
py_file='gs://my-bucket/long_pipeline.py',
runner='DirectRunner',
deferrable=True, # Enables deferrable execution
pipeline_options={
'output': '/tmp/output',
'temp_location': '/tmp/beam_temp',
},
)
from airflow.providers.apache.beam.triggers.beam import BeamPythonPipelineTrigger
from airflow.operators.dummy import DummyOperator
class CustomBeamOperator(DummyOperator):
def execute(self, context):
# Defer to trigger for long-running pipeline
self.defer(
trigger=BeamPythonPipelineTrigger(
variables={
'output': '/tmp/output',
'temp_location': '/tmp/beam_temp',
},
py_file='/path/to/pipeline.py',
py_requirements=['apache-beam>=2.60.0'],
runner='DirectRunner',
),
method_name='execute_complete',
)
def execute_complete(self, context, event):
"""Handle trigger completion."""
if event['status'] == 'success':
self.log.info("Pipeline completed successfully")
return event
else:
raise AirflowException(f"Pipeline failed: {event['message']}")
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
deferrable_java_task = BeamRunJavaPipelineOperator(
task_id='deferrable_java_beam',
jar='gs://my-bucket/pipeline.jar',
job_class='com.company.LongRunningPipeline',
runner='DataflowRunner',
deferrable=True,
pipeline_options={
'project': 'my-gcp-project',
'region': 'us-central1',
'tempLocation': 'gs://my-bucket/temp',
'streaming': 'true', # Long-running streaming job
},
dataflow_config={
'job_name': 'streaming-java-pipeline',
'wait_until_finished': True,
},
)
# Trigger automatically handles GCS file downloads
gcs_trigger = BeamPythonPipelineTrigger(
variables={
'project': 'my-gcp-project',
'temp_location': 'gs://my-bucket/temp',
},
py_file='gs://my-bucket/pipelines/data_processor.py', # Automatically downloaded
py_requirements=['apache-beam[gcp]>=2.60.0'],
runner='DataflowRunner',
gcp_conn_id='my_gcp_connection',
)
Triggers yield TriggerEvent objects with status information:
# Success event
{
'status': 'success',
'message': 'Pipeline has finished SUCCESSFULLY'
}
# Error event
{
'status': 'error',
'message': 'Operation failed'
}
# Error with exception details
{
'status': 'error',
'message': 'Apache Beam process failed with return code 1'
}
def execute_complete(self, context, event):
"""Handle trigger completion events."""
if event['status'] == 'error':
raise AirflowException(event['message'])
self.log.info(f"Pipeline completed: {event['message']}")
return {'status': event['status'], 'message': event['message']}
Python triggers automatically manage virtual environments:
python_trigger = BeamPythonPipelineTrigger(
variables=pipeline_options,
py_file='pipeline.py',
py_requirements=[
'apache-beam[gcp]==2.60.0',
'pandas==2.1.0',
'google-cloud-bigquery==3.11.0',
],
py_system_site_packages=False, # Isolated environment
py_interpreter='python3.10',
)
Triggers handle cleanup of temporary files:
Triggers must be serializable for persistence across worker restarts:
# Trigger serialization format
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"airflow.providers.apache.beam.triggers.beam.BeamPythonPipelineTrigger",
{
"variables": self.variables,
"py_file": self.py_file,
"py_options": self.py_options,
"py_interpreter": self.py_interpreter,
"py_requirements": self.py_requirements,
"py_system_site_packages": self.py_system_site_packages,
"runner": self.runner,
"gcp_conn_id": self.gcp_conn_id,
},
)
Triggers handle various error conditions:
async def run(self) -> AsyncIterator[TriggerEvent]:
try:
# Handle GCS file download
if self.file_has_gcs_path(self.py_file):
tmp_file = await self.provide_gcs_tempfile(self.py_file, self.gcp_conn_id)
self.py_file = tmp_file.name
# Execute pipeline
return_code = await hook.start_python_pipeline_async(...)
if return_code == 0:
yield TriggerEvent({
"status": "success",
"message": "Pipeline has finished SUCCESSFULLY",
})
else:
yield TriggerEvent({
"status": "error",
"message": "Operation failed"
})
except Exception as e:
self.log.exception("Exception occurred while checking pipeline state")
yield TriggerEvent({
"status": "error",
"message": str(e)
})
Use deferrable execution for:
# Optimize for resource usage
BeamRunPythonPipelineOperator(
task_id='optimized_pipeline',
py_file='gs://bucket/pipeline.py',
deferrable=True, # Yield worker during execution
runner='DataflowRunner', # Offload to cloud
dataflow_config={
'wait_until_finished': True, # Wait for completion
'poll_sleep': 30, # Reduce polling frequency
},
)
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-beam