CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware

Pending
Overview
Eval results
Files

middleware.mddocs/

Middleware

The middleware system in Dramatiq provides a powerful plugin architecture for extending message processing functionality. Middleware components can intercept and modify message processing at various stages, enabling features like retries, time limits, rate limiting, monitoring, and custom processing logic.

Capabilities

Middleware Base Class

The foundation for all middleware components, defining hook methods for different processing stages.

class Middleware:
    """
    Base class for middleware components.
    
    Middleware can intercept message processing at various stages
    and modify behavior through hook methods.
    """
    
    @property
    def actor_options(self) -> Set[str]:
        """
        Set of actor options this middleware supports.
        
        Returns:
        Set of option names that actors can use with this middleware
        """
        return set()
    
    @property  
    def forks(self) -> List[Callable]:
        """
        List of fork functions for process-based middleware.
        
        Returns:
        List of functions to call when worker process forks
        """
        return []
    
    # Message acknowledgment hooks
    def before_ack(self, broker: Broker, message: Message):
        """Called before message acknowledgment."""
    
    def after_ack(self, broker: Broker, message: Message):
        """Called after message acknowledgment."""
    
    def before_nack(self, broker: Broker, message: Message):
        """Called before message negative acknowledgment."""
    
    def after_nack(self, broker: Broker, message: Message):
        """Called after message negative acknowledgment."""
    
    # Actor lifecycle hooks
    def before_declare_actor(self, broker: Broker, actor: Actor):
        """Called before actor is declared to broker."""
    
    def after_declare_actor(self, broker: Broker, actor: Actor):
        """Called after actor is declared to broker."""
    
    # Message enqueue hooks
    def before_enqueue(self, broker: Broker, message: Message, delay: int):
        """Called before message is enqueued."""
    
    def after_enqueue(self, broker: Broker, message: Message, delay: int):
        """Called after message is enqueued."""
    
    # Message processing hooks
    def before_process_message(self, broker: Broker, message: Message):
        """
        Called before message processing.
        
        Can raise SkipMessage to skip processing this message.
        """
    
    def after_process_message(
        self, 
        broker: Broker, 
        message: Message, 
        *, 
        result=None, 
        exception=None
    ):
        """
        Called after message processing completes.
        
        Parameters:
        - result: Result from successful processing (if no exception)
        - exception: Exception from failed processing (if failed)
        """
    
    def after_skip_message(self, broker: Broker, message: Message):
        """Called when message processing is skipped."""
    
    # Worker lifecycle hooks  
    def before_worker_boot(self, broker: Broker, worker: Worker):
        """Called before worker starts processing."""
    
    def after_worker_boot(self, broker: Broker, worker: Worker):
        """Called after worker starts processing."""
    
    def before_worker_shutdown(self, broker: Broker, worker: Worker):
        """Called before worker shuts down."""
    
    def after_worker_shutdown(self, broker: Broker, worker: Worker):
        """Called after worker shuts down."""

Built-in Middleware Components

Retries Middleware

Automatically retry failed messages with exponential backoff.

class Retries(Middleware):
    def __init__(
        self, *,
        max_retries: int = 20,
        min_backoff: int = 15000,
        max_backoff: int = 604800000,
        retry_when: Callable = None
    ):
        """
        Initialize retry middleware.
        
        Parameters:
        - max_retries: Maximum number of retry attempts
        - min_backoff: Minimum backoff time in milliseconds
        - max_backoff: Maximum backoff time in milliseconds  
        - retry_when: Function to determine if retry should occur
        """
    
    @property
    def actor_options(self) -> Set[str]:
        return {"max_retries", "min_backoff", "max_backoff", "retry_when"}

Usage:

# Default retries
retries = Retries()

# Custom retry configuration
retries = Retries(
    max_retries=5,
    min_backoff=1000,   # 1 second
    max_backoff=300000, # 5 minutes
)

# Custom retry logic
def should_retry(retries_so_far, exception):
    # Only retry on specific exceptions
    return isinstance(exception, (ConnectionError, TimeoutError)) and retries_so_far < 3

retries = Retries(retry_when=should_retry)

broker.add_middleware(retries)

# Actor-specific retry settings
@dramatiq.actor(max_retries=3, min_backoff=5000)
def fragile_task(data):
    if random.random() < 0.5:
        raise Exception("Random failure")
    return "Success"

Time Limit Middleware

Enforce maximum execution time for tasks.

class TimeLimit(Middleware):
    def __init__(self, *, time_limit: int = 600000, interval: int = 1000):
        """
        Initialize time limit middleware.
        
        Parameters:
        - time_limit: Maximum execution time in milliseconds (default: 10 minutes)
        - interval: Check interval in milliseconds (default: 1 second)
        """
    
    @property
    def actor_options(self) -> Set[str]:
        return {"time_limit"}

class TimeLimitExceeded(Exception):
    """Raised when task execution exceeds time limit."""

Usage:

time_limit = TimeLimit(time_limit=30000)  # 30 seconds
broker.add_middleware(time_limit)

@dramatiq.actor(time_limit=60000)  # 1 minute limit
def long_running_task(data):
    # Long-running processing
    time.sleep(120)  # Will be interrupted after 1 minute
    return "Finished"

try:
    long_running_task.send({"data": "test"})
except TimeLimitExceeded:
    print("Task exceeded time limit")

Age Limit Middleware

Reject messages that are too old.

class AgeLimit(Middleware):
    def __init__(self, *, max_age: int = None):
        """
        Initialize age limit middleware.
        
        Parameters:
        - max_age: Maximum message age in milliseconds
        """
    
    @property
    def actor_options(self) -> Set[str]:
        return {"max_age"}

Usage:

age_limit = AgeLimit(max_age=3600000)  # 1 hour
broker.add_middleware(age_limit)

@dramatiq.actor(max_age=1800000)  # 30 minutes
def time_sensitive_task(data):
    return f"Processing {data}"

Callbacks Middleware

Execute callback functions on task success or failure.

class Callbacks(Middleware):
    def __init__(self):
        """Initialize callbacks middleware."""
    
    @property
    def actor_options(self) -> Set[str]:
        return {"on_success", "on_failure"}

Usage:

callbacks = Callbacks()
broker.add_middleware(callbacks)

@dramatiq.actor
def success_callback(message_data, result):
    print(f"Task {message_data.message_id} succeeded with result: {result}")

@dramatiq.actor
def failure_callback(message_data, exception_data):
    print(f"Task {message_data.message_id} failed: {exception_data}")

@dramatiq.actor(
    on_success="success_callback",
    on_failure="failure_callback"
)
def monitored_task(data):
    if data == "fail":
        raise ValueError("Intentional failure")
    return f"Processed: {data}"

Pipelines Middleware

Enable pipeline composition functionality.

class Pipelines(Middleware):
    def __init__(self):
        """Initialize pipelines middleware."""

Group Callbacks Middleware

Handle group completion callbacks and coordination.

class GroupCallbacks(Middleware):
    def __init__(self, rate_limiter_backend):
        """
        Initialize group callbacks middleware.
        
        Parameters:
        - rate_limiter_backend: Backend for coordination
        """

Prometheus Middleware

Export metrics to Prometheus for monitoring.

class Prometheus(Middleware):
    def __init__(
        self, *,
        http_host: str = "127.0.0.1",
        http_port: int = 9191,
        registry = None
    ):
        """
        Initialize Prometheus metrics middleware.
        
        Parameters:
        - http_host: HTTP server host for metrics endpoint
        - http_port: HTTP server port for metrics endpoint
        - registry: Prometheus registry (uses default if None)
        """

Usage:

prometheus = Prometheus(http_host="0.0.0.0", http_port=8000)
broker.add_middleware(prometheus)

# Metrics available at http://localhost:8000/metrics
# - dramatiq_messages_total: Total messages processed
# - dramatiq_message_errors_total: Total message errors
# - dramatiq_message_duration_seconds: Message processing duration
# - dramatiq_workers_total: Number of active workers

Results Middleware

Store and retrieve task results.

class Results(Middleware):
    def __init__(self, *, backend: ResultBackend = None, store_results: bool = False):
        """
        Initialize results middleware.
        
        Parameters:
        - backend: Result storage backend
        - store_results: Whether to store results by default
        """
    
    @property
    def actor_options(self) -> Set[str]:
        return {"store_results"}

Usage:

from dramatiq.results.backends import RedisBackend

result_backend = RedisBackend()
results = Results(backend=result_backend, store_results=True)
broker.add_middleware(results)

@dramatiq.actor(store_results=True)
def task_with_result(data):
    return {"processed": data, "timestamp": time.time()}

message = task_with_result.send("test_data")
result = message.get_result(block=True, timeout=30000)
print(f"Task result: {result}")

Current Message Middleware

Provide access to current message in actors.

class CurrentMessage(Middleware):
    def __init__(self):
        """Initialize current message middleware."""

# Access current message in actors
from dramatiq.middleware import CurrentMessage

def get_current_message() -> Message:
    """Get the currently processing message."""

Usage:

current_message = CurrentMessage()
broker.add_middleware(current_message)

@dramatiq.actor
def message_aware_task(data):
    from dramatiq.middleware import get_current_message
    
    current = get_current_message()
    print(f"Processing message {current.message_id} with data: {data}")
    
    return {
        "data": data,
        "message_id": current.message_id,
        "retry_count": current.options.get("retries", 0)
    }

Shutdown Middleware

Handle graceful worker shutdown.

class Shutdown(Middleware):
    def __init__(self):
        """Initialize shutdown middleware."""

class ShutdownNotifications(Middleware):
    def __init__(self, notify_shutdown: Callable = None):
        """
        Initialize shutdown notifications middleware.
        
        Parameters:
        - notify_shutdown: Function to call on shutdown
        """

AsyncIO Middleware

Support for async actors.

class AsyncIO(Middleware):
    def __init__(self):
        """Initialize AsyncIO middleware for async actors."""

Usage:

asyncio_middleware = AsyncIO()
broker.add_middleware(asyncio_middleware)

@dramatiq.actor
async def async_task(data):
    await asyncio.sleep(1)  # Async operation
    return f"Async processed: {data}"

# Send async task
async_task.send("test_data")

Middleware Errors

class MiddlewareError(Exception):
    """Base exception for middleware errors."""

class SkipMessage(Exception):
    """
    Exception raised to skip message processing.
    
    When raised in before_process_message, the message
    will be acknowledged without processing.
    """

Threading Utilities

Utilities for thread-based middleware operations.

class Interrupt(Exception):
    """Exception used to interrupt thread execution."""

def raise_thread_exception(thread_id: int, exception: Exception):
    """
    Raise an exception in a specific thread.
    
    Parameters:
    - thread_id: Target thread ID
    - exception: Exception to raise in the thread
    """

Default Middleware Stack

default_middleware = [
    Prometheus,
    AgeLimit, 
    TimeLimit,
    ShutdownNotifications,
    Callbacks,
    Pipelines,
    Retries
]

Custom Middleware Development

Basic Custom Middleware

class LoggingMiddleware(dramatiq.Middleware):
    def __init__(self, log_level="INFO"):
        self.logger = logging.getLogger("dramatiq.custom")
        self.logger.setLevel(log_level)
    
    def before_process_message(self, broker, message):
        self.logger.info(f"Starting processing: {message.actor_name}")
    
    def after_process_message(self, broker, message, *, result=None, exception=None):
        if exception:
            self.logger.error(f"Failed processing {message.actor_name}: {exception}")
        else:
            self.logger.info(f"Completed processing: {message.actor_name}")

# Add custom middleware
logging_middleware = LoggingMiddleware()
broker.add_middleware(logging_middleware)

Advanced Custom Middleware with Options

class RateLimitingMiddleware(dramatiq.Middleware):
    def __init__(self, default_limit=100):
        self.default_limit = default_limit
        self.counters = {}
    
    @property
    def actor_options(self):
        return {"rate_limit"}
    
    def before_process_message(self, broker, message):
        actor_name = message.actor_name
        rate_limit = message.options.get("rate_limit", self.default_limit)
        
        # Simple in-memory rate limiting (use Redis in production)
        current_count = self.counters.get(actor_name, 0)
        if current_count >= rate_limit:
            raise dramatiq.RateLimitExceeded(f"Rate limit {rate_limit} exceeded for {actor_name}")
        
        self.counters[actor_name] = current_count + 1
    
    def after_process_message(self, broker, message, *, result=None, exception=None):
        # Reset counter after processing
        actor_name = message.actor_name
        if actor_name in self.counters:
            self.counters[actor_name] -= 1

# Usage
rate_limiting = RateLimitingMiddleware(default_limit=50)
broker.add_middleware(rate_limiting)

@dramatiq.actor(rate_limit=10)
def rate_limited_task(data):
    return f"Processed: {data}"

Middleware with External Dependencies

class DatabaseLoggingMiddleware(dramatiq.Middleware):
    def __init__(self, database_url):
        self.db = database.connect(database_url)
    
    def after_process_message(self, broker, message, *, result=None, exception=None):
        # Log to database
        self.db.execute(
            "INSERT INTO task_log (message_id, actor_name, success, error) VALUES (?, ?, ?, ?)",
            (message.message_id, message.actor_name, exception is None, str(exception) if exception else None)
        )
        self.db.commit()
    
    def before_worker_shutdown(self, broker, worker):
        self.db.close()

# Usage
db_logging = DatabaseLoggingMiddleware("sqlite:///tasks.db")
broker.add_middleware(db_logging)

Middleware Ordering

Middleware order matters as each middleware can modify message processing:

# Careful ordering for proper functionality
broker = RedisBroker(middleware=[
    Prometheus(),        # Metrics first
    AgeLimit(),         # Filter old messages early
    TimeLimit(),        # Set time limits
    Results(),          # Store results before retries
    Retries(),          # Retry logic
    Callbacks(),        # Callbacks after retries
    Pipelines(),        # Pipeline support
])

# Add middleware with specific positioning
broker.add_middleware(CustomMiddleware(), after=TimeLimit)
broker.add_middleware(AnotherMiddleware(), before=Retries)

Install with Tessl CLI

npx tessl i tessl/pypi-dramatiq

docs

actors.md

brokers.md

composition.md

index.md

messages.md

middleware.md

rate-limiting.md

results.md

workers.md

tile.json