Dramatiq is a background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware.
—
The middleware system in Dramatiq provides a powerful plugin architecture for extending message processing functionality. Middleware components can intercept and modify message processing at various stages, enabling features like retries, time limits, rate limiting, monitoring, and custom processing logic.
The `Middleware` base class is the foundation for all middleware components; it defines the hook methods that are called at the different processing stages.
class Middleware:
    """Base class for middleware components.

    Middleware can intercept message processing at various stages and
    modify behavior through hook methods.  Every hook below is a no-op
    that returns ``None``; subclasses override only the hooks they need.
    """

    @property
    def actor_options(self) -> Set[str]:
        """Set of actor options this middleware supports.

        Returns:
            Set of option names that actors can use with this middleware.
            The base class supports none, so it returns an empty set.
        """
        return set()

    @property
    def forks(self) -> List[Callable]:
        """List of fork functions for process-based middleware.

        Returns:
            List of functions to call when worker process forks.  The
            base class has none, so it returns an empty list.
        """
        return []

    # Message acknowledgment hooks

    def before_ack(self, broker: "Broker", message: "Message") -> None:
        """Called before message acknowledgment."""

    def after_ack(self, broker: "Broker", message: "Message") -> None:
        """Called after message acknowledgment."""

    def before_nack(self, broker: "Broker", message: "Message") -> None:
        """Called before message negative acknowledgment."""

    def after_nack(self, broker: "Broker", message: "Message") -> None:
        """Called after message negative acknowledgment."""

    # Actor lifecycle hooks

    def before_declare_actor(self, broker: "Broker", actor: "Actor") -> None:
        """Called before actor is declared to broker."""

    def after_declare_actor(self, broker: "Broker", actor: "Actor") -> None:
        """Called after actor is declared to broker."""

    # Message enqueue hooks

    def before_enqueue(self, broker: "Broker", message: "Message", delay: int) -> None:
        """Called before message is enqueued."""

    def after_enqueue(self, broker: "Broker", message: "Message", delay: int) -> None:
        """Called after message is enqueued."""

    # Message processing hooks

    def before_process_message(self, broker: "Broker", message: "Message") -> None:
        """Called before message processing.

        Can raise SkipMessage to skip processing this message.
        """

    def after_process_message(
        self,
        broker: "Broker",
        message: "Message",
        *,
        result=None,
        exception=None,
    ) -> None:
        """Called after message processing completes.

        Parameters:
        - result: Result from successful processing (if no exception)
        - exception: Exception from failed processing (if failed)
        """

    def after_skip_message(self, broker: "Broker", message: "Message") -> None:
        """Called when message processing is skipped."""

    # Worker lifecycle hooks

    def before_worker_boot(self, broker: "Broker", worker: "Worker") -> None:
        """Called before worker starts processing."""

    def after_worker_boot(self, broker: "Broker", worker: "Worker") -> None:
        """Called after worker starts processing."""

    def before_worker_shutdown(self, broker: "Broker", worker: "Worker") -> None:
        """Called before worker shuts down."""

    def after_worker_shutdown(self, broker: "Broker", worker: "Worker") -> None:
        """Called after worker shuts down."""


# Automatically retry failed messages with exponential backoff.
class Retries(Middleware):
    """Middleware that automatically retries failed messages with backoff."""

    def __init__(
        self,
        *,
        max_retries: int = 20,
        min_backoff: int = 15000,
        max_backoff: int = 604800000,
        retry_when: Callable = None,
    ):
        """Initialize retry middleware.

        Parameters:
        - max_retries: Maximum number of retry attempts (default: 20)
        - min_backoff: Minimum backoff time in milliseconds
          (default: 15000, i.e. 15 seconds)
        - max_backoff: Maximum backoff time in milliseconds
          (default: 604800000, i.e. 7 days)
        - retry_when: Function to determine if retry should occur
          (default: None)
        """

    @property
    def actor_options(self) -> Set[str]:
        """Option names actors can set to override the retry configuration."""
        return {"max_retries", "min_backoff", "max_backoff", "retry_when"}


# Usage:
# Default retries
retries = Retries()

# Custom retry configuration
retries = Retries(
    max_retries=5,
    min_backoff=1000,  # 1 second
    max_backoff=300000,  # 5 minutes
)


# Custom retry logic
def should_retry(retries_so_far, exception):
    """Return True when a failed message should be retried.

    Parameters:
    - retries_so_far: Number of retries already attempted for the message
    - exception: Exception that made the last attempt fail
    """
    # Only retry on specific exceptions
    return isinstance(exception, (ConnectionError, TimeoutError)) and retries_so_far < 3


retries = Retries(retry_when=should_retry)
broker.add_middleware(retries)


# Actor-specific retry settings
@dramatiq.actor(max_retries=3, min_backoff=5000)
def fragile_task(data):
    """Example actor that fails roughly half of the time."""
    if random.random() < 0.5:
        raise Exception("Random failure")
    return "Success"


# Enforce maximum execution time for tasks.
class TimeLimit(Middleware):
    """Middleware that enforces a maximum execution time for tasks."""

    def __init__(self, *, time_limit: int = 600000, interval: int = 1000):
        """Initialize time limit middleware.

        Parameters:
        - time_limit: Maximum execution time in milliseconds (default: 10 minutes)
        - interval: Check interval in milliseconds (default: 1 second)
        """

    @property
    def actor_options(self) -> Set[str]:
        """Option names actors can set to override the time limit."""
        return {"time_limit"}


class TimeLimitExceeded(Interrupt):
    """Raised when task execution exceeds time limit.

    Derives from Interrupt, the exception type used to interrupt worker
    threads.
    NOTE(review): in Dramatiq, Interrupt derives from BaseException, so a
    plain ``except Exception`` in an actor would not catch this -- confirm
    against the installed version.
    """


# Usage:
time_limit = TimeLimit(time_limit=30000)  # 30 seconds
broker.add_middleware(time_limit)


@dramatiq.actor(time_limit=60000)  # 1 minute limit
def long_running_task(data):
    """Example actor that runs longer than its own time limit."""
    try:
        # Long-running processing
        time.sleep(120)  # Will be interrupted after 1 minute
    except TimeLimitExceeded:
        # The exception is raised inside the worker thread running the
        # actor, so this is the only place it can be handled.
        # NOTE(review): time limits are best-effort; a blocking call may
        # only be interrupted once it returns -- confirm for your workload.
        print("Task exceeded time limit")
        raise  # re-raise so the message is still treated as failed
    return "Finished"


# send() only enqueues the message and returns immediately; it never raises
# TimeLimitExceeded in the calling process.
long_running_task.send({"data": "test"})


# Reject messages that are too old.
class AgeLimit(Middleware):
    """Middleware that rejects messages that are too old."""

    def __init__(self, *, max_age: int = None):
        """Initialize age limit middleware.

        Parameters:
        - max_age: Maximum message age in milliseconds (default: None)
        """

    @property
    def actor_options(self) -> Set[str]:
        """Option names actors can set to override the age limit."""
        return {"max_age"}


# Usage:
age_limit = AgeLimit(max_age=3600000)  # 1 hour
broker.add_middleware(age_limit)


@dramatiq.actor(max_age=1800000)  # 30 minutes
def time_sensitive_task(data):
    """Example actor whose messages are only useful for 30 minutes."""
    return f"Processing {data}"


# Execute callback functions on task success or failure.
class Callbacks(Middleware):
    """Middleware that executes callback functions on task success or failure."""

    def __init__(self):
        """Initialize callbacks middleware."""

    @property
    def actor_options(self) -> Set[str]:
        """Option names actors can set to register their callbacks."""
        return {"on_success", "on_failure"}


# Usage:
callbacks = Callbacks()
broker.add_middleware(callbacks)


@dramatiq.actor
def success_callback(message_data, result):
    """Report a successful task.

    Parameters:
    - message_data: The finished message serialized as a dict, so its
      fields are read by key rather than by attribute
    - result: Value returned by the task
    """
    print(f"Task {message_data['message_id']} succeeded with result: {result}")


@dramatiq.actor
def failure_callback(message_data, exception_data):
    """Report a failed task.

    Parameters:
    - message_data: The failed message serialized as a dict
    - exception_data: Description of the exception
      (presumably a dict with "type" and "message" keys -- verify)
    """
    print(f"Task {message_data['message_id']} failed: {exception_data}")


@dramatiq.actor(
    on_success="success_callback",
    on_failure="failure_callback",
)
def monitored_task(data):
    """Example actor with success and failure callbacks attached by name."""
    if data == "fail":
        raise ValueError("Intentional failure")
    return f"Processed: {data}"


# Enable pipeline composition functionality.
class Pipelines(Middleware):
    """Middleware that enables pipeline composition functionality."""

    def __init__(self):
        """Initialize pipelines middleware."""


# Handle group completion callbacks and coordination.
class GroupCallbacks(Middleware):
    """Middleware that handles group completion callbacks and coordination."""

    def __init__(self, rate_limiter_backend):
        """Initialize group callbacks middleware.

        Parameters:
        - rate_limiter_backend: Backend for coordination
        """


# Export metrics to Prometheus for monitoring.
class Prometheus(Middleware):
    """Middleware that exports metrics to Prometheus for monitoring."""

    def __init__(
        self,
        *,
        http_host: str = "127.0.0.1",
        http_port: int = 9191,
        registry=None,
    ):
        """Initialize Prometheus metrics middleware.

        Parameters:
        - http_host: HTTP server host for metrics endpoint (default: "127.0.0.1")
        - http_port: HTTP server port for metrics endpoint (default: 9191)
        - registry: Prometheus registry (uses default if None)
        """


# Usage:
prometheus = Prometheus(http_host="0.0.0.0", http_port=8000)
broker.add_middleware(prometheus)

# Metrics available at http://localhost:8000/metrics
# NOTE(review): metric names below are from the Dramatiq Prometheus
# middleware as best recalled -- verify against the installed version.
# - dramatiq_messages_total: Total messages processed
# - dramatiq_message_errors_total: Total message errors
# - dramatiq_message_retries_total: Total message retries
# - dramatiq_message_rejects_total: Total dead-lettered messages
# - dramatiq_messages_inprogress: Messages currently being processed
# - dramatiq_delayed_messages_inprogress: Delayed messages held in memory
# - dramatiq_message_duration_milliseconds: Message processing duration


# Store and retrieve task results.
class Results(Middleware):
    """Middleware that stores and retrieves task results."""

    def __init__(self, *, backend: ResultBackend = None, store_results: bool = False):
        """Initialize results middleware.

        Parameters:
        - backend: Result storage backend (default: None)
        - store_results: Whether to store results by default (default: False)
        """

    @property
    def actor_options(self) -> Set[str]:
        """Option names actors can set to control result storage."""
        return {"store_results"}


# Usage:
from dramatiq.results.backends import RedisBackend

result_backend = RedisBackend()
results = Results(backend=result_backend, store_results=True)
broker.add_middleware(results)


@dramatiq.actor(store_results=True)
def task_with_result(data):
    """Example actor whose return value is kept in the result backend."""
    return {"processed": data, "timestamp": time.time()}


message = task_with_result.send("test_data")
# Block until the worker has stored the result (timeout in milliseconds).
result = message.get_result(block=True, timeout=30000)
print(f"Task result: {result}")


# Provide access to current message in actors.
class CurrentMessage(Middleware):
    """Middleware that provides access to the current message in actors."""

    def __init__(self):
        """Initialize current message middleware."""

    @classmethod
    def get_current_message(cls) -> Message:
        """Get the currently processing message.

        NOTE(review): presumably returns None when called outside of a
        message-processing thread -- confirm against the installed version.
        """


# Access current message in actors
from dramatiq.middleware import CurrentMessage


def get_current_message() -> Message:
    """Get the currently processing message.

    Local convenience wrapper around the
    ``CurrentMessage.get_current_message()`` classmethod; it is not
    something ``dramatiq.middleware`` itself exports.
    """
    return CurrentMessage.get_current_message()


# Usage:
current_message = CurrentMessage()
broker.add_middleware(current_message)


@dramatiq.actor
def message_aware_task(data):
    """Example actor that inspects the message it is currently processing."""
    # The accessor is a classmethod on the middleware class, not a
    # module-level function.
    from dramatiq.middleware import CurrentMessage

    current = CurrentMessage.get_current_message()
    print(f"Processing message {current.message_id} with data: {data}")
    return {
        "data": data,
        "message_id": current.message_id,
        "retry_count": current.options.get("retries", 0),
    }


# Handle graceful worker shutdown.
class Shutdown(Interrupt):
    """Exception raised inside actors when their worker is shutting down.

    This is an interrupt exception, not a middleware: it is never added
    to a broker.  Actors that opt in to shutdown notifications can catch
    it to clean up before the worker process exits.
    """


class ShutdownNotifications(Middleware):
    """Middleware that notifies running actors of worker shutdown."""

    def __init__(self, notify_shutdown: bool = False):
        """Initialize shutdown notifications middleware.

        Parameters:
        - notify_shutdown: Whether actors are interrupted with Shutdown
          by default when the worker is told to stop (default: False)
        """


# Support for async actors.
class AsyncIO(Middleware):
    """Middleware that adds support for async actors."""

    def __init__(self):
        """Initialize AsyncIO middleware for async actors."""


# Usage:
asyncio_middleware = AsyncIO()
broker.add_middleware(asyncio_middleware)


@dramatiq.actor
async def async_task(data):
    """Example coroutine actor."""
    await asyncio.sleep(1)  # Async operation
    return f"Async processed: {data}"
# Send async task
async_task.send("test_data")


class MiddlewareError(Exception):
    """Base exception for middleware errors."""


class SkipMessage(MiddlewareError):
    """Exception raised to skip message processing.

    When raised in before_process_message, the message
    will be acknowledged without processing.

    Derives from MiddlewareError so it can be told apart from unexpected
    failures raised inside a middleware hook.
    """


# Utilities for thread-based middleware operations.
class Interrupt(BaseException):
    """Exception used to interrupt thread execution.

    Derives from BaseException rather than Exception so that a broad
    ``except Exception`` handler in actor code does not swallow the
    interruption by accident.
    """
def raise_thread_exception(thread_id: int, exception: Exception):
    """Raise an exception in a specific thread.

    Parameters:
    - thread_id: Target thread ID
    - exception: Exception to raise in the thread
    """


# Middleware classes listed as the default set, in order.
default_middleware = [
    Prometheus,
    AgeLimit,
    TimeLimit,
    ShutdownNotifications,
    Callbacks,
    Pipelines,
    Retries,
]


class LoggingMiddleware(dramatiq.Middleware):
    """Example middleware that logs the start and outcome of every message."""

    def __init__(self, log_level="INFO"):
        """Create the middleware.

        Parameters:
        - log_level: Level applied to the "dramatiq.custom" logger
        """
        self.logger = logging.getLogger("dramatiq.custom")
        self.logger.setLevel(log_level)

    def before_process_message(self, broker, message):
        """Log that processing of the message is starting."""
        self.logger.info(f"Starting processing: {message.actor_name}")

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Log failure when an exception is passed, completion otherwise."""
        # Compare against None: the hook signals failure by passing an
        # exception object, whatever its truthiness.
        if exception is not None:
            self.logger.error(f"Failed processing {message.actor_name}: {exception}")
        else:
            self.logger.info(f"Completed processing: {message.actor_name}")
# Add custom middleware
logging_middleware = LoggingMiddleware()
broker.add_middleware(logging_middleware)


class RateLimitingMiddleware(dramatiq.Middleware):
    """Example middleware capping how many messages per actor are in flight.

    The bookkeeping is a plain in-process dict, so the limit applies per
    worker process only and is not synchronized between threads.
    """

    def __init__(self, default_limit=100):
        """Create the middleware.

        Parameters:
        - default_limit: Limit used when neither the message nor the
          actor sets the "rate_limit" option
        """
        self.default_limit = default_limit
        # actor name -> number of messages currently counted as in flight
        self.counters = {}
        # ids of messages that took a slot and have not released it yet
        self._counted = set()

    @property
    def actor_options(self):
        """Option names actors can set to override the limit."""
        return {"rate_limit"}

    def before_process_message(self, broker, message):
        """Take a slot for the message's actor or raise RateLimitExceeded.

        NOTE(review): the broker may log and swallow exceptions raised
        from before_* hooks unless they derive from MiddlewareError, in
        which case the message would still be processed -- confirm how
        this exception is handled before relying on it.
        """
        actor_name = message.actor_name
        # Per-message options win; otherwise use the option declared on
        # the actor (e.g. @dramatiq.actor(rate_limit=10)), then the default.
        rate_limit = message.options.get("rate_limit")
        if rate_limit is None:
            actor = broker.get_actor(actor_name)
            rate_limit = actor.options.get("rate_limit", self.default_limit)

        # Simple in-memory rate limiting (use Redis in production)
        current_count = self.counters.get(actor_name, 0)
        if current_count >= rate_limit:
            raise dramatiq.RateLimitExceeded(f"Rate limit {rate_limit} exceeded for {actor_name}")
        self.counters[actor_name] = current_count + 1
        self._counted.add(message.message_id)

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Release the slot taken in before_process_message, if any."""
        self._release(message)

    def after_skip_message(self, broker, message):
        """Release the slot when processing was skipped after it was taken."""
        self._release(message)

    def _release(self, message):
        """Decrement the actor's counter only for messages that incremented it."""
        if message.message_id not in self._counted:
            # Rejected (or never seen) in before_process_message: nothing
            # to give back, and decrementing would corrupt the count.
            return
        self._counted.discard(message.message_id)
        self.counters[message.actor_name] -= 1
# Usage
rate_limiting = RateLimitingMiddleware(default_limit=50)
broker.add_middleware(rate_limiting)


@dramatiq.actor(rate_limit=10)
def rate_limited_task(data):
    """Example actor with its own rate limit."""
    return f"Processed: {data}"


class DatabaseLoggingMiddleware(dramatiq.Middleware):
    """Example middleware that records the outcome of every message in a database."""

    def __init__(self, database_url):
        """Open the database connection.

        Parameters:
        - database_url: URL passed to database.connect()
        """
        self.db = database.connect(database_url)

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Insert one task_log row describing how the message ended."""
        failed = exception is not None
        # Log to database
        self.db.execute(
            "INSERT INTO task_log (message_id, actor_name, success, error) VALUES (?, ?, ?, ?)",
            (message.message_id, message.actor_name, not failed, str(exception) if failed else None),
        )
        self.db.commit()

    def after_worker_shutdown(self, broker, worker):
        """Close the connection once the worker has shut down.

        Closing after (rather than before) shutdown keeps the connection
        usable for messages that are still finishing while the worker
        stops.
        """
        self.db.close()
# Usage
db_logging = DatabaseLoggingMiddleware("sqlite:///tasks.db")
broker.add_middleware(db_logging)


# Middleware order matters as each middleware can modify message processing:
# Careful ordering for proper functionality
broker = RedisBroker(middleware=[
    Prometheus(),  # Metrics first
    AgeLimit(),  # Filter old messages early
    TimeLimit(),  # Set time limits
    Results(),  # Store results before retries
    Retries(),  # Retry logic
    Callbacks(),  # Callbacks after retries
    Pipelines(),  # Pipeline support
])

# Add middleware with specific positioning
broker.add_middleware(CustomMiddleware(), after=TimeLimit)
broker.add_middleware(AnotherMiddleware(), before=Retries)


# Install with Tessl CLI
npx tessl i tessl/pypi-dramatiq