# OpenLineage Integration

Data lineage tracking integration with OpenLineage, providing data flow monitoring, compliance support, and observability for PostgreSQL and Redshift operations within Apache Airflow workflows.

## Capabilities

### Database Information Provider

Provides PostgreSQL/Redshift-specific connection information for OpenLineage data lineage tracking.

```python { .api }
def get_openlineage_database_info(self, connection) -> DatabaseInfo:
    """
    Return PostgreSQL/Redshift-specific information for OpenLineage.

    Parameters:
    - connection: database connection object

    Returns:
        DatabaseInfo: Database information object containing connection details

    Dependencies:
        Requires the apache-airflow-providers-openlineage package
    """
```
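
As a quick illustration, the sketch below prints the `DatabaseInfo` fields most relevant to lineage. The commented output is an assumption for a hypothetical connection; the exact contents depend on the provider version and your connection settings:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")
with hook.get_conn() as conn:
    info = hook.get_openlineage_database_info(conn)
    # For a connection to host "db" on port 5432 and database "shop",
    # expect something like: scheme='postgres', authority='db:5432', database='shop'
    print(info.scheme, info.authority, info.database)
```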

### Database Dialect Detection

Detects and reports the specific database dialect for OpenLineage integration.

```python { .api }
def get_openlineage_database_dialect(self, connection) -> str:
    """
    Return database dialect for OpenLineage tracking.

    Parameters:
    - connection: database connection object

    Returns:
        str: "postgres" for PostgreSQL, "redshift" for Redshift connections
    """
```

### Default Schema Detection

Determines the current default schema for OpenLineage dataset identification.

```python { .api }
def get_openlineage_default_schema(self) -> str | None:
    """
    Return the current default schema, typically derived from SEARCH_PATH.

    Returns:
        str or None: Current schema name, usually "public" for PostgreSQL
    """
```
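
The schema reported here becomes part of the dataset names OpenLineage builds (`database.schema.table`), so a non-default `search_path` changes how datasets are identified. A minimal sketch of that effect, assuming a hypothetical connection `analytics_conn` whose extra sets a custom `search_path` via the psycopg2 `options` parameter:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Hypothetical connection "analytics_conn" with extra:
#   {"options": "-c search_path=analytics"}
hook = PostgresHook(postgres_conn_id="analytics_conn")

# Queries the session's current schema; with the extra above this is
# expected to return "analytics" instead of the usual "public"
schema = hook.get_openlineage_default_schema()
print(schema)
```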

## Usage Examples

### Basic OpenLineage Integration

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Hook automatically integrates with OpenLineage when package is installed
hook = PostgresHook(postgres_conn_id="postgres_default")

# Database operations are automatically tracked
records = hook.get_records("SELECT * FROM users WHERE active = true")

# OpenLineage captures:
# - Database connection info
# - SQL query executed
# - Tables accessed
# - Schema information
```

### Explicit Lineage Information

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_lineage_info():
    """Get OpenLineage information explicitly."""
    hook = PostgresHook(postgres_conn_id="postgres_default")

    with hook.get_conn() as conn:
        # Get database info for lineage
        db_info = hook.get_openlineage_database_info(conn)
        dialect = hook.get_openlineage_database_dialect(conn)
        default_schema = hook.get_openlineage_default_schema()

        print(f"Database dialect: {dialect}")
        print(f"Default schema: {default_schema}")
        print(f"Database info: {db_info}")


get_lineage_info()
```

### Redshift vs PostgreSQL Detection

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook


def detect_database_type():
    """Detect whether connected to PostgreSQL or Redshift."""

    # PostgreSQL connection
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    with pg_hook.get_conn() as conn:
        pg_dialect = pg_hook.get_openlineage_database_dialect(conn)
        print(f"PostgreSQL dialect: {pg_dialect}")  # "postgres"

    # Redshift connection
    rs_hook = PostgresHook(postgres_conn_id="redshift_default")
    with rs_hook.get_conn() as conn:
        rs_dialect = rs_hook.get_openlineage_database_dialect(conn)
        print(f"Redshift dialect: {rs_dialect}")  # "redshift"


detect_database_type()
```

## OpenLineage-Aware DAG Development

### Data Pipeline with Lineage Tracking

```python
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

with DAG(
    "data_pipeline_with_lineage",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
) as dag:

    @task
    def extract_raw_data(ds=None):
        """Extract raw data - automatically tracked by OpenLineage."""
        hook = PostgresHook(postgres_conn_id="source_db")

        # This query is automatically tracked. Jinja templates are not
        # rendered inside TaskFlow functions, so the logical date arrives
        # via the injected `ds` argument and is bound as a query parameter.
        data = hook.get_df(
            """
            SELECT user_id, event_type, timestamp, properties
            FROM raw_events
            WHERE date = %(ds)s
            """,
            parameters={"ds": ds},
        )

        return len(data)

    @task
    def transform_data(ds=None):
        """Transform data - lineage captures read/write operations."""
        hook = PostgresHook(postgres_conn_id="warehouse_db")

        # OpenLineage tracks both the read and the write side of this statement
        hook.run(
            """
            INSERT INTO processed_events (user_id, event_type, event_date, processed_at)
            SELECT
                user_id,
                event_type,
                DATE(timestamp) as event_date,
                CURRENT_TIMESTAMP as processed_at
            FROM raw_events
            WHERE date = %(ds)s
            """,
            parameters={"ds": ds},
        )

    @task
    def create_aggregates(ds=None):
        """Create aggregated data - full lineage chain captured."""
        hook = PostgresHook(postgres_conn_id="warehouse_db")

        # Complex query with multiple table dependencies
        hook.run(
            """
            INSERT INTO daily_user_metrics (user_id, event_date, event_count, unique_event_types)
            SELECT
                user_id,
                event_date,
                COUNT(*) as event_count,
                COUNT(DISTINCT event_type) as unique_event_types
            FROM processed_events
            WHERE event_date = %(ds)s
            GROUP BY user_id, event_date
            """,
            parameters={"ds": ds},
        )

    # Define task dependencies
    extract_raw_data() >> transform_data() >> create_aggregates()
```

### Cross-Database Lineage

```python
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def cross_database_etl(ds=None):
    """ETL across multiple databases with full lineage tracking."""

    # Source database
    source_hook = PostgresHook(postgres_conn_id="app_db")

    # Target database
    target_hook = PostgresHook(postgres_conn_id="analytics_db")

    # Extract from source - lineage captures the read. The logical date
    # arrives via the injected `ds` argument (Jinja is not rendered inside
    # TaskFlow functions) and is bound as a query parameter.
    user_data = source_hook.get_df(
        """
        SELECT u.id, u.name, u.email, u.created_at,
               COUNT(o.id) as order_count,
               SUM(o.total) as lifetime_value
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.updated_at >= %(ds)s
        GROUP BY u.id, u.name, u.email, u.created_at
        """,
        parameters={"ds": ds},
    )

    # Load to target - lineage captures the write
    target_hook.insert_rows(
        table="customer_analytics",
        rows=user_data.values.tolist(),
        target_fields=list(user_data.columns),
        replace=True,
        replace_index="id",
    )

    # OpenLineage automatically captures:
    # - Source tables: app_db.public.users, app_db.public.orders
    # - Target table: analytics_db.public.customer_analytics
    # - Data transformation logic
    # - Cross-database relationship
```

## Configuration and Setup

### OpenLineage Provider Installation

```bash
# Install OpenLineage provider
pip install apache-airflow-providers-openlineage

# Or install with PostgreSQL provider extras
pip install 'apache-airflow-providers-postgres[openlineage]'
```

### Airflow Configuration

Configure OpenLineage in `airflow.cfg`. The PostgreSQL hook implements the OpenLineage methods natively, so no extractor registration is required:

```ini
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_data_pipeline
```

### Environment Variables

```bash
# OpenLineage configuration
export OPENLINEAGE_URL=http://marquez:5000
export OPENLINEAGE_NAMESPACE=production

# Airflow OpenLineage integration (quote the JSON so the shell passes it through intact)
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000"}'
```

## Advanced Lineage Scenarios

### Complex Query Lineage

```python
@task
def complex_analytics_query(ds=None):
    """Complex query with multiple table joins and CTEs."""
    hook = PostgresHook(postgres_conn_id="analytics_db")

    # OpenLineage parses and tracks all table dependencies; the logical
    # date is bound as a parameter (Jinja is not rendered in TaskFlow code)
    hook.run(
        """
        WITH user_metrics AS (
            SELECT
                u.id as user_id,
                u.segment,
                COUNT(DISTINCT o.id) as order_count,
                AVG(o.total) as avg_order_value
            FROM users u
            JOIN orders o ON u.id = o.user_id
            WHERE o.created_at >= %(ds)s
            GROUP BY u.id, u.segment
        ),
        product_metrics AS (
            SELECT
                p.category,
                COUNT(DISTINCT oi.order_id) as orders_with_category,
                SUM(oi.quantity * oi.price) as category_revenue
            FROM products p
            JOIN order_items oi ON p.id = oi.product_id
            JOIN orders o ON oi.order_id = o.id
            WHERE o.created_at >= %(ds)s
            GROUP BY p.category
        )
        INSERT INTO daily_analytics (
            date, user_segment, product_category,
            total_orders, avg_order_value, category_revenue
        )
        SELECT
            %(ds)s as date,
            um.segment,
            pm.category,
            um.order_count,
            um.avg_order_value,
            pm.category_revenue
        FROM user_metrics um
        CROSS JOIN product_metrics pm
        """,
        parameters={"ds": ds},
    )

    # OpenLineage captures dependencies on:
    # - users table
    # - orders table
    # - products table
    # - order_items table
    # And writes to daily_analytics table
```

### Stored Procedure Lineage

```python
@task
def call_stored_procedure(ds=None):
    """Call stored procedure - lineage tracks inputs/outputs."""
    hook = PostgresHook(postgres_conn_id="postgres_default")

    # OpenLineage can track stored procedure calls; the logical date is
    # bound as a parameter rather than Jinja-templated
    hook.run(
        """
        CALL update_customer_segments(
            p_calculation_date := %(ds)s,
            p_recalculate_all := false
        )
        """,
        parameters={"ds": ds},
    )

    # Lineage tracking depends on OpenLineage configuration
    # and stored procedure analysis capabilities
```

## Lineage Data Flow

### Automatic Data Capture

OpenLineage integration automatically captures the following (a sample event is sketched after the list):

1. **Input Datasets**: Tables read by queries
2. **Output Datasets**: Tables written/updated
3. **Transformation Logic**: SQL queries and operations
4. **Schema Information**: Column-level lineage where possible
5. **Job Information**: Airflow DAG and task context
6. **Connection Details**: Database and schema information
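
For illustration, a single tracked statement such as the `transform_data` INSERT above yields a run event along these lines. This is a hand-written sketch expressed as a Python dict, not captured output; the namespace and dataset names are assumptions following OpenLineage naming conventions:

```python
# Abridged sketch of an OpenLineage run event (real events also carry
# eventTime, run IDs, and schema/SQL facets)
run_event = {
    "eventType": "COMPLETE",
    "job": {
        "namespace": "production",
        "name": "data_pipeline_with_lineage.transform_data",
    },
    "inputs": [
        {"namespace": "postgres://warehouse-host:5432",
         "name": "warehouse.public.raw_events"},
    ],
    "outputs": [
        {"namespace": "postgres://warehouse-host:5432",
         "name": "warehouse.public.processed_events"},
    ],
}
```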

### Manual Lineage Enhancement

```python
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def enhanced_lineage_task():
    """Task with manually enhanced lineage information."""
    hook = PostgresHook(postgres_conn_id="postgres_default")

    # Perform database operations
    hook.run("INSERT INTO summary_table SELECT * FROM detail_table")

    # Optional: Add custom lineage metadata
    # (Implementation depends on OpenLineage setup)
```
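
One way to attach custom lineage is to implement the OpenLineage facet methods on a custom operator. A minimal sketch, assuming the `OperatorLineage` container from the OpenLineage provider and the client's `Dataset` class (import paths have shifted across provider versions, and all table and connection names here are hypothetical):

```python
from airflow.models import BaseOperator
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.postgres.hooks.postgres import PostgresHook
from openlineage.client.run import Dataset


class SummarizeOperator(BaseOperator):
    """Hypothetical operator that declares its own lineage."""

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id="postgres_default")
        hook.run("INSERT INTO summary_table SELECT * FROM detail_table")

    def get_openlineage_facets_on_complete(self, task_instance):
        # Declare inputs/outputs explicitly instead of relying on SQL parsing
        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db-host:5432", name="mydb.public.detail_table")],
            outputs=[Dataset(namespace="postgres://db-host:5432", name="mydb.public.summary_table")],
        )
```

Explicit declaration is useful when SQL parsing cannot see the real inputs, for example with dynamic SQL or stored procedures.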

## Dependencies and Requirements

### Required Packages

```text
# Core dependencies
apache-airflow-providers-postgres
apache-airflow-providers-openlineage
```

SQL parsing for lineage extraction ships with the OpenLineage packages (the bundled OpenLineage SQL parser); no separate parsing library is required.

### Version Compatibility

- **OpenLineage Provider**: Latest version recommended
- **PostgreSQL Provider**: 6.2.3+
- **Airflow**: 2.10.0+ (required by the PostgreSQL provider)

### OpenLineage Backend

Works with any OpenLineage-compatible backend, including:

- **Marquez**: Open-source lineage backend
- **DataHub**: LinkedIn's metadata platform
- **Custom HTTP**: Any OpenLineage-compatible API
- **File**: Local JSON file output for development (see the configuration sketch below)
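
For local development without a running backend, the OpenLineage client's `file` transport appends events as JSON to a local file. A minimal `airflow.cfg` sketch, assuming the standard file transport options (the path is arbitrary):

```ini
[openlineage]
namespace = dev
# Each emitted event is appended to the file as a JSON line
transport = {"type": "file", "log_file_path": "/tmp/openlineage_events.json", "append": true}
```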