tessl/pypi-apache-airflow-providers-common-compat

Common Compatibility Provider - providing compatibility code for previous Airflow versions

Lineage Entities

This module provides data lineage entities (files, users, tables, columns, and tags) whose template fields support dynamic content rendering. The entities give data assets and their relationships a structured representation for lineage tracking.

Capabilities

File Entity

Represents file-based data assets in lineage tracking.

class File:
    """
    File entity for lineage tracking.
    
    Attributes:
        url (str): File URL or path
        type_hint (str | None): Optional type hint for the file
        template_fields (tuple): Fields that support templating - ("url",)
    """
    
    def __init__(self, url: str, type_hint: str | None = None): ...
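
To illustrate what listing `url` in `template_fields` implies, the sketch below renders a `{{ ds }}` placeholder on a stand-in class. `FileSketch` and `render_template_fields` are hypothetical names, and the regex substitution only approximates Jinja rendering; it is not how Airflow itself renders templates.

```python
import re
from dataclasses import dataclass
from typing import ClassVar, Optional


@dataclass
class FileSketch:
    """Stand-in mirroring the documented File attributes (not the real class)."""

    template_fields: ClassVar[tuple] = ("url",)

    url: str
    type_hint: Optional[str] = None


def render_template_fields(entity, context: dict):
    """Replace {{ name }} placeholders in every templated string field."""
    pattern = re.compile(r"\{\{\s*(\w+)\s*\}\}")
    for field_name in entity.template_fields:
        value = getattr(entity, field_name)
        if isinstance(value, str):
            rendered = pattern.sub(lambda m: str(context[m.group(1)]), value)
            setattr(entity, field_name, rendered)
    return entity


data_file = FileSketch(url="s3://data-lake/users/{{ ds }}/users.parquet", type_hint="parquet")
render_template_fields(data_file, {"ds": "2024-01-01"})
print(data_file.url)  # s3://data-lake/users/2024-01-01/users.parquet
```

Only `url` is rendered here; `type_hint` is left untouched because it is not listed in `template_fields`.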

User Entity

Represents users associated with data assets for ownership and access tracking.

class User:
    """
    User entity for lineage tracking.
    
    Attributes:
        email (str): User email address
        first_name (str | None): Optional first name
        last_name (str | None): Optional last name
        template_fields (tuple): Fields that support templating - ("email", "first_name", "last_name")
    """
    
    def __init__(
        self, 
        email: str, 
        first_name: str | None = None, 
        last_name: str | None = None
    ): ...

Tag Entity

Represents tags or classifications applied to data assets.

class Tag:
    """
    Tag/classification entity for data assets.
    
    Attributes:
        tag_name (str): Name of the tag
        template_fields (tuple): Fields that support templating - ("tag_name",)
    """
    
    def __init__(self, tag_name: str): ...

Column Entity

Represents individual columns within table entities.

class Column:
    """
    Table column entity for lineage tracking.
    
    Attributes:
        name (str): Column name
        description (str | None): Optional column description
        data_type (str): Column data type
        tags (list[Tag]): List of tags applied to the column
        template_fields (tuple): Fields that support templating - ("name", "description", "data_type", "tags")
    """
    
    def __init__(
        self,
        name: str,
        description: str | None = None,
        data_type: str = "",
        tags: list[Tag] | None = None
    ): ...
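
Because `tags` appears in the column's `template_fields`, rendering presumably has to descend into nested entities. The sketch below shows one way such a recursive pass could look. `TagSketch`, `ColumnSketch`, and `render` are hypothetical stand-ins, and the placeholder substitution is a simplification rather than Airflow's renderer.

```python
import re
from dataclasses import dataclass, field
from typing import ClassVar, List, Optional

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


@dataclass
class TagSketch:
    template_fields: ClassVar[tuple] = ("tag_name",)
    tag_name: str


@dataclass
class ColumnSketch:
    template_fields: ClassVar[tuple] = ("name", "description", "data_type", "tags")
    name: str
    description: Optional[str] = None
    data_type: str = ""
    tags: List[TagSketch] = field(default_factory=list)


def render(value, context: dict):
    """Render strings, recurse into lists and into objects with template_fields."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, list):
        return [render(item, context) for item in value]
    if hasattr(value, "template_fields"):
        for name in value.template_fields:
            setattr(value, name, render(getattr(value, name), context))
    # Anything else (for example None) is returned unchanged
    return value


col = ColumnSketch(name="email", data_type="VARCHAR(255)", tags=[TagSketch("{{ env }}-PII")])
render(col, {"env": "prod"})
print(col.tags[0].tag_name)  # prod-PII
```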

Table Entity

Represents database tables or structured datasets in lineage tracking.

class Table:
    """
    Table entity for lineage tracking.
    
    Attributes:
        database (str): Database name
        cluster (str): Cluster or server name
        name (str): Table name
        tags (list[Tag]): List of tags applied to the table
        description (str | None): Optional table description
        columns (list[Column]): List of table columns
        owners (list[User]): List of table owners
        extra (dict[str, Any]): Additional metadata
        type_hint (str | None): Optional type hint
        template_fields (tuple): Fields that support templating - ("database", "cluster", "name", "tags", "description", "columns", "owners", "extra")
    """
    
    def __init__(
        self,
        database: str,
        cluster: str,
        name: str,
        tags: list[Tag] | None = None,
        description: str | None = None,
        columns: list[Column] | None = None,
        owners: list[User] | None = None,
        extra: dict[str, Any] | None = None,
        type_hint: str | None = None
    ): ...
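
The signature above takes `None` for the list and dict parameters, while the attributes are documented as plain lists and a dict. One common way to bridge the two is to substitute a fresh empty container per instance, sketched below. `TableSketch` is a hypothetical stand-in covering only a few of the fields, and the real class may handle defaults differently.

```python
from typing import Any, Dict, List, Optional


class TableSketch:
    """Stand-in showing how None defaults could map to empty containers."""

    def __init__(
        self,
        database: str,
        cluster: str,
        name: str,
        tags: Optional[List[Any]] = None,
        extra: Optional[Dict[str, Any]] = None,
    ):
        self.database = database
        self.cluster = cluster
        self.name = name
        # A fresh container per instance avoids sharing one mutable default
        self.tags = tags if tags is not None else []
        self.extra = extra if extra is not None else {}


a = TableSketch("analytics", "prod-cluster", "users")
b = TableSketch("analytics", "prod-cluster", "orders")
a.tags.append("SENSITIVE")
print(a.tags, b.tags)  # ['SENSITIVE'] []
```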

Utility Functions

Helper functions for lineage entity management.

def default_if_none(arg: bool | None) -> bool:
    """
    Return default value when argument is None.
    
    Args:
        arg (bool | None): Boolean value or None
        
    Returns:
        bool: False if arg is None, otherwise arg
    """
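
The documented contract can be written out as a one-line function. The sketch below uses a separate, hypothetical name (`default_if_none_sketch`) so it is not mistaken for the provider's own implementation.

```python
from typing import Optional


def default_if_none_sketch(arg: Optional[bool]) -> bool:
    """Return False when arg is None, otherwise return arg unchanged."""
    return False if arg is None else arg


print(default_if_none_sketch(None))   # False
print(default_if_none_sketch(True))   # True
print(default_if_none_sketch(False))  # False
```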

Hook Integration

Hook lineage collector for version-compatible lineage tracking.

def get_hook_lineage_collector():
    """
    Get version-compatible hook lineage collector.
    
    Returns the hook lineage collector appropriate for the current Airflow version.
    In Airflow 3.0+, returns airflow.lineage.hook.get_hook_lineage_collector().
    In Airflow < 3.0, returns a compatibility wrapper that handles the asset/dataset renaming.
    
    Returns:
        Hook lineage collector with version-appropriate asset/dataset methods
    """
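
The version switch described above can be pictured as a small adapter. The sketch below is a self-contained illustration under stated assumptions: both collector classes, `with_asset_names`, and `get_collector_sketch` are hypothetical, and the method names are only modeled on the asset/dataset renaming mentioned in the docstring.

```python
class ModernCollectorSketch:
    """Hypothetical collector that already uses asset-named methods."""

    def __init__(self):
        self.inputs, self.outputs = [], []

    def add_input_asset(self, context, uri):
        self.inputs.append(uri)

    def add_output_asset(self, context, uri):
        self.outputs.append(uri)


class LegacyCollectorSketch:
    """Hypothetical collector that only knows dataset-named methods."""

    def __init__(self):
        self.inputs, self.outputs = [], []

    def add_input_dataset(self, context, uri):
        self.inputs.append(uri)

    def add_output_dataset(self, context, uri):
        self.outputs.append(uri)


def with_asset_names(collector):
    """Alias the dataset-named methods under asset names on this instance."""
    collector.add_input_asset = collector.add_input_dataset
    collector.add_output_asset = collector.add_output_dataset
    return collector


def get_collector_sketch(airflow_version: str):
    """Pick a collector by major version so callers use one set of names."""
    major = int(airflow_version.split(".")[0])
    if major >= 3:
        return ModernCollectorSketch()
    return with_asset_names(LegacyCollectorSketch())


for version in ("2.10.5", "3.0.0"):
    collector = get_collector_sketch(version)
    collector.add_input_asset(context=None, uri="s3://data-lake/users/users.parquet")
    print(version, type(collector).__name__, collector.inputs)
```

Calling code then uses the asset-named methods everywhere and leaves the version check to the factory.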

Usage Examples

from airflow.providers.common.compat.lineage.entities import (
    File, User, Tag, Column, Table, default_if_none
)
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

# Create lineage entities
owner = User(
    email="data-team@company.com",
    first_name="Data",
    last_name="Team"
)

pii_tag = Tag(tag_name="PII")
sensitive_tag = Tag(tag_name="SENSITIVE")

# Define table columns
user_id_col = Column(
    name="user_id",
    description="Unique user identifier",
    data_type="INTEGER",
    tags=[]
)

email_col = Column(
    name="email",
    description="User email address",
    data_type="VARCHAR(255)",
    tags=[pii_tag, sensitive_tag]
)

# Create table entity
users_table = Table(
    database="analytics",
    cluster="prod-cluster",
    name="users",
    description="User information table",
    columns=[user_id_col, email_col],
    owners=[owner],
    tags=[sensitive_tag],
    extra={"partition_key": "created_date"}
)

# Create file entity
data_file = File(
    url="s3://data-lake/users/{{ ds }}/users.parquet",
    type_hint="parquet"
)

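# Use in operators with lineage tracking
from airflow.operators.python import PythonOperator

def process_data_with_lineage(**context):
    # Get the version-compatible hook lineage collector
    collector = get_hook_lineage_collector()

    # The collector records assets by URI rather than by entity object.
    # Passing the task as the lineage context is an assumption made for
    # this example; hooks normally pass themselves.
    output_uri = f"s3://data-lake/users/{context['ds']}/users.parquet"
    collector.add_output_asset(context=context["task"], uri=output_uri)

    # Process data
    print(f"Processing {users_table.name} to {output_uri}")

lineage_task = PythonOperator(
    task_id="process_with_lineage",
    python_callable=process_data_with_lineage,
    # Entities are attached as inlets/outlets; whether their template fields
    # are rendered with the Airflow context depends on the Airflow version
    inlets=[users_table],
    outlets=[data_file],
)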

# Utility usage
include_metadata = default_if_none(None)  # Returns False

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-compat
