tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.

SQL Utilities

Utilities for SQL-based lineage extraction: schema analysis, table discovery, information_schema querying, and database metadata extraction.

Capabilities

Table Schema Classes

Classes for representing and working with database table schemas.

class ColumnIndex(Enum):
    """
    Enumeration for information schema column indices.
    
    Defines standard column positions in information_schema query results
    for consistent schema extraction across different database systems.
    """
    SCHEMA = 0          # Table schema/database name
    TABLE_NAME = 1      # Table name
    COLUMN_NAME = 2     # Column name
    ORDINAL_POSITION = 3  # Column position in table
    UDT_NAME = 4       # User-defined type name

class TableSchema:
    """
    Table schema container with dataset conversion methods.
    
    Represents a database table's schema information including
    columns, types, and metadata for lineage extraction.
    """
    
    def to_dataset(
        self,
        namespace: str,
        database: str | None = None,
        schema: str | None = None
    ) -> Dataset:
        """
        Convert table schema to OpenLineage Dataset.
        
        Args:
            namespace: OpenLineage namespace
            database: Database name
            schema: Schema name
            
        Returns:
            Dataset: OpenLineage dataset with schema facets
        """

Type Definitions

Type aliases for complex data structures used in SQL utilities.

TablesHierarchy = dict[str | None, dict[str | None, list[str]]]
"""
Type alias for nested table hierarchy dictionary.

Structure: {database: {schema: [table_names]}}
Represents the hierarchical organization of tables across
databases and schemas for comprehensive schema analysis.
"""

Schema Extraction Functions

Functions for extracting table schemas and metadata from databases.

def get_table_schemas(
    hook,
    namespace: str,
    database: str | None,
    schema: str | None,
    tables_hierarchy: TablesHierarchy,
    information_schema_columns: list[str],
    information_schema_table_name: str,
    is_cross_db: bool,
    use_flat_cross_db_query: bool,
    is_uppercase_names: bool
) -> tuple[list[Dataset], list[Dataset]]:
    """
    Get table schemas from database using information_schema queries.
    
    Args:
        hook: Database hook for executing queries
        namespace: OpenLineage namespace
        database: Target database name
        schema: Target schema name
        tables_hierarchy: Nested table structure dictionary
        information_schema_columns: Columns to select from information_schema
        information_schema_table_name: Name of information schema table
        is_cross_db: Whether query spans multiple databases
        use_flat_cross_db_query: Whether to use flat cross-database query
        is_uppercase_names: Whether to uppercase table/column names
        
    Returns:
        tuple: (input_datasets, output_datasets) with schema information
    """

def parse_query_result(cursor) -> list[TableSchema]:
    """
    Parse database query results into TableSchema objects.
    
    Args:
        cursor: Database cursor with query results
        
    Returns:
        list[TableSchema]: Parsed table schema objects
    """

Query Generation Functions

Functions for generating SQL queries for schema discovery and analysis.

def create_information_schema_query(
    tables_hierarchy: TablesHierarchy,
    information_schema_columns: list[str],
    information_schema_table_name: str,
    is_cross_db: bool,
    use_flat_cross_db_query: bool,
    is_uppercase_names: bool
) -> str:
    """
    Create SQL query for extracting schema information from information_schema.
    
    Args:
        tables_hierarchy: Nested dictionary of database/schema/table structure
        information_schema_columns: Columns to select from information schema
        information_schema_table_name: Name of information schema table
        is_cross_db: Whether query spans multiple databases
        use_flat_cross_db_query: Whether to use flat cross-database query
        is_uppercase_names: Whether to uppercase table/column names
        
    Returns:
        str: SQL query for schema information extraction
    """

def create_filter_clauses(
    tables_hierarchy: TablesHierarchy,
    is_uppercase_names: bool
) -> ClauseElement:
    """
    Create SQL filter clauses for table hierarchy filtering.
    
    Args:
        tables_hierarchy: Nested table structure dictionary
        is_uppercase_names: Whether to uppercase identifiers
        
    Returns:
        ClauseElement: SQLAlchemy filter clause element
    """

Usage Examples

Basic Schema Extraction

from airflow.providers.openlineage.utils.sql import get_table_schemas, TablesHierarchy
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Setup database connection
hook = PostgresHook(postgres_conn_id='analytics_db')

# Define table hierarchy
tables_hierarchy: TablesHierarchy = {
    'analytics': {
        'public': ['users', 'orders', 'products'],
        'staging': ['raw_users', 'raw_orders']
    },
    'reporting': {
        'public': ['daily_reports', 'monthly_summaries']
    }
}

# Extract schemas
input_datasets, output_datasets = get_table_schemas(
    hook=hook,
    namespace='production',
    database='analytics',
    schema='public',
    tables_hierarchy=tables_hierarchy,
    information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
    information_schema_table_name='columns',
    is_cross_db=True,
    use_flat_cross_db_query=False,
    is_uppercase_names=False
)

print(f"Input datasets: {len(input_datasets)}")
print(f"Output datasets: {len(output_datasets)}")

Custom Information Schema Query

from airflow.providers.openlineage.utils.sql import create_information_schema_query

# Define complex table hierarchy
tables_hierarchy = {
    'warehouse': {
        'dim': ['dim_users', 'dim_products', 'dim_time'],
        'fact': ['fact_sales', 'fact_inventory'],
        'staging': ['stg_users', 'stg_products', 'stg_sales']
    },
    'analytics': {
        'reports': ['daily_sales', 'monthly_trends'],
        'ml': ['user_features', 'product_embeddings']
    }
}

# Generate information schema query
query = create_information_schema_query(
    tables_hierarchy=tables_hierarchy,
    information_schema_columns=[
        'table_catalog',
        'table_schema', 
        'table_name',
        'column_name',
        'ordinal_position',
        'data_type',
        'is_nullable'
    ],
    information_schema_table_name='columns',
    is_cross_db=True,
    use_flat_cross_db_query=False,
    is_uppercase_names=False
)

print("Generated query:")
print(query)

Query Result Processing

from airflow.providers.openlineage.utils.sql import parse_query_result, TableSchema
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_table_metadata(connection_id: str, table_hierarchy: TablesHierarchy):
    """Extract and process table metadata from database."""
    
    hook = PostgresHook(postgres_conn_id=connection_id)
    
    # Execute information schema query
    query = create_information_schema_query(
        tables_hierarchy=table_hierarchy,
        information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
        information_schema_table_name='columns',
        is_cross_db=False,
        use_flat_cross_db_query=False,
        is_uppercase_names=False
    )
    
    # Get cursor and execute query
    cursor = hook.get_cursor()
    cursor.execute(query)
    
    # Parse results
    table_schemas = parse_query_result(cursor)
    
    # Process schemas
    for schema in table_schemas:
        print(f"Table: {schema.schema_name}.{schema.table_name}")
        print(f"Columns: {len(schema.columns)}")
        
        # Convert to OpenLineage dataset
        dataset = schema.to_dataset(
            namespace='production',
            database='analytics',
            schema=schema.schema_name
        )
        
        print(f"Dataset: {dataset.namespace}/{dataset.name}")
        print(f"Schema facet: {dataset.facets.get('schema', 'None')}")

# Usage
table_hierarchy = {
    'analytics': {
        'public': ['users', 'orders']
    }
}

extract_table_metadata('analytics_db', table_hierarchy)

TableSchema Usage

from airflow.providers.openlineage.utils.sql import TableSchema, ColumnIndex
from openlineage.client.event_v2 import Dataset

def create_table_schema_from_metadata(metadata_rows):
    """Create TableSchema from raw metadata rows."""
    
    # Group rows by table
    tables = {}
    for row in metadata_rows:
        schema_name = row[ColumnIndex.SCHEMA.value]
        table_name = row[ColumnIndex.TABLE_NAME.value]
        column_name = row[ColumnIndex.COLUMN_NAME.value]
        column_position = row[ColumnIndex.ORDINAL_POSITION.value]
        data_type = row[ColumnIndex.UDT_NAME.value]
        
        table_key = f"{schema_name}.{table_name}"
        if table_key not in tables:
            tables[table_key] = TableSchema()
            tables[table_key].schema_name = schema_name
            tables[table_key].table_name = table_name
            tables[table_key].columns = []
        
        tables[table_key].columns.append({
            'name': column_name,
            'position': column_position,
            'type': data_type
        })
    
    return list(tables.values())

def convert_schemas_to_datasets(table_schemas: list[TableSchema], namespace: str):
    """Convert table schemas to OpenLineage datasets."""
    
    datasets = []
    for schema in table_schemas:
        dataset = schema.to_dataset(
            namespace=namespace,
            database='analytics',
            schema=schema.schema_name
        )
        datasets.append(dataset)
    
    return datasets

# Example usage
sample_metadata = [
    ('public', 'users', 'id', 1, 'integer'),
    ('public', 'users', 'name', 2, 'varchar'),
    ('public', 'users', 'email', 3, 'varchar'),
    ('public', 'orders', 'id', 1, 'integer'),
    ('public', 'orders', 'user_id', 2, 'integer'),
    ('public', 'orders', 'amount', 3, 'decimal')
]

schemas = create_table_schema_from_metadata(sample_metadata)
datasets = convert_schemas_to_datasets(schemas, 'production')

for dataset in datasets:
    print(f"Dataset: {dataset.name}")
    print(f"Schema columns: {len(dataset.facets.get('schema', {}).get('fields', []))}")

Cross-Database Schema Analysis

from airflow.providers.openlineage.utils.sql import get_table_schemas

def analyze_cross_database_schemas(hook, databases: list[str]):
    """Analyze schemas across multiple databases."""
    
    # Build comprehensive table hierarchy
    tables_hierarchy = {}
    
    for db in databases:
        # Query each database for table information
        db_tables = get_database_tables(hook, db)
        tables_hierarchy[db] = db_tables
    
    # Extract schemas with cross-database support
    all_inputs, all_outputs = get_table_schemas(
        hook=hook,
        namespace='multi_db',
        database=None,  # Cross-database query
        schema=None,
        tables_hierarchy=tables_hierarchy,
        information_schema_columns=[
            'table_catalog',
            'table_schema',
            'table_name', 
            'column_name',
            'ordinal_position',
            'data_type'
        ],
        information_schema_table_name='columns',
        is_cross_db=True,
        use_flat_cross_db_query=True,
        is_uppercase_names=False
    )
    
    return all_inputs, all_outputs

def get_database_tables(hook, database: str) -> dict:
    """Get table hierarchy for a specific database."""
    
    query = f"""
    SELECT DISTINCT table_schema, table_name
    FROM {database}.information_schema.tables
    WHERE table_type = 'BASE TABLE'
    ORDER BY table_schema, table_name
    """
    
    result = hook.get_records(query)
    
    schema_tables = {}
    for schema, table in result:
        if schema not in schema_tables:
            schema_tables[schema] = []
        schema_tables[schema].append(table)
    
    return schema_tables

# Usage
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id='multi_db_connection')
databases = ['analytics', 'warehouse', 'reporting']

inputs, outputs = analyze_cross_database_schemas(hook, databases)
print(f"Total datasets analyzed: {len(inputs) + len(outputs)}")

Filter Clause Generation

from airflow.providers.openlineage.utils.sql import create_filter_clauses

def build_custom_schema_query(tables_hierarchy: TablesHierarchy):
    """Build custom query with generated filter clauses."""
    
    # Generate filter clauses
    filter_clause = create_filter_clauses(
        tables_hierarchy=tables_hierarchy,
        is_uppercase_names=False
    )
    
    # Base query
    base_query = """
    SELECT 
        table_schema,
        table_name,
        column_name,
        ordinal_position,
        data_type,
        is_nullable
    FROM information_schema.columns
    """
    
    # Combine with filter
    if filter_clause is not None:
        full_query = f"{base_query} WHERE {filter_clause}"
    else:
        full_query = base_query
    
    return full_query

# Usage
table_hierarchy = {
    'analytics': {
        'public': ['users', 'orders'],
        'staging': ['raw_data']
    },
    'warehouse': {
        'dim': ['dim_users'],
        'fact': ['fact_sales']
    }
}

query = build_custom_schema_query(table_hierarchy)
print("Generated query with filters:")
print(query)
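
Note that interpolating a SQLAlchemy `ClauseElement` into an f-string, as above, compiles it with bound-parameter placeholders. To render literal values for an ad-hoc query, compile with `literal_binds`; a sketch using plain SQLAlchemy, with a hypothetical stand-in for the clause `create_filter_clauses` would return:

```python
from sqlalchemy import column

# Hypothetical stand-in for a clause produced by create_filter_clauses
clause = column('table_schema').in_(['public', 'staging'])

# str(clause) keeps bind placeholders; literal_binds inlines the values
rendered = clause.compile(compile_kwargs={'literal_binds': True})
print(rendered)
# table_schema IN ('public', 'staging')
```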

Integration with SQL Parser

from airflow.providers.openlineage.utils.sql import get_table_schemas
from airflow.providers.openlineage.sqlparser import SQLParser

def enhanced_sql_parsing_with_schema(hook, sql_statements: list[str]):
    """Enhanced SQL parsing with schema information."""
    
    # Initialize SQL parser
    parser = SQLParser(dialect='postgresql')
    
    # Parse SQL to identify tables
    all_tables = set()
    for sql in sql_statements:
        metadata = parser.parse(sql)
        if metadata:
            all_tables.update(metadata.in_tables or [])
            all_tables.update(metadata.out_tables or [])
    
    # Build table hierarchy from parsed tables
    tables_hierarchy = {}
    for table in all_tables:
        # Parse table name (assuming format: schema.table or database.schema.table)
        parts = table.split('.')
        
        if len(parts) >= 2:
            if len(parts) == 2:
                schema, table_name = parts
                database = None
            else:
                database, schema, table_name = parts
            
            if database not in tables_hierarchy:
                tables_hierarchy[database] = {}
            if schema not in tables_hierarchy[database]:
                tables_hierarchy[database][schema] = []
            
            tables_hierarchy[database][schema].append(table_name)
    
    # Get schema information
    input_datasets, output_datasets = get_table_schemas(
        hook=hook,
        namespace='sql_parsing',
        database=None,
        schema=None,
        tables_hierarchy=tables_hierarchy,
        information_schema_columns=['table_schema', 'table_name', 'column_name', 'data_type'],
        information_schema_table_name='columns',
        is_cross_db=True,
        use_flat_cross_db_query=False,
        is_uppercase_names=False
    )
    
    return {
        'parsed_tables': all_tables,
        'input_datasets': input_datasets,
        'output_datasets': output_datasets,
        'tables_hierarchy': tables_hierarchy
    }

# Usage
sql_statements = [
    "SELECT * FROM analytics.public.users u JOIN analytics.public.orders o ON u.id = o.user_id",
    "INSERT INTO warehouse.fact.fact_sales SELECT * FROM analytics.staging.raw_sales",
    "CREATE TABLE reporting.public.daily_summary AS SELECT date, SUM(amount) FROM warehouse.fact.fact_sales GROUP BY date"
]

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id='analytics_db')
result = enhanced_sql_parsing_with_schema(hook, sql_statements)

print(f"Parsed tables: {result['parsed_tables']}")
print(f"Input datasets: {len(result['input_datasets'])}")
print(f"Output datasets: {len(result['output_datasets'])}")
print(f"Table hierarchy: {result['tables_hierarchy']}")

Database System Support

The SQL utilities support various database systems with appropriate adaptations:

PostgreSQL

# PostgreSQL-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name']
information_schema_table_name = 'columns'

MySQL

# MySQL-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'columns'

BigQuery

# BigQuery-specific configuration (uses INFORMATION_SCHEMA views)
information_schema_columns = ['table_catalog', 'table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'COLUMN_FIELD_PATHS'

Snowflake

# Snowflake-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'columns'
is_uppercase_names = True  # Snowflake uses uppercase identifiers
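
The effect of `is_uppercase_names` can be sketched with a small normalization helper (illustrative only, not the provider's internal code):

```python
def normalize_identifier(name: str, is_uppercase_names: bool) -> str:
    """Uppercase an identifier when the target database (e.g. Snowflake)
    stores unquoted identifiers in uppercase."""
    return name.upper() if is_uppercase_names else name

print(normalize_identifier('fact_sales', True))    # FACT_SALES
print(normalize_identifier('fact_sales', False))   # fact_sales
```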

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openlineage
