Distributed task queue with full async support for Python applications
Brokers are the core components that handle message distribution and task routing in taskiq. They provide the interface between your application and the underlying message queue system, enabling distributed task processing across multiple workers.
The base AsyncBroker class defines the interface that all brokers must implement, providing task decoration, registration, and lifecycle management.
class AsyncBroker:
"""Abstract base class for all brokers."""
def __init__(
self,
result_backend: Optional[AsyncResultBackend] = None,
task_id_generator: Optional[Callable[[], str]] = None,
) -> None: ...
def task(
self,
task_name: Optional[str] = None,
**labels: Any,
) -> Callable: ...
def register_task(
self,
func: Callable,
task_name: Optional[str] = None,
**labels: Any,
) -> AsyncTaskiqDecoratedTask: ...
def find_task(self, task_name: str) -> Optional[AsyncTaskiqDecoratedTask]: ...
def get_all_tasks(self) -> Dict[str, AsyncTaskiqDecoratedTask]: ...
async def startup(self) -> None: ...
async def shutdown(self) -> None: ...
async def kick(self, message: BrokerMessage) -> None: ...
def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]: ...Builder methods for configuring brokers with result backends, middlewares, serializers, and other components.
def with_result_backend(
self,
result_backend: AsyncResultBackend,
) -> Self:
"""Set result backend and return updated broker."""
def with_middlewares(
self,
*middlewares: TaskiqMiddleware,
) -> Self:
"""Add middlewares to broker."""
def with_serializer(
self,
serializer: TaskiqSerializer,
) -> Self:
"""Set serializer and return updated broker."""
def with_formatter(
self,
formatter: TaskiqFormatter,
) -> Self:
"""Set formatter and return updated broker."""
def with_id_generator(
self,
task_id_generator: Callable[[], str],
) -> Self:
"""Set ID generator and return updated broker."""
def add_middlewares(self, *middlewares: TaskiqMiddleware) -> None:
"""Add middlewares to existing broker."""
def add_dependency_context(self, new_ctx: Dict[Any, Any]) -> None:
"""Add dependencies for dependency injection."""Built-in broker implementation for development, testing, and single-process applications.
class InMemoryBroker(AsyncBroker):
"""
In-memory broker for development and testing.
Processes tasks within the same process without external dependencies.
Includes built-in result backend with configurable result storage limits.
"""
def __init__(
self,
sync_tasks_pool_size: int = 4,
max_stored_results: int = 100,
cast_types: bool = True,
max_async_tasks: int = 30,
propagate_exceptions: bool = True,
await_inplace: bool = False,
) -> None: ...
async def kick(self, message: BrokerMessage) -> None: ...
def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]: ...
async def wait_all(self) -> None:
"""Wait for all currently running tasks to complete."""Broker implementation using ZeroMQ for distributed message passing.
class ZeroMQBroker(AsyncBroker):
"""
ZeroMQ-based broker for distributed task processing.
Uses ZeroMQ PUSH/PULL pattern for task distribution.
Supports both bind and connect modes for flexible network topologies.
"""
def __init__(
self,
zmq_pub_host: str = "tcp://0.0.0.0:5555",
zmq_sub_host: str = "tcp://localhost:5555",
result_backend: Optional[AsyncResultBackend] = None,
task_id_generator: Optional[Callable[[], str]] = None,
) -> None: ...
async def startup(self) -> None: ...
async def shutdown(self) -> None: ...
async def kick(self, message: BrokerMessage) -> None: ...
def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]: ...Global broker instance for sharing across async contexts.
async_shared_broker: AsyncBroker
"""
Shared broker instance that can be imported and used across modules.
Useful for dependency injection and avoiding broker parameter passing.
"""Brokers support event handlers for lifecycle management and custom logic execution.
def on_event(
self,
*events: TaskiqEvents,
) -> Callable[[EventHandler], EventHandler]:
"""Decorator for adding event handlers."""
def add_event_handler(
self,
event: TaskiqEvents,
handler: EventHandler,
) -> None:
"""Add event handler programmatically."""
def with_event_handlers(
self,
event: TaskiqEvents,
*handlers: EventHandler,
) -> Self:
"""Set event handlers and return updated broker."""from taskiq import InMemoryBroker
# Simple in-memory broker
broker = InMemoryBroker()
@broker.task
async def my_task(x: int) -> int:
return x * 2from taskiq import ZeroMQBroker
from taskiq.middlewares import SimpleRetryMiddleware
# Configured broker with middleware and result backend
broker = (ZeroMQBroker("tcp://localhost:5555")
.with_middlewares(SimpleRetryMiddleware(max_retries=3))
.with_result_backend(my_result_backend))
@broker.task
async def process_data(data: str) -> dict:
# Task implementation
return {"processed": data}@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_startup(state):
print("Worker starting up")
# Initialize connections, etc.
@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def on_shutdown(state):
print("Worker shutting down")
# Cleanup resourcesEventHandler = Callable[[TaskiqState], Optional[Awaitable[None]]]
AsyncTaskiqDecoratedTask = TypeVar("AsyncTaskiqDecoratedTask")
"""Decorated task type returned by @broker.task decorator."""Install with Tessl CLI
npx tessl i tessl/pypi-taskiq