Redis integration for taskiq with brokers and result backends supporting single node, cluster, and sentinel deployments
TaskIQ-Redis provides comprehensive message brokers for different Redis deployment architectures and message patterns. Each broker type supports different message delivery guarantees and is optimized for specific use cases.
Message brokers for single Redis instance deployments.
Broadcasts messages to all connected workers using Redis pub/sub. Messages are fire-and-forget with no delivery guarantees or acknowledgements.
class PubSubBroker(BaseRedisBroker):
def __init__(
self,
url: str,
task_id_generator: Optional[Callable[[], str]] = None,
result_backend: Optional[AsyncResultBackend[_T]] = None,
queue_name: str = "taskiq",
max_connection_pool_size: Optional[int] = None,
**connection_kwargs: Any,
) -> None:
"""
Redis pub/sub broker for broadcasting tasks.
Parameters:
- url: Redis connection URL
- task_id_generator: Custom task ID generator function
- result_backend: Result backend for storing task results
- queue_name: Redis pub/sub channel name (default: "taskiq")
- max_connection_pool_size: Maximum connections in pool
- connection_kwargs: Additional Redis connection arguments
"""
async def kick(self, message: BrokerMessage) -> None:
"""
Publish message to Redis pub/sub channel.
Parameters:
- message: Message to broadcast to all subscribers
"""
async def listen(self) -> AsyncGenerator[bytes, None]:
"""
Listen for messages on Redis pub/sub channel.
Yields:
- bytes: Raw message data from pub/sub channel
"""
async def shutdown(self) -> None:
"""Close Redis connection pool."""

Usage Example:
from taskiq_redis import PubSubBroker
# Create pub/sub broker
broker = PubSubBroker("redis://localhost:6379")
@broker.task
async def broadcast_task(message: str) -> str:
return f"Processed: {message}"
# All connected workers will receive this task
await broadcast_task.kiq("Hello workers!")

Distributes tasks between workers using Redis lists. Tasks are queued and distributed to available workers with simple load balancing.
class ListQueueBroker(BaseRedisBroker):
def __init__(
self,
url: str,
task_id_generator: Optional[Callable[[], str]] = None,
result_backend: Optional[AsyncResultBackend[_T]] = None,
queue_name: str = "taskiq",
max_connection_pool_size: Optional[int] = None,
**connection_kwargs: Any,
) -> None:
"""
Redis list-based queue broker for task distribution.
Parameters:
- url: Redis connection URL
- task_id_generator: Custom task ID generator function
- result_backend: Result backend for storing task results
- queue_name: Redis list key name (default: "taskiq")
- max_connection_pool_size: Maximum connections in pool
- connection_kwargs: Additional Redis connection arguments
"""
async def kick(self, message: BrokerMessage) -> None:
"""
Add message to Redis list queue.
Parameters:
- message: Message to queue for worker processing
"""
async def listen(self) -> AsyncGenerator[bytes, None]:
"""
Listen for messages from Redis list queue.
Yields:
- bytes: Raw message data from queue
"""

Usage Example:
from taskiq_redis import ListQueueBroker
# Create list queue broker
broker = ListQueueBroker("redis://localhost:6379", queue_name="my_tasks")
@broker.task
async def process_item(item_id: int) -> dict:
return {"item_id": item_id, "status": "processed"}
# Task will be distributed to next available worker
await process_item.kiq(123)

Uses Redis streams for reliable message processing with acknowledgement support, consumer groups, and automatic message redelivery.
class RedisStreamBroker(BaseRedisBroker):
def __init__(
self,
url: str,
queue_name: str = "taskiq",
max_connection_pool_size: Optional[int] = None,
consumer_group_name: str = "taskiq",
consumer_name: Optional[str] = None,
consumer_id: str = "$",
mkstream: bool = True,
xread_block: int = 2000,
maxlen: Optional[int] = None,
approximate: bool = True,
idle_timeout: int = 600000,
unacknowledged_batch_size: int = 100,
xread_count: Optional[int] = 100,
additional_streams: Optional[Dict[str, str]] = None,
**connection_kwargs: Any,
) -> None:
"""
Redis streams broker with acknowledgement support.
Parameters:
- url: Redis connection URL
- queue_name: Redis stream key name (default: "taskiq")
- max_connection_pool_size: Maximum connections in pool
- consumer_group_name: Consumer group name (default: "taskiq")
- consumer_name: Consumer name (default: random UUID)
- consumer_id: Consumer starting position (default: "$")
- mkstream: Create stream if it doesn't exist (default: True)
- xread_block: Block time in ms for stream reads (default: 2000)
- maxlen: Maximum stream length for trimming (default: None)
- approximate: Use approximate trimming (default: True)
- idle_timeout: Message redelivery timeout in ms (default: 600000)
- unacknowledged_batch_size: Batch size for reclaiming messages (default: 100)
- xread_count: Messages to read per batch (default: 100)
- additional_streams: Additional streams to read from
- connection_kwargs: Additional Redis connection arguments
"""
async def startup(self) -> None:
"""Initialize consumer group on startup."""
async def kick(self, message: BrokerMessage) -> None:
"""
Add message to Redis stream.
Parameters:
- message: Message to add to stream
"""
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
"""
Listen for messages from Redis stream.
Yields:
- AckableMessage: Message with acknowledgement capability
"""

Usage Example:
from taskiq_redis import RedisStreamBroker
# Create stream broker with custom configuration
broker = RedisStreamBroker(
url="redis://localhost:6379",
consumer_group_name="workers",
idle_timeout=300000 # 5 minutes
)
@broker.task
async def critical_task(data: dict) -> dict:
# Process important data
return {"processed": data, "timestamp": time.time()}
# Task will be acknowledged after processing
await critical_task.kiq({"important": "data"})

Message brokers for Redis Cluster deployments, providing horizontal scaling across multiple Redis nodes.
class ListQueueClusterBroker(BaseRedisClusterBroker):
def __init__(
self,
url: str,
queue_name: str = "taskiq",
max_connection_pool_size: int = 2**31,
**connection_kwargs: Any,
) -> None:
"""
Redis Cluster list queue broker.
Parameters:
- url: Redis cluster connection URL
- queue_name: Redis list key name (default: "taskiq")
- max_connection_pool_size: Maximum connections (default: 2**31)
- connection_kwargs: Additional Redis cluster connection arguments
"""
async def kick(self, message: BrokerMessage) -> None:
"""Add message to Redis cluster list queue."""
async def listen(self) -> AsyncGenerator[bytes, None]:
"""Listen for messages from Redis cluster list queue."""
async def shutdown(self) -> None:
"""Close Redis cluster connection."""

class RedisStreamClusterBroker(BaseRedisClusterBroker):
def __init__(
self,
url: str,
queue_name: str = "taskiq",
max_connection_pool_size: int = 2**31,
consumer_group_name: str = "taskiq",
consumer_name: Optional[str] = None,
consumer_id: str = "$",
mkstream: bool = True,
xread_block: int = 10000,
maxlen: Optional[int] = None,
approximate: bool = True,
additional_streams: Optional[Dict[str, str]] = None,
**connection_kwargs: Any,
) -> None:
"""
Redis Cluster streams broker with acknowledgement support.
Similar parameters to RedisStreamBroker but for Redis Cluster.
"""
async def startup(self) -> None:
"""Initialize consumer group on startup."""
async def kick(self, message: BrokerMessage) -> None:
"""Add message to Redis cluster stream."""
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
"""Listen for messages from Redis cluster stream."""

Message brokers for Redis Sentinel deployments, providing high availability with automatic failover.
class PubSubSentinelBroker(BaseSentinelBroker):
def __init__(
self,
sentinels: List[Tuple[str, int]],
master_name: str,
result_backend: Optional[AsyncResultBackend[_T]] = None,
task_id_generator: Optional[Callable[[], str]] = None,
queue_name: str = "taskiq",
min_other_sentinels: int = 0,
sentinel_kwargs: Optional[Any] = None,
**connection_kwargs: Any,
) -> None:
"""
Redis Sentinel pub/sub broker.
Parameters:
- sentinels: List of sentinel (host, port) pairs
- master_name: Sentinel master name
- result_backend: Result backend for storing task results
- task_id_generator: Custom task ID generator function
- queue_name: Pub/sub channel name (default: "taskiq")
- min_other_sentinels: Minimum other sentinels required (default: 0)
- sentinel_kwargs: Additional sentinel configuration
- connection_kwargs: Additional Redis connection arguments
"""
async def kick(self, message: BrokerMessage) -> None:
"""Publish message to Redis Sentinel pub/sub channel."""
async def listen(self) -> AsyncGenerator[bytes, None]:
"""Listen for messages from Redis Sentinel pub/sub channel."""

class ListQueueSentinelBroker(BaseSentinelBroker):
def __init__(
self,
sentinels: List[Tuple[str, int]],
master_name: str,
**kwargs
) -> None:
"""Redis Sentinel list queue broker."""
async def kick(self, message: BrokerMessage) -> None:
"""Add message to Redis Sentinel list queue."""
async def listen(self) -> AsyncGenerator[bytes, None]:
"""Listen for messages from Redis Sentinel list queue."""

class RedisStreamSentinelBroker(BaseSentinelBroker):
def __init__(
self,
sentinels: List[Tuple[str, int]],
master_name: str,
min_other_sentinels: int = 0,
queue_name: str = "taskiq",
consumer_group_name: str = "taskiq",
consumer_name: Optional[str] = None,
consumer_id: str = "$",
mkstream: bool = True,
xread_block: int = 10000,
maxlen: Optional[int] = None,
approximate: bool = True,
additional_streams: Optional[Dict[str, str]] = None,
**connection_kwargs: Any,
) -> None:
"""Redis Sentinel streams broker with acknowledgement support."""
async def startup(self) -> None:
"""Initialize consumer group on startup."""
async def kick(self, message: BrokerMessage) -> None:
"""Add message to Redis Sentinel stream."""
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
"""Listen for messages from Redis Sentinel stream."""

Usage Example:
from taskiq_redis import RedisStreamSentinelBroker
# Create high-availability stream broker
broker = RedisStreamSentinelBroker(
sentinels=[("sentinel1", 26379), ("sentinel2", 26379)],
master_name="mymaster",
consumer_group_name="ha-workers"
)
@broker.task
async def ha_task(data: str) -> str:
return f"Processed with HA: {data}"

from typing import TypeVar, Callable, Optional, Any, AsyncGenerator, Dict, List, Tuple
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.message import BrokerMessage
from taskiq import AckableMessage
_T = TypeVar("_T")
class BaseRedisBroker(AsyncBroker):
"""Base class for Redis brokers."""
class BaseRedisClusterBroker(AsyncBroker):
"""Base class for Redis Cluster brokers."""
class BaseSentinelBroker(AsyncBroker):
"""Base class for Redis Sentinel brokers."""

Install with Tessl CLI
npx tessl i tessl/pypi-taskiq-redis