or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.mdbrokers.mdcomposition.mdindex.mdmessages.mdmiddleware.mdrate-limiting.mdresults.mdworkers.md

brokers.mddocs/

0

# Brokers

1

2

Message brokers handle the routing, persistence, and delivery of task messages between actors and workers. Dramatiq supports Redis, RabbitMQ, and in-memory brokers for different deployment scenarios.

3

4

## Capabilities

5

6

### Broker Base Class

7

8

The abstract base class that defines the broker interface for all implementations.

9

10

```python { .api }

11

class Broker:

12

def __init__(self, middleware=None):

13

"""

14

Initialize broker with optional middleware.

15

16

Parameters:

17

- middleware: List of middleware instances

18

"""

19

20

def add_middleware(self, middleware, *, before=None, after=None):

21

"""

22

Add middleware to the broker.

23

24

Parameters:

25

- middleware: Middleware instance to add

26

- before: Add before this middleware class

27

- after: Add after this middleware class

28

"""

29

30

def declare_actor(self, actor):

31

"""

32

Register an actor with the broker.

33

34

Parameters:

35

- actor: Actor instance to register

36

"""

37

38

def declare_queue(self, queue_name):

39

"""

40

Declare a queue on the broker.

41

42

Parameters:

43

- queue_name: str - Name of the queue to declare

44

"""

45

46

def enqueue(self, message, *, delay=None) -> Message:

47

"""

48

Enqueue a message for processing.

49

50

Parameters:

51

- message: Message instance to enqueue

52

- delay: int - Delay in milliseconds before processing

53

54

Returns:

55

Message instance with updated metadata

56

"""

57

58

def consume(self, queue_name, prefetch=1, timeout=30000) -> Consumer:

59

"""

60

Create a consumer for a queue.

61

62

Parameters:

63

- queue_name: str - Queue to consume from

64

- prefetch: int - Number of messages to prefetch

65

- timeout: int - Consumer timeout in milliseconds

66

67

Returns:

68

Consumer instance for processing messages

69

"""

70

71

def get_actor(self, actor_name) -> Actor:

72

"""

73

Get registered actor by name.

74

75

Parameters:

76

- actor_name: str - Name of the actor

77

78

Returns:

79

Actor instance

80

81

Raises:

82

ActorNotFound: If actor is not registered

83

"""

84

85

def get_results_backend(self) -> ResultBackend:

86

"""

87

Get the results backend associated with this broker.

88

89

Returns:

90

ResultBackend instance or None

91

"""

92

93

def flush(self, queue_name):

94

"""

95

Remove all messages from a queue.

96

97

Parameters:

98

- queue_name: str - Queue to flush

99

"""

100

101

def flush_all(self):

102

"""Remove all messages from all queues."""

103

104

def join(self, queue_name, *, timeout=None):

105

"""

106

Wait for all messages in a queue to be processed.

107

108

Parameters:

109

- queue_name: str - Queue to wait for

110

- timeout: int - Timeout in milliseconds

111

112

Raises:

113

QueueJoinTimeout: If timeout is exceeded

114

"""

115

116

def close(self):

117

"""Close the broker and clean up connections."""

118

119

# Properties

120

actors: Dict[str, Actor] # Registered actors

121

queues: Dict[str, Queue] # Declared queues

122

middleware: List[Middleware] # Middleware stack

123

actor_options: Set[str] # Set of valid actor options

124

```

125

126

### Redis Broker

127

128

Production-ready broker using Redis as the message transport and storage backend.

129

130

```python { .api }

131

class RedisBroker(Broker):

132

def __init__(

133

self, *,

134

url: str = None,

135

middleware: List[Middleware] = None,

136

namespace: str = "dramatiq",

137

maintenance_chance: int = 1000,

138

heartbeat_timeout: int = 60000,

139

dead_message_ttl: int = 604800000,

140

requeue_deadline: int = None,

141

requeue_interval: int = None,

142

client: redis.Redis = None,

143

**parameters

144

):

145

"""

146

Create Redis broker instance.

147

148

Parameters:

149

- url: Redis connection URL (redis://host:port/db)

150

- middleware: List of middleware instances

151

- namespace: Key namespace prefix (default: "dramatiq")

152

- maintenance_chance: Probability of running maintenance (1/chance)

153

- heartbeat_timeout: Worker heartbeat timeout in ms

154

- dead_message_ttl: Dead message TTL in ms (7 days)

155

- requeue_deadline: Message requeue deadline in ms

156

- requeue_interval: Message requeue check interval in ms

157

- client: Existing Redis client instance

158

- **parameters: Additional Redis connection parameters

159

"""

160

```

161

162

**Usage:**

163

164

```python

165

# Basic Redis broker

166

redis_broker = RedisBroker(host="localhost", port=6379, db=0)

167

168

# Redis broker with URL

169

redis_broker = RedisBroker(url="redis://localhost:6379/0")

170

171

# Redis broker with custom settings

172

redis_broker = RedisBroker(

173

host="redis.example.com",

174

port=6379,

175

password="secret",

176

namespace="myapp",

177

heartbeat_timeout=120000, # 2 minutes

178

dead_message_ttl=86400000 # 1 day

179

)

180

181

# Redis broker with existing client

182

import redis

183

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

184

redis_broker = RedisBroker(client=redis_client)

185

186

dramatiq.set_broker(redis_broker)

187

```

188

189

### RabbitMQ Broker

190

191

Enterprise-grade broker using RabbitMQ for high-throughput message processing with advanced routing features.

192

193

```python { .api }

194

class RabbitmqBroker(Broker):

195

def __init__(

196

self, *,

197

confirm_delivery: bool = False,

198

url: str = None,

199

middleware: List[Middleware] = None,

200

max_priority: int = None,

201

parameters: pika.ConnectionParameters = None,

202

**kwargs

203

):

204

"""

205

Create RabbitMQ broker instance.

206

207

Parameters:

208

- confirm_delivery: Enable delivery confirmations

209

- url: AMQP connection URL (amqp://user:pass@host:port/vhost)

210

- middleware: List of middleware instances

211

- max_priority: Maximum message priority (enables priority queues)

212

- parameters: Pika ConnectionParameters instance

213

- **kwargs: Additional connection parameters

214

"""

215

```

216

217

**Usage:**

218

219

```python

220

# Basic RabbitMQ broker

221

rabbitmq_broker = RabbitmqBroker(host="localhost", port=5672)

222

223

# RabbitMQ broker with URL

224

rabbitmq_broker = RabbitmqBroker(

225

url="amqp://user:password@rabbitmq.example.com:5672/myapp"

226

)

227

228

# RabbitMQ broker with priorities and confirmations

229

rabbitmq_broker = RabbitmqBroker(

230

host="localhost",

231

port=5672,

232

confirm_delivery=True,

233

max_priority=255,

234

heartbeat=600,

235

connection_attempts=3

236

)

237

238

# RabbitMQ broker with custom parameters

239

import pika

240

params = pika.ConnectionParameters(

241

host="rabbitmq.example.com",

242

port=5672,

243

credentials=pika.PlainCredentials("user", "password"),

244

heartbeat=600

245

)

246

rabbitmq_broker = RabbitmqBroker(parameters=params)

247

248

dramatiq.set_broker(rabbitmq_broker)

249

```

250

251

### Stub Broker

252

253

In-memory broker for testing and development environments.

254

255

```python { .api }

256

class StubBroker(Broker):

257

def __init__(self, middleware=None):

258

"""

259

Create in-memory broker for testing.

260

261

Parameters:

262

- middleware: List of middleware instances

263

"""

264

265

# Testing-specific properties

266

dead_letters: List[Message] # All dead-lettered messages

267

dead_letters_by_queue: Dict[str, List[Message]] # Dead letters grouped by queue

268

```

269

270

**Usage:**

271

272

```python

273

# Create stub broker for testing

274

stub_broker = StubBroker()

275

dramatiq.set_broker(stub_broker)

276

277

# Define and test actors

278

@dramatiq.actor

279

def test_task(value):

280

return value * 2

281

282

# Send message

283

test_task.send(21)

284

285

# Process messages synchronously in tests

286

import dramatiq

287

worker = dramatiq.Worker(stub_broker, worker_timeout=100)

288

worker.start()

289

worker.join()

290

worker.stop()

291

292

# Check results

293

assert len(stub_broker.dead_letters) == 0 # No failures

294

```

295

296

### Broker Management

297

298

Global functions for managing the broker instance used by actors.

299

300

```python { .api }

301

def get_broker() -> Broker:

302

"""

303

Get the current global broker instance.

304

305

Returns:

306

Current global broker

307

308

Raises:

309

RuntimeError: If no broker has been set

310

"""

311

312

def set_broker(broker: Broker):

313

"""

314

Set the global broker instance.

315

316

Parameters:

317

- broker: Broker instance to set as global

318

"""

319

```

320

321

**Usage:**

322

323

```python

324

# Set up broker

325

redis_broker = RedisBroker()

326

dramatiq.set_broker(redis_broker)

327

328

# Get current broker

329

current_broker = dramatiq.get_broker()

330

print(f"Using broker: {type(current_broker).__name__}")

331

332

# Add middleware to current broker

333

from dramatiq.middleware import Prometheus

334

current_broker.add_middleware(Prometheus())

335

```

336

337

### Consumer Interface

338

339

Interface for consuming messages from broker queues.

340

341

```python { .api }

342

class Consumer:

343

"""

344

Interface for consuming messages from a queue.

345

346

Consumers are created by calling broker.consume() and provide

347

an iterator interface for processing messages.

348

"""

349

350

def __iter__(self):

351

"""Return iterator for message consumption."""

352

353

def __next__(self) -> Message:

354

"""

355

Get next message from queue.

356

357

Returns:

358

Message instance to process

359

360

Raises:

361

StopIteration: When no more messages or timeout

362

"""

363

364

def ack(self, message):

365

"""

366

Acknowledge successful message processing.

367

368

Parameters:

369

- message: Message to acknowledge

370

"""

371

372

def nack(self, message):

373

"""

374

Negative acknowledge failed message processing.

375

376

Parameters:

377

- message: Message to nack

378

"""

379

380

def close(self):

381

"""Close the consumer and clean up resources."""

382

```

383

384

### Message Proxy

385

386

Proxy object for delayed message operations and broker interaction.

387

388

```python { .api }

389

class MessageProxy:

390

"""

391

Proxy for message operations that may be delayed or batched.

392

393

Used internally by brokers for optimizing message operations.

394

"""

395

396

def __init__(self, broker, message):

397

"""

398

Create message proxy.

399

400

Parameters:

401

- broker: Broker instance

402

- message: Message instance

403

"""

404

```

405

406

### Advanced Broker Configuration

407

408

#### Custom Middleware Stack

409

410

```python

411

from dramatiq.middleware import AgeLimit, TimeLimit, Retries, Prometheus

412

413

# Create broker with custom middleware

414

custom_middleware = [

415

Prometheus(),

416

AgeLimit(max_age=3600000), # 1 hour

417

TimeLimit(time_limit=300000), # 5 minutes

418

Retries(max_retries=5)

419

]

420

421

broker = RedisBroker(middleware=custom_middleware)

422

dramatiq.set_broker(broker)

423

```

424

425

#### Connection Pooling and High Availability

426

427

```python

428

# Redis with connection pooling

429

import redis

430

pool = redis.ConnectionPool(

431

host="redis.example.com",

432

port=6379,

433

max_connections=20,

434

retry_on_timeout=True

435

)

436

redis_client = redis.Redis(connection_pool=pool)

437

redis_broker = RedisBroker(client=redis_client)

438

439

# RabbitMQ with HA setup

440

rabbitmq_broker = RabbitmqBroker(

441

url="amqp://user:pass@rabbitmq-cluster.example.com:5672/prod",

442

confirm_delivery=True,

443

connection_attempts=5,

444

retry_delay=2.0

445

)

446

```

447

448

#### Multi-Broker Setup

449

450

```python

451

# Different brokers for different environments

452

import os

453

454

if os.getenv("ENVIRONMENT") == "production":

455

broker = RabbitmqBroker(

456

url=os.getenv("RABBITMQ_URL"),

457

confirm_delivery=True

458

)

459

elif os.getenv("ENVIRONMENT") == "development":

460

broker = RedisBroker(

461

url=os.getenv("REDIS_URL", "redis://localhost:6379/0")

462

)

463

else: # testing

464

broker = StubBroker()

465

466

dramatiq.set_broker(broker)

467

```

468

469

### Broker-Specific Features

470

471

#### Redis-Specific Operations

472

473

```python

474

redis_broker = RedisBroker()

475

476

# Access underlying Redis client

477

redis_client = redis_broker.client

478

479

# Custom Redis operations

480

redis_client.set("custom_key", "value")

481

queue_length = redis_client.llen(f"{redis_broker.namespace}:default.msgs")

482

```

483

484

#### RabbitMQ-Specific Operations

485

486

```python

487

rabbitmq_broker = RabbitmqBroker(max_priority=10)

488

489

# Priority queue support (RabbitMQ only)

490

@dramatiq.actor(priority=5)

491

def high_priority_task():

492

pass

493

494

@dramatiq.actor(priority=1) # Higher priority (lower number)

495

def critical_task():

496

pass

497

```

498

499

### Error Handling

500

501

Brokers raise specific exceptions for different error conditions:

502

503

```python

504

try:

505

broker.get_actor("nonexistent_actor")

506

except dramatiq.ActorNotFound:

507

print("Actor not found")

508

509

try:

510

broker.join("queue_name", timeout=5000)

511

except dramatiq.QueueJoinTimeout:

512

print("Queue join timed out")

513

514

try:

515

message = broker.enqueue(invalid_message)

516

except dramatiq.BrokerError as e:

517

print(f"Broker error: {e}")

518

```