# Selective Lineage Control

Utilities for enabling or disabling OpenLineage lineage collection on individual DAGs and tasks. Controlling which components emit OpenLineage events reduces collection overhead and keeps sensitive pipelines out of lineage metadata.

## Capabilities

### Lineage Control Functions

Functions for enabling and disabling lineage collection on DAGs and tasks.

```python { .api }
def enable_lineage(obj: T) -> T:
    """
    Enable lineage collection for a DAG or task.

    Args:
        obj: DAG or task instance to enable lineage for

    Returns:
        T: The same object with lineage enabled (for method chaining)
    """

def disable_lineage(obj: T) -> T:
    """
    Disable lineage collection for a DAG or task.

    Args:
        obj: DAG or task instance to disable lineage for

    Returns:
        T: The same object with lineage disabled (for method chaining)
    """
```
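
As a rough mental model of these helpers, the sketch below uses stand-in classes instead of Airflow objects. It assumes the setting is recorded as a flag in each object's `params` and that a DAG-level call also applies to the tasks the DAG already contains. `FakeTask`, `FakeDag`, `set_lineage`, and the `LINEAGE_FLAG` key are hypothetical names for illustration, not the provider's implementation.

```python
from dataclasses import dataclass, field
from typing import TypeVar

# Hypothetical params key; the provider exposes its real key as ENABLE_OL_PARAM_NAME.
LINEAGE_FLAG = "lineage_enabled"


@dataclass
class FakeTask:
    """Stand-in for an Airflow operator."""
    task_id: str
    params: dict = field(default_factory=dict)


@dataclass
class FakeDag:
    """Stand-in for an Airflow DAG."""
    dag_id: str
    tasks: list = field(default_factory=list)
    params: dict = field(default_factory=dict)


T = TypeVar("T", FakeDag, FakeTask)


def set_lineage(obj: T, enabled: bool) -> T:
    """Record the flag on obj (and on a DAG's current tasks), then return obj."""
    if isinstance(obj, FakeDag):
        for task in obj.tasks:
            set_lineage(task, enabled)
    obj.params[LINEAGE_FLAG] = enabled
    return obj


# Returning the same object lets the call wrap a constructor expression.
dag = set_lineage(FakeDag("analytics", tasks=[FakeTask("extract")]), True)
sensitive = set_lineage(FakeTask("process_pii"), False)
```

Because the helper returns its argument, the wrapped expression can be assigned directly, which is the chaining style used in the usage examples.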

### Lineage Status Checking

Functions to check whether lineage is enabled for specific DAGs and tasks.

```python { .api }
def is_task_lineage_enabled(task: BaseOperator | MappedOperator) -> bool:
    """
    Check if lineage collection is enabled for a specific task.

    Args:
        task: Task/operator to check

    Returns:
        bool: True if lineage is enabled for this task
    """

def is_dag_lineage_enabled(dag: DAG) -> bool:
    """
    Check if lineage collection is enabled for a specific DAG.

    Args:
        dag: DAG to check

    Returns:
        bool: True if lineage is enabled for this DAG
    """
```
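
The checks return a plain boolean, but a flag stored per object can be in three states: explicitly enabled, explicitly disabled, or never set. The sketch below models that with plain dictionaries. The `lineage_enabled` key and the rule that only an explicit `True` counts as enabled are assumptions made for illustration, and the provider's own checks may differ.

```python
from typing import Optional

FLAG = "lineage_enabled"  # hypothetical params key, for illustration only


def flag_state(params: dict) -> Optional[bool]:
    """Return True/False for an explicit setting, None if never set."""
    value = params.get(FLAG)
    return value if isinstance(value, bool) else None


def is_enabled(params: dict) -> bool:
    """Assumed rule: only an explicit True counts as enabled."""
    return flag_state(params) is True


enabled_task = {FLAG: True}
disabled_task = {FLAG: False}
untouched_task = {}

for name, params in [("enabled", enabled_task),
                     ("disabled", disabled_task),
                     ("untouched", untouched_task)]:
    print(f"{name}: state={flag_state(params)}, enabled={is_enabled(params)}")
```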

### Parameter Constants

Constants for the lineage control parameter system.

```python { .api }
ENABLE_OL_PARAM_NAME: str
"""Name of the parameter used to control OpenLineage enablement."""

ENABLE_OL_PARAM: Param
"""Parameter object for enabling OpenLineage on DAGs/tasks."""

DISABLE_OL_PARAM: Param
"""Parameter object for disabling OpenLineage on DAGs/tasks."""
```

## Usage Examples

### Basic DAG Lineage Control

```python
from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage
from datetime import datetime

# Enable lineage for entire DAG
dag = enable_lineage(DAG(
    'analytics_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    description='Analytics data processing pipeline'
))

# Disable lineage for a different DAG
sensitive_dag = disable_lineage(DAG(
    'sensitive_data_processing',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@hourly',
    description='Sensitive data processing - no lineage tracking'
))
```

### Task-Level Lineage Control

```python
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

# `dag` is the DAG created in the previous example

def process_data():
    return "Processing complete"

def sensitive_operation():
    return "Sensitive processing complete"

# Enable lineage for specific task
process_task = enable_lineage(PythonOperator(
    task_id='process_public_data',
    python_callable=process_data,
    dag=dag
))

# Disable lineage for sensitive task
sensitive_task = disable_lineage(PythonOperator(
    task_id='process_sensitive_data',
    python_callable=sensitive_operation,
    dag=dag
))

# Mix with regular tasks (inherit DAG setting)
regular_task = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -rf /tmp/processing/*',
    dag=dag
)
```

### Checking Lineage Status

```python
from airflow.providers.openlineage.utils.selective_enable import (
    is_dag_lineage_enabled,
    is_task_lineage_enabled
)

# Check DAG lineage status
if is_dag_lineage_enabled(dag):
    print(f"Lineage is enabled for DAG: {dag.dag_id}")
else:
    print(f"Lineage is disabled for DAG: {dag.dag_id}")

# Check task lineage status
for task_id, task in dag.task_dict.items():
    if is_task_lineage_enabled(task):
        print(f"Task {task_id}: lineage enabled")
    else:
        print(f"Task {task_id}: lineage disabled")
```

### Conditional Lineage Control

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

def create_processing_dag(environment: str):
    """Create DAG with environment-specific lineage settings."""

    dag = DAG(
        f'data_processing_{environment}',
        start_date=datetime(2023, 1, 1),
        schedule_interval='@daily'
    )

    # Enable lineage only for production
    if environment == 'production':
        dag = enable_lineage(dag)
    else:
        dag = disable_lineage(dag)

    return dag

# Create environment-specific DAGs
prod_dag = create_processing_dag('production')  # Lineage enabled
dev_dag = create_processing_dag('development')  # Lineage disabled
test_dag = create_processing_dag('testing')     # Lineage disabled
```

### Selective Enable Mode

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.openlineage.conf import selective_enable
from airflow.providers.openlineage.utils.selective_enable import enable_lineage

# Check if selective enable mode is active
if selective_enable():
    print("Selective enable mode: Only explicitly enabled DAGs/tasks will emit lineage")

    # In selective mode, must explicitly enable lineage
    dag = enable_lineage(DAG(
        'important_pipeline',
        start_date=datetime(2023, 1, 1)
    ))
else:
    print("Normal mode: All DAGs/tasks emit lineage unless explicitly disabled")

    # In normal mode, lineage is enabled by default
    dag = DAG(
        'standard_pipeline',
        start_date=datetime(2023, 1, 1)
    )
```
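
The two modes printed by the example above can be summarised as a small decision function. This is a sketch of the semantics as stated in this document, not the provider's code; `emits_lineage` and its arguments are hypothetical names.

```python
from typing import Optional


def emits_lineage(selective_mode: bool, flag: Optional[bool]) -> bool:
    """Decide whether an object emits lineage.

    flag is True (explicitly enabled), False (explicitly disabled),
    or None (no explicit setting).
    """
    if selective_mode:
        # Selective mode is opt-in: only explicitly enabled objects emit.
        return flag is True
    # Normal mode is opt-out: everything emits unless explicitly disabled.
    return flag is not False


# Enumerate every combination of mode and flag.
for mode in (True, False):
    for flag in (True, False, None):
        print(f"selective={mode!s:<5} flag={flag!s:<5} -> {emits_lineage(mode, flag)}")
```

The only input on which the two modes disagree is an object with no explicit setting, which is why switching modes changes the behaviour of every untouched DAG and task.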

### Advanced Patterns

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

class LineageControlledDAG(DAG):
    """Custom DAG class with built-in lineage control."""

    def __init__(self, *args, enable_lineage_tracking=True, **kwargs):
        # The flag is keyword-only so a positional dag_id still reaches DAG.__init__
        super().__init__(*args, **kwargs)

        if enable_lineage_tracking:
            enable_lineage(self)
        else:
            disable_lineage(self)

# Usage
production_dag = LineageControlledDAG(
    'production_pipeline',
    enable_lineage_tracking=True,
    start_date=datetime(2023, 1, 1)
)

development_dag = LineageControlledDAG(
    'development_pipeline',
    enable_lineage_tracking=False,
    start_date=datetime(2023, 1, 1)
)
```

### Task Group Lineage Control

```python
from datetime import datetime

from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

# extract_function, validate_function, process_pii_function and
# anonymize_function are placeholders for callables defined elsewhere.

with DAG('grouped_pipeline', start_date=datetime(2023, 1, 1)) as dag:

    # Enable lineage for every task in the ingestion group
    with TaskGroup('data_ingestion') as ingestion_group:
        extract_task = PythonOperator(
            task_id='extract_data',
            python_callable=extract_function
        )
        validate_task = PythonOperator(
            task_id='validate_data',
            python_callable=validate_function
        )

        # The helpers take DAGs and tasks, so apply them to each task in the group
        for task in (extract_task, validate_task):
            enable_lineage(task)

    # Disable lineage for every task in the sensitive processing group
    with TaskGroup('sensitive_processing') as sensitive_group:
        process_pii = PythonOperator(
            task_id='process_pii',
            python_callable=process_pii_function
        )
        anonymize_data = PythonOperator(
            task_id='anonymize_data',
            python_callable=anonymize_function
        )

        # Disable lineage on each task in the group
        for task in (process_pii, anonymize_data):
            disable_lineage(task)
```

### Dynamic Lineage Control

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage
from airflow.models import Variable

def create_dynamic_dag():
    """Create DAG with dynamic lineage control based on Airflow Variables."""

    dag = DAG(
        'dynamic_lineage_dag',
        start_date=datetime(2023, 1, 1),
        schedule_interval='@daily'
    )

    # Check Airflow Variable for lineage setting
    lineage_enabled = Variable.get('enable_lineage_tracking', default_var='true').lower() == 'true'

    if lineage_enabled:
        dag = enable_lineage(dag)
        print("Lineage tracking enabled via Variable")
    else:
        dag = disable_lineage(dag)
        print("Lineage tracking disabled via Variable")

    return dag

# Usage
dynamic_dag = create_dynamic_dag()
```

### Integration with Configuration

```python
from airflow.providers.openlineage.conf import selective_enable, disabled_operators
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.utils.selective_enable import (
    ENABLE_OL_PARAM_NAME,
    is_task_lineage_enabled,
)

def should_collect_lineage(task):
    """Comprehensive lineage collection decision logic."""

    # Check if operator type is disabled
    operator_class = f"{task.__class__.__module__}.{task.__class__.__name__}"
    if operator_class in disabled_operators():
        return False

    # Selective mode: only explicitly enabled tasks emit lineage
    if selective_enable():
        return is_task_lineage_enabled(task)

    # Normal mode: collect unless the task was explicitly disabled
    return task.params.get(ENABLE_OL_PARAM_NAME) is not False

# Usage in custom extractor
class SmartExtractor(BaseExtractor):
    def extract(self):
        if not should_collect_lineage(self.operator):
            return None

        # Normal extraction logic
        return OperatorLineage(...)
```

## Configuration Integration

### Airflow Configuration

Selective enablement is configured in the `[openlineage]` section of the Airflow configuration:

```ini
# airflow.cfg
[openlineage]
selective_enable = true
disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.dummy.DummyOperator
```
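
To show how the two options read, the sketch below parses the same snippet with Python's standard `configparser` and splits the semicolon-separated operator list. It illustrates the value format only: Airflow loads its configuration through its own machinery, and the `is_disabled` helper is hypothetical.

```python
from configparser import ConfigParser

CONFIG_TEXT = """
[openlineage]
selective_enable = true
disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.dummy.DummyOperator
"""

parser = ConfigParser()
parser.read_string(CONFIG_TEXT)

# Boolean option: configparser accepts true/false, yes/no, on/off, 1/0.
selective = parser.getboolean("openlineage", "selective_enable")

# Semicolon-separated list of fully qualified operator class paths.
raw = parser.get("openlineage", "disabled_for_operators")
disabled = {path.strip() for path in raw.split(";") if path.strip()}


def is_disabled(operator_path: str) -> bool:
    """Hypothetical helper: True if the class path is in the disabled set."""
    return operator_path in disabled


print(selective)
print(sorted(disabled))
print(is_disabled("airflow.operators.bash.BashOperator"))
```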

### Environment Variables

```bash
# Enable selective mode
export AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true

# Disable specific operators
export AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS="airflow.operators.python.PythonOperator"
```
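
These variable names follow Airflow's `AIRFLOW__{SECTION}__{KEY}` convention for overriding configuration options. A small helper makes the mapping explicit; `airflow_env_name` is a hypothetical function written for this example.

```python
import os


def airflow_env_name(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


name = airflow_env_name("openlineage", "selective_enable")
print(name)

# Set the override for this process and any child processes it starts.
os.environ[name] = "true"
```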

## Best Practices

### Production Environments

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

# Enable lineage for critical data pipelines
critical_dag = enable_lineage(DAG(
    'financial_reporting',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
))

# Disable for high-frequency operational tasks
operational_dag = disable_lineage(DAG(
    'system_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval='*/5 * * * *'  # Every 5 minutes
))
```

### Development Workflows

```python
import os

from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

def create_environment_aware_dag(dag_id: str, **kwargs):
    """Create DAG with environment-aware lineage settings."""

    dag = DAG(dag_id, **kwargs)

    # Enable lineage only in production and staging
    environment = os.getenv('AIRFLOW_ENV', 'development')
    if environment in ['production', 'staging']:
        dag = enable_lineage(dag)
    else:
        dag = disable_lineage(dag)

    return dag
```

### Performance Optimization

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

# Disable lineage for resource-intensive DAGs in development
if os.getenv('AIRFLOW_ENV') == 'development':
    heavy_processing_dag = disable_lineage(DAG(
        'ml_model_training',
        start_date=datetime(2023, 1, 1)
    ))
else:
    heavy_processing_dag = enable_lineage(DAG(
        'ml_model_training',
        start_date=datetime(2023, 1, 1)
    ))
```