# File Transfer Operations

The FileTransferOperator provides unified file copying capabilities across different storage systems, supporting local filesystems, cloud storage, and any fsspec-compatible storage backend. It handles streaming transfers for large files and integrates with Airflow's templating and OpenLineage systems.

## Capabilities

### File Transfer Operator

Copies files from a source to a destination, with support for different storage systems, connection management, and overwrite protection.

```python { .api }
class FileTransferOperator(BaseOperator):
    """
    Copies a file from a source to a destination.

    This streams the file from the source to the destination if required,
    so it does not need to fit into memory.
    """

    template_fields: Sequence[str] = ("src", "dst")

    def __init__(
        self,
        *,
        src: str | ObjectStoragePath,
        dst: str | ObjectStoragePath,
        source_conn_id: str | None = None,
        dest_conn_id: str | None = None,
        overwrite: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize FileTransferOperator.

        Parameters:
        - src: The source file path or ObjectStoragePath object
        - dst: The destination file path or ObjectStoragePath object
        - source_conn_id: The optional source connection id
        - dest_conn_id: The optional destination connection id
        - overwrite: Whether to overwrite an existing destination file
        """

    def execute(self, context: Context) -> None:
        """
        Execute the file transfer operation.

        Parameters:
        - context: Airflow task execution context

        Raises:
        - ValueError: If the destination exists and overwrite is False
        """

    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """
        Get OpenLineage facets for data lineage tracking.

        Returns:
        - OperatorLineage: Input and output datasets for lineage
        """

    @staticmethod
    def _get_path(path: str | ObjectStoragePath, conn_id: str | None) -> ObjectStoragePath:
        """
        Convert a string path to an ObjectStoragePath with an optional connection.

        Parameters:
        - path: File path as a string or ObjectStoragePath
        - conn_id: Optional Airflow connection ID

        Returns:
        - ObjectStoragePath: Resolved path object
        """
```
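
The streaming behavior described in the docstring — copying in chunks so the file never has to fit in memory — can be sketched independently of Airflow. This is a minimal illustration of the technique, not the provider's actual implementation:

```python
import tempfile
from pathlib import Path

def stream_copy(src: Path, dst: Path, chunk_size: int = 1024 * 1024) -> None:
    """Copy src to dst in fixed-size chunks so the file never has to fit in memory."""
    with src.open("rb") as reader, dst.open("wb") as writer:
        while chunk := reader.read(chunk_size):
            writer.write(chunk)

# Usage: copy a file a chunk at a time
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "source.bin"
    dst = Path(tmp) / "destination.bin"
    src.write_bytes(b"x" * (3 * 1024 * 1024))  # 3 MiB, copied in 1 MiB chunks
    stream_copy(src, dst)
    assert dst.read_bytes() == src.read_bytes()
```

In the real operator, the reader and writer come from the resolved ObjectStoragePath objects rather than the local filesystem, but the chunked-copy principle is the same.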

### Usage Examples

#### Basic File Transfer

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator

dag = DAG('file_transfer', start_date=datetime(2023, 1, 1))

# Simple local file copy
local_copy = FileTransferOperator(
    task_id='copy_local_file',
    src='/tmp/source.txt',
    dst='/tmp/destination.txt',
    overwrite=True,
    dag=dag,
)
```

#### Cloud Storage Transfer

```python
# Transfer between S3 buckets
s3_transfer = FileTransferOperator(
    task_id='s3_to_s3_transfer',
    src='s3://source-bucket/data/file.csv',
    dst='s3://dest-bucket/processed/file.csv',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    overwrite=False,
    dag=dag,
)

# Transfer from local to cloud storage
upload_task = FileTransferOperator(
    task_id='upload_to_gcs',
    src='/local/path/data.json',
    dst='gs://my-bucket/data/data.json',
    dest_conn_id='google_cloud_default',
    dag=dag,
)
```

#### Using ObjectStoragePath Objects

```python
from airflow.io.path import ObjectStoragePath

# Create ObjectStoragePath objects directly
source_path = ObjectStoragePath('s3://bucket/source.txt', conn_id='aws_conn')
dest_path = ObjectStoragePath('gcs://bucket/dest.txt', conn_id='gcp_conn')

cross_cloud_transfer = FileTransferOperator(
    task_id='cross_cloud_transfer',
    src=source_path,
    dst=dest_path,
    dag=dag,
)
```

#### Template Support

```python
# Use Airflow templating in paths
templated_transfer = FileTransferOperator(
    task_id='templated_transfer',
    src='s3://bucket/data/{{ ds }}/input.csv',
    dst='s3://bucket/processed/{{ ds }}/output.csv',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    dag=dag,
)
```

## Error Handling

The FileTransferOperator handles several error conditions:

- **Destination exists**: Raises `ValueError` if the destination file exists and `overwrite=False`
- **Connection errors**: Propagates connection-related exceptions from the underlying storage systems
- **Permission errors**: Propagates permission-related exceptions from storage backends
- **Path validation**: Uses ObjectStoragePath validation for path resolution
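
The overwrite protection behaves like the following standalone sketch. This is a hypothetical stand-in using local paths, not the provider's code:

```python
import tempfile
from pathlib import Path

def checked_copy(src: Path, dst: Path, overwrite: bool = False) -> None:
    """Refuse to clobber an existing destination unless overwrite is set."""
    if dst.exists() and not overwrite:
        raise ValueError(f"{dst} already exists; pass overwrite=True to replace it")
    dst.write_bytes(src.read_bytes())

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "src.txt"
    dst = Path(tmp) / "dst.txt"
    src.write_text("new data")
    dst.write_text("old data")
    try:
        checked_copy(src, dst)  # dst exists and overwrite defaults to False
    except ValueError:
        pass                    # expected: destination is protected
    checked_copy(src, dst, overwrite=True)
    assert dst.read_text() == "new data"
```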

## Integration Features

### OpenLineage Integration

The operator automatically provides data lineage information through the `get_openlineage_facets_on_start()` method, creating input and output datasets for lineage tracking systems.
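
Conceptually, the facets method maps the transfer's source to an input dataset and its destination to an output dataset. The sketch below uses hypothetical `Dataset`/`OperatorLineage` stand-ins to illustrate that mapping; the real OpenLineage classes and field names may differ:

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for the OpenLineage dataset/lineage types.
@dataclass
class Dataset:
    namespace: str  # e.g. "s3://source-bucket"
    name: str       # e.g. "data/file.csv"

@dataclass
class OperatorLineage:
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)

def facets_for(src_ns: str, src_key: str, dst_ns: str, dst_key: str) -> OperatorLineage:
    """Map the transfer's source and destination to lineage input/output datasets."""
    return OperatorLineage(
        inputs=[Dataset(namespace=src_ns, name=src_key)],
        outputs=[Dataset(namespace=dst_ns, name=dst_key)],
    )

lineage = facets_for("s3://source-bucket", "data/file.csv",
                     "s3://dest-bucket", "processed/file.csv")
```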

### Airflow Templating

The `src` and `dst` fields support Airflow's Jinja2 templating, allowing dynamic path construction from execution-context variables such as `{{ ds }}` and `{{ task_instance_key_str }}`.
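
At render time, Airflow substitutes context variables into the templated fields before `execute()` runs. The effect on a templated path can be illustrated with this simplified stand-in for Jinja2 rendering (Airflow's real renderer is far more capable):

```python
def render(template: str, context: dict) -> str:
    """Simplified stand-in for Jinja2: substitute {{ key }} placeholders."""
    for key, value in context.items():
        template = template.replace("{{ " + key + " }}", value)
    return template

# For a run with logical date 2023-01-01, {{ ds }} resolves to that date string
src = render("s3://bucket/data/{{ ds }}/input.csv", {"ds": "2023-01-01"})
# src is now "s3://bucket/data/2023-01-01/input.csv"
```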

### Version Compatibility

The operator works with both Airflow 2.x and 3.0+ through a version compatibility layer that automatically imports the correct BaseOperator and ObjectStoragePath classes.
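
Such compatibility layers typically follow the try/except import pattern. The sketch below shows the general technique; the module paths are illustrative assumptions, not the provider's actual shim:

```python
# Illustrative version-compatibility import shim.
# The module paths here are assumptions for illustration, not the provider's real code.
try:
    # Newer location (assumed Airflow 3.x path)
    from airflow.sdk import BaseOperator  # type: ignore
except ImportError:
    try:
        # Older location (Airflow 2.x)
        from airflow.models import BaseOperator  # type: ignore
    except ImportError:
        # No Airflow installed: placeholder so this sketch still runs
        BaseOperator = object
```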