tessl/pypi-apache-airflow-providers-common-io

Common I/O provider for Apache Airflow, offering unified file operations and transfers across different storage systems.

docs/file-transfer.md

File Transfer Operations

The FileTransferOperator provides unified file copying capabilities across different storage systems, supporting local filesystems, cloud storage, and any fsspec-compatible storage backend. It handles streaming transfers for large files and integrates with Airflow's templating and OpenLineage systems.

Capabilities

File Transfer Operator

Copies files from source to destination with support for different storage systems, connection management, and overwrite protection.

class FileTransferOperator(BaseOperator):
    """
    Copies a file from a source to a destination.
    
    This streams the file from the source to the destination if required,
    so it does not need to fit into memory.
    """
    
    template_fields: Sequence[str] = ("src", "dst")
    
    def __init__(
        self,
        *,
        src: str | ObjectStoragePath,
        dst: str | ObjectStoragePath,
        source_conn_id: str | None = None,
        dest_conn_id: str | None = None,
        overwrite: bool = False,
        **kwargs
    ) -> None:
        """
        Initialize FileTransferOperator.
        
        Parameters:
        - src: The source file path or ObjectStoragePath object
        - dst: The destination file path or ObjectStoragePath object  
        - source_conn_id: The optional source connection id
        - dest_conn_id: The optional destination connection id
        - overwrite: Whether to overwrite existing destination files
        """
    
    def execute(self, context: Context) -> None:
        """
        Execute the file transfer operation.
        
        Parameters:
        - context: Airflow task execution context
        
        Raises:
        - ValueError: If destination exists and overwrite is False
        """
    
    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """
        Get OpenLineage facets for data lineage tracking.
        
        Returns:
        - OperatorLineage: Input and output datasets for lineage
        """
    
    @staticmethod
    def _get_path(path: str | ObjectStoragePath, conn_id: str | None) -> ObjectStoragePath:
        """
        Convert string path to ObjectStoragePath with optional connection.
        
        Parameters:
        - path: File path as string or ObjectStoragePath
        - conn_id: Optional Airflow connection ID
        
        Returns:
        - ObjectStoragePath: Resolved path object
        """

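The core behavior described above (an overwrite guard followed by a streaming, chunked copy) can be sketched in plain Python. The helper below is an illustrative, stdlib-only stand-in, not the provider's actual implementation, which operates on `ObjectStoragePath` objects rather than local paths:

```python
import shutil
from pathlib import Path

def transfer_file(src: str, dst: str, overwrite: bool = False) -> None:
    """Illustrative sketch of the operator's core logic: refuse to
    clobber an existing destination unless overwrite=True, then stream
    the file in chunks so it never has to fit entirely in memory."""
    src_path, dst_path = Path(src), Path(dst)
    if not overwrite and dst_path.exists():
        raise ValueError(f"{dst_path} already exists")
    with src_path.open("rb") as fin, dst_path.open("wb") as fout:
        shutil.copyfileobj(fin, fout)  # chunked, streaming copy
```

The real operator delegates the copy to the fsspec-backed storage layer, which can use backend-native copies (e.g. server-side S3 copies) when source and destination share a filesystem.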
Usage Examples

Basic File Transfer

from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow import DAG
from datetime import datetime

dag = DAG('file_transfer', start_date=datetime(2023, 1, 1))

# Simple local file copy
local_copy = FileTransferOperator(
    task_id='copy_local_file',
    src='/tmp/source.txt',
    dst='/tmp/destination.txt',
    overwrite=True,
    dag=dag
)

Cloud Storage Transfer

# Transfer between S3 buckets
s3_transfer = FileTransferOperator(
    task_id='s3_to_s3_transfer',
    src='s3://source-bucket/data/file.csv',
    dst='s3://dest-bucket/processed/file.csv',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    overwrite=False,
    dag=dag
)

# Transfer from local to cloud storage
upload_task = FileTransferOperator(
    task_id='upload_to_gcs',
    src='/local/path/data.json',
    dst='gs://my-bucket/data/data.json',
    dest_conn_id='google_cloud_default',
    dag=dag
)

Using ObjectStoragePath Objects

from airflow.io.path import ObjectStoragePath

# Create ObjectStoragePath objects directly
source_path = ObjectStoragePath('s3://bucket/source.txt', conn_id='aws_conn')
dest_path = ObjectStoragePath('gs://bucket/dest.txt', conn_id='gcp_conn')

cross_cloud_transfer = FileTransferOperator(
    task_id='cross_cloud_transfer',
    src=source_path,
    dst=dest_path,
    dag=dag
)

Template Support

# Use Airflow templating in paths
templated_transfer = FileTransferOperator(
    task_id='templated_transfer',
    src='s3://bucket/data/{{ ds }}/input.csv',
    dst='s3://bucket/processed/{{ ds }}/output.csv',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    dag=dag
)

Error Handling

The FileTransferOperator handles several error conditions:

  • Destination exists: Raises ValueError if destination file exists and overwrite=False
  • Connection errors: Propagates connection-related exceptions from underlying storage systems
  • Permission errors: Propagates permission-related exceptions from storage backends
  • Path validation: Uses ObjectStoragePath validation for path resolution
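For idempotent pipelines it can be useful to treat "destination already exists" as a no-op rather than a task failure. The snippet below demonstrates that catch-and-skip pattern using a stdlib-only stand-in for the operator's overwrite guard (the real operator raises the same `ValueError`):

```python
import os
import shutil
import tempfile

def copy_no_overwrite(src: str, dst: str) -> None:
    """Stdlib stand-in for the operator's overwrite=False guard:
    refuse to replace an existing destination file."""
    if os.path.exists(dst):
        raise ValueError(f"{dst} already exists")
    shutil.copyfile(src, dst)

workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "in.csv")
dst = os.path.join(workdir, "out.csv")
with open(src, "w") as f:
    f.write("a,b\n1,2\n")

copy_no_overwrite(src, dst)  # first run: copies the file
try:
    copy_no_overwrite(src, dst)  # retry: destination already present
except ValueError:
    print("destination exists, skipping")  # treat as a no-op
```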

Integration Features

OpenLineage Integration

The operator automatically provides data lineage information through the get_openlineage_facets_on_start() method, creating input and output datasets for lineage tracking systems.

Airflow Templating

The src and dst fields support Airflow's Jinja2 templating, allowing dynamic path construction using execution context variables like {{ ds }}, {{ task_instance_key_str }}, etc.
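At render time, Airflow substitutes values from the execution context into each templated field via Jinja2. A toy illustration of that substitution step (the real engine additionally supports filters, macros, and the full task context):

```python
import re

def render(template: str, context: dict) -> str:
    """Toy stand-in for Airflow's Jinja2 rendering of template_fields:
    replaces each {{ var }} with its value from the context dict."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(context[m.group(1)]),
        template,
    )

src = render("s3://bucket/data/{{ ds }}/input.csv", {"ds": "2023-01-01"})
# src == "s3://bucket/data/2023-01-01/input.csv"
```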

Version Compatibility

The operator works with both Airflow 2.x and 3.0+ through the version compatibility layer, automatically importing the correct BaseOperator and ObjectStoragePath classes.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-io

docs

asset-handlers.md

file-transfer.md

index.md

xcom-backend.md

tile.json