tessl/pypi-apache-airflow-providers-imap

Apache Airflow provider package for IMAP email server integration and attachment processing

docs/imap-sensors.md

IMAP Sensors

Sensor operators that monitor email mailboxes for specific attachments, enabling event-driven workflows triggered by incoming emails. IMAP sensors provide a way to pause DAG execution until specific email conditions are met.

Capabilities

ImapAttachmentSensor Class

Sensor that waits for specific email attachments to arrive in a mailbox, enabling email-triggered workflow automation.

class ImapAttachmentSensor:
    """
    Sensor that waits for specific attachments on a mail server.
    
    Inherits from BaseSensorOperator, providing standard sensor capabilities
    like poke_interval, timeout, and soft_fail configurations.
    
    Parameters:
    - attachment_name (str): Name of the attachment to wait for; treated as a regex pattern when check_regex is True
    - check_regex (bool): If True, treat attachment_name as regex pattern
    - mail_folder (str): Mail folder to monitor (default: "INBOX")
    - mail_filter (str): IMAP search filter for message selection
    - conn_id (str): Airflow connection ID for IMAP server
    - **kwargs: Additional sensor parameters (timeout, poke_interval, etc.)
    
    Attributes:
    - template_fields: ["attachment_name", "mail_filter"]
    
    Template Fields:
    - attachment_name: Can be templated with Airflow variables/macros
    - mail_filter: Can be templated with Airflow variables/macros
    """
    
    def __init__(
        self,
        *,
        attachment_name,
        check_regex=False,
        mail_folder="INBOX",
        mail_filter="All",
        conn_id="imap_default",
        **kwargs,
    ) -> None: ...

Sensor Execution

Core sensor method that checks for attachment presence during each sensor poke.

def poke(self, context) -> bool:
    """
    Check for attachment presence in the mail server.
    
    This method is called repeatedly by the Airflow scheduler according
    to the poke_interval until it returns True or the sensor times out.
    
    Parameters:
    - context: Airflow execution context containing task information
    
    Returns:
    bool: True if attachment found, False to continue waiting
    """

Usage Patterns

Basic Attachment Monitoring

from airflow.providers.imap.sensors.imap_attachment import ImapAttachmentSensor
from airflow import DAG
from datetime import datetime, timedelta

dag = DAG(
    'email_processing_workflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # Triggered by email arrival
    catchup=False
)

# Wait for daily report attachment
wait_for_report = ImapAttachmentSensor(
    task_id="wait_for_daily_report",
    attachment_name="daily_report.xlsx",
    mail_folder="Reports",
    conn_id="company_imap",
    poke_interval=60,  # Check every minute
    timeout=3600,      # Timeout after 1 hour
    dag=dag
)

Regex Pattern Matching

# Wait for any CSV file matching pattern
wait_for_csv = ImapAttachmentSensor(
    task_id="wait_for_any_csv",
    attachment_name=r"data_\d{8}\.csv",  # Matches data_20240101.csv format
    check_regex=True,
    mail_folder="DataImports",
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# Wait for files with specific prefix
wait_for_backup = ImapAttachmentSensor(
    task_id="wait_for_backup_file",
    attachment_name=r"backup_.*\.(zip|tar\.gz)",
    check_regex=True,
    mail_folder="Backups",
    dag=dag
)
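To illustrate the difference between exact and regex matching: with `check_regex=True` the name is matched against the start of the filename, as with Python's `re.match`. The provider delegates the real check to `ImapHook`; the `attachment_matches` helper below is a hypothetical stand-in for illustration only.

```python
import re

def attachment_matches(candidate, pattern, check_regex=False):
    """Sketch of exact-name vs. regex matching of attachment filenames."""
    if check_regex:
        # re.match anchors at the start of the string only
        return re.match(pattern, candidate) is not None
    return candidate == pattern

# Exact match requires the full, literal filename
print(attachment_matches("daily_report.xlsx", "daily_report.xlsx"))        # True
# Regex match accepts anything the pattern describes
print(attachment_matches("data_20240101.csv", r"data_\d{8}\.csv", True))   # True
print(attachment_matches("data_jan.csv", r"data_\d{8}\.csv", True))        # False
```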

Email Filtering

# Wait for attachment from specific sender
wait_from_sender = ImapAttachmentSensor(
    task_id="wait_for_vendor_report",
    attachment_name="invoice.pdf",
    mail_filter='FROM "vendor@supplier.com"',
    poke_interval=120,
    dag=dag
)

# Wait for recent emails only (IMAP SINCE is date-granular and expects
# DD-Mon-YYYY; relative times like "1-hour-ago" are not valid IMAP syntax)
wait_recent = ImapAttachmentSensor(
    task_id="wait_for_recent_attachment",
    attachment_name="urgent_update.xlsx",
    mail_filter='SINCE "01-Jan-2024"',
    dag=dag
)

# Complex filtering criteria
wait_complex = ImapAttachmentSensor(
    task_id="wait_for_specific_conditions",
    attachment_name="monthly_report.pdf",
    mail_filter='FROM "reports@company.com" SUBJECT "Monthly" UNSEEN',
    mail_folder="Corporate",
    dag=dag
)

Templated Parameters

from airflow.models import Variable

# Use Airflow variables and macros in sensor configuration
dynamic_sensor = ImapAttachmentSensor(
    task_id="wait_for_templated_attachment",
    attachment_name="{{ var.value.report_filename }}",  # From Airflow Variable
    # IMAP dates use DD-Mon-YYYY, so reformat the execution date
    mail_filter='SINCE "{{ macros.ds_format(ds, "%Y-%m-%d", "%d-%b-%Y") }}"',
    conn_id="{{ var.value.imap_connection }}",
    dag=dag
)

# Template with custom parameters
date_based_sensor = ImapAttachmentSensor(
    task_id="wait_for_date_based_file",
    attachment_name="report_{{ ds_nodash }}.csv",  # Uses YYYYMMDD format
    # Yesterday, reformatted to IMAP's DD-Mon-YYYY date format
    mail_filter='SINCE "{{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%d-%b-%Y") }}"',
    dag=dag
)
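Note that Airflow's `ds` macro renders dates as YYYY-MM-DD, while the IMAP SEARCH command expects DD-Mon-YYYY; inside a template, `macros.ds_format` can bridge the two. The same conversion in plain Python (the helper name `to_imap_date` is ours, for illustration):

```python
from datetime import datetime

def to_imap_date(ds: str) -> str:
    """Convert Airflow's ds string (YYYY-MM-DD) to IMAP's DD-Mon-YYYY format."""
    return datetime.strptime(ds, "%Y-%m-%d").strftime("%d-%b-%Y")

print(to_imap_date("2024-01-15"))  # 15-Jan-2024
```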

Sensor Configuration Options

# Advanced sensor configuration
advanced_sensor = ImapAttachmentSensor(
    task_id="advanced_email_sensor",
    attachment_name="critical_data.json",
    
    # IMAP-specific settings
    mail_folder="Priority",
    mail_filter='FROM "system@critical-app.com" UNSEEN',
    conn_id="secure_imap",
    
    # Sensor behavior settings
    poke_interval=30,           # Check every 30 seconds
    timeout=7200,               # 2 hour timeout
    mode='poke',                # Poke mode (vs reschedule)
    soft_fail=False,            # Fail task if sensor times out
    
    # Retry configuration
    retries=3,
    retry_delay=timedelta(minutes=5),
    
    dag=dag
)

Integration with Processing Tasks

Complete Email Processing Workflow

from airflow.providers.imap.sensors.imap_attachment import ImapAttachmentSensor
from airflow.providers.imap.hooks.imap import ImapHook
from airflow.operators.python import PythonOperator
from airflow import DAG
from datetime import datetime

def process_email_attachment(**context):
    """Download and process the detected attachment"""
    attachment_name = context['params']['attachment_name']
    
    with ImapHook(imap_conn_id="imap_default") as hook:
        # Download the attachment that triggered the sensor
        hook.download_mail_attachments(
            name=attachment_name,
            local_output_directory="/tmp/processing",
            latest_only=True
        )
    
    # Process the downloaded file
    print(f"Processing {attachment_name}")

dag = DAG('email_driven_processing', start_date=datetime(2024, 1, 1))

# Step 1: Wait for attachment
sensor = ImapAttachmentSensor(
    task_id="wait_for_data_file",
    attachment_name="data_export.csv",
    mail_folder="Imports",
    dag=dag
)

# Step 2: Process the attachment
processor = PythonOperator(
    task_id="process_attachment",
    python_callable=process_email_attachment,
    params={'attachment_name': 'data_export.csv'},
    dag=dag
)

# Set up dependency
sensor >> processor

Multiple Attachment Monitoring

from airflow.operators.dummy import DummyOperator

# Monitor multiple different attachments
dag = DAG('multi_attachment_monitoring', start_date=datetime(2024, 1, 1))

# Different sensors for different file types
csv_sensor = ImapAttachmentSensor(
    task_id="wait_for_csv_data",
    attachment_name=r".*\.csv$",
    check_regex=True,
    dag=dag
)

pdf_sensor = ImapAttachmentSensor(
    task_id="wait_for_pdf_report",
    attachment_name=r"report.*\.pdf$",
    check_regex=True,
    dag=dag
)

xml_sensor = ImapAttachmentSensor(
    task_id="wait_for_xml_config",
    attachment_name="config.xml",
    dag=dag
)

# Convergence point - continue when any attachment arrives
any_attachment_ready = DummyOperator(
    task_id="any_attachment_detected",
    trigger_rule='one_success',  # Trigger when any upstream task succeeds
    dag=dag
)

# Set up parallel monitoring
[csv_sensor, pdf_sensor, xml_sensor] >> any_attachment_ready

Conditional Processing

from airflow.operators.python import BranchPythonOperator

def decide_processing_path(**context):
    """Determine processing path based on which attachment was found"""
    # Check which upstream sensor succeeded to determine processing type
    dag_run = context['dag_run']

    for task_id in context['task'].upstream_task_ids:
        upstream_ti = dag_run.get_task_instance(task_id)

        if upstream_ti and upstream_ti.state == 'success':
            if 'csv' in task_id:
                return 'process_csv_data'
            elif 'pdf' in task_id:
                return 'process_pdf_report'
            elif 'xml' in task_id:
                return 'process_xml_config'

    return 'no_processing_needed'

# Branching based on sensor results
decision = BranchPythonOperator(
    task_id="decide_processing",
    python_callable=decide_processing_path,
    trigger_rule='one_success',  # Run as soon as any sensor succeeds
    dag=dag
)

# Connect sensors to decision point
[csv_sensor, pdf_sensor, xml_sensor] >> decision

Sensor Modes and Performance

Poke vs Reschedule Mode

# Poke mode: Keeps worker slot occupied, good for short waits
poke_sensor = ImapAttachmentSensor(
    task_id="quick_check",
    attachment_name="quick_update.txt",
    mode='poke',           # Default mode
    poke_interval=30,      # Check every 30 seconds
    timeout=600,           # 10 minute timeout
    dag=dag
)

# Reschedule mode: Releases worker slot between checks, good for long waits
reschedule_sensor = ImapAttachmentSensor(
    task_id="long_wait_check",
    attachment_name="weekly_report.xlsx",
    mode='reschedule',     # More resource-efficient for long waits
    poke_interval=1800,    # Check every 30 minutes
    timeout=86400,         # 24 hour timeout
    dag=dag
)

Performance Considerations

# Optimized for high-frequency monitoring
high_frequency = ImapAttachmentSensor(
    task_id="real_time_monitoring",
    attachment_name="urgent_alert.json",
    poke_interval=10,      # Very frequent checking
    timeout=300,           # Short timeout for urgent items
    mail_filter='UNSEEN',  # Only check unread messages
    dag=dag
)

# Optimized for resource efficiency
resource_efficient = ImapAttachmentSensor(
    task_id="batch_processing_trigger",
    attachment_name="batch_data.zip",
    mode='reschedule',     # Don't hold worker slots
    poke_interval=3600,    # Check hourly
    timeout=172800,        # 48 hour timeout
    dag=dag
)
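As a rough sizing aid (assuming one IMAP round-trip per poke), the worst-case number of checks a sensor performs is bounded by the timeout divided by the poke interval:

```python
def worst_case_pokes(timeout_s: int, poke_interval_s: int) -> int:
    """Upper bound on the number of sensor checks before timing out."""
    return timeout_s // poke_interval_s

# High-frequency config above: 300 s timeout, 10 s interval
print(worst_case_pokes(300, 10))       # 30 checks
# Resource-efficient config: 48 h timeout, hourly checks
print(worst_case_pokes(172800, 3600))  # 48 checks
```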

Error Handling and Monitoring

Sensor Failure Handling

# Graceful failure handling
robust_sensor = ImapAttachmentSensor(
    task_id="robust_attachment_check",
    attachment_name="important_file.xlsx",
    
    # Failure handling
    soft_fail=True,        # Don't fail entire DAG if sensor times out
    retries=2,             # Retry on connection errors
    retry_delay=timedelta(minutes=10),
    
    # Monitoring
    timeout=3600,          # 1 hour timeout
    poke_interval=120,     # Check every 2 minutes
    
    dag=dag
)

Connection Error Recovery

def handle_sensor_failure(**context):
    """Log the failure and optionally kick off alternative processing"""
    # This task runs only when an upstream task has failed
    # (trigger_rule='one_failed'), so no state check is needed here.
    print("Email sensor failed - checking alternative data sources")
    # Could trigger an alternative data ingestion workflow from here

# Alternative processing path for sensor failures
fallback_handler = PythonOperator(
    task_id="handle_email_sensor_failure",
    python_callable=handle_sensor_failure,
    trigger_rule='one_failed',  # Trigger only if sensor fails
    dag=dag
)

sensor >> [processor, fallback_handler]

IMAP Search Filters for Sensors

Time-Based Filters

# Recent messages only (SINCE takes a DD-Mon-YYYY date; here, yesterday)
recent_sensor = ImapAttachmentSensor(
    task_id="recent_files_only",
    attachment_name="latest_data.csv",
    mail_filter='SINCE "{{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%d-%b-%Y") }}"',
    dag=dag
)

# Specific date range
date_range_sensor = ImapAttachmentSensor(
    task_id="monthly_files",
    attachment_name="monthly_report.pdf",
    mail_filter='SINCE "01-Jan-2024" BEFORE "31-Jan-2024"',
    dag=dag
)

Sender-Based Filters

# Specific sender
sender_sensor = ImapAttachmentSensor(
    task_id="vendor_reports",
    attachment_name="invoice.pdf",
    mail_filter='FROM "accounting@vendor.com"',
    dag=dag
)

# Multiple senders
multi_sender_sensor = ImapAttachmentSensor(
    task_id="partner_files",
    attachment_name="data_export.csv",
    mail_filter='OR FROM "partner1@company.com" FROM "partner2@company.com"',
    dag=dag
)
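Per RFC 3501, `OR` is a binary prefix operator, so matching three or more senders requires nested `OR` terms. A small hypothetical helper (`from_any` is our name, not part of the provider) that builds the nested filter string:

```python
def from_any(senders):
    """Build an IMAP search filter matching mail FROM any of the senders.

    IMAP's OR takes exactly two search keys, so three or more senders
    are expressed as nested ORs: OR FROM a OR FROM b FROM c.
    """
    terms = [f'FROM "{s}"' for s in senders]
    filter_ = terms[-1]
    for term in reversed(terms[:-1]):
        filter_ = f'OR {term} {filter_}'
    return filter_

print(from_any(["partner1@company.com", "partner2@company.com"]))
# OR FROM "partner1@company.com" FROM "partner2@company.com"
print(from_any(["a@x.com", "b@x.com", "c@x.com"]))
# OR FROM "a@x.com" OR FROM "b@x.com" FROM "c@x.com"
```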

Subject and Content Filters

# Subject-based filtering
subject_sensor = ImapAttachmentSensor(
    task_id="urgent_reports",
    attachment_name="emergency_data.xlsx",
    mail_filter='SUBJECT "URGENT"',
    dag=dag
)

# Combined criteria (IMAP search terms are implicitly ANDed)
complex_sensor = ImapAttachmentSensor(
    task_id="specific_conditions",
    attachment_name="report.pdf",
    mail_filter='FROM "reports@company.com" SUBJECT "Daily" UNSEEN SINCE "{{ macros.ds_format(ds, "%Y-%m-%d", "%d-%b-%Y") }}"',
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-imap
