CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-flower

Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.

Pending
Overview
Eval results
Files

broker.mddocs/

Broker Integration

Multi-broker support for monitoring message queues across different broker types including RabbitMQ, Redis, and variants with comprehensive queue statistics.

Capabilities

Broker Factory

class Broker:
    """
    Broker abstraction factory for different message brokers.
    
    Automatically selects appropriate broker implementation based on URL scheme.
    """
    
    def __new__(cls, broker_url, *args, **kwargs):
        """
        Create appropriate broker instance based on URL scheme.
        
        Args:
            broker_url (str): Broker connection URL with scheme
            *args: Additional arguments passed to broker implementation
            **kwargs: Additional keyword arguments passed to broker implementation
            
        Returns:
            BrokerBase: Broker implementation instance (RabbitMQ, Redis, etc.)
            
        Supported schemes:
        - 'amqp': RabbitMQ broker
        - 'redis': Redis broker  
        - 'rediss': SSL Redis broker
        - 'redis+socket': Unix socket Redis broker
        - 'sentinel': Redis Sentinel broker
        """
    
    async def queues(self, names):
        """
        Get queue information for specified queues.
        
        Args:
            names (list): List of queue names to query
            
        Returns:
            dict: Queue information keyed by queue name
            
        Async method that returns queue statistics including message counts,
        consumer information, and queue configuration.
        """

Broker Base Class

class BrokerBase:
    """Abstract base class for broker implementations."""
    
    def __init__(self, broker_url, *_, **__):
        """
        Initialize broker with connection URL.
        
        Args:
            broker_url (str): Broker connection URL
        """
    
    async def queues(self, names):
        """
        Abstract method for retrieving queue information.
        
        Args:
            names (list): Queue names to query
            
        Returns:
            dict: Queue statistics and configuration
        """

RabbitMQ Support

class RabbitMQ(BrokerBase):
    """
    RabbitMQ broker integration using Management API.
    
    Supports comprehensive queue monitoring through RabbitMQ's
    HTTP Management API.
    """
    
    async def queues(self, names):
        """
        Get RabbitMQ queue information via Management API.
        
        Returns detailed queue statistics including:
        - Message counts (ready, unacked, total)
        - Consumer counts and details
        - Queue configuration and properties
        - Memory usage and performance metrics
        """
    
    def __init__(self, broker_url, http_api, io_loop=None, **kwargs):
        """
        Initialize RabbitMQ broker.
        
        Args:
            broker_url (str): AMQP connection URL
            http_api (str): HTTP Management API URL  
            io_loop: Tornado IOLoop instance
            **kwargs: Additional broker options
        """
    
    @classmethod
    def validate_http_api(cls, http_api):
        """
        Validate HTTP Management API URL format.
        
        Args:
            http_api (str): Management API URL to validate
            
        Raises:
            ValueError: If URL scheme is invalid
        """

Redis Base Class

class RedisBase(BrokerBase):
    """
    Base class for Redis broker implementations.
    
    Provides common functionality for Redis-based brokers including
    priority queue support and message counting.
    """
    
    DEFAULT_SEP = '\x06\x16'                    # Priority separator
    DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]      # Default priority levels
    
    def __init__(self, broker_url, *_, **kwargs):
        """
        Initialize Redis base broker.
        
        Args:
            broker_url (str): Redis connection URL
            **kwargs: Broker options including:
                - priority_steps: Custom priority levels
                - sep: Priority separator character
                - global_keyprefix: Key prefix for all operations
        """
    
    def _q_for_pri(self, queue, pri):
        """
        Generate priority-specific queue name.
        
        Args:
            queue (str): Base queue name
            pri (int): Priority level
            
        Returns:
            str: Priority-specific queue name
        """
    
    async def queues(self, names):
        """
        Get queue statistics for Redis queues.
        
        Args:
            names (list): Queue names to query
            
        Returns:
            list: Queue statistics with message counts
            
        Counts messages across all priority levels for each queue.
        """

Redis Standard Implementation

class Redis(RedisBase):
    """
    Standard Redis broker implementation.
    
    Supports Redis database selection and standard authentication.
    """
    
    def __init__(self, broker_url, *args, **kwargs):
        """
        Initialize Redis broker.
        
        Args:
            broker_url (str): Redis connection URL (redis://host:port/db)
        """
    
    def _prepare_virtual_host(self, vhost):
        """
        Convert virtual host to Redis database number.
        
        Args:
            vhost (str): Virtual host from URL
            
        Returns:
            int: Redis database number (0-15)
        """
    
    def _get_redis_client_args(self):
        """
        Get Redis client connection arguments.
        
        Returns:
            dict: Connection parameters for Redis client
        """
    
    def _get_redis_client(self):
        """
        Create Redis client instance.
        
        Returns:
            redis.Redis: Configured Redis client
        """

Redis SSL Implementation

class RedisSsl(Redis):
    """
    Redis SSL/TLS broker implementation.
    
    Provides encrypted connections to Redis using SSL/TLS with
    configurable SSL parameters.
    """
    
    def __init__(self, broker_url, *args, **kwargs):
        """
        Initialize SSL Redis broker.
        
        Args:
            broker_url (str): Redis SSL URL (rediss://host:port/db)
            **kwargs: Must include 'broker_use_ssl' configuration
            
        Raises:
            ValueError: If broker_use_ssl is not configured
        """
    
    def _get_redis_client_args(self):
        """
        Get SSL-enabled Redis client arguments.
        
        Returns:
            dict: Connection parameters with SSL configuration
        """

Redis Sentinel Implementation

class RedisSentinel(RedisBase):
    """
    Redis Sentinel cluster implementation.
    
    Provides high availability Redis access through Sentinel
    with automatic failover support.
    """
    
    def __init__(self, broker_url, *args, **kwargs):
        """
        Initialize Redis Sentinel broker.
        
        Args:
            broker_url (str): Sentinel URL (sentinel://host:port/service)
            **kwargs: Must include broker_options with master_name
            
        Raises:
            ValueError: If master_name is not provided
        """
    
    def _prepare_master_name(self, broker_options):
        """
        Extract master name from broker options.
        
        Args:
            broker_options (dict): Broker configuration
            
        Returns:
            str: Redis master service name
        """
    
    def _get_redis_client(self, broker_options):
        """
        Create Sentinel-aware Redis client.
        
        Args:
            broker_options (dict): Sentinel configuration
            
        Returns:
            redis.Redis: Sentinel-managed Redis client
        """

Redis Unix Socket Implementation

class RedisSocket(RedisBase):
    """
    Redis Unix domain socket implementation.
    
    Provides local Redis access through Unix sockets for
    improved performance and security.
    """
    
    def __init__(self, broker_url, *args, **kwargs):
        """
        Initialize Unix socket Redis broker.
        
        Args:
            broker_url (str): Socket URL (redis+socket:///path/to/socket)
        """

Queue Information Structure

QueueInfo = {
    'name': str,                    # Queue name
    'messages': int,                # Total messages in queue
    'messages_ready': int,          # Messages ready for delivery
    'messages_unacknowledged': int, # Messages awaiting acknowledgment
    'consumers': int,               # Number of consumers
    'consumer_details': [
        {
            'consumer_tag': str,    # Consumer identifier
            'channel_details': dict, # Channel information
            'ack_required': bool,   # Acknowledgment required
            'prefetch_count': int,  # Prefetch limit
        }
    ],
    'memory': int,                  # Memory usage in bytes
    'policy': str,                  # Queue policy name
    'arguments': dict,              # Queue arguments
    'auto_delete': bool,            # Auto-delete setting
    'durable': bool,               # Durability setting
    'exclusive': bool,             # Exclusivity setting
    'node': str,                   # Cluster node hosting queue
    'state': str,                  # Queue state
    'backing_queue_status': dict,  # Internal queue status
}

Usage Examples

Basic Queue Monitoring

from flower.utils.broker import Broker

# Get broker instance (auto-detected from Celery config)
broker = Broker(celery_app.conf.broker_url)

# Get queue information
queue_names = ['celery', 'high_priority', 'low_priority']
queue_info = await broker.queues(queue_names)

for name, info in queue_info.items():
    print(f"Queue {name}:")
    print(f"  Messages: {info.get('messages', 'N/A')}")
    print(f"  Consumers: {info.get('consumers', 'N/A')}")

RabbitMQ Management API

from flower.utils.broker import RabbitMQ

# Configure Management API access
rabbitmq = RabbitMQ('amqp://guest:guest@localhost:5672//')
rabbitmq.management_api = 'http://guest:guest@localhost:15672/api/'

# Get detailed queue information
queues = await rabbitmq.queues(['celery'])
queue_info = queues['celery']

print(f"Ready messages: {queue_info['messages_ready']}")
print(f"Unacked messages: {queue_info['messages_unacknowledged']}")
print(f"Consumer count: {queue_info['consumers']}")

Redis Queue Monitoring

from flower.utils.broker import Redis

# Redis broker
redis_broker = Redis('redis://localhost:6379/0')

# Get queue lengths
queue_info = await redis_broker.queues(['celery'])
print(f"Queue length: {queue_info['celery']['messages']}")

Configuration

RabbitMQ Management API

# Configure Management API access
--broker-api=http://guest:guest@localhost:15672/api/

# Environment variable
export FLOWER_BROKER_API=http://guest:guest@localhost:15672/api/

Broker URL Formats

# RabbitMQ
broker_url = 'amqp://user:pass@host:5672/vhost'

# Redis
broker_url = 'redis://host:6379/0'

# Redis SSL
broker_url = 'rediss://host:6380/0'

# Redis Sentinel
broker_url = 'sentinel://host:26379/service-name'

# Redis Unix Socket
broker_url = 'redis+socket:///tmp/redis.sock'

Error Handling

Broker integration includes comprehensive error handling:

try:
    queue_info = await broker.queues(['celery'])
except ConnectionError:
    print("Cannot connect to broker")
except TimeoutError:
    print("Broker request timed out")
except Exception as e:
    print(f"Broker error: {e}")

Performance Considerations

  • Queue monitoring can impact broker performance with many queues
  • Use appropriate timeouts for broker requests
  • Cache queue information when possible
  • Monitor API rate limits (RabbitMQ Management API)
  • Consider broker load when configuring monitoring frequency

Install with Tessl CLI

npx tessl i tessl/pypi-flower

docs

application.md

authentication.md

broker.md

command-line.md

events.md

index.md

rest-api.md

tasks.md

utilities.md

web-interface.md

workers.md

tile.json