# Python Pipeline Execution

Execute Apache Beam pipelines written in Python with comprehensive support for virtual environments, custom package requirements, multiple runners, and cloud integration. The Python pipeline operator provides the most flexible execution environment of the Beam operators, with automatic dependency management through temporary virtual environments.

## Capabilities

### BeamRunPythonPipelineOperator

Launch Apache Beam pipelines written in Python with support for custom environments, package requirements, and various execution runners.

```python { .api }
class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        py_file: str,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        py_interpreter: str = "python3",
        py_options: list[str] | None = None,
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize the Python pipeline operator.

        Parameters:
        - py_file (str): Path to the Python pipeline file; supports local paths and gs:// URLs
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override the defaults
        - py_interpreter (str): Python interpreter version, defaults to "python3"
        - py_options (list[str]): Additional command-line options for the Python interpreter
        - py_requirements (list[str]): Python packages to install in the virtual environment
        - py_system_site_packages (bool): Include system packages in the virtual environment
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration | dict): Dataflow-specific configuration
        - deferrable (bool): Enable deferrable execution mode
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Python Pipeline."""

    def execute_on_dataflow(self, context: Context):
        """Execute the Apache Beam Pipeline on the Dataflow runner."""

    def on_kill(self) -> None:
        """Cancel the pipeline when the task is killed."""
```

### Usage Examples
#### Basic Local Execution

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

run_local_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_local_beam_pipeline',
    py_file='/path/to/pipeline.py',
    runner='DirectRunner',
    pipeline_options={
        'output': '/tmp/output',
        'temp_location': '/tmp/beam_temp',
    },
)
```
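For context, the `py_file` that the operator launches is an ordinary Beam entry-point script. A minimal, illustrative sketch (the transforms and the `--output` option name are assumptions chosen to match the example above, not part of the operator's API):

```python
# pipeline.py -- illustrative Beam entry point run by the operator.
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--output', required=True, help='Output path prefix')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Remaining arguments (runner, temp_location, ...) are handed to Beam.
    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | 'Create' >> beam.Create(['hello', 'beam'])
            | 'Upper' >> beam.Map(str.upper)
            | 'Write' >> beam.io.WriteToText(known_args.output)
        )


if __name__ == '__main__':
    run()
```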

#### Cloud Dataflow Execution

```python
run_dataflow_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_dataflow_pipeline',
    py_file='gs://my-bucket/pipeline.py',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'staging_location': 'gs://my-bucket/staging',
        'num_workers': 4,
        'max_num_workers': 10,
        'machine_type': 'n1-standard-4',
    },
    dataflow_config={
        'job_name': 'my-dataflow-job',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
    },
)
```

#### Custom Python Environment

```python
run_pipeline_custom_env = BeamRunPythonPipelineOperator(
    task_id='run_pipeline_custom_env',
    py_file='/path/to/pipeline.py',
    py_requirements=[
        'apache-beam[gcp]==2.50.0',
        'pandas==2.0.0',
        'numpy==1.24.0',
    ],
    py_system_site_packages=True,
    py_interpreter='python3.10',
    py_options=['-u', '-W', 'ignore'],
    runner='DirectRunner',
)
```
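When `py_requirements` is supplied, the underlying Beam hook creates a temporary virtual environment from the requested `py_interpreter`, installs the listed packages (also exposing system packages when `py_system_site_packages=True`), and runs the pipeline with that environment's interpreter, so the packages do not need to be pre-installed on the worker.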

#### Deferrable Execution

```python
run_deferrable_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_deferrable_pipeline',
    py_file='gs://my-bucket/long_pipeline.py',
    runner='DataflowRunner',
    deferrable=True,  # Enable deferrable mode
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
    },
    dataflow_config={
        'job_name': 'long-running-job',
        'wait_until_finished': True,
    },
)
```
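With `deferrable=True`, the task releases its worker slot while the pipeline runs and is resumed by the Airflow triggerer once the job reaches a terminal state, so a triggerer process must be running in the deployment for this example to complete.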

### Configuration Options

#### Pipeline Options

Pipeline options control how the Beam pipeline is configured and executed (a sketch of how they are merged and passed to the pipeline follows this list):

- **Basic Options**: `runner`, `project`, `region`, `temp_location`, `staging_location`
- **Scaling Options**: `num_workers`, `max_num_workers`, `machine_type`, `disk_size_gb`
- **Processing Options**: `streaming`, `experiments`, `setup_file`, `requirements_file`
- **Custom Options**: pipeline-specific arguments (such as `input` and `output` in the examples above) that the pipeline script parses itself
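The split between `default_pipeline_options` and `pipeline_options` lets shared settings be declared once and overridden per task; the merged dictionary is ultimately passed to the pipeline as command-line flags (conceptually `--key=value`). A minimal sketch of the merge, with illustrative project and bucket names:

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Shared defaults declared once (names are illustrative).
default_opts = {
    'project': 'my-gcp-project',
    'temp_location': 'gs://my-bucket/temp',
}

run_with_defaults = BeamRunPythonPipelineOperator(
    task_id='run_with_defaults',
    py_file='/path/to/pipeline.py',
    runner='DirectRunner',
    default_pipeline_options=default_opts,
    # Task-specific values; keys set here override the defaults above.
    pipeline_options={
        'temp_location': 'gs://my-bucket/override-temp',
        'output': 'gs://my-bucket/results/',
    },
)
# Effective options handed to the pipeline (conceptually):
#   --project=my-gcp-project
#   --temp_location=gs://my-bucket/override-temp
#   --output=gs://my-bucket/results/
```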

#### Dataflow Configuration

When using DataflowRunner, additional configuration options are available:

```python
dataflow_config = {
    'job_name': 'unique-job-name',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'gcp_conn_id': 'google_cloud_default',
    'wait_until_finished': True,
    'poll_sleep': 10,
    'cancel_timeout': 300,
    'drain_pipeline': False,
    'service_account': 'pipeline-service-account@project.iam.gserviceaccount.com',
    'impersonation_chain': ['service-account@project.iam.gserviceaccount.com'],
}
```
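The same settings can also be provided as a `DataflowConfiguration` object instead of a plain dict; a minimal sketch, assuming the class is imported from its usual location in the Google provider:

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

run_with_dataflow_config = BeamRunPythonPipelineOperator(
    task_id='run_with_dataflow_config',
    py_file='gs://my-bucket/pipeline.py',
    runner='DataflowRunner',
    pipeline_options={'temp_location': 'gs://my-bucket/temp'},
    # Equivalent to the dict form shown above.
    dataflow_config=DataflowConfiguration(
        job_name='unique-job-name',
        project_id='gcp-project-id',
        location='us-central1',
        wait_until_finished=True,
    ),
)
```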
### Error Handling

The operator handles various error conditions (a Dataflow cancellation example follows this list):

- **File Not Found**: Validates pipeline file existence before execution
- **Environment Setup**: Manages virtual environment creation and package installation
- **Pipeline Failures**: Captures and reports Beam pipeline execution errors
- **Dataflow Errors**: Handles Dataflow-specific errors and job cancellation
- **Resource Cleanup**: Ensures temporary files and resources are cleaned up
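On task kill, `on_kill()` cancels the running pipeline; for Dataflow jobs, the shutdown behavior can be tuned through `dataflow_config`. A sketch assuming a streaming job, with illustrative names:

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

run_streaming_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_streaming_pipeline',
    py_file='gs://my-bucket/streaming_pipeline.py',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'streaming': True,
    },
    dataflow_config={
        # Drain the streaming job instead of cancelling it outright
        # when the Airflow task is killed.
        'drain_pipeline': True,
        # Allow up to 10 minutes for the cancel/drain request to take effect.
        'cancel_timeout': 600,
    },
)
```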
### Templating Support

The operator supports Airflow templating for dynamic configuration:

```python
run_templated_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_templated_pipeline',
    py_file='gs://{{ var.value.bucket }}/pipeline.py',
    runner='{{ var.value.runner }}',
    pipeline_options={
        'project': '{{ var.value.gcp_project }}',
        'input': 'gs://{{ var.value.input_bucket }}/{{ ds }}/*',
        'output': 'gs://{{ var.value.output_bucket }}/{{ ds }}/',
    },
)
```
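In recent provider versions, the templated fields are `py_file`, `runner`, `pipeline_options`, `default_pipeline_options`, and `dataflow_config`; Jinja expressions placed in other arguments (for example `py_requirements`) are not rendered.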