Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
—
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Parse SQL statements to extract data lineage, including table dependencies, column mappings, and database schema information. The SQL parsing functionality provides comprehensive analysis of SQL queries to automatically discover data flows and transformations.
Main interface for parsing SQL statements and extracting lineage metadata.
class SQLParser:
"""
Main SQL parsing interface for extracting lineage from SQL statements.
"""
def __init__(self, dialect: str | None = None, default_schema: str | None = None):
"""
Initialize SQL parser with optional dialect and schema.
Args:
dialect: SQL dialect (e.g., 'postgresql', 'mysql', 'bigquery')
default_schema: Default schema name for unqualified table references
"""
def parse(self, sql: list[str] | str) -> SqlMeta | None:
"""
Parse SQL statement(s) and return metadata.
Args:
sql: SQL statement(s) to parse
Returns:
SqlMeta: Parsed SQL metadata or None if parsing fails
"""
def parse_table_schemas(
self,
hook,
inputs: list[Dataset],
outputs: list[Dataset]
) -> tuple[list[Dataset], list[Dataset]]:
"""
Parse and enrich table schemas with column information.
Args:
hook: Database hook for schema queries
inputs: Input datasets to enrich
outputs: Output datasets to enrich
Returns:
tuple: Enriched (inputs, outputs) datasets with schema information
"""
def get_metadata_from_parser(
self,
parse_result: SqlMeta,
database: str | None,
schema: str | None
) -> tuple[list[Dataset], list[Dataset]]:
"""
Extract input/output datasets from parse results.
Args:
parse_result: Result from SQL parsing
database: Database name
schema: Schema name
Returns:
tuple: (input_datasets, output_datasets)
"""
def attach_column_lineage(
self,
datasets: list[Dataset],
database: str | None,
parse_result: SqlMeta
):
"""
Attach column-level lineage information to datasets.
Args:
datasets: Datasets to enrich with column lineage
database: Database name
parse_result: SQL parsing result with column mappings
"""
def generate_openlineage_metadata_from_sql(
self,
operator_instance,
task_instance,
task_uuid: str
) -> OperatorLineage:
"""
Generate complete OpenLineage metadata from SQL operations.
Args:
operator_instance: Airflow operator instance
task_instance: Task instance context
task_uuid: Unique task identifier
Returns:
OperatorLineage: Complete operator lineage with datasets and facets
"""
@staticmethod
def create_namespace(database_info: DatabaseInfo) -> str:
"""
Create namespace string from database information.
Args:
database_info: Database configuration
Returns:
str: Formatted namespace string
"""
@classmethod
def normalize_sql(cls, sql: list[str] | str) -> str:
"""
Normalize SQL statement(s) for consistent parsing.
Args:
sql: SQL statement(s) to normalize
Returns:
str: Normalized SQL string
"""
@classmethod
def split_sql_string(cls, sql: list[str] | str) -> list[str]:
"""
Split SQL string into individual statements.
Args:
sql: SQL string or list to split
Returns:
list[str]: List of individual SQL statements
"""
def create_information_schema_query(
self,
tables_hierarchy: dict,
information_schema_columns: list[str],
information_schema_table_name: str,
is_cross_db: bool,
use_flat_cross_db_query: bool,
is_uppercase_names: bool
) -> str:
"""
Create query for extracting schema information from information_schema.
Args:
tables_hierarchy: Nested dictionary of database/schema/table structure
information_schema_columns: Columns to select from information schema
information_schema_table_name: Name of information schema table
is_cross_db: Whether query spans multiple databases
use_flat_cross_db_query: Whether to use flat cross-database query
is_uppercase_names: Whether to uppercase table/column names
Returns:
str: SQL query for schema information extraction
"""

Container for database-specific configuration and connection details.
class DatabaseInfo:
"""
Database configuration container with connection details and schema information.
"""
scheme: str # Database scheme (e.g., 'postgresql', 'mysql')
authority: str | None # Database authority/host information
database: str | None # Database name
information_schema_columns: list[str] # Columns available in information_schema
information_schema_table_name: str # Name of information schema table
use_flat_cross_db_query: bool # Whether to use flat cross-database queries
is_information_schema_cross_db: bool # Whether information_schema spans databases
is_uppercase_names: bool # Whether to uppercase identifiers
normalize_name_method: Callable[[str], str]  # Method for normalizing names

Type definitions for SQL parsing operations and parameters.
class GetTableSchemasParams(TypedDict):
"""
Type definition for table schema extraction parameters.
"""
hook: Any # Database hook instance
namespace: str # OpenLineage namespace
database: str | None # Database name
schema: str | None # Schema name
tables_hierarchy: dict # Nested table structure
information_schema_columns: list[str] # Information schema columns
information_schema_table_name: str # Information schema table name
is_cross_db: bool # Cross-database query flag
use_flat_cross_db_query: bool # Flat query flag
is_uppercase_names: bool  # Uppercase names flag

Helper functions for SQL parsing and lineage extraction.
def default_normalize_name_method(name: str) -> str:
"""
Default method for normalizing database object names.
Args:
name: Name to normalize
Returns:
str: Normalized name
"""
def from_table_meta(
table_meta: DbTableMeta,
database: str | None,
namespace: str,
is_uppercase: bool
) -> Dataset:
"""
Convert table metadata to OpenLineage Dataset.
Args:
table_meta: Database table metadata
database: Database name
namespace: OpenLineage namespace
is_uppercase: Whether names should be uppercase
Returns:
Dataset: OpenLineage dataset representation
"""
def get_openlineage_facets_with_sql(
hook: DbApiHook,
sql: str | list[str],
conn_id: str,
database: str | None
) -> OperatorLineage | None:
"""
Extract OpenLineage facets from SQL operations using database hook.
Args:
hook: Database API hook for connection
sql: SQL statement(s) to analyze
conn_id: Airflow connection ID
database: Database name
Returns:
OperatorLineage: Extracted lineage metadata or None if extraction fails
"""

Default values and configuration constants for SQL parsing.
DEFAULT_NAMESPACE: str = "default"
"""Default namespace for OpenLineage events when none specified."""
DEFAULT_INFORMATION_SCHEMA_COLUMNS: list[str] = [
"table_schema",
"table_name",
"column_name",
"ordinal_position",
"udt_name"
]
"""Default columns to select from information_schema tables."""
DEFAULT_INFORMATION_SCHEMA_TABLE_NAME: str = "columns"
"""Default information_schema table name for column metadata."""

from airflow.providers.openlineage.sqlparser import SQLParser
# Initialize parser
parser = SQLParser(dialect='postgresql', default_schema='public')
# Parse SQL statement
sql = "SELECT * FROM users u JOIN orders o ON u.id = o.user_id"
metadata = parser.parse(sql)
if metadata:
print(f"Input tables: {metadata.in_tables}")
print(f"Output tables: {metadata.out_tables}")

from airflow.providers.openlineage.sqlparser import DatabaseInfo, default_normalize_name_method
# Configure database information
db_info = DatabaseInfo(
scheme='postgresql',
authority='localhost:5432',
database='analytics',
information_schema_columns=['table_schema', 'table_name', 'column_name'],
information_schema_table_name='columns',
use_flat_cross_db_query=False,
is_information_schema_cross_db=False,
is_uppercase_names=False,
normalize_name_method=default_normalize_name_method
)
# Create namespace
namespace = SQLParser.create_namespace(db_info)
print(f"Namespace: {namespace}")

from airflow.providers.openlineage.sqlparser import SQLParser
from airflow.hooks.postgres_hook import PostgresHook
# Setup
parser = SQLParser(dialect='postgresql')
hook = PostgresHook(postgres_conn_id='my_postgres')
# Parse SQL and get basic lineage
sql = """
INSERT INTO analytics.user_metrics
SELECT user_id, COUNT(*) as order_count
FROM orders
GROUP BY user_id
"""
metadata = parser.parse(sql)
inputs, outputs = parser.get_metadata_from_parser(metadata, 'analytics', 'public')
# Enrich with schema information
enriched_inputs, enriched_outputs = parser.parse_table_schemas(hook, inputs, outputs)
print(f"Enriched inputs: {enriched_inputs}")
print(f"Enriched outputs: {enriched_outputs}")

from airflow.providers.openlineage.sqlparser import get_openlineage_facets_with_sql
from airflow.hooks.postgres_hook import PostgresHook
# Extract complete lineage
hook = PostgresHook(postgres_conn_id='analytics_db')
sql = "INSERT INTO reports.daily_sales SELECT * FROM raw.sales WHERE date = CURRENT_DATE"
lineage = get_openlineage_facets_with_sql(
hook=hook,
sql=sql,
conn_id='analytics_db',
database='analytics'
)
if lineage:
print(f"Input datasets: {lineage.inputs}")
print(f"Output datasets: {lineage.outputs}")
print(f"Run facets: {lineage.run_facets}")

from airflow.providers.openlineage.sqlparser import SQLParser
# Normalize complex SQL
complex_sql = [
"/* Comment */ SELECT * FROM table1;",
"INSERT INTO table2 SELECT * FROM table1 WHERE active = true;",
""
]
normalized = SQLParser.normalize_sql(complex_sql)
statements = SQLParser.split_sql_string(normalized)
print(f"Normalized SQL: {normalized}")
print(f"Individual statements: {statements}")

The SQL parser integrates automatically with SQL-based operators:
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.openlineage.sqlparser import SQLParser
# The operator automatically uses SQLParser for lineage extraction
sql_task = PostgresOperator(
task_id='analyze_sales',
postgres_conn_id='analytics_db',
sql="""
INSERT INTO reports.monthly_sales
SELECT
DATE_TRUNC('month', order_date) as month,
SUM(amount) as total_sales
FROM sales.orders
WHERE order_date >= '2023-01-01'
GROUP BY DATE_TRUNC('month', order_date)
""",
dag=dag
)

The SQL parser supports various SQL dialects:
Dialect-specific features include appropriate query syntax, information schema handling, and identifier quoting conventions.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-openlineage