Apache Airflow provider package for IMAP email server integration and attachment processing
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Sensor operators that monitor email mailboxes for specific attachments, enabling event-driven workflows triggered by incoming emails. IMAP sensors provide a way to pause DAG execution until specific email conditions are met.
Sensor that waits for specific email attachments to arrive in a mailbox, enabling email-triggered workflow automation.
class ImapAttachmentSensor:
    """
    Sensor that waits for specific attachments on a mail server.

    Inherits from BaseSensorOperator, providing standard sensor capabilities
    like poke_interval, timeout, and soft_fail configurations.

    Parameters:
    - attachment_name: Name or pattern of attachment to wait for
    - check_regex (bool): If True, treat attachment_name as regex pattern
    - mail_folder (str): Mail folder to monitor (default: "INBOX")
    - mail_filter (str): IMAP search filter for message selection
    - conn_id (str): Airflow connection ID for IMAP server
    - **kwargs: Additional sensor parameters (timeout, poke_interval, etc.)

    Template Fields:
    - attachment_name: Can be templated with Airflow variables/macros
    - mail_filter: Can be templated with Airflow variables/macros
    """

    # Fields rendered through the Jinja templating engine before execution,
    # as documented in the class docstring above.
    template_fields = ["attachment_name", "mail_filter"]

    def __init__(
        self,
        *,
        attachment_name,
        check_regex=False,
        mail_folder="INBOX",
        mail_filter="All",
        conn_id="imap_default",
        **kwargs,
    ) -> None: ...

    # Core sensor method that checks for attachment presence during each
    # sensor poke.
    def poke(self, context) -> bool:
        """
        Check for attachment presence in the mail server.

        This method is called repeatedly by the Airflow scheduler according
        to the poke_interval until it returns True or the sensor times out.

        Parameters:
        - context: Airflow execution context containing task information

        Returns:
        bool: True if attachment found, False to continue waiting
        """
from airflow import DAG
from datetime import datetime, timedelta

# Example DAG with no schedule: it is driven purely by incoming email.
dag = DAG(
    'email_processing_workflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # Triggered by email arrival
    catchup=False
)

# Wait for daily report attachment
wait_for_report = ImapAttachmentSensor(
    task_id="wait_for_daily_report",
    attachment_name="daily_report.xlsx",
    mail_folder="Reports",
    conn_id="company_imap",
    poke_interval=60,   # Check every minute
    timeout=3600,       # Timeout after 1 hour
    dag=dag
)

# Wait for any CSV file matching pattern
wait_for_csv = ImapAttachmentSensor(
    task_id="wait_for_any_csv",
    attachment_name=r"data_\d{8}\.csv",  # Matches data_20240101.csv format
    check_regex=True,
    mail_folder="DataImports",
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# Wait for files with specific prefix
wait_for_backup = ImapAttachmentSensor(
    task_id="wait_for_backup_file",
    attachment_name=r"backup_.*\.(zip|tar\.gz)",
    check_regex=True,
    mail_folder="Backups",
    dag=dag
)

# Wait for attachment from specific sender
wait_from_sender = ImapAttachmentSensor(
    task_id="wait_for_vendor_report",
    attachment_name="invoice.pdf",
    mail_filter='FROM "vendor@supplier.com"',
    poke_interval=120,
    dag=dag
)

# Wait for recent emails only (last hour)
# NOTE(review): the IMAP SEARCH SINCE criterion takes a date (e.g.
# "01-Jan-2024") with day granularity; "1-hour-ago" is illustrative
# pseudo-syntax -- verify against RFC 3501 before relying on it.
wait_recent = ImapAttachmentSensor(
    task_id="wait_for_recent_attachment",
    attachment_name="urgent_update.xlsx",
    mail_filter='SINCE "1-hour-ago"',
    dag=dag
)

# Complex filtering criteria
wait_complex = ImapAttachmentSensor(
    task_id="wait_for_specific_conditions",
    attachment_name="monthly_report.pdf",
    mail_filter='FROM "reports@company.com" SUBJECT "Monthly" UNSEEN',
    mail_folder="Corporate",
    dag=dag
)

from airflow.models import Variable
# Use Airflow variables and macros in sensor configuration
dynamic_sensor = ImapAttachmentSensor(
    task_id="wait_for_templated_attachment",
    attachment_name="{{ var.value.report_filename }}",  # From Airflow Variable
    mail_filter='SINCE "{{ ds }}"',  # Use execution date
    # NOTE(review): conn_id is not listed in the sensor's template_fields
    # (only attachment_name and mail_filter are), so this Jinja expression
    # will most likely NOT be rendered -- confirm before shipping.
    conn_id="{{ var.value.imap_connection }}",
    dag=dag
)

# Template with custom parameters
date_based_sensor = ImapAttachmentSensor(
    task_id="wait_for_date_based_file",
    attachment_name="report_{{ ds_nodash }}.csv",  # Uses YYYYMMDD format
    mail_filter='SINCE "{{ macros.ds_add(ds, -1) }}"',  # Yesterday
    dag=dag
)

# Advanced sensor configuration
advanced_sensor = ImapAttachmentSensor(
    task_id="advanced_email_sensor",
    attachment_name="critical_data.json",
    # IMAP-specific settings
    mail_folder="Priority",
    mail_filter='FROM "system@critical-app.com" UNSEEN',
    conn_id="secure_imap",
    # Sensor behavior settings
    poke_interval=30,   # Check every 30 seconds
    timeout=7200,       # 2 hour timeout
    mode='poke',        # Poke mode (vs reschedule)
    soft_fail=False,    # Fail task if sensor times out
    # Retry configuration
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag
)

from airflow.providers.imap.sensors.imap_attachment import ImapAttachmentSensor
from airflow.providers.imap.hooks.imap import ImapHook
from airflow.operators.python import PythonOperator
from airflow import DAG
from datetime import datetime
def process_email_attachment(**context):
    """Download and process the detected attachment"""
    attachment_name = context['params']['attachment_name']
    # Collect the download options up front so the hook call stays short.
    download_options = {
        'name': attachment_name,
        'local_output_directory': "/tmp/processing",
        'latest_only': True,
    }
    # ImapHook is a context manager: the IMAP connection is released on exit.
    with ImapHook(imap_conn_id="imap_default") as hook:
        # Pull the attachment that satisfied the upstream sensor.
        hook.download_mail_attachments(**download_options)
        # Placeholder for real processing of the downloaded file.
        print(f"Processing {attachment_name}")
dag = DAG('email_driven_processing', start_date=datetime(2024, 1, 1))

# Step 1: Wait for attachment
sensor = ImapAttachmentSensor(
    task_id="wait_for_data_file",
    attachment_name="data_export.csv",
    mail_folder="Imports",
    dag=dag
)

# Step 2: Process the attachment
processor = PythonOperator(
    task_id="process_attachment",
    python_callable=process_email_attachment,
    params={'attachment_name': 'data_export.csv'},
    dag=dag
)

# Set up dependency
sensor >> processor

# NOTE(review): DummyOperator is deprecated in newer Airflow releases in
# favour of EmptyOperator -- confirm the target Airflow version.
from airflow.operators.dummy import DummyOperator

# Monitor multiple different attachments
dag = DAG('multi_attachment_monitoring', start_date=datetime(2024, 1, 1))

# Different sensors for different file types
csv_sensor = ImapAttachmentSensor(
    task_id="wait_for_csv_data",
    attachment_name=r".*\.csv$",
    check_regex=True,
    dag=dag
)
pdf_sensor = ImapAttachmentSensor(
    task_id="wait_for_pdf_report",
    attachment_name=r"report.*\.pdf$",
    check_regex=True,
    dag=dag
)
xml_sensor = ImapAttachmentSensor(
    task_id="wait_for_xml_config",
    attachment_name="config.xml",
    dag=dag
)

# Convergence point - continue when any attachment arrives
any_attachment_ready = DummyOperator(
    task_id="any_attachment_detected",
    trigger_rule='one_success',  # Trigger when any upstream task succeeds
    dag=dag
)

# Set up parallel monitoring
[csv_sensor, pdf_sensor, xml_sensor] >> any_attachment_ready

from airflow.operators.python import BranchPythonOperator
def decide_processing_path(**context):
    """Determine processing path based on which attachment was found"""
    # Inspect the upstream sensors to see which one succeeded.
    # NOTE(review): get_direct_relatives() may yield Task objects rather
    # than task-id strings depending on the Airflow version -- confirm.
    dagrun = context['task_instance'].get_dagrun()
    for task_id in context['task'].get_direct_relatives(upstream=True):
        upstream_ti = dagrun.get_task_instance(task_id)
        if upstream_ti.state != 'success':
            continue
        # Route by the file type encoded in the sensor's task id.
        if 'csv' in task_id:
            return 'process_csv_data'
        if 'pdf' in task_id:
            return 'process_pdf_report'
        if 'xml' in task_id:
            return 'process_xml_config'
    return 'no_processing_needed'
# Branch the workflow according to the sensor results.
decision = BranchPythonOperator(
    task_id="decide_processing",
    python_callable=decide_processing_path,
    dag=dag,
)

# Fan the three attachment sensors into the branching task.
[csv_sensor, pdf_sensor, xml_sensor] >> decision

# Poke mode occupies a worker slot between checks: fine for short waits.
poke_sensor = ImapAttachmentSensor(
    task_id="quick_check",
    attachment_name="quick_update.txt",
    mode='poke',        # the default sensor mode
    poke_interval=30,   # probe the mailbox every 30 seconds
    timeout=600,        # give up after 10 minutes
    dag=dag,
)

# Reschedule mode frees the worker slot between checks: better for long waits.
reschedule_sensor = ImapAttachmentSensor(
    task_id="long_wait_check",
    attachment_name="weekly_report.xlsx",
    mode='reschedule',   # more resource-efficient for long waits
    poke_interval=1800,  # probe every 30 minutes
    timeout=86400,       # give up after 24 hours
    dag=dag,
)

# Optimized for high-frequency monitoring
# High-frequency monitoring: very short poke interval and timeout.
high_frequency = ImapAttachmentSensor(
    task_id="real_time_monitoring",
    attachment_name="urgent_alert.json",
    poke_interval=10,        # Very frequent checking
    timeout=300,             # Short timeout for urgent items
    mail_filter='UNSEEN',    # Only check unread messages
    dag=dag
)

# Optimized for resource efficiency
resource_efficient = ImapAttachmentSensor(
    task_id="batch_processing_trigger",
    attachment_name="batch_data.zip",
    mode='reschedule',   # Don't hold worker slots
    poke_interval=3600,  # Check hourly
    timeout=172800,      # 48 hour timeout
    dag=dag
)

# Graceful failure handling
robust_sensor = ImapAttachmentSensor(
    task_id="robust_attachment_check",
    attachment_name="important_file.xlsx",
    # Failure handling
    soft_fail=True,  # Don't fail entire DAG if sensor times out
    retries=2,       # Retry on connection errors
    retry_delay=timedelta(minutes=10),
    # Monitoring
    timeout=3600,       # 1 hour timeout
    poke_interval=120,  # Check every 2 minutes
    dag=dag
)

# NOTE(review): AirflowException is imported here but not referenced in
# this snippet -- presumably kept for the failure-handling examples below.
from airflow.exceptions import AirflowException
def handle_sensor_failure(**context):
    """Handle sensor failure and optionally retry with different parameters"""
    task_instance = context['task_instance']
    # Guard clause: anything other than a failed state takes the normal path.
    if task_instance.state != 'failed':
        return 'normal_processing'
    # Log the failure; a real implementation could kick off an alternative
    # data-ingestion workflow here.
    print("Email sensor failed - checking alternative data sources")
    return 'alternative_data_source'
# Alternative processing path for sensor failures
fallback_handler = PythonOperator(
    task_id="handle_email_sensor_failure",
    python_callable=handle_sensor_failure,
    trigger_rule='one_failed',  # Trigger only if sensor fails
    dag=dag
)
sensor >> [processor, fallback_handler]

# Recent messages only
# NOTE(review): the IMAP SEARCH SINCE criterion expects a date such as
# "01-Jan-2024"; "1-day-ago" and "today" in the filters below are
# illustrative pseudo-syntax -- verify against RFC 3501 before use.
recent_sensor = ImapAttachmentSensor(
    task_id="recent_files_only",
    attachment_name="latest_data.csv",
    mail_filter='SINCE "1-day-ago"',
    dag=dag
)

# Specific date range
date_range_sensor = ImapAttachmentSensor(
    task_id="monthly_files",
    attachment_name="monthly_report.pdf",
    mail_filter='SINCE "01-Jan-2024" BEFORE "31-Jan-2024"',
    dag=dag
)

# Specific sender
sender_sensor = ImapAttachmentSensor(
    task_id="vendor_reports",
    attachment_name="invoice.pdf",
    mail_filter='FROM "accounting@vendor.com"',
    dag=dag
)

# Multiple senders
multi_sender_sensor = ImapAttachmentSensor(
    task_id="partner_files",
    attachment_name="data_export.csv",
    mail_filter='OR FROM "partner1@company.com" FROM "partner2@company.com"',
    dag=dag
)

# Subject-based filtering
subject_sensor = ImapAttachmentSensor(
    task_id="urgent_reports",
    attachment_name="emergency_data.xlsx",
    mail_filter='SUBJECT "URGENT"',
    dag=dag
)

# Combined criteria
complex_sensor = ImapAttachmentSensor(
    task_id="specific_conditions",
    attachment_name="report.pdf",
    mail_filter='FROM "reports@company.com" SUBJECT "Daily" UNSEEN SINCE "today"',
    dag=dag
)

# Install with Tessl CLI
#   npx tessl i tessl/pypi-apache-airflow-providers-imap