Common I/O Provider for Apache Airflow, offering unified file operations and transfers across different storage systems
npx @tessl/cli install tessl/pypi-apache-airflow-providers-common-io@1.6.0

A provider package for Apache Airflow that enables unified I/O operations and file transfer capabilities across different storage systems. This package provides operators for file transfers, XCom backends for object storage, asset/dataset handlers for file-based assets, and configuration options for storage integration.
pip install apache-airflow-providers-common-io

from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
from airflow.providers.common.io.assets.file import create_asset, sanitize_uri, convert_asset_to_openlineage

from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow import DAG
from datetime import datetime
# Create a DAG for file transfer
dag = DAG(
    'file_transfer_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
)
# Transfer a file from source to destination
transfer_task = FileTransferOperator(
    task_id='transfer_file',
    src='/path/to/source/file.txt',
    dst='/path/to/destination/file.txt',
    overwrite=True,
    dag=dag,
)
# Use with object storage paths and connections
s3_transfer = FileTransferOperator(
    task_id='s3_transfer',
    src='s3://source-bucket/file.txt',
    dst='s3://dest-bucket/file.txt',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    dag=dag,
)

The Common IO Provider follows Apache Airflow's provider pattern and integrates with multiple Airflow subsystems.
The package provides version compatibility handling for both Airflow 2.x and 3.0+, ensuring consistent behavior across versions.
A file transfer operator for copying files between different storage systems, with support for the local filesystem, cloud storage (S3, GCS, Azure), and other fsspec-compatible storage backends.
class FileTransferOperator(BaseOperator):
    def __init__(
        self,
        *,
        src: str | ObjectStoragePath,
        dst: str | ObjectStoragePath,
        source_conn_id: str | None = None,
        dest_conn_id: str | None = None,
        overwrite: bool = False,
        **kwargs,
    ): ...
    def execute(self, context: Context) -> None: ...
    def get_openlineage_facets_on_start(self) -> OperatorLineage: ...
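Both src and dst accept ObjectStoragePath instances as well as plain strings. A minimal sketch, assuming paths built with an explicit conn_id (the bucket names and connection IDs here are illustrative):

from airflow.io.path import ObjectStoragePath  # from airflow.sdk on Airflow 3.0+
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator

# Paths constructed with a connection ID; the conn_id can also be embedded
# in the URI itself, e.g. "s3://aws_default@source-bucket/report.csv".
src_path = ObjectStoragePath("s3://source-bucket/report.csv", conn_id="aws_default")
dst_path = ObjectStoragePath("gs://dest-bucket/report.csv", conn_id="google_cloud_default")

copy_report = FileTransferOperator(
    task_id="copy_report",
    src=src_path,
    dst=dst_path,
    overwrite=True,
)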
XCom backend that intelligently stores data in object storage or database based on configurable size thresholds, with compression support and automatic cleanup.

class XComObjectStorageBackend(BaseXCom):
    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str: ...
    @staticmethod
    def deserialize_value(result) -> Any: ...
    @staticmethod
    def purge(xcom: XComResult, session: Session | None = None) -> None: ...
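Once the backend is enabled (see the configuration options further below), tasks push and pull XComs exactly as usual; values above the configured size threshold are written to object storage while the database keeps only a reference. A hedged sketch with illustrative task names:

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime(2023, 1, 1))
def xcom_objectstorage_example():
    @task
    def produce() -> list[dict]:
        # Large payload: with a positive threshold configured, the serialized
        # value is offloaded to object storage rather than the database.
        return [{"row": i} for i in range(100_000)]

    @task
    def consume(rows: list[dict]) -> None:
        # Deserialization is transparent regardless of where the value lives.
        print(f"received {len(rows)} rows")

    consume(produce())

xcom_objectstorage_example()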
Asset and dataset handlers for file-based assets with URI validation, asset creation, and OpenLineage conversion for data lineage tracking.

def create_asset(*, path: str | PosixPath, extra=None) -> Asset: ...
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset: ...
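As a small illustration (the paths and extra metadata are hypothetical), creating a file-backed asset and validating a file URI might look like this:

from urllib.parse import urlsplit
from airflow.providers.common.io.assets.file import create_asset, sanitize_uri

# Build an Asset for a local file; extra carries optional metadata.
report_asset = create_asset(path="/data/exports/report.csv", extra={"team": "analytics"})

# Validate the components of a file:// URI before using it as an asset identifier.
validated = sanitize_uri(urlsplit("file:///data/exports/report.csv"))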
The Common IO Provider supports configuration options for XCom object storage behavior:

common.io.xcom_objectstorage_path: path to the object storage location for XComs (e.g., "s3://conn_id@bucket/path")
common.io.xcom_objectstorage_threshold: size threshold in bytes (-1: always use the database, 0: always use object storage, positive value: values larger than the threshold go to object storage)
common.io.xcom_objectstorage_compression: compression algorithm (gz, bz2, lzma, snappy, zip)

These settings are configured in airflow.cfg or via environment variables following Airflow's configuration patterns.
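For example, a sketch of the corresponding airflow.cfg entries (the bucket, path, and threshold are illustrative; the backend itself is activated through core.xcom_backend):

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
# offload serialized XCom values larger than 1 MiB to object storage
xcom_objectstorage_path = s3://aws_default@my-bucket/xcom
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gz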
Core types used throughout the Common IO Provider API:
# Airflow Context type for task execution
from airflow.sdk import Context # Airflow 3.0+
# or from airflow.utils.context import Context # Airflow 2.x
# Object storage path handling
if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import ObjectStoragePath
else:
from airflow.io.path import ObjectStoragePath
# Asset/Dataset types (version-dependent)
if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.definitions.asset import Asset
else:
from airflow.datasets import Dataset as Asset
# XCom result type
from airflow.sdk.execution_time.comms import XComResult # Airflow 3.0+
# or from airflow.models.xcom import XCom as XComResult # Airflow 2.x
# Database session type
from sqlalchemy.orm import Session
# Standard library types
from pathlib import PosixPath
from urllib.parse import SplitResult
from typing import Any, TypeVar
# OpenLineage integration
from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset
from airflow.providers.openlineage.extractors import OperatorLineage
# Generic type variable
T = TypeVar("T")