Common I/O Provider for Apache Airflow that provides unified file operations and transfers across different storage systems
The FileTransferOperator provides unified file copying capabilities across different storage systems, supporting local filesystems, cloud storage, and any fsspec-compatible storage backend. It handles streaming transfers for large files and integrates with Airflow's templating and OpenLineage systems.
Copies files from source to destination with support for different storage systems, connection management, and overwrite protection.
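The streaming behavior mentioned above can be pictured with plain Python file objects. The sketch below is illustrative only, not the provider's actual implementation (the real copy is delegated to ObjectStoragePath/fsspec); it shows why a chunked copy never needs the whole file in memory:

```python
import io

def streamed_copy(src, dst, chunk_size=8 * 1024 * 1024):
    """Copy src to dst in fixed-size chunks.

    Illustrative stand-in for the operator's streaming transfer,
    not the provider's actual code.
    """
    while True:
        chunk = src.read(chunk_size)
        if not chunk:  # empty read signals end of file
            break
        dst.write(chunk)

# In-memory buffers stand in for storage backends here
source = io.BytesIO(b"x" * 100)
dest = io.BytesIO()
streamed_copy(source, dest, chunk_size=32)  # 100 bytes copied in 4 reads
assert dest.getvalue() == b"x" * 100
```

Because only one chunk is resident at a time, the same pattern works for multi-gigabyte objects.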
class FileTransferOperator(BaseOperator):
    """
    Copies a file from a source to a destination.

    This streams the file from the source to the destination if required,
    so it does not need to fit into memory.
    """

    template_fields: Sequence[str] = ("src", "dst")

    def __init__(
        self,
        *,
        src: str | ObjectStoragePath,
        dst: str | ObjectStoragePath,
        source_conn_id: str | None = None,
        dest_conn_id: str | None = None,
        overwrite: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize FileTransferOperator.

        Parameters:
        - src: The source file path or ObjectStoragePath object
        - dst: The destination file path or ObjectStoragePath object
        - source_conn_id: The optional source connection id
        - dest_conn_id: The optional destination connection id
        - overwrite: Whether to overwrite existing destination files
        """

    def execute(self, context: Context) -> None:
        """
        Execute the file transfer operation.

        Parameters:
        - context: Airflow task execution context

        Raises:
        - ValueError: If the destination exists and overwrite is False
        """

    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """
        Get OpenLineage facets for data lineage tracking.

        Returns:
        - OperatorLineage: Input and output datasets for lineage
        """

    @staticmethod
    def _get_path(path: str | ObjectStoragePath, conn_id: str | None) -> ObjectStoragePath:
        """
        Convert a string path to an ObjectStoragePath with an optional connection.

        Parameters:
        - path: File path as string or ObjectStoragePath
        - conn_id: Optional Airflow connection ID

        Returns:
        - ObjectStoragePath: Resolved path object
        """

from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow import DAG
from datetime import datetime
dag = DAG('file_transfer', start_date=datetime(2023, 1, 1))
# Simple local file copy
local_copy = FileTransferOperator(
task_id='copy_local_file',
src='/tmp/source.txt',
dst='/tmp/destination.txt',
overwrite=True,
dag=dag
)

# Transfer between S3 buckets
s3_transfer = FileTransferOperator(
task_id='s3_to_s3_transfer',
src='s3://source-bucket/data/file.csv',
dst='s3://dest-bucket/processed/file.csv',
source_conn_id='aws_default',
dest_conn_id='aws_default',
overwrite=False,
dag=dag
)
# Transfer from local to cloud storage
upload_task = FileTransferOperator(
task_id='upload_to_gcs',
src='/local/path/data.json',
dst='gs://my-bucket/data/data.json',
dest_conn_id='google_cloud_default',
dag=dag
)

from airflow.io.path import ObjectStoragePath
# Create ObjectStoragePath objects directly
source_path = ObjectStoragePath('s3://bucket/source.txt', conn_id='aws_conn')
dest_path = ObjectStoragePath('gcs://bucket/dest.txt', conn_id='gcp_conn')
cross_cloud_transfer = FileTransferOperator(
task_id='cross_cloud_transfer',
src=source_path,
dst=dest_path,
dag=dag
)

# Use Airflow templating in paths
templated_transfer = FileTransferOperator(
task_id='templated_transfer',
src='s3://bucket/data/{{ ds }}/input.csv',
dst='s3://bucket/processed/{{ ds }}/output.csv',
source_conn_id='aws_default',
dest_conn_id='aws_default',
dag=dag
)

The FileTransferOperator guards against accidental overwrites: execute() raises a ValueError if the destination file already exists and overwrite=False.

The operator automatically provides data lineage information through the get_openlineage_facets_on_start() method, creating input and output datasets for lineage tracking systems.
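The overwrite guard can be pictured with plain pathlib. This is a minimal sketch of the behavior under the assumption described above (exists + overwrite=False raises ValueError), not the provider's actual code:

```python
import tempfile
from pathlib import Path

def copy_with_overwrite_guard(src: Path, dst: Path, overwrite: bool = False) -> None:
    """Mimic the operator's guard: refuse to clobber an existing destination
    unless overwrite=True (illustrative sketch, not the provider's code)."""
    if dst.exists() and not overwrite:
        raise ValueError(f"{dst} already exists; pass overwrite=True to replace it")
    dst.write_bytes(src.read_bytes())

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "source.txt"
    dst = Path(tmp) / "destination.txt"
    src.write_text("payload")
    dst.write_text("old contents")

    try:
        copy_with_overwrite_guard(src, dst)  # dst exists -> ValueError
    except ValueError:
        pass

    copy_with_overwrite_guard(src, dst, overwrite=True)
    assert dst.read_text() == "payload"
```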
The src and dst fields support Airflow's Jinja2 templating, allowing dynamic path construction using execution context variables like {{ ds }}, {{ task_instance_key_str }}, etc.
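At runtime the scheduler renders these fields before execute() is called. The stand-in below shows the effect of that rendering step using plain string substitution; it is a deliberately simplified sketch, not Airflow's actual Jinja2 engine:

```python
def render_template(path: str, context: dict) -> str:
    """Very simplified stand-in for Airflow's Jinja2 rendering of
    template_fields (real rendering supports filters, macros, etc.)."""
    for key, value in context.items():
        path = path.replace("{{ %s }}" % key, value)
    return path

# {{ ds }} is the logical date Airflow injects into the context
context = {"ds": "2023-06-01"}
rendered = render_template("s3://bucket/data/{{ ds }}/input.csv", context)
assert rendered == "s3://bucket/data/2023-06-01/input.csv"
```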
The operator works with both Airflow 2.x and 3.0+ through the version compatibility layer, automatically importing the correct BaseOperator and ObjectStoragePath classes.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-io