# File Transfer Operations

Operator classes for uploading and downloading files between the local filesystem and remote FTP servers, with Airflow integration, templating support, and OpenLineage compatibility for data lineage tracking.

## Capabilities

### FTP Operation Constants

Constants defining supported file transfer operations.

```python { .api }
class FTPOperation:
    """Operation types for FTP file transfers."""

    PUT = "put"  # Upload files to remote server
    GET = "get"  # Download files from remote server
```

### FTP File Transfer Operator

Primary operator for transferring files between the local filesystem and FTP servers, with support for single-file and batch transfers.

```python { .api }
class FTPFileTransmitOperator(BaseOperator):
    """
    Transfer files between local and remote FTP locations.

    Template Fields: ("local_filepath", "remote_filepath")

    Parameters:
    - ftp_conn_id (str): FTP connection ID (default: "ftp_default")
    - local_filepath (str | list[str]): Local file path(s)
    - remote_filepath (str | list[str]): Remote file path(s)
    - operation (str): Transfer direction - FTPOperation.PUT or FTPOperation.GET
    - create_intermediate_dirs (bool): Create missing directories (default: False)
    """

    template_fields: Sequence[str] = ("local_filepath", "remote_filepath")

    def __init__(
        self,
        *,
        ftp_conn_id: str = "ftp_default",
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = FTPOperation.PUT,
        create_intermediate_dirs: bool = False,
        **kwargs
    ) -> None: ...

    def execute(self, context: Any) -> str | list[str] | None:
        """
        Execute file transfer operation.

        Parameters:
        - context (Any): Airflow task context

        Returns:
        str | list[str] | None: Local filepath(s) after operation
        """

    def get_openlineage_facets_on_start(self):
        """
        Return OpenLineage datasets for data lineage tracking.

        Returns:
        OperatorLineage: Input and output datasets for lineage
        """

    @cached_property
    def hook(self) -> FTPHook:
        """
        Create and return FTPHook instance.

        Returns:
        FTPHook: Configured FTP hook
        """
```

### FTPS File Transfer Operator

Secure file transfer operator using FTPS (FTP over SSL/TLS) for encrypted file transfers.

```python { .api }
class FTPSFileTransmitOperator(FTPFileTransmitOperator):
    """
    Transfer files using FTPS (FTP over SSL/TLS) for encrypted transfers.

    Inherits all FTPFileTransmitOperator functionality with SSL/TLS encryption.
    """

    @cached_property
    def hook(self) -> FTPSHook:
        """
        Create and return FTPSHook instance.

        Returns:
        FTPSHook: Configured FTPS hook with SSL/TLS support
        """
```

## Usage Examples

### Single File Upload

```python
from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from datetime import datetime

dag = DAG('ftp_upload_example', start_date=datetime(2023, 1, 1))

upload_task = FTPFileTransmitOperator(
    task_id='upload_data_file',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/output.csv',
    remote_filepath='/remote/uploads/output.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,  # Create /remote/uploads/ if it doesn't exist
    dag=dag
)
```

### Single File Download

```python
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

# Assumes `dag` is the DAG object from the Single File Upload example
download_task = FTPFileTransmitOperator(
    task_id='download_data_file',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/input.csv',
    remote_filepath='/remote/data/input.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,  # Create /local/data/ if it doesn't exist
    dag=dag
)
```
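
For a `GET` with `create_intermediate_dirs=True`, the missing local parent directory is created before the file is written. The step can be sketched with the standard library alone. `ensure_local_parent` is a hypothetical helper name used for illustration, not part of the provider's API, and the operator's own implementation may differ:

```python
from pathlib import Path


def ensure_local_parent(local_filepath: str) -> Path:
    """Create the parent directory of local_filepath if it is missing.

    Hypothetical helper for illustration; not part of the FTP provider.
    """
    parent = Path(local_filepath).parent
    # parents=True builds every missing level; exist_ok=True makes repeat calls safe
    parent.mkdir(parents=True, exist_ok=True)
    return parent
```

Calling the helper twice with the same path is harmless, which matters when a task is retried.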

### Batch File Operations

```python
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

# Assumes `dag` is the DAG object from the Single File Upload example

# Upload multiple files in one operation
batch_upload = FTPFileTransmitOperator(
    task_id='batch_upload_files',
    ftp_conn_id='my_ftp_connection',
    local_filepath=[
        '/local/data/file1.csv',
        '/local/data/file2.csv',
        '/local/data/file3.csv'
    ],
    remote_filepath=[
        '/remote/uploads/file1.csv',
        '/remote/uploads/file2.csv',
        '/remote/uploads/file3.csv'
    ],
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

# Download multiple files in one operation
batch_download = FTPFileTransmitOperator(
    task_id='batch_download_files',
    ftp_conn_id='my_ftp_connection',
    local_filepath=[
        '/local/downloads/report1.pdf',
        '/local/downloads/report2.pdf'
    ],
    remote_filepath=[
        '/remote/reports/report1.pdf',
        '/remote/reports/report2.pdf'
    ],
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)
```
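
Both arguments accept either a single string or a list. In the batch examples, each entry in `local_filepath` lines up with the entry at the same position in `remote_filepath`. A minimal sketch of that normalization and pairing, assuming positional matching as the examples suggest. `pair_paths` is a hypothetical helper, not a provider function:

```python
from __future__ import annotations


def pair_paths(
    local_filepath: str | list[str],
    remote_filepath: str | list[str],
) -> list[tuple[str, str]]:
    """Normalize str-or-list arguments and pair them position by position.

    Hypothetical helper for illustration; not part of the FTP provider.
    """
    # Wrap a bare string so single-file and batch calls share one code path
    local_paths = [local_filepath] if isinstance(local_filepath, str) else list(local_filepath)
    remote_paths = [remote_filepath] if isinstance(remote_filepath, str) else list(remote_filepath)
    return list(zip(local_paths, remote_paths))
```

Note that `zip` silently stops at the shorter list, so this sketch does not replace a length check on the two lists.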

### Secure File Transfer with FTPS

```python
from airflow.providers.ftp.operators.ftp import FTPSFileTransmitOperator, FTPOperation

# Assumes `dag` is the DAG object from the Single File Upload example
secure_upload = FTPSFileTransmitOperator(
    task_id='secure_upload',
    ftp_conn_id='my_secure_ftp_connection',  # Connection configured for FTPS
    local_filepath='/local/sensitive/data.xml',
    remote_filepath='/remote/secure/data.xml',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)
```

### Template Usage with Airflow Variables

```python
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

# Assumes `dag` is the DAG object from the Single File Upload example

# Using templated file paths with Airflow variables and macros
templated_transfer = FTPFileTransmitOperator(
    task_id='templated_transfer',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/{{ ds }}/report.csv',  # {{ ds }} renders as the run's date (YYYY-MM-DD)
    remote_filepath='/remote/reports/{{ ds }}/report.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)
```

### Complete ETL Pipeline Example

```python
from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.ftp.sensors.ftp import FTPSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data():
    # Data processing logic here
    print("Processing downloaded data...")
    return "Data processed successfully"

dag = DAG(
    'ftp_etl_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False
)

# Wait for input file to arrive
wait_for_input = FTPSensor(
    task_id='wait_for_input_file',
    path='/remote/input/{{ ds }}/data.csv',
    ftp_conn_id='source_ftp',
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,  # Timeout after 1 hour
    dag=dag
)

# Download input file
download_input = FTPFileTransmitOperator(
    task_id='download_input_file',
    ftp_conn_id='source_ftp',
    local_filepath='/local/staging/{{ ds }}/input.csv',
    remote_filepath='/remote/input/{{ ds }}/data.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

# Process the data
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

# Upload processed results
upload_results = FTPFileTransmitOperator(
    task_id='upload_results',
    ftp_conn_id='destination_ftp',
    local_filepath='/local/output/{{ ds }}/processed_data.csv',
    remote_filepath='/remote/output/{{ ds }}/processed_data.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

# Define task dependencies
wait_for_input >> download_input >> process_task >> upload_results
```

## Error Handling

The operators raise or propagate the following errors:

- **ValueError**: Raised when the local and remote filepath lists have different lengths
- **TypeError**: Raised for unsupported operation values (not GET or PUT)
- **FTP Errors**: Propagated from underlying FTPHook operations
- **File System Errors**: Raised when local directories cannot be created or accessed
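
The first two checks in the list above can be sketched as a standalone function. `validate_transfer` is a hypothetical name, the error messages are illustrative, and the exact-match comparison against `"put"` and `"get"` is an assumption. The operator may normalize the value differently:

```python
def validate_transfer(local_paths: list, remote_paths: list, operation: str) -> None:
    """Raise ValueError or TypeError for invalid transfer arguments.

    Hypothetical helper for illustration; not part of the FTP provider.
    """
    # Every local path needs exactly one remote counterpart
    if len(local_paths) != len(remote_paths):
        raise ValueError(
            f"{len(local_paths)} local path(s) but {len(remote_paths)} remote path(s)"
        )
    # Only the two FTPOperation values are accepted (exact match assumed here)
    if operation not in ("put", "get"):
        raise TypeError(f"Unsupported operation {operation!r}; expected 'put' or 'get'")
```

Running both checks before any connection is opened means a misconfigured task fails without touching the server.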
286
287
## OpenLineage Integration
288
289
When OpenLineage providers are available, the operators automatically generate data lineage information:
290
291
- **Input Datasets**: Source file locations (local for PUT, remote for GET)
292
- **Output Datasets**: Destination file locations (remote for PUT, local for GET)
293
- **Namespace Format**: `file://hostname:port` for proper dataset identification
294
295
This enables comprehensive data lineage tracking across FTP file transfer operations within your data pipelines.
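
The direction rule above can be sketched without the OpenLineage client. `Dataset` here is a hypothetical stand-in dataclass, not the OpenLineage class, and `file_namespace` and `lineage_datasets` are illustrative names:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Dataset:
    """Hypothetical stand-in for a lineage dataset (namespace plus name)."""
    namespace: str
    name: str


def file_namespace(hostname: str, port: int) -> str:
    """Build a namespace in the file://hostname:port format."""
    return f"file://{hostname}:{port}"


def lineage_datasets(operation, local_path, remote_path, local_ns, remote_ns):
    """Return (inputs, outputs) according to the transfer direction."""
    local = Dataset(local_ns, local_path)
    remote = Dataset(remote_ns, remote_path)
    if operation == "put":
        # Upload: the local file is the source, the remote file the destination
        return [local], [remote]
    # Download: the remote file is the source, the local file the destination
    return [remote], [local]
```

For example, `lineage_datasets("get", "/local/in.csv", "/remote/in.csv", local_ns, remote_ns)` lists the remote dataset as the input and the local one as the output.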