Dramatiq is a background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware.
—
Messages in Dramatiq represent tasks to be executed by workers. They contain all the information needed to invoke an actor, including arguments, metadata, and processing options. The message system also handles serialization, routing, and integration with composition features.
The core message data structure that represents a task.
class Message:
    """
    The core message data structure that represents a task.

    This is an API outline: each method is shown as a signature plus its
    docstring, without an implementation.
    """

    def __init__(
        self,
        queue_name: str,
        actor_name: str,
        args: tuple,
        kwargs: Dict[str, Any],
        options: Dict[str, Any],
        message_id: str = None,
        message_timestamp: int = None
    ):
        """
        Create a message instance.

        Parameters:
        - queue_name: Name of the queue for this message
        - actor_name: Name of the actor to execute
        - args: Positional arguments for the actor
        - kwargs: Keyword arguments for the actor
        - options: Message options and metadata
        - message_id: Unique message identifier (auto-generated if None)
        - message_timestamp: Unix timestamp in milliseconds (auto-generated if None)
        """

    def encode(self) -> bytes:
        """
        Encode message to bytes for transmission.

        Returns:
            Serialized message data
        """

    @classmethod
    def decode(cls, data: bytes) -> 'Message':
        """
        Decode message from bytes.

        Parameters:
        - data: Serialized message data

        Returns:
            Message instance

        Raises:
            DecodeError: If data cannot be decoded
        """

    def copy(self, **attributes) -> 'Message':
        """
        Create a copy of the message with modified attributes.

        Parameters:
        - **attributes: Attributes to modify in the copy

        Returns:
            New message instance with modifications
        """

    def get_result(
        self,
        *,
        # Quoted: ResultBackend is not defined or imported in this listing.
        backend: 'ResultBackend' = None,
        block: bool = False,
        timeout: int = None
    ):
        """
        Get result for this message (requires Results middleware).

        Parameters:
        - backend: Result backend to use (uses broker's backend if None)
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking

        Returns:
            Task result

        Raises:
            ResultMissing: If result not available
            ResultTimeout: If timeout exceeded while blocking
            ResultFailure: If task failed with exception
        """

    def asdict(self) -> Dict[str, Any]:
        """
        Convert message to dictionary representation.

        Returns:
            Dictionary containing message data
        """

    def __or__(self, other) -> 'pipeline':
        """
        Create pipeline with this message using | operator.

        Parameters:
        - other: Message or pipeline to chain with

        Returns:
            Pipeline containing both messages
        """

    # Properties
    queue_name: str  # Queue name
    actor_name: str  # Actor name
    args: tuple  # Positional arguments
    kwargs: Dict[str, Any]  # Keyword arguments
    options: Dict[str, Any]  # Message options
    message_id: str  # Unique identifier
    message_timestamp: int  # Creation timestamp


# Usage:
import dramatiq


@dramatiq.actor
def example_task(x, y, multiplier=1):
    """Return the sum of x and y scaled by multiplier."""
    return (x + y) * multiplier


# Create message manually
message = dramatiq.Message(
    queue_name="default",
    actor_name="example_task",
    args=(10, 20),
    kwargs={"multiplier": 2},
    options={"max_retries": 5}
)

# Send message
broker = dramatiq.get_broker()
broker.enqueue(message)

# Message created by actor (built only; nothing is enqueued by .message())
auto_message = example_task.message(10, 20, multiplier=2)
print(f"Message ID: {auto_message.message_id}")
print(f"Message data: {auto_message.asdict()}")

# Build and send in one step (this enqueues a new message, not the one built above)
auto_message = example_task.send(10, 20, multiplier=2)

# The encoding system handles serialization and deserialization of messages for transmission between brokers and workers.
class Encoder:
    """
    Abstract base class for message encoders.

    Encoders handle serialization of message data for storage
    and transmission through brokers.
    """

    # MessageData is quoted because it is not defined or imported in this listing.
    def encode(self, data: 'MessageData') -> bytes:
        """
        Encode message data to bytes.

        Parameters:
        - data: Message data to encode

        Returns:
            Encoded message as bytes
        """
        raise NotImplementedError

    def decode(self, data: bytes) -> 'MessageData':
        """
        Decode bytes to message data.

        Parameters:
        - data: Encoded message bytes

        Returns:
            Decoded message data

        Raises:
            DecodeError: If data cannot be decoded
        """
        raise NotImplementedError


# Default encoder using JSON serialization.
class JSONEncoder(Encoder):
    """API outline of the JSON encoder; method bodies are docstrings only."""

    def __init__(self):
        """
        Create JSON encoder for message serialization.

        Uses JSON for cross-language compatibility and human readability.
        Handles standard Python types: str, int, float, bool, list, dict, None.
        """

    # MessageData is quoted because it is not defined or imported in this listing.
    def encode(self, data: 'MessageData') -> bytes:
        """
        Encode message data to JSON bytes.

        Returns:
            UTF-8 encoded JSON data
        """

    def decode(self, data: bytes) -> 'MessageData':
        """
        Decode JSON bytes to message data.

        Returns:
            Decoded Python objects
        """


# Usage:
import time

import dramatiq
from dramatiq import JSONEncoder

# JSON encoder (default)
json_encoder = JSONEncoder()


@dramatiq.actor
def json_compatible_task(data):
    """Return a dict built from data["input"], the current time and a fixed list."""
    # Data must be JSON-serializable
    return {
        "processed": data["input"],
        "timestamp": time.time(),
        "items": [1, 2, 3, 4, 5]
    }


# Send JSON-compatible data
json_compatible_task.send({
    "input": "test data",
    "config": {"option1": True, "option2": "value"}
})

# Encoder using Python's pickle for complex object serialization.
class PickleEncoder(Encoder):
    """API outline of the pickle encoder; method bodies are docstrings only."""

    def __init__(self):
        """
        Create Pickle encoder for Python object serialization.

        WARNING: Pickle can execute arbitrary code during deserialization.
        Only use with trusted data sources.
        """

    # MessageData is quoted because it is not defined or imported in this listing.
    def encode(self, data: 'MessageData') -> bytes:
        """
        Encode message data using pickle.

        Returns:
            Pickled data bytes
        """

    def decode(self, data: bytes) -> 'MessageData':
        """
        Decode pickled bytes to message data.

        Returns:
            Unpickled Python objects
        """


# Usage:
import datetime

import dramatiq
from dramatiq import PickleEncoder, set_encoder

# Set pickle encoder globally (use with caution)
pickle_encoder = PickleEncoder()
dramatiq.set_encoder(pickle_encoder)


class CustomObject:
    """Example payload holding a value and the time it was created."""

    def __init__(self, value):
        self.value = value
        self.created = datetime.datetime.now()


@dramatiq.actor
def pickle_compatible_task(obj):
    """Return obj's value and creation time together with the processing time."""
    # Can handle complex Python objects
    return {
        "object_value": obj.value,
        "created": obj.created,
        "processed": datetime.datetime.now()
    }


# Send complex objects
custom_obj = CustomObject("test value")
pickle_compatible_task.send(custom_obj)

# Functions for managing the global message encoder.
def get_encoder() -> Encoder:
    """
    Get the current global message encoder.

    Returns:
        Current encoder instance (defaults to JSONEncoder)
    """
def set_encoder(encoder: Encoder) -> None:
    """
    Set the global message encoder.

    Parameters:
    - encoder: Encoder instance to use globally
    """


# Usage:
# Check current encoder
current_encoder = dramatiq.get_encoder()
print(f"Current encoder: {type(current_encoder).__name__}")

# Switch to custom encoder
custom_encoder = MyCustomEncoder()
dramatiq.set_encoder(custom_encoder)


# All messages will now use the custom encoder
@dramatiq.actor
def task_with_custom_encoding(data):
    return process_with_custom_format(data)


@dramatiq.actor
def metadata_aware_task(data):
    """
    Return data together with metadata of the message being processed.

    Falls back to {"data": data, "no_metadata": True} when no current
    message is available.
    """
    # The current message is exposed through the CurrentMessage middleware.
    # NOTE(review): this presumably requires CurrentMessage to be added to the
    # broker's middleware; confirm against your broker setup.
    from dramatiq.middleware import CurrentMessage

    current_msg = CurrentMessage.get_current_message()
    if current_msg is None:
        # Fallback when no current message available
        return {"data": data, "no_metadata": True}

    return {
        "data": data,
        "message_id": current_msg.message_id,
        "retry_count": current_msg.options.get("retries", 0),
        "queue": current_msg.queue_name,
        "created_at": current_msg.message_timestamp
    }
# Send with custom options
message = metadata_aware_task.message_with_options(
    args=("test_data",),
    max_retries=3,
    custom_option="custom_value"
)
# The delay is given to enqueue(): a "delay" entry stored in the message
# options is not what schedules delivery.
message_id = broker.enqueue(message, delay=5000).message_id  # 5 second delay


@dramatiq.actor
def original_task(data, multiplier=1):
    """Return data multiplied by multiplier."""
    return data * multiplier
# Create base message
base_message = original_task.message("hello", multiplier=2)

# Create variations
urgent_message = base_message.copy(
    queue_name="urgent",
    options={**base_message.options, "priority": 0}
)
delayed_message = base_message.copy(
    args=("goodbye",),
)

# Send variations
broker.enqueue(urgent_message)
# The delay is given to enqueue(): a "delay" entry stored in the message
# options is not what schedules delivery.
broker.enqueue(delayed_message, delay=30000)


def create_conditional_message(data, priority="normal"):
    """
    Create message based on priority level.

    "urgent" and "high" select urgent_task / high_priority_task; any other
    value selects normal_task. The message is built but not enqueued.
    """
    # NOTE(review): queue_name and priority are passed as message options
    # here; verify that they influence routing/ordering in your setup.
    if priority == "urgent":
        return urgent_task.message_with_options(
            args=(data,),
            queue_name="urgent",
            priority=0,
            max_retries=1
        )
    elif priority == "high":
        return high_priority_task.message_with_options(
            args=(data,),
            queue_name="high_priority",
            priority=1,
            max_retries=3
        )
    else:
        return normal_task.message_with_options(
            args=(data,),
            queue_name="normal",
            priority=5,
            max_retries=5
        )
# Usage
urgent_msg = create_conditional_message("critical_data", "urgent")
normal_msg = create_conditional_message("regular_data", "normal")
broker.enqueue(urgent_msg)
broker.enqueue(normal_msg)

import time
from enum import Enum
class MessageState(Enum):
    """Lifecycle states recorded for a tracked message."""

    CREATED = "created"
    ENQUEUED = "enqueued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"
class TrackedMessage:
    """Pair a message with its current state and a timestamped state history."""

    def __init__(self, message):
        """Start tracking message in the CREATED state."""
        self.message = message
        self.state = MessageState.CREATED
        # Each entry is (state, time.time() when the state was recorded).
        self.state_history = [(MessageState.CREATED, time.time())]
        self.result = None
        self.error = None

    def update_state(self, new_state, error=None):
        """
        Record a transition to new_state.

        Parameters:
        - new_state: State to switch to; appended to the history with a timestamp
        - error: Error description to store; the stored error is left
          untouched when None
        """
        self.state = new_state
        self.state_history.append((new_state, time.time()))
        # Compare against None so that an empty error text (for example
        # str(ValueError())) is still recorded.
        if error is not None:
            self.error = error

    def get_duration(self):
        """
        Return the seconds between the first and the last recorded state.

        Returns 0 while fewer than two states have been recorded.
        """
        if len(self.state_history) < 2:
            return 0
        first_timestamp = self.state_history[0][1]
        last_timestamp = self.state_history[-1][1]
        return last_timestamp - first_timestamp
# Middleware for message tracking
class MessageTrackingMiddleware(dramatiq.Middleware):
    """
    Record state transitions of messages in an in-memory dict.

    tracked_messages maps message_id to a TrackedMessage. Entries are never
    removed, and the dict lives in the process that owns this instance.
    """

    def __init__(self):
        self.tracked_messages = {}

    def before_enqueue(self, broker, message, delay):
        """Start tracking message and mark it ENQUEUED."""
        tracked = TrackedMessage(message)
        tracked.update_state(MessageState.ENQUEUED)
        self.tracked_messages[message.message_id] = tracked

    def before_process_message(self, broker, message):
        """Mark a tracked message PROCESSING; unknown messages are ignored."""
        tracked = self.tracked_messages.get(message.message_id)
        if tracked is not None:
            tracked.update_state(MessageState.PROCESSING)

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Mark a tracked message FAILED (with error text) or COMPLETED (with result)."""
        tracked = self.tracked_messages.get(message.message_id)
        if tracked is None:
            return

        if exception is not None:
            # str() of an exception raised without arguments is empty; fall
            # back to repr() so the failure always carries some text.
            tracked.update_state(
                MessageState.FAILED,
                error=str(exception) or repr(exception),
            )
        else:
            tracked.update_state(MessageState.COMPLETED)
            tracked.result = result
# Usage
tracking_middleware = MessageTrackingMiddleware()
broker.add_middleware(tracking_middleware)


@dramatiq.actor
def tracked_task(data):
    """Sleep for one second, then return a string containing data."""
    time.sleep(1)  # Simulate work
    return f"Processed: {data}"
# Send tracked message
message = tracked_task.send("test_data")

# Check tracking later
# NOTE(review): the state only advances past ENQUEUED here if a worker runs
# in this same process and shares tracking_middleware — confirm for your setup.
time.sleep(2)
tracked = tracking_middleware.tracked_messages.get(message.message_id)
if tracked:
    print(f"Message state: {tracked.state}")
    print(f"Duration: {tracked.get_duration():.2f}s")


class MessageBatch:
    """Collect messages and enqueue them on one broker in a single call."""

    def __init__(self, broker):
        self.broker = broker
        self.messages = []

    def add(self, message):
        """Add message to batch"""
        self.messages.append(message)

    def send_all(self, delay=None):
        """
        Send all messages in the batch.

        Parameters:
        - delay: Delay in milliseconds passed to the broker for every message;
          None or 0 sends without a delay

        Returns:
            List of the values returned by broker.enqueue, in batch order
        """
        # The delay goes to enqueue() rather than into the message options.
        effective_delay = delay if delay else None
        sent_messages = []
        try:
            for message in self.messages:
                sent_messages.append(
                    self.broker.enqueue(message, delay=effective_delay)
                )
        finally:
            # Drop only what was actually sent, so a failure part-way through
            # does not cause the sent prefix to be enqueued again on retry.
            del self.messages[:len(sent_messages)]
        return sent_messages

    def __len__(self):
        return len(self.messages)
# Usage
batch = MessageBatch(broker)

# Add messages to batch
for i in range(10):
    msg = process_item.message(f"item_{i}", {"config": "batch_mode"})
    batch.add(msg)

# Send entire batch with delay
sent_messages = batch.send_all(delay=1000)  # 1 second delay
print(f"Sent {len(sent_messages)} messages")

import msgpack
from dramatiq import Encoder, DecodeError
class MessagePackEncoder(Encoder):
    """Custom encoder using MessagePack for efficient serialization"""

    def encode(self, data):
        """
        Pack data with MessagePack.

        Raises:
            DecodeError: If packing fails (the same error type is used for
            both directions); the original exception is chained as the cause
        """
        try:
            return msgpack.packb(data, use_bin_type=True)
        except Exception as e:
            raise DecodeError(f"Failed to encode with MessagePack: {e}") from e

    def decode(self, data):
        """
        Unpack MessagePack bytes.

        Raises:
            DecodeError: If unpacking fails; the original exception is chained
            as the cause
        """
        try:
            return msgpack.unpackb(data, raw=False, strict_map_key=False)
        except Exception as e:
            raise DecodeError(f"Failed to decode with MessagePack: {e}") from e
# Use custom encoder
msgpack_encoder = MessagePackEncoder()
dramatiq.set_encoder(msgpack_encoder)


@dramatiq.actor
def msgpack_task(data):
    """Task using MessagePack encoding"""
    return {
        "input_size": len(str(data)),
        "processed": True,
        "binary_data": b"binary content"
    }
# Send binary-friendly data
msgpack_task.send({
    "text": "Hello World",
    # Four raw bytes 0x00..0x03 (single backslashes, not escaped backslashes)
    "binary": b"\x00\x01\x02\x03",
    "numbers": [1, 2, 3, 4, 5]
})

import gzip
import json
class CompressedJSONEncoder(dramatiq.Encoder):
    """JSON encoder with gzip compression"""

    def __init__(self, compression_level=6):
        """
        Parameters:
        - compression_level: Value passed to gzip.compress as compresslevel
        """
        self.compression_level = compression_level

    def encode(self, data):
        """
        Serialize data to UTF-8 JSON and gzip-compress it.

        Raises:
            dramatiq.DecodeError: If serialization or compression fails (the
            same error type is used for both directions); the original
            exception is chained as the cause
        """
        try:
            json_data = json.dumps(data).encode('utf-8')
            return gzip.compress(json_data, compresslevel=self.compression_level)
        except Exception as e:
            raise dramatiq.DecodeError(f"Failed to encode/compress: {e}") from e

    def decode(self, data):
        """
        Gzip-decompress data and parse it as UTF-8 JSON.

        Raises:
            dramatiq.DecodeError: If decompression or parsing fails; the
            original exception is chained as the cause
        """
        try:
            decompressed = gzip.decompress(data)
            return json.loads(decompressed.decode('utf-8'))
        except Exception as e:
            raise dramatiq.DecodeError(f"Failed to decompress/decode: {e}") from e
# Use compressed encoder for large messages
compressed_encoder = CompressedJSONEncoder(compression_level=9)
dramatiq.set_encoder(compressed_encoder)


@dramatiq.actor
def large_data_task(large_data):
    """Task handling large data with compression"""
    return {
        "processed_items": len(large_data),
        "sample": large_data[:10] if large_data else [],
        "compression_effective": True
    }
# Send large dataset
large_dataset = list(range(10000))
large_data_task.send(large_dataset)

import pprint
class MessageInspector:
    """Print debugging information about messages."""

    # Fields examined by compare_messages, in reporting order.
    _COMPARED_FIELDS = (
        'message_id', 'actor_name', 'queue_name', 'args', 'kwargs', 'options',
    )

    def __init__(self, broker):
        self.broker = broker

    def inspect_message(self, message):
        """
        Detailed message inspection.

        Prints the message fields, the encoded size in bytes, and whether the
        encoded form can be decoded again.
        """
        print("=== Message Inspection ===")
        print(f"ID: {message.message_id}")
        print(f"Actor: {message.actor_name}")
        print(f"Queue: {message.queue_name}")
        print(f"Timestamp: {message.message_timestamp}")
        print(f"Args: {message.args}")
        print(f"Kwargs: {message.kwargs}")
        print("Options:")
        pprint.pprint(message.options, indent=2)

        # Size information
        encoded = message.encode()
        print(f"Encoded size: {len(encoded)} bytes")

        # Validate encoding/decoding: only checks that decoding does not
        # raise; the decoded message is not compared with the original.
        try:
            dramatiq.Message.decode(encoded)
            print("✓ Encoding/decoding successful")
        except Exception as e:
            print(f"✗ Encoding/decoding failed: {e}")

    def compare_messages(self, msg1, msg2):
        """
        Compare two messages field by field and print the outcome.

        Returns:
            List of the field names whose values differ (empty when all match)
        """
        print("=== Message Comparison ===")
        differing = []
        for field in self._COMPARED_FIELDS:
            val1 = getattr(msg1, field)
            val2 = getattr(msg2, field)
            if val1 == val2:
                print(f"✓ {field}: MATCH")
            else:
                differing.append(field)
                print(f"✗ {field}: DIFFER")
                print(f" Message 1: {val1}")
                print(f" Message 2: {val2}")
        return differing
# Usage
inspector = MessageInspector(broker)


@dramatiq.actor
def debug_task(data, option=None):
    """Return a string describing data and option."""
    return f"Debug: {data} with {option}"
# Create and inspect message
message = debug_task.message("test_data", option="debug_mode")
inspector.inspect_message(message)

# Create modified copy and compare
modified = message.copy(queue_name="debug", options={"debug": True})
inspector.compare_messages(message, modified)


def monitor_message_queues(broker, interval=5):
    """
    Monitor message queues for debugging.

    Starts a daemon thread that prints queue lengths every interval seconds
    and never stops on its own.

    Parameters:
    - broker: Broker to inspect; lengths are only printed when it exposes a
      client with an info attribute (treated as a Redis broker)
    - interval: Seconds to sleep between reports

    Returns:
        The started monitoring thread
    """
    # Imported here so the snippet does not depend on earlier examples.
    import threading
    import time

    def queue_stats():
        while True:
            try:
                # Get queue information (broker-specific)
                if hasattr(broker, 'client') and hasattr(broker.client, 'info'):
                    # Redis broker
                    for queue_name in broker.queues:
                        # NOTE(review): the pending-message list is assumed to
                        # live at "<namespace>:<queue>", with "<...>.msgs"
                        # being a hash (LLEN on it would fail) — confirm
                        # against the Redis broker in use.
                        key = f"{broker.namespace}:{queue_name}"
                        length = broker.client.llen(key)
                        print(f"Queue '{queue_name}': {length} messages")
                print("---")
            except Exception as e:
                # Best-effort monitor: report the problem and keep going.
                print(f"Error monitoring queues: {e}")
            time.sleep(interval)

    monitor_thread = threading.Thread(target=queue_stats, daemon=True)
    monitor_thread.start()
    return monitor_thread
# Start queue monitoring
monitor_thread = monitor_message_queues(broker)

# Send some messages to observe
for i in range(20):
    debug_task.send(f"message_{i}")

# This comprehensive message documentation covers all aspects of working with messages in Dramatiq, from basic usage to advanced patterns for tracking, batching, custom serialization, and debugging.
Install with Tessl CLI
npx tessl i tessl/pypi-dramatiq