Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
Quality: pending — not yet reviewed against best practices.
Impact: pending — no eval scenarios have been run.
Specialized utilities for SQL-based lineage extraction, including schema analysis, table discovery, information schema querying, and database metadata extraction for comprehensive SQL lineage tracking.
Classes for representing and working with database table schemas.
class ColumnIndex(Enum):
    """
    Positions of the standard columns in information_schema query results.

    Fixing one column ordering lets schema-extraction code index result
    rows the same way across different database systems.
    """

    SCHEMA = 0  # table schema / database name
    TABLE_NAME = 1  # table name
    COLUMN_NAME = 2  # column name
    ORDINAL_POSITION = 3  # column position in the table
    UDT_NAME = 4  # user-defined type name
class TableSchema:
    """
    Table schema container with dataset conversion methods.

    Holds a table's identifying names and column metadata so it can be
    converted into an OpenLineage Dataset for lineage extraction.
    """

    def to_dataset(
        self,
        namespace: str,
        database: str | None = None,
        schema: str | None = None
    ) -> Dataset:
        """
        Convert this table schema to an OpenLineage Dataset.

        Args:
            namespace: OpenLineage namespace for the dataset.
            database: Database name used to qualify the dataset name.
            schema: Schema name used to qualify the dataset name.

        Returns:
            Dataset: OpenLineage dataset with schema facets
"""Type aliases for complex data structures used in SQL utilities.
TablesHierarchy = dict[str | None, dict[str | None, list[str]]]
"""
Type alias for the nested table-hierarchy mapping.

Structure: {database: {schema: [table_names]}}.  ``None`` keys are allowed
when the database or schema is unspecified; the mapping represents the
hierarchical organization of tables across databases and schemas.
"""Functions for extracting table schemas and metadata from databases.
def get_table_schemas(
    hook,
    namespace: str,
    database: str | None,
    schema: str | None,
    tables_hierarchy: TablesHierarchy,
    information_schema_columns: list[str],
    information_schema_table_name: str,
    is_cross_db: bool,
    use_flat_cross_db_query: bool,
    is_uppercase_names: bool
) -> tuple[list[Dataset], list[Dataset]]:
    """
    Get table schemas from the database using information_schema queries.

    Args:
        hook: Database hook used to execute the generated queries.
        namespace: OpenLineage namespace for the resulting datasets.
        database: Target database name, or None.
        schema: Target schema name, or None.
        tables_hierarchy: Nested {database: {schema: [tables]}} structure.
        information_schema_columns: Columns to select from information_schema.
        information_schema_table_name: Name of the information schema table.
        is_cross_db: Whether the query spans multiple databases.
        use_flat_cross_db_query: Whether to use a single flat cross-database query.
        is_uppercase_names: Whether to uppercase table/column names.

    Returns:
        tuple: (input_datasets, output_datasets) with schema information.
    """
def parse_query_result(cursor) -> list[TableSchema]:
    """
    Parse database query results into TableSchema objects.

    Args:
        cursor: Database cursor positioned on information_schema query results.

    Returns:
        list[TableSchema]: Parsed table schema objects
"""Functions for generating SQL queries for schema discovery and analysis.
def create_information_schema_query(
    tables_hierarchy: TablesHierarchy,
    information_schema_columns: list[str],
    information_schema_table_name: str,
    is_cross_db: bool,
    use_flat_cross_db_query: bool,
    is_uppercase_names: bool
) -> str:
    """
    Create a SQL query for extracting schema info from information_schema.

    Args:
        tables_hierarchy: Nested {database: {schema: [tables]}} structure.
        information_schema_columns: Columns to select from information schema.
        information_schema_table_name: Name of the information schema table.
        is_cross_db: Whether the query spans multiple databases.
        use_flat_cross_db_query: Whether to use a single flat cross-database query.
        is_uppercase_names: Whether to uppercase table/column names.

    Returns:
        str: SQL query for schema information extraction.
    """
def create_filter_clauses(
    tables_hierarchy: TablesHierarchy,
    is_uppercase_names: bool
) -> ClauseElement:
    """
    Create SQL filter clauses restricting results to the given tables.

    Args:
        tables_hierarchy: Nested {database: {schema: [tables]}} structure.
        is_uppercase_names: Whether to uppercase identifiers.

    Returns:
        ClauseElement: SQLAlchemy filter clause element
"""from airflow.providers.openlineage.utils.sql import get_table_schemas, TablesHierarchy
# The legacy `airflow.hooks.postgres_hook` path is deprecated; PostgresHook
# now lives in the postgres provider package.
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Set up the database connection.
hook = PostgresHook(postgres_conn_id='analytics_db')

# Define which tables to inspect, grouped as {database: {schema: [tables]}}.
tables_hierarchy: TablesHierarchy = {
    'analytics': {
        'public': ['users', 'orders', 'products'],
        'staging': ['raw_users', 'raw_orders']
    },
    'reporting': {
        'public': ['daily_reports', 'monthly_summaries']
    }
}

# Extract input/output dataset schemas via information_schema.
input_datasets, output_datasets = get_table_schemas(
    hook=hook,
    namespace='production',
    database='analytics',
    schema='public',
    tables_hierarchy=tables_hierarchy,
    information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
    information_schema_table_name='columns',
    is_cross_db=True,
    use_flat_cross_db_query=False,
    is_uppercase_names=False
)
print(f"Input datasets: {len(input_datasets)}")
print(f"Output datasets: {len(output_datasets)}")from airflow.providers.openlineage.utils.sql import create_information_schema_query
# Define a complex multi-database hierarchy: {database: {schema: [tables]}}.
tables_hierarchy = {
    'warehouse': {
        'dim': ['dim_users', 'dim_products', 'dim_time'],
        'fact': ['fact_sales', 'fact_inventory'],
        'staging': ['stg_users', 'stg_products', 'stg_sales']
    },
    'analytics': {
        'reports': ['daily_sales', 'monthly_trends'],
        'ml': ['user_features', 'product_embeddings']
    }
}

# Generate an information_schema query covering every listed table.
query = create_information_schema_query(
    tables_hierarchy=tables_hierarchy,
    information_schema_columns=[
        'table_catalog',
        'table_schema',
        'table_name',
        'column_name',
        'ordinal_position',
        'data_type',
        'is_nullable'
    ],
    information_schema_table_name='columns',
    is_cross_db=True,
    use_flat_cross_db_query=False,
    is_uppercase_names=False
)
print("Generated query:")
print(query)

from airflow.providers.openlineage.utils.sql import parse_query_result, TableSchema
from airflow.hooks.postgres_hook import PostgresHook
def extract_table_metadata(connection_id: str, table_hierarchy: TablesHierarchy):
    """
    Extract table metadata from the database and print a summary.

    Args:
        connection_id: Airflow connection id of the Postgres database.
        table_hierarchy: Tables to inspect, as {database: {schema: [tables]}}.
    """
    # This snippet's top-level imports only bring in parse_query_result and
    # TableSchema; import the query builder here to avoid a NameError.
    from airflow.providers.openlineage.utils.sql import create_information_schema_query

    hook = PostgresHook(postgres_conn_id=connection_id)

    # Build the information_schema query for the requested tables.
    query = create_information_schema_query(
        tables_hierarchy=table_hierarchy,
        information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
        information_schema_table_name='columns',
        is_cross_db=False,
        use_flat_cross_db_query=False,
        is_uppercase_names=False
    )

    # Execute and parse, always closing the cursor to avoid leaking it.
    cursor = hook.get_cursor()
    try:
        cursor.execute(query)
        table_schemas = parse_query_result(cursor)
    finally:
        cursor.close()

    # Report each table and its OpenLineage dataset conversion.
    for schema in table_schemas:
        print(f"Table: {schema.schema_name}.{schema.table_name}")
        print(f"Columns: {len(schema.columns)}")
        dataset = schema.to_dataset(
            namespace='production',
            database='analytics',
            schema=schema.schema_name
        )
        print(f"Dataset: {dataset.namespace}/{dataset.name}")
        print(f"Schema facet: {dataset.facets.get('schema', 'None')}")
# Usage: inspect the users and orders tables of analytics.public.
table_hierarchy = {
    'analytics': {
        'public': ['users', 'orders']
    }
}
extract_table_metadata('analytics_db', table_hierarchy)

from airflow.providers.openlineage.utils.sql import TableSchema, ColumnIndex
from openlineage.client.event_v2 import Dataset
def create_table_schema_from_metadata(metadata_rows):
    """
    Group raw information_schema rows into TableSchema objects.

    Each row is indexed with ColumnIndex positions; rows sharing the same
    schema.table key are merged into one TableSchema with a column list.
    """
    tables = {}
    for row in metadata_rows:
        schema_name = row[ColumnIndex.SCHEMA.value]
        table_name = row[ColumnIndex.TABLE_NAME.value]
        key = f"{schema_name}.{table_name}"

        # First row for a table initializes its TableSchema entry.
        if key not in tables:
            entry = TableSchema()
            entry.schema_name = schema_name
            entry.table_name = table_name
            entry.columns = []
            tables[key] = entry

        tables[key].columns.append({
            'name': row[ColumnIndex.COLUMN_NAME.value],
            'position': row[ColumnIndex.ORDINAL_POSITION.value],
            'type': row[ColumnIndex.UDT_NAME.value]
        })
    return list(tables.values())
def convert_schemas_to_datasets(table_schemas: list[TableSchema], namespace: str):
    """Convert each table schema into an OpenLineage dataset in *namespace*."""
    return [
        table_schema.to_dataset(
            namespace=namespace,
            database='analytics',
            schema=table_schema.schema_name
        )
        for table_schema in table_schemas
    ]
# Example usage: rows shaped like information_schema output
# (schema, table, column, ordinal_position, data_type).
sample_metadata = [
    ('public', 'users', 'id', 1, 'integer'),
    ('public', 'users', 'name', 2, 'varchar'),
    ('public', 'users', 'email', 3, 'varchar'),
    ('public', 'orders', 'id', 1, 'integer'),
    ('public', 'orders', 'user_id', 2, 'integer'),
    ('public', 'orders', 'amount', 3, 'decimal')
]
schemas = create_table_schema_from_metadata(sample_metadata)
datasets = convert_schemas_to_datasets(schemas, 'production')
for dataset in datasets:
    print(f"Dataset: {dataset.name}")
print(f"Schema columns: {len(dataset.facets.get('schema', {}).get('fields', []))}")from airflow.providers.openlineage.utils.sql import get_table_schemas
def analyze_cross_database_schemas(hook, databases: list[str]):
    """
    Analyze table schemas across several databases at once.

    Discovers the table hierarchy of each database, then extracts all
    schemas in a single flat cross-database information_schema query.
    """
    # {database: {schema: [tables]}} for every requested database.
    tables_hierarchy = {db: get_database_tables(hook, db) for db in databases}

    all_inputs, all_outputs = get_table_schemas(
        hook=hook,
        namespace='multi_db',
        database=None,  # Cross-database query
        schema=None,
        tables_hierarchy=tables_hierarchy,
        information_schema_columns=[
            'table_catalog',
            'table_schema',
            'table_name',
            'column_name',
            'ordinal_position',
            'data_type'
        ],
        information_schema_table_name='columns',
        is_cross_db=True,
        use_flat_cross_db_query=True,
        is_uppercase_names=False
    )
    return all_inputs, all_outputs
def get_database_tables(hook, database: str) -> dict:
    """
    Return {schema: [table, ...]} for the base tables of *database*.

    NOTE(review): the database name is interpolated directly into the SQL
    text, so callers must pass trusted identifiers only.
    """
    query = f"""
    SELECT DISTINCT table_schema, table_name
    FROM {database}.information_schema.tables
    WHERE table_type = 'BASE TABLE'
    ORDER BY table_schema, table_name
    """
    grouped: dict = {}
    for schema_name, table_name in hook.get_records(query):
        grouped.setdefault(schema_name, []).append(table_name)
    return grouped
# Usage: analyze three databases over one connection.
# The legacy `airflow.hooks.postgres_hook` path is deprecated; use the provider.
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id='multi_db_connection')
databases = ['analytics', 'warehouse', 'reporting']
inputs, outputs = analyze_cross_database_schemas(hook, databases)
print(f"Total datasets analyzed: {len(inputs) + len(outputs)}")from airflow.providers.openlineage.utils.sql import create_filter_clauses
from sqlalchemy import text
def build_custom_schema_query(tables_hierarchy: TablesHierarchy):
    """
    Build a custom information_schema query with generated filter clauses.

    NOTE(review): interpolating a SQLAlchemy clause into an f-string renders
    bound parameters as placeholders; confirm the result is executable SQL.
    """
    where_clause = create_filter_clauses(
        tables_hierarchy=tables_hierarchy,
        is_uppercase_names=False
    )

    base_query = """
    SELECT
        table_schema,
        table_name,
        column_name,
        ordinal_position,
        data_type,
        is_nullable
    FROM information_schema.columns
    """

    # No filter generated: return the unfiltered base query.
    if where_clause is None:
        return base_query
    return f"{base_query} WHERE {where_clause}"
# Usage: filter across two databases and four schemas.
table_hierarchy = {
    'analytics': {
        'public': ['users', 'orders'],
        'staging': ['raw_data']
    },
    'warehouse': {
        'dim': ['dim_users'],
        'fact': ['fact_sales']
    }
}
query = build_custom_schema_query(table_hierarchy)
print("Generated query with filters:")
print(query)

from airflow.providers.openlineage.utils.sql import get_table_schemas
from airflow.providers.openlineage.sqlparser import SQLParser, DatabaseInfo
def enhanced_sql_parsing_with_schema(hook, sql_statements: list[str]):
    """
    Parse SQL statements, then enrich the referenced tables with schemas.

    Args:
        hook: Database hook used to run the information_schema queries.
        sql_statements: SQL strings to parse for input/output tables.

    Returns:
        dict with the parsed table names, input/output datasets, and the
        tables hierarchy built from the parsed names.
    """
    parser = SQLParser(dialect='postgresql')

    # Collect every input and output table referenced by the statements.
    all_tables = set()
    for sql in sql_statements:
        metadata = parser.parse(sql)
        if metadata:
            all_tables.update(metadata.in_tables or [])
            all_tables.update(metadata.out_tables or [])

    # Build {database: {schema: [tables]}} from the qualified names:
    # table, schema.table, or database.schema.table.
    tables_hierarchy = {}
    for table in all_tables:
        parts = table.split('.')
        if len(parts) >= 3:
            # Keep the last three components if over-qualified, instead of
            # raising ValueError on unpack.
            database, schema, table_name = parts[-3], parts[-2], parts[-1]
        elif len(parts) == 2:
            database, schema, table_name = None, parts[0], parts[1]
        else:
            # Unqualified names were previously dropped silently; keep them
            # under the (None, None) bucket.
            database, schema, table_name = None, None, parts[0]
        tables_hierarchy.setdefault(database, {}).setdefault(schema, []).append(table_name)

    # Fetch schema information for everything that was referenced.
    input_datasets, output_datasets = get_table_schemas(
        hook=hook,
        namespace='sql_parsing',
        database=None,
        schema=None,
        tables_hierarchy=tables_hierarchy,
        information_schema_columns=['table_schema', 'table_name', 'column_name', 'data_type'],
        information_schema_table_name='columns',
        is_cross_db=True,
        use_flat_cross_db_query=False,
        is_uppercase_names=False
    )
    return {
        'parsed_tables': all_tables,
        'input_datasets': input_datasets,
        'output_datasets': output_datasets,
        'tables_hierarchy': tables_hierarchy
    }
# Usage
sql_statements = [
    "SELECT * FROM analytics.public.users u JOIN analytics.public.orders o ON u.id = o.user_id",
    "INSERT INTO warehouse.fact.fact_sales SELECT * FROM analytics.staging.raw_sales",
    "CREATE TABLE reporting.public.daily_summary AS SELECT date, SUM(amount) FROM warehouse.fact.fact_sales GROUP BY date"
]
# PostgresHook was never imported in this snippet; import it explicitly
# (the legacy `airflow.hooks.postgres_hook` path is deprecated).
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id='analytics_db')
result = enhanced_sql_parsing_with_schema(hook, sql_statements)
print(f"Parsed tables: {result['parsed_tables']}")
print(f"Input datasets: {len(result['input_datasets'])}")
print(f"Output datasets: {len(result['output_datasets'])}")
print(f"Table hierarchy: {result['tables_hierarchy']}")The SQL utilities support various database systems with appropriate adaptations:
# PostgreSQL-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name']
information_schema_table_name = 'columns'

# MySQL-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'columns'

# BigQuery-specific configuration (uses INFORMATION_SCHEMA views)
information_schema_columns = ['table_catalog', 'table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'COLUMN_FIELD_PATHS'

# Snowflake-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'columns'
is_uppercase_names = True  # Snowflake uses uppercase identifiers

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-openlineage