# Composition

Task composition in Dramatiq enables complex workflows by chaining tasks sequentially with pipelines or executing multiple tasks in parallel with groups. This allows building sophisticated data processing workflows from simple actor building blocks.

## Capabilities

### Pipeline

Sequential task execution where each task's output becomes the next task's input, creating processing chains.

```python { .api }
class pipeline:
    def __init__(self, children: Iterable[Message | pipeline], *, broker: Broker = None):
        """
        Create a pipeline from message objects or other pipelines.

        Parameters:
        - children: Iterable of Message objects or pipeline objects
        - broker: Broker instance (uses global broker if None)
        """

    def run(self, *, delay: int = None) -> pipeline:
        """
        Execute the pipeline by sending all messages to the broker.

        Parameters:
        - delay: Delay in milliseconds before starting execution

        Returns:
        Self (for method chaining)
        """

    def get_result(self, *, block: bool = False, timeout: int = None):
        """
        Get the result of the final task in the pipeline.

        Parameters:
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking

        Returns:
        Result of the last task in the pipeline

        Raises:
        ResultMissing: If result is not available
        ResultTimeout: If timeout exceeded while blocking
        """

    def get_results(self, *, block: bool = False, timeout: int = None) -> List:
        """
        Get results from all tasks in the pipeline.

        Parameters:
        - block: Whether to block waiting for results
        - timeout: Timeout in milliseconds when blocking

        Returns:
        List of results from all pipeline tasks
        """

    def __or__(self, other) -> pipeline:
        """
        Chain this pipeline with another message or pipeline using | operator.

        Parameters:
        - other: Message or pipeline to chain with

        Returns:
        New pipeline containing both sequences
        """

    def __len__(self) -> int:
        """
        Get the number of tasks in the pipeline.

        Returns:
        Number of messages in the pipeline
        """

    # Properties
    completed: bool        # True if all tasks completed
    completed_count: int   # Number of completed tasks
    messages: List[Message]  # List of pipeline messages
```
84
85
**Usage:**
86
87
```python
88
@dramatiq.actor
89
def fetch_data(url):
90
"""Fetch data from URL"""
91
return {"data": f"content from {url}", "size": 1024}
92
93
@dramatiq.actor
94
def process_data(data_info):
95
"""Process the fetched data"""
96
processed = data_info["data"].upper()
97
return {"processed": processed, "original_size": data_info["size"]}
98
99
@dramatiq.actor
100
def save_data(processed_info):
101
"""Save processed data"""
102
print(f"Saving: {processed_info['processed']}")
103
return {"saved": True, "id": "12345"}
104
105
# Create pipeline using | operator
106
pipeline = (
107
fetch_data.message("https://api.example.com/data") |
108
process_data.message() | # Will receive output from previous task
109
save_data.message()
110
)
111
112
# Execute pipeline
113
pipeline.run()
114
115
# Get final result (blocking)
116
final_result = pipeline.get_result(block=True, timeout=30000)
117
print(f"Pipeline result: {final_result}")
118
119
# Get all results
120
all_results = pipeline.get_results(block=True)
121
print(f"All results: {all_results}")
122
```
### Group

Parallel task execution where multiple tasks run concurrently and can be synchronized.

```python { .api }
class group:
    def __init__(self, children: Iterable[Message], *, broker: Broker = None):
        """
        Create a group from message objects.

        Parameters:
        - children: Iterable of Message objects
        - broker: Broker instance (uses global broker if None)
        """

    def run(self, *, delay: int = None) -> group:
        """
        Execute the group by sending all messages to the broker.

        Parameters:
        - delay: Delay in milliseconds before starting execution

        Returns:
        Self (for method chaining)
        """

    def get_results(self, *, block: bool = False, timeout: int = None) -> List:
        """
        Get results from all tasks in the group.

        Parameters:
        - block: Whether to block waiting for all results
        - timeout: Timeout in milliseconds when blocking

        Returns:
        List of results from all group tasks

        Raises:
        ResultTimeout: If timeout exceeded while blocking
        """

    def wait(self, *, timeout: int = None):
        """
        Wait for all tasks in the group to complete.

        Parameters:
        - timeout: Timeout in milliseconds

        Raises:
        ResultTimeout: If timeout exceeded
        """

    def add_completion_callback(self, message: Message):
        """
        Add a callback to be executed when all group tasks complete.

        Parameters:
        - message: Message to execute as completion callback
        """

    def __len__(self) -> int:
        """
        Get the number of tasks in the group.

        Returns:
        Number of messages in the group
        """

    # Properties
    completed: bool        # True if all tasks completed
    completed_count: int   # Number of completed tasks
```
196
197
**Usage:**
198
199
```python
200
@dramatiq.actor
201
def process_item(item_id, item_data):
202
"""Process individual item"""
203
print(f"Processing item {item_id}: {item_data}")
204
return {"id": item_id, "processed": True, "result": item_data * 2}
205
206
@dramatiq.actor
207
def group_completion_handler(group_id):
208
"""Handle group completion"""
209
print(f"Group {group_id} completed!")
210
211
# Create group of parallel tasks
212
items = [
213
{"id": 1, "data": 10},
214
{"id": 2, "data": 20},
215
{"id": 3, "data": 30},
216
{"id": 4, "data": 40}
217
]
218
219
task_group = group([
220
process_item.message(item["id"], item["data"])
221
for item in items
222
])
223
224
# Add completion callback
225
task_group.add_completion_callback(
226
group_completion_handler.message("batch_001")
227
)
228
229
# Execute group
230
task_group.run()
231
232
# Wait for completion
233
task_group.wait(timeout=60000) # 1 minute timeout
234
235
# Get all results
236
results = task_group.get_results(block=True)
237
print(f"Group results: {results}")
238
```
239
240
### Advanced Composition Patterns
241
242
#### Mixed Pipeline and Group Composition
243
244
```python
245
@dramatiq.actor
246
def fetch_urls(urls):
247
"""Fetch multiple URLs"""
248
return [{"url": url, "data": f"content from {url}"} for url in urls]
249
250
@dramatiq.actor
251
def process_single_url(url_data):
252
"""Process single URL data"""
253
return {
254
"url": url_data["url"],
255
"processed": url_data["data"].upper(),
256
"length": len(url_data["data"])
257
}
258
259
@dramatiq.actor
260
def aggregate_results(results):
261
"""Aggregate all processed results"""
262
total_length = sum(r["length"] for r in results)
263
return {"total_items": len(results), "total_length": total_length}
264
265
# Complex composition: pipeline -> group -> pipeline
266
urls = ["https://api1.com", "https://api2.com", "https://api3.com"]
267
268
# Step 1: Fetch all URLs (single task)
269
fetch_step = fetch_urls.message(urls)
270
271
# Step 2: Process each URL in parallel (group)
272
# Note: This would require custom logic to split fetch results into group
273
# For demonstration, we'll create the group directly
274
process_group = group([
275
process_single_url.message({"url": url, "data": f"content from {url}"})
276
for url in urls
277
])
278
279
# Step 3: Aggregate results (single task)
280
aggregate_step = aggregate_results.message()
281
282
# Execute steps sequentially
283
fetch_step.send()
284
# Wait for fetch to complete, then create group with actual data
285
process_group.run()
286
process_group.wait()
287
aggregate_step.send()
288
```
289
290
#### Pipeline with Error Handling
291
292
```python
293
@dramatiq.actor
294
def safe_fetch(url):
295
"""Fetch with error handling"""
296
try:
297
# Simulate fetch operation
298
if "error" in url:
299
raise ValueError("Simulated fetch error")
300
return {"url": url, "data": f"content from {url}", "success": True}
301
except Exception as e:
302
return {"url": url, "error": str(e), "success": False}
303
304
@dramatiq.actor
305
def process_or_skip(fetch_result):
306
"""Process successful fetches, skip errors"""
307
if fetch_result["success"]:
308
return {
309
"processed": fetch_result["data"].upper(),
310
"original_url": fetch_result["url"]
311
}
312
else:
313
print(f"Skipping failed fetch: {fetch_result['error']}")
314
return {"skipped": True, "error": fetch_result["error"]}
315
316
@dramatiq.actor
317
def finalize_result(process_result):
318
"""Finalize the processing result"""
319
if process_result.get("skipped"):
320
return {"status": "failed", "reason": process_result["error"]}
321
else:
322
return {
323
"status": "success",
324
"result": process_result["processed"],
325
"url": process_result["original_url"]
326
}
327
328
# Error-resilient pipeline
329
error_pipeline = (
330
safe_fetch.message("https://error.example.com/data") |
331
process_or_skip.message() |
332
finalize_result.message()
333
)
334
335
error_pipeline.run()
336
result = error_pipeline.get_result(block=True)
337
print(f"Pipeline handled error gracefully: {result}")
338
```
#### Dynamic Group Creation

```python
@dramatiq.actor
def create_tasks_for_batch(batch_data):
    """Dynamically create tasks based on batch data"""
    tasks = []
    for item in batch_data["items"]:
        if item["type"] == "email":
            tasks.append(send_email.message(item["to"], item["subject"], item["body"]))
        elif item["type"] == "sms":
            tasks.append(send_sms.message(item["to"], item["message"]))
        elif item["type"] == "push":
            tasks.append(send_push.message(item["device_id"], item["message"]))

    # Create and run group
    notification_group = group(tasks)
    notification_group.run()

    return {"batch_id": batch_data["batch_id"], "task_count": len(tasks)}

# Usage
batch_data = {
    "batch_id": "batch_123",
    "items": [
        {"type": "email", "to": "user1@example.com", "subject": "Hello", "body": "Message"},
        {"type": "sms", "to": "+1234567890", "message": "Hello via SMS"},
        {"type": "push", "device_id": "device123", "message": "Hello via push"}
    ]
}

create_tasks_for_batch.send(batch_data)
```
373
374
#### Conditional Pipeline Execution
375
376
```python
377
@dramatiq.actor
378
def check_condition(data):
379
"""Check if pipeline should continue"""
380
return {"continue": data["value"] > 10, "data": data}
381
382
@dramatiq.actor
383
def conditional_processor(check_result):
384
"""Process only if condition was met"""
385
if check_result["continue"]:
386
return {"processed": check_result["data"]["value"] * 2}
387
else:
388
return {"skipped": True, "reason": "Condition not met"}
389
390
@dramatiq.actor
391
def final_handler(process_result):
392
"""Handle final result regardless of path taken"""
393
if process_result.get("skipped"):
394
return {"status": "skipped", "reason": process_result["reason"]}
395
else:
396
return {"status": "completed", "result": process_result["processed"]}
397
398
# Conditional pipeline
399
conditional_pipeline = (
400
check_condition.message({"value": 5}) | # Will not meet condition
401
conditional_processor.message() |
402
final_handler.message()
403
)
404
405
conditional_pipeline.run()
406
result = conditional_pipeline.get_result(block=True)
407
print(f"Conditional result: {result}")
408
```
409
410
### Composition with Results Storage
411
412
When using the Results middleware, composition objects can retrieve and work with stored results:
413
414
```python
415
# Enable results storage
416
from dramatiq.middleware import Results
417
from dramatiq.results.backends import RedisBackend
418
419
result_backend = RedisBackend()
420
results_middleware = Results(backend=result_backend, store_results=True)
421
broker.add_middleware(results_middleware)
422
423
@dramatiq.actor(store_results=True)
424
def data_processor(data):
425
return {"processed": data, "timestamp": time.time()}
426
427
@dramatiq.actor(store_results=True)
428
def data_validator(processed_data):
429
return {"valid": True, "data": processed_data}
430
431
# Pipeline with result storage
432
result_pipeline = (
433
data_processor.message({"input": "test_data"}) |
434
data_validator.message()
435
)
436
437
result_pipeline.run()
438
439
# Get individual step results
440
step_results = result_pipeline.get_results(block=True, timeout=30000)
441
print(f"Each step result: {step_results}")
442
443
# Get final result
444
final_result = result_pipeline.get_result(block=True)
445
print(f"Final result: {final_result}")
446
```
447
448
### Composition Monitoring
449
450
```python
451
import time
452
453
def monitor_composition(composition, name):
454
"""Monitor composition progress"""
455
print(f"Starting {name} with {len(composition)} tasks")
456
457
start_time = time.time()
458
while not composition.completed:
459
elapsed = time.time() - start_time
460
print(f"{name}: {composition.completed_count}/{len(composition)} completed ({elapsed:.1f}s)")
461
time.sleep(1.0)
462
463
total_time = time.time() - start_time
464
print(f"{name} completed in {total_time:.1f}s")
465
466
# Usage with monitoring
467
large_group = group([
468
process_item.message(i, f"data_{i}")
469
for i in range(100)
470
])
471
472
large_group.run()
473
monitor_composition(large_group, "Large Group Processing")
474
```