0
# Workflow Primitives (Canvas)
1
2
Canvas workflow primitives enable complex task composition and orchestration patterns. These building blocks allow creating sophisticated distributed workflows including sequential chains, parallel execution, callbacks, and functional programming patterns over tasks.
3
4
## Capabilities
5
6
### Signature
7
8
Task signature that wraps a task call with its arguments and execution options, forming the foundation for all Canvas workflow patterns.
9
10
```python { .api }
11
class Signature:
12
def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
13
"""
14
Create task signature.
15
16
Args:
17
task (str|Task): Task name or task instance
18
args (tuple): Positional arguments
19
kwargs (dict): Keyword arguments
20
options (dict): Execution options
21
"""
22
23
def apply_async(self, args=None, kwargs=None, **options):
24
"""
25
Execute signature asynchronously.
26
27
Args:
28
args (tuple): Additional positional arguments
29
kwargs (dict): Additional keyword arguments
30
31
Returns:
32
AsyncResult instance
33
"""
34
35
def apply(self, args=None, kwargs=None, **options):
36
"""
37
Execute signature synchronously.
38
39
Args:
40
args (tuple): Additional positional arguments
41
kwargs (dict): Additional keyword arguments
42
43
Returns:
44
EagerResult instance (the task is executed inline in the current process)
45
"""
46
47
def clone(self, args=None, kwargs=None, **opts):
48
"""
49
Create copy with modified arguments or options.
50
51
Args:
52
args (tuple): New positional arguments
53
kwargs (dict): New keyword arguments
54
**opts: New execution options
55
56
Returns:
57
New Signature instance
58
"""
59
60
def freeze(self, id=None):
61
"""
62
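Finalize the signature by assigning it a concrete task ID. The task is not sent; freezing does not make the signature immutable (use `set(immutable=True)` or `.si()` for that).
63
64
Args:
65
id (str): Custom task ID (a new one is generated when omitted)
66
67
Returns:
68
AsyncResult bound to the frozen task ID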
69
"""
70
71
def set(self, immutable=None, **options):
72
"""
73
Set signature options.
74
75
Args:
76
immutable (bool): Make signature immutable
77
**options: Execution options to set
78
79
Returns:
80
Self for chaining
81
"""
82
83
def link(self, callback):
84
"""
85
Add success callback.
86
87
Args:
88
callback (Signature): Callback signature
89
90
Returns:
91
The callback signature that was passed in
92
"""
93
94
def link_error(self, errback):
95
"""
96
Add error callback.
97
98
Args:
99
errback (Signature): Error callback signature
100
101
Returns:
102
The errback signature that was passed in
103
"""
104
105
def signature(task, args=None, kwargs=None, **options):
106
"""
107
Create task signature.
108
109
Args:
110
task (str|Task): Task name or instance
111
args (tuple): Positional arguments
112
kwargs (dict): Keyword arguments
113
**options: Execution options
114
115
Returns:
116
Signature instance
117
"""
118
```
119
120
### Chain
121
122
Execute tasks sequentially, passing the result of each task as the first argument to the next task in the chain.
123
124
```python { .api }
125
class chain:
126
def __init__(self, *tasks, **kwargs):
127
"""
128
Create task chain.
129
130
Args:
131
*tasks: Task signatures to chain
132
**kwargs: Chain options
133
"""
134
135
def apply_async(self, args=None, kwargs=None, **options):
136
"""
137
Execute chain asynchronously.
138
139
Args:
140
args (tuple): Arguments for first task
141
kwargs (dict): Keyword arguments for first task
142
143
Returns:
144
AsyncResult for final task
145
"""
146
147
def apply(self, args=None, kwargs=None, **options):
148
"""
149
Execute chain synchronously.
150
151
Args:
152
args (tuple): Arguments for first task
153
kwargs (dict): Keyword arguments for first task
154
155
Returns:
156
EagerResult for the final task (the chain is executed inline)
157
"""
158
159
def chain(*tasks, **kwargs):
160
"""
161
Create sequential task chain.
162
163
Args:
164
*tasks: Task signatures to execute in order
165
**kwargs: Chain execution options
166
167
Returns:
168
chain instance
169
"""
170
```
171
172
### Group
173
174
Execute multiple tasks in parallel, collecting results when all tasks complete.
175
176
```python { .api }
177
class group:
178
def __init__(self, *tasks, **kwargs):
179
"""
180
Create task group.
181
182
Args:
183
*tasks: Task signatures to execute in parallel
184
**kwargs: Group options
185
"""
186
187
def apply_async(self, args=None, kwargs=None, **options):
188
"""
189
Execute group asynchronously.
190
191
Args:
192
args (tuple): Arguments to add to each task
193
kwargs (dict): Keyword arguments to add to each task
194
195
Returns:
196
GroupResult instance
197
"""
198
199
def apply(self, args=None, kwargs=None, **options):
200
"""
201
Execute group synchronously.
202
203
Args:
204
args (tuple): Arguments to add to each task
205
kwargs (dict): Keyword arguments to add to each task
206
207
Returns:
208
GroupResult wrapping the eagerly evaluated task results
209
"""
210
211
def group(*tasks, **kwargs):
212
"""
213
Create parallel task group.
214
215
Args:
216
*tasks: Task signatures to execute in parallel
217
**kwargs: Group execution options
218
219
Returns:
220
group instance
221
"""
222
```
223
224
### Chord
225
226
Execute a group of tasks in parallel, then execute a callback task with the results when all tasks in the group complete.
227
228
```python { .api }
229
class chord:
230
def __init__(self, header, body=None, **kwargs):
231
"""
232
Create task chord.
233
234
Args:
235
header: Group of tasks to execute in parallel
236
body (Signature): Callback task to execute with results
237
**kwargs: Chord options
238
"""
239
240
def apply_async(self, args=None, kwargs=None, **options):
241
"""
242
Execute chord asynchronously.
243
244
Args:
245
args (tuple): Arguments for header tasks
246
kwargs (dict): Keyword arguments for header tasks
247
248
Returns:
249
AsyncResult for callback task
250
"""
251
252
def apply(self, args=None, kwargs=None, **options):
253
"""
254
Execute chord synchronously.
255
256
Args:
257
args (tuple): Arguments for header tasks
258
kwargs (dict): Keyword arguments for header tasks
259
260
Returns:
261
EagerResult for the callback task (header and body are executed inline)
262
"""
263
264
def chord(header, body=None, **kwargs):
265
"""
266
Create chord (group + callback).
267
268
Args:
269
header: Group or list of tasks for parallel execution
270
body (Signature): Callback task executed with group results
271
**kwargs: Chord execution options
272
273
Returns:
274
chord instance
275
"""
276
```
277
278
### Chunks
279
280
Split an iterable of argument tuples into chunks of `n` items. Each chunk is dispatched as a single task message that calls the task once per item, and the chunks run in parallel.
281
282
```python { .api }
283
class chunks:
284
def __init__(self, task, it, n, **options):
285
"""
286
Create chunked task processing.
287
288
Args:
289
task (Signature): Task called once per item, with the item unpacked as its arguments
290
it: Iterable of argument tuples to split into chunks
291
n (int): Number of items per chunk
292
"""
293
294
def apply_async(self, **options):
295
"""
296
Execute chunks asynchronously.
297
298
Returns:
299
GroupResult instance
300
"""
301
302
def chunks(task, it, n):
303
"""
304
Split iterable into chunks for parallel processing.
305
306
Args:
307
task (Signature): Task called once per item, with the item unpacked as its arguments
308
it: Iterable of argument tuples to split
309
n (int): Number of items in each chunk
310
311
Returns:
312
chunks instance
313
"""
314
```
315
316
### Map Operations
317
318
Functional programming style operations for mapping tasks over iterables.
319
320
```python { .api }
321
def xmap(task, it):
322
"""
323
Map task over iterable arguments.
324
325
Args:
326
task (Signature): Task to map
327
it: Iterable of items; each item is passed to the task as a single positional argument
328
329
Returns:
330
xmap signature (executed as one task that returns the list of results)
331
"""
332
333
def xstarmap(task, it):
334
"""
335
Map task over iterable with argument unpacking.
336
337
Args:
338
task (Signature): Task to map
339
it: Iterable of argument tuples to unpack
340
341
Returns:
342
xstarmap signature (executed as one task that returns the list of results)
343
"""
344
```
345
346
### Utility Functions
347
348
Helper functions for working with signatures and Canvas primitives.
349
350
```python { .api }
351
def maybe_signature(d, app=None):
352
"""
353
Convert signature-like object to actual signature.
354
355
Args:
356
d: Object that might be signature (dict, signature, etc.)
357
app: Celery app instance
358
359
Returns:
360
Signature instance or original object
361
"""
362
```
363
364
## Usage Examples
365
366
### Basic Signature Usage
367
368
```python
369
from celery import signature, Celery
370
371
app = Celery('example')
372
373
@app.task
374
def add(x, y):
375
return x + y
376
377
@app.task
378
def mul(x, y):
379
return x * y
380
381
# Create and execute signature
382
sig = signature(add.name, args=(2, 3))  # look the task up by its registered name (e.g. 'tasks.add'), not the bare function name
383
result = sig.apply_async()
384
print(result.get()) # 5
385
386
# Using task shortcut methods
387
sig = add.s(2, 3) # Equivalent to signature
388
result = sig.delay()  # sig() would run the task inline and return 5 directly, not an AsyncResult
389
print(result.get()) # 5
390
391
# Immutable signature
392
sig = add.si(2, 3) # Won't accept additional arguments
393
```
394
395
### Chain Workflows
396
397
```python
398
from celery import chain
399
400
# Sequential processing - result of each becomes first arg of next
401
workflow = chain(
402
add.s(2, 3), # 2 + 3 = 5
403
mul.s(4), # 5 * 4 = 20
404
add.s(10) # 20 + 10 = 30
405
)
406
result = workflow()
407
print(result.get()) # 30
408
409
# Partial chain application
410
partial_chain = chain(mul.s(2), add.s(10))
411
result = partial_chain.apply_async(args=(5,)) # (5 * 2) + 10 = 20
412
print(result.get()) # 20
413
```
414
415
### Parallel Groups
416
417
```python
418
from celery import group
419
420
# Execute tasks in parallel
421
job = group([
422
add.s(2, 2),
423
add.s(4, 4),
424
add.s(8, 8),
425
add.s(16, 16)
426
])
427
result = job.apply_async()
428
429
# Get all results
430
results = result.get()
431
print(results) # [4, 8, 16, 32]
432
433
# Check completion status
434
print(result.ready()) # True when all complete
435
print(result.successful()) # True when all successful
436
```
437
438
### Chord Patterns
439
440
```python
441
from celery import chord
442
443
@app.task
444
def sum_results(numbers):
445
return sum(numbers)
446
447
# Parallel execution with callback
448
callback_workflow = chord([
449
add.s(2, 2),
450
add.s(4, 4),
451
add.s(8, 8)
452
])(sum_results.s())
453
454
result = callback_workflow  # chord(header)(body) has already dispatched the chord and returned its AsyncResult
455
print(result.get()) # 28 (4 + 8 + 16)
456
457
# Chord with error handling
458
try:
459
result = chord([
460
add.s(1, 1),
461
add.s(2, 2)
462
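])(sum_results.s() | mul.s(5))  # calling the chord with a body dispatches it; the body sums the header results, then multiplies by 5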
463
464
final_result = result.get() # (1+1 + 2+2) * 5 = 30
465
except Exception as exc:
466
print(f"Chord failed: {exc}")
467
```
468
469
### Chunked Processing
470
471
```python
472
from celery import chunks
473
474
@app.task
475
def double(item):
476
    return item * 2
477
478
# Process large dataset in chunks; each element is the argument tuple for one task call
479
data = [(i,) for i in range(100)]
480
job = chunks(double.s(), data, 10)
481
result = job.apply_async()
482
483
# Get all batch results
484
batch_results = result.get()
485
print(len(batch_results)) # 10 batches
486
```
487
488
### Functional Map Operations
489
490
```python
491
from celery import xmap, xstarmap
492
493
# Map task over arguments
494
arguments = [(1, 2), (3, 4), (5, 6)]
495
job = xmap(sum_results.s(), arguments)  # xmap passes each item as ONE argument: sum_results((1, 2))
496
results = job.apply_async().get()
497
print(results) # [3, 7, 11]
498
499
# Map with argument unpacking
500
job = xstarmap(add.s(), arguments)
501
results = job.apply_async().get()
502
print(results) # [3, 7, 11] - same result
503
504
# More complex mapping
505
@app.task
506
def process_user(user_id, action, options=None):
507
return f"User {user_id}: {action}"
508
509
user_actions = [
510
(1, 'login', {'timestamp': '2023-01-01'}),
511
(2, 'logout', {'timestamp': '2023-01-02'})
512
]
513
514
job = xstarmap(process_user.s(), user_actions)
515
results = job.apply_async().get()
516
```
517
518
### Complex Workflow Composition
519
520
```python
521
# Combine multiple patterns
522
from celery import chain, group, chord
523
524
@app.task
525
def fetch_data(source):
526
return f"data_from_{source}"
527
528
@app.task
529
def process_data(data):
530
return f"processed_{data}"
531
532
@app.task
533
def aggregate_results(results):
534
return f"aggregated_{len(results)}_items"
535
536
# Complex nested workflow
537
workflow = chain(
538
# Step 1: Fetch data from multiple sources in parallel
539
group([
540
fetch_data.s('db'),
541
fetch_data.s('api'),
542
fetch_data.s('cache')
543
]),
544
545
# Step 2: Each process_data task receives the full list of fetched results; their outputs are then aggregated
546
chord(
547
group([process_data.s() for _ in range(3)]),
548
aggregate_results.s()
549
)
550
)
551
552
result = workflow.apply_async()
553
print(result.get()) # Final aggregated result
554
```
555
556
### Error Handling and Callbacks
557
558
```python
559
from celery import signature
560
561
@app.task
562
def may_fail(x):
563
if x < 0:
564
raise ValueError("Negative numbers not allowed")
565
return x * 2
566
567
@app.task
568
def handle_success(result):
569
print(f"Success: {result}")
570
return result
571
572
@app.task
573
def handle_failure(request, exc, traceback):
574
    print(f"Task {request.id} failed: {exc}")
575
576
# Add callbacks to signature
577
sig = may_fail.s(5)
578
sig.link(handle_success.s())
579
sig.link_error(handle_failure.s())
580
581
result = sig.apply_async()
582
```