or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

brokers.mdevents-state.mdexceptions.mdindex.mdmiddleware.mdresult-backends.mdscheduling.mdtasks-results.md
tile.json

tessl/pypi-taskiq

Distributed task queue with full async support for Python applications

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/taskiq@0.11.x

To install, run

npx @tessl/cli install tessl/pypi-taskiq@0.11.0

index.mddocs/

Taskiq

Taskiq is an asynchronous distributed task queue for Python that enables sending and running both synchronous and asynchronous functions across distributed systems. It provides full async support with integration for popular frameworks like FastAPI and AioHTTP, comprehensive type safety with PEP-612 support, and various broker backends including NATS, Redis, RabbitMQ, and Kafka.

Package Information

  • Package Name: taskiq
  • Language: Python
  • Installation: pip install taskiq
  • Python Versions: 3.8+

Core Imports

import taskiq

Common imports for basic usage:

from taskiq import InMemoryBroker, TaskiqResult, Context

For production usage with external brokers:

from taskiq_nats import JetStreamBroker  # External package
from taskiq_redis import RedisBroker     # External package

Basic Usage

import asyncio
from taskiq import InMemoryBroker

# Create a broker
broker = InMemoryBroker()

# Define a task
@broker.task
async def add_numbers(a: int, b: int) -> int:
    return a + b

# Alternative: sync task
@broker.task
def multiply_numbers(a: int, b: int) -> int:
    return a * b

async def main():
    # Startup broker
    await broker.startup()
    
    # Send tasks for execution
    result1 = await add_numbers.kiq(5, 3)
    result2 = await multiply_numbers.kiq(4, 7)
    
    # Get results
    value1 = await result1.wait_result()  # 8
    value2 = await result2.wait_result()  # 28
    
    print(f"Addition result: {value1}")
    print(f"Multiplication result: {value2}")
    
    # Shutdown broker
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Architecture

Taskiq follows a distributed architecture with these core components:

  • Broker: Message queue interface that handles task distribution
  • Worker: Process that executes tasks received from the broker
  • Result Backend: Storage system for task results
  • Scheduler: Component for handling periodic and scheduled tasks
  • Middleware: Pipeline for cross-cutting concerns like retries and monitoring

The library supports both in-memory brokers for development and external broker systems (NATS, Redis, etc.) for production distributed environments.

Capabilities

Broker Management

Core broker functionality for creating, configuring, and managing task distribution. Includes abstract base classes and concrete implementations for different message queue backends.

class AsyncBroker:
    def task(self, task_name: Optional[str] = None, **labels: Any) -> Callable
    def register_task(self, func: Callable, task_name: Optional[str] = None, **labels: Any) -> AsyncTaskiqDecoratedTask
    def with_result_backend(self, result_backend: AsyncResultBackend) -> Self
    def with_middlewares(self, *middlewares: TaskiqMiddleware) -> Self
    async def startup(self) -> None
    async def shutdown(self) -> None

class InMemoryBroker(AsyncBroker): ...
class ZeroMQBroker(AsyncBroker): ...

async_shared_broker: AsyncBroker
"""Global shared broker instance for cross-module usage."""

Brokers

Task Execution and Results

Task execution system including decorated task wrappers, result containers, and context management for task execution environments.

class AsyncTaskiqDecoratedTask:
    async def kiq(self, *args, **kwargs) -> AsyncTaskiqTask

class AsyncTaskiqTask:
    async def wait_result(
        self,
        check_interval: float = 0.2,
        timeout: float = -1.0,
        with_logs: bool = False,
    ) -> TaskiqResult

class TaskiqResult:
    is_err: bool
    return_value: Any
    execution_time: float
    labels: Dict[str, Any]
    error: Optional[BaseException]

class Context:
    message: TaskiqMessage
    broker: AsyncBroker
    state: TaskiqState
    async def requeue(self) -> None
    def reject(self) -> None

async def gather(
    *tasks: AsyncTaskiqTask[Any],
    timeout: float = -1,
    with_logs: bool = False,
    periodicity: float = 0.1,
) -> Tuple[TaskiqResult[Any], ...]:
    """Wait for multiple task results concurrently."""

Tasks and Results

Middleware System

Extensible middleware pipeline for implementing cross-cutting concerns like retries, monitoring, and custom processing logic.

class TaskiqMiddleware:
    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage
    async def post_send(self, message: TaskiqMessage) -> None
    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage
    async def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None

class SimpleRetryMiddleware(TaskiqMiddleware): ...
class SmartRetryMiddleware(TaskiqMiddleware): ...
class PrometheusMiddleware(TaskiqMiddleware): ...

Middleware

Scheduling

Task scheduling capabilities for periodic execution, cron-like scheduling, and delayed task execution.

class TaskiqScheduler:
    def __init__(self, broker: AsyncBroker, sources: List[ScheduleSource]) -> None
    async def startup(self) -> None
    async def shutdown(self) -> None

class ScheduledTask:
    task_name: str
    cron: Optional[str]
    time: Optional[datetime]
    labels: Dict[str, Any]
    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]

Scheduling

Result Backends

Storage systems for persisting task results and progress tracking across distributed environments.

class AsyncResultBackend:
    async def set_result(self, task_id: str, result: TaskiqResult) -> None
    async def get_result(self, task_id: str, with_logs: bool = True) -> TaskiqResult
    async def is_result_ready(self, task_id: str) -> bool
    async def startup(self) -> None
    async def shutdown(self) -> None

Result Backends

Events and State

Event system for lifecycle management and global state container for broker and task coordination.

class TaskiqEvents:
    CLIENT_STARTUP: str
    CLIENT_SHUTDOWN: str
    WORKER_STARTUP: str
    WORKER_SHUTDOWN: str

class TaskiqState:
    def __init__(self) -> None
    def set_value(self, key: str, value: Any) -> None
    def get_value(self, key: str, default: Any = None) -> Any

Events and State

Exception Handling

Comprehensive exception hierarchy for handling various error conditions in distributed task processing.

class TaskiqError(Exception): ...
class NoResultError(TaskiqError): ...
class ResultGetError(TaskiqError): ...
class SendTaskError(TaskiqError): ...
class SecurityError(TaskiqError): ...
class TaskiqResultTimeoutError(TaskiqError): ...

Exceptions

Types

class BrokerMessage:
    task_id: str
    task_name: str
    message: bytes
    labels: Dict[str, str]

class TaskiqMessage:
    task_id: str
    task_name: str
    labels: Dict[str, Any]
    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]

class AckableMessage:
    data: Union[bytes, str]
    ack: Callable[[], Awaitable[None]]

class TaskiqFormatter:
    """Abstract base class for message formatting."""
    def dumps(self, message: TaskiqMessage) -> BrokerMessage
    def loads(self, message: bytes) -> TaskiqMessage

class ScheduleSource:
    """Abstract base class for schedule sources."""
    async def get_schedules(self) -> List[ScheduledTask]
    async def pre_send(self, task: ScheduledTask) -> None
    async def post_send(self, task: ScheduledTask) -> None

TaskiqDepends = Depends  # From taskiq_dependencies package

__version__: str
"""Package version string."""