# Lineage Extraction Framework

Extensible framework for extracting lineage metadata from Airflow operators, including base classes, built-in extractors, custom extractor registration, and centralized management. The framework enables automatic discovery of data flows and transformations across different operator types.

## Capabilities

### Base Extractor Classes

Core classes that define the extraction interface and data structures for lineage metadata.

```python { .api }
class OperatorLineage:
    """
    Generic container for lineage data including inputs, outputs, and metadata facets.
    """

    inputs: list[Dataset]  # Input datasets consumed by the operation
    outputs: list[Dataset]  # Output datasets produced by the operation
    run_facets: dict[str, BaseFacet]  # Runtime metadata facets
    job_facets: dict[str, BaseFacet]  # Job-level metadata facets

class BaseExtractor:
    """
    Abstract base class for implementing custom lineage extractors.
    """

    def __init__(self, operator):
        """
        Initialize the extractor with an operator instance.

        Args:
            operator: Airflow operator instance to extract lineage from
        """

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """
        Return the list of operator class names this extractor handles.

        Returns:
            list[str]: Operator class names, matched against the task's class name (task_type)
        """

    def extract(self) -> OperatorLineage | None:
        """
        Extract lineage metadata for task start events.

        Returns:
            OperatorLineage: Extracted lineage metadata, or None if no extraction is possible
        """

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """
        Extract lineage metadata for task completion events.

        Args:
            task_instance: Completed task instance

        Returns:
            OperatorLineage: Extracted lineage metadata or None
        """

    def extract_on_failure(self, task_instance) -> OperatorLineage | None:
        """
        Extract lineage metadata for task failure events.

        Args:
            task_instance: Failed task instance

        Returns:
            OperatorLineage: Extracted lineage metadata or None
        """

class DefaultExtractor(BaseExtractor):
    """
    Default implementation of BaseExtractor that provides fallback extraction.

    Uses the operator's built-in OpenLineage methods if available, otherwise
    attempts to extract basic lineage from operator properties.
    """
```
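
To make the container concrete, here is a minimal sketch that builds an `OperatorLineage` by hand. The bucket and paths are illustrative; the namespace/name split follows the usual OpenLineage convention of namespace identifying the data source and name identifying the dataset within it.

```python
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset

# Illustrative datasets: one input consumed, one output produced.
lineage = OperatorLineage(
    inputs=[Dataset(namespace="s3://my-bucket", name="raw/events.json")],
    outputs=[Dataset(namespace="s3://my-bucket", name="curated/events.parquet")],
    run_facets={},
    job_facets={},
)
```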

### Extractor Manager

Central management system for registering extractors and coordinating lineage extraction.

```python { .api }
class ExtractorManager:
    """
    Central manager for registering and executing lineage extractors.
    """

    def __init__(self):
        """Initialize the extractor manager with built-in extractors."""

    def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
        """
        Register a custom extractor for a specific operator class.

        Args:
            operator_class: Operator class name (matched against the task's class name)
            extractor: Extractor class to handle the operator
        """

    def extract_metadata(
        self,
        dagrun,
        task,
        task_instance_state: TaskInstanceState,
        task_instance=None
    ) -> OperatorLineage:
        """
        Extract lineage metadata for a task using the appropriate extractor.

        Args:
            dagrun: DAG run instance
            task: Task/operator instance
            task_instance_state: Current task instance state
            task_instance: Task instance (optional)

        Returns:
            OperatorLineage: Extracted lineage metadata
        """

    def get_extractor_class(self, task: Operator) -> type[BaseExtractor] | None:
        """
        Get the appropriate extractor class for a task.

        Args:
            task: Airflow task/operator

        Returns:
            type[BaseExtractor]: Extractor class, or None if no match
        """

    def extract_inlets_and_outlets(self, task_metadata: OperatorLineage, task):
        """
        Extract additional lineage from the task's inlets and outlets properties.

        Args:
            task_metadata: Existing task lineage metadata
            task: Airflow task instance
        """

    def get_hook_lineage(self) -> tuple[list[Dataset], list[Dataset]] | None:
        """
        Extract lineage from database hooks if available.

        Returns:
            tuple: (input_datasets, output_datasets) or None
        """

    @staticmethod
    def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None:
        """
        Convert an object storage URI to an OpenLineage Dataset.

        Args:
            uri: Object storage URI (s3://, gs://, etc.)

        Returns:
            Dataset: OpenLineage dataset, or None if conversion fails
        """

    @staticmethod
    def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
        """
        Convert an Airflow Table lineage entity to an OpenLineage Dataset.

        Args:
            table: Airflow Table instance

        Returns:
            Dataset: OpenLineage dataset representation
        """

    @staticmethod
    def convert_to_ol_dataset(obj) -> Dataset | None:
        """
        Convert various object types to an OpenLineage Dataset.

        Args:
            obj: Object to convert (URI string, Table, etc.)

        Returns:
            Dataset: OpenLineage dataset, or None if the conversion is not supported
        """

    def validate_task_metadata(self, task_metadata) -> OperatorLineage | None:
        """
        Validate and normalize task metadata.

        Args:
            task_metadata: Raw task metadata to validate

        Returns:
            OperatorLineage: Validated metadata, or None if invalid
        """
```
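
For orientation, a short sketch of wiring these methods together by hand, assuming `task` is any operator instance (such as the ones created in the usage examples below); in normal operation the OpenLineage listener drives this flow for you:

```python
from airflow.providers.openlineage.extractors.manager import ExtractorManager

manager = ExtractorManager()

# Resolve the extractor for the task, run it, then validate the result.
extractor_class = manager.get_extractor_class(task)
if extractor_class is not None:
    extractor = extractor_class(task)
    lineage = manager.validate_task_metadata(extractor.extract())
```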

### Built-in Extractors

Pre-built extractors for common Airflow operator types.

```python { .api }
class BashExtractor(BaseExtractor):
    """
    Extractor for BashOperator tasks that captures command execution metadata.

    Extracts command text, working directory, and environment variables
    when source code inclusion is enabled.
    """

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Returns: ['BashOperator']"""

class PythonExtractor(BaseExtractor):
    """
    Extractor for PythonOperator tasks that captures function execution metadata.

    Extracts function source code, callable information, and context variables
    when source code inclusion is enabled.
    """

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Returns: ['PythonOperator']"""
```
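
As a sketch of exercising a built-in extractor on its own (the `airflow.operators.bash` import path matches the examples below; in newer Airflow releases `BashOperator` lives in the standard provider):

```python
from airflow.operators.bash import BashOperator
from airflow.providers.openlineage.extractors.bash import BashExtractor

task = BashOperator(task_id="copy_file", bash_command="cp /data/in.csv /data/out.csv")

# BashExtractor matches via get_operator_classnames() -> ['BashOperator'];
# the command text is captured when source code inclusion is enabled.
extractor = BashExtractor(task)
lineage = extractor.extract()
```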

### Extraction Constants

Method name constants for operator-level OpenLineage integration.

```python { .api }
OL_METHOD_NAME_START: str = "get_openlineage_facets_on_start"
"""Method name for start event lineage extraction in operators."""

OL_METHOD_NAME_COMPLETE: str = "get_openlineage_facets_on_complete"
"""Method name for completion event lineage extraction in operators."""

OL_METHOD_NAME_FAIL: str = "get_openlineage_facets_on_failure"
"""Method name for failure event lineage extraction in operators."""
```
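
These constants exist so the manager and `DefaultExtractor` can discover operator-level hooks by name. A simplified sketch of that dispatch (illustrative, not the provider's exact code):

```python
OL_METHOD_NAME_START = "get_openlineage_facets_on_start"  # as documented above

def extract_on_start(operator):
    """Call the operator's start hook if it defines one, else return None."""
    method = getattr(operator, OL_METHOD_NAME_START, None)
    return method() if callable(method) else None
```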

## Usage Examples

### Creating Custom Extractor

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset

class CustomDataExtractor(BaseExtractor):
    """Custom extractor for CustomDataOperator."""

    @classmethod
    def get_operator_classnames(cls):
        # Matched against the task's class name (task_type), so the extractor
        # does not need to import the operator itself.
        return ['CustomDataOperator']

    def extract(self):
        # Extract lineage from operator properties
        operator = self.operator

        inputs = []
        if hasattr(operator, 'input_path'):
            inputs.append(Dataset(
                namespace='file',
                name=operator.input_path
            ))

        outputs = []
        if hasattr(operator, 'output_path'):
            outputs.append(Dataset(
                namespace='file',
                name=operator.output_path
            ))

        return OperatorLineage(
            inputs=inputs,
            outputs=outputs,
            run_facets={},
            job_facets={}
        )

# Register the custom extractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager

manager = ExtractorManager()
manager.add_extractor('CustomDataOperator', CustomDataExtractor)
```
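
In many cases implementing `extract()` alone is enough: when not overridden, the base class's `extract_on_complete()` typically falls back to `extract()`, so override the completion and failure hooks only when event-specific details (row counts, final paths) matter.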

### Using Extractor Manager

```python
from airflow.providers.openlineage.extractors.manager import ExtractorManager
from airflow.operators.python import PythonOperator
from airflow.utils.state import TaskInstanceState

# Initialize manager
manager = ExtractorManager()

# Create a sample task (`dag` is assumed to be an existing DAG object)
def my_function():
    return "Hello World"

task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    dag=dag
)

# Extract lineage metadata (`dag_run` and `task_instance` come from the
# running task's context)
lineage = manager.extract_metadata(
    dagrun=dag_run,
    task=task,
    task_instance_state=TaskInstanceState.RUNNING,
    task_instance=task_instance
)

print(f"Inputs: {lineage.inputs}")
print(f"Outputs: {lineage.outputs}")
print(f"Run facets: {lineage.run_facets}")
```

### Dataset Conversion Utilities

```python
from airflow.providers.openlineage.extractors.manager import ExtractorManager
from airflow.lineage.entities import Table

# Convert S3 URI to dataset
s3_uri = "s3://my-bucket/data/users.parquet"
s3_dataset = ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(s3_uri)
print(f"S3 Dataset: {s3_dataset}")

# Convert an Airflow Table lineage entity to a dataset
table = Table(
    database='analytics',
    cluster='warehouse',
    name='users'
)
table_dataset = ExtractorManager.convert_to_ol_dataset_from_table(table)
print(f"Table Dataset: {table_dataset}")

# Generic conversion; unsupported objects convert to None and are filtered out
mixed_objects = [
    "s3://bucket/file.csv",
    table,
    {"name": "custom_dataset"},  # unsupported type, yields None
]

converted = (ExtractorManager.convert_to_ol_dataset(obj) for obj in mixed_objects)
datasets = [dataset for dataset in converted if dataset is not None]
print(f"Converted datasets: {datasets}")
```
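
The generic converter can also handle Airflow's `File` lineage entity, which wraps an object-storage URL; a small sketch (assuming `airflow.lineage.entities.File` is available, as in recent Airflow versions):

```python
from airflow.lineage.entities import File
from airflow.providers.openlineage.extractors.manager import ExtractorManager

# The File entity's URL is converted the same way as a raw storage URI.
file_entity = File(url="s3://my-bucket/data/users.parquet")
file_dataset = ExtractorManager.convert_to_ol_dataset(file_entity)
print(f"File Dataset: {file_dataset}")
```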

### Operator Integration

Custom operators can implement the OpenLineage methods directly:

```python
from airflow.models import BaseOperator
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset

class MyCustomOperator(BaseOperator):
    def __init__(self, input_file: str, output_file: str, **kwargs):
        super().__init__(**kwargs)
        self.input_file = input_file
        self.output_file = output_file

    def execute(self, context):
        # Operator logic here
        pass

    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """Called when the task starts."""
        return OperatorLineage(
            inputs=[Dataset(namespace='file', name=self.input_file)],
            outputs=[Dataset(namespace='file', name=self.output_file)],
            run_facets={},
            job_facets={}
        )

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """Called when the task completes successfully."""
        # Completion-time lineage can include actual file sizes, row counts,
        # etc. Facet values should normally be OpenLineage facet objects; a
        # plain dict is shown here only as a placeholder.
        return OperatorLineage(
            inputs=[Dataset(namespace='file', name=self.input_file)],
            outputs=[Dataset(namespace='file', name=self.output_file)],
            run_facets={
                'processing_stats': {
                    'rows_processed': 1000,
                    'execution_time': task_instance.duration
                }
            },
            job_facets={}
        )
```
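
A failure-time hook can be added the same way; a minimal sketch that simply reuses the start-time lineage (the method below continues the `MyCustomOperator` class above):

```python
    def get_openlineage_facets_on_failure(self, task_instance) -> OperatorLineage:
        """Called when the task fails; reuses the static start-time lineage."""
        return self.get_openlineage_facets_on_start()
```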

### Extracting from Task Properties

```python
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.providers.openlineage.extractors.manager import ExtractorManager
from openlineage.client.event_v2 import Dataset

# Extract from the task's inlets/outlets (`my_function` and `dag` are assumed
# to exist as in the earlier examples)
task_with_lineage = PythonOperator(
    task_id='process_data',
    python_callable=my_function,
    inlets=[
        Dataset(namespace='db', name='raw.users'),
        Dataset(namespace='db', name='raw.orders')
    ],
    outlets=[
        Dataset(namespace='db', name='analytics.user_metrics')
    ],
    dag=dag
)

manager = ExtractorManager()
base_lineage = OperatorLineage(inputs=[], outputs=[], run_facets={}, job_facets={})

# This merges the inlets/outlets into the lineage
manager.extract_inlets_and_outlets(base_lineage, task_with_lineage)

print(f"Final inputs: {base_lineage.inputs}")
print(f"Final outputs: {base_lineage.outputs}")
```

## Extractor Registration

### Configuration-based Registration

Register extractors via Airflow configuration:

```ini
[openlineage]
extractors = my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor
```
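
The same setting can be supplied through Airflow's standard environment-variable mapping (`AIRFLOW__OPENLINEAGE__EXTRACTORS`). Each semicolon-separated entry is the full import path of a `BaseExtractor` subclass.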

### Programmatic Registration

```python
import importlib

from airflow.providers.openlineage.extractors.manager import ExtractorManager

# Create a manager instance
manager = ExtractorManager()

# Register multiple extractors, keyed by operator class name
extractors = {
    'S3Operator': 'my_package.extractors.S3Extractor',
    'KafkaOperator': 'my_package.extractors.KafkaExtractor',
}

for operator_class, extractor_path in extractors.items():
    # Import the extractor class from its dotted path and register it
    module_path, class_name = extractor_path.rsplit('.', 1)
    extractor = getattr(importlib.import_module(module_path), class_name)
    manager.add_extractor(operator_class, extractor)
```

## Advanced Patterns

### Conditional Extraction

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage

class ConditionalExtractor(BaseExtractor):
    def extract(self):
        # Only extract if the operator opts in to lineage collection
        if not getattr(self.operator, 'enable_lineage', False):
            return None

        # Normal extraction logic goes here
        return OperatorLineage(inputs=[], outputs=[], run_facets={}, job_facets={})
```
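
Returning `None` signals that no lineage could be extracted for the event; the manager then typically falls back to whatever inlets and outlets the task declares.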

### Multi-format Support

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset

class FileExtractor(BaseExtractor):
    def extract(self):
        file_path = self.operator.file_path

        # Determine the format and create appropriate metadata
        if file_path.endswith('.parquet'):
            namespace = 'parquet'
        elif file_path.endswith('.csv'):
            namespace = 'csv'
        else:
            namespace = 'file'

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=file_path)],
            outputs=[],
            run_facets={},
            job_facets={}
        )
```