Apache Airflow provider package for IMAP email server integration and attachment processing
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive IMAP server connectivity with SSL/TLS support, email searching, attachment detection, and secure file downloads. The ImapHook provides a low-level interface for interacting with IMAP email servers.
Main hook class for IMAP server connections with automatic connection management and context manager support.
class ImapHook:
"""
Hook for connecting to mail servers using IMAP protocol.
Parameters:
- imap_conn_id (str): Connection ID for IMAP server credentials
Attributes:
- conn_name_attr (str): "imap_conn_id"
- default_conn_name (str): "imap_default"
- conn_type (str): "imap"
- hook_name (str): "IMAP"
"""
def __init__(self, imap_conn_id: str = "imap_default") -> None: ...

Methods for establishing and managing IMAP server connections with automatic SSL/TLS configuration.
def get_conn(self) -> ImapHook:
"""
Login to the mail server and return authorized hook instance.
Returns:
ImapHook: Authorized hook object ready for operations
Note: Use as context manager with 'with' statement for automatic cleanup
"""
def __enter__(self) -> ImapHook:
"""Context manager entry - returns connected hook instance."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - automatically logs out from mail server."""

Usage Example:
from airflow.providers.imap.hooks.imap import ImapHook
# Recommended: Use as context manager for automatic connection cleanup
with ImapHook(imap_conn_id="my_imap_conn") as hook:
# Connection is automatically established and cleaned up
attachments = hook.retrieve_mail_attachments(r".*\.pdf$", check_regex=True)
# Manual connection management (not recommended)
hook = ImapHook(imap_conn_id="my_imap_conn")
connected_hook = hook.get_conn()
# Remember to call hook.mail_client.logout() manually

Methods for checking the existence of email attachments with flexible search patterns.
def has_mail_attachment(
self,
name: str,
*,
check_regex: bool = False,
mail_folder: str = "INBOX",
mail_filter: str = "All"
) -> bool:
"""
Check if mail folder contains attachments with the given name.
Parameters:
- name (str): Attachment name to search for
- check_regex (bool): If True, treat name as regular expression pattern
- mail_folder (str): Mail folder to search in (default: "INBOX")
- mail_filter (str): IMAP search filter (default: "All")
Returns:
bool: True if attachment found, False otherwise
"""

Usage Example:
with ImapHook() as hook:
# Check for exact attachment name
has_report = hook.has_mail_attachment("daily_report.csv")
# Check using regex pattern
has_any_csv = hook.has_mail_attachment(
r".*\.csv$",
check_regex=True
)
# Search in specific folder with date filter
has_recent = hook.has_mail_attachment(
"invoice.pdf",
mail_folder="Business",
mail_filter='(SINCE "01-Jan-2024")'
)

Methods for retrieving attachment data as in-memory content for processing.
def retrieve_mail_attachments(
self,
name: str,
*,
check_regex: bool = False,
latest_only: bool = False,
mail_folder: str = "INBOX",
mail_filter: str = "All",
not_found_mode: str = "raise",
) -> list[tuple[str, bytes]]:
"""
Retrieve attachment data from emails matching the criteria.
Parameters:
- name (str): Attachment name to search for
- check_regex (bool): If True, treat name as regular expression
- latest_only (bool): If True, return only the first matching attachment
- mail_folder (str): Mail folder to search in (default: "INBOX")
- mail_filter (str): IMAP search filter (default: "All")
- not_found_mode (str): Error handling mode ("raise", "warn", "ignore")
Returns:
list[tuple[str, bytes]]: List of (filename, payload) tuples containing attachment data
"""

Usage Example:
with ImapHook() as hook:
# Retrieve all CSV attachments
attachments = hook.retrieve_mail_attachments(r".*\.csv$", check_regex=True)
for filename, payload in attachments:
print(f"Found attachment: {filename}, size: {len(payload)} bytes")
# Get only the latest matching attachment
latest = hook.retrieve_mail_attachments(
"report.xlsx",
latest_only=True,
not_found_mode="warn" # Just log warning if not found
)

Methods for downloading attachments to local filesystem with security protections.
def download_mail_attachments(
self,
name: str,
local_output_directory: str,
*,
check_regex: bool = False,
latest_only: bool = False,
mail_folder: str = "INBOX",
mail_filter: str = "All",
not_found_mode: str = "raise",
) -> None:
"""
Download attachments from emails to local directory.
Parameters:
- name (str): Attachment name to search for
- local_output_directory (str): Local directory path for downloads
- check_regex (bool): If True, treat name as regular expression
- latest_only (bool): If True, download only the first matching attachment
- mail_folder (str): Mail folder to search in (default: "INBOX")
- mail_filter (str): IMAP search filter (default: "All")
- not_found_mode (str): Error handling mode ("raise", "warn", "ignore")
Security Features:
- Prevents directory traversal attacks (blocks "../" in filenames)
- Blocks symlink creation for security
- Validates output directory paths
"""

Usage Example:
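import os
from datetime import date, timedelta
with ImapHook() as hook:
# Create download directory
download_dir = "/tmp/email_attachments"
os.makedirs(download_dir, exist_ok=True)
# Download all PDF attachments from last 7 days
# (IMAP SINCE requires an absolute DD-Mon-YYYY date; relative dates are not supported)
since = (date.today() - timedelta(days=7)).strftime("%d-%b-%Y")
hook.download_mail_attachments(
name=r".*\.pdf$",
local_output_directory=download_dir,
check_regex=True,
mail_filter=f'(SINCE "{since}")'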
)
# Download only the latest report
hook.download_mail_attachments(
name="daily_report.xlsx",
local_output_directory=download_dir,
latest_only=True,
not_found_mode="ignore" # Continue processing even if not found
)

Helper class for parsing and working with individual email messages.
class Mail:
"""
Helper class for working with mail messages from imaplib.
Parameters:
- mail_body (str): Raw email message body from IMAP server
"""
def __init__(self, mail_body: str) -> None: ...
def has_attachments(self) -> bool:
"""
Check if the email message contains attachments.
Returns:
bool: True if email has attachments, False otherwise
"""
def get_attachments_by_name(
self,
name: str,
check_regex: bool,
find_first: bool = False
) -> list[tuple[str, bytes]]:
"""
Extract attachments from email by name pattern.
Parameters:
- name (str): Attachment name or pattern to match
- check_regex (bool): If True, use regex matching
- find_first (bool): If True, return only first match
Returns:
list[tuple[str, bytes]]: List of (filename, payload) tuples
"""

Helper class for working with individual email message parts and attachments.
class MailPart:
"""
Wrapper for individual email parts with attachment functionality.
Parameters:
- part: Email message part from email.message
"""
def __init__(self, part) -> None: ...
def is_attachment(self) -> bool:
"""
Check if the message part is a valid attachment.
Returns:
bool: True if part is an attachment, False otherwise
"""
def has_matching_name(self, name: str) -> re.Match[str] | None:
"""
Check if attachment name matches regex pattern.
Parameters:
- name (str): Regular expression pattern to match
Returns:
re.Match[str] | None: Match object if pattern matches, None otherwise
"""
def has_equal_name(self, name: str) -> bool:
"""
Check if attachment name equals the given name exactly.
Parameters:
- name (str): Exact name to match
Returns:
bool: True if names are equal, False otherwise
"""
def get_file(self) -> tuple[str, bytes]:
"""
Extract filename and payload from attachment.
Returns:
tuple[str, bytes]: (filename, payload) where payload is decoded bytes
"""

The mail_filter parameter supports IMAP search criteria. Common examples:
# All messages (default)
mail_filter = "All"
# Messages from specific sender
mail_filter = 'FROM "sender@example.com"'
# Messages with specific subject
mail_filter = 'SUBJECT "Monthly Report"'
# Messages since specific date
mail_filter = 'SINCE "01-Jan-2024"'
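# Messages from last N days (IMAP SINCE takes an absolute DD-Mon-YYYY date, so compute it)
from datetime import date, timedelta
mail_filter = 'SINCE "{}"'.format((date.today() - timedelta(days=7)).strftime("%d-%b-%Y"))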
# Unread messages only
mail_filter = "UNSEEN"
# Combine multiple criteria
mail_filter = 'FROM "reports@company.com" SINCE "01-Jan-2024" UNSEEN'

The hook automatically configures SSL/TLS based on connection settings:
# Connection Extra field configuration
{
"use_ssl": true, # Enable SSL/TLS (default: true)
"ssl_context": "default" # SSL context ("default" or "none")
}

The "default" SSL context uses ssl.create_default_context() for secure connections.

SSL context can be configured globally in Airflow configuration:
[imap]
ssl_context = default
# Or fallback to email section
[email]
ssl_context = default

from airflow.exceptions import AirflowException
try:
with ImapHook() as hook:
attachments = hook.retrieve_mail_attachments("report.csv")
except AirflowException:
# Handle case where no attachments were found
print("No matching attachments found")
except RuntimeError as e:
# Handle SSL configuration or connection errors
print(f"Connection error: {e}")

Configure how the hook handles missing attachments:
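- "raise" (default): raises AirflowException if no attachments found
- "warn": logs a warning if no attachments found
- "ignore": continues processing if no attachments found

# Different error handling approaches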
with ImapHook() as hook:
# Strict mode - will raise exception if not found
attachments = hook.retrieve_mail_attachments("critical_report.csv")
# Lenient mode - will continue even if not found
optional_attachments = hook.retrieve_mail_attachments(
"optional_data.csv",
not_found_mode="ignore"
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-imap