Common I/O Provider for Apache Airflow that provides unified file operations and transfers across different storage systems
npx @tessl/cli install tessl/pypi-apache-airflow-providers-common-io@1.6.0
# Apache Airflow Common IO Provider
A provider package for Apache Airflow that enables unified I/O operations and file transfer capabilities across different storage systems. This package provides operators for file transfers, XCom backends for object storage, asset/dataset handlers for file-based assets, and configuration options for storage integration.

## Package Information
- **Package Name**: apache-airflow-providers-common-io
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-common-io`
- **Airflow Version**: Requires Apache Airflow 2.10.0+

## Core Imports
```python
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
from airflow.providers.common.io.assets.file import create_asset, sanitize_uri, convert_asset_to_openlineage
```

## Basic Usage
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator

# Create a DAG for file transfer
dag = DAG(
    'file_transfer_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
)

# Transfer a file from source to destination
transfer_task = FileTransferOperator(
    task_id='transfer_file',
    src='/path/to/source/file.txt',
    dst='/path/to/destination/file.txt',
    overwrite=True,
    dag=dag,
)

# Use with object storage paths and connections
s3_transfer = FileTransferOperator(
    task_id='s3_transfer',
    src='s3://source-bucket/file.txt',
    dst='s3://dest-bucket/file.txt',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    dag=dag,
)
```
## Architecture

The Common IO Provider follows Apache Airflow's provider pattern and integrates with multiple Airflow subsystems:

- **Operators**: File transfer operations using ObjectStoragePath for unified storage access
- **XCom Backend**: Configurable object storage backend for XCom data with size-based routing
- **Asset Handlers**: File asset creation, URI validation, and OpenLineage integration
- **Configuration**: Centralized settings for object storage paths, thresholds, and compression

The package provides version compatibility handling for both Airflow 2.x and 3.0+, ensuring consistent behavior across versions.
## Capabilities
### File Transfer Operations

File transfer operator for copying files between different storage systems, with support for the local filesystem, cloud storage (S3, GCS, Azure), and other fsspec-compatible storage backends.
```python { .api }
class FileTransferOperator(BaseOperator):
    def __init__(
        self,
        *,
        src: str | ObjectStoragePath,
        dst: str | ObjectStoragePath,
        source_conn_id: str | None = None,
        dest_conn_id: str | None = None,
        overwrite: bool = False,
        **kwargs,
    ): ...

    def execute(self, context: Context) -> None: ...
    def get_openlineage_facets_on_start(self) -> OperatorLineage: ...
```

[File Transfer Operations](./file-transfer.md)
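As a rough illustration of the copy-and-overwrite semantics described above, here is a local-filesystem-only sketch. `transfer_file` is a hypothetical helper, not the provider's implementation, which operates on ObjectStoragePath and supports remote backends:

```python
import shutil
import tempfile
from pathlib import Path


def transfer_file(src: Path, dst: Path, overwrite: bool = False) -> None:
    """Local sketch of the operator's copy semantics (hypothetical helper)."""
    if dst.exists() and not overwrite:
        # Mirrors overwrite=False: refuse to clobber an existing destination
        raise FileExistsError(f"{dst} exists and overwrite=False")
    with src.open("rb") as fsrc, dst.open("wb") as fdst:
        shutil.copyfileobj(fsrc, fdst)


# Demonstrate the overwrite guard with temporary files
tmp = Path(tempfile.mkdtemp())
src = tmp / "source.txt"
dst = tmp / "dest.txt"
src.write_text("hello")

transfer_file(src, dst)        # first copy succeeds
try:
    transfer_file(src, dst)    # second copy fails without overwrite=True
    raised = False
except FileExistsError:
    raised = True
```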
### XCom Object Storage Backend
XCom backend that stores data in object storage or the metadata database based on a configurable size threshold, with compression support and automatic cleanup.
```python { .api }
class XComObjectStorageBackend(BaseXCom):
    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str: ...

    @staticmethod
    def deserialize_value(result) -> Any: ...

    @staticmethod
    def purge(xcom: XComResult, session: Session | None = None) -> None: ...
```

[XCom Object Storage Backend](./xcom-backend.md)
### File Asset Handlers
Asset and dataset handlers for file-based assets with URI validation, asset creation, and OpenLineage conversion for data lineage tracking.
```python { .api }
def create_asset(*, path: str | PosixPath, extra=None) -> Asset: ...
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset: ...
```

[File Asset Handlers](./asset-handlers.md)
## Configuration
The Common IO Provider supports configuration options for XCom object storage behavior:

### XCom Object Storage Settings

- `common.io.xcom_objectstorage_path`: Path to object storage location for XComs (e.g., "s3://conn_id@bucket/path")
- `common.io.xcom_objectstorage_threshold`: Size threshold in bytes (-1: always the database, 0: always object storage, a positive value: payloads larger than the threshold go to object storage)
- `common.io.xcom_objectstorage_compression`: Compression algorithm (gz, bz2, lzma, snappy, zip)

These settings are configured in `airflow.cfg` or via environment variables following Airflow's configuration patterns.
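The threshold semantics can be sketched as a small routing function. `use_object_storage` is a hypothetical name, and the exact boundary comparison in the real backend may differ:

```python
def use_object_storage(payload: bytes, threshold: int) -> bool:
    """Decide where an XCom payload goes based on the configured threshold."""
    if threshold < 0:
        return False   # -1: always the metadata database
    if threshold == 0:
        return True    # 0: always object storage
    return len(payload) > threshold  # positive: route large payloads to storage


small = use_object_storage(b"x" * 10, 1024)    # small payload -> database
large = use_object_storage(b"x" * 4096, 1024)  # large payload -> object storage
```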
## Types
Core types used throughout the Common IO Provider API:
```python { .api }
# Airflow Context type for task execution
from airflow.sdk import Context  # Airflow 3.0+
# or: from airflow.utils.context import Context  # Airflow 2.x

# Object storage path handling
if AIRFLOW_V_3_0_PLUS:
    from airflow.sdk import ObjectStoragePath
else:
    from airflow.io.path import ObjectStoragePath

# Asset/Dataset types (version-dependent)
if AIRFLOW_V_3_0_PLUS:
    from airflow.sdk.definitions.asset import Asset
else:
    from airflow.datasets import Dataset as Asset

# XCom result type
from airflow.sdk.execution_time.comms import XComResult  # Airflow 3.0+
# or: from airflow.models.xcom import XCom as XComResult  # Airflow 2.x

# Database session type
from sqlalchemy.orm import Session

# Standard library types
from pathlib import PosixPath
from urllib.parse import SplitResult
from typing import Any, TypeVar

# OpenLineage integration
from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset
from airflow.providers.openlineage.extractors import OperatorLineage

# Generic type variable
T = TypeVar("T")
```
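The `AIRFLOW_V_3_0_PLUS` flag used above is assumed to be a boolean derived from the installed Airflow version. A minimal sketch of such a check on a version string (real providers typically compare `packaging.version.Version` objects against `airflow.__version__` instead):

```python
def is_airflow_3_plus(version: str) -> bool:
    """Return True when the major version component is 3 or higher.

    Simplified stand-in for a proper packaging-based comparison.
    """
    major = int(version.split(".", 1)[0])
    return major >= 3


flag_2x = is_airflow_3_plus("2.10.5")  # Airflow 2.x -> False
flag_3x = is_airflow_3_plus("3.0.1")   # Airflow 3.x -> True
```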