# SQL Utilities

Specialized utilities for SQL-based lineage extraction: schema analysis, table discovery, information_schema querying, and database metadata extraction.

## Capabilities

### Table Schema Classes

Classes for representing and working with database table schemas.

```python { .api }
class ColumnIndex(Enum):
    """
    Enumeration for information schema column indices.

    Defines standard column positions in information_schema query results
    for consistent schema extraction across different database systems.
    """
    SCHEMA = 0            # Table schema/database name
    TABLE_NAME = 1        # Table name
    COLUMN_NAME = 2       # Column name
    ORDINAL_POSITION = 3  # Column position in table
    UDT_NAME = 4          # User-defined type name

class TableSchema:
    """
    Table schema container with dataset conversion methods.

    Represents a database table's schema information including
    columns, types, and metadata for lineage extraction.
    """

    def to_dataset(
        self,
        namespace: str,
        database: str | None = None,
        schema: str | None = None
    ) -> Dataset:
        """
        Convert table schema to OpenLineage Dataset.

        Args:
            namespace: OpenLineage namespace
            database: Database name
            schema: Schema name

        Returns:
            Dataset: OpenLineage dataset with schema facets
        """
```
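
As a quick illustration, `ColumnIndex` can be used to pick individual fields out of a raw information_schema result row by position. A minimal sketch (the sample row tuple is hypothetical):

```python
from airflow.providers.openlineage.utils.sql import ColumnIndex

# Hypothetical row in the documented column order:
# (schema, table name, column name, ordinal position, type name)
row = ("public", "users", "id", 1, "int4")

schema_name = row[ColumnIndex.SCHEMA.value]
column_name = row[ColumnIndex.COLUMN_NAME.value]
column_type = row[ColumnIndex.UDT_NAME.value]
print(f"{schema_name}: {column_name} ({column_type})")  # public: id (int4)
```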

### Type Definitions

Type aliases for complex data structures used in SQL utilities.

```python { .api }
TablesHierarchy = dict[str | None, dict[str | None, list[str]]]
"""
Type alias for nested table hierarchy dictionary.

Structure: {database: {schema: [table_names]}}
Represents the hierarchical organization of tables across
databases and schemas for comprehensive schema analysis.
"""
```
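
Because both the database and schema keys are typed `str | None`, tables that are not qualified with an explicit database or schema can be grouped under a `None` key. A small illustrative literal (the table names are made up):

```python
from airflow.providers.openlineage.utils.sql import TablesHierarchy

# Tables fully qualified with a database and schema:
qualified: TablesHierarchy = {
    "analytics": {"public": ["users", "orders"]},
}

# Tables resolved against the connection's default database and/or schema:
defaults: TablesHierarchy = {
    None: {
        "public": ["users"],  # default database, explicit schema
        None: ["orders"],     # default database and schema
    },
}
```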

### Schema Extraction Functions

Functions for extracting table schemas and metadata from databases.

```python { .api }
def get_table_schemas(
    hook,
    namespace: str,
    database: str | None,
    schema: str | None,
    tables_hierarchy: TablesHierarchy,
    information_schema_columns: list[str],
    information_schema_table_name: str,
    is_cross_db: bool,
    use_flat_cross_db_query: bool,
    is_uppercase_names: bool
) -> tuple[list[Dataset], list[Dataset]]:
    """
    Get table schemas from database using information_schema queries.

    Args:
        hook: Database hook for executing queries
        namespace: OpenLineage namespace
        database: Target database name
        schema: Target schema name
        tables_hierarchy: Nested table structure dictionary
        information_schema_columns: Columns to select from information_schema
        information_schema_table_name: Name of information schema table
        is_cross_db: Whether query spans multiple databases
        use_flat_cross_db_query: Whether to use flat cross-database query
        is_uppercase_names: Whether to uppercase table/column names

    Returns:
        tuple: (input_datasets, output_datasets) with schema information
    """

def parse_query_result(cursor) -> list[TableSchema]:
    """
    Parse database query results into TableSchema objects.

    Args:
        cursor: Database cursor with query results

    Returns:
        list[TableSchema]: Parsed table schema objects
    """
```

### Query Generation Functions

Functions for generating SQL queries for schema discovery and analysis.

```python { .api }
def create_information_schema_query(
    tables_hierarchy: TablesHierarchy,
    information_schema_columns: list[str],
    information_schema_table_name: str,
    is_cross_db: bool,
    use_flat_cross_db_query: bool,
    is_uppercase_names: bool
) -> str:
    """
    Create SQL query for extracting schema information from information_schema.

    Args:
        tables_hierarchy: Nested dictionary of database/schema/table structure
        information_schema_columns: Columns to select from information schema
        information_schema_table_name: Name of information schema table
        is_cross_db: Whether query spans multiple databases
        use_flat_cross_db_query: Whether to use flat cross-database query
        is_uppercase_names: Whether to uppercase table/column names

    Returns:
        str: SQL query for schema information extraction
    """

def create_filter_clauses(
    tables_hierarchy: TablesHierarchy,
    is_uppercase_names: bool
) -> ClauseElement:
    """
    Create SQL filter clauses for table hierarchy filtering.

    Args:
        tables_hierarchy: Nested table structure dictionary
        is_uppercase_names: Whether to uppercase identifiers

    Returns:
        ClauseElement: SQLAlchemy filter clause element
    """
```

## Usage Examples

### Basic Schema Extraction

```python
from airflow.providers.openlineage.utils.sql import get_table_schemas, TablesHierarchy
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Set up the database connection
hook = PostgresHook(postgres_conn_id='analytics_db')

# Define the table hierarchy
tables_hierarchy: TablesHierarchy = {
    'analytics': {
        'public': ['users', 'orders', 'products'],
        'staging': ['raw_users', 'raw_orders']
    },
    'reporting': {
        'public': ['daily_reports', 'monthly_summaries']
    }
}

# Extract schemas
input_datasets, output_datasets = get_table_schemas(
    hook=hook,
    namespace='production',
    database='analytics',
    schema='public',
    tables_hierarchy=tables_hierarchy,
    information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
    information_schema_table_name='columns',
    is_cross_db=True,
    use_flat_cross_db_query=False,
    is_uppercase_names=False
)

print(f"Input datasets: {len(input_datasets)}")
print(f"Output datasets: {len(output_datasets)}")
```

### Custom Information Schema Query

```python
from airflow.providers.openlineage.utils.sql import create_information_schema_query

# Define complex table hierarchy
tables_hierarchy = {
    'warehouse': {
        'dim': ['dim_users', 'dim_products', 'dim_time'],
        'fact': ['fact_sales', 'fact_inventory'],
        'staging': ['stg_users', 'stg_products', 'stg_sales']
    },
    'analytics': {
        'reports': ['daily_sales', 'monthly_trends'],
        'ml': ['user_features', 'product_embeddings']
    }
}

# Generate information schema query
query = create_information_schema_query(
    tables_hierarchy=tables_hierarchy,
    information_schema_columns=[
        'table_catalog',
        'table_schema',
        'table_name',
        'column_name',
        'ordinal_position',
        'data_type',
        'is_nullable'
    ],
    information_schema_table_name='columns',
    is_cross_db=True,
    use_flat_cross_db_query=False,
    is_uppercase_names=False
)

print("Generated query:")
print(query)
```

### Query Result Processing

```python
from airflow.providers.openlineage.utils.sql import (
    TablesHierarchy,
    create_information_schema_query,
    parse_query_result,
)
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_table_metadata(connection_id: str, table_hierarchy: TablesHierarchy):
    """Extract and process table metadata from database."""

    hook = PostgresHook(postgres_conn_id=connection_id)

    # Execute information schema query
    query = create_information_schema_query(
        tables_hierarchy=table_hierarchy,
        information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
        information_schema_table_name='columns',
        is_cross_db=False,
        use_flat_cross_db_query=False,
        is_uppercase_names=False
    )

    # Get cursor and execute query
    cursor = hook.get_cursor()
    cursor.execute(query)

    # Parse results
    table_schemas = parse_query_result(cursor)

    # Process schemas
    for schema in table_schemas:
        print(f"Table: {schema.schema_name}.{schema.table_name}")
        print(f"Columns: {len(schema.columns)}")

        # Convert to OpenLineage dataset
        dataset = schema.to_dataset(
            namespace='production',
            database='analytics',
            schema=schema.schema_name
        )

        print(f"Dataset: {dataset.namespace}/{dataset.name}")
        print(f"Schema facet: {dataset.facets.get('schema', 'None')}")

# Usage
table_hierarchy = {
    'analytics': {
        'public': ['users', 'orders']
    }
}

extract_table_metadata('analytics_db', table_hierarchy)
```

### TableSchema Usage

```python
from airflow.providers.openlineage.utils.sql import TableSchema, ColumnIndex
from openlineage.client.event_v2 import Dataset

def create_table_schema_from_metadata(metadata_rows):
    """Create TableSchema from raw metadata rows."""

    # Group rows by table
    tables = {}
    for row in metadata_rows:
        schema_name = row[ColumnIndex.SCHEMA.value]
        table_name = row[ColumnIndex.TABLE_NAME.value]
        column_name = row[ColumnIndex.COLUMN_NAME.value]
        column_position = row[ColumnIndex.ORDINAL_POSITION.value]
        data_type = row[ColumnIndex.UDT_NAME.value]

        table_key = f"{schema_name}.{table_name}"
        if table_key not in tables:
            tables[table_key] = TableSchema()
            tables[table_key].schema_name = schema_name
            tables[table_key].table_name = table_name
            tables[table_key].columns = []

        tables[table_key].columns.append({
            'name': column_name,
            'position': column_position,
            'type': data_type
        })

    return list(tables.values())

def convert_schemas_to_datasets(table_schemas: list[TableSchema], namespace: str):
    """Convert table schemas to OpenLineage datasets."""

    datasets = []
    for schema in table_schemas:
        dataset = schema.to_dataset(
            namespace=namespace,
            database='analytics',
            schema=schema.schema_name
        )
        datasets.append(dataset)

    return datasets

# Example usage
sample_metadata = [
    ('public', 'users', 'id', 1, 'integer'),
    ('public', 'users', 'name', 2, 'varchar'),
    ('public', 'users', 'email', 3, 'varchar'),
    ('public', 'orders', 'id', 1, 'integer'),
    ('public', 'orders', 'user_id', 2, 'integer'),
    ('public', 'orders', 'amount', 3, 'decimal')
]

schemas = create_table_schema_from_metadata(sample_metadata)
datasets = convert_schemas_to_datasets(schemas, 'production')

for dataset in datasets:
    print(f"Dataset: {dataset.name}")
    print(f"Schema columns: {len(dataset.facets.get('schema', {}).get('fields', []))}")
```

### Cross-Database Schema Analysis

```python
from airflow.providers.openlineage.utils.sql import get_table_schemas

def analyze_cross_database_schemas(hook, databases: list[str]):
    """Analyze schemas across multiple databases."""

    # Build comprehensive table hierarchy
    tables_hierarchy = {}

    for db in databases:
        # Query each database for table information
        db_tables = get_database_tables(hook, db)
        tables_hierarchy[db] = db_tables

    # Extract schemas with cross-database support
    all_inputs, all_outputs = get_table_schemas(
        hook=hook,
        namespace='multi_db',
        database=None,  # Cross-database query
        schema=None,
        tables_hierarchy=tables_hierarchy,
        information_schema_columns=[
            'table_catalog',
            'table_schema',
            'table_name',
            'column_name',
            'ordinal_position',
            'data_type'
        ],
        information_schema_table_name='columns',
        is_cross_db=True,
        use_flat_cross_db_query=True,
        is_uppercase_names=False
    )

    return all_inputs, all_outputs

def get_database_tables(hook, database: str) -> dict:
    """Get table hierarchy for a specific database."""

    query = f"""
    SELECT DISTINCT table_schema, table_name
    FROM {database}.information_schema.tables
    WHERE table_type = 'BASE TABLE'
    ORDER BY table_schema, table_name
    """

    result = hook.get_records(query)

    schema_tables = {}
    for schema, table in result:
        if schema not in schema_tables:
            schema_tables[schema] = []
        schema_tables[schema].append(table)

    return schema_tables

# Usage
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id='multi_db_connection')
databases = ['analytics', 'warehouse', 'reporting']

inputs, outputs = analyze_cross_database_schemas(hook, databases)
print(f"Total datasets analyzed: {len(inputs) + len(outputs)}")
```

### Filter Clause Generation

```python
from airflow.providers.openlineage.utils.sql import TablesHierarchy, create_filter_clauses

def build_custom_schema_query(tables_hierarchy: TablesHierarchy):
    """Build custom query with generated filter clauses."""

    # Generate filter clauses
    filter_clause = create_filter_clauses(
        tables_hierarchy=tables_hierarchy,
        is_uppercase_names=False
    )

    # Base query
    base_query = """
    SELECT
        table_schema,
        table_name,
        column_name,
        ordinal_position,
        data_type,
        is_nullable
    FROM information_schema.columns
    """

    # Combine with filter; note that str() on a SQLAlchemy clause element
    # renders a parameterized expression, not literal values
    if filter_clause is not None:
        full_query = f"{base_query} WHERE {filter_clause}"
    else:
        full_query = base_query

    return full_query

# Usage
table_hierarchy = {
    'analytics': {
        'public': ['users', 'orders'],
        'staging': ['raw_data']
    },
    'warehouse': {
        'dim': ['dim_users'],
        'fact': ['fact_sales']
    }
}

query = build_custom_schema_query(table_hierarchy)
print("Generated query with filters:")
print(query)
```

### Integration with SQL Parser

```python
from airflow.providers.openlineage.utils.sql import get_table_schemas
from airflow.providers.openlineage.sqlparser import SQLParser
from airflow.providers.postgres.hooks.postgres import PostgresHook

def enhanced_sql_parsing_with_schema(hook, sql_statements: list[str]):
    """Enhanced SQL parsing with schema information."""

    # Initialize SQL parser
    parser = SQLParser(dialect='postgresql')

    # Parse SQL to identify tables
    all_tables = set()
    for sql in sql_statements:
        metadata = parser.parse(sql)
        if metadata:
            all_tables.update(metadata.in_tables or [])
            all_tables.update(metadata.out_tables or [])

    # Build table hierarchy from parsed tables
    tables_hierarchy = {}
    for table in all_tables:
        # Parse table name (assuming format: schema.table or database.schema.table)
        parts = table.split('.')

        if len(parts) >= 2:
            if len(parts) == 2:
                schema, table_name = parts
                database = None
            else:
                database, schema, table_name = parts

            if database not in tables_hierarchy:
                tables_hierarchy[database] = {}
            if schema not in tables_hierarchy[database]:
                tables_hierarchy[database][schema] = []

            tables_hierarchy[database][schema].append(table_name)

    # Get schema information
    input_datasets, output_datasets = get_table_schemas(
        hook=hook,
        namespace='sql_parsing',
        database=None,
        schema=None,
        tables_hierarchy=tables_hierarchy,
        information_schema_columns=['table_schema', 'table_name', 'column_name', 'data_type'],
        information_schema_table_name='columns',
        is_cross_db=True,
        use_flat_cross_db_query=False,
        is_uppercase_names=False
    )

    return {
        'parsed_tables': all_tables,
        'input_datasets': input_datasets,
        'output_datasets': output_datasets,
        'tables_hierarchy': tables_hierarchy
    }

# Usage
sql_statements = [
    "SELECT * FROM analytics.public.users u JOIN analytics.public.orders o ON u.id = o.user_id",
    "INSERT INTO warehouse.fact.fact_sales SELECT * FROM analytics.staging.raw_sales",
    "CREATE TABLE reporting.public.daily_summary AS SELECT date, SUM(amount) FROM warehouse.fact.fact_sales GROUP BY date"
]

hook = PostgresHook(postgres_conn_id='analytics_db')
result = enhanced_sql_parsing_with_schema(hook, sql_statements)

print(f"Parsed tables: {result['parsed_tables']}")
print(f"Input datasets: {len(result['input_datasets'])}")
print(f"Output datasets: {len(result['output_datasets'])}")
print(f"Table hierarchy: {result['tables_hierarchy']}")
```

## Database System Support

The SQL utilities support various database systems with appropriate adaptations:

### PostgreSQL
```python
# PostgreSQL-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name']
information_schema_table_name = 'columns'
```

### MySQL
```python
# MySQL-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'columns'
```

### BigQuery
```python
# BigQuery-specific configuration (uses INFORMATION_SCHEMA views)
information_schema_columns = ['table_catalog', 'table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'COLUMN_FIELD_PATHS'
```

### Snowflake
```python
# Snowflake-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'columns'
is_uppercase_names = True  # Snowflake uses uppercase identifiers
```
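
These per-dialect settings are simply arguments to the functions above. One possible way (not the only one) to keep them together is a small mapping unpacked into `get_table_schemas`; a sketch, assuming `hook`, `namespace`, and `tables_hierarchy` are prepared as in the earlier examples, with made-up dialect keys:

```python
from airflow.providers.openlineage.utils.sql import get_table_schemas

# Hypothetical per-dialect defaults mirroring the snippets above.
DIALECT_DEFAULTS = {
    'postgres': {
        'information_schema_columns': ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name'],
        'information_schema_table_name': 'columns',
        'is_uppercase_names': False,
    },
    'snowflake': {
        'information_schema_columns': ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
        'information_schema_table_name': 'columns',
        'is_uppercase_names': True,
    },
}

def schemas_for_dialect(hook, namespace, tables_hierarchy, dialect='postgres'):
    """Call get_table_schemas with the defaults for a single dialect."""
    return get_table_schemas(
        hook=hook,
        namespace=namespace,
        database=None,
        schema=None,
        tables_hierarchy=tables_hierarchy,
        is_cross_db=False,
        use_flat_cross_db_query=False,
        **DIALECT_DEFAULTS[dialect],
    )
```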