CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware

Pending
Overview
Eval results
Files

composition.mddocs/

Composition

Task composition in Dramatiq enables complex workflows by chaining tasks sequentially with pipelines or executing multiple tasks in parallel with groups. This allows building sophisticated data processing workflows from simple actor building blocks.

Capabilities

Pipeline

Sequential task execution: each task's return value is appended as the last positional argument of the next task's message, creating processing chains.

class pipeline:
    def __init__(self, children: Iterable[Message | pipeline], *, broker: Broker | None = None):
        """
        Create a pipeline from message objects or other pipelines.

        Nested pipelines are flattened, so the pipeline always holds a single
        ordered list of messages.

        Parameters:
        - children: Iterable of Message objects or pipeline objects
        - broker: Broker instance (uses global broker if None)
        """

    def run(self, *, delay: int | None = None) -> pipeline:
        """
        Start the pipeline by enqueueing its first message.

        Each following message is enqueued by the Pipelines middleware once
        the previous step has finished, receiving that step's result.

        Parameters:
        - delay: Delay in milliseconds before starting execution

        Returns:
        Self (for method chaining)
        """

    def get_result(self, *, block: bool = False, timeout: int | None = None):
        """
        Get the result of the final task in the pipeline.

        Requires the Results middleware and a final actor that stores results.

        Parameters:
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking

        Returns:
        Result of the last task in the pipeline

        Raises:
        ResultMissing: If result is not available
        ResultTimeout: If timeout exceeded while blocking
        """

    def get_results(self, *, block: bool = False, timeout: int | None = None) -> Iterator:
        """
        Get results from all tasks in the pipeline.

        This is a generator: results are fetched lazily, one per message, in
        pipeline order. Wrap the call in list() to collect them all.

        Parameters:
        - block: Whether to block waiting for results
        - timeout: Timeout in milliseconds when blocking (shared by all results)

        Returns:
        Generator yielding the result of each pipeline task

        Raises:
        ResultMissing: If a result is not available
        ResultTimeout: If timeout exceeded while blocking
        """

    def __or__(self, other: Message | pipeline) -> pipeline:
        """
        Chain this pipeline with another message or pipeline using | operator.

        Parameters:
        - other: Message or pipeline to chain with

        Returns:
        New pipeline containing both sequences
        """

    def __len__(self) -> int:
        """
        Get the number of tasks in the pipeline.

        Returns:
        Number of messages in the pipeline
        """

    # Properties (completed / completed_count read stored results, so they
    # need the Results middleware)
    completed: bool              # True if all tasks completed
    completed_count: int         # Number of completed tasks
    messages: List[Message]      # List of pipeline messages

Usage:

import dramatiq

# NOTE: get_result()/get_results() read stored results, so the broker needs the
# Results middleware and every actor whose result is read must be declared
# with store_results=True.

@dramatiq.actor(store_results=True)
def fetch_data(url):
    """Fetch data from URL"""
    return {"data": f"content from {url}", "size": 1024}

@dramatiq.actor(store_results=True)
def process_data(data_info):
    """Process the fetched data"""
    processed = data_info["data"].upper()
    return {"processed": processed, "original_size": data_info["size"]}

@dramatiq.actor(store_results=True)
def save_data(processed_info):
    """Save processed data"""
    print(f"Saving: {processed_info['processed']}")
    return {"saved": True, "id": "12345"}

# Create pipeline using | operator
# (named data_pipeline so it does not shadow the dramatiq.pipeline class)
data_pipeline = (
    fetch_data.message("https://api.example.com/data") |
    process_data.message() |  # Will receive output from previous task
    save_data.message()
)

# Execute pipeline
data_pipeline.run()

# Get final result (blocking)
final_result = data_pipeline.get_result(block=True, timeout=30000)
print(f"Pipeline result: {final_result}")

# Get all results (get_results() is a generator, so materialize it)
all_results = list(data_pipeline.get_results(block=True))
print(f"All results: {all_results}")

Group

Parallel task execution where multiple tasks run concurrently and can be synchronized.

class group:
    def __init__(self, children: Iterable[Message | group | pipeline], *, broker: Broker | None = None):
        """
        Create a group from message objects (or nested groups/pipelines).

        Parameters:
        - children: Iterable of Message, group or pipeline objects
        - broker: Broker instance (uses global broker if None)
        """

    def run(self, *, delay: int | None = None) -> group:
        """
        Execute the group by sending all messages to the broker.

        Parameters:
        - delay: Delay in milliseconds before starting execution

        Returns:
        Self (for method chaining)

        Raises:
        RuntimeError: If completion callbacks were added but the broker has
        no GroupCallbacks middleware
        """

    def get_results(self, *, block: bool = False, timeout: int | None = None) -> Iterator:
        """
        Get results from all tasks in the group.

        This is a generator: results are fetched lazily, in the order the
        children were given. Wrap the call in list() to collect them all.

        Parameters:
        - block: Whether to block waiting for all results
        - timeout: Timeout in milliseconds when blocking (shared by all results)

        Returns:
        Generator yielding the result of each group task

        Raises:
        ResultMissing: If a result is not available
        ResultTimeout: If timeout exceeded while blocking
        """

    def wait(self, *, timeout: int | None = None):
        """
        Wait for all tasks in the group to complete.

        Completion is detected through stored results, so the actors must
        store results.

        Parameters:
        - timeout: Timeout in milliseconds

        Raises:
        ResultTimeout: If timeout exceeded
        """

    def add_completion_callback(self, message: Message):
        """
        Add a callback to be executed when all group tasks complete.

        Depends on the GroupCallbacks middleware; must be called before run().

        Parameters:
        - message: Message to execute as completion callback
        """

    def __len__(self) -> int:
        """
        Get the number of tasks in the group.

        Returns:
        Number of messages in the group
        """

    # Properties (both read stored results, so they need the Results middleware)
    completed: bool              # True if all tasks completed
    completed_count: int         # Number of completed tasks

Usage:

import dramatiq
from dramatiq import group

# store_results=True is required because wait()/get_results() below read the
# stored result of every task (the broker also needs the Results middleware).
@dramatiq.actor(store_results=True)
def process_item(item_id, item_data):
    """Process individual item"""
    print(f"Processing item {item_id}: {item_data}")
    return {"id": item_id, "processed": True, "result": item_data * 2}

@dramatiq.actor
def group_completion_handler(group_id):
    """Handle group completion"""
    print(f"Group {group_id} completed!")

# Create group of parallel tasks
items = [
    {"id": 1, "data": 10},
    {"id": 2, "data": 20},
    {"id": 3, "data": 30},
    {"id": 4, "data": 40}
]

task_group = group([
    process_item.message(item["id"], item["data"])
    for item in items
])

# Add completion callback
# (requires the GroupCallbacks middleware on the broker; without it run()
# raises RuntimeError)
task_group.add_completion_callback(
    group_completion_handler.message("batch_001")
)

# Execute group
task_group.run()

# Wait for completion
task_group.wait(timeout=60000)  # 1 minute timeout

# Get all results (get_results() is a generator, so materialize it)
results = list(task_group.get_results(block=True))
print(f"Group results: {results}")

Advanced Composition Patterns

Mixed Pipeline and Group Composition

# Results are read between the steps below, so the first two actors must
# store them (the broker also needs the Results middleware).
@dramatiq.actor(store_results=True)
def fetch_urls(urls):
    """Fetch multiple URLs"""
    return [{"url": url, "data": f"content from {url}"} for url in urls]

@dramatiq.actor(store_results=True)
def process_single_url(url_data):
    """Process single URL data"""
    return {
        "url": url_data["url"],
        "processed": url_data["data"].upper(),
        "length": len(url_data["data"])
    }

@dramatiq.actor
def aggregate_results(results):
    """Aggregate all processed results"""
    total_length = sum(r["length"] for r in results)
    return {"total_items": len(results), "total_length": total_length}

# Complex composition: pipeline -> group -> pipeline
urls = ["https://api1.com", "https://api2.com", "https://api3.com"]

# Step 1: Fetch all URLs (single task)
# Actor.send() enqueues the message and returns it; Message objects
# themselves have no send() method.
fetch_step = fetch_urls.send(urls)
fetched = fetch_step.get_result(block=True)

# Step 2: Process each URL in parallel (group), built from the fetched data
process_group = group([
    process_single_url.message(url_data)
    for url_data in fetched
])
process_group.run()
processed = list(process_group.get_results(block=True))

# Step 3: Aggregate results (single task); the group's results are passed
# explicitly because aggregate_results requires its `results` argument
aggregate_step = aggregate_results.send(processed)

Pipeline with Error Handling

@dramatiq.actor
def safe_fetch(url):
    """Fetch with error handling"""
    # The broad catch is deliberate: any fetch failure is turned into a
    # result dict so the next pipeline step still runs.
    try:
        # Simulate fetch operation
        if "error" in url:
            raise ValueError("Simulated fetch error")
        return {"url": url, "data": f"content from {url}", "success": True}
    except Exception as e:
        return {"url": url, "error": str(e), "success": False}

@dramatiq.actor
def process_or_skip(fetch_result):
    """Process successful fetches, skip errors"""
    if not fetch_result["success"]:
        print(f"Skipping failed fetch: {fetch_result['error']}")
        return {"skipped": True, "error": fetch_result["error"]}
    return {
        "processed": fetch_result["data"].upper(),
        "original_url": fetch_result["url"]
    }

# store_results=True: pipeline.get_result() reads the stored result of the
# last step (the broker also needs the Results middleware).
@dramatiq.actor(store_results=True)
def finalize_result(process_result):
    """Finalize the processing result"""
    if process_result.get("skipped"):
        return {"status": "failed", "reason": process_result["error"]}
    return {
        "status": "success",
        "result": process_result["processed"],
        "url": process_result["original_url"]
    }

# Error-resilient pipeline
error_pipeline = (
    safe_fetch.message("https://error.example.com/data") |
    process_or_skip.message() |
    finalize_result.message()
)

error_pipeline.run()
result = error_pipeline.get_result(block=True)
print(f"Pipeline handled error gracefully: {result}")

Dynamic Group Creation

@dramatiq.actor
def create_tasks_for_batch(batch_data):
    """Build one notification message per batch item and run them as a group"""
    # send_email / send_sms / send_push are assumed to be actors defined
    # elsewhere in the application.
    builders = {
        "email": lambda entry: send_email.message(entry["to"], entry["subject"], entry["body"]),
        "sms": lambda entry: send_sms.message(entry["to"], entry["message"]),
        "push": lambda entry: send_push.message(entry["device_id"], entry["message"]),
    }

    # Items with an unrecognized type are skipped
    tasks = [
        builders[entry["type"]](entry)
        for entry in batch_data["items"]
        if entry["type"] in builders
    ]

    # Fan the notifications out in parallel
    group(tasks).run()

    return {"batch_id": batch_data["batch_id"], "task_count": len(tasks)}

# Usage
batch_items = [
    {"type": "email", "to": "user1@example.com", "subject": "Hello", "body": "Message"},
    {"type": "sms", "to": "+1234567890", "message": "Hello via SMS"},
    {"type": "push", "device_id": "device123", "message": "Hello via push"},
]
batch_data = {"batch_id": "batch_123", "items": batch_items}

create_tasks_for_batch.send(batch_data)

Conditional Pipeline Execution

@dramatiq.actor
def check_condition(data):
    """Check if pipeline should continue"""
    return {"continue": data["value"] > 10, "data": data}

@dramatiq.actor
def conditional_processor(check_result):
    """Process only if condition was met"""
    if not check_result["continue"]:
        return {"skipped": True, "reason": "Condition not met"}
    return {"processed": check_result["data"]["value"] * 2}

# store_results=True: pipeline.get_result() reads the stored result of the
# last step (the broker also needs the Results middleware).
@dramatiq.actor(store_results=True)
def final_handler(process_result):
    """Handle final result regardless of path taken"""
    if process_result.get("skipped"):
        return {"status": "skipped", "reason": process_result["reason"]}
    return {"status": "completed", "result": process_result["processed"]}

# Conditional pipeline
conditional_pipeline = (
    check_condition.message({"value": 5}) |  # Will not meet condition
    conditional_processor.message() |
    final_handler.message()
)

conditional_pipeline.run()
result = conditional_pipeline.get_result(block=True)
print(f"Conditional result: {result}")

Composition with Results Storage

Retrieving results from a composition (get_result, get_results, wait, completed, completed_count) requires the Results middleware on the broker and actors declared with store_results=True:

# Enable results storage
import time

import dramatiq
from dramatiq.results import Results  # Results lives in dramatiq.results, not dramatiq.middleware
from dramatiq.results.backends import RedisBackend

result_backend = RedisBackend()
results_middleware = Results(backend=result_backend, store_results=True)
broker = dramatiq.get_broker()
broker.add_middleware(results_middleware)

@dramatiq.actor(store_results=True)
def data_processor(data):
    return {"processed": data, "timestamp": time.time()}

@dramatiq.actor(store_results=True)
def data_validator(processed_data):
    return {"valid": True, "data": processed_data}

# Pipeline with result storage
result_pipeline = (
    data_processor.message({"input": "test_data"}) |
    data_validator.message()
)

result_pipeline.run()

# Get individual step results (get_results() is a generator, so materialize it)
step_results = list(result_pipeline.get_results(block=True, timeout=30000))
print(f"Each step result: {step_results}")

# Get final result
final_result = result_pipeline.get_result(block=True)
print(f"Final result: {final_result}")

Composition Monitoring

import time

def monitor_composition(composition, name, *, poll_interval=1.0, timeout=None):
    """
    Poll a composition until it completes, printing progress along the way.

    Parameters:
    - composition: Object exposing completed, completed_count and len()
    - name: Label used in the progress output
    - poll_interval: Seconds to sleep between progress checks
    - timeout: Maximum number of seconds to wait (None waits indefinitely)

    Raises:
    TimeoutError: If the composition has not completed within timeout seconds
    """
    total = len(composition)
    print(f"Starting {name} with {total} tasks")

    # Monotonic clock: elapsed time must not jump if the wall clock is adjusted
    start_time = time.monotonic()
    while not composition.completed:
        elapsed = time.monotonic() - start_time
        # Without a bound, a task that never completes would keep this loop
        # spinning forever
        if timeout is not None and elapsed >= timeout:
            raise TimeoutError(f"{name} did not complete within {timeout}s")
        print(f"{name}: {composition.completed_count}/{total} completed ({elapsed:.1f}s)")
        time.sleep(poll_interval)

    total_time = time.monotonic() - start_time
    print(f"{name} completed in {total_time:.1f}s")

# Usage with monitoring
# NOTE(review): progress is presumably derived from stored results, so
# process_item would need to store its results — confirm against the broker setup.
item_messages = [process_item.message(i, f"data_{i}") for i in range(100)]
large_group = group(item_messages)

large_group.run()
monitor_composition(large_group, "Large Group Processing")

Install with Tessl CLI

npx tessl i tessl/pypi-dramatiq

docs

actors.md

brokers.md

composition.md

index.md

messages.md

middleware.md

rate-limiting.md

results.md

workers.md

tile.json