Distributed task queue with full async support for Python applications
npx @tessl/cli install tessl/pypi-taskiq@0.11.0

Taskiq is an asynchronous distributed task queue for Python that enables sending and running both synchronous and asynchronous functions across distributed systems. It provides full async support with integration for popular frameworks like FastAPI and AioHTTP, comprehensive type safety with PEP-612 support, and various broker backends including NATS, Redis, RabbitMQ, and Kafka.
pip install taskiq

import taskiq

Common imports for basic usage:
from taskiq import InMemoryBroker, TaskiqResult, Context

For production usage with external brokers:
from taskiq_nats import JetStreamBroker # External package
from taskiq_redis import RedisBroker # External package

import asyncio
from taskiq import InMemoryBroker
# Create a broker
broker = InMemoryBroker()
# Define a task
@broker.task
async def add_numbers(a: int, b: int) -> int:
    """Asynchronous example task: return the sum of ``a`` and ``b``."""
    total = a + b
    return total
# Alternative: sync task
@broker.task
def multiply_numbers(a: int, b: int) -> int:
    """Synchronous example task: return the product of ``a`` and ``b``."""
    product = a * b
    return product
async def main() -> None:
    """Run the quick-start example: send two tasks and print their results.

    Starts the module-level ``broker``, sends ``add_numbers`` and
    ``multiply_numbers`` via ``.kiq()``, waits for both results, prints
    the returned values, and shuts the broker down.
    """
    # Startup broker
    await broker.startup()
    try:
        # Send tasks for execution; .kiq() returns a task handle, not the value
        task1 = await add_numbers.kiq(5, 3)
        task2 = await multiply_numbers.kiq(4, 7)

        # Get results. wait_result() returns a TaskiqResult wrapper; the
        # task's actual return value lives in its ``return_value`` attribute.
        result1 = await task1.wait_result()
        result2 = await task2.wait_result()

        print(f"Addition result: {result1.return_value}")  # 8
        print(f"Multiplication result: {result2.return_value}")  # 28
    finally:
        # Shutdown broker even if sending or waiting for a task raised
        await broker.shutdown()
if __name__ == "__main__":
    asyncio.run(main())

Taskiq follows a distributed architecture with these core components:
The library supports both in-memory brokers for development and external broker systems (NATS, Redis, etc.) for production distributed environments.
Core broker functionality for creating, configuring, and managing task distribution. Includes abstract base classes and concrete implementations for different message queue backends.
class AsyncBroker:
def task(self, task_name: Optional[str] = None, **labels: Any) -> Callable
def register_task(self, func: Callable, task_name: Optional[str] = None, **labels: Any) -> AsyncTaskiqDecoratedTask
def with_result_backend(self, result_backend: AsyncResultBackend) -> Self
def with_middlewares(self, *middlewares: TaskiqMiddleware) -> Self
async def startup(self) -> None
async def shutdown(self) -> None
class InMemoryBroker(AsyncBroker): ...
class ZeroMQBroker(AsyncBroker): ...
async_shared_broker: AsyncBroker
"""Global shared broker instance for cross-module usage."""

Task execution system including decorated task wrappers, result containers, and context management for task execution environments.
class AsyncTaskiqDecoratedTask:
async def kiq(self, *args, **kwargs) -> AsyncTaskiqTask
class AsyncTaskiqTask:
async def wait_result(
self,
check_interval: float = 0.2,
timeout: float = -1.0,
with_logs: bool = False,
) -> TaskiqResult
class TaskiqResult:
is_err: bool
return_value: Any
execution_time: float
labels: Dict[str, Any]
error: Optional[BaseException]
class Context:
message: TaskiqMessage
broker: AsyncBroker
state: TaskiqState
async def requeue(self) -> None
def reject(self) -> None
async def gather(
*tasks: AsyncTaskiqTask[Any],
timeout: float = -1,
with_logs: bool = False,
periodicity: float = 0.1,
) -> Tuple[TaskiqResult[Any], ...]:
"""Wait for multiple task results concurrently."""

Extensible middleware pipeline for implementing cross-cutting concerns like retries, monitoring, and custom processing logic.
class TaskiqMiddleware:
async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage
async def post_send(self, message: TaskiqMessage) -> None
async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage
async def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None
class SimpleRetryMiddleware(TaskiqMiddleware): ...
class SmartRetryMiddleware(TaskiqMiddleware): ...
class PrometheusMiddleware(TaskiqMiddleware): ...

Task scheduling capabilities for periodic execution, cron-like scheduling, and delayed task execution.
class TaskiqScheduler:
def __init__(self, broker: AsyncBroker, sources: List[ScheduleSource]) -> None
async def startup(self) -> None
async def shutdown(self) -> None
class ScheduledTask:
task_name: str
cron: Optional[str]
time: Optional[datetime]
labels: Dict[str, Any]
args: Tuple[Any, ...]
kwargs: Dict[str, Any]

Storage systems for persisting task results and progress tracking across distributed environments.
class AsyncResultBackend:
async def set_result(self, task_id: str, result: TaskiqResult) -> None
async def get_result(self, task_id: str, with_logs: bool = True) -> TaskiqResult
async def is_result_ready(self, task_id: str) -> bool
async def startup(self) -> None
async def shutdown(self) -> None

Event system for lifecycle management and global state container for broker and task coordination.
class TaskiqEvents:
CLIENT_STARTUP: str
CLIENT_SHUTDOWN: str
WORKER_STARTUP: str
WORKER_SHUTDOWN: str
class TaskiqState:
def __init__(self) -> None
def set_value(self, key: str, value: Any) -> None
def get_value(self, key: str, default: Any = None) -> Any

Comprehensive exception hierarchy for handling various error conditions in distributed task processing.
class TaskiqError(Exception): ...
class NoResultError(TaskiqError): ...
class ResultGetError(TaskiqError): ...
class SendTaskError(TaskiqError): ...
class SecurityError(TaskiqError): ...
class TaskiqResultTimeoutError(TaskiqError): ...

class BrokerMessage:
task_id: str
task_name: str
message: bytes
labels: Dict[str, str]
class TaskiqMessage:
task_id: str
task_name: str
labels: Dict[str, Any]
args: Tuple[Any, ...]
kwargs: Dict[str, Any]
class AckableMessage:
data: Union[bytes, str]
ack: Callable[[], Awaitable[None]]
class TaskiqFormatter:
"""Abstract base class for message formatting."""
def dumps(self, message: TaskiqMessage) -> BrokerMessage
def loads(self, message: bytes) -> TaskiqMessage
class ScheduleSource:
"""Abstract base class for schedule sources."""
async def get_schedules(self) -> List[ScheduledTask]
async def pre_send(self, task: ScheduledTask) -> None
async def post_send(self, task: ScheduledTask) -> None
TaskiqDepends = Depends # From taskiq_dependencies package
__version__: str
"""Package version string."""