
tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/dramatiq@1.18.x

To install, run

npx @tessl/cli install tessl/pypi-dramatiq@1.18.0


Dramatiq

A fast and reliable distributed task processing library for Python 3 that provides a simple API for defining background tasks (actors) and distributing them across workers. Dramatiq supports multiple message brokers including RabbitMQ and Redis, offers comprehensive middleware for rate limiting, retries, and result storage, and includes advanced features like task composition, gevent integration, and robust error handling.

Package Information

  • Package Name: dramatiq
  • Language: Python
  • Installation: pip install dramatiq
  • Extras: pip install 'dramatiq[redis]', pip install 'dramatiq[rabbitmq]', pip install 'dramatiq[all]' (quoted so that shells such as zsh do not expand the brackets)

Core Imports

import dramatiq

Common imports for actors and brokers:

from dramatiq import actor, get_broker, set_broker
from dramatiq.brokers.redis import RedisBroker
from dramatiq.brokers.rabbitmq import RabbitmqBroker

Basic Usage

import dramatiq
from dramatiq.brokers.redis import RedisBroker

# Set up the broker
redis_broker = RedisBroker(host="localhost", port=6379, db=0)
dramatiq.set_broker(redis_broker)

# Define actors (background tasks)
@dramatiq.actor
def send_email(to, subject, body):
    # Send email implementation
    print(f"Sending email to {to}: {subject}")
    # ... email sending logic ...

@dramatiq.actor(queue_name="critical", priority=10)
def process_payment(user_id, amount):
    # Process payment implementation
    print(f"Processing payment: ${amount} for user {user_id}")
    # ... payment processing logic ...

# Send messages (enqueue tasks)
send_email.send("user@example.com", "Welcome!", "Thanks for signing up!")
process_payment.send(123, 50.00)

# Run workers to process tasks
# In terminal: dramatiq my_module

Advanced usage with composition:

from dramatiq import pipeline, group

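# Pipeline: sequential execution. By default each actor's result is appended to the
# next message's arguments; pipe_ignore=True opts out, since these actors do not accept it.
pipe = pipeline([
    send_email.message("user@example.com", "Step 1", "First step"),
    process_payment.message_with_options(args=(123, 50.00), pipe_ignore=True),
    send_email.message_with_options(
        args=("user@example.com", "Step 2", "Payment processed"), pipe_ignore=True),
])
pipe.run()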

# Group: parallel execution
tasks = group([
    send_email.message("user1@example.com", "Bulk", "Message 1"),
    send_email.message("user2@example.com", "Bulk", "Message 2"),
    send_email.message("user3@example.com", "Bulk", "Message 3")
])
tasks.run()

Architecture

Dramatiq uses an Actor model where tasks are defined as actors and messages are sent to these actors for processing:

  • Actors: Decorated functions or classes that define background tasks
  • Brokers: Message brokers (Redis, RabbitMQ) that handle message routing and persistence
  • Workers: Processes that consume messages from brokers and execute actors
  • Messages: Serialized task data containing actor name, arguments, and metadata
  • Middleware: Components that intercept and modify message processing (retries, time limits, etc.)
  • Composition: Tools for chaining tasks in pipelines or grouping tasks for parallel execution

Capabilities

Actor System

Define and manage background tasks using decorators or classes, with support for named queues, priorities (lower numbers are processed first), and custom options.

@actor(queue_name: str = "default", priority: int = 0, **options)
def my_task(arg1, arg2): ...

class Actor:
    def __init__(fn, *, broker, actor_name, queue_name, priority, options): ...
    def send(*args, **kwargs) -> Message: ...
    def send_with_options(*, args=(), kwargs=None, delay=None, **options) -> Message: ...

class GenericActor:
    def perform(*args, **kwargs): ...  # Abstract method

Full reference: actors.md

Message Brokers

Connect to Redis or RabbitMQ, or use the in-memory StubBroker for development and testing.

class RedisBroker(Broker):
    def __init__(*, url=None, namespace="dramatiq", heartbeat_timeout=60000, **params): ...

class RabbitmqBroker(Broker):
    def __init__(*, url=None, confirm_delivery=False, max_priority=None, **kwargs): ...

class StubBroker(Broker):
    def __init__(middleware=None): ...

def get_broker() -> Broker: ...
def set_broker(broker: Broker): ...

Full reference: brokers.md

Task Composition

Chain tasks sequentially with pipelines or execute multiple tasks in parallel with groups.

class pipeline:
    def __init__(children: Iterable[Message], *, broker=None): ...
    def run(*, delay=None) -> pipeline: ...
    def get_result(*, block=False, timeout=None): ...

class group:
    def __init__(children, *, broker=None): ...
    def run(*, delay=None) -> group: ...
    def get_results(*, block=False, timeout=None): ...
    def wait(*, timeout=None): ...

Full reference: composition.md

Middleware System

Extend functionality with built-in middleware for retries, time limits, rate limiting, and custom processing.

class Middleware:
    def before_process_message(broker, message): ...
    def after_process_message(broker, message, *, result=None, exception=None): ...

class Retries(Middleware):
    def __init__(*, max_retries=20, min_backoff=15000, max_backoff=604800000): ...

class TimeLimit(Middleware):
    def __init__(*, time_limit=600000, interval=1000): ...

class AgeLimit(Middleware):
    def __init__(*, max_age=None): ...

Full reference: middleware.md

Rate Limiting

Control task execution rates and implement synchronization barriers using various rate limiting strategies.

class BucketRateLimiter(RateLimiter):
    def __init__(backend, key, *, limit, bucket): ...

class ConcurrentRateLimiter(RateLimiter):
    def __init__(backend, key, *, limit, ttl=900000): ...

class WindowRateLimiter(RateLimiter):
    def __init__(backend, key, *, limit, window): ...

class Barrier:
    def __init__(backend, key, *, ttl=900000): ...
    def create(parties): ...
    def wait(*, block=True, timeout=None): ...

Full reference: rate-limiting.md

Result Storage

Store and retrieve task results using Redis, Memcached, or in-memory backends.

class Results(Middleware):
    def __init__(*, backend=None, store_results=False): ...

class ResultBackend:
    def get_result(message, *, block=False, timeout=10000): ...
    def store_result(message, result, ttl): ...

class Message:
    def get_result(*, backend=None, block=False, timeout=None): ...

Full reference: results.md

Worker Management

Configure and run workers to process messages from brokers with customizable threading and timeout settings.

class Worker:
    def __init__(broker, *, queues=None, worker_timeout=1000, worker_threads=8): ...
    def start(): ...
    def stop(): ...
    def join(): ...

Full reference: workers.md

Message Handling

Work with message objects and customize encoding for different serialization needs.

class Message:
    def __init__(queue_name, actor_name, args, kwargs, options, message_id, message_timestamp): ...
    def encode() -> bytes: ...
    def copy(**attributes) -> Message: ...

class JSONEncoder(Encoder):
    def encode(data) -> bytes: ...
    def decode(data: bytes): ...

def get_encoder() -> Encoder: ...
def set_encoder(encoder: Encoder): ...

Full reference: messages.md

Error Handling

Dramatiq provides a comprehensive error hierarchy for handling various failure scenarios:

# Base errors
class DramatiqError(Exception): ...
class BrokerError(DramatiqError): ...
class ActorNotFound(BrokerError): ...
class QueueNotFound(BrokerError): ...
class RateLimitExceeded(DramatiqError): ...

# Connection errors
class ConnectionError(BrokerError): ...
class ConnectionFailed(ConnectionError): ...
class ConnectionClosed(ConnectionError): ...

# Processing errors
class Retry(DramatiqError):  # Signals intentional retry
    def __init__(message="", delay=None): ...

class TimeLimitExceeded(Interrupt): ...  # Interrupt subclasses BaseException, so `except Exception` does not catch it

# Result errors
class ResultError(DramatiqError): ...
class ResultMissing(ResultError): ...
class ResultTimeout(ResultError): ...
class ResultFailure(ResultError): ...

Common error handling patterns:

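import logging

import dramatiq

logger = logging.getLogger(__name__)

# TemporaryError, PermanentError, and process_data stand in for application code.
class TemporaryError(Exception): ...
class PermanentError(Exception): ...

# Exceptions listed in `throws` are not retried by the Retries middleware.
@dramatiq.actor(max_retries=5, throws=(PermanentError,))
def reliable_task(data):
    try:
        process_data(data)
    except TemporaryError:
        # Retry with a custom delay (milliseconds)
        raise dramatiq.Retry(delay=30000)  # 30 seconds
    except PermanentError as e:
        # Log and re-raise; `throws` above keeps this from being retried
        logger.error(f"Permanent failure: {e}")
        raise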
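When no explicit delay is given, retries are spaced out with a backoff bounded by `min_backoff` and `max_backoff`. The function below is only an illustrative sketch of capped exponential backoff under the assumption that the delay doubles per attempt; `backoff_ms` is a hypothetical helper, and dramatiq's own computation may differ, for example by adding jitter.

```python
def backoff_ms(attempt, *, min_backoff=15_000, max_backoff=604_800_000):
    """Illustrative capped exponential backoff, in milliseconds.

    Not dramatiq's exact formula: this assumes plain doubling with no jitter.
    """
    return min(min_backoff * 2 ** attempt, max_backoff)


# Delays for the first five attempts, then a late attempt that hits the cap.
schedule = [backoff_ms(n) for n in range(5)]
capped = backoff_ms(40)

print(schedule, capped)
```

Even in this simplified form the cap matters: with a 15 second floor, doubling alone would exceed the 7 day ceiling after about sixteen attempts.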

Constants and Configuration

# Default values
DEFAULT_QUEUE_NAME = "default"
DEFAULT_PRIORITY = 0
DEFAULT_WORKER_THREADS = 8
DEFAULT_WORKER_TIMEOUT = 1000  # milliseconds
DEFAULT_TIME_LIMIT = 600000    # 10 minutes
DEFAULT_MAX_RETRIES = 20

# Queue name validation pattern
QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"
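The pattern above can be checked ahead of time with the standard `re` module. This is a small sketch; `is_valid_queue_name` is a hypothetical helper, and it uses `fullmatch` on the assumption that the whole name must conform.

```python
import re

QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"
_queue_name_re = re.compile(QUEUE_NAME_PATTERN)


def is_valid_queue_name(name):
    """Return True if the entire name matches the queue name pattern."""
    return _queue_name_re.fullmatch(name) is not None


for candidate in ["default", "emails.high-priority", "_internal", "9lives", "my queue", ""]:
    print(repr(candidate), is_valid_queue_name(candidate))
```

Names must start with a letter or underscore; digits, dots, and hyphens are only allowed after the first character, and spaces are never allowed.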