Common Compatibility Provider - providing compatibility code for previous Airflow versions
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Data lineage entities including files, users, tables, columns, and tags with template field support for dynamic content rendering. These entities provide structured representations of data assets and their relationships for lineage tracking.
Represents file-based data assets in lineage tracking.
class File:
"""
File entity for lineage tracking.
Attributes:
url (str): File URL or path
type_hint (str | None): Optional type hint for the file
template_fields (tuple): Fields that support templating - ("url",)
"""
def __init__(self, url: str, type_hint: str | None = None): ...Represents users associated with data assets for ownership and access tracking.
class User:
"""
User entity for lineage tracking.
Attributes:
email (str): User email address
first_name (str | None): Optional first name
last_name (str | None): Optional last name
template_fields (tuple): Fields that support templating - ("email", "first_name", "last_name")
"""
def __init__(
self,
email: str,
first_name: str | None = None,
last_name: str | None = None
): ...Represents tags or classifications applied to data assets.
class Tag:
    """
    Tag/classification entity for data assets.

    Attributes:
        tag_name (str): Name of the tag
        template_fields (tuple): Fields that support templating - ("tag_name",)
    """

    # Declared on the class, as documented above, so every instance shares it.
    template_fields: tuple[str, ...] = ("tag_name",)

    def __init__(self, tag_name: str):
        self.tag_name = tag_name

    def __repr__(self) -> str:
        return f"{type(self).__name__}(tag_name={self.tag_name!r})"


# Represents individual columns within table entities.
class Column:
"""
Table column entity for lineage tracking.
Attributes:
name (str): Column name
description (str | None): Optional column description
data_type (str): Column data type
tags (list[Tag]): List of tags applied to the column
template_fields (tuple): Fields that support templating - ("name", "description", "data_type", "tags")
"""
def __init__(
self,
name: str,
description: str | None = None,
data_type: str = "",
tags: list[Tag] | None = None
): ...Represents database tables or structured datasets in lineage tracking.
class Table:
"""
Table entity for lineage tracking.
Attributes:
database (str): Database name
cluster (str): Cluster or server name
name (str): Table name
tags (list[Tag]): List of tags applied to the table
description (str | None): Optional table description
columns (list[Column]): List of table columns
owners (list[User]): List of table owners
extra (dict[str, Any]): Additional metadata
type_hint (str | None): Optional type hint
template_fields (tuple): Fields that support templating
"""
def __init__(
self,
database: str,
cluster: str,
name: str,
tags: list[Tag] | None = None,
description: str | None = None,
columns: list[Column] | None = None,
owners: list[User] | None = None,
extra: dict[str, Any] | None = None,
type_hint: str | None = None
): ...Helper functions for lineage entity management.
def default_if_none(arg: bool | None) -> bool:
"""
Return default value when argument is None.
Args:
arg (bool | None): Boolean value or None
Returns:
bool: False if arg is None, otherwise arg
"""Hook lineage collector for version-compatible lineage tracking.
def get_hook_lineage_collector():
    """
    Return the hook lineage collector suited to the running Airflow version.

    On Airflow 3.0+ this is airflow.lineage.hook.get_hook_lineage_collector();
    on Airflow < 3.0 it is a compatibility wrapper that renames the
    dataset-based methods to their asset-based names.

    Returns:
        Hook lineage collector exposing version-appropriate asset/dataset methods
    """
    # Reference stub only: the working implementation is imported from
    # airflow.providers.common.compat.lineage.hook in the example that follows.
    ...


from airflow.providers.common.compat.lineage.entities import (
    File,
    User,
    Tag,
    Column,
    Table,
    default_if_none,
)
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector
# Create lineage entities
owner = User(
    email="data-team@company.com",
    first_name="Data",
    last_name="Team",
)
pii_tag = Tag("PII")
sensitive_tag = Tag("SENSITIVE")

# Define table columns
user_id_col = Column(
    name="user_id",
    description="Unique user identifier",
    data_type="INTEGER",
    tags=[],
)
email_col = Column(
    name="email",
    description="User email address",
    data_type="VARCHAR(255)",
    tags=[pii_tag, sensitive_tag],
)

# Create table entity
users_table = Table(
    database="analytics",
    cluster="prod-cluster",
    name="users",
    description="User information table",
    columns=[user_id_col, email_col],
    owners=[owner],
    tags=[sensitive_tag],
    extra={"partition_key": "created_date"},
)

# Create file entity. ``url`` is a template field, so "{{ ds }}" is meant to be
# rendered with the Airflow context; on this module-level object it is still
# the raw template string.
data_file = File(
    url="s3://data-lake/users/{{ ds }}/users.parquet",
    type_hint="parquet",
)

# Use in operators with lineage tracking.
# PythonOperator lives in the standard provider on Airflow 3; the legacy core
# module path is kept only as a fallback for older Airflow installations.
try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator


def process_data_with_lineage(**context):
    # Table/File entities are declared on the operator (inlets/outlets below)
    # rather than pushed into get_hook_lineage_collector(): that collector is
    # fed by hooks through add_input_asset()/add_output_asset() and has no
    # add_input()/add_output() methods.
    # Process data
    print(f"Processing {users_table.name} to {data_file.url}")


lineage_task = PythonOperator(
    task_id="process_with_lineage",
    python_callable=process_data_with_lineage,
    # Lineage entities are attached to the task here; their template_fields
    # mark which values (e.g. File.url) are meant to be rendered with the
    # Airflow context.
    # NOTE(review): confirm inlet/outlet template rendering for your Airflow version.
    inlets=[users_table],
    outlets=[data_file],
)

# Utility usage
include_metadata = default_if_none(None)  # Returns False

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-compat