Dramatiq is a background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware.
—
The actor system in Dramatiq provides the foundation for defining and managing background tasks. Actors are decorated functions or GenericActor subclasses that run asynchronously, supporting queues, priorities, retries, and custom processing options.
Transform regular functions into actors that can be executed asynchronously by workers.
@actor(
fn=None, *,
actor_class=Actor,
actor_name: str = None,
queue_name: str = "default",
priority: int = 0,
broker: Broker = None,
**options
)
def decorated_function(*args, **kwargs): ...
Parameters:
- fn: Function to decorate (optional for parametrized decorator)
- actor_class: Actor class to use (default: Actor)
- actor_name: Actor name (defaults to function name)
- queue_name: Queue name (default: "default")
- priority: Priority level (default: 0, lower values = higher priority)
- broker: Broker instance (uses global broker if None)
- **options: Additional actor options for middleware/broker
Usage:
# Simple actor
@dramatiq.actor
def send_email(to, subject, body):
print(f"Sending email to {to}: {subject}")
# Actor with options
@dramatiq.actor(queue_name="high_priority", priority=1, max_retries=5)
def process_payment(user_id, amount):
print(f"Processing ${amount} for user {user_id}")
# Send messages
send_email.send("user@example.com", "Hello", "Welcome!")
process_payment.send(123, 99.99)
The core actor implementation that wraps functions for asynchronous execution.
class Actor:
def __init__(self, fn, *, broker, actor_name, queue_name, priority, options):
"""
Create an actor instance.
Parameters:
- fn: Callable function (supports sync/async)
- broker: Broker instance
- actor_name: str - Name of the actor
- queue_name: str - Queue name
- priority: int - Actor priority (lower = higher priority)
- options: Dict[str, Any] - Arbitrary options for broker/middleware
"""
def message(self, *args, **kwargs) -> Message:
"""
Build a message without sending it.
Returns:
Message object that can be sent later or used in composition
"""
def message_with_options(self, *, args=(), kwargs=None, **options) -> Message:
"""
Build message with custom options.
Parameters:
- args: tuple - Arguments for the actor
- kwargs: dict - Keyword arguments for the actor
- **options: Additional message options (delay, etc.)
Returns:
Message object with specified options
"""
def send(self, *args, **kwargs) -> Message:
"""
Send message asynchronously to broker.
Returns:
Message object representing the enqueued task
"""
def send_with_options(self, *, args=(), kwargs=None, delay=None, **options) -> Message:
"""
Send message with custom options.
Parameters:
- args: tuple - Arguments for the actor
- kwargs: dict - Keyword arguments for the actor
- delay: int - Delay in milliseconds before processing
- **options: Additional message options
Returns:
Message object representing the enqueued task
"""
def __call__(self, *args, **kwargs):
"""
Execute the actor synchronously (for testing/development).
Returns:
Result of the wrapped function
"""
# Properties
logger: Logger # Actor's logger instance
fn: Callable # Underlying callable function
broker: Broker # Associated broker
actor_name: str # Actor name
queue_name: str # Queue name
priority: int # Priority level
options: Dict[str, Any] # Actor options
Usage:
# Create actor manually
def my_function(x, y):
return x + y
my_actor = dramatiq.Actor(
my_function,
broker=dramatiq.get_broker(),
actor_name="adder",
queue_name="math",
priority=5,
options={"max_retries": 3}
)
# Use the actor
result_msg = my_actor.send(10, 20)
direct_result = my_actor(10, 20) # Synchronous execution
Base class for creating class-based actors with metaclass support and configuration via an inner Meta class.
class GenericActor:
"""
Base class for class-based actors.
Subclasses must implement the perform() method and can define
configuration through a Meta inner class.
"""
class Meta:
# Configuration options (all optional)
actor_name: str = None
queue_name: str = "default"
priority: int = 0
broker: Broker = None
# Any additional options...
def perform(self, *args, **kwargs):
"""
Abstract method that subclasses must implement.
This method contains the actual task logic.
Parameters:
- *args: Variable positional arguments
- **kwargs: Variable keyword arguments
Returns:
Task result (any type)
"""
raise NotImplementedError
Usage:
class EmailActor(dramatiq.GenericActor):
class Meta:
queue_name = "emails"
priority = 10
max_retries = 5
def perform(self, to, subject, body):
# Email sending logic
print(f"Sending email to {to}: {subject}")
# ... actual email sending ...
return f"Email sent to {to}"
class PaymentActor(dramatiq.GenericActor):
class Meta:
queue_name = "payments"
priority = 1 # Higher priority
time_limit = 30000 # 30 seconds
def perform(self, user_id, amount, payment_method):
# Payment processing logic
print(f"Processing ${amount} payment for user {user_id}")
# ... payment processing ...
return {"status": "success", "transaction_id": "12345"}
# Send messages to class-based actors
EmailActor.send("user@example.com", "Welcome", "Thanks for signing up!")
PaymentActor.send(123, 99.99, "credit_card")
Actors support various options that control their behavior and integration with middleware:
# Common actor options
ACTOR_OPTIONS = {
# Retry configuration
"max_retries": int, # Maximum retry attempts (default: 20)
"min_backoff": int, # Minimum backoff in ms (default: 15000)
"max_backoff": int, # Maximum backoff in ms (default: 604800000)
"retry_when": Callable, # Function to determine if retry should occur
# Time limits
"time_limit": int, # Maximum execution time in ms (default: 600000)
# Age limits
"max_age": int, # Maximum message age in ms before rejection
# Results storage
"store_results": bool, # Whether to store results (default: False)
# Callbacks
"on_success": str, # Actor name to call on success
"on_failure": str, # Actor name to call on failure
# Custom options for specific middleware/brokers
# (any additional key-value pairs)
}
Usage:
@dramatiq.actor(
queue_name="critical",
priority=1,
max_retries=3,
time_limit=60000, # 1 minute
store_results=True,
on_success="log_success",
on_failure="handle_failure"
)
def critical_task(data):
# Critical processing logic
return process_critical_data(data)
@dramatiq.actor
def log_success(message_data, result):
print(f"Task {message_data['message_id']} succeeded: {result}")
@dramatiq.actor
def handle_failure(message_data, exception_data):
print(f"Task {message_data['message_id']} failed: {exception_data}")
def should_retry(retries_so_far, exception):
"""Custom retry logic"""
if isinstance(exception, TemporaryError):
return retries_so_far < 5
return False
@dramatiq.actor(retry_when=should_retry)
def smart_retry_task(data):
# Task that uses custom retry logic
if random.random() < 0.3:
raise TemporaryError("Temporary failure")
return "Success"
def create_actor(name, queue, priority=0):
"""Factory function for creating actors dynamically"""
@dramatiq.actor(
actor_name=name,
queue_name=queue,
priority=priority
)
def dynamic_task(*args, **kwargs):
print(f"Actor {name} processing: {args}, {kwargs}")
return f"Processed by {name}"
return dynamic_task
# Create actors dynamically
email_actor = create_actor("email_sender", "emails", priority=5)
sms_actor = create_actor("sms_sender", "sms", priority=3)
email_actor.send("user@example.com", "Hello")
sms_actor.send("+1234567890", "Hello")
@dramatiq.actor
def step_one(data):
return {"processed": data, "step": 1}
@dramatiq.actor
def step_two(data):
return {"processed": data, "step": 2}
@dramatiq.actor
def final_step(data):
return {"final": data}
# Build messages for composition
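# Only the first message carries explicit arguments: in a pipeline, each
# actor's result is appended as the last positional argument of the next message
msg1 = step_one.message({"input": "data"})
msg2 = step_two.message()
msg3 = final_step.message()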
# Create pipeline
pipeline = msg1 | msg2 | msg3
pipeline.run()
Queue names must follow specific naming rules:
# Valid queue name pattern
QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"
# Valid examples:
"default" # ✓
"high_priority" # ✓
"user.emails" # ✓
"queue-1" # ✓
"_internal" # ✓
# Invalid examples:
"123-queue" # ✗ (starts with number)
"my queue" # ✗ (contains space)
"queue@domain" # ✗ (contains @)
Install with Tessl CLI
npx tessl i tessl/pypi-dramatiq