# Hooks and Monitoring

Low-level interface for executing and monitoring Apache Beam pipelines with comprehensive support for synchronous and asynchronous execution modes, custom callback handling, and pipeline lifecycle management.

## Capabilities

### BeamHook

Synchronous hook providing a direct interface to Apache Beam pipeline execution, with process monitoring and callback support.

```python { .api }
class BeamHook(BaseHook):
    def __init__(self, runner: str) -> None:
        """
        Initialize Beam hook.

        Parameters:
        - runner (str): Runner type for pipeline execution
        """

    def start_python_pipeline(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str],
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ):
        """
        Start Apache Beam python pipeline.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - py_file (str): Path to Python pipeline file
        - py_options (list[str]): Additional Python command-line options
        - py_interpreter (str): Python interpreter version
        - py_requirements (list[str]): Python packages for virtual environment
        - py_system_site_packages (bool): Include system packages in venv
        - process_line_callback (Callable): Optional callback for output processing
        - is_dataflow_job_id_exist_callback (Callable): Optional callback for job ID detection
        """

    def start_java_pipeline(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
        is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
    ) -> None:
        """
        Start Apache Beam Java pipeline.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - jar (str): Path to JAR file containing pipeline
        - job_class (str): Java class name for pipeline execution
        - process_line_callback (Callable): Optional callback for output processing
        - is_dataflow_job_id_exist_callback (Callable): Optional callback for job ID detection
        """

    def start_go_pipeline(
        self,
        variables: dict,
        go_file: str,
        process_line_callback: Callable[[str], None] | None = None,
        should_init_module: bool = False,
    ) -> None:
        """
        Start Apache Beam Go pipeline with source file.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - go_file (str): Path to Go source file
        - process_line_callback (Callable): Optional callback for output processing
        - should_init_module (bool): Initialize Go module and dependencies
        """

    def start_go_pipeline_with_binary(
        self,
        variables: dict,
        launcher_binary: str,
        worker_binary: str,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> None:
        """
        Start Apache Beam Go pipeline with pre-compiled binary.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - launcher_binary (str): Path to launcher binary
        - worker_binary (str): Path to worker binary
        - process_line_callback (Callable): Optional callback for output processing
        """
```

### BeamAsyncHook

Asynchronous hook providing a non-blocking interface to Apache Beam pipeline execution, with concurrent operation support.

```python { .api }
class BeamAsyncHook(BeamHook):
    def __init__(self, runner: str) -> None:
        """
        Initialize asynchronous Beam hook.

        Parameters:
        - runner (str): Runner type for pipeline execution
        """

    async def start_python_pipeline_async(
        self,
        variables: dict,
        py_file: str,
        py_options: list[str] | None = None,
        py_interpreter: str = "python3",
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        process_line_callback: Callable[[str], None] | None = None,
    ):
        """
        Start Apache Beam python pipeline asynchronously.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - py_file (str): Path to Python pipeline file
        - py_options (list[str]): Additional Python command-line options
        - py_interpreter (str): Python interpreter version
        - py_requirements (list[str]): Python packages for virtual environment
        - py_system_site_packages (bool): Include system packages in venv
        - process_line_callback (Callable): Optional callback for output processing

        Returns:
        - int: Pipeline execution return code
        """

    async def start_java_pipeline_async(
        self,
        variables: dict,
        jar: str,
        job_class: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int:
        """
        Start Apache Beam Java pipeline asynchronously.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - jar (str): Path to JAR file containing pipeline
        - job_class (str): Java class name for pipeline execution
        - process_line_callback (Callable): Optional callback for output processing

        Returns:
        - int: Pipeline execution return code
        """

    async def start_pipeline_async(
        self,
        variables: dict,
        command_prefix: list[str],
        working_directory: str | None = None,
        process_line_callback: Callable[[str], None] | None = None,
    ) -> int:
        """
        Start Apache Beam pipeline with custom command asynchronously.

        Parameters:
        - variables (dict): Pipeline execution variables and options
        - command_prefix (list[str]): Command prefix for pipeline execution
        - working_directory (str): Directory for command execution
        - process_line_callback (Callable): Optional callback for output processing

        Returns:
        - int: Pipeline execution return code
        """
```
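
The async methods are awaited from a running event loop (for example, inside an Airflow trigger) and return the process return code. A minimal sketch of launching a Java pipeline asynchronously; the project ID, bucket, and JAR path are placeholders:

```python
import asyncio

from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook, BeamRunnerType


async def run_java_pipeline() -> int:
    """Run a Java Beam pipeline without blocking the event loop."""
    async_hook = BeamAsyncHook(runner=BeamRunnerType.DataflowRunner)
    return await async_hook.start_java_pipeline_async(
        variables={
            "project": "my-gcp-project",            # placeholder project
            "region": "us-central1",
            "tempLocation": "gs://my-bucket/temp",   # placeholder bucket
        },
        jar="/path/to/pipeline.jar",                 # placeholder JAR path
        job_class="com.company.DataProcessingPipeline",
    )


return_code = asyncio.run(run_java_pipeline())
```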

### Runner Types and Utilities

```python { .api }
class BeamRunnerType:
    """Helper class for listing available runner types."""

    DataflowRunner = "DataflowRunner"
    DirectRunner = "DirectRunner"
    SparkRunner = "SparkRunner"
    FlinkRunner = "FlinkRunner"
    SamzaRunner = "SamzaRunner"
    NemoRunner = "NemoRunner"
    JetRunner = "JetRunner"
    Twister2Runner = "Twister2Runner"

def beam_options_to_args(options: dict) -> list[str]:
    """
    Convert pipeline options dictionary to command line arguments.

    Parameters:
    - options (dict): Dictionary with pipeline options

    Returns:
    - list[str]: List of formatted command line arguments
    """

def run_beam_command(
    cmd: list[str],
    log: logging.Logger,
    process_line_callback: Callable[[str], None] | None = None,
    working_directory: str | None = None,
    is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
    """
    Run pipeline command in subprocess with monitoring.

    Parameters:
    - cmd (list[str]): Command parts to execute
    - log (logging.Logger): Logger for output
    - process_line_callback (Callable): Optional output processor
    - working_directory (str): Execution directory
    - is_dataflow_job_id_exist_callback (Callable): Optional job ID detector
    """
```
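
`run_beam_command` is the low-level entry point the hooks use to spawn and monitor the Beam subprocess; it raises `AirflowException` when the process exits with a non-zero return code. A minimal sketch of calling it directly together with `beam_options_to_args`; the pipeline path and options are placeholders, and the command layout is only an approximation of what the hooks build internally:

```python
import logging

from airflow.providers.apache.beam.hooks.beam import beam_options_to_args, run_beam_command

log = logging.getLogger(__name__)

# Build a launch command: interpreter, script, then the formatted pipeline options.
cmd = [
    "python3",
    "/path/to/pipeline.py",   # placeholder pipeline file
    *beam_options_to_args({"runner": "DirectRunner", "output": "/tmp/beam_output"}),
]

run_beam_command(cmd=cmd, log=log)
```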

### Usage Examples

#### Basic Hook Usage

```python
from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

# Initialize hook
beam_hook = BeamHook(runner=BeamRunnerType.DirectRunner)

# Execute Python pipeline
beam_hook.start_python_pipeline(
    variables={
        'output': '/tmp/beam_output',
        'temp_location': '/tmp/beam_temp',
    },
    py_file='/path/to/pipeline.py',
    py_options=['-u'],
    py_interpreter='python3',
)
```

#### Custom Process Monitoring

```python
import logging

logger = logging.getLogger(__name__)


def log_processor(line: str) -> None:
    """Custom processor for pipeline output."""
    if 'ERROR' in line:
        logger.error(f"Pipeline error: {line}")
    elif 'INFO' in line:
        logger.info(f"Pipeline info: {line}")


def job_id_detector() -> bool:
    """Check if a Dataflow job ID has been extracted (e.g. stored on the hook by another callback)."""
    return bool(getattr(beam_hook, 'dataflow_job_id', None))


beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='gs://bucket/pipeline.py',
    py_options=[],
    process_line_callback=log_processor,
    is_dataflow_job_id_exist_callback=job_id_detector,
)
```

#### Asynchronous Pipeline Execution

```python
import asyncio

from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook


async def run_async_pipeline():
    """Execute pipeline asynchronously."""
    async_hook = BeamAsyncHook(runner='DataflowRunner')

    return_code = await async_hook.start_python_pipeline_async(
        variables={
            'project': 'my-gcp-project',
            'region': 'us-central1',
            'temp_location': 'gs://my-bucket/temp',
        },
        py_file='gs://my-bucket/pipeline.py',
        py_requirements=['apache-beam[gcp]>=2.60.0'],
    )

    if return_code == 0:
        print("Pipeline completed successfully")
    else:
        print(f"Pipeline failed with return code: {return_code}")


# Run the async pipeline
asyncio.run(run_async_pipeline())
```
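
`start_pipeline_async` runs a caller-supplied launch command with the pipeline options from `variables`. A minimal sketch with a custom command prefix; the script path and working directory are placeholders:

```python
from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook


async def run_custom_pipeline() -> int:
    """Run a pipeline via a caller-supplied launch command."""
    async_hook = BeamAsyncHook(runner='DirectRunner')
    return await async_hook.start_pipeline_async(
        variables={'output': '/tmp/beam_output'},
        command_prefix=['python3', '/path/to/pipeline.py'],  # placeholder launch command
        working_directory='/path/to/project',                # placeholder working directory
    )
```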

#### Virtual Environment Management

```python
# Custom Python environment with specific packages
beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='/path/to/pipeline.py',
    py_options=[],
    py_requirements=[
        'apache-beam[gcp]==2.60.0',
        'pandas==2.1.0',
        'numpy==1.24.0',
        'google-cloud-bigquery==3.11.0',
    ],
    py_system_site_packages=False,  # Isolated environment
    py_interpreter='python3.10',
)
```

#### Java Pipeline with Job Class

```python
beam_hook.start_java_pipeline(
    variables={
        'runner': 'DataflowRunner',
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
    },
    jar='/path/to/pipeline.jar',
    job_class='com.company.DataProcessingPipeline',
)
```
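
#### Go Pipeline Execution

A sketch of running a Go pipeline from source with `start_go_pipeline`; the source path and output location are placeholders, and `should_init_module=True` asks the hook to initialize the Go module and its dependencies before execution:

```python
beam_hook.start_go_pipeline(
    variables={
        'output': '/tmp/beam_go_output',   # placeholder output location
    },
    go_file='/path/to/main.go',            # placeholder Go source file
    should_init_module=True,               # initialize the Go module before running
)
```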

### Pipeline Options Processing

#### Option Format Handling

The `beam_options_to_args` function handles various option types:

```python
from airflow.providers.apache.beam.hooks.beam import beam_options_to_args

options = {
    'runner': 'DataflowRunner',
    'project': 'my-project',
    'streaming': True,  # True boolean becomes a bare flag
    'experiments': ['enable_google_cloud_profiler', 'enable_streaming_engine'],  # List repeats the flag
    'numWorkers': 4,  # Numeric value
}

args = beam_options_to_args(options)
# Results in:
# ['--runner=DataflowRunner', '--project=my-project', '--streaming',
#  '--experiments=enable_google_cloud_profiler', '--experiments=enable_streaming_engine',
#  '--numWorkers=4']
```

### Error Handling and Monitoring

#### Process Monitoring

```python
import logging
import re

logger = logging.getLogger(__name__)


def comprehensive_monitor(line: str) -> None:
    """Comprehensive pipeline output monitoring."""
    # Extract job ID from Dataflow output
    job_id_match = re.search(r'Submitted job: ([a-zA-Z0-9\-]+)', line)
    if job_id_match:
        job_id = job_id_match.group(1)
        print(f"Extracted Dataflow job ID: {job_id}")

    # Monitor for errors
    if any(keyword in line.lower() for keyword in ['error', 'exception', 'failed']):
        logger.error(f"Pipeline error detected: {line}")

    # Track progress indicators
    if 'Processing bundle' in line:
        logger.info(f"Pipeline progress: {line}")


beam_hook.start_python_pipeline(
    variables=pipeline_options,
    py_file='pipeline.py',
    py_options=[],
    process_line_callback=comprehensive_monitor,
)
```

#### Exception Handling

```python
import logging

from airflow.exceptions import AirflowException

logger = logging.getLogger(__name__)

try:
    beam_hook.start_python_pipeline(
        variables=pipeline_options,
        py_file='pipeline.py',
        py_options=[],
    )
except AirflowException as e:
    if "Apache Beam process failed" in str(e):
        logger.error(f"Beam pipeline execution failed: {e}")
        # Handle pipeline failure
    else:
        logger.error(f"Hook error: {e}")
        # Handle other errors
```