# SQL Parsing and Analysis

Parse SQL statements to extract data lineage, including table dependencies, column mappings, and database schema information. The SQL parsing functionality provides comprehensive analysis of SQL queries to automatically discover data flows and transformations.

## Capabilities

### SQL Parser Class

Main interface for parsing SQL statements and extracting lineage metadata.

```python { .api }
class SQLParser:
    """
    Main SQL parsing interface for extracting lineage from SQL statements.
    """

    def __init__(self, dialect: str | None = None, default_schema: str | None = None):
        """
        Initialize SQL parser with optional dialect and schema.

        Args:
            dialect: SQL dialect (e.g., 'postgresql', 'mysql', 'bigquery')
            default_schema: Default schema name for unqualified table references
        """

    def parse(self, sql: list[str] | str) -> SqlMeta | None:
        """
        Parse SQL statement(s) and return metadata.

        Args:
            sql: SQL statement(s) to parse

        Returns:
            SqlMeta: Parsed SQL metadata, or None if parsing fails
        """

    def parse_table_schemas(
        self,
        hook,
        inputs: list[Dataset],
        outputs: list[Dataset]
    ) -> tuple[list[Dataset], list[Dataset]]:
        """
        Parse and enrich table schemas with column information.

        Args:
            hook: Database hook for schema queries
            inputs: Input datasets to enrich
            outputs: Output datasets to enrich

        Returns:
            tuple: Enriched (inputs, outputs) datasets with schema information
        """

    def get_metadata_from_parser(
        self,
        parse_result: SqlMeta,
        database: str | None,
        schema: str | None
    ) -> tuple[list[Dataset], list[Dataset]]:
        """
        Extract input/output datasets from parse results.

        Args:
            parse_result: Result from SQL parsing
            database: Database name
            schema: Schema name

        Returns:
            tuple: (input_datasets, output_datasets)
        """

    def attach_column_lineage(
        self,
        datasets: list[Dataset],
        database: str | None,
        parse_result: SqlMeta
    ):
        """
        Attach column-level lineage information to datasets.

        Args:
            datasets: Datasets to enrich with column lineage
            database: Database name
            parse_result: SQL parsing result with column mappings
        """

    def generate_openlineage_metadata_from_sql(
        self,
        operator_instance,
        task_instance,
        task_uuid: str
    ) -> OperatorLineage:
        """
        Generate complete OpenLineage metadata from SQL operations.

        Args:
            operator_instance: Airflow operator instance
            task_instance: Task instance context
            task_uuid: Unique task identifier

        Returns:
            OperatorLineage: Complete operator lineage with datasets and facets
        """

    @staticmethod
    def create_namespace(database_info: DatabaseInfo) -> str:
        """
        Create a namespace string from database information.

        Args:
            database_info: Database configuration

        Returns:
            str: Formatted namespace string
        """

    @classmethod
    def normalize_sql(cls, sql: list[str] | str) -> str:
        """
        Normalize SQL statement(s) for consistent parsing.

        Args:
            sql: SQL statement(s) to normalize

        Returns:
            str: Normalized SQL string
        """

    @classmethod
    def split_sql_string(cls, sql: list[str] | str) -> list[str]:
        """
        Split a SQL string into individual statements.

        Args:
            sql: SQL string or list to split

        Returns:
            list[str]: List of individual SQL statements
        """

    def create_information_schema_query(
        self,
        tables_hierarchy: dict,
        information_schema_columns: list[str],
        information_schema_table_name: str,
        is_cross_db: bool,
        use_flat_cross_db_query: bool,
        is_uppercase_names: bool
    ) -> str:
        """
        Create a query for extracting schema information from information_schema.

        Args:
            tables_hierarchy: Nested dictionary of database/schema/table structure
            information_schema_columns: Columns to select from the information schema
            information_schema_table_name: Name of the information schema table
            is_cross_db: Whether the query spans multiple databases
            use_flat_cross_db_query: Whether to use a flat cross-database query
            is_uppercase_names: Whether to uppercase table/column names

        Returns:
            str: SQL query for schema information extraction
        """
```
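The `tables_hierarchy` argument groups tables by database and schema. The dict below sketches the expected shape, and the loop shows, in simplified form, the kind of per-schema filter the generated information_schema query applies. This is an illustration only, not the provider's exact SQL:

```python
# Illustrative shape of tables_hierarchy: {database: {schema: [table, ...]}}.
# Keys may be None when a table reference is unqualified.
tables_hierarchy = {
    "analytics": {
        "public": ["orders", "users"],
    },
}

# Simplified rendering of the per-schema filtering that a generated
# information_schema query performs (the real query is built by
# create_information_schema_query and handles quoting, casing, and
# cross-database layouts).
predicates = []
for database, schemas in tables_hierarchy.items():
    for schema, tables in schemas.items():
        table_list = ", ".join(f"'{t}'" for t in tables)
        predicates.append(
            f"(table_schema = '{schema}' AND table_name IN ({table_list}))"
        )

where_clause = " OR ".join(predicates)
print(where_clause)
```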

### Database Information Configuration

Container for database-specific configuration and connection details.

```python { .api }
class DatabaseInfo:
    """
    Database configuration container with connection details and schema information.
    """

    scheme: str  # Database scheme (e.g., 'postgresql', 'mysql')
    authority: str | None  # Database authority/host information
    database: str | None  # Database name
    information_schema_columns: list[str]  # Columns available in information_schema
    information_schema_table_name: str  # Name of information schema table
    use_flat_cross_db_query: bool  # Whether to use flat cross-database queries
    is_information_schema_cross_db: bool  # Whether information_schema spans databases
    is_uppercase_names: bool  # Whether to uppercase identifiers
    normalize_name_method: Callable[[str], str]  # Method for normalizing names
```

### Type Definitions

Type definitions for SQL parsing operations and parameters.

```python { .api }
class GetTableSchemasParams(TypedDict):
    """
    Type definition for table schema extraction parameters.
    """
    hook: Any  # Database hook instance
    namespace: str  # OpenLineage namespace
    database: str | None  # Database name
    schema: str | None  # Schema name
    tables_hierarchy: dict  # Nested table structure
    information_schema_columns: list[str]  # Information schema columns
    information_schema_table_name: str  # Information schema table name
    is_cross_db: bool  # Cross-database query flag
    use_flat_cross_db_query: bool  # Flat query flag
    is_uppercase_names: bool  # Uppercase names flag
```
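Since `GetTableSchemasParams` is a `TypedDict`, it is an ordinary `dict` at runtime. A sketch of populating it — all values are illustrative, and the hook is stubbed with `None` where a live database hook would normally go:

```python
# Illustrative GetTableSchemasParams payload; a TypedDict is a plain dict
# at runtime, so no class instantiation is needed.
params = {
    "hook": None,  # would be a live DbApiHook in practice
    "namespace": "postgres://localhost:5432",
    "database": "analytics",
    "schema": "public",
    "tables_hierarchy": {"analytics": {"public": ["orders"]}},
    "information_schema_columns": ["table_schema", "table_name", "column_name"],
    "information_schema_table_name": "columns",
    "is_cross_db": False,
    "use_flat_cross_db_query": False,
    "is_uppercase_names": False,
}

print(params["namespace"])
```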
207
208
### Utility Functions
209
210
Helper functions for SQL parsing and lineage extraction.
211
212
```python { .api }
213
def default_normalize_name_method(name: str) -> str:
214
"""
215
Default method for normalizing database object names.
216
217
Args:
218
name: Name to normalize
219
220
Returns:
221
str: Normalized name
222
"""
223
224
def from_table_meta(
225
table_meta: DbTableMeta,
226
database: str | None,
227
namespace: str,
228
is_uppercase: bool
229
) -> Dataset:
230
"""
231
Convert table metadata to OpenLineage Dataset.
232
233
Args:
234
table_meta: Database table metadata
235
database: Database name
236
namespace: OpenLineage namespace
237
is_uppercase: Whether names should be uppercase
238
239
Returns:
240
Dataset: OpenLineage dataset representation
241
"""
242
243
def get_openlineage_facets_with_sql(
244
hook: DbApiHook,
245
sql: str | list[str],
246
conn_id: str,
247
database: str | None
248
) -> OperatorLineage | None:
249
"""
250
Extract OpenLineage facets from SQL operations using database hook.
251
252
Args:
253
hook: Database API hook for connection
254
sql: SQL statement(s) to analyze
255
conn_id: Airflow connection ID
256
database: Database name
257
258
Returns:
259
OperatorLineage: Extracted lineage metadata or None if extraction fails
260
"""
261
```
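As a sketch of how normalization methods plug in: the re-implementation below assumes the default lowercases identifiers (an assumption — consult the provider source), alongside a hypothetical uppercase variant of the kind a database with uppercase identifiers would supply via `DatabaseInfo.normalize_name_method`:

```python
def default_normalize_name_method(name: str) -> str:
    # Assumed behavior of the provider default: lowercase identifiers.
    return name.lower()

def uppercase_normalize(name: str) -> str:
    # Hypothetical variant for databases that store identifiers uppercase;
    # would be passed as DatabaseInfo.normalize_name_method.
    return name.upper()

print(default_normalize_name_method("Orders"))  # orders
print(uppercase_normalize("orders"))            # ORDERS
```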
262
263
### Constants
264
265
Default values and configuration constants for SQL parsing.
266
267
```python { .api }
268
DEFAULT_NAMESPACE: str = "default"
269
"""Default namespace for OpenLineage events when none specified."""
270
271
DEFAULT_INFORMATION_SCHEMA_COLUMNS: list[str] = [
272
"table_schema",
273
"table_name",
274
"column_name",
275
"ordinal_position",
276
"udt_name"
277
]
278
"""Default columns to select from information_schema tables."""
279
280
DEFAULT_INFORMATION_SCHEMA_TABLE_NAME: str = "columns"
281
"""Default information_schema table name for column metadata."""
282
```
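To show how these defaults combine, the sketch below assembles the kind of SELECT they imply. The real statement is produced by `create_information_schema_query`, which also adds per-schema filtering and quoting:

```python
# Constants copied from the defaults above.
DEFAULT_INFORMATION_SCHEMA_COLUMNS = [
    "table_schema", "table_name", "column_name", "ordinal_position", "udt_name",
]
DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = "columns"

# Simplified illustration of the query skeleton the defaults imply.
query = (
    f"SELECT {', '.join(DEFAULT_INFORMATION_SCHEMA_COLUMNS)} "
    f"FROM information_schema.{DEFAULT_INFORMATION_SCHEMA_TABLE_NAME}"
)
print(query)
```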
283
284
## Usage Examples
285
286
### Basic SQL Parsing
287
288
```python
289
from airflow.providers.openlineage.sqlparser import SQLParser
290
291
# Initialize parser
292
parser = SQLParser(dialect='postgresql', default_schema='public')
293
294
# Parse SQL statement
295
sql = "SELECT * FROM users u JOIN orders o ON u.id = o.user_id"
296
metadata = parser.parse(sql)
297
298
if metadata:
299
print(f"Input tables: {metadata.in_tables}")
300
print(f"Output tables: {metadata.out_tables}")
301
```
302
303
### Database Configuration
304
305
```python
306
from airflow.providers.openlineage.sqlparser import DatabaseInfo, default_normalize_name_method
307
308
# Configure database information
309
db_info = DatabaseInfo(
310
scheme='postgresql',
311
authority='localhost:5432',
312
database='analytics',
313
information_schema_columns=['table_schema', 'table_name', 'column_name'],
314
information_schema_table_name='columns',
315
use_flat_cross_db_query=False,
316
is_information_schema_cross_db=False,
317
is_uppercase_names=False,
318
normalize_name_method=default_normalize_name_method
319
)
320
321
# Create namespace
322
namespace = SQLParser.create_namespace(db_info)
323
print(f"Namespace: {namespace}")
324
```
325
326
### Schema Analysis
327
328
```python
329
from airflow.providers.openlineage.sqlparser import SQLParser
330
from airflow.hooks.postgres_hook import PostgresHook
331
332
# Setup
333
parser = SQLParser(dialect='postgresql')
334
hook = PostgresHook(postgres_conn_id='my_postgres')
335
336
# Parse SQL and get basic lineage
337
sql = """
338
INSERT INTO analytics.user_metrics
339
SELECT user_id, COUNT(*) as order_count
340
FROM orders
341
GROUP BY user_id
342
"""
343
344
metadata = parser.parse(sql)
345
inputs, outputs = parser.get_metadata_from_parser(metadata, 'analytics', 'public')
346
347
# Enrich with schema information
348
enriched_inputs, enriched_outputs = parser.parse_table_schemas(hook, inputs, outputs)
349
350
print(f"Enriched inputs: {enriched_inputs}")
351
print(f"Enriched outputs: {enriched_outputs}")
352
```

### Complete Lineage Generation

```python
from airflow.providers.openlineage.sqlparser import get_openlineage_facets_with_sql
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Extract complete lineage
hook = PostgresHook(postgres_conn_id='analytics_db')
sql = "INSERT INTO reports.daily_sales SELECT * FROM raw.sales WHERE date = CURRENT_DATE"

lineage = get_openlineage_facets_with_sql(
    hook=hook,
    sql=sql,
    conn_id='analytics_db',
    database='analytics'
)

if lineage:
    print(f"Input datasets: {lineage.inputs}")
    print(f"Output datasets: {lineage.outputs}")
    print(f"Run facets: {lineage.run_facets}")
```

### Custom SQL Normalization

```python
from airflow.providers.openlineage.sqlparser import SQLParser

# Normalize a mixed batch of statements (comments and empty entries included)
complex_sql = [
    "/* Comment */ SELECT * FROM table1;",
    "INSERT INTO table2 SELECT * FROM table1 WHERE active = true;",
    ""
]

normalized = SQLParser.normalize_sql(complex_sql)
statements = SQLParser.split_sql_string(normalized)

print(f"Normalized SQL: {normalized}")
print(f"Individual statements: {statements}")
```

## Integration with Operators

The SQL parser integrates automatically with SQL-based operators:

```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

# The operator automatically uses SQLParser for lineage extraction
sql_task = PostgresOperator(
    task_id='analyze_sales',
    postgres_conn_id='analytics_db',
    sql="""
        INSERT INTO reports.monthly_sales
        SELECT
            DATE_TRUNC('month', order_date) AS month,
            SUM(amount) AS total_sales
        FROM sales.orders
        WHERE order_date >= '2023-01-01'
        GROUP BY DATE_TRUNC('month', order_date)
    """,
    dag=dag
)
```

## Supported SQL Dialects

The SQL parser supports various SQL dialects:

- PostgreSQL
- MySQL
- BigQuery
- Snowflake
- Redshift
- SQLite
- Generic SQL (limited features)

Dialect-specific features include appropriate query syntax, information schema handling, and identifier quoting conventions.
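
As an illustration of dialect selection, a small helper might map an Airflow connection type to a parser dialect, falling back to generic SQL for unknown types. The mapping below is an assumption for demonstration, not the provider's actual lookup:

```python
# Illustrative mapping from Airflow conn_type to a parser dialect; the
# provider derives the dialect from the hook, so this table is an assumption.
_DIALECT_BY_CONN_TYPE = {
    "postgres": "postgresql",
    "mysql": "mysql",
    "gcpbigquery": "bigquery",
    "snowflake": "snowflake",
    "redshift": "redshift",
    "sqlite": "sqlite",
}

def resolve_dialect(conn_type: str) -> str:
    """Return a parser dialect for conn_type, defaulting to generic SQL."""
    return _DIALECT_BY_CONN_TYPE.get(conn_type, "generic")

print(resolve_dialect("postgres"))  # postgresql
print(resolve_dialect("oracle"))    # generic
```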