Distributed task queue with full async support for Python applications.
The middleware system provides an extensible pipeline for implementing cross-cutting concerns such as retries, monitoring, authentication, and custom processing logic. Middleware components can intercept and modify messages at various stages of the task lifecycle.
The base middleware class below defines hooks for message processing at the different lifecycle stages.
class TaskiqMiddleware:
    """
    Abstract base class for implementing middleware components.

    Middleware can intercept and modify messages during:
    - Before sending to broker (pre_send)
    - After sending to broker (post_send)
    - Before task execution (pre_execute)
    - After task execution (post_execute)

    All hooks are no-ops by default; subclasses override only the
    stages they care about.
    """

    def __init__(self) -> None: ...

    # Annotations are quoted: AsyncBroker/TaskiqMessage/TaskiqResult are
    # project types not imported in this reference stub, and unquoted
    # annotations would raise NameError at definition time.
    def set_broker(self, broker: "AsyncBroker") -> None:
        """Called when middleware is added to a broker."""

    async def startup(self) -> None:
        """Called during broker startup."""

    async def shutdown(self) -> None:
        """Called during broker shutdown."""

    async def pre_send(self, message: "TaskiqMessage") -> "TaskiqMessage":
        """
        Process message before sending to broker.

        Args:
            message: Task message to be sent

        Returns:
            Modified message (can return same message if no changes)
        """

    async def post_send(self, message: "TaskiqMessage") -> None:
        """
        Process message after successful send to broker.

        Args:
            message: Task message that was sent
        """

    async def pre_execute(self, message: "TaskiqMessage") -> "TaskiqMessage":
        """
        Process message before task execution.

        Args:
            message: Task message to be executed

        Returns:
            Modified message (can return same message if no changes)
        """

    async def post_execute(
        self,
        message: "TaskiqMessage",
        result: "TaskiqResult",
    ) -> None:
        """
        Process result after task execution.

        Args:
            message: Original task message
            result: Task execution result
        """


# Basic retry mechanism that retries failed tasks a fixed number of times.
class SimpleRetryMiddleware(TaskiqMiddleware):
    """
    Simple retry middleware with fixed retry count.

    Retries failed tasks up to max_retries times with no delay between
    attempts. Uses task labels to track retry count and prevent
    infinite loops.
    """

    def __init__(
        self,
        max_retries: int = 3,
        # Quoted: typing names are not imported in this reference stub.
        ignore_errors: "Optional[List[Type[Exception]]]" = None,
    ) -> None:
        """
        Initialize simple retry middleware.

        Args:
            max_retries: Maximum number of retry attempts
            ignore_errors: Exception types that should not trigger retries
        """

    async def post_execute(
        self,
        message: "TaskiqMessage",
        result: "TaskiqResult",
    ) -> None:
        """Retry task if it failed and retries are available."""


# Advanced retry mechanism with exponential backoff, jitter, and
# configurable retry conditions.
class SmartRetryMiddleware(TaskiqMiddleware):
    """
    Advanced retry middleware with exponential backoff.

    Features:
    - Exponential backoff with configurable base delay
    - Jitter to prevent thundering herd
    - Maximum retry attempts
    - Exception type filtering
    - Custom retry condition functions
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        # Quoted: typing names are not imported in this reference stub.
        ignore_errors: "Optional[List[Type[Exception]]]" = None,
        retry_on: "Optional[Callable[[Exception], bool]]" = None,
    ) -> None:
        """
        Initialize smart retry middleware.

        Args:
            max_retries: Maximum number of retry attempts
            base_delay: Initial delay between retries in seconds
            max_delay: Maximum delay between retries in seconds
            exponential_base: Base for exponential backoff calculation
            jitter: Whether to add random jitter to delays
            ignore_errors: Exception types that should not trigger retries
            retry_on: Custom function to determine if exception should trigger retry
        """

    async def post_execute(
        self,
        message: "TaskiqMessage",
        result: "TaskiqResult",
    ) -> None:
        """Retry task with exponential backoff if conditions are met."""


# Monitoring middleware that collects and exports Prometheus metrics
# for task execution.
class PrometheusMiddleware(TaskiqMiddleware):
    """
    Prometheus metrics collection middleware.

    Collects metrics for:
    - Task execution counts by status (success/failure)
    - Task execution duration histograms
    - Active task counts
    - Queue size metrics
    """

    def __init__(
        self,
        # Quoted: CollectorRegistry (prometheus_client) and Optional/List
        # are not imported in this reference stub.
        registry: "Optional[CollectorRegistry]" = None,
        label_names: "Optional[List[str]]" = None,
    ) -> None:
        """
        Initialize Prometheus middleware.

        Args:
            registry: Prometheus registry for metric collection
            label_names: Additional label names for metrics
        """

    async def pre_execute(self, message: "TaskiqMessage") -> "TaskiqMessage":
        """Start timing and increment active task counter."""

    async def post_execute(
        self,
        message: "TaskiqMessage",
        result: "TaskiqResult",
    ) -> None:
        """Record execution metrics and update counters."""


# --- Usage examples ---
from taskiq import InMemoryBroker
from taskiq.middlewares import SimpleRetryMiddleware

import random  # needed by the failure simulation below

# Add middleware to broker
broker = InMemoryBroker()
broker.add_middlewares(SimpleRetryMiddleware(max_retries=5))

# Alternative: using builder pattern
broker = InMemoryBroker().with_middlewares(
    SimpleRetryMiddleware(max_retries=5)
)


@broker.task
async def unreliable_task(data: str) -> str:
    # Task that might fail and need retries
    if random.random() < 0.3:  # 30% failure rate
        raise ValueError("Random failure")
    return f"Processed: {data}"


from taskiq.middlewares import SmartRetryMiddleware
# Configure smart retry with custom settings
smart_retry = SmartRetryMiddleware(
max_retries=5,
base_delay=2.0, # Start with 2 second delay
max_delay=120.0, # Cap at 2 minutes
exponential_base=2.0, # Double delay each time
jitter=True, # Add randomness
ignore_errors=[ValueError], # Don't retry ValueError
)
broker.add_middlewares(smart_retry)
@broker.task
async def api_call_task(url: str) -> dict:
# Task that benefits from smart retry
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()class LoggingMiddleware(TaskiqMiddleware):
"""Custom middleware for detailed task logging."""
async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
print(f"Sending task: {message.task_name} (ID: {message.task_id})")
return message
async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
print(f"Executing task: {message.task_name}")
# Add execution start time to labels
message.labels["execution_started"] = time.time()
return message
async def post_execute(
self,
message: TaskiqMessage,
result: TaskiqResult,
) -> None:
start_time = message.labels.get("execution_started", 0)
duration = time.time() - start_time
status = "SUCCESS" if not result.is_err else "FAILED"
print(f"Task {message.task_name} {status} in {duration:.2f}s")
if result.is_err:
print(f"Error: {result.error}")
# Use custom middleware
broker.add_middlewares(LoggingMiddleware())from taskiq.middlewares import (
SimpleRetryMiddleware,
PrometheusMiddleware,
)
# Multiple middleware are executed in order
broker = InMemoryBroker().with_middlewares(
LoggingMiddleware(), # First: logging
SimpleRetryMiddleware(max_retries=3), # Second: retries
PrometheusMiddleware(), # Third: metrics
)
# Execution order:
# 1. pre_send: LoggingMiddleware -> SimpleRetryMiddleware -> PrometheusMiddleware
# 2. post_send: PrometheusMiddleware -> SimpleRetryMiddleware -> LoggingMiddleware
# 3. pre_execute: LoggingMiddleware -> SimpleRetryMiddleware -> PrometheusMiddleware
# 4. post_execute: PrometheusMiddleware -> SimpleRetryMiddleware -> LoggingMiddlewareclass ConditionalRetryMiddleware(TaskiqMiddleware):
"""Retry middleware that only applies to specific tasks."""
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
async def post_execute(
self,
message: TaskiqMessage,
result: TaskiqResult,
) -> None:
# Only retry tasks with 'retryable' label
if not message.labels.get("retryable", False):
return
if result.is_err:
retry_count = int(message.labels.get("retry_count", 0))
if retry_count < self.max_retries:
message.labels["retry_count"] = str(retry_count + 1)
await self.broker.kick(self.broker.formatter.dumps(message))
# Use with labeled tasks
@broker.task(retryable=True)
async def important_task(data: str) -> str:
    # This task will be retried on failure
    return process_important_data(data)


@broker.task(retryable=False)
async def simple_task(data: str) -> str:
    # This task won't be retried
    return process_simple_data(data)


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-taskiq