# Task Decorators

Simplified interfaces for creating SFTP-based tasks using Python decorators. The SFTP provider includes task decorators that enable more readable and maintainable DAG definitions for common SFTP operations, particularly file monitoring scenarios.

## Capabilities

### SFTP Sensor Task Decorator

Task decorator for creating SFTP sensor tasks with simplified syntax and enhanced functionality.

```python { .api }
def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator:
    """
    Wrap a function into an Airflow SFTP sensor operator.

    Creates a decorated task that combines SFTP file monitoring with custom
    Python processing logic. The decorated function receives files_found
    in its keyword arguments when files are detected.

    Parameters:
    - python_callable: Function to decorate and execute when files are found
    - **kwargs: Additional arguments passed to the underlying SFTPSensor

    Returns:
    TaskDecorator that creates _DecoratedSFTPSensor instances
    """
```
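
As with other TaskFlow-style decorators, decorating a function only builds a task factory; the decorated function has to be called for a sensor task to actually be registered on a DAG. A minimal sketch (the connection id, path, and pattern are placeholders):

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime

with DAG("sftp_sensor_minimal", start_date=datetime(2023, 1, 1), schedule_interval=None):

    @sftp_sensor_task(path="/remote/inbox", file_pattern="*.csv", sftp_conn_id="sftp_default")
    def report_files(files_found, **context):
        # files_found is injected by the sensor when matching files appear
        print(f"Found: {files_found}")
        return files_found

    # Calling the decorated function instantiates the sensor task in the DAG
    report_files()
```

The examples below pass `dag=dag` through the decorator instead of using a `with DAG(...)` block; both forms attach the task to the DAG once the decorated function is called.
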
### Decorated SFTP Sensor Class

Internal implementation class for decorated SFTP sensor tasks.

```python { .api }
class _DecoratedSFTPSensor(SFTPSensor):
    """
    Wraps a Python callable and captures args/kwargs when called for execution.

    Combines SFTP file monitoring capabilities with custom Python processing.
    Inherits all SFTPSensor functionality while adding decorator-specific
    handling for Python callable execution.
    """

    template_fields: Sequence[str] = ("op_args", "op_kwargs", *SFTPSensor.template_fields)
    custom_operator_name = "@task.sftp_sensor"
    shallow_copy_attrs: Sequence[str] = ("python_callable",)

    def __init__(
        self,
        *,
        task_id: str,
        **kwargs,
    ) -> None:
        """
        Initialize decorated SFTP sensor.

        Parameters:
        - task_id: Unique task identifier
        - **kwargs: Arguments passed to parent SFTPSensor class
        """
```
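
The `custom_operator_name` above means the same decorator is normally reached through Airflow's `task` namespace once the provider is installed, so `@task.sftp_sensor(...)` and a direct import of `sftp_sensor_task` are interchangeable. A sketch of the namespaced form (DAG id and arguments are placeholders):

```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id="sftp_sensor_alias", start_date=datetime(2023, 1, 1), schedule_interval=None)
def sftp_sensor_alias():

    @task.sftp_sensor(path="/remote/inbox", file_pattern="*.csv", sftp_conn_id="sftp_default")
    def report_files(files_found, **context):
        print(f"Found: {files_found}")
        return files_found

    report_files()

sftp_sensor_alias()
```
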
## Usage Examples

### Basic SFTP Sensor Decorator

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

@sftp_sensor_task(
    path='/remote/data',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)
def process_csv_files(files_found, **context):
    """Process CSV files when they are found."""
    print(f"Found {len(files_found)} CSV files: {files_found}")

    # Custom processing logic
    processed_files = []
    for file_path in files_found:
        print(f"Processing file: {file_path}")
        # Add your file processing logic here
        processed_files.append(f"processed_{file_path}")

    return {
        "status": "success",
        "processed_count": len(processed_files),
        "processed_files": processed_files
    }

# Calling the decorated function registers the sensor task on the DAG
process_csv_files()
```

### Advanced File Processing with Decorator

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.providers.sftp.hooks.sftp import SFTPHook
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_advanced',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

@sftp_sensor_task(
    path='/remote/daily_exports',
    file_pattern='export_{{ ds_nodash }}_*.json',
    newer_than='{{ ds }}T00:00:00',
    sftp_conn_id='sftp_default',
    timeout=7200,  # 2 hours
    poke_interval=300,  # 5 minutes
    dag=dag
)
def validate_and_download_exports(files_found, **context):
    """Validate JSON exports and download them for processing."""
    hook = SFTPHook(ssh_conn_id='sftp_default')

    validated_files = []
    invalid_files = []

    for file_path in files_found:
        try:
            # Get file size for validation
            file_info = hook.describe_directory(file_path.rsplit('/', 1)[0])
            filename = file_path.rsplit('/', 1)[1]

            if filename in file_info:
                file_size = file_info[filename]['size']
                if file_size > 100:  # Minimum size check
                    validated_files.append(file_path)
                    print(f"Valid file: {file_path} ({file_size} bytes)")
                else:
                    invalid_files.append(file_path)
                    print(f"Invalid file (too small): {file_path} ({file_size} bytes)")

        except Exception as e:
            print(f"Error validating {file_path}: {e}")
            invalid_files.append(file_path)

    hook.close_conn()

    return {
        "valid_files": validated_files,
        "invalid_files": invalid_files,
        "validation_summary": {
            "total_found": len(files_found),
            "valid_count": len(validated_files),
            "invalid_count": len(invalid_files)
        }
    }

# Register the sensor task on the DAG
validate_and_download_exports()
```

### Deferrable Sensor with Custom Processing

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_deferrable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2),
    max_active_runs=5
)

@sftp_sensor_task(
    path='/remote/realtime_data',
    file_pattern='sensor_data_*.parquet',
    newer_than='{{ ts }}',  # Only files newer than the task's logical timestamp
    sftp_conn_id='sftp_realtime',
    deferrable=True,  # Use async trigger for resource efficiency
    timeout=3600,
    dag=dag
)
def process_sensor_data(files_found, **context):
    """Process real-time sensor data files."""
    execution_date = context['ds']
    task_instance = context['task_instance']

    print(f"Processing sensor data for {execution_date}")
    print(f"Found {len(files_found)} files: {files_found}")

    # Simulate processing logic
    processing_results = []
    for file_path in files_found:
        # Extract timestamp from filename
        filename = file_path.split('/')[-1]
        if 'sensor_data_' in filename:
            timestamp = filename.replace('sensor_data_', '').replace('.parquet', '')
            processing_results.append({
                "file": file_path,
                "timestamp": timestamp,
                "status": "processed"
            })

    # Push results to XCom for downstream tasks
    task_instance.xcom_push(key='processing_results', value=processing_results)

    return {
        "execution_date": execution_date,
        "files_processed": len(processing_results),
        "processing_results": processing_results
    }

# Register the sensor task on the DAG
process_sensor_data()
```

### Multiple Pattern Monitoring with Decorator

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

@sftp_sensor_task(
    path='/remote/mixed_data',
    file_pattern='*',  # Monitor all files
    sftp_conn_id='sftp_default',
    dag=dag
)
def categorize_files(files_found, **context):
    """Categorize found files by type and process accordingly."""
    categorized = {
        'csv_files': [],
        'json_files': [],
        'xml_files': [],
        'other_files': []
    }

    for file_path in files_found:
        filename = file_path.lower()
        if filename.endswith('.csv'):
            categorized['csv_files'].append(file_path)
        elif filename.endswith('.json'):
            categorized['json_files'].append(file_path)
        elif filename.endswith('.xml'):
            categorized['xml_files'].append(file_path)
        else:
            categorized['other_files'].append(file_path)

    # Log categorization results
    for category, files in categorized.items():
        if files:
            print(f"{category}: {len(files)} files")
            for file in files:
                print(f" - {file}")

    return categorized

# Register the sensor task on the DAG
categorize_files()
```

### Error Handling in Decorated Tasks

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.exceptions import AirflowException
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_error_handling',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

@sftp_sensor_task(
    path='/remote/critical_files',
    file_pattern='critical_*.txt',
    newer_than='{{ ds }}T06:00:00',
    sftp_conn_id='sftp_critical',
    timeout=7200,
    retries=2,
    retry_delay=timedelta(minutes=15),
    dag=dag
)
def process_critical_files(files_found, **context):
    """Process critical files with comprehensive error handling."""
    if not files_found:
        raise AirflowException("No critical files found - this should not happen")

    try:
        processed_files = []
        failed_files = []

        for file_path in files_found:
            try:
                # Simulate file processing
                print(f"Processing critical file: {file_path}")

                # Add your critical file processing logic here
                # For example: data validation, format checking, etc.

                # Simulate processing success/failure
                if "invalid" not in file_path.lower():
                    processed_files.append(file_path)
                    print(f"Successfully processed: {file_path}")
                else:
                    failed_files.append(file_path)
                    print(f"Processing failed: {file_path}")

            except Exception as e:
                failed_files.append(file_path)
                print(f"Error processing {file_path}: {e}")

        # Check if any critical files failed
        if failed_files:
            error_msg = f"Failed to process {len(failed_files)} critical files: {failed_files}"
            print(error_msg)
            # Decide whether to fail the task or just warn
            if len(failed_files) > len(processed_files):
                raise AirflowException(error_msg)

        return {
            "total_files": len(files_found),
            "processed_files": processed_files,
            "failed_files": failed_files,
            "success_rate": len(processed_files) / len(files_found) * 100
        }

    except Exception as e:
        print(f"Critical error in file processing: {e}")
        raise AirflowException(f"Critical file processing failed: {e}")

# Register the sensor task on the DAG
process_critical_files()
```

### Integration with Downstream Tasks

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_downstream(**context):
    """Process results from decorated SFTP sensor."""
    # Pull results from the decorated sensor
    sensor_results = context['task_instance'].xcom_pull(task_ids='monitor_data_files')

    print(f"Received sensor results: {sensor_results}")

    if sensor_results and 'files_found' in sensor_results:
        files = sensor_results['files_found']
        print(f"Processing {len(files)} files downstream")

        # Add downstream processing logic
        for file_path in files:
            print(f"Downstream processing: {file_path}")

    return "Downstream processing complete"

dag = DAG(
    'sftp_decorator_integration',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=3)
)

# Decorated sensor task
@sftp_sensor_task(
    task_id='monitor_data_files',
    path='/remote/data_pipeline',
    file_pattern='pipeline_*.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)
def monitor_and_validate(files_found, **context):
    """Monitor files and perform initial validation."""
    validated_files = []

    for file_path in files_found:
        # Perform validation logic
        if file_path.endswith('.csv'):
            validated_files.append(file_path)
            print(f"Validated: {file_path}")

    return {
        "files_found": files_found,
        "validated_files": validated_files,
        "validation_count": len(validated_files)
    }

# Downstream processing task
downstream_task = PythonOperator(
    task_id='downstream_processing',
    python_callable=process_downstream,
    dag=dag
)

# Calling the decorated function creates the sensor task; then set up dependencies
monitor_and_validate() >> downstream_task
```

### Template Usage in Decorators

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_templating',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

@sftp_sensor_task(
    path='/remote/daily/{{ ds }}',  # Templated path
    file_pattern='data_{{ ds_nodash }}_*.json',  # Templated pattern
    newer_than='{{ ds }}T05:00:00',  # Templated time
    sftp_conn_id='sftp_default',
    timeout=14400,
    dag=dag
)
def process_daily_data(files_found, **context):
    """Process daily data files using Airflow templating."""
    execution_date = context['ds']
    formatted_date = context['ds_nodash']

    print(f"Processing daily data for {execution_date}")
    print(f"Looking for pattern: data_{formatted_date}_*.json")
    print(f"Found {len(files_found)} files")

    daily_summary = {
        "execution_date": execution_date,
        "formatted_date": formatted_date,
        "files_found": files_found,
        "file_count": len(files_found)
    }

    # Process each file
    for file_path in files_found:
        print(f"Processing daily file: {file_path}")
        # Add daily file processing logic

    return daily_summary

# Register the sensor task on the DAG
process_daily_data()
```

## Decorator Benefits

### Simplified Syntax

- Combines sensor logic with custom processing in a single function
- Reduces boilerplate code compared to separate sensor and processing tasks (see the sketch after this list)
- Provides cleaner DAG definitions with decorator syntax
- Enables direct access to found files in the decorated function

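
As a rough illustration of the boilerplate reduction, the sketch below contrasts the classic sensor-plus-operator wiring with the decorator form. Connection ids and paths are placeholders, and the classic variant would still need XCom plumbing to hand the file list to its processing task:

```python
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor

with DAG("compare_styles", start_date=datetime(2023, 1, 1), schedule_interval=None):
    # Classic style: a sensor task plus a separate processing task
    wait_for_files = SFTPSensor(
        task_id="wait_for_files",
        path="/remote/data",
        file_pattern="*.csv",
        sftp_conn_id="sftp_default",
    )
    process = PythonOperator(task_id="process", python_callable=lambda: print("process files"))
    wait_for_files >> process

    # Decorator style: monitoring and processing in one function
    @task.sftp_sensor(path="/remote/data", file_pattern="*.csv", sftp_conn_id="sftp_default")
    def wait_and_process(files_found, **context):
        print(f"Processing {files_found}")

    wait_and_process()
```
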
### Enhanced Functionality

- Automatic handling of `files_found` parameter injection
- Seamless integration with Airflow's templating system
- Built-in XCom handling for downstream task communication (see the sketch after this list)
- Support for all SFTPSensor parameters and configurations

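
Because the decorated function's return value is pushed to XCom like any other TaskFlow task, it can be fed straight into a downstream `@task` without explicit `xcom_pull` calls. A minimal sketch with placeholder names:

```python
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task

with DAG("sftp_xcom_chaining", start_date=datetime(2023, 1, 1), schedule_interval=None):

    @sftp_sensor_task(path="/remote/inbox", file_pattern="*.csv", sftp_conn_id="sftp_default")
    def wait_for_files(files_found, **context):
        # The return value is pushed to XCom automatically
        return files_found

    @task
    def summarize(files):
        # Receives the sensor's return value via XCom
        print(f"{len(files)} files to process: {files}")

    summarize(wait_for_files())
```
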
### Improved Maintainability

- Co-locates sensor configuration with processing logic
- Reduces task dependencies and complex XCom passing
- Provides clear function signatures for custom processing
- Enables better code organization and reusability

## Best Practices

### Function Design

- Keep decorated functions focused on processing found files
- Use descriptive function names that indicate the processing purpose
- Document function parameters and return values clearly
- Handle edge cases like empty file lists gracefully (a defensive sketch follows this list)

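
Whether an empty `files_found` list can ever reach the callable depends on the sensor configuration, so a defensive default costs little. One possible guard is sketched below; skipping rather than failing is an assumption about the desired behaviour, not something the decorator enforces:

```python
from airflow.exceptions import AirflowSkipException
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task

@sftp_sensor_task(path="/remote/inbox", file_pattern="*.csv", sftp_conn_id="sftp_default")
def process_if_any(files_found=None, **context):
    # Default to an empty list so the function also works if nothing is injected
    files_found = files_found or []
    if not files_found:
        # Mark the task as skipped instead of failing when there is nothing to do
        raise AirflowSkipException("No files to process")
    return [f"processed_{path}" for path in files_found]
```
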
### Error Handling

- Implement proper exception handling within decorated functions
- Use Airflow exceptions for task failures that should stop the pipeline
- Log processing steps for debugging and monitoring
- Consider partial failure scenarios for batch file processing

### Performance Considerations

- Avoid heavy processing within the decorated function for large file sets
- Consider using the decorator for coordination and separate tasks for processing (see the sketch after this list)
- Use appropriate timeout values for sensor configuration
- Monitor memory usage when processing file metadata

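
One way to keep the decorated function light is to let it only report what arrived and push the heavy per-file work into separately mapped tasks. The sketch below assumes dynamic task mapping (Airflow 2.3+) and uses placeholder names:

```python
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task

with DAG("sftp_coordinate_then_process", start_date=datetime(2023, 1, 1), schedule_interval=None):

    @sftp_sensor_task(path="/remote/large_drops", file_pattern="*.parquet", sftp_conn_id="sftp_default")
    def detect_files(files_found, **context):
        # Coordination only: just report what arrived
        return files_found

    @task
    def process_one(file_path: str):
        # Heavy per-file work lives in its own task (and worker slot)
        print(f"Processing {file_path}")

    # Dynamic task mapping creates one processing task per detected file
    process_one.expand(file_path=detect_files())
```
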
### Integration Patterns

- Use return values to pass results to downstream tasks via XCom
- Implement consistent return value structures across decorated tasks (a sketch of one such structure follows this list)
- Consider using the decorator for validation and coordination logic
- Combine with other Airflow operators for complex workflows
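
A lightweight way to enforce consistent return structures is to describe the shape once and build every decorated task's return value through it. The `TypedDict` and helper below are an illustrative convention, not part of the provider:

```python
from typing import List, TypedDict

class SftpSensorResult(TypedDict):
    """Shared shape for what decorated SFTP sensor tasks push to XCom."""
    files_found: List[str]
    processed_files: List[str]
    status: str

def build_result(files_found, processed_files) -> SftpSensorResult:
    # Helper used by each decorated function so downstream tasks can rely on the keys
    return {
        "files_found": list(files_found),
        "processed_files": list(processed_files),
        "status": "success" if len(processed_files) == len(files_found) else "partial",
    }
```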