tessl/pypi-apache-airflow-providers-common-compat

Common Compatibility Provider - providing compatibility code for previous Airflow versions


OpenLineage Integration

OpenLineage facets, utilities, and compatibility functions for data lineage tracking, including SQL and Spark integration utilities. This module provides version-compatible OpenLineage integration with graceful fallbacks when OpenLineage dependencies are not available.

Capabilities

Base OpenLineage Classes

Core OpenLineage classes for data lineage representation.

class BaseFacet:
    """Base class for OpenLineage facets."""

class Dataset:
    """OpenLineage dataset representation."""

class DatasetFacet:
    """Base class for dataset-specific facets."""

class InputDataset:
    """Input dataset in OpenLineage events."""

class OutputDataset:
    """Output dataset in OpenLineage events."""

class RunFacet:
    """Base class for run-specific facets."""

Lineage Facets

Specific facet classes for detailed lineage tracking.

class ColumnLineageDatasetFacet:
    """Facet for tracking column-level lineage."""

class DocumentationDatasetFacet:
    """Facet for dataset documentation."""

class SchemaDatasetFacet:
    """Facet for dataset schema information."""

class LifecycleStateChangeDatasetFacet:
    """Facet for dataset lifecycle state changes."""

class OutputStatisticsOutputDatasetFacet:
    """Facet for output dataset statistics."""

class SymlinksDatasetFacet:
    """Facet for dataset symlink information."""

Run and Job Facets

Facets for tracking job execution and run information.

class ErrorMessageRunFacet:
    """Facet for capturing error messages in runs."""

class ExternalQueryRunFacet:
    """Facet for external query execution information."""

class ExtractionErrorRunFacet:
    """Facet for lineage extraction errors."""

class SQLJobFacet:
    """Facet for SQL job information."""

Supporting Classes

Helper classes for OpenLineage data structures.

class Fields:
    """Field definitions for schema facets."""

class InputField:
    """Input field definition for column lineage."""

class Error:
    """Error representation in OpenLineage events."""

class LifecycleStateChange:
    """Lifecycle state change representation."""

class PreviousIdentifier:
    """Previous identifier for renamed datasets."""

class Identifier:
    """Dataset identifier representation."""

class SchemaDatasetFacetFields:
    """Schema field definitions."""

No-Op Fallback

Function that creates no-op implementations when OpenLineage is not available.

def create_no_op(*_, **__) -> None:
    """
    Create a no-op placeholder when OpenLineage client is not available.
    
    Returns:
        None: Always returns None as a no-op implementation
    """

SQL Integration

Utilities for extracting OpenLineage facets from SQL operations.

def get_openlineage_facets_with_sql(
    hook, 
    sql: str | list[str], 
    conn_id: str, 
    database: str | None
):
    """
    Get OpenLineage facets from SQL queries.
    
    Args:
        hook: Database hook instance
        sql (str | list[str]): SQL query or queries to analyze
        conn_id (str): Connection ID for the database
        database (str | None): Database name
    
    Returns:
        OpenLineage facets extracted from the SQL operation
    """

Spark Integration

Utilities for injecting OpenLineage information into Spark properties.

def inject_parent_job_information_into_spark_properties(
    properties: dict, 
    context: Context
) -> dict:
    """
    Inject OpenLineage parent job information into Spark properties.
    
    Args:
        properties (dict): Spark configuration properties
        context (Context): Airflow task context
    
    Returns:
        dict: Updated properties with OpenLineage parent job information
    """

def inject_transport_information_into_spark_properties(
    properties: dict, 
    context: Context
) -> dict:
    """
    Inject OpenLineage transport information into Spark properties.
    
    Args:
        properties (dict): Spark configuration properties
        context (Context): Airflow task context
    
    Returns:
        dict: Updated properties with OpenLineage transport configuration
    """

Asset Translation

Utility for translating Airflow assets for OpenLineage compatibility.

def translate_airflow_asset(*args, **kwargs):
    """
    Translate Airflow assets for OpenLineage compatibility.
    
    Handles asset/dataset compatibility between Airflow versions.
    Maps to translate_airflow_asset in newer versions or translate_airflow_dataset 
    with parameter renaming in older versions.
    
    Note: This function is conditionally imported and may have different signatures
    depending on the OpenLineage provider version and Airflow version.
    """

Usage Examples

from airflow.providers.common.compat.openlineage.facet import (
    Dataset, SchemaDatasetFacet, SchemaDatasetFacetFields, SQLJobFacet
)
from airflow.providers.common.compat.openlineage.utils.sql import get_openlineage_facets_with_sql
from airflow.providers.common.compat.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)
from airflow.providers.common.compat.openlineage.check import require_openlineage_version

# The compat layer substitutes no-op placeholders when the OpenLineage client
# is not installed, so these imports and constructors are always safe to call.

# Create an OpenLineage dataset
input_dataset = Dataset(
    namespace="postgresql://localhost:5432",
    name="analytics.users"
)

# Create a schema facet describing the dataset's columns
schema_facet = SchemaDatasetFacet(
    fields=[
        SchemaDatasetFacetFields(name="user_id", type="INTEGER"),
        SchemaDatasetFacetFields(name="email", type="VARCHAR(255)")
    ]
)

# Create a SQL job facet recording the executed query
sql_facet = SQLJobFacet(query="SELECT * FROM users WHERE active = true")

# SQL integration example
@require_openlineage_version(provider_min_version="1.0.0")
def extract_sql_lineage(**context):
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    
    hook = PostgresHook(conn_id="postgres_default")
    # Build the query from the task context; a literal "{{ ds }}" would not be
    # rendered here because this string is not a templated operator field.
    sql = f"SELECT user_id, email FROM users WHERE created_date = '{context['ds']}'"
    
    # Get OpenLineage facets from SQL
    facets = get_openlineage_facets_with_sql(
        hook=hook,
        sql=sql,
        conn_id="postgres_default",
        database="analytics"
    )
    
    return facets

# Spark integration example
@require_openlineage_version(provider_min_version="1.2.0")
def configure_spark_with_openlineage(**context):
    spark_properties = {
        "spark.app.name": "data-processing",
        "spark.sql.adaptive.enabled": "true"
    }
    
    # Inject OpenLineage parent job information
    spark_properties = inject_parent_job_information_into_spark_properties(
        properties=spark_properties,
        context=context
    )
    
    # Inject OpenLineage transport configuration
    spark_properties = inject_transport_information_into_spark_properties(
        properties=spark_properties,
        context=context
    )
    
    return spark_properties

# Use in operators
from airflow.providers.postgres.operators.postgres import PostgresOperator

sql_task = PostgresOperator(
    task_id="process_users",
    sql="""
        INSERT INTO processed_users 
        SELECT user_id, UPPER(email) as email 
        FROM users 
        WHERE created_date = '{{ ds }}'
    """,
    postgres_conn_id="postgres_default",
    # OpenLineage will automatically extract lineage information
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-compat
