CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-taskiq

Distributed task queue with full async support for Python applications

Overview
Eval results
Files

docs/brokers.md

Brokers

Brokers are the core components that handle message distribution and task routing in taskiq. They provide the interface between your application and the underlying message queue system, enabling distributed task processing across multiple workers.

Capabilities

Abstract Broker Interface

The base AsyncBroker class defines the interface that all brokers must implement, providing task decoration, registration, and lifecycle management.

class AsyncBroker:
    """Abstract base class for all brokers.

    Declares the interface every broker exposes: task decoration and
    registration, task lookup, async lifecycle hooks (startup/shutdown) and
    the message-passing pair kick/listen. Method bodies are elided (`...`)
    in this reference listing.
    """
    
    # Both arguments are optional.
    # NOTE(review): presumably None selects a built-in default for each — confirm.
    def __init__(
        self,
        result_backend: Optional[AsyncResultBackend] = None,
        task_id_generator: Optional[Callable[[], str]] = None,
    ) -> None: ...
    
    # Decorator form of registration: optional explicit task name plus
    # arbitrary keyword labels; returns a callable.
    def task(
        self,
        task_name: Optional[str] = None,
        **labels: Any,
    ) -> Callable: ...
    
    # Non-decorator form of registration: takes the callable directly and
    # returns the decorated task object.
    def register_task(
        self,
        func: Callable,
        task_name: Optional[str] = None,
        **labels: Any,
    ) -> AsyncTaskiqDecoratedTask: ...
    
    # Look up a task by name; the Optional return type means None is possible
    # (presumably when no task with that name is registered — confirm).
    def find_task(self, task_name: str) -> Optional[AsyncTaskiqDecoratedTask]: ...
    
    # Return a str -> decorated task mapping
    # (presumably keyed by task name — confirm).
    def get_all_tasks(self) -> Dict[str, AsyncTaskiqDecoratedTask]: ...
    
    # Async lifecycle hook; must be awaited.
    async def startup(self) -> None: ...
    
    # Async lifecycle hook; must be awaited.
    async def shutdown(self) -> None: ...
    
    # Accepts a BrokerMessage.
    # NOTE(review): presumably submits it to the underlying queue — confirm.
    async def kick(self, message: BrokerMessage) -> None: ...
    
    # Async generator yielding either raw bytes or AckableMessage items.
    def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]: ...

Broker Configuration

Builder methods for configuring brokers with result backends, middlewares, serializers, and other components.

def with_result_backend(
    self,
    result_backend: AsyncResultBackend,
) -> Self:
    """Set result backend and return updated broker.

    The `Self` return type lets this call be chained with the other
    `with_*` builder methods.
    """

def with_middlewares(
    self,
    *middlewares: TaskiqMiddleware,
) -> Self:
    """Add middlewares to broker and return the broker for chaining.

    Accepts any number of middlewares as positional arguments. Unlike
    `add_middlewares`, which returns None, this builder returns `Self`.
    """

def with_serializer(
    self,
    serializer: TaskiqSerializer,
) -> Self:
    """Set serializer and return updated broker.

    The `Self` return type lets this call be chained with the other
    `with_*` builder methods.
    """

def with_formatter(
    self,
    formatter: TaskiqFormatter,
) -> Self:
    """Set formatter and return updated broker.

    The `Self` return type lets this call be chained with the other
    `with_*` builder methods.
    """

def with_id_generator(
    self,
    task_id_generator: Callable[[], str],
) -> Self:
    """Set ID generator and return updated broker.

    `task_id_generator` is a zero-argument callable returning a string.
    The `Self` return type lets this call be chained with the other
    `with_*` builder methods.
    """

def add_middlewares(self, *middlewares: TaskiqMiddleware) -> None:
    """Add middlewares to existing broker.

    Accepts any number of middlewares as positional arguments. Returns
    None, so it cannot be chained; `with_middlewares` is the chainable
    counterpart.
    """

def add_dependency_context(self, new_ctx: Dict[Any, Any]) -> None:
    """Add dependencies for dependency injection.

    Takes a dictionary of dependencies and returns None.
    NOTE(review): the expected key/value shape of `new_ctx` is not shown
    here (presumably dependency key -> instance) — confirm.
    """

In-Memory Broker

Built-in broker implementation for development, testing, and single-process applications.

class InMemoryBroker(AsyncBroker):
    """
    In-memory broker for development and testing.
    
    Processes tasks within the same process without external dependencies.
    Includes built-in result backend with configurable result storage limits.

    All constructor arguments have defaults, so `InMemoryBroker()` is a
    valid way to create one.
    """
    
    def __init__(
        self,
        # NOTE(review): presumably the size of the pool used for sync tasks — confirm.
        sync_tasks_pool_size: int = 4,
        # Storage limit of the built-in result backend mentioned in the class
        # docstring (presumably a count of results — confirm).
        max_stored_results: int = 100,
        # NOTE(review): presumably toggles casting of task arguments to their
        # annotated types — confirm.
        cast_types: bool = True,
        # NOTE(review): presumably caps concurrently running async tasks — confirm.
        max_async_tasks: int = 30,
        # NOTE(review): presumably controls whether task exceptions are
        # re-raised to the caller — confirm.
        propagate_exceptions: bool = True,
        # NOTE(review): presumably makes kick() wait for the task to finish — confirm.
        await_inplace: bool = False,
    ) -> None: ...
    
    # Re-declares AsyncBroker.kick with the same signature.
    async def kick(self, message: BrokerMessage) -> None: ...
    
    # Re-declares AsyncBroker.listen with the same signature.
    def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]: ...
    
    async def wait_all(self) -> None:
        """Wait for all currently running tasks to complete."""

ZeroMQ Broker

Broker implementation using ZeroMQ for distributed message passing.

class ZeroMQBroker(AsyncBroker):
    """
    ZeroMQ-based broker for distributed task processing.
    
    Uses the ZeroMQ PUB/SUB pattern for task distribution, as reflected by
    the `zmq_pub_host` and `zmq_sub_host` parameters.
    `zmq_pub_host` is the publisher address and `zmq_sub_host` is the
    subscriber address.
    NOTE(review): the defaults (0.0.0.0 vs localhost) suggest the publisher
    binds and subscribers connect — confirm.
    """
    
    def __init__(
        self,
        zmq_pub_host: str = "tcp://0.0.0.0:5555",
        zmq_sub_host: str = "tcp://localhost:5555",
        result_backend: Optional[AsyncResultBackend] = None,
        task_id_generator: Optional[Callable[[], str]] = None,
    ) -> None: ...
    
    # Re-declares the AsyncBroker lifecycle hook.
    async def startup(self) -> None: ...
    
    # Re-declares the AsyncBroker lifecycle hook.
    async def shutdown(self) -> None: ...
    
    # Re-declares AsyncBroker.kick with the same signature.
    async def kick(self, message: BrokerMessage) -> None: ...
    
    # Re-declares AsyncBroker.listen with the same signature.
    def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]: ...

Shared Broker

Global broker instance for sharing across async contexts.

# NOTE(review): annotated here as AsyncBroker; in taskiq this object is
# presumably an AsyncSharedBroker that has to be pointed at a real broker
# before tasks can be sent — confirm against the taskiq API.
async_shared_broker: AsyncBroker
"""
Shared broker instance that can be imported and used across modules.
Useful for dependency injection and avoiding broker parameter passing.
"""

Event Handling

Brokers support event handlers for lifecycle management and custom logic execution.

def on_event(
    self,
    *events: TaskiqEvents,
) -> Callable[[EventHandler], EventHandler]:
    """Decorator for adding event handlers.

    Accepts one or more events as positional arguments. The returned
    decorator takes an `EventHandler` and returns an `EventHandler`.
    """

def add_event_handler(
    self,
    event: TaskiqEvents,
    handler: EventHandler,
) -> None:
    """Add event handler programmatically.

    Takes a single event and a single handler and returns None; this is
    the non-decorator alternative to `on_event`.
    """

def with_event_handlers(
    self,
    event: TaskiqEvents,
    *handlers: EventHandler,
) -> Self:
    """Set event handlers and return updated broker.

    Accepts any number of handlers for a single event. The `Self` return
    type lets this call be chained with the other `with_*` builder methods.
    """

Usage Examples

Basic Broker Setup

from taskiq import InMemoryBroker

# Simple in-memory broker; every constructor argument has a default.
broker = InMemoryBroker()

# Bare decorator form: no task name or labels are supplied.
@broker.task
async def my_task(x: int) -> int:
    """Return `x` doubled."""
    return x * 2

Production Broker Configuration

from taskiq import SimpleRetryMiddleware, ZeroMQBroker

# Configured broker with middleware and result backend.
# `my_result_backend` must be an AsyncResultBackend instance created beforehand.
# The single positional argument to ZeroMQBroker is `zmq_pub_host`.
broker = (
    ZeroMQBroker("tcp://localhost:5555")
    # SimpleRetryMiddleware takes `default_retry_count`; it has no
    # `max_retries` keyword, so the previous call raised TypeError.
    .with_middlewares(SimpleRetryMiddleware(default_retry_count=3))
    .with_result_backend(my_result_backend)
)


@broker.task
async def process_data(data: str) -> dict:
    """Return the input wrapped in a dict under the "processed" key."""
    # Task implementation
    return {"processed": data}

Event Handlers

from taskiq import TaskiqEvents, TaskiqState

# `broker` is an existing broker instance (for example an InMemoryBroker()).


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_startup(state: TaskiqState) -> None:
    """Handle the WORKER_STARTUP event."""
    print("Worker starting up")
    # Initialize connections, etc.


@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def on_shutdown(state: TaskiqState) -> None:
    """Handle the WORKER_SHUTDOWN event."""
    print("Worker shutting down")
    # Cleanup resources

Types

# Signature of an event handler: it is called with a TaskiqState and returns
# either None (plain function) or an awaitable resolving to None (coroutine).
EventHandler = Callable[[TaskiqState], Optional[Awaitable[None]]]

# NOTE(review): declared here as a TypeVar, but the description below and the
# AsyncBroker signatures use it as a concrete decorated-task type. In taskiq
# it is presumably a generic class — confirm and correct this declaration.
AsyncTaskiqDecoratedTask = TypeVar("AsyncTaskiqDecoratedTask")
"""Decorated task type returned by @broker.task decorator."""

Install with Tessl CLI

npx tessl i tessl/pypi-taskiq

docs

brokers.md

events-state.md

exceptions.md

index.md

middleware.md

result-backends.md

scheduling.md

tasks-results.md

tile.json