Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware
—
Task composition in Dramatiq enables complex workflows by chaining tasks sequentially with pipelines or executing multiple tasks in parallel with groups. This allows building sophisticated data processing workflows from simple actor building blocks.
Sequential task execution where each task's output becomes the next task's input, creating processing chains.
class pipeline:
    """Sequential task composition: each task's output becomes the next task's input.

    Children are chained so that the result of message N is delivered to
    message N + 1, forming a processing chain.
    """

    # NOTE: annotations referencing Message/Broker/List are quoted because those
    # names are declared elsewhere in the real package; quoting keeps this stub
    # importable in isolation.
    def __init__(self, children: "Iterable[Message | pipeline]", *, broker: "Broker" = None):
        """
        Create a pipeline from message objects or other pipelines.
        Parameters:
        - children: Iterable of Message objects or pipeline objects
        - broker: Broker instance (uses global broker if None)
        """

    def run(self, *, delay: int = None) -> "pipeline":
        """
        Execute the pipeline by sending all messages to the broker.
        Parameters:
        - delay: Delay in milliseconds before starting execution
        Returns:
        Self (for method chaining)
        """

    def get_result(self, *, block: bool = False, timeout: int = None):
        """
        Get the result of the final task in the pipeline.
        Parameters:
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking
        Returns:
        Result of the last task in the pipeline
        Raises:
        ResultMissing: If result is not available
        ResultTimeout: If timeout exceeded while blocking
        """

    def get_results(self, *, block: bool = False, timeout: int = None) -> "List":
        """
        Get results from all tasks in the pipeline.
        Parameters:
        - block: Whether to block waiting for results
        - timeout: Timeout in milliseconds when blocking
        Returns:
        List of results from all pipeline tasks
        """

    def __or__(self, other) -> "pipeline":
        """
        Chain this pipeline with another message or pipeline using | operator.
        Parameters:
        - other: Message or pipeline to chain with
        Returns:
        New pipeline containing both sequences
        """

    def __len__(self) -> int:
        """
        Get the number of tasks in the pipeline.
        Returns:
        Number of messages in the pipeline
        """

    # Properties
    completed: bool        # True if all tasks completed
    completed_count: int   # Number of completed tasks
    messages: List[Message]  # List of pipeline messages

Usage:
@dramatiq.actor
def fetch_data(url):
    """Fetch data from URL"""
    return {"data": f"content from {url}", "size": 1024}


@dramatiq.actor
def process_data(data_info):
    """Process the fetched data"""
    processed = data_info["data"].upper()
    return {"processed": processed, "original_size": data_info["size"]}


@dramatiq.actor
def save_data(processed_info):
    """Save processed data"""
    print(f"Saving: {processed_info['processed']}")
    return {"saved": True, "id": "12345"}


# Create pipeline using | operator.  Named `data_pipeline` so the variable does
# not shadow the dramatiq `pipeline` class.
data_pipeline = (
    fetch_data.message("https://api.example.com/data") |
    process_data.message() |  # Will receive output from previous task
    save_data.message()
)

# Execute pipeline
data_pipeline.run()

# Get final result (blocking)
final_result = data_pipeline.get_result(block=True, timeout=30000)
print(f"Pipeline result: {final_result}")

# Get all results
all_results = data_pipeline.get_results(block=True)
print(f"All results: {all_results}")

Parallel task execution where multiple tasks run concurrently and can be synchronized.
class group:
    """Parallel task composition: all child messages are dispatched concurrently
    and can be waited on as a unit."""

    # NOTE: annotations referencing Message/Broker/List are quoted because those
    # names are declared elsewhere in the real package; quoting keeps this stub
    # importable in isolation.
    def __init__(self, children: "Iterable[Message]", *, broker: "Broker" = None):
        """
        Create a group from message objects.
        Parameters:
        - children: Iterable of Message objects
        - broker: Broker instance (uses global broker if None)
        """

    def run(self, *, delay: int = None) -> "group":
        """
        Execute the group by sending all messages to the broker.
        Parameters:
        - delay: Delay in milliseconds before starting execution
        Returns:
        Self (for method chaining)
        """

    def get_results(self, *, block: bool = False, timeout: int = None) -> "List":
        """
        Get results from all tasks in the group.
        Parameters:
        - block: Whether to block waiting for all results
        - timeout: Timeout in milliseconds when blocking
        Returns:
        List of results from all group tasks
        Raises:
        ResultTimeout: If timeout exceeded while blocking
        """

    def wait(self, *, timeout: int = None):
        """
        Wait for all tasks in the group to complete.
        Parameters:
        - timeout: Timeout in milliseconds
        Raises:
        ResultTimeout: If timeout exceeded
        """

    def add_completion_callback(self, message: "Message"):
        """
        Add a callback to be executed when all group tasks complete.
        Parameters:
        - message: Message to execute as completion callback
        """

    def __len__(self) -> int:
        """
        Get the number of tasks in the group.
        Returns:
        Number of messages in the group
        """

    # Properties
    completed: bool  # True if all tasks completed
    completed_count: int  # Number of completed tasks

Usage:
@dramatiq.actor
def process_item(item_id, item_data):
    """Process individual item"""
    print(f"Processing item {item_id}: {item_data}")
    return {"id": item_id, "processed": True, "result": item_data * 2}


@dramatiq.actor
def group_completion_handler(group_id):
    """Handle group completion"""
    print(f"Group {group_id} completed!")


# Build a group of tasks that all run in parallel.
items = [
    {"id": 1, "data": 10},
    {"id": 2, "data": 20},
    {"id": 3, "data": 30},
    {"id": 4, "data": 40},
]

item_messages = [
    process_item.message(entry["id"], entry["data"])
    for entry in items
]
task_group = group(item_messages)

# Register a callback that fires once every task has finished.
task_group.add_completion_callback(
    group_completion_handler.message("batch_001")
)

# Dispatch all tasks, then block until they complete (1 minute timeout).
task_group.run()
task_group.wait(timeout=60000)

# Collect the results of every task in the group.
results = task_group.get_results(block=True)
print(f"Group results: {results}")

@dramatiq.actor
def fetch_urls(urls):
    """Fetch multiple URLs"""
    return [{"url": url, "data": f"content from {url}"} for url in urls]


@dramatiq.actor
def process_single_url(url_data):
    """Process single URL data"""
    return {
        "url": url_data["url"],
        "processed": url_data["data"].upper(),
        "length": len(url_data["data"]),
    }


@dramatiq.actor
def aggregate_results(results):
    """Aggregate all processed results"""
    combined = {"total_items": len(results), "total_length": 0}
    for r in results:
        combined["total_length"] += r["length"]
    return combined


# Complex composition: pipeline -> group -> pipeline
urls = ["https://api1.com", "https://api2.com", "https://api3.com"]

# Step 1: fetch all URLs in a single task.
fetch_step = fetch_urls.message(urls)

# Step 2: process each URL in parallel.  Splitting real fetch results into a
# group would need custom logic; for demonstration the group is built directly.
process_group = group([
    process_single_url.message({"url": url, "data": f"content from {url}"})
    for url in urls
])

# Step 3: aggregate the processed results in a single task.
aggregate_step = aggregate_results.message()

# Execute the steps one after another.
fetch_step.send()
# Wait for fetch to complete, then create group with actual data
process_group.run()
process_group.wait()
aggregate_step.send()

@dramatiq.actor
def safe_fetch(url):
    """Fetch with error handling"""
    try:
        # Simulate fetch operation
        if "error" in url:
            raise ValueError("Simulated fetch error")
        return {"url": url, "data": f"content from {url}", "success": True}
    except Exception as e:
        return {"url": url, "error": str(e), "success": False}


@dramatiq.actor
def process_or_skip(fetch_result):
    """Process successful fetches, skip errors"""
    if not fetch_result["success"]:
        print(f"Skipping failed fetch: {fetch_result['error']}")
        return {"skipped": True, "error": fetch_result["error"]}
    return {
        "processed": fetch_result["data"].upper(),
        "original_url": fetch_result["url"],
    }


@dramatiq.actor
def finalize_result(process_result):
    """Finalize the processing result"""
    if process_result.get("skipped"):
        return {"status": "failed", "reason": process_result["error"]}
    return {
        "status": "success",
        "result": process_result["processed"],
        "url": process_result["original_url"],
    }


# Error-resilient pipeline: the URL contains "error", so safe_fetch reports a
# failure dict and the downstream actors propagate it gracefully.
error_pipeline = (
    safe_fetch.message("https://error.example.com/data") |
    process_or_skip.message() |
    finalize_result.message()
)
error_pipeline.run()
result = error_pipeline.get_result(block=True)
print(f"Pipeline handled error gracefully: {result}")

@dramatiq.actor
def create_tasks_for_batch(batch_data):
    """Dynamically create tasks based on batch data"""
    # Dispatch table: notification type -> message builder.
    builders = {
        "email": lambda item: send_email.message(item["to"], item["subject"], item["body"]),
        "sms": lambda item: send_sms.message(item["to"], item["message"]),
        "push": lambda item: send_push.message(item["device_id"], item["message"]),
    }
    tasks = []
    for item in batch_data["items"]:
        build = builders.get(item["type"])
        if build is not None:  # unknown types are silently skipped, as before
            tasks.append(build(item))

    # Create and run group
    notification_group = group(tasks)
    notification_group.run()
    return {"batch_id": batch_data["batch_id"], "task_count": len(tasks)}


# Usage
batch_data = {
    "batch_id": "batch_123",
    "items": [
        {"type": "email", "to": "user1@example.com", "subject": "Hello", "body": "Message"},
        {"type": "sms", "to": "+1234567890", "message": "Hello via SMS"},
        {"type": "push", "device_id": "device123", "message": "Hello via push"},
    ],
}
create_tasks_for_batch.send(batch_data)

@dramatiq.actor
def check_condition(data):
    """Check if pipeline should continue"""
    return {"continue": data["value"] > 10, "data": data}


@dramatiq.actor
def conditional_processor(check_result):
    """Process only if condition was met"""
    if not check_result["continue"]:
        return {"skipped": True, "reason": "Condition not met"}
    return {"processed": check_result["data"]["value"] * 2}


@dramatiq.actor
def final_handler(process_result):
    """Handle final result regardless of path taken"""
    if process_result.get("skipped"):
        return {"status": "skipped", "reason": process_result["reason"]}
    return {"status": "completed", "result": process_result["processed"]}


# Conditional pipeline: a value of 5 fails the > 10 check, so the processor
# skips and the final handler reports the skip.
conditional_pipeline = (
    check_condition.message({"value": 5}) |
    conditional_processor.message() |
    final_handler.message()
)
conditional_pipeline.run()
result = conditional_pipeline.get_result(block=True)
print(f"Conditional result: {result}")

When using the Results middleware, composition objects can retrieve and work with stored results:
# Enable results storage.  `import time` is added here because data_processor
# below calls time.time() and the original snippet never imported it.
import time

from dramatiq.middleware import Results
from dramatiq.results.backends import RedisBackend

result_backend = RedisBackend()
results_middleware = Results(backend=result_backend, store_results=True)
broker.add_middleware(results_middleware)


@dramatiq.actor(store_results=True)
def data_processor(data):
    return {"processed": data, "timestamp": time.time()}


@dramatiq.actor(store_results=True)
def data_validator(processed_data):
    return {"valid": True, "data": processed_data}


# Pipeline with result storage
result_pipeline = (
    data_processor.message({"input": "test_data"}) |
    data_validator.message()
)
result_pipeline.run()

# Get individual step results
step_results = result_pipeline.get_results(block=True, timeout=30000)
print(f"Each step result: {step_results}")

# Get final result
final_result = result_pipeline.get_result(block=True)
print(f"Final result: {final_result}")

import time
def monitor_composition(composition, name):
    """Monitor composition progress"""
    print(f"Starting {name} with {len(composition)} tasks")
    t0 = time.time()
    # Poll once per second until every task in the composition reports done.
    while not composition.completed:
        elapsed = time.time() - t0
        print(f"{name}: {composition.completed_count}/{len(composition)} completed ({elapsed:.1f}s)")
        time.sleep(1.0)
    total_time = time.time() - t0
    print(f"{name} completed in {total_time:.1f}s")


# Usage with monitoring: a group of 100 parallel tasks.
large_group = group([
    process_item.message(i, f"data_{i}")
    for i in range(100)
])
large_group.run()
monitor_composition(large_group, "Large Group Processing")

Install with Tessl CLI
npx tessl i tessl/pypi-dramatiq