Redis integration for taskiq with brokers and result backends supporting single node, cluster, and sentinel deployments
npx @tessl/cli install tessl/pypi-taskiq-redis@1.1.00
# TaskIQ-Redis

Redis integration for TaskIQ, providing Redis-based message brokers, result backends, and schedule sources for three Redis architectures (single node, cluster, sentinel). TaskIQ-Redis offers three broker types: PubSub for broadcasting (no acknowledgements), ListQueue for simple queuing (no acknowledgements), and Stream for reliable message processing with acknowledgement support.

## Package Information

- **Package Name**: taskiq-redis
- **Language**: Python
- **Installation**: `pip install taskiq-redis`
- **Dependencies**: `taskiq>=0.11.12,<1`, `redis^6`

## Core Imports

```python
from taskiq_redis import (
    # Basic Redis brokers
    PubSubBroker,
    ListQueueBroker,
    RedisStreamBroker,

    # Redis Cluster brokers
    ListQueueClusterBroker,
    RedisStreamClusterBroker,

    # Redis Sentinel brokers
    ListQueueSentinelBroker,
    PubSubSentinelBroker,
    RedisStreamSentinelBroker,

    # Result backends
    RedisAsyncResultBackend,
    RedisAsyncClusterResultBackend,
    RedisAsyncSentinelResultBackend,

    # Schedule sources
    ListRedisScheduleSource,
    RedisScheduleSource,
    RedisClusterScheduleSource,
    RedisSentinelScheduleSource
)
```

## Basic Usage

```python
import asyncio

from taskiq_redis import RedisStreamBroker, RedisAsyncResultBackend

# Create broker with result backend
broker = RedisStreamBroker(
    url="redis://localhost:6379",
    result_backend=RedisAsyncResultBackend("redis://localhost:6379")
)

# Define a task
@broker.task
async def add_numbers(a: int, b: int) -> int:
    return a + b

async def main():
    # Start the broker
    await broker.startup()

    # Send task
    task = await add_numbers.kiq(10, 20)

    # Get result (a worker process must be running to execute the task)
    result = await task.wait_result()
    print(f"Result: {result.return_value}")  # Result: 30

    # Shutdown
    await broker.shutdown()

# Run the example
asyncio.run(main())
```

## Architecture

TaskIQ-Redis provides three main component categories:

- **Brokers**: Handle message distribution between producers and consumers
  - **PubSub**: Broadcast messages to all connected workers (fire-and-forget)
  - **ListQueue**: Distribute tasks between workers using Redis lists (simple queuing)
  - **Stream**: Reliable message processing with acknowledgement using Redis streams
- **Result Backends**: Store and retrieve task execution results with configurable expiration
- **Schedule Sources**: Manage scheduled/recurring tasks with different storage strategies

Each component type supports three Redis deployment architectures:
- **Standard Redis**: Single Redis instance
- **Redis Cluster**: Distributed Redis deployment
- **Redis Sentinel**: High-availability Redis with automatic failover
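
The pairing of delivery pattern and deployment type can be written down as a small lookup table. The sketch below uses only the class names that appear in the Core Imports list; the string keys (`"pubsub"`, `"list"`, `"stream"`, `"single"`, `"cluster"`, `"sentinel"`) and the `broker_class_name` helper are hypothetical and exist only for this illustration. The table records that the import list shows no cluster variant of the PubSub broker, which is an observation about that list rather than a statement about the library as a whole.

```python
from typing import Dict, Optional, Tuple

# Class names copied from the Core Imports list. None marks a combination
# that the list does not include.
BROKER_CLASS_NAMES: Dict[Tuple[str, str], Optional[str]] = {
    ("pubsub", "single"): "PubSubBroker",
    ("pubsub", "cluster"): None,
    ("pubsub", "sentinel"): "PubSubSentinelBroker",
    ("list", "single"): "ListQueueBroker",
    ("list", "cluster"): "ListQueueClusterBroker",
    ("list", "sentinel"): "ListQueueSentinelBroker",
    ("stream", "single"): "RedisStreamBroker",
    ("stream", "cluster"): "RedisStreamClusterBroker",
    ("stream", "sentinel"): "RedisStreamSentinelBroker",
}


def broker_class_name(pattern: str, deployment: str) -> str:
    """Return the broker class name for a delivery pattern and deployment."""
    key = (pattern, deployment)
    if key not in BROKER_CLASS_NAMES:
        raise KeyError(f"unknown combination: {key}")
    name = BROKER_CLASS_NAMES[key]
    if name is None:
        raise LookupError(f"no broker listed for {key}")
    return name
```

For example, `broker_class_name("stream", "sentinel")` returns `"RedisStreamSentinelBroker"`, the class to import when acknowledgements are needed on a Sentinel deployment.
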

## Capabilities

### Message Brokers

Core message brokers that handle task distribution between producers and consumers, supporting different message patterns and Redis deployment types.

```python { .api }
class PubSubBroker(BaseRedisBroker):
    def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[bytes, None]: ...

class ListQueueBroker(BaseRedisBroker):
    def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[bytes, None]: ...

class RedisStreamBroker(BaseRedisBroker):
    def __init__(
        self,
        url: str,
        queue_name: str = "taskiq",
        consumer_group_name: str = "taskiq",
        consumer_name: Optional[str] = None,
        **kwargs
    ): ...
    async def startup(self) -> None: ...
    async def kick(self, message: BrokerMessage) -> None: ...
    async def listen(self) -> AsyncGenerator[AckableMessage, None]: ...
```

[Message Brokers](./brokers.md)
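
To make the difference between the three message patterns concrete, here is a minimal in-memory model of their delivery semantics. It is a conceptual sketch only: it does not use Redis or taskiq-redis, and `broadcast`, `ListQueueModel`, and `StreamModel` are hypothetical names invented for this illustration, not part of the library.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple


def broadcast(message: str, inboxes: List[List[str]]) -> None:
    """PubSub-style: every connected worker gets its own copy."""
    for inbox in inboxes:
        inbox.append(message)


class ListQueueModel:
    """ListQueue-style: each message is handed to exactly one worker."""

    def __init__(self) -> None:
        self._items: Deque[str] = deque()

    def push(self, message: str) -> None:
        self._items.append(message)

    def pop(self) -> str:
        # The message leaves the queue here; if the worker crashes
        # afterwards, nothing is left to redeliver.
        return self._items.popleft()


class StreamModel:
    """Stream-style: a delivered message stays pending until acknowledged."""

    def __init__(self) -> None:
        self._next_id = 0
        self._undelivered: Deque[Tuple[int, str]] = deque()
        self.pending: Dict[int, str] = {}

    def add(self, message: str) -> int:
        self._next_id += 1
        self._undelivered.append((self._next_id, message))
        return self._next_id

    def read(self) -> Tuple[int, str]:
        message_id, message = self._undelivered.popleft()
        self.pending[message_id] = message  # remembered until ack()
        return message_id, message

    def ack(self, message_id: int) -> None:
        del self.pending[message_id]
```

In this model, a message read from `StreamModel` remains in `pending` until `ack` is called, so a failed worker leaves a record that can be inspected and retried. The other two patterns keep no such record, which is what "no acknowledgements" means in practice.
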

### Result Backends

Async result backends for storing and retrieving task execution results with configurable expiration times and different Redis deployment support.

```python { .api }
class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        **kwargs
    ): ...
    async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None: ...
    async def get_result(self, task_id: str, with_logs: bool = False) -> TaskiqResult[_ReturnType]: ...
    async def is_result_ready(self, task_id: str) -> bool: ...
    async def set_progress(self, task_id: str, progress: TaskProgress[_ReturnType]) -> None: ...
    async def get_progress(self, task_id: str) -> Union[TaskProgress[_ReturnType], None]: ...
```

[Result Backends](./result-backends.md)

### Schedule Sources

Schedule sources for managing scheduled and recurring tasks with different storage strategies and Redis deployment support.

```python { .api }
class ListRedisScheduleSource(ScheduleSource):
    def __init__(
        self,
        url: str,
        prefix: str = "schedule",
        buffer_size: int = 50,
        skip_past_schedules: bool = False,
        **kwargs
    ): ...
    async def add_schedule(self, schedule: ScheduledTask) -> None: ...
    async def get_schedules(self) -> List[ScheduledTask]: ...
    async def delete_schedule(self, schedule_id: str) -> None: ...
    def with_migrate_from(self, source: ScheduleSource, delete_schedules: bool = True) -> Self: ...
```

[Schedule Sources](./schedule-sources.md)

## Exception Classes

```python { .api }
class TaskIQRedisError(TaskiqError):
    """Base error for all taskiq-redis exceptions."""

class DuplicateExpireTimeSelectedError(ResultBackendError, TaskIQRedisError):
    """Error if two lifetimes are selected."""

class ExpireTimeMustBeMoreThanZeroError(ResultBackendError, TaskIQRedisError):
    """Error if lifetimes are less or equal zero."""

class ResultIsMissingError(TaskIQRedisError, ResultGetError):
    """Error if there is no result when trying to get it."""
```
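
The two expiration-related errors suggest a validation rule for the `result_ex_time` and `result_px_time` arguments of the result backend. The sketch below shows one way such a rule could look, based only on the docstrings above. It is not the library's implementation: `DuplicateExpireTimeError`, `NonPositiveExpireTimeError`, and `expiry_kwargs` are hypothetical stand-ins, the order of the checks is a guess, and the reading of `ex` as seconds and `px` as milliseconds is an assumption by analogy with the `EX` and `PX` options of the Redis `SET` command.

```python
from typing import Dict, Optional


class DuplicateExpireTimeError(ValueError):
    """Stand-in for DuplicateExpireTimeSelectedError (illustration only)."""


class NonPositiveExpireTimeError(ValueError):
    """Stand-in for ExpireTimeMustBeMoreThanZeroError (illustration only)."""


def expiry_kwargs(
    result_ex_time: Optional[int] = None,
    result_px_time: Optional[int] = None,
) -> Dict[str, int]:
    """Validate the two lifetimes and return SET-style expiry arguments."""
    for value in (result_ex_time, result_px_time):
        if value is not None and value <= 0:
            raise NonPositiveExpireTimeError("lifetime must be greater than zero")
    if result_ex_time is not None and result_px_time is not None:
        raise DuplicateExpireTimeError("choose either ex or px, not both")
    if result_ex_time is not None:
        return {"ex": result_ex_time}  # assumed: seconds
    if result_px_time is not None:
        return {"px": result_px_time}  # assumed: milliseconds
    return {}  # no expiration requested
```

Under these assumptions, `expiry_kwargs(result_ex_time=60)` yields `{"ex": 60}`, while passing both arguments or a non-positive value raises the corresponding stand-in error.
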