# File System Monitoring

Sensors for monitoring HDFS file system states and coordinating workflow execution based on file availability. These sensors enable event-driven data pipelines that wait for specific files or file sets before proceeding with downstream processing.

## Capabilities

### Single File Monitoring

Wait for a specific file or directory to appear in HDFS before proceeding with downstream tasks.
```python { .api }
class WebHdfsSensor:
    """
    Sensor that waits for a file or folder to land in HDFS.

    Attributes:
        template_fields: Sequence of template fields ("filepath",)
    """

    def __init__(
        self,
        *,
        filepath: str,
        webhdfs_conn_id: str = "webhdfs_default",
        **kwargs
    ) -> None:
        """
        Initialize WebHDFS sensor for single file monitoring.

        Parameters:
            filepath: The path to monitor in HDFS
            webhdfs_conn_id: The connection id for the webhdfs client
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """

    def poke(self, context) -> bool:
        """
        Check if the filepath exists in HDFS.

        Parameters:
            context: Airflow task context

        Returns:
            bool: True if file exists, False otherwise
        """
```
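Under the hood, each poke amounts to an existence check against the configured WebHDFS connection. A minimal sketch of that check, using `WebHDFSHook.check_for_path` (the same call used in the custom sensor example further below); the helper name is illustrative:

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def hdfs_path_exists(filepath: str, conn_id: str = "webhdfs_default") -> bool:
    """Rough equivalent of a single sensor poke: True once the path is visible in HDFS."""
    hook = WebHDFSHook(webhdfs_conn_id=conn_id)
    return hook.check_for_path(filepath)
```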
### Multiple Files Monitoring

Wait for multiple specific files to appear in a directory before proceeding, useful for batch processing scenarios where multiple input files are required.
```python { .api }
class MultipleFilesWebHdfsSensor:
    """
    Sensor that waits for multiple files in a folder to land in HDFS.

    Attributes:
        template_fields: Sequence of template fields ("directory_path", "expected_filenames")
    """

    def __init__(
        self,
        *,
        directory_path: str,
        expected_filenames: Sequence[str],
        webhdfs_conn_id: str = "webhdfs_default",
        **kwargs
    ) -> None:
        """
        Initialize WebHDFS sensor for multiple files monitoring.

        Parameters:
            directory_path: The directory path to monitor in HDFS
            expected_filenames: Sequence of expected filenames to wait for
            webhdfs_conn_id: The connection id for the webhdfs client
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """

    def poke(self, context) -> bool:
        """
        Check if all expected files exist in the directory.

        Parameters:
            context: Airflow task context

        Returns:
            bool: True if all expected files exist, False if any are missing
        """
```
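The multi-file check can be thought of as listing the directory and comparing it against the expected set. A minimal sketch, assuming the hook's `get_conn()` returns an `hdfs` client exposing `list()`; the helper name is illustrative:

```python
from typing import Sequence

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def all_files_present(directory_path: str, expected_filenames: Sequence[str],
                      conn_id: str = "webhdfs_default") -> bool:
    """Rough equivalent of a single poke: True only when every expected filename is listed."""
    client = WebHDFSHook(webhdfs_conn_id=conn_id).get_conn()
    present = set(client.list(directory_path))  # filenames currently in the directory
    missing = set(expected_filenames) - present
    return not missing
```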
## Usage Examples

### Basic File Sensor

Wait for a single file to appear in HDFS:
```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data():
    print("File is ready, processing data...")

dag = DAG(
    'hdfs_file_sensor_example',
    default_args={
        'owner': 'data_team',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Wait for HDFS file and process',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Sensor task
file_sensor = WebHdfsSensor(
    task_id='wait_for_input_file',
    filepath='/data/input/daily_sales_{{ ds }}.csv',  # Templated filepath
    webhdfs_conn_id='production_hdfs',
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Timeout after 1 hour
    dag=dag
)

# Processing task
process_task = PythonOperator(
    task_id='process_sales_data',
    python_callable=process_data,
    dag=dag
)

file_sensor >> process_task
```
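The same pipeline can also be declared with the DAG context manager, which avoids repeating `dag=dag` on every task; an equivalent sketch:

```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

with DAG(
    'hdfs_file_sensor_example_ctx',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    wait_for_input_file = WebHdfsSensor(
        task_id='wait_for_input_file',
        filepath='/data/input/daily_sales_{{ ds }}.csv',
        webhdfs_conn_id='production_hdfs',
        poke_interval=60,
        timeout=3600,
    )
    process_sales_data = PythonOperator(
        task_id='process_sales_data',
        python_callable=lambda: print("File is ready, processing data..."),
    )
    wait_for_input_file >> process_sales_data
```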
### Multiple Files Sensor

Wait for multiple files to appear in a directory:
```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

dag = DAG(
    'hdfs_multiple_files_sensor',
    default_args={'owner': 'data_team'},
    description='Wait for multiple HDFS files',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Wait for multiple required files
files_sensor = MultipleFilesWebHdfsSensor(
    task_id='wait_for_batch_files',
    directory_path='/data/batch/{{ ds }}/',
    expected_filenames=[
        'transactions.parquet',
        'customers.parquet',
        'products.parquet',
        'inventory.parquet'
    ],
    webhdfs_conn_id='batch_hdfs',
    poke_interval=120,  # Check every 2 minutes
    timeout=7200,  # Timeout after 2 hours
    dag=dag
)

# Start batch processing when all files are ready
batch_process = BashOperator(
    task_id='start_batch_processing',
    bash_command='spark-submit /scripts/batch_processing.py --date {{ ds }}',
    dag=dag
)

files_sensor >> batch_process
```
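Because both `directory_path` and `expected_filenames` are template fields, the filenames themselves can carry run-specific values. A short variation on the sensor above (paths and names illustrative):

```python
dated_files_sensor = MultipleFilesWebHdfsSensor(
    task_id='wait_for_dated_batch_files',
    directory_path='/data/batch/{{ ds }}/',
    expected_filenames=[
        'transactions_{{ ds_nodash }}.parquet',  # rendered per DAG run
        'customers_{{ ds_nodash }}.parquet',
    ],
    webhdfs_conn_id='batch_hdfs',
    poke_interval=120,
    timeout=7200,
    dag=dag
)
```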
### Advanced Sensor Configuration

Configure sensors with custom retry logic and failure handling:
```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

def handle_missing_file():
    print("File was not found within the timeout period")
    # Implement fallback logic or notifications

def process_when_ready():
    print("File found, proceeding with processing")

dag = DAG(
    'robust_hdfs_sensor',
    default_args={'owner': 'data_team'},
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly'
)

# Primary sensor with a shorter timeout. soft_fail=True is deliberately not used here:
# it would mark the task as skipped (not failed) on timeout, so the ALL_FAILED
# fallback below would never fire.
primary_sensor = WebHdfsSensor(
    task_id='wait_for_primary_file',
    filepath='/data/primary/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='primary_hdfs',
    poke_interval=30,
    timeout=1800,  # 30 minutes
    dag=dag
)

# Fallback sensor for backup location
fallback_sensor = WebHdfsSensor(
    task_id='wait_for_backup_file',
    filepath='/data/backup/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='backup_hdfs',
    poke_interval=60,
    timeout=900,  # 15 minutes
    trigger_rule=TriggerRule.ALL_FAILED,  # Only run if the primary sensor fails
    dag=dag
)

# Processing task that runs if either sensor succeeds
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_when_ready,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag
)

# Cleanup task for the case where both sensors fail
cleanup_task = PythonOperator(
    task_id='handle_missing_files',
    python_callable=handle_missing_file,
    trigger_rule=TriggerRule.ALL_FAILED,
    dag=dag
)

# Task dependencies: the fallback only fires after the primary fails
primary_sensor >> fallback_sensor
[primary_sensor, fallback_sensor] >> process_task
[primary_sensor, fallback_sensor] >> cleanup_task
```
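If a missing file should not fail the run at all, `soft_fail=True` marks the sensor as skipped on timeout; downstream tasks then need a trigger rule that tolerates skipped upstreams. A brief sketch reusing the DAG and callable above (task ids and paths illustrative):

```python
optional_sensor = WebHdfsSensor(
    task_id='wait_for_optional_file',
    filepath='/data/optional/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='primary_hdfs',
    poke_interval=60,
    timeout=600,
    soft_fail=True,  # on timeout the task is skipped instead of failed
    dag=dag
)

continue_pipeline = PythonOperator(
    task_id='continue_pipeline',
    python_callable=process_when_ready,
    trigger_rule=TriggerRule.NONE_FAILED,  # run whether the sensor succeeded or was skipped
    dag=dag
)

optional_sensor >> continue_pipeline
```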
### Sensor with Dynamic File Patterns

Use templated filepaths for dynamic file monitoring:
```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

dag = DAG(
    'dynamic_hdfs_sensor',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
)

# Sensor with Jinja templating
dynamic_sensor = WebHdfsSensor(
    task_id='wait_for_dated_file',
    # Wait for a file with the current date in both the path and the filename
    filepath='/warehouse/{{ macros.ds_format(ds, "%Y-%m-%d", "%Y/%m/%d") }}/data_{{ ds_nodash }}.parquet',
    webhdfs_conn_id='warehouse_hdfs',
    poke_interval=300,  # 5 minutes
    timeout=14400,  # 4 hours
    dag=dag
)

# Multiple sensors for different file types
sensor_configs = [
    {'name': 'transactions', 'path': '/raw/transactions/{{ ds }}/'},
    {'name': 'customers', 'path': '/raw/customers/{{ ds }}/'},
    {'name': 'products', 'path': '/raw/products/{{ ds }}/'}
]

sensors = []
for config in sensor_configs:
    sensor = WebHdfsSensor(
        task_id=f'wait_for_{config["name"]}_files',
        filepath=f'{config["path"]}_SUCCESS',  # Wait for the success marker
        webhdfs_conn_id='data_lake_hdfs',
        poke_interval=120,
        timeout=3600,
        dag=dag
    )
    sensors.append(sensor)

# All sensors must complete before downstream processing
all_ready = DummyOperator(
    task_id='all_files_ready',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)

sensors >> all_ready
```
## Integration Patterns

### Combining with Hook Operations

Use sensors to trigger hook-based file operations:
```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.operators.python import PythonOperator
from datetime import datetime

def copy_and_process():
    """Copy file from input to processing directory and validate."""
    hook = WebHDFSHook(webhdfs_conn_id='main_hdfs')

    input_path = '/input/raw_data.csv'
    processing_path = '/processing/raw_data.csv'

    # Read from input location
    data = hook.read_file(input_path)

    # Write to temporary file for processing
    with open('/tmp/processing_data.csv', 'wb') as f:
        f.write(data)

    # Upload to processing directory
    hook.load_file('/tmp/processing_data.csv', processing_path)

    print(f"File copied and ready for processing: {processing_path}")

dag = DAG('sensor_hook_integration', start_date=datetime(2024, 1, 1))

# Wait for input file
sensor = WebHdfsSensor(
    task_id='wait_for_input',
    filepath='/input/raw_data.csv',
    webhdfs_conn_id='main_hdfs',
    dag=dag
)

# Copy and prepare for processing
copy_task = PythonOperator(
    task_id='copy_and_process',
    python_callable=copy_and_process,
    dag=dag
)

sensor >> copy_task
```
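The copy step above hard-codes its paths. In practice you would usually hand the task the same templated path the sensor watched, for example through the PythonOperator's templated `op_kwargs`; a sketch under that assumption (path is illustrative):

```python
def copy_templated_file(input_path: str):
    """Read a run-specific input file once the sensor has confirmed it exists."""
    hook = WebHDFSHook(webhdfs_conn_id='main_hdfs')
    data = hook.read_file(input_path)
    print(f"Read {len(data)} bytes from {input_path}")

copy_templated = PythonOperator(
    task_id='copy_templated_file',
    python_callable=copy_templated_file,
    op_kwargs={'input_path': '/input/raw_data_{{ ds }}.csv'},  # rendered at runtime
    dag=dag
)
```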
### Custom Sensor Logic

Extend sensors for custom monitoring logic:
```python
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.utils.context import Context

class FileSizeWebHdfsSensor(WebHdfsSensor):
    """Custom sensor that checks both file existence and minimum size."""

    def __init__(self, min_size_bytes: int = 0, **kwargs):
        super().__init__(**kwargs)
        self.min_size_bytes = min_size_bytes

    def poke(self, context: Context) -> bool:
        """Check if file exists and meets minimum size requirement."""
        hook = WebHDFSHook(self.webhdfs_conn_id)

        # Check if file exists
        if not hook.check_for_path(self.filepath):
            self.log.info(f"File {self.filepath} does not exist yet")
            return False

        # Check file size if minimum size specified
        if self.min_size_bytes > 0:
            client = hook.get_conn()
            file_status = client.status(self.filepath)
            file_size = file_status.get('length', 0)

            if file_size < self.min_size_bytes:
                self.log.info(f"File {self.filepath} exists but size {file_size} < {self.min_size_bytes} bytes")
                return False

        self.log.info(f"File {self.filepath} exists and meets size requirements")
        return True

# Usage
custom_sensor = FileSizeWebHdfsSensor(
    task_id='wait_for_large_file',
    filepath='/data/large_dataset.parquet',
    min_size_bytes=1024 * 1024,  # Minimum 1MB
    webhdfs_conn_id='data_hdfs',
    poke_interval=60,
    timeout=3600
)
```
## Sensor Configuration Best Practices

### Optimal Polling Configuration

Match `poke_interval` and `timeout` to how quickly the file is expected to arrive:
```python
# Short-lived files (expected within minutes)
quick_sensor = WebHdfsSensor(
    task_id='wait_for_quick_file',
    filepath='/tmp/quick_process.flag',
    poke_interval=10,  # Check every 10 seconds
    timeout=300,  # 5 minute timeout
    dag=dag
)

# Regular batch files (expected within hours)
batch_sensor = WebHdfsSensor(
    task_id='wait_for_batch_file',
    filepath='/batch/daily_extract.csv',
    poke_interval=300,  # Check every 5 minutes
    timeout=14400,  # 4 hour timeout
    dag=dag
)

# Large ETL files (expected within a day)
etl_sensor = WebHdfsSensor(
    task_id='wait_for_etl_file',
    filepath='/warehouse/etl_complete.marker',
    poke_interval=1800,  # Check every 30 minutes
    timeout=86400,  # 24 hour timeout
    dag=dag
)
```
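For long waits, the standard sensor options from `BaseSensorOperator` also apply: `mode='reschedule'` releases the worker slot between pokes, and `exponential_backoff=True` progressively widens the poke interval. A brief sketch (path and connection id illustrative):

```python
long_wait_sensor = WebHdfsSensor(
    task_id='wait_for_slow_feed',
    filepath='/warehouse/slow_feed/{{ ds }}/data.parquet',
    webhdfs_conn_id='warehouse_hdfs',
    mode='reschedule',         # free the worker slot between pokes
    poke_interval=600,         # start by checking every 10 minutes
    exponential_backoff=True,  # widen the interval after each unsuccessful poke
    timeout=43200,             # give up after 12 hours
    dag=dag
)
```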
### Error Handling and Monitoring

Attach a failure callback to critical sensors so that timeouts are surfaced immediately:
```python
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.utils.email import send_email
from airflow.operators.python import PythonOperator

def send_timeout_notification(context):
    """Send notification when sensor times out."""
    task_instance = context['task_instance']
    send_email(
        to=['data-team@company.com'],
        subject=f'HDFS Sensor Timeout: {task_instance.task_id}',
        html_content=f'Sensor {task_instance.task_id} timed out waiting for file.'
    )

monitored_sensor = WebHdfsSensor(
    task_id='monitored_file_sensor',
    filepath='/critical/daily_report.csv',
    webhdfs_conn_id='production_hdfs',
    poke_interval=120,
    timeout=7200,
    on_failure_callback=send_timeout_notification,
    dag=dag
)
```