Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines
—
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Task result handling and state monitoring capabilities for tracking task execution, retrieving results, and managing task lifecycle. These components provide comprehensive result access, state inspection, and task control functionality.
Represents the result of an asynchronously executed task, providing methods to check status, retrieve results, and control task execution.
class AsyncResult:
    """Result handle for a single asynchronously executed task.

    Provides methods to check task status, retrieve the return value,
    and control the task's lifecycle (revoke, forget).
    """

    def __init__(self, id, backend=None, task_name=None, app=None, parent=None) -> None:
        """
        Create AsyncResult instance.
        Args:
            id (str): Task ID
            backend: Result backend instance
            task_name (str): Task name
            app: Celery app instance
            parent: Parent result
        """

    def get(
        self,
        timeout=None,
        propagate=True,
        interval=0.5,
        no_ack=True,
        follow_parents=True,
        callback=None,
        on_message=None,
        on_interval=None,
        disable_sync_subtasks=True,
        EXCEPTION_STATES=None,
        PROPAGATE_STATES=None
    ):
        """
        Get task result, waiting if necessary.
        Args:
            timeout (float): Maximum time to wait in seconds
            propagate (bool): Re-raise task exceptions
            interval (float): Polling interval in seconds
            no_ack (bool): Don't acknowledge result
            follow_parents (bool): Follow parent results
            callback (callable): Called with result when ready
            on_message (callable): Called for each message received
            on_interval (callable): Called on each polling interval
            disable_sync_subtasks (bool): Disable synchronous subtasks
            EXCEPTION_STATES: Override of the exception-state set
                (presumably defaults to the module-level EXCEPTION_STATES -- verify)
            PROPAGATE_STATES: Override of the propagate-state set
                (presumably defaults to the module-level PROPAGATE_STATES -- verify)
        Returns:
            Task result value
        Raises:
            TimeoutError: If timeout exceeded
            Exception: Task exception if propagate=True
        """

    def ready(self) -> bool:
        """
        Check if task has finished executing.
        Returns:
            bool: True if task complete (success or failure)
        """

    def successful(self) -> bool:
        """
        Check if task completed successfully.
        Returns:
            bool: True if task succeeded, False otherwise
        Raises:
            ValueError: If task not ready yet
        """

    def failed(self) -> bool:
        """
        Check if task execution failed.
        Returns:
            bool: True if task failed, False otherwise
        Raises:
            ValueError: If task not ready yet
        """

    def retry(self) -> bool:
        """
        Check if task is waiting for retry.
        Returns:
            bool: True if task will be retried
        """

    def revoke(self, connection=None, terminate=False, signal='SIGTERM', wait=False, timeout=None) -> None:
        """
        Revoke/cancel task execution.
        Args:
            connection: Broker connection
            terminate (bool): Terminate worker process
            signal (str): Signal to send if terminating
            wait (bool): Wait for termination confirmation
            timeout (float): Termination timeout
        """

    def forget(self) -> None:
        """
        Remove result from backend storage.
        """

    def build_graph(self, intermediate=False, formatter=None) -> dict:
        """
        Build task dependency graph.
        Args:
            intermediate (bool): Include intermediate results
            formatter (callable): Result formatter function
        Returns:
            dict: Dependency graph structure
        """

    @property
    def result(self):
        """Task result value or exception."""

    @property
    def return_value(self):
        """Alias for result property."""

    @property
    def state(self):
        """Current task state."""

    @property
    def status(self):
        """Alias for state property."""

    @property
    def info(self):
        """Additional state information."""

    @property
    def traceback(self):
        """Exception traceback if task failed."""

    @property
    def id(self):
        """Task ID."""

    @property
    def task_id(self):
        """Alias for id property."""

    @property
    def name(self):
        """Task name."""

    @property
    def args(self):
        """Task positional arguments."""

    @property
    def kwargs(self):
        """Task keyword arguments."""

    @property
    def backend(self):
        """Result backend instance."""
@property
def children(self):
"""Child task results."""Result collection for group task execution, providing methods to check group completion status and retrieve all results.
class GroupResult:
    """Collection of AsyncResult handles for the tasks of a group.

    Supports batch status checks and retrieval of all task results at once.
    """

    def __init__(self, id=None, results=None, **kwargs) -> None:
        """
        Create GroupResult instance.
        Args:
            id (str): Group ID
            results (list): List of AsyncResult instances
        """

    def get(self, timeout=None, propagate=True, interval=0.5, callback=None, no_ack=True):
        """
        Get results from all tasks in group.
        Args:
            timeout (float): Maximum time to wait
            propagate (bool): Re-raise task exceptions
            interval (float): Polling interval
            callback (callable): Result callback
            no_ack (bool): Don't acknowledge results
        Returns:
            list: Results from all group tasks
        """

    def ready(self) -> bool:
        """
        Check if all group tasks are complete.
        Returns:
            bool: True if all tasks finished
        """

    def successful(self) -> bool:
        """
        Check if all group tasks succeeded.
        Returns:
            bool: True if all tasks successful
        """

    def failed(self) -> bool:
        """
        Check if any group tasks failed.
        Returns:
            bool: True if any task failed
        """

    def waiting(self) -> bool:
        """
        Check if any group tasks are still waiting.
        Returns:
            bool: True if any tasks not ready
        """

    def revoke(self, connection=None, terminate=False, signal='SIGTERM') -> None:
        """
        Revoke all tasks in group.
        Args:
            connection: Broker connection
            terminate (bool): Terminate worker processes
            signal (str): Termination signal
        """

    def forget(self) -> None:
        """Remove all results from backend."""

    def iterate(self, timeout=None, propagate=True, interval=0.5):
        """
        Iterate over results as they become ready.
        Args:
            timeout (float): Overall timeout
            propagate (bool): Re-raise exceptions
            interval (float): Polling interval
        Yields:
            Task results as they complete
        """

    @property
    def results(self):
        """List of AsyncResult instances."""
@property
def children(self):
"""Alias for results property."""Collection of results that can be managed as a single unit, providing batch operations over multiple AsyncResult instances.
class ResultSet:
    """Mutable collection of AsyncResult instances managed as a single unit.

    Provides batch status checks, retrieval, and revocation, plus
    add/remove operations on the underlying result list.
    """

    def __init__(self, results=None, **kwargs) -> None:
        """
        Create ResultSet instance.
        Args:
            results (list): AsyncResult instances
        """

    def get(self, timeout=None, propagate=True, interval=0.5, callback=None, no_ack=True):
        """
        Get all results in set.
        Args:
            timeout (float): Maximum wait time
            propagate (bool): Re-raise exceptions
            interval (float): Polling interval
            callback (callable): Result callback
            no_ack (bool): Don't acknowledge results
        Returns:
            list: All results
        """

    def ready(self) -> bool:
        """
        Check if all results are ready.
        Returns:
            bool: True if all complete
        """

    def successful(self) -> bool:
        """
        Check if all results are successful.
        Returns:
            bool: True if all succeeded
        """

    def failed(self) -> bool:
        """
        Check if any results failed.
        Returns:
            bool: True if any failed
        """

    def revoke(self, connection=None, terminate=False, signal='SIGTERM') -> None:
        """
        Revoke all results.
        Args:
            connection: Broker connection
            terminate (bool): Terminate processes
            signal (str): Termination signal
        """

    def iterate(self, timeout=None, propagate=True, interval=0.5):
        """
        Iterate over results as ready.
        Args:
            timeout (float): Overall timeout
            propagate (bool): Re-raise exceptions
            interval (float): Check interval
        Yields:
            Results as they complete
        """

    def add(self, result) -> None:
        """
        Add result to set.
        Args:
            result (AsyncResult): Result to add
        """

    def remove(self, result) -> None:
        """
        Remove result from set.
        Args:
            result (AsyncResult): Result to remove
        """
@property
def results(self):
"""List of AsyncResult instances in set."""State constants and utilities for tracking task execution status and lifecycle.
# --- Task state constants -------------------------------------------------
PENDING = 'PENDING'    # Task waiting for execution or unknown
RECEIVED = 'RECEIVED'  # Task received by worker
STARTED = 'STARTED'    # Task started by worker
SUCCESS = 'SUCCESS'    # Task executed successfully
FAILURE = 'FAILURE'    # Task execution failed
REVOKED = 'REVOKED'    # Task revoked/cancelled
RETRY = 'RETRY'        # Task will be retried
IGNORED = 'IGNORED'    # Task result ignored

# --- State collections ----------------------------------------------------
# Terminal states: ready() reports True once one of these is reached.
READY_STATES = frozenset([SUCCESS, FAILURE, REVOKED])
# Non-terminal states: the task may still run or be retried.
UNREADY_STATES = frozenset([PENDING, RECEIVED, STARTED, RETRY])
# States that carry exception information.
EXCEPTION_STATES = frozenset([RETRY, FAILURE, REVOKED])
# States whose exception is presumably re-raised by get(propagate=True) -- verify.
PROPAGATE_STATES = frozenset([FAILURE, REVOKED])
class state(str):
    """
    String subclass with state comparison methods.

    NOTE(review): ordering is presumably by state precedence (lifecycle
    order) rather than plain lexicographic string order -- verify.
    """

    def __lt__(self, other):
        """Compare state precedence."""
def __gt__(self, other):
"""Compare state precedence."""Helper functions for working with task states and result objects.
def result_from_tuple(r, app=None):
    """
    Create a result object from its tuple representation.
    Args:
        r (tuple): Result tuple (task_id, status, result, traceback, children)
        app: Celery app instance
    Returns:
        AsyncResult instance
"""from celery import Celery
# Example: basic task execution and result retrieval.
app = Celery('example', broker='redis://localhost:6379')

@app.task
def add(x, y):
    import time
    time.sleep(2)  # Simulate work
    return x + y

@app.task
def divide(x, y):
    if y == 0:
        raise ValueError("Cannot divide by zero")
    return x / y

# Execute task and get result; delay() returns an AsyncResult immediately.
result = add.delay(4, 4)

# Check status without blocking
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"State: {result.state}")

# Get result with timeout; get() blocks until ready or TimeoutError.
try:
    value = result.get(timeout=10)
    print(f"Result: {value}")
except TimeoutError:
    print("Task took too long")

# Check if successful.
# NOTE(review): per the AsyncResult.successful() docs it raises ValueError
# when the task is not ready yet -- safe here only because get() was awaited.
if result.successful():
    print("Task completed successfully")
else:
print("Task failed or not ready")# Task that may fail
# Example: handling a task that raises.
result = divide.delay(10, 0)

# Get result with error handling: propagate=True re-raises the task's
# original exception in the caller.
try:
    value = result.get(propagate=True)
    print(f"Result: {value}")
except ValueError as exc:
    print(f"Task failed: {exc}")
    print(f"Traceback: {result.traceback}")

# Get result without propagating errors: the exception is returned as the
# result value instead of being raised.
value = result.get(propagate=False)
if result.failed():
    print(f"Task failed with: {result.result}")
    print(f"Info: {result.info}")
print(f"Info: {result.info}")from celery import group
# Example: executing a group of tasks and collecting a GroupResult.
job = group([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8)
])
result = job.apply_async()

# Check group status; successful() is only meaningful once ready() is True.
print(f"Group ready: {result.ready()}")
print(f"Group successful: {result.successful() if result.ready() else 'Not ready'}")

# Get all results as a list (ordered like the group's signatures).
try:
    results = result.get(timeout=30)
    print(f"All results: {results}")  # [4, 8, 16]
except Exception as exc:
    print(f"Group failed: {exc}")

# Iterate over results as they complete rather than waiting for all of them.
print("Results as they complete:")
for task_result in result.iterate(timeout=30):
print(f"Got result: {task_result}")from celery.result import ResultSet
# Example: managing independently-submitted tasks through a ResultSet.
tasks = [add.delay(i, i) for i in range(5)]

# Create result set
result_set = ResultSet(tasks)

# Batch operations
print(f"All ready: {result_set.ready()}")

# Get results with error handling.
# NOTE(review): this except clause can never fire -- `value = result` is a
# plain assignment; iterate() would raise at the `for` statement itself,
# outside this try block.
results = []
for result in result_set.iterate(timeout=60):
    try:
        value = result
        results.append(value)
        print(f"Task completed with result: {value}")
    except Exception as exc:
        print(f"Task failed: {exc}")
print(f"Final results: {results}")import time
# Example: revoking a task that has not started yet.
result = add.delay(1000000, 2000000)

# Check if we can revoke it (only meaningful before it reaches a ready state)
if not result.ready():
    print("Revoking task...")
    result.revoke(terminate=True)  # terminate=True also kills a running worker process

# Wait a moment and check; revocation is asynchronous, so poll the state.
time.sleep(1)
if result.state == 'REVOKED':
    print("Task was revoked")

# Forget about result
result.forget()

import time
def monitor_task(result, name="Task"):
"""Monitor task execution with detailed status."""
print(f"Monitoring {name} (ID: {result.id})")
# Polling loop
while not result.ready():
print(f" Status: {result.state}")
if hasattr(result, 'info') and result.info:
print(f" Info: {result.info}")
time.sleep(1)
# Final status
if result.successful():
print(f" ✓ Completed: {result.result}")
else:
print(f" ✗ Failed: {result.result}")
if result.traceback:
print(f" Traceback: {result.traceback}")
# Monitor task execution: submit a task, then poll it with monitor_task
result = add.delay(10, 20)
monitor_task(result, "Addition")@app.task(bind=True)
def task_with_progress(self, total_items):
    """Process ``total_items`` items, publishing a PROGRESS state per item.

    Args:
        self: Bound task instance (bind=True) providing update_state().
        total_items (int): Number of items to process.

    Returns:
        dict: Completion summary with 'status' and 'processed' keys.
    """
    done = 0
    while done < total_items:
        done += 1
        # Publish custom progress metadata readable via result.info.
        self.update_state(
            state='PROGRESS',
            meta={'current': done, 'total': total_items}
        )
        time.sleep(0.1)
    return {'status': 'Complete', 'processed': total_items}
# Execute and monitor progress of the custom-state task above.
result = task_with_progress.delay(10)
while not result.ready():
    # assumes result.info holds the meta dict while state == 'PROGRESS' -- TODO confirm
    if result.state == 'PROGRESS':
        meta = result.info
        progress = (meta['current'] / meta['total']) * 100
        print(f"Progress: {progress:.1f}% ({meta['current']}/{meta['total']})")
    time.sleep(0.5)
print(f"Final result: {result.get()}")from celery import chain
# Example: chaining dependent tasks; each task's output feeds the next.
workflow = chain(
    add.s(2, 3),  # First task: 2 + 3 = 5
    add.s(10),    # Second task: 5 + 10 = 15
    add.s(5)      # Third task: 15 + 5 = 20
)
result = workflow.apply_async()

# The result represents the final task in the chain
print(f"Final result: {result.get()}")  # 20

# Access parent results if needed; each parent is the preceding chain step.
if hasattr(result, 'parent') and result.parent:
    print(f"Parent result: {result.parent.get()}")  # 15
    if hasattr(result.parent, 'parent') and result.parent.parent:
print(f"Grandparent result: {result.parent.parent.get()}") # 5Install with Tessl CLI
npx tessl i tessl/pypi-celery