CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware

Pending
Overview
Eval results
Files

results.mddocs/

Results

Result storage in Dramatiq enables tasks to store and retrieve their return values, making it possible to build complex workflows where tasks depend on the results of previous tasks. The system supports multiple storage backends and provides both synchronous and asynchronous result retrieval.

Capabilities

Results Middleware

The core middleware component that handles result storage and retrieval.

class Results(Middleware):
    def __init__(self, *, backend: ResultBackend = None, store_results: bool = False):
        """
        Initialize results middleware.
        
        Parameters:
        - backend: Result storage backend (uses StubBackend if None)
        - store_results: Whether to store results by default for all actors
        """
    
    @property
    def actor_options(self) -> Set[str]:
        return {"store_results"}

Usage:

from dramatiq.middleware import Results
from dramatiq.results.backends import RedisBackend

# Set up result backend
result_backend = RedisBackend()

# Create results middleware
results_middleware = Results(
    backend=result_backend,
    store_results=False  # Only store when explicitly requested
)

# Add to broker
broker.add_middleware(results_middleware)

# Actors with result storage
@dramatiq.actor(store_results=True)
def compute_task(x, y):
    result = x * y + 42
    return {"computation": result, "inputs": [x, y]}

# Send task and get result
message = compute_task.send(10, 20)
result = message.get_result(block=True, timeout=30000)
print(f"Task result: {result}")

Result Backend Base Class

Abstract base class for result storage backends.

class ResultBackend:
    def __init__(self, *, namespace: str = "dramatiq-results", encoder: Encoder = None):
        """
        Initialize result backend.
        
        Parameters:
        - namespace: Key namespace for storing results
        - encoder: Encoder for serializing results (uses JSON if None)
        """
    
    def get_result(
        self, 
        message: Message, 
        *, 
        block: bool = False, 
        timeout: int = 10000
    ):
        """
        Get result for a message.
        
        Parameters:
        - message: Message to get result for
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking
        
        Returns:
        Task result or Missing sentinel
        
        Raises:
        ResultTimeout: If timeout exceeded while blocking
        ResultMissing: If result not found (when not blocking)
        ResultFailure: If task failed with exception
        """
    
    def store_result(self, message: Message, result, ttl: int):
        """
        Store result for a message.
        
        Parameters:
        - message: Message to store result for
        - result: Result value to store
        - ttl: Time-to-live in milliseconds
        """
    
    def delete_result(self, message: Message):
        """
        Delete stored result for a message.
        
        Parameters:
        - message: Message to delete result for
        """

Result Backend Implementations

Redis Backend

Production backend using Redis for result storage.

class RedisBackend(ResultBackend):
    def __init__(
        self, 
        client: redis.Redis, 
        *, 
        namespace: str = "dramatiq-results", 
        encoder: Encoder = None
    ):
        """
        Create Redis result backend.
        
        Parameters:
        - client: Redis client instance
        - namespace: Key namespace for results (default: "dramatiq-results")
        - encoder: Result encoder (uses JSON if None)
        """

Usage:

import redis
from dramatiq.results.backends import RedisBackend

# Create Redis client
redis_client = redis.Redis(host="localhost", port=6379, db=2, decode_responses=True)

# Create Redis backend
redis_backend = RedisBackend(
    redis_client, 
    namespace="myapp-results"
)

# Use with Results middleware
results = Results(backend=redis_backend, store_results=True)
broker.add_middleware(results)

Memcached Backend

Memcached backend for result storage.

class MemcachedBackend(ResultBackend):
    def __init__(
        self, 
        client, 
        *, 
        namespace: str = "dramatiq-results", 
        encoder: Encoder = None
    ):
        """
        Create Memcached result backend.
        
        Parameters:
        - client: Memcached client instance
        - namespace: Key namespace for results
        - encoder: Result encoder (uses JSON if None)
        """

Usage:

import pylibmc
from dramatiq.results.backends import MemcachedBackend

# Create Memcached client
mc_client = pylibmc.Client(["127.0.0.1:11211"])

# Create Memcached backend
mc_backend = MemcachedBackend(mc_client)

# Use with Results middleware
results = Results(backend=mc_backend)
broker.add_middleware(results)

Stub Backend

In-memory backend for testing and development.

class StubBackend(ResultBackend):
    def __init__(self, *, namespace: str = "dramatiq-results", encoder: Encoder = None):
        """
        Create in-memory result backend for testing.
        
        Parameters:
        - namespace: Key namespace for results
        - encoder: Result encoder (uses JSON if None)
        """

Usage:

from dramatiq.results.backends import StubBackend

# Create stub backend for testing
stub_backend = StubBackend()

# Use in tests
results = Results(backend=stub_backend, store_results=True)
broker.add_middleware(results)

# Test result storage
@dramatiq.actor(store_results=True)
def test_task(value):
    return value * 2

message = test_task.send(21)
result = message.get_result(block=True)
assert result == 42

Message Result Interface

Messages provide direct access to result operations.

class Message:
    def get_result(
        self, 
        *, 
        backend: ResultBackend = None, 
        block: bool = False, 
        timeout: int = None
    ):
        """
        Get result for this message.
        
        Parameters:
        - backend: Result backend to use (uses broker's backend if None)
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking
        
        Returns:
        Task result
        
        Raises:
        ResultMissing: If result not available (when not blocking)
        ResultTimeout: If timeout exceeded while blocking
        ResultFailure: If task failed with exception
        """

Usage:

@dramatiq.actor(store_results=True)
def long_running_task(duration):
    import time
    time.sleep(duration)
    return {"completed_after": duration, "timestamp": time.time()}

# Send task
message = long_running_task.send(5)

# Non-blocking result check
try:
    result = message.get_result(block=False)
    print(f"Task completed: {result}")
except dramatiq.ResultMissing:
    print("Task still running...")

# Blocking result retrieval
result = message.get_result(block=True, timeout=10000)
print(f"Final result: {result}")

Result Errors

Specialized exceptions for result operations.

class ResultError(Exception):
    """Base exception for result operations."""

class ResultMissing(ResultError):
    """Raised when result is not available."""

class ResultTimeout(ResultError):
    """Raised when timeout exceeded while waiting for result."""

class ResultFailure(ResultError):
    """
    Raised when task failed with an exception.
    
    Contains the original exception information.
    """
    def __init__(self, exception_type, exception_value, traceback):
        self.exception_type = exception_type
        self.exception_value = exception_value
        self.traceback = traceback

# Missing sentinel value
class Missing:
    """Sentinel value indicating missing result."""

Missing = Missing()

Usage:

@dramatiq.actor(store_results=True)
def failing_task(should_fail):
    if should_fail:
        raise ValueError("Task intentionally failed")
    return "Success"

# Handle different result scenarios
message = failing_task.send(True)  # Will fail

try:
    result = message.get_result(block=True, timeout=5000)
    print(f"Success: {result}")
except dramatiq.ResultTimeout:
    print("Task timed out")
except dramatiq.ResultFailure as e:
    print(f"Task failed: {e.exception_type.__name__}: {e.exception_value}")
except dramatiq.ResultMissing:
    print("Result not found")

Advanced Result Patterns

Pipeline Results

@dramatiq.actor(store_results=True)
def step_one(data):
    processed = data.upper()
    return {"step": 1, "data": processed, "length": len(processed)}

@dramatiq.actor(store_results=True)
def step_two(step_one_result):
    data = step_one_result["data"]
    return {"step": 2, "data": data + "!!!", "prev_length": step_one_result["length"]}

@dramatiq.actor(store_results=True)
def step_three(step_two_result):
    return {
        "step": 3, 
        "final_data": step_two_result["data"],
        "total_transformations": 3
    }

# Create pipeline with result dependencies
msg1 = step_one.send("hello world")
result1 = msg1.get_result(block=True)

msg2 = step_two.send(result1)
result2 = msg2.get_result(block=True)

msg3 = step_three.send(result2)
final_result = msg3.get_result(block=True)

print(f"Pipeline result: {final_result}")

Batch Result Collection

@dramatiq.actor(store_results=True)
def process_item(item_id, item_data):
    # Simulate processing
    import time, random
    time.sleep(random.uniform(0.1, 0.5))
    return {
        "item_id": item_id,
        "processed_data": item_data.upper(),
        "processing_time": random.uniform(0.1, 0.5)
    }

# Send batch of tasks
batch_items = [
    {"id": i, "data": f"item_{i}"}
    for i in range(10)
]

messages = []
for item in batch_items:
    msg = process_item.send(item["id"], item["data"])
    messages.append(msg)

# Collect all results
results = []
for msg in messages:
    try:
        result = msg.get_result(block=True, timeout=10000)
        results.append(result)
    except dramatiq.ResultTimeout:
        print(f"Message {msg.message_id} timed out")
    except dramatiq.ResultFailure as e:
        print(f"Message {msg.message_id} failed: {e}")

print(f"Collected {len(results)} results from {len(messages)} tasks")

Result Caching and TTL Management

@dramatiq.actor(store_results=True)
def expensive_computation(input_data):
    """Expensive computation with result caching"""
    import time, hashlib
    
    # Simulate expensive operation
    time.sleep(2)
    
    # Generate deterministic result
    hash_input = str(input_data).encode()
    result_hash = hashlib.md5(hash_input).hexdigest()
    
    return {
        "input": input_data,
        "result": result_hash,
        "computed_at": time.time()
    }

# Custom result storage with longer TTL
def store_with_custom_ttl(message, result, ttl_hours=24):
    """Store result with custom TTL"""
    backend = broker.get_results_backend()
    ttl_ms = ttl_hours * 3600 * 1000  # Convert to milliseconds
    backend.store_result(message, result, ttl_ms)

# Usage with custom TTL
message = expensive_computation.send({"complex": "data"})
result = message.get_result(block=True)

# Store with longer TTL for caching
store_with_custom_ttl(message, result, ttl_hours=48)

Result Aggregation

@dramatiq.actor(store_results=True)
def partial_computation(chunk_id, data_chunk):
    """Process a chunk of data"""
    return {
        "chunk_id": chunk_id,
        "sum": sum(data_chunk),
        "count": len(data_chunk),
        "min": min(data_chunk),
        "max": max(data_chunk)
    }

@dramatiq.actor(store_results=True)
def aggregate_results(message_ids):
    """Aggregate results from multiple partial computations"""
    backend = broker.get_results_backend()
    
    partial_results = []
    for msg_id in message_ids:
        # Create dummy message for result retrieval
        msg = dramatiq.Message(
            queue_name="default",
            actor_name="partial_computation",
            args=(), kwargs={},
            options={},
            message_id=msg_id,
            message_timestamp=0
        )
        
        try:
            result = backend.get_result(msg, block=False)
            if result != dramatiq.results.Missing:
                partial_results.append(result)
        except dramatiq.ResultMissing:
            continue
    
    # Aggregate all partial results
    total_sum = sum(r["sum"] for r in partial_results)
    total_count = sum(r["count"] for r in partial_results)
    overall_min = min(r["min"] for r in partial_results)
    overall_max = max(r["max"] for r in partial_results)
    
    return {
        "total_sum": total_sum,
        "total_count": total_count,
        "average": total_sum / total_count if total_count > 0 else 0,
        "min": overall_min,
        "max": overall_max,
        "chunks_processed": len(partial_results)
    }

# Usage
large_dataset = list(range(1000))
chunk_size = 100
chunks = [large_dataset[i:i+chunk_size] for i in range(0, len(large_dataset), chunk_size)]

# Process chunks
message_ids = []
for i, chunk in enumerate(chunks):
    msg = partial_computation.send(i, chunk)
    message_ids.append(msg.message_id)

# Wait for partial results to complete
import time
time.sleep(5)

# Aggregate results
aggregation_msg = aggregate_results.send(message_ids)
final_result = aggregation_msg.get_result(block=True)
print(f"Aggregated result: {final_result}")

Result-Based Conditional Execution

@dramatiq.actor(store_results=True)
def data_quality_check(data):
    """Check data quality and return score"""
    import random
    quality_score = random.uniform(0, 1)
    
    return {
        "data": data,
        "quality_score": quality_score,
        "passed": quality_score > 0.7
    }

@dramatiq.actor(store_results=True)
def high_quality_processing(quality_result):
    """Process high-quality data"""
    if not quality_result["passed"]:
        return {"status": "skipped", "reason": "Low quality data"}
    
    return {
        "status": "processed",
        "result": f"High quality processing of: {quality_result['data']}",
        "quality_score": quality_result["quality_score"]
    }

@dramatiq.actor(store_results=True)
def basic_processing(quality_result):
    """Basic processing for any data"""
    return {
        "status": "basic_processed",
        "result": f"Basic processing of: {quality_result['data']}",
        "quality_score": quality_result["quality_score"]
    }

# Conditional processing based on results
def process_with_quality_check(data):
    # Step 1: Quality check
    quality_msg = data_quality_check.send(data)
    quality_result = quality_msg.get_result(block=True)
    
    # Step 2: Conditional processing based on quality
    if quality_result["passed"]:
        processing_msg = high_quality_processing.send(quality_result)
    else:
        processing_msg = basic_processing.send(quality_result)
    
    final_result = processing_msg.get_result(block=True)
    return final_result

# Usage
test_data = "sample data for processing"
result = process_with_quality_check(test_data)
print(f"Processing result: {result}")

Result Monitoring and Debugging

def monitor_task_results(messages, timeout=30000):
    """Monitor multiple task results with progress tracking"""
    import time
    
    start_time = time.time()
    completed = {}
    
    while len(completed) < len(messages):
        for i, msg in enumerate(messages):
            if i in completed:
                continue
                
            try:
                result = msg.get_result(block=False)
                completed[i] = result
                print(f"Task {i} completed: {type(result)}")
            except dramatiq.ResultMissing:
                continue
            except dramatiq.ResultFailure as e:
                completed[i] = e
                print(f"Task {i} failed: {e.exception_type.__name__}")
        
        elapsed = (time.time() - start_time) * 1000
        if elapsed > timeout:
            print(f"Timeout: {len(completed)}/{len(messages)} completed")
            break
        
        if len(completed) < len(messages):
            time.sleep(0.1)
    
    return completed

# Usage
messages = [compute_task.send(i, i*2) for i in range(10)]
results = monitor_task_results(messages)
print(f"Final results: {len(results)} tasks completed")

Install with Tessl CLI

npx tessl i tessl/pypi-dramatiq

docs

actors.md

brokers.md

composition.md

index.md

messages.md

middleware.md

rate-limiting.md

results.md

workers.md

tile.json