# File Monitoring

Sensor classes that wait for a file or directory to become available on an FTP server, with configurable error handling, retry logic, and support for both standard FTP and secure FTPS protocols.

## Capabilities

### FTP Sensor

Primary sensor for monitoring file or directory presence on FTP servers, with intelligent error handling and recovery from transient errors.
```python { .api }
class FTPSensor(BaseSensorOperator):
    """
    Wait for file or directory to be present on FTP server.

    Template Fields: ("path",)

    Parameters:
    - path (str): Remote file or directory path to monitor
    - ftp_conn_id (str): FTP connection ID (default: "ftp_default")
    - fail_on_transient_errors (bool): Fail on transient errors (default: True)
    """

    template_fields: Sequence[str] = ("path",)

    # Transient FTP error codes that can be retried
    transient_errors = [421, 425, 426, 434, 450, 451, 452]
    error_code_pattern = re.compile(r"\d+")

    def __init__(
        self,
        *,
        path: str,
        ftp_conn_id: str = "ftp_default",
        fail_on_transient_errors: bool = True,
        **kwargs
    ) -> None: ...

    def poke(self, context: Context) -> bool:
        """
        Check if file or directory exists on FTP server.

        Parameters:
        - context (Context): Airflow task context

        Returns:
        bool: True if file/directory exists, False otherwise
        """

    def _create_hook(self) -> FTPHook:
        """
        Create and return FTPHook instance.

        Returns:
        FTPHook: Configured FTP hook
        """

    def _get_error_code(self, e) -> int | Exception:
        """
        Extract numeric error code from FTP exception.

        Parameters:
        - e (Exception): FTP exception

        Returns:
        int | Exception: Extracted error code or original exception
        """
```
### FTPS Sensor

Secure sensor for monitoring files on FTPS servers with SSL/TLS encryption support.

```python { .api }
class FTPSSensor(FTPSensor):
    """
    Wait for file or directory to be present on FTPS server.

    Inherits all FTPSensor functionality with SSL/TLS encryption support.
    """

    def _create_hook(self) -> FTPHook:
        """
        Create and return FTPSHook instance.

        Returns:
        FTPHook: Configured FTPS hook with SSL/TLS support
        """
```
## Usage Examples

### Basic File Monitoring

```python
from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor
from datetime import datetime, timedelta

dag = DAG('ftp_monitoring_example', start_date=datetime(2023, 1, 1))

# Wait for daily data file to appear
wait_for_data = FTPSensor(
    task_id='wait_for_daily_data',
    path='/remote/data/daily_report_{{ ds }}.csv',  # Templated path
    ftp_conn_id='my_ftp_connection',
    poke_interval=60,    # Check every minute
    timeout=3600,        # Timeout after 1 hour
    mode='poke',         # Blocking mode
    dag=dag
)
```
### Directory Monitoring

```python
from airflow.providers.ftp.sensors.ftp import FTPSensor

# Wait for the inbox directory to be present on the server
wait_for_directory = FTPSensor(
    task_id='wait_for_directory_content',
    path='/remote/inbox/',   # Monitor directory
    ftp_conn_id='my_ftp_connection',
    poke_interval=300,       # Check every 5 minutes
    timeout=7200,            # Timeout after 2 hours
    dag=dag
)
```
### Secure File Monitoring with FTPS

```python
from airflow.providers.ftp.sensors.ftp import FTPSSensor

# Monitor secure FTP server
wait_for_secure_file = FTPSSensor(
    task_id='wait_for_secure_data',
    path='/secure/confidential/data.xml',
    ftp_conn_id='my_secure_ftp_connection',  # FTPS connection
    poke_interval=120,   # Check every 2 minutes
    timeout=1800,        # Timeout after 30 minutes
    dag=dag
)
```
### Advanced Error Handling Configuration

```python
from datetime import timedelta

from airflow.providers.ftp.sensors.ftp import FTPSensor

# Handle transient errors gracefully
resilient_sensor = FTPSensor(
    task_id='resilient_file_monitor',
    path='/remote/unreliable_source/data.txt',
    ftp_conn_id='unreliable_ftp',
    fail_on_transient_errors=False,   # Don't fail on transient errors
    poke_interval=180,                # Check every 3 minutes
    timeout=10800,                    # Extended timeout (3 hours)
    retries=3,                        # Retry on permanent failures
    retry_delay=timedelta(minutes=10),
    dag=dag
)
```
### Reschedule Mode for Long-Running Monitoring

```python
from airflow.providers.ftp.sensors.ftp import FTPSensor

# Use reschedule mode to free up worker slots
long_running_sensor = FTPSensor(
    task_id='long_running_file_monitor',
    path='/remote/batch_data/weekly_export.zip',
    ftp_conn_id='batch_ftp',
    poke_interval=1800,   # Check every 30 minutes
    timeout=604800,       # Timeout after 1 week
    mode='reschedule',    # Non-blocking mode
    dag=dag
)
```
### Complete Monitoring Pipeline

```python
from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor, FTPSSensor
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def validate_file():
    """Validate downloaded file format and content."""
    print("Validating file format and content...")
    # File validation logic here
    return True

def notify_completion():
    """Send notification about successful processing."""
    print("Sending completion notification...")
    # Notification logic here

dag = DAG(
    'comprehensive_ftp_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),  # Run every 6 hours
    catchup=False
)

# Monitor multiple sources simultaneously
wait_for_source1 = FTPSensor(
    task_id='wait_for_source1_data',
    path='/source1/data/{{ ds }}/export.csv',
    ftp_conn_id='source1_ftp',
    poke_interval=300,
    timeout=3600,
    dag=dag
)

wait_for_source2 = FTPSSensor(  # Secure source
    task_id='wait_for_source2_data',
    path='/secure/source2/{{ ds }}/sensitive_data.xml',
    ftp_conn_id='secure_ftp',
    poke_interval=300,
    timeout=3600,
    dag=dag
)

# Download files once available
download_source1 = FTPFileTransmitOperator(
    task_id='download_source1',
    ftp_conn_id='source1_ftp',
    remote_filepath='/source1/data/{{ ds }}/export.csv',
    local_filepath='/local/staging/source1_{{ ds }}.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

download_source2 = FTPFileTransmitOperator(
    task_id='download_source2',
    ftp_conn_id='secure_ftp',
    remote_filepath='/secure/source2/{{ ds }}/sensitive_data.xml',
    local_filepath='/local/staging/source2_{{ ds }}.xml',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

# Validate downloaded files
validate_files = PythonOperator(
    task_id='validate_files',
    python_callable=validate_file,
    dag=dag
)

# Send completion notification
notify = PythonOperator(
    task_id='notify_completion',
    python_callable=notify_completion,
    dag=dag
)

# Define task dependencies: each download waits for its own sensor,
# then both downloads feed validation and notification
wait_for_source1 >> download_source1
wait_for_source2 >> download_source2
[download_source1, download_source2] >> validate_files >> notify
```
## Error Handling and Recovery

### Transient Error Codes

The sensor treats the following FTP reply codes as transient. When `fail_on_transient_errors=False`, a poke that hits one of these codes returns False and is simply retried at the next poke interval instead of failing the task:

- **421**: Service not available, closing control connection
- **425**: Can't open data connection
- **426**: Connection closed; transfer aborted
- **434**: Requested host unavailable
- **450**: Requested file action not taken (file unavailable)
- **451**: Requested action aborted: local error in processing
- **452**: Requested action not taken (insufficient storage)
### Error Handling Strategies

1. **File not found (550)**: the sensor returns False and continues poking; other permanent errors raise and fail the task
2. **Transient errors** (codes listed above): behavior depends on the `fail_on_transient_errors` parameter:
   - `True` (default): raises the exception and fails the task
   - `False`: returns False and continues poking
3. **Connection errors**: propagated to Airflow for retry handling
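
A minimal sketch of this classification logic, assuming the `transient_errors` list and the `_get_error_code` behavior shown in the API section above. It illustrates the documented behavior rather than reproducing the provider's actual source:

```python
from __future__ import annotations

import re

# Transient FTP reply codes, as listed above
TRANSIENT_ERRORS = {421, 425, 426, 434, 450, 451, 452}
ERROR_CODE_PATTERN = re.compile(r"\d+")


def extract_error_code(exc: Exception) -> int | None:
    """Pull the leading numeric reply code out of an ftplib error message."""
    match = ERROR_CODE_PATTERN.search(str(exc))
    return int(match.group()) if match else None


def handle_poke_error(exc: Exception, fail_on_transient_errors: bool = True) -> bool:
    """Return False to keep poking, or re-raise the error to fail the task."""
    code = extract_error_code(exc)
    if code == 550:
        # File not found yet: keep poking until it appears or the sensor times out.
        return False
    if code in TRANSIENT_ERRORS and not fail_on_transient_errors:
        # Transient server-side problem: treat it like "not there yet".
        return False
    # Anything else (including transient codes under the default setting) fails the task.
    raise exc
```

With `fail_on_transient_errors=False`, a `421` reply is treated the same as a missing file, which is what lets the resilient configuration shown earlier keep polling through brief outages.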
### Best Practices

- Use `fail_on_transient_errors=False` for unreliable FTP servers
- Set appropriate `poke_interval` to balance responsiveness and server load
- Use `mode='reschedule'` for long-running sensors to free worker slots
- Configure retries and retry delays for better fault tolerance
- Monitor sensor logs for patterns in transient errors
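
As a sketch of how these practices might combine in one task, the connection ID, path, and tuning values below are placeholders rather than recommendations:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor

dag = DAG('resilient_ftp_watch_example', start_date=datetime(2023, 1, 1), catchup=False)

# Hypothetical sensor combining the practices above
tolerant_sensor = FTPSensor(
    task_id='wait_for_nightly_extract',
    path='/remote/exports/{{ ds }}/extract.csv',   # templated path (placeholder)
    ftp_conn_id='flaky_partner_ftp',               # placeholder connection ID
    fail_on_transient_errors=False,    # tolerate brief server hiccups
    poke_interval=600,                 # poll every 10 minutes to limit server load
    timeout=6 * 3600,                  # give up after 6 hours
    mode='reschedule',                 # free the worker slot between polls
    retries=2,                         # retry the whole task if it fails outright
    retry_delay=timedelta(minutes=15),
    dag=dag
)
```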
## Integration with Airflow Features

### Templating Support

The `path` parameter supports Airflow templating:

```python
# Template examples
path='/data/{{ ds }}/daily_report.csv'                             # Execution date
path='/data/{{ macros.ds_add(ds, -1) }}/file.txt'                  # Previous day
path='/hourly/{{ logical_date.strftime("%Y%m%d/%H") }}/data.csv'   # Hour-based path
```
### Sensor Modes

- **Poke Mode** (default): Blocks a worker slot while waiting
- **Reschedule Mode**: Releases the worker slot between checks, better for long waits

### Connection Management

Each poke obtains a hook from the configured Airflow connection and releases it when the check completes, so concurrent monitoring tasks do not hold FTP connections open between checks.
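
The connection IDs used throughout these examples (`my_ftp_connection`, `secure_ftp`, and so on) are ordinary Airflow connections. One way to define them, sketched below with placeholder host and credentials, is through Airflow's `AIRFLOW_CONN_<CONN_ID>` environment variables in URI form; in a real deployment the variable would be set in the scheduler and worker environment (or the connection created via the Airflow UI or CLI) rather than in DAG code:

```python
import os

# Placeholder host and credentials for illustration only.
os.environ["AIRFLOW_CONN_MY_FTP_CONNECTION"] = (
    "ftp://ftp_user:ftp_password@ftp.example.com:21"
)
```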