0
# Result Backends
1
2
TaskIQ-Redis provides async result backends for storing and retrieving task execution results with configurable expiration times. Each backend targets a different Redis deployment architecture (single instance, Cluster, or Sentinel) while exposing the same API.
3
4
## Capabilities
5
6
### Standard Redis Result Backend
7
8
Async result backend for single-instance Redis deployments.
9
10
```python { .api }
11
class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
12
def __init__(
13
self,
14
redis_url: str,
15
keep_results: bool = True,
16
result_ex_time: Optional[int] = None,
17
result_px_time: Optional[int] = None,
18
max_connection_pool_size: Optional[int] = None,
19
serializer: Optional[TaskiqSerializer] = None,
20
prefix_str: Optional[str] = None,
21
**connection_kwargs: Any,
22
) -> None:
23
"""
24
Redis async result backend.
25
26
Parameters:
27
- redis_url: Redis connection URL
28
- keep_results: Don't remove results after reading (default: True)
29
- result_ex_time: Result expiration time in seconds
30
- result_px_time: Result expiration time in milliseconds
31
- max_connection_pool_size: Maximum connections in pool
32
- serializer: Custom serializer (default: PickleSerializer)
33
- prefix_str: Prefix for Redis keys
34
- connection_kwargs: Additional Redis connection arguments
35
36
Raises:
37
- DuplicateExpireTimeSelectedError: If both result_ex_time and result_px_time are specified
38
- ExpireTimeMustBeMoreThanZeroError: If expiration time <= 0
39
"""
40
41
async def shutdown(self) -> None:
42
"""Close Redis connection pool."""
43
44
async def set_result(
45
self,
46
task_id: str,
47
result: TaskiqResult[_ReturnType]
48
) -> None:
49
"""
50
Store task result in Redis.
51
52
Parameters:
53
- task_id: Unique task identifier
54
- result: Task execution result to store
55
"""
56
57
async def is_result_ready(self, task_id: str) -> bool:
58
"""
59
Check if task result is available.
60
61
Parameters:
62
- task_id: Unique task identifier
63
64
Returns:
65
- bool: True if result is ready, False otherwise
66
"""
67
68
async def get_result(
69
self,
70
task_id: str,
71
with_logs: bool = False
72
) -> TaskiqResult[_ReturnType]:
73
"""
74
Retrieve task result from Redis.
75
76
Parameters:
77
- task_id: Unique task identifier
78
- with_logs: Include execution logs in result (default: False)
79
80
Returns:
81
- TaskiqResult: Task execution result
82
83
Raises:
84
- ResultIsMissingError: If result not found
85
"""
86
87
async def set_progress(
88
self,
89
task_id: str,
90
progress: TaskProgress[_ReturnType]
91
) -> None:
92
"""
93
Store task progress information.
94
95
Parameters:
96
- task_id: Unique task identifier
97
- progress: Task progress information
98
"""
99
100
async def get_progress(
101
self,
102
task_id: str
103
) -> Union[TaskProgress[_ReturnType], None]:
104
"""
105
Retrieve task progress information.
106
107
Parameters:
108
- task_id: Unique task identifier
109
110
Returns:
111
- TaskProgress or None: Progress information if available
112
"""
113
```
114
115
**Usage Example:**
116
117
```python
118
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
119
120
# Create result backend with 1 hour expiration
121
backend = RedisAsyncResultBackend(
122
redis_url="redis://localhost:6379",
123
result_ex_time=3600, # 1 hour in seconds
124
keep_results=True
125
)
126
127
# Use with broker
128
broker = RedisStreamBroker(
129
url="redis://localhost:6379",
130
result_backend=backend
131
)
132
133
@broker.task
134
async def compute_task(x: int, y: int) -> int:
135
return x * y
136
137
# Execute task and get result
138
task = await compute_task.kiq(10, 20)
139
result = await task.wait_result()
140
print(f"Result: {result.return_value}") # Result: 200
141
142
# Check progress (if task supports it)
143
progress = await backend.get_progress(task.task_id)
144
if progress:
145
print(f"Progress: {progress.progress}%")
146
```
147
148
### Redis Cluster Result Backend
149
150
Async result backend for Redis Cluster deployments.
151
152
```python { .api }
153
class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]):
154
def __init__(
155
self,
156
redis_url: str,
157
keep_results: bool = True,
158
result_ex_time: Optional[int] = None,
159
result_px_time: Optional[int] = None,
160
serializer: Optional[TaskiqSerializer] = None,
161
prefix_str: Optional[str] = None,
162
**connection_kwargs: Any,
163
) -> None:
164
"""
165
Redis Cluster async result backend.
166
167
Parameters similar to RedisAsyncResultBackend but without
168
max_connection_pool_size (managed by Redis Cluster client).
169
"""
170
171
async def shutdown(self) -> None:
172
"""Close Redis cluster connection."""
173
174
async def set_result(
175
self,
176
task_id: str,
177
result: TaskiqResult[_ReturnType]
178
) -> None:
179
"""Store task result in Redis cluster."""
180
181
async def is_result_ready(self, task_id: str) -> bool:
182
"""Check if task result is available in Redis cluster."""
183
184
async def get_result(
185
self,
186
task_id: str,
187
with_logs: bool = False
188
) -> TaskiqResult[_ReturnType]:
189
"""Retrieve task result from Redis cluster."""
190
191
async def set_progress(
192
self,
193
task_id: str,
194
progress: TaskProgress[_ReturnType]
195
) -> None:
196
"""Store task progress in Redis cluster."""
197
198
async def get_progress(
199
self,
200
task_id: str
201
) -> Union[TaskProgress[_ReturnType], None]:
202
"""Retrieve task progress from Redis cluster."""
203
```
204
205
**Usage Example:**
206
207
```python
208
from taskiq_redis import RedisAsyncClusterResultBackend, RedisStreamClusterBroker
209
210
# Create cluster result backend
211
backend = RedisAsyncClusterResultBackend(
212
redis_url="redis://cluster-node1:6379",
213
result_ex_time=7200 # 2 hours
214
)
215
216
# Use with cluster broker
217
broker = RedisStreamClusterBroker(
218
url="redis://cluster-node1:6379",
219
result_backend=backend
220
)
221
```
222
223
### Redis Sentinel Result Backend
224
225
Async result backend for Redis Sentinel deployments with high availability.
226
227
```python { .api }
228
class RedisAsyncSentinelResultBackend(AsyncResultBackend[_ReturnType]):
229
def __init__(
230
self,
231
sentinels: List[Tuple[str, int]],
232
master_name: str,
233
keep_results: bool = True,
234
result_ex_time: Optional[int] = None,
235
result_px_time: Optional[int] = None,
236
min_other_sentinels: int = 0,
237
sentinel_kwargs: Optional[Any] = None,
238
serializer: Optional[TaskiqSerializer] = None,
239
prefix_str: Optional[str] = None,
240
**connection_kwargs: Any,
241
) -> None:
242
"""
243
Redis Sentinel async result backend.
244
245
Parameters:
246
- sentinels: List of sentinel (host, port) pairs
247
- master_name: Sentinel master name
248
- keep_results: Don't remove results after reading (default: True)
249
- result_ex_time: Result expiration time in seconds
250
- result_px_time: Result expiration time in milliseconds
251
- min_other_sentinels: Minimum other sentinels required (default: 0)
252
- sentinel_kwargs: Additional sentinel configuration
253
- serializer: Custom serializer (default: PickleSerializer)
254
- prefix_str: Prefix for Redis keys
255
- connection_kwargs: Additional Redis connection arguments
256
"""
257
258
async def shutdown(self) -> None:
259
"""Close Redis sentinel connection."""
260
261
async def set_result(
262
self,
263
task_id: str,
264
result: TaskiqResult[_ReturnType]
265
) -> None:
266
"""Store task result in Redis via Sentinel."""
267
268
async def is_result_ready(self, task_id: str) -> bool:
269
"""Check if task result is available via Sentinel."""
270
271
async def get_result(
272
self,
273
task_id: str,
274
with_logs: bool = False
275
) -> TaskiqResult[_ReturnType]:
276
"""Retrieve task result from Redis via Sentinel."""
277
278
async def set_progress(
279
self,
280
task_id: str,
281
progress: TaskProgress[_ReturnType]
282
) -> None:
283
"""Store task progress via Sentinel."""
284
285
async def get_progress(
286
self,
287
task_id: str
288
) -> Union[TaskProgress[_ReturnType], None]:
289
"""Retrieve task progress via Sentinel."""
290
```
291
292
**Usage Example:**
293
294
```python
295
from taskiq_redis import RedisAsyncSentinelResultBackend, RedisStreamSentinelBroker
296
297
# Create high-availability result backend
298
backend = RedisAsyncSentinelResultBackend(
299
sentinels=[
300
("sentinel1", 26379),
301
("sentinel2", 26379),
302
("sentinel3", 26379)
303
],
304
master_name="mymaster",
305
result_ex_time=1800, # 30 minutes
306
min_other_sentinels=1
307
)
308
309
# Use with sentinel broker
310
broker = RedisStreamSentinelBroker(
311
sentinels=[("sentinel1", 26379), ("sentinel2", 26379)],
312
master_name="mymaster",
313
result_backend=backend
314
)
315
316
@broker.task
317
async def important_task(data: dict) -> dict:
318
# Critical task with HA storage
319
return {"processed": data, "success": True}
320
```
321
322
## Result Management
323
324
### Expiration Strategies
325
326
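Results can be configured to expire automatically. Set either `result_ex_time` (seconds) or `result_px_time` (milliseconds), but not both, and use a value greater than zero: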
327
328
```python
329
# Expire after 1 hour (3600 seconds)
330
backend = RedisAsyncResultBackend(
331
redis_url="redis://localhost:6379",
332
result_ex_time=3600
333
)
334
335
# Expire after 30 minutes (1800000 milliseconds)
336
backend = RedisAsyncResultBackend(
337
redis_url="redis://localhost:6379",
338
result_px_time=1800000
339
)
340
341
# Keep results indefinitely (no expiration time set; keep_results=True also keeps them after they are read)
342
backend = RedisAsyncResultBackend(
343
redis_url="redis://localhost:6379",
344
keep_results=True
345
)
346
```
347
348
### Progress Tracking
349
350
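Tasks can report progress during execution. The snippet below is a sketch: it assumes `broker`, `process_item`, `asyncio`, and `typing.List` are already defined or imported, and the task body must obtain its own task ID from its execution context (the `task` handle returned by `.kiq()` exists only on the calling side):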
351
352
```python
353
from taskiq_redis import RedisAsyncResultBackend
354
from taskiq.depends.progress_tracker import TaskProgress
355
356
backend = RedisAsyncResultBackend("redis://localhost:6379")
357
358
@broker.task
359
async def long_running_task(items: List[str]) -> List[str]:
360
results = []
361
total = len(items)
362
363
for i, item in enumerate(items):
364
# Process item
365
processed = await process_item(item)
366
results.append(processed)
367
368
# Update progress
369
progress = TaskProgress(
370
progress=int((i + 1) / total * 100),
371
message=f"Processed {i + 1}/{total} items"
372
)
373
await backend.set_progress(task.task_id, progress)
374
375
return results
376
377
# Monitor progress
378
task = await long_running_task.kiq(["item1", "item2", "item3"])
379
while not await backend.is_result_ready(task.task_id):
380
progress = await backend.get_progress(task.task_id)
381
if progress:
382
print(f"Progress: {progress.progress}% - {progress.message}")
383
await asyncio.sleep(1)
384
```
385
386
## Types
387
388
```python { .api }
389
from typing import TypeVar, Optional, Any, List, Tuple, Union
390
from taskiq.abc.result_backend import AsyncResultBackend
391
from taskiq.abc.serializer import TaskiqSerializer
392
from taskiq.result import TaskiqResult
393
from taskiq.depends.progress_tracker import TaskProgress
394
395
_ReturnType = TypeVar("_ReturnType")
396
397
# Constants
398
PROGRESS_KEY_SUFFIX: str = "__progress"
399
```