tessl/pypi-taskiq-redis

Redis integration for taskiq with brokers and result backends supporting single node, cluster, and sentinel deployments

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/taskiq-redis@1.1.x

To install, run

npx @tessl/cli install tessl/pypi-taskiq-redis@1.1.0

TaskIQ-Redis

Redis integration for TaskIQ: message brokers, result backends, and schedule sources for single-node, Cluster, and Sentinel Redis deployments. TaskIQ-Redis offers three broker types: PubSub for broadcasting (no acknowledgements), ListQueue for simple queuing (no acknowledgements), and Stream for reliable message processing with acknowledgement support.

Package Information

  • Package Name: taskiq-redis
  • Language: Python
  • Installation: pip install taskiq-redis
  • Dependencies: taskiq>=0.11.12,<1, redis>=6,<7 (written ^6 in Poetry notation)

Core Imports

from taskiq_redis import (
    # Basic Redis brokers
    PubSubBroker,
    ListQueueBroker,
    RedisStreamBroker,
    
    # Redis Cluster brokers
    ListQueueClusterBroker,
    RedisStreamClusterBroker,
    
    # Redis Sentinel brokers
    ListQueueSentinelBroker,
    PubSubSentinelBroker,
    RedisStreamSentinelBroker,
    
    # Result backends
    RedisAsyncResultBackend,
    RedisAsyncClusterResultBackend,
    RedisAsyncSentinelResultBackend,
    
    # Schedule sources
    ListRedisScheduleSource,
    RedisScheduleSource,
    RedisClusterScheduleSource,
    RedisSentinelScheduleSource
)

Basic Usage

import asyncio

from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

# Create broker with result backend
broker = RedisStreamBroker(
    url="redis://localhost:6379",
    result_backend=RedisAsyncResultBackend("redis://localhost:6379")
)

# Define a task
@broker.task
async def add_numbers(a: int, b: int) -> int:
    return a + b

async def main():
    # Start the broker
    await broker.startup()
    
    # Send task
    task = await add_numbers.kiq(10, 20)
    
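    # Get result. A taskiq worker process must be running separately to
    # execute the task; without one, this call keeps waiting.
    result = await task.wait_result()
    print(f"Result: {result.return_value}")  # Result: 30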
    
    # Shutdown
    await broker.shutdown()

# Run the example
asyncio.run(main())

Architecture

TaskIQ-Redis has three main component categories:

  • Brokers: Handle message distribution between producers and consumers
    • PubSub: Broadcast messages to all connected workers (fire-and-forget)
    • ListQueue: Distribute tasks between workers using Redis lists (simple queuing)
    • Stream: Reliable message processing with acknowledgement using Redis streams
  • Result Backends: Store and retrieve task execution results with configurable expiration
  • Schedule Sources: Manage scheduled/recurring tasks with different storage strategies

Components are provided for three Redis deployment architectures (the import list above includes no Cluster variant of the PubSub broker):

  • Standard Redis: Single Redis instance
  • Redis Cluster: Distributed Redis deployment
  • Redis Sentinel: High-availability Redis with automatic failover
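
The naming scheme across broker patterns and deployments can be summarized as a lookup table. The sketch below is only an illustration: the class names are copied from the Core Imports list, while `BROKER_CLASSES`, `broker_class_name`, and the short keys ("pubsub", "list", "stream", "single", "cluster", "sentinel") are hypothetical helpers introduced here, not part of taskiq-redis.

```python
from typing import Dict, Optional, Tuple

# Class names copied from the Core Imports list. None marks a combination
# that the import list does not include.
BROKER_CLASSES: Dict[Tuple[str, str], Optional[str]] = {
    ("pubsub", "single"): "PubSubBroker",
    ("pubsub", "cluster"): None,
    ("pubsub", "sentinel"): "PubSubSentinelBroker",
    ("list", "single"): "ListQueueBroker",
    ("list", "cluster"): "ListQueueClusterBroker",
    ("list", "sentinel"): "ListQueueSentinelBroker",
    ("stream", "single"): "RedisStreamBroker",
    ("stream", "cluster"): "RedisStreamClusterBroker",
    ("stream", "sentinel"): "RedisStreamSentinelBroker",
}


def broker_class_name(pattern: str, deployment: str) -> str:
    """Return the broker class name for a pattern/deployment pair."""
    key = (pattern, deployment)
    if key not in BROKER_CLASSES:
        raise KeyError(f"unknown combination: {key}")
    name = BROKER_CLASSES[key]
    if name is None:
        raise ValueError(f"no broker listed for {pattern} on {deployment}")
    return name
```

With the package installed, a name returned by this helper could be resolved to the class itself with `getattr(taskiq_redis, name)`.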

Capabilities

Message Brokers

Brokers distribute tasks between producers and consumers. The variants differ in delivery pattern and in the Redis deployment they target.

class PubSubBroker(BaseRedisBroker):
    def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[bytes, None]: ...

class ListQueueBroker(BaseRedisBroker):
    def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[bytes, None]: ...

class RedisStreamBroker(BaseRedisBroker):
    def __init__(
        self,
        url: str,
        queue_name: str = "taskiq",
        consumer_group_name: str = "taskiq",
        consumer_name: Optional[str] = None,
        **kwargs
    ): ...
    async def startup(self) -> None: ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[AckableMessage, None]: ...
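
The three broker patterns differ mainly in who receives a message and whether delivery is confirmed. The following is a minimal in-memory sketch of those semantics using only the standard library. It does not use Redis or taskiq-redis, and `broadcast`, `distribute`, and `AckStream` are hypothetical names chosen for illustration; the round-robin assignment in `distribute` is a simplifying assumption.

```python
from collections import deque
from itertools import cycle


def broadcast(message, workers):
    """PubSub-style: every connected worker gets its own copy."""
    return {worker: [message] for worker in workers}


def distribute(messages, workers):
    """ListQueue-style: each message goes to exactly one worker.

    Round-robin is a simplification; with a real Redis list, whichever
    worker pops first gets the message.
    """
    inbox = {worker: [] for worker in workers}
    for worker, message in zip(cycle(workers), messages):
        inbox[worker].append(message)
    return inbox


class AckStream:
    """Stream-style: a delivered message stays pending until acknowledged."""

    def __init__(self):
        self._queue = deque()
        self._pending = {}
        self._next_id = 0

    def add(self, payload):
        """Append a message and return its id."""
        self._next_id += 1
        self._queue.append((self._next_id, payload))
        return self._next_id

    def read(self, consumer):
        """Deliver the oldest message and record it as pending."""
        message_id, payload = self._queue.popleft()
        self._pending[message_id] = (consumer, payload)
        return message_id, payload

    def ack(self, message_id):
        """Confirm processing; the message is no longer pending."""
        self._pending.pop(message_id, None)

    def pending(self):
        """Return delivered-but-unacknowledged messages."""
        return dict(self._pending)
```

If a worker fails between `read` and `ack`, the entry is still listed by `pending()`, which is the record that makes redelivery possible in an acknowledgement-based design. The first two patterns keep no such record.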

Result Backends

Async result backends for storing and retrieving task execution results with configurable expiration times and different Redis deployment support.

class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        **kwargs
    ): ...
    async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None: ...
    async def get_result(self, task_id: str, with_logs: bool = False) -> TaskiqResult[_ReturnType]: ...
    async def is_result_ready(self, task_id: str) -> bool: ...
    async def set_progress(self, task_id: str, progress: TaskProgress[_ReturnType]) -> None: ...
    async def get_progress(self, task_id: str) -> Union[TaskProgress[_ReturnType], None]: ...
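
The `result_ex_time` and `result_px_time` parameters are alternatives, and the docstrings under Exception Classes describe two constraints: only one lifetime may be selected, and it must be greater than zero. The sketch below illustrates those checks with stand-in exception classes defined locally. It is not the library's code; `expiry_options` is a hypothetical helper, and it assumes the parameter names follow Redis's `EX` (seconds) and `PX` (milliseconds) options.

```python
from typing import Dict, Optional


class DuplicateExpireTimeSelectedError(ValueError):
    """Stand-in for the library exception of the same name."""


class ExpireTimeMustBeMoreThanZeroError(ValueError):
    """Stand-in for the library exception of the same name."""


def expiry_options(
    result_ex_time: Optional[int] = None,
    result_px_time: Optional[int] = None,
) -> Dict[str, int]:
    """Validate the two lifetimes and return expiry options for a Redis SET."""
    for value in (result_ex_time, result_px_time):
        if value is not None and value <= 0:
            raise ExpireTimeMustBeMoreThanZeroError("lifetime must be > 0")
    if result_ex_time is not None and result_px_time is not None:
        raise DuplicateExpireTimeSelectedError("choose ex or px, not both")
    if result_ex_time is not None:
        return {"ex": result_ex_time}  # assumed unit: seconds
    if result_px_time is not None:
        return {"px": result_px_time}  # assumed unit: milliseconds
    return {}  # no lifetime selected, so the result does not expire
```

Running the checks when the backend is constructed, rather than when the first result is stored, would surface a misconfiguration before any task runs.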

Schedule Sources

Schedule sources for managing scheduled and recurring tasks with different storage strategies and Redis deployment support.

class ListRedisScheduleSource(ScheduleSource):
    def __init__(
        self,
        url: str,
        prefix: str = "schedule",
        buffer_size: int = 50,
        skip_past_schedules: bool = False,
        **kwargs
    ): ...
    async def add_schedule(self, schedule: ScheduledTask) -> None: ...
    async def get_schedules(self) -> List[ScheduledTask]: ...
    async def delete_schedule(self, schedule_id: str) -> None: ...
    def with_migrate_from(self, source: ScheduleSource, delete_schedules: bool = True) -> Self: ...

Exception Classes

class TaskIQRedisError(TaskiqError):
    """Base error for all taskiq-redis exceptions."""

class DuplicateExpireTimeSelectedError(ResultBackendError, TaskIQRedisError):
    """Error if two lifetimes are selected."""

class ExpireTimeMustBeMoreThanZeroError(ResultBackendError, TaskIQRedisError):
    """Error if lifetimes are less or equal zero."""

class ResultIsMissingError(TaskIQRedisError, ResultGetError):
    """Error if there is no result when trying to get it."""