# Hooks and Monitoring

Low-level interface for executing and monitoring Apache Beam pipelines, supporting synchronous and asynchronous execution, custom output callbacks, and pipeline lifecycle management.

## Capabilities

### BeamHook

Synchronous hook providing a direct interface to Apache Beam pipeline execution, with process monitoring and callback support.

```python { .api }
class BeamHook(BaseHook):
    def __init__(self, runner: str) -> None:
        """
        Initialize the Beam hook.

        Parameters:
        - runner (str): Runner type for pipeline execution
        """

    def start_python_pipeline(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str],
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ):
        """
        Start an Apache Beam Python pipeline.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - py_file (str): Path to the Python pipeline file
        - py_options (list[str]): Additional Python command-line options
        - py_interpreter (str): Python interpreter to invoke
        - py_requirements (list[str]): Python packages to install into a temporary virtual environment
        - py_system_site_packages (bool): Include system packages in the virtual environment
        - process_line_callback (Callable): Optional callback invoked for each output line
        - is_dataflow_job_id_exist_callback (Callable): Optional callback reporting whether a Dataflow job ID has been detected
        """

    def start_java_pipeline(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None:
        """
        Start an Apache Beam Java pipeline.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - jar (str): Path to the JAR file containing the pipeline
        - job_class (str): Java class name of the pipeline entry point
        - process_line_callback (Callable): Optional callback invoked for each output line
        - is_dataflow_job_id_exist_callback (Callable): Optional callback reporting whether a Dataflow job ID has been detected
        """

    def start_go_pipeline(
        self,
        variables: dict,
        go_file: str,
        process_line_callback: Callable[[str], None] | None = None,
        should_init_module: bool = False,
    ) -> None:
        """
        Start an Apache Beam Go pipeline from a source file.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - go_file (str): Path to the Go source file
        - process_line_callback (Callable): Optional callback invoked for each output line
        - should_init_module (bool): Initialize the Go module and install dependencies before running
        """

    def start_go_pipeline_with_binary(
        self,
        variables: dict,
        launcher_binary: str,
        worker_binary: str,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> None:
        """
        Start an Apache Beam Go pipeline from pre-compiled binaries.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - launcher_binary (str): Path to the launcher binary
        - worker_binary (str): Path to the worker binary
        - process_line_callback (Callable): Optional callback invoked for each output line
        """
```
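
The Go entry points are not covered by the usage examples below. A minimal sketch of `start_go_pipeline`, assuming a local Go source file (the paths and options are placeholders):

```python
from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

beam_hook = BeamHook(runner=BeamRunnerType.DirectRunner)

# Run a Go pipeline from source; should_init_module initializes the Go module
# and installs its dependencies before execution (paths are placeholders).
beam_hook.start_go_pipeline(
    variables={'output': '/tmp/beam_go_output'},
    go_file='/path/to/main.go',
    should_init_module=True,
)
```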

### BeamAsyncHook

Asynchronous hook providing a non-blocking interface to Apache Beam pipeline execution with support for concurrent operations.

```python { .api }
class BeamAsyncHook(BeamHook):
    def __init__(self, runner: str) -> None:
        """
        Initialize the asynchronous Beam hook.

        Parameters:
        - runner (str): Runner type for pipeline execution
        """

    async def start_python_pipeline_async(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
    ):
        """
        Start an Apache Beam Python pipeline asynchronously.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - py_file (str): Path to the Python pipeline file
        - py_options (list[str]): Additional Python command-line options
        - py_interpreter (str): Python interpreter to invoke
        - py_requirements (list[str]): Python packages to install into a temporary virtual environment
        - py_system_site_packages (bool): Include system packages in the virtual environment
        - process_line_callback (Callable): Optional callback invoked for each output line

        Returns:
        - int: Pipeline execution return code
        """

    async def start_java_pipeline_async(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int:
        """
        Start an Apache Beam Java pipeline asynchronously.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - jar (str): Path to the JAR file containing the pipeline
        - job_class (str): Java class name of the pipeline entry point
        - process_line_callback (Callable): Optional callback invoked for each output line

        Returns:
        - int: Pipeline execution return code
        """

    async def start_pipeline_async(
        self,
        variables: dict,
        command_prefix: list[str],
        working_directory: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int:
        """
        Start an Apache Beam pipeline asynchronously using a custom command prefix.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - command_prefix (list[str]): Command prefix for pipeline execution
        - working_directory (str): Directory in which the command is executed
        - process_line_callback (Callable): Optional callback invoked for each output line

        Returns:
        - int: Pipeline execution return code
        """
```
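
`start_pipeline_async` is the lower-level entry point underlying the language-specific methods above. A minimal sketch of calling it directly with a custom command prefix, assuming a local pipeline file (paths and options are illustrative placeholders):

```python
import asyncio

from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook, BeamRunnerType

async def run_custom_command() -> int:
    """Run a pipeline through an arbitrary command prefix."""
    hook = BeamAsyncHook(runner=BeamRunnerType.DirectRunner)
    # The options in `variables` are rendered to --key=value arguments and
    # executed together with the command prefix.
    return await hook.start_pipeline_async(
        variables={'output': '/tmp/beam_output'},
        command_prefix=['python3', '/path/to/pipeline.py'],
        working_directory='/path/to',
    )

return_code = asyncio.run(run_custom_command())
```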

### Runner Types and Utilities

```python { .api }
class BeamRunnerType:
    """Helper class for listing available runner types."""
    DataflowRunner = "DataflowRunner"
    DirectRunner = "DirectRunner"
    SparkRunner = "SparkRunner"
    FlinkRunner = "FlinkRunner"
    SamzaRunner = "SamzaRunner"
    NemoRunner = "NemoRunner"
    JetRunner = "JetRunner"
    Twister2Runner = "Twister2Runner"

def beam_options_to_args(options: dict) -> list[str]:
    """
    Convert pipeline options dictionary to command line arguments.

    Parameters:
    - options (dict): Dictionary with pipeline options

    Returns:
    - list[str]: List of formatted command line arguments
    """

def run_beam_command(
    cmd: list[str],
    log: logging.Logger,
    process_line_callback: Callable[[str], None] | None = None,
    working_directory: str | None = None,
    is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
    """
    Run pipeline command in subprocess with monitoring.

    Parameters:
    - cmd (list[str]): Command parts to execute
    - log (logging.Logger): Logger for output
    - process_line_callback (Callable): Optional output processor
    - working_directory (str): Execution directory
    - is_dataflow_job_id_exist_callback (Callable): Optional job ID detector
    """
```
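
These helpers can also be combined directly; a minimal sketch, assuming a local pipeline file (paths are placeholders, and this is normally handled internally by the hooks):

```python
import logging

from airflow.providers.apache.beam.hooks.beam import beam_options_to_args, run_beam_command

log = logging.getLogger(__name__)

# Render the pipeline options to arguments and run the command in a
# monitored subprocess (paths are placeholders).
cmd = ['python3', '/path/to/pipeline.py', *beam_options_to_args({'output': '/tmp/beam_output'})]
run_beam_command(cmd=cmd, log=log, working_directory='/path/to')
```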

### Usage Examples

#### Basic Hook Usage

```python
from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

# Initialize hook
beam_hook = BeamHook(runner=BeamRunnerType.DirectRunner)

# Execute Python pipeline
beam_hook.start_python_pipeline(
    variables={
        'output': '/tmp/beam_output',
        'temp_location': '/tmp/beam_temp',
    },
    py_file='/path/to/pipeline.py',
    py_options=['-u'],
    py_interpreter='python3',
)
```

#### Custom Process Monitoring

```python
import logging
import re

logger = logging.getLogger(__name__)
dataflow_job_id: str | None = None

def log_processor(line: str) -> None:
    """Custom processor for pipeline output."""
    global dataflow_job_id
    # Capture the Dataflow job ID once it appears in the output.
    match = re.search(r'Submitted job: ([a-zA-Z0-9\-]+)', line)
    if match:
        dataflow_job_id = match.group(1)
    if 'ERROR' in line:
        logger.error(f"Pipeline error: {line}")
    elif 'INFO' in line:
        logger.info(f"Pipeline info: {line}")

def job_id_detector() -> bool:
    """Check if the Dataflow job ID has been extracted."""
    return dataflow_job_id is not None

beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='gs://bucket/pipeline.py',
    py_options=[],
    process_line_callback=log_processor,
    is_dataflow_job_id_exist_callback=job_id_detector,
)
```

#### Asynchronous Pipeline Execution

```python
import asyncio
from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook

async def run_async_pipeline():
    """Execute pipeline asynchronously."""
    async_hook = BeamAsyncHook(runner='DataflowRunner')

    return_code = await async_hook.start_python_pipeline_async(
        variables={
            'project': 'my-gcp-project',
            'region': 'us-central1',
            'temp_location': 'gs://my-bucket/temp',
        },
        py_file='gs://my-bucket/pipeline.py',
        py_requirements=['apache-beam[gcp]>=2.60.0'],
    )

    if return_code == 0:
        print("Pipeline completed successfully")
    else:
        print(f"Pipeline failed with return code: {return_code}")

# Run the async pipeline
asyncio.run(run_async_pipeline())
```

#### Virtual Environment Management

```python
# Custom Python environment with specific packages
beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='/path/to/pipeline.py',
    py_options=[],
    py_requirements=[
        'apache-beam[gcp]==2.60.0',
        'pandas==2.1.0',
        'numpy==1.24.0',
        'google-cloud-bigquery==3.11.0',
    ],
    py_system_site_packages=False,  # Isolated environment
    py_interpreter='python3.10',
)
```

#### Java Pipeline with Custom Classpath

```python
# Use the Dataflow runner; the runner flag is supplied from the hook's
# constructor, so it does not need to be repeated in the pipeline options.
beam_hook = BeamHook(runner=BeamRunnerType.DataflowRunner)

beam_hook.start_java_pipeline(
    variables={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
    },
    jar='/path/to/pipeline.jar',
    job_class='com.company.DataProcessingPipeline',
)
```

### Pipeline Options Processing

#### Option Format Handling

The `beam_options_to_args` function converts an options dictionary into `--key=value` arguments; boolean `True` values become bare flags and list values are repeated once per element:

```python
from airflow.providers.apache.beam.hooks.beam import beam_options_to_args

options = {
    'runner': 'DataflowRunner',
    'project': 'my-project',
    'streaming': True,  # Boolean flag
    'experiments': ['enable_google_cloud_profiler', 'enable_streaming_engine'],  # List
    'numWorkers': 4,  # Numeric value
}

args = beam_options_to_args(options)
# Results in: ['--runner=DataflowRunner', '--project=my-project', '--streaming',
#              '--experiments=enable_google_cloud_profiler', '--experiments=enable_streaming_engine',
#              '--numWorkers=4']
```

### Error Handling and Monitoring

#### Process Monitoring

```python
import logging
import re

logger = logging.getLogger(__name__)

def comprehensive_monitor(line: str) -> None:
    """Comprehensive pipeline output monitoring."""
    # Extract job ID from Dataflow output
    job_id_match = re.search(r'Submitted job: ([a-zA-Z0-9\-]+)', line)
    if job_id_match:
        job_id = job_id_match.group(1)
        print(f"Extracted Dataflow job ID: {job_id}")

    # Monitor for errors
    if any(keyword in line.lower() for keyword in ['error', 'exception', 'failed']):
        logger.error(f"Pipeline error detected: {line}")

    # Track progress indicators
    if 'Processing bundle' in line:
        logger.info(f"Pipeline progress: {line}")

beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='pipeline.py',
    py_options=[],
    process_line_callback=comprehensive_monitor,
)
```

#### Exception Handling

```python
import logging

from airflow.exceptions import AirflowException

logger = logging.getLogger(__name__)

try:
    beam_hook.start_python_pipeline(
        variables=pipeline_options,
        py_file='pipeline.py',
        py_options=[],
    )
except AirflowException as e:
    if "Apache Beam process failed" in str(e):
        logger.error(f"Beam pipeline execution failed: {e}")
        # Handle pipeline failure
    else:
        logger.error(f"Hook error: {e}")
        # Handle other errors
```