CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware

Pending
Overview
Eval results
Files

docs/messages.md

Messages

Messages in Dramatiq represent tasks to be executed by workers. They contain all the information needed to invoke an actor, including arguments, metadata, and processing options. The message system also handles serialization, routing, and integration with composition features.

Capabilities

Message Class

The core message data structure that represents a task.

class Message:
    """
    A task to be executed by a worker.

    A message bundles what is needed to invoke an actor: the queue it is
    routed through, the actor's name, the call arguments, and an options
    dictionary carrying metadata and processing options.

    NOTE(review): this is an API reference stub -- the method bodies are
    intentionally omitted and only the signatures/contracts are shown.
    """
    def __init__(
        self,
        queue_name: str,
        actor_name: str,
        args: tuple,
        kwargs: Dict[str, Any],
        options: Dict[str, Any],
        message_id: str = None,
        message_timestamp: int = None
    ):
        """
        Create a message instance.
        
        Parameters:
        - queue_name: Name of the queue this message is routed to
        - actor_name: Name of the actor that should execute the message
        - args: Positional arguments passed to the actor
        - kwargs: Keyword arguments passed to the actor
        - options: Message options and metadata
        - message_id: Unique message identifier (auto-generated if None)
        - message_timestamp: Unix timestamp in milliseconds (auto-generated if None)
        """
    
    def encode(self) -> bytes:
        """
        Serialize this message to bytes for transmission.
        
        Returns:
        Serialized message data
        """
    
    @classmethod
    def decode(cls, data: bytes) -> 'Message':
        """
        Rebuild a message from its serialized form.
        
        Parameters:
        - data: Serialized message data
        
        Returns:
        Message instance
        
        Raises:
        DecodeError: If data cannot be decoded
        """
    
    def copy(self, **attributes) -> 'Message':
        """
        Create a copy of the message with some attributes replaced.
        
        Parameters:
        - **attributes: Attribute names and their new values for the copy
        
        Returns:
        New message instance with the modifications applied
        """
    
    def get_result(
        self, 
        *, 
        backend: ResultBackend = None, 
        block: bool = False, 
        timeout: int = None
    ):
        """
        Get the result for this message (requires Results middleware).
        
        All parameters are keyword-only.
        
        Parameters:
        - backend: Result backend to use (uses broker's backend if None)
        - block: Whether to block waiting for the result
        - timeout: Timeout in milliseconds when blocking
        
        Returns:
        Task result
        
        Raises:
        ResultMissing: If result not available
        ResultTimeout: If timeout exceeded while blocking
        ResultFailure: If task failed with exception
        """
    
    def asdict(self) -> Dict[str, Any]:
        """
        Convert the message to a dictionary representation.
        
        Returns:
        Dictionary containing message data
        """
    
    def __or__(self, other) -> 'pipeline':
        """
        Chain this message with another one using the | operator.
        
        Parameters:
        - other: Message or pipeline to chain with
        
        Returns:
        Pipeline containing both messages
        """
    
    # Properties
    queue_name: str              # Name of the queue the message is routed to
    actor_name: str             # Name of the actor to execute
    args: tuple                 # Positional arguments for the actor
    kwargs: Dict[str, Any]      # Keyword arguments for the actor
    options: Dict[str, Any]     # Message options and metadata
    message_id: str             # Unique message identifier
    message_timestamp: int      # Creation time, Unix timestamp in milliseconds

Usage:

import dramatiq

@dramatiq.actor
def example_task(x, y, multiplier=1):
    # Actor body: adds the two operands, then scales the sum.
    return (x + y) * multiplier

# Create message manually; actor_name refers to the actor declared above
message = dramatiq.Message(
    queue_name="default",
    actor_name="example_task",
    args=(10, 20),
    kwargs={"multiplier": 2},
    options={"max_retries": 5}
)

# Send the manually built message through the global broker
broker = dramatiq.get_broker()
broker.enqueue(message)

# Message created by the actor (this call only builds it; presumably
# nothing is enqueued until it is sent -- confirm)
auto_message = example_task.message(10, 20, multiplier=2)
print(f"Message ID: {auto_message.message_id}")
print(f"Message data: {auto_message.asdict()}")

# Build and send a message in one step. NOTE: this rebinds auto_message
# to the message returned by send(); it does not send the one built above.
auto_message = example_task.send(10, 20, multiplier=2)

Message Encoding

The encoding system handles serialization and deserialization of messages for transmission between brokers and workers.

Encoder Base Class

class Encoder:
    """
    Base interface for message encoders.

    An encoder converts message data to bytes so it can be stored in and
    moved through a broker, and converts those bytes back again.
    Both methods raise NotImplementedError here, so concrete encoders
    must override them.
    """

    def encode(self, data: MessageData) -> bytes:
        """
        Serialize message data.

        Parameters:
        - data: Message data to encode

        Returns:
        The encoded message as bytes
        """
        raise NotImplementedError()

    def decode(self, data: bytes) -> MessageData:
        """
        Deserialize message data.

        Parameters:
        - data: Encoded message bytes

        Returns:
        The decoded message data

        Raises:
        DecodeError: If data cannot be decoded
        """
        raise NotImplementedError()

JSON Encoder

Default encoder using JSON serialization.

class JSONEncoder(Encoder):
    # API reference stub: method bodies are intentionally omitted.
    def __init__(self):
        """
        Create JSON encoder for message serialization.
        
        Uses JSON for cross-language compatibility and human readability.
        Handles standard Python types: str, int, float, bool, list, dict, None.
        """
    
    def encode(self, data: MessageData) -> bytes:
        """
        Encode message data to JSON bytes.
        
        Parameters:
        - data: Message data to encode
        
        Returns:
        UTF-8 encoded JSON data
        """
    
    def decode(self, data: bytes) -> MessageData:
        """
        Decode JSON bytes to message data.
        
        Parameters:
        - data: UTF-8 encoded JSON bytes
        
        Returns:
        Decoded Python objects
        """

Usage:

import time

import dramatiq
from dramatiq import JSONEncoder

# JSON encoder (default)
json_encoder = JSONEncoder()

@dramatiq.actor
def json_compatible_task(data):
    """Echo the ``input`` field of ``data`` together with a timestamp.

    Both the argument and the returned dictionary contain only
    JSON-serializable values.
    """
    # Data must be JSON-serializable
    return {
        "processed": data["input"],
        "timestamp": time.time(),
        "items": [1, 2, 3, 4, 5]
    }

# Send JSON-compatible data
json_compatible_task.send({
    "input": "test data",
    "config": {"option1": True, "option2": "value"}
})

Pickle Encoder

Encoder using Python's pickle for complex object serialization.

class PickleEncoder(Encoder):
    # API reference stub: method bodies are intentionally omitted.
    def __init__(self):
        """
        Create Pickle encoder for Python object serialization.
        
        WARNING: Pickle can execute arbitrary code during deserialization.
        Only use with trusted data sources.
        """
    
    def encode(self, data: MessageData) -> bytes:
        """
        Encode message data using pickle.
        
        Parameters:
        - data: Message data to encode
        
        Returns:
        Pickled data bytes
        """
    
    def decode(self, data: bytes) -> MessageData:
        """
        Decode pickled bytes to message data.
        
        Parameters:
        - data: Pickled bytes; must come from a trusted source
        
        Returns:
        Unpickled Python objects
        """

Usage:

import datetime

import dramatiq
from dramatiq import PickleEncoder, set_encoder

# Set pickle encoder globally (use with caution: unpickling can execute
# arbitrary code, so every producer on the broker must be trusted)
pickle_encoder = PickleEncoder()
set_encoder(pickle_encoder)

class CustomObject:
    """Example payload that is not JSON-serializable."""

    def __init__(self, value):
        self.value = value
        self.created = datetime.datetime.now()

@dramatiq.actor
def pickle_compatible_task(obj):
    """Report the payload's value and creation time plus the processing time."""
    # Can handle complex Python objects
    return {
        "object_value": obj.value,
        "created": obj.created,
        "processed": datetime.datetime.now()
    }

# Send complex objects
custom_obj = CustomObject("test value")
pickle_compatible_task.send(custom_obj)

Global Encoder Management

Functions for managing the global message encoder.

def get_encoder() -> Encoder:
    """
    Get the current global message encoder.
    
    API reference stub: the body is intentionally omitted.
    
    Returns:
    Current encoder instance (defaults to JSONEncoder)
    """

def set_encoder(encoder: Encoder) -> None:
    """
    Set the global message encoder.
    
    API reference stub: the body is intentionally omitted.
    
    Parameters:
    - encoder: Encoder instance to use globally
    """

Usage:

# Check current encoder
current_encoder = dramatiq.get_encoder()
print(f"Current encoder: {type(current_encoder).__name__}")

# Switch to custom encoder
# (MyCustomEncoder is a placeholder for your own Encoder subclass; it is
# not defined in this snippet)
custom_encoder = MyCustomEncoder()
dramatiq.set_encoder(custom_encoder)

# All messages will now use the custom encoder
@dramatiq.actor
def task_with_custom_encoding(data):
    # process_with_custom_format is a placeholder for application code
    return process_with_custom_format(data)

Advanced Message Patterns

Message Metadata and Options

@dramatiq.actor
def metadata_aware_task(data):
    """Return ``data`` together with metadata about the message being processed.

    Requires the ``CurrentMessage`` middleware to be added to the broker.
    Without it (or when called outside of a worker) no message is
    available and a fallback dictionary is returned instead.

    Parameters:
    - data: Arbitrary payload echoed back in the result

    Returns:
    Dictionary with the payload plus message id, retry count, queue name
    and creation timestamp, or ``{"data": data, "no_metadata": True}``
    when no current message is available.
    """
    # The current message is exposed by the CurrentMessage middleware class
    from dramatiq.middleware import CurrentMessage

    # get_current_message() returns None when no message is being processed,
    # so test for that explicitly instead of swallowing every exception.
    current_msg = CurrentMessage.get_current_message()
    if current_msg is None:
        # Fallback when no current message available
        return {"data": data, "no_metadata": True}

    return {
        "data": data,
        "message_id": current_msg.message_id,
        "retry_count": current_msg.options.get("retries", 0),
        "queue": current_msg.queue_name,
        "created_at": current_msg.message_timestamp
    }

# Send with custom options
message = metadata_aware_task.message_with_options(
    args=("test_data",),
    max_retries=3,
    custom_option="custom_value"
)
# The delay is an argument of enqueue(), not a message option: a "delay"
# entry placed in the message options is not read by the broker when
# enqueue() is called directly.
message_id = broker.enqueue(message, delay=5000).message_id  # 5 second delay

Message Copying and Modification

@dramatiq.actor
def original_task(data, multiplier=1):
    """Return ``data`` multiplied (or repeated, for sequences) ``multiplier`` times."""
    return data * multiplier

# Create base message
base_message = original_task.message("hello", multiplier=2)

# Create variations
urgent_message = base_message.copy(
    queue_name="urgent",
    options={**base_message.options, "priority": 0}
)

delayed_message = base_message.copy(
    args=("goodbye",)
)

# Send variations
broker.enqueue(urgent_message)
# The delay (in milliseconds) is passed to enqueue(); a "delay" entry in
# the message options is not applied by the broker.
broker.enqueue(delayed_message, delay=30000)

Conditional Message Creation

def create_conditional_message(data, priority="normal"):
    """Build (but do not send) a message whose actor and routing depend on priority.

    Parameters:
    - data: Single positional argument passed to the chosen actor
    - priority: "urgent", "high", or anything else for the normal path

    Returns:
    The message produced by the selected actor's message_with_options()
    """
    # Pick the actor and its routing options; the actor names are looked up
    # only in the branch that is taken.
    if priority == "urgent":
        actor = urgent_task
        routing = {"queue_name": "urgent", "priority": 0, "max_retries": 1}
    elif priority == "high":
        actor = high_priority_task
        routing = {"queue_name": "high_priority", "priority": 1, "max_retries": 3}
    else:
        actor = normal_task
        routing = {"queue_name": "normal", "priority": 5, "max_retries": 5}

    return actor.message_with_options(args=(data,), **routing)

# Usage
# (urgent_task / high_priority_task / normal_task and broker are assumed to
# be defined elsewhere; they are not part of this snippet)
urgent_msg = create_conditional_message("critical_data", "urgent")
normal_msg = create_conditional_message("regular_data", "normal")

# The helper only builds messages; enqueue them explicitly
broker.enqueue(urgent_msg)
broker.enqueue(normal_msg)

Message Lifecycle and Tracking

Message State Tracking

import time
from enum import Enum

class MessageState(Enum):
    """Lifecycle states recorded for a tracked message."""
    CREATED = "created"        # Initial state assigned by TrackedMessage
    ENQUEUED = "enqueued"      # Set from the before_enqueue hook
    PROCESSING = "processing"  # Set from the before_process_message hook
    COMPLETED = "completed"    # Processing finished without an exception
    FAILED = "failed"          # Processing finished with an exception
    RETRYING = "retrying"      # Declared for completeness; not set by the middleware in this example

class TrackedMessage:
    """Wraps a message with its current state and a timestamped state history.

    Attributes:
    - message: The wrapped message
    - state: Most recently recorded state
    - state_history: List of (state, time.time()) tuples, oldest first
    - result: Result slot, None until assigned by the caller
    - error: Last truthy error passed to update_state(), else None
    """

    def __init__(self, message):
        initial = MessageState.CREATED
        self.message = message
        self.state = initial
        self.state_history = [(initial, time.time())]
        self.result = None
        self.error = None

    def update_state(self, new_state, error=None):
        """Record a transition to new_state; keep error only when it is truthy."""
        self.state = new_state
        self.state_history.append((new_state, time.time()))
        if not error:
            return
        self.error = error

    def get_duration(self):
        """Return seconds between the first and last recorded states (0 if fewer than two)."""
        history = self.state_history
        if len(history) < 2:
            return 0
        first_ts = history[0][1]
        last_ts = history[-1][1]
        return last_ts - first_ts

# Middleware for message tracking
class MessageTrackingMiddleware(dramatiq.Middleware):
    """Middleware that records per-message state in an in-memory dictionary.

    tracked_messages maps message_id -> TrackedMessage. Entries are created
    in before_enqueue and updated by the processing hooks; messages that
    were never registered are ignored by those hooks.
    """

    def __init__(self):
        self.tracked_messages = {}

    def before_enqueue(self, broker, message, delay):
        """Start tracking the message and mark it as enqueued."""
        entry = TrackedMessage(message)
        entry.update_state(MessageState.ENQUEUED)
        self.tracked_messages[message.message_id] = entry

    def before_process_message(self, broker, message):
        """Mark a tracked message as being processed."""
        entry = self.tracked_messages.get(message.message_id)
        if entry is not None:
            entry.update_state(MessageState.PROCESSING)

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Mark a tracked message as failed or completed and store its result."""
        entry = self.tracked_messages.get(message.message_id)
        if entry is None:
            return
        if exception:
            entry.update_state(MessageState.FAILED, error=str(exception))
            return
        entry.update_state(MessageState.COMPLETED)
        entry.result = result

# Usage
tracking_middleware = MessageTrackingMiddleware()
broker.add_middleware(tracking_middleware)

@dramatiq.actor
def tracked_task(data):
    time.sleep(1)  # Simulate work
    return f"Processed: {data}"

# Send tracked message (send() returns the message, so its id can be used
# to look up the tracking entry afterwards)
message = tracked_task.send("test_data")

# Check tracking later
# NOTE(review): the tracking dictionary lives in this process's memory, so
# PROCESSING/COMPLETED states presumably only show up here when the worker
# runs in the same process -- confirm.
time.sleep(2)
tracked = tracking_middleware.tracked_messages.get(message.message_id)
if tracked:
    print(f"Message state: {tracked.state}")
    print(f"Duration: {tracked.get_duration():.2f}s")

Message Batching

class MessageBatch:
    """Collects messages and enqueues them together on one broker.

    Parameters:
    - broker: Object exposing ``enqueue(message, *, delay=None)``
    """

    def __init__(self, broker):
        self.broker = broker
        self.messages = []

    def add(self, message):
        """Add message to batch"""
        self.messages.append(message)

    def send_all(self, delay=None):
        """Send all messages in the batch.

        Parameters:
        - delay: Optional delay in milliseconds applied to every message.
          A falsy value (None or 0) enqueues without a delay.

        Returns:
        List of the values returned by ``broker.enqueue`` in batch order.

        Raises:
        Whatever ``broker.enqueue`` raises. Messages enqueued before the
        failure are removed from the batch, so retrying ``send_all`` only
        sends the remaining ones instead of duplicating the sent ones.
        """
        # The delay must be handed to enqueue(); a "delay" entry in the
        # message options is not applied by the broker.
        enqueue_kwargs = {"delay": delay} if delay else {}
        sent_messages = []

        try:
            for message in self.messages:
                sent_messages.append(self.broker.enqueue(message, **enqueue_kwargs))
        finally:
            # Drop exactly the messages that were handed to the broker.
            del self.messages[:len(sent_messages)]

        return sent_messages

    def __len__(self):
        """Return the number of messages waiting in the batch."""
        return len(self.messages)

# Usage
batch = MessageBatch(broker)

# Add messages to batch
# (process_item is assumed to be an actor defined elsewhere; it is not
# part of this snippet)
for i in range(10):
    msg = process_item.message(f"item_{i}", {"config": "batch_mode"})
    batch.add(msg)

# Send entire batch with delay
sent_messages = batch.send_all(delay=1000)  # 1 second delay
print(f"Sent {len(sent_messages)} messages")

Custom Message Serialization

Custom Encoder Implementation

import msgpack
from dramatiq import Encoder, DecodeError

class MessagePackEncoder(Encoder):
    """Custom encoder using MessagePack for efficient serialization"""

    def encode(self, data):
        """Serialize message data with MessagePack.

        Parameters:
        - data: Message data to encode

        Returns:
        MessagePack-encoded bytes

        Raises:
        DecodeError: If the data cannot be packed (kept for compatibility
        with callers that already catch this type)
        """
        try:
            return msgpack.packb(data, use_bin_type=True)
        except Exception as e:
            # DecodeError takes (message, data, error); chain the cause so
            # the original traceback is preserved.
            raise DecodeError(f"Failed to encode with MessagePack: {e}", data, e) from e

    def decode(self, data):
        """Deserialize MessagePack bytes to message data.

        Parameters:
        - data: MessagePack-encoded bytes

        Returns:
        Decoded Python objects

        Raises:
        DecodeError: If the bytes cannot be unpacked
        """
        try:
            return msgpack.unpackb(data, raw=False, strict_map_key=False)
        except Exception as e:
            raise DecodeError(f"Failed to decode with MessagePack: {e}", data, e) from e

# Use custom encoder
msgpack_encoder = MessagePackEncoder()
dramatiq.set_encoder(msgpack_encoder)

@dramatiq.actor
def msgpack_task(data):
    """Task using MessagePack encoding"""
    return {
        "input_size": len(str(data)),
        "processed": True,
        "binary_data": b"binary content"
    }

# Send binary-friendly data
# (single backslashes: the payload is the four raw bytes 0x00-0x03, not the
# 16-character text "\x00\x01\x02\x03")
msgpack_task.send({
    "text": "Hello World",
    "binary": b"\x00\x01\x02\x03",
    "numbers": [1, 2, 3, 4, 5]
})

Compression Support

import gzip
import json

class CompressedJSONEncoder(dramatiq.Encoder):
    """JSON encoder with gzip compression"""

    def __init__(self, compression_level=6):
        """
        Parameters:
        - compression_level: gzip compression level passed to gzip.compress
        """
        self.compression_level = compression_level

    def encode(self, data):
        """Serialize data to UTF-8 JSON and gzip-compress it.

        Parameters:
        - data: JSON-serializable message data

        Returns:
        Compressed bytes

        Raises:
        DecodeError: If serialization or compression fails (kept for
        compatibility with callers that already catch this type)
        """
        try:
            json_data = json.dumps(data).encode('utf-8')
            return gzip.compress(json_data, compresslevel=self.compression_level)
        except Exception as e:
            # DecodeError takes (message, data, error); chain the cause so
            # the original traceback is preserved.
            raise dramatiq.DecodeError(f"Failed to encode/compress: {e}", data, e) from e

    def decode(self, data):
        """Decompress gzip bytes and parse them as UTF-8 JSON.

        Parameters:
        - data: Bytes produced by encode()

        Returns:
        Decoded Python objects

        Raises:
        DecodeError: If decompression or JSON parsing fails
        """
        try:
            decompressed = gzip.decompress(data)
            return json.loads(decompressed.decode('utf-8'))
        except Exception as e:
            raise dramatiq.DecodeError(f"Failed to decompress/decode: {e}", data, e) from e

# Use compressed encoder for large messages
# (set_encoder replaces the global encoder, so this applies to every
# message, not only to large ones)
compressed_encoder = CompressedJSONEncoder(compression_level=9)
dramatiq.set_encoder(compressed_encoder)

@dramatiq.actor
def large_data_task(large_data):
    """Task handling large data with compression"""
    return {
        "processed_items": len(large_data),
        # First ten items, or an empty list for empty input
        "sample": large_data[:10] if large_data else [],
        # Constant flag; nothing is measured here
        "compression_effective": True
    }

# Send large dataset
large_dataset = list(range(10000))
large_data_task.send(large_dataset)

Message Debugging and Inspection

Message Inspector

import pprint

class MessageInspector:
    """Prints human-readable debugging views of messages to stdout."""

    def __init__(self, broker):
        self.broker = broker

    def inspect_message(self, message):
        """Print the message's fields, its encoded size, and a decode check."""
        print("=== Message Inspection ===")
        labelled_fields = (
            ("ID", "message_id"),
            ("Actor", "actor_name"),
            ("Queue", "queue_name"),
            ("Timestamp", "message_timestamp"),
            ("Args", "args"),
            ("Kwargs", "kwargs"),
        )
        for label, attribute in labelled_fields:
            print(f"{label}: {getattr(message, attribute)}")
        print("Options:")
        pprint.pprint(message.options, indent=2)

        # Size information
        payload = message.encode()
        print(f"Encoded size: {len(payload)} bytes")

        # Check that the encoded payload can be decoded again; the decoded
        # message itself is not compared with the original.
        try:
            dramatiq.Message.decode(payload)
        except Exception as e:
            print(f"✗ Encoding/decoding failed: {e}")
        else:
            print("✓ Encoding/decoding successful")

    def compare_messages(self, msg1, msg2):
        """Print, field by field, whether two messages match or differ."""
        print("=== Message Comparison ===")

        for field in ('message_id', 'actor_name', 'queue_name', 'args', 'kwargs', 'options'):
            left = getattr(msg1, field)
            right = getattr(msg2, field)

            if left == right:
                print(f"✓ {field}: MATCH")
                continue

            print(f"✗ {field}: DIFFER")
            print(f"  Message 1: {left}")
            print(f"  Message 2: {right}")

# Usage
inspector = MessageInspector(broker)

@dramatiq.actor
def debug_task(data, option=None):
    return f"Debug: {data} with {option}"

# Create and inspect message (message() only builds it; nothing is sent here)
message = debug_task.message("test_data", option="debug_mode")
inspector.inspect_message(message)

# Create modified copy and compare; queue_name and options are expected to
# be reported as differing
modified = message.copy(queue_name="debug", options={"debug": True})
inspector.compare_messages(message, modified)

Message Queue Monitoring

def monitor_message_queues(broker, interval=5, stop_event=None):
    """Monitor message queues for debugging.

    Starts a daemon thread that periodically prints the length of every
    queue known to the broker. Only brokers exposing a Redis-style
    ``client`` (with ``info`` and ``llen``) report queue lengths; for
    other brokers just the separator line is printed.

    Parameters:
    - broker: Broker to inspect
    - interval: Seconds to wait between polls
    - stop_event: Optional threading.Event; set it to stop the monitor.
      When omitted the thread runs until the process exits.

    Returns:
    The started monitoring thread
    """
    # Imported locally so the snippet is self-contained.
    import threading

    if stop_event is None:
        stop_event = threading.Event()

    def queue_stats():
        while not stop_event.is_set():
            try:
                # Get queue information (broker-specific)
                if hasattr(broker, 'client') and hasattr(broker.client, 'info'):
                    # Redis broker: the pending message ids are kept in the
                    # list "<namespace>:<queue>". The ".msgs" key is a hash
                    # of message payloads, so LLEN on it fails.
                    for queue_name in broker.queues:
                        key = f"{broker.namespace}:{queue_name}"
                        length = broker.client.llen(key)
                        print(f"Queue '{queue_name}': {length} messages")

                print("---")
            except Exception as e:
                # Best-effort monitor: report the problem and keep polling.
                print(f"Error monitoring queues: {e}")

            # Sleeps for `interval`, but wakes up immediately on stop.
            stop_event.wait(interval)

    monitor_thread = threading.Thread(target=queue_stats, daemon=True)
    monitor_thread.start()
    return monitor_thread

# Start queue monitoring (runs in a background daemon thread)
monitor_thread = monitor_message_queues(broker)

# Send some messages to observe
# (debug_task is assumed to be an actor defined elsewhere; it is not
# part of this snippet)
for i in range(20):
    debug_task.send(f"message_{i}")

This comprehensive message documentation covers all aspects of working with messages in Dramatiq, from basic usage to advanced patterns for tracking, batching, custom serialization, and debugging.

Install with Tessl CLI

npx tessl i tessl/pypi-dramatiq

docs

actors.md

brokers.md

composition.md

index.md

messages.md

middleware.md

rate-limiting.md

results.md

workers.md

tile.json