CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-celery

Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

core-application.mddocs/

Core Application

Core Celery application classes and task creation mechanisms that form the foundation of distributed task processing. These components provide the primary interface for creating, configuring, and managing Celery applications and tasks.

Capabilities

Celery Application Class

Main application class that serves as the central coordination point for task management, configuration, and worker communication.

class Celery:
    def __init__(
        self,
        main=None,
        loader=None,
        backend=None,
        amqp=None,
        events=None,
        log=None,
        control=None,
        set_as_current=True,
        tasks=None,
        broker=None,
        include=None,
        changes=None,
        config_source=None,
        fixups=None,
        task_cls=None,
        autofinalize=True,
        namespace=None,
        strict_typing=True,
        **kwargs
    ):
        """
        Create a Celery application instance.

        Args:
            main (str): Name of main module if running as __main__ 
            loader: Custom loader class for configuration
            backend (str): Result backend URL or class
            broker (str): Message broker URL  
            include (list): Modules to import when worker starts
            set_as_current (bool): Make this the current app
            autofinalize (bool): Auto-finalize app on first use
            namespace (str): Configuration key namespace
        """

    def task(self, *args, **opts):
        """
        Decorator to create task from any callable.
        
        Args:
            bind (bool): Create bound task with self parameter
            name (str): Custom task name  
            base (class): Custom task base class
            serializer (str): Task argument serializer
            max_retries (int): Maximum retry attempts
            default_retry_delay (int): Default retry delay in seconds
            rate_limit (str): Rate limit (e.g., '100/m' for 100/minute)
            time_limit (int): Hard time limit in seconds
            soft_time_limit (int): Soft time limit in seconds
            ignore_result (bool): Don't store task results
            store_errors_even_if_ignored (bool): Store errors even when ignoring results
            
        Returns:
            Task class instance
        """

    def send_task(
        self,
        name,
        args=None,
        kwargs=None,
        countdown=None,
        eta=None,
        task_id=None,
        producer=None,
        connection=None,
        router=None,
        result_cls=None,
        expires=None,
        publisher=None,
        link=None,
        link_error=None,
        add_to_parent=True,
        group_id=None,
        group_index=None,
        retries=0,
        chord=None,
        reply_to=None,
        time_limit=None,
        soft_time_limit=None,
        root_id=None,
        parent_id=None,
        route_name=None,
        shadow=None,
        chain=None,
        task_type=None,
        **options
    ):
        """
        Send task by name without having the task function imported.
        
        Args:
            name (str): Task name to execute
            args (tuple): Positional arguments for task
            kwargs (dict): Keyword arguments for task
            countdown (int): Delay execution for N seconds
            eta (datetime): Specific execution time
            task_id (str): Custom task ID
            expires (datetime|int): Task expiration time
            link (Signature): Success callback
            link_error (Signature): Failure callback
            
        Returns:
            AsyncResult instance
        """

    def signature(self, *args, **kwargs):
        """
        Create signature bound to this app.
        
        Returns:
            Signature instance
        """

    def start(self, argv=None):
        """
        Run celery using command line arguments.
        
        Args:
            argv (list): Command line arguments
        """

    def worker_main(self, argv=None):
        """
        Run celery worker using command line arguments.
        
        Args:
            argv (list): Command line arguments  
        """

    def config_from_object(self, obj, silent=False, force=False, namespace=None):
        """
        Load configuration from object.
        
        Args:
            obj: Configuration object, module, or string
            silent (bool): Don't raise on import errors
            force (bool): Force update even if finalized
            namespace (str): Only load keys with this prefix
        """

    def config_from_envvar(self, variable_name, silent=False, force=False):
        """
        Load configuration from environment variable.
        
        Args:
            variable_name (str): Environment variable name
            silent (bool): Don't raise if variable not found
            force (bool): Force update even if finalized  
        """

    def autodiscover_tasks(self, packages=None, related_name='tasks', force=False):
        """
        Automatically discover tasks from packages.
        
        Args:
            packages (list): Packages to search (defaults to INSTALLED_APPS)
            related_name (str): Module name to search for tasks  
            force (bool): Force discovery even if already done
        """

    def finalize(self, auto=False):
        """
        Finalize the app configuration.
        
        Args:
            auto (bool): Called automatically during first use
        """

    def close(self):
        """Clean up after the application."""

    def connection_for_read(self, url=None, **kwargs):
        """
        Get connection for consuming messages.
        
        Args:
            url (str): Broker URL override
            
        Returns:
            Connection instance
        """

    def connection_for_write(self, url=None, **kwargs):
        """
        Get connection for producing messages.
        
        Args:
            url (str): Broker URL override
            
        Returns:
            Connection instance  
        """

    def add_periodic_task(
        self, 
        schedule, 
        sig, 
        args=(), 
        kwargs=(), 
        name=None, 
        **opts
    ):
        """
        Add periodic task to beat schedule.
        
        Args:
            schedule: Schedule instance (crontab, schedule)
            sig (Signature): Task to execute
            args (tuple): Arguments for task
            kwargs (dict): Keyword arguments for task
            name (str): Schedule entry name
        """

    @property
    def conf(self):
        """Current configuration namespace."""

    @property  
    def tasks(self):
        """Task registry containing all registered tasks."""

    @property
    def backend(self):
        """Current result backend instance."""

    @property
    def control(self):
        """Remote control interface for workers."""

    @property
    def events(self):
        """Events interface for monitoring."""

    @property
    def current_task(self):
        """Currently executing task."""

Task Base Class

Base class for all Celery tasks, providing execution methods and task context access.

class Task:
    def __init__(self):
        """Initialize task instance."""

    def delay(self, *args, **kwargs):
        """
        Shortcut to apply_async with only positional arguments.
        
        Args:
            *args: Positional arguments for task
            **kwargs: Keyword arguments for task
            
        Returns:
            AsyncResult instance
        """

    def apply_async(
        self,
        args=None,
        kwargs=None,
        task_id=None,
        producer=None,
        link=None,
        link_error=None,
        shadow=None,
        **options
    ):
        """
        Apply task asynchronously.
        
        Args:
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments  
            task_id (str): Custom task ID
            producer: Message producer
            link (Signature): Success callback
            link_error (Signature): Error callback
            shadow (str): Override task name in logs
            countdown (int): Delay execution N seconds
            eta (datetime): Execute at specific time
            expires (datetime|int): Task expiration
            retry (bool): Enable retries
            retry_policy (dict): Retry configuration
            
        Returns:
            AsyncResult instance
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute task synchronously in current process.
        
        Args:
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments
            
        Returns:
            Task result directly
        """

    def retry(
        self,
        args=None,
        kwargs=None,
        exc=None,
        throw=True,
        eta=None,
        countdown=None,
        max_retries=None,
        **options
    ):
        """
        Retry current task.
        
        Args:
            args (tuple): New positional arguments
            kwargs (dict): New keyword arguments
            exc (Exception): Exception that caused retry
            throw (bool): Re-raise Retry exception
            eta (datetime): Retry at specific time
            countdown (int): Retry after N seconds
            max_retries (int): Override max retries
            
        Raises:
            Retry: To trigger task retry
        """

    def signature(self, args=None, kwargs=None, **options):
        """
        Create signature for this task.
        
        Args:
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments
            
        Returns:
            Signature instance
        """

    def s(self, *args, **kwargs):
        """
        Shortcut for signature creation.
        
        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments
            
        Returns:
            Signature instance
        """

    def si(self, *args, **kwargs):
        """
        Create immutable signature.
        
        Args:
            *args: Positional arguments  
            **kwargs: Keyword arguments
            
        Returns:
            Immutable signature instance
        """

    def chunks(self, it, n):
        """
        Split iterator into chunks for parallel processing.
        
        Args:
            it: Iterator to chunk
            n (int): Chunk size
            
        Returns:
            Chunks instance
        """

    @property
    def name(self):
        """Task name."""

    @property
    def app(self):
        """Celery app instance this task is bound to."""

    @property  
    def request(self):
        """Current task request context."""

Shared Task Decorator

Decorator for creating tasks that work with any Celery app, useful for reusable libraries and Django integration.

def shared_task(*args, **kwargs):
    """
    Create task that works with any Celery app instance.
    
    Args:
        bind (bool): Create bound task with self parameter
        name (str): Custom task name
        base (class): Custom task base class  
        serializer (str): Argument serializer
        max_retries (int): Maximum retry attempts
        default_retry_delay (int): Default retry delay
        rate_limit (str): Task rate limit
        ignore_result (bool): Don't store results
        
    Returns:
        Task decorator function
    """

def current_app():
    """
    Get the current Celery application instance.
    
    Returns:
        Celery: Current application instance
        
    Raises:
        RuntimeError: If no current app is set
    """

def current_task():
    """
    Get the currently executing task.
    
    Returns:
        Task: Current task instance or None if not in task context
    """

Task Request Context

Context object providing access to current task metadata and execution information.

class Context:
    """
    Task execution context available via Task.request.
    
    Attributes:
        id (str): Unique task ID
        args (tuple): Task positional arguments
        kwargs (dict): Task keyword arguments  
        retries (int): Number of retries attempted
        is_eager (bool): True if executed synchronously
        eta (datetime): Scheduled execution time
        expires (datetime): Task expiration time
        headers (dict): Message headers
        delivery_info (dict): Message delivery information
        reply_to (str): Reply queue name
        correlation_id (str): Message correlation ID
        root_id (str): Root task ID in chain
        parent_id (str): Parent task ID
        group (str): Group ID if part of group
        group_index (int): Position in group
        chord (str): Chord ID if part of chord
        chain (list): Chain information
        hostname (str): Worker hostname
        logfile (str): Worker log file
        loglevel (int): Worker log level
        utc (bool): Use UTC times
        called_directly (bool): Called via apply()
        callbacks (list): Success callbacks
        errbacks (list): Error callbacks
        timelimit (tuple): Time limits (soft, hard)
        origin (str): Message origin
    """

Usage Examples

Basic Application Setup

from celery import Celery

# Create application with Redis broker and backend
app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'  
)

# Configure from object
app.config_from_object({
    'task_serializer': 'json',
    'accept_content': ['json'],
    'result_serializer': 'json',
    'timezone': 'UTC',
    'enable_utc': True,
})

# Auto-discover tasks
app.autodiscover_tasks(['myapp.tasks', 'myapp.utils'])

Task Creation Patterns

# Basic task
@app.task
def add(x, y):
    return x + y

# Bound task with retry logic
@app.task(bind=True, max_retries=3)
def process_data(self, data_id):
    try:
        # Process data
        return process(data_id)
    except Exception as exc:
        # Retry with exponential backoff
        self.retry(countdown=2 ** self.request.retries, exc=exc)

# Shared task for libraries
@shared_task
def send_email(recipient, subject, body):
    # Email sending logic
    pass

# Custom task class
class DatabaseTask(app.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Custom failure handling
        logger.error(f"Task {task_id} failed: {exc}")

@app.task(base=DatabaseTask)
def update_user(user_id, data):
    # Database operation
    pass

Task Execution

# Synchronous execution  
result = add.apply(args=(4, 4))
print(result)  # 8

# Asynchronous execution
result = add.delay(4, 4)
print(result.get())  # Wait for result: 8

# Advanced async execution
result = add.apply_async(
    args=(4, 4),
    countdown=10,  # Execute in 10 seconds
    expires=60,    # Expire after 60 seconds
    retry=True,
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.2,
    }
)

# Send task by name
result = app.send_task('myapp.tasks.add', args=(4, 4))

Install with Tessl CLI

npx tessl i tessl/pypi-celery

docs

configuration.md

core-application.md

exceptions.md

index.md

results-state.md

scheduling-beat.md

signals-events.md

workflow-primitives.md

tile.json