tessl/pypi-apache-airflow-providers-common-io

Common I/O Provider for Apache Airflow that provides unified file operations and transfers across different storage systems

File Asset Handlers

The Common IO Provider includes asset and dataset handlers for file-based assets that integrate with Airflow's asset system and OpenLineage for data lineage tracking. These handlers support file URI validation, asset creation, and conversion for lineage systems.

Capabilities

Asset Creation

Creates file-based assets from file paths with proper URI normalization and validation.

def create_asset(*, path: str | PosixPath, extra=None) -> Asset:
    """
    Create a file asset from a file path.
    
    Normalizes file:// URIs and handles various path formats for consistent
    asset creation across different file path representations.
    
    Parameters:
    - path: File path as string or PosixPath object
    - extra: Optional extra metadata for the asset
    
    Returns:
    - Asset: Airflow Asset object with normalized file:// URI
    """

URI Validation

Validates file URI formats to ensure proper asset handling and prevent invalid asset creation.

def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Validate and sanitize file URI format.
    
    Ensures that file:// URIs contain valid, non-empty paths and meet
    the requirements for asset URI handling.
    
    Parameters:
    - uri: Parsed URI from urllib.parse.urlsplit
    
    Returns:
    - SplitResult: Validated URI structure
    
    Raises:
    - ValueError: If URI format is invalid or path is empty
    """
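
The validation rule described above can be illustrated with the standard library alone. The sketch below is not the provider's implementation: `check_file_uri` is a hypothetical stand-in, and the scheme check is an added assumption. It only shows what an "empty path" looks like once `urlsplit` has parsed a `file://` URI.

```python
from urllib.parse import SplitResult, urlsplit


def check_file_uri(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in for a file URI check (not the provider's code)."""
    # Assumption: only the file scheme is accepted by this sketch.
    if uri.scheme != "file":
        raise ValueError(f"expected a file:// URI, got scheme {uri.scheme!r}")
    # urlsplit("file://") yields an empty path, which is the case to reject.
    if not uri.path:
        raise ValueError("file:// URI must contain a non-empty path")
    return uri


valid = check_file_uri(urlsplit("file:///data/valid/path.txt"))
print(valid.path)

try:
    check_file_uri(urlsplit("file://"))
except ValueError as exc:
    print("rejected:", exc)
```

Because the check works on an already parsed `SplitResult`, the caller stays responsible for parsing, which matches the signature documented above.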

OpenLineage Conversion

Converts file assets to OpenLineage dataset format for data lineage tracking integration.

def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset:
    """
    Convert Airflow Asset to OpenLineage Dataset for lineage tracking.
    
    Translates Asset with valid AIP-60 URI to OpenLineage format with
    assistance from the lineage context. Handles various file path formats
    and network locations.
    
    Parameters:
    - asset: Airflow Asset object with file:// URI
    - lineage_context: OpenLineage context for conversion
    
    Returns:
    - OpenLineageDataset: OpenLineage dataset with namespace and name
    
    Note:
    - Windows paths are not standardized and can produce unexpected behaviour
    """
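
To make the namespace and name split concrete, here is a minimal standard-library sketch. The mapping rule is an assumption for illustration (namespace `file://<host>` when the URI has a host, plain `file` otherwise); `LineageName` and `split_for_lineage` are hypothetical names, and the provider source should be checked for the rule it actually applies.

```python
from typing import NamedTuple
from urllib.parse import urlsplit


class LineageName(NamedTuple):
    """Hypothetical container mirroring the namespace/name pair."""

    namespace: str
    name: str


def split_for_lineage(uri: str) -> LineageName:
    """Derive a namespace and name from a file URI (assumed convention)."""
    parts = urlsplit(uri)
    # Assumption: a host becomes part of the namespace; local files use "file".
    namespace = f"file://{parts.netloc}" if parts.netloc else "file"
    return LineageName(namespace=namespace, name=parts.path)


local = split_for_lineage("file:///data/warehouse/table.parquet")
shared = split_for_lineage("file://server/share/file.txt")
print(local)
print(shared)
```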

Usage Examples

Creating File Assets

from airflow.providers.common.io.assets.file import create_asset
from pathlib import PosixPath

# Create asset from string path
asset1 = create_asset(path="/data/input/dataset.csv")

# Create asset from PosixPath
asset2 = create_asset(path=PosixPath("/data/output/results.json"))

# Create asset with extra metadata
asset3 = create_asset(
    path="/data/processed/analysis.parquet",
    extra={"format": "parquet", "schema_version": "1.0"}
)

# Handle various file URI formats
asset4 = create_asset(path="file:///absolute/path/data.txt")
# Note: under standard URI parsing, "relative" below is read as the host, not a directory
asset5 = create_asset(path="file://relative/path/data.txt")

Using Assets in DAGs

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.io.assets.file import create_asset
from datetime import datetime

# Define file assets
input_asset = create_asset(path="/data/input/raw_data.csv")
output_asset = create_asset(path="/data/output/processed_data.csv")

dag = DAG(
    'asset_example',
    start_date=datetime(2023, 1, 1),
    schedule=[input_asset]  # Schedule on input asset updates
)

def process_data(**context):
    # Process input file and create output
    input_path = "/data/input/raw_data.csv"
    output_path = "/data/output/processed_data.csv"
    # ... processing logic ...
    return output_path

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    outlets=[output_asset],  # Declare output asset
    dag=dag
)

Asset URI Validation

from airflow.providers.common.io.assets.file import sanitize_uri
from urllib.parse import urlsplit

# Validate file URIs
try:
    uri = urlsplit("file:///data/valid/path.txt")
    validated_uri = sanitize_uri(uri)
    print("Valid URI:", validated_uri.geturl())
except ValueError as e:
    print("Invalid URI:", e)

# This will raise ValueError
try:
    empty_uri = urlsplit("file://")  # Empty path
    sanitize_uri(empty_uri)
except ValueError:
    print("Empty path not allowed")

OpenLineage Integration

from airflow.providers.common.io.assets.file import convert_asset_to_openlineage, create_asset

# Create file asset
asset = create_asset(path="/data/warehouse/table.parquet")

# Convert to OpenLineage dataset (typically handled automatically)
# This is usually called by Airflow's lineage system
def lineage_context_example():
    lineage_context = {}  # Provided by Airflow lineage system
    ol_dataset = convert_asset_to_openlineage(asset, lineage_context)
    
    return {
        "namespace": ol_dataset.namespace,
        "name": ol_dataset.name
    }

Provider Registration

The asset handlers are automatically registered with Airflow through the provider mechanism and handle file:// scheme URIs:

Automatic Registration

# Provider configuration (from provider.yaml)
asset_uris = [
    {
        "schemes": ["file"],
        "handler": "airflow.providers.common.io.assets.file.sanitize_uri",
        "to_openlineage_converter": "airflow.providers.common.io.assets.file.convert_asset_to_openlineage",
        "factory": "airflow.providers.common.io.assets.file.create_asset",
    }
]
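
The configuration above refers to handlers by dotted path. As a rough illustration of how such strings can be turned into callables, the sketch below resolves a dotted path with `importlib`. It is not Airflow's provider loading code: `import_dotted` and `registry` are hypothetical, and a standard-library function stands in for the real handler so the example runs without Airflow installed.

```python
from importlib import import_module
from typing import Any, Callable


def import_dotted(path: str) -> Callable[..., Any]:
    """Resolve 'package.module.attr' to the attribute it names."""
    module_name, _, attr = path.rpartition(".")
    return getattr(import_module(module_name), attr)


# Hypothetical registry shaped like the entry above; the handler is a
# standard-library stand-in, not the provider's sanitize_uri.
registry = {"file": {"handler": "posixpath.normpath"}}

handler = import_dotted(registry["file"]["handler"])
print(handler("/data//raw/../clean/daily_data.csv"))
```

Keeping handlers as strings lets the configuration be read without importing the handler modules until a matching scheme is actually encountered.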

Supported URI Schemes

  • file: Local and network file paths
    • file:///absolute/path/to/file.txt
    • file://server/share/file.txt (network paths)
    • Automatic normalization of various file URI formats

Integration with Airflow Assets

Asset-Based Scheduling

from airflow import DAG
from airflow.providers.common.io.assets.file import create_asset
from datetime import datetime

# Define data pipeline assets
raw_data = create_asset(path="/data/raw/daily_data.csv")
clean_data = create_asset(path="/data/clean/daily_data.csv")
report_data = create_asset(path="/reports/daily_report.pdf")

# DAG triggered by raw data updates
process_dag = DAG(
    'data_processing',
    schedule=[raw_data],
    start_date=datetime(2023, 1, 1)
)

# DAG triggered by clean data updates
report_dag = DAG(
    'report_generation',
    schedule=[clean_data],
    start_date=datetime(2023, 1, 1)
)

Asset Lineage Tracking

File assets automatically participate in Airflow's data lineage tracking:

  • Input assets: Declared via DAG schedule parameter
  • Output assets: Declared via task outlets parameter
  • OpenLineage: Automatic conversion for external lineage systems
  • Asset graph: Visual representation in Airflow UI

Path Handling Details

URI Normalization

The create_asset function handles various file path formats:

  1. Full file URIs: file:///absolute/path → normalized
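  2. Relative file URIs: file://relative/path → converted to absolute (standard URI parsers read the first segment after file:// as a host, the same shape as the network form file://server/share/file.txt)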
  3. Plain paths: /absolute/path → converted to file:///absolute/path
  4. PosixPath objects: Converted to string then processed
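
The cases listed above can be sketched as a small normalization helper. This is an illustration under stated assumptions, not the provider's `create_asset`: `to_file_uri` is a hypothetical name, relative plain paths are rejected by assumption, and host-bearing URIs are passed through unchanged rather than converted. `PurePosixPath` is used instead of `PosixPath` so the sketch also runs on Windows.

```python
from __future__ import annotations

from pathlib import PurePosixPath
from urllib.parse import urlsplit


def to_file_uri(path: str | PurePosixPath) -> str:
    """Return a file:// URI for an absolute path or an existing file URI."""
    text = str(path)  # path objects are converted to strings first
    if text.startswith("file://"):
        parts = urlsplit(text)
        # Rebuild from host and path only; query and fragment are dropped.
        return f"file://{parts.netloc}{parts.path}"
    if not text.startswith("/"):
        # Assumption: plain paths must be absolute in this sketch.
        raise ValueError(f"expected an absolute path, got {text!r}")
    return f"file://{text}"


print(to_file_uri("/absolute/path"))
print(to_file_uri("file:///absolute/path"))
print(to_file_uri(PurePosixPath("/data/output/results.json")))
```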

Cross-Platform Considerations

  • Linux/macOS: Standard POSIX path handling
  • Windows: Limited support, may produce unexpected behavior
  • Network paths: Support for UNC paths via file:// URIs

Version Compatibility

Asset handlers work with both Airflow 2.x and 3.0+:

  • Airflow 2.x: Uses airflow.datasets.Dataset
  • Airflow 3.0+: Uses airflow.sdk.definitions.asset.Asset
  • Automatic imports: Version detection handles compatibility
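
A common way to support both import locations is a guarded import. The sketch below assumes a try/except fallback; the provider may instead compare Airflow version numbers, so treat this as one possible pattern rather than its actual mechanism. `ASSET_SOURCE` is a hypothetical name, and the final fallback to `None` exists only so the snippet runs where Airflow is not installed.

```python
# Prefer the Airflow 3.0+ location, then fall back to the Airflow 2.x one.
try:
    from airflow.sdk.definitions.asset import Asset
    ASSET_SOURCE = "airflow.sdk.definitions.asset"
except ImportError:
    try:
        from airflow.datasets import Dataset as Asset
        ASSET_SOURCE = "airflow.datasets"
    except ImportError:
        # Airflow is not installed; keep the names defined for this sketch.
        Asset = None
        ASSET_SOURCE = None

print("Asset class loaded from:", ASSET_SOURCE)
```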

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-io
