Common I/O Provider for Apache Airflow: unified file operations and transfers across different storage systems
The Common IO Provider includes asset and dataset handlers for file-based assets that integrate with Airflow's asset system and OpenLineage for data lineage tracking. These handlers support file URI validation, asset creation, and conversion for lineage systems.
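Before the API details, here is a stdlib-only sketch of the normalization and validation semantics these handlers implement. It does not use the provider itself; the helper names `to_file_uri` and `check_uri` are hypothetical stand-ins, not the provider's API.

```python
from pathlib import PurePosixPath
from urllib.parse import SplitResult, urlsplit


def to_file_uri(path: str) -> str:
    # Normalize a plain POSIX path or file:// URI to a canonical file:// URI
    # (a rough approximation of the factory's normalization behaviour).
    parts = urlsplit(path)
    if parts.scheme == "file":
        raw = (parts.netloc + parts.path) if parts.netloc else parts.path
    else:
        raw = path
    return "file://" + str(PurePosixPath("/") / raw.lstrip("/"))


def check_uri(uri: SplitResult) -> SplitResult:
    # Reject file:// URIs with an empty path, mirroring the validation
    # rule described for sanitize_uri below.
    if not uri.path:
        raise ValueError("file:// URI must include a non-empty path")
    return uri


print(to_file_uri("/data/input/dataset.csv"))         # file:///data/input/dataset.csv
print(to_file_uri("file:///data/input/dataset.csv"))  # file:///data/input/dataset.csv
```

Both spellings normalize to the same URI, which is why assets created from a plain path and a `file://` URI can be treated as the same asset.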
Creates file-based assets from file paths with proper URI normalization and validation.

```python
def create_asset(*, path: str | PosixPath, extra=None) -> Asset:
    """
    Create a file asset from a file path.

    Normalizes file:// URIs and handles various path formats for consistent
    asset creation across different file path representations.

    Parameters:
    - path: File path as string or PosixPath object
    - extra: Optional extra metadata for the asset

    Returns:
    - Asset: Airflow Asset object with normalized file:// URI
    """
```

Validates file URI formats to ensure proper asset handling and prevent invalid asset creation.
```python
def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Validate and sanitize file URI format.

    Ensures that file:// URIs contain valid, non-empty paths and meet
    the requirements for asset URI handling.

    Parameters:
    - uri: Parsed URI from urllib.parse.urlsplit

    Returns:
    - SplitResult: Validated URI structure

    Raises:
    - ValueError: If URI format is invalid or path is empty
    """
```

Converts file assets to OpenLineage dataset format for data lineage tracking integration.
```python
def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset:
    """
    Convert Airflow Asset to OpenLineage Dataset for lineage tracking.

    Translates an Asset with a valid AIP-60 URI to OpenLineage format with
    assistance from the lineage context. Handles various file path formats
    and network locations.

    Parameters:
    - asset: Airflow Asset object with file:// URI
    - lineage_context: OpenLineage context for conversion

    Returns:
    - OpenLineageDataset: OpenLineage dataset with namespace and name

    Note:
    - Windows paths are not standardized and can produce unexpected behaviour
    """
```

```python
from pathlib import PosixPath

from airflow.providers.common.io.assets.file import create_asset

# Create asset from string path
asset1 = create_asset(path="/data/input/dataset.csv")

# Create asset from PosixPath
asset2 = create_asset(path=PosixPath("/data/output/results.json"))

# Create asset with extra metadata
asset3 = create_asset(
    path="/data/processed/analysis.parquet",
    extra={"format": "parquet", "schema_version": "1.0"},
)

# Handle various file URI formats
asset4 = create_asset(path="file:///absolute/path/data.txt")
asset5 = create_asset(path="file://relative/path/data.txt")
```

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.io.assets.file import create_asset

# Define file assets
input_asset = create_asset(path="/data/input/raw_data.csv")
output_asset = create_asset(path="/data/output/processed_data.csv")

dag = DAG(
    "asset_example",
    start_date=datetime(2023, 1, 1),
    schedule=[input_asset],  # Schedule on input asset updates
)

def process_data(**context):
    # Process input file and create output
    input_path = "/data/input/raw_data.csv"
    output_path = "/data/output/processed_data.csv"
    # ... processing logic ...
    return output_path

process_task = PythonOperator(
    task_id="process_data",
    python_callable=process_data,
    outlets=[output_asset],  # Declare output asset
    dag=dag,
)
```

```python
from urllib.parse import urlsplit

from airflow.providers.common.io.assets.file import sanitize_uri

# Validate file URIs
try:
    uri = urlsplit("file:///data/valid/path.txt")
    validated_uri = sanitize_uri(uri)
    print("Valid URI:", validated_uri.geturl())
except ValueError as e:
    print("Invalid URI:", e)

# This will raise ValueError
try:
    empty_uri = urlsplit("file://")  # Empty path
    sanitize_uri(empty_uri)
except ValueError:
    print("Empty path not allowed")
```

```python
from airflow.providers.common.io.assets.file import convert_asset_to_openlineage, create_asset

# Create file asset
asset = create_asset(path="/data/warehouse/table.parquet")

# Convert to OpenLineage dataset (typically handled automatically)
# This is usually called by Airflow's lineage system
def lineage_context_example():
    lineage_context = {}  # Provided by Airflow lineage system
    ol_dataset = convert_asset_to_openlineage(asset, lineage_context)
    return {
        "namespace": ol_dataset.namespace,
        "name": ol_dataset.name,
    }
```

The asset handlers are automatically registered with Airflow through the provider mechanism and handle file:// scheme URIs:
```python
# Provider configuration (from provider.yaml)
asset_uris = [
    {
        "schemes": ["file"],
        "handler": "airflow.providers.common.io.assets.file.sanitize_uri",
        "to_openlineage_converter": "airflow.providers.common.io.assets.file.convert_asset_to_openlineage",
        "factory": "airflow.providers.common.io.assets.file.create_asset",
    }
]
```

Supported file URI formats:

- `file:///absolute/path/to/file.txt`
- `file://server/share/file.txt` (network paths)

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.common.io.assets.file import create_asset

# Define data pipeline assets
raw_data = create_asset(path="/data/raw/daily_data.csv")
clean_data = create_asset(path="/data/clean/daily_data.csv")
report_data = create_asset(path="/reports/daily_report.pdf")

# DAG triggered by raw data updates
process_dag = DAG(
    "data_processing",
    schedule=[raw_data],
    start_date=datetime(2023, 1, 1),
)

# DAG triggered by clean data updates
report_dag = DAG(
    "report_generation",
    schedule=[clean_data],
    start_date=datetime(2023, 1, 1),
)
```

File assets automatically participate in Airflow's data lineage tracking:
- as inputs, via a DAG's `schedule` parameter
- as outputs, via a task's `outlets` parameter

The create_asset function handles various file path formats:

- `file:///absolute/path` → normalized
- `file://relative/path` → converted to absolute
- `/absolute/path` → converted to `file:///absolute/path`

Asset handlers work with both Airflow 2.x and 3.0+:

- Airflow 2.x: `airflow.datasets.Dataset`
- Airflow 3.0+: `airflow.sdk.definitions.asset.Asset`

Install with Tessl CLI

```shell
npx tessl i tessl/pypi-apache-airflow-providers-common-io
```
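The Airflow 2.x / 3.0+ split noted above can be bridged with a guarded import. This is a minimal sketch, not prescribed by the provider; the `None` fallback is illustrative, for code that should degrade gracefully when Airflow is absent.

```python
try:
    # Airflow 3.0+ exposes Asset in the task SDK
    from airflow.sdk.definitions.asset import Asset
except ImportError:
    try:
        # Airflow 2.x uses Dataset; alias it to the 3.x name
        from airflow.datasets import Dataset as Asset
    except ImportError:
        Asset = None  # Airflow not installed in this environment
```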