# Configuration Management

Configuration functions in `airflow.providers.openlineage.conf` for accessing and controlling OpenLineage settings, including transport configuration, selective enabling, custom extractors, debugging options, and execution parameters.

## Capabilities

### Core Configuration Functions

Access the core OpenLineage settings that control how lineage is collected and emitted.

```python { .api }
from typing import Any

def is_disabled() -> bool:
    """
    Check if OpenLineage is completely disabled.

    Returns:
        bool: True if OpenLineage events should not be collected or emitted
    """

def namespace() -> str:
    """
    Get the OpenLineage namespace for lineage events.

    Returns:
        str: Namespace string that groups lineage events logically
    """

def transport() -> dict[str, Any]:
    """
    Get the transport configuration for sending OpenLineage events.

    Returns:
        dict: Transport configuration including type and connection details
    """
```
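In `airflow.cfg` the `transport` option holds a JSON string, while `transport()` returns a dict. The sketch below shows that conversion using a hypothetical `parse_transport` helper. It is not the provider's implementation, and it assumes that an empty option means "no transport configured" (an empty dict).

```python
import json
from typing import Any


def parse_transport(raw: str) -> dict[str, Any]:
    """Hypothetical helper: convert the JSON text of the
    [openlineage] transport option into a dict."""
    # Assumption: a blank option means "no transport configured".
    if not raw.strip():
        return {}
    value = json.loads(raw)
    # The transport must be a JSON object such as {"type": "http", ...}.
    if not isinstance(value, dict):
        raise ValueError(f"transport must be a JSON object, got {type(value).__name__}")
    return value


# Usage: the same string as in the airflow.cfg example.
cfg_value = '{"type": "http", "url": "http://localhost:5000"}'
print(parse_transport(cfg_value)["type"])  # http
```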

### Selective Lineage Configuration

Control which operators and DAGs emit lineage events. You can either opt specific DAGs and tasks in (selective mode) or exclude operator classes.

```python { .api }
def selective_enable() -> bool:
    """
    Check if selective lineage mode is enabled.

    When enabled, lineage is only collected for DAGs/tasks explicitly marked
    with enable_lineage().

    Returns:
        bool: True if selective enable mode is active
    """

def disabled_operators() -> set[str]:
    """
    Get the set of operator class names that are disabled for lineage collection.

    Returns:
        set[str]: Set of fully qualified operator class names to exclude
    """
```
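Together, these two settings decide whether a task emits lineage. The following is a minimal sketch. It assumes the `disabled_for_operators` value is a semicolon-separated list (as in the `airflow.cfg` example) and that operators are matched by fully qualified class name. `parse_operator_list`, `fully_qualified_name`, and `should_emit` are hypothetical helpers, not provider API.

```python
def parse_operator_list(raw: str) -> set[str]:
    """Split a semicolon-separated option value, dropping blanks and whitespace."""
    return {item.strip() for item in raw.split(";") if item.strip()}


def fully_qualified_name(obj: object) -> str:
    """Return 'module.ClassName' for the object's class."""
    cls = type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


def should_emit(operator: object, disabled: set[str], selective: bool, explicitly_enabled: bool) -> bool:
    """Hypothetical decision: disabled operators never emit; in selective mode,
    only DAGs/tasks marked with enable_lineage() emit."""
    if fully_qualified_name(operator) in disabled:
        return False
    if selective:
        return explicitly_enabled
    return True


class MyOperator:  # stand-in for an Airflow operator
    pass


disabled = parse_operator_list("airflow.operators.bash.BashOperator; ")
print(should_emit(MyOperator(), disabled, selective=True, explicitly_enabled=False))  # False
```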

### Custom Component Configuration

Access configuration for custom extractors and facets that extend OpenLineage functionality.

```python { .api }
def custom_extractors() -> set[str]:
    """
    Get the set of custom extractor class paths registered for lineage extraction.

    Returns:
        set[str]: Set of fully qualified class paths for custom extractors
    """

def custom_run_facets() -> set[str]:
    """
    Get the set of custom run facet function paths for metadata enrichment.

    Returns:
        set[str]: Set of fully qualified function paths for custom facets
    """
```
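Both options hold dotted import paths. Below is a minimal sketch of turning such a path into a Python object with `importlib`. `import_from_path` is a hypothetical helper, and the provider's own loader may handle errors differently. Standard-library paths stand in for real extractor and facet paths.

```python
import importlib
from typing import Any


def import_from_path(dotted_path: str) -> Any:
    """Import 'package.module.attr' and return attr (a class or function)."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a fully qualified path")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(f"{module_path!r} has no attribute {attr_name!r}") from exc


# Usage with standard-library paths standing in for custom extractors/facets.
resolved = {path: import_from_path(path) for path in {"json.dumps", "collections.OrderedDict"}}
print(sorted(resolved))  # ['collections.OrderedDict', 'json.dumps']
```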

### Source Code and Documentation Configuration

Control inclusion of source code and additional information in lineage events.

```python { .api }
def is_source_enabled() -> bool:
    """
    Check if source code inclusion is enabled for lineage events.

    When enabled, operators like PythonOperator and BashOperator include
    their source code in the lineage events.

    Returns:
        bool: True if source code should be included in events
    """

def include_full_task_info() -> bool:
    """
    Check if full task information should be included in lineage events.

    When enabled, events include comprehensive task metadata which may
    contain large fields.

    Returns:
        bool: True if full task info should be included
    """
```
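A toy payload builder can illustrate how these two flags affect event size. This is a hypothetical sketch: `build_task_payload` and its keys (`source_code`, `task`) are illustrative only and do not match the real OpenLineage facet schema.

```python
import json
from typing import Any, Optional


def build_task_payload(
    task_id: str,
    source: Optional[str],
    task_attrs: dict[str, Any],
    *,
    source_enabled: bool,
    full_task_info: bool,
) -> dict[str, Any]:
    """Assemble a toy event body, adding optional fields only when enabled."""
    payload: dict[str, Any] = {"task_id": task_id}
    # Plays the role of is_source_enabled(): attach source only when allowed and available.
    if source_enabled and source is not None:
        payload["source_code"] = source
    # Plays the role of include_full_task_info(): attach every task attribute.
    if full_task_info:
        payload["task"] = task_attrs
    return payload


attrs = {"retries": 3, "doc_md": "x" * 1_000}
small = build_task_payload("extract", "echo hi", attrs, source_enabled=False, full_task_info=False)
large = build_task_payload("extract", "echo hi", attrs, source_enabled=True, full_task_info=True)
print(len(json.dumps(small)), len(json.dumps(large)))
```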

### Spark Integration Configuration

Configuration specific to Spark application integration and property injection.

```python { .api }
def spark_inject_parent_job_info() -> bool:
    """
    Check if parent job information should be injected into Spark applications.

    When enabled, automatically injects OpenLineage parent job details
    (namespace, job name, run id) into Spark application properties.

    Returns:
        bool: True if parent job info injection is enabled
    """

def spark_inject_transport_info() -> bool:
    """
    Check if transport information should be injected into Spark applications.

    When enabled, automatically injects OpenLineage transport configuration
    into Spark application properties for lineage emission.

    Returns:
        bool: True if transport info injection is enabled
    """
```
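Here is a hedged sketch of what injection into Spark application properties amounts to: merging `spark.openlineage.*` keys into a Spark conf dict. The parent-job key names follow the OpenLineage Spark integration's `spark.openlineage.parentJobNamespace`, `parentJobName`, and `parentRunId` settings. Two further details are assumptions for illustration: flattening the transport dict into `spark.openlineage.transport.*` keys, and leaving keys the user already set untouched. `inject_openlineage_properties`, `flatten`, and the example values are hypothetical.

```python
from typing import Any, Optional


def flatten(prefix: str, value: dict[str, Any]) -> dict[str, str]:
    """Flatten a nested dict into dotted Spark property keys."""
    flat: dict[str, str] = {}
    for key, item in value.items():
        full_key = f"{prefix}.{key}"
        if isinstance(item, dict):
            flat.update(flatten(full_key, item))
        else:
            flat[full_key] = str(item)
    return flat


def inject_openlineage_properties(
    spark_conf: dict[str, str],
    *,
    parent: Optional[dict[str, str]] = None,
    transport: Optional[dict[str, Any]] = None,
) -> dict[str, str]:
    """Return a new conf with OpenLineage keys added; existing keys win (assumption)."""
    extra: dict[str, str] = {}
    if parent is not None:
        extra.update({
            "spark.openlineage.parentJobNamespace": parent["namespace"],
            "spark.openlineage.parentJobName": parent["job_name"],
            "spark.openlineage.parentRunId": parent["run_id"],
        })
    if transport is not None:
        extra.update(flatten("spark.openlineage.transport", transport))
    # User-supplied properties take precedence over injected ones.
    return {**extra, **spark_conf}


conf = inject_openlineage_properties(
    {"spark.executor.memory": "2g"},
    parent={"namespace": "my_airflow_instance", "job_name": "my_dag.my_task", "run_id": "example-run-id"},
    transport={"type": "http", "url": "http://localhost:5000"},
)
print(conf["spark.openlineage.transport.url"])  # http://localhost:5000
```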

### Performance and Debugging Configuration

Configuration options for performance tuning, debugging, and operational control.

```python { .api }
def debug_mode() -> bool:
    """
    Check if debug mode is enabled for OpenLineage events.

    When enabled, events include debugging information such as installed
    packages and their versions, potentially creating large events.

    Returns:
        bool: True if debug mode is active
    """

def execution_timeout() -> int:
    """
    Get the maximum execution timeout for OpenLineage metadata extraction.

    Returns:
        int: Timeout in seconds for metadata extraction operations
    """

def dag_state_change_process_pool_size() -> int:
    """
    Get the number of processes for handling DAG state changes asynchronously.

    Returns:
        int: Process pool size for async DAG state change processing
    """
```
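To make the `execution_timeout()` semantics concrete, here is a minimal sketch that bounds an extraction call with a time limit using `concurrent.futures`. It only illustrates "give up after N seconds". `run_with_timeout` is hypothetical, and the provider enforces its timeout with its own mechanism. Note that a timed-out thread here keeps running in the background rather than being killed.

```python
import concurrent.futures
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_timeout(fn: Callable[[], T], timeout_s: float) -> Optional[T]:
    """Run fn in a worker thread; return its result, or None if it exceeds timeout_s."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        # Give up waiting; the worker thread is not killed and finishes on its own.
        return None
    finally:
        # Don't block on a still-running worker.
        executor.shutdown(wait=False)


# Usage: a fast "extraction" and one that overruns a 0.1 s budget.
print(run_with_timeout(lambda: "metadata", timeout_s=10))  # metadata
print(run_with_timeout(lambda: time.sleep(0.5) or "late", timeout_s=0.1))  # None
```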

### Configuration File Access

Access to configuration file paths and legacy environment variable handling.

```python { .api }
def config_path(check_legacy_env_var: bool = True) -> str:
    """
    Get the path to the OpenLineage configuration file.

    Args:
        check_legacy_env_var: Whether to check legacy environment variables

    Returns:
        str: Path to the OpenLineage configuration file
    """
```
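The `check_legacy_env_var` flag suggests a lookup order: use the configured option if it is set, otherwise optionally fall back to a legacy environment variable. Below is a minimal sketch of that order. `resolve_config_path` is hypothetical, and the variable name `OPENLINEAGE_CONFIG` is an assumption to verify against your provider version.

```python
import os
from collections.abc import Mapping
from typing import Optional

LEGACY_ENV_VAR = "OPENLINEAGE_CONFIG"  # assumption: legacy variable name


def resolve_config_path(
    option: str,
    check_legacy_env_var: bool = True,
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Return the configured path, falling back to the legacy env var if allowed."""
    env = os.environ if environ is None else environ
    if option:
        return option
    if check_legacy_env_var:
        return env.get(LEGACY_ENV_VAR, "")
    return ""  # nothing configured


print(resolve_config_path("", environ={"OPENLINEAGE_CONFIG": "/etc/openlineage.yml"}))  # /etc/openlineage.yml
```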

## Usage Examples

### Basic Configuration Check

```python
from airflow.providers.openlineage.conf import is_disabled, namespace, transport

# Check if OpenLineage is enabled
if not is_disabled():
    print(f"OpenLineage namespace: {namespace()}")
    print(f"Transport config: {transport()}")
```

### Selective Enable Configuration

```python
from airflow.providers.openlineage.conf import disabled_operators, selective_enable

if selective_enable():
    print("Selective lineage mode is active: only DAGs/tasks marked with enable_lineage() emit events")

# Operator exclusions are reported independently of selective mode
disabled = disabled_operators()
if disabled:
    print(f"Disabled operators: {disabled}")
```

### Debug and Performance Settings

```python
from airflow.providers.openlineage.conf import debug_mode, execution_timeout, include_full_task_info

print(f"Debug mode: {debug_mode()}")
print(f"Execution timeout: {execution_timeout()}s")
print(f"Include full task info: {include_full_task_info()}")
```

### Custom Components Check

```python
from airflow.providers.openlineage.conf import custom_extractors, custom_run_facets

extractors = custom_extractors()
facets = custom_run_facets()

print(f"Custom extractors: {extractors}")
print(f"Custom facets: {facets}")
```

## Configuration Examples

### Transport Configuration

```python
# Each dict is a candidate value for the [openlineage] transport option (as JSON).

# HTTP Transport
http_transport = {
    "type": "http",
    "url": "http://localhost:5000",
    "endpoint": "api/v1/lineage",
    "timeout": 5,
    "verify": False,
}

# Kafka Transport
kafka_transport = {
    "type": "kafka",
    "config": {
        "bootstrap.servers": "localhost:9092",
        "acks": "all",
        "retries": 3,
    },
    "topic": "openlineage.events",
}

# File Transport
file_transport = {
    "type": "file",
    "log_file_path": "/tmp/openlineage/events.json",
}
```
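The `transport` option in `airflow.cfg` is a single JSON value, so a Python dict like those above must be serialized before it is pasted in. For example, Python's `False` becomes JSON `false`. The sketch below performs that serialization and flags missing keys. The `REQUIRED_KEYS` table is an assumption drawn from the examples above, not the full schema of each OpenLineage transport. `missing_keys` and `to_cfg_value` are hypothetical helpers.

```python
import json
from typing import Any

# Assumption: minimal keys per transport type, taken from the examples above.
REQUIRED_KEYS: dict[str, set[str]] = {
    "http": {"url"},
    "kafka": {"config", "topic"},
    "file": {"log_file_path"},
}


def missing_keys(transport_config: dict[str, Any]) -> set[str]:
    """Return required keys absent from the config (empty set if complete or type unknown)."""
    required = REQUIRED_KEYS.get(transport_config.get("type", ""), set())
    return required - set(transport_config)


def to_cfg_value(transport_config: dict[str, Any]) -> str:
    """Serialize to the one-line JSON form used by [openlineage] transport."""
    return json.dumps(transport_config)


http_config = {"type": "http", "url": "http://localhost:5000", "verify": False}
print(to_cfg_value(http_config))  # {"type": "http", "url": "http://localhost:5000", "verify": false}
print(missing_keys({"type": "kafka", "topic": "openlineage.events"}))  # {'config'}
```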

### Airflow Configuration

Configuration options can be set in `airflow.cfg`:

```ini
[openlineage]
disabled = False
namespace = my_airflow_instance
transport = {"type": "http", "url": "http://localhost:5000"}
selective_enable = False
disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator
extractors = my_package.CustomExtractor;my_package.AnotherExtractor
custom_run_facets = my_package.custom_facet_function
debug_mode = False
execution_timeout = 10
include_full_task_info = False
```
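The same options can also be supplied through Airflow's standard `AIRFLOW__{SECTION}__{KEY}` environment-variable convention, which is convenient in containerized deployments. The small sketch below derives those variable names from the `[openlineage]` options shown above. `to_env_vars` is a hypothetical helper, and dict values (such as `transport`) are JSON-encoded.

```python
import json
from typing import Any


def to_env_vars(section: str, options: dict[str, Any]) -> dict[str, str]:
    """Map airflow.cfg options to AIRFLOW__SECTION__KEY environment variables."""
    env: dict[str, str] = {}
    for key, value in options.items():
        name = f"AIRFLOW__{section.upper()}__{key.upper()}"
        # Dicts (e.g. transport) become JSON; everything else uses str().
        env[name] = json.dumps(value) if isinstance(value, dict) else str(value)
    return env


env = to_env_vars("openlineage", {
    "namespace": "my_airflow_instance",
    "transport": {"type": "http", "url": "http://localhost:5000"},
    "execution_timeout": 10,
})
for name, value in env.items():
    print(f"export {name}='{value}'")
```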