Redis integration for TaskIQ, providing message brokers and result backends that support single-node, cluster, and sentinel Redis deployments.

Install with the Tessl CLI:

    npx @tessl/cli install tessl/pypi-taskiq-redis@1.1.0

TaskIQ-Redis provides comprehensive Redis-based message brokers and result backends for TaskIQ, with support for different Redis architectures (single node, cluster, sentinel). It offers three broker types: PubSub for broadcasting (no acknowledgements), ListQueue for simple queuing (no acknowledgements), and Stream for reliable message processing with acknowledgement support.

Install the package from PyPI:

    pip install taskiq-redis

Dependencies: taskiq>=0.11.12,<1 and redis^6.

from taskiq_redis import (
# Basic Redis brokers
PubSubBroker,
ListQueueBroker,
RedisStreamBroker,
# Redis Cluster brokers
ListQueueClusterBroker,
RedisStreamClusterBroker,
# Redis Sentinel brokers
ListQueueSentinelBroker,
PubSubSentinelBroker,
RedisStreamSentinelBroker,
# Result backends
RedisAsyncResultBackend,
RedisAsyncClusterResultBackend,
RedisAsyncSentinelResultBackend,
# Schedule sources
ListRedisScheduleSource,
RedisScheduleSource,
RedisClusterScheduleSource,
RedisSentinelScheduleSource
)

import asyncio
# Example: defining a task and retrieving its result with the stream broker.
# NOTE(review): TaskiqMessage is imported but never used in this example —
# confirm whether it belongs here or should be dropped.
from taskiq import TaskiqResult, TaskiqMessage
from taskiq_redis import RedisStreamBroker, RedisAsyncResultBackend

# Create broker with result backend
broker = RedisStreamBroker(
    url="redis://localhost:6379",
    result_backend=RedisAsyncResultBackend("redis://localhost:6379")
)

# Define a task
@broker.task
async def add_numbers(a: int, b: int) -> int:
    return a + b

async def main():
    # Start the broker
    await broker.startup()
    # Send task
    task = await add_numbers.kiq(10, 20)
    # Get result
    result = await task.wait_result()
    print(f"Result: {result.return_value}")  # Result: 30
    # Shutdown
    await broker.shutdown()
# Run the example
asyncio.run(main())

TaskIQ-Redis provides a comprehensive Redis integration with three main component categories: message brokers, result backends, and schedule sources.
Each component category supports three Redis deployment architectures: single node, cluster, and sentinel.

Message brokers are the core components that handle task distribution between producers and consumers, supporting different message patterns and Redis deployment types.
class PubSubBroker(BaseRedisBroker):
    """Broker using Redis Pub/Sub for broadcasting tasks (no acknowledgements)."""

    # url: Redis connection URL; queue_name: name used for the pub/sub channel.
    def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...

    # Publish an outgoing task message.
    async def kick(self, message: BrokerMessage) -> None: ...

    # Yield raw task payloads as they arrive.
    async def listen(self) -> AsyncGenerator[bytes, None]: ...
class ListQueueBroker(BaseRedisBroker):
    """Broker using a Redis list for simple queuing (no acknowledgements)."""

    # url: Redis connection URL; queue_name: name of the Redis list used as the queue.
    def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...

    # Push an outgoing task message onto the queue.
    async def kick(self, message: BrokerMessage) -> None: ...

    # Yield raw task payloads popped from the queue.
    async def listen(self) -> AsyncGenerator[bytes, None]: ...
class RedisStreamBroker(BaseRedisBroker):
    """Broker using Redis Streams for reliable message processing with
    acknowledgement support.

    Messages are consumed through a consumer group; consumer_name
    identifies this worker within the group (presumably auto-generated
    when None — confirm against the implementation).
    """

    def __init__(
        self,
        url: str,
        queue_name: str = "taskiq",
        consumer_group_name: str = "taskiq",
        consumer_name: Optional[str] = None,
        **kwargs
    ): ...

    # One-time initialization before consuming (NOTE(review): exact
    # startup behavior, e.g. stream/group creation, is not shown here).
    async def startup(self) -> None: ...

    # Append an outgoing task message to the stream.
    async def kick(self, message: BrokerMessage) -> None: ...
    # Yield acknowledgeable messages read from the stream.
    async def listen(self) -> AsyncGenerator[AckableMessage, None]: ...

Async result backends store and retrieve task execution results, with configurable expiration times and support for the different Redis deployments.
class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
    """Async result backend that stores task results in a single Redis node.

    keep_results: presumably whether results are retained after retrieval —
    confirm against the implementation.
    result_ex_time / result_px_time: optional result lifetime (Redis-style
    EX/PX, presumably seconds vs milliseconds — confirm). Selecting both
    raises DuplicateExpireTimeSelectedError; a value <= 0 raises
    ExpireTimeMustBeMoreThanZeroError (error classes declared in this file).
    """

    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        **kwargs
    ): ...

    # Store the result for a task id.
    async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None: ...

    # Fetch a stored result; ResultIsMissingError is raised when none exists.
    async def get_result(self, task_id: str, with_logs: bool = False) -> TaskiqResult[_ReturnType]: ...

    # Whether a result has been stored for the task id.
    async def is_result_ready(self, task_id: str) -> bool: ...

    # Store intermediate progress for a running task.
    async def set_progress(self, task_id: str, progress: TaskProgress[_ReturnType]) -> None: ...
    # Fetch intermediate progress, or None when none has been stored.
    async def get_progress(self, task_id: str) -> Union[TaskProgress[_ReturnType], None]: ...

Schedule sources manage scheduled and recurring tasks, with different storage strategies and support for the different Redis deployments.
class ListRedisScheduleSource(ScheduleSource):
    """Schedule source that stores scheduled tasks in Redis.

    prefix: key prefix under which schedules are stored.
    buffer_size: presumably the batch size used when reading schedules —
    confirm against the implementation.
    skip_past_schedules: presumably skips schedules whose time is already
    in the past — confirm.
    """

    def __init__(
        self,
        url: str,
        prefix: str = "schedule",
        buffer_size: int = 50,
        skip_past_schedules: bool = False,
        **kwargs
    ): ...

    # Persist a new scheduled task.
    async def add_schedule(self, schedule: ScheduledTask) -> None: ...

    # Return all stored schedules.
    async def get_schedules(self) -> List[ScheduledTask]: ...

    # Remove a schedule by its id.
    async def delete_schedule(self, schedule_id: str) -> None: ...
    # Fluent helper: migrate schedules from another source into this one,
    # optionally deleting them from the origin.
    def with_migrate_from(self, source: ScheduleSource, delete_schedules: bool = True) -> Self: ...

class TaskIQRedisError(TaskiqError):
    """Base error for all taskiq-redis exceptions."""
class DuplicateExpireTimeSelectedError(ResultBackendError, TaskIQRedisError):
    """Raised when two result lifetimes (result_ex_time and result_px_time) are both selected."""

class ExpireTimeMustBeMoreThanZeroError(ResultBackendError, TaskIQRedisError):
    """Raised when a selected result lifetime is less than or equal to zero."""

class ResultIsMissingError(TaskIQRedisError, ResultGetError):
    """Raised when no stored result exists for the requested task."""