# IMAP Sensors

Sensor operators that poll email mailboxes for specific attachments, enabling workflows driven by incoming emails. An IMAP sensor holds back the downstream tasks of a DAG run until a matching email is found or the sensor times out.

## Capabilities

### ImapAttachmentSensor Class

Sensor that waits for a specific email attachment to arrive in a mailbox, enabling email-driven workflow automation.

```python { .api }
class ImapAttachmentSensor(BaseSensorOperator):
    """
    Sensor that waits for specific attachments on a mail server.

    Inherits from BaseSensorOperator, providing standard sensor options
    such as poke_interval, timeout, mode, and soft_fail.

    Parameters:
    - attachment_name (str): Name of the attachment to wait for, or a regex
      pattern when check_regex is True
    - check_regex (bool): If True, treat attachment_name as a regular expression
      (default: False)
    - mail_folder (str): Mail folder to monitor (default: "INBOX")
    - mail_filter (str): IMAP search criteria used to select messages
      (default: "All", i.e. every message in the folder)
    - conn_id (str): Airflow connection ID for the IMAP server
      (default: "imap_default")
    - **kwargs: Additional BaseSensorOperator arguments (timeout, poke_interval, etc.)

    Attributes:
    - template_fields: ("attachment_name", "mail_filter")

    Template Fields:
    - attachment_name: Can be templated with Airflow variables/macros
    - mail_filter: Can be templated with Airflow variables/macros
    conn_id is not a template field; it is always used as a literal connection ID.
    """

    def __init__(
        self,
        *,
        attachment_name,
        check_regex=False,
        mail_folder="INBOX",
        mail_filter="All",
        conn_id="imap_default",
        **kwargs,
    ) -> None: ...
```
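
The following minimal, stdlib-only sketch illustrates the matching rule behind `attachment_name` and `check_regex`. It assumes, based on our reading of the provider's hook (worth verifying against your installed version), that an exact name is compared with `==` and a regex is applied with `re.match`, which anchors at the start of the filename but not at the end. `find_matching_attachments` is a hypothetical helper, not part of the provider.

```python
import re
from email.message import EmailMessage


def find_matching_attachments(msg: EmailMessage, name: str, check_regex: bool = False) -> list:
    """Return the filenames of attachments in msg that match name (hypothetical helper)."""
    matches = []
    for part in msg.walk():
        filename = part.get_filename()
        if filename is None:  # container parts and the body text have no filename
            continue
        # Assumed rule: regex applied with re.match (start-anchored), else exact equality
        if check_regex:
            if re.match(name, filename):
                matches.append(filename)
        elif filename == name:
            matches.append(filename)
    return matches


# Build a test message with two attachments
msg = EmailMessage()
msg["Subject"] = "Daily export"
msg.set_content("See attached.")
msg.add_attachment("a,b\n1,2\n", subtype="csv", filename="data_20240101.csv")
msg.add_attachment(b"%PDF-1.4", maintype="application", subtype="pdf", filename="report.pdf")

print(find_matching_attachments(msg, "report.pdf"))
print(find_matching_attachments(msg, r"data_\d{8}\.csv", check_regex=True))
```

Without `check_regex`, a partial name such as `"report"` matches nothing. With `check_regex=True`, the same string matches `report.pdf` as a prefix.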

### Sensor Execution

Core sensor method, called on every poke, that checks whether a matching attachment is present.

```python { .api }
def poke(self, context) -> bool:
    """
    Check whether a matching attachment exists on the mail server.

    Each call opens a connection through ImapHook(imap_conn_id=conn_id) and
    checks whether any message selected by mail_folder and mail_filter
    carries a matching attachment. The sensor calls poke every poke_interval
    seconds (in reschedule mode, once per rescheduled attempt) until it
    returns True or the sensor times out.

    Parameters:
    - context: Airflow execution context containing task information

    Returns:
    bool: True if a matching attachment was found, False to keep waiting
    """
```
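
The poke/timeout contract can be modelled with a simplified standalone loop. This is not Airflow's implementation: the real BaseSensorOperator also handles reschedule mode, soft_fail and exponential backoff. `run_sensor`, `SensorTimeout` and the injectable clock are hypothetical names, chosen so the behaviour can be tested without actually waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when poke never succeeded within the timeout."""


def run_sensor(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call poke every poke_interval seconds until it returns True.

    Returns the number of pokes made, or raises SensorTimeout once
    timeout seconds have elapsed without a successful poke.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"no match after {pokes} pokes")
        sleep(poke_interval)


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


# The "attachment" shows up on the third check
answers = iter([False, False, True])
clock = FakeClock()
pokes_needed = run_sensor(lambda: next(answers), poke_interval=60, timeout=3600,
                          clock=clock, sleep=clock.sleep)
print(pokes_needed, clock.now)
```

The attachment appears on the third poke, two intervals (120 simulated seconds) after the sensor started.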

## Usage Patterns

### Basic Attachment Monitoring

```python
from airflow.providers.imap.sensors.imap_attachment import ImapAttachmentSensor
from airflow import DAG
from datetime import datetime, timedelta

dag = DAG(
    'email_processing_workflow',
    start_date=datetime(2024, 1, 1),
    schedule=None, # No schedule: start runs manually or via an external trigger
    catchup=False
)

# Wait for the daily report attachment.
# Any message already in the folder with this attachment satisfies the sensor,
# so narrow the search with mail_filter if old emails must be ignored.
wait_for_report = ImapAttachmentSensor(
    task_id="wait_for_daily_report",
    attachment_name="daily_report.xlsx",
    mail_folder="Reports",
    conn_id="company_imap",
    poke_interval=60, # Check every minute
    timeout=3600, # Time out after 1 hour
    dag=dag
)
```

### Regex Pattern Matching

```python
# Wait for a CSV file whose name matches the pattern
wait_for_csv = ImapAttachmentSensor(
    task_id="wait_for_any_csv",
    attachment_name=r"data_\d{8}\.csv$", # Matches names like data_20240101.csv; $ anchors the end
    check_regex=True,
    mail_folder="DataImports",
    poke_interval=300, # Check every 5 minutes
    dag=dag
)

# Wait for a backup archive with a specific prefix
wait_for_backup = ImapAttachmentSensor(
    task_id="wait_for_backup_file",
    attachment_name=r"backup_.*\.(zip|tar\.gz)$",
    check_regex=True,
    mail_folder="Backups",
    dag=dag
)
```
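
As we read the provider's hook, regex names are applied with `re.match`, so a pattern is anchored only at the start of the filename. This is an assumption worth checking against your provider version. The stdlib-only check below shows why ending a pattern with `$` matters. `matching` and the sample filenames are hypothetical.

```python
import re

names = [
    "data_20240101.csv",
    "data_20240101.csv.bak",
    "old_data_20240101.csv",
    "backup_2024.tar.gz",
]


def matching(pattern: str) -> list:
    """Names accepted when pattern is applied with re.match (start-anchored only)."""
    return [n for n in names if re.match(pattern, n)]


print(matching(r"data_\d{8}\.csv"))     # also accepts the .bak file
print(matching(r"data_\d{8}\.csv$"))    # end-anchored: only the real CSV
print(matching(r".*data_\d{8}\.csv$"))  # a leading .* lets the prefix vary
```

If you want a pattern that must match the whole name, `re.fullmatch` expresses that intent directly. With a start-anchored matcher, though, appending `$` achieves the same effect.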

### Email Filtering

```python
# Wait for an attachment from a specific sender
wait_from_sender = ImapAttachmentSensor(
    task_id="wait_for_vendor_report",
    attachment_name="invoice.pdf",
    mail_filter='FROM "vendor@supplier.com"',
    poke_interval=120,
    dag=dag
)

# Wait for recent emails only. IMAP SINCE compares dates, not times, so the
# finest window is "on or after a given day" (here: the run's logical date).
wait_recent = ImapAttachmentSensor(
    task_id="wait_for_recent_attachment",
    attachment_name="urgent_update.xlsx",
    mail_filter="SINCE {{ macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%Y') }}",
    dag=dag
)

# Complex filtering criteria (space-separated criteria are ANDed)
wait_complex = ImapAttachmentSensor(
    task_id="wait_for_specific_conditions",
    attachment_name="monthly_report.pdf",
    mail_filter='FROM "reports@company.com" SUBJECT "Monthly" UNSEEN',
    mail_folder="Corporate",
    dag=dag
)
```
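
`mail_filter` is sent to the server as IMAP SEARCH criteria (RFC 3501). In that syntax, space-separated keys are ANDed, and string arguments are quoted strings in which `\` and `"` must be escaped. When values come from variables, a small helper keeps the quoting correct. `imap_quote` and `build_search_criteria` below are hypothetical, and the sketch does not handle non-ASCII values, which need a SEARCH charset.

```python
from typing import Optional


def imap_quote(value: str) -> str:
    """Render value as an IMAP quoted string, escaping backslash and double quote."""
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_search_criteria(
    sender: Optional[str] = None,
    subject: Optional[str] = None,
    unseen: bool = False,
) -> str:
    """Join the given criteria with spaces (IMAP ANDs them); fall back to ALL."""
    keys = []
    if sender:
        keys.append(f"FROM {imap_quote(sender)}")
    if subject:
        keys.append(f"SUBJECT {imap_quote(subject)}")
    if unseen:
        keys.append("UNSEEN")
    return " ".join(keys) or "ALL"


print(build_search_criteria(sender="reports@company.com", subject="Monthly", unseen=True))
```

The resulting string can be passed directly as `mail_filter=...`.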

### Templated Parameters

```python
# attachment_name and mail_filter are template fields, rendered at run time
dynamic_sensor = ImapAttachmentSensor(
    task_id="wait_for_templated_attachment",
    attachment_name="{{ var.value.report_filename }}", # From an Airflow Variable
    # IMAP dates use DD-Mon-YYYY, so reformat ds (YYYY-MM-DD) first
    mail_filter="SINCE {{ macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%Y') }}",
    conn_id="company_imap", # conn_id is not templated: pass a literal connection ID
    dag=dag
)

# Template with custom parameters
date_based_sensor = ImapAttachmentSensor(
    task_id="wait_for_date_based_file",
    attachment_name="report_{{ ds_nodash }}.csv", # Uses YYYYMMDD format
    # Messages from the day before the logical date onward
    mail_filter="SINCE {{ macros.ds_format(macros.ds_add(ds, -1), '%Y-%m-%d', '%d-%b-%Y') }}",
    dag=dag
)
```
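
IMAP `SINCE` and `BEFORE` expect dates in `DD-Mon-YYYY` form (for example `01-Jan-2024`), while `ds` renders as `YYYY-MM-DD`. In a template, the conversion can be done with `macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%Y')`. The standalone sketch below performs the same transformation in plain Python, using an explicit month table so the result does not depend on the locale behind `%b`. `to_imap_date` is a hypothetical helper.

```python
from datetime import date, timedelta

_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def to_imap_date(ds: str, days_offset: int = 0) -> str:
    """Convert an Airflow-style ds string (YYYY-MM-DD) to an IMAP date (DD-Mon-YYYY)."""
    d = date.fromisoformat(ds) + timedelta(days=days_offset)
    return f"{d.day:02d}-{_MONTHS[d.month - 1]}-{d.year}"


# "Yesterday" relative to 2024-03-01 crosses into a leap-year February
print(f'SINCE "{to_imap_date("2024-03-01", days_offset=-1)}"')
```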

### Sensor Configuration Options

```python
# Advanced sensor configuration
advanced_sensor = ImapAttachmentSensor(
    task_id="advanced_email_sensor",
    attachment_name="critical_data.json",

    # IMAP-specific settings
    mail_folder="Priority",
    mail_filter='FROM "system@critical-app.com" UNSEEN',
    conn_id="secure_imap",

    # Sensor behavior settings
    poke_interval=30, # Check every 30 seconds
    timeout=7200, # 2-hour timeout
    mode='poke', # Hold the worker slot between checks (vs 'reschedule')
    soft_fail=False, # Fail the task if the sensor times out

    # Retry configuration: retries cover errors such as IMAP connection
    # failures; a sensor timeout fails the task without retrying
    retries=3,
    retry_delay=timedelta(minutes=5),

    dag=dag
)
```

## Integration with Processing Tasks

### Complete Email Processing Workflow

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.imap.hooks.imap import ImapHook
from airflow.providers.imap.sensors.imap_attachment import ImapAttachmentSensor

OUTPUT_DIR = "/tmp/processing"


def process_email_attachment(**context):
    """Download and process the detected attachment"""
    attachment_name = context['params']['attachment_name']
    os.makedirs(OUTPUT_DIR, exist_ok=True) # Make sure the target directory exists

    with ImapHook(imap_conn_id="imap_default") as hook:
        # Download the newest matching attachment from the folder the sensor watched
        hook.download_mail_attachments(
            name=attachment_name,
            local_output_directory=OUTPUT_DIR,
            mail_folder="Imports",
            latest_only=True
        )

    # Process the downloaded file
    print(f"Processing {os.path.join(OUTPUT_DIR, attachment_name)}")


dag = DAG(
    'email_driven_processing',
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False # Don't create a backfill run for every day since start_date
)

# Step 1: Wait for attachment
sensor = ImapAttachmentSensor(
    task_id="wait_for_data_file",
    attachment_name="data_export.csv",
    mail_folder="Imports",
    dag=dag
)

# Step 2: Process the attachment
processor = PythonOperator(
    task_id="process_attachment",
    python_callable=process_email_attachment,
    params={'attachment_name': 'data_export.csv'},
    dag=dag
)

# Set up dependency
sensor >> processor
```
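
The callable above only prints a message at the point where processing would happen. One way to fill in that step is sketched below under two assumptions: the attachment is a CSV with a header row, and it is saved as `<local_output_directory>/<attachment_name>` (our understanding of how `download_mail_attachments` names files). The sketch uses the stdlib `csv` module. `summarize_csv` is a hypothetical helper, and the temporary file stands in for the downloaded attachment.

```python
import csv
import os
import tempfile


def summarize_csv(path: str) -> dict:
    """Return the header and row count of a CSV file (hypothetical processing step)."""
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    return {"columns": reader.fieldnames, "rows": len(rows)}


# Stand-in for the file the download step would have written
with tempfile.TemporaryDirectory() as output_dir:
    path = os.path.join(output_dir, "data_export.csv")
    with open(path, "w", newline="", encoding="utf-8") as f:
        f.write("id,amount\n1,9.50\n2,12.00\n")
    summary = summarize_csv(path)
    print(summary)
```

Inside the Airflow callable, the same function would be called with the path of the downloaded attachment.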

### Multiple Attachment Monitoring

```python
from airflow.operators.empty import EmptyOperator

# Monitor multiple different attachments
dag = DAG('multi_attachment_monitoring', start_date=datetime(2024, 1, 1))

# Different sensors for different file types
csv_sensor = ImapAttachmentSensor(
    task_id="wait_for_csv_data",
    attachment_name=r".*\.csv$",
    check_regex=True,
    dag=dag
)

pdf_sensor = ImapAttachmentSensor(
    task_id="wait_for_pdf_report",
    attachment_name=r"report.*\.pdf$",
    check_regex=True,
    dag=dag
)

xml_sensor = ImapAttachmentSensor(
    task_id="wait_for_xml_config",
    attachment_name="config.xml",
    dag=dag
)

# Convergence point: continue as soon as any attachment arrives.
# The remaining sensors keep poking until they succeed or time out.
any_attachment_ready = EmptyOperator(
    task_id="any_attachment_detected",
    trigger_rule='one_success', # Trigger when any upstream task succeeds
    dag=dag
)

# Set up parallel monitoring
[csv_sensor, pdf_sensor, xml_sensor] >> any_attachment_ready
```
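
The fan-in above depends on trigger rules. The following is a simplified, hypothetical model, not Airflow's scheduler logic, which also tracks states such as `upstream_failed`. It shows how `all_success`, `one_success` and `one_failed` respond to the sensors' states, including a sensor that ended as `skipped` through `soft_fail`.

```python
def should_run(trigger_rule: str, upstream_states: list) -> bool:
    """Simplified model of whether a task may start, given its upstream task states."""
    if trigger_rule == "all_success":  # Airflow's default rule
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "one_success":
        # Fires as soon as any upstream succeeds, without waiting for the others
        return any(state == "success" for state in upstream_states)
    if trigger_rule == "one_failed":
        return any(state == "failed" for state in upstream_states)
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


# One sensor found its attachment; the other two are still poking
states = ["success", "running", "running"]
print(should_run("one_success", states))
print(should_run("all_success", states))
# A sensor that timed out with soft_fail=True ends as "skipped", not "failed"
print(should_run("one_failed", ["skipped"]))
```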

### Conditional Processing

```python
from airflow.operators.python import BranchPythonOperator

def decide_processing_path(**context):
    """Determine processing path based on which attachment was found"""
    dag_run = context['dag_run']

    # upstream_task_ids holds task ID strings; sort for a deterministic order
    for task_id in sorted(context['task'].upstream_task_ids):
        upstream_ti = dag_run.get_task_instance(task_id)

        if upstream_ti is not None and upstream_ti.state == 'success':
            if 'csv' in task_id:
                return 'process_csv_data'
            elif 'pdf' in task_id:
                return 'process_pdf_report'
            elif 'xml' in task_id:
                return 'process_xml_config'

    return 'no_processing_needed'

# Branching based on sensor results. The returned task IDs must be direct
# downstream tasks of "decide_processing" (they are not defined here).
decision = BranchPythonOperator(
    task_id="decide_processing",
    python_callable=decide_processing_path,
    trigger_rule='one_success', # Run as soon as any sensor succeeds
    dag=dag
)

# Connect sensors to decision point
[csv_sensor, pdf_sensor, xml_sensor] >> decision
```
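
Separating the decision from Airflow's context makes it testable without a scheduler. In this hedged refactor, `choose_branch` (hypothetical) takes a plain mapping of upstream task IDs to state strings and applies the same substring rules as `decide_processing_path`. The Airflow callable would then only need to build that mapping from the DAG run.

```python
def choose_branch(upstream_states: dict) -> str:
    """Return the branch task ID for the first successful sensor, by sorted task ID."""
    branch_for = (
        ("csv", "process_csv_data"),
        ("pdf", "process_pdf_report"),
        ("xml", "process_xml_config"),
    )
    for task_id in sorted(upstream_states):
        if upstream_states[task_id] != "success":
            continue
        for keyword, branch in branch_for:
            if keyword in task_id:
                return branch
    return "no_processing_needed"


states = {
    "wait_for_csv_data": "running",
    "wait_for_pdf_report": "success",
    "wait_for_xml_config": "running",
}
print(choose_branch(states))
```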

## Sensor Modes and Performance

### Poke vs Reschedule Mode

```python
# Poke mode: Keeps worker slot occupied, good for short waits
poke_sensor = ImapAttachmentSensor(
    task_id="quick_check",
    attachment_name="quick_update.txt",
    mode='poke', # Default mode
    poke_interval=30, # Check every 30 seconds
    timeout=600, # 10 minute timeout
    dag=dag
)

# Reschedule mode: Releases worker slot between checks, good for long waits
reschedule_sensor = ImapAttachmentSensor(
    task_id="long_wait_check",
    attachment_name="weekly_report.xlsx",
    mode='reschedule', # More resource-efficient for long waits
    poke_interval=1800, # Check every 30 minutes
    timeout=86400, # 24 hour timeout
    dag=dag
)
```
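
One way to compare the two configurations is to count how many pokes, and therefore IMAP logins, each sensor makes in the worst case, when the attachment never arrives. Each poke opens its own connection. The arithmetic below is a rough estimate that ignores the time each poke itself takes. `max_pokes` is a hypothetical helper.

```python
def max_pokes(timeout_s: int, poke_interval_s: int) -> int:
    """Worst-case number of pokes if the attachment never arrives (rough estimate)."""
    # One poke at start, then one after every full interval up to the timeout
    return timeout_s // poke_interval_s + 1


for name, timeout_s, interval_s in [
    ("quick_check", 600, 30),
    ("long_wait_check", 86400, 1800),
]:
    print(f"{name}: up to {max_pokes(timeout_s, interval_s)} pokes")
```

The counts are similar, roughly 21 versus 49. The difference lies in worker usage. The poke-mode sensor occupies a worker slot for up to 10 minutes. The reschedule-mode sensor frees its slot between each of its checks spread over 24 hours.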

### Performance Considerations

```python
# Tuned for fast reaction to urgent items
high_frequency = ImapAttachmentSensor(
    task_id="real_time_monitoring",
    attachment_name="urgent_alert.json",
    poke_interval=10, # Very frequent checks; each poke opens a new IMAP session
    timeout=300, # Short timeout for urgent items
    # Only search unread messages (fewer messages to fetch per poke).
    # Fetching a message may mark it as read on the server, which can hide it
    # from later UNSEEN searches by other sensors on the same mailbox.
    mail_filter='UNSEEN',
    dag=dag
)

# Tuned for resource efficiency
resource_efficient = ImapAttachmentSensor(
    task_id="batch_processing_trigger",
    attachment_name="batch_data.zip",
    mode='reschedule', # Don't hold worker slots between checks
    poke_interval=3600, # Check hourly
    timeout=172800, # 48 hour timeout
    dag=dag
)
```

## Error Handling and Monitoring

### Sensor Failure Handling

```python
# Graceful failure handling
robust_sensor = ImapAttachmentSensor(
    task_id="robust_attachment_check",
    attachment_name="important_file.xlsx",

    # Failure handling
    soft_fail=True, # On timeout, mark the task skipped instead of failed
                    # (downstream tasks with the default trigger rule are skipped too)
    retries=2, # Retry on errors such as IMAP connection failures; timeouts are not retried
    retry_delay=timedelta(minutes=10),

    # Polling
    timeout=3600, # 1 hour timeout
    poke_interval=120, # Check every 2 minutes

    dag=dag
)
```

### Connection Error Recovery

```python
def handle_sensor_failure(**context):
    """Run alternative handling after the email sensor has failed"""
    # context['task_instance'] is this handler's own (running) task instance,
    # so look up the sensor's task instance to inspect its final state
    sensor_ti = context['dag_run'].get_task_instance("wait_for_data_file")
    print(f"Email sensor finished in state {sensor_ti.state}; checking alternative data sources")

    # Could trigger an alternative data ingestion workflow here

# Alternative processing path for sensor failures.
# A sensor with soft_fail=True ends as "skipped", not "failed", so one_failed
# would not fire for it.
fallback_handler = PythonOperator(
    task_id="handle_email_sensor_failure",
    python_callable=handle_sensor_failure,
    trigger_rule='one_failed', # Run only if the sensor failed (after its retries)
    dag=dag
)

sensor >> [processor, fallback_handler]
```

## IMAP Search Filters for Sensors

### Time-Based Filters

```python
# Recent messages only: on or after the day before the logical date.
# SINCE takes a DD-Mon-YYYY date; relative values such as "1-day-ago" are not valid IMAP.
recent_sensor = ImapAttachmentSensor(
    task_id="recent_files_only",
    attachment_name="latest_data.csv",
    mail_filter="SINCE {{ macros.ds_format(macros.ds_add(ds, -1), '%Y-%m-%d', '%d-%b-%Y') }}",
    dag=dag
)

# Specific date range: all of January 2024 (BEFORE is exclusive)
date_range_sensor = ImapAttachmentSensor(
    task_id="monthly_files",
    attachment_name="monthly_report.pdf",
    mail_filter='SINCE "01-Jan-2024" BEFORE "01-Feb-2024"',
    dag=dag
)
```
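
IMAP `BEFORE` excludes its own date, so a whole-month window is easiest to express as `SINCE` the first day of the month and `BEFORE` the first day of the next month. `month_filter` below is a hypothetical stdlib helper that builds such a string for any month, including the December rollover.

```python
from datetime import date

_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def imap_date(d: date) -> str:
    """Format a date as DD-Mon-YYYY without depending on the locale."""
    return f"{d.day:02d}-{_MONTHS[d.month - 1]}-{d.year}"


def month_filter(year: int, month: int) -> str:
    """IMAP SEARCH criteria covering every day of the given month."""
    first = date(year, month, 1)
    # BEFORE is exclusive, so stop at the first day of the following month
    next_first = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return f'SINCE "{imap_date(first)}" BEFORE "{imap_date(next_first)}"'


print(month_filter(2024, 1))
print(month_filter(2023, 12))
```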

### Sender-Based Filters

```python
# Specific sender
sender_sensor = ImapAttachmentSensor(
    task_id="vendor_reports",
    attachment_name="invoice.pdf",
    mail_filter='FROM "accounting@vendor.com"',
    dag=dag
)

# Multiple senders
multi_sender_sensor = ImapAttachmentSensor(
    task_id="partner_files",
    attachment_name="data_export.csv",
    mail_filter='OR FROM "partner1@company.com" FROM "partner2@company.com"',
    dag=dag
)
```
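
IMAP's `OR` takes exactly two search keys, so matching more than two senders requires nesting in prefix notation, for example `OR FROM a OR FROM b FROM c`. The hypothetical helper `any_sender_filter` builds that chain for any number of addresses. It assumes the addresses contain no `"` or `\` characters, which would need escaping.

```python
def any_sender_filter(senders: list) -> str:
    """Build an IMAP SEARCH key that matches mail FROM any of the given senders."""
    if not senders:
        raise ValueError("at least one sender is required")
    keys = [f'FROM "{s}"' for s in senders]  # assumes no '"' or '\' in addresses
    criteria = keys[-1]
    # Fold right to left: OR k1 OR k2 ... OR k(n-1) kn
    for key in reversed(keys[:-1]):
        criteria = f"OR {key} {criteria}"
    return criteria


print(any_sender_filter(["partner1@company.com", "partner2@company.com"]))
print(any_sender_filter(["a@x.com", "b@x.com", "c@x.com"]))
```

For two senders, this reproduces the `mail_filter` string used in the multi-sender example.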

### Subject and Content Filters

```python
# Subject-based filtering (SUBJECT matches a substring of the Subject header)
subject_sensor = ImapAttachmentSensor(
    task_id="urgent_reports",
    attachment_name="emergency_data.xlsx",
    mail_filter='SUBJECT "URGENT"',
    dag=dag
)

# Combined criteria: unread messages from a sender with "Daily" in the subject,
# received on or after the run's logical date ("today" is not a valid IMAP date)
complex_sensor = ImapAttachmentSensor(
    task_id="specific_conditions",
    attachment_name="report.pdf",
    mail_filter=(
        'FROM "reports@company.com" SUBJECT "Daily" UNSEEN '
        "SINCE {{ macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%Y') }}"
    ),
    dag=dag
)
```