Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines
—
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Core Celery application classes and task creation mechanisms that form the foundation of distributed task processing. These components provide the primary interface for creating, configuring, and managing Celery applications and tasks.
Main application class that serves as the central coordination point for task management, configuration, and worker communication.
class Celery:
    """Main Celery application: the central coordination point for task
    management, configuration, and worker communication."""

    def __init__(
        self,
        main=None,
        loader=None,
        backend=None,
        amqp=None,
        events=None,
        log=None,
        control=None,
        set_as_current=True,
        tasks=None,
        broker=None,
        include=None,
        changes=None,
        config_source=None,
        fixups=None,
        task_cls=None,
        autofinalize=True,
        namespace=None,
        strict_typing=True,
        **kwargs
    ):
        """
        Create a Celery application instance.

        Args:
            main (str): Name of main module if running as __main__
            loader: Custom loader class for configuration
            backend (str): Result backend URL or class
            broker (str): Message broker URL
            include (list): Modules to import when worker starts
            set_as_current (bool): Make this the current app
            autofinalize (bool): Auto-finalize app on first use
            namespace (str): Configuration key namespace
        """

    def task(self, *args, **opts):
        """
        Decorator to create task from any callable.

        Args:
            bind (bool): Create bound task with self parameter
            name (str): Custom task name
            base (class): Custom task base class
            serializer (str): Task argument serializer
            max_retries (int): Maximum retry attempts
            default_retry_delay (int): Default retry delay in seconds
            rate_limit (str): Rate limit (e.g., '100/m' for 100/minute)
            time_limit (int): Hard time limit in seconds
            soft_time_limit (int): Soft time limit in seconds
            ignore_result (bool): Don't store task results
            store_errors_even_if_ignored (bool): Store errors even when ignoring results

        Returns:
            Task class instance
        """

    def send_task(
        self,
        name,
        args=None,
        kwargs=None,
        countdown=None,
        eta=None,
        task_id=None,
        producer=None,
        connection=None,
        router=None,
        result_cls=None,
        expires=None,
        publisher=None,
        link=None,
        link_error=None,
        add_to_parent=True,
        group_id=None,
        group_index=None,
        retries=0,
        chord=None,
        reply_to=None,
        time_limit=None,
        soft_time_limit=None,
        root_id=None,
        parent_id=None,
        route_name=None,
        shadow=None,
        chain=None,
        task_type=None,
        **options
    ):
        """
        Send task by name without having the task function imported.

        Args:
            name (str): Task name to execute
            args (tuple): Positional arguments for task
            kwargs (dict): Keyword arguments for task
            countdown (int): Delay execution for N seconds
            eta (datetime): Specific execution time
            task_id (str): Custom task ID
            expires (datetime|int): Task expiration time
            link (Signature): Success callback
            link_error (Signature): Failure callback

        Returns:
            AsyncResult instance
        """

    def signature(self, *args, **kwargs):
        """
        Create signature bound to this app.

        Returns:
            Signature instance
        """

    def start(self, argv=None):
        """
        Run celery using command line arguments.

        Args:
            argv (list): Command line arguments
        """

    def worker_main(self, argv=None):
        """
        Run celery worker using command line arguments.

        Args:
            argv (list): Command line arguments
        """

    def config_from_object(self, obj, silent=False, force=False, namespace=None):
        """
        Load configuration from object.

        Args:
            obj: Configuration object, module, or string
            silent (bool): Don't raise on import errors
            force (bool): Force update even if finalized
            namespace (str): Only load keys with this prefix
        """

    def config_from_envvar(self, variable_name, silent=False, force=False):
        """
        Load configuration from environment variable.

        Args:
            variable_name (str): Environment variable name
            silent (bool): Don't raise if variable not found
            force (bool): Force update even if finalized
        """

    def autodiscover_tasks(self, packages=None, related_name='tasks', force=False):
        """
        Automatically discover tasks from packages.

        Args:
            packages (list): Packages to search (defaults to INSTALLED_APPS)
            related_name (str): Module name to search for tasks
            force (bool): Force discovery even if already done
        """

    def finalize(self, auto=False):
        """
        Finalize the app configuration.

        Args:
            auto (bool): Called automatically during first use
        """

    def close(self):
        """Clean up after the application."""

    def connection_for_read(self, url=None, **kwargs):
        """
        Get connection for consuming messages.

        Args:
            url (str): Broker URL override

        Returns:
            Connection instance
        """

    def connection_for_write(self, url=None, **kwargs):
        """
        Get connection for producing messages.

        Args:
            url (str): Broker URL override

        Returns:
            Connection instance
        """

    def add_periodic_task(
        self,
        schedule,
        sig,
        args=(),
        # NOTE(review): default is an empty tuple (matches upstream Celery,
        # avoids a shared mutable default) even though a dict is expected.
        kwargs=(),
        name=None,
        **opts
    ):
        """
        Add periodic task to beat schedule.

        Args:
            schedule: Schedule instance (crontab, schedule)
            sig (Signature): Task to execute
            args (tuple): Arguments for task
            kwargs (dict): Keyword arguments for task
            name (str): Schedule entry name
        """

    @property
    def conf(self):
        """Current configuration namespace."""

    @property
    def tasks(self):
        """Task registry containing all registered tasks."""

    @property
    def backend(self):
        """Current result backend instance."""

    @property
    def control(self):
        """Remote control interface for workers."""

    @property
    def events(self):
        """Events interface for monitoring."""

    @property
    def current_task(self):
        """Currently executing task."""


# Base class for all Celery tasks, providing execution methods and task context access.
class Task:
    """Base class for all Celery tasks, providing execution methods and
    access to the current task context."""

    def __init__(self):
        """Initialize task instance."""

    def delay(self, *args, **kwargs):
        """
        Shortcut to apply_async with only positional arguments.

        Args:
            *args: Positional arguments for task
            **kwargs: Keyword arguments for task

        Returns:
            AsyncResult instance
        """

    def apply_async(
        self,
        args=None,
        kwargs=None,
        task_id=None,
        producer=None,
        link=None,
        link_error=None,
        shadow=None,
        **options
    ):
        """
        Apply task asynchronously.

        Args:
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments
            task_id (str): Custom task ID
            producer: Message producer
            link (Signature): Success callback
            link_error (Signature): Error callback
            shadow (str): Override task name in logs
            countdown (int): Delay execution N seconds
            eta (datetime): Execute at specific time
            expires (datetime|int): Task expiration
            retry (bool): Enable retries
            retry_policy (dict): Retry configuration

        Returns:
            AsyncResult instance
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute task synchronously in current process.

        Args:
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments

        Returns:
            Task result directly
        """

    def retry(
        self,
        args=None,
        kwargs=None,
        exc=None,
        throw=True,
        eta=None,
        countdown=None,
        max_retries=None,
        **options
    ):
        """
        Retry current task.

        Args:
            args (tuple): New positional arguments
            kwargs (dict): New keyword arguments
            exc (Exception): Exception that caused retry
            throw (bool): Re-raise Retry exception
            eta (datetime): Retry at specific time
            countdown (int): Retry after N seconds
            max_retries (int): Override max retries

        Raises:
            Retry: To trigger task retry
        """

    def signature(self, args=None, kwargs=None, **options):
        """
        Create signature for this task.

        Args:
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments

        Returns:
            Signature instance
        """

    def s(self, *args, **kwargs):
        """
        Shortcut for signature creation.

        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Signature instance
        """

    def si(self, *args, **kwargs):
        """
        Create immutable signature.

        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Immutable signature instance
        """

    def chunks(self, it, n):
        """
        Split iterator into chunks for parallel processing.

        Args:
            it: Iterator to chunk
            n (int): Chunk size

        Returns:
            Chunks instance
        """

    @property
    def name(self):
        """Task name."""

    @property
    def app(self):
        """Celery app instance this task is bound to."""

    @property
    def request(self):
        """Current task request context."""


# Decorator for creating tasks that work with any Celery app, useful for
# reusable libraries and Django integration.
def shared_task(*args, **kwargs):
    """
    Create task that works with any Celery app instance.

    Args:
        bind (bool): Create bound task with self parameter
        name (str): Custom task name
        base (class): Custom task base class
        serializer (str): Argument serializer
        max_retries (int): Maximum retry attempts
        default_retry_delay (int): Default retry delay
        rate_limit (str): Task rate limit
        ignore_result (bool): Don't store results

    Returns:
        Task decorator function
    """
def current_app():
    """
    Get the current Celery application instance.

    Returns:
        Celery: Current application instance

    Raises:
        RuntimeError: If no current app is set
    """
def current_task():
    """
    Get the currently executing task.

    Returns:
        Task: Current task instance or None if not in task context
    """


# Context object providing access to current task metadata and execution information.
class Context:
    """
    Task execution context available via Task.request.

    Attributes:
        id (str): Unique task ID
        args (tuple): Task positional arguments
        kwargs (dict): Task keyword arguments
        retries (int): Number of retries attempted
        is_eager (bool): True if executed synchronously
        eta (datetime): Scheduled execution time
        expires (datetime): Task expiration time
        headers (dict): Message headers
        delivery_info (dict): Message delivery information
        reply_to (str): Reply queue name
        correlation_id (str): Message correlation ID
        root_id (str): Root task ID in chain
        parent_id (str): Parent task ID
        group (str): Group ID if part of group
        group_index (int): Position in group
        chord (str): Chord ID if part of chord
        chain (list): Chain information
        hostname (str): Worker hostname
        logfile (str): Worker log file
        loglevel (int): Worker log level
        utc (bool): Use UTC times
        called_directly (bool): Called via apply()
        callbacks (list): Success callbacks
        errbacks (list): Error callbacks
        timelimit (tuple): Time limits (soft, hard)
        origin (str): Message origin
    """


from celery import Celery
# Create application with Redis broker and backend
app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configure from object
app.config_from_object({
    'task_serializer': 'json',
    'accept_content': ['json'],
    'result_serializer': 'json',
    'timezone': 'UTC',
    'enable_utc': True,
})

# Auto-discover tasks
app.autodiscover_tasks(['myapp.tasks', 'myapp.utils'])

# Basic task
@app.task
def add(x, y):
    return x + y


# Bound task with retry logic
@app.task(bind=True, max_retries=3)
def process_data(self, data_id):
    try:
        # Process data
        return process(data_id)
    except Exception as exc:
        # Retry with exponential backoff. `raise` makes the control flow
        # explicit: retry() raises a Retry exception (when throw=True),
        # which is the pattern recommended by the Celery docs.
        raise self.retry(countdown=2 ** self.request.retries, exc=exc)


# Shared task for libraries
@shared_task
def send_email(recipient, subject, body):
    # Email sending logic
    pass


# Custom task class
class DatabaseTask(app.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Custom failure handling (assumes a module-level `logger` is
        # configured elsewhere -- not shown in this example).
        logger.error(f"Task {task_id} failed: {exc}")


@app.task(base=DatabaseTask)
def update_user(user_id, data):
    # Database operation
    pass


# Synchronous execution
result = add.apply(args=(4, 4))
print(result)  # 8

# Asynchronous execution
result = add.delay(4, 4)
print(result.get())  # Wait for result: 8

# Advanced async execution
result = add.apply_async(
    args=(4, 4),
    countdown=10,  # Execute in 10 seconds
    expires=60,  # Expire after 60 seconds
    retry=True,
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.2,
    }
)

# Send task by name
result = app.send_task('myapp.tasks.add', args=(4, 4))

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-celery