Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware
—
Result storage in Dramatiq enables tasks to store and retrieve their return values, making it possible to build complex workflows where tasks depend on the results of previous tasks. The system supports multiple storage backends and provides both synchronous and asynchronous result retrieval.
The core middleware component that handles result storage and retrieval.
class Results(Middleware):
def __init__(self, *, backend: ResultBackend = None, store_results: bool = False):
"""
Initialize results middleware.
Parameters:
- backend: Result storage backend (uses StubBackend if None)
- store_results: Whether to store results by default for all actors
"""
@property
def actor_options(self) -> Set[str]:
return {"store_results"}Usage:
from dramatiq.middleware import Results
from dramatiq.results.backends import RedisBackend

# Set up result backend
result_backend = RedisBackend()

# Create results middleware
results_middleware = Results(
    backend=result_backend,
    store_results=False  # Only store when explicitly requested
)

# Add to broker
broker.add_middleware(results_middleware)

# Actors with result storage
@dramatiq.actor(store_results=True)
def compute_task(x, y):
    result = x * y + 42
    return {"computation": result, "inputs": [x, y]}

# Send task and get result
message = compute_task.send(10, 20)
result = message.get_result(block=True, timeout=30000)
print(f"Task result: {result}")

Abstract base class for result storage backends.
class ResultBackend:
def __init__(self, *, namespace: str = "dramatiq-results", encoder: Encoder = None):
"""
Initialize result backend.
Parameters:
- namespace: Key namespace for storing results
- encoder: Encoder for serializing results (uses JSON if None)
"""
def get_result(
self,
message: Message,
*,
block: bool = False,
timeout: int = 10000
):
"""
Get result for a message.
Parameters:
- message: Message to get result for
- block: Whether to block waiting for result
- timeout: Timeout in milliseconds when blocking
Returns:
Task result or Missing sentinel
Raises:
ResultTimeout: If timeout exceeded while blocking
ResultMissing: If result not found (when not blocking)
ResultFailure: If task failed with exception
"""
def store_result(self, message: Message, result, ttl: int):
"""
Store result for a message.
Parameters:
- message: Message to store result for
- result: Result value to store
- ttl: Time-to-live in milliseconds
"""
def delete_result(self, message: Message):
"""
Delete stored result for a message.
Parameters:
- message: Message to delete result for
"""Production backend using Redis for result storage.
class RedisBackend(ResultBackend):
def __init__(
self,
client: redis.Redis,
*,
namespace: str = "dramatiq-results",
encoder: Encoder = None
):
"""
Create Redis result backend.
Parameters:
- client: Redis client instance
- namespace: Key namespace for results (default: "dramatiq-results")
- encoder: Result encoder (uses JSON if None)
"""Usage:
import redis
from dramatiq.results.backends import RedisBackend

# Connect to Redis (db 2, decoded responses) and build the result backend.
redis_client = redis.Redis(host="localhost", port=6379, db=2, decode_responses=True)
redis_backend = RedisBackend(
    redis_client,
    namespace="myapp-results"
)

# Plug the backend into the Results middleware.
results = Results(backend=redis_backend, store_results=True)
broker.add_middleware(results)

Memcached backend for result storage.
class MemcachedBackend(ResultBackend):
def __init__(
self,
client,
*,
namespace: str = "dramatiq-results",
encoder: Encoder = None
):
"""
Create Memcached result backend.
Parameters:
- client: Memcached client instance
- namespace: Key namespace for results
- encoder: Result encoder (uses JSON if None)
"""Usage:
import pylibmc
from dramatiq.results.backends import MemcachedBackend

# Connect to a local Memcached instance and wire it into the middleware.
mc_client = pylibmc.Client(["127.0.0.1:11211"])
mc_backend = MemcachedBackend(mc_client)
results = Results(backend=mc_backend)
broker.add_middleware(results)

In-memory backend for testing and development.
class StubBackend(ResultBackend):
def __init__(self, *, namespace: str = "dramatiq-results", encoder: Encoder = None):
"""
Create in-memory result backend for testing.
Parameters:
- namespace: Key namespace for results
- encoder: Result encoder (uses JSON if None)
"""Usage:
from dramatiq.results.backends import StubBackend

# Create stub backend for testing
stub_backend = StubBackend()

# Use in tests
results = Results(backend=stub_backend, store_results=True)
broker.add_middleware(results)

# Test result storage
@dramatiq.actor(store_results=True)
def test_task(value):
    return value * 2

message = test_task.send(21)
result = message.get_result(block=True)
assert result == 42

Messages provide direct access to result operations.
class Message:
def get_result(
self,
*,
backend: ResultBackend = None,
block: bool = False,
timeout: int = None
):
"""
Get result for this message.
Parameters:
- backend: Result backend to use (uses broker's backend if None)
- block: Whether to block waiting for result
- timeout: Timeout in milliseconds when blocking
Returns:
Task result
Raises:
ResultMissing: If result not available (when not blocking)
ResultTimeout: If timeout exceeded while blocking
ResultFailure: If task failed with exception
"""Usage:
@dramatiq.actor(store_results=True)
def long_running_task(duration):
    import time
    time.sleep(duration)
    return {"completed_after": duration, "timestamp": time.time()}

# Send task
message = long_running_task.send(5)

# Non-blocking result check
try:
    result = message.get_result(block=False)
    print(f"Task completed: {result}")
except dramatiq.ResultMissing:
    print("Task still running...")

# Blocking result retrieval
result = message.get_result(block=True, timeout=10000)
print(f"Final result: {result}")

Specialized exceptions for result operations.
class ResultError(Exception):
    """Base exception for result operations."""

class ResultMissing(ResultError):
    """Raised when result is not available."""

class ResultTimeout(ResultError):
    """Raised when timeout exceeded while waiting for result."""

class ResultFailure(ResultError):
    """
    Raised when task failed with an exception.
    Contains the original exception information.
    """
    def __init__(self, exception_type, exception_value, traceback):
        # Capture the failing task's exception triple so callers can
        # inspect what went wrong on the worker.
        self.exception_type = exception_type
        self.exception_value = exception_value
        self.traceback = traceback

# Missing sentinel value
class Missing:
    """Sentinel value indicating missing result."""
Missing = Missing()

Usage:
@dramatiq.actor(store_results=True)
def failing_task(should_fail):
if should_fail:
raise ValueError("Task intentionally failed")
return "Success"
# Handle different result scenarios
message = failing_task.send(True) # Will fail
try:
result = message.get_result(block=True, timeout=5000)
print(f"Success: {result}")
except dramatiq.ResultTimeout:
print("Task timed out")
except dramatiq.ResultFailure as e:
print(f"Task failed: {e.exception_type.__name__}: {e.exception_value}")
except dramatiq.ResultMissing:
    print("Result not found")

@dramatiq.actor(store_results=True)
def step_one(data):
    processed = data.upper()
    return {"step": 1, "data": processed, "length": len(processed)}

@dramatiq.actor(store_results=True)
def step_two(step_one_result):
    data = step_one_result["data"]
    return {"step": 2, "data": data + "!!!", "prev_length": step_one_result["length"]}

@dramatiq.actor(store_results=True)
def step_three(step_two_result):
    return {
        "step": 3,
        "final_data": step_two_result["data"],
        "total_transformations": 3
    }
# Create pipeline with result dependencies: each stage blocks on the
# stored result of the previous stage before being dispatched.
result1 = step_one.send("hello world").get_result(block=True)
result2 = step_two.send(result1).get_result(block=True)
final_result = step_three.send(result2).get_result(block=True)
print(f"Pipeline result: {final_result}")

@dramatiq.actor(store_results=True)
def process_item(item_id, item_data):
    """Uppercase the item's data, simulating variable processing latency."""
    # Simulate processing
    import time, random
    time.sleep(random.uniform(0.1, 0.5))
    return {
        "item_id": item_id,
        "processed_data": item_data.upper(),
        # NOTE: reported time is a fresh random draw, not the actual sleep.
        "processing_time": random.uniform(0.1, 0.5)
    }
# Send batch of tasks
batch_items = [
    {"id": i, "data": f"item_{i}"}
    for i in range(10)
]
messages = []
for item in batch_items:
    msg = process_item.send(item["id"], item["data"])
    messages.append(msg)

# Collect all results
results = []
for msg in messages:
    try:
        result = msg.get_result(block=True, timeout=10000)
        results.append(result)
    except dramatiq.ResultTimeout:
        print(f"Message {msg.message_id} timed out")
    except dramatiq.ResultFailure as e:
        print(f"Message {msg.message_id} failed: {e}")
print(f"Collected {len(results)} results from {len(messages)} tasks")

@dramatiq.actor(store_results=True)
def expensive_computation(input_data):
    """Expensive computation with result caching"""
    import time, hashlib
    # Simulate expensive operation
    time.sleep(2)
    # Generate deterministic result: md5 here is a fingerprint, not a
    # security primitive.
    hash_input = str(input_data).encode()
    result_hash = hashlib.md5(hash_input).hexdigest()
    return {
        "input": input_data,
        "result": result_hash,
        "computed_at": time.time()
    }
# Custom result storage with longer TTL
def store_with_custom_ttl(message, result, ttl_hours=24):
    """Store result with custom TTL, given in hours."""
    backend = broker.get_results_backend()
    ttl_ms = ttl_hours * 3600 * 1000  # Convert to milliseconds
    backend.store_result(message, result, ttl_ms)

# Usage with custom TTL
message = expensive_computation.send({"complex": "data"})
result = message.get_result(block=True)
# Store with longer TTL for caching
store_with_custom_ttl(message, result, ttl_hours=48)

@dramatiq.actor(store_results=True)
def partial_computation(chunk_id, data_chunk):
    """Process a chunk of data, returning summary statistics for it.

    Assumes data_chunk is a non-empty numeric sequence (min/max would
    raise ValueError on an empty chunk).
    """
    return {
        "chunk_id": chunk_id,
        "sum": sum(data_chunk),
        "count": len(data_chunk),
        "min": min(data_chunk),
        "max": max(data_chunk)
    }
@dramatiq.actor(store_results=True)
def aggregate_results(message_ids):
    """Aggregate results from multiple partial computations.

    Looks up each partial result by message id; chunks whose results are
    not yet stored are skipped. Returns combined statistics across the
    chunks that were available.
    """
    backend = broker.get_results_backend()
    partial_results = []
    for msg_id in message_ids:
        # Reconstruct a message shell so the backend can derive the
        # storage key for the original partial_computation message.
        msg = dramatiq.Message(
            queue_name="default",
            actor_name="partial_computation",
            args=(), kwargs={},
            options={},
            message_id=msg_id,
            message_timestamp=0
        )
        try:
            result = backend.get_result(msg, block=False)
            if result != dramatiq.results.Missing:
                partial_results.append(result)
        except dramatiq.ResultMissing:
            continue
    # Guard: min()/max() raise ValueError on an empty sequence, which
    # would happen if no partial results are ready yet.
    if not partial_results:
        return {
            "total_sum": 0,
            "total_count": 0,
            "average": 0,
            "min": None,
            "max": None,
            "chunks_processed": 0
        }
    # Aggregate all partial results
    total_sum = sum(r["sum"] for r in partial_results)
    total_count = sum(r["count"] for r in partial_results)
    overall_min = min(r["min"] for r in partial_results)
    overall_max = max(r["max"] for r in partial_results)
    return {
        "total_sum": total_sum,
        "total_count": total_count,
        "average": total_sum / total_count if total_count > 0 else 0,
        "min": overall_min,
        "max": overall_max,
        "chunks_processed": len(partial_results)
    }
# Usage
large_dataset = list(range(1000))
chunk_size = 100
chunks = [large_dataset[i:i+chunk_size] for i in range(0, len(large_dataset), chunk_size)]

# Process chunks
message_ids = []
for i, chunk in enumerate(chunks):
    msg = partial_computation.send(i, chunk)
    message_ids.append(msg.message_id)

# Wait for partial results to complete (fixed delay; a robust version
# would poll the backend instead)
import time
time.sleep(5)

# Aggregate results
aggregation_msg = aggregate_results.send(message_ids)
final_result = aggregation_msg.get_result(block=True)
print(f"Aggregated result: {final_result}")

@dramatiq.actor(store_results=True)
def data_quality_check(data):
    """Check data quality and return score"""
    import random
    quality_score = random.uniform(0, 1)
    return {
        "data": data,
        "quality_score": quality_score,
        # Pass threshold: anything above 0.7 counts as high quality.
        "passed": quality_score > 0.7
    }
@dramatiq.actor(store_results=True)
def high_quality_processing(quality_result):
    """Process high-quality data; skips input that failed the quality check."""
    if not quality_result["passed"]:
        return {"status": "skipped", "reason": "Low quality data"}
    return {
        "status": "processed",
        "result": f"High quality processing of: {quality_result['data']}",
        "quality_score": quality_result["quality_score"]
    }
@dramatiq.actor(store_results=True)
def basic_processing(quality_result):
    """Basic processing fallback that accepts data of any quality."""
    return {
        "status": "basic_processed",
        "result": f"Basic processing of: {quality_result['data']}",
        "quality_score": quality_result["quality_score"]
    }
# Conditional processing based on results
def process_with_quality_check(data):
    """Run a quality check, then route to high-quality or basic processing."""
    # Step 1: Quality check
    quality_msg = data_quality_check.send(data)
    quality_result = quality_msg.get_result(block=True)
    # Step 2: Conditional processing based on quality
    if quality_result["passed"]:
        processing_msg = high_quality_processing.send(quality_result)
    else:
        processing_msg = basic_processing.send(quality_result)
    final_result = processing_msg.get_result(block=True)
    return final_result

# Usage
test_data = "sample data for processing"
result = process_with_quality_check(test_data)
print(f"Processing result: {result}")

def monitor_task_results(messages, timeout=30000):
    """Monitor multiple task results with progress tracking.

    Polls each message without blocking until all have completed or the
    timeout (milliseconds) elapses.

    Returns:
    dict mapping message index -> result, or the ResultFailure instance
    for tasks that failed.
    """
    import time
    start_time = time.time()
    completed = {}
    while len(completed) < len(messages):
        for i, msg in enumerate(messages):
            if i in completed:
                continue
            try:
                result = msg.get_result(block=False)
                completed[i] = result
                print(f"Task {i} completed: {type(result)}")
            except dramatiq.ResultMissing:
                continue
            except dramatiq.ResultFailure as e:
                completed[i] = e
                print(f"Task {i} failed: {e.exception_type.__name__}")
        elapsed = (time.time() - start_time) * 1000
        if elapsed > timeout:
            print(f"Timeout: {len(completed)}/{len(messages)} completed")
            break
        if len(completed) < len(messages):
            # Brief pause between polling sweeps to avoid hammering the backend.
            time.sleep(0.1)
    return completed
# Usage: fan out ten computations, then poll until they all finish.
messages = [compute_task.send(i, i*2) for i in range(10)]
results = monitor_task_results(messages)
print(f"Final results: {len(results)} tasks completed")

Install with Tessl CLI
npx tessl i tessl/pypi-dramatiq