# Results

Result storage in Dramatiq enables tasks to store and retrieve their return values, making it possible to build complex workflows where tasks depend on the results of previous tasks. The system supports multiple storage backends and provides both blocking and non-blocking result retrieval.

## Capabilities

### Results Middleware

The core middleware component that handles result storage and retrieval.

```python { .api }
class Results(Middleware):
    def __init__(
        self,
        *,
        backend: ResultBackend = None,
        store_results: bool = False,
        result_ttl: int = None
    ):
        """
        Initialize results middleware.

        Parameters:
        - backend: Result storage backend used to store and retrieve results
        - store_results: Whether to store results by default for all actors
        - result_ttl: Maximum time in milliseconds that stored results are
          kept (defaults to 10 minutes when None)
        """

    @property
    def actor_options(self) -> Set[str]:
        # Both options can also be set per actor via @dramatiq.actor(...).
        return {"store_results", "result_ttl"}
```

**Usage:**

```python
import dramatiq
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend

# Set up result backend
result_backend = RedisBackend()

# Create results middleware
results_middleware = Results(
    backend=result_backend,
    store_results=False,  # Only store when explicitly requested
)

# Add to broker
broker = dramatiq.get_broker()
broker.add_middleware(results_middleware)


# Actors with result storage
@dramatiq.actor(store_results=True)
def compute_task(x, y):
    result = x * y + 42
    return {"computation": result, "inputs": [x, y]}


# Send task and get result
message = compute_task.send(10, 20)
result = message.get_result(block=True, timeout=30000)
print(f"Task result: {result}")
```

### Result Backend Base Class

Abstract base class for result storage backends.

```python { .api }
class ResultBackend:
    def __init__(self, *, namespace: str = "dramatiq-results", encoder: Encoder = None):
        """
        Initialize result backend.

        Parameters:
        - namespace: Key namespace for storing results
        - encoder: Encoder for serializing results (uses JSON if None)
        """

    def get_result(
        self,
        message: Message,
        *,
        block: bool = False,
        timeout: int = 10000
    ):
        """
        Get result for a message.

        Parameters:
        - message: Message to get result for
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking

        Returns:
        Task result

        Raises:
        ResultTimeout: If timeout exceeded while blocking
        ResultMissing: If result not found (when not blocking)
        ResultFailure: If task failed with exception
        """

    def store_result(self, message: Message, result, ttl: int):
        """
        Store result for a message.

        Parameters:
        - message: Message to store result for
        - result: Result value to store
        - ttl: Time-to-live in milliseconds
        """

    def delete_result(self, message: Message):
        """
        Delete stored result for a message.

        Parameters:
        - message: Message to delete result for
        """
```

### Result Backend Implementations

#### Redis Backend

Production backend using Redis for result storage.

```python { .api }
class RedisBackend(ResultBackend):
    def __init__(
        self,
        *,
        namespace: str = "dramatiq-results",
        encoder: Encoder = None,
        client: redis.Redis = None,
        url: str = None,
        **parameters
    ):
        """
        Create Redis result backend.

        All arguments are keyword-only; calling RedisBackend() with no
        arguments connects using the redis client defaults.

        Parameters:
        - namespace: Key namespace for results (default: "dramatiq-results")
        - encoder: Result encoder (uses JSON if None)
        - client: Optional Redis client instance; when given, the other
          connection arguments are ignored
        - url: Optional Redis connection URL
        - **parameters: Connection parameters passed to redis.Redis when
          no client is given
        """
```

**Usage:**

```python
import redis
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend

# Create Redis client. Leave decode_responses off: the backend stores
# encoded bytes and decodes them itself.
redis_client = redis.Redis(host="localhost", port=6379, db=2)

# Create Redis backend (pass the client by keyword)
redis_backend = RedisBackend(
    client=redis_client,
    namespace="myapp-results",
)

# Use with Results middleware
results = Results(backend=redis_backend, store_results=True)
broker.add_middleware(results)
```

#### Memcached Backend

Memcached backend for result storage.

```python { .api }
class MemcachedBackend(ResultBackend):
    def __init__(
        self,
        *,
        namespace: str = "dramatiq-results",
        encoder: Encoder = None,
        pool=None,
        pool_size: int = 8,
        **parameters
    ):
        """
        Create Memcached result backend.

        Parameters:
        - namespace: Key namespace for results
        - encoder: Result encoder (uses JSON if None)
        - pool: Optional pylibmc client pool; when given, the other
          connection arguments are ignored
        - pool_size: Size of the connection pool created when no pool
          is given
        - **parameters: Connection parameters passed to pylibmc.Client
          when no pool is given
        """
```

**Usage:**

```python
import pylibmc
from dramatiq.results import Results
from dramatiq.results.backends import MemcachedBackend

# Create Memcached client and wrap it in a pool
mc_client = pylibmc.Client(["127.0.0.1:11211"])
mc_pool = pylibmc.ClientPool(mc_client, 8)

# Create Memcached backend (the backend takes a client pool, by keyword)
mc_backend = MemcachedBackend(pool=mc_pool)

# Use with Results middleware
results = Results(backend=mc_backend)
broker.add_middleware(results)
```

#### Stub Backend

In-memory backend for testing and development.

```python { .api }
class StubBackend(ResultBackend):
    def __init__(self, *, namespace: str = "dramatiq-results", encoder: Encoder = None):
        """
        Create in-memory result backend for testing.

        Parameters:
        - namespace: Key namespace for results
        - encoder: Result encoder (uses JSON if None)
        """
```

**Usage:**

```python
import dramatiq
from dramatiq.results import Results
from dramatiq.results.backends import StubBackend

# Create stub backend for testing
stub_backend = StubBackend()

# Use in tests
results = Results(backend=stub_backend, store_results=True)
broker.add_middleware(results)


# Test result storage
@dramatiq.actor(store_results=True)
def test_task(value):
    return value * 2


message = test_task.send(21)
result = message.get_result(block=True)
assert result == 42
```

### Message Result Interface

Messages provide direct access to result operations.

```python { .api }
class Message:
    def get_result(
        self,
        *,
        backend: ResultBackend = None,
        block: bool = False,
        timeout: int = None
    ):
        """
        Get result for this message.

        Parameters:
        - backend: Result backend to use (uses broker's backend if None)
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking

        Returns:
        Task result

        Raises:
        ResultMissing: If result not available (when not blocking)
        ResultTimeout: If timeout exceeded while blocking
        ResultFailure: If task failed with exception
        """
```

**Usage:**

```python
import time

import dramatiq
from dramatiq.results import ResultMissing


@dramatiq.actor(store_results=True)
def long_running_task(duration):
    time.sleep(duration)
    return {"completed_after": duration, "timestamp": time.time()}


# Send task
message = long_running_task.send(5)

# Non-blocking result check
try:
    result = message.get_result(block=False)
    print(f"Task completed: {result}")
except ResultMissing:
    print("Task still running...")

# Blocking result retrieval
result = message.get_result(block=True, timeout=10000)
print(f"Final result: {result}")
```

### Result Errors

Specialized exceptions for result operations.

```python { .api }
class ResultError(Exception):
    """Base exception for result operations."""


class ResultMissing(ResultError):
    """Raised when result is not available."""


class ResultTimeout(ResultError):
    """Raised when timeout exceeded while waiting for result."""


class ResultFailure(ResultError):
    """
    Raised when task failed with an exception.

    Contains the original exception information.

    Parameters:
    - message: Human-readable description of the failure
    - orig_exc_type: Name of the original exception type
    - orig_exc_msg: Message of the original exception
    """

    def __init__(self, message="", orig_exc_type="", orig_exc_msg=""):
        super().__init__(message)
        self.orig_exc_type = orig_exc_type
        self.orig_exc_msg = orig_exc_msg


# Missing sentinel value
class Missing:
    """Sentinel value indicating missing result."""


# The class name is rebound to its single instance, which is the sentinel.
Missing = Missing()
```

**Usage:**

```python
import dramatiq
from dramatiq.results import ResultFailure, ResultMissing, ResultTimeout


# max_retries=0 so the failure is recorded on the first error instead of
# after the default retry budget is exhausted.
@dramatiq.actor(store_results=True, max_retries=0)
def failing_task(should_fail):
    if should_fail:
        raise ValueError("Task intentionally failed")
    return "Success"


# Handle different result scenarios
message = failing_task.send(True)  # Will fail

try:
    result = message.get_result(block=True, timeout=5000)
    print(f"Success: {result}")
except ResultTimeout:
    print("Task timed out")
except ResultFailure as e:
    # orig_exc_type / orig_exc_msg carry the original exception's type
    # name and message as strings.
    print(f"Task failed: {e.orig_exc_type}: {e.orig_exc_msg}")
except ResultMissing:
    print("Result not found")
```

### Advanced Result Patterns

#### Pipeline Results

```python
import dramatiq


@dramatiq.actor(store_results=True)
def step_one(data):
    processed = data.upper()
    return {"step": 1, "data": processed, "length": len(processed)}


@dramatiq.actor(store_results=True)
def step_two(step_one_result):
    data = step_one_result["data"]
    return {"step": 2, "data": data + "!!!", "prev_length": step_one_result["length"]}


@dramatiq.actor(store_results=True)
def step_three(step_two_result):
    return {
        "step": 3,
        "final_data": step_two_result["data"],
        "total_transformations": 3,
    }


# Create pipeline with result dependencies: each step is sent only after
# the previous step's result has been fetched.
msg1 = step_one.send("hello world")
result1 = msg1.get_result(block=True)

msg2 = step_two.send(result1)
result2 = msg2.get_result(block=True)

msg3 = step_three.send(result2)
final_result = msg3.get_result(block=True)

print(f"Pipeline result: {final_result}")
```

#### Batch Result Collection

```python
import random
import time

import dramatiq
from dramatiq.results import ResultFailure, ResultTimeout


@dramatiq.actor(store_results=True)
def process_item(item_id, item_data):
    # Simulate processing; report the delay that was actually slept
    processing_time = random.uniform(0.1, 0.5)
    time.sleep(processing_time)
    return {
        "item_id": item_id,
        "processed_data": item_data.upper(),
        "processing_time": processing_time,
    }


# Send batch of tasks
batch_items = [{"id": i, "data": f"item_{i}"} for i in range(10)]
messages = [process_item.send(item["id"], item["data"]) for item in batch_items]

# Collect all results
results = []
for msg in messages:
    try:
        result = msg.get_result(block=True, timeout=10000)
    except ResultTimeout:
        print(f"Message {msg.message_id} timed out")
    except ResultFailure as e:
        print(f"Message {msg.message_id} failed: {e}")
    else:
        results.append(result)

print(f"Collected {len(results)} results from {len(messages)} tasks")
```

#### Result Caching and TTL Management

```python
import hashlib
import time

import dramatiq


@dramatiq.actor(store_results=True)
def expensive_computation(input_data):
    """Expensive computation with result caching"""
    # Simulate expensive operation
    time.sleep(2)

    # Generate deterministic result (md5 is used as a fingerprint here,
    # not for security)
    hash_input = str(input_data).encode()
    result_hash = hashlib.md5(hash_input).hexdigest()

    return {
        "input": input_data,
        "result": result_hash,
        "computed_at": time.time(),
    }


# Custom result storage with longer TTL
def store_with_custom_ttl(message, result, ttl_hours=24):
    """Store result with custom TTL"""
    backend = broker.get_results_backend()
    # Convert to whole milliseconds; ttl_hours may be fractional
    ttl_ms = int(ttl_hours * 3600 * 1000)
    backend.store_result(message, result, ttl_ms)


# Usage with custom TTL
message = expensive_computation.send({"complex": "data"})
result = message.get_result(block=True)

# Store with longer TTL for caching
store_with_custom_ttl(message, result, ttl_hours=48)
```

#### Result Aggregation

```python
import time

import dramatiq
from dramatiq.results import Missing, ResultMissing


@dramatiq.actor(store_results=True)
def partial_computation(chunk_id, data_chunk):
    """Process a chunk of data"""
    return {
        "chunk_id": chunk_id,
        "sum": sum(data_chunk),
        "count": len(data_chunk),
        "min": min(data_chunk),
        "max": max(data_chunk),
    }


@dramatiq.actor(store_results=True)
def aggregate_results(message_ids):
    """Aggregate results from multiple partial computations"""
    backend = broker.get_results_backend()

    partial_results = []
    for msg_id in message_ids:
        # Create dummy message for result retrieval
        msg = dramatiq.Message(
            queue_name="default",
            actor_name="partial_computation",
            args=(), kwargs={},
            options={},
            message_id=msg_id,
            message_timestamp=0,
        )

        try:
            result = backend.get_result(msg, block=False)
        except ResultMissing:
            continue
        # Sentinels are compared by identity, not equality
        if result is not Missing:
            partial_results.append(result)

    # Aggregate all partial results
    total_sum = sum(r["sum"] for r in partial_results)
    total_count = sum(r["count"] for r in partial_results)
    # default=None avoids a ValueError when no partial result is available
    overall_min = min((r["min"] for r in partial_results), default=None)
    overall_max = max((r["max"] for r in partial_results), default=None)

    return {
        "total_sum": total_sum,
        "total_count": total_count,
        "average": total_sum / total_count if total_count > 0 else 0,
        "min": overall_min,
        "max": overall_max,
        "chunks_processed": len(partial_results),
    }


# Usage
large_dataset = list(range(1000))
chunk_size = 100
chunks = [large_dataset[i:i+chunk_size] for i in range(0, len(large_dataset), chunk_size)]

# Process chunks
message_ids = []
for i, chunk in enumerate(chunks):
    msg = partial_computation.send(i, chunk)
    message_ids.append(msg.message_id)

# Wait for partial results to complete
time.sleep(5)

# Aggregate results
aggregation_msg = aggregate_results.send(message_ids)
final_result = aggregation_msg.get_result(block=True)
print(f"Aggregated result: {final_result}")
```

#### Result-Based Conditional Execution

```python
import random

import dramatiq


@dramatiq.actor(store_results=True)
def data_quality_check(data):
    """Check data quality and return score"""
    quality_score = random.uniform(0, 1)

    return {
        "data": data,
        "quality_score": quality_score,
        "passed": quality_score > 0.7,
    }


@dramatiq.actor(store_results=True)
def high_quality_processing(quality_result):
    """Process high-quality data"""
    if not quality_result["passed"]:
        return {"status": "skipped", "reason": "Low quality data"}

    return {
        "status": "processed",
        "result": f"High quality processing of: {quality_result['data']}",
        "quality_score": quality_result["quality_score"],
    }


@dramatiq.actor(store_results=True)
def basic_processing(quality_result):
    """Basic processing for any data"""
    return {
        "status": "basic_processed",
        "result": f"Basic processing of: {quality_result['data']}",
        "quality_score": quality_result["quality_score"],
    }


# Conditional processing based on results
def process_with_quality_check(data):
    # Step 1: Quality check
    quality_msg = data_quality_check.send(data)
    quality_result = quality_msg.get_result(block=True)

    # Step 2: Conditional processing based on quality
    if quality_result["passed"]:
        processing_msg = high_quality_processing.send(quality_result)
    else:
        processing_msg = basic_processing.send(quality_result)

    final_result = processing_msg.get_result(block=True)
    return final_result


# Usage
test_data = "sample data for processing"
result = process_with_quality_check(test_data)
print(f"Processing result: {result}")
```

### Result Monitoring and Debugging

```python
def monitor_task_results(messages, timeout=30000):
    """Monitor multiple task results with progress tracking

    Parameters:
    - messages: Messages whose results should be polled
    - timeout: Overall polling budget in milliseconds

    Returns:
    Dict mapping each message's index to its result, or to the
    ResultFailure raised for it; indexes still pending at timeout are absent
    """
    import time

    # Monotonic clock so wall-clock adjustments cannot distort the budget
    deadline = time.monotonic() + timeout / 1000
    completed = {}

    while len(completed) < len(messages):
        for i, msg in enumerate(messages):
            if i in completed:
                continue

            try:
                result = msg.get_result(block=False)
            except dramatiq.results.ResultMissing:
                # Not finished yet; poll again on the next pass
                continue
            except dramatiq.results.ResultFailure as e:
                completed[i] = e
                print(f"Task {i} failed: {e.orig_exc_type}")
            else:
                completed[i] = result
                print(f"Task {i} completed: {type(result)}")

        if len(completed) == len(messages):
            break

        if time.monotonic() > deadline:
            print(f"Timeout: {len(completed)}/{len(messages)} completed")
            break

        time.sleep(0.1)

    return completed

# Usage
messages = [compute_task.send(i, i * 2) for i in range(10)]
results = monitor_task_results(messages)
print(f"Final results: {len(results)} tasks completed")
```