or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

imap-hooks.mdimap-sensors.mdindex.md

imap-sensors.mddocs/

0

# IMAP Sensors

1

2

Sensor operators that monitor email mailboxes for specific attachments, enabling event-driven workflows triggered by incoming emails. IMAP sensors provide a way to pause DAG execution until specific email conditions are met.

3

4

## Capabilities

5

6

### ImapAttachmentSensor Class

7

8

Sensor that waits for specific email attachments to arrive in a mailbox, enabling email-triggered workflow automation.

9

10

```python { .api }

11

class ImapAttachmentSensor:

12

"""

13

Sensor that waits for specific attachments on a mail server.

14

15

Inherits from BaseSensorOperator, providing standard sensor capabilities

16

like poke_interval, timeout, and soft_fail configurations.

17

18

Parameters:

19

- attachment_name: Name or pattern of attachment to wait for

20

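- check_regex (bool): If True, treat attachment_name as a regex pattern (matched with re.match, so it is anchored at the start of the filename but not the end; append "$" for a full-name match)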

21

- mail_folder (str): Mail folder to monitor (default: "INBOX")

22

- mail_filter (str): IMAP SEARCH criteria used to select messages (default: "All"); dates must use the DD-Mon-YYYY form, e.g. 'SINCE "01-Jan-2024"'

23

- conn_id (str): Airflow connection ID for IMAP server

24

- **kwargs: Additional sensor parameters (timeout, poke_interval, etc.)

25

26

Attributes:

27

- template_fields: ["attachment_name", "mail_filter"]

28

29

Template Fields:

30

- attachment_name: Can be templated with Airflow variables/macros

31

- mail_filter: Can be templated with Airflow variables/macros

32

"""

33

34

def __init__(

35

self,

36

*,

37

attachment_name,

38

check_regex=False,

39

mail_folder="INBOX",

40

mail_filter="All",

41

conn_id="imap_default",

42

**kwargs,

43

) -> None: ...

44

```

45

46

### Sensor Execution

47

48

Core sensor method that checks for attachment presence during each sensor poke.

49

50

```python { .api }

51

def poke(self, context) -> bool:

52

"""

53

Check for attachment presence in the mail server.

54

55

This method is called repeatedly while the sensor task runs (by the worker in a loop in poke mode, once per rescheduled attempt in reschedule mode) according

56

to the poke_interval until it returns True or the sensor times out.

57

58

Parameters:

59

- context: Airflow execution context containing task information

60

61

Returns:

62

bool: True if attachment found, False to continue waiting

63

"""

64

```

65

66

## Usage Patterns

67

68

### Basic Attachment Monitoring

69

70

```python

71

from airflow.providers.imap.sensors.imap_attachment import ImapAttachmentSensor

72

from airflow import DAG

73

from datetime import datetime, timedelta

74

75

dag = DAG(

76

'email_processing_workflow',

77

start_date=datetime(2024, 1, 1),

78

schedule=None, # No automatic schedule: runs are triggered manually or via the API; the sensor then waits for the email inside each run

79

catchup=False

80

)

81

82

# Wait for daily report attachment

83

wait_for_report = ImapAttachmentSensor(

84

task_id="wait_for_daily_report",

85

attachment_name="daily_report.xlsx",

86

mail_folder="Reports",

87

conn_id="company_imap",

88

poke_interval=60, # Check every minute

89

timeout=3600, # Timeout after 1 hour

90

dag=dag

91

)

92

```

93

94

### Regex Pattern Matching

95

96

```python

97

# Wait for any CSV file matching pattern

98

wait_for_csv = ImapAttachmentSensor(

99

task_id="wait_for_any_csv",

100

attachment_name=r"data_\d{8}\.csv$", # Matches data_20240101.csv; "$" stops re.match from also accepting longer names

101

check_regex=True,

102

mail_folder="DataImports",

103

poke_interval=300, # Check every 5 minutes

104

dag=dag

105

)

106

107

# Wait for files with specific prefix

108

wait_for_backup = ImapAttachmentSensor(

109

task_id="wait_for_backup_file",

110

attachment_name=r"backup_.*\.(zip|tar\.gz)$", # "$" anchors the extension at the end of the filename

111

check_regex=True,

112

mail_folder="Backups",

113

dag=dag

114

)

115

```

116

117

### Email Filtering

118

119

```python

120

# Wait for attachment from specific sender

121

wait_from_sender = ImapAttachmentSensor(

122

task_id="wait_for_vendor_report",

123

attachment_name="invoice.pdf",

124

mail_filter='FROM "vendor@supplier.com"',

125

poke_interval=120,

126

dag=dag

127

)

128

129

# Wait for recent emails only (IMAP SEARCH dates have day granularity, so "recent" means on or after the run's logical date)

130

wait_recent = ImapAttachmentSensor(

131

task_id="wait_for_recent_attachment",

132

attachment_name="urgent_update.xlsx",

133

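mail_filter='SINCE {{ macros.ds_format(ds, "%Y-%m-%d", "%d-%b-%Y") }}', # IMAP expects DD-Mon-YYYY (e.g. 01-Jan-2024); relative times are not supported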

134

dag=dag

135

)

136

137

# Complex filtering criteria

138

wait_complex = ImapAttachmentSensor(

139

task_id="wait_for_specific_conditions",

140

attachment_name="monthly_report.pdf",

141

mail_filter='FROM "reports@company.com" SUBJECT "Monthly" UNSEEN',

142

mail_folder="Corporate",

143

dag=dag

144

)

145

```

146

147

### Templated Parameters

148

149

```python

150

from airflow.models import Variable

151

152

# Use Airflow variables and macros in sensor configuration

153

dynamic_sensor = ImapAttachmentSensor(

154

task_id="wait_for_templated_attachment",

155

attachment_name="{{ var.value.report_filename }}", # From Airflow Variable

156

mail_filter='SINCE {{ macros.ds_format(ds, "%Y-%m-%d", "%d-%b-%Y") }}', # Logical date, reformatted from YYYY-MM-DD to IMAP's DD-Mon-YYYY

157

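conn_id=Variable.get("imap_connection", "imap_default"), # conn_id is not a template field, so Jinja here would never render; read at DAG-parse time instead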

158

dag=dag

159

)

160

161

# Template with custom parameters

162

date_based_sensor = ImapAttachmentSensor(

163

task_id="wait_for_date_based_file",

164

attachment_name="report_{{ ds_nodash }}.csv", # Uses YYYYMMDD format

165

mail_filter='SINCE {{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%d-%b-%Y") }}', # Day before the logical date, in IMAP DD-Mon-YYYY form

166

dag=dag

167

)

168

```

169

170

### Sensor Configuration Options

171

172

```python

173

# Advanced sensor configuration

174

advanced_sensor = ImapAttachmentSensor(

175

task_id="advanced_email_sensor",

176

attachment_name="critical_data.json",

177

178

# IMAP-specific settings

179

mail_folder="Priority",

180

mail_filter='FROM "system@critical-app.com" UNSEEN',

181

conn_id="secure_imap",

182

183

# Sensor behavior settings

184

poke_interval=30, # Check every 30 seconds

185

timeout=7200, # 2 hour timeout

186

mode='poke', # Poke mode (vs reschedule)

187

soft_fail=False, # Fail task if sensor times out

188

189

# Retry configuration

190

retries=3,

191

retry_delay=timedelta(minutes=5),

192

193

dag=dag

194

)

195

```

196

197

## Integration with Processing Tasks

198

199

### Complete Email Processing Workflow

200

201

```python

202

from airflow.providers.imap.sensors.imap_attachment import ImapAttachmentSensor

203

from airflow.providers.imap.hooks.imap import ImapHook

204

from airflow.operators.python import PythonOperator

205

from airflow import DAG

206

from datetime import datetime

207

208

def process_email_attachment(**context):

209

"""Download and process the detected attachment"""

210

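import os

attachment_name = context['params']['attachment_name']
os.makedirs("/tmp/processing", exist_ok=True) # The hook writes into this directory but does not create it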

211

212

with ImapHook(imap_conn_id="imap_default") as hook:

213

# Download the attachment that triggered the sensor

214

hook.download_mail_attachments(

215

name=attachment_name,

216

local_output_directory="/tmp/processing",

217

latest_only=True,
mail_folder="Imports" # Must match the folder the sensor watches (the hook defaults to INBOX)

218

)

219

220

# Process the downloaded file

221

print(f"Processing {attachment_name}")

222

223

dag = DAG('email_driven_processing', start_date=datetime(2024, 1, 1), catchup=False) # Avoid backfilling one sensor run per day since start_date

224

225

# Step 1: Wait for attachment

226

sensor = ImapAttachmentSensor(

227

task_id="wait_for_data_file",

228

attachment_name="data_export.csv",

229

mail_folder="Imports",

230

dag=dag

231

)

232

233

# Step 2: Process the attachment

234

processor = PythonOperator(

235

task_id="process_attachment",

236

python_callable=process_email_attachment,

237

params={'attachment_name': 'data_export.csv'},

238

dag=dag

239

)

240

241

# Set up dependency

242

sensor >> processor

243

```

244

245

### Multiple Attachment Monitoring

246

247

```python

248

from airflow.operators.empty import EmptyOperator # Replaces the deprecated DummyOperator

# Monitor multiple different attachments
dag = DAG('multi_attachment_monitoring', start_date=datetime(2024, 1, 1), catchup=False)

# Different sensors for different file types
csv_sensor = ImapAttachmentSensor(
task_id="wait_for_csv_data",
attachment_name=r".*\.csv$",
check_regex=True,
dag=dag
)

pdf_sensor = ImapAttachmentSensor(
task_id="wait_for_pdf_report",
attachment_name=r"report.*\.pdf$",
check_regex=True,
dag=dag
)

xml_sensor = ImapAttachmentSensor(
task_id="wait_for_xml_config",
attachment_name="config.xml",
dag=dag
)

# Convergence point - continue when any attachment arrives.
# The other sensors keep poking until they succeed or time out, so give them an explicit timeout in real DAGs.
any_attachment_ready = EmptyOperator(
task_id="any_attachment_detected",
trigger_rule='one_success', # Trigger when any upstream task succeeds
dag=dag
)

# Set up parallel monitoring
[csv_sensor, pdf_sensor, xml_sensor] >> any_attachment_ready

283

```

284

285

### Conditional Processing

286

287

```python

288

from airflow.operators.python import BranchPythonOperator

289

290

def decide_processing_path(**context):

291

"""Determine processing path based on which attachment was found"""

292

# Check which sensor succeeded to determine processing type

293

upstream_task_ids = context['task'].upstream_task_ids # Set of task_id strings (get_direct_relatives returns operator objects, not IDs)

294

295

for task_id in upstream_task_ids:

296

task_instance = context['task_instance']

297

upstream_ti = task_instance.get_dagrun().get_task_instance(task_id)

298

299

if upstream_ti.state == 'success':

300

if 'csv' in task_id:

301

return 'process_csv_data'

302

elif 'pdf' in task_id:

303

return 'process_pdf_report'

304

elif 'xml' in task_id:

305

return 'process_xml_config'

306

307

return 'no_processing_needed'

308

309

# Branching based on sensor results

310

decision = BranchPythonOperator(

311

task_id="decide_processing",

312

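python_callable=decide_processing_path,
trigger_rule='one_success', # The default all_success would wait for all three sensors; run as soon as one succeeds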

313

dag=dag

314

)

315

316

# Connect sensors to decision point

317

[csv_sensor, pdf_sensor, xml_sensor] >> decision

318

```

319

320

## Sensor Modes and Performance

321

322

### Poke vs Reschedule Mode

323

324

```python

325

# Poke mode: Keeps worker slot occupied, good for short waits

326

poke_sensor = ImapAttachmentSensor(

327

task_id="quick_check",

328

attachment_name="quick_update.txt",

329

mode='poke', # Default mode

330

poke_interval=30, # Check every 30 seconds

331

timeout=600, # 10 minute timeout

332

dag=dag

333

)

334

335

# Reschedule mode: Releases worker slot between checks, good for long waits

336

reschedule_sensor = ImapAttachmentSensor(

337

task_id="long_wait_check",

338

attachment_name="weekly_report.xlsx",

339

mode='reschedule', # More resource-efficient for long waits

340

poke_interval=1800, # Check every 30 minutes

341

timeout=86400, # 24 hour timeout

342

dag=dag

343

)

344

```

345

346

### Performance Considerations

347

348

```python

349

# Optimized for high-frequency monitoring

350

high_frequency = ImapAttachmentSensor(

351

task_id="real_time_monitoring",

352

attachment_name="urgent_alert.json",

353

poke_interval=10, # Very frequent checking

354

timeout=300, # Short timeout for urgent items

355

mail_filter='UNSEEN', # Only check unread messages (caution: the hook fetches messages with RFC822, which usually marks scanned messages as read)

356

dag=dag

357

)

358

359

# Optimized for resource efficiency

360

resource_efficient = ImapAttachmentSensor(

361

task_id="batch_processing_trigger",

362

attachment_name="batch_data.zip",

363

mode='reschedule', # Don't hold worker slots

364

poke_interval=3600, # Check hourly

365

timeout=172800, # 48 hour timeout

366

dag=dag

367

)

368

```

369

370

## Error Handling and Monitoring

371

372

### Sensor Failure Handling

373

374

```python

375

# Graceful failure handling

376

robust_sensor = ImapAttachmentSensor(

377

task_id="robust_attachment_check",

378

attachment_name="important_file.xlsx",

379

380

# Failure handling

381

soft_fail=True, # On timeout, mark the sensor skipped instead of failed (downstream tasks with default trigger rules are skipped too)

382

retries=2, # Retry on connection errors

383

retry_delay=timedelta(minutes=10),

384

385

# Monitoring

386

timeout=3600, # 1 hour timeout

387

poke_interval=120, # Check every 2 minutes

388

389

dag=dag

390

)

391

```

392

393

### Connection Error Recovery

394

395

```python

396

from airflow.exceptions import AirflowException

397

398

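def handle_sensor_failure(**context):

"""Log an email-sensor failure and report which processing path should follow"""

task_instance = context['dag_run'].get_task_instance('wait_for_data_file') # The sensor's TI; context['task_instance'] is this handler itself, which is running, never 'failed'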

401

402

if task_instance.state == 'failed':

403

# Log the failure and optionally trigger alternative processing

404

print("Email sensor failed - checking alternative data sources")

405

406

# A PythonOperator only pushes this return value to XCom; use BranchPythonOperator to actually route on it

407

return 'alternative_data_source'

408

409

return 'normal_processing'

410

411

# Alternative processing path for sensor failures

412

fallback_handler = PythonOperator(

413

task_id="handle_email_sensor_failure",

414

python_callable=handle_sensor_failure,

415

trigger_rule='one_failed', # Trigger only if sensor fails

416

dag=dag

417

)

418

419

sensor >> [processor, fallback_handler]

420

```

421

422

## IMAP Search Filters for Sensors

423

424

### Time-Based Filters

425

426

```python

427

# Recent messages only

428

recent_sensor = ImapAttachmentSensor(

429

task_id="recent_files_only",

430

attachment_name="latest_data.csv",

431

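mail_filter='SINCE {{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%d-%b-%Y") }}', # IMAP dates are DD-Mon-YYYY; relative dates like "1-day-ago" are not supported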

432

dag=dag

433

)

434

435

# Specific date range

436

date_range_sensor = ImapAttachmentSensor(

437

task_id="monthly_files",

438

attachment_name="monthly_report.pdf",

439

mail_filter='SINCE "01-Jan-2024" BEFORE "01-Feb-2024"', # SINCE is inclusive, BEFORE is exclusive: covers all of January

440

dag=dag

441

)

442

```

443

444

### Sender-Based Filters

445

446

```python

447

# Specific sender

448

sender_sensor = ImapAttachmentSensor(

449

task_id="vendor_reports",

450

attachment_name="invoice.pdf",

451

mail_filter='FROM "accounting@vendor.com"',

452

dag=dag

453

)

454

455

# Multiple senders

456

multi_sender_sensor = ImapAttachmentSensor(

457

task_id="partner_files",

458

attachment_name="data_export.csv",

459

mail_filter='OR FROM "partner1@company.com" FROM "partner2@company.com"',

460

dag=dag

461

)

462

```

463

464

### Subject and Content Filters

465

466

```python

467

# Subject-based filtering

468

subject_sensor = ImapAttachmentSensor(

469

task_id="urgent_reports",

470

attachment_name="emergency_data.xlsx",

471

mail_filter='SUBJECT "URGENT"',

472

dag=dag

473

)

474

475

# Combined criteria

476

complex_sensor = ImapAttachmentSensor(

477

task_id="specific_conditions",

478

attachment_name="report.pdf",

479

mail_filter='FROM "reports@company.com" SUBJECT "Daily" UNSEEN SINCE {{ macros.ds_format(ds, "%Y-%m-%d", "%d-%b-%Y") }}', # "today" is not a valid IMAP date

480

dag=dag

481

)

482

```