Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware
—
Message brokers handle the routing, persistence, and delivery of task messages between actors and workers. Dramatiq supports Redis, RabbitMQ, and in-memory brokers for different deployment scenarios.
The abstract base class that defines the broker interface for all implementations.
class Broker:
def __init__(self, middleware=None):
"""
Initialize broker with optional middleware.
Parameters:
- middleware: List of middleware instances
"""
def add_middleware(self, middleware, *, before=None, after=None):
"""
Add middleware to the broker.
Parameters:
- middleware: Middleware instance to add
- before: Add before this middleware class
- after: Add after this middleware class
"""
def declare_actor(self, actor):
"""
Register an actor with the broker.
Parameters:
- actor: Actor instance to register
"""
def declare_queue(self, queue_name):
"""
Declare a queue on the broker.
Parameters:
- queue_name: str - Name of the queue to declare
"""
def enqueue(self, message, *, delay=None) -> Message:
"""
Enqueue a message for processing.
Parameters:
- message: Message instance to enqueue
- delay: int - Delay in milliseconds before processing
Returns:
Message instance with updated metadata
"""
def consume(self, queue_name, prefetch=1, timeout=30000) -> Consumer:
"""
Create a consumer for a queue.
Parameters:
- queue_name: str - Queue to consume from
- prefetch: int - Number of messages to prefetch
- timeout: int - Consumer timeout in milliseconds
Returns:
Consumer instance for processing messages
"""
def get_actor(self, actor_name) -> Actor:
"""
Get registered actor by name.
Parameters:
- actor_name: str - Name of the actor
Returns:
Actor instance
Raises:
ActorNotFound: If actor is not registered
"""
def get_results_backend(self) -> ResultBackend:
"""
Get the results backend associated with this broker.
Returns:
ResultBackend instance or None
"""
def flush(self, queue_name):
"""
Remove all messages from a queue.
Parameters:
- queue_name: str - Queue to flush
"""
def flush_all(self):
"""Remove all messages from all queues."""
def join(self, queue_name, *, timeout=None):
"""
Wait for all messages in a queue to be processed.
Parameters:
- queue_name: str - Queue to wait for
- timeout: int - Timeout in milliseconds
Raises:
QueueJoinTimeout: If timeout is exceeded
"""
def close(self):
"""Close the broker and clean up connections."""
# Properties
actors: Dict[str, Actor] # Registered actors
queues: Dict[str, Queue] # Declared queues
middleware: List[Middleware] # Middleware stack
actor_options: Set[str] # Set of valid actor optionsProduction-ready broker using Redis as the message transport and storage backend.
class RedisBroker(Broker):
def __init__(
self, *,
url: str = None,
middleware: List[Middleware] = None,
namespace: str = "dramatiq",
maintenance_chance: int = 1000,
heartbeat_timeout: int = 60000,
dead_message_ttl: int = 604800000,
requeue_deadline: int = None,
requeue_interval: int = None,
client: redis.Redis = None,
**parameters
):
"""
Create Redis broker instance.
Parameters:
- url: Redis connection URL (redis://host:port/db)
- middleware: List of middleware instances
- namespace: Key namespace prefix (default: "dramatiq")
- maintenance_chance: Probability of running maintenance (1/chance)
- heartbeat_timeout: Worker heartbeat timeout in ms
- dead_message_ttl: Dead message TTL in ms (7 days)
- requeue_deadline: Message requeue deadline in ms
- requeue_interval: Message requeue check interval in ms
- client: Existing Redis client instance
- **parameters: Additional Redis connection parameters
"""Usage:
# Basic Redis broker
redis_broker = RedisBroker(host="localhost", port=6379, db=0)
# Redis broker with URL
redis_broker = RedisBroker(url="redis://localhost:6379/0")
# Redis broker with custom settings
redis_broker = RedisBroker(
host="redis.example.com",
port=6379,
password="secret",
namespace="myapp",
heartbeat_timeout=120000, # 2 minutes
dead_message_ttl=86400000 # 1 day
)
# Redis broker with existing client
import redis
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
redis_broker = RedisBroker(client=redis_client)
dramatiq.set_broker(redis_broker)Enterprise-grade broker using RabbitMQ for high-throughput message processing with advanced routing features.
class RabbitmqBroker(Broker):
def __init__(
self, *,
confirm_delivery: bool = False,
url: str = None,
middleware: List[Middleware] = None,
max_priority: int = None,
parameters: pika.ConnectionParameters = None,
**kwargs
):
"""
Create RabbitMQ broker instance.
Parameters:
- confirm_delivery: Enable delivery confirmations
- url: AMQP connection URL (amqp://user:pass@host:port/vhost)
- middleware: List of middleware instances
- max_priority: Maximum message priority (enables priority queues)
- parameters: Pika ConnectionParameters instance
- **kwargs: Additional connection parameters
"""Usage:
# Basic RabbitMQ broker
rabbitmq_broker = RabbitmqBroker(host="localhost", port=5672)
# RabbitMQ broker with URL
rabbitmq_broker = RabbitmqBroker(
url="amqp://user:password@rabbitmq.example.com:5672/myapp"
)
# RabbitMQ broker with priorities and confirmations
rabbitmq_broker = RabbitmqBroker(
host="localhost",
port=5672,
confirm_delivery=True,
max_priority=255,
heartbeat=600,
connection_attempts=3
)
# RabbitMQ broker with custom parameters
import pika
params = pika.ConnectionParameters(
host="rabbitmq.example.com",
port=5672,
credentials=pika.PlainCredentials("user", "password"),
heartbeat=600
)
rabbitmq_broker = RabbitmqBroker(parameters=params)
dramatiq.set_broker(rabbitmq_broker)In-memory broker for testing and development environments.
class StubBroker(Broker):
def __init__(self, middleware=None):
"""
Create in-memory broker for testing.
Parameters:
- middleware: List of middleware instances
"""
# Testing-specific properties
dead_letters: List[Message] # All dead-lettered messages
dead_letters_by_queue: Dict[str, List[Message]] # Dead letters grouped by queueUsage:
# Create stub broker for testing
stub_broker = StubBroker()
dramatiq.set_broker(stub_broker)
# Define and test actors
@dramatiq.actor
def test_task(value):
return value * 2
# Send message
test_task.send(21)
# Process messages synchronously in tests
import dramatiq
worker = dramatiq.Worker(stub_broker, worker_timeout=100)
worker.start()
worker.join()
worker.stop()
# Check results
assert len(stub_broker.dead_letters) == 0 # No failuresGlobal functions for managing the broker instance used by actors.
def get_broker() -> Broker:
"""
Get the current global broker instance.
Returns:
Current global broker
Raises:
RuntimeError: If no broker has been set
"""
def set_broker(broker: Broker):
"""
Set the global broker instance.
Parameters:
- broker: Broker instance to set as global
"""Usage:
# Set up broker
redis_broker = RedisBroker()
dramatiq.set_broker(redis_broker)
# Get current broker
current_broker = dramatiq.get_broker()
print(f"Using broker: {type(current_broker).__name__}")
# Add middleware to current broker
from dramatiq.middleware import Prometheus
current_broker.add_middleware(Prometheus())Interface for consuming messages from broker queues.
class Consumer:
"""
Interface for consuming messages from a queue.
Consumers are created by calling broker.consume() and provide
an iterator interface for processing messages.
"""
def __iter__(self):
"""Return iterator for message consumption."""
def __next__(self) -> Message:
"""
Get next message from queue.
Returns:
Message instance to process
Raises:
StopIteration: When no more messages or timeout
"""
def ack(self, message):
"""
Acknowledge successful message processing.
Parameters:
- message: Message to acknowledge
"""
def nack(self, message):
"""
Negative acknowledge failed message processing.
Parameters:
- message: Message to nack
"""
def close(self):
"""Close the consumer and clean up resources."""Proxy object for delayed message operations and broker interaction.
class MessageProxy:
"""
Proxy for message operations that may be delayed or batched.
Used internally by brokers for optimizing message operations.
"""
def __init__(self, broker, message):
"""
Create message proxy.
Parameters:
- broker: Broker instance
- message: Message instance
"""from dramatiq.middleware import AgeLimit, TimeLimit, Retries, Prometheus
# Create broker with custom middleware
custom_middleware = [
Prometheus(),
AgeLimit(max_age=3600000), # 1 hour
TimeLimit(time_limit=300000), # 5 minutes
Retries(max_retries=5)
]
broker = RedisBroker(middleware=custom_middleware)
dramatiq.set_broker(broker)# Redis with connection pooling
import redis
pool = redis.ConnectionPool(
host="redis.example.com",
port=6379,
max_connections=20,
retry_on_timeout=True
)
redis_client = redis.Redis(connection_pool=pool)
redis_broker = RedisBroker(client=redis_client)
# RabbitMQ with HA setup
rabbitmq_broker = RabbitmqBroker(
url="amqp://user:pass@rabbitmq-cluster.example.com:5672/prod",
confirm_delivery=True,
connection_attempts=5,
retry_delay=2.0
)# Different brokers for different environments
import os
if os.getenv("ENVIRONMENT") == "production":
broker = RabbitmqBroker(
url=os.getenv("RABBITMQ_URL"),
confirm_delivery=True
)
elif os.getenv("ENVIRONMENT") == "development":
broker = RedisBroker(
url=os.getenv("REDIS_URL", "redis://localhost:6379/0")
)
else: # testing
broker = StubBroker()
dramatiq.set_broker(broker)redis_broker = RedisBroker()
# Access underlying Redis client
redis_client = redis_broker.client
# Custom Redis operations
redis_client.set("custom_key", "value")
queue_length = redis_client.llen(f"{redis_broker.namespace}:default.msgs")rabbitmq_broker = RabbitmqBroker(max_priority=10)
# Priority queue support (RabbitMQ only)
@dramatiq.actor(priority=5)
def high_priority_task():
pass
@dramatiq.actor(priority=1) # Higher priority (lower number)
def critical_task():
passBrokers raise specific exceptions for different error conditions:
try:
broker.get_actor("nonexistent_actor")
except dramatiq.ActorNotFound:
print("Actor not found")
try:
broker.join("queue_name", timeout=5000)
except dramatiq.QueueJoinTimeout:
print("Queue join timed out")
try:
message = broker.enqueue(invalid_message)
except dramatiq.BrokerError as e:
print(f"Broker error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-dramatiq