Common Compatibility Provider - providing compatibility code for previous Airflow versions
Version-compatible asset and dataset handling with support for asset aliases, collections, and authorization details. This module handles the transition from "Dataset" to "Asset" terminology in Airflow 3.0 while providing a consistent API.
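Compatibility shims like this are conventionally built on a try/except import that prefers the newer module path and falls back to the older one. A minimal sketch of that pattern, using the mappings documented below (this is illustrative, not the provider's actual source; the final fallback keeps the snippet runnable where Airflow is not installed):

```python
# Sketch of a version-gated import shim (illustrative only).
# Per the mappings in this document: Airflow 3.0+ exposes Asset under
# airflow.assets, while earlier versions expose Dataset under airflow.datasets.
try:
    from airflow.assets import Asset  # Airflow 3.0+
except ImportError:
    try:
        from airflow.datasets import Dataset as Asset  # Airflow < 3.0
    except ImportError:
        Asset = None  # Airflow not installed; callers should handle this

# Downstream code can now refer to `Asset` uniformly across versions.
```

The same pattern extends to the alias, collection, and details classes described below, each aliasing its `Dataset*` counterpart on older versions.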
Main asset/dataset class for representing data dependencies in Airflow workflows.
class Asset:
"""
Version-compatible asset/dataset class.
Maps to airflow.assets.Asset in Airflow 3.0+
Maps to airflow.datasets.Dataset in Airflow < 3.0
"""Asset alias class for creating symbolic references to assets that can be resolved at runtime.
class AssetAlias:
"""
Version-compatible asset/dataset alias class.
Maps to airflow.assets.AssetAlias in Airflow 3.0+
Maps to airflow.datasets.DatasetAlias in Airflow < 3.0
"""Collection classes for representing groups of assets with logical operators.
class AssetAll:
"""
Asset collection representing all assets (AND logic).
Maps to airflow.assets.AssetAll in Airflow 3.0+
Maps to airflow.datasets.DatasetAll in Airflow < 3.0
"""
class AssetAny:
"""
Asset collection representing any asset (OR logic).
Maps to airflow.assets.AssetAny in Airflow 3.0+
Maps to airflow.datasets.DatasetAny in Airflow < 3.0
"""Classes for asset authorization and permission management.
class AssetDetails:
"""
Asset details for authorization purposes.
Maps to airflow.assets.AssetDetails in Airflow 3.0+
Maps to airflow.datasets.DatasetDetails in Airflow < 3.0
"""
class AssetAliasDetails:
"""
Asset alias details for authorization purposes.
Available in Airflow 3.0+ from airflow.api_fastapi.auth.managers.models.resource_details.AssetAliasDetails
Note: May not be properly imported in Airflow < 3.0 despite being in __all__
"""Function to expand asset aliases to actual asset instances.
def expand_alias_to_assets(...):
"""
Expand asset aliases to actual assets.
Maps to appropriate expansion function based on Airflow version.
"""from airflow.providers.common.compat.assets import (
Asset,
AssetAlias,
AssetAll,
AssetAny,
expand_alias_to_assets
)
# Create assets
input_data = Asset("s3://bucket/input.csv")
output_data = Asset("s3://bucket/output.csv")
# Create asset aliases
processed_data = AssetAlias("processed_data")
# Use asset collections
all_inputs = AssetAll(input_data, processed_data)  # members are positional args
any_output = AssetAny(output_data)
# Expand aliases
actual_assets = expand_alias_to_assets(processed_data)
# Use in DAGs (asset-aware scheduling)
from airflow import DAG
from airflow.operators.empty import EmptyOperator  # DummyOperator was removed in Airflow 3.0

dag = DAG(
    "example_dag",
    schedule=[input_data],  # Trigger when input_data is updated
    catchup=False,
)
task = EmptyOperator(
    task_id="process_data",
    outlets=[output_data],  # Mark this task as producing output_data
    dag=dag,
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-compat