
# File Monitoring

Sensor components for monitoring file and directory presence, modification times, and pattern matching on SFTP servers. The SFTP sensor provides comprehensive file system monitoring with support for both blocking and deferrable execution modes.

## Capabilities


### SFTP Sensor


Main sensor for monitoring file and directory presence on SFTP servers with extensive filtering and condition checking capabilities.

```python { .api }
class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present on SFTP.

    Monitors SFTP locations for file presence, pattern matching, and modification
    time conditions. Supports both synchronous polling and asynchronous deferrable
    execution for efficient resource utilization.
    """

    template_fields: Sequence[str] = ("path", "newer_than")

    def __init__(
        self,
        *,
        path: str,
        file_pattern: str = "",
        newer_than: datetime | str | None = None,
        sftp_conn_id: str = "sftp_default",
        python_callable: Callable | None = None,
        op_args: list | None = None,
        op_kwargs: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ) -> None:
        """
        Initialize the SFTP sensor.

        Parameters:
        - path: Remote file or directory path (templated)
        - file_pattern: Pattern for file matching, in fnmatch format
        - sftp_conn_id: Connection to run the sensor against (default: sftp_default)
        - newer_than: DateTime that the file must be newer than (templated)
        - python_callable: Optional callable to execute when files are found
        - op_args: Positional arguments for python_callable
        - op_kwargs: Keyword arguments for python_callable
        - deferrable: Whether to defer the task until done (default: False)
        """

    def poke(self, context: Context) -> PokeReturnValue | bool:
        """Check for file existence and conditions."""

    def execute(self, context: Context) -> Any:
        """Execute the sensor, either synchronously or by deferring to the trigger."""

    def execute_complete(self, context: dict[str, Any], event: Any = None) -> None:
        """Execute callback when the trigger fires; returns immediately."""
```


### File Existence Monitoring

```python { .api }
def poke(self, context: Context) -> PokeReturnValue | bool:
    """
    Check file conditions and return status.

    Performs the core sensing logic, including file existence checks,
    pattern matching, modification time comparisons, and optional
    python_callable execution.

    Parameters:
    - context: Airflow task execution context

    Returns:
    PokeReturnValue with completion status and XCom values, or a boolean
    indicating whether the conditions are met
    """
```


### Synchronous and Asynchronous Execution

```python { .api }
def execute(self, context: Context) -> Any:
    """
    Execute the sensor, either synchronously or by deferring to the trigger.

    When deferrable=False, uses the traditional polling approach.
    When deferrable=True, defers to SFTPTrigger for async monitoring.

    Parameters:
    - context: Airflow task execution context

    Returns:
    Sensor result, or defers to the trigger for async execution
    """

def execute_complete(self, context: dict[str, Any], event: Any = None) -> None:
    """
    Execute callback when the trigger fires.

    Called when a deferrable sensor completes via its trigger.
    Processes trigger results and returns immediately.

    Parameters:
    - context: Airflow task execution context
    - event: Event data from the trigger
    """
```


## Usage Examples


### Basic File Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Wait for a specific file to appear
wait_for_file = SFTPSensor(
    task_id='wait_for_data_file',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_default',
    timeout=3600,       # Wait up to 1 hour
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)
```


### Pattern-Based File Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_pattern',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for any CSV file matching a pattern
wait_for_csv_files = SFTPSensor(
    task_id='wait_for_csv_files',
    path='/remote/incoming',
    file_pattern='daily_report_*.csv',  # Matches files like daily_report_20230101.csv
    sftp_conn_id='sftp_default',
    timeout=7200,       # Wait up to 2 hours
    poke_interval=600,  # Check every 10 minutes
    dag=dag
)

# Wait for files with date patterns
wait_for_dated_files = SFTPSensor(
    task_id='wait_for_dated_files',
    path='/remote/exports',
    # Note: template_fields above lists only path and newer_than, so this
    # Jinja expression is only rendered if your sensor version (or a subclass)
    # also templates file_pattern.
    file_pattern='export_{{ ds_nodash }}_*.json',
    sftp_conn_id='sftp_default',
    dag=dag
)
```


### Modification Time Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_mod_time',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

# Wait for a file newer than a specific time
wait_for_recent_file = SFTPSensor(
    task_id='wait_for_recent_file',
    path='/remote/data/latest.csv',
    newer_than='2023-01-01T00:00:00',  # ISO-format string
    sftp_conn_id='sftp_default',
    dag=dag
)

# Wait for a file newer than the task execution time
wait_for_fresh_file = SFTPSensor(
    task_id='wait_for_fresh_file',
    path='/remote/data/hourly.json',
    newer_than='{{ ts }}',  # Templated to the task's logical timestamp
    sftp_conn_id='sftp_default',
    dag=dag
)

# Wait for a file newer than yesterday
wait_for_daily_update = SFTPSensor(
    task_id='wait_for_daily_update',
    path='/remote/reports',
    file_pattern='daily_*.csv',
    newer_than='{{ yesterday_ds }}T00:00:00',  # Yesterday at midnight
    sftp_conn_id='sftp_default',
    dag=dag
)
```


### Custom Processing with Python Callable

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

def process_found_files(files_found, **kwargs):
    """Custom processing for found files; op_kwargs entries arrive in kwargs."""
    print(f"Found {len(files_found)} files: {files_found}")

    # Custom logic for file validation
    for file_path in files_found:
        print(f"Processing: {file_path}")
        # Add custom validation or processing logic here

    return {"processed_files": len(files_found), "status": "success"}

dag = DAG(
    'sftp_sensor_callable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2)
)

sensor_with_processing = SFTPSensor(
    task_id='sensor_with_processing',
    path='/remote/incoming',
    file_pattern='*.xml',
    sftp_conn_id='sftp_default',
    python_callable=process_found_files,
    op_kwargs={'extra_param': 'custom_value'},
    dag=dag
)
```


### Deferrable Sensor for Resource Efficiency

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_deferrable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Use a deferrable sensor to free up worker slots while waiting
deferrable_sensor = SFTPSensor(
    task_id='deferrable_file_sensor',
    path='/remote/large_files',
    file_pattern='bigdata_*.parquet',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Use an async trigger instead of blocking a worker
    timeout=14400,    # Wait up to 4 hours
    dag=dag
)
```


### Complex File Monitoring Workflow

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def validate_files(**context):
    """Validate downloaded files before processing."""
    files_found = context['task_instance'].xcom_pull(task_ids='wait_for_source_files')
    print(f"Validating files: {files_found}")
    # Add validation logic here
    return True

dag = DAG(
    'sftp_complex_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for files with a specific pattern and recency
wait_for_source_files = SFTPSensor(
    task_id='wait_for_source_files',
    path='/remote/daily_exports',
    file_pattern='export_{{ ds_nodash }}_*.csv',
    newer_than='{{ ds }}T06:00:00',  # Files newer than 6 AM on the execution date
    sftp_conn_id='sftp_source',
    timeout=10800,      # 3-hour timeout
    poke_interval=900,  # Check every 15 minutes
    dag=dag
)

# Download the files once they are available
download_files = SFTPOperator(
    task_id='download_files',
    ssh_conn_id='sftp_source',
    local_filepath='/local/staging/{{ ds }}/',
    # Note: wildcard expansion in remote_filepath is not guaranteed by every
    # SFTPOperator version; substitute a concrete path if yours requires one.
    remote_filepath='/remote/daily_exports/export_{{ ds_nodash }}_*.csv',
    operation=SFTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

# Validate the downloaded files
validate = PythonOperator(
    task_id='validate_files',
    python_callable=validate_files,
    dag=dag
)

# Wait for the processing-completion signal
wait_for_completion = SFTPSensor(
    task_id='wait_for_completion',
    path='/remote/status/processing_complete_{{ ds_nodash }}.flag',
    sftp_conn_id='sftp_source',
    timeout=7200,  # 2 hours for processing
    dag=dag
)

wait_for_source_files >> download_files >> validate >> wait_for_completion
```


### Directory Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_directory_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

# Monitor for any file in a directory
wait_for_any_file = SFTPSensor(
    task_id='wait_for_any_file',
    path='/remote/incoming',
    file_pattern='*',  # Match any file
    sftp_conn_id='sftp_default',
    dag=dag
)

# Monitor for specific file types
wait_for_json_files = SFTPSensor(
    task_id='wait_for_json_files',
    path='/remote/api_exports',
    file_pattern='*.json',
    newer_than='{{ ds }}T00:00:00',  # Today's files only
    sftp_conn_id='sftp_default',
    dag=dag
)
```


### Error Handling and Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG(
    'sftp_sensor_monitoring',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Critical file monitoring with failure notifications
critical_file_sensor = SFTPSensor(
    task_id='critical_file_sensor',
    path='/remote/critical/daily_feed.csv',
    newer_than='{{ ds }}T07:00:00',  # Must be from today, after 7 AM
    sftp_conn_id='sftp_critical',
    timeout=7200,       # 2-hour timeout
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# Send an alert if the sensor fails
failure_alert = EmailOperator(
    task_id='failure_alert',
    to=['ops@company.com'],
    subject='Critical SFTP File Missing - {{ ds }}',
    html_content='''
    <h3>Alert: Critical SFTP File Missing</h3>
    <p>The daily feed file was not found within the expected timeframe.</p>
    <p>Execution Date: {{ ds }}</p>
    <p>Please check the SFTP server and data pipeline.</p>
    ''',
    trigger_rule='one_failed',  # Run only when the upstream sensor fails
    dag=dag
)

critical_file_sensor >> failure_alert
```


## Best Practices

### Performance Optimization

- Choose a `poke_interval` that balances responsiveness against SFTP server load
- Set `timeout` values based on expected file arrival patterns
- Use `deferrable=True` for long-running sensors to free up worker slots
- Use specific file patterns to reduce unnecessary checks (the sketch below combines these knobs)
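
A minimal sketch combining these knobs for a file that usually lands within the hour; the path, pattern, connection id, and intervals are illustrative:

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_tuned',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

tuned_sensor = SFTPSensor(
    task_id='tuned_sensor',
    path='/remote/incoming',
    file_pattern='invoice_*.csv',  # Specific pattern reduces false matches
    sftp_conn_id='sftp_default',
    poke_interval=120,             # Check every 2 minutes
    timeout=3600,                  # Give up after 1 hour
    deferrable=True,               # Hands the wait to the triggerer (requires a running triggerer)
    dag=dag
)
```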

### Resource Management

- Configure sensor pools to limit concurrent SFTP connections
- Use connection pooling for sensors that monitor the same SFTP server
- Monitor sensor task duration and adjust timeouts accordingly
- Implement sensor retries with exponential backoff for transient failures (see the sketch below)
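
A minimal sketch of pooled, backoff-enabled retries using standard operator arguments; the `sftp_pool` name is hypothetical and the pool must be created in Airflow beforehand:

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_pooled',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

pooled_sensor = SFTPSensor(
    task_id='pooled_sensor',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_default',
    pool='sftp_pool',                       # Caps concurrent SFTP connections
    retries=3,                              # Retry transient connection failures
    retry_delay=timedelta(minutes=2),       # Initial delay between retries
    retry_exponential_backoff=True,         # 2 min, 4 min, 8 min, ...
    max_retry_delay=timedelta(minutes=30),  # Cap on the backoff
    dag=dag
)
```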

### Monitoring and Alerting

- Set up alerts for sensor timeout failures
- Monitor sensor execution patterns to optimize scheduling
- Use XCom to pass file information to downstream tasks
- Implement custom logging for sensor status tracking (both ideas are sketched below)
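
A sketch of a failure callback for alerting and a downstream task that pulls the sensor's XCom; the exact XCom shape depends on the sensor version, so treat the pulled value as illustrative:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

def alert_on_timeout(context):
    """Failure callback: log enough detail to diagnose a missed file."""
    ti = context['task_instance']
    print(f"Sensor {ti.task_id} failed for {context['ds']}; check the SFTP feed.")

def report_files(**context):
    """Downstream task: read whatever the sensor pushed to XCom."""
    files = context['task_instance'].xcom_pull(task_ids='monitored_sensor')
    print(f"Sensor reported: {files}")

dag = DAG(
    'sftp_sensor_tracked',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

monitored_sensor = SFTPSensor(
    task_id='monitored_sensor',
    path='/remote/incoming',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    timeout=3600,
    on_failure_callback=alert_on_timeout,  # Fires on timeout or error
    dag=dag
)

report = PythonOperator(task_id='report_files', python_callable=report_files, dag=dag)

monitored_sensor >> report
```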

### Pattern Matching

- Use specific patterns to avoid false positives
- Test fnmatch patterns thoroughly against expected file names (as in the sketch below)
- Consider templated patterns for date-based file monitoring
- Document pattern expectations for team maintenance
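
Because `file_pattern` uses fnmatch format, patterns can be exercised without touching the server. A quick sketch using the standard library:

```python
from fnmatch import fnmatch

# Expected hits and misses for the daily-report pattern used earlier.
pattern = 'daily_report_*.csv'

assert fnmatch('daily_report_20230101.csv', pattern)
assert not fnmatch('daily_report_20230101.csv.tmp', pattern)  # in-flight upload
assert not fnmatch('weekly_report_20230101.csv', pattern)

# Character classes narrow the match further, e.g. 8-digit dates only:
strict = 'daily_report_' + '[0-9]' * 8 + '.csv'
assert fnmatch('daily_report_20230101.csv', strict)
assert not fnmatch('daily_report_jan.csv', strict)
```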

### Time-based Conditions

- Use UTC timestamps consistently across all sensors
- Account for timezone differences between Airflow and the SFTP server
- Build buffer time into cutoffs to absorb file processing delays (see the sketch below)
- Allow for file system timestamp precision limitations
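
For example, the templated cutoff below is rendered from `data_interval_start` (a timezone-aware UTC datetime) minus a 15-minute safety buffer; the path and buffer size are illustrative:

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_buffered',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

buffered_sensor = SFTPSensor(
    task_id='buffered_sensor',
    path='/remote/data/feed.csv',
    # Rendered in UTC; the buffer tolerates upload delays and clock skew
    # between the Airflow scheduler and the SFTP server.
    newer_than="{{ (data_interval_start - macros.timedelta(minutes=15)).isoformat() }}",
    sftp_conn_id='sftp_default',
    dag=dag
)
```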