# Message Brokers

TaskIQ-Redis provides message brokers for a range of Redis deployment architectures and messaging patterns. Each broker type offers different message delivery guarantees and is optimized for specific use cases.

## Capabilities

### Standard Redis Brokers

Message brokers for single Redis instance deployments.

#### PubSub Broadcasting Broker

Broadcasts messages to all connected workers using Redis pub/sub. Messages are fire-and-forget, with no delivery guarantees or acknowledgements.

```python { .api }
class PubSubBroker(BaseRedisBroker):
    def __init__(
        self,
        url: str,
        task_id_generator: Optional[Callable[[], str]] = None,
        result_backend: Optional[AsyncResultBackend[_T]] = None,
        queue_name: str = "taskiq",
        max_connection_pool_size: Optional[int] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis pub/sub broker for broadcasting tasks.

        Parameters:
        - url: Redis connection URL
        - task_id_generator: Custom task ID generator function
        - result_backend: Result backend for storing task results
        - queue_name: Redis pub/sub channel name (default: "taskiq")
        - max_connection_pool_size: Maximum connections in pool
        - connection_kwargs: Additional Redis connection arguments
        """

    async def kick(self, message: BrokerMessage) -> None:
        """
        Publish message to Redis pub/sub channel.

        Parameters:
        - message: Message to broadcast to all subscribers
        """

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """
        Listen for messages on Redis pub/sub channel.

        Yields:
        - bytes: Raw message data from pub/sub channel
        """

    async def shutdown(self) -> None:
        """Close Redis connection pool."""
```

**Usage Example:**

```python
from taskiq_redis import PubSubBroker

# Create a pub/sub broker
broker = PubSubBroker("redis://localhost:6379")

@broker.task
async def broadcast_task(message: str) -> str:
    return f"Processed: {message}"

# All connected workers will receive this task
await broadcast_task.kiq("Hello workers!")
```

#### List Queue Broker

Distributes tasks between workers using Redis lists. Tasks are queued and handed to the next available worker, providing simple load balancing.

```python { .api }
class ListQueueBroker(BaseRedisBroker):
    def __init__(
        self,
        url: str,
        task_id_generator: Optional[Callable[[], str]] = None,
        result_backend: Optional[AsyncResultBackend[_T]] = None,
        queue_name: str = "taskiq",
        max_connection_pool_size: Optional[int] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis list-based queue broker for task distribution.

        Parameters:
        - url: Redis connection URL
        - task_id_generator: Custom task ID generator function
        - result_backend: Result backend for storing task results
        - queue_name: Redis list key name (default: "taskiq")
        - max_connection_pool_size: Maximum connections in pool
        - connection_kwargs: Additional Redis connection arguments
        """

    async def kick(self, message: BrokerMessage) -> None:
        """
        Add message to Redis list queue.

        Parameters:
        - message: Message to queue for worker processing
        """

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """
        Listen for messages from Redis list queue.

        Yields:
        - bytes: Raw message data from queue
        """
```

**Usage Example:**

```python
from taskiq_redis import ListQueueBroker

# Create a list queue broker
broker = ListQueueBroker("redis://localhost:6379", queue_name="my_tasks")

@broker.task
async def process_item(item_id: int) -> dict:
    return {"item_id": item_id, "status": "processed"}

# The task will be distributed to the next available worker
await process_item.kiq(123)
```

#### Redis Stream Broker

Uses Redis streams for reliable message processing, with acknowledgement support, consumer groups, and automatic redelivery of unacknowledged messages.

```python { .api }
class RedisStreamBroker(BaseRedisBroker):
    def __init__(
        self,
        url: str,
        queue_name: str = "taskiq",
        max_connection_pool_size: Optional[int] = None,
        consumer_group_name: str = "taskiq",
        consumer_name: Optional[str] = None,
        consumer_id: str = "$",
        mkstream: bool = True,
        xread_block: int = 2000,
        maxlen: Optional[int] = None,
        approximate: bool = True,
        idle_timeout: int = 600000,
        unacknowledged_batch_size: int = 100,
        xread_count: Optional[int] = 100,
        additional_streams: Optional[Dict[str, str]] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis streams broker with acknowledgement support.

        Parameters:
        - url: Redis connection URL
        - queue_name: Redis stream key name (default: "taskiq")
        - max_connection_pool_size: Maximum connections in pool
        - consumer_group_name: Consumer group name (default: "taskiq")
        - consumer_name: Consumer name (default: random UUID)
        - consumer_id: Consumer starting position (default: "$")
        - mkstream: Create stream if it doesn't exist (default: True)
        - xread_block: Block time in ms for stream reads (default: 2000)
        - maxlen: Maximum stream length for trimming (default: None)
        - approximate: Use approximate trimming (default: True)
        - idle_timeout: Message redelivery timeout in ms (default: 600000)
        - unacknowledged_batch_size: Batch size for reclaiming messages (default: 100)
        - xread_count: Messages to read per batch (default: 100)
        - additional_streams: Additional streams to read from
        - connection_kwargs: Additional Redis connection arguments
        """

    async def startup(self) -> None:
        """Initialize consumer group on startup."""

    async def kick(self, message: BrokerMessage) -> None:
        """
        Add message to Redis stream.

        Parameters:
        - message: Message to add to stream
        """

    async def listen(self) -> AsyncGenerator[AckableMessage, None]:
        """
        Listen for messages from Redis stream.

        Yields:
        - AckableMessage: Message with acknowledgement capability
        """
```

**Usage Example:**

```python
import time

from taskiq_redis import RedisStreamBroker

# Create a stream broker with custom configuration
broker = RedisStreamBroker(
    url="redis://localhost:6379",
    consumer_group_name="workers",
    idle_timeout=300000,  # 5 minutes
)

@broker.task
async def critical_task(data: dict) -> dict:
    # Process important data
    return {"processed": data, "timestamp": time.time()}

# The task will be acknowledged after processing
await critical_task.kiq({"important": "data"})
```

### Redis Cluster Brokers

Message brokers for Redis Cluster deployments, providing horizontal scaling across multiple Redis nodes.

#### List Queue Cluster Broker

Distributes tasks between workers using a Redis list stored in the cluster.
```python { .api }
class ListQueueClusterBroker(BaseRedisClusterBroker):
    def __init__(
        self,
        url: str,
        queue_name: str = "taskiq",
        max_connection_pool_size: int = 2**31,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis Cluster list queue broker.

        Parameters:
        - url: Redis cluster connection URL
        - queue_name: Redis list key name (default: "taskiq")
        - max_connection_pool_size: Maximum connections (default: 2**31)
        - connection_kwargs: Additional Redis cluster connection arguments
        """

    async def kick(self, message: BrokerMessage) -> None:
        """Add message to Redis cluster list queue."""

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """Listen for messages from Redis cluster list queue."""

    async def shutdown(self) -> None:
        """Close Redis cluster connection."""
```

#### Redis Stream Cluster Broker

Uses Redis streams on a cluster for reliable, acknowledgeable message processing.
```python { .api }
class RedisStreamClusterBroker(BaseRedisClusterBroker):
    def __init__(
        self,
        url: str,
        queue_name: str = "taskiq",
        max_connection_pool_size: int = 2**31,
        consumer_group_name: str = "taskiq",
        consumer_name: Optional[str] = None,
        consumer_id: str = "$",
        mkstream: bool = True,
        xread_block: int = 10000,
        maxlen: Optional[int] = None,
        approximate: bool = True,
        additional_streams: Optional[Dict[str, str]] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis Cluster streams broker with acknowledgement support.

        Takes the same stream-related parameters as RedisStreamBroker,
        but connects to a Redis Cluster.
        """

    async def startup(self) -> None:
        """Initialize consumer group on startup."""

    async def kick(self, message: BrokerMessage) -> None:
        """Add message to Redis cluster stream."""

    async def listen(self) -> AsyncGenerator[AckableMessage, None]:
        """Listen for messages from Redis cluster stream."""
```

### Redis Sentinel Brokers

Message brokers for Redis Sentinel deployments, providing high availability with automatic failover.

#### PubSub Sentinel Broker

Broadcasts messages to all connected workers via pub/sub on the Sentinel-managed master.
```python { .api }
class PubSubSentinelBroker(BaseSentinelBroker):
    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        master_name: str,
        result_backend: Optional[AsyncResultBackend[_T]] = None,
        task_id_generator: Optional[Callable[[], str]] = None,
        queue_name: str = "taskiq",
        min_other_sentinels: int = 0,
        sentinel_kwargs: Optional[Any] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis Sentinel pub/sub broker.

        Parameters:
        - sentinels: List of sentinel (host, port) pairs
        - master_name: Sentinel master name
        - result_backend: Result backend for storing task results
        - task_id_generator: Custom task ID generator function
        - queue_name: Pub/sub channel name (default: "taskiq")
        - min_other_sentinels: Minimum other sentinels required (default: 0)
        - sentinel_kwargs: Additional sentinel configuration
        - connection_kwargs: Additional Redis connection arguments
        """

    async def kick(self, message: BrokerMessage) -> None:
        """Publish message to Redis Sentinel pub/sub channel."""

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """Listen for messages from Redis Sentinel pub/sub channel."""
```

#### List Queue Sentinel Broker

Distributes tasks between workers using a Redis list on the Sentinel-managed master.
```python { .api }
class ListQueueSentinelBroker(BaseSentinelBroker):
    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        master_name: str,
        **kwargs: Any,
    ) -> None:
        """Redis Sentinel list queue broker."""

    async def kick(self, message: BrokerMessage) -> None:
        """Add message to Redis Sentinel list queue."""

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """Listen for messages from Redis Sentinel list queue."""
```

#### Redis Stream Sentinel Broker

Uses Redis streams on the Sentinel-managed master for reliable, acknowledgeable message processing.
```python { .api }
class RedisStreamSentinelBroker(BaseSentinelBroker):
    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        master_name: str,
        min_other_sentinels: int = 0,
        queue_name: str = "taskiq",
        consumer_group_name: str = "taskiq",
        consumer_name: Optional[str] = None,
        consumer_id: str = "$",
        mkstream: bool = True,
        xread_block: int = 10000,
        maxlen: Optional[int] = None,
        approximate: bool = True,
        additional_streams: Optional[Dict[str, str]] = None,
        **connection_kwargs: Any,
    ) -> None:
        """Redis Sentinel streams broker with acknowledgement support."""

    async def startup(self) -> None:
        """Initialize consumer group on startup."""

    async def kick(self, message: BrokerMessage) -> None:
        """Add message to Redis Sentinel stream."""

    async def listen(self) -> AsyncGenerator[AckableMessage, None]:
        """Listen for messages from Redis Sentinel stream."""
```

**Usage Example:**

```python
from taskiq_redis import RedisStreamSentinelBroker

# Create a high-availability stream broker
broker = RedisStreamSentinelBroker(
    sentinels=[("sentinel1", 26379), ("sentinel2", 26379)],
    master_name="mymaster",
    consumer_group_name="ha-workers",
)

@broker.task
async def ha_task(data: str) -> str:
    return f"Processed with HA: {data}"
```

## Types

```python { .api }
from typing import TypeVar, Callable, Optional, Any, AsyncGenerator, Dict, List, Tuple

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.message import BrokerMessage
from taskiq import AckableMessage

_T = TypeVar("_T")

class BaseRedisBroker(AsyncBroker):
    """Base class for Redis brokers."""

class BaseRedisClusterBroker(AsyncBroker):
    """Base class for Redis Cluster brokers."""

class BaseSentinelBroker(AsyncBroker):
    """Base class for Redis Sentinel brokers."""
```