Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines
—
Quality: Pending — a best-practices review has not yet been run.
Impact: Pending — no eval scenarios have been run.
Complete exception hierarchy for Celery task errors, retry mechanisms, timeout handling, backend errors, and worker-related exceptions. These exceptions provide fine-grained error handling and control flow for robust distributed task processing.
Base exceptions and fundamental error types that form the foundation of Celery's error handling system.
class CeleryError(Exception):
    """Base exception for all Celery errors."""


class ImproperlyConfigured(CeleryError):
    """Raised when Celery is improperly configured.

    Common causes:
        - Missing broker configuration
        - Invalid serializer settings
        - Incorrect backend configuration
    """


class SecurityError(CeleryError):
    """Raised when security-related operations fail.

    Common causes:
        - Invalid message signatures
        - Authentication failures
        - SSL/TLS errors
    """


class OperationalError(Exception):
    """Raised when a transport connection error occurs while sending messages.

    Note:
        This exception does not inherit from CeleryError as it comes
        from the kombu messaging library.

    Common causes:
        - Broker connection failures
        - Network timeouts
        - Authentication errors with the message broker
    """


# --- Exceptions related to task execution, retry logic, and task
# --- lifecycle management.
class TaskError(CeleryError):
    """Base exception for task-related errors."""


class TaskPredicate(CeleryError):
    """Base class for task predicate exceptions.

    These exceptions control task execution flow rather than
    indicating actual errors.
    """


class Retry(TaskPredicate):
    """Exception raised to trigger task retry.

    Attributes:
        message (str): Retry reason message
        exc (Exception): Original exception that caused retry
        when (datetime): When to retry (eta)
    """

    def __init__(self, message=None, exc=None, when=None, **kwargs):
        """Create retry exception.

        Args:
            message (str): Retry message
            exc (Exception): Causing exception
            when (datetime): Retry time
            **kwargs: Additional retry options
        """
        # Store the documented attributes so handlers can inspect them
        # (the original stub had a docstring-only body and set nothing).
        self.message = message
        self.exc = exc
        self.when = when
        super().__init__(message, exc, when)


class Ignore(TaskPredicate):
    """Exception raised to ignore task result.

    When raised, the task state will not be updated and no result
    will be stored in the result backend.
    """


class Reject(TaskPredicate):
    """Exception raised to reject and requeue task.

    Args:
        reason (str): Rejection reason
        requeue (bool): Whether to requeue the task
    """

    def __init__(self, reason=None, requeue=False):
        """Create reject exception.

        Args:
            reason (str): Reason for rejection
            requeue (bool): Requeue the task
        """
        self.reason = reason
        self.requeue = requeue
        super().__init__(reason, requeue)


class NotRegistered(TaskError):
    """Raised when attempting to execute unregistered task.

    Args:
        task_name (str): Name of unregistered task
    """


class AlreadyRegistered(TaskError):
    """Raised when attempting to register already registered task.

    Args:
        task_name (str): Name of already registered task
    """


class MaxRetriesExceededError(TaskError):
    """Raised when task exceeds maximum retry attempts.

    Attributes:
        task_args (tuple): Task arguments
        task_kwargs (dict): Task keyword arguments
        task_name (str): Task name
        task_id (str): Task ID
    """


class TaskRevokedError(TaskError):
    """Raised when trying to execute revoked task.

    Args:
        message (str): Revocation reason
        task_id (str): Revoked task ID
    """


class InvalidTaskError(TaskError):
    """Raised when task data is invalid or corrupted.

    Common causes:
        - Malformed message body
        - Missing required task attributes
        - Invalid task signature
    """


class ChordError(TaskError):
    """Raised when chord callback fails or chord is misconfigured.

    Args:
        message (str): Error description
        callback (str): Callback task name
    """


class QueueNotFound(TaskError):
    """Raised when task is routed to a queue not in conf.queues.

    Args:
        queue_name (str): Name of the missing queue
    """


class IncompleteStream(TaskError):
    """Raised when end of data stream is found but data isn't complete.

    Common causes:
        - Network interruption during message transfer
        - Corrupted message payload
        - Premature connection termination
    """


# --- Time-related exceptions for handling task execution limits and timeouts.
class TimeoutError(CeleryError):
    """Raised when operation times out.

    NOTE: this name shadows the builtin TimeoutError within
    this module's namespace.

    Args:
        operation (str): Operation that timed out
        timeout (float): Timeout value in seconds
    """


class SoftTimeLimitExceeded(Exception):
    """Raised when task exceeds soft time limit.

    This exception allows tasks to perform cleanup before
    hard termination occurs.
    """


class TimeLimitExceeded(Exception):
    """Raised when task exceeds hard time limit.

    This exception indicates immediate task termination.
    """


# --- Exceptions related to result backend operations and storage systems.
class BackendError(CeleryError):
    """Base exception for result backend errors."""


class BackendGetMetaError(BackendError):
    """Raised when backend fails to retrieve task metadata.

    Args:
        task_id (str): Task ID that failed to retrieve
        backend_type (str): Type of backend
    """


class BackendStoreError(BackendError):
    """Raised when backend fails to store task result.

    Args:
        task_id (str): Task ID that failed to store
        result: Result that failed to store
        backend_type (str): Type of backend
    """


# --- Exceptions related to worker processes, including termination and
# --- process management.
class WorkerLostError(Exception):
    """Raised when worker process is lost unexpectedly.

    Args:
        message (str): Loss description
        exitcode (int): Worker exit code
    """


class Terminated(Exception):
    """Raised when worker process is terminated.

    Args:
        signum (int): Signal number that caused termination
        reason (str): Termination reason
    """


class WorkerShutdown(SystemExit):
    """Exception raised to signal worker shutdown.

    Inherits SystemExit so raising it unwinds the worker's main loop.

    Args:
        msg (str): Shutdown message
        exitcode (int): Exit code
    """


class WorkerTerminate(SystemExit):
    """Exception raised to signal immediate worker termination.

    Args:
        msg (str): Termination message
        exitcode (int): Exit code
    """


# --- Warning categories for non-fatal issues that should be brought to
# --- user attention.
class CeleryWarning(UserWarning):
    """Base warning class for Celery."""


class AlwaysEagerIgnored(CeleryWarning):
    """Warning raised when task_always_eager is ignored.

    Occurs when eager execution is requested but not supported
    in the current context.
    """


class DuplicateNodenameWarning(CeleryWarning):
    """Warning raised when duplicate worker node names detected.

    Can cause issues with worker management and monitoring.
    """


class FixupWarning(CeleryWarning):
    """Warning raised during fixup operations.

    Indicates potential compatibility or configuration issues.
    """


class NotConfigured(CeleryWarning):
    """Warning raised when required configuration is missing.

    Indicates that default values are being used where
    explicit configuration would be preferred.
    """


from celery import Celery
from celery import Celery
from celery.exceptions import (
    Retry, Ignore, Reject, MaxRetriesExceededError,
    SoftTimeLimitExceeded, TimeLimitExceeded,
)
import logging
import random

# `random` and `logger` were used but never defined in the original example.
logger = logging.getLogger(__name__)

app = Celery('exception_example')


@app.task(bind=True, max_retries=3)
def unreliable_task(self, data):
    """Task with comprehensive error handling."""
    try:
        # Simulate unreliable operation
        if random.random() < 0.3:
            raise ConnectionError("Network is down")
        # Process data
        result = process_data(data)
        return result
    except SoftTimeLimitExceeded:
        # Cleanup before hard termination
        cleanup_resources()
        raise
    except ConnectionError as exc:
        # Retry on network errors with exponential backoff
        countdown = 2 ** self.request.retries
        raise self.retry(countdown=countdown, exc=exc, max_retries=5)
    except ValueError as exc:
        # Don't retry on data validation errors
        raise Ignore(f"Invalid data: {exc}")
    except Exception as exc:
        # Log unexpected errors and retry
        logger.error(f"Unexpected error in task {self.request.id}: {exc}")
        raise self.retry(countdown=60, exc=exc)


def process_data(data):
    """Simulate data processing."""
    if not data:
        raise ValueError("Empty data")
    return f"processed_{data}"


def cleanup_resources():
    """Cleanup before task termination."""
    print("Cleaning up resources...")


from celery.exceptions import Retry
import requests
from requests.exceptions import RequestException
import logging

# Ignore and MaxRetriesExceededError are used below but were not imported
# by this snippet; `logger` was also undefined.
from celery.exceptions import Retry, Ignore, MaxRetriesExceededError

logger = logging.getLogger(__name__)


@app.task(bind=True, max_retries=5)
def api_call_task(self, url, data):
    """Task that handles API failures with smart retry logic.

    Args:
        url (str): Endpoint to POST to.
        data (dict): JSON-serializable payload.
    """
    try:
        response = requests.post(url, json=data, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout as exc:
        # Retry timeouts with longer delay.
        # (bound `exc` here — the original referenced an unbound name)
        raise self.retry(countdown=30, exc=exc)
    except requests.exceptions.ConnectionError as exc:
        # Retry connection errors with exponential backoff, capped at 5 minutes
        countdown = min(2 ** self.request.retries, 300)
        raise self.retry(countdown=countdown, exc=exc)
    except requests.exceptions.HTTPError as exc:
        # Don't retry client errors (4xx), do retry server errors (5xx)
        if 400 <= exc.response.status_code < 500:
            raise Ignore(f"Client error {exc.response.status_code}: {exc}")
        else:
            raise self.retry(countdown=60, exc=exc)
    except MaxRetriesExceededError as exc:
        # Log final failure and send alert.
        # (bound `exc` here — the original referenced an unbound name)
        logger.error(f"API call to {url} failed after all retries")
        send_failure_alert.delay(url, str(exc))
        raise


@app.task
def send_failure_alert(url, error):
    """Send alert for permanent failures."""
    # Send notification to ops team
    pass


from celery.exceptions import Reject
import psutil
import logging

from celery.exceptions import Reject

# `logger` was used but never defined in the original example.
logger = logging.getLogger(__name__)


@app.task(bind=True)
def memory_intensive_task(self, large_data):
    """Task that rejects itself if system memory is low."""
    # Check available memory before committing to the work.
    memory = psutil.virtual_memory()
    if memory.percent > 85:  # more than 85% of RAM in use
        logger.warning("System memory high, rejecting task for requeue")
        raise Reject("High memory usage", requeue=True)
    try:
        # Memory intensive processing
        result = process_large_data(large_data)
        return result
    except MemoryError:
        # Reject and requeue if we run out of memory
        logger.error("Task ran out of memory, requeuing")
        raise Reject("Out of memory", requeue=True)


def process_large_data(data):
    """Simulate memory intensive processing."""
    return f"processed_{len(data)}_items"


from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
import signal
import time
import logging

# `time` and `logger` were used but undefined at module scope in the original.
logger = logging.getLogger(__name__)


@app.task(bind=True, soft_time_limit=300, time_limit=320)  # 5 min soft, 5 min 20 s hard
def long_running_task(self, items):
    """Task with graceful timeout handling."""
    processed_items = []
    start = time.monotonic()
    try:
        for i, item in enumerate(items):
            # Check progress against the soft limit periodically
            if i % 100 == 0 and self.request.timelimit:
                # NOTE(review): request.timelimit carries limit *durations*,
                # not absolute deadlines, so compare elapsed time instead of
                # subtracting time.time() from it (as the original did).
                # Assumes the soft limit is the last tuple element — confirm
                # against the Celery version in use.
                soft_limit = self.request.timelimit[-1] or self.request.timelimit[0]
                remaining = soft_limit - (time.monotonic() - start)
                if remaining < 30:  # less than 30 seconds left
                    logger.warning("Approaching soft time limit, saving progress")
                    save_partial_results.delay(processed_items)
            result = process_item(item)
            processed_items.append(result)
        return processed_items
    except SoftTimeLimitExceeded:
        # Graceful cleanup on soft limit
        logger.info("Soft time limit exceeded, saving partial results")
        save_partial_results.delay(processed_items)
        # Continue processing remaining items in new task
        remaining_items = items[len(processed_items):]
        if remaining_items:
            long_running_task.apply_async(args=[remaining_items], countdown=5)
        return f"Partial completion: {len(processed_items)} items processed"


@app.task
def save_partial_results(results):
    """Save partial results for recovery."""
    # Save to database or file
    logger.info(f"Saved {len(results)} partial results")


def process_item(item):
    """Process individual item."""
    time.sleep(0.1)  # simulate processing time
    return f"processed_{item}"


from celery.exceptions import BackendError, BackendStoreError
from celery import Celery
import json
import logging
import os
import traceback

# `json`, `traceback` and `logger` were used but never imported/defined
# in the original example.
logger = logging.getLogger(__name__)


@app.task(bind=True, ignore_result=False)
def critical_task(self, important_data):
    """Task with explicit backend error handling."""
    try:
        # Critical processing
        result = perform_critical_operation(important_data)
        # Try to store result explicitly
        try:
            self.update_state(state='SUCCESS', meta={'result': result})
        except BackendStoreError as exc:
            # Backend failed, store locally as fallback
            logger.error(f"Failed to store result in backend: {exc}")
            store_result_locally(self.request.id, result)
        return result
    except Exception as exc:
        # Ensure error is logged even if backend fails
        try:
            self.update_state(
                state='FAILURE',
                meta={'error': str(exc), 'traceback': traceback.format_exc()}
            )
        except BackendError:
            # Backend completely unavailable
            logger.critical(f"Backend unavailable, task {self.request.id} result lost")
            store_result_locally(self.request.id, {'error': str(exc)})
        raise


def perform_critical_operation(data):
    """Simulate critical operation."""
    return f"critical_result_{data}"


def store_result_locally(task_id, result):
    """Store result locally when backend fails."""
    # Ensure the spool directory exists before writing (the original
    # assumed /tmp/celery_results was already present).
    os.makedirs('/tmp/celery_results', exist_ok=True)
    with open(f'/tmp/celery_results/{task_id}.json', 'w') as f:
        json.dump({'task_id': task_id, 'result': result}, f)


from celery.exceptions import WorkerLostError, Terminated
from celery.signals import worker_process_shutdown, task_failure
import logging

# `logger` was used but never defined in the original example.
logger = logging.getLogger(__name__)


@worker_process_shutdown.connect
def handle_worker_shutdown(sender=None, pid=None, exitcode=None, **kwargs):
    """Handle worker process shutdown."""
    if exitcode != 0:
        logger.error(f"Worker process {pid} died unexpectedly with exit code {exitcode}")
        # Notify monitoring system
        notify_ops_team.delay(f"Worker {pid} crashed with exit code {exitcode}")


@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Handle task failures, including worker-related ones."""
    if isinstance(exception, WorkerLostError):
        logger.error(f"Task {task_id} failed due to worker loss")
        # Requeue critical tasks
        if sender.name in ['critical_task', 'payment_processing']:
            requeue_critical_task.delay(task_id, sender.name)
    elif isinstance(exception, Terminated):
        logger.warning(f"Task {task_id} was terminated")


@app.task
def notify_ops_team(message):
    """Send notification to operations team."""
    # Send Slack message, email, etc.
    pass


@app.task
def requeue_critical_task(task_id, task_name):
    """Requeue critical tasks that failed due to worker issues."""
    # Logic to requeue the task
    pass


from celery.exceptions import TaskError, CeleryError
# `Ignore` is used below but was not imported by this snippet.
from celery.exceptions import TaskError, CeleryError, Ignore


class DataValidationError(TaskError):
    """Custom exception for data validation failures."""

    def __init__(self, field, value, message=None):
        self.field = field
        self.value = value
        self.message = message or f"Invalid {field}: {value}"
        super().__init__(self.message)


class ExternalServiceError(CeleryError):
    """Custom exception for external service failures."""

    def __init__(self, service_name, error_code, message=None):
        self.service_name = service_name
        self.error_code = error_code
        self.message = message or f"{service_name} error {error_code}"
        super().__init__(self.message)


@app.task(bind=True, max_retries=3)
def validate_and_process(self, user_data):
    """Task using custom exceptions."""
    try:
        # Validate data
        if not user_data.get('email'):
            raise DataValidationError('email', user_data.get('email'), 'Email is required')
        if '@' not in user_data['email']:
            raise DataValidationError('email', user_data['email'], 'Invalid email format')
        # Call external service
        response = call_external_api(user_data)
        if response.status_code != 200:
            raise ExternalServiceError('UserAPI', response.status_code, response.text)
        return response.json()
    except DataValidationError as exc:
        # Don't retry validation errors.
        # (bound `exc` here — the original referenced an unbound name)
        raise Ignore(f"Data validation failed: {exc}")
    except ExternalServiceError as exc:
        if exc.error_code >= 500:
            # Retry server errors
            raise self.retry(countdown=30, exc=exc)
        else:
            # Don't retry client errors
            raise Ignore(f"External service error: {exc}")


def call_external_api(data):
    """Simulate external API call."""
    class MockResponse:
        status_code = 200
        text = ''  # present so the non-200 branch can read it

        def json(self):
            return {'processed': True}
    return MockResponse()


from celery.exceptions import *
import sys
import logging

# Explicit imports instead of relying on the earlier wildcard import,
# which also shadows the builtin TimeoutError with Celery's.
from celery.exceptions import (
    SoftTimeLimitExceeded, Reject, Ignore, MaxRetriesExceededError,
)

logger = logging.getLogger(__name__)


@app.task(bind=True, max_retries=5, default_retry_delay=60)
def robust_task(self, operation_data):
    """Task with comprehensive error handling and recovery."""
    try:
        result = perform_operation(operation_data)
        return result
    except SoftTimeLimitExceeded:
        # Save progress and continue in new task
        save_checkpoint.delay(operation_data, 'timeout')
        return {'status': 'timeout', 'checkpoint_saved': True}
    except (ConnectionError, TimeoutError) as exc:
        # Network issues - retry with backoff (max 1 hour)
        countdown = min(2 ** self.request.retries * 60, 3600)
        logger.warning(f"Network error, retrying in {countdown}s: {exc}")
        raise self.retry(countdown=countdown, exc=exc)
    except MemoryError:
        # Out of memory - reject and requeue
        logger.error("Out of memory, rejecting task for requeue")
        raise Reject("Insufficient memory", requeue=True)
    except ValueError as exc:
        # Data errors - don't retry
        logger.error(f"Data validation error: {exc}")
        raise Ignore(f"Invalid data: {exc}")
    except MaxRetriesExceededError as exc:
        # All retries exhausted - send to dead letter queue.
        # (bound `exc` here — the original referenced an unbound name)
        logger.error(f"Task failed after {self.max_retries} retries")
        send_to_dead_letter.delay(self.request.id, operation_data, str(exc))
        raise
    except Exception as exc:
        # Unexpected errors - log and retry
        logger.exception(f"Unexpected error in task {self.request.id}")
        # NOTE(review): SystemExit/KeyboardInterrupt derive from BaseException,
        # not Exception, so this guard can never fire inside this handler.
        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
            raise
        # Generic retry for other errors
        raise self.retry(exc=exc)


@app.task
def save_checkpoint(data, reason):
    """Save task checkpoint for recovery."""
    logger.info(f"Saving checkpoint due to {reason}")
    # Save to persistent storage


@app.task
def send_to_dead_letter(task_id, data, error):
    """Send failed task to dead letter queue for manual review."""
    logger.error(f"Sending task {task_id} to dead letter queue: {error}")
    # Store in dead letter queue/database


def perform_operation(data):
    """Simulate operation that might fail."""
    if not data:
        raise ValueError("No data provided")
    return f"processed_{data}"


# Install with Tessl CLI
# npx tessl i tessl/pypi-celery