Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-openlineage@2.6.00
# Apache Airflow OpenLineage Provider

A comprehensive provider package for Apache Airflow that enables OpenLineage data lineage tracking and observability for data pipelines. This provider integrates with the OpenLineage ecosystem to automatically collect and emit metadata about data transformations, job executions, and data flows across various data processing engines and databases.

## Package Information

- **Package Name**: apache-airflow-providers-openlineage
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-openlineage`
- **Minimum Airflow Version**: 2.10.0+

## Core Imports

```python
from airflow.providers.openlineage import __version__
```

Configuration access (each name is a function that returns the current setting):
```python
from airflow.providers.openlineage.conf import (
    is_disabled, namespace, transport, selective_enable, custom_extractors
)
```

Plugin integration (automatic via Airflow):
```python
# Automatically loaded when the provider is installed
# Plugin class: airflow.providers.openlineage.plugins.openlineage.OpenLineageProviderPlugin
```

## Basic Usage

### Enabling OpenLineage for DAGs

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.openlineage.utils.selective_enable import enable_lineage

# Mark the entire DAG for lineage. The marker only takes effect when
# [openlineage] selective_enable = True; otherwise lineage is collected for every DAG.
dag = enable_lineage(
    DAG(
        "example_dag",
        start_date=datetime(2023, 1, 1),
        schedule="@daily",
    )
)

# Tasks in an enabled DAG emit lineage events automatically
task = EmptyOperator(task_id="example_task", dag=dag)
```

### Using OpenLineage-aware Operators

```python
from airflow.providers.openlineage.operators.empty import EmptyOperator

# OpenLineage-aware empty operator (reuses `dag` from the previous example)
empty_task = EmptyOperator(
    task_id="openlineage_empty",
    dag=dag,
)
```

### Custom Lineage Extraction

```python
from __future__ import annotations

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class CustomExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        # Operator class names (not import paths) this extractor handles
        return ["MyCustomOperator"]

    def extract(self) -> OperatorLineage | None:
        # Called when the task starts; the operator is available as self.operator
        inputs = [Dataset(namespace="example", name="input_table")]
        outputs = [Dataset(namespace="example", name="output_table")]
        return OperatorLineage(inputs=inputs, outputs=outputs)
```

## Architecture

The OpenLineage provider uses a plugin-based architecture for collecting and emitting lineage events:

- **Plugin System**: Automatically integrates with Airflow's plugin mechanism to capture DAG and task lifecycle events
- **Extractor Framework**: Modular system for extracting lineage metadata from different operator types
- **Event Listener**: Captures Airflow events (DAG runs, task instances) and transforms them into OpenLineage events
- **Transport Layer**: Configurable transport mechanisms (HTTP, Kafka, File, Console) for sending events to OpenLineage backends
- **Facet System**: Extensible metadata enrichment through custom facets for additional context
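
The sketch below makes the Event Listener bullet concrete. It maps Airflow listener hooks for DAG runs and task instances to the OpenLineage run event types they typically produce (`START`, `COMPLETE`, `FAIL`). The hook names are Airflow's listener hooks. The `HOOK_TO_EVENT` table and `event_type_for_hook` helper are hypothetical simplifications; the provider's real listener also builds facets and emits through the adapter.

```python
from enum import Enum


class RunState(str, Enum):
    """Subset of OpenLineage run event types used in this sketch."""

    START = "START"
    COMPLETE = "COMPLETE"
    FAIL = "FAIL"


# Hypothetical lookup table: Airflow listener hook -> OpenLineage event type.
HOOK_TO_EVENT = {
    "on_dag_run_running": RunState.START,
    "on_dag_run_success": RunState.COMPLETE,
    "on_dag_run_failed": RunState.FAIL,
    "on_task_instance_running": RunState.START,
    "on_task_instance_success": RunState.COMPLETE,
    "on_task_instance_failed": RunState.FAIL,
}


def event_type_for_hook(hook_name: str) -> RunState:
    """Return the event type associated with a listener hook (sketch only)."""
    try:
        return HOOK_TO_EVENT[hook_name]
    except KeyError:
        raise ValueError(f"no OpenLineage event mapped for hook {hook_name!r}") from None


print(event_type_for_hook("on_task_instance_success").value)  # COMPLETE
```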

## Capabilities

### Configuration Management

Functions in `airflow.providers.openlineage.conf` read OpenLineage settings, including transport configuration, selective enabling, custom extractors, and debugging options.

```python { .api }
def config_path(check_legacy_env_var: bool = True) -> str: ...
def is_source_enabled() -> bool: ...
def disabled_operators() -> set[str]: ...
def selective_enable() -> bool: ...
def spark_inject_parent_job_info() -> bool: ...
def spark_inject_transport_info() -> bool: ...
def custom_extractors() -> set[str]: ...
def custom_run_facets() -> set[str]: ...
def namespace() -> str: ...
def transport() -> dict[str, Any]: ...
def is_disabled() -> bool: ...
def dag_state_change_process_pool_size() -> int: ...
def execution_timeout() -> int: ...
def include_full_task_info() -> bool: ...
def debug_mode() -> bool: ...
```

[Configuration](./configuration.md)
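
Several of these functions return sets (`disabled_operators()`, `custom_extractors()`, `custom_run_facets()`), so they are read from list-like options. The `extractors` option under Common Use Cases, for example, is written as a semicolon-separated string. The sketch below shows one way to split such a value into a set. `parse_semicolon_list` is hypothetical, not the provider's code. Whether the provider tolerates stray whitespace and empty entries exactly like this is an assumption.

```python
from __future__ import annotations


def parse_semicolon_list(raw: str | None) -> set[str]:
    """Split a semicolon-separated option value into a set of stripped, non-empty entries."""
    if not raw:
        return set()
    return {item.strip() for item in raw.split(";") if item.strip()}


# Hypothetical option value listing operator classes; the trailing ";" and spaces are ignored.
raw_value = "airflow.operators.bash.BashOperator; airflow.operators.python.PythonOperator;"
print(sorted(parse_semicolon_list(raw_value)))
```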

### SQL Parsing and Analysis

Parse SQL statements to extract data lineage, including table dependencies, column mappings, and database schema information.

```python { .api }
class SQLParser:
    def __init__(self, dialect: str | None = None, default_schema: str | None = None): ...
    def parse(self, sql: list[str] | str) -> SqlMeta | None: ...
    def generate_openlineage_metadata_from_sql(...) -> OperatorLineage: ...

class DatabaseInfo:
    scheme: str
    authority: str | None
    database: str | None
    # ... additional configuration attributes
```

[SQL Parsing](./sql-parsing.md)
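
This example makes the `DatabaseInfo` fields concrete. Under OpenLineage's dataset naming conventions, a table is identified by two parts. The namespace is built from the scheme and authority (for example `postgres://host:port`). The name is dot-separated, such as `database.schema.table`. The helper below is a hypothetical sketch of that convention, not part of the provider. The real parser also handles name normalization and engines with different hierarchies.

```python
from __future__ import annotations


def dataset_identity(
    scheme: str,
    authority: str | None,
    database: str | None,
    schema: str | None,
    table: str,
) -> tuple[str, str]:
    """Build an OpenLineage-style (namespace, name) pair for a table (sketch)."""
    # Namespace: scheme plus authority when the engine has one (e.g. host:port).
    namespace = f"{scheme}://{authority}" if authority else scheme
    # Name: skip missing qualifiers so shallower hierarchies still produce a name.
    name = ".".join(part for part in (database, schema, table) if part)
    return namespace, name


print(dataset_identity("postgres", "db.example.com:5432", "analytics", "public", "orders"))
```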

### Lineage Extraction Framework

Extensible framework for extracting lineage metadata from Airflow operators, including base classes, built-in extractors, and custom extractor registration.

```python { .api }
class BaseExtractor:
    def __init__(self, operator): ...
    @classmethod
    def get_operator_classnames(cls) -> list[str]: ...
    def extract(self) -> OperatorLineage | None: ...
    def extract_on_complete(self, task_instance) -> OperatorLineage | None: ...
    def extract_on_failure(self, task_instance) -> OperatorLineage | None: ...

class ExtractorManager:
    def __init__(self): ...
    def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]): ...
    def extract_metadata(self, dagrun, task, task_instance_state, task_instance=None) -> OperatorLineage: ...

class OperatorLineage:
    inputs: list[Dataset]
    outputs: list[Dataset]
    run_facets: dict[str, BaseFacet]
    job_facets: dict[str, BaseFacet]
```

[Lineage Extraction](./lineage-extraction.md)
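
The dispatch idea behind `ExtractorManager` can be modeled in a few lines of plain Python. Extractors are registered under an operator class name and looked up from the operator being run. Every class below (`Lineage`, `Extractor`, `ToyRegistry`, `MyCustomOperator`, `MyCustomExtractor`) is a hypothetical stand-in. The real manager also falls back to lineage methods defined on the operator itself, which this sketch omits.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Lineage:
    """Stand-in for OperatorLineage: input and output dataset names."""

    inputs: list[str] = field(default_factory=list)
    outputs: list[str] = field(default_factory=list)


class Extractor:
    """Stand-in for BaseExtractor: wraps one operator instance."""

    def __init__(self, operator: object) -> None:
        self.operator = operator

    def extract(self) -> Lineage | None:
        raise NotImplementedError


class ToyRegistry:
    """Hypothetical registry mapping operator class names to extractor classes."""

    def __init__(self) -> None:
        self._extractors: dict[str, type[Extractor]] = {}

    def add_extractor(self, operator_class: str, extractor: type[Extractor]) -> None:
        self._extractors[operator_class] = extractor

    def extract(self, operator: object) -> Lineage | None:
        # Look up by the operator's class name, the same form get_operator_classnames() returns.
        extractor_cls = self._extractors.get(type(operator).__name__)
        if extractor_cls is None:
            return None  # no registered extractor: no lineage in this sketch
        return extractor_cls(operator).extract()


class MyCustomOperator:
    def __init__(self, source: str, target: str) -> None:
        self.source = source
        self.target = target


class MyCustomExtractor(Extractor):
    def extract(self) -> Lineage:
        return Lineage(inputs=[self.operator.source], outputs=[self.operator.target])


registry = ToyRegistry()
registry.add_extractor("MyCustomOperator", MyCustomExtractor)
print(registry.extract(MyCustomOperator("raw.events", "mart.daily_events")))
```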

### OpenLineage Plugin Integration

Core plugin components for Airflow integration, including event listeners, adapters, and automatic event emission.

```python { .api }
class OpenLineageAdapter:
    def __init__(self, client: OpenLineageClient | None = None, secrets_masker: SecretsMasker | None = None): ...
    def emit(self, event: RunEvent) -> RunEvent: ...
    def start_task(...) -> RunEvent: ...
    def complete_task(...) -> RunEvent: ...
    def fail_task(...) -> RunEvent: ...

class OpenLineageListener:
    # Event listener methods for DAG and task lifecycle
    pass

def get_openlineage_listener() -> OpenLineageListener: ...
```

[Plugin Integration](./plugin-integration.md)

### Facets and Metadata Enrichment

Custom facet definitions for enriching OpenLineage events with Airflow-specific metadata, including DAG information, task states, and debug data.

```python { .api }
class AirflowRunFacet:
    dag: dict
    dagRun: dict
    task: dict
    taskInstance: dict
    taskUuid: str

class AirflowJobFacet:
    taskTree: dict
    taskGroups: dict
    tasks: dict

class AirflowStateRunFacet:
    dagRunState: str
    tasksState: dict[str, str]

class AirflowDebugRunFacet:
    packages: dict
```

[Facets and Metadata](./facets-metadata.md)
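
Facets are serialized as JSON objects inside the event's `run.facets` (or `job.facets`) mapping. The sketch below uses a hypothetical dataclass with the same two fields as `AirflowStateRunFacet` to show that shape. The `airflowState` key is an assumption. Real facets also carry `_producer` and `_schemaURL` metadata, which are omitted here.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field


@dataclass
class StateRunFacetSketch:
    """Hypothetical stand-in with the same fields as AirflowStateRunFacet."""

    dagRunState: str
    tasksState: dict[str, str] = field(default_factory=dict)


facet = StateRunFacetSketch(
    dagRunState="failed",
    tasksState={"extract": "success", "load": "failed"},
)
# Run facets travel as a JSON object keyed by facet name (key name assumed here).
run_facets = {"airflowState": asdict(facet)}
print(json.dumps(run_facets, sort_keys=True))
```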
207
208
### Template Macros
209
210
Template macros for accessing OpenLineage information within DAG definitions and task templates.
211
212
```python { .api }
213
def lineage_job_namespace() -> str: ...
214
def lineage_job_name(task_instance: TaskInstance) -> str: ...
215
def lineage_run_id(task_instance: TaskInstance) -> str: ...
216
def lineage_parent_id(task_instance: TaskInstance) -> str: ...
217
def lineage_root_parent_id(task_instance: TaskInstance) -> str: ...
218
def lineage_root_job_name(task_instance: TaskInstance) -> str: ...
219
def lineage_root_run_id(task_instance: TaskInstance) -> str: ...
220
```
221
222
[Template Macros](./template-macros.md)
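
Airflow exposes plugin macros in Jinja templates under `macros.<plugin name>`. These functions are therefore typically referenced as `macros.OpenLineageProviderPlugin.<function>(...)` inside templated operator fields, for example to pass the parent run identity to an external job. The sketch below only builds those template strings. The `lineage_template` helper and the `OL_*` keys are hypothetical. Airflow renders the strings at runtime; this code does not.

```python
# Plugin macros are exposed to Jinja as macros.<plugin name>.<function>.
MACRO_PREFIX = "macros.OpenLineageProviderPlugin"


def lineage_template(macro: str, needs_task_instance: bool = True) -> str:
    """Build a Jinja expression that calls one of the provider's lineage macros."""
    args = "task_instance" if needs_task_instance else ""
    return "{{ " + f"{MACRO_PREFIX}.{macro}({args})" + " }}"


# Hypothetical values for a templated field such as an operator's env mapping.
templated_env = {
    "OL_NAMESPACE": lineage_template("lineage_job_namespace", needs_task_instance=False),
    "OL_JOB_NAME": lineage_template("lineage_job_name"),
    "OL_RUN_ID": lineage_template("lineage_run_id"),
    "OL_PARENT_ID": lineage_template("lineage_parent_id"),
}
print(templated_env["OL_PARENT_ID"])
```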

### Selective Lineage Control

Utilities for fine-grained control over lineage collection, allowing selective enabling/disabling at DAG and task levels.

```python { .api }
def enable_lineage(obj: T) -> T: ...
def disable_lineage(obj: T) -> T: ...
def is_task_lineage_enabled(task: BaseOperator | MappedOperator) -> bool: ...
def is_dag_lineage_enabled(dag: DAG) -> bool: ...
```

[Selective Control](./selective-control.md)
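
The selective-enable rules can be summarized as a small decision function. This is our reading of the semantics, written as a hypothetical `should_emit` helper rather than the provider's implementation. With `selective_enable` off, the markers are assumed to be ignored. With it on, an explicit task marker wins; otherwise the task follows its DAG.

```python
from __future__ import annotations


def should_emit(selective_enable: bool, dag_enabled: bool, task_flag: bool | None) -> bool:
    """Decide whether a task emits lineage under the selective-enable rules (sketch).

    task_flag is True after enable_lineage(task), False after disable_lineage(task),
    and None when the task itself carries no marker.
    """
    if not selective_enable:
        # Markers are ignored; other settings such as is_disabled() may still turn lineage off.
        return True
    if task_flag is not None:
        # An explicit task-level marker overrides the DAG-level one.
        return task_flag
    # Otherwise the task follows its DAG's marker.
    return dag_enabled


print(should_emit(selective_enable=True, dag_enabled=True, task_flag=False))  # False
```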

### SQL Utilities

Specialized utilities for SQL-based lineage extraction, including schema analysis, table discovery, and information schema querying.

```python { .api }
class TableSchema:
    def to_dataset(self, namespace: str, database: str | None = None, schema: str | None = None) -> Dataset: ...

def get_table_schemas(...) -> tuple[list[Dataset], list[Dataset]]: ...
def parse_query_result(cursor) -> list[TableSchema]: ...
def create_information_schema_query(...) -> str: ...
```

[SQL Utilities](./sql-utilities.md)
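
An information-schema lookup returns one row per column. Those rows then have to be grouped back into per-table schemas, which is the job `parse_query_result` does for `TableSchema`. The sketch below is a simplified stand-in. It assumes rows shaped as `(schema, table, column, ordinal_position, data_type)`. The real function's column layout, and its handling of database names, may differ.

```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable
from typing import NamedTuple


class Column(NamedTuple):
    name: str
    data_type: str


def group_columns(
    rows: Iterable[tuple[str, str, str, int, str]],
) -> dict[tuple[str, str], list[Column]]:
    """Group (schema, table, column, ordinal_position, data_type) rows by table."""
    tables: dict[tuple[str, str], list[tuple[int, Column]]] = defaultdict(list)
    for schema, table, column, ordinal, data_type in rows:
        tables[(schema, table)].append((ordinal, Column(column, data_type)))
    # Order columns by ordinal position, since query results need not be sorted.
    return {
        key: [col for _, col in sorted(cols, key=lambda item: item[0])]
        for key, cols in tables.items()
    }


rows = [
    ("public", "orders", "amount", 2, "numeric"),
    ("public", "orders", "id", 1, "integer"),
    ("public", "customers", "id", 1, "integer"),
]
schemas = group_columns(rows)
print(schemas[("public", "orders")])
```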

### Spark Integration

Utilities for integrating with Spark applications, including automatic injection of OpenLineage configuration into Spark properties (controlled by the `spark_inject_parent_job_info()` and `spark_inject_transport_info()` settings).

```python { .api }
def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict: ...
def inject_transport_information_into_spark_properties(properties: dict, context: Context) -> dict: ...
```

[Spark Integration](./spark-integration.md)
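
Parent-job injection amounts to adding OpenLineage Spark properties (such as `spark.openlineage.parentJobNamespace`, `spark.openlineage.parentJobName`, and `spark.openlineage.parentRunId`). These let the Spark job's lineage link back to the Airflow task. The sketch below shows that merge, with values passed in directly instead of derived from the task `Context`. `inject_parent_info` is hypothetical. Leaving existing `spark.openlineage.parent*` keys untouched is our assumption about the provider's behavior.

```python
from __future__ import annotations

PARENT_KEYS = (
    "spark.openlineage.parentJobNamespace",
    "spark.openlineage.parentJobName",
    "spark.openlineage.parentRunId",
)


def inject_parent_info(
    properties: dict[str, str], namespace: str, job_name: str, run_id: str
) -> dict[str, str]:
    """Return a copy of properties with parent-job keys added, unless any are already set."""
    if any(key.startswith("spark.openlineage.parent") for key in properties):
        # Assumption: explicitly configured parent info is left as-is.
        return dict(properties)
    return {**properties, **dict(zip(PARENT_KEYS, (namespace, job_name, run_id)))}


# Hypothetical values; in Airflow they would come from the running task instance.
props = inject_parent_info(
    {"spark.executor.memory": "2g"},
    namespace="my_airflow_instance",
    job_name="example_dag.spark_task",
    run_id="01890e1f-0000-7000-8000-000000000000",
)
print(props["spark.openlineage.parentJobName"])
```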

### Utility Functions and Helpers

General utility functions for working with OpenLineage data, including operator analysis, documentation extraction, and data conversion.

```python { .api }
def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str: ...
def get_operator_class(task: BaseOperator) -> type: ...
def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool: ...
def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str: ...
def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset | None: ...
```

[Utility Functions](./utility-functions.md)
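
Two conventions behind these helpers are easy to show with the standard library. First, a task's OpenLineage job name is its DAG ID and task ID joined by a dot. Second, operators are identified by a fully qualified `module.ClassName` string, which is the form used when listing operators to disable (see `disabled_operators()` above). The functions below are hypothetical re-implementations for illustration, with stdlib classes standing in for operators.

```python
from __future__ import annotations

from collections import OrderedDict


def fully_qualified_class_name(obj: object) -> str:
    """Return 'module.ClassName' for the object's class."""
    cls = type(obj)
    return f"{cls.__module__}.{cls.__name__}"


def job_name(dag_id: str, task_id: str) -> str:
    """OpenLineage job name for an Airflow task: '<dag_id>.<task_id>'."""
    return f"{dag_id}.{task_id}"


def is_disabled(obj: object, disabled: set[str]) -> bool:
    """True if the object's fully qualified class name is in the disabled set."""
    return fully_qualified_class_name(obj) in disabled


# OrderedDict stands in for an operator instance here.
print(fully_qualified_class_name(OrderedDict()))  # collections.OrderedDict
print(job_name("example_dag", "example_task"))  # example_dag.example_task
print(is_disabled(OrderedDict(), {"collections.OrderedDict"}))  # True
```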

## Common Use Cases

### Setting Up Transport Configuration

```ini
# airflow.cfg (each option can also be set through an
# AIRFLOW__OPENLINEAGE__<OPTION> environment variable)
[openlineage]
transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}
namespace = my_airflow_instance
```
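
Airflow maps every config option to an environment variable named `AIRFLOW__{SECTION}__{KEY}`, which is convenient for containerized deployments. The snippet below derives those names for the two options above and prints shell `export` lines. The `airflow_env_var` helper is illustrative only.

```python
import json


def airflow_env_var(section: str, key: str) -> str:
    """Airflow's environment-variable name for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}
env = {
    airflow_env_var("openlineage", "transport"): json.dumps(transport),
    airflow_env_var("openlineage", "namespace"): "my_airflow_instance",
}
for name, value in env.items():
    print(f"export {name}='{value}'")
```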

### Selective Lineage Enabling

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

# These markers take effect when [openlineage] selective_enable = True

# Enable for specific DAG
dag = enable_lineage(DAG(...))

# Disable for specific task
task = disable_lineage(PythonOperator(...))
```

### Custom Extractor Registration

```ini
# airflow.cfg: semicolon-separated import paths of custom extractor classes
[openlineage]
extractors = my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor
```
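
Each entry in `extractors` is an import path to a class, so it must be importable by the Airflow components that emit lineage. The sketch below shows how such a semicolon-separated value can be split and resolved with `importlib`. `import_from_path` is hypothetical, and stdlib classes stand in for `my_package.extractors.*` so the code runs anywhere.

```python
import importlib


def import_from_path(dotted_path: str) -> type:
    """Resolve 'package.module.ClassName' to the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"{dotted_path!r} is not a dotted import path")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Stand-in value; a real deployment would list its own extractor classes.
raw = "collections.OrderedDict;json.JSONDecoder"
classes = [import_from_path(path) for path in raw.split(";") if path]
print([cls.__name__ for cls in classes])
```

A failed import surfaces here as `ImportError` or `AttributeError`, which is a quick way to check a path before putting it in `airflow.cfg`.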