CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware

Pending
Overview
Eval results
Files

docs/brokers.md

Brokers

Message brokers handle the routing, persistence, and delivery of task messages between actors and workers. Dramatiq supports Redis, RabbitMQ, and in-memory brokers for different deployment scenarios.

Capabilities

Broker Base Class

The abstract base class that defines the broker interface for all implementations.

class Broker:
    def __init__(self, middleware=None):
        """
        Initialize broker with optional middleware.
        
        Parameters:
        - middleware: List of middleware instances
        """
    
    def add_middleware(self, middleware, *, before=None, after=None):
        """
        Add middleware to the broker.
        
        Parameters:
        - middleware: Middleware instance to add
        - before: Add before this middleware class
        - after: Add after this middleware class
        """
    
    def declare_actor(self, actor):
        """
        Register an actor with the broker.
        
        Parameters:
        - actor: Actor instance to register
        """
    
    def declare_queue(self, queue_name):
        """
        Declare a queue on the broker.
        
        Parameters:
        - queue_name: str - Name of the queue to declare
        """
    
    def enqueue(self, message, *, delay=None) -> Message:
        """
        Enqueue a message for processing.
        
        Parameters:
        - message: Message instance to enqueue
        - delay: int - Delay in milliseconds before processing
        
        Returns:
        Message instance with updated metadata
        """
    
    def consume(self, queue_name, prefetch=1, timeout=30000) -> Consumer:
        """
        Create a consumer for a queue.
        
        Parameters:
        - queue_name: str - Queue to consume from
        - prefetch: int - Number of messages to prefetch
        - timeout: int - Consumer timeout in milliseconds
        
        Returns:
        Consumer instance for processing messages
        """
    
    def get_actor(self, actor_name) -> Actor:
        """
        Get registered actor by name.
        
        Parameters:
        - actor_name: str - Name of the actor
        
        Returns:
        Actor instance
        
        Raises:
        ActorNotFound: If actor is not registered
        """
    
    def get_results_backend(self) -> ResultBackend:
        """
        Get the results backend associated with this broker.
        
        The backend is the one configured on the broker's Results middleware.
        
        Returns:
        ResultBackend instance
        
        Raises:
        RuntimeError: If the broker has no Results middleware (and
        therefore no results backend)
        """
    
    def flush(self, queue_name):
        """
        Remove all messages from a queue.
        
        Parameters:
        - queue_name: str - Queue to flush
        """
    
    def flush_all(self):
        """Remove all messages from all queues."""
    
    def join(self, queue_name, *, timeout=None):
        """
        Wait for all messages in a queue to be processed.
        
        Parameters:
        - queue_name: str - Queue to wait for
        - timeout: int - Timeout in milliseconds
        
        Raises:
        QueueJoinTimeout: If timeout is exceeded
        """
    
    def close(self):
        """Close the broker and clean up connections."""
    
    # Properties
    actors: Dict[str, Actor]           # Registered actors
    queues: Dict[str, Queue]          # Declared queues
    middleware: List[Middleware]      # Middleware stack
    actor_options: Set[str]           # Set of valid actor options

Redis Broker

Production-ready broker using Redis as the message transport and storage backend.

class RedisBroker(Broker):
    def __init__(
        self, *,
        url: str = None,
        middleware: List[Middleware] = None,
        namespace: str = "dramatiq",
        maintenance_chance: int = 1000,
        heartbeat_timeout: int = 60000,
        dead_message_ttl: int = 604800000,
        requeue_deadline: int = None,
        requeue_interval: int = None,
        client: redis.Redis = None,
        **parameters
    ):
        """
        Create Redis broker instance.
        
        All arguments are keyword-only.
        
        Parameters:
        - url: Redis connection URL (redis://host:port/db)
        - middleware: List of middleware instances
        - namespace: Key namespace prefix (default: "dramatiq")
        - maintenance_chance: How many commands out of 1,000,000 should
          trigger queue maintenance (default: 1000, i.e. about 0.1%)
        - heartbeat_timeout: Worker heartbeat timeout in ms
          (default: 60000 = 1 minute)
        - dead_message_ttl: Dead message TTL in ms
          (default: 604800000 = 7 days)
        - requeue_deadline: Message requeue deadline in ms
          (NOTE(review): reported as deprecated and ignored by current
          Dramatiq releases - confirm before relying on it)
        - requeue_interval: Message requeue check interval in ms
          (NOTE(review): same deprecation caveat as requeue_deadline)
        - client: Existing Redis client instance
        - **parameters: Additional Redis connection parameters
        """

Usage:

import dramatiq
import redis
from dramatiq.brokers.redis import RedisBroker

# Basic Redis broker (extra keyword arguments are passed on as Redis
# connection parameters)
redis_broker = RedisBroker(host="localhost", port=6379, db=0)

# Redis broker with URL
redis_broker = RedisBroker(url="redis://localhost:6379/0")

# Redis broker with custom settings
redis_broker = RedisBroker(
    host="redis.example.com",
    port=6379,
    password="secret",
    namespace="myapp",
    heartbeat_timeout=120000,  # 2 minutes
    dead_message_ttl=86400000  # 1 day
)

# Redis broker with existing client.
# Do NOT create the client with decode_responses=True: Dramatiq decodes
# message payloads itself and expects the client to return raw bytes.
redis_client = redis.Redis(host="localhost", port=6379)
redis_broker = RedisBroker(client=redis_client)

dramatiq.set_broker(redis_broker)

RabbitMQ Broker

Enterprise-grade broker using RabbitMQ for high-throughput message processing with advanced routing features.

class RabbitmqBroker(Broker):
    def __init__(
        self, *,
        confirm_delivery: bool = False,
        url: str = None,
        middleware: List[Middleware] = None,
        max_priority: int = None,
        parameters: List[dict] = None,
        **kwargs
    ):
        """
        Create RabbitMQ broker instance.
        
        All arguments are keyword-only. Choose ONE way of describing the
        connection: `url`, `parameters`, or plain keyword arguments.
        Combining `url` or `parameters` with the other forms is rejected
        with a RuntimeError.
        
        Parameters:
        - confirm_delivery: Enable delivery confirmations
        - url: AMQP connection URL (amqp://user:pass@host:port/vhost)
        - middleware: List of middleware instances
        - max_priority: Maximum message priority (enables priority queues)
        - parameters: List of dicts, each holding the pika connection
          keyword arguments for one RabbitMQ server
        - **kwargs: pika connection parameters for a single server
        """

Usage:

import dramatiq
import pika
from dramatiq.brokers.rabbitmq import RabbitmqBroker

# Basic RabbitMQ broker
rabbitmq_broker = RabbitmqBroker(host="localhost", port=5672)

# RabbitMQ broker with URL
rabbitmq_broker = RabbitmqBroker(
    url="amqp://user:password@rabbitmq.example.com:5672/myapp"
)

# RabbitMQ broker with priorities and confirmations
rabbitmq_broker = RabbitmqBroker(
    host="localhost",
    port=5672,
    confirm_delivery=True,
    max_priority=255,
    heartbeat=600,
    connection_attempts=3
)

# RabbitMQ broker with custom parameters.
# `parameters` takes a list of keyword dicts (one per server); the broker
# builds the pika.ConnectionParameters objects itself, so a ready-made
# ConnectionParameters instance must not be passed here.
params = [
    {
        "host": "rabbitmq.example.com",
        "port": 5672,
        "credentials": pika.PlainCredentials("user", "password"),
        "heartbeat": 600,
    },
]
rabbitmq_broker = RabbitmqBroker(parameters=params)

dramatiq.set_broker(rabbitmq_broker)

Stub Broker

In-memory broker for testing and development environments.

class StubBroker(Broker):
    def __init__(self, middleware=None):
        """
        Create in-memory broker for testing.
        
        Parameters:
        - middleware: List of middleware instances
        """
    
    # Testing-specific properties
    dead_letters: List[Message]                    # All dead-lettered messages
    dead_letters_by_queue: Dict[str, List[Message]] # Dead letters grouped by queue

Usage:

import dramatiq
from dramatiq.brokers.stub import StubBroker

# Create stub broker for testing
stub_broker = StubBroker()
dramatiq.set_broker(stub_broker)

# Define and test actors
@dramatiq.actor
def test_task(value):
    return value * 2

# Send message
test_task.send(21)

# Process messages in tests: run a worker, then block until the queue has
# been drained and all in-flight messages have finished.
worker = dramatiq.Worker(stub_broker, worker_timeout=100)
worker.start()
try:
    stub_broker.join(test_task.queue_name)  # wait until the queue is empty
    worker.join()                           # wait for in-progress work
finally:
    # Always stop the worker threads, even if joining failed.
    worker.stop()

# Check results
assert len(stub_broker.dead_letters) == 0  # No failures

Broker Management

Global functions for managing the broker instance used by actors.

def get_broker() -> Broker:
    """
    Get the current global broker instance.
    
    Returns:
    Current global broker
    
    Raises:
    RuntimeError: If no broker has been set
    """
    # NOTE(review): Dramatiq 1.x does not raise here; when no broker has
    # been set it lazily creates a default RabbitmqBroker pointed at
    # localhost. Confirm which behavior applies to the targeted version.

def set_broker(broker: Broker) -> None:
    """
    Set the global broker instance.
    
    Parameters:
    - broker: Broker instance to set as global
    
    Returns:
    None
    """

Usage:

# Set up broker
redis_broker = RedisBroker()
dramatiq.set_broker(redis_broker)

# Get current broker
current_broker = dramatiq.get_broker()
print(f"Using broker: {type(current_broker).__name__}")

# Add middleware to current broker
from dramatiq.middleware import Prometheus
current_broker.add_middleware(Prometheus())

Consumer Interface

Interface for consuming messages from broker queues.

class Consumer:
    """
    Interface for consuming messages from a queue.
    
    Consumers are created by calling broker.consume() and provide
    an iterator interface for processing messages.
    """
    
    def __iter__(self):
        """Return iterator for message consumption."""
    
    def __next__(self) -> Message:
        """
        Get next message from queue.
        
        Returns:
        Message instance to process, or None when no message became
        available within the consumer timeout
        
        NOTE(review): Dramatiq's bundled consumers signal a timeout by
        returning None rather than raising StopIteration, so code that
        iterates a consumer must skip None values. Confirm for custom
        consumer implementations.
        """
    
    def ack(self, message):
        """
        Acknowledge successful message processing.
        
        Parameters:
        - message: Message to acknowledge
        """
    
    def nack(self, message):
        """
        Negative acknowledge failed message processing.
        
        Parameters:
        - message: Message to nack
        """
    
    def close(self):
        """Close the consumer and clean up resources."""

Message Proxy

Proxy object for delayed message operations and broker interaction.

class MessageProxy:
    """
    Proxy for message operations that may be delayed or batched.
    
    Used internally by brokers for optimizing message operations.
    """
    # NOTE(review): upstream dramatiq.broker.MessageProxy appears to be
    # constructed from the message alone (MessageProxy(message)), wrapping
    # it and tracking whether processing failed. Verify this signature
    # against the installed Dramatiq version before relying on it.
    
    def __init__(self, broker, message):
        """
        Create message proxy.
        
        Parameters:
        - broker: Broker instance
        - message: Message instance
        """

Advanced Broker Configuration

Custom Middleware Stack

from dramatiq.middleware import AgeLimit, TimeLimit, Retries, Prometheus

# Create broker with custom middleware
custom_middleware = [
    Prometheus(),
    AgeLimit(max_age=3600000),  # 1 hour
    TimeLimit(time_limit=300000),  # 5 minutes
    Retries(max_retries=5)
]

broker = RedisBroker(middleware=custom_middleware)
dramatiq.set_broker(broker)

Connection Pooling and High Availability

# Redis with connection pooling
import redis
pool = redis.ConnectionPool(
    host="redis.example.com",
    port=6379,
    max_connections=20,
    retry_on_timeout=True
)
redis_client = redis.Redis(connection_pool=pool)
redis_broker = RedisBroker(client=redis_client)

# RabbitMQ with HA setup.
# `url` cannot be combined with extra pika keyword arguments (the broker
# rejects the mix with a RuntimeError), so the connection tuning options
# are carried in the URL query string instead.
rabbitmq_broker = RabbitmqBroker(
    url=(
        "amqp://user:pass@rabbitmq-cluster.example.com:5672/prod"
        "?connection_attempts=5&retry_delay=2.0"
    ),
    confirm_delivery=True
)

Multi-Broker Setup

# Select the broker implementation from the ENVIRONMENT variable
import os

# Production: RabbitMQ with delivery confirmations enabled.
if os.environ.get("ENVIRONMENT") == "production":
    broker = RabbitmqBroker(url=os.environ.get("RABBITMQ_URL"), confirm_delivery=True)
# Development: Redis, falling back to a local instance.
elif os.environ.get("ENVIRONMENT") == "development":
    broker = RedisBroker(url=os.environ.get("REDIS_URL", "redis://localhost:6379/0"))
# Anything else (including an unset variable) is treated as testing.
else:
    broker = StubBroker()

dramatiq.set_broker(broker)

Broker-Specific Features

Redis-Specific Operations

redis_broker = RedisBroker()

# Access underlying Redis client
redis_client = redis_broker.client

# Custom Redis operations
redis_client.set("custom_key", "value")

# Pending message ids are kept in the list "<namespace>:<queue>". The
# companion "<namespace>:<queue>.msgs" key is a hash of message payloads,
# so calling LLEN on it fails with a WRONGTYPE error.
queue_length = redis_client.llen(f"{redis_broker.namespace}:default")

RabbitMQ-Specific Operations

rabbitmq_broker = RabbitmqBroker(max_priority=10)

# Actor priorities: lower numbers are processed first.
# NOTE(review): the actor-level `priority` option orders messages inside a
# worker and does not appear to be RabbitMQ-specific; the RabbitMQ queue
# priorities enabled by `max_priority` are driven by the per-message
# `broker_priority` option. Confirm against the Dramatiq documentation.
@dramatiq.actor(priority=5)  # Lower priority than critical_task below
def high_priority_task():
    pass

@dramatiq.actor(priority=1)  # Higher priority (lower number)
def critical_task():
    pass

Error Handling

Brokers raise specific exceptions for different error conditions:

try:
    broker.get_actor("nonexistent_actor")
except dramatiq.ActorNotFound:
    print("Actor not found")

try:
    broker.join("queue_name", timeout=5000)
except dramatiq.QueueJoinTimeout:
    print("Queue join timed out")

try:
    message = broker.enqueue(invalid_message)
except dramatiq.BrokerError as e:
    print(f"Broker error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-dramatiq

docs

actors.md

brokers.md

composition.md

index.md

messages.md

middleware.md

rate-limiting.md

results.md

workers.md

tile.json