
# File System Monitoring

Sensors for monitoring HDFS file system states and coordinating workflow execution based on file availability. These sensors enable event-driven data pipelines that wait for specific files or file sets before proceeding with downstream processing.

## Capabilities

### Single File Monitoring

Wait for a specific file or directory to appear in HDFS before proceeding with downstream tasks.

```python { .api }
class WebHdfsSensor:
    """
    Sensor that waits for a file or folder to land in HDFS.

    Attributes:
        template_fields: Sequence of template fields ("filepath",)
    """

    def __init__(
        self,
        *,
        filepath: str,
        webhdfs_conn_id: str = "webhdfs_default",
        **kwargs
    ) -> None:
        """
        Initialize WebHDFS sensor for single file monitoring.

        Parameters:
            filepath: The path to monitor in HDFS
            webhdfs_conn_id: The connection id for the webhdfs client
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """

    def poke(self, context) -> bool:
        """
        Check if the filepath exists in HDFS.

        Parameters:
            context: Airflow task context

        Returns:
            bool: True if file exists, False otherwise
        """
```
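
Conceptually, `poke` is a thin existence check against the configured WebHDFS connection. A minimal sketch of that behaviour, assuming `WebHDFSHook.check_for_path` (illustrative only, not the provider's exact source):

```python
# Illustrative only: a plausible shape for the existence check, not the provider's source.
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook


def check_file_exists(filepath: str, webhdfs_conn_id: str = "webhdfs_default") -> bool:
    """Return True once the given path is visible in HDFS."""
    hook = WebHDFSHook(webhdfs_conn_id=webhdfs_conn_id)
    # check_for_path() returns True when the path exists, False otherwise
    return hook.check_for_path(hdfs_path=filepath)
```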

### Multiple Files Monitoring

Wait for multiple specific files to appear in a directory before proceeding, useful for batch processing scenarios where multiple input files are required.

```python { .api }
class MultipleFilesWebHdfsSensor:
    """
    Sensor that waits for multiple files in a folder to land in HDFS.

    Attributes:
        template_fields: Sequence of template fields ("directory_path", "expected_filenames")
    """

    def __init__(
        self,
        *,
        directory_path: str,
        expected_filenames: Sequence[str],
        webhdfs_conn_id: str = "webhdfs_default",
        **kwargs
    ) -> None:
        """
        Initialize WebHDFS sensor for multiple files monitoring.

        Parameters:
            directory_path: The directory path to monitor in HDFS
            expected_filenames: Sequence of expected filenames to wait for
            webhdfs_conn_id: The connection id for the webhdfs client
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """

    def poke(self, context) -> bool:
        """
        Check if all expected files exist in the directory.

        Parameters:
            context: Airflow task context

        Returns:
            bool: True if all expected files exist, False if any are missing
        """
```
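
Conceptually, the multi-file check compares a directory listing against the expected names. A minimal sketch, assuming the `hdfs` client returned by `WebHDFSHook.get_conn()` (illustrative only, not the provider's exact source):

```python
# Illustrative only: a plausible shape for the multi-file check, not the provider's source.
from typing import Sequence

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook


def all_files_present(
    directory_path: str,
    expected_filenames: Sequence[str],
    webhdfs_conn_id: str = "webhdfs_default",
) -> bool:
    """Return True once every expected filename is present in the HDFS directory."""
    client = WebHDFSHook(webhdfs_conn_id=webhdfs_conn_id).get_conn()
    listed = set(client.list(directory_path))  # the hdfs client lists bare filenames
    return not (set(expected_filenames) - listed)
```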

## Usage Examples

### Basic File Sensor

Wait for a single file to appear in HDFS:

```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def process_data():
    print("File is ready, processing data...")


dag = DAG(
    'hdfs_file_sensor_example',
    default_args={
        'owner': 'data_team',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Wait for HDFS file and process',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Sensor task
file_sensor = WebHdfsSensor(
    task_id='wait_for_input_file',
    filepath='/data/input/daily_sales_{{ ds }}.csv',  # Templated filepath
    webhdfs_conn_id='production_hdfs',
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,      # Timeout after 1 hour
    dag=dag,
)

# Processing task
process_task = PythonOperator(
    task_id='process_sales_data',
    python_callable=process_data,
    dag=dag,
)

file_sensor >> process_task
```
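
If the file routinely takes a long time to arrive, note that a sensor in the default `poke` mode holds a worker slot for its entire wait. The standard `mode='reschedule'` sensor option releases the slot between checks; a minimal variation of the sensor above (task id and interval are illustrative):

```python
# Hypothetical variant of the sensor above, using reschedule mode.
reschedulable_sensor = WebHdfsSensor(
    task_id='wait_for_input_file_reschedule',
    filepath='/data/input/daily_sales_{{ ds }}.csv',
    webhdfs_conn_id='production_hdfs',
    mode='reschedule',   # release the worker slot between checks
    poke_interval=300,   # longer intervals pair well with reschedule mode
    timeout=3600,
    dag=dag,
)
```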

### Multiple Files Sensor

Wait for multiple files to appear in a directory:

```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'hdfs_multiple_files_sensor',
    default_args={'owner': 'data_team'},
    description='Wait for multiple HDFS files',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Wait for multiple required files
files_sensor = MultipleFilesWebHdfsSensor(
    task_id='wait_for_batch_files',
    directory_path='/data/batch/{{ ds }}/',
    expected_filenames=[
        'transactions.parquet',
        'customers.parquet',
        'products.parquet',
        'inventory.parquet',
    ],
    webhdfs_conn_id='batch_hdfs',
    poke_interval=120,  # Check every 2 minutes
    timeout=7200,       # Timeout after 2 hours
    dag=dag,
)

# Start batch processing when all files are ready
batch_process = BashOperator(
    task_id='start_batch_processing',
    bash_command='spark-submit /scripts/batch_processing.py --date {{ ds }}',
    dag=dag,
)

files_sensor >> batch_process
```
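
Since `directory_path` and `expected_filenames` are both template fields, the expected names themselves can be rendered per run. A small sketch with hypothetical date-stamped filenames, reusing the `dag` defined above:

```python
# Hypothetical date-stamped filenames; reuses the `dag` defined above.
dated_files_sensor = MultipleFilesWebHdfsSensor(
    task_id='wait_for_dated_batch_files',
    directory_path='/data/batch/{{ ds }}/',
    expected_filenames=[
        'transactions_{{ ds_nodash }}.parquet',
        'customers_{{ ds_nodash }}.parquet',
    ],
    webhdfs_conn_id='batch_hdfs',
    poke_interval=120,
    timeout=7200,
    dag=dag,
)
```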

### Advanced Sensor Configuration

Configure sensors with a fallback source and failure handling:

```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime


def handle_missing_file():
    print("File was not found within timeout period")
    # Implement fallback logic or notifications


def process_when_ready():
    print("File found, proceeding with processing")


dag = DAG(
    'robust_hdfs_sensor',
    default_args={'owner': 'data_team'},
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly',
)

# Primary sensor with a shorter timeout; it fails if the file never arrives
primary_sensor = WebHdfsSensor(
    task_id='wait_for_primary_file',
    filepath='/data/primary/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='primary_hdfs',
    poke_interval=30,
    timeout=1800,  # 30 minutes
    dag=dag,
)

# Fallback sensor for the backup location
fallback_sensor = WebHdfsSensor(
    task_id='wait_for_backup_file',
    filepath='/data/backup/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='backup_hdfs',
    poke_interval=60,
    timeout=900,  # 15 minutes
    trigger_rule=TriggerRule.ALL_FAILED,  # Only run if the primary sensor fails
    dag=dag,
)

# Processing task that runs if either sensor succeeds
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_when_ready,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag,
)

# Cleanup task that runs only when both sensors fail
cleanup_task = PythonOperator(
    task_id='handle_missing_files',
    python_callable=handle_missing_file,
    trigger_rule=TriggerRule.ALL_FAILED,
    dag=dag,
)

# Task dependencies: the fallback sits downstream of the primary so its
# ALL_FAILED trigger rule fires only after the primary sensor has failed
primary_sensor >> fallback_sensor
[primary_sensor, fallback_sensor] >> process_task
[primary_sensor, fallback_sensor] >> cleanup_task
```

### Sensor with Dynamic File Patterns

Use templated filepaths for dynamic file monitoring:

```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

dag = DAG(
    'dynamic_hdfs_sensor',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)

# Sensor with Jinja templating
dynamic_sensor = WebHdfsSensor(
    task_id='wait_for_dated_file',
    # Wait for a file with the current date in both path and filename
    filepath='/warehouse/{{ macros.ds_format(ds, "%Y-%m-%d", "%Y/%m/%d") }}/data_{{ ds_nodash }}.parquet',
    webhdfs_conn_id='warehouse_hdfs',
    poke_interval=300,  # 5 minutes
    timeout=14400,      # 4 hours
    dag=dag,
)

# Multiple sensors for different file types
sensor_configs = [
    {'name': 'transactions', 'path': '/raw/transactions/{{ ds }}/'},
    {'name': 'customers', 'path': '/raw/customers/{{ ds }}/'},
    {'name': 'products', 'path': '/raw/products/{{ ds }}/'},
]

sensors = []
for config in sensor_configs:
    sensor = WebHdfsSensor(
        task_id=f'wait_for_{config["name"]}_files',
        filepath=f'{config["path"]}_SUCCESS',  # Wait for the success marker
        webhdfs_conn_id='data_lake_hdfs',
        poke_interval=120,
        timeout=3600,
        dag=dag,
    )
    sensors.append(sensor)

# All sensors must complete before downstream processing
all_ready = DummyOperator(
    task_id='all_files_ready',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag,
)

sensors >> all_ready
```
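
On Airflow 2.3+, the explicit loop above can also be written with dynamic task mapping, which creates one mapped sensor instance per path at runtime. A sketch assuming the same connection and `_SUCCESS` marker convention, reusing `dag` and `all_ready` from the example above:

```python
# Dynamic task mapping alternative to the explicit loop (Airflow 2.3+).
# Reuses `dag` and `all_ready` from the example above; paths are illustrative.
success_markers = [
    '/raw/transactions/{{ ds }}/_SUCCESS',
    '/raw/customers/{{ ds }}/_SUCCESS',
    '/raw/products/{{ ds }}/_SUCCESS',
]

mapped_sensors = WebHdfsSensor.partial(
    task_id='wait_for_success_markers',
    webhdfs_conn_id='data_lake_hdfs',
    poke_interval=120,
    timeout=3600,
    dag=dag,
).expand(filepath=success_markers)

mapped_sensors >> all_ready
```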

## Integration Patterns

### Combining with Hook Operations

Use sensors to trigger hook-based file operations:

```python
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.operators.python import PythonOperator
from datetime import datetime


def copy_and_process():
    """Copy file from input to processing directory and validate."""
    hook = WebHDFSHook(webhdfs_conn_id='main_hdfs')

    input_path = '/input/raw_data.csv'
    processing_path = '/processing/raw_data.csv'

    # Read from input location
    data = hook.read_file(input_path)

    # Write to a temporary local file for processing
    with open('/tmp/processing_data.csv', 'wb') as f:
        f.write(data)

    # Upload to the processing directory
    hook.load_file('/tmp/processing_data.csv', processing_path)

    print(f"File copied and ready for processing: {processing_path}")


dag = DAG('sensor_hook_integration', start_date=datetime(2024, 1, 1))

# Wait for the input file
sensor = WebHdfsSensor(
    task_id='wait_for_input',
    filepath='/input/raw_data.csv',
    webhdfs_conn_id='main_hdfs',
    dag=dag,
)

# Copy and prepare for processing
copy_task = PythonOperator(
    task_id='copy_and_process',
    python_callable=copy_and_process,
    dag=dag,
)

sensor >> copy_task
```

### Custom Sensor Logic

Extend sensors for custom monitoring logic:

```python
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.utils.context import Context


class FileSizeWebHdfsSensor(WebHdfsSensor):
    """Custom sensor that checks both file existence and minimum size."""

    def __init__(self, min_size_bytes: int = 0, **kwargs):
        super().__init__(**kwargs)
        self.min_size_bytes = min_size_bytes

    def poke(self, context: Context) -> bool:
        """Check if file exists and meets minimum size requirement."""
        hook = WebHDFSHook(self.webhdfs_conn_id)

        # Check if the file exists
        if not hook.check_for_path(self.filepath):
            self.log.info(f"File {self.filepath} does not exist yet")
            return False

        # Check the file size if a minimum size is specified
        if self.min_size_bytes > 0:
            client = hook.get_conn()
            file_status = client.status(self.filepath)
            file_size = file_status.get('length', 0)

            if file_size < self.min_size_bytes:
                self.log.info(f"File {self.filepath} exists but size {file_size} < {self.min_size_bytes} bytes")
                return False

        self.log.info(f"File {self.filepath} exists and meets size requirements")
        return True


# Usage
custom_sensor = FileSizeWebHdfsSensor(
    task_id='wait_for_large_file',
    filepath='/data/large_dataset.parquet',
    min_size_bytes=1024 * 1024,  # Minimum 1MB
    webhdfs_conn_id='data_hdfs',
    poke_interval=60,
    timeout=3600,
)
```

## Sensor Configuration Best Practices

### Optimal Polling Configuration

```python
# Short-lived files (expected within minutes)
quick_sensor = WebHdfsSensor(
    task_id='wait_for_quick_file',
    filepath='/tmp/quick_process.flag',
    poke_interval=10,  # Check every 10 seconds
    timeout=300,       # 5 minute timeout
    dag=dag,
)

# Regular batch files (expected within hours)
batch_sensor = WebHdfsSensor(
    task_id='wait_for_batch_file',
    filepath='/batch/daily_extract.csv',
    poke_interval=300,  # Check every 5 minutes
    timeout=14400,      # 4 hour timeout
    dag=dag,
)

# Large ETL files (expected within a day)
etl_sensor = WebHdfsSensor(
    task_id='wait_for_etl_file',
    filepath='/warehouse/etl_complete.marker',
    poke_interval=1800,  # Check every 30 minutes
    timeout=86400,       # 24 hour timeout
    dag=dag,
)
```
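
When arrival times vary widely, the standard `exponential_backoff=True` sensor option lets the wait between checks grow over time instead of staying fixed. A sketch assuming an existing `dag`, as in the snippets above, with an illustrative path:

```python
# Assumes an existing `dag`, as in the snippets above; the path is illustrative.
late_arrival_sensor = WebHdfsSensor(
    task_id='wait_for_late_arriving_file',
    filepath='/batch/late_arriving_extract.csv',
    poke_interval=60,           # initial check interval
    exponential_backoff=True,   # stretch the interval between successive checks
    timeout=14400,
    dag=dag,
)
```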

### Error Handling and Monitoring

```python
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.utils.email import send_email


def send_timeout_notification(context):
    """Send a notification when the sensor fails, e.g. after timing out."""
    task_instance = context['task_instance']
    send_email(
        to=['data-team@company.com'],
        subject=f'HDFS Sensor Timeout: {task_instance.task_id}',
        html_content=f'Sensor {task_instance.task_id} timed out waiting for file.',
    )


monitored_sensor = WebHdfsSensor(
    task_id='monitored_file_sensor',
    filepath='/critical/daily_report.csv',
    webhdfs_conn_id='production_hdfs',
    poke_interval=120,
    timeout=7200,
    on_failure_callback=send_timeout_notification,
    dag=dag,
)
```