0
# Results and State Management
1
2
Celery's result and state APIs let you track task execution, retrieve return values, and manage the task lifecycle. Together they cover result access, state inspection, and task control (revocation and cleanup). All of them read from the app's result backend, so a result backend must be configured for task states and return values to be available.
3
4
## Capabilities
5
6
### AsyncResult
7
8
A handle to the (possibly still pending) result of an asynchronously executed task. It provides methods to check the task's status, retrieve its return value, and revoke its execution.
9
10
```python { .api }
11
class AsyncResult:
12
def __init__(self, id, backend=None, task_name=None, app=None, parent=None):
13
"""
14
Create AsyncResult instance.
15
16
Args:
17
id (str): Task ID
18
backend: Result backend instance
19
task_name (str): Task name
20
app: Celery app instance
21
parent: Parent result
22
"""
23
24
def get(
25
self,
26
timeout=None,
27
propagate=True,
28
interval=0.5,
29
no_ack=True,
30
follow_parents=True,
31
callback=None,
32
on_message=None,
33
on_interval=None,
34
disable_sync_subtasks=True,
35
EXCEPTION_STATES=None,
36
PROPAGATE_STATES=None
37
):
38
"""
39
Get task result, waiting if necessary.
40
41
Args:
42
timeout (float): Maximum time to wait in seconds
43
propagate (bool): Re-raise task exceptions
44
interval (float): Polling interval in seconds
45
no_ack (bool): Use AMQP no-ack mode (the result message is acknowledged automatically); if False the message is not acked
46
follow_parents (bool): Re-raise any exception raised by parent tasks
47
callback (callable): Called with result when ready
48
on_message (callable): Called for each message received
49
on_interval (callable): Called on each polling interval
50
disable_sync_subtasks (bool): Raise an error if get() is called from within a task (synchronously waiting on subtasks can deadlock the worker)
51
52
Returns:
53
Task result value
54
55
Raises:
56
TimeoutError: If timeout exceeded
57
Exception: Task exception if propagate=True
58
"""
59
60
def ready(self):
61
"""
62
Check if task has finished executing.
63
64
Returns:
65
bool: True if task complete (success or failure)
66
"""
67
68
def successful(self):
69
"""
70
Check if task completed successfully.
71
72
Returns:
73
bool: True if task succeeded, False otherwise
74
75
Note:
76
Does not raise if the task is not ready yet; it simply returns False
77
"""
78
79
def failed(self):
80
"""
81
Check if task execution failed.
82
83
Returns:
84
bool: True if task failed, False otherwise
85
86
Note:
87
Does not raise if the task is not ready yet; it simply returns False
88
"""
89
90
def retry(self):
91
"""
92
Check if task is waiting for retry.
93
94
Returns:
95
bool: True if task will be retried
96
"""
97
98
def revoke(self, connection=None, terminate=False, signal='SIGTERM', wait=False, timeout=None):
99
"""
100
Revoke/cancel task execution.
101
102
Args:
103
connection: Broker connection
104
terminate (bool): Terminate worker process
105
signal (str): Signal to send if terminating
106
wait (bool): Wait for termination confirmation
107
timeout (float): Termination timeout
108
"""
109
110
def forget(self):
111
"""
112
Remove result from backend storage.
113
"""
114
115
def build_graph(self, intermediate=False, formatter=None):
116
"""
117
Build task dependency graph.
118
119
Args:
120
intermediate (bool): Include intermediate results
121
formatter (callable): Result formatter function
122
123
Returns:
124
DependencyGraph: Dependency graph structure
125
"""
126
127
@property
128
def result(self):
129
"""Task result value or exception."""
130
131
@property
132
def return_value(self):
133
"""Alias for result property."""
134
135
@property
136
def state(self):
137
"""Current task state."""
138
139
@property
140
def status(self):
141
"""Alias for state property."""
142
143
@property
144
def info(self):
145
"""Additional state information."""
146
147
@property
148
def traceback(self):
149
"""Exception traceback if task failed."""
150
151
@property
152
def id(self):
153
"""Task ID."""
154
155
@property
156
def task_id(self):
157
"""Alias for id property."""
158
159
@property
160
def name(self):
161
"""Task name."""
162
163
@property
164
def args(self):
165
"""Task positional arguments."""
166
167
@property
168
def kwargs(self):
169
"""Task keyword arguments."""
170
171
@property
172
def backend(self):
173
"""Result backend instance."""
174
175
@property
176
def children(self):
177
"""Child task results."""
178
```
179
180
### GroupResult
181
182
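The result of executing a `group`: a `ResultSet` of `AsyncResult` instances identified by a group ID. It provides methods to check whether the whole group has completed and to retrieve all results as a list, in task order.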
183
184
```python { .api }
185
class GroupResult:
186
def __init__(self, id=None, results=None, **kwargs):
187
"""
188
Create GroupResult instance.
189
190
Args:
191
id (str): Group ID
192
results (list): List of AsyncResult instances
193
"""
194
195
def get(self, timeout=None, propagate=True, interval=0.5, callback=None, no_ack=True):
196
"""
197
Get results from all tasks in group.
198
199
Args:
200
timeout (float): Maximum time to wait
201
propagate (bool): Re-raise task exceptions
202
interval (float): Polling interval
203
callback (callable): Result callback
204
no_ack (bool): Use AMQP no-ack mode (result messages are acknowledged automatically); if False the messages are not acked
205
206
Returns:
207
list: Results from all group tasks
208
"""
209
210
def ready(self):
211
"""
212
Check if all group tasks are complete.
213
214
Returns:
215
bool: True if all tasks finished
216
"""
217
218
def successful(self):
219
"""
220
Check if all group tasks succeeded.
221
222
Returns:
223
bool: True if all tasks successful
224
"""
225
226
def failed(self):
227
"""
228
Check if any group tasks failed.
229
230
Returns:
231
bool: True if any task failed
232
"""
233
234
def waiting(self):
235
"""
236
Check if any group tasks are still waiting.
237
238
Returns:
239
bool: True if any tasks not ready
240
"""
241
242
def revoke(self, connection=None, terminate=False, signal='SIGTERM'):
243
"""
244
Revoke all tasks in group.
245
246
Args:
247
connection: Broker connection
248
terminate (bool): Terminate worker processes
249
signal (str): Termination signal
250
"""
251
252
def forget(self):
253
"""Remove all results from backend."""
254
255
def iterate(self, timeout=None, propagate=True, interval=0.5):
256
"""
257
Iterate over results as they become ready.
258
259
Args:
260
timeout (float): Overall timeout
261
propagate (bool): Re-raise exceptions
262
interval (float): Polling interval
263
264
Yields:
265
Task results as they complete
266
"""
267
268
@property
269
def results(self):
270
"""List of AsyncResult instances."""
271
272
@property
273
def children(self):
274
"""Alias for results property."""
275
```
276
277
### ResultSet
278
279
Collection of results that can be managed as a single unit, providing batch operations over multiple AsyncResult instances.
280
281
```python { .api }
282
class ResultSet:
283
def __init__(self, results=None, **kwargs):
284
"""
285
Create ResultSet instance.
286
287
Args:
288
results (list): AsyncResult instances
289
"""
290
291
def get(self, timeout=None, propagate=True, interval=0.5, callback=None, no_ack=True):
292
"""
293
Get all results in set.
294
295
Args:
296
timeout (float): Maximum wait time
297
propagate (bool): Re-raise exceptions
298
interval (float): Polling interval
299
callback (callable): Result callback
300
no_ack (bool): Use AMQP no-ack mode (result messages are acknowledged automatically); if False the messages are not acked
301
302
Returns:
303
list: All results
304
"""
305
306
def ready(self):
307
"""
308
Check if all results are ready.
309
310
Returns:
311
bool: True if all complete
312
"""
313
314
def successful(self):
315
"""
316
Check if all results are successful.
317
318
Returns:
319
bool: True if all succeeded
320
"""
321
322
def failed(self):
323
"""
324
Check if any results failed.
325
326
Returns:
327
bool: True if any failed
328
"""
329
330
def revoke(self, connection=None, terminate=False, signal='SIGTERM'):
331
"""
332
Revoke all results.
333
334
Args:
335
connection: Broker connection
336
terminate (bool): Terminate processes
337
signal (str): Termination signal
338
"""
339
340
def iterate(self, timeout=None, propagate=True, interval=0.5):
341
"""
342
Iterate over results as ready.
343
344
Args:
345
timeout (float): Overall timeout
346
propagate (bool): Re-raise exceptions
347
interval (float): Check interval
348
349
Yields:
350
Results as they complete
351
"""
352
353
def add(self, result):
354
"""
355
Add result to set.
356
357
Args:
358
result (AsyncResult): Result to add
359
"""
360
361
def remove(self, result):
362
"""
363
Remove result from set.
364
365
Args:
366
result (AsyncResult): Result to remove
367
"""
368
369
@property
370
def results(self):
371
"""List of AsyncResult instances in set."""
372
```
373
374
### Task States
375
376
State constants and utilities for tracking task execution status and lifecycle.
377
378
```python { .api }
379
# State constants
380
PENDING = 'PENDING' # Task waiting for execution or unknown
381
RECEIVED = 'RECEIVED' # Task received by worker
382
STARTED = 'STARTED' # Task started by worker
383
SUCCESS = 'SUCCESS' # Task executed successfully
384
FAILURE = 'FAILURE' # Task execution failed
385
REVOKED = 'REVOKED' # Task revoked/cancelled
386
RETRY = 'RETRY' # Task will be retried
387
IGNORED = 'IGNORED' # Task result ignored
388
389
# State collections
390
READY_STATES = frozenset([SUCCESS, FAILURE, REVOKED])
391
UNREADY_STATES = frozenset([PENDING, RECEIVED, STARTED, RETRY])
392
EXCEPTION_STATES = frozenset([RETRY, FAILURE, REVOKED])
393
PROPAGATE_STATES = frozenset([FAILURE, REVOKED])
394
395
class state(str):
396
"""
397
String subclass with state comparison methods.
398
"""
399
400
def __lt__(self, other):
401
"""Compare state precedence."""
402
403
def __gt__(self, other):
404
"""Compare state precedence."""
405
```
406
407
### State Utilities
408
409
Helper function for rebuilding result objects from their serialized tuple form.
410
411
```python { .api }
412
def result_from_tuple(r, app=None):
413
"""
414
Create result object from tuple representation.
415
416
Args:
417
r (tuple): Serialized result as produced by as_tuple(), i.e. ((task_id, parent), children)
418
app: Celery app instance
419
420
Returns:
421
AsyncResult instance, or a GroupResult when the tuple carries child results
422
"""
423
```
424
425
## Usage Examples
426
427
### Basic Result Handling
428
429
```python
430
from celery import Celery
431
432
app = Celery('example', broker='redis://localhost:6379')
433
434
@app.task
435
def add(x, y):
436
import time
437
time.sleep(2) # Simulate work
438
return x + y
439
440
@app.task
441
def divide(x, y):
442
if y == 0:
443
raise ValueError("Cannot divide by zero")
444
return x / y
445
446
# Execute task and get result
447
result = add.delay(4, 4)
448
449
# Check status without blocking
450
print(f"Task ID: {result.id}")
451
print(f"Ready: {result.ready()}")
452
print(f"State: {result.state}")
453
454
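# Get result with timeout (get() raises celery.exceptions.TimeoutError, which must be imported; it is not the builtin TimeoutError)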
455
try:
456
value = result.get(timeout=10)
457
print(f"Result: {value}")
458
except TimeoutError:
459
print("Task took too long")
460
461
# Check if successful
462
if result.successful():
463
print("Task completed successfully")
464
else:
465
print("Task failed or not ready")
466
```
467
468
### Error Handling and Propagation
469
470
```python
471
# Task that may fail
472
result = divide.delay(10, 0)
473
474
# Get result with error handling
475
try:
476
value = result.get(propagate=True)
477
print(f"Result: {value}")
478
except ValueError as exc:
479
print(f"Task failed: {exc}")
480
print(f"Traceback: {result.traceback}")
481
482
# Get result without propagating errors
483
value = result.get(propagate=False)
484
if result.failed():
485
print(f"Task failed with: {result.result}")
486
print(f"Info: {result.info}")
487
```
488
489
### Group Results
490
491
```python
492
from celery import group
493
494
# Create and execute group
495
job = group([
496
add.s(2, 2),
497
add.s(4, 4),
498
add.s(8, 8)
499
])
500
result = job.apply_async()
501
502
# Check group status
503
print(f"Group ready: {result.ready()}")
504
print(f"Group successful: {result.successful() if result.ready() else 'Not ready'}")
505
506
# Get all results
507
try:
508
results = result.get(timeout=30)
509
print(f"All results: {results}") # [4, 8, 16]
510
except Exception as exc:
511
print(f"Group failed: {exc}")
512
513
# Iterate over results as they complete
514
print("Results as they complete:")
515
for task_result in result.iterate(timeout=30):
516
print(f"Got result: {task_result}")
517
```
518
519
### ResultSet Operations
520
521
```python
522
from celery.result import ResultSet
523
524
# Create multiple tasks
525
tasks = [add.delay(i, i) for i in range(5)]
526
527
# Create result set
528
result_set = ResultSet(tasks)
529
530
# Batch operations
531
print(f"All ready: {result_set.ready()}")
532
533
# Get results with error handling
534
results = []
535
for result in result_set.iterate(timeout=60):
536
try:
537
value = result
538
results.append(value)
539
print(f"Task completed with result: {value}")
540
except Exception as exc:
541
print(f"Task failed: {exc}")
542
543
print(f"Final results: {results}")
544
```
545
546
### Task Revocation and Control
547
548
```python
549
import time
550
551
# Start long-running task
552
result = add.delay(1000000, 2000000)
553
554
# Check if we can revoke it
555
if not result.ready():
556
print("Revoking task...")
557
result.revoke(terminate=True)
558
559
# Wait a moment and check
560
time.sleep(1)
561
if result.state == 'REVOKED':
562
print("Task was revoked")
563
564
# Forget about result
565
result.forget()
566
```
567
568
### Advanced Result Monitoring
569
570
```python
571
import time
572
573
def monitor_task(result, name="Task"):
574
"""Monitor task execution with detailed status."""
575
576
print(f"Monitoring {name} (ID: {result.id})")
577
578
# Polling loop
579
while not result.ready():
580
print(f" Status: {result.state}")
581
if hasattr(result, 'info') and result.info:
582
print(f" Info: {result.info}")
583
time.sleep(1)
584
585
# Final status
586
if result.successful():
587
print(f" ✓ Completed: {result.result}")
588
else:
589
print(f" ✗ Failed: {result.result}")
590
if result.traceback:
591
print(f" Traceback: {result.traceback}")
592
593
# Monitor task execution
594
result = add.delay(10, 20)
595
monitor_task(result, "Addition")
596
```
597
598
### Working with Task Metadata
599
600
```python
601
@app.task(bind=True)
602
def task_with_progress(self, total_items):
603
"""Task that reports progress."""
604
605
for i in range(total_items):
606
# Update task state with progress info
607
self.update_state(
608
state='PROGRESS',
609
meta={'current': i + 1, 'total': total_items}
610
)
611
time.sleep(0.1)
612
613
return {'status': 'Complete', 'processed': total_items}
614
615
# Execute and monitor progress
616
result = task_with_progress.delay(10)
617
618
while not result.ready():
619
if result.state == 'PROGRESS':
620
meta = result.info
621
progress = (meta['current'] / meta['total']) * 100
622
print(f"Progress: {progress:.1f}% ({meta['current']}/{meta['total']})")
623
time.sleep(0.5)
624
625
print(f"Final result: {result.get()}")
626
```
627
628
### Handling Task Dependencies
629
630
```python
631
from celery import chain
632
633
# Create dependent tasks
634
workflow = chain(
635
add.s(2, 3), # First task: 2 + 3 = 5
636
add.s(10), # Second task: 5 + 10 = 15
637
add.s(5) # Third task: 15 + 5 = 20
638
)
639
640
result = workflow.apply_async()
641
642
# The result represents the final task in the chain
643
print(f"Final result: {result.get()}") # 20
644
645
# Access parent results if needed
646
if hasattr(result, 'parent') and result.parent:
647
print(f"Parent result: {result.parent.get()}") # 15
648
if hasattr(result.parent, 'parent') and result.parent.parent:
649
print(f"Grandparent result: {result.parent.parent.get()}") # 5
650
```