# Plugin Integration

Core plugin components for Airflow integration, including event listeners, adapters, automatic event emission, and OpenLineage client management. Together, these components provide automatic data lineage tracking for DAG runs and task instances.

## Capabilities

### OpenLineage Adapter

Core adapter for creating and emitting OpenLineage events, managing the OpenLineage client, and coordinating the event lifecycle.
```python { .api }
class OpenLineageAdapter:
    """
    Core adapter for OpenLineage event creation and emission.

    Manages OpenLineage client instances, event building, and emission to
    configured transport backends.
    """

    def __init__(self, client: OpenLineageClient | None = None, secrets_masker: SecretsMasker | None = None):
        """
        Initialize adapter with optional client and secrets masker.

        Args:
            client: Pre-configured OpenLineage client
            secrets_masker: Secrets masking utility for sensitive data
        """

    def get_or_create_openlineage_client(self) -> OpenLineageClient:
        """
        Get existing or create new OpenLineage client from configuration.

        Returns:
            OpenLineageClient: Configured client for event emission
        """

    def get_openlineage_config(self) -> dict | None:
        """
        Get complete OpenLineage configuration dictionary.

        Returns:
            dict: Configuration settings or None if not configured
        """

    @staticmethod
    def build_dag_run_id(dag_id: str, logical_date: datetime, clear_number: int) -> str:
        """
        Build unique DAG run identifier for OpenLineage events.

        Args:
            dag_id: DAG identifier
            logical_date: DAG run logical execution date
            clear_number: Clear/retry number for the run

        Returns:
            str: Unique DAG run identifier
        """

    @staticmethod
    def build_task_instance_run_id(
        dag_id: str,
        task_id: str,
        execution_date: datetime,
        try_number: int
    ) -> str:
        """
        Build unique task instance run identifier.

        Args:
            dag_id: DAG identifier
            task_id: Task identifier
            execution_date: Task execution date
            try_number: Task attempt number

        Returns:
            str: Unique task run identifier
        """

    def emit(self, event: RunEvent) -> RunEvent:
        """
        Emit OpenLineage event to configured transport.

        Args:
            event: OpenLineage run event to emit

        Returns:
            RunEvent: The emitted event (for chaining/logging)
        """

    def start_task(
        self,
        run_id: str,
        job_name: str,
        job_description: str,
        event_time: datetime,
        parent_run_id: str | None,
        code_location: str | None,
        nominal_start_time: datetime | None,
        inputs: list[Dataset],
        outputs: list[Dataset],
        run_facets: dict[str, RunFacet] | None = None,
        job_facets: dict[str, JobFacet] | None = None
    ) -> RunEvent:
        """
        Create and emit task start event.

        Args:
            run_id: Unique run identifier
            job_name: Job name
            job_description: Job description
            event_time: Event timestamp
            parent_run_id: Parent run identifier (for nested jobs)
            code_location: Source code location
            nominal_start_time: Scheduled start time
            inputs: Input datasets
            outputs: Output datasets
            run_facets: Runtime metadata facets
            job_facets: Job-level metadata facets

        Returns:
            RunEvent: Created and emitted start event
        """

    def complete_task(
        self,
        run_id: str,
        job_name: str,
        job_description: str,
        event_time: datetime,
        parent_run_id: str | None,
        code_location: str | None,
        nominal_start_time: datetime | None,
        inputs: list[Dataset],
        outputs: list[Dataset],
        run_facets: dict[str, RunFacet] | None = None,
        job_facets: dict[str, JobFacet] | None = None
    ) -> RunEvent:
        """
        Create and emit task completion event.

        Args:
            run_id: Unique run identifier
            job_name: Job name
            job_description: Job description
            event_time: Event timestamp
            parent_run_id: Parent run identifier
            code_location: Source code location
            nominal_start_time: Scheduled start time
            inputs: Input datasets
            outputs: Output datasets
            run_facets: Runtime metadata facets
            job_facets: Job-level metadata facets

        Returns:
            RunEvent: Created and emitted completion event
        """

    def fail_task(
        self,
        run_id: str,
        job_name: str,
        job_description: str,
        event_time: datetime,
        parent_run_id: str | None,
        code_location: str | None,
        nominal_start_time: datetime | None,
        inputs: list[Dataset],
        outputs: list[Dataset],
        run_facets: dict[str, RunFacet] | None = None,
        job_facets: dict[str, JobFacet] | None = None
    ) -> RunEvent:
        """
        Create and emit task failure event.

        Args:
            run_id: Unique run identifier
            job_name: Job name
            job_description: Job description
            event_time: Event timestamp
            parent_run_id: Parent run identifier
            code_location: Source code location
            nominal_start_time: Scheduled start time
            inputs: Input datasets
            outputs: Output datasets
            run_facets: Runtime metadata facets
            job_facets: Job-level metadata facets

        Returns:
            RunEvent: Created and emitted failure event
        """

    def dag_started(
        self,
        dag_run: DagRun,
        msg: str,
        nominal_start_time: datetime,
        dag: DAG
    ):
        """
        Handle DAG start event and emit corresponding OpenLineage event.

        Args:
            dag_run: Started DAG run instance
            msg: Event message
            nominal_start_time: Scheduled start time
            dag: DAG instance
        """

    def dag_success(
        self,
        dag_run: DagRun,
        msg: str,
        nominal_start_time: datetime,
        dag: DAG
    ):
        """
        Handle DAG success event and emit corresponding OpenLineage event.

        Args:
            dag_run: Successful DAG run instance
            msg: Event message
            nominal_start_time: Scheduled start time
            dag: DAG instance
        """

    def dag_failed(
        self,
        dag_run: DagRun,
        msg: str,
        nominal_start_time: datetime,
        dag: DAG
    ):
        """
        Handle DAG failure event and emit corresponding OpenLineage event.

        Args:
            dag_run: Failed DAG run instance
            msg: Event message
            nominal_start_time: Scheduled start time
            dag: DAG instance
        """
```
### OpenLineage Event Listener

Event listener that captures Airflow DAG and task lifecycle events and coordinates with the adapter for OpenLineage event emission.

```python { .api }
class OpenLineageListener:
    """
    Event listener for DAG and task lifecycle events.

    Integrates with Airflow's event system to automatically capture
    DAG runs, task instances, and their state changes for lineage tracking.
    """
    # Implementation details are internal - provides event listening capabilities


def get_openlineage_listener() -> OpenLineageListener:
    """
    Get singleton instance of OpenLineage event listener.

    Returns:
        OpenLineageListener: Global listener instance for event capture
    """
```
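For ad-hoc use outside the plugin (for example in tests or custom tooling), the same singleton can be fetched directly. A minimal sketch, assuming the listener exposes the adapter it drives as an `adapter` attribute:

```python
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener

# Fetch the same singleton the plugin registers with Airflow's listener manager
listener = get_openlineage_listener()

# Assumed attribute: reuse the listener's adapter rather than constructing a new one
adapter = listener.adapter
print(f"Listener adapter: {adapter}")
```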
### Main Plugin Class

Primary Airflow plugin that registers OpenLineage functionality with the Airflow ecosystem.

```python { .api }
class OpenLineageProviderPlugin:
    """
    Main Airflow plugin class for OpenLineage integration.

    Automatically registers OpenLineage listeners, macros, and other
    components when the provider package is installed.
    """
```
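For orientation, here is a simplified sketch of how an `AirflowPlugin` of this shape wires in the listener. The shipped `OpenLineageProviderPlugin` adds conditional logic (for example, honoring the `disabled` setting), so this is illustrative only:

```python
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener

class OpenLineageSketchPlugin(AirflowPlugin):
    """Illustrative stand-in showing how a plugin registers a lineage listener."""
    name = "openlineage_sketch"
    listeners = [get_openlineage_listener()]
```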
### Run State Enumeration

Enumeration for OpenLineage run states used in event creation.

```python { .api }
class RunState(Enum):
    """
    Enumeration for OpenLineage run states.

    Values:
        START: Job/task is starting
        RUNNING: Job/task is currently running
        COMPLETE: Job/task completed successfully
        ABORT: Job/task was aborted
        FAIL: Job/task failed with error
    """
    START = "START"
    RUNNING = "RUNNING"
    COMPLETE = "COMPLETE"
    ABORT = "ABORT"
    FAIL = "FAIL"
```
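These states line up with Airflow's terminal task-instance states. An illustrative mapping using the string values above (not the provider's exact translation logic):

```python
# Hypothetical mapping from Airflow task-instance states to run state values
AIRFLOW_STATE_TO_RUN_STATE = {
    "running": "RUNNING",
    "success": "COMPLETE",
    "failed": "FAIL",
}

print(AIRFLOW_STATE_TO_RUN_STATE["success"])  # COMPLETE
```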
## Usage Examples

### Basic Adapter Usage

```python
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from openlineage.client.event_v2 import Dataset
from datetime import datetime

# Initialize adapter
adapter = OpenLineageAdapter()

# Create datasets
inputs = [Dataset(namespace='db', name='raw.users')]
outputs = [Dataset(namespace='db', name='analytics.user_summary')]

# Emit task start event
start_event = adapter.start_task(
    run_id='my-task-run-123',
    job_name='process_users',
    job_description='Process user data for analytics',
    event_time=datetime.utcnow(),
    parent_run_id=None,
    code_location='dags/user_processing.py',
    nominal_start_time=datetime.utcnow(),
    inputs=inputs,
    outputs=outputs,
    run_facets={},
    job_facets={}
)

print(f"Emitted start event: {start_event.eventType}")
```
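To close the run, the matching completion (or failure) event reuses the same `run_id` and job name. A sketch continuing the snippet above; the facet arguments are optional and omitted here:

```python
# Continues the example above: the shared run_id ties COMPLETE to START
complete_event = adapter.complete_task(
    run_id='my-task-run-123',
    job_name='process_users',
    job_description='Process user data for analytics',
    event_time=datetime.utcnow(),
    parent_run_id=None,
    code_location='dags/user_processing.py',
    nominal_start_time=datetime.utcnow(),
    inputs=inputs,
    outputs=outputs,
)
print(f"Emitted completion event: {complete_event.eventType}")
```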
### Manual Event Emission

```python
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from openlineage.client.event_v2 import RunEvent, Run, Job
from datetime import datetime, timezone
from uuid import uuid4

adapter = OpenLineageAdapter()

# Create custom run event; eventTime is an ISO-8601 string and runId must be a UUID
event = RunEvent(
    eventTime=datetime.now(timezone.utc).isoformat(),
    eventType='START',
    run=Run(runId=str(uuid4())),
    job=Job(namespace='my-namespace', name='custom-job'),
    inputs=[],
    outputs=[],
    producer='airflow-openlineage-provider'
)

# Emit event
emitted_event = adapter.emit(event)
print(f"Emitted custom event: {emitted_event}")
```
### Client Configuration

```python
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

# Initialize adapter (will create client from config)
adapter = OpenLineageAdapter()

# Get client for direct usage
client = adapter.get_or_create_openlineage_client()
print(f"Client transport: {client.transport}")

# Get configuration
config = adapter.get_openlineage_config()
if config:
    print(f"Transport type: {config.get('transport', {}).get('type')}")
    print(f"Namespace: {config.get('namespace')}")
```
### Run ID Generation

```python
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from datetime import datetime

# Generate DAG run ID
dag_run_id = OpenLineageAdapter.build_dag_run_id(
    dag_id='user_processing_dag',
    logical_date=datetime(2023, 12, 1),
    clear_number=0
)
print(f"DAG run ID: {dag_run_id}")

# Generate task run ID
task_run_id = OpenLineageAdapter.build_task_instance_run_id(
    dag_id='user_processing_dag',
    task_id='extract_users',
    execution_date=datetime(2023, 12, 1),
    try_number=1
)
print(f"Task run ID: {task_run_id}")
```
### DAG Event Handling

```python
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from airflow.models import DagRun, DAG
from datetime import datetime

adapter = OpenLineageAdapter()

# Handle DAG lifecycle events
dag_run = DagRun(
    dag_id='my_dag',
    execution_date=datetime.utcnow(),
    run_id='manual_2023-12-01'
)

dag = DAG('my_dag', start_date=datetime(2023, 1, 1))

# Emit DAG start event
adapter.dag_started(
    dag_run=dag_run,
    msg='DAG started execution',
    nominal_start_time=datetime.utcnow(),
    dag=dag
)

# Later, emit DAG completion
adapter.dag_success(
    dag_run=dag_run,
    msg='DAG completed successfully',
    nominal_start_time=datetime.utcnow(),
    dag=dag
)
```
### Integration with Task Context

```python
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from airflow.operators.python import PythonOperator

def my_task_function(**context):
    # Access adapter within task
    adapter = OpenLineageAdapter()

    # Get task instance info
    task_instance = context['task_instance']

    # Build run ID for current task
    run_id = OpenLineageAdapter.build_task_instance_run_id(
        dag_id=task_instance.dag_id,
        task_id=task_instance.task_id,
        execution_date=task_instance.execution_date,
        try_number=task_instance.try_number
    )

    print(f"Current task run ID: {run_id}")

    # Task logic here...

# Use in DAG (assumes a `dag` object from the surrounding DAG file;
# provide_context is no longer needed in Airflow 2+, context is always passed)
task = PythonOperator(
    task_id='my_task',
    python_callable=my_task_function,
    dag=dag
)
```
## Automatic Integration

The plugin integrates automatically when the provider is installed:

### Plugin Registration

```toml
# In pyproject.toml
[project.entry-points."airflow.plugins"]
openlineage = "airflow.providers.openlineage.plugins.openlineage:OpenLineageProviderPlugin"
```
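Registration can be verified from Python once Airflow has loaded its plugins. A small sketch using Airflow's plugin manager to list loaded plugin names:

```python
from airflow import plugins_manager

# Force plugin discovery, then list what was loaded;
# the OpenLineage provider plugin should appear in this list
plugins_manager.ensure_plugins_loaded()
print([plugin.name for plugin in plugins_manager.plugins])
```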
### Listener Registration

The plugin automatically registers the OpenLineage listener with Airflow's event system:

```python
# Automatic registration in Airflow
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener

# Listener is automatically registered to capture:
# - DAG run events (start, success, failure)
# - Task instance events (start, success, failure, retry)
# - Task state changes
```
### Configuration Integration

The adapter automatically reads configuration from Airflow settings:

```ini
# airflow.cfg
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_airflow
disabled = false
```
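The same options can be read programmatically through Airflow's configuration API, which is where the adapter resolves them. An illustrative read, not the provider's exact code path:

```python
from airflow.configuration import conf

# Read the [openlineage] section; fallback avoids raising when an option is unset
transport = conf.get('openlineage', 'transport', fallback=None)
namespace = conf.get('openlineage', 'namespace', fallback='default')
disabled = conf.getboolean('openlineage', 'disabled', fallback=False)

print(transport, namespace, disabled)
```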
## Error Handling and Resilience

### Transport Failures

```python
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

adapter = OpenLineageAdapter()

try:
    event = adapter.start_task(...)
    print("Event emitted successfully")
except Exception as e:
    print(f"Failed to emit event: {e}")
    # Task continues executing - lineage failures don't break DAGs
```
### Client Recovery

```python
# Adapter handles client failures gracefully
adapter = OpenLineageAdapter()

# If client creation fails, adapter continues with no-op behavior
client = adapter.get_or_create_openlineage_client()

# Events may be silently dropped if transport is unavailable
# This ensures DAG execution is not impacted by lineage issues
```
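The same defensive posture applies to manual emission from user code. A small wrapper sketch (a hypothetical helper, not part of the provider):

```python
import logging

log = logging.getLogger(__name__)

def emit_safely(adapter, event):
    """Hypothetical helper: emit an event without letting lineage errors propagate."""
    try:
        return adapter.emit(event)
    except Exception:
        log.warning("OpenLineage emission failed; continuing execution", exc_info=True)
        return None
```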
## Advanced Integration Patterns

### Custom Transport Configuration

```python
from openlineage.client import OpenLineageClient, OpenLineageClientOptions
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

# Create custom client; the request timeout is set through client options
custom_client = OpenLineageClient(
    url='http://my-lineage-backend:8080',
    options=OpenLineageClientOptions(timeout=30),
)

# Use with adapter
adapter = OpenLineageAdapter(client=custom_client)
```
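The client can also be built from an explicit transport object instead of a bare URL. A sketch using the HTTP transport classes shipped with the OpenLineage Python client, assuming the standard `openlineage.client.transport.http` module layout:

```python
from openlineage.client import OpenLineageClient
from openlineage.client.transport.http import HttpConfig, HttpTransport

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

# Build the client from an explicit HTTP transport
http_config = HttpConfig(url='http://my-lineage-backend:8080', timeout=30)
client = OpenLineageClient(transport=HttpTransport(http_config))

adapter = OpenLineageAdapter(client=client)
```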
### Secrets Masking

```python
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from airflow.utils.log.secrets_masker import SecretsMasker

# The adapter expects Airflow's SecretsMasker; register values that should be
# redacted from emitted lineage events
masker = SecretsMasker()
masker.add_mask('my-api-key')
masker.add_mask('super-secret-password')

# Use with adapter
adapter = OpenLineageAdapter(secrets_masker=masker)
```