# Asynchronous Triggers

Trigger components for deferrable SFTP operations. By yielding control to the triggerer during long-running file monitoring, they free worker slots while waiting instead of tying them up. The SFTP trigger provides asynchronous file system monitoring for deferrable sensors in high-throughput workflows.

## Capabilities

### SFTP Trigger

Asynchronous trigger for deferrable SFTP file monitoring operations.

```python { .api }
class SFTPTrigger(BaseTrigger):
    """
    SFTPTrigger that fires when file conditions are met on SFTP server.

    Provides asynchronous monitoring of SFTP locations for file presence,
    pattern matching, and modification time conditions. Designed for use
    with deferrable sensors to optimize resource utilization.
    """

    def __init__(
        self,
        path: str,
        file_pattern: str = "",
        sftp_conn_id: str = "sftp_default",
        newer_than: datetime | str | None = None,
        poke_interval: float = 5,
    ) -> None:
        """
        Initialize SFTP trigger.

        Parameters:
        - path: Path on SFTP server to search for files
        - file_pattern: Pattern to match against file list using fnmatch
        - sftp_conn_id: SFTP connection ID for connecting to server
        - newer_than: DateTime threshold for file modification time filtering
        - poke_interval: How often, in seconds, to check for file existence
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize SFTPTrigger arguments and classpath."""

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Make asynchronous calls to SFTP server and yield trigger events."""
```

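The `file_pattern` parameter is matched against remote file names using Python's standard `fnmatch` rules. A quick illustration of those semantics with plain stdlib code (the file names are made up for the example; no SFTP connection is involved):

```python
from fnmatch import fnmatch

# Hypothetical remote file names, used only to illustrate fnmatch semantics.
remote_files = ["export_20230101_a.json", "export_20230101_b.csv", "readme.txt"]

# '*' matches any run of characters, '?' a single character, '[...]' a set.
matches = [name for name in remote_files if fnmatch(name, "export_*_?.json")]
print(matches)  # ['export_20230101_a.json']
```
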
### Trigger Serialization

```python { .api }
def serialize(self) -> tuple[str, dict[str, Any]]:
    """
    Serialize SFTPTrigger arguments and classpath.

    Required for trigger persistence and recovery across Airflow restarts.
    Returns the trigger class path and initialization parameters.

    Returns:
        Tuple containing:
        - Class path string for trigger reconstruction
        - Dictionary of initialization parameters
    """
```

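A rough sketch of the shape `serialize()` is expected to return: the dotted class path plus the keyword arguments needed to rebuild the trigger. The values shown in the comments follow the parameters documented above and should be treated as illustrative rather than authoritative:

```python
from airflow.providers.sftp.triggers.sftp import SFTPTrigger

trigger = SFTPTrigger(
    path="/remote/data/important_file.csv",
    sftp_conn_id="sftp_default",
    poke_interval=5,
)

classpath, kwargs = trigger.serialize()
# classpath is the dotted import path the triggerer uses to rebuild the trigger,
# e.g. "airflow.providers.sftp.triggers.sftp.SFTPTrigger"; kwargs is a plain dict
# of the constructor arguments (path, file_pattern, sftp_conn_id, newer_than,
# poke_interval).
print(classpath)
print(kwargs)
```
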
### Asynchronous Monitoring

```python { .api }
async def run(self) -> AsyncIterator[TriggerEvent]:
    """
    Make asynchronous calls to SFTP server and yield trigger events.

    Continuously monitors SFTP server for file conditions using SFTPHookAsync.
    Handles different monitoring scenarios:
    - Direct file path monitoring when no pattern is specified
    - Pattern-based file matching when file_pattern is provided
    - Modification time filtering when newer_than is specified

    Yields:
        TriggerEvent objects indicating success/failure and found files

    Raises:
        AirflowException: For connection failures or configuration errors
    """
```

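Outside of a sensor, the same async iterator can be driven directly, which is occasionally useful in tests. A minimal sketch, assuming a reachable SFTP connection configured as `sftp_default`; in production the triggerer process consumes `run()` for you:

```python
import asyncio

from airflow.providers.sftp.triggers.sftp import SFTPTrigger


async def wait_for_file() -> dict:
    trigger = SFTPTrigger(
        path="/remote/data/important_file.csv",
        sftp_conn_id="sftp_default",  # assumes this connection exists and is reachable
        poke_interval=5,
    )
    # run() is an async generator; take the first event it yields.
    async for event in trigger.run():
        return event.payload


if __name__ == "__main__":
    print(asyncio.run(wait_for_file()))
```
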
## Usage Examples

### Basic Deferrable File Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_deferrable_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Deferrable sensor automatically uses SFTPTrigger
deferrable_sensor = SFTPSensor(
    task_id='wait_for_file',
    path='/remote/data/important_file.csv',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Automatically uses SFTPTrigger
    timeout=3600,     # 1 hour timeout
    dag=dag
)
```

### Pattern-Based Monitoring with Triggers

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_deferrable_pattern',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Monitor for pattern-matched files asynchronously.
# file_pattern is a plain fnmatch pattern; it is not a templated field in
# current provider versions, so avoid Jinja expressions here.
pattern_sensor = SFTPSensor(
    task_id='wait_for_daily_files',
    path='/remote/daily_exports',
    file_pattern='export_*.json',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Uses SFTPTrigger internally
    timeout=14400,    # 4 hour timeout
    dag=dag
)
```

### Custom Trigger Usage (Advanced)

```python
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator
from datetime import datetime, timedelta


class CustomSFTPSensor(BaseSensorOperator):
    """Custom sensor using SFTPTrigger directly."""

    def __init__(self, sftp_path, sftp_conn_id='sftp_default', **kwargs):
        super().__init__(**kwargs)
        self.sftp_path = sftp_path
        self.sftp_conn_id = sftp_conn_id

    def execute(self, context):
        """Defer to a custom trigger configuration."""
        self.defer(
            trigger=SFTPTrigger(
                path=self.sftp_path,
                file_pattern="*.csv",
                sftp_conn_id=self.sftp_conn_id,
                poke_interval=10.0,  # Custom interval
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        """Handle trigger completion."""
        if event and event.get("status") == "success":
            # The exact payload keys depend on the trigger implementation;
            # log the whole event rather than assuming a specific shape.
            self.log.info("Trigger fired: %s", event)
            return event
        raise AirflowException(f"Trigger failed: {event}")


dag = DAG(
    'custom_sftp_trigger',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2)
)

custom_sensor = CustomSFTPSensor(
    task_id='custom_sftp_monitor',
    sftp_path='/remote/custom_data',
    sftp_conn_id='sftp_custom',
    dag=dag
)
```

### High-Frequency Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_high_frequency',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=15),
    max_active_runs=5  # Allow multiple concurrent runs
)

# High-frequency monitoring with a short poke interval
high_freq_sensor = SFTPSensor(
    task_id='high_freq_monitor',
    path='/remote/realtime_data',
    file_pattern='realtime_*.json',
    newer_than='{{ ts }}',  # Only files newer than the run's logical timestamp
    sftp_conn_id='sftp_realtime',
    deferrable=True,
    timeout=900,       # 15 minute timeout
    poke_interval=30,  # Forwarded to the trigger when deferrable=True
    dag=dag
)
```

### Multiple File Pattern Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def process_multiple_patterns(**context):
    """Process results from multiple pattern sensors."""
    # Note: the XCom payload pushed by SFTPSensor depends on the provider
    # version and may be None, so the handling below is defensive.
    csv_files = context['task_instance'].xcom_pull(task_ids='wait_for_csv')
    json_files = context['task_instance'].xcom_pull(task_ids='wait_for_json')
    xml_files = context['task_instance'].xcom_pull(task_ids='wait_for_xml')

    all_files = []
    for result in (csv_files, json_files, xml_files):
        if isinstance(result, dict):
            all_files.extend(result.get('files_found', []))

    print(f"Found {len(all_files)} total files to process")
    return all_files


dag = DAG(
    'sftp_multiple_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

# Monitor for CSV files
csv_sensor = SFTPSensor(
    task_id='wait_for_csv',
    path='/remote/exports',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag
)

# Monitor for JSON files
json_sensor = SFTPSensor(
    task_id='wait_for_json',
    path='/remote/exports',
    file_pattern='*.json',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag
)

# Monitor for XML files
xml_sensor = SFTPSensor(
    task_id='wait_for_xml',
    path='/remote/exports',
    file_pattern='*.xml',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag
)

# Process all found files
process_files = PythonOperator(
    task_id='process_all_files',
    python_callable=process_multiple_patterns,
    dag=dag
)

[csv_sensor, json_sensor, xml_sensor] >> process_files
```

### Time-Based Trigger Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_time_based_trigger',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for files newer than a specific time with deferrable execution
time_based_sensor = SFTPSensor(
    task_id='wait_for_recent_files',
    path='/remote/time_sensitive',
    file_pattern='data_*.parquet',
    newer_than='{{ ds }}T08:00:00',  # Files modified after 08:00 on the logical date
    sftp_conn_id='sftp_default',
    deferrable=True,
    timeout=28800,  # 8 hour timeout
    dag=dag
)
```

### Error Handling with Triggers

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta


def handle_sensor_success(**context):
    """Handle successful file detection."""
    task_instance = context['task_instance']
    # The XCom payload pushed by SFTPSensor depends on the provider version
    # and may be None, so handle both shapes.
    sensor_result = task_instance.xcom_pull(task_ids='deferrable_file_sensor')

    if isinstance(sensor_result, dict) and 'files_found' in sensor_result:
        files = sensor_result['files_found']
        print(f"Successfully found {len(files)} files: {files}")
        return {"status": "success", "file_count": len(files)}
    else:
        print(f"Sensor completed with result: {sensor_result}")
        return {"status": "completed", "result": sensor_result}


dag = DAG(
    'sftp_trigger_error_handling',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

# Deferrable sensor with retries and a generous timeout
deferrable_sensor = SFTPSensor(
    task_id='deferrable_file_sensor',
    path='/remote/critical_data',
    file_pattern='critical_*.csv',
    newer_than='{{ ds }}T00:00:00',
    sftp_conn_id='sftp_critical',
    deferrable=True,
    timeout=14400,  # 4 hours
    retries=2,
    retry_delay=timedelta(minutes=30),
    dag=dag
)

# Handle successful completion
success_handler = PythonOperator(
    task_id='handle_success',
    python_callable=handle_sensor_success,
    dag=dag
)

# Send failure notification
failure_notification = EmailOperator(
    task_id='failure_notification',
    to=['ops@company.com'],
    subject='SFTP Monitoring Failed - {{ ds }}',
    html_content='''
    <h3>SFTP Trigger Monitoring Failure</h3>
    <p>The deferrable SFTP sensor failed to find required files.</p>
    <p>Task: {{ task.task_id }}</p>
    <p>Execution Date: {{ ds }}</p>
    ''',
    trigger_rule='one_failed',
    dag=dag
)

deferrable_sensor >> success_handler
deferrable_sensor >> failure_notification
```

### Resource-Efficient Batch Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Configure for resource efficiency
dag = DAG(
    'sftp_batch_efficient',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    max_active_runs=10,  # Allow many DAG runs to overlap
    catchup=False
)

# Multiple sensors running concurrently with minimal resource usage
sensors = []
for i in range(5):
    sensor = SFTPSensor(
        task_id=f'monitor_batch_{i}',
        path=f'/remote/batch_{i}',
        file_pattern='*.csv',
        sftp_conn_id='sftp_default',
        deferrable=True,  # Each sensor uses minimal resources while deferred
        timeout=3600,
        dag=dag
    )
    sensors.append(sensor)

# While deferred, the sensors don't occupy worker slots; the triggerer
# services all of them concurrently.
```

## Trigger Lifecycle

### Initialization and Serialization

When a deferrable sensor is executed:

1. **Sensor Execution**: The sensor's `execute()` method calls `self.defer()`
2. **Trigger Creation**: An `SFTPTrigger` instance is created with the specified parameters
3. **Serialization**: The trigger is serialized using its `serialize()` method for persistence (see the round-trip sketch below)
4. **Worker Release**: The sensor releases its worker slot and the trigger runs asynchronously in the triggerer

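A rough sketch of that serialization round trip, using the parameters documented above. The `import_string` helper is Airflow's generic utility for resolving dotted paths and is used here purely for illustration; the triggerer performs the equivalent steps internally:

```python
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.utils.module_loading import import_string

trigger = SFTPTrigger(path="/remote/data", file_pattern="*.csv")

# Persist: the classpath and kwargs are what Airflow stores for the deferred task.
classpath, kwargs = trigger.serialize()

# Recover: after a restart, the triggerer re-imports the class and rebuilds
# the trigger from the stored kwargs.
trigger_class = import_string(classpath)
rebuilt = trigger_class(**kwargs)
```
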
### Asynchronous Monitoring Loop

The trigger's `run()` method (sketched below):

1. **Connection Setup**: Establishes an async SFTP connection using `SFTPHookAsync`
2. **Monitoring Loop**: Continuously checks file conditions at the specified interval
3. **Condition Evaluation**: Evaluates file existence, patterns, and modification times
4. **Event Generation**: Yields `TriggerEvent` objects when conditions are met or timeouts occur

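A minimal sketch of the loop's shape, not the provider's actual implementation: the `check` callable stands in for the `SFTPHookAsync` existence, pattern, and `newer_than` evaluation described above.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

from airflow.triggers.base import TriggerEvent


async def monitoring_loop(
    check: Callable[[], Awaitable[list[str]]],
    poke_interval: float,
) -> AsyncIterator[TriggerEvent]:
    """Shape of the loop inside a file-monitoring trigger's run() method."""
    while True:
        files = await check()
        if files:
            # Condition met: emit one event and stop iterating.
            yield TriggerEvent({"status": "success", "message": f"Sensed files: {files}"})
            return
        # Sleep instead of blocking, so the triggerer can service other triggers.
        await asyncio.sleep(poke_interval)
```

The real implementation layers the direct-path, pattern, and modification-time branches on top of this basic shape.
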
### Completion and Callback

When the trigger completes:

1. **Event Yield**: Trigger yields a final `TriggerEvent` with success/failure status
2. **Sensor Resumption**: The sensor's `execute_complete()` method is called
3. **Result Processing**: Sensor processes the trigger event and completes execution

## Best Practices

### Resource Management

- Use deferrable sensors for long-running monitoring tasks to free up worker slots
- Configure appropriate `poke_interval` values to balance responsiveness with server load
- Implement reasonable timeout values to prevent indefinite waiting
- Monitor trigger task queues to ensure adequate triggerer capacity

### Performance Optimization

- Use specific file patterns to reduce unnecessary server queries
- Implement connection pooling for triggers monitoring the same SFTP server
- Consider trigger serialization overhead for very high-frequency monitoring
- Monitor async connection pool sizes for optimal performance

### Error Handling

- Implement appropriate retry strategies for trigger failures
- Configure alerts for trigger timeout scenarios
- Use proper exception handling in custom trigger implementations (see the sketch below)
- Monitor trigger execution logs for connection issues

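For the exception-handling point, a hedged sketch of a custom trigger that surfaces failures to the waiting sensor instead of dying silently. The `_check_remote()` coroutine is a placeholder for the actual SFTP calls; the key idea is to yield a failure `TriggerEvent` so the sensor's `execute_complete()` can decide whether to raise.

```python
import asyncio
from typing import Any, AsyncIterator

from airflow.triggers.base import BaseTrigger, TriggerEvent


class ResilientCustomTrigger(BaseTrigger):
    """Illustrative custom trigger that reports failures as events."""

    def __init__(self, poke_interval: float = 5) -> None:
        super().__init__()
        self.poke_interval = poke_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return (
            f"{type(self).__module__}.{type(self).__name__}",
            {"poke_interval": self.poke_interval},
        )

    async def _check_remote(self) -> bool:
        # Placeholder for the actual SFTP checks in a real implementation.
        raise NotImplementedError

    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            while True:
                if await self._check_remote():
                    yield TriggerEvent({"status": "success", "message": "condition met"})
                    return
                await asyncio.sleep(self.poke_interval)
        except Exception as exc:
            # Yield an explicit failure event instead of letting the trigger
            # vanish; execute_complete() can inspect the status and raise.
            yield TriggerEvent({"status": "error", "message": str(exc)})
```
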
### Scalability Considerations

- Plan triggerer capacity based on expected concurrent deferrable tasks
- Use database connection pooling for trigger state management
- Implement proper cleanup for failed or interrupted triggers
- Consider trigger resource limits in high-throughput scenarios