# Facets and Metadata Enrichment

Custom facet definitions for enriching OpenLineage events with Airflow-specific metadata, including DAG information, task states, debug data, and unknown operator handling. Facets are OpenLineage's extension point: named, serializable blocks of metadata attached to run and job events for comprehensive lineage tracking.
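Concretely, a facet is just a named, serializable object placed under an event's `facets` mapping. A minimal sketch of that shape (the facet name and fields below are hypothetical, for illustration only):

```python
from dataclasses import dataclass, asdict

# Hypothetical facet -- the real provider facets are importable from
# airflow.providers.openlineage.plugins.facets
@dataclass
class ExampleStateFacet:
    dagRunState: str
    tasksState: dict

# Facets travel as a name -> object mapping inside the run section of an event
run_facets = {"airflowState": ExampleStateFacet("running", {"extract": "success"})}
event_payload = {
    "run": {"facets": {name: asdict(facet) for name, facet in run_facets.items()}}
}
print(event_payload["run"]["facets"]["airflowState"]["dagRunState"])  # running
```

The sections below document the concrete facet classes the provider ships with this shape in mind.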
## Capabilities

### Airflow Run Facets

Comprehensive runtime metadata facets that capture Airflow-specific execution context.

```python { .api }
class AirflowRunFacet:
    """
    Composite run facet containing comprehensive Airflow execution metadata.
    """

    dag: dict           # DAG metadata and configuration
    dagRun: dict        # DAG run instance information
    task: dict          # Task/operator metadata
    taskInstance: dict  # Task instance execution details
    taskUuid: str       # Unique task identifier

class AirflowDagRunFacet:
    """
    DAG run specific facet with DAG and run metadata.
    """

    dag: dict     # DAG configuration and properties
    dagRun: dict  # DAG run execution details

class AirflowStateRunFacet:
    """
    DAG and task state information facet for tracking execution states.
    """

    dagRunState: str            # Current DAG run state
    tasksState: dict[str, str]  # Mapping of task IDs to their current states
```
### Airflow Job Facets

Job-level metadata facets that describe DAG structure and task organization.

```python { .api }
class AirflowJobFacet:
    """
    Composite job facet with task tree, groups, and task metadata.
    """

    taskTree: dict    # Hierarchical task dependency structure
    taskGroups: dict  # Task group organization and metadata
    tasks: dict       # Individual task metadata and configuration
```
### Mapped Task Facets

Facets specific to dynamic task mapping functionality in Airflow.

```python { .api }
class AirflowMappedTaskRunFacet:
    """
    Facet for mapped task information and dynamic task execution metadata.
    """

    mapIndex: int       # Index of this mapped task instance
    operatorClass: str  # Fully qualified operator class name

    @classmethod
    def from_task_instance(cls, task_instance):
        """
        Create facet from task instance.

        Args:
            task_instance: Airflow task instance

        Returns:
            AirflowMappedTaskRunFacet: Facet with mapped task metadata
        """
```
### Debug and Development Facets

Facets for debugging, development, and operational visibility.

```python { .api }
class AirflowDebugRunFacet:
    """
    Debug information facet with package details and system information.

    Includes comprehensive debugging metadata when debug mode is enabled,
    potentially creating large events with detailed system information.
    """

    packages: dict  # Installed packages and their versions

class UnknownOperatorInstance:
    """
    Descriptor for unknown or unrecognized operators.
    """

    name: str                      # Operator name or identifier
    properties: dict[str, object]  # Operator properties and attributes
    type: str                      # Operator type classification

class UnknownOperatorAttributeRunFacet:
    """
    Facet for capturing information about unknown or unhandled operators.

    Provides visibility into operators that don't have specific extractors
    while still capturing basic metadata for lineage tracking.
    """

    unknownItems: list[UnknownOperatorInstance]  # List of unknown operator instances
```
## Usage Examples

### Creating Airflow Run Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
from airflow.models import TaskInstance, DagRun

def create_run_facet(task_instance: TaskInstance, dag_run: DagRun):
    """Create comprehensive Airflow run facet."""

    facet = AirflowRunFacet(
        dag={
            'dag_id': dag_run.dag_id,
            'schedule_interval': str(dag_run.dag.schedule_interval),
            'start_date': dag_run.dag.start_date.isoformat() if dag_run.dag.start_date else None,
            'tags': dag_run.dag.tags,
            'owner': dag_run.dag.owner
        },
        dagRun={
            'run_id': dag_run.run_id,
            'execution_date': dag_run.execution_date.isoformat(),
            'start_date': dag_run.start_date.isoformat() if dag_run.start_date else None,
            'end_date': dag_run.end_date.isoformat() if dag_run.end_date else None,
            'state': dag_run.state,
            'run_type': dag_run.run_type
        },
        task={
            'task_id': task_instance.task_id,
            # TaskInstance.operator is a string; the operator object is on .task
            'operator_class': type(task_instance.task).__name__,
            'pool': task_instance.pool,
            'queue': task_instance.queue,
            'priority_weight': task_instance.priority_weight
        },
        taskInstance={
            'try_number': task_instance.try_number,
            'max_tries': task_instance.max_tries,
            'start_date': task_instance.start_date.isoformat() if task_instance.start_date else None,
            'end_date': task_instance.end_date.isoformat() if task_instance.end_date else None,
            'duration': task_instance.duration,
            'state': task_instance.state,
            'hostname': task_instance.hostname,
            'pid': task_instance.pid
        },
        taskUuid=f"{dag_run.dag_id}.{task_instance.task_id}.{task_instance.execution_date}.{task_instance.try_number}"
    )

    return facet
```
### Working with State Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowStateRunFacet
from airflow.models import DagRun

def create_state_facet(dag_run: DagRun):
    """Create state tracking facet."""

    # Get task states from DAG run
    task_states = {}
    for task_instance in dag_run.get_task_instances():
        task_states[task_instance.task_id] = task_instance.state or 'none'

    facet = AirflowStateRunFacet(
        dagRunState=dag_run.state or 'none',
        tasksState=task_states
    )

    return facet

# Usage in lineage extraction
state_facet = create_state_facet(dag_run)
run_facets = {
    'airflow_state': state_facet
}
```
192
193
### Mapped Task Facets
194
195
```python
196
from airflow.providers.openlineage.plugins.facets import AirflowMappedTaskRunFacet
197
from airflow.models import TaskInstance
198
199
def handle_mapped_task(task_instance: TaskInstance):
200
"""Handle mapped task metadata extraction."""
201
202
if hasattr(task_instance, 'map_index') and task_instance.map_index is not None:
203
# Create mapped task facet
204
mapped_facet = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
205
206
print(f"Mapped task index: {mapped_facet.mapIndex}")
207
print(f"Operator class: {mapped_facet.operatorClass}")
208
209
return {
210
'airflow_mapped_task': mapped_facet
211
}
212
213
return {}
214
215
# Example with mapped operator
216
from airflow.operators.python import PythonOperator
217
218
@dag
219
def mapped_dag():
220
def process_item(item):
221
return f"Processed {item}"
222
223
# Mapped task creates multiple instances
224
mapped_task = PythonOperator.partial(
225
task_id='process_items',
226
python_callable=process_item
227
).expand(op_kwargs=[{'item': i} for i in range(5)])
228
229
return mapped_task
230
231
# Each mapped instance gets its own facet with map_index
232
```
### Job Structure Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
from airflow.models import DAG

def create_job_facet(dag: DAG):
    """Create job facet with DAG structure information."""

    # Build task tree
    task_tree = {}
    for task_id, task in dag.task_dict.items():
        task_tree[task_id] = {
            'operator_class': task.__class__.__name__,
            'upstream_task_ids': list(task.upstream_task_ids),
            'downstream_task_ids': list(task.downstream_task_ids)
        }

    # Build task groups
    task_groups = {}
    if hasattr(dag, 'task_group_dict'):
        for group_id, group in dag.task_group_dict.items():
            task_groups[group_id] = {
                'tooltip': group.tooltip,
                'ui_color': group.ui_color,
                'ui_fgcolor': group.ui_fgcolor,
                'children': list(group.children.keys())
            }

    # Build tasks metadata
    tasks = {}
    for task_id, task in dag.task_dict.items():
        tasks[task_id] = {
            'operator_class': task.__class__.__name__,
            'template_fields': list(task.template_fields) if task.template_fields else [],
            'pool': task.pool,
            'queue': task.queue,
            'retries': task.retries,
            'retry_delay': str(task.retry_delay) if task.retry_delay else None
        }

    facet = AirflowJobFacet(
        taskTree=task_tree,
        taskGroups=task_groups,
        tasks=tasks
    )

    return facet
```
### Unknown Operator Handling

```python
from airflow.providers.openlineage.plugins.facets import (
    UnknownOperatorInstance,
    UnknownOperatorAttributeRunFacet
)
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.models import BaseOperator

def handle_unknown_operator(operator: BaseOperator):
    """Handle operators without specific extractors."""

    # Extract basic operator properties
    properties = {}
    for attr in dir(operator):
        if attr.startswith('_'):
            continue
        try:
            value = getattr(operator, attr)
        except Exception:
            continue  # Skip attributes that raise on access
        if callable(value):
            continue
        # Only include serializable values
        if isinstance(value, (str, int, float, bool, list, dict)):
            properties[attr] = value

    # Create unknown operator instance
    unknown_instance = UnknownOperatorInstance(
        name=operator.task_id,
        properties=properties,
        type=operator.__class__.__name__
    )

    # Create facet
    facet = UnknownOperatorAttributeRunFacet(
        unknownItems=[unknown_instance]
    )

    return {
        'airflow_unknown_operator': facet
    }

# Usage in custom extractor
class GenericExtractor(BaseExtractor):
    def extract(self):
        # Try specific extraction first
        if hasattr(self.operator, 'get_openlineage_facets_on_start'):
            return self.operator.get_openlineage_facets_on_start()

        # Fall back to unknown operator handling
        unknown_facets = handle_unknown_operator(self.operator)

        return OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets=unknown_facets,
            job_facets={}
        )
```
### Debug Facets

```python
from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
from importlib import metadata

def create_debug_facet():
    """Create debug facet with system information."""

    # Get installed packages (importlib.metadata replaces the deprecated pkg_resources)
    packages = {}
    for dist in metadata.distributions():
        packages[dist.metadata['Name']] = dist.version

    facet = AirflowDebugRunFacet(
        packages=packages
    )

    return facet

# Usage with configuration check
from airflow.providers.openlineage.conf import debug_mode

def get_debug_facets():
    """Get debug facets if debug mode is enabled."""

    if debug_mode():
        return {
            'airflow_debug': create_debug_facet()
        }
    return {}
```
## Integration with Lineage Extraction

### Using Facets in Extractors

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import AirflowRunFacet

class CustomExtractor(BaseExtractor):
    def extract(self):
        # Basic lineage
        lineage = OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets={},
            job_facets={}
        )

        # Add Airflow-specific facets; _task_instance and _dag_run are
        # attributes you would populate in your own extractor wiring,
        # and create_run_facet is the helper shown earlier
        if hasattr(self, '_task_instance') and hasattr(self, '_dag_run'):
            airflow_facet = create_run_facet(self._task_instance, self._dag_run)
            lineage.run_facets['airflow'] = airflow_facet

        return lineage
```
### Facet Utilities

```python
from airflow.providers.openlineage.utils.utils import (
    get_airflow_run_facet,
    get_airflow_job_facet,
    get_airflow_state_run_facet,
    get_airflow_debug_facet
)

def get_comprehensive_facets(task_instance, dag_run, dag):
    """Get all available Airflow facets.

    Note: the exact signatures of these helpers vary across provider
    versions; check the utils module of your installed version.
    """

    facets = {}

    # Add run facet
    run_facet = get_airflow_run_facet(
        dag_run=dag_run,
        dag=dag,
        task_instance=task_instance,
        task=task_instance.task,
        task_uuid=f"{dag_run.dag_id}.{task_instance.task_id}"
    )
    facets.update(run_facet)

    # Add job facet
    job_facet = get_airflow_job_facet(dag_run)
    facets.update(job_facet)

    # Add state facet
    state_facet = get_airflow_state_run_facet(dag_run, dag, task_instance)
    facets.update(state_facet)

    # Add debug facet if enabled
    debug_facet = get_airflow_debug_facet()
    facets.update(debug_facet)

    return facets
```
## Custom Facet Development

### Creating Custom Facets

```python
from dataclasses import dataclass

# In current openlineage-python releases the RunFacet base class lives in facet_v2
from openlineage.client.facet_v2 import RunFacet

@dataclass
class CustomProcessingRunFacet(RunFacet):
    """Custom facet for processing statistics."""

    records_processed: int
    processing_time_seconds: float
    memory_usage_mb: float
    error_count: int
    warnings: list[str]

    @staticmethod
    def _get_schema() -> str:
        return "https://my-company.com/schemas/custom_processing_run_facet.json"

# Usage in extractor
def extract_with_custom_facet(self):
    # ... extraction logic producing `inputs` and `outputs` ...

    custom_facet = CustomProcessingRunFacet(
        records_processed=1000,
        processing_time_seconds=45.2,
        memory_usage_mb=128.0,
        error_count=0,
        warnings=[]
    )

    return OperatorLineage(
        inputs=inputs,
        outputs=outputs,
        run_facets={
            'custom_processing': custom_facet
        },
        job_facets={}
    )
```
### Facet Registration

Custom run facets are registered through the `custom_run_facets` option, a semicolon-separated list of fully qualified function paths:

```ini
# In airflow.cfg:
[openlineage]
custom_run_facets = my_package.facets.custom_processing_facet;my_package.facets.performance_facet
```

Each registered function receives the task instance and should return `dict[str, RunFacet]`:

```python
def custom_processing_facet(task_instance, **kwargs):
    return {
        'custom_processing': CustomProcessingRunFacet(...)
    }
```
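A registered facet function must be importable and safe to call for every task instance. The sketch below (the `DurationRunFacet` name and its field are hypothetical) returns an empty dict when no data is available, and is exercised with a stand-in object rather than a real `TaskInstance`:

```python
from dataclasses import dataclass
from types import SimpleNamespace

@dataclass
class DurationRunFacet:
    """Hypothetical custom facet recording task duration."""
    duration_seconds: float

def duration_run_facet(task_instance, **kwargs):
    # Returning {} keeps lineage emission safe when timing data is missing
    if getattr(task_instance, "duration", None) is None:
        return {}
    return {"taskDuration": DurationRunFacet(duration_seconds=task_instance.duration)}

# Stand-in for an Airflow TaskInstance, for illustration only
ti = SimpleNamespace(duration=12.5)
print(duration_run_facet(ti)["taskDuration"].duration_seconds)  # 12.5
print(duration_run_facet(SimpleNamespace(duration=None)))       # {}
```

Guarding every attribute access this way matters because the function runs for all tasks, including ones that never started.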