tessl/pypi-apache-airflow-providers-common-io

Common I/O Provider for Apache Airflow that provides unified file operations and transfers across different storage systems

Describes: pkg:pypi/apache-airflow-providers-common-io@1.6.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-common-io@1.6.0


Apache Airflow Common IO Provider

A provider package for Apache Airflow that enables unified I/O operations and file transfer capabilities across different storage systems. The package provides a file transfer operator, an XCom backend that stores payloads in object storage, asset/dataset handlers for file-based assets, and configuration options for storage integration.

Package Information

  • Package Name: apache-airflow-providers-common-io
  • Language: Python
  • Installation: pip install apache-airflow-providers-common-io
  • Airflow Version: Requires Apache Airflow 2.10.0+

Core Imports

from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
from airflow.providers.common.io.assets.file import create_asset, sanitize_uri, convert_asset_to_openlineage

Basic Usage

from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow import DAG
from datetime import datetime

# Create a DAG for file transfer
dag = DAG(
    'file_transfer_example',
    start_date=datetime(2023, 1, 1),
    schedule=None
)

# Transfer a file from source to destination
transfer_task = FileTransferOperator(
    task_id='transfer_file',
    src='/path/to/source/file.txt',
    dst='/path/to/destination/file.txt',
    overwrite=True,
    dag=dag
)

# Use with object storage paths and connections
s3_transfer = FileTransferOperator(
    task_id='s3_transfer',
    src='s3://source-bucket/file.txt',
    dst='s3://dest-bucket/file.txt',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    dag=dag
)

Architecture

The Common IO Provider follows Apache Airflow's provider pattern and integrates with multiple Airflow subsystems:

  • Operators: File transfer operations using ObjectStoragePath for unified storage access
  • XCom Backend: Configurable object storage backend for XCom data with size-based routing
  • Asset Handlers: File asset creation, URI validation, and OpenLineage integration
  • Configuration: Centralized settings for object storage paths, thresholds, and compression

The package provides version compatibility handling for both Airflow 2.x and 3.0+, ensuring consistent behavior across versions.

Capabilities

File Transfer Operations

File transfer operator for copying files between different storage systems with support for local filesystem, cloud storage (S3, GCS, Azure), and other fsspec-compatible storage backends.

class FileTransferOperator(BaseOperator):
    def __init__(
        self,
        *,
        src: str | ObjectStoragePath,
        dst: str | ObjectStoragePath,
        source_conn_id: str | None = None,
        dest_conn_id: str | None = None,
        overwrite: bool = False,
        **kwargs
    ): ...
    
    def execute(self, context: Context) -> None: ...
    def get_openlineage_facets_on_start(self) -> OperatorLineage: ...
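Beyond plain string paths, src and dst also accept ObjectStoragePath instances, which can carry their own connection IDs. A minimal sketch (bucket names and connection IDs below are placeholders, not values from this package):

from airflow.io.path import ObjectStoragePath  # airflow.sdk on Airflow 3.0+
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator

# Paths constructed with explicit connection IDs (placeholder buckets and conn IDs)
src = ObjectStoragePath("gcs://landing-bucket/raw/data.csv", conn_id="google_cloud_default")
dst = ObjectStoragePath("s3://warehouse-bucket/staged/data.csv", conn_id="aws_default")

# Instantiate inside a DAG (or @dag-decorated function) as usual
cross_storage_transfer = FileTransferOperator(
    task_id="cross_storage_transfer",
    src=src,
    dst=dst,
    overwrite=True,  # replace dst if it already exists
)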


XCom Object Storage Backend

An XCom backend that stores data either in object storage or in the metadata database, chosen by a configurable size threshold, with optional compression and automatic cleanup via purge.

class XComObjectStorageBackend(BaseXCom):
    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str: ...
    
    @staticmethod
    def deserialize_value(result) -> Any: ...
    
    @staticmethod
    def purge(xcom: XComResult, session: Session | None = None) -> None: ...
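Once the backend is enabled (see Configuration below), tasks push and pull XComs exactly as usual; routing to object storage happens transparently during serialization. A minimal sketch using the standard TaskFlow API:

from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime(2023, 1, 1))
def xcom_objectstorage_example():
    @task
    def produce() -> dict:
        # Payloads above the configured size threshold are written to
        # object storage; smaller ones stay in the metadata database.
        return {"rows": list(range(100_000))}

    @task
    def consume(payload: dict) -> None:
        # Deserialization is transparent regardless of where the value landed.
        print(len(payload["rows"]))

    consume(produce())

xcom_objectstorage_example()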


File Asset Handlers

Asset and dataset handlers for file-based assets, providing URI validation, asset creation, and OpenLineage conversion for data lineage tracking.

def create_asset(*, path: str | PosixPath, extra=None) -> Asset: ...
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset: ...
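A minimal sketch of declaring a file asset and attaching it to a task as an outlet, using Airflow's standard outlets wiring (the paths below are placeholders):

from airflow.providers.common.io.assets.file import create_asset
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator

# Build a file-scheme Asset from a local path (placeholder path)
report_asset = create_asset(path="/data/output/report.csv")

publish_report = FileTransferOperator(
    task_id="publish_report",
    src="/tmp/report.csv",
    dst="/data/output/report.csv",
    overwrite=True,
    outlets=[report_asset],  # marks the asset as updated for lineage/scheduling
)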


Configuration

The Common IO Provider supports configuration options for XCom object storage behavior:

XCom Object Storage Settings

  • common.io.xcom_objectstorage_path: Path to object storage location for XComs (e.g., "s3://conn_id@bucket/path")
  • common.io.xcom_objectstorage_threshold: Size threshold in bytes (-1 keeps everything in the database, 0 sends everything to object storage, and a positive value sends only payloads larger than the threshold to object storage)
  • common.io.xcom_objectstorage_compression: Compression algorithm (gz, bz2, lzma, snappy, zip)

These settings are configured in airflow.cfg or via environment variables following Airflow's configuration patterns.
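For example, the following airflow.cfg sketch (bucket and connection ID are placeholders) routes XComs larger than 1 MiB to object storage with gzip compression; note that the backend itself is activated through Airflow's standard core.xcom_backend setting:

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://aws_default@my-bucket/xcom
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gz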

Types

Core types used throughout the Common IO Provider API:

# Airflow Context type for task execution
from airflow.sdk import Context  # Airflow 3.0+
# or from airflow.utils.context import Context  # Airflow 2.x

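# AIRFLOW_V_3_0_PLUS is a version guard resolved at import time; a typical
# definition (shown as an assumption, not necessarily the provider's exact code):
#   from packaging.version import Version
#   from airflow import __version__ as AIRFLOW_VERSION
#   AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION) >= Version("3.0.0")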
# Object storage path handling
if AIRFLOW_V_3_0_PLUS:
    from airflow.sdk import ObjectStoragePath
else:
    from airflow.io.path import ObjectStoragePath

# Asset/Dataset types (version-dependent)
if AIRFLOW_V_3_0_PLUS:
    from airflow.sdk.definitions.asset import Asset
else:
    from airflow.datasets import Dataset as Asset

# XCom result type
from airflow.sdk.execution_time.comms import XComResult  # Airflow 3.0+
# or from airflow.models.xcom import XCom as XComResult  # Airflow 2.x

# Database session type
from sqlalchemy.orm import Session

# Standard library types
from pathlib import PosixPath
from urllib.parse import SplitResult
from typing import Any, TypeVar

# OpenLineage integration
from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset
from airflow.providers.openlineage.extractors import OperatorLineage

# Generic type variable
T = TypeVar("T")