# Asynchronous Triggers

Trigger components for deferrable SFTP operations. By yielding control during long-running file monitoring, the SFTP trigger frees worker slots and provides asynchronous file system monitoring for resource-efficient, high-throughput workflows.

## Capabilities

### SFTP Trigger

Asynchronous trigger for deferrable SFTP file monitoring operations.

```python { .api }
class SFTPTrigger(BaseTrigger):
    """
    SFTPTrigger that fires when file conditions are met on an SFTP server.

    Provides asynchronous monitoring of SFTP locations for file presence,
    pattern matching, and modification time conditions. Designed for use
    with deferrable sensors to optimize resource utilization.
    """

    def __init__(
        self,
        path: str,
        file_pattern: str = "",
        sftp_conn_id: str = "sftp_default",
        newer_than: datetime | str | None = None,
        poke_interval: float = 5,
    ) -> None:
        """
        Initialize the SFTP trigger.

        Parameters:
        - path: Path on the SFTP server to search for files
        - file_pattern: Pattern to match against the file list using fnmatch
        - sftp_conn_id: SFTP connection ID for connecting to the server
        - newer_than: DateTime threshold for file modification time filtering
        - poke_interval: How often, in seconds, to check for file existence
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize SFTPTrigger arguments and classpath."""

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Make asynchronous calls to the SFTP server and yield trigger events."""
```
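
For orientation, a minimal construction example using the parameters documented above; the path, pattern, connection ID, and `newer_than` threshold are placeholder values, not anything the provider requires.

```python
from datetime import datetime, timezone

from airflow.providers.sftp.triggers.sftp import SFTPTrigger

# Placeholder values; newer_than accepts a datetime or an ISO-format string
# per the signature above.
trigger = SFTPTrigger(
    path="/remote/incoming",
    file_pattern="*.csv",
    sftp_conn_id="sftp_default",
    newer_than=datetime(2023, 1, 1, tzinfo=timezone.utc),
    poke_interval=30.0,
)
```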

### Trigger Serialization

```python { .api }
def serialize(self) -> tuple[str, dict[str, Any]]:
    """
    Serialize SFTPTrigger arguments and classpath.

    Required for trigger persistence and recovery across Airflow restarts.
    Returns the trigger class path and initialization parameters.

    Returns:
        Tuple containing:
        - Class path string for trigger reconstruction
        - Dictionary of initialization parameters
    """
```
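
A short sketch of inspecting the serialized form; the exact classpath string and kwargs keys shown in the comments are assumptions based on the constructor parameters above, not guaranteed output.

```python
from airflow.providers.sftp.triggers.sftp import SFTPTrigger

trigger = SFTPTrigger(path="/remote/incoming", file_pattern="*.csv")
classpath, kwargs = trigger.serialize()

# classpath is expected to be the dotted path used to rebuild the trigger,
# e.g. "airflow.providers.sftp.triggers.sftp.SFTPTrigger" (assumed), and
# kwargs the initialization parameters, e.g. {"path": "/remote/incoming", ...}.
print(classpath, kwargs)
```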

### Asynchronous Monitoring

```python { .api }
async def run(self) -> AsyncIterator[TriggerEvent]:
    """
    Make asynchronous calls to the SFTP server and yield trigger events.

    Continuously monitors the SFTP server for file conditions using SFTPHookAsync.
    Handles different monitoring scenarios:
    - Direct file path monitoring when no pattern is specified
    - Pattern-based file matching when file_pattern is provided
    - Modification time filtering when newer_than is specified

    Yields:
        TriggerEvent objects indicating success/failure and found files

    Raises:
        AirflowException: For connection failures or configuration errors
    """
```
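
The async iterator can also be driven directly, which is handy for ad-hoc testing outside a DAG. A sketch, assuming an Airflow environment where the `sftp_default` connection is configured:

```python
import asyncio

from airflow.providers.sftp.triggers.sftp import SFTPTrigger

async def probe() -> None:
    trigger = SFTPTrigger(path="/remote/incoming", file_pattern="*.csv")
    async for event in trigger.run():  # yields TriggerEvent objects
        print(event)
        break  # stop after the first event for this one-off check

asyncio.run(probe())
```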

## Usage Examples

### Basic Deferrable File Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_deferrable_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
)

# Deferrable sensor automatically uses SFTPTrigger
deferrable_sensor = SFTPSensor(
    task_id='wait_for_file',
    path='/remote/data/important_file.csv',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Automatically uses SFTPTrigger
    timeout=3600,  # 1 hour timeout
    dag=dag,
)
```

### Pattern-Based Monitoring with Triggers

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_deferrable_pattern',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
)

# Monitor for pattern-matched files asynchronously
pattern_sensor = SFTPSensor(
    task_id='wait_for_daily_files',
    path='/remote/daily_exports',
    file_pattern='export_{{ ds_nodash }}_*.json',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Uses SFTPTrigger internally
    timeout=14400,  # 4 hour timeout
    dag=dag,
)
```

### Custom Trigger Usage (Advanced)

```python
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator
from datetime import datetime, timedelta


class CustomSFTPSensor(BaseSensorOperator):
    """Custom sensor using SFTPTrigger directly."""

    def __init__(self, sftp_path, sftp_conn_id='sftp_default', **kwargs):
        super().__init__(**kwargs)
        self.sftp_path = sftp_path
        self.sftp_conn_id = sftp_conn_id

    def execute(self, context):
        """Defer to a custom trigger configuration."""
        self.defer(
            trigger=SFTPTrigger(
                path=self.sftp_path,
                file_pattern="*.csv",
                sftp_conn_id=self.sftp_conn_id,
                poke_interval=10.0,  # Custom interval
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        """Handle trigger completion."""
        if event["status"] == "success":
            self.log.info(f"Files found: {event['files']}")
            return event["files"]
        raise AirflowException(f"Trigger failed: {event}")


dag = DAG(
    'custom_sftp_trigger',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2),
)

custom_sensor = CustomSFTPSensor(
    task_id='custom_sftp_monitor',
    sftp_path='/remote/custom_data',
    sftp_conn_id='sftp_custom',
    dag=dag,
)
```

### High-Frequency Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_high_frequency',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=15),
    max_active_runs=5,  # Allow multiple concurrent runs
)

# High-frequency monitoring with short poke intervals
high_freq_sensor = SFTPSensor(
    task_id='high_freq_monitor',
    path='/remote/realtime_data',
    file_pattern='realtime_*.json',
    newer_than='{{ ts }}',  # Only files newer than the run's logical timestamp
    sftp_conn_id='sftp_realtime',
    deferrable=True,
    timeout=900,  # 15 minute timeout
    # Note: poke_interval is configured in the trigger
    dag=dag,
)
```

### Multiple File Pattern Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def process_multiple_patterns(**context):
    """Process results from multiple pattern sensors."""
    csv_files = context['task_instance'].xcom_pull(task_ids='wait_for_csv')
    json_files = context['task_instance'].xcom_pull(task_ids='wait_for_json')
    xml_files = context['task_instance'].xcom_pull(task_ids='wait_for_xml')

    all_files = []
    if csv_files:
        all_files.extend(csv_files.get('files_found', []))
    if json_files:
        all_files.extend(json_files.get('files_found', []))
    if xml_files:
        all_files.extend(xml_files.get('files_found', []))

    print(f"Found {len(all_files)} total files to process")
    return all_files


dag = DAG(
    'sftp_multiple_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),
)

# Monitor for CSV files
csv_sensor = SFTPSensor(
    task_id='wait_for_csv',
    path='/remote/exports',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag,
)

# Monitor for JSON files
json_sensor = SFTPSensor(
    task_id='wait_for_json',
    path='/remote/exports',
    file_pattern='*.json',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag,
)

# Monitor for XML files
xml_sensor = SFTPSensor(
    task_id='wait_for_xml',
    path='/remote/exports',
    file_pattern='*.xml',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag,
)

# Process all found files
process_files = PythonOperator(
    task_id='process_all_files',
    python_callable=process_multiple_patterns,
    dag=dag,
)

[csv_sensor, json_sensor, xml_sensor] >> process_files
```

### Time-Based Trigger Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_time_based_trigger',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
)

# Wait for files newer than a specific time with deferrable execution
time_based_sensor = SFTPSensor(
    task_id='wait_for_recent_files',
    path='/remote/time_sensitive',
    file_pattern='data_*.parquet',
    newer_than='{{ ds }}T08:00:00',  # Files from today after 8 AM
    sftp_conn_id='sftp_default',
    deferrable=True,
    timeout=28800,  # 8 hour timeout
    dag=dag,
)
```

### Error Handling with Triggers

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta


def handle_sensor_success(**context):
    """Handle successful file detection."""
    task_instance = context['task_instance']
    sensor_result = task_instance.xcom_pull(task_ids='deferrable_file_sensor')

    if isinstance(sensor_result, dict) and 'files_found' in sensor_result:
        files = sensor_result['files_found']
        print(f"Successfully found {len(files)} files: {files}")
        return {"status": "success", "file_count": len(files)}
    else:
        print(f"Sensor completed with result: {sensor_result}")
        return {"status": "completed", "result": sensor_result}


dag = DAG(
    'sftp_trigger_error_handling',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4),
)

# Deferrable sensor with comprehensive error handling
deferrable_sensor = SFTPSensor(
    task_id='deferrable_file_sensor',
    path='/remote/critical_data',
    file_pattern='critical_*.csv',
    newer_than='{{ ds }}T00:00:00',
    sftp_conn_id='sftp_critical',
    deferrable=True,
    timeout=14400,  # 4 hours
    retries=2,
    retry_delay=timedelta(minutes=30),
    dag=dag,
)

# Handle successful completion
success_handler = PythonOperator(
    task_id='handle_success',
    python_callable=handle_sensor_success,
    dag=dag,
)

# Send failure notification
failure_notification = EmailOperator(
    task_id='failure_notification',
    to=['ops@company.com'],
    subject='SFTP Monitoring Failed - {{ ds }}',
    html_content='''
    <h3>SFTP Trigger Monitoring Failure</h3>
    <p>The deferrable SFTP sensor failed to find required files.</p>
    <p>Task: {{ task.task_id }}</p>
    <p>Execution Date: {{ ds }}</p>
    ''',
    trigger_rule='one_failed',
    dag=dag,
)

deferrable_sensor >> success_handler
deferrable_sensor >> failure_notification
```

### Resource-Efficient Batch Monitoring

```python
from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Configure for resource efficiency
dag = DAG(
    'sftp_batch_efficient',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    max_active_runs=10,  # Allow many concurrent deferrable tasks
    catchup=False,
)

# Multiple sensors running concurrently with minimal resource usage
sensors = []
for i in range(5):
    sensor = SFTPSensor(
        task_id=f'monitor_batch_{i}',
        path=f'/remote/batch_{i}',
        file_pattern='*.csv',
        sftp_conn_id='sftp_default',
        deferrable=True,  # Each sensor uses minimal resources
        timeout=3600,
        dag=dag,
    )
    sensors.append(sensor)

# All sensors can run concurrently without consuming worker slots
```

## Trigger Lifecycle

### Initialization and Serialization

When a deferrable sensor is executed:

1. **Sensor Execution**: The sensor's `execute()` method calls `self.defer()`
2. **Trigger Creation**: An `SFTPTrigger` instance is created with the specified parameters
3. **Serialization**: The trigger is serialized using its `serialize()` method for persistence (see the sketch after this list)
4. **Worker Release**: The sensor releases its worker slot and the trigger runs asynchronously
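
A minimal sketch of steps 3 and 4: the serialized classpath and kwargs are all that needs to survive a restart, and they suffice to rebuild an equivalent trigger. Airflow's triggerer performs the equivalent internally; this is illustrative only, and the classpath/kwargs shapes are assumptions.

```python
from importlib import import_module

from airflow.providers.sftp.triggers.sftp import SFTPTrigger

# Persist: the dotted classpath plus the init kwargs.
classpath, kwargs = SFTPTrigger(path="/remote/incoming").serialize()

# Recover: import the class by its dotted path and re-instantiate it.
module_name, class_name = classpath.rsplit(".", 1)
rebuilt = getattr(import_module(module_name), class_name)(**kwargs)
```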

### Asynchronous Monitoring Loop

The trigger's `run()` method:

1. **Connection Setup**: Establishes an async SFTP connection using `SFTPHookAsync`
2. **Monitoring Loop**: Continuously checks file conditions at the specified interval (an illustrative skeleton follows this list)
3. **Condition Evaluation**: Evaluates file existence, patterns, and modification times
4. **Event Generation**: Yields `TriggerEvent` objects when conditions are met or timeouts occur
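
An illustrative skeleton of that loop, not the provider's actual implementation; `check_files` is a hypothetical stand-in for the async SFTP lookup the hook performs.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

from airflow.triggers.base import TriggerEvent

async def monitoring_loop(
    check_files: Callable[[], Awaitable[list[str]]],  # hypothetical async lookup
    poke_interval: float,
) -> AsyncIterator[TriggerEvent]:
    while True:
        files = await check_files()  # evaluate existence/pattern/mtime conditions
        if files:
            yield TriggerEvent({"status": "success", "files": files})
            return
        await asyncio.sleep(poke_interval)  # yield control between checks
```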

### Completion and Callback

When the trigger completes:

1. **Event Yield**: The trigger yields a final `TriggerEvent` with success/failure status
2. **Sensor Resumption**: The sensor's `execute_complete()` method is called, as shown in the Custom Trigger Usage example above
3. **Result Processing**: The sensor processes the trigger event and completes execution

## Best Practices

### Resource Management

- Use deferrable sensors for long-running monitoring tasks to free up worker slots
- Configure appropriate `poke_interval` values to balance responsiveness with server load
- Implement reasonable timeout values to prevent indefinite waiting
- Monitor trigger task queues to ensure adequate triggerer capacity

### Performance Optimization

- Use specific file patterns to reduce unnecessary server queries (see the fnmatch sketch after this list)
- Implement connection pooling for triggers monitoring the same SFTP server
- Consider trigger serialization overhead for very high-frequency monitoring
- Monitor async connection pool sizes for optimal performance
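
Since `file_pattern` is matched with fnmatch (per the constructor docs above), a specific pattern prunes the candidate list before any modification-time checks; the directory listing here is hypothetical.

```python
import fnmatch

# A hypothetical directory listing returned by the server.
listing = ["export_20230101.json", "export_20230102.json", "readme.txt"]

# A specific pattern keeps the match set, and any follow-up checks, small.
print(fnmatch.filter(listing, "export_*.json"))
# ['export_20230101.json', 'export_20230102.json']
```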

### Error Handling

- Implement appropriate retry strategies for trigger failures
- Configure alerts for trigger timeout scenarios
- Use proper exception handling in custom trigger implementations
- Monitor trigger execution logs for connection issues

### Scalability Considerations

- Plan triggerer capacity based on expected concurrent deferrable tasks
- Use database connection pooling for trigger state management
- Implement proper cleanup for failed or interrupted triggers
- Consider trigger resource limits in high-throughput scenarios