CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-celery

Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/exceptions.md

Exception Handling

Complete exception hierarchy for Celery task errors, retry mechanisms, timeout handling, backend errors, and worker-related exceptions. These exceptions provide fine-grained error handling and control flow for robust distributed task processing.

Capabilities

Core Exceptions

Base exceptions and fundamental error types that form the foundation of Celery's error handling system.

class CeleryError(Exception):
    """Root of the Celery exception hierarchy; all Celery errors derive from it."""


class ImproperlyConfigured(CeleryError):
    """
    Signals a broken Celery configuration.

    Typical triggers:
    - broker settings absent
    - serializer options invalid
    - backend wired up incorrectly
    """


class SecurityError(CeleryError):
    """
    Signals a failed security-related operation.

    Typical triggers:
    - message signature does not verify
    - authentication rejected
    - SSL/TLS problems
    """


class OperationalError(Exception):
    """
    Signals a transport connection error while sending messages.

    Note:
        Deliberately not a CeleryError subclass — it originates in the
        kombu messaging library.

    Typical triggers:
    - broker connection drops
    - network timeouts
    - broker authentication failures
    """

Task Execution Exceptions

Exceptions related to task execution, retry logic, and task lifecycle management.

class TaskError(CeleryError):
    """Common base for errors raised by the task machinery."""


class TaskPredicate(CeleryError):
    """
    Common base for control-flow exceptions raised inside tasks.

    Subclasses steer task execution (retry, ignore, reject) rather
    than report genuine failures.
    """


class Retry(TaskPredicate):
    """
    Raised to schedule another attempt of the current task.

    Attributes:
        message (str): Human-readable retry reason.
        exc (Exception): Exception that triggered the retry, if any.
        when (datetime): Earliest time (eta) for the next attempt.
    """

    def __init__(self, message=None, exc=None, when=None, **kwargs):
        """
        Build a retry exception.

        Args:
            message (str): Retry message.
            exc (Exception): Causing exception.
            when (datetime): Retry time.
            **kwargs: Extra retry options.
        """


class Ignore(TaskPredicate):
    """
    Raised to drop the current task's result.

    The task state stays untouched and nothing is written to the
    result backend.
    """


class Reject(TaskPredicate):
    """
    Raised to reject the current task, optionally requeueing it.

    Args:
        reason (str): Why the task was rejected.
        requeue (bool): Requeue instead of discarding.
    """

    def __init__(self, reason=None, requeue=False):
        """
        Build a reject exception.

        Args:
            reason (str): Reason for rejection.
            requeue (bool): Requeue the task.
        """

class NotRegistered(TaskError):
    """
    Raised on an attempt to execute a task that was never registered.

    Args:
        task_name (str): Name of the unregistered task.
    """


class AlreadyRegistered(TaskError):
    """
    Raised on an attempt to register a task a second time.

    Args:
        task_name (str): Name of the already registered task.
    """


class MaxRetriesExceededError(TaskError):
    """
    Raised once a task has used up its maximum retry attempts.

    Attributes:
        task_args (tuple): Positional arguments of the task.
        task_kwargs (dict): Keyword arguments of the task.
        task_name (str): Name of the task.
        task_id (str): ID of the task.
    """


class TaskRevokedError(TaskError):
    """
    Raised on an attempt to execute a task that has been revoked.

    Args:
        message (str): Revocation reason.
        task_id (str): ID of the revoked task.
    """


class InvalidTaskError(TaskError):
    """
    Raised when task data is invalid or corrupted.

    Typical triggers:
    - malformed message body
    - required task attributes missing
    - invalid task signature
    """


class ChordError(TaskError):
    """
    Raised when a chord callback fails or the chord is misconfigured.

    Args:
        message (str): Error description.
        callback (str): Name of the callback task.
    """


class QueueNotFound(TaskError):
    """
    Raised when a task is routed to a queue missing from conf.queues.

    Args:
        queue_name (str): Name of the missing queue.
    """


class IncompleteStream(TaskError):
    """
    Raised when the data stream ends before all expected data arrived.

    Typical triggers:
    - network interruption during message transfer
    - corrupted message payload
    - premature connection termination
    """

Timeout Exceptions

Time-related exceptions for handling task execution limits and timeouts.

class TimeoutError(CeleryError):
    """
    Raised when an operation runs out of time.

    Args:
        operation (str): Operation that timed out.
        timeout (float): Timeout value in seconds.
    """


class SoftTimeLimitExceeded(Exception):
    """
    Raised inside a task once the soft time limit is crossed.

    Gives the task a chance to clean up before the hard limit
    terminates it.
    """


class TimeLimitExceeded(Exception):
    """
    Raised once the hard time limit is crossed.

    Means the task is terminated immediately.
    """

Backend Exceptions

Exceptions related to result backend operations and storage systems.

class BackendError(CeleryError):
    """Root of the result-backend error hierarchy."""


class BackendGetMetaError(BackendError):
    """
    Raised when the backend cannot retrieve task metadata.

    Args:
        task_id (str): ID of the task whose metadata lookup failed.
        backend_type (str): Kind of backend in use.
    """


class BackendStoreError(BackendError):
    """
    Raised when the backend cannot store a task result.

    Args:
        task_id (str): ID of the task whose result failed to store.
        result: The result that could not be stored.
        backend_type (str): Kind of backend in use.
    """

Worker Exceptions

Exceptions related to worker processes, including termination and process management.

class WorkerLostError(Exception):
    """
    Signals that a worker process vanished unexpectedly.

    Args:
        message (str): Description of the loss.
        exitcode (int): Exit code of the lost worker.
    """


class Terminated(Exception):
    """
    Signals that a worker process was terminated.

    Args:
        signum (int): Signal number behind the termination.
        reason (str): Why the worker was terminated.
    """


class WorkerShutdown(SystemExit):
    """
    Control exception that requests a graceful worker shutdown.

    Args:
        msg (str): Shutdown message.
        exitcode (int): Exit code.
    """


class WorkerTerminate(SystemExit):
    """
    Control exception that requests immediate worker termination.

    Args:
        msg (str): Termination message.
        exitcode (int): Exit code.
    """

Warning Classes

Warning categories for non-fatal issues that should be brought to user attention.

class CeleryWarning(UserWarning):
    """Root of Celery's warning hierarchy."""


class AlwaysEagerIgnored(CeleryWarning):
    """
    Emitted when the task_always_eager setting is ignored.

    Happens when eager execution is requested but unsupported in the
    current context.
    """


class DuplicateNodenameWarning(CeleryWarning):
    """
    Emitted when duplicate worker node names are detected.

    Duplicate names can confuse worker management and monitoring.
    """


class FixupWarning(CeleryWarning):
    """
    Emitted during fixup operations.

    Points at potential compatibility or configuration problems.
    """


class NotConfigured(CeleryWarning):
    """
    Emitted when required configuration is absent.

    Defaults are being used where explicit configuration would be
    preferred.
    """

Usage Examples

Basic Exception Handling

from celery import Celery
from celery.exceptions import (
    Retry, Ignore, Reject, MaxRetriesExceededError,
    SoftTimeLimitExceeded, TimeLimitExceeded
)

app = Celery('exception_example')

@app.task(bind=True, max_retries=3)
def unreliable_task(self, data):
    """Task with comprehensive error handling.

    Retries on network errors with exponential backoff, ignores
    validation errors, and cleans up on soft time limits.
    """
    # BUG FIX: `random` was used below but never imported anywhere in
    # this snippet; import it locally so the example is runnable.
    import random

    try:
        # Simulate unreliable operation
        if random.random() < 0.3:
            raise ConnectionError("Network is down")

        # Process data
        result = process_data(data)
        return result

    except SoftTimeLimitExceeded:
        # Cleanup before hard termination
        cleanup_resources()
        raise

    except ConnectionError as exc:
        # Retry on network errors with exponential backoff
        countdown = 2 ** self.request.retries
        raise self.retry(countdown=countdown, exc=exc, max_retries=5)

    except ValueError as exc:
        # Don't retry on data validation errors
        raise Ignore(f"Invalid data: {exc}")

    except Exception as exc:
        # Log unexpected errors and retry
        # NOTE(review): assumes a module-level `logger` exists — confirm.
        logger.error(f"Unexpected error in task {self.request.id}: {exc}")
        raise self.retry(countdown=60, exc=exc)

def process_data(data):
    """Simulate data processing; rejects empty input."""
    if data:
        return f"processed_{data}"
    raise ValueError("Empty data")

def cleanup_resources():
    """Cleanup before task termination (stubbed as a console message)."""
    message = "Cleaning up resources..."
    print(message)

Retry Logic with Custom Exceptions

from celery.exceptions import Retry
import requests
from requests.exceptions import RequestException

@app.task(bind=True, max_retries=5)
def api_call_task(self, url, data):
    """Task that handles API failures with smart retry logic.

    Args:
        url (str): Endpoint to POST to.
        data (dict): JSON-serializable payload.

    Returns:
        dict: Decoded JSON response on success.
    """
    try:
        response = requests.post(url, json=data, timeout=30)
        response.raise_for_status()
        return response.json()

    except requests.exceptions.Timeout as exc:
        # BUG FIX: `exc` was referenced without being bound — the
        # handler was missing `as exc`, raising NameError at runtime.
        # Retry timeouts with longer delay
        raise self.retry(countdown=30, exc=exc)

    except requests.exceptions.ConnectionError as exc:
        # Retry connection errors with exponential backoff
        countdown = min(2 ** self.request.retries, 300)  # Max 5 minutes
        raise self.retry(countdown=countdown, exc=exc)

    except requests.exceptions.HTTPError as exc:
        # Don't retry client errors (4xx), do retry server errors (5xx)
        # NOTE(review): `Ignore` is not imported in this snippet; it
        # comes from celery.exceptions — confirm the module import.
        if 400 <= exc.response.status_code < 500:
            raise Ignore(f"Client error {exc.response.status_code}: {exc}")
        else:
            raise self.retry(countdown=60, exc=exc)

    except MaxRetriesExceededError as exc:
        # BUG FIX: `str(exc)` below used an unbound name — this handler
        # was also missing `as exc`.
        # Log final failure and send alert
        logger.error(f"API call to {url} failed after all retries")
        send_failure_alert.delay(url, str(exc))
        raise

@app.task
def send_failure_alert(url, error):
    """Send alert for permanent failures to the ops team."""
    # Intentionally a stub — notification delivery would go here.
    pass

Task Rejection and Requeuing

from celery.exceptions import Reject
import psutil

@app.task(bind=True)
def memory_intensive_task(self, large_data):
    """Task that rejects itself if system memory is low."""

    # Bail out early when the machine is already under memory pressure.
    if psutil.virtual_memory().percent > 85:  # More than 85% memory used
        logger.warning("System memory high, rejecting task for requeue")
        raise Reject("High memory usage", requeue=True)

    try:
        # Memory intensive processing
        return process_large_data(large_data)
    except MemoryError:
        # Reject and requeue if we run out of memory
        logger.error("Task ran out of memory, requeuing")
        raise Reject("Out of memory", requeue=True)

def process_large_data(data):
    """Simulate memory intensive processing."""
    count = len(data)
    return f"processed_{count}_items"

Time Limit Handling

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
import signal

@app.task(bind=True, soft_time_limit=300, time_limit=320)  # 5min soft, 5min 20s hard
def long_running_task(self, items):
    """Task with graceful timeout handling.

    Processes `items` one by one; when the soft time limit approaches
    or fires, it saves partial results and reschedules the remainder.
    """
    # BUG FIX: `time` was only imported inside process_item; import it
    # here so the limit check below works.
    import time

    processed_items = []
    started = time.monotonic()

    try:
        for i, item in enumerate(items):
            # Check for soft time limit periodically
            if i % 100 == 0 and self.request.timelimit:
                # BUG FIX: request.timelimit holds *durations* in
                # seconds, not absolute deadlines, so the original
                # `timelimit[0] - time.time()` was always hugely
                # negative. Compare elapsed time against the limit.
                # NOTE(review): assumes tuple order (hard, soft) — confirm.
                hard_limit, soft_limit = self.request.timelimit
                limit = soft_limit or hard_limit
                remaining = limit - (time.monotonic() - started)
                if remaining < 30:  # Less than 30 seconds left
                    logger.warning("Approaching soft time limit, saving progress")
                    save_partial_results.delay(processed_items)

            result = process_item(item)
            processed_items.append(result)

        return processed_items

    except SoftTimeLimitExceeded:
        # Graceful cleanup on soft limit
        logger.info("Soft time limit exceeded, saving partial results")
        save_partial_results.delay(processed_items)

        # Continue processing remaining items in new task
        remaining_items = items[len(processed_items):]
        if remaining_items:
            long_running_task.apply_async(args=[remaining_items], countdown=5)

        return f"Partial completion: {len(processed_items)} items processed"

@app.task
def save_partial_results(results):
    """Persist partial results so an interrupted run can recover."""
    # Save to database or file
    logger.info(f"Saved {len(results)} partial results")

def process_item(item):
    """Process one item (simulated with a short sleep)."""
    import time

    time.sleep(0.1)  # Simulate processing time
    return f"processed_{item}"

Backend Error Handling

from celery.exceptions import BackendError, BackendStoreError
from celery import Celery

@app.task(bind=True, ignore_result=False)
def critical_task(self, important_data):
    """Task with explicit backend error handling.

    Falls back to local storage whenever the result backend cannot
    persist the task state.
    """
    # BUG FIX: `traceback` was used below but never imported anywhere
    # in this snippet.
    import traceback

    try:
        # Critical processing
        result = perform_critical_operation(important_data)

        # Try to store result explicitly
        try:
            self.update_state(state='SUCCESS', meta={'result': result})
        except BackendStoreError as exc:
            # Backend failed, store locally as fallback
            logger.error(f"Failed to store result in backend: {exc}")
            store_result_locally(self.request.id, result)

        return result

    except Exception as exc:
        # Ensure error is logged even if backend fails
        try:
            self.update_state(
                state='FAILURE',
                meta={'error': str(exc), 'traceback': traceback.format_exc()}
            )
        except BackendError:
            # Backend completely unavailable
            logger.critical(f"Backend unavailable, task {self.request.id} result lost")
            store_result_locally(self.request.id, {'error': str(exc)})

        raise

def perform_critical_operation(data):
    """Simulate a critical operation."""
    return "critical_result_" + str(data)

def store_result_locally(task_id, result):
    """Store a result locally when the backend fails.

    Args:
        task_id (str): Task identifier; used as the file name.
        result: JSON-serializable payload to persist.
    """
    # BUG FIX: `json` was never imported in this snippet, and open()
    # raised FileNotFoundError when /tmp/celery_results did not exist.
    import json
    import os

    directory = '/tmp/celery_results'
    os.makedirs(directory, exist_ok=True)
    with open(os.path.join(directory, f'{task_id}.json'), 'w') as f:
        json.dump({'task_id': task_id, 'result': result}, f)

Worker Process Management

from celery.exceptions import WorkerLostError, Terminated
from celery.signals import worker_process_shutdown, task_failure

@worker_process_shutdown.connect
def handle_worker_shutdown(sender=None, pid=None, exitcode=None, **kwargs):
    """Handle worker process shutdown."""

    if exitcode == 0:
        return  # clean exit, nothing to report

    logger.error(f"Worker process {pid} died unexpectedly with exit code {exitcode}")

    # Notify monitoring system
    notify_ops_team.delay(f"Worker {pid} crashed with exit code {exitcode}")

@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Handle task failures, including worker-related ones."""

    if isinstance(exception, WorkerLostError):
        logger.error(f"Task {task_id} failed due to worker loss")

        # Requeue critical tasks
        critical_names = ('critical_task', 'payment_processing')
        if sender.name in critical_names:
            requeue_critical_task.delay(task_id, sender.name)
    elif isinstance(exception, Terminated):
        logger.warning(f"Task {task_id} was terminated")

@app.task
def notify_ops_team(message):
    """Send a notification (Slack, email, ...) to the operations team."""
    # Intentionally a stub in this example.
    pass

@app.task
def requeue_critical_task(task_id, task_name):
    """Requeue a critical task that failed because of worker issues."""
    # Intentionally a stub — requeue logic would go here.
    pass

Custom Exception Classes

from celery.exceptions import TaskError, CeleryError

class DataValidationError(TaskError):
    """Custom exception for data validation failures."""

    def __init__(self, field, value, message=None):
        # Fall back to a generated description when no message is given.
        text = message or f"Invalid {field}: {value}"
        super().__init__(text)
        self.field = field
        self.value = value
        self.message = text

class ExternalServiceError(CeleryError):
    """Custom exception for external service failures."""

    def __init__(self, service_name, error_code, message=None):
        # Fall back to a generated description when no message is given.
        text = message or f"{service_name} error {error_code}"
        super().__init__(text)
        self.service_name = service_name
        self.error_code = error_code
        self.message = text

@app.task(bind=True, max_retries=3)
def validate_and_process(self, user_data):
    """Task using custom exceptions.

    Validates `user_data`, calls an external service, and maps custom
    exceptions onto Celery control flow (ignore vs retry).
    """
    try:
        # Validate data
        if not user_data.get('email'):
            raise DataValidationError('email', user_data.get('email'), 'Email is required')

        if '@' not in user_data['email']:
            raise DataValidationError('email', user_data['email'], 'Invalid email format')

        # Call external service
        response = call_external_api(user_data)

        if response.status_code != 200:
            raise ExternalServiceError('UserAPI', response.status_code, response.text)

        return response.json()

    except DataValidationError as exc:
        # BUG FIX: `exc` was referenced in the f-string below without
        # being bound — the handler was missing `as exc`.
        # Don't retry validation errors
        # NOTE(review): `Ignore` is not imported in this snippet; it
        # comes from celery.exceptions — confirm the module import.
        raise Ignore(f"Data validation failed: {exc}")

    except ExternalServiceError as exc:
        if exc.error_code >= 500:
            # Retry server errors
            raise self.retry(countdown=30, exc=exc)
        else:
            # Don't retry client errors
            raise Ignore(f"External service error: {exc}")

def call_external_api(data):
    """Simulate an external API call by returning a canned response."""

    class MockResponse:
        """Stand-in for a successful HTTP response."""

        status_code = 200

        def json(self):
            return {'processed': True}

    return MockResponse()

Comprehensive Error Recovery

from celery.exceptions import *
import sys

@app.task(bind=True, max_retries=5, default_retry_delay=60)
def robust_task(self, operation_data):
    """Task with comprehensive error handling and recovery.

    Maps each failure class onto a recovery strategy: checkpoint on
    timeouts, retry with backoff on network errors, requeue on memory
    pressure, ignore bad data, and dead-letter after retry exhaustion.
    """
    try:
        result = perform_operation(operation_data)
        return result

    except SoftTimeLimitExceeded:
        # Save progress and continue in new task
        save_checkpoint.delay(operation_data, 'timeout')
        return {'status': 'timeout', 'checkpoint_saved': True}

    except (ConnectionError, TimeoutError) as exc:
        # Network issues - retry with backoff
        countdown = min(2 ** self.request.retries * 60, 3600)  # Max 1 hour
        logger.warning(f"Network error, retrying in {countdown}s: {exc}")
        raise self.retry(countdown=countdown, exc=exc)

    except MemoryError:
        # Out of memory - reject and requeue
        logger.error("Out of memory, rejecting task for requeue")
        raise Reject("Insufficient memory", requeue=True)

    except ValueError as exc:
        # Data errors - don't retry
        logger.error(f"Data validation error: {exc}")
        raise Ignore(f"Invalid data: {exc}")

    except MaxRetriesExceededError as exc:
        # BUG FIX: `str(exc)` below used an unbound name — this handler
        # was missing `as exc`.
        # All retries exhausted - send to dead letter queue
        logger.error(f"Task failed after {self.max_retries} retries")
        send_to_dead_letter.delay(self.request.id, operation_data, str(exc))
        raise

    except Exception as exc:
        # Unexpected errors - log and retry
        logger.exception(f"Unexpected error in task {self.request.id}")

        # BUG FIX: the original isinstance check for SystemExit /
        # KeyboardInterrupt here was unreachable — neither inherits
        # from Exception, so `except Exception` can never catch them;
        # both already propagate past this handler on their own.

        # Generic retry for other errors
        raise self.retry(exc=exc)

@app.task
def save_checkpoint(data, reason):
    """Persist a recovery checkpoint for a task interrupted by `reason`."""
    logger.info(f"Saving checkpoint due to {reason}")
    # TODO: write `data` to persistent storage

@app.task
def send_to_dead_letter(task_id, data, error):
    """Route a permanently failed task to a dead letter queue for review."""
    logger.error(f"Sending task {task_id} to dead letter queue: {error}")
    # TODO: store in a dead letter queue/database

def perform_operation(data):
    """Simulate an operation that might fail on empty input."""
    if data:
        return f"processed_{data}"
    raise ValueError("No data provided")

Install with Tessl CLI

npx tessl i tessl/pypi-celery

docs

configuration.md

core-application.md

exceptions.md

index.md

results-state.md

scheduling-beat.md

signals-events.md

workflow-primitives.md

tile.json