Celery Result Backends using the Django ORM/Cache framework
—
Result backend implementations for storing Celery task results using Django's database ORM or cache framework. These backends integrate with Celery's result backend system to provide persistent storage and retrieval of task results.
The database backend stores task results in Django database tables using the ORM, providing full query capabilities and persistence across application restarts.
class DatabaseBackend(BaseDictBackend):
"""Database backend for storing task results using Django ORM."""
TaskModel: type # Reference to TaskResult model
GroupModel: type # Reference to GroupResult model
subpolling_interval: float # Polling interval in seconds (0.5)
def __init__(self, *args, **kwargs):
"""Initialize the database backend."""
def exception_safe_to_retry(self, exc):
"""
Check if an exception is safe to retry.
Args:
exc: Exception instance to check
Returns:
bool: True if exception is safe to retry
"""
def _get_extended_properties(self, request, traceback):
"""
Extract extended properties from request.
Args:
request: Task request object
traceback: Task traceback string
Returns:
dict: Extended properties including task args, kwargs, worker, etc.
"""
def _get_meta_from_request(self, request=None):
"""
Extract meta attribute from request or current task.
Args:
request: Task request object (optional)
Returns:
dict: Meta data from request
"""
def _store_result(self, task_id, result, status, traceback=None, request=None, using=None):
"""
Store return value and status of an executed task.
Args:
task_id (str): Task ID
result: Task result value
status (str): Task status
traceback (str, optional): Task traceback
request: Task request object (optional)
using (str, optional): Database alias to use
Returns:
Encoded result value
"""
def _get_task_meta_for(self, task_id):
"""
Get task metadata for a task by id.
Args:
task_id (str): Task ID
Returns:
dict: Task metadata including result, status, args, kwargs
"""
def encode_content(self, data):
"""
Encode result data for storage.
Args:
data: Data to encode
Returns:
tuple: (content_type, content_encoding, encoded_content)
"""
def decode_content(self, obj, content):
"""
Decode stored result data.
Args:
obj: TaskResult model instance
content: Encoded content to decode
Returns:
Decoded data or None if content is empty
"""
def _forget(self, task_id):
"""
Delete task result by task_id.
Args:
task_id (str): Task ID to forget
"""
def cleanup(self):
"""Delete expired task and group results."""
def _restore_group(self, group_id):
"""
Return result value for a group by id.
Args:
group_id (str): Group ID
Returns:
dict or None: Group result data
"""
def _save_group(self, group_id, group_result):
"""
Store return value of group.
Args:
group_id (str): Group ID
group_result: GroupResult instance
Returns:
GroupResult: The stored group result
"""
def _delete_group(self, group_id):
"""
Delete group result by group_id.
Args:
group_id (str): Group ID to delete
"""
def apply_chord(self, header_result_args, body, **kwargs):
"""
Add a ChordCounter with the expected number of results.
Args:
header_result_args: GroupResult args or GroupResult instance
body: Chord body configuration
**kwargs: Additional keyword arguments
"""
def on_chord_part_return(self, request, state, result, **kwargs):
"""
Called when each part of a Chord header completes.
Args:
request: Task request object
state: Task state
result: Task result
**kwargs: Additional keyword arguments
"""

# In your Django settings.py
INSTALLED_APPS = [
# ... other apps
'django_celery_results',
]
# Configure Celery to use the database backend
CELERY_RESULT_BACKEND = 'django-db'
# Or configure explicitly in your Celery configuration
from celery import Celery
from django_celery_results.backends import DatabaseBackend
app = Celery('myapp')
app.conf.result_backend = 'django_celery_results.backends:DatabaseBackend'
# Enable extended result information (stores args, kwargs, worker, etc.)
app.conf.result_extended = True
# Enable automatic retry on connection errors
app.conf.result_backend_always_retry = True

The cache backend stores task results in Django's cache framework, providing fast access, but results can be lost if the cache is cleared.
class CacheBackend(KeyValueStoreBackend):
"""Cache backend using Django cache framework for task metadata."""
serializer: str # Serializer name (set to 'pickle')
def __init__(self, *args, **kwargs):
"""Initialize the cache backend."""
def get(self, key):
"""
Get value from cache.
Args:
key (str or bytes): Cache key to retrieve
Returns:
Cached value or None if not found
"""
def set(self, key, value):
"""
Set value in cache.
Args:
key (str or bytes): Cache key
value: Value to store
"""
def delete(self, key):
"""
Delete value from cache.
Args:
key (str or bytes): Cache key to delete
"""
def encode(self, data):
"""
Encode data (pass-through for cache backend).
Args:
data: Data to encode
Returns:
Original data unchanged
"""
def decode(self, data):
"""
Decode data (pass-through for cache backend).
Args:
data: Data to decode
Returns:
Original data unchanged
"""
@property
def cache_backend(self):
"""
Get Django cache backend instance.
Returns:
Django cache backend based on configuration
"""

# In your Celery configuration
CELERY_RESULT_BACKEND = 'django-cache'
# Specify which Django cache to use (optional)
CELERY_CACHE_BACKEND = 'default' # Must be the name of an entry in the CACHES setting
# Django cache configuration
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
}
}

Both backends include built-in error handling and retry logic: