CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-taskiq

Distributed task queue with full async support for Python applications

Overview
Eval results
Files

middleware.mddocs/

Middleware

Middleware system providing extensible pipeline for implementing cross-cutting concerns like retries, monitoring, authentication, and custom processing logic. Middleware components can intercept and modify messages at various stages of the task lifecycle.

Capabilities

Middleware Interface

Base middleware class that defines hooks for message processing at different lifecycle stages.

class TaskiqMiddleware:
    """
    Abstract base class for implementing middleware components.
    
    Middleware can intercept and modify messages during:
    - Before sending to broker (pre_send)
    - After sending to broker (post_send)  
    - Before task execution (pre_execute)
    - After task execution (post_execute)
    """
    
    def __init__(self) -> None: ...
    
    def set_broker(self, broker: AsyncBroker) -> None:
        """Called when middleware is added to a broker."""
    
    async def startup(self) -> None:
        """Called during broker startup."""
    
    async def shutdown(self) -> None:
        """Called during broker shutdown."""
    
    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
        """
        Process message before sending to broker.
        
        Args:
            message: Task message to be sent
            
        Returns:
            Modified message (can return same message if no changes)
        """
    
    async def post_send(self, message: TaskiqMessage) -> None:
        """
        Process message after successful send to broker.
        
        Args:
            message: Task message that was sent
        """
    
    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        """
        Process message before task execution.
        
        Args:
            message: Task message to be executed
            
        Returns:
            Modified message (can return same message if no changes)
        """
    
    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """
        Process result after task execution.
        
        Args:
            message: Original task message
            result: Task execution result
        """

Simple Retry Middleware

Basic retry mechanism that retries failed tasks a fixed number of times.

class SimpleRetryMiddleware(TaskiqMiddleware):
    """
    Simple retry middleware with fixed retry count.
    
    Retries failed tasks up to max_retries times with no delay between attempts.
    Uses task labels to track retry count and prevent infinite loops.
    """
    
    def __init__(
        self,
        max_retries: int = 3,
        ignore_errors: Optional[List[Type[Exception]]] = None,
    ) -> None:
        """
        Initialize simple retry middleware.
        
        Args:
            max_retries: Maximum number of retry attempts
            ignore_errors: Exception types that should not trigger retries
        """
    
    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Retry task if it failed and retries are available."""

Smart Retry Middleware

Advanced retry mechanism with exponential backoff, jitter, and configurable retry conditions.

class SmartRetryMiddleware(TaskiqMiddleware):
    """
    Advanced retry middleware with exponential backoff.
    
    Features:
    - Exponential backoff with configurable base delay
    - Jitter to prevent thundering herd
    - Maximum retry attempts
    - Exception type filtering
    - Custom retry condition functions
    """
    
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        ignore_errors: Optional[List[Type[Exception]]] = None,
        retry_on: Optional[Callable[[Exception], bool]] = None,
    ) -> None:
        """
        Initialize smart retry middleware.
        
        Args:
            max_retries: Maximum number of retry attempts
            base_delay: Initial delay between retries in seconds
            max_delay: Maximum delay between retries in seconds
            exponential_base: Base for exponential backoff calculation
            jitter: Whether to add random jitter to delays
            ignore_errors: Exception types that should not trigger retries
            retry_on: Custom function to determine if exception should trigger retry
        """
    
    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Retry task with exponential backoff if conditions are met."""

Prometheus Middleware

Monitoring middleware that collects and exports Prometheus metrics for task execution.

class PrometheusMiddleware(TaskiqMiddleware):
    """
    Prometheus metrics collection middleware.
    
    Collects metrics for:
    - Task execution counts by status (success/failure)
    - Task execution duration histograms
    - Active task counts
    - Queue size metrics
    """
    
    def __init__(
        self,
        registry: Optional[CollectorRegistry] = None,
        label_names: Optional[List[str]] = None,
    ) -> None:
        """
        Initialize Prometheus middleware.
        
        Args:
            registry: Prometheus registry for metric collection
            label_names: Additional label names for metrics
        """
    
    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        """Start timing and increment active task counter."""
    
    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Record execution metrics and update counters."""

Usage Examples

Basic Middleware Setup

from taskiq import InMemoryBroker
from taskiq.middlewares import SimpleRetryMiddleware

# Add middleware to broker
broker = InMemoryBroker()
broker.add_middlewares(SimpleRetryMiddleware(max_retries=5))

# Alternative: using builder pattern
broker = InMemoryBroker().with_middlewares(
    SimpleRetryMiddleware(max_retries=5)
)

@broker.task
async def unreliable_task(data: str) -> str:
    # Task that might fail and need retries
    if random.random() < 0.3:  # 30% failure rate
        raise ValueError("Random failure")
    return f"Processed: {data}"

Advanced Retry Configuration

from taskiq.middlewares import SmartRetryMiddleware

# Configure smart retry with custom settings
smart_retry = SmartRetryMiddleware(
    max_retries=5,
    base_delay=2.0,        # Start with 2 second delay
    max_delay=120.0,       # Cap at 2 minutes
    exponential_base=2.0,  # Double delay each time
    jitter=True,           # Add randomness
    ignore_errors=[ValueError],  # Don't retry ValueError
)

broker.add_middlewares(smart_retry)

@broker.task
async def api_call_task(url: str) -> dict:
    # Task that benefits from smart retry
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()

Custom Middleware Implementation

class LoggingMiddleware(TaskiqMiddleware):
    """Custom middleware for detailed task logging."""
    
    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
        print(f"Sending task: {message.task_name} (ID: {message.task_id})")
        return message
    
    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        print(f"Executing task: {message.task_name}")
        # Add execution start time to labels
        message.labels["execution_started"] = time.time()
        return message
    
    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        start_time = message.labels.get("execution_started", 0)
        duration = time.time() - start_time
        
        status = "SUCCESS" if not result.is_err else "FAILED"
        print(f"Task {message.task_name} {status} in {duration:.2f}s")
        
        if result.is_err:
            print(f"Error: {result.error}")

# Use custom middleware
broker.add_middlewares(LoggingMiddleware())

Multiple Middleware Pipeline

from taskiq.middlewares import (
    SimpleRetryMiddleware,
    PrometheusMiddleware,
)

# Multiple middleware are executed in order
broker = InMemoryBroker().with_middlewares(
    LoggingMiddleware(),           # First: logging
    SimpleRetryMiddleware(max_retries=3),  # Second: retries
    PrometheusMiddleware(),        # Third: metrics
)

# Execution order:
# 1. pre_send: LoggingMiddleware -> SimpleRetryMiddleware -> PrometheusMiddleware
# 2. post_send: PrometheusMiddleware -> SimpleRetryMiddleware -> LoggingMiddleware
# 3. pre_execute: LoggingMiddleware -> SimpleRetryMiddleware -> PrometheusMiddleware  
# 4. post_execute: PrometheusMiddleware -> SimpleRetryMiddleware -> LoggingMiddleware

Conditional Middleware

class ConditionalRetryMiddleware(TaskiqMiddleware):
    """Retry middleware that only applies to specific tasks."""
    
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
    
    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        # Only retry tasks with 'retryable' label
        if not message.labels.get("retryable", False):
            return
        
        if result.is_err:
            retry_count = int(message.labels.get("retry_count", 0))
            if retry_count < self.max_retries:
                message.labels["retry_count"] = str(retry_count + 1)
                await self.broker.kick(self.broker.formatter.dumps(message))

# Use with labeled tasks
@broker.task(retryable=True)
async def important_task(data: str) -> str:
    # This task will be retried on failure
    return process_important_data(data)

@broker.task(retryable=False)  
async def simple_task(data: str) -> str:
    # This task won't be retried
    return process_simple_data(data)

Install with Tessl CLI

npx tessl i tessl/pypi-taskiq

docs

brokers.md

events-state.md

exceptions.md

index.md

middleware.md

result-backends.md

scheduling.md

tasks-results.md

tile.json