SQL Lineage Analysis Tool powered by Python
Pluggable interfaces for providing schema and table metadata to enhance lineage analysis. Metadata providers supply column information for tables, enabling more accurate column-level lineage extraction and wildcard expansion.
Abstract base class defining the interface for metadata providers. Custom providers can extend this class to integrate with different metadata sources.
class MetaDataProvider:
def __init__(self) -> None:
"""Initialize the metadata provider"""
def get_table_columns(self, table: Table, **kwargs) -> List[Column]:
"""
Get columns for a specific table.
Parameters:
- table: Table object to get columns for
- **kwargs: additional provider-specific arguments
Returns:
List of Column objects representing the table's columns
"""
def register_session_metadata(self, table: Table, columns: List[Column]) -> None:
"""
Register table metadata for the current session.
Parameters:
- table: Table object
- columns: List of Column objects for the table
"""
def deregister_session_metadata(self) -> None:
"""Clear all session metadata"""
def session(self) -> "MetaDataSession":
"""
Get a metadata session context manager for temporary metadata.
Returns:
MetaDataSession context manager
"""
def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:
"""
Abstract method for provider-specific column retrieval.
Parameters:
- schema: schema name
- table: table name
- **kwargs: provider-specific arguments
Returns:
List of column names as strings
"""
Simple dictionary-based metadata provider for testing and scenarios where schema information is known in advance.
class DummyMetaDataProvider(MetaDataProvider):
def __init__(self, metadata: Optional[Dict[str, List[str]]] = None):
"""
Initialize with optional metadata dictionary.
Parameters:
- metadata: dictionary mapping table names to column lists
Keys can be "table" or "schema.table" format
"""
@property
def metadata(self) -> Dict[str, List[str]]:
"""Get the metadata dictionary mapping tables to column lists"""
Database-backed metadata provider using SQLAlchemy for schema introspection. Supports any database that SQLAlchemy can connect to.
class SQLAlchemyMetaDataProvider(MetaDataProvider):
def __init__(self, url: str, engine_kwargs: Optional[Dict[str, Any]] = None):
"""
Initialize with database connection details.
Parameters:
- url: SQLAlchemy database URL
- engine_kwargs: additional arguments for SQLAlchemy engine creation
"""
@property
def engine(self) -> "sqlalchemy.Engine":
"""Get the SQLAlchemy engine instance"""
@property
def metadata_obj(self) -> "sqlalchemy.MetaData":
"""Get the SQLAlchemy MetaData object"""
class MetaDataSession:
def __init__(self, metadata_provider: MetaDataProvider):
"""
Create a metadata session for managing temporary metadata.
Parameters:
- metadata_provider: the provider to create a session for
"""
def __enter__(self):
"""Enter context manager"""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager and clean up session metadata"""
def register_session_metadata(self, table: Table, columns: List[Column]) -> None:
"""Register session-level metadata for temporary tables or views"""
Context manager for temporary metadata registration during analysis.
class MetaDataSession:
def __enter__(self) -> "MetaDataSession":
"""Enter the metadata session context"""
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit the metadata session context and clear temporary metadata"""
from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.runner import LineageRunner
# Define table schemas
metadata = {
"customers": ["customer_id", "name", "email", "created_date"],
"orders": ["order_id", "customer_id", "total", "order_date"],
"analytics.customer_summary": ["customer_id", "total_orders", "total_spent"]
}
# Create provider and use with LineageRunner
provider = DummyMetaDataProvider(metadata)
runner = LineageRunner(sql, metadata_provider=provider)
# Now column lineage will be more accurate
for src_col, tgt_col in runner.get_column_lineage():
print(f"{src_col} -> {tgt_col}")
from sqllineage.core.metadata.sqlalchemy import SQLAlchemyMetaDataProvider
from sqllineage.runner import LineageRunner
# Connect to PostgreSQL database
db_url = "postgresql://user:password@localhost:5432/analytics_db"
provider = SQLAlchemyMetaDataProvider(db_url)
sql = """
INSERT INTO reporting.daily_sales
SELECT
date_trunc('day', order_timestamp) as sale_date,
sum(amount) as total_sales
FROM raw.transactions
GROUP BY date_trunc('day', order_timestamp)
"""
# Provider will automatically introspect schema from database
runner = LineageRunner(sql, metadata_provider=provider)
print("Column lineage with database schema:")
runner.print_column_lineage()
# Snowflake connection with additional engine options
snowflake_url = "snowflake://user:password@account/database/schema"
engine_options = {
"connect_args": {
"warehouse": "COMPUTE_WH",
"role": "ANALYST_ROLE"
}
}
provider = SQLAlchemyMetaDataProvider(snowflake_url, engine_kwargs=engine_options)
runner = LineageRunner(snowflake_sql, dialect="snowflake", metadata_provider=provider)


class JSONMetaDataProvider(MetaDataProvider):
    """Custom metadata provider that serves column lists from a JSON file.

    The loaded document is queried with ``.get(key, [])``, so it must be a
    JSON object. Keys are looked up as ``"schema.table"`` when a schema is
    given and as the bare ``"table"`` name otherwise; each value is returned
    unchanged as the table's list of column names.
    """

    def __init__(self, json_file_path: str):
        """Load the schema mapping from ``json_file_path``.

        Parameters:
        - json_file_path: path to the JSON file holding the table-to-columns mapping
        """
        super().__init__()
        import json

        # JSON text is UTF-8 by specification; decode it explicitly rather
        # than relying on the platform's default encoding.
        with open(json_file_path, "r", encoding="utf-8") as f:
            self.schema_data = json.load(f)

    def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:
        """Return the column names stored for a table.

        Parameters:
        - schema: schema name; when empty, only the table name is used as the key
        - table: table name
        - **kwargs: accepted for interface compatibility, unused here
        Returns:
        The list stored under the computed key, or an empty list when the
        key is absent from the loaded data.
        """
        table_key = f"{schema}.{table}" if schema else table
        return self.schema_data.get(table_key, [])
# Use custom provider
custom_provider = JSONMetaDataProvider("schemas.json")
runner = LineageRunner(sql, metadata_provider=custom_provider)
from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.core.models import Table, Column
provider = DummyMetaDataProvider()
# Temporarily register metadata for a specific analysis
temp_table = Table("temp_analysis_table")
temp_columns = [Column("id"), Column("value"), Column("timestamp")]
with provider.session():
provider.register_session_metadata(temp_table, temp_columns)
# Run analysis with temporary metadata
runner = LineageRunner(sql_with_temp_table, metadata_provider=provider)
runner.print_column_lineage()
# Session metadata is automatically cleared
# Metadata for SQL with CTEs and subqueries
metadata = {
# Base tables
"raw.events": ["event_id", "user_id", "event_type", "timestamp", "properties"],
"raw.users": ["user_id", "email", "signup_date", "country"],
# View or materialized view
"analytics.user_events": ["user_id", "event_count", "first_event", "last_event"]
}
provider = DummyMetaDataProvider(metadata)
complex_sql = """
WITH user_activity AS (
SELECT
user_id,
COUNT(*) as event_count,
MIN(timestamp) as first_event,
MAX(timestamp) as last_event
FROM raw.events
WHERE event_type = 'page_view'
GROUP BY user_id
),
enriched_activity AS (
SELECT
ua.user_id,
u.email,
u.country,
ua.event_count,
ua.first_event,
ua.last_event
FROM user_activity ua
JOIN raw.users u ON ua.user_id = u.user_id
)
INSERT INTO analytics.user_events
SELECT user_id, event_count, first_event, last_event
FROM enriched_activity
"""
runner = LineageRunner(complex_sql, metadata_provider=provider)
print("CTE and JOIN column lineage:")
runner.print_column_lineage()
from sqllineage.exceptions import MetaDataProviderException
try:
# Invalid database URL
provider = SQLAlchemyMetaDataProvider("invalid://connection/string")
runner = LineageRunner(sql, metadata_provider=provider)
except MetaDataProviderException as e:
print(f"Metadata provider error: {e}")
# Fallback to dummy provider
fallback_provider = DummyMetaDataProvider()
runner = LineageRunner(sql, metadata_provider=fallback_provider)
Install with Tessl CLI
npx tessl i tessl/pypi-sqllineage