# OpenLineage Integration

OpenLineage facets, utilities, and compatibility functions for data lineage tracking, including SQL and Spark integration utilities. This module provides version-compatible OpenLineage integration with graceful fallbacks when the OpenLineage dependencies are not available.

## Capabilities

### Base OpenLineage Classes

Core OpenLineage classes for data lineage representation.

```python { .api }
class BaseFacet:
    """Base class for OpenLineage facets."""

class Dataset:
    """OpenLineage dataset representation."""

class DatasetFacet:
    """Base class for dataset-specific facets."""

class InputDataset:
    """Input dataset in OpenLineage events."""

class OutputDataset:
    """Output dataset in OpenLineage events."""

class RunFacet:
    """Base class for run-specific facets."""
```
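
Input and output datasets are identified by a `namespace` (typically the data source URI) and a `name` (typically the table or object path), and can carry a dictionary of facets. A minimal sketch (exact constructor signatures depend on the installed OpenLineage client version):

```python
from airflow.providers.common.compat.openlineage.facet import InputDataset, OutputDataset

# Read side: the table a task consumes.
input_ds = InputDataset(
    namespace="postgresql://localhost:5432",
    name="analytics.users",
    facets={},  # dataset facets such as schema or documentation go here
)

# Write side: the table a task produces.
output_ds = OutputDataset(
    namespace="postgresql://localhost:5432",
    name="analytics.processed_users",
    facets={},
)
```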

### Lineage Facets

Specific facet classes for detailed lineage tracking.

```python { .api }
class ColumnLineageDatasetFacet:
    """Facet for tracking column-level lineage."""

class DocumentationDatasetFacet:
    """Facet for dataset documentation."""

class SchemaDatasetFacet:
    """Facet for dataset schema information."""

class LifecycleStateChangeDatasetFacet:
    """Facet for dataset lifecycle state changes."""

class OutputStatisticsOutputDatasetFacet:
    """Facet for output dataset statistics."""

class SymlinksDatasetFacet:
    """Facet for dataset symlink information."""
```
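
Column-level lineage maps each output column to the input columns it was derived from, using the `Fields` and `InputField` helpers described under Supporting Classes below. A hedged sketch following the OpenLineage column lineage facet spec (attribute names may differ between client versions):

```python
from airflow.providers.common.compat.openlineage.facet import (
    ColumnLineageDatasetFacet,
    Fields,
    InputField,
)

# The output column "email" is derived from analytics.users.email.
column_lineage = ColumnLineageDatasetFacet(
    fields={
        "email": Fields(
            inputFields=[
                InputField(
                    namespace="postgresql://localhost:5432",
                    name="analytics.users",
                    field="email",
                )
            ],
            transformationType="TRANSFORMATION",
            transformationDescription="upper-cased email",
        )
    }
)
```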

### Run and Job Facets

Facets for tracking job execution and run information.

```python { .api }
class ErrorMessageRunFacet:
    """Facet for capturing error messages in runs."""

class ExternalQueryRunFacet:
    """Facet for external query execution information."""

class ExtractionErrorRunFacet:
    """Facet for lineage extraction errors."""

class SQLJobFacet:
    """Facet for SQL job information."""
```
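
Job facets describe the job definition (for example, the SQL it runs), while run facets describe a single execution (for example, the error that failed it). A sketch using the usual OpenLineage attribute names (assumed, and version-dependent):

```python
from airflow.providers.common.compat.openlineage.facet import (
    ErrorMessageRunFacet,
    SQLJobFacet,
)

# Attached to the job: the query text it executes.
job_facets = {"sql": SQLJobFacet(query="SELECT user_id, email FROM users")}

# Attached to a failed run: what went wrong and in which language.
run_facets = {
    "errorMessage": ErrorMessageRunFacet(
        message='relation "users" does not exist',
        programmingLanguage="python",
    )
}
```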

### Supporting Classes

Helper classes for OpenLineage data structures.

```python { .api }
class Fields:
    """Per-column field definitions for column lineage facets."""

class InputField:
    """Input field definition for column lineage."""

class Error:
    """Error representation in OpenLineage events."""

class LifecycleStateChange:
    """Lifecycle state change representation."""

class PreviousIdentifier:
    """Previous identifier for renamed datasets."""

class Identifier:
    """Dataset identifier representation."""

class SchemaDatasetFacetFields:
    """Schema field definitions."""
```
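
`SchemaDatasetFacetFields` entries are what a `SchemaDatasetFacet` expects in its `fields` list. A brief sketch (the `description` argument is assumed optional, per the OpenLineage schema facet spec):

```python
from airflow.providers.common.compat.openlineage.facet import (
    SchemaDatasetFacet,
    SchemaDatasetFacetFields,
)

schema = SchemaDatasetFacet(
    fields=[
        SchemaDatasetFacetFields(name="user_id", type="INTEGER"),
        SchemaDatasetFacetFields(name="email", type="VARCHAR(255)", description="login email"),
    ]
)
```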

### No-Op Fallback

Function that creates no-op implementations when OpenLineage is not available.

```python { .api }
def create_no_op(*_, **__) -> None:
    """
    Create a no-op placeholder when the OpenLineage client is not available.

    Returns:
        None: Always returns None as a no-op implementation
    """
```
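
The typical pattern (an illustration, not the module's exact source) binds `create_no_op` to the missing names at import time, so call sites degrade to returning `None` instead of raising; the `openlineage.client.event_v2` import path applies to recent client versions:

```python
from airflow.providers.common.compat.openlineage.facet import create_no_op

try:
    from openlineage.client.event_v2 import Dataset  # real class when installed
except ImportError:
    # OpenLineage is not installed: Dataset(...) now quietly returns None
    # instead of raising at import time.
    Dataset = create_no_op
```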

### SQL Integration

Utilities for extracting OpenLineage facets from SQL operations.

```python { .api }
def get_openlineage_facets_with_sql(
    hook,
    sql: str | list[str],
    conn_id: str,
    database: str | None
):
    """
    Get OpenLineage facets from SQL queries.

    Args:
        hook: Database hook instance
        sql (str | list[str]): SQL query or queries to analyze
        conn_id (str): Connection ID for the database
        database (str | None): Database name

    Returns:
        OpenLineage facets extracted from the SQL operation
    """
```

### Spark Integration

Utilities for injecting OpenLineage information into Spark properties.

```python { .api }
def inject_parent_job_information_into_spark_properties(
    properties: dict,
    context: Context
) -> dict:
    """
    Inject OpenLineage parent job information into Spark properties.

    Args:
        properties (dict): Spark configuration properties
        context (Context): Airflow task context

    Returns:
        dict: Updated properties with OpenLineage parent job information
    """

def inject_transport_information_into_spark_properties(
    properties: dict,
    context: Context
) -> dict:
    """
    Inject OpenLineage transport information into Spark properties.

    Args:
        properties (dict): Spark configuration properties
        context (Context): Airflow task context

    Returns:
        dict: Updated properties with OpenLineage transport configuration
    """
```
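
Conceptually, the parent-job injection fills in `spark.openlineage.parent*` keys from the running task's context, so lineage events emitted by the Spark job point back to the Airflow task that launched it, while the transport injection mirrors the lineage backend connection settings. An illustrative (assumed) result; the key names follow OpenLineage Spark integration conventions, and all values are hypothetical:

```python
properties = {
    "spark.app.name": "data-processing",
    # Added by the parent-job injection (derived from DAG, task id, and run id):
    "spark.openlineage.parentJobNamespace": "default",
    "spark.openlineage.parentJobName": "my_dag.my_task",
    "spark.openlineage.parentRunId": "<airflow-task-run-uuid>",  # placeholder
    # Added by the transport injection (mirrors the configured transport):
    "spark.openlineage.transport.type": "http",
    "spark.openlineage.transport.url": "http://localhost:5000",
}
```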

### Asset Translation

Utility for translating Airflow assets for OpenLineage compatibility.

```python { .api }
def translate_airflow_asset(*args, **kwargs):
    """
    Translate Airflow assets for OpenLineage compatibility.

    Handles asset/dataset compatibility between Airflow versions.
    Maps to translate_airflow_asset in newer versions or translate_airflow_dataset
    with parameter renaming in older versions.

    Note: This function is conditionally imported and may have different signatures
    depending on the OpenLineage provider version and Airflow version.
    """
```
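
Because the callable's location and signature vary across provider and Airflow versions, consumers typically guard the import. A hedged sketch (the module path shown is an assumption):

```python
# Hypothetical guard: the import may fail on provider/Airflow combinations
# that predate the asset terminology.
try:
    from airflow.providers.common.compat.openlineage.utils.utils import translate_airflow_asset
except ImportError:
    translate_airflow_asset = None  # fall back to dataset-era behavior
```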

## Usage Examples

```python
from airflow.providers.common.compat.openlineage.facet import (
    Dataset,
    RunFacet,
    SchemaDatasetFacet,
    SchemaDatasetFacetFields,
    SQLJobFacet,
    create_no_op,
)
from airflow.providers.common.compat.openlineage.utils.sql import get_openlineage_facets_with_sql
from airflow.providers.common.compat.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)
from airflow.providers.common.compat.openlineage.check import require_openlineage_version

# Check if OpenLineage is available and use facets
try:
    # Create OpenLineage dataset
    input_dataset = Dataset(
        namespace="postgresql://localhost:5432",
        name="analytics.users"
    )

    # Create schema facet
    schema_facet = SchemaDatasetFacet(
        fields=[
            SchemaDatasetFacetFields(name="user_id", type="INTEGER"),
            SchemaDatasetFacetFields(name="email", type="VARCHAR(255)"),
        ]
    )

    # Create SQL job facet
    sql_facet = SQLJobFacet(query="SELECT * FROM users WHERE active = true")

except ImportError:
    # Fall back to no-op when OpenLineage is not available
    input_dataset = create_no_op()
    schema_facet = create_no_op()
    sql_facet = create_no_op()

# SQL integration example
@require_openlineage_version(provider_min_version="1.0.0")
def extract_sql_lineage(**context):
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="postgres_default")
    sql = "SELECT user_id, email FROM users WHERE created_date = '{{ ds }}'"

    # Get OpenLineage facets from the SQL statement
    facets = get_openlineage_facets_with_sql(
        hook=hook,
        sql=sql,
        conn_id="postgres_default",
        database="analytics",
    )

    return facets

# Spark integration example
@require_openlineage_version(provider_min_version="1.2.0")
def configure_spark_with_openlineage(**context):
    spark_properties = {
        "spark.app.name": "data-processing",
        "spark.sql.adaptive.enabled": "true",
    }

    # Inject OpenLineage parent job information
    spark_properties = inject_parent_job_information_into_spark_properties(
        properties=spark_properties,
        context=context,
    )

    # Inject OpenLineage transport configuration
    spark_properties = inject_transport_information_into_spark_properties(
        properties=spark_properties,
        context=context,
    )

    return spark_properties

# Use in operators
from airflow.providers.postgres.operators.postgres import PostgresOperator

sql_task = PostgresOperator(
    task_id="process_users",
    sql="""
        INSERT INTO processed_users
        SELECT user_id, UPPER(email) AS email
        FROM users
        WHERE created_date = '{{ ds }}'
    """,
    postgres_conn_id="postgres_default",
    # OpenLineage will automatically extract lineage information
)
```