# File Monitoring

Sensor components for monitoring file and directory presence, modification times, and pattern matching on SFTP servers. The SFTP sensor provides comprehensive file-system monitoring with support for both blocking and deferrable execution modes.

## Capabilities

### SFTP Sensor

Main sensor for monitoring file and directory presence on SFTP servers, with extensive filtering and condition-checking capabilities.

```python { .api }
class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present on SFTP.

    Monitors SFTP locations for file presence, pattern matching, and modification
    time conditions. Supports both synchronous polling and asynchronous deferrable
    execution for efficient resource utilization.
    """

    template_fields: Sequence[str] = ("path", "newer_than")

    def __init__(
        self,
        *,
        path: str,
        file_pattern: str = "",
        newer_than: datetime | str | None = None,
        sftp_conn_id: str = "sftp_default",
        python_callable: Callable | None = None,
        op_args: list | None = None,
        op_kwargs: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ) -> None:
        """
        Initialize the SFTP sensor.

        Parameters:
        - path: Remote file or directory path (templated)
        - file_pattern: Pattern for file matching, in fnmatch format
        - newer_than: DateTime the file's modification time must exceed (templated)
        - sftp_conn_id: Connection to run the sensor against (default: sftp_default)
        - python_callable: Optional callable to execute when files are found
        - op_args: Positional arguments for python_callable
        - op_kwargs: Keyword arguments for python_callable
        - deferrable: Whether to defer the task until done (default: False)
        """

    def poke(self, context: Context) -> PokeReturnValue | bool:
        """Check for file existence and conditions."""

    def execute(self, context: Context) -> Any:
        """Execute the sensor, either synchronously or by deferring to a trigger."""

    def execute_complete(self, context: dict[str, Any], event: Any = None) -> None:
        """Callback invoked when the trigger fires; returns immediately."""
```

### File Existence Monitoring

```python { .api }
def poke(self, context: Context) -> PokeReturnValue | bool:
    """
    Check file conditions and return status.

    Performs the core sensing logic, including file existence checks,
    pattern matching, modification-time comparisons, and optional
    python_callable execution.

    Parameters:
    - context: Airflow task execution context

    Returns:
    PokeReturnValue with completion status and XCom values, or a boolean
    indicating whether the conditions are met
    """
```

### Synchronous and Asynchronous Execution

```python { .api }
def execute(self, context: Context) -> Any:
    """
    Execute the sensor, either synchronously or by deferring to a trigger.

    When deferrable=False, uses the traditional polling approach.
    When deferrable=True, defers to SFTPTrigger for asynchronous monitoring.

    Parameters:
    - context: Airflow task execution context

    Returns:
    The sensor result, or defers to the trigger for async execution
    """

def execute_complete(self, context: dict[str, Any], event: Any = None) -> None:
    """
    Callback invoked when the trigger fires.

    Called when a deferrable sensor completes via its trigger.
    Processes the trigger result and returns immediately.

    Parameters:
    - context: Airflow task execution context
    - event: Event data from the trigger
    """
```

## Usage Examples

### Basic File Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
)

# Wait for a specific file to appear
wait_for_file = SFTPSensor(
    task_id='wait_for_data_file',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_default',
    timeout=3600,       # wait up to 1 hour
    poke_interval=300,  # check every 5 minutes
    dag=dag,
)
```

### Pattern-Based File Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_pattern',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
)

# Wait for any CSV file matching a pattern
wait_for_csv_files = SFTPSensor(
    task_id='wait_for_csv_files',
    path='/remote/incoming',
    file_pattern='daily_report_*.csv',  # matches files like daily_report_20230101.csv
    sftp_conn_id='sftp_default',
    timeout=7200,       # wait up to 2 hours
    poke_interval=600,  # check every 10 minutes
    dag=dag,
)

# Wait for date-specific files. NOTE: file_pattern is not a template field
# (see template_fields above), so Jinja in the pattern would not be rendered;
# combine a static pattern with the templated newer_than instead.
wait_for_dated_files = SFTPSensor(
    task_id='wait_for_dated_files',
    path='/remote/exports',
    file_pattern='export_*.json',
    newer_than='{{ ds }}',  # restrict to files modified on or after the logical date
    sftp_conn_id='sftp_default',
    dag=dag,
)
```

### Modification Time Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_mod_time',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),
)

# Wait for a file newer than a specific time
wait_for_recent_file = SFTPSensor(
    task_id='wait_for_recent_file',
    path='/remote/data/latest.csv',
    newer_than='2023-01-01T00:00:00',  # ISO-format string
    sftp_conn_id='sftp_default',
    dag=dag,
)

# Wait for a file newer than the run's logical timestamp
wait_for_fresh_file = SFTPSensor(
    task_id='wait_for_fresh_file',
    path='/remote/data/hourly.json',
    newer_than='{{ ts }}',  # rendered to the run's logical timestamp
    sftp_conn_id='sftp_default',
    dag=dag,
)

# Wait for a file newer than yesterday
wait_for_daily_update = SFTPSensor(
    task_id='wait_for_daily_update',
    path='/remote/reports',
    file_pattern='daily_*.csv',
    newer_than='{{ yesterday_ds }}T00:00:00',  # yesterday at midnight
    sftp_conn_id='sftp_default',
    dag=dag,
)
```

### Custom Processing with Python Callable

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

def process_found_files(files_found=None, **kwargs):
    """Custom processing function for found files.

    The sensor invokes this with op_args/op_kwargs rather than the full
    Airflow context; whether files_found is injected may depend on the
    provider version, so it is accepted defensively here.
    """
    files_found = files_found or []
    print(f"Found {len(files_found)} files: {files_found}")

    for file_path in files_found:
        print(f"Processing: {file_path}")
        # Add custom validation or processing logic here

    return {"processed_files": len(files_found), "status": "success"}

dag = DAG(
    'sftp_sensor_callable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2),
)

sensor_with_processing = SFTPSensor(
    task_id='sensor_with_processing',
    path='/remote/incoming',
    file_pattern='*.xml',
    sftp_conn_id='sftp_default',
    python_callable=process_found_files,
    op_kwargs={'extra_param': 'custom_value'},
    dag=dag,
)
```

### Deferrable Sensor for Resource Efficiency

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_deferrable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
)

# Use a deferrable sensor to free up worker slots while waiting
deferrable_sensor = SFTPSensor(
    task_id='deferrable_file_sensor',
    path='/remote/large_files',
    file_pattern='bigdata_*.parquet',
    sftp_conn_id='sftp_default',
    deferrable=True,  # use an async trigger instead of blocking a worker
    timeout=14400,    # wait up to 4 hours
    dag=dag,
)
```

### Complex File Monitoring Workflow

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def validate_files(**context):
    """Validate downloaded files before processing."""
    # NOTE: the sensor pushes file details to XCom only when it returns a
    # PokeReturnValue (e.g. when python_callable is set); otherwise this
    # pull returns None.
    files_found = context['task_instance'].xcom_pull(task_ids='wait_for_source_files')
    print(f"Validating files: {files_found}")
    # Add validation logic
    return True

dag = DAG(
    'sftp_complex_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
)

# Wait for files with a specific pattern and recency. file_pattern is not a
# template field (see template_fields above), so date selection happens via
# the templated newer_than instead.
wait_for_source_files = SFTPSensor(
    task_id='wait_for_source_files',
    path='/remote/daily_exports',
    file_pattern='export_*.csv',
    newer_than='{{ ds }}T06:00:00',  # files newer than 6 AM on the logical date
    sftp_conn_id='sftp_source',
    timeout=10800,      # 3-hour timeout
    poke_interval=900,  # check every 15 minutes
    dag=dag,
)

# Download files once they're available. NOTE: SFTPOperator does not expand
# wildcards; in practice, replace the glob with the concrete file path(s)
# (str or list), e.g. derived from the sensor's XCom.
download_files = SFTPOperator(
    task_id='download_files',
    ssh_conn_id='sftp_source',
    local_filepath='/local/staging/{{ ds }}/',
    remote_filepath='/remote/daily_exports/export_{{ ds_nodash }}_*.csv',
    operation=SFTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag,
)

# Validate downloaded files
validate = PythonOperator(
    task_id='validate_files',
    python_callable=validate_files,
    dag=dag,
)

# Wait for a processing-completion signal
wait_for_completion = SFTPSensor(
    task_id='wait_for_completion',
    path='/remote/status/processing_complete_{{ ds_nodash }}.flag',
    sftp_conn_id='sftp_source',
    timeout=7200,  # 2 hours for processing
    dag=dag,
)

wait_for_source_files >> download_files >> validate >> wait_for_completion
```

### Directory Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_directory_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4),
)

# Monitor for any file in a directory
wait_for_any_file = SFTPSensor(
    task_id='wait_for_any_file',
    path='/remote/incoming',
    file_pattern='*',  # match any file
    sftp_conn_id='sftp_default',
    dag=dag,
)

# Monitor for specific file types
wait_for_json_files = SFTPSensor(
    task_id='wait_for_json_files',
    path='/remote/api_exports',
    file_pattern='*.json',
    newer_than='{{ ds }}T00:00:00',  # only files from the logical date onward
    sftp_conn_id='sftp_default',
    dag=dag,
)
```
367
368
### Error Handling and Monitoring
369
370
```python
371
from airflow import DAG
372
from airflow.providers.sftp.sensors.sftp import SFTPSensor
373
from airflow.operators.email import EmailOperator
374
from datetime import datetime, timedelta
375
376
default_args = {
377
'retries': 2,
378
'retry_delay': timedelta(minutes=10),
379
'email_on_failure': True,
380
'email_on_retry': False
381
}
382
383
dag = DAG(
384
'sftp_sensor_monitoring',
385
default_args=default_args,
386
start_date=datetime(2023, 1, 1),
387
schedule_interval=timedelta(hours=1)
388
)
389
390
# Critical file monitoring with failure notifications
391
critical_file_sensor = SFTPSensor(
392
task_id='critical_file_sensor',
393
path='/remote/critical/daily_feed.csv',
394
newer_than='{{ ds }}T07:00:00', # Must be from today after 7 AM
395
sftp_conn_id='sftp_critical',
396
timeout=7200, # 2 hour timeout
397
poke_interval=300, # Check every 5 minutes
398
dag=dag
399
)
400
401
# Send alert if sensor fails
402
failure_alert = EmailOperator(
403
task_id='failure_alert',
404
to=['ops@company.com'],
405
subject='Critical SFTP File Missing - {{ ds }}',
406
html_content='''
407
<h3>Alert: Critical SFTP File Missing</h3>
408
<p>The daily feed file was not found within the expected timeframe.</p>
409
<p>Execution Date: {{ ds }}</p>
410
<p>Please check the SFTP server and data pipeline.</p>
411
''',
412
trigger_rule='one_failed', # Trigger on sensor failure
413
dag=dag
414
)
415
416
critical_file_sensor >> failure_alert
417
```

## Best Practices

### Performance Optimization

- Use appropriate `poke_interval` values to balance responsiveness with server load
- Set reasonable `timeout` values based on expected file arrival patterns
- Use `deferrable=True` for long-running sensors to free up worker slots
- Consider file-pattern specificity to reduce unnecessary checks (all four settings appear in the sketch below)
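
A minimal sketch pulling these settings together; the connection id, path, pattern, and timing values are illustrative and assume files that normally arrive within the hour:

```python
from airflow.providers.sftp.sensors.sftp import SFTPSensor

# Illustrative values: tune poke_interval/timeout to your arrival window.
tuned_sensor = SFTPSensor(
    task_id='tuned_file_sensor',
    path='/remote/incoming',
    file_pattern='feed_*.csv',  # specific pattern, fewer false matches
    sftp_conn_id='sftp_default',
    poke_interval=120,  # frequent enough to react quickly
    timeout=3600,       # bounded by the expected arrival window
    deferrable=True,    # free the worker slot while waiting
)
```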

### Resource Management

- Configure sensor pools to limit concurrent SFTP connections
- Use connection pooling for sensors monitoring the same SFTP server
- Monitor sensor task duration and adjust timeouts accordingly
- Implement sensor retries with exponential backoff for transient failures, as sketched below
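
A sketch of the pooling and retry advice using standard Airflow task arguments; the `sftp_pool` pool is assumed to already exist (created via the UI or CLI):

```python
from datetime import timedelta
from airflow.providers.sftp.sensors.sftp import SFTPSensor

pooled_sensor = SFTPSensor(
    task_id='pooled_file_sensor',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_default',
    pool='sftp_pool',                  # caps concurrent SFTP connections
    retries=3,
    retry_delay=timedelta(minutes=2),  # base delay between retries
    retry_exponential_backoff=True,    # 2 min, 4 min, 8 min, ...
    max_retry_delay=timedelta(minutes=30),
)
```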

### Monitoring and Alerting

- Set up alerts for sensor timeout failures
- Monitor sensor execution patterns to optimize scheduling
- Use XCom to pass file information to downstream tasks, as sketched after this list
- Implement custom logging for sensor status tracking
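
For the XCom point, a downstream task can read what the sensor pushed. The sensor returns a `PokeReturnValue` (and thus pushes file details) only when `python_callable` is set, and the `files_found` key layout shown here is an assumption to verify against your provider version:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def report_files(**context):
    # Assumed xcom_value layout: {"files_found": [...], ...}
    result = context['ti'].xcom_pull(task_ids='sensor_with_processing')
    files = (result or {}).get('files_found', [])
    print(f"Sensor matched {len(files)} file(s): {files}")

dag = DAG('sftp_xcom_report', start_date=datetime(2023, 1, 1))

report = PythonOperator(
    task_id='report_files',
    python_callable=report_files,
    dag=dag,
)
```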

### Pattern Matching

- Use specific patterns to avoid false positives
- Test fnmatch patterns thoroughly against expected file names, as sketched below
- Remember that `file_pattern` is not a template field; use the templated `newer_than` (or `path`) for date-based monitoring
- Document pattern expectations for team maintenance
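
Patterns can be checked locally with the standard library's `fnmatch` module, the same format the sensor's `file_pattern` uses; the file names below are made up for the test:

```python
import fnmatch

candidates = [
    'daily_report_20230101.csv',
    'daily_report_20230101.csv.tmp',  # partial upload: should NOT match
    'daily_summary_20230101.csv',     # different feed: should NOT match
]
matches = [name for name in candidates if fnmatch.fnmatch(name, 'daily_report_*.csv')]
print(matches)  # ['daily_report_20230101.csv']
```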

### Time-based Conditions

- Use UTC timestamps consistently across all sensors
- Account for timezone differences between the Airflow deployment and the SFTP server, as sketched below
- Build in buffer time for file-processing delays
- Consider file-system timestamp precision limitations
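
As a sketch of the timezone advice, `newer_than` also accepts a datetime, so a timezone-aware value (here via `pendulum`, which Airflow bundles) makes the comparison against server modification times unambiguous; the cutoff value is illustrative:

```python
import pendulum
from airflow.providers.sftp.sensors.sftp import SFTPSensor

# An aware datetime avoids ambiguity when the scheduler and the SFTP
# server sit in different timezones.
utc_cutoff = pendulum.datetime(2023, 1, 1, 6, 0, tz='UTC')

tz_aware_sensor = SFTPSensor(
    task_id='tz_aware_sensor',
    path='/remote/data/latest.csv',
    newer_than=utc_cutoff,
    sftp_conn_id='sftp_default',
)
```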