Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines
—
Quality: Pending — a best-practices review has not yet been run.
Impact: Pending — no eval scenarios have been run.
Canvas workflow primitives enable complex task composition and orchestration patterns. These building blocks allow creating sophisticated distributed workflows including sequential chains, parallel execution, callbacks, and functional programming patterns over tasks.
Task signature that wraps a task call with its arguments and execution options, forming the foundation for all Canvas workflow patterns.
class Signature:
    """Task signature: wraps a task call with its arguments and execution
    options, forming the foundation for all Canvas workflow patterns."""

    def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
        """
        Create task signature.

        Args:
            task (str|Task): Task name or task instance
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments
            options (dict): Execution options
            **ex: Extra keyword options (presumably merged into
                ``options`` — confirm against the implementation)
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute signature asynchronously.

        Args:
            args (tuple): Additional positional arguments
            kwargs (dict): Additional keyword arguments
            **options: Execution options for this invocation

        Returns:
            AsyncResult instance
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute signature synchronously.

        Args:
            args (tuple): Additional positional arguments
            kwargs (dict): Additional keyword arguments
            **options: Execution options for this invocation

        Returns:
            Task result
        """

    def clone(self, args=None, kwargs=None, **opts):
        """
        Create copy with modified arguments or options.

        Args:
            args (tuple): New positional arguments
            kwargs (dict): New keyword arguments
            **opts: New execution options

        Returns:
            New Signature instance
        """

    def freeze(self, id=None):
        """
        Make signature immutable with optional custom ID.

        Args:
            id (str): Custom task ID

        Returns:
            Immutable signature
        """

    def set(self, immutable=None, **options):
        """
        Set signature options.

        Args:
            immutable (bool): Make signature immutable
            **options: Execution options to set

        Returns:
            Self for chaining
        """

    def link(self, callback):
        """
        Add success callback.

        Args:
            callback (Signature): Callback signature

        Returns:
            Self for chaining
        """

    def link_error(self, errback):
        """
        Add error callback.

        Args:
            errback (Signature): Error callback signature

        Returns:
            Self for chaining
        """
def signature(task, args=None, kwargs=None, **options):
    """
    Create task signature.

    Args:
        task (str|Task): Task name or instance
        args (tuple): Positional arguments
        kwargs (dict): Keyword arguments
        **options: Execution options

    Returns:
        Signature instance
    """

Execute tasks sequentially, passing the result of each task as the first argument to the next task in the chain.
class chain:
    """Execute task signatures sequentially, passing each task's result
    as the first argument of the next task in the chain."""

    def __init__(self, *tasks, **kwargs):
        """
        Create task chain.

        Args:
            *tasks: Task signatures to chain
            **kwargs: Chain options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute chain asynchronously.

        Args:
            args (tuple): Arguments for first task
            kwargs (dict): Keyword arguments for first task
            **options: Execution options

        Returns:
            AsyncResult for final task
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute chain synchronously.

        Args:
            args (tuple): Arguments for first task
            kwargs (dict): Keyword arguments for first task
            **options: Execution options

        Returns:
            Final task result
        """
def chain(*tasks, **kwargs):
    """
    Create sequential task chain.

    Args:
        *tasks: Task signatures to execute in order
        **kwargs: Chain execution options

    Returns:
        chain instance
    """

Execute multiple tasks in parallel, collecting results when all tasks complete.
class group:
    """Execute multiple task signatures in parallel, collecting their
    results once every task has completed."""

    def __init__(self, *tasks, **kwargs):
        """
        Create task group.

        Args:
            *tasks: Task signatures to execute in parallel
            **kwargs: Group options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute group asynchronously.

        Args:
            args (tuple): Arguments to add to each task
            kwargs (dict): Keyword arguments to add to each task
            **options: Execution options applied to each task

        Returns:
            GroupResult instance
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute group synchronously.

        Args:
            args (tuple): Arguments to add to each task
            kwargs (dict): Keyword arguments to add to each task
            **options: Execution options applied to each task

        Returns:
            List of task results
        """
def group(*tasks, **kwargs):
    """
    Create parallel task group.

    Args:
        *tasks: Task signatures to execute in parallel
        **kwargs: Group execution options

    Returns:
        group instance
    """

Execute a group of tasks in parallel, then execute a callback task with the results when all tasks in the group complete.
class chord:
    """Execute a header group of tasks in parallel, then invoke a body
    (callback) task with the list of results once all header tasks
    complete."""

    def __init__(self, header, body, **kwargs):
        """
        Create task chord.

        Args:
            header: Group of tasks to execute in parallel
            body (Signature): Callback task to execute with results
            **kwargs: Chord options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute chord asynchronously.

        Args:
            args (tuple): Arguments for header tasks
            kwargs (dict): Keyword arguments for header tasks
            **options: Execution options

        Returns:
            AsyncResult for callback task
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute chord synchronously.

        Args:
            args (tuple): Arguments for header tasks
            kwargs (dict): Keyword arguments for header tasks
            **options: Execution options

        Returns:
            Callback task result
        """
def chord(header, body, **kwargs):
    """
    Create chord (group + callback).

    Args:
        header: Group or list of tasks for parallel execution
        body (Signature): Callback task executed with group results
        **kwargs: Chord execution options

    Returns:
        chord instance
    """

Split an iterable into chunks and create tasks to process each chunk in parallel.
class chunks:
    """Split an iterable into fixed-size chunks and create one task per
    chunk so the chunks can be processed in parallel."""

    def __init__(self, it, n, task):
        """
        Create chunked task processing.

        Args:
            it: Iterable to chunk
            n (int): Chunk size
            task (Signature): Task to process each chunk
        """

    def apply_async(self, **options):
        """
        Execute chunks asynchronously.

        Args:
            **options: Execution options

        Returns:
            GroupResult instance
        """
def chunks(it, n, task):
    """
    Split iterable into chunks for parallel processing.

    Args:
        it: Iterable to split
        n (int): Size of each chunk
        task (Signature): Task to process each chunk

    Returns:
        chunks instance
    """

Functional programming style operations for mapping tasks over iterables.
def xmap(task, it):
    """
    Map task over an iterable, passing each item as a single argument.

    Args:
        task (Signature): Task to map
        it: Iterable of arguments (one item per task invocation)

    Returns:
        group instance
    """
def xstarmap(task, it):
    """
    Map task over iterable with argument unpacking.

    Args:
        task (Signature): Task to map
        it: Iterable of argument tuples to unpack

    Returns:
        group instance
    """

Helper functions for working with signatures and Canvas primitives.
def maybe_signature(d, app=None):
    """
    Convert a signature-like object to an actual signature.

    Args:
        d: Object that might be a signature (dict, Signature, etc.)
        app: Celery app instance

    Returns:
        Signature instance, or the original object if it cannot be
        converted
    """

from celery import signature, Celery
# Basic signature creation and execution.
app = Celery('example')


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


# Create and execute signature
sig = signature('add', args=(2, 3))
result = sig.apply_async()
print(result.get())  # 5

# Using task shortcut methods.  Note: calling a signature directly
# (``sig()``) runs the task in the current process and returns the bare
# value, so ``apply_async`` is used here to obtain an AsyncResult.
sig = add.s(2, 3)  # Equivalent to signature('add', args=(2, 3))
result = sig.apply_async()
print(result.get())  # 5

# Immutable signature
sig = add.si(2, 3)  # Won't accept additional arguments

from celery import chain
# Sequential processing - result of each becomes first arg of next.
workflow = chain(
    add.s(2, 3),   # 2 + 3 = 5
    mul.s(4),      # 5 * 4 = 20
    add.s(10),     # 20 + 10 = 30
)
result = workflow.apply_async()
print(result.get())  # 30

# Partial chain application: arguments given at dispatch time are
# supplied to the first task in the chain.
partial_chain = chain(mul.s(2), add.s(10))
result = partial_chain.apply_async(args=(5,))  # (5 * 2) + 10 = 20
print(result.get())  # 20

from celery import group
# Execute tasks in parallel; results come back in task order.
job = group([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8),
    add.s(16, 16),
])
result = job.apply_async()

# Get all results
results = result.get()
print(results)  # [4, 8, 16, 32]

# Check completion status
print(result.ready())       # True when all complete
print(result.successful())  # True when all successful

from celery import chord
@app.task
def sum_results(numbers):
    return sum(numbers)


# Parallel execution with callback.  Calling a chord with its body
# dispatches the workflow and returns the AsyncResult of the callback
# task, so no further ``apply_async`` call is needed.
result = chord([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8),
])(sum_results.s())
print(result.get())  # 28 (4 + 8 + 16)

# Chord with error handling.  The body receives the *list* of header
# results as its first argument.
try:
    result = chord([
        add.s(1, 1),
        add.s(2, 2),
    ])(sum_results.s())
    final_result = result.get()  # sum([2, 4]) == 6
except Exception as exc:
    print(f"Chord failed: {exc}")

from celery import chunks
@app.task
def process_batch(items):
    return [item * 2 for item in items]


# Process a large dataset in chunks of 10 items each.
data = list(range(100))
job = chunks(data, 10, process_batch.s())
result = job.apply_async()

# Get all batch results
batch_results = result.get()
print(len(batch_results))  # 10 batches

from celery import xmap, xstarmap
# Map with argument unpacking: each tuple is unpacked into positional
# arguments, i.e. add(1, 2), add(3, 4), add(5, 6).
arguments = [(1, 2), (3, 4), (5, 6)]
job = xstarmap(add.s(), arguments)
results = job.apply_async().get()
print(results)  # [3, 7, 11]

# Map without unpacking: xmap passes each item as a *single* argument,
# so it suits tasks taking one argument (add((1, 2)) would fail).
job = xmap(sum_results.s(), arguments)
results = job.apply_async().get()
print(results)  # [3, 7, 11] - sum of each tuple, same values

# More complex mapping
@app.task
def process_user(user_id, action, options=None):
    return f"User {user_id}: {action}"


user_actions = [
    (1, 'login', {'timestamp': '2023-01-01'}),
    (2, 'logout', {'timestamp': '2023-01-02'}),
]
job = xstarmap(process_user.s(), user_actions)
results = job.apply_async().get()

# Combine multiple patterns
from celery import chain, group, chord


@app.task
def fetch_data(source):
    return f"data_from_{source}"


@app.task
def process_data(data):
    return f"processed_{data}"


@app.task
def aggregate_results(results):
    return f"aggregated_{len(results)}_items"


# Complex nested workflow: fetch from each source in parallel, process
# each fetched payload in its own chain, then aggregate all processed
# results once every branch has completed.
workflow = chord(
    group([
        chain(fetch_data.s(source), process_data.s())
        for source in ('db', 'api', 'cache')
    ]),
    aggregate_results.s(),
)
result = workflow.apply_async()
print(result.get())  # aggregated_3_items

from celery import signature
@app.task
def may_fail(x):
if x < 0:
raise ValueError("Negative numbers not allowed")
return x * 2
@app.task
def handle_success(result):
print(f"Success: {result}")
return result
@app.task
def handle_failure(task_id, error, traceback):
print(f"Task {task_id} failed: {error}")
# Add callbacks to signature
sig = may_fail.s(5)
sig.link(handle_success.s())
sig.link_error(handle_failure.s())
result = sig.apply_async()Install with Tessl CLI
npx tessl i tessl/pypi-celery