CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-celery

Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

workflow-primitives.mddocs/

Workflow Primitives (Canvas)

Canvas workflow primitives enable complex task composition and orchestration patterns. These building blocks allow creating sophisticated distributed workflows including sequential chains, parallel execution, callbacks, and functional programming patterns over tasks.

Capabilities

Signature

Task signature that wraps a task call with its arguments and execution options, forming the foundation for all Canvas workflow patterns.

class Signature:
    def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
        """
        Create task signature.
        
        Args:
            task (str|Task): Task name or task instance
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments
            options (dict): Execution options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute signature asynchronously.
        
        Args:
            args (tuple): Additional positional arguments
            kwargs (dict): Additional keyword arguments
            
        Returns:
            AsyncResult instance
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute signature synchronously.
        
        Args:
            args (tuple): Additional positional arguments  
            kwargs (dict): Additional keyword arguments
            
        Returns:
            Task result
        """

    def clone(self, args=None, kwargs=None, **opts):
        """
        Create copy with modified arguments or options.
        
        Args:
            args (tuple): New positional arguments
            kwargs (dict): New keyword arguments
            **opts: New execution options
            
        Returns:
            New Signature instance
        """

    def freeze(self, id=None):
        """
        Make signature immutable with optional custom ID.
        
        Args:
            id (str): Custom task ID
            
        Returns:
            Immutable signature
        """

    def set(self, immutable=None, **options):
        """
        Set signature options.
        
        Args:
            immutable (bool): Make signature immutable
            **options: Execution options to set
            
        Returns:
            Self for chaining
        """

    def link(self, callback):
        """
        Add success callback.
        
        Args:
            callback (Signature): Callback signature
            
        Returns:
            Self for chaining
        """

    def link_error(self, errback):
        """
        Add error callback.
        
        Args:
            errback (Signature): Error callback signature
            
        Returns:
            Self for chaining
        """

def signature(task, args=None, kwargs=None, **options):
    """
    Create task signature.
    
    Args:
        task (str|Task): Task name or instance
        args (tuple): Positional arguments
        kwargs (dict): Keyword arguments
        **options: Execution options
        
    Returns:
        Signature instance
    """

Chain

Execute tasks sequentially, passing the result of each task as the first argument to the next task in the chain.

class chain:
    def __init__(self, *tasks, **kwargs):
        """
        Create task chain.
        
        Args:
            *tasks: Task signatures to chain
            **kwargs: Chain options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute chain asynchronously.
        
        Args:
            args (tuple): Arguments for first task
            kwargs (dict): Keyword arguments for first task
            
        Returns:
            AsyncResult for final task
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute chain synchronously.
        
        Args:
            args (tuple): Arguments for first task
            kwargs (dict): Keyword arguments for first task
            
        Returns:
            Final task result
        """

def chain(*tasks, **kwargs):
    """
    Create sequential task chain.
    
    Args:
        *tasks: Task signatures to execute in order
        **kwargs: Chain execution options
        
    Returns:
        chain instance
    """

Group

Execute multiple tasks in parallel, collecting results when all tasks complete.

class group:
    def __init__(self, *tasks, **kwargs):
        """
        Create task group.
        
        Args:
            *tasks: Task signatures to execute in parallel
            **kwargs: Group options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute group asynchronously.
        
        Args:
            args (tuple): Arguments to add to each task
            kwargs (dict): Keyword arguments to add to each task
            
        Returns:
            GroupResult instance
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute group synchronously.
        
        Args:
            args (tuple): Arguments to add to each task
            kwargs (dict): Keyword arguments to add to each task
            
        Returns:
            List of task results
        """

def group(*tasks, **kwargs):
    """
    Create parallel task group.
    
    Args:
        *tasks: Task signatures to execute in parallel
        **kwargs: Group execution options
        
    Returns:
        group instance
    """

Chord

Execute a group of tasks in parallel, then execute a callback task with the results when all tasks in the group complete.

class chord:
    def __init__(self, header, body, **kwargs):
        """
        Create task chord.
        
        Args:
            header: Group of tasks to execute in parallel
            body (Signature): Callback task to execute with results
            **kwargs: Chord options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute chord asynchronously.
        
        Args:
            args (tuple): Arguments for header tasks
            kwargs (dict): Keyword arguments for header tasks
            
        Returns:
            AsyncResult for callback task
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute chord synchronously.
        
        Args:
            args (tuple): Arguments for header tasks
            kwargs (dict): Keyword arguments for header tasks
            
        Returns:
            Callback task result
        """

def chord(header, body, **kwargs):
    """
    Create chord (group + callback).
    
    Args:
        header: Group or list of tasks for parallel execution
        body (Signature): Callback task executed with group results
        **kwargs: Chord execution options
        
    Returns:
        chord instance
    """

Chunks

Split an iterable into chunks and create tasks to process each chunk in parallel.

class chunks:
    def __init__(self, it, n, task):
        """
        Create chunked task processing.
        
        Args:
            it: Iterable to chunk
            n (int): Chunk size
            task (Signature): Task to process each chunk
        """

    def apply_async(self, **options):
        """
        Execute chunks asynchronously.
        
        Returns:
            GroupResult instance
        """

def chunks(it, n, task):
    """
    Split iterable into chunks for parallel processing.
    
    Args:
        it: Iterable to split
        n (int): Size of each chunk
        task (Signature): Task to process each chunk
        
    Returns:
        chunks instance
    """

Map Operations

Functional programming style operations for mapping tasks over iterables.

def xmap(task, it):
    """
    Map task over iterable arguments.
    
    Args:
        task (Signature): Task to map
        it: Iterable of argument tuples
        
    Returns:
        group instance
    """

def xstarmap(task, it):
    """
    Map task over iterable with argument unpacking.
    
    Args:
        task (Signature): Task to map
        it: Iterable of argument tuples to unpack
        
    Returns:
        group instance
    """

Utility Functions

Helper functions for working with signatures and Canvas primitives.

def maybe_signature(d, app=None):
    """
    Convert signature-like object to actual signature.
    
    Args:
        d: Object that might be signature (dict, signature, etc.)
        app: Celery app instance
        
    Returns:
        Signature instance or original object
    """

Usage Examples

Basic Signature Usage

from celery import signature, Celery

app = Celery('example')

@app.task
def add(x, y):
    return x + y

@app.task  
def mul(x, y):
    return x * y

# Create and execute signature
sig = signature('add', args=(2, 3))
result = sig.apply_async()
print(result.get())  # 5

# Using task shortcut methods
sig = add.s(2, 3)  # Equivalent to signature
result = sig()
print(result.get())  # 5

# Immutable signature
sig = add.si(2, 3)  # Won't accept additional arguments

Chain Workflows

from celery import chain

# Sequential processing - result of each becomes first arg of next
workflow = chain(
    add.s(2, 3),     # 2 + 3 = 5
    mul.s(4),        # 5 * 4 = 20  
    add.s(10)        # 20 + 10 = 30
)
result = workflow()
print(result.get())  # 30

# Partial chain application
partial_chain = chain(mul.s(2), add.s(10))
result = partial_chain.apply_async(args=(5,))  # (5 * 2) + 10 = 20
print(result.get())  # 20

Parallel Groups

from celery import group

# Execute tasks in parallel
job = group([
    add.s(2, 2),
    add.s(4, 4), 
    add.s(8, 8),
    add.s(16, 16)
])
result = job.apply_async()

# Get all results
results = result.get()
print(results)  # [4, 8, 16, 32]

# Check completion status
print(result.ready())      # True when all complete
print(result.successful()) # True when all successful

Chord Patterns

from celery import chord

@app.task
def sum_results(numbers):
    return sum(numbers)

# Parallel execution with callback
callback_workflow = chord([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8)
])(sum_results.s())

result = callback_workflow.apply_async()
print(result.get())  # 28 (4 + 8 + 16)

# Nested chord with error handling
try:
    result = chord([
        add.s(1, 1),
        add.s(2, 2)
    ])(mul.s(5)).apply_async()
    
    final_result = result.get()  # (1+1 + 2+2) * 5 = 30
except Exception as exc:
    print(f"Chord failed: {exc}")

Chunked Processing

from celery import chunks

@app.task
def process_batch(items):
    return [item * 2 for item in items]

# Process large dataset in chunks
data = list(range(100))
job = chunks(data, 10, process_batch.s())
result = job.apply_async()

# Get all batch results
batch_results = result.get()
print(len(batch_results))  # 10 batches

Functional Map Operations

from celery import xmap, xstarmap

# Map task over arguments
arguments = [(1, 2), (3, 4), (5, 6)]
job = xmap(add.s(), arguments)
results = job.apply_async().get()
print(results)  # [3, 7, 11]

# Map with argument unpacking
job = xstarmap(add.s(), arguments)  
results = job.apply_async().get()
print(results)  # [3, 7, 11] - same result

# More complex mapping
@app.task
def process_user(user_id, action, **options):
    return f"User {user_id}: {action}"

user_actions = [
    (1, 'login', {'timestamp': '2023-01-01'}),
    (2, 'logout', {'timestamp': '2023-01-02'})
]

job = xstarmap(process_user.s(), user_actions)
results = job.apply_async().get()

Complex Workflow Composition

# Combine multiple patterns
from celery import chain, group, chord

@app.task
def fetch_data(source):
    return f"data_from_{source}"

@app.task  
def process_data(data):
    return f"processed_{data}"

@app.task
def aggregate_results(results):
    return f"aggregated_{len(results)}_items"

# Complex nested workflow
workflow = chain(
    # Step 1: Fetch data from multiple sources in parallel
    group([
        fetch_data.s('db'),
        fetch_data.s('api'),
        fetch_data.s('cache')
    ]),
    
    # Step 2: Process each result and aggregate
    chord(
        group([process_data.s() for _ in range(3)]),
        aggregate_results.s()
    )
)

result = workflow.apply_async()
print(result.get())  # Final aggregated result

Error Handling and Callbacks

from celery import signature

@app.task
def may_fail(x):
    if x < 0:
        raise ValueError("Negative numbers not allowed")
    return x * 2

@app.task
def handle_success(result):
    print(f"Success: {result}")
    return result

@app.task  
def handle_failure(task_id, error, traceback):
    print(f"Task {task_id} failed: {error}")

# Add callbacks to signature
sig = may_fail.s(5)
sig.link(handle_success.s())
sig.link_error(handle_failure.s())

result = sig.apply_async()

Install with Tessl CLI

npx tessl i tessl/pypi-celery

docs

configuration.md

core-application.md

exceptions.md

index.md

results-state.md

scheduling-beat.md

signals-events.md

workflow-primitives.md

tile.json