Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-beam@6.1.3
# Apache Airflow Providers Apache Beam

An Apache Airflow provider package that enables workflow orchestration and data processing capabilities by offering operators, hooks, and triggers for Apache Beam pipeline execution. Supports running Beam pipelines written in Python, Java, and Go across various runners including DirectRunner, DataflowRunner, SparkRunner, and FlinkRunner.

## Package Information

- **Package Name**: apache-airflow-providers-apache-beam
- **Package Type**: Python Package (PyPI)
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-beam`
- **Version**: 6.1.3
- **Requires**: Apache Airflow >=2.10.0, Apache Beam >=2.60.0
## Core Imports

```python
from airflow.providers.apache.beam import __version__
```

Operators:

```python
from airflow.providers.apache.beam.operators.beam import (
    BeamRunPythonPipelineOperator,
    BeamRunJavaPipelineOperator,
    BeamRunGoPipelineOperator,
    BeamBasePipelineOperator,
)
```

Hooks:

```python
from airflow.providers.apache.beam.hooks.beam import (
    BeamHook,
    BeamAsyncHook,
    BeamRunnerType,
    beam_options_to_args,
    run_beam_command,
)
```

Triggers:

```python
from airflow.providers.apache.beam.triggers.beam import (
    BeamPythonPipelineTrigger,
    BeamJavaPipelineTrigger,
    BeamPipelineBaseTrigger,
)
```

Version compatibility:

```python
from airflow.providers.apache.beam.version_compat import (
    AIRFLOW_V_3_1_PLUS,
    BaseHook,
    BaseOperator,
)
```
## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Define default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
dag = DAG(
    'beam_pipeline_example',
    default_args=default_args,
    description='Run Apache Beam pipeline with Airflow',
    schedule=timedelta(days=1),  # "schedule" replaces the deprecated "schedule_interval"
    catchup=False,
)

# Define Beam pipeline task
run_beam_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_beam_pipeline',
    py_file='gs://my-bucket/beam_pipeline.py',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'staging_location': 'gs://my-bucket/staging',
    },
    dataflow_config={
        'job_name': 'my-beam-pipeline',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
    },
    dag=dag,
)
```
## Architecture

The provider follows Airflow's standard architecture pattern with three main component types:

- **Operators**: Execute Beam pipelines as Airflow tasks, supporting Python, Java, and Go implementations
- **Hooks**: Provide a low-level interface to Apache Beam, handling pipeline execution and monitoring
- **Triggers**: Enable deferrable execution for long-running pipelines with asynchronous monitoring

When the `apache-airflow-providers-google` package is available, the provider integrates with Google Cloud Dataflow, enabling cloud-scale pipeline execution with monitoring and job management capabilities.
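With the google provider installed, Dataflow settings can be passed as a `DataflowConfiguration` object instead of a plain dict. A minimal sketch, assuming the `apache-airflow-providers-google` package is installed; project, region, and bucket names are illustrative:

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# Illustrative values; substitute your own project, region, and GCS paths.
run_on_dataflow = BeamRunPythonPipelineOperator(
    task_id="run_on_dataflow",
    py_file="gs://my-bucket/beam_pipeline.py",
    runner="DataflowRunner",
    pipeline_options={"temp_location": "gs://my-bucket/temp"},
    dataflow_config=DataflowConfiguration(
        job_name="my-beam-pipeline",
        project_id="my-gcp-project",
        location="us-central1",
        wait_until_finished=True,
    ),
)
```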
## Capabilities
### Python Pipeline Execution

Execute Apache Beam pipelines written in Python with support for virtual environments, custom requirements, and various runners including local DirectRunner and cloud DataflowRunner.

```python { .api }
class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        py_file: str,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        py_interpreter: str = "python3",
        py_options: list[str] | None = None,
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...
```
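For local development, the same operator can run a pipeline with the DirectRunner in an isolated virtual environment. A minimal sketch; the file path, output location, and requirements are illustrative:

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Runs a local pipeline file with the DirectRunner; when py_requirements is set,
# the operator builds a temporary virtualenv with those packages before execution.
run_local_pipeline = BeamRunPythonPipelineOperator(
    task_id="run_local_pipeline",
    py_file="/opt/airflow/dags/pipelines/wordcount.py",  # illustrative path
    runner="DirectRunner",
    pipeline_options={"output": "/tmp/wordcount-output"},
    py_requirements=["apache-beam[gcp]==2.60.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
)
```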
[Python Pipelines](./python-pipelines.md)
### Java Pipeline Execution

Execute Apache Beam pipelines written in Java using self-executing JAR files with support for various runners and Dataflow integration.

```python { .api }
class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        jar: str,
        runner: str = "DirectRunner",
        job_class: str | None = None,
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...
```
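A minimal sketch of running a Java pipeline from a bundled JAR; the JAR path, main class, and pipeline options are illustrative:

```python
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

# Executes a self-contained pipeline JAR; job_class selects the main class
# when the JAR manifest does not define one.
run_java_pipeline = BeamRunJavaPipelineOperator(
    task_id="run_java_pipeline",
    jar="gs://my-bucket/artifacts/wordcount-bundled.jar",  # illustrative path
    job_class="org.example.WordCount",                     # illustrative class
    runner="DirectRunner",
    pipeline_options={"output": "/tmp/wordcount-java"},
)
```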
[Java Pipelines](./java-pipelines.md)
### Go Pipeline Execution

Execute Apache Beam pipelines written in Go from source files or pre-compiled binaries with support for cross-platform execution.

```python { .api }
class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        go_file: str = "",
        launcher_binary: str = "",
        worker_binary: str = "",
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        **kwargs,
    ) -> None: ...
```
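A minimal sketch of the two ways to launch a Go pipeline: from source, or from a pre-built launcher binary. Paths are illustrative, and typically either `go_file` or `launcher_binary` is set, not both:

```python
from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator

# From source: requires the Go toolchain on the worker that runs the task.
run_go_from_source = BeamRunGoPipelineOperator(
    task_id="run_go_from_source",
    go_file="/opt/airflow/dags/pipelines/wordcount.go",  # illustrative path
    runner="DirectRunner",
    pipeline_options={"output": "/tmp/wordcount-go"},
)

# From pre-compiled binaries: useful when the Go toolchain is not available
# at runtime; the launcher binary is typically reused as the worker binary
# when worker_binary is omitted.
run_go_from_binary = BeamRunGoPipelineOperator(
    task_id="run_go_from_binary",
    launcher_binary="/opt/airflow/bin/wordcount-launcher",  # illustrative path
    worker_binary="/opt/airflow/bin/wordcount-worker",      # illustrative path
    runner="DirectRunner",
)
```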
[Go Pipelines](./go-pipelines.md)
### Pipeline Monitoring and Hooks

Low-level interface for executing and monitoring Apache Beam pipelines with both synchronous and asynchronous execution modes.

```python { .api }
class BeamHook(BaseHook):
    def __init__(self, runner: str) -> None: ...
    def start_python_pipeline(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str],
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None: ...
    def start_java_pipeline(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None: ...
    def start_go_pipeline(
        self,
        variables: dict,
        go_file: str,
        process_line_callback: Callable[[str], None] | None = None,
        should_init_module: bool = False,
    ) -> None: ...

class BeamAsyncHook(BeamHook):
    def __init__(self, runner: str) -> None: ...
    async def start_python_pipeline_async(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int: ...
    async def start_java_pipeline_async(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int: ...
```
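Hooks can also be called directly, for example from a custom operator. A minimal sketch running a Python pipeline with the DirectRunner; the file path and options are illustrative:

```python
from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

# The hook assembles the pipeline command from `variables` (each key/value
# becomes a --key=value argument) and streams subprocess output to the log.
hook = BeamHook(runner=BeamRunnerType.DirectRunner)
hook.start_python_pipeline(
    variables={"output": "/tmp/wordcount-output"},        # becomes --output=...
    py_file="/opt/airflow/dags/pipelines/wordcount.py",   # illustrative path
    py_options=[],
    py_interpreter="python3",
)
```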
[Hooks and Monitoring](./hooks-monitoring.md)
### Asynchronous Pipeline Triggers

Deferrable execution triggers for long-running pipelines that enable efficient resource utilization by yielding control during pipeline execution.

```python { .api }
class BeamPythonPipelineTrigger(BeamPipelineBaseTrigger):
    def __init__(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        runner: str = "DirectRunner",
        gcp_conn_id: str = "google_cloud_default",
    ) -> None: ...
    def serialize(self) -> tuple[str, dict[str, Any]]: ...
    async def run(self) -> AsyncIterator[TriggerEvent]: ...

class BeamJavaPipelineTrigger(BeamPipelineBaseTrigger):
    def __init__(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        runner: str = "DirectRunner",
        gcp_conn_id: str = "google_cloud_default",
    ) -> None: ...
    def serialize(self) -> tuple[str, dict[str, Any]]: ...
    async def run(self) -> AsyncIterator[TriggerEvent]: ...
```
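In practice the triggers are rarely instantiated directly; setting `deferrable=True` on an operator defers execution to the corresponding trigger, which runs the pipeline from the triggerer and yields a `TriggerEvent` when it finishes. A minimal sketch; the file path and options are illustrative:

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# The worker slot is released while the pipeline runs; the task resumes when
# the BeamPythonPipelineTrigger emits a success or error event.
run_deferred_pipeline = BeamRunPythonPipelineOperator(
    task_id="run_deferred_pipeline",
    py_file="gs://my-bucket/beam_pipeline.py",  # illustrative path
    runner="DirectRunner",
    pipeline_options={"output": "gs://my-bucket/output"},
    deferrable=True,
)
```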
[Triggers and Deferrable Execution](./triggers.md)
### Version Compatibility

Cross-version compatibility components that provide stable imports and version detection for different Airflow releases.

```python { .api }
AIRFLOW_V_3_1_PLUS: bool
"""Boolean flag indicating Airflow version 3.1+ compatibility."""

# Version-compatible base classes (imported from appropriate Airflow version)
BaseHook: type
"""Version-compatible BaseHook class."""

BaseOperator: type
"""Version-compatible BaseOperator class."""
```
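Code that extends this provider can import its base classes from `version_compat` instead of hard-coding an Airflow import path, and branch on the version flag where behavior differs. A minimal sketch; the custom operator is illustrative:

```python
from airflow.providers.apache.beam.version_compat import (
    AIRFLOW_V_3_1_PLUS,
    BaseOperator,
)


class MyBeamWrapperOperator(BaseOperator):  # illustrative custom operator
    """BaseOperator resolves to the right class for the installed Airflow."""

    def execute(self, context):
        if AIRFLOW_V_3_1_PLUS:
            self.log.info("Running on Airflow 3.1+")
        else:
            self.log.info("Running on an older Airflow release")
```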
## Types
```python { .api }
class BeamBasePipelineOperator(BaseOperator):
    """
    Abstract base class for all Apache Beam pipeline operators.

    Provides common functionality including pipeline option handling,
    Dataflow integration, and error management.
    """
    template_fields = ["runner", "pipeline_options", "default_pipeline_options", "dataflow_config"]

    def __init__(
        self,
        *,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...

    def execute(self, context: Context) -> dict: ...
    def execute_on_dataflow(self, context: Context) -> dict: ...
    def on_kill(self) -> None: ...

class BeamRunnerType:
    """Helper class for listing available runner types."""
    DataflowRunner = "DataflowRunner"
    DirectRunner = "DirectRunner"
    SparkRunner = "SparkRunner"
    FlinkRunner = "FlinkRunner"
    SamzaRunner = "SamzaRunner"
    NemoRunner = "NemoRunner"
    JetRunner = "JetRunner"
    Twister2Runner = "Twister2Runner"

class DataflowConfiguration:
    """
    Configuration object for Dataflow-specific options.

    Used to configure Google Cloud Dataflow execution parameters
    including job naming, project settings, and execution behavior.
    """
    job_name: str
    project_id: str
    location: str
    gcp_conn_id: str = "google_cloud_default"
    wait_until_finished: bool = True
    poll_sleep: int = 10
    cancel_timeout: int = 300
    drain_pipeline: bool = False
    service_account: str | None = None
    impersonation_chain: list[str] | None = None
    check_if_running: str = "WaitForRun"
    multiple_jobs: bool = False

class Context:
    """Airflow execution context for task instances."""

def beam_options_to_args(options: dict) -> list[str]:
    """
    Convert pipeline options dictionary to command line arguments.

    Args:
        options: Dictionary with pipeline options

    Returns:
        List of formatted command line arguments
    """

def run_beam_command(
    cmd: list[str],
    log: logging.Logger,
    process_line_callback: Callable[[str], None] | None = None,
    working_directory: str | None = None,
    is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
    """
    Execute Beam pipeline command in subprocess with monitoring.

    Args:
        cmd: Command parts to execute
        log: Logger for output
        process_line_callback: Optional output processor
        working_directory: Execution directory
        is_dataflow_job_id_exist_callback: Optional job ID detector
    """

# Version compatibility support
AIRFLOW_V_3_1_PLUS: bool
"""Boolean flag indicating Airflow version 3.1+ compatibility."""

# Trigger types
class TriggerEvent:
    """Event yielded by triggers to indicate status changes."""
    def __init__(self, payload: dict[str, Any]) -> None: ...

class AsyncIterator[T]:
    """Async iterator type for trigger events."""

class NamedTemporaryFile:
    """Temporary file with a visible name in the file system."""
    name: str

# Type aliases
Any = typing.Any
Callable = typing.Callable
```
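A short sketch of how `beam_options_to_args` turns an options dictionary into flags for the Beam command invocation; the option values are illustrative, and the exact formatting of booleans and lists follows the hook implementation:

```python
from airflow.providers.apache.beam.hooks.beam import beam_options_to_args

options = {
    "project": "my-gcp-project",
    "streaming": True,                   # a True boolean becomes a bare flag
    "labels": ["team=data", "env=dev"],  # list values repeat the flag
}

args = beam_options_to_args(options)
# Expected shape (per the hook implementation):
# ["--project=my-gcp-project", "--streaming", "--labels=team=data", "--labels=env=dev"]
```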