# File Transfer Operations

Task execution components for uploading and downloading files between local and remote SFTP locations. The SFTP operator provides robust file transfer capabilities with support for batch operations, intermediate directory creation, and comprehensive error handling.

## Capabilities

### SFTP Operation Constants

Operation type constants for specifying the direction of file transfers.
```python { .api }
class SFTPOperation:
    """Operation that can be used with SFTP."""

    PUT = "put"  # Upload operation from local to remote
    GET = "get"  # Download operation from remote to local
```

### SFTP Operator

Main operator for transferring files between local and remote SFTP locations.

```python { .api }
class SFTPOperator(BaseOperator):
    """
    SFTPOperator for transferring files from remote host to local or vice versa.

    This operator uses an SFTP hook to open an SFTP transport channel that
    serves as the basis for file transfer operations. Supports both single
    file and batch file transfers with comprehensive configuration options.
    """

    template_fields: Sequence[str] = ("local_filepath", "remote_filepath", "remote_host")

    def __init__(
        self,
        *,
        ssh_hook: SSHHook | None = None,
        sftp_hook: SFTPHook | None = None,
        ssh_conn_id: str | None = None,
        remote_host: str | None = None,
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = SFTPOperation.PUT,
        confirm: bool = True,
        create_intermediate_dirs: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize SFTP operator.

        Parameters:
        - ssh_conn_id: SSH connection ID from Airflow connections
        - sftp_hook: Predefined SFTPHook to use (preferred over ssh_conn_id)
        - ssh_hook: Deprecated - predefined SSHHook to use (use sftp_hook instead)
        - remote_host: Remote host to connect (templated)
        - local_filepath: Local file path or list of paths to get or put (templated)
        - remote_filepath: Remote file path or list of paths to get or put (templated)
        - operation: Specify operation 'get' or 'put' (default: put)
        - confirm: Specify if SFTP operation should be confirmed (default: True)
        - create_intermediate_dirs: Create missing intermediate directories (default: False)
        """

    def execute(self, context: Any) -> str | list[str] | None:
        """Execute the file transfer operation."""

    def get_openlineage_facets_on_start(self):
        """Return OpenLineage datasets for lineage tracking."""
```

### File Transfer Execution

```python { .api }
def execute(self, context: Any) -> str | list[str] | None:
    """
    Execute the file transfer operation (PUT or GET).

    Validates file path arrays, establishes the SFTP connection, and performs
    the specified transfer operation with optional intermediate directory creation.

    Parameters:
    - context: Airflow task execution context

    Returns:
        Filepath or list of filepaths that were transferred, or None

    Raises:
        ValueError: If local_filepath and remote_filepath arrays have different lengths
        TypeError: If operation is not 'get' or 'put'
        AirflowException: If both ssh_hook and sftp_hook are defined, or transfer fails
    """
```
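
Because `execute` returns the transferred filepath(s), Airflow pushes the value to XCom by default, so a downstream task can pick it up. A minimal sketch; the connection ID, paths, and task IDs are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation

with DAG('sftp_xcom_example', start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    upload = SFTPOperator(
        task_id='upload_file',
        ssh_conn_id='sftp_default',
        local_filepath='/local/data/report.csv',
        remote_filepath='/remote/uploads/report.csv',
        operation=SFTPOperation.PUT,
    )

    def log_transferred(ti, **_):
        # execute() returned the transferred filepath; Airflow stored it as
        # the task's return_value XCom.
        path = ti.xcom_pull(task_ids='upload_file')
        print(f"SFTPOperator reported transfer of: {path}")

    log_result = PythonOperator(task_id='log_result', python_callable=log_transferred)

    upload >> log_result
```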

### Data Lineage Integration

```python { .api }
def get_openlineage_facets_on_start(self):
    """
    Return OpenLineage datasets for data lineage tracking.

    Creates dataset facets for both local and remote file locations
    to enable lineage tracking in OpenLineage-compatible systems.

    Returns:
        OpenLineage facets containing input and output datasets
    """
```

## Usage Examples

### Basic File Upload

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime

dag = DAG(
    'sftp_upload_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

upload_task = SFTPOperator(
    task_id='upload_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/data/report.csv',
    remote_filepath='/remote/uploads/report.csv',
    operation=SFTPOperation.PUT,
    dag=dag
)
```

### Basic File Download

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime

dag = DAG(
    'sftp_download_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

download_task = SFTPOperator(
    task_id='download_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/downloads/data.csv',
    remote_filepath='/remote/exports/data.csv',
    operation=SFTPOperation.GET,
    dag=dag
)
```

### Batch File Transfer

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

dag = DAG(
    'sftp_batch_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Upload multiple files
batch_upload = SFTPOperator(
    task_id='batch_upload',
    ssh_conn_id='sftp_default',
    local_filepath=[
        '/local/data/file1.csv',
        '/local/data/file2.csv',
        '/local/data/file3.csv'
    ],
    remote_filepath=[
        '/remote/uploads/file1.csv',
        '/remote/uploads/file2.csv',
        '/remote/uploads/file3.csv'
    ],
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,  # Create /remote/uploads/ if it doesn't exist
    dag=dag
)

# Download multiple files
batch_download = SFTPOperator(
    task_id='batch_download',
    ssh_conn_id='sftp_default',
    local_filepath=[
        '/local/downloads/result1.json',
        '/local/downloads/result2.json'
    ],
    remote_filepath=[
        '/remote/results/result1.json',
        '/remote/results/result2.json'
    ],
    operation=SFTPOperation.GET,
    dag=dag
)
```

### Advanced Configuration

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.hooks.sftp import SFTPHook
from datetime import datetime

dag = DAG(
    'sftp_advanced_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

# Using a predefined hook for custom configuration
custom_hook = SFTPHook(ssh_conn_id='sftp_custom')

advanced_transfer = SFTPOperator(
    task_id='advanced_transfer',
    sftp_hook=custom_hook,  # Use predefined hook
    remote_host='custom.sftp.server.com',  # Override connection host
    local_filepath='/local/data/{{ ds }}/report.csv',  # Templated path
    remote_filepath='/remote/daily/{{ ds }}/report.csv',  # Templated path
    operation=SFTPOperation.PUT,
    confirm=True,  # Confirm successful transfer
    create_intermediate_dirs=True,  # Create date-based directories
    dag=dag
)
```

### Error Handling and Retries

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30)
}

dag = DAG(
    'sftp_resilient_example',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

resilient_transfer = SFTPOperator(
    task_id='resilient_transfer',
    ssh_conn_id='sftp_prod',
    local_filepath='/local/critical/data.parquet',
    remote_filepath='/remote/warehouse/data.parquet',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    # Airflow retries the task on connection or transfer failures,
    # using the retry policy from default_args above
    dag=dag
)
```

### Integration with Sensors

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_integration',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Wait for the source file to appear
wait_for_file = SFTPSensor(
    task_id='wait_for_source',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_source',
    timeout=3600,  # Wait up to 1 hour
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# Download the file once it's available
download_file = SFTPOperator(
    task_id='download_data',
    ssh_conn_id='sftp_source',
    local_filepath='/local/staging/data.csv',
    remote_filepath='/remote/incoming/data.csv',
    operation=SFTPOperation.GET,
    dag=dag
)

# Upload the processed file to a different server
upload_processed = SFTPOperator(
    task_id='upload_processed',
    ssh_conn_id='sftp_destination',
    local_filepath='/local/processed/data.csv',
    remote_filepath='/remote/processed/data.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

wait_for_file >> download_file >> upload_processed
```

### Dynamic File Paths with Templating

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

dag = DAG(
    'sftp_templating_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

templated_transfer = SFTPOperator(
    task_id='templated_transfer',
    ssh_conn_id='sftp_default',
    # Use Airflow templating for dynamic paths
    local_filepath='/local/data/{{ ds }}/export_{{ ts_nodash }}.csv',
    remote_filepath='/remote/daily/{{ ds }}/export_{{ ts_nodash }}.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)
```

## Best Practices

### Connection Management

- Use connection pooling through Airflow's connection management
- Prefer the `sftp_hook` parameter over the deprecated `ssh_hook`
- Use connection IDs consistently across tasks
- Configure timeouts appropriately for large file transfers (see the sketch after this list)
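
One way to apply these points is to define the connection once, with explicit key and timeout settings, and reference it everywhere by ID. A sketch of registering a connection programmatically; the host, user, and extra values are illustrative, and the extra keys follow the SSH connection schema (`key_file`, `conn_timeout`), so check your provider version's documentation:

```python
# Illustrative bootstrap script. In practice the Airflow UI, CLI, or an
# AIRFLOW_CONN_SFTP_PROD environment variable achieves the same result.
from airflow.models import Connection
from airflow.settings import Session

conn = Connection(
    conn_id='sftp_prod',
    conn_type='sftp',
    host='sftp.example.com',
    login='transfer_user',
    port=22,
    # key_file and conn_timeout are SSH-schema extras (assumed here)
    extra='{"key_file": "/opt/airflow/keys/id_rsa", "conn_timeout": 30}',
)

session = Session()
session.add(conn)
session.commit()
```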

### File Path Handling

- Use absolute paths for both local and remote file paths
- Enable `create_intermediate_dirs=True` for dynamic directory structures
- Validate that file path arrays have matching lengths for batch operations (see the sketch after this list)
- Use templating for date-based or dynamic file naming
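
Because the operator raises `ValueError` when the local and remote lists differ in length, it helps to derive both lists from a single source of truth. A small sketch; the directory names are illustrative and `dag` is assumed to be defined as in the earlier examples:

```python
import os

from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation

filenames = ['file1.csv', 'file2.csv', 'file3.csv']

# Deriving both lists from one filename list guarantees matching lengths
local_paths = [os.path.join('/local/data', name) for name in filenames]
remote_paths = [os.path.join('/remote/uploads', name) for name in filenames]
assert len(local_paths) == len(remote_paths)

derived_batch_upload = SFTPOperator(
    task_id='derived_batch_upload',
    ssh_conn_id='sftp_default',
    local_filepath=local_paths,
    remote_filepath=remote_paths,
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)
```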

### Error Handling

- Configure appropriate retry policies for network-dependent operations
- Use `confirm=True` to verify successful transfers
- Implement downstream validation of transferred files (see the sketch after this list)
- Monitor transfer logs for performance optimization opportunities
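
A lightweight validation task can assert that a downloaded file exists and is non-empty before anything consumes it, failing fast (and triggering retries or alerts) otherwise. A sketch; the paths and task IDs are illustrative:

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation

with DAG('sftp_validated_download', start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    download = SFTPOperator(
        task_id='download_data',
        ssh_conn_id='sftp_default',
        local_filepath='/local/staging/data.csv',
        remote_filepath='/remote/exports/data.csv',
        operation=SFTPOperation.GET,
    )

    def validate_download(path='/local/staging/data.csv', **_):
        # Fail the task if the transfer left a missing or empty file behind
        if not os.path.isfile(path) or os.path.getsize(path) == 0:
            raise ValueError(f"Downloaded file missing or empty: {path}")

    validate = PythonOperator(task_id='validate_download', python_callable=validate_download)

    download >> validate
```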

### Performance Optimization

- Use batch operations for multiple files instead of individual tasks
- Consider file size limitations for single transfers
- Implement prefetching controls for large file downloads
- Use async hooks for I/O intensive workflows where appropriate (see the sketch after this list)
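
For long waits on remote files, recent provider versions let `SFTPSensor` defer to the triggerer (which uses the provider's async hook under the hood) instead of holding a worker slot while polling. A sketch, assuming a provider version that supports the `deferrable` flag:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor

with DAG('sftp_deferrable_wait', start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    wait_async = SFTPSensor(
        task_id='wait_for_file_async',
        sftp_conn_id='sftp_default',
        path='/remote/incoming/data.csv',
        poke_interval=300,
        timeout=3600,
        deferrable=True,  # assumed: hands the wait off to the triggerer
    )
```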