
tessl/pypi-taskiq-redis

Redis integration for taskiq with brokers and result backends supporting single node, cluster, and sentinel deployments


TaskIQ-Redis

Redis integration for TaskIQ: message brokers, result backends, and schedule sources for single-node, cluster, and sentinel Redis deployments. TaskIQ-Redis offers three broker types: PubSub for broadcasting (no acknowledgements), ListQueue for simple queuing (no acknowledgements), and Stream for reliable message processing with acknowledgement support.

Package Information

  • Package Name: taskiq-redis
  • Language: Python
  • Installation: pip install taskiq-redis
  • Dependencies: taskiq>=0.11.12,<1; redis>=6,<7
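
To check which versions are actually installed against the constraints listed above, a small standard-library helper is enough. This is a minimal sketch; the function name `installed_version` is hypothetical and not part of taskiq-redis.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Report the three distributions named in the package information.
    for name in ("taskiq-redis", "taskiq", "redis"):
        print(f"{name}: {installed_version(name) or 'not installed'}")
```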

Core Imports

from taskiq_redis import (
    # Basic Redis brokers
    PubSubBroker,
    ListQueueBroker,
    RedisStreamBroker,
    
    # Redis Cluster brokers
    ListQueueClusterBroker,
    RedisStreamClusterBroker,
    
    # Redis Sentinel brokers
    ListQueueSentinelBroker,
    PubSubSentinelBroker,
    RedisStreamSentinelBroker,
    
    # Result backends
    RedisAsyncResultBackend,
    RedisAsyncClusterResultBackend,
    RedisAsyncSentinelResultBackend,
    
    # Schedule sources
    ListRedisScheduleSource,
    RedisScheduleSource,
    RedisClusterScheduleSource,
    RedisSentinelScheduleSource
)

Basic Usage

import asyncio
from taskiq_redis import RedisStreamBroker, RedisAsyncResultBackend

# Create broker and attach a result backend
# (with_result_backend is the non-deprecated way to set it in taskiq)
broker = RedisStreamBroker(
    url="redis://localhost:6379",
).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379"))

# Define a task
@broker.task
async def add_numbers(a: int, b: int) -> int:
    return a + b

async def main():
    # Start the broker
    await broker.startup()
    
    # Send task
    task = await add_numbers.kiq(10, 20)
    
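    # Get result (a worker must be running to execute the task,
    # e.g. started with the taskiq CLI: taskiq worker <module>:broker)
    result = await task.wait_result()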
    print(f"Result: {result.return_value}")  # Result: 30
    
    # Shutdown
    await broker.shutdown()

# Run the example
asyncio.run(main())

Architecture

TaskIQ-Redis has three main component categories:

  • Brokers: Handle message distribution between producers and consumers
    • PubSub: Broadcast messages to all connected workers (fire-and-forget)
    • ListQueue: Distribute tasks between workers using Redis lists (simple queuing)
    • Stream: Reliable message processing with acknowledgement using Redis streams
  • Result Backends: Store and retrieve task execution results with configurable expiration
  • Schedule Sources: Manage scheduled/recurring tasks with different storage strategies
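
The practical difference between unacknowledged and acknowledged delivery can be shown with a small in-memory model. This is only an illustration of the idea, not how taskiq-redis or Redis implements it: the classes `FireAndForgetQueue` and `AckQueue` are hypothetical, no Redis is involved, and PubSub's broadcast-to-all-workers behaviour is not modelled.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class FireAndForgetQueue:
    """Models list-style delivery: once popped, a message is gone."""

    def __init__(self) -> None:
        self._items: Deque[str] = deque()

    def push(self, message: str) -> None:
        self._items.append(message)

    def pop(self) -> Optional[str]:
        return self._items.popleft() if self._items else None


class AckQueue:
    """Models stream-style delivery: a message stays pending until acked."""

    def __init__(self) -> None:
        self._items: Deque[Tuple[int, str]] = deque()
        self._pending: Dict[int, str] = {}
        self._next_id = 0

    def push(self, message: str) -> None:
        self._items.append((self._next_id, message))
        self._next_id += 1

    def pop(self) -> Optional[Tuple[int, str]]:
        if not self._items:
            return None
        message_id, message = self._items.popleft()
        self._pending[message_id] = message  # remembered until ack() is called
        return message_id, message

    def ack(self, message_id: int) -> None:
        self._pending.pop(message_id, None)

    def unacked(self) -> List[str]:
        """Messages delivered but never acknowledged (redelivery candidates)."""
        return list(self._pending.values())


plain = FireAndForgetQueue()
plain.push("job-1")
plain.pop()             # worker takes the job, then crashes: nothing records it

acked = AckQueue()
acked.push("job-1")
acked.pop()             # worker takes the job, then crashes before ack
print(acked.unacked())  # ['job-1']
```

In the model, the crashed worker's job is unrecoverable in the first queue but still visible in the second, which is the property that makes the Stream broker suitable when tasks must not be lost.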

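Components are available for three Redis deployment architectures (the exported classes listed under Core Imports do not include a cluster variant of the PubSub broker):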

  • Standard Redis: Single Redis instance
  • Redis Cluster: Distributed Redis deployment
  • Redis Sentinel: High-availability Redis with automatic failover
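
One way to keep the broker choice configurable is a lookup from broker kind and deployment type to class name. The sketch below is an assumption about how an application might organise this; the table is copied from the exported names under Core Imports, while the keys (`"pubsub"`, `"list"`, `"stream"`, `"single"`, `"cluster"`, `"sentinel"`) and both function names are hypothetical. Constructor arguments differ between deployment types and are not covered here.

```python
import importlib
from typing import Dict, Tuple

# (broker kind, deployment) -> exported class name in taskiq_redis.
BROKER_CLASS_NAMES: Dict[Tuple[str, str], str] = {
    ("pubsub", "single"): "PubSubBroker",
    ("pubsub", "sentinel"): "PubSubSentinelBroker",
    ("list", "single"): "ListQueueBroker",
    ("list", "cluster"): "ListQueueClusterBroker",
    ("list", "sentinel"): "ListQueueSentinelBroker",
    ("stream", "single"): "RedisStreamBroker",
    ("stream", "cluster"): "RedisStreamClusterBroker",
    ("stream", "sentinel"): "RedisStreamSentinelBroker",
}


def broker_class_name(kind: str, deployment: str) -> str:
    """Return the class name for a combination, or raise if none is listed."""
    try:
        return BROKER_CLASS_NAMES[(kind, deployment)]
    except KeyError:
        raise ValueError(
            f"no {kind!r} broker listed for {deployment!r} deployment"
        ) from None


def load_broker_class(kind: str, deployment: str) -> type:
    """Import taskiq_redis lazily and return the matching broker class."""
    module = importlib.import_module("taskiq_redis")
    return getattr(module, broker_class_name(kind, deployment))
```

For example, `broker_class_name("stream", "cluster")` returns `"RedisStreamClusterBroker"`, while `broker_class_name("pubsub", "cluster")` raises `ValueError`.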

Capabilities

Message Brokers

Brokers distribute tasks from producers to consumers. They differ in delivery pattern (broadcast, list queue, acknowledged stream) and in the Redis deployment type they target.

class PubSubBroker(BaseRedisBroker):
    def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[bytes, None]: ...

class ListQueueBroker(BaseRedisBroker):
    def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[bytes, None]: ...

class RedisStreamBroker(BaseRedisBroker):
    def __init__(
        self,
        url: str,
        queue_name: str = "taskiq",
        consumer_group_name: str = "taskiq",
        consumer_name: Optional[str] = None,
        **kwargs
    ): ...
    async def startup(self) -> None: ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[AckableMessage, None]: ...
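
In Redis streams, consumers that share a consumer group split the stream's messages between them, and pending entries are tracked per consumer name. If you want to set `consumer_name` explicitly rather than leave it as `None`, something like the following could work. It is a sketch under assumptions: `default_consumer_name` and `make_stream_broker` are hypothetical helpers, and the hostname-plus-PID naming scheme is just one possible convention.

```python
import os
import socket


def default_consumer_name(prefix: str = "worker") -> str:
    """Build a per-process consumer name from the prefix, hostname and PID."""
    return f"{prefix}-{socket.gethostname()}-{os.getpid()}"


def make_stream_broker(url: str = "redis://localhost:6379"):
    """Create a RedisStreamBroker with an explicit, per-process consumer name."""
    # Imported lazily so the naming helper works without taskiq-redis installed.
    from taskiq_redis import RedisStreamBroker

    return RedisStreamBroker(
        url=url,
        queue_name="taskiq",
        consumer_group_name="taskiq",  # shared by all workers on this queue
        consumer_name=default_consumer_name(),
    )
```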


Result Backends

Async result backends store and retrieve task execution results, with configurable expiration times and variants for the different Redis deployment types.

class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        **kwargs
    ): ...
    async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None: ...
    async def get_result(self, task_id: str, with_logs: bool = False) -> TaskiqResult[_ReturnType]: ...
    async def is_result_ready(self, task_id: str) -> bool: ...
    async def set_progress(self, task_id: str, progress: TaskProgress[_ReturnType]) -> None: ...
    async def get_progress(self, task_id: str) -> Union[TaskProgress[_ReturnType], None]: ...
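
The two expiry parameters are mutually exclusive and must be positive, as the exception classes in this package indicate. A small helper can turn a single TTL into exactly one of them. This sketch assumes `result_ex_time` is in seconds and `result_px_time` in milliseconds, mirroring the EX and PX options of the Redis SET command; `expiry_kwargs` and `make_result_backend` are hypothetical names.

```python
from datetime import timedelta
from typing import Dict


def expiry_kwargs(ttl: timedelta) -> Dict[str, int]:
    """Translate a TTL into exactly one of result_ex_time / result_px_time."""
    total_ms = ttl // timedelta(milliseconds=1)
    if total_ms <= 0:
        raise ValueError("ttl must be at least one millisecond")
    if total_ms % 1000 == 0:
        return {"result_ex_time": total_ms // 1000}  # whole seconds (assumed unit)
    return {"result_px_time": total_ms}  # milliseconds (assumed unit)


def make_result_backend(url: str, ttl: timedelta):
    """Create a RedisAsyncResultBackend whose results expire after ttl."""
    # Imported lazily so expiry_kwargs can be used without taskiq-redis installed.
    from taskiq_redis import RedisAsyncResultBackend

    return RedisAsyncResultBackend(url, **expiry_kwargs(ttl))
```

For example, `expiry_kwargs(timedelta(minutes=10))` yields `{"result_ex_time": 600}` and `expiry_kwargs(timedelta(milliseconds=1500))` yields `{"result_px_time": 1500}`.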


Schedule Sources

Schedule sources store and manage scheduled and recurring tasks, with different storage strategies and variants for the different Redis deployment types.

class ListRedisScheduleSource(ScheduleSource):
    def __init__(
        self,
        url: str,
        prefix: str = "schedule",
        buffer_size: int = 50,
        skip_past_schedules: bool = False,
        **kwargs
    ): ...
    async def add_schedule(self, schedule: ScheduledTask) -> None: ...
    async def get_schedules(self) -> List[ScheduledTask]: ...
    async def delete_schedule(self, schedule_id: str) -> None: ...
    def with_migrate_from(self, source: ScheduleSource, delete_schedules: bool = True) -> Self: ...


Exception Classes

class TaskIQRedisError(TaskiqError):
    """Base error for all taskiq-redis exceptions."""

class DuplicateExpireTimeSelectedError(ResultBackendError, TaskIQRedisError):
    """Error if two lifetimes are selected."""

class ExpireTimeMustBeMoreThanZeroError(ResultBackendError, TaskIQRedisError):
    """Error if lifetimes are less than or equal to zero."""

class ResultIsMissingError(TaskIQRedisError, ResultGetError):
    """Error if there is no result when trying to get it."""

Describes: pkg:pypi/taskiq-redis@1.1.x