Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware
npx @tessl/cli install tessl/pypi-dramatiq@1.18.0A fast and reliable distributed task processing library for Python 3 that provides a simple API for defining background tasks (actors) and distributing them across workers. Dramatiq supports multiple message brokers including RabbitMQ and Redis, offers comprehensive middleware for rate limiting, retries, and result storage, and includes advanced features like task composition, gevent integration, and robust error handling.
pip install dramatiqpip install dramatiq[redis], pip install dramatiq[rabbitmq], pip install dramatiq[all]import dramatiqCommon imports for actors and brokers:
from dramatiq import actor, get_broker, set_broker
from dramatiq.brokers.redis import RedisBroker
from dramatiq.brokers.rabbitmq import RabbitmqBrokerimport dramatiq
from dramatiq.brokers.redis import RedisBroker
# Set up the broker
redis_broker = RedisBroker(host="localhost", port=6379, db=0)
dramatiq.set_broker(redis_broker)
# Define actors (background tasks)
@dramatiq.actor
def send_email(to, subject, body):
# Send email implementation
print(f"Sending email to {to}: {subject}")
# ... email sending logic ...
@dramatiq.actor(queue_name="critical", priority=10)
def process_payment(user_id, amount):
# Process payment implementation
print(f"Processing payment: ${amount} for user {user_id}")
# ... payment processing logic ...
# Send messages (enqueue tasks)
send_email.send("user@example.com", "Welcome!", "Thanks for signing up!")
process_payment.send(123, 50.00)
# Run workers to process tasks
# In terminal: dramatiq my_moduleAdvanced usage with composition:
from dramatiq import pipeline, group
# Pipeline: sequential execution
pipe = send_email.message("user@example.com", "Step 1", "First step") | \\
process_payment.message(123, 50.00) | \\
send_email.message("user@example.com", "Step 2", "Payment processed")
pipe.run()
# Group: parallel execution
tasks = group([
send_email.message("user1@example.com", "Bulk", "Message 1"),
send_email.message("user2@example.com", "Bulk", "Message 2"),
send_email.message("user3@example.com", "Bulk", "Message 3")
])
tasks.run()Dramatiq uses an Actor model where tasks are defined as actors and messages are sent to these actors for processing:
Define and manage background tasks using decorators or classes, with support for queues, priorities, and custom options.
@actor(queue_name: str = "default", priority: int = 0, **options)
def my_task(arg1, arg2): ...
class Actor:
def __init__(fn, *, broker, actor_name, queue_name, priority, options): ...
def send(*args, **kwargs) -> Message: ...
def send_with_options(*, args=(), kwargs=None, delay=None, **options) -> Message: ...
class GenericActor:
def perform(*args, **kwargs): ... # Abstract methodConnect to Redis, RabbitMQ, or use in-memory brokers for development and testing.
class RedisBroker(Broker):
def __init__(*, url=None, namespace="dramatiq", heartbeat_timeout=60000, **params): ...
class RabbitmqBroker(Broker):
def __init__(*, url=None, confirm_delivery=False, max_priority=None, **kwargs): ...
class StubBroker(Broker):
def __init__(middleware=None): ...
def get_broker() -> Broker: ...
def set_broker(broker: Broker): ...Chain tasks sequentially with pipelines or execute multiple tasks in parallel with groups.
class pipeline:
def __init__(children: Iterable[Message], *, broker=None): ...
def run(*, delay=None) -> pipeline: ...
def get_result(*, block=False, timeout=None): ...
class group:
def __init__(children, *, broker=None): ...
def run(*, delay=None) -> group: ...
def get_results(*, block=False, timeout=None): ...
def wait(*, timeout=None): ...Extend functionality with built-in middleware for retries, time limits, rate limiting, and custom processing.
class Middleware:
def before_process_message(broker, message): ...
def after_process_message(broker, message, *, result=None, exception=None): ...
class Retries(Middleware):
def __init__(*, max_retries=20, min_backoff=15000, max_backoff=604800000): ...
class TimeLimit(Middleware):
def __init__(*, time_limit=600000, interval=1000): ...
class AgeLimit(Middleware):
def __init__(*, max_age=None): ...Control task execution rates and implement synchronization barriers using various rate limiting strategies.
class BucketRateLimiter(RateLimiter):
def __init__(backend, key, *, limit, bucket): ...
class ConcurrentRateLimiter(RateLimiter):
def __init__(backend, key, *, limit, ttl=900000): ...
class WindowRateLimiter(RateLimiter):
def __init__(backend, key, *, limit, window): ...
class Barrier:
def __init__(backend, key, *, ttl=900000): ...
def create(size): ...
def wait(timeout=None): ...Store and retrieve task results using Redis, Memcached, or in-memory backends.
class Results(Middleware):
def __init__(*, backend=None, store_results=False): ...
class ResultBackend:
def get_result(message, *, block=False, timeout=10000): ...
def store_result(message, result, ttl): ...
class Message:
def get_result(*, backend=None, block=False, timeout=None): ...Configure and run workers to process messages from brokers with customizable threading and timeout settings.
class Worker:
def __init__(broker, *, queues=None, worker_timeout=1000, worker_threads=8): ...
def start(): ...
def stop(): ...
def join(): ...Work with message objects and customize encoding for different serialization needs.
class Message:
def __init__(queue_name, actor_name, args, kwargs, options, message_id, message_timestamp): ...
def encode() -> bytes: ...
def copy(**attributes) -> Message: ...
class JSONEncoder(Encoder):
def encode(data) -> bytes: ...
def decode(data: bytes): ...
def get_encoder() -> Encoder: ...
def set_encoder(encoder: Encoder): ...Dramatiq provides a comprehensive error hierarchy for handling various failure scenarios:
# Base errors
class DramatiqError(Exception): ...
class BrokerError(DramatiqError): ...
class ActorNotFound(DramatiqError): ...
class QueueNotFound(DramatiqError): ...
class RateLimitExceeded(DramatiqError): ...
# Connection errors
class ConnectionError(BrokerError): ...
class ConnectionFailed(ConnectionError): ...
class ConnectionClosed(ConnectionError): ...
# Processing errors
class Retry(Exception): # Signals intentional retry
def __init__(delay=None): ...
class TimeLimitExceeded(Exception): ...
# Result errors
class ResultError(Exception): ...
class ResultMissing(ResultError): ...
class ResultTimeout(ResultError): ...
class ResultFailure(ResultError): ...Common error handling patterns:
@dramatiq.actor(max_retries=5)
def reliable_task(data):
try:
# Task implementation
process_data(data)
except TemporaryError as e:
# Retry with custom delay
raise dramatiq.Retry(delay=30000) # 30 seconds
except PermanentError as e:
# Log and don't retry
logger.error(f"Permanent failure: {e}")
raise# Default values
DEFAULT_QUEUE_NAME = "default"
DEFAULT_PRIORITY = 0
DEFAULT_WORKER_THREADS = 8
DEFAULT_WORKER_TIMEOUT = 1000 # milliseconds
DEFAULT_TIME_LIMIT = 600000 # 10 minutes
DEFAULT_MAX_RETRIES = 20
# Queue name validation pattern
QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"