0
# Broker Integration
1
2
Multi-broker support for monitoring message queues across different broker types including RabbitMQ, Redis, and variants with comprehensive queue statistics.
3
4
## Capabilities
5
6
### Broker Factory
7
8
```python { .api }
9
class Broker:
10
"""
11
Broker abstraction factory for different message brokers.
12
13
Automatically selects appropriate broker implementation based on URL scheme.
14
"""
15
16
def __new__(cls, broker_url, *args, **kwargs):
17
"""
18
Create appropriate broker instance based on URL scheme.
19
20
Args:
21
broker_url (str): Broker connection URL with scheme
22
*args: Additional arguments passed to broker implementation
23
**kwargs: Additional keyword arguments passed to broker implementation
24
25
Returns:
26
BrokerBase: Broker implementation instance (RabbitMQ, Redis, etc.)
27
28
Supported schemes:
29
- 'amqp': RabbitMQ broker
30
- 'redis': Redis broker
31
- 'rediss': SSL Redis broker
32
- 'redis+socket': Unix socket Redis broker
33
- 'sentinel': Redis Sentinel broker
34
"""
35
36
async def queues(self, names):
37
"""
38
Get queue information for specified queues.
39
40
Args:
41
names (list): List of queue names to query
42
43
Returns:
44
dict: Queue information keyed by queue name
45
46
Async method that returns queue statistics including message counts,
47
consumer information, and queue configuration.
48
"""
49
```
50
51
### Broker Base Class
52
53
```python { .api }
54
class BrokerBase:
55
"""Abstract base class for broker implementations."""
56
57
def __init__(self, broker_url, *_, **__):
58
"""
59
Initialize broker with connection URL.
60
61
Args:
62
broker_url (str): Broker connection URL
63
"""
64
65
async def queues(self, names):
66
"""
67
Abstract method for retrieving queue information.
68
69
Args:
70
names (list): Queue names to query
71
72
Returns:
73
dict: Queue statistics and configuration
74
"""
75
```
76
77
### RabbitMQ Support
78
79
```python { .api }
80
class RabbitMQ(BrokerBase):
81
"""
82
RabbitMQ broker integration using Management API.
83
84
Supports comprehensive queue monitoring through RabbitMQ's
85
HTTP Management API.
86
"""
87
88
async def queues(self, names):
89
"""
90
Get RabbitMQ queue information via Management API.
91
92
Returns detailed queue statistics including:
93
- Message counts (ready, unacked, total)
94
- Consumer counts and details
95
- Queue configuration and properties
96
- Memory usage and performance metrics
97
"""
98
99
def __init__(self, broker_url, http_api, io_loop=None, **kwargs):
100
"""
101
Initialize RabbitMQ broker.
102
103
Args:
104
broker_url (str): AMQP connection URL
105
http_api (str): HTTP Management API URL
106
io_loop: Tornado IOLoop instance
107
**kwargs: Additional broker options
108
"""
109
110
@classmethod
111
def validate_http_api(cls, http_api):
112
"""
113
Validate HTTP Management API URL format.
114
115
Args:
116
http_api (str): Management API URL to validate
117
118
Raises:
119
ValueError: If URL scheme is invalid
120
"""
121
```
122
123
### Redis Base Class
124
125
```python { .api }
126
class RedisBase(BrokerBase):
127
"""
128
Base class for Redis broker implementations.
129
130
Provides common functionality for Redis-based brokers including
131
priority queue support and message counting.
132
"""
133
134
DEFAULT_SEP = '\x06\x16' # Priority separator
135
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9] # Default priority levels
136
137
def __init__(self, broker_url, *_, **kwargs):
138
"""
139
Initialize Redis base broker.
140
141
Args:
142
broker_url (str): Redis connection URL
143
**kwargs: Broker options including:
144
- priority_steps: Custom priority levels
145
- sep: Priority separator character
146
- global_keyprefix: Key prefix for all operations
147
"""
148
149
def _q_for_pri(self, queue, pri):
150
"""
151
Generate priority-specific queue name.
152
153
Args:
154
queue (str): Base queue name
155
pri (int): Priority level
156
157
Returns:
158
str: Priority-specific queue name
159
"""
160
161
async def queues(self, names):
162
"""
163
Get queue statistics for Redis queues.
164
165
Args:
166
names (list): Queue names to query
167
168
Returns:
169
list: Queue statistics with message counts
170
171
Counts messages across all priority levels for each queue.
172
"""
173
```
174
175
### Redis Standard Implementation
176
177
```python { .api }
178
class Redis(RedisBase):
179
"""
180
Standard Redis broker implementation.
181
182
Supports Redis database selection and standard authentication.
183
"""
184
185
def __init__(self, broker_url, *args, **kwargs):
186
"""
187
Initialize Redis broker.
188
189
Args:
190
broker_url (str): Redis connection URL (redis://host:port/db)
191
"""
192
193
def _prepare_virtual_host(self, vhost):
194
"""
195
Convert virtual host to Redis database number.
196
197
Args:
198
vhost (str): Virtual host from URL
199
200
Returns:
201
int: Redis database number (0-15)
202
"""
203
204
def _get_redis_client_args(self):
205
"""
206
Get Redis client connection arguments.
207
208
Returns:
209
dict: Connection parameters for Redis client
210
"""
211
212
def _get_redis_client(self):
213
"""
214
Create Redis client instance.
215
216
Returns:
217
redis.Redis: Configured Redis client
218
"""
219
```
220
221
### Redis SSL Implementation
222
223
```python { .api }
224
class RedisSsl(Redis):
225
"""
226
Redis SSL/TLS broker implementation.
227
228
Provides encrypted connections to Redis using SSL/TLS with
229
configurable SSL parameters.
230
"""
231
232
def __init__(self, broker_url, *args, **kwargs):
233
"""
234
Initialize SSL Redis broker.
235
236
Args:
237
broker_url (str): Redis SSL URL (rediss://host:port/db)
238
**kwargs: Must include 'broker_use_ssl' configuration
239
240
Raises:
241
ValueError: If broker_use_ssl is not configured
242
"""
243
244
def _get_redis_client_args(self):
245
"""
246
Get SSL-enabled Redis client arguments.
247
248
Returns:
249
dict: Connection parameters with SSL configuration
250
"""
251
```
252
253
### Redis Sentinel Implementation
254
255
```python { .api }
256
class RedisSentinel(RedisBase):
257
"""
258
Redis Sentinel cluster implementation.
259
260
Provides high availability Redis access through Sentinel
261
with automatic failover support.
262
"""
263
264
def __init__(self, broker_url, *args, **kwargs):
265
"""
266
Initialize Redis Sentinel broker.
267
268
Args:
269
broker_url (str): Sentinel URL (sentinel://host:port/service)
270
**kwargs: Must include broker_options with master_name
271
272
Raises:
273
ValueError: If master_name is not provided
274
"""
275
276
def _prepare_master_name(self, broker_options):
277
"""
278
Extract master name from broker options.
279
280
Args:
281
broker_options (dict): Broker configuration
282
283
Returns:
284
str: Redis master service name
285
"""
286
287
def _get_redis_client(self, broker_options):
288
"""
289
Create Sentinel-aware Redis client.
290
291
Args:
292
broker_options (dict): Sentinel configuration
293
294
Returns:
295
redis.Redis: Sentinel-managed Redis client
296
"""
297
```
298
299
### Redis Unix Socket Implementation
300
301
```python { .api }
302
class RedisSocket(RedisBase):
303
"""
304
Redis Unix domain socket implementation.
305
306
Provides local Redis access through Unix sockets for
307
improved performance and security.
308
"""
309
310
def __init__(self, broker_url, *args, **kwargs):
311
"""
312
Initialize Unix socket Redis broker.
313
314
Args:
315
broker_url (str): Socket URL (redis+socket:///path/to/socket)
316
"""
317
```
318
319
## Queue Information Structure
320
321
```python { .api }
322
QueueInfo = {
323
'name': str, # Queue name
324
'messages': int, # Total messages in queue
325
'messages_ready': int, # Messages ready for delivery
326
'messages_unacknowledged': int, # Messages awaiting acknowledgment
327
'consumers': int, # Number of consumers
328
'consumer_details': [
329
{
330
'consumer_tag': str, # Consumer identifier
331
'channel_details': dict, # Channel information
332
'ack_required': bool, # Acknowledgment required
333
'prefetch_count': int, # Prefetch limit
334
}
335
],
336
'memory': int, # Memory usage in bytes
337
'policy': str, # Queue policy name
338
'arguments': dict, # Queue arguments
339
'auto_delete': bool, # Auto-delete setting
340
'durable': bool, # Durability setting
341
'exclusive': bool, # Exclusivity setting
342
'node': str, # Cluster node hosting queue
343
'state': str, # Queue state
344
'backing_queue_status': dict, # Internal queue status
345
}
346
```
347
348
## Usage Examples
349
350
### Basic Queue Monitoring
351
352
```python
353
from flower.utils.broker import Broker
354
355
# Get broker instance (auto-detected from Celery config)
356
broker = Broker(celery_app.conf.broker_url)
357
358
# Get queue information
359
queue_names = ['celery', 'high_priority', 'low_priority']
360
queue_info = await broker.queues(queue_names)
361
362
for name, info in queue_info.items():
363
print(f"Queue {name}:")
364
print(f" Messages: {info.get('messages', 'N/A')}")
365
print(f" Consumers: {info.get('consumers', 'N/A')}")
366
```
367
368
### RabbitMQ Management API
369
370
```python
371
from flower.utils.broker import RabbitMQ
372
373
# Configure Management API access
374
rabbitmq = RabbitMQ('amqp://guest:guest@localhost:5672//')
375
rabbitmq.management_api = 'http://guest:guest@localhost:15672/api/'
376
377
# Get detailed queue information
378
queues = await rabbitmq.queues(['celery'])
379
queue_info = queues['celery']
380
381
print(f"Ready messages: {queue_info['messages_ready']}")
382
print(f"Unacked messages: {queue_info['messages_unacknowledged']}")
383
print(f"Consumer count: {queue_info['consumers']}")
384
```
385
386
### Redis Queue Monitoring
387
388
```python
389
from flower.utils.broker import Redis
390
391
# Redis broker
392
redis_broker = Redis('redis://localhost:6379/0')
393
394
# Get queue lengths
395
queue_info = await redis_broker.queues(['celery'])
396
print(f"Queue length: {queue_info['celery']['messages']}")
397
```
398
399
## Configuration
400
401
### RabbitMQ Management API
402
403
```bash
404
# Configure Management API access
405
--broker-api=http://guest:guest@localhost:15672/api/
406
407
# Environment variable
408
export FLOWER_BROKER_API=http://guest:guest@localhost:15672/api/
409
```
410
411
### Broker URL Formats
412
413
```python
414
# RabbitMQ
415
broker_url = 'amqp://user:pass@host:5672/vhost'
416
417
# Redis
418
broker_url = 'redis://host:6379/0'
419
420
# Redis SSL
421
broker_url = 'rediss://host:6380/0'
422
423
# Redis Sentinel
424
broker_url = 'sentinel://host:26379/service-name'
425
426
# Redis Unix Socket
427
broker_url = 'redis+socket:///tmp/redis.sock'
428
```
429
430
## Error Handling
431
432
Broker integration includes comprehensive error handling:
433
434
```python
435
try:
436
queue_info = await broker.queues(['celery'])
437
except ConnectionError:
438
print("Cannot connect to broker")
439
except TimeoutError:
440
print("Broker request timed out")
441
except Exception as e:
442
print(f"Broker error: {e}")
443
```
444
445
## Performance Considerations
446
447
- Queue monitoring can impact broker performance with many queues
448
- Use appropriate timeouts for broker requests
449
- Cache queue information when possible
450
- Monitor API rate limits (RabbitMQ Management API)
451
- Consider broker load when configuring monitoring frequency