Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines
npx @tessl/cli install tessl/pypi-celery@4.4.0

Celery is a distributed task queue for Python that enables asynchronous execution of jobs across multiple workers and machines. It uses message brokers like RabbitMQ or Redis to coordinate between clients and workers, providing high availability, horizontal scaling, and robust task execution with retries, routing, and monitoring capabilities.
pip install celery
pip install "celery[redis]"  # with Redis support

from celery import Celery

Common task creation imports:
from celery import shared_task

Application and task state imports:
from celery import current_app, current_task

Canvas workflow imports:
from celery import signature, chain, group, chord, chunks, xmap

from celery import Celery
# Create Celery app (a result backend is required for result.get() below)
app = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# Define a task
@app.task
def add(x, y):
    return x + y
# Execute task asynchronously
result = add.delay(4, 4)
print(result.get()) # Wait for result
# Use shared task for library code
from celery import shared_task
@shared_task
def multiply(x, y):
    return x * y
# Chain tasks together
from celery import chain
result = chain(add.s(2, 2), multiply.s(8))()
print(result.get())  # Result: 32 (4 * 8)

Celery's architecture consists of several key components working together, including clients that submit tasks, a message broker that carries them, and workers that execute them. This distributed architecture enables Celery to scale horizontally, handle failures gracefully, and provide flexible task routing and execution patterns.
Main Celery application class, task creation with decorators, shared tasks for reusable components, and application lifecycle management.
class Celery:
    def __init__(self, main=None, broker=None, backend=None, **kwargs): ...
    def task(self, *args, **opts): ...
    def send_task(self, name, args=None, kwargs=None, **options): ...

def shared_task(*args, **kwargs): ...

Task composition primitives for building complex workflows including sequential chains, parallel groups, callbacks, and functional programming patterns.
def signature(task, args=None, kwargs=None, **options): ...
def chain(*tasks, **kwargs): ...
def group(*tasks, **kwargs): ...
def chord(header, body, **kwargs): ...
def chunks(task, it, n): ...

Task result handling, state monitoring, result retrieval with timeouts, and task lifecycle management.
class AsyncResult:
    def get(self, timeout=None, propagate=True, interval=0.5): ...
    def ready(self) -> bool: ...
    def successful(self) -> bool: ...
    def revoke(self, terminate=False): ...

class GroupResult:
    def get(self, timeout=None, propagate=True): ...

Periodic task scheduling with cron-like syntax, interval-based schedules, solar event scheduling, and beat scheduler management.
def crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'): ...
def schedule(run_every, relative=False): ...
def solar(event, lat, lon): ...

Task lifecycle signals, worker events, monitoring hooks, and custom signal handlers for debugging and monitoring integration.
# Task signals
task_prerun = Signal()
task_postrun = Signal()
task_success = Signal()
task_failure = Signal()
# Worker signals
worker_ready = Signal()
worker_shutdown = Signal()

Application configuration management, broker and backend setup, task routing, serialization options, and environment-based configuration.
class Celery:
    def config_from_object(self, obj, silent=False, force=False): ...
    def config_from_envvar(self, variable_name, silent=False): ...
    @property
    def conf(self): ...  # Configuration namespace

Complete exception hierarchy for task errors, retry mechanisms, timeout handling, backend errors, and worker-related exceptions.
class CeleryError(Exception): ...
class Retry(CeleryError): ...
class MaxRetriesExceededError(CeleryError): ...
class TaskRevokedError(CeleryError): ...
class SoftTimeLimitExceeded(Exception): ...