Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.
—
Multi-broker support for monitoring message queues across different broker types including RabbitMQ, Redis, and variants with comprehensive queue statistics.
class Broker:
"""
Broker abstraction factory for different message brokers.
Automatically selects appropriate broker implementation based on URL scheme.
"""
def __new__(cls, broker_url, *args, **kwargs):
"""
Create appropriate broker instance based on URL scheme.
Args:
broker_url (str): Broker connection URL with scheme
*args: Additional arguments passed to broker implementation
**kwargs: Additional keyword arguments passed to broker implementation
Returns:
BrokerBase: Broker implementation instance (RabbitMQ, Redis, etc.)
Supported schemes:
- 'amqp': RabbitMQ broker
- 'redis': Redis broker
- 'rediss': SSL Redis broker
- 'redis+socket': Unix socket Redis broker
- 'sentinel': Redis Sentinel broker
"""
async def queues(self, names):
"""
Get queue information for specified queues.
Args:
names (list): List of queue names to query
Returns:
dict: Queue information keyed by queue name
Async method that returns queue statistics including message counts,
consumer information, and queue configuration.
"""

class BrokerBase:
"""Abstract base class for broker implementations."""
def __init__(self, broker_url, *_, **__):
"""
Initialize broker with connection URL.
Args:
broker_url (str): Broker connection URL
"""
async def queues(self, names):
"""
Abstract method for retrieving queue information.
Args:
names (list): Queue names to query
Returns:
dict: Queue statistics and configuration
"""

class RabbitMQ(BrokerBase):
"""
RabbitMQ broker integration using Management API.
Supports comprehensive queue monitoring through RabbitMQ's
HTTP Management API.
"""
async def queues(self, names):
"""
Get RabbitMQ queue information via Management API.
Returns detailed queue statistics including:
- Message counts (ready, unacked, total)
- Consumer counts and details
- Queue configuration and properties
- Memory usage and performance metrics
"""
def __init__(self, broker_url, http_api, io_loop=None, **kwargs):
"""
Initialize RabbitMQ broker.
Args:
broker_url (str): AMQP connection URL
http_api (str): HTTP Management API URL
io_loop: Tornado IOLoop instance
**kwargs: Additional broker options
"""
@classmethod
def validate_http_api(cls, http_api):
"""
Validate HTTP Management API URL format.
Args:
http_api (str): Management API URL to validate
Raises:
ValueError: If URL scheme is invalid
"""

class RedisBase(BrokerBase):
"""
Base class for Redis broker implementations.
Provides common functionality for Redis-based brokers including
priority queue support and message counting.
"""
DEFAULT_SEP = '\x06\x16' # Priority separator
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9] # Default priority levels
def __init__(self, broker_url, *_, **kwargs):
"""
Initialize Redis base broker.
Args:
broker_url (str): Redis connection URL
**kwargs: Broker options including:
- priority_steps: Custom priority levels
- sep: Priority separator character
- global_keyprefix: Key prefix for all operations
"""
def _q_for_pri(self, queue, pri):
"""
Generate priority-specific queue name.
Args:
queue (str): Base queue name
pri (int): Priority level
Returns:
str: Priority-specific queue name
"""
async def queues(self, names):
"""
Get queue statistics for Redis queues.
Args:
names (list): Queue names to query
Returns:
list: Queue statistics with message counts
Counts messages across all priority levels for each queue.
"""

class Redis(RedisBase):
"""
Standard Redis broker implementation.
Supports Redis database selection and standard authentication.
"""
def __init__(self, broker_url, *args, **kwargs):
"""
Initialize Redis broker.
Args:
broker_url (str): Redis connection URL (redis://host:port/db)
"""
def _prepare_virtual_host(self, vhost):
"""
Convert virtual host to Redis database number.
Args:
vhost (str): Virtual host from URL
Returns:
int: Redis database number (0-15)
"""
def _get_redis_client_args(self):
"""
Get Redis client connection arguments.
Returns:
dict: Connection parameters for Redis client
"""
def _get_redis_client(self):
"""
Create Redis client instance.
Returns:
redis.Redis: Configured Redis client
"""

class RedisSsl(Redis):
"""
Redis SSL/TLS broker implementation.
Provides encrypted connections to Redis using SSL/TLS with
configurable SSL parameters.
"""
def __init__(self, broker_url, *args, **kwargs):
"""
Initialize SSL Redis broker.
Args:
broker_url (str): Redis SSL URL (rediss://host:port/db)
**kwargs: Must include 'broker_use_ssl' configuration
Raises:
ValueError: If broker_use_ssl is not configured
"""
def _get_redis_client_args(self):
"""
Get SSL-enabled Redis client arguments.
Returns:
dict: Connection parameters with SSL configuration
"""

class RedisSentinel(RedisBase):
"""
Redis Sentinel cluster implementation.
Provides high availability Redis access through Sentinel
with automatic failover support.
"""
def __init__(self, broker_url, *args, **kwargs):
"""
Initialize Redis Sentinel broker.
Args:
broker_url (str): Sentinel URL (sentinel://host:port/service)
**kwargs: Must include broker_options with master_name
Raises:
ValueError: If master_name is not provided
"""
def _prepare_master_name(self, broker_options):
"""
Extract master name from broker options.
Args:
broker_options (dict): Broker configuration
Returns:
str: Redis master service name
"""
def _get_redis_client(self, broker_options):
"""
Create Sentinel-aware Redis client.
Args:
broker_options (dict): Sentinel configuration
Returns:
redis.Redis: Sentinel-managed Redis client
"""

class RedisSocket(RedisBase):
"""
Redis Unix domain socket implementation.
Provides local Redis access through Unix sockets for
improved performance and security.
"""
def __init__(self, broker_url, *args, **kwargs):
"""
Initialize Unix socket Redis broker.
Args:
broker_url (str): Socket URL (redis+socket:///path/to/socket)
"""

QueueInfo = {
'name': str, # Queue name
'messages': int, # Total messages in queue
'messages_ready': int, # Messages ready for delivery
'messages_unacknowledged': int, # Messages awaiting acknowledgment
'consumers': int, # Number of consumers
'consumer_details': [
{
'consumer_tag': str, # Consumer identifier
'channel_details': dict, # Channel information
'ack_required': bool, # Acknowledgment required
'prefetch_count': int, # Prefetch limit
}
],
'memory': int, # Memory usage in bytes
'policy': str, # Queue policy name
'arguments': dict, # Queue arguments
'auto_delete': bool, # Auto-delete setting
'durable': bool, # Durability setting
'exclusive': bool, # Exclusivity setting
'node': str, # Cluster node hosting queue
'state': str, # Queue state
'backing_queue_status': dict, # Internal queue status
}

from flower.utils.broker import Broker
# Get broker instance (auto-detected from Celery config)
broker = Broker(celery_app.conf.broker_url)
# Get queue information
queue_names = ['celery', 'high_priority', 'low_priority']
queue_info = await broker.queues(queue_names)
for name, info in queue_info.items():
    print(f"Queue {name}:")
    print(f" Messages: {info.get('messages', 'N/A')}")
    print(f" Consumers: {info.get('consumers', 'N/A')}")

from flower.utils.broker import RabbitMQ
# Configure Management API access
rabbitmq = RabbitMQ(
    'amqp://guest:guest@localhost:5672//',
    http_api='http://guest:guest@localhost:15672/api/',
)
# Get detailed queue information
queues = await rabbitmq.queues(['celery'])
queue_info = queues['celery']
print(f"Ready messages: {queue_info['messages_ready']}")
print(f"Unacked messages: {queue_info['messages_unacknowledged']}")
print(f"Consumer count: {queue_info['consumers']}")

from flower.utils.broker import Redis
# Redis broker
redis_broker = Redis('redis://localhost:6379/0')
# Get queue lengths
queue_info = await redis_broker.queues(['celery'])
# Redis brokers return a list of per-queue statistics, so index by position
print(f"Queue length: {queue_info[0]['messages']}")

# Configure Management API access
--broker-api=http://guest:guest@localhost:15672/api/
# Environment variable
export FLOWER_BROKER_API=http://guest:guest@localhost:15672/api/

# RabbitMQ
broker_url = 'amqp://user:pass@host:5672/vhost'
# Redis
broker_url = 'redis://host:6379/0'
# Redis SSL
broker_url = 'rediss://host:6380/0'
# Redis Sentinel
broker_url = 'sentinel://host:26379/service-name'
# Redis Unix Socket
broker_url = 'redis+socket:///tmp/redis.sock'

Broker integration includes comprehensive error handling:
try:
    queue_info = await broker.queues(['celery'])
except ConnectionError:
    print("Cannot connect to broker")
except TimeoutError:
    print("Broker request timed out")
except Exception as e:
    print(f"Broker error: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-flower