0
# Brokers
1
2
Message brokers handle the routing, persistence, and delivery of task messages between actors and workers. Dramatiq supports Redis, RabbitMQ, and in-memory brokers for different deployment scenarios.
3
4
## Capabilities
5
6
### Broker Base Class
7
8
The abstract base class that defines the broker interface for all implementations.
9
10
```python { .api }
11
class Broker:
12
def __init__(self, middleware=None):
13
"""
14
Initialize broker with optional middleware.
15
16
Parameters:
17
- middleware: List of middleware instances
18
"""
19
20
def add_middleware(self, middleware, *, before=None, after=None):
21
"""
22
Add middleware to the broker.
23
24
Parameters:
25
- middleware: Middleware instance to add
26
- before: Add before this middleware class
27
- after: Add after this middleware class
28
"""
29
30
def declare_actor(self, actor):
31
"""
32
Register an actor with the broker.
33
34
Parameters:
35
- actor: Actor instance to register
36
"""
37
38
def declare_queue(self, queue_name):
39
"""
40
Declare a queue on the broker.
41
42
Parameters:
43
- queue_name: str - Name of the queue to declare
44
"""
45
46
def enqueue(self, message, *, delay=None) -> Message:
47
"""
48
Enqueue a message for processing.
49
50
Parameters:
51
- message: Message instance to enqueue
52
- delay: int - Delay in milliseconds before processing
53
54
Returns:
55
Message instance with updated metadata
56
"""
57
58
def consume(self, queue_name, prefetch=1, timeout=30000) -> Consumer:
59
"""
60
Create a consumer for a queue.
61
62
Parameters:
63
- queue_name: str - Queue to consume from
64
- prefetch: int - Number of messages to prefetch
65
- timeout: int - Consumer timeout in milliseconds
66
67
Returns:
68
Consumer instance for processing messages
69
"""
70
71
def get_actor(self, actor_name) -> Actor:
72
"""
73
Get registered actor by name.
74
75
Parameters:
76
- actor_name: str - Name of the actor
77
78
Returns:
79
Actor instance
80
81
Raises:
82
ActorNotFound: If actor is not registered
83
"""
84
85
def get_results_backend(self) -> ResultBackend:
86
"""
87
Get the results backend associated with this broker.
88
89
Returns:
90
ResultBackend instance or None
91
"""
92
93
def flush(self, queue_name):
94
"""
95
Remove all messages from a queue.
96
97
Parameters:
98
- queue_name: str - Queue to flush
99
"""
100
101
def flush_all(self):
102
"""Remove all messages from all queues."""
103
104
def join(self, queue_name, *, timeout=None):
105
"""
106
Wait for all messages in a queue to be processed.
107
108
Parameters:
109
- queue_name: str - Queue to wait for
110
- timeout: int - Timeout in milliseconds
111
112
Raises:
113
QueueJoinTimeout: If timeout is exceeded
114
"""
115
116
def close(self):
117
"""Close the broker and clean up connections."""
118
119
# Properties
120
actors: Dict[str, Actor] # Registered actors
121
queues: Dict[str, Queue] # Declared queues
122
middleware: List[Middleware] # Middleware stack
123
actor_options: Set[str] # Set of valid actor options
124
```
125
126
### Redis Broker
127
128
Production-ready broker using Redis as the message transport and storage backend.
129
130
```python { .api }
131
class RedisBroker(Broker):
132
def __init__(
133
self, *,
134
url: str = None,
135
middleware: List[Middleware] = None,
136
namespace: str = "dramatiq",
137
maintenance_chance: int = 1000,
138
heartbeat_timeout: int = 60000,
139
dead_message_ttl: int = 604800000,
140
requeue_deadline: int = None,
141
requeue_interval: int = None,
142
client: redis.Redis = None,
143
**parameters
144
):
145
"""
146
Create Redis broker instance.
147
148
Parameters:
149
- url: Redis connection URL (redis://host:port/db)
150
- middleware: List of middleware instances
151
- namespace: Key namespace prefix (default: "dramatiq")
152
- maintenance_chance: Probability of running maintenance (1/chance)
153
- heartbeat_timeout: Worker heartbeat timeout in ms
154
- dead_message_ttl: Dead message TTL in ms (7 days)
155
- requeue_deadline: Message requeue deadline in ms
156
- requeue_interval: Message requeue check interval in ms
157
- client: Existing Redis client instance
158
- **parameters: Additional Redis connection parameters
159
"""
160
```
161
162
**Usage:**
163
164
```python
165
# Basic Redis broker
166
redis_broker = RedisBroker(host="localhost", port=6379, db=0)
167
168
# Redis broker with URL
169
redis_broker = RedisBroker(url="redis://localhost:6379/0")
170
171
# Redis broker with custom settings
172
redis_broker = RedisBroker(
173
host="redis.example.com",
174
port=6379,
175
password="secret",
176
namespace="myapp",
177
heartbeat_timeout=120000, # 2 minutes
178
dead_message_ttl=86400000 # 1 day
179
)
180
181
# Redis broker with existing client
182
import redis
183
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
184
redis_broker = RedisBroker(client=redis_client)
185
186
dramatiq.set_broker(redis_broker)
187
```
188
189
### RabbitMQ Broker
190
191
Enterprise-grade broker using RabbitMQ for high-throughput message processing with advanced routing features.
192
193
```python { .api }
194
class RabbitmqBroker(Broker):
195
def __init__(
196
self, *,
197
confirm_delivery: bool = False,
198
url: str = None,
199
middleware: List[Middleware] = None,
200
max_priority: int = None,
201
parameters: pika.ConnectionParameters = None,
202
**kwargs
203
):
204
"""
205
Create RabbitMQ broker instance.
206
207
Parameters:
208
- confirm_delivery: Enable delivery confirmations
209
- url: AMQP connection URL (amqp://user:pass@host:port/vhost)
210
- middleware: List of middleware instances
211
- max_priority: Maximum message priority (enables priority queues)
212
- parameters: Pika ConnectionParameters instance
213
- **kwargs: Additional connection parameters
214
"""
215
```
216
217
**Usage:**
218
219
```python
220
# Basic RabbitMQ broker
221
rabbitmq_broker = RabbitmqBroker(host="localhost", port=5672)
222
223
# RabbitMQ broker with URL
224
rabbitmq_broker = RabbitmqBroker(
225
url="amqp://user:password@rabbitmq.example.com:5672/myapp"
226
)
227
228
# RabbitMQ broker with priorities and confirmations
229
rabbitmq_broker = RabbitmqBroker(
230
host="localhost",
231
port=5672,
232
confirm_delivery=True,
233
max_priority=255,
234
heartbeat=600,
235
connection_attempts=3
236
)
237
238
# RabbitMQ broker with custom parameters
239
import pika
240
params = pika.ConnectionParameters(
241
host="rabbitmq.example.com",
242
port=5672,
243
credentials=pika.PlainCredentials("user", "password"),
244
heartbeat=600
245
)
246
rabbitmq_broker = RabbitmqBroker(parameters=params)
247
248
dramatiq.set_broker(rabbitmq_broker)
249
```
250
251
### Stub Broker
252
253
In-memory broker for testing and development environments.
254
255
```python { .api }
256
class StubBroker(Broker):
257
def __init__(self, middleware=None):
258
"""
259
Create in-memory broker for testing.
260
261
Parameters:
262
- middleware: List of middleware instances
263
"""
264
265
# Testing-specific properties
266
dead_letters: List[Message] # All dead-lettered messages
267
dead_letters_by_queue: Dict[str, List[Message]] # Dead letters grouped by queue
268
```
269
270
**Usage:**
271
272
```python
273
# Create stub broker for testing
274
stub_broker = StubBroker()
275
dramatiq.set_broker(stub_broker)
276
277
# Define and test actors
278
@dramatiq.actor
279
def test_task(value):
280
return value * 2
281
282
# Send message
283
test_task.send(21)
284
285
# Process messages synchronously in tests
286
import dramatiq
287
worker = dramatiq.Worker(stub_broker, worker_timeout=100)
288
worker.start()
289
worker.join()
290
worker.stop()
291
292
# Check results
293
assert len(stub_broker.dead_letters) == 0 # No failures
294
```
295
296
### Broker Management
297
298
Global functions for managing the broker instance used by actors.
299
300
```python { .api }
301
def get_broker() -> Broker:
302
"""
303
Get the current global broker instance.
304
305
Returns:
306
Current global broker
307
308
Raises:
309
RuntimeError: If no broker has been set
310
"""
311
312
def set_broker(broker: Broker):
313
"""
314
Set the global broker instance.
315
316
Parameters:
317
- broker: Broker instance to set as global
318
"""
319
```
320
321
**Usage:**
322
323
```python
324
# Set up broker
325
redis_broker = RedisBroker()
326
dramatiq.set_broker(redis_broker)
327
328
# Get current broker
329
current_broker = dramatiq.get_broker()
330
print(f"Using broker: {type(current_broker).__name__}")
331
332
# Add middleware to current broker
333
from dramatiq.middleware import Prometheus
334
current_broker.add_middleware(Prometheus())
335
```
336
337
### Consumer Interface
338
339
Interface for consuming messages from broker queues.
340
341
```python { .api }
342
class Consumer:
343
"""
344
Interface for consuming messages from a queue.
345
346
Consumers are created by calling broker.consume() and provide
347
an iterator interface for processing messages.
348
"""
349
350
def __iter__(self):
351
"""Return iterator for message consumption."""
352
353
def __next__(self) -> Message:
354
"""
355
Get next message from queue.
356
357
Returns:
358
Message instance to process
359
360
Raises:
361
StopIteration: When no more messages or timeout
362
"""
363
364
def ack(self, message):
365
"""
366
Acknowledge successful message processing.
367
368
Parameters:
369
- message: Message to acknowledge
370
"""
371
372
def nack(self, message):
373
"""
374
Negative acknowledge failed message processing.
375
376
Parameters:
377
- message: Message to nack
378
"""
379
380
def close(self):
381
"""Close the consumer and clean up resources."""
382
```
383
384
### Message Proxy
385
386
Proxy object for delayed message operations and broker interaction.
387
388
```python { .api }
389
class MessageProxy:
390
"""
391
Proxy for message operations that may be delayed or batched.
392
393
Used internally by brokers for optimizing message operations.
394
"""
395
396
def __init__(self, broker, message):
397
"""
398
Create message proxy.
399
400
Parameters:
401
- broker: Broker instance
402
- message: Message instance
403
"""
404
```
405
406
### Advanced Broker Configuration
407
408
#### Custom Middleware Stack
409
410
```python
411
from dramatiq.middleware import AgeLimit, TimeLimit, Retries, Prometheus
412
413
# Create broker with custom middleware
414
custom_middleware = [
415
Prometheus(),
416
AgeLimit(max_age=3600000), # 1 hour
417
TimeLimit(time_limit=300000), # 5 minutes
418
Retries(max_retries=5)
419
]
420
421
broker = RedisBroker(middleware=custom_middleware)
422
dramatiq.set_broker(broker)
423
```
424
425
#### Connection Pooling and High Availability
426
427
```python
428
# Redis with connection pooling
429
import redis
430
pool = redis.ConnectionPool(
431
host="redis.example.com",
432
port=6379,
433
max_connections=20,
434
retry_on_timeout=True
435
)
436
redis_client = redis.Redis(connection_pool=pool)
437
redis_broker = RedisBroker(client=redis_client)
438
439
# RabbitMQ with HA setup
440
rabbitmq_broker = RabbitmqBroker(
441
url="amqp://user:pass@rabbitmq-cluster.example.com:5672/prod",
442
confirm_delivery=True,
443
connection_attempts=5,
444
retry_delay=2.0
445
)
446
```
447
448
#### Multi-Broker Setup
449
450
```python
451
# Different brokers for different environments
452
import os
453
454
if os.getenv("ENVIRONMENT") == "production":
455
broker = RabbitmqBroker(
456
url=os.getenv("RABBITMQ_URL"),
457
confirm_delivery=True
458
)
459
elif os.getenv("ENVIRONMENT") == "development":
460
broker = RedisBroker(
461
url=os.getenv("REDIS_URL", "redis://localhost:6379/0")
462
)
463
else: # testing
464
broker = StubBroker()
465
466
dramatiq.set_broker(broker)
467
```
468
469
### Broker-Specific Features
470
471
#### Redis-Specific Operations
472
473
```python
474
redis_broker = RedisBroker()
475
476
# Access underlying Redis client
477
redis_client = redis_broker.client
478
479
# Custom Redis operations
480
redis_client.set("custom_key", "value")
481
queue_length = redis_client.llen(f"{redis_broker.namespace}:default.msgs")
482
```
483
484
#### RabbitMQ-Specific Operations
485
486
```python
487
rabbitmq_broker = RabbitmqBroker(max_priority=10)
488
489
# Priority queue support (RabbitMQ only)
490
@dramatiq.actor(priority=5)
491
def high_priority_task():
492
pass
493
494
@dramatiq.actor(priority=1) # Higher priority (lower number)
495
def critical_task():
496
pass
497
```
498
499
### Error Handling
500
501
Brokers raise specific exceptions for different error conditions:
502
503
```python
504
try:
505
broker.get_actor("nonexistent_actor")
506
except dramatiq.ActorNotFound:
507
print("Actor not found")
508
509
try:
510
broker.join("queue_name", timeout=5000)
511
except dramatiq.QueueJoinTimeout:
512
print("Queue join timed out")
513
514
try:
515
message = broker.enqueue(invalid_message)
516
except dramatiq.BrokerError as e:
517
print(f"Broker error: {e}")
518
```