
# OpenLineage Integration

Data lineage tracking with OpenLineage, providing data flow monitoring, compliance support, and observability for PostgreSQL and Redshift operations within Apache Airflow workflows.

## Capabilities

### Database Information Provider

Provide PostgreSQL/Redshift-specific information for OpenLineage data lineage tracking.

```python { .api }
def get_openlineage_database_info(self, connection) -> DatabaseInfo:
    """
    Return PostgreSQL/Redshift-specific information for OpenLineage.

    Parameters:
    - connection: database connection object

    Returns:
        DatabaseInfo: Database information object containing connection details

    Dependencies:
        Requires the apache-airflow-providers-openlineage package
    """
```
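
The returned `DatabaseInfo` carries the pieces OpenLineage uses to build dataset namespaces. A minimal sketch of inspecting it, assuming the provider's `DatabaseInfo` exposes `scheme`, `authority`, and `database` attributes:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")
with hook.get_conn() as conn:
    info = hook.get_openlineage_database_info(conn)
    # Attribute names assumed from the provider's DatabaseInfo dataclass;
    # together they form dataset namespaces such as "postgres://host:5432"
    print(info.scheme, info.authority, info.database)
```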

### Database Dialect Detection

Detect and report the specific database dialect for OpenLineage integration.

```python { .api }
def get_openlineage_database_dialect(self, connection) -> str:
    """
    Return database dialect for OpenLineage tracking.

    Parameters:
    - connection: database connection object

    Returns:
        str: "postgres" for PostgreSQL, "redshift" for Redshift connections
    """
```

### Default Schema Detection

Determine the current default schema for OpenLineage dataset identification.

```python { .api }
def get_openlineage_default_schema(self) -> str | None:
    """
    Return current default schema, typically from SEARCH_PATH.

    Returns:
        str or None: Current schema name, usually "public" for PostgreSQL
    """
```
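
Because the schema is resolved from the session's search path, a connection configured with a custom `search_path` changes how OpenLineage qualifies datasets. A minimal sketch, assuming a hypothetical connection whose extras set `{"options": "-c search_path=analytics,public"}`:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# "analytics_default" is a hypothetical connection whose extras include
# {"options": "-c search_path=analytics,public"}
hook = PostgresHook(postgres_conn_id="analytics_default")

# Datasets emitted for this hook would then be qualified as analytics.<table>
print(hook.get_openlineage_default_schema())  # expected: "analytics"
```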

## Usage Examples

### Basic OpenLineage Integration

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Hook automatically integrates with OpenLineage when the package is installed
hook = PostgresHook(postgres_conn_id="postgres_default")

# Database operations are automatically tracked
records = hook.get_records("SELECT * FROM users WHERE active = true")

# OpenLineage captures:
# - Database connection info
# - SQL query executed
# - Tables accessed
# - Schema information
```
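
Automatic tracking is conditional on the OpenLineage provider being importable; the hook's database behavior is unchanged either way. A quick availability check using only the standard library:

```python
import importlib.util

# Lineage events are only emitted when the OpenLineage provider is installed
if importlib.util.find_spec("airflow.providers.openlineage") is None:
    print("apache-airflow-providers-openlineage is not installed; lineage is disabled")
```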

### Explicit Lineage Information

```python
def get_lineage_info():
    """Get OpenLineage information explicitly."""
    hook = PostgresHook(postgres_conn_id="postgres_default")

    with hook.get_conn() as conn:
        # Get database info for lineage
        db_info = hook.get_openlineage_database_info(conn)
        dialect = hook.get_openlineage_database_dialect(conn)
        default_schema = hook.get_openlineage_default_schema()

        print(f"Database dialect: {dialect}")
        print(f"Default schema: {default_schema}")
        print(f"Database info: {db_info}")

get_lineage_info()
```

### Redshift vs PostgreSQL Detection

```python
def detect_database_type():
    """Detect whether connected to PostgreSQL or Redshift."""

    # PostgreSQL connection
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    with pg_hook.get_conn() as conn:
        pg_dialect = pg_hook.get_openlineage_database_dialect(conn)
        print(f"PostgreSQL dialect: {pg_dialect}")  # "postgres"

    # Redshift connection
    rs_hook = PostgresHook(postgres_conn_id="redshift_default")
    with rs_hook.get_conn() as conn:
        rs_dialect = rs_hook.get_openlineage_database_dialect(conn)
        print(f"Redshift dialect: {rs_dialect}")  # "redshift"

detect_database_type()
```

## OpenLineage-Aware DAG Development

### Data Pipeline with Lineage Tracking

```python
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

with DAG(
    "data_pipeline_with_lineage",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
) as dag:

    @task
    def extract_raw_data(ds=None):
        """Extract raw data - automatically tracked by OpenLineage."""
        hook = PostgresHook(postgres_conn_id="source_db")

        # This query is automatically tracked. Note: Jinja templates such as
        # {{ ds }} are not rendered inside TaskFlow functions, so the logical
        # date is taken from the task context and bound as a query parameter.
        data = hook.get_df(
            """
            SELECT user_id, event_type, timestamp, properties
            FROM raw_events
            WHERE date = %(ds)s
            """,
            parameters={"ds": ds},
        )

        return len(data)

    @task
    def transform_data(ds=None):
        """Transform data - lineage captures read/write operations."""
        hook = PostgresHook(postgres_conn_id="warehouse_db")

        # OpenLineage tracks both the read and write operations
        hook.run(
            """
            INSERT INTO processed_events (user_id, event_type, event_date, processed_at)
            SELECT
                user_id,
                event_type,
                DATE(timestamp) as event_date,
                CURRENT_TIMESTAMP as processed_at
            FROM raw_events
            WHERE date = %(ds)s
            """,
            parameters={"ds": ds},
        )

    @task
    def create_aggregates(ds=None):
        """Create aggregated data - full lineage chain captured."""
        hook = PostgresHook(postgres_conn_id="warehouse_db")

        # Complex query with multiple table dependencies
        hook.run(
            """
            INSERT INTO daily_user_metrics (user_id, event_date, event_count, unique_event_types)
            SELECT
                user_id,
                event_date,
                COUNT(*) as event_count,
                COUNT(DISTINCT event_type) as unique_event_types
            FROM processed_events
            WHERE event_date = %(ds)s
            GROUP BY user_id, event_date
            """,
            parameters={"ds": ds},
        )

    # Define task dependencies
    extract_raw_data() >> transform_data() >> create_aggregates()
```

### Cross-Database Lineage

```python
@task
def cross_database_etl(ds=None):
    """ETL across multiple databases with full lineage tracking."""

    # Source database
    source_hook = PostgresHook(postgres_conn_id="app_db")

    # Target database
    target_hook = PostgresHook(postgres_conn_id="analytics_db")

    # Extract from source - lineage captures the read
    user_data = source_hook.get_df(
        """
        SELECT u.id, u.name, u.email, u.created_at,
               COUNT(o.id) as order_count,
               SUM(o.total) as lifetime_value
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.updated_at >= %(ds)s
        GROUP BY u.id, u.name, u.email, u.created_at
        """,
        parameters={"ds": ds},
    )

    # Load to target - lineage captures the write
    target_hook.insert_rows(
        table="customer_analytics",
        rows=user_data.values.tolist(),
        target_fields=list(user_data.columns),
        replace=True,
        replace_index="id",
    )

    # OpenLineage automatically captures:
    # - Source tables: app_db.public.users, app_db.public.orders
    # - Target table: analytics_db.public.customer_analytics
    # - Data transformation logic
    # - Cross-database relationship
```

## Configuration and Setup

### OpenLineage Provider Installation

```bash
# Install the OpenLineage provider
pip install apache-airflow-providers-openlineage

# Or install it via the PostgreSQL provider's extra
pip install 'apache-airflow-providers-postgres[openlineage]'
```

### Airflow Configuration

Configure OpenLineage in `airflow.cfg`:

```ini
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_data_pipeline
extractors = airflow.providers.postgres.hooks.postgres
```
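
To confirm these settings are picked up, Airflow's configuration API can read them back (a quick sanity check, not part of the lineage API itself):

```python
from airflow.configuration import conf

# Reads the [openlineage] section values configured above
print(conf.get("openlineage", "transport"))
print(conf.get("openlineage", "namespace"))
```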

### Environment Variables

```bash
# OpenLineage configuration
export OPENLINEAGE_URL=http://marquez:5000
export OPENLINEAGE_NAMESPACE=production

# Airflow OpenLineage integration (quote the JSON so the shell keeps it intact)
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000"}'
```

## Advanced Lineage Scenarios

### Complex Query Lineage

```python
@task
def complex_analytics_query(ds=None):
    """Complex query with multiple table joins and CTEs."""
    hook = PostgresHook(postgres_conn_id="analytics_db")

    # OpenLineage parses the statement and tracks all table dependencies
    hook.run(
        """
        WITH user_metrics AS (
            SELECT
                u.id as user_id,
                u.segment,
                COUNT(DISTINCT o.id) as order_count,
                AVG(o.total) as avg_order_value
            FROM users u
            JOIN orders o ON u.id = o.user_id
            WHERE o.created_at >= %(ds)s
            GROUP BY u.id, u.segment
        ),
        product_metrics AS (
            SELECT
                p.category,
                COUNT(DISTINCT oi.order_id) as orders_with_category,
                SUM(oi.quantity * oi.price) as category_revenue
            FROM products p
            JOIN order_items oi ON p.id = oi.product_id
            JOIN orders o ON oi.order_id = o.id
            WHERE o.created_at >= %(ds)s
            GROUP BY p.category
        )
        INSERT INTO daily_analytics (
            date, user_segment, product_category,
            total_orders, avg_order_value, category_revenue
        )
        SELECT
            %(ds)s as date,
            um.segment,
            pm.category,
            um.order_count,
            um.avg_order_value,
            pm.category_revenue
        FROM user_metrics um
        CROSS JOIN product_metrics pm
        """,
        parameters={"ds": ds},
    )

    # OpenLineage captures dependencies on:
    # - users table
    # - orders table
    # - products table
    # - order_items table
    # And writes to the daily_analytics table
```

### Stored Procedure Lineage

```python
@task
def call_stored_procedure(ds=None):
    """Call a stored procedure - lineage tracks inputs/outputs."""
    hook = PostgresHook(postgres_conn_id="postgres_default")

    # OpenLineage can track stored procedure calls
    hook.run(
        """
        CALL update_customer_segments(
            p_calculation_date := %(ds)s,
            p_recalculate_all := false
        )
        """,
        parameters={"ds": ds},
    )

    # How much lineage is captured depends on the OpenLineage configuration
    # and its ability to analyze the procedure body
```

## Lineage Data Flow

### Automatic Data Capture

OpenLineage integration automatically captures:

1. **Input Datasets**: Tables read by queries
2. **Output Datasets**: Tables written or updated
3. **Transformation Logic**: SQL queries and operations
4. **Schema Information**: Column-level lineage where possible
5. **Job Information**: Airflow DAG and task context
6. **Connection Details**: Database and schema information
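
For intuition about what these pieces look like on the wire, here is a rough sketch of emitting a comparable event with the OpenLineage Python client (`openlineage-python`); the backend URL, namespace, and dataset names are illustrative only:

```python
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

client = OpenLineageClient(url="http://marquez:5000")  # illustrative backend URL

# One COMPLETE event: reads raw_events, writes processed_events
event = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid4())),
    job=Job(namespace="production", name="data_pipeline_with_lineage.transform_data"),
    producer="manual-example",
    inputs=[Dataset(namespace="postgres://warehouse:5432", name="public.raw_events")],
    outputs=[Dataset(namespace="postgres://warehouse:5432", name="public.processed_events")],
)
client.emit(event)
```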

### Manual Lineage Enhancement

```python
from airflow.providers.openlineage.extractors import OperatorLineage

@task
def enhanced_lineage_task():
    """Task with manually enhanced lineage information."""
    hook = PostgresHook(postgres_conn_id="postgres_default")

    # Perform database operations (hook.run returns None unless a handler is given)
    hook.run("INSERT INTO summary_table SELECT * FROM detail_table")

    # Optional: Add custom lineage metadata
    # (Implementation depends on OpenLineage setup)
```
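
One concrete way to enrich lineage manually is Airflow's inlets/outlets annotation, which the OpenLineage provider can translate into input and output datasets. A sketch under that assumption, using `Table` entities (the field values and table names are illustrative):

```python
from airflow.decorators import task
from airflow.lineage.entities import Table
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task(
    # Manually declared lineage, reported alongside what SQL parsing finds
    inlets=[Table(cluster="postgres", database="mydb", name="public.detail_table")],
    outlets=[Table(cluster="postgres", database="mydb", name="public.summary_table")],
)
def annotated_summary_task():
    hook = PostgresHook(postgres_conn_id="postgres_default")
    hook.run("INSERT INTO summary_table SELECT * FROM detail_table")
```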

## Dependencies and Requirements

### Required Packages

```text
# Core dependencies
apache-airflow-providers-postgres
apache-airflow-providers-openlineage

# Optional, for enhanced SQL parsing and analysis
sqlparse
```

### Version Compatibility

- **OpenLineage Provider**: Latest version recommended
- **PostgreSQL Provider**: 6.2.3+
- **Airflow**: 2.10.0+ (required by the PostgreSQL provider)
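
To check the versions installed in an environment, the standard library's package metadata is enough:

```python
from importlib.metadata import version

# Raises importlib.metadata.PackageNotFoundError if a package is missing
print(version("apache-airflow-providers-postgres"))
print(version("apache-airflow-providers-openlineage"))
```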

### OpenLineage Backend

Works with any OpenLineage-compatible backend, including:

- **Marquez**: Open-source lineage backend
- **DataHub**: LinkedIn's open-source metadata platform
- **Custom HTTP**: Any OpenLineage-compatible API
- **File**: Local JSON file output for development