CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-sqllineage

SQL Lineage Analysis Tool powered by Python

Overview
Eval results
Files

docs/metadata-providers.md

Metadata Providers

Pluggable interfaces for providing schema and table metadata to enhance lineage analysis. Metadata providers supply column information for tables, enabling more accurate column-level lineage extraction and wildcard expansion.

Capabilities

Base MetaDataProvider

Abstract base class defining the interface for metadata providers. Custom providers can extend this class to integrate with different metadata sources.

class MetaDataProvider:
    """
    Base interface for supplying table column metadata to lineage analysis.

    Custom providers extend this class and implement ``_get_table_columns``
    to plug in their own metadata source. Only signatures are listed in this
    stub; method bodies are omitted.
    """

    def __init__(self) -> None:
        """Initialize the metadata provider"""
    
    def get_table_columns(self, table: Table, **kwargs) -> List[Column]:
        """
        Get columns for a specific table.
        
        Parameters:
        - table: Table object to get columns for
        - **kwargs: additional provider-specific arguments
        
        Returns:
        List of Column objects representing the table's columns
        """
    
    def register_session_metadata(self, table: Table, columns: List[Column]) -> None:
        """
        Register table metadata for the current session.

        Session metadata is temporary: it is removed by
        ``deregister_session_metadata`` and when a ``session()`` context exits.
        
        Parameters:
        - table: Table object
        - columns: List of Column objects for the table
        """
    
    def deregister_session_metadata(self) -> None:
        """Clear all session metadata"""
    
    def session(self) -> "MetaDataSession":
        """
        Get a metadata session context manager for temporary metadata.

        Intended for use in a ``with`` statement.
        
        Returns:
        MetaDataSession context manager
        """
    
    def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:
        """
        Abstract method for provider-specific column retrieval.

        This is the extension point that custom providers override.
        
        Parameters:
        - schema: schema name
        - table: table name
        - **kwargs: provider-specific arguments
        
        Returns:
        List of column names as strings
        """

DummyMetaDataProvider

Simple dictionary-based metadata provider for testing and scenarios where schema information is known in advance.

class DummyMetaDataProvider(MetaDataProvider):
    """
    Dictionary-based metadata provider.

    Suited to tests and to cases where the schema is known in advance.
    Only signatures are listed in this stub; method bodies are omitted.
    """

    def __init__(self, metadata: Optional[Dict[str, List[str]]] = None) -> None:
        """
        Initialize with optional metadata dictionary.
        
        Parameters:
        - metadata: dictionary mapping table names to column lists
                   Keys can be "table" or "schema.table" format

        NOTE(review): confirm that bare "table" keys are actually matched for
        unqualified tables. The lookup hook ``_get_table_columns`` always
        receives a schema argument, so the effective key may need to be
        schema-qualified.
        """
    
    @property
    def metadata(self) -> Dict[str, List[str]]:
        """Get the metadata dictionary mapping tables to column lists"""

SQLAlchemyMetaDataProvider

Database-backed metadata provider using SQLAlchemy for schema introspection. Supports any database that SQLAlchemy can connect to.

class SQLAlchemyMetaDataProvider(MetaDataProvider):
    """
    Database-backed metadata provider using SQLAlchemy schema introspection.

    Only signatures are listed in this stub; method bodies are omitted.
    """

    def __init__(self, url: str, engine_kwargs: Optional[Dict[str, Any]] = None) -> None:
        """
        Initialize with database connection details.
        
        Parameters:
        - url: SQLAlchemy database URL,
               e.g. "postgresql://user:password@localhost:5432/analytics_db"
        - engine_kwargs: additional arguments for SQLAlchemy engine creation

        Raises:
        MetaDataProviderException - presumably when the URL or its driver is
        unusable, as the error-handling example in this document suggests;
        confirm against the implementation.
        """
    
    @property
    def engine(self) -> "sqlalchemy.Engine":
        """Get the SQLAlchemy engine instance"""
    
    @property
    def metadata_obj(self) -> "sqlalchemy.MetaData":
        """Get the SQLAlchemy MetaData object"""

class MetaDataSession:
    """
    Context manager for temporary metadata registration during analysis.

    Only signatures are listed in this stub; method bodies are omitted.
    """

    def __init__(self, metadata_provider: MetaDataProvider) -> None:
        """
        Create a metadata session for managing temporary metadata.
        
        Parameters:
        - metadata_provider: the provider to create a session for
        """
    
    def __enter__(self) -> "MetaDataSession":
        """Enter context manager"""
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager and clean up session metadata"""
    
    def register_session_metadata(self, table: Table, columns: List[Column]) -> None:
        """
        Register session-level metadata for temporary tables or views

        Parameters:
        - table: Table object
        - columns: List of Column objects for the table
        """

MetaDataSession

Context manager for temporary metadata registration during analysis.

class MetaDataSession:
    """
    Context manager for temporary metadata registration during analysis.

    Only the context-manager protocol (``__enter__`` / ``__exit__``) is
    listed in this stub; method bodies are omitted.
    """

    def __enter__(self) -> "MetaDataSession":
        """Enter the metadata session context"""
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit the metadata session context and clear temporary metadata

        Parameters:
        - exc_type, exc_val, exc_tb: the standard context-manager exception
          arguments
        """

Usage Examples

DummyMetaDataProvider

from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.runner import LineageRunner

# Define table schemas: each key is a table name, each value its column names
metadata = {
    "customers": ["customer_id", "name", "email", "created_date"],
    "orders": ["order_id", "customer_id", "total", "order_date"],
    "analytics.customer_summary": ["customer_id", "total_orders", "total_spent"],
}

# Statement to analyze; it reads and writes the tables described above
sql = """
INSERT INTO analytics.customer_summary
SELECT
    c.customer_id,
    COUNT(o.order_id) AS total_orders,
    SUM(o.total) AS total_spent
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id
"""

# Create provider and use with LineageRunner
provider = DummyMetaDataProvider(metadata)
runner = LineageRunner(sql, metadata_provider=provider)

# Now column lineage will be more accurate.
# Each entry is a path of columns from source to target. Join the whole path
# rather than unpacking exactly two names, so that paths passing through
# intermediate columns still print instead of raising ValueError.
for path in runner.get_column_lineage():
    print(" -> ".join(str(col) for col in path))

SQLAlchemyMetaDataProvider with PostgreSQL

from sqllineage.core.metadata.sqlalchemy import SQLAlchemyMetaDataProvider
from sqllineage.runner import LineageRunner

# Connect to PostgreSQL database
# The user, password, host and database below are placeholders; substitute
# real connection details and avoid committing credentials to source control.
db_url = "postgresql://user:password@localhost:5432/analytics_db"
provider = SQLAlchemyMetaDataProvider(db_url)

# Statement to analyze: aggregates raw.transactions into reporting.daily_sales
sql = """
INSERT INTO reporting.daily_sales
SELECT 
    date_trunc('day', order_timestamp) as sale_date,
    sum(amount) as total_sales
FROM raw.transactions
GROUP BY date_trunc('day', order_timestamp)
"""

# Provider will automatically introspect schema from database
runner = LineageRunner(sql, metadata_provider=provider)
print("Column lineage with database schema:")
runner.print_column_lineage()

SQLAlchemyMetaDataProvider with Snowflake

from sqllineage.core.metadata.sqlalchemy import SQLAlchemyMetaDataProvider
from sqllineage.runner import LineageRunner

# Snowflake connection with additional engine options.
# The account, user, password, database and schema are placeholders.
snowflake_url = "snowflake://user:password@account/database/schema"
engine_options = {
    "connect_args": {
        "warehouse": "COMPUTE_WH",
        "role": "ANALYST_ROLE",
    }
}

# Statement to analyze; replace with your own Snowflake SQL
snowflake_sql = """
INSERT INTO reporting.daily_sales
SELECT sale_date, total_sales
FROM staging.daily_sales
"""

# engine_kwargs is passed through as extra arguments for engine creation
provider = SQLAlchemyMetaDataProvider(snowflake_url, engine_kwargs=engine_options)
runner = LineageRunner(snowflake_sql, dialect="snowflake", metadata_provider=provider)

Custom Metadata Provider

import json
from typing import List

from sqllineage.core.metadata_provider import MetaDataProvider
from sqllineage.runner import LineageRunner


class JSONMetaDataProvider(MetaDataProvider):
    """
    Metadata provider backed by a JSON file.

    The file is expected to hold one object whose keys are table names
    ("schema.table", or "table" when no schema is given) and whose values
    are lists of column names.
    """

    def __init__(self, json_file_path: str) -> None:
        """
        Load the schema mapping from a JSON file.

        Parameters:
        - json_file_path: path to the JSON file holding the schema mapping
        """
        super().__init__()
        # JSON text is UTF-8 by specification; do not rely on the locale default.
        with open(json_file_path, encoding="utf-8") as f:
            self.schema_data = json.load(f)

    def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:
        """
        Look up the column names for a table in the loaded mapping.

        Parameters:
        - schema: schema name; when empty the bare table name is the key
        - table: table name
        - **kwargs: unused provider-specific arguments

        Returns:
        List of column names, or an empty list when the table is unknown
        """
        table_key = f"{schema}.{table}" if schema else table
        return self.schema_data.get(table_key, [])


# Statement to analyze; replace with your own SQL
sql = """
INSERT INTO analytics.customer_summary
SELECT customer_id, total_orders, total_spent
FROM staging.customer_summary
"""

# Use custom provider
custom_provider = JSONMetaDataProvider("schemas.json")
runner = LineageRunner(sql, metadata_provider=custom_provider)

Session Metadata

from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.core.models import Table, Column
from sqllineage.runner import LineageRunner

provider = DummyMetaDataProvider()

# Temporarily register metadata for a specific analysis
temp_table = Table("temp_analysis_table")
temp_columns = [Column("id"), Column("value"), Column("timestamp")]

# Statement to analyze; the wildcard can only be expanded into concrete
# columns when metadata for temp_analysis_table is available
sql_with_temp_table = """
INSERT INTO analytics.results
SELECT * FROM temp_analysis_table
"""

with provider.session():
    provider.register_session_metadata(temp_table, temp_columns)

    # Run analysis with temporary metadata
    runner = LineageRunner(sql_with_temp_table, metadata_provider=provider)
    runner.print_column_lineage()

# Session metadata is automatically cleared

Metadata for Complex SQL

from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.runner import LineageRunner

# Metadata for SQL with CTEs and subqueries
metadata = {
    # Base tables
    "raw.events": ["event_id", "user_id", "event_type", "timestamp", "properties"],
    "raw.users": ["user_id", "email", "signup_date", "country"],

    # View or materialized view
    "analytics.user_events": ["user_id", "event_count", "first_event", "last_event"],
}

provider = DummyMetaDataProvider(metadata)

# Two chained CTEs: user_activity aggregates raw.events per user, and
# enriched_activity joins that result to raw.users before the final INSERT.
complex_sql = """
WITH user_activity AS (
    SELECT 
        user_id,
        COUNT(*) as event_count,
        MIN(timestamp) as first_event,
        MAX(timestamp) as last_event
    FROM raw.events
    WHERE event_type = 'page_view'
    GROUP BY user_id
),
enriched_activity AS (
    SELECT 
        ua.user_id,
        u.email,
        u.country,
        ua.event_count,
        ua.first_event,
        ua.last_event
    FROM user_activity ua
    JOIN raw.users u ON ua.user_id = u.user_id
)
INSERT INTO analytics.user_events
SELECT user_id, event_count, first_event, last_event
FROM enriched_activity
"""

runner = LineageRunner(complex_sql, metadata_provider=provider)
print("CTE and JOIN column lineage:")
runner.print_column_lineage()

Error Handling

from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.core.metadata.sqlalchemy import SQLAlchemyMetaDataProvider
from sqllineage.exceptions import MetaDataProviderException
from sqllineage.runner import LineageRunner

# Statement to analyze; replace with your own SQL
sql = """
INSERT INTO reporting.daily_sales
SELECT sale_date, total_sales
FROM staging.daily_sales
"""

# Keep the try body limited to the call expected to raise, so that unrelated
# failures are not mistaken for a metadata-provider problem.
try:
    # Invalid database URL
    provider = SQLAlchemyMetaDataProvider("invalid://connection/string")
except MetaDataProviderException as e:
    print(f"Metadata provider error: {e}")
    # Fallback to dummy provider
    fallback_provider = DummyMetaDataProvider()
    runner = LineageRunner(sql, metadata_provider=fallback_provider)
else:
    runner = LineageRunner(sql, metadata_provider=provider)

Install with Tessl CLI

npx tessl i tessl/pypi-sqllineage

docs

cli-interface.md

configuration.md

data-models.md

index.md

lineage-runner.md

metadata-providers.md

visualization-export.md

tile.json