Celery Result Backends using the Django ORM/Cache framework
—
Django models for storing and querying Celery task results, group results, and chord coordination data. These models provide full ORM capabilities for result management and include custom managers with transaction retry logic.
The primary model for storing individual Celery task results with comprehensive metadata and execution information.
class TaskResult(models.Model):
    """Task result and status storage model.

    API outline of the model that stores one result record per Celery
    task, identified by the unique ``task_id``. Fields are declared as
    annotations; the trailing comment on each field lists its options.
    Method bodies are omitted in this outline.
    """

    # Identity and naming
    task_id: models.CharField  # max_length=255, unique=True
    task_name: models.CharField  # max_length=255, null=True
    periodic_task_name: models.CharField  # max_length=255, null=True

    # Task parameters and execution context
    task_args: models.TextField  # null=True, JSON serialized positional arguments
    task_kwargs: models.TextField  # null=True, JSON serialized keyword arguments
    worker: models.CharField  # max_length=100, null=True, default=None

    # Result and status information
    status: models.CharField  # max_length=50, default='PENDING'
    result: models.TextField  # null=True, default=None, editable=False
    traceback: models.TextField  # blank=True, null=True
    meta: models.TextField  # null=True, default=None, editable=False

    # Content encoding and type
    content_type: models.CharField  # max_length=128
    content_encoding: models.CharField  # max_length=64

    # Timestamps
    date_created: models.DateTimeField  # auto_now_add=True
    date_started: models.DateTimeField  # null=True, default=None
    date_done: models.DateTimeField  # auto_now=True

    # Custom manager
    objects: TaskResultManager

    def as_dict(self):
        """Convert task result to dictionary.

        Returns:
            dict: Task data as dictionary with all fields
        """

    def __str__(self):
        """String representation showing task ID and status."""

    class Meta:
        # Default ordering: most recently finished results first.
        ordering = ['-date_done']
        verbose_name = 'task result'
        verbose_name_plural = 'task results'


# Usage example: querying TaskResult
from django_celery_results.models import TaskResult
from django.utils import timezone
from datetime import timedelta
# Query successful tasks whose date_done falls within the last hour
recent_success = TaskResult.objects.filter(
    status='SUCCESS',
    date_done__gte=timezone.now() - timedelta(hours=1),
)

# Find the ten most recently finished failed tasks that have a traceback
failed_tasks = TaskResult.objects.filter(
    status='FAILURE',
    traceback__isnull=False,
).order_by('-date_done')[:10]

# Get a single task by ID; a missing row is reported instead of raising
try:
    task = TaskResult.objects.get(task_id='abc123')
    print(f"Task status: {task.status}")
    print(f"Task result: {task.result}")
    task_data = task.as_dict()
except TaskResult.DoesNotExist:
    print("Task not found")
# Filter by task name
specific_tasks = TaskResult.objects.filter(
task_name='myapp.tasks.process_data',
status='SUCCESS'
)

Model for storing results of Celery group operations, enabling tracking of multiple related tasks.
class GroupResult(models.Model):
    """Task group result and status storage model.

    API outline of the model that stores one result record per Celery
    group, identified by the unique ``group_id``. Fields are declared as
    annotations; the trailing comment on each field lists its options.
    Method bodies are omitted in this outline.
    """

    # Identity
    group_id: models.CharField  # max_length=255, unique=True

    # Result information
    result: models.TextField  # null=True, default=None, editable=False
    content_type: models.CharField  # max_length=128
    content_encoding: models.CharField  # max_length=64

    # Timestamps
    date_created: models.DateTimeField  # auto_now_add=True
    date_done: models.DateTimeField  # auto_now=True

    # Custom manager
    objects: GroupResultManager

    def as_dict(self):
        """Convert group result to dictionary.

        Returns:
            dict: Group data as dictionary with all fields
        """

    def __str__(self):
        """String representation showing group ID."""

    class Meta:
        # Default ordering: most recently finished groups first.
        ordering = ['-date_done']
        verbose_name = 'group result'
        verbose_name_plural = 'group results'


# Usage example: querying GroupResult
from django_celery_results.models import GroupResult
# Query group results whose date_done falls within the last day, newest first.
# NOTE: uses `timezone` and `timedelta`, which this snippet does not import
# itself; they are imported in the TaskResult usage example earlier in the file.
recent_groups = GroupResult.objects.filter(
date_done__gte=timezone.now() - timedelta(days=1)
).order_by('-date_done')
# Get specific group
try:
group = GroupResult.objects.get(group_id='group-abc123')
group_data = group.as_dict()
print(f"Group completed: {group.date_done}")
except GroupResult.DoesNotExist:
print("Group not found")

Model for managing Celery chord synchronization, tracking completion of chord header tasks.
class ChordCounter(models.Model):
    """Chord synchronization and coordination model.

    API outline of the model used to track completion of a chord's
    header tasks. Fields are declared as annotations; the trailing
    comment on each field describes it. Method bodies are omitted in
    this outline.
    """

    # Identity and coordination
    group_id: models.CharField  # max_length=255, unique=True
    sub_tasks: models.TextField  # JSON serialized list of task result tuples
    count: models.PositiveIntegerField  # Starts at chord header length, decrements

    def group_result(self, app=None):
        """Return the GroupResult instance for this chord.

        Args:
            app: Celery app instance (optional)

        Returns:
            celery.result.GroupResult: Group result with all sub-task results
        """


# Usage example: inspecting chord progress
from django_celery_results.models import ChordCounter
from celery import current_app
# Check chord progress
try:
chord = ChordCounter.objects.get(group_id='chord-group-123')
print(f"Remaining tasks: {chord.count}")
# Get full group result
group_result = chord.group_result(app=current_app)
print(f"Group ready: {group_result.ready()}")
if group_result.ready():
results = group_result.results
print(f"All {len(results)} tasks completed")
except ChordCounter.DoesNotExist:
print("Chord already completed or not found")

Base manager class providing common functionality for both TaskResult and GroupResult managers.
class ResultManager(models.Manager):
"""Base manager for celery result models."""
def connection_for_write(self):
"""Get database connection for write operations."""
def connection_for_read(self):
"""Get database connection for read operations."""
def current_engine(self):
"""Get current database engine name."""
def get_all_expired(self, expires):
"""
Get all expired results.
Args:
expires: Expiration time delta
Returns:
QuerySet: Expired result instances
"""
def delete_expired(self, expires):
"""
Delete all expired results.
Args:
expires: Expiration time delta
"""
def warn_if_repeatable_read(self):
"""Warn if MySQL transaction isolation level is suboptimal."""

Custom manager for TaskResult model with enhanced database operations and retry logic.
class TaskResultManager(ResultManager):
"""Manager for TaskResult model with retry logic."""
_last_id: str # Last queried task ID for optimization
def get_task(self, task_id):
"""
Get or create task result by task ID.
Args:
task_id (str): Task ID to retrieve
Returns:
TaskResult: Existing or new TaskResult instance
"""
def store_result(self, content_type, content_encoding, task_id, result,
status, traceback=None, meta=None, periodic_task_name=None,
task_name=None, task_args=None, task_kwargs=None,
worker=None, using=None, **kwargs):
"""
Store task result with transaction retry logic.
Args:
content_type (str): MIME type of result content
content_encoding (str): Encoding type
task_id (str): Unique task identifier
result (str): Serialized task result
status (str): Task status
traceback (str, optional): Exception traceback
meta (str, optional): Serialized metadata
periodic_task_name (str, optional): Periodic task name
task_name (str, optional): Task name
task_args (str, optional): Serialized arguments
task_kwargs (str, optional): Serialized keyword arguments
worker (str, optional): Worker hostname
using (str, optional): Database connection name
**kwargs: Additional fields including date_started
Returns:
TaskResult: Created or updated TaskResult instance
"""
def get_all_expired(self, expires):
"""
Get all expired task results.
Args:
expires: Expiration time delta
Returns:
QuerySet: Expired TaskResult instances
"""
def delete_expired(self, expires):
"""
Delete expired task results.
Args:
expires: Expiration time delta
"""
def warn_if_repeatable_read(self):
"""Warn if MySQL transaction isolation level is suboptimal."""

Custom manager for GroupResult model with database operations and retry logic.
class GroupResultManager(ResultManager):
"""Manager for GroupResult model with retry logic."""
_last_id: str # Last queried group ID for optimization
def get_group(self, group_id):
"""
Get or create group result by group ID.
Args:
group_id (str): Group ID to retrieve
Returns:
GroupResult: Existing or new GroupResult instance
"""
def store_group_result(self, content_type, content_encoding, group_id,
result, using=None):
"""
Store group result with transaction retry logic.
Args:
content_type (str): MIME type of result content
content_encoding (str): Encoding type
group_id (str): Unique group identifier
result (str): Serialized group result
using (str, optional): Database connection name
Returns:
GroupResult: Created or updated GroupResult instance
"""
def get_all_expired(self, expires):
"""
Get all expired group results.
Args:
expires: Expiration time delta
Returns:
QuerySet: Expired GroupResult instances
"""
def delete_expired(self, expires):
"""
Delete expired group results.
Args:
expires: Expiration time delta
"""
def warn_if_repeatable_read(self):
"""Warn if MySQL transaction isolation level is suboptimal."""

The models include optimized database indexes for common query patterns:
TaskResult indexes:
- task_name - For filtering by task type
- status - For filtering by task status
- worker - For filtering by worker
- date_created - For chronological ordering
- date_done - For completion time queries
- periodic_task_name - For periodic task queries

GroupResult indexes:
- date_created - For chronological ordering
- date_done - For completion time queries