# Utility Functions and Helpers

General utility functions for working with OpenLineage data, including operator analysis, documentation extraction, data conversion, and helper functions for common OpenLineage operations.

## Capabilities

### Operator Analysis Functions

Functions for analyzing and extracting information from Airflow operators.

```python { .api }
def get_operator_class(task: BaseOperator) -> type:
    """
    Get the operator class from a task instance.

    Args:
        task: Airflow task/operator instance

    Returns:
        type: Operator class type
    """

def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str:
    """
    Get the fully qualified class name of an operator.

    Args:
        operator: Airflow operator instance

    Returns:
        str: Fully qualified class name (e.g., 'airflow.operators.python.PythonOperator')
    """

def get_operator_provider_version(operator: BaseOperator | MappedOperator) -> str | None:
    """
    Get the provider version for an operator.

    Args:
        operator: Airflow operator instance

    Returns:
        str | None: Provider version string or None if not available
    """

def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool:
    """
    Check if an operator is disabled for lineage collection.

    Args:
        operator: Airflow operator instance

    Returns:
        bool: True if operator is disabled for lineage
    """
```

### Task and Job Identification

Functions for generating job names and identifiers for OpenLineage events.

```python { .api }
def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str:
    """
    Get the OpenLineage job name for a task instance.

    Args:
        task: Task instance

    Returns:
        str: Formatted job name for OpenLineage events
    """
```

### Documentation Extraction

Functions for extracting documentation and metadata from DAGs and tasks.

```python { .api }
def get_task_documentation(operator: BaseOperator | MappedOperator | None) -> tuple[str | None, str | None]:
    """
    Extract documentation from a task/operator.

    Args:
        operator: Airflow operator instance

    Returns:
        tuple: (doc_md, description) - documentation markdown and description
    """

def get_dag_documentation(dag: DAG | None) -> tuple[str | None, str | None]:
    """
    Extract documentation from a DAG.

    Args:
        dag: Airflow DAG instance

    Returns:
        tuple: (doc_md, description) - documentation markdown and description
    """
```

### Facet Generation Functions

Functions for generating Airflow-specific facets and metadata.

```python { .api }
def get_task_parent_run_facet(
    task_instance: TaskInstance,
    dag_run: DagRun,
    dag: DAG
) -> dict[str, Any]:
    """
    Get parent run facet information for a task.

    Args:
        task_instance: Task instance
        dag_run: DAG run instance
        dag: DAG instance

    Returns:
        dict: Parent run facet data
    """

def get_airflow_mapped_task_facet(task_instance: TaskInstance) -> dict[str, Any]:
    """
    Get mapped task facet for dynamic task mapping.

    Args:
        task_instance: Task instance (mapped task)

    Returns:
        dict: Mapped task facet data
    """

def get_user_provided_run_facets(ti: TaskInstance, ti_state: TaskInstanceState) -> dict[str, RunFacet]:
    """
    Get user-provided custom run facets.

    Args:
        ti: Task instance
        ti_state: Task instance state

    Returns:
        dict: User-provided run facets
    """

def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, RunFacet]:
    """
    Get Airflow DAG run facet.

    Args:
        dag_run: DAG run instance

    Returns:
        dict: DAG run facet data
    """

def get_processing_engine_facet() -> dict[str, processing_engine_run.ProcessingEngineRunFacet]:
    """
    Get processing engine facet with Airflow version information.

    Returns:
        dict: Processing engine facet
    """

def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
    """
    Get Airflow debug facet with system information.

    Returns:
        dict: Debug facet data (only if debug mode is enabled)
    """

def get_airflow_run_facet(
    dag_run: DagRun,
    dag: DAG,
    task_instance: TaskInstance,
    task: BaseOperator,
    task_uuid: str
) -> dict[str, AirflowRunFacet]:
    """
    Get comprehensive Airflow run facet.

    Args:
        dag_run: DAG run instance
        dag: DAG instance
        task_instance: Task instance
        task: Task/operator
        task_uuid: Unique task identifier

    Returns:
        dict: Comprehensive Airflow run facet
    """

def get_airflow_job_facet(dag_run: DagRun) -> dict[str, AirflowJobFacet]:
    """
    Get Airflow job facet with DAG structure.

    Args:
        dag_run: DAG run instance

    Returns:
        dict: Airflow job facet data
    """

def get_airflow_state_run_facet(
    dag_run: DagRun,
    dag: DAG,
    task_instance: TaskInstance
) -> dict[str, AirflowStateRunFacet]:
    """
    Get Airflow state run facet.

    Args:
        dag_run: DAG run instance
        dag: DAG instance
        task_instance: Task instance

    Returns:
        dict: State run facet data
    """

def get_unknown_source_attribute_run_facet(task: BaseOperator, name: str | None = None):
    """
    Get unknown operator facet for unhandled operators.

    Args:
        task: Airflow task/operator
        name: Optional name override

    Returns:
        dict: Unknown operator facet data
    """
```

### Data Conversion Functions

Functions for converting between different data formats and structures.

```python { .api }
def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset | None:
    """
    Convert Airflow Asset to OpenLineage Dataset.

    Args:
        asset: Airflow Asset instance
        lineage_context: Lineage extraction context

    Returns:
        OpenLineageDataset | None: Converted dataset or None if conversion fails
    """
```

### Configuration and Environment Functions

Functions for checking configuration and environment settings.

```python { .api }
def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator) -> bool:
    """
    Check if selective lineage is enabled for a DAG or task.

    Args:
        obj: DAG or task instance

    Returns:
        bool: True if selective lineage is enabled
    """

def should_use_external_connection(hook) -> bool:
    """
    Check if external connection should be used for a hook.

    Args:
        hook: Database or connection hook

    Returns:
        bool: True if external connection should be used
    """
```

### Utility Classes and Helpers

Helper classes for data serialization and processing.

```python { .api }
class InfoJsonEncodable:
    """Base class for JSON serializable info objects."""

class DagInfo(InfoJsonEncodable):
    """DAG information encoder for JSON serialization."""

class DagRunInfo(InfoJsonEncodable):
    """DAG run information encoder for JSON serialization."""

class TaskInstanceInfo(InfoJsonEncodable):
    """Task instance information encoder for JSON serialization."""

class AssetInfo(InfoJsonEncodable):
    """Asset information encoder for JSON serialization."""

class TaskInfo(InfoJsonEncodable):
    """Task information encoder for JSON serialization."""

class TaskInfoComplete(InfoJsonEncodable):
    """Complete task information encoder for JSON serialization."""

class TaskGroupInfo(InfoJsonEncodable):
    """Task group information encoder for JSON serialization."""

class OpenLineageRedactor:
    """OpenLineage-specific secrets redactor for sensitive data masking."""

def try_import_from_string(string: str) -> Any:
    """
    Safe import utility with error handling.

    Args:
        string: Module or class path to import

    Returns:
        Any: Imported object or None if import fails
    """

def print_warning(log):
    """
    Warning decorator function for logging warnings.

    Args:
        log: Logger instance
    """
```

### Version Compatibility

Version compatibility constants and utilities.

```python { .api }
AIRFLOW_V_3_0_PLUS: bool
"""Boolean flag indicating Airflow 3.0+ compatibility."""
```

## Usage Examples

### Operator Analysis

```python
from airflow.providers.openlineage.utils.utils import (
    get_operator_class,
    get_fully_qualified_class_name,
    is_operator_disabled
)
from airflow.operators.python import PythonOperator

def my_function():
    return "Hello World"

task = PythonOperator(
    task_id='example_task',
    python_callable=my_function,
    dag=dag
)

# Analyze operator
operator_class = get_operator_class(task)
class_name = get_fully_qualified_class_name(task)
is_disabled = is_operator_disabled(task)

print(f"Operator class: {operator_class}")
print(f"Class name: {class_name}")
print(f"Is disabled: {is_disabled}")
```

### Documentation Extraction

```python
from datetime import datetime

from airflow.providers.openlineage.utils.utils import get_task_documentation, get_dag_documentation
from airflow import DAG
from airflow.operators.python import PythonOperator

# DAG with documentation
dag = DAG(
    'documented_dag',
    start_date=datetime(2023, 1, 1),
    description='A well-documented data processing pipeline',
    doc_md="""
    # Data Processing Pipeline

    This pipeline processes user data and generates analytics reports.
    """
)

def documented_function():
    """Process user data and return results."""
    return "Processing complete"

# Task with documentation
task = PythonOperator(
    task_id='process_data',
    python_callable=documented_function,
    doc_md='Processes user data using advanced algorithms',
    dag=dag
)

# Extract documentation
dag_doc_md, dag_description = get_dag_documentation(dag)
task_doc_md, task_description = get_task_documentation(task)

print(f"DAG documentation: {dag_doc_md}")
print(f"DAG description: {dag_description}")
print(f"Task documentation: {task_doc_md}")
print(f"Task description: {task_description}")
```

### Facet Generation

```python
from airflow.providers.openlineage.utils.utils import (
    get_airflow_run_facet,
    get_processing_engine_facet,
    get_airflow_debug_facet
)

def generate_comprehensive_facets(dag_run, dag, task_instance, task):
    """Generate comprehensive facets for a task execution."""

    # Generate task UUID
    task_uuid = f"{dag_run.dag_id}.{task_instance.task_id}.{task_instance.execution_date}.{task_instance.try_number}"

    # Get all facets
    facets = {}

    # Airflow run facet
    run_facet = get_airflow_run_facet(dag_run, dag, task_instance, task, task_uuid)
    facets.update(run_facet)

    # Processing engine facet
    engine_facet = get_processing_engine_facet()
    facets.update(engine_facet)

    # Debug facet (if enabled)
    debug_facet = get_airflow_debug_facet()
    facets.update(debug_facet)

    return facets

# Usage
comprehensive_facets = generate_comprehensive_facets(dag_run, dag, task_instance, task)
print(f"Generated facets: {list(comprehensive_facets.keys())}")
```

### Data Conversion

```python
from airflow.providers.openlineage.utils.utils import translate_airflow_asset
from airflow.sdk import Asset

# Create Airflow Asset
asset = Asset(
    uri="s3://my-bucket/data/users.parquet",
    name="user_data"
)

# Convert to OpenLineage Dataset
lineage_context = {'namespace': 'production'}
ol_dataset = translate_airflow_asset(asset, lineage_context)

if ol_dataset:
    print(f"OpenLineage Dataset: {ol_dataset.namespace}/{ol_dataset.name}")
else:
    print("Asset conversion failed")
```

### Safe Import Utility

```python
from airflow.providers.openlineage.utils.utils import try_import_from_string

# Safely import optional dependencies
pandas = try_import_from_string('pandas')
if pandas:
    # Use pandas functionality
    df = pandas.DataFrame({'a': [1, 2, 3]})
    print("Pandas available")
else:
    print("Pandas not available")

# Try importing custom modules
custom_processor = try_import_from_string('my_package.processors.CustomProcessor')
if custom_processor:
    processor = custom_processor()
    processor.process()
else:
    print("Custom processor not available")
```

### JSON Serialization

```python
from airflow.providers.openlineage.utils.utils import DagInfo, TaskInfo, DagRunInfo

# Create serializable info objects
dag_info = DagInfo()
dag_info.dag_id = dag.dag_id
dag_info.schedule_interval = str(dag.schedule_interval)
dag_info.start_date = dag.start_date.isoformat()

task_info = TaskInfo()
task_info.task_id = task.task_id
task_info.operator_class = task.__class__.__name__

dag_run_info = DagRunInfo()
dag_run_info.run_id = dag_run.run_id
dag_run_info.execution_date = dag_run.execution_date.isoformat()

# Serialize to JSON
import json
serialized_data = {
    'dag': dag_info.__dict__,
    'task': task_info.__dict__,
    'dag_run': dag_run_info.__dict__
}

json_string = json.dumps(serialized_data, indent=2)
print(f"Serialized data: {json_string}")
```

### Secrets Redaction

```python
from airflow.providers.openlineage.utils.utils import OpenLineageRedactor

# Create redactor
redactor = OpenLineageRedactor()

# Sample data with sensitive information
sensitive_data = {
    'connection_string': 'postgresql://user:password@localhost/db',
    'api_key': 'secret-api-key-12345',
    'config': {
        'database_password': 'super-secret',
        'username': 'admin'
    }
}

# Redact sensitive data
redacted_data = redactor.redact(sensitive_data)
print(f"Redacted data: {redacted_data}")
```

### Selective Lineage Checking

```python
from airflow.providers.openlineage.utils.utils import is_selective_lineage_enabled
from airflow.providers.openlineage.utils.selective_enable import enable_lineage

# Check DAG lineage status
if is_selective_lineage_enabled(dag):
    print("Selective lineage is enabled for this DAG")
else:
    print("Selective lineage is not enabled for this DAG")

# Enable selective lineage
enabled_dag = enable_lineage(dag)

# Check again
if is_selective_lineage_enabled(enabled_dag):
    print("Selective lineage is now enabled")
```

### Hook Connection Analysis

```python
from airflow.providers.openlineage.utils.utils import should_use_external_connection
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Create hook
hook = PostgresHook(postgres_conn_id='analytics_db')

# Check if external connection should be used
use_external = should_use_external_connection(hook)

if use_external:
    print("Using external connection for lineage")
else:
    print("Using standard connection handling")
```

## Integration Patterns

### Custom Extractor with Utilities

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.utils.utils import (
    get_fully_qualified_class_name,
    get_task_documentation,
    get_airflow_run_facet
)

class UtilityExtractor(BaseExtractor):
    def extract(self):
        # Use utilities for comprehensive extraction
        class_name = get_fully_qualified_class_name(self._operator)
        doc_md, description = get_task_documentation(self._operator)

        # Create lineage with metadata
        lineage = OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets={
                'operator_info': {
                    'class_name': class_name,
                    'documentation': doc_md,
                    'description': description
                }
            },
            job_facets={}
        )

        return lineage
```

### Comprehensive Metadata Collection

```python
from airflow.providers.openlineage.utils.utils import *

def collect_comprehensive_metadata(dag, dag_run, task_instance, task):
    """Collect all available metadata using utility functions."""

    metadata = {
        'operator': {
            'class': get_operator_class(task),
            'fully_qualified_name': get_fully_qualified_class_name(task),
            'provider_version': get_operator_provider_version(task),
            'is_disabled': is_operator_disabled(task)
        },
        'job': {
            'name': get_job_name(task_instance)
        },
        'documentation': {
            'dag': get_dag_documentation(dag),
            'task': get_task_documentation(task)
        },
        'facets': {
            'run': get_airflow_run_facet(dag_run, dag, task_instance, task, 'uuid'),
            'job': get_airflow_job_facet(dag_run),
            'state': get_airflow_state_run_facet(dag_run, dag, task_instance),
            'engine': get_processing_engine_facet(),
            'debug': get_airflow_debug_facet()
        },
        'config': {
            'selective_enabled': is_selective_lineage_enabled(task)
        }
    }

    return metadata

# Usage
comprehensive_metadata = collect_comprehensive_metadata(dag, dag_run, task_instance, task)
```