CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-django-celery-results

Celery Result Backends using the Django ORM/Cache framework

Pending
Overview
Eval results
Files

models.mddocs/

Task Result Models

Django models for storing and querying Celery task results, group results, and chord coordination data. These models provide full ORM capabilities for result management and include custom managers with transaction retry logic.

Capabilities

Task Result Storage

The primary model for storing individual Celery task results with comprehensive metadata and execution information.

class TaskResult(models.Model):
    """Task result and status storage model."""
    
    # Identity and naming
    task_id: models.CharField  # max_length=255, unique=True
    task_name: models.CharField  # max_length=255, null=True
    periodic_task_name: models.CharField  # max_length=255, null=True
    
    # Task parameters and execution context
    task_args: models.TextField  # null=True, JSON serialized positional arguments
    task_kwargs: models.TextField  # null=True, JSON serialized keyword arguments
    worker: models.CharField  # max_length=100, null=True, default=None
    
    # Result and status information
    status: models.CharField  # max_length=50, default='PENDING'
    result: models.TextField  # null=True, default=None, editable=False
    traceback: models.TextField  # blank=True, null=True
    meta: models.TextField  # null=True, default=None, editable=False
    
    # Content encoding and type
    content_type: models.CharField  # max_length=128
    content_encoding: models.CharField  # max_length=64
    
    # Timestamps
    date_created: models.DateTimeField  # auto_now_add=True
    date_started: models.DateTimeField  # null=True, default=None
    date_done: models.DateTimeField  # auto_now=True
    
    # Custom manager
    objects: TaskResultManager
    
    def as_dict(self):
        """
        Convert task result to dictionary.
        
        Returns:
            dict: Task data as dictionary with all fields
        """
    
    def __str__(self):
        """String representation showing task ID and status."""
    
    class Meta:
        ordering = ['-date_done']
        verbose_name = 'task result'
        verbose_name_plural = 'task results'

Usage Example

from django_celery_results.models import TaskResult
from django.utils import timezone
from datetime import timedelta

# Query recent successful tasks
recent_success = TaskResult.objects.filter(
    status='SUCCESS',
    date_done__gte=timezone.now() - timedelta(hours=1)
)

# Find failed tasks with traceback
failed_tasks = TaskResult.objects.filter(
    status='FAILURE',
    traceback__isnull=False
).order_by('-date_done')[:10]

# Get task by ID
try:
    task = TaskResult.objects.get(task_id='abc123')
    print(f"Task status: {task.status}")
    print(f"Task result: {task.result}")
    task_data = task.as_dict()
except TaskResult.DoesNotExist:
    print("Task not found")

# Filter by task name
specific_tasks = TaskResult.objects.filter(
    task_name='myapp.tasks.process_data',
    status='SUCCESS'
)

Group Result Storage

Model for storing results of Celery group operations, enabling tracking of multiple related tasks.

class GroupResult(models.Model):
    """Task group result and status storage model."""
    
    # Identity
    group_id: models.CharField  # max_length=255, unique=True
    
    # Result information
    result: models.TextField  # null=True, default=None, editable=False
    content_type: models.CharField  # max_length=128
    content_encoding: models.CharField  # max_length=64
    
    # Timestamps
    date_created: models.DateTimeField  # auto_now_add=True
    date_done: models.DateTimeField  # auto_now=True
    
    # Custom manager
    objects: GroupResultManager
    
    def as_dict(self):
        """
        Convert group result to dictionary.
        
        Returns:
            dict: Group data as dictionary with all fields
        """
    
    def __str__(self):
        """String representation showing group ID."""
        
    class Meta:
        ordering = ['-date_done']
        verbose_name = 'group result'
        verbose_name_plural = 'group results'

Usage Example

from django_celery_results.models import GroupResult

# Query recent group results
recent_groups = GroupResult.objects.filter(
    date_done__gte=timezone.now() - timedelta(days=1)
).order_by('-date_done')

# Get specific group
try:
    group = GroupResult.objects.get(group_id='group-abc123')
    group_data = group.as_dict()
    print(f"Group completed: {group.date_done}")
except GroupResult.DoesNotExist:
    print("Group not found")

Chord Coordination

Model for managing Celery chord synchronization, tracking completion of chord header tasks.

class ChordCounter(models.Model):
    """Chord synchronization and coordination model."""
    
    # Identity and coordination
    group_id: models.CharField  # max_length=255, unique=True
    sub_tasks: models.TextField  # JSON serialized list of task result tuples
    count: models.PositiveIntegerField  # Starts at chord header length, decrements
    
    def group_result(self, app=None):
        """
        Return the GroupResult instance for this chord.
        
        Args:
            app: Celery app instance (optional)
            
        Returns:
            celery.result.GroupResult: Group result with all sub-task results
        """

Usage Example

from django_celery_results.models import ChordCounter
from celery import current_app

# Check chord progress
try:
    chord = ChordCounter.objects.get(group_id='chord-group-123')
    print(f"Remaining tasks: {chord.count}")
    
    # Get full group result
    group_result = chord.group_result(app=current_app)
    print(f"Group ready: {group_result.ready()}")
    
    if group_result.ready():
        results = group_result.results
        print(f"All {len(results)} tasks completed")
except ChordCounter.DoesNotExist:
    print("Chord already completed or not found")

Custom Managers

ResultManager

Base manager class providing common functionality for both TaskResult and GroupResult managers.

class ResultManager(models.Manager):
    """Base manager for celery result models."""

    def connection_for_write(self):
        """Get database connection for write operations."""

    def connection_for_read(self):
        """Get database connection for read operations."""

    def current_engine(self):
        """Get current database engine name."""

    def get_all_expired(self, expires):
        """
        Get all expired results.
        
        Args:
            expires: Expiration time delta
            
        Returns:
            QuerySet: Expired result instances
        """

    def delete_expired(self, expires):
        """
        Delete all expired results.
        
        Args:
            expires: Expiration time delta
        """

    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""

TaskResultManager

Custom manager for TaskResult model with enhanced database operations and retry logic.

class TaskResultManager(ResultManager):
    """Manager for TaskResult model with retry logic."""
    
    _last_id: str  # Last queried task ID for optimization
    
    def get_task(self, task_id):
        """
        Get or create task result by task ID.
        
        Args:
            task_id (str): Task ID to retrieve
            
        Returns:
            TaskResult: Existing or new TaskResult instance
        """
    
    def store_result(self, content_type, content_encoding, task_id, result, 
                    status, traceback=None, meta=None, periodic_task_name=None,
                    task_name=None, task_args=None, task_kwargs=None, 
                    worker=None, using=None, **kwargs):
        """
        Store task result with transaction retry logic.
        
        Args:
            content_type (str): MIME type of result content
            content_encoding (str): Encoding type
            task_id (str): Unique task identifier
            result (str): Serialized task result
            status (str): Task status
            traceback (str, optional): Exception traceback
            meta (str, optional): Serialized metadata
            periodic_task_name (str, optional): Periodic task name
            task_name (str, optional): Task name
            task_args (str, optional): Serialized arguments
            task_kwargs (str, optional): Serialized keyword arguments
            worker (str, optional): Worker hostname
            using (str, optional): Database connection name
            **kwargs: Additional fields including date_started
            
        Returns:
            TaskResult: Created or updated TaskResult instance
        """
    
    def get_all_expired(self, expires):
        """
        Get all expired task results.
        
        Args:
            expires: Expiration time delta
            
        Returns:
            QuerySet: Expired TaskResult instances
        """
    
    def delete_expired(self, expires):
        """
        Delete expired task results.
        
        Args:
            expires: Expiration time delta
        """
    
    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""

GroupResultManager

Custom manager for GroupResult model with database operations and retry logic.

class GroupResultManager(ResultManager):
    """Manager for GroupResult model with retry logic."""
    
    _last_id: str  # Last queried group ID for optimization
    
    def get_group(self, group_id):
        """
        Get or create group result by group ID.
        
        Args:
            group_id (str): Group ID to retrieve
            
        Returns:
            GroupResult: Existing or new GroupResult instance
        """
    
    def store_group_result(self, content_type, content_encoding, group_id, 
                          result, using=None):
        """
        Store group result with transaction retry logic.
        
        Args:
            content_type (str): MIME type of result content
            content_encoding (str): Encoding type
            group_id (str): Unique group identifier
            result (str): Serialized group result
            using (str, optional): Database connection name
            
        Returns:
            GroupResult: Created or updated GroupResult instance
        """
    
    def get_all_expired(self, expires):
        """
        Get all expired group results.
        
        Args:
            expires: Expiration time delta
            
        Returns:
            QuerySet: Expired GroupResult instances
        """
    
    def delete_expired(self, expires):
        """
        Delete expired group results.
        
        Args:
            expires: Expiration time delta
        """
    
    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""

Database Schema

Indexes

The models include optimized database indexes for common query patterns:

TaskResult indexes:

  • task_name - For filtering by task type
  • status - For filtering by task status
  • worker - For filtering by worker
  • date_created - For chronological ordering
  • date_done - For completion time queries
  • periodic_task_name - For periodic task queries

GroupResult indexes:

  • date_created - For chronological ordering
  • date_done - For completion time queries

Configuration Settings

  • DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH: Maximum length for task_id fields (default: 255)
  • DJANGO_CELERY_RESULTS['ALLOW_EDITS']: Enable editing in Django admin (default: False)

Install with Tessl CLI

npx tessl i tessl/pypi-django-celery-results

docs

admin.md

backends.md

index.md

models.md

views.md

tile.json