Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Execute Apache Beam pipelines written in Python with comprehensive support for virtual environments, custom package requirements, multiple runners, and cloud integration. The Python pipeline operator provides the most flexible execution environment with automatic dependency management.
Launch Apache Beam pipelines written in Python with support for custom environments, package requirements, and various execution runners.
class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        py_file: str,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        py_interpreter: str = "python3",
        py_options: list[str] | None = None,
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize Python pipeline operator.

        Parameters:
        - py_file (str): Path to Python pipeline file, supports local paths and gs:// URLs
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override defaults
        - py_interpreter (str): Python interpreter version, defaults to "python3"
        - py_options (list[str]): Additional Python command-line options
        - py_requirements (list[str]): Python packages to install in virtual environment
        - py_system_site_packages (bool): Include system packages in virtual environment
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration|dict): Dataflow-specific configuration
        - deferrable (bool): Enable deferrable execution mode
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Python Pipeline."""

    def execute_on_dataflow(self, context: Context):
        """Execute the Apache Beam Pipeline on Dataflow runner."""

    def on_kill(self) -> None:
        """Cancel the pipeline when task is killed."""

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
run_local_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_local_beam_pipeline',
    py_file='/path/to/pipeline.py',
    runner='DirectRunner',
    pipeline_options={
        'output': '/tmp/output',
        'temp_location': '/tmp/beam_temp',
    },
)

run_dataflow_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_dataflow_pipeline',
    py_file='gs://my-bucket/pipeline.py',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'staging_location': 'gs://my-bucket/staging',
        'num_workers': 4,
        'max_num_workers': 10,
        'machine_type': 'n1-standard-4',
    },
    dataflow_config={
        'job_name': 'my-dataflow-job',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
    },
)

run_pipeline_custom_env = BeamRunPythonPipelineOperator(
    task_id='run_pipeline_custom_env',
    py_file='/path/to/pipeline.py',
    py_requirements=[
        'apache-beam[gcp]==2.50.0',
        'pandas==2.0.0',
        'numpy==1.24.0',
    ],
    py_system_site_packages=True,
    py_interpreter='python3.10',
    py_options=['-u', '-W', 'ignore'],
    runner='DirectRunner',
)

run_deferrable_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_deferrable_pipeline',
    py_file='gs://my-bucket/long_pipeline.py',
    runner='DataflowRunner',
    deferrable=True,  # Enable deferrable mode
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
    },
    dataflow_config={
        'job_name': 'long-running-job',
        'wait_until_finished': True,
    },
)

Pipeline options control the behavior and configuration of the Beam pipeline execution:
- Core options: runner, project, region, temp_location, staging_location
- Worker options: num_workers, max_num_workers, machine_type, disk_size_gb
- Execution options: streaming, windowing, max_bundle_size

When using DataflowRunner, additional configuration options are available:
dataflow_config = {
    'job_name': 'unique-job-name',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'gcp_conn_id': 'google_cloud_default',
    'wait_until_finished': True,
    'poll_sleep': 10,
    'cancel_timeout': 300,
    'drain_pipeline': False,
    'service_account': 'pipeline-service-account@project.iam.gserviceaccount.com',
    'impersonation_chain': ['service-account@project.iam.gserviceaccount.com'],
}

The operator handles various error conditions.
The operator supports Airflow templating for dynamic configuration:
run_templated_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_templated_pipeline',
    py_file='gs://{{ var.value.bucket }}/pipeline.py',
    runner='{{ var.value.runner }}',
    pipeline_options={
        'project': '{{ var.value.gcp_project }}',
        'input': 'gs://{{ var.value.input_bucket }}/{{ ds }}/*',
        'output': 'gs://{{ var.value.output_bucket }}/{{ ds }}/',
    },
)

Install with Tessl CLI:
npx tessl i tessl/pypi-apache-airflow-providers-apache-beam