0
# Backend Configuration
1
2
Result backend implementations for storing Celery task results using Django's database ORM or cache framework. These backends integrate with Celery's result backend system to provide persistent storage and retrieval of task results.
3
4
## Capabilities
5
6
### Database Backend
7
8
Stores task results in Django database tables using the ORM, providing full query capabilities and persistence across application restarts.
9
10
```python { .api }
11
class DatabaseBackend(BaseDictBackend):
12
"""Database backend for storing task results using Django ORM."""
13
14
TaskModel: type # Reference to TaskResult model
15
GroupModel: type # Reference to GroupResult model
16
subpolling_interval: float # Polling interval in seconds (0.5)
17
18
def __init__(self, *args, **kwargs):
19
"""Initialize the database backend."""
20
21
def exception_safe_to_retry(self, exc):
22
"""
23
Check if an exception is safe to retry.
24
25
Args:
26
exc: Exception instance to check
27
28
Returns:
29
bool: True if exception is safe to retry
30
"""
31
32
def _get_extended_properties(self, request, traceback):
33
"""
34
Extract extended properties from request.
35
36
Args:
37
request: Task request object
38
traceback: Task traceback string
39
40
Returns:
41
dict: Extended properties including task args, kwargs, worker, etc.
42
"""
43
44
def _get_meta_from_request(self, request=None):
45
"""
46
Extract meta attribute from request or current task.
47
48
Args:
49
request: Task request object (optional)
50
51
Returns:
52
dict: Meta data from request
53
"""
54
55
def _store_result(self, task_id, result, status, traceback=None, request=None, using=None):
56
"""
57
Store return value and status of an executed task.
58
59
Args:
60
task_id (str): Task ID
61
result: Task result value
62
status (str): Task status
63
traceback (str, optional): Task traceback
64
request: Task request object (optional)
65
using (str, optional): Database alias to use
66
67
Returns:
68
Encoded result value
69
"""
70
71
def _get_task_meta_for(self, task_id):
72
"""
73
Get task metadata for a task by id.
74
75
Args:
76
task_id (str): Task ID
77
78
Returns:
79
dict: Task metadata including result, status, args, kwargs
80
"""
81
82
def encode_content(self, data):
83
"""
84
Encode result data for storage.
85
86
Args:
87
data: Data to encode
88
89
Returns:
90
tuple: (content_type, content_encoding, encoded_content)
91
"""
92
93
def decode_content(self, obj, content):
94
"""
95
Decode stored result data.
96
97
Args:
98
obj: TaskResult model instance
99
content: Encoded content to decode
100
101
Returns:
102
Decoded data or None if content is empty
103
"""
104
105
def _forget(self, task_id):
106
"""
107
Delete task result by task_id.
108
109
Args:
110
task_id (str): Task ID to forget
111
"""
112
113
def cleanup(self):
114
"""Delete expired task and group results."""
115
116
def _restore_group(self, group_id):
117
"""
118
Return result value for a group by id.
119
120
Args:
121
group_id (str): Group ID
122
123
Returns:
124
dict or None: Group result data
125
"""
126
127
def _save_group(self, group_id, group_result):
128
"""
129
Store return value of group.
130
131
Args:
132
group_id (str): Group ID
133
group_result: GroupResult instance
134
135
Returns:
136
GroupResult: The stored group result
137
"""
138
139
def _delete_group(self, group_id):
140
"""
141
Delete group result by group_id.
142
143
Args:
144
group_id (str): Group ID to delete
145
"""
146
147
def apply_chord(self, header_result_args, body, **kwargs):
148
"""
149
Add a ChordCounter with the expected number of results.
150
151
Args:
152
header_result_args: GroupResult args or GroupResult instance
153
body: Chord body configuration
154
**kwargs: Additional keyword arguments
155
"""
156
157
def on_chord_part_return(self, request, state, result, **kwargs):
158
"""
159
Called when each part of a Chord header completes.
160
161
Args:
162
request: Task request object
163
state: Task state
164
result: Task result
165
**kwargs: Additional keyword arguments
166
"""
167
```
168
169
#### Usage Example
170
171
```python
172
# In your Django settings.py
173
INSTALLED_APPS = [
174
# ... other apps
175
'django_celery_results',
176
]
177
178
# Configure Celery to use the database backend
179
CELERY_RESULT_BACKEND = 'django-db'
180
181
# Or configure explicitly in your Celery configuration
182
from celery import Celery
183
from django_celery_results.backends import DatabaseBackend
184
185
app = Celery('myapp')
186
app.conf.result_backend = 'django_celery_results.backends:DatabaseBackend'
187
188
# Enable extended result information (stores args, kwargs, worker, etc.)
189
app.conf.result_extended = True
190
191
# Enable automatic retry on connection errors
192
app.conf.result_backend_always_retry = True
193
```
194
195
### Cache Backend
196
197
Stores task results in Django's cache framework, providing fast access but with potential for data loss if cache is cleared.
198
199
```python { .api }
200
class CacheBackend(KeyValueStoreBackend):
201
"""Cache backend using Django cache framework for task metadata."""
202
203
serializer: str # Serializer name (set to 'pickle')
204
205
def __init__(self, *args, **kwargs):
206
"""Initialize the cache backend."""
207
208
def get(self, key):
209
"""
210
Get value from cache.
211
212
Args:
213
key (str or bytes): Cache key to retrieve
214
215
Returns:
216
Cached value or None if not found
217
"""
218
219
def set(self, key, value):
220
"""
221
Set value in cache.
222
223
Args:
224
key (str or bytes): Cache key
225
value: Value to store
226
"""
227
228
def delete(self, key):
229
"""
230
Delete value from cache.
231
232
Args:
233
key (str or bytes): Cache key to delete
234
"""
235
236
def encode(self, data):
237
"""
238
Encode data (pass-through for cache backend).
239
240
Args:
241
data: Data to encode
242
243
Returns:
244
Original data unchanged
245
"""
246
247
def decode(self, data):
248
"""
249
Decode data (pass-through for cache backend).
250
251
Args:
252
data: Data to decode
253
254
Returns:
255
Original data unchanged
256
"""
257
258
@property
259
def cache_backend(self):
260
"""
261
Get Django cache backend instance.
262
263
Returns:
264
Django cache backend based on configuration
265
"""
266
```
267
268
#### Usage Example
269
270
```python
271
# In your Celery configuration
272
CELERY_RESULT_BACKEND = 'django-cache'
273
274
# Specify which Django cache to use (optional)
275
CELERY_CACHE_BACKEND = 'redis' # Must match CACHES configuration
276
277
# Django cache configuration
278
CACHES = {
279
'default': {
280
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
281
'LOCATION': 'redis://127.0.0.1:6379/1',
282
}
283
}
284
```
285
286
## Configuration Options
287
288
### Database Backend Settings
289
290
- **DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH**: Maximum length for task IDs (default: 255, recommended: 191 for MySQL)
291
- **result_backend_always_retry**: Enable automatic retry on database connection errors
292
- **result_extended**: Store extended task information (args, kwargs, worker, etc.)
293
294
### Cache Backend Settings
295
296
- **cache_backend**: Name of Django cache backend to use (default: 'default')
297
- **result_expires**: Expiration time for cached results (default: 1 day)
298
299
## Error Handling
300
301
Both backends include built-in error handling and retry logic:
302
303
- **Connection Errors**: Automatically retry on database/cache connection failures
304
- **Transaction Conflicts**: Retry database operations with exponential backoff
305
- **Encoding Errors**: Graceful handling of data serialization issues
306
- **Chord Coordination**: Safe handling of concurrent chord part completion