or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.mdindex.mdresult-backends.mdschedule-sources.md

brokers.mddocs/

0

# Message Brokers

1

2

TaskIQ-Redis provides comprehensive message brokers for different Redis deployment architectures and message patterns. Each broker type supports different message delivery guarantees and is optimized for specific use cases.

3

4

## Capabilities

5

6

### Standard Redis Brokers

7

8

Message brokers for single Redis instance deployments.

9

10

#### PubSub Broadcasting Broker

11

12

Broadcasts messages to all connected workers using Redis pub/sub. Messages are fire-and-forget with no delivery guarantees or acknowledgements.

13

14

```python { .api }

15

class PubSubBroker(BaseRedisBroker):

16

def __init__(

17

self,

18

url: str,

19

task_id_generator: Optional[Callable[[], str]] = None,

20

result_backend: Optional[AsyncResultBackend[_T]] = None,

21

queue_name: str = "taskiq",

22

max_connection_pool_size: Optional[int] = None,

23

**connection_kwargs: Any,

24

) -> None:

25

"""

26

Redis pub/sub broker for broadcasting tasks.

27

28

Parameters:

29

- url: Redis connection URL

30

- task_id_generator: Custom task ID generator function

31

- result_backend: Result backend for storing task results

32

- queue_name: Redis pub/sub channel name (default: "taskiq")

33

- max_connection_pool_size: Maximum connections in pool

34

- connection_kwargs: Additional Redis connection arguments

35

"""

36

37

async def kick(self, message: BrokerMessage) -> None:

38

"""

39

Publish message to Redis pub/sub channel.

40

41

Parameters:

42

- message: Message to broadcast to all subscribers

43

"""

44

45

async def listen(self) -> AsyncGenerator[bytes, None]:

46

"""

47

Listen for messages on Redis pub/sub channel.

48

49

Yields:

50

- bytes: Raw message data from pub/sub channel

51

"""

52

53

async def shutdown(self) -> None:

54

"""Close Redis connection pool."""

55

```

56

57

**Usage Example:**

58

59

```python

60

from taskiq_redis import PubSubBroker

61

62

# Create pub/sub broker

63

broker = PubSubBroker("redis://localhost:6379")

64

65

@broker.task

66

async def broadcast_task(message: str) -> str:

67

return f"Processed: {message}"

68

69

# All connected workers will receive this task

70

await broadcast_task.kiq("Hello workers!")

71

```

72

73

#### List Queue Broker

74

75

Distributes tasks between workers using Redis lists. Tasks are queued and distributed to available workers with simple load balancing.

76

77

```python { .api }

78

class ListQueueBroker(BaseRedisBroker):

79

def __init__(

80

self,

81

url: str,

82

task_id_generator: Optional[Callable[[], str]] = None,

83

result_backend: Optional[AsyncResultBackend[_T]] = None,

84

queue_name: str = "taskiq",

85

max_connection_pool_size: Optional[int] = None,

86

**connection_kwargs: Any,

87

) -> None:

88

"""

89

Redis list-based queue broker for task distribution.

90

91

Parameters:

92

- url: Redis connection URL

93

- task_id_generator: Custom task ID generator function

94

- result_backend: Result backend for storing task results

95

- queue_name: Redis list key name (default: "taskiq")

96

- max_connection_pool_size: Maximum connections in pool

97

- connection_kwargs: Additional Redis connection arguments

98

"""

99

100

async def kick(self, message: BrokerMessage) -> None:

101

"""

102

Add message to Redis list queue.

103

104

Parameters:

105

- message: Message to queue for worker processing

106

"""

107

108

async def listen(self) -> AsyncGenerator[bytes, None]:

109

"""

110

Listen for messages from Redis list queue.

111

112

Yields:

113

- bytes: Raw message data from queue

114

"""

115

```

116

117

**Usage Example:**

118

119

```python

120

from taskiq_redis import ListQueueBroker

121

122

# Create list queue broker

123

broker = ListQueueBroker("redis://localhost:6379", queue_name="my_tasks")

124

125

@broker.task

126

async def process_item(item_id: int) -> dict:

127

return {"item_id": item_id, "status": "processed"}

128

129

# Task will be distributed to next available worker

130

await process_item.kiq(123)

131

```

132

133

#### Redis Stream Broker

134

135

Uses Redis streams for reliable message processing with acknowledgement support, consumer groups, and automatic message redelivery.

136

137

```python { .api }

138

class RedisStreamBroker(BaseRedisBroker):

139

def __init__(

140

self,

141

url: str,

142

queue_name: str = "taskiq",

143

max_connection_pool_size: Optional[int] = None,

144

consumer_group_name: str = "taskiq",

145

consumer_name: Optional[str] = None,

146

consumer_id: str = "$",

147

mkstream: bool = True,

148

xread_block: int = 2000,

149

maxlen: Optional[int] = None,

150

approximate: bool = True,

151

idle_timeout: int = 600000,

152

unacknowledged_batch_size: int = 100,

153

xread_count: Optional[int] = 100,

154

additional_streams: Optional[Dict[str, str]] = None,

155

**connection_kwargs: Any,

156

) -> None:

157

"""

158

Redis streams broker with acknowledgement support.

159

160

Parameters:

161

- url: Redis connection URL

162

- queue_name: Redis stream key name (default: "taskiq")

163

- max_connection_pool_size: Maximum connections in pool

164

- consumer_group_name: Consumer group name (default: "taskiq")

165

- consumer_name: Consumer name (default: random UUID)

166

- consumer_id: Consumer starting position (default: "$")

167

- mkstream: Create stream if it doesn't exist (default: True)

168

- xread_block: Block time in ms for stream reads (default: 2000)

169

- maxlen: Maximum stream length for trimming (default: None)

170

- approximate: Use approximate trimming (default: True)

171

- idle_timeout: Message redelivery timeout in ms (default: 600000)

172

- unacknowledged_batch_size: Batch size for reclaiming messages (default: 100)

173

- xread_count: Messages to read per batch (default: 100)

174

- additional_streams: Additional streams to read from

175

- connection_kwargs: Additional Redis connection arguments

176

"""

177

178

async def startup(self) -> None:

179

"""Initialize consumer group on startup."""

180

181

async def kick(self, message: BrokerMessage) -> None:

182

"""

183

Add message to Redis stream.

184

185

Parameters:

186

- message: Message to add to stream

187

"""

188

189

async def listen(self) -> AsyncGenerator[AckableMessage, None]:

190

"""

191

Listen for messages from Redis stream.

192

193

Yields:

194

- AckableMessage: Message with acknowledgement capability

195

"""

196

```

197

198

**Usage Example:**

199

200

```python

201

from taskiq_redis import RedisStreamBroker

202

203

# Create stream broker with custom configuration

204

broker = RedisStreamBroker(

205

url="redis://localhost:6379",

206

consumer_group_name="workers",

207

idle_timeout=300000 # 5 minutes

208

)

209

210

@broker.task

211

async def critical_task(data: dict) -> dict:

212

# Process important data

213

return {"processed": data, "timestamp": time.time()}

214

215

# Task will be acknowledged after processing

216

await critical_task.kiq({"important": "data"})

217

```

218

219

### Redis Cluster Brokers

220

221

Message brokers for Redis Cluster deployments, providing horizontal scaling across multiple Redis nodes.

222

223

#### List Queue Cluster Broker

224

225

```python { .api }

226

class ListQueueClusterBroker(BaseRedisClusterBroker):

227

def __init__(

228

self,

229

url: str,

230

queue_name: str = "taskiq",

231

max_connection_pool_size: int = 2**31,

232

**connection_kwargs: Any,

233

) -> None:

234

"""

235

Redis Cluster list queue broker.

236

237

Parameters:

238

- url: Redis cluster connection URL

239

- queue_name: Redis list key name (default: "taskiq")

240

- max_connection_pool_size: Maximum connections (default: 2**31)

241

- connection_kwargs: Additional Redis cluster connection arguments

242

"""

243

244

async def kick(self, message: BrokerMessage) -> None:

245

"""Add message to Redis cluster list queue."""

246

247

async def listen(self) -> AsyncGenerator[bytes, None]:

248

"""Listen for messages from Redis cluster list queue."""

249

250

async def shutdown(self) -> None:

251

"""Close Redis cluster connection."""

252

```

253

254

#### Redis Stream Cluster Broker

255

256

```python { .api }

257

class RedisStreamClusterBroker(BaseRedisClusterBroker):

258

def __init__(

259

self,

260

url: str,

261

queue_name: str = "taskiq",

262

max_connection_pool_size: int = 2**31,

263

consumer_group_name: str = "taskiq",

264

consumer_name: Optional[str] = None,

265

consumer_id: str = "$",

266

mkstream: bool = True,

267

xread_block: int = 10000,

268

maxlen: Optional[int] = None,

269

approximate: bool = True,

270

additional_streams: Optional[Dict[str, str]] = None,

271

**connection_kwargs: Any,

272

) -> None:

273

"""

274

Redis Cluster streams broker with acknowledgement support.

275

276

Similar parameters to RedisStreamBroker but for Redis Cluster.

277

"""

278

279

async def startup(self) -> None:

280

"""Initialize consumer group on startup."""

281

282

async def kick(self, message: BrokerMessage) -> None:

283

"""Add message to Redis cluster stream."""

284

285

async def listen(self) -> AsyncGenerator[AckableMessage, None]:

286

"""Listen for messages from Redis cluster stream."""

287

```

288

289

### Redis Sentinel Brokers

290

291

Message brokers for Redis Sentinel deployments, providing high availability with automatic failover.

292

293

#### PubSub Sentinel Broker

294

295

```python { .api }

296

class PubSubSentinelBroker(BaseSentinelBroker):

297

def __init__(

298

self,

299

sentinels: List[Tuple[str, int]],

300

master_name: str,

301

result_backend: Optional[AsyncResultBackend[_T]] = None,

302

task_id_generator: Optional[Callable[[], str]] = None,

303

queue_name: str = "taskiq",

304

min_other_sentinels: int = 0,

305

sentinel_kwargs: Optional[Any] = None,

306

**connection_kwargs: Any,

307

) -> None:

308

"""

309

Redis Sentinel pub/sub broker.

310

311

Parameters:

312

- sentinels: List of sentinel (host, port) pairs

313

- master_name: Sentinel master name

314

- result_backend: Result backend for storing task results

315

- task_id_generator: Custom task ID generator function

316

- queue_name: Pub/sub channel name (default: "taskiq")

317

- min_other_sentinels: Minimum other sentinels required (default: 0)

318

- sentinel_kwargs: Additional sentinel configuration

319

- connection_kwargs: Additional Redis connection arguments

320

"""

321

322

async def kick(self, message: BrokerMessage) -> None:

323

"""Publish message to Redis Sentinel pub/sub channel."""

324

325

async def listen(self) -> AsyncGenerator[bytes, None]:

326

"""Listen for messages from Redis Sentinel pub/sub channel."""

327

```

328

329

#### List Queue Sentinel Broker

330

331

```python { .api }

332

class ListQueueSentinelBroker(BaseSentinelBroker):

333

def __init__(

334

self,

335

sentinels: List[Tuple[str, int]],

336

master_name: str,

337

**kwargs

338

) -> None:

339

"""Redis Sentinel list queue broker."""

340

341

async def kick(self, message: BrokerMessage) -> None:

342

"""Add message to Redis Sentinel list queue."""

343

344

async def listen(self) -> AsyncGenerator[bytes, None]:

345

"""Listen for messages from Redis Sentinel list queue."""

346

```

347

348

#### Redis Stream Sentinel Broker

349

350

```python { .api }

351

class RedisStreamSentinelBroker(BaseSentinelBroker):

352

def __init__(

353

self,

354

sentinels: List[Tuple[str, int]],

355

master_name: str,

356

min_other_sentinels: int = 0,

357

queue_name: str = "taskiq",

358

consumer_group_name: str = "taskiq",

359

consumer_name: Optional[str] = None,

360

consumer_id: str = "$",

361

mkstream: bool = True,

362

xread_block: int = 10000,

363

maxlen: Optional[int] = None,

364

approximate: bool = True,

365

additional_streams: Optional[Dict[str, str]] = None,

366

**connection_kwargs: Any,

367

) -> None:

368

"""Redis Sentinel streams broker with acknowledgement support."""

369

370

async def startup(self) -> None:

371

"""Initialize consumer group on startup."""

372

373

async def kick(self, message: BrokerMessage) -> None:

374

"""Add message to Redis Sentinel stream."""

375

376

async def listen(self) -> AsyncGenerator[AckableMessage, None]:

377

"""Listen for messages from Redis Sentinel stream."""

378

```

379

380

**Usage Example:**

381

382

```python

383

from taskiq_redis import RedisStreamSentinelBroker

384

385

# Create high-availability stream broker

386

broker = RedisStreamSentinelBroker(

387

sentinels=[("sentinel1", 26379), ("sentinel2", 26379)],

388

master_name="mymaster",

389

consumer_group_name="ha-workers"

390

)

391

392

@broker.task

393

async def ha_task(data: str) -> str:

394

return f"Processed with HA: {data}"

395

```

396

397

## Types

398

399

```python { .api }

400

from typing import TypeVar, Callable, Optional, Any, AsyncGenerator, Dict, List, Tuple

401

from taskiq.abc.broker import AsyncBroker

402

from taskiq.abc.result_backend import AsyncResultBackend

403

from taskiq.message import BrokerMessage

404

from taskiq import AckableMessage

405

406

_T = TypeVar("_T")

407

408

class BaseRedisBroker(AsyncBroker):

409

"""Base class for Redis brokers."""

410

411

class BaseRedisClusterBroker(AsyncBroker):

412

"""Base class for Redis Cluster brokers."""

413

414

class BaseSentinelBroker(AsyncBroker):

415

"""Base class for Redis Sentinel brokers."""

416

```