# Apache Airflow Providers Apache Beam

An Apache Airflow provider package that enables workflow orchestration and data processing by offering operators, hooks, and triggers for Apache Beam pipeline execution. It supports running Beam pipelines written in Python, Java, and Go across various runners, including DirectRunner, DataflowRunner, SparkRunner, and FlinkRunner.

## Package Information

- **Package Name**: apache-airflow-providers-apache-beam
- **Package Type**: Python Package (PyPI)
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-beam`
- **Version**: 6.1.3
- **Requires**: Apache Airflow >=2.10.0, Apache Beam >=2.60.0

## Core Imports

```python
from airflow.providers.apache.beam import __version__
```

Operators:
```python
from airflow.providers.apache.beam.operators.beam import (
    BeamRunPythonPipelineOperator,
    BeamRunJavaPipelineOperator,
    BeamRunGoPipelineOperator,
    BeamBasePipelineOperator
)
```

Hooks:
```python
from airflow.providers.apache.beam.hooks.beam import (
    BeamHook,
    BeamAsyncHook,
    BeamRunnerType,
    beam_options_to_args,
    run_beam_command
)
```

Triggers:
```python
from airflow.providers.apache.beam.triggers.beam import (
    BeamPythonPipelineTrigger,
    BeamJavaPipelineTrigger,
    BeamPipelineBaseTrigger
)
```

Version compatibility:
```python
from airflow.providers.apache.beam.version_compat import (
    AIRFLOW_V_3_1_PLUS,
    BaseHook,
    BaseOperator
)
```

## Basic Usage

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Define default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
dag = DAG(
    'beam_pipeline_example',
    default_args=default_args,
    description='Run Apache Beam pipeline with Airflow',
    schedule=timedelta(days=1),
    catchup=False,
)

# Define Beam pipeline task
run_beam_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_beam_pipeline',
    py_file='gs://my-bucket/beam_pipeline.py',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'staging_location': 'gs://my-bucket/staging',
    },
    dataflow_config={
        'job_name': 'my-beam-pipeline',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
    },
    dag=dag,
)
```

## Architecture

The provider follows Airflow's standard architecture pattern with three main component types:

- **Operators**: Execute Beam pipelines as Airflow tasks, supporting Python, Java, and Go implementations
- **Hooks**: Provide a low-level interface to Apache Beam, handling pipeline execution and monitoring
- **Triggers**: Enable deferrable execution for long-running pipelines with asynchronous monitoring

When the Google provider package is available, the provider integrates with Google Cloud Dataflow, enabling cloud-scale pipeline execution with job monitoring and management.
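
In code, the three component types map onto the classes documented below; a DAG typically only touches an operator, while the hook does the actual execution and a trigger takes over monitoring when `deferrable=True` is set. A minimal sketch (the pipeline path is a placeholder, and the task is shown outside a DAG for brevity):

```python
# Operators: one task = one Beam pipeline run inside a DAG.
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Hooks: the lower-level execution layer, also usable directly.
from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

# Triggers: used automatically when an operator is created with deferrable=True.
from airflow.providers.apache.beam.triggers.beam import BeamPythonPipelineTrigger  # noqa: F401

run_pipeline = BeamRunPythonPipelineOperator(
    task_id="beam_architecture_sketch",
    py_file="gs://my-bucket/beam_pipeline.py",  # placeholder path
    runner=BeamRunnerType.DirectRunner,
    deferrable=True,  # monitoring is handed off to BeamPythonPipelineTrigger
)

# Programmatic alternative to the operator, without a DAG:
hook = BeamHook(runner=BeamRunnerType.DirectRunner)
```
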
## Capabilities

### Python Pipeline Execution

Execute Apache Beam pipelines written in Python with support for virtual environments, custom requirements, and various runners including local DirectRunner and cloud DataflowRunner.

```python { .api }
class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        py_file: str,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        py_interpreter: str = "python3",
        py_options: list[str] | None = None,
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...
```

[Python Pipelines](./python-pipelines.md)
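
For example, a local DirectRunner task that installs its own requirements into a temporary virtualenv might look like the sketch below (the paths and requirement pin are placeholders, and `dag` is assumed to be defined as in the Basic Usage example):

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

run_local_python = BeamRunPythonPipelineOperator(
    task_id="run_local_python_pipeline",
    py_file="gs://my-bucket/wordcount.py",  # placeholder; a local path also works
    runner="DirectRunner",
    pipeline_options={"output": "/tmp/wordcount/output"},
    py_requirements=["apache-beam[gcp]==2.60.0"],  # installed into an isolated virtualenv
    py_interpreter="python3",
    py_system_site_packages=False,
    dag=dag,
)
```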

### Java Pipeline Execution

Execute Apache Beam pipelines written in Java using self-executing JAR files with support for various runners and Dataflow integration.

```python { .api }
class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        jar: str,
        runner: str = "DirectRunner",
        job_class: str | None = None,
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...
```

[Java Pipelines](./java-pipelines.md)
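
For example, a Java pipeline packaged as a self-executing JAR could be scheduled like this (a sketch; the JAR location and main class are placeholders, and `dag` is assumed to exist as in the Basic Usage example):

```python
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

run_java = BeamRunJavaPipelineOperator(
    task_id="run_java_pipeline",
    jar="gs://my-bucket/pipelines/wordcount-bundled.jar",  # placeholder JAR path
    job_class="org.example.beam.WordCount",                # placeholder main class
    runner="DirectRunner",
    pipeline_options={"output": "/tmp/wordcount/output"},
    dag=dag,
)
```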

165

166

### Go Pipeline Execution

167

168

Execute Apache Beam pipelines written in Go from source files or pre-compiled binaries with support for cross-platform execution.

169

170

```python { .api }

171

class BeamRunGoPipelineOperator(BeamBasePipelineOperator):

172

def __init__(

173

self,

174

*,

175

go_file: str = "",

176

launcher_binary: str = "",

177

worker_binary: str = "",

178

runner: str = "DirectRunner",

179

default_pipeline_options: dict | None = None,

180

pipeline_options: dict | None = None,

181

gcp_conn_id: str = "google_cloud_default",

182

dataflow_config: DataflowConfiguration | dict | None = None,

183

**kwargs,

184

) -> None: ...

185

```

186

187

[Go Pipelines](./go-pipelines.md)
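
A sketch of a Go pipeline run from source (the source path is a placeholder; `launcher_binary` and `worker_binary` can be used instead to point at pre-compiled binaries, and `dag` is assumed to exist as in the Basic Usage example):

```python
from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator

run_go = BeamRunGoPipelineOperator(
    task_id="run_go_pipeline",
    go_file="gs://my-bucket/pipelines/wordcount.go",  # placeholder; a local path also works
    runner="DirectRunner",
    pipeline_options={"output": "/tmp/wordcount/output"},
    dag=dag,
)
```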

### Pipeline Monitoring and Hooks

Low-level interface for executing and monitoring Apache Beam pipelines with both synchronous and asynchronous execution modes.

```python { .api }
class BeamHook(BaseHook):
    def __init__(self, runner: str) -> None: ...
    def start_python_pipeline(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str],
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None: ...
    def start_java_pipeline(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None: ...
    def start_go_pipeline(
        self,
        variables: dict,
        go_file: str,
        process_line_callback: Callable[[str], None] | None = None,
        should_init_module: bool = False,
    ) -> None: ...

class BeamAsyncHook(BeamHook):
    def __init__(self, runner: str) -> None: ...
    async def start_python_pipeline_async(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int: ...
    async def start_java_pipeline_async(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int: ...
```

[Hooks and Monitoring](./hooks-monitoring.md)
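
The hooks can also be used on their own, outside of the operators, for example from a `@task`-decorated function or a plugin. A minimal sketch with placeholder paths:

```python
import logging

from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

log = logging.getLogger(__name__)

# Run a Python pipeline synchronously on the DirectRunner.
hook = BeamHook(runner=BeamRunnerType.DirectRunner)
hook.start_python_pipeline(
    variables={"output": "/tmp/wordcount/output"},  # rendered as --output=... on the command line
    py_file="/opt/pipelines/wordcount.py",          # placeholder path
    py_options=[],
    process_line_callback=lambda line: log.info("beam: %s", line),
)
```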

245

246

### Asynchronous Pipeline Triggers

247

248

Deferrable execution triggers for long-running pipelines that enable efficient resource utilization by yielding control during pipeline execution.

249

250

```python { .api }

251

class BeamPythonPipelineTrigger(BeamPipelineBaseTrigger):

252

def __init__(

253

self,

254

variables: dict,

255

py_file: str,

256

py_options: list[str] | None = None,

257

py_interpreter: str = "python3",

258

py_requirements: list[str] | None = None,

259

py_system_site_packages: bool = False,

260

runner: str = "DirectRunner",

261

gcp_conn_id: str = "google_cloud_default",

262

) -> None: ...

263

def serialize(self) -> tuple[str, dict[str, Any]]: ...

264

async def run(self) -> AsyncIterator[TriggerEvent]: ...

265

266

class BeamJavaPipelineTrigger(BeamPipelineBaseTrigger):

267

def __init__(

268

self,

269

variables: dict,

270

jar: str,

271

job_class: str | None = None,

272

runner: str = "DirectRunner",

273

gcp_conn_id: str = "google_cloud_default",

274

) -> None: ...

275

def serialize(self) -> tuple[str, dict[str, Any]]: ...

276

async def run(self) -> AsyncIterator[TriggerEvent]: ...

277

```

278

279

[Triggers and Deferrable Execution](./triggers.md)
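
These triggers are not usually instantiated by hand. Passing `deferrable=True` to the Python or Java operator defers the task to the corresponding trigger, which monitors the pipeline asynchronously on the triggerer and emits a `TriggerEvent` when it finishes. A sketch (the path is a placeholder, and `dag` is assumed to exist as in the Basic Usage example):

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

deferrable_run = BeamRunPythonPipelineOperator(
    task_id="run_pipeline_deferrable",
    py_file="gs://my-bucket/beam_pipeline.py",  # placeholder path
    runner="DirectRunner",
    pipeline_options={"output": "/tmp/wordcount/output"},
    deferrable=True,  # worker slot is released; BeamPythonPipelineTrigger reports completion
    dag=dag,
)
```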

### Version Compatibility

Cross-version compatibility components that provide stable imports and version detection for different Airflow releases.

```python { .api }
AIRFLOW_V_3_1_PLUS: bool
"""Boolean flag indicating Airflow version 3.1+ compatibility."""

# Version-compatible base classes (imported from appropriate Airflow version)
BaseHook: type
"""Version-compatible BaseHook class."""

BaseOperator: type
"""Version-compatible BaseOperator class."""
```
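
A sketch of how these shims might be used in custom code that has to import the same base classes across Airflow releases (the wrapper operator below is hypothetical, not part of the provider):

```python
from airflow.providers.apache.beam.version_compat import (
    AIRFLOW_V_3_1_PLUS,
    BaseOperator,
)


class MyBeamWrapperOperator(BaseOperator):
    """Hypothetical operator that works on both older and 3.1+ Airflow releases."""

    def execute(self, context):
        if AIRFLOW_V_3_1_PLUS:
            self.log.info("Running on Airflow 3.1 or newer")
        else:
            self.log.info("Running on an older Airflow release")
```
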
## Types

```python { .api }
class BeamBasePipelineOperator(BaseOperator):
    """
    Abstract base class for all Apache Beam pipeline operators.

    Provides common functionality including pipeline option handling,
    Dataflow integration, and error management.
    """
    template_fields = ["runner", "pipeline_options", "default_pipeline_options", "dataflow_config"]

    def __init__(
        self,
        *,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...

    def execute(self, context: Context) -> dict: ...
    def execute_on_dataflow(self, context: Context) -> dict: ...
    def on_kill(self) -> None: ...

class BeamRunnerType:
    """Helper class for listing available runner types."""
    DataflowRunner = "DataflowRunner"
    DirectRunner = "DirectRunner"
    SparkRunner = "SparkRunner"
    FlinkRunner = "FlinkRunner"
    SamzaRunner = "SamzaRunner"
    NemoRunner = "NemoRunner"
    JetRunner = "JetRunner"
    Twister2Runner = "Twister2Runner"

class DataflowConfiguration:
    """
    Configuration object for Dataflow-specific options.

    Used to configure Google Cloud Dataflow execution parameters
    including job naming, project settings, and execution behavior.
    """
    job_name: str
    project_id: str
    location: str
    gcp_conn_id: str = "google_cloud_default"
    wait_until_finished: bool = True
    poll_sleep: int = 10
    cancel_timeout: int = 300
    drain_pipeline: bool = False
    service_account: str | None = None
    impersonation_chain: list[str] | None = None
    check_if_running: str = "WaitForRun"
    multiple_jobs: bool = False

class Context:
    """Airflow execution context for task instances."""

def beam_options_to_args(options: dict) -> list[str]:
    """
    Convert pipeline options dictionary to command line arguments.

    Args:
        options: Dictionary with pipeline options

    Returns:
        List of formatted command line arguments
    """

def run_beam_command(
    cmd: list[str],
    log: logging.Logger,
    process_line_callback: Callable[[str], None] | None = None,
    working_directory: str | None = None,
    is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
    """
    Execute Beam pipeline command in subprocess with monitoring.

    Args:
        cmd: Command parts to execute
        log: Logger for output
        process_line_callback: Optional output processor
        working_directory: Execution directory
        is_dataflow_job_id_exist_callback: Optional job ID detector
    """

# Version compatibility support
AIRFLOW_V_3_1_PLUS: bool
"""Boolean flag indicating Airflow version 3.1+ compatibility."""

# Trigger types
class TriggerEvent:
    """Event yielded by triggers to indicate status changes."""
    def __init__(self, payload: dict[str, Any]) -> None: ...

class AsyncIterator[T]:
    """Async iterator type for trigger events."""

class NamedTemporaryFile:
    """Temporary file with a visible name in the file system."""
    name: str

# Type aliases
Any = typing.Any
Callable = typing.Callable
```
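
To make the helper functions concrete, a short sketch of how `beam_options_to_args` turns a pipeline-options dictionary into CLI flags (the option values here are arbitrary examples, and the exact output formatting is an internal detail):

```python
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType, beam_options_to_args

args = beam_options_to_args(
    {
        "runner": BeamRunnerType.DirectRunner,
        "output": "/tmp/wordcount/output",
        "streaming": True,                    # boolean flags are emitted without a value
        "labels": ["team=data", "env=dev"],   # list values are repeated as separate flags
    }
)
# Roughly: ["--runner=DirectRunner", "--output=/tmp/wordcount/output",
#           "--streaming", "--labels=team=data", "--labels=env=dev"]
```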