tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.

docs/sql-parsing.md

SQL Parsing and Analysis

Parse SQL statements to extract data lineage, including table dependencies, column-level mappings, and database schema information. The parser analyzes SQL queries to automatically discover data flows and transformations.

Capabilities

SQL Parser Class

Main interface for parsing SQL statements and extracting lineage metadata.

class SQLParser:
    """
    Main SQL parsing interface for extracting lineage from SQL statements.
    """
    
    def __init__(self, dialect: str | None = None, default_schema: str | None = None):
        """
        Initialize SQL parser with optional dialect and schema.
        
        Args:
            dialect: SQL dialect (e.g., 'postgresql', 'mysql', 'bigquery')
            default_schema: Default schema name for unqualified table references
        """
    
    def parse(self, sql: list[str] | str) -> SqlMeta | None:
        """
        Parse SQL statement(s) and return metadata.
        
        Args:
            sql: SQL statement(s) to parse
            
        Returns:
            SqlMeta: Parsed SQL metadata or None if parsing fails
        """
    
    def parse_table_schemas(
        self,
        hook,
        inputs: list[Dataset],
        outputs: list[Dataset] 
    ) -> tuple[list[Dataset], list[Dataset]]:
        """
        Parse and enrich table schemas with column information.
        
        Args:
            hook: Database hook for schema queries
            inputs: Input datasets to enrich
            outputs: Output datasets to enrich
            
        Returns:
            tuple: Enriched (inputs, outputs) datasets with schema information
        """
    
    def get_metadata_from_parser(
        self,
        parse_result: SqlMeta,
        database: str | None,
        schema: str | None
    ) -> tuple[list[Dataset], list[Dataset]]:
        """
        Extract input/output datasets from parse results.
        
        Args:
            parse_result: Result from SQL parsing
            database: Database name
            schema: Schema name
            
        Returns:
            tuple: (input_datasets, output_datasets)
        """
    
    def attach_column_lineage(
        self,
        datasets: list[Dataset],
        database: str | None,
        parse_result: SqlMeta
    ):
        """
        Attach column-level lineage information to datasets.
        
        Args:
            datasets: Datasets to enrich with column lineage
            database: Database name
            parse_result: SQL parsing result with column mappings
        """
    
    def generate_openlineage_metadata_from_sql(
        self,
        operator_instance,
        task_instance,
        task_uuid: str
    ) -> OperatorLineage:
        """
        Generate complete OpenLineage metadata from SQL operations.
        
        Args:
            operator_instance: Airflow operator instance
            task_instance: Task instance context
            task_uuid: Unique task identifier
            
        Returns:
            OperatorLineage: Complete operator lineage with datasets and facets
        """
    
    @staticmethod
    def create_namespace(database_info: DatabaseInfo) -> str:
        """
        Create namespace string from database information.
        
        Args:
            database_info: Database configuration
            
        Returns:
            str: Formatted namespace string
        """
    
    @classmethod
    def normalize_sql(cls, sql: list[str] | str) -> str:
        """
        Normalize SQL statement(s) for consistent parsing.
        
        Args:
            sql: SQL statement(s) to normalize
            
        Returns:
            str: Normalized SQL string
        """
    
    @classmethod
    def split_sql_string(cls, sql: list[str] | str) -> list[str]:
        """
        Split SQL string into individual statements.
        
        Args:
            sql: SQL string or list to split
            
        Returns:
            list[str]: List of individual SQL statements
        """
    
    def create_information_schema_query(
        self,
        tables_hierarchy: dict,
        information_schema_columns: list[str],
        information_schema_table_name: str,
        is_cross_db: bool,
        use_flat_cross_db_query: bool,
        is_uppercase_names: bool
    ) -> str:
        """
        Create query for extracting schema information from information_schema.
        
        Args:
            tables_hierarchy: Nested dictionary of database/schema/table structure
            information_schema_columns: Columns to select from information schema
            information_schema_table_name: Name of information schema table
            is_cross_db: Whether query spans multiple databases
            use_flat_cross_db_query: Whether to use flat cross-database query
            is_uppercase_names: Whether to uppercase table/column names
            
        Returns:
            str: SQL query for schema information extraction
        """

Database Information Configuration

Container for database-specific configuration and connection details.

class DatabaseInfo:
    """
    Database configuration container with connection details and schema information.
    """
    
    scheme: str  # Database scheme (e.g., 'postgresql', 'mysql')
    authority: str | None  # Database authority/host information
    database: str | None  # Database name
    information_schema_columns: list[str]  # Columns available in information_schema
    information_schema_table_name: str  # Name of information schema table
    use_flat_cross_db_query: bool  # Whether to use flat cross-database queries
    is_information_schema_cross_db: bool  # Whether information_schema spans databases
    is_uppercase_names: bool  # Whether to uppercase identifiers
    normalize_name_method: Callable[[str], str]  # Method for normalizing names

Type Definitions

Type definitions for SQL parsing operations and parameters.

class GetTableSchemasParams(TypedDict):
    """
    Type definition for table schema extraction parameters.
    """
    hook: Any  # Database hook instance
    namespace: str  # OpenLineage namespace
    database: str | None  # Database name
    schema: str | None  # Schema name
    tables_hierarchy: dict  # Nested table structure
    information_schema_columns: list[str]  # Information schema columns
    information_schema_table_name: str  # Information schema table name
    is_cross_db: bool  # Cross-database query flag
    use_flat_cross_db_query: bool  # Flat query flag
    is_uppercase_names: bool  # Uppercase names flag
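Since a `TypedDict` is a plain dict at runtime, a parameter set matching this shape can be assembled directly. The values below are hypothetical placeholders (a live `DbApiHook` would normally stand where `None` is):

```python
# Hypothetical parameter set conforming to GetTableSchemasParams.
# "hook" would normally be a live database hook; None stands in here.
params = {
    "hook": None,
    "namespace": "postgresql://localhost:5432",
    "database": "analytics",
    "schema": "public",
    "tables_hierarchy": {"analytics": {"public": ["users", "orders"]}},
    "information_schema_columns": ["table_schema", "table_name", "column_name"],
    "information_schema_table_name": "columns",
    "is_cross_db": False,
    "use_flat_cross_db_query": False,
    "is_uppercase_names": False,
}

print(sorted(params))
```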

Utility Functions

Helper functions for SQL parsing and lineage extraction.

def default_normalize_name_method(name: str) -> str:
    """
    Default method for normalizing database object names.
    
    Args:
        name: Name to normalize
        
    Returns:
        str: Normalized name
    """
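A stand-in illustrating the typical behavior, assuming the default simply lowercases identifiers (most databases fold unquoted identifiers to lowercase; check the provider source for the exact rule):

```python
def normalize_name(name: str) -> str:
    # Stand-in for default_normalize_name_method; a lowercasing default
    # is assumed here, matching the common identifier-folding convention.
    return name.lower()

print(normalize_name("PUBLIC.ORDERS"))
# public.orders
```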

def from_table_meta(
    table_meta: DbTableMeta,
    database: str | None,
    namespace: str,
    is_uppercase: bool
) -> Dataset:
    """
    Convert table metadata to OpenLineage Dataset.
    
    Args:
        table_meta: Database table metadata
        database: Database name
        namespace: OpenLineage namespace
        is_uppercase: Whether names should be uppercase
        
    Returns:
        Dataset: OpenLineage dataset representation
    """

def get_openlineage_facets_with_sql(
    hook: DbApiHook,
    sql: str | list[str],
    conn_id: str,
    database: str | None
) -> OperatorLineage | None:
    """
    Extract OpenLineage facets from SQL operations using database hook.
    
    Args:
        hook: Database API hook for connection
        sql: SQL statement(s) to analyze
        conn_id: Airflow connection ID
        database: Database name
        
    Returns:
        OperatorLineage: Extracted lineage metadata or None if extraction fails
    """

Constants

Default values and configuration constants for SQL parsing.

DEFAULT_NAMESPACE: str = "default"
"""Default namespace for OpenLineage events when none specified."""

DEFAULT_INFORMATION_SCHEMA_COLUMNS: list[str] = [
    "table_schema",
    "table_name", 
    "column_name",
    "ordinal_position",
    "udt_name"
]
"""Default columns to select from information_schema tables."""

DEFAULT_INFORMATION_SCHEMA_TABLE_NAME: str = "columns"
"""Default information_schema table name for column metadata."""
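Together these defaults describe a column-metadata lookup against `information_schema.columns`. The following is a hand-written illustration of the kind of query built from them, not the provider's verbatim output:

```python
DEFAULT_INFORMATION_SCHEMA_COLUMNS = [
    "table_schema", "table_name", "column_name", "ordinal_position", "udt_name",
]
DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = "columns"

# Illustrative information_schema lookup assembled from the defaults above;
# the SQL the provider actually generates may differ in detail.
query = (
    f"SELECT {', '.join(DEFAULT_INFORMATION_SCHEMA_COLUMNS)} "
    f"FROM information_schema.{DEFAULT_INFORMATION_SCHEMA_TABLE_NAME} "
    "WHERE table_schema = 'public' AND table_name IN ('users', 'orders')"
)
print(query)
```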

Usage Examples

Basic SQL Parsing

from airflow.providers.openlineage.sqlparser import SQLParser

# Initialize parser
parser = SQLParser(dialect='postgresql', default_schema='public')

# Parse SQL statement
sql = "SELECT * FROM users u JOIN orders o ON u.id = o.user_id"
metadata = parser.parse(sql)

if metadata:
    print(f"Input tables: {metadata.in_tables}")
    print(f"Output tables: {metadata.out_tables}")

Database Configuration

from airflow.providers.openlineage.sqlparser import DatabaseInfo, default_normalize_name_method

# Configure database information
db_info = DatabaseInfo(
    scheme='postgresql',
    authority='localhost:5432',
    database='analytics',
    information_schema_columns=['table_schema', 'table_name', 'column_name'],
    information_schema_table_name='columns',
    use_flat_cross_db_query=False,
    is_information_schema_cross_db=False,
    is_uppercase_names=False,
    normalize_name_method=default_normalize_name_method
)

# Create namespace
namespace = SQLParser.create_namespace(db_info)
print(f"Namespace: {namespace}")

Schema Analysis

from airflow.providers.openlineage.sqlparser import SQLParser
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Setup
parser = SQLParser(dialect='postgresql')
hook = PostgresHook(postgres_conn_id='my_postgres')

# Parse SQL and get basic lineage
sql = """
INSERT INTO analytics.user_metrics 
SELECT user_id, COUNT(*) as order_count
FROM orders 
GROUP BY user_id
"""

metadata = parser.parse(sql)
if metadata is None:
    raise ValueError("SQL statement could not be parsed")

inputs, outputs = parser.get_metadata_from_parser(metadata, 'analytics', 'public')

# Enrich with schema information
enriched_inputs, enriched_outputs = parser.parse_table_schemas(hook, inputs, outputs)

print(f"Enriched inputs: {enriched_inputs}")
print(f"Enriched outputs: {enriched_outputs}")

Complete Lineage Generation

from airflow.providers.openlineage.sqlparser import get_openlineage_facets_with_sql
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Extract complete lineage
hook = PostgresHook(postgres_conn_id='analytics_db')
sql = "INSERT INTO reports.daily_sales SELECT * FROM raw.sales WHERE date = CURRENT_DATE"

lineage = get_openlineage_facets_with_sql(
    hook=hook,
    sql=sql,
    conn_id='analytics_db',
    database='analytics'
)

if lineage:
    print(f"Input datasets: {lineage.inputs}")
    print(f"Output datasets: {lineage.outputs}")
    print(f"Run facets: {lineage.run_facets}")

Custom SQL Normalization

from airflow.providers.openlineage.sqlparser import SQLParser

# Normalize complex SQL
complex_sql = [
    "/* Comment */ SELECT * FROM table1;",
    "INSERT INTO table2 SELECT * FROM table1 WHERE active = true;",
    ""
]

normalized = SQLParser.normalize_sql(complex_sql)
statements = SQLParser.split_sql_string(normalized)

print(f"Normalized SQL: {normalized}")
print(f"Individual statements: {statements}")

Integration with Operators

The SQL parser integrates automatically with SQL-based operators:

from airflow.providers.postgres.operators.postgres import PostgresOperator

# The operator automatically uses SQLParser for lineage extraction
sql_task = PostgresOperator(
    task_id='analyze_sales',
    postgres_conn_id='analytics_db',
    sql="""
    INSERT INTO reports.monthly_sales
    SELECT 
        DATE_TRUNC('month', order_date) as month,
        SUM(amount) as total_sales
    FROM sales.orders 
    WHERE order_date >= '2023-01-01'
    GROUP BY DATE_TRUNC('month', order_date)
    """,
    dag=dag
)

Supported SQL Dialects

The SQL parser supports various SQL dialects:

  • PostgreSQL
  • MySQL
  • BigQuery
  • Snowflake
  • Redshift
  • SQLite
  • Generic SQL (limited features)

Dialect-specific features include appropriate query syntax, information schema handling, and identifier quoting conventions.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openlineage
