CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-django-celery-results

Celery Result Backends using the Django ORM/Cache framework

Pending
Overview
Eval results
Files

backends.mddocs/

Backend Configuration

Result backend implementations for storing Celery task results using Django's database ORM or cache framework. These backends integrate with Celery's result backend system to provide persistent storage and retrieval of task results.

Capabilities

Database Backend

Stores task results in Django database tables using the ORM, providing full query capabilities and persistence across application restarts.

class DatabaseBackend(BaseDictBackend):
    """Database backend for storing task results using Django ORM."""
    
    TaskModel: type  # Reference to TaskResult model
    GroupModel: type  # Reference to GroupResult model  
    subpolling_interval: float  # Polling interval in seconds (0.5)
    
    def __init__(self, *args, **kwargs):
        """Initialize the database backend."""
    
    def exception_safe_to_retry(self, exc):
        """
        Check if an exception is safe to retry.
        
        Args:
            exc: Exception instance to check
            
        Returns:
            bool: True if exception is safe to retry
        """
    
    def _get_extended_properties(self, request, traceback):
        """
        Extract extended properties from request.
        
        Args:
            request: Task request object
            traceback: Task traceback string
            
        Returns:
            dict: Extended properties including task args, kwargs, worker, etc.
        """
        
    def _get_meta_from_request(self, request=None):
        """
        Extract meta attribute from request or current task.
        
        Args:
            request: Task request object (optional)
            
        Returns:
            dict: Meta data from request
        """
    
    def _store_result(self, task_id, result, status, traceback=None, request=None, using=None):
        """
        Store return value and status of an executed task.
        
        Args:
            task_id (str): Task ID
            result: Task result value
            status (str): Task status
            traceback (str, optional): Task traceback
            request: Task request object (optional)
            using (str, optional): Database alias to use
            
        Returns:
            Encoded result value
        """
    
    def _get_task_meta_for(self, task_id):
        """
        Get task metadata for a task by id.
        
        Args:
            task_id (str): Task ID
            
        Returns:
            dict: Task metadata including result, status, args, kwargs
        """
    
    def encode_content(self, data):
        """
        Encode result data for storage.
        
        Args:
            data: Data to encode
            
        Returns:
            tuple: (content_type, content_encoding, encoded_content)
        """
    
    def decode_content(self, obj, content):
        """
        Decode stored result data.
        
        Args:
            obj: TaskResult model instance
            content: Encoded content to decode
            
        Returns:
            Decoded data or None if content is empty
        """
    
    def _forget(self, task_id):
        """
        Delete task result by task_id.
        
        Args:
            task_id (str): Task ID to forget
        """
    
    def cleanup(self):
        """Delete expired task and group results."""
        
    def _restore_group(self, group_id):
        """
        Return result value for a group by id.
        
        Args:
            group_id (str): Group ID
            
        Returns:
            dict or None: Group result data
        """
        
    def _save_group(self, group_id, group_result):
        """
        Store return value of group.
        
        Args:
            group_id (str): Group ID
            group_result: GroupResult instance
            
        Returns:
            GroupResult: The stored group result
        """
        
    def _delete_group(self, group_id):
        """
        Delete group result by group_id.
        
        Args:
            group_id (str): Group ID to delete
        """
    
    def apply_chord(self, header_result_args, body, **kwargs):
        """
        Add a ChordCounter with the expected number of results.
        
        Args:
            header_result_args: GroupResult args or GroupResult instance
            body: Chord body configuration
            **kwargs: Additional keyword arguments
        """
    
    def on_chord_part_return(self, request, state, result, **kwargs):
        """
        Called when each part of a Chord header completes.
        
        Args:
            request: Task request object
            state: Task state
            result: Task result
            **kwargs: Additional keyword arguments
        """

Usage Example

# In your Django settings.py
INSTALLED_APPS = [
    # ... other apps
    'django_celery_results',
]

# Configure Celery to use the database backend
CELERY_RESULT_BACKEND = 'django-db'

# Or configure explicitly in your Celery configuration
from celery import Celery
from django_celery_results.backends import DatabaseBackend

app = Celery('myapp')
app.conf.result_backend = 'django_celery_results.backends:DatabaseBackend'

# Enable extended result information (stores args, kwargs, worker, etc.)
app.conf.result_extended = True

# Enable automatic retry on connection errors
app.conf.result_backend_always_retry = True

Cache Backend

Stores task results in Django's cache framework, providing fast access but with potential for data loss if cache is cleared.

class CacheBackend(KeyValueStoreBackend):
    """Cache backend using Django cache framework for task metadata."""
    
    serializer: str  # Serializer name (set to 'pickle')
    
    def __init__(self, *args, **kwargs):
        """Initialize the cache backend."""
    
    def get(self, key):
        """
        Get value from cache.
        
        Args:
            key (str or bytes): Cache key to retrieve
            
        Returns:
            Cached value or None if not found
        """
    
    def set(self, key, value):
        """
        Set value in cache.
        
        Args:
            key (str or bytes): Cache key
            value: Value to store
        """
    
    def delete(self, key):
        """
        Delete value from cache.
        
        Args:
            key (str or bytes): Cache key to delete
        """
    
    def encode(self, data):
        """
        Encode data (pass-through for cache backend).
        
        Args:
            data: Data to encode
            
        Returns:
            Original data unchanged
        """
    
    def decode(self, data):
        """
        Decode data (pass-through for cache backend).
        
        Args:
            data: Data to decode
            
        Returns:
            Original data unchanged
        """
    
    @property
    def cache_backend(self):
        """
        Get Django cache backend instance.
        
        Returns:
            Django cache backend based on configuration
        """

Usage Example

# In your Celery configuration
CELERY_RESULT_BACKEND = 'django-cache'

# Specify which Django cache to use (optional)
CELERY_CACHE_BACKEND = 'redis'  # Must match CACHES configuration

# Django cache configuration
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
    }
}

Configuration Options

Database Backend Settings

  • DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH: Maximum length for task IDs (default: 255, recommended: 191 for MySQL)
  • result_backend_always_retry: Enable automatic retry on database connection errors
  • result_extended: Store extended task information (args, kwargs, worker, etc.)

Cache Backend Settings

  • cache_backend: Name of Django cache backend to use (default: 'default')
  • result_expires: Expiration time for cached results (default: 1 day)

Error Handling

Both backends include built-in error handling and retry logic:

  • Connection Errors: Automatically retry on database/cache connection failures
  • Transaction Conflicts: Retry database operations with exponential backoff
  • Encoding Errors: Graceful handling of data serialization issues
  • Chord Coordination: Safe handling of concurrent chord part completion

Install with Tessl CLI

npx tessl i tessl/pypi-django-celery-results

docs

admin.md

backends.md

index.md

models.md

views.md

tile.json