CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-celery

Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/results-state.md

Results and State Management

Task result handling and state monitoring capabilities for tracking task execution, retrieving results, and managing task lifecycle. These components provide comprehensive result access, state inspection, and task control functionality.

Capabilities

AsyncResult

Represents the result of an asynchronously executed task, providing methods to check status, retrieve results, and control task execution.

class AsyncResult:
    """Handle for querying the state and result of one asynchronously executed task."""

    def __init__(self, id, backend=None, task_name=None, app=None, parent=None):
        """
        Create AsyncResult instance.
        
        Args:
            id (str): Task ID
            backend: Result backend instance
            task_name (str): Task name
            app: Celery app instance
            parent: Parent result (e.g. the previous task's result in a chain)
        """

    def get(
        self, 
        timeout=None, 
        propagate=True, 
        interval=0.5, 
        no_ack=True, 
        follow_parents=True,
        callback=None,
        on_message=None,
        on_interval=None,
        disable_sync_subtasks=True,
        EXCEPTION_STATES=None,
        PROPAGATE_STATES=None
    ):
        """
        Get task result, waiting if necessary.
        
        Args:
            timeout (float): Maximum time to wait in seconds (None waits indefinitely)
            propagate (bool): Re-raise task exceptions; when False the
                exception is returned instead of raised
            interval (float): Polling interval in seconds
            no_ack (bool): Don't acknowledge result
            follow_parents (bool): Follow parent results
            callback (callable): Called with result when ready
            on_message (callable): Called for each message received
            on_interval (callable): Called on each polling interval
            disable_sync_subtasks (bool): Disable synchronous subtasks, i.e.
                refuse to block on a result from inside another task
                (NOTE(review): confirm exact behaviour against upstream docs)
            EXCEPTION_STATES: States treated as "task raised an exception"
                (NOTE(review): presumably falls back to the module-level
                EXCEPTION_STATES set when None -- confirm)
            PROPAGATE_STATES: States whose exception is re-raised when
                propagate=True (NOTE(review): presumably falls back to the
                module-level PROPAGATE_STATES set when None -- confirm)
            
        Returns:
            Task result value
            
        Raises:
            celery.exceptions.TimeoutError: If timeout exceeded
                (NOTE(review): this is Celery's own exception class, not the
                builtin TimeoutError -- confirm before catching the builtin)
            Exception: Task exception if propagate=True
        """

    def ready(self):
        """
        Check if task has finished executing.
        
        Returns:
            bool: True if task complete (success or failure)
        """

    def successful(self):
        """
        Check if task completed successfully.
        
        Returns:
            bool: True if task succeeded, False otherwise (including when
                the task has not finished yet)
        """

    def failed(self):
        """
        Check if task execution failed.
        
        Returns:
            bool: True if task failed, False otherwise (including when
                the task has not finished yet)
        """

    def retry(self):
        """
        Check if task is waiting for retry.

        NOTE(review): upstream Celery's AsyncResult does not appear to define
        a retry() method; the usual check is ``result.state == 'RETRY'`` --
        confirm before relying on this.
        
        Returns:
            bool: True if task will be retried
        """

    def revoke(self, connection=None, terminate=False, signal='SIGTERM', wait=False, timeout=None):
        """
        Revoke/cancel task execution.
        
        Args:
            connection: Broker connection
            terminate (bool): Terminate worker process
            signal (str): Signal to send if terminating
            wait (bool): Wait for termination confirmation
            timeout (float): Termination timeout (seconds to wait when wait=True)
        """

    def forget(self):
        """
        Remove result from backend storage.
        """

    def build_graph(self, intermediate=False, formatter=None):
        """
        Build task dependency graph.
        
        Args:
            intermediate (bool): Include intermediate results
            formatter (callable): Result formatter function
            
        Returns:
            dict: Dependency graph structure
            (NOTE(review): upstream appears to return a DependencyGraph
            object rather than a plain dict -- confirm)
        """

    @property
    def result(self):
        """Task result value or exception."""

    @property
    def return_value(self):
        """Alias for result property.

        NOTE(review): not found on upstream AsyncResult -- confirm this
        attribute exists before using it.
        """

    @property
    def state(self):
        """Current task state."""

    @property
    def status(self):
        """Alias for state property."""

    @property
    def info(self):
        """Additional state information.

        For custom states this is the metadata stored with the state (e.g.
        the ``meta`` dict passed to ``update_state``).
        NOTE(review): upstream defines this as an alias of ``result`` -- confirm.
        """

    @property
    def traceback(self):
        """Exception traceback if task failed."""

    @property
    def id(self):
        """Task ID."""

    @property
    def task_id(self):
        """Alias for id property."""

    @property
    def name(self):
        """Task name."""

    @property
    def args(self):
        """Task positional arguments."""

    @property
    def kwargs(self):
        """Task keyword arguments."""

    @property
    def backend(self):
        """Result backend instance."""

    @property
    def children(self):
        """Child task results."""

GroupResult

Result collection for group task execution, providing methods to check group completion status and retrieve all results.

class GroupResult:
    """Collection of the AsyncResult objects produced by executing a group."""

    def __init__(self, id=None, results=None, **kwargs):
        """
        Create GroupResult instance.
        
        Args:
            id (str): Group ID
            results (list): List of AsyncResult instances
            **kwargs: Additional keyword arguments (not described on this page)
        """

    def get(self, timeout=None, propagate=True, interval=0.5, callback=None, no_ack=True):
        """
        Get results from all tasks in group.
        
        Args:
            timeout (float): Maximum time to wait, in seconds
            propagate (bool): Re-raise task exceptions  
            interval (float): Polling interval, in seconds
            callback (callable): Result callback
                (NOTE(review): upstream calls it as callback(task_id, value)
                and returns None when a callback is given -- confirm)
            no_ack (bool): Don't acknowledge results
            
        Returns:
            list: Results from all group tasks
        """

    def ready(self):
        """
        Check if all group tasks are complete.
        
        Returns:
            bool: True if all tasks finished
        """

    def successful(self):
        """
        Check if all group tasks succeeded.
        
        Returns:
            bool: True if all tasks successful
        """

    def failed(self):
        """
        Check if any group tasks failed.
        
        Returns:
            bool: True if any task failed
        """

    def waiting(self):
        """
        Check if any group tasks are still waiting.
        
        Returns:
            bool: True if any tasks not ready
        """

    def revoke(self, connection=None, terminate=False, signal='SIGTERM'):
        """
        Revoke all tasks in group.
        
        Args:
            connection: Broker connection
            terminate (bool): Terminate worker processes
            signal (str): Termination signal
        """

    def forget(self):
        """Remove all results from backend."""

    def iterate(self, timeout=None, propagate=True, interval=0.5):
        """
        Iterate over results as they become ready.

        NOTE(review): upstream Celery marks iterate() as deprecated since 4.0
        in favour of get(callback=...) -- confirm it is available in the
        targeted Celery version.
        
        Args:
            timeout (float): Overall timeout
            propagate (bool): Re-raise exceptions
            interval (float): Polling interval
            
        Yields:
            Task results as they complete
        """

    @property
    def results(self):
        """List of AsyncResult instances."""

    @property
    def children(self):
        """Alias for results property."""

ResultSet

Collection of results that can be managed as a single unit, providing batch operations over multiple AsyncResult instances.

class ResultSet:
    """Mutable collection of AsyncResult objects that can be queried as one unit."""

    def __init__(self, results=None, **kwargs):
        """
        Create ResultSet instance.
        
        Args:
            results (list): AsyncResult instances
            **kwargs: Additional keyword arguments (not described on this page)
        """

    def get(self, timeout=None, propagate=True, interval=0.5, callback=None, no_ack=True):
        """
        Get all results in set.
        
        Args:
            timeout (float): Maximum wait time, in seconds
            propagate (bool): Re-raise exceptions
            interval (float): Polling interval, in seconds  
            callback (callable): Result callback
                (NOTE(review): upstream calls it as callback(task_id, value)
                and returns None when a callback is given -- confirm)
            no_ack (bool): Don't acknowledge results
            
        Returns:
            list: All results
        """

    def ready(self):
        """
        Check if all results are ready.
        
        Returns:
            bool: True if all complete
        """

    def successful(self):
        """
        Check if all results are successful.
        
        Returns:
            bool: True if all succeeded
        """

    def failed(self):
        """
        Check if any results failed.
        
        Returns:
            bool: True if any failed
        """

    def revoke(self, connection=None, terminate=False, signal='SIGTERM'):
        """
        Revoke all results.
        
        Args:
            connection: Broker connection
            terminate (bool): Terminate processes
            signal (str): Termination signal
        """

    def iterate(self, timeout=None, propagate=True, interval=0.5):
        """
        Iterate over results as ready.

        NOTE(review): upstream Celery marks iterate() as deprecated since 4.0
        in favour of get(callback=...) -- confirm it is available in the
        targeted Celery version.
        
        Args:
            timeout (float): Overall timeout
            propagate (bool): Re-raise exceptions
            interval (float): Check interval
            
        Yields:
            Results as they complete
        """

    def add(self, result):
        """
        Add result to set.
        
        Args:
            result (AsyncResult): Result to add
        """

    def remove(self, result):
        """
        Remove result from set.
        
        Args:
            result (AsyncResult): Result to remove
        """

    @property
    def results(self):
        """List of AsyncResult instances in set."""

Task States

State constants and utilities for tracking task execution status and lifecycle.

# State constants
PENDING = 'PENDING'        # Task waiting for execution or unknown
RECEIVED = 'RECEIVED'      # Task received by worker
STARTED = 'STARTED'        # Task started by worker
SUCCESS = 'SUCCESS'        # Task executed successfully
FAILURE = 'FAILURE'        # Task execution failed
REVOKED = 'REVOKED'        # Task revoked/cancelled
REJECTED = 'REJECTED'      # Task rejected by worker
RETRY = 'RETRY'            # Task will be retried
IGNORED = 'IGNORED'        # Task result ignored

# State collections
# READY_STATES: the task has finished and will not change state again.
READY_STATES = frozenset({SUCCESS, FAILURE, REVOKED})
# UNREADY_STATES: the task has not produced a final outcome yet.
# REJECTED is included to mirror celery.states.UNREADY_STATES.
UNREADY_STATES = frozenset({PENDING, RECEIVED, STARTED, REJECTED, RETRY})
# EXCEPTION_STATES: the stored result is an exception rather than a value.
EXCEPTION_STATES = frozenset({RETRY, FAILURE, REVOKED})
# PROPAGATE_STATES: states whose exception is re-raised by get(propagate=True).
PROPAGATE_STATES = frozenset({FAILURE, REVOKED})

class state(str):
    """
    String subclass with state comparison methods.

    Ordering is by task-state precedence rather than alphabetical string
    order. NOTE(review): upstream documents ``state(PENDING) < state(SUCCESS)``
    as True -- confirm the precedence table for the targeted version.
    """
    
    def __lt__(self, other):
        """Return True if this state has lower precedence than ``other``."""
    
    def __gt__(self, other):  
        """Return True if this state has higher precedence than ``other``."""

State Utilities

Helper functions for working with task states and result objects.

def result_from_tuple(r, app=None):
    """
    Create result object from tuple representation.
    
    Args:
        r (tuple): Serialized result tuple.
            NOTE(review): upstream Celery expects the nested layout produced
            by ``AsyncResult.as_tuple()`` -- ``((task_id, parent), children)``
            -- rather than a flat (task_id, status, result, traceback,
            children) record; confirm against the targeted Celery version.
        app: Celery app instance
        
    Returns:
        AsyncResult instance
        (NOTE(review): upstream appears to return a GroupResult when the
        tuple carries children -- confirm)
    """

Usage Examples

Basic Result Handling

import time

from celery import Celery
# Celery raises its own TimeoutError class from result.get(); the builtin
# TimeoutError would not catch it, so import it under an unambiguous name.
from celery.exceptions import TimeoutError as CeleryTimeoutError

# A result backend is required for everything on this page: without one,
# task state and return values are never stored and cannot be read back.
app = Celery(
    'example',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379',
)

@app.task
def add(x, y):
    """Return x + y after a short simulated delay."""
    time.sleep(2)  # Simulate work
    return x + y

@app.task
def divide(x, y):
    """Return x / y; raises ValueError when y is zero."""
    if y == 0:
        raise ValueError("Cannot divide by zero")
    return x / y

# Execute task and get result
result = add.delay(4, 4)

# Check status without blocking
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"State: {result.state}")

# Get result with timeout
try:
    value = result.get(timeout=10)
    print(f"Result: {value}")
except CeleryTimeoutError:
    print("Task took too long")

# Check if successful (False both for a failed task and for one still running)
if result.successful():
    print("Task completed successfully")
else:
    print("Task failed or not ready")

Error Handling and Propagation

# Task that may fail (divide raises ValueError when the divisor is zero)
result = divide.delay(10, 0)

# Get result with error handling: propagate=True re-raises the task's
# exception in the caller, so it can be handled with a normal try/except.
try:
    value = result.get(propagate=True)
    print(f"Result: {value}")
except ValueError as exc:
    print(f"Task failed: {exc}")
    print(f"Traceback: {result.traceback}")

# Get result without propagating errors: nothing is raised here, so the
# outcome has to be inspected explicitly via failed()/result/info.
value = result.get(propagate=False)
if result.failed():
    print(f"Task failed with: {result.result}")
    print(f"Info: {result.info}")

Group Results

from celery import group

# Create and execute group
job = group([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8)
])
result = job.apply_async()

# Check group status
print(f"Group ready: {result.ready()}")
print(f"Group successful: {result.successful() if result.ready() else 'Not ready'}")

# Get all results
try:
    results = result.get(timeout=30)
    print(f"All results: {results}")  # [4, 8, 16]
except Exception as exc:
    print(f"Group failed: {exc}")

# Handle results one at a time through a callback. This replaces the
# deprecated iterate() method; the callback receives (task_id, value) for
# each task in the group.
def on_result(task_id, value):
    """Print a single task's return value as it is delivered."""
    print(f"Got result: {value}")

print("Results as they complete:")
result.get(timeout=30, callback=on_result)

ResultSet Operations

from celery.result import ResultSet

# Create multiple tasks
tasks = [add.delay(i, i) for i in range(5)]

# Create result set
result_set = ResultSet(tasks)

# Batch operations
print(f"All ready: {result_set.ready()}")

# Get results with error handling.
# Each AsyncResult is fetched individually so that the try/except wraps the
# call that can actually raise; one failing task no longer aborts the loop.
# Note the timeout applies per task here, not to the set as a whole.
results = []
for async_result in result_set.results:
    try:
        value = async_result.get(timeout=60)
    except Exception as exc:  # example boundary: report and continue
        print(f"Task failed: {exc}")
    else:
        results.append(value)
        print(f"Task completed with result: {value}")

print(f"Final results: {results}")

Task Revocation and Control

import time

# Start a task that takes a while (add sleeps to simulate work), so it is
# normally still pending or running when we try to revoke it
result = add.delay(1000000, 2000000)

# Check if we can revoke it (revoking a finished task has no useful effect)
if not result.ready():
    print("Revoking task...")
    # terminate=True also asks the worker to terminate the process that is
    # executing the task, not just to skip it if it has not started
    result.revoke(terminate=True)
    
    # revoke() was called without wait=True, so give the worker a moment
    # to act before checking the state
    time.sleep(1)
    if result.state == 'REVOKED':
        print("Task was revoked")

# Forget about result (removes it from backend storage)
result.forget()

Advanced Result Monitoring

import time

def monitor_task(result, name="Task"):
    """Poll a task once per second until it finishes, printing its progress.

    Args:
        result: Result handle exposing id, state, info, result, traceback,
            ready() and successful().
        name (str): Label used in the first output line.

    Returns:
        None. All output goes to stdout.
    """
    print(f"Monitoring {name} (ID: {result.id})")

    # Keep reporting until the task reaches a finished state.
    while True:
        if result.ready():
            break
        print(f"  Status: {result.state}")
        if hasattr(result, 'info') and result.info:
            print(f"  Info: {result.info}")
        time.sleep(1)

    # Failure path first; the success line is the fall-through case.
    if not result.successful():
        print(f"  ✗ Failed: {result.result}")
        if result.traceback:
            print(f"  Traceback: {result.traceback}")
        return

    print(f"  ✓ Completed: {result.result}")

# Monitor task execution: start a task, then block in monitor_task until
# it finishes, printing its status along the way
result = add.delay(10, 20)
monitor_task(result, "Addition")

Working with Task Metadata

import time

@app.task(bind=True)
def task_with_progress(self, total_items):
    """Task that reports progress.

    Publishes a custom 'PROGRESS' state after each item, with metadata
    {'current': <items done>, 'total': <total_items>}.

    Args:
        total_items (int): Number of items to process.

    Returns:
        dict: {'status': 'Complete', 'processed': total_items}
    """
    
    for i in range(total_items):
        # Update task state with progress info
        self.update_state(
            state='PROGRESS',
            meta={'current': i + 1, 'total': total_items}
        )
        time.sleep(0.1)
    
    return {'status': 'Complete', 'processed': total_items}

# Execute and monitor progress
result = task_with_progress.delay(10)

while not result.ready():
    if result.state == 'PROGRESS':
        meta = result.info
        # state and info are read separately, so the task may finish between
        # the two reads; in that case info holds the return value instead of
        # the progress metadata. Only report when the expected keys exist.
        if isinstance(meta, dict) and 'current' in meta and 'total' in meta:
            progress = (meta['current'] / meta['total']) * 100
            print(f"Progress: {progress:.1f}% ({meta['current']}/{meta['total']})")
    time.sleep(0.5)

print(f"Final result: {result.get()}")

Handling Task Dependencies

from celery import chain

# Create dependent tasks: each task after the first receives the previous
# task's return value as its first argument
workflow = chain(
    add.s(2, 3),     # First task: 2 + 3 = 5
    add.s(10),       # Second task: 5 + 10 = 15
    add.s(5)         # Third task: 15 + 5 = 20  
)

result = workflow.apply_async()

# The result represents the final task in the chain
print(f"Final result: {result.get()}")  # 20

# Access parent results if needed: walking .parent goes backwards through
# the chain, one task per step
if hasattr(result, 'parent') and result.parent:
    print(f"Parent result: {result.parent.get()}")  # 15
    if hasattr(result.parent, 'parent') and result.parent.parent:
        print(f"Grandparent result: {result.parent.parent.get()}")  # 5

Install with Tessl CLI

npx tessl i tessl/pypi-celery

docs

configuration.md

core-application.md

exceptions.md

index.md

results-state.md

scheduling-beat.md

signals-events.md

workflow-primitives.md

tile.json