# Task Result Models

Django models for storing and querying Celery task results, group results, and chord coordination data. These models provide full ORM capabilities for result management and include custom managers with transaction retry logic.

## Capabilities

### Task Result Storage

The primary model for storing individual Celery task results with comprehensive metadata and execution information.

```python { .api }
class TaskResult(models.Model):
    """Task result and status storage model."""

    # Identity and naming
    task_id: models.CharField  # max_length=255, unique=True
    task_name: models.CharField  # max_length=255, null=True
    periodic_task_name: models.CharField  # max_length=255, null=True

    # Task parameters and execution context
    task_args: models.TextField  # null=True, JSON serialized positional arguments
    task_kwargs: models.TextField  # null=True, JSON serialized keyword arguments
    worker: models.CharField  # max_length=100, null=True, default=None

    # Result and status information
    status: models.CharField  # max_length=50, default='PENDING'
    result: models.TextField  # null=True, default=None, editable=False
    traceback: models.TextField  # blank=True, null=True
    meta: models.TextField  # null=True, default=None, editable=False

    # Content encoding and type
    content_type: models.CharField  # max_length=128
    content_encoding: models.CharField  # max_length=64

    # Timestamps
    date_created: models.DateTimeField  # auto_now_add=True
    date_started: models.DateTimeField  # null=True, default=None
    date_done: models.DateTimeField  # auto_now=True

    # Custom manager
    objects: TaskResultManager

    def as_dict(self):
        """
        Convert task result to dictionary.

        Returns:
            dict: Task data as dictionary with all fields
        """

    def __str__(self):
        """String representation showing task ID and status."""

    class Meta:
        ordering = ['-date_done']
        verbose_name = 'task result'
        verbose_name_plural = 'task results'
```

#### Usage Example

```python
from django_celery_results.models import TaskResult
from django.utils import timezone
from datetime import timedelta

# Query recent successful tasks
recent_success = TaskResult.objects.filter(
    status='SUCCESS',
    date_done__gte=timezone.now() - timedelta(hours=1)
)

# Find failed tasks with traceback
failed_tasks = TaskResult.objects.filter(
    status='FAILURE',
    traceback__isnull=False
).order_by('-date_done')[:10]

# Get task by ID
try:
    task = TaskResult.objects.get(task_id='abc123')
    print(f"Task status: {task.status}")
    print(f"Task result: {task.result}")
    task_data = task.as_dict()
except TaskResult.DoesNotExist:
    print("Task not found")

# Filter by task name
specific_tasks = TaskResult.objects.filter(
    task_name='myapp.tasks.process_data',
    status='SUCCESS'
)
```
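The `result`, `task_args`, and `task_kwargs` fields are stored as text, so callers usually decode them before use. The following is a minimal sketch of that step, assuming the fields hold plain JSON as the field comments above describe. The helper `decode_payload` and the `row` dictionary are hypothetical stand-ins, not part of the library, and real rows may be formatted differently depending on the configured serializer and library version.

```python
import json


def decode_payload(raw, content_type="application/json"):
    """Decode a stored text payload; return None for empty values."""
    if raw is None or raw == "":
        return None
    if content_type != "application/json":
        # Only JSON is handled in this sketch; other serializers need their own decoder.
        raise ValueError(f"unsupported content type: {content_type}")
    return json.loads(raw)


# Hypothetical values standing in for fields read from a TaskResult row.
row = {
    "result": '{"processed": 42}',
    "task_args": "[1, 2]",
    "content_type": "application/json",
}

decoded_result = decode_payload(row["result"], row["content_type"])
decoded_args = decode_payload(row["task_args"], row["content_type"])
print(decoded_result["processed"], decoded_args)
```

Checking `content_type` before decoding keeps the helper from silently misreading rows written with a different serializer.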

### Group Result Storage

Model for storing results of Celery group operations, enabling tracking of multiple related tasks.

```python { .api }
class GroupResult(models.Model):
    """Task group result and status storage model."""

    # Identity
    group_id: models.CharField  # max_length=255, unique=True

    # Result information
    result: models.TextField  # null=True, default=None, editable=False
    content_type: models.CharField  # max_length=128
    content_encoding: models.CharField  # max_length=64

    # Timestamps
    date_created: models.DateTimeField  # auto_now_add=True
    date_done: models.DateTimeField  # auto_now=True

    # Custom manager
    objects: GroupResultManager

    def as_dict(self):
        """
        Convert group result to dictionary.

        Returns:
            dict: Group data as dictionary with all fields
        """

    def __str__(self):
        """String representation showing group ID."""

    class Meta:
        ordering = ['-date_done']
        verbose_name = 'group result'
        verbose_name_plural = 'group results'
```

#### Usage Example

```python
from datetime import timedelta

from django.utils import timezone
from django_celery_results.models import GroupResult

# Query recent group results
recent_groups = GroupResult.objects.filter(
    date_done__gte=timezone.now() - timedelta(days=1)
).order_by('-date_done')

# Get specific group
try:
    group = GroupResult.objects.get(group_id='group-abc123')
    group_data = group.as_dict()
    print(f"Group completed: {group.date_done}")
except GroupResult.DoesNotExist:
    print("Group not found")
```
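A dictionary produced by `as_dict()` carries `date_done` as a `datetime`, which `json.dumps` cannot serialize on its own. The sketch below shows one way to export such a dictionary. The helper `to_json` is hypothetical, and the keys in `group_data` are assumed for illustration rather than taken from the library.

```python
import json
from datetime import datetime, timezone


def to_json(record):
    """Serialize a dict that may contain datetime values."""
    def encode(value):
        # json.dumps calls this only for values it cannot serialize itself.
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"cannot serialize {type(value).__name__}")

    return json.dumps(record, default=encode, sort_keys=True)


# Hypothetical stand-in for the output of GroupResult.as_dict().
group_data = {
    "group_id": "group-abc123",
    "result": "[]",
    "date_done": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
}

payload = to_json(group_data)
print(payload)
```

ISO 8601 strings keep the timezone offset, so the timestamp can be parsed back with `datetime.fromisoformat`.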

### Chord Coordination

Model for managing Celery chord synchronization, tracking completion of chord header tasks.

```python { .api }
class ChordCounter(models.Model):
    """Chord synchronization and coordination model."""

    # Identity and coordination
    group_id: models.CharField  # max_length=255, unique=True
    sub_tasks: models.TextField  # JSON serialized list of task result tuples
    count: models.PositiveIntegerField  # Starts at chord header length, decrements

    def group_result(self, app=None):
        """
        Return the GroupResult instance for this chord.

        Args:
            app: Celery app instance (optional)

        Returns:
            celery.result.GroupResult: Group result with all sub-task results
        """
```

#### Usage Example

```python
from django_celery_results.models import ChordCounter
from celery import current_app

# Check chord progress
try:
    chord = ChordCounter.objects.get(group_id='chord-group-123')
    print(f"Remaining tasks: {chord.count}")

    # Get full group result
    group_result = chord.group_result(app=current_app)
    print(f"Group ready: {group_result.ready()}")

    if group_result.ready():
        results = group_result.results
        print(f"All {len(results)} tasks completed")
except ChordCounter.DoesNotExist:
    print("Chord already completed or not found")
```
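The countdown behind `count` can be modeled without a database. The sketch below is an in-memory illustration of the idea only: `CounterSketch` and `on_header_task_done` are hypothetical names, and the real backend is assumed to perform the decrement inside a database transaction rather than on a plain object.

```python
from dataclasses import dataclass


@dataclass
class CounterSketch:
    """In-memory stand-in for a ChordCounter row (not the real model)."""
    group_id: str
    count: int

    def on_header_task_done(self):
        """Decrement the counter; return True once every header task has finished."""
        if self.count <= 0:
            raise ValueError("counter already exhausted")
        self.count -= 1
        return self.count == 0


# A chord whose header contains three tasks.
counter = CounterSketch(group_id="chord-group-123", count=3)

# Each finished header task reports in; only the last one signals completion.
fired = [counter.on_header_task_done() for _ in range(3)]
print(fired)  # [False, False, True]
```

Only the call that brings the counter to zero returns `True`, which is the point at which a chord body would be scheduled.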

## Custom Managers

### ResultManager

Base manager class providing common functionality for both TaskResult and GroupResult managers.

```python { .api }
class ResultManager(models.Manager):
    """Base manager for celery result models."""

    def connection_for_write(self):
        """Get database connection for write operations."""

    def connection_for_read(self):
        """Get database connection for read operations."""

    def current_engine(self):
        """Get current database engine name."""

    def get_all_expired(self, expires):
        """
        Get all expired results.

        Args:
            expires: Expiration time delta

        Returns:
            QuerySet: Expired result instances
        """

    def delete_expired(self, expires):
        """
        Delete all expired results.

        Args:
            expires: Expiration time delta
        """

    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""
```
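The expiry methods take a time delta and act on results older than that window. A minimal sketch of the cutoff arithmetic follows, using plain dictionaries in place of model rows. It assumes expiry is measured against `date_done`; the function `select_expired` and the sample rows are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def select_expired(rows, expires, now=None):
    """Return rows whose date_done is older than now - expires."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - expires
    return [row for row in rows if row["date_done"] < cutoff]


# Fixed reference time so the example is deterministic.
now = datetime(2024, 1, 10, tzinfo=timezone.utc)
rows = [
    {"task_id": "old", "date_done": now - timedelta(days=3)},
    {"task_id": "fresh", "date_done": now - timedelta(hours=2)},
]

expired = select_expired(rows, timedelta(days=1), now=now)
print([row["task_id"] for row in expired])  # ['old']
```

With the ORM, the same comparison would typically be expressed as a `date_done__lt` filter on the manager.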

### TaskResultManager

Custom manager for TaskResult model with enhanced database operations and retry logic.

```python { .api }
class TaskResultManager(ResultManager):
    """Manager for TaskResult model with retry logic."""

    _last_id: str  # Last queried task ID for optimization

    def get_task(self, task_id):
        """
        Get or create task result by task ID.

        Args:
            task_id (str): Task ID to retrieve

        Returns:
            TaskResult: Existing or new TaskResult instance
        """

    def store_result(self, content_type, content_encoding, task_id, result,
                     status, traceback=None, meta=None, periodic_task_name=None,
                     task_name=None, task_args=None, task_kwargs=None,
                     worker=None, using=None, **kwargs):
        """
        Store task result with transaction retry logic.

        Args:
            content_type (str): MIME type of result content
            content_encoding (str): Encoding type
            task_id (str): Unique task identifier
            result (str): Serialized task result
            status (str): Task status
            traceback (str, optional): Exception traceback
            meta (str, optional): Serialized metadata
            periodic_task_name (str, optional): Periodic task name
            task_name (str, optional): Task name
            task_args (str, optional): Serialized arguments
            task_kwargs (str, optional): Serialized keyword arguments
            worker (str, optional): Worker hostname
            using (str, optional): Database connection name
            **kwargs: Additional fields including date_started

        Returns:
            TaskResult: Created or updated TaskResult instance
        """

    def get_all_expired(self, expires):
        """
        Get all expired task results.

        Args:
            expires: Expiration time delta

        Returns:
            QuerySet: Expired TaskResult instances
        """

    def delete_expired(self, expires):
        """
        Delete expired task results.

        Args:
            expires: Expiration time delta
        """

    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""
```
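"Transaction retry logic" here means a failed write is attempted again before the error reaches the caller. The sketch below shows the general shape of such a decorator. It is an illustration of the pattern, not the library's implementation: `retry_on_error`, `TransientDBError`, and `store` are hypothetical names, and the attempt limit of 2 is an assumed value.

```python
import functools


class TransientDBError(Exception):
    """Hypothetical stand-in for a retryable database error."""


def retry_on_error(max_retries=2, retry_on=(TransientDBError,)):
    """Run the wrapped function up to max_retries times in total."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    # Re-raise once the final attempt has failed.
                    if attempt == max_retries:
                        raise
        return wrapper
    return decorator


calls = []


@retry_on_error(max_retries=2)
def store(task_id):
    """Fail on the first call, succeed on the second."""
    calls.append(task_id)
    if len(calls) == 1:
        raise TransientDBError("simulated deadlock")
    return {"task_id": task_id, "status": "SUCCESS"}


stored = store("abc123")
print(stored, len(calls))
```

Limiting retries to a small fixed number keeps a persistent failure from blocking the worker indefinitely.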

### GroupResultManager

Custom manager for GroupResult model with database operations and retry logic.

```python { .api }
class GroupResultManager(ResultManager):
    """Manager for GroupResult model with retry logic."""

    _last_id: str  # Last queried group ID for optimization

    def get_group(self, group_id):
        """
        Get or create group result by group ID.

        Args:
            group_id (str): Group ID to retrieve

        Returns:
            GroupResult: Existing or new GroupResult instance
        """

    def store_group_result(self, content_type, content_encoding, group_id,
                           result, using=None):
        """
        Store group result with transaction retry logic.

        Args:
            content_type (str): MIME type of result content
            content_encoding (str): Encoding type
            group_id (str): Unique group identifier
            result (str): Serialized group result
            using (str, optional): Database connection name

        Returns:
            GroupResult: Created or updated GroupResult instance
        """

    def get_all_expired(self, expires):
        """
        Get all expired group results.

        Args:
            expires: Expiration time delta

        Returns:
            QuerySet: Expired GroupResult instances
        """

    def delete_expired(self, expires):
        """
        Delete expired group results.

        Args:
            expires: Expiration time delta
        """

    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""
```

## Database Schema

### Indexes

The models include optimized database indexes for common query patterns:

**TaskResult indexes:**
- `task_name` - For filtering by task type
- `status` - For filtering by task status
- `worker` - For filtering by worker
- `date_created` - For chronological ordering
- `date_done` - For completion time queries
- `periodic_task_name` - For periodic task queries

**GroupResult indexes:**
- `date_created` - For chronological ordering
- `date_done` - For completion time queries

### Configuration Settings

- **DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH**: Maximum length for task_id fields (default: 255)
- **DJANGO_CELERY_RESULTS['ALLOW_EDITS']**: Enable editing in Django admin (default: False)
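
Both settings belong in the Django settings module. The fragment below is a sketch with illustrative values only. The value `191` is an assumed example of shortening the column to fit an index size limit, and it is assumed that the length should be chosen before the initial migration is applied.

```python
# settings.py (fragment); the values are illustrative, not recommendations.

# Shorten task_id columns, for example to fit a database index size limit.
DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH = 191

# Make result records editable in the Django admin (off by default).
DJANGO_CELERY_RESULTS = {
    "ALLOW_EDITS": True,
}
```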