CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-flower

Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

workers.mddocs/

Worker Management

Worker inspection, status monitoring, and remote control operations for managing Celery worker processes across the cluster.

Capabilities

Inspector Class

Core worker inspection and management functionality using Celery's remote control system.

class Inspector:
    """
    Celery cluster inspection and worker management.
    
    Provides asynchronous worker inspection and remote control capabilities
    using Celery's built-in management commands.
    """
    
    def __init__(self, io_loop, capp, timeout):
        """
        Initialize worker inspector.
        
        Args:
            io_loop: Tornado IOLoop for async operations
            capp: Celery application instance
            timeout (float): Inspection timeout in seconds
        """
    
    def inspect(self, workername=None):
        """
        Inspect workers asynchronously.
        
        Args:
            workername (str, optional): Specific worker name, or None for all workers
            
        Returns:
            list: List of futures for inspection results
            
        Performs inspection of worker status, active tasks, registered tasks,
        and other worker information using Celery's inspection system.
        """
    
    @property
    def workers(self):
        """
        Get current worker information dictionary.
        
        Returns:
            dict: Worker information keyed by worker name, containing:
                - status: Worker online/offline status
                - active: Number of active tasks
                - processed: Total processed tasks  
                - load: System load information
                - heartbeats: Last heartbeat timestamp
                - registered: List of registered task names
                - stats: Worker statistics
                - active_queues: Active queue information
        """
    
    # Internal methods for inspection
    def _inspect(self, method, workername):
        """
        Internal method to perform actual worker inspection.
        
        Args:
            method (str): Inspection method to execute
            workername (str): Target worker name or None for all workers
            
        Executes the specified inspection method using Celery's control interface
        and handles the response processing.
        """
    
    def _on_update(self, workername, method, response):
        """
        Handle inspection response and update worker information.
        
        Args:
            workername (str): Worker name that responded
            method (str): Inspection method that was executed
            response (dict): Response data from the worker
            
        Updates the internal workers dictionary with the latest information.
        """
    
    # Available inspection methods (from source code)
    methods = ('stats', 'active_queues', 'registered', 'scheduled',
               'active', 'reserved', 'revoked', 'conf')
    
    @property  
    def workers(self):
        """
        Worker information dictionary.
        
        Returns:
            collections.defaultdict: Worker data keyed by worker name
            
        Structure:
            {
                'worker_name': {
                    'stats': dict,           # Worker statistics
                    'active_queues': list,   # Active queue names
                    'registered': list,      # Registered task names
                    'scheduled': list,       # Scheduled tasks
                    'active': list,          # Currently executing tasks
                    'reserved': list,        # Reserved tasks
                    'revoked': set,          # Revoked task IDs
                    'conf': dict,           # Worker configuration
                    'timestamp': float,      # Last update timestamp
                }
            }
        """

Worker Control Operations

Remote control operations for managing worker processes and their behavior.

# Worker lifecycle control
def worker_shutdown(workername):
    """
    Shutdown a specific worker.
    
    Args:
        workername (str): Name of worker to shutdown
        
    Sends shutdown signal to the specified worker process.
    """

def worker_pool_restart(workername):
    """
    Restart worker's process pool.
    
    Args:
        workername (str): Name of worker to restart pool
        
    Restarts the worker's process pool without shutting down the worker.
    """

def worker_pool_grow(workername, n=1):
    """
    Increase worker pool size.
    
    Args:
        workername (str): Name of worker to modify
        n (int): Number of processes to add (default: 1)
        
    Dynamically increases the worker's process pool size.
    """

def worker_pool_shrink(workername, n=1):
    """
    Decrease worker pool size.
    
    Args:
        workername (str): Name of worker to modify
        n (int): Number of processes to remove (default: 1)
        
    Dynamically decreases the worker's process pool size.
    """

def worker_pool_autoscale(workername, min_workers, max_workers):
    """
    Configure worker pool autoscaling.
    
    Args:
        workername (str): Name of worker to configure
        min_workers (int): Minimum number of worker processes
        max_workers (int): Maximum number of worker processes
        
    Sets autoscaling parameters for the worker's process pool.
    """

Queue Management

Operations for managing worker queue consumption and routing.

def worker_queue_add_consumer(workername, queue_name):
    """
    Add queue consumer to worker.
    
    Args:
        workername (str): Name of worker to modify
        queue_name (str): Name of queue to start consuming
        
    Instructs the worker to start consuming from the specified queue.
    """

def worker_queue_cancel_consumer(workername, queue_name):
    """
    Remove queue consumer from worker.
    
    Args:
        workername (str): Name of worker to modify  
        queue_name (str): Name of queue to stop consuming
        
    Instructs the worker to stop consuming from the specified queue.
    """

def get_active_queue_names():
    """
    Get list of all active queue names across the cluster.
    
    Returns:
        list: Names of all queues being consumed by workers
    """

Worker Information Structure

Worker Status Data

Comprehensive worker information structure returned by inspection operations.

WorkerInfo = {
    # Basic identification
    'hostname': str,           # Worker hostname/identifier
    'status': str,            # 'online' or 'offline'
    'timestamp': float,       # Last update timestamp
    
    # Task statistics
    'active': int,            # Number of currently active tasks
    'processed': int,         # Total number of processed tasks
    'load': [float, float, float],  # System load averages (1m, 5m, 15m)
    
    # Process information
    'pool': {
        'max-concurrency': int,      # Maximum concurrent tasks
        'processes': [int],          # Process IDs in pool
        'max-tasks-per-child': int,  # Max tasks per child process
        'put-guarded-by-semaphore': bool,
        'timeouts': [float, float],  # Soft and hard timeouts
        'writes': {
            'total': int,            # Total writes
            'avg': float,           # Average write time
            'all': str,             # Write time details  
        }
    },
    
    # System information
    'rusage': {
        'utime': float,         # User CPU time
        'stime': float,         # System CPU time
        'maxrss': int,          # Maximum resident set size
        'ixrss': int,          # Integral shared memory size
        'idrss': int,          # Integral unshared data size
        'isrss': int,          # Integral unshared stack size
        'minflt': int,         # Page reclaims
        'majflt': int,         # Page faults
        'nswap': int,          # Swaps
        'inblock': int,        # Block input operations
        'oublock': int,        # Block output operations
        'msgsnd': int,         # Messages sent
        'msgrcv': int,         # Messages received
        'nsignals': int,       # Signals received
        'nvcsw': int,          # Voluntary context switches
        'nivcsw': int,         # Involuntary context switches
    },
    
    # Registered tasks
    'registered': [str],      # List of registered task names
    
    # Active queues
    'active_queues': [
        {
            'name': str,              # Queue name
            'exchange': {
                'name': str,          # Exchange name
                'type': str,          # Exchange type
                'durable': bool,      # Exchange durability
                'auto_delete': bool,  # Auto-delete setting
                'arguments': dict,    # Exchange arguments
            },
            'routing_key': str,       # Routing key
            'durable': bool,          # Queue durability
            'exclusive': bool,        # Queue exclusivity
            'auto_delete': bool,      # Auto-delete setting
            'no_ack': bool,          # No acknowledgment setting
            'alias': str,            # Queue alias
            'bindings': [dict],      # Queue bindings
            'no_declare': bool,      # No declaration flag
            'expires': int,          # Queue expiration
            'message_ttl': int,      # Message TTL
            'max_length': int,       # Maximum queue length
            'max_length_bytes': int, # Maximum queue size in bytes
            'max_priority': int,     # Maximum message priority
        }
    ],
    
    # Clock information
    'clock': int,             # Logical clock value
    
    # Software information
    'sw_ident': str,          # Software identifier
    'sw_ver': str,           # Software version
    'sw_sys': str,           # System information
}

Active Task Information

Structure for active task information returned by worker inspection.

ActiveTask = {
    'id': str,               # Task UUID
    'name': str,             # Task name
    'args': list,            # Task arguments
    'kwargs': dict,          # Task keyword arguments
    'type': str,             # Task type
    'hostname': str,         # Worker hostname
    'time_start': float,     # Task start timestamp
    'acknowledged': bool,    # Task acknowledgment status
    'delivery_info': {
        'exchange': str,     # Exchange name
        'routing_key': str,  # Routing key
        'priority': int,     # Message priority
        'redelivered': bool, # Redelivery flag
    },
    'worker_pid': int,       # Worker process ID
}

Usage Examples

Basic Worker Inspection

from flower.inspector import Inspector
from tornado.ioloop import IOLoop
import celery

# Create inspector
celery_app = celery.Celery('myapp', broker='redis://localhost:6379')
io_loop = IOLoop.current()
inspector = Inspector(io_loop, celery_app, timeout=10.0)

# Inspect all workers
async def inspect_workers():
    result = await inspector.inspect()
    print(f"Found {len(result)} workers")
    
    for worker_name, worker_info in result.items():
        print(f"Worker: {worker_name}")
        print(f"  Status: {worker_info.get('status', 'unknown')}")
        print(f"  Active tasks: {worker_info.get('active', 0)}")
        print(f"  Processed: {worker_info.get('processed', 0)}")

# Run inspection
io_loop.run_sync(inspect_workers)

Inspect Specific Worker

# Inspect single worker
async def inspect_single_worker():
    result = await inspector.inspect(workername='celery@worker1')
    
    if result:
        worker_info = result['celery@worker1']
        print(f"Worker celery@worker1:")
        print(f"  Registered tasks: {worker_info.get('registered', [])}")
        print(f"  Active queues: {len(worker_info.get('active_queues', []))}")
        print(f"  Load: {worker_info.get('load', [])}")

io_loop.run_sync(inspect_single_worker)

Worker Control Operations

from flower.api.control import (
    WorkerShutDown, WorkerPoolRestart, WorkerPoolGrow,
    WorkerPoolShrink, WorkerPoolAutoscale,
    WorkerQueueAddConsumer, WorkerQueueCancelConsumer
)

# Shutdown worker
async def shutdown_worker():
    handler = WorkerShutDown()
    await handler.post('celery@worker1')

# Restart worker pool
async def restart_pool():
    handler = WorkerPoolRestart()
    await handler.post('celery@worker1')

# Grow worker pool
async def grow_pool():
    handler = WorkerPoolGrow()
    await handler.post('celery@worker1', n=2)  # Add 2 processes

# Configure autoscaling
async def configure_autoscale():
    handler = WorkerPoolAutoscale()
    await handler.post('celery@worker1', min=2, max=10)

Queue Consumer Management

# Add queue consumer
async def add_queue_consumer():
    handler = WorkerQueueAddConsumer()
    await handler.post('celery@worker1', queue='high_priority')

# Remove queue consumer  
async def remove_queue_consumer():
    handler = WorkerQueueCancelConsumer()
    await handler.post('celery@worker1', queue='low_priority')

Real-time Worker Monitoring

import asyncio
from flower.inspector import Inspector

class WorkerMonitor:
    def __init__(self, inspector):
        self.inspector = inspector
        self.running = False
    
    async def start_monitoring(self, interval=30):
        """Monitor workers every `interval` seconds."""
        self.running = True
        
        while self.running:
            try:
                workers = await self.inspector.inspect()
                await self.process_worker_updates(workers)
                await asyncio.sleep(interval)
            except Exception as e:
                print(f"Monitoring error: {e}")
                await asyncio.sleep(5)
    
    async def process_worker_updates(self, workers):
        """Process worker status updates."""
        for worker_name, worker_info in workers.items():
            status = worker_info.get('status', 'unknown')
            active = worker_info.get('active', 0)
            load = worker_info.get('load', [0, 0, 0])
            
            print(f"{worker_name}: {status}, {active} active, load: {load[0]:.2f}")
            
            # Alert on high load
            if load[0] > 5.0:
                print(f"HIGH LOAD WARNING: {worker_name} load: {load[0]:.2f}")
    
    def stop_monitoring(self):
        """Stop monitoring loop."""
        self.running = False

# Usage
monitor = WorkerMonitor(inspector)
asyncio.create_task(monitor.start_monitoring(interval=10))

Integration with Events

Worker information from the inspector is combined with real-time event data for comprehensive monitoring.

from flower.events import Events
from flower.inspector import Inspector

# Combine inspector and events data
class WorkerManager:
    def __init__(self, celery_app, io_loop):
        self.inspector = Inspector(io_loop, celery_app, timeout=10.0)
        self.events = Events(celery_app, io_loop)
    
    async def get_complete_worker_info(self):
        """Get combined worker information from inspection and events."""
        # Get inspection data
        inspection_data = await self.inspector.inspect()
        
        # Get event data
        event_workers = self.events.workers
        
        # Combine data
        combined = {}
        for worker_name in set(inspection_data.keys()) | set(event_workers.keys()):
            combined[worker_name] = {
                'inspection': inspection_data.get(worker_name, {}),
                'events': event_workers.get(worker_name, {}),
                'online': worker_name in inspection_data,
                'last_heartbeat': event_workers.get(worker_name, {}).get('timestamp'),
            }
        
        return combined

Error Handling

Worker management operations include comprehensive error handling for various failure scenarios:

# Handle worker inspection errors
try:
    result = await inspector.inspect()
except Exception as e:
    if 'timeout' in str(e).lower():
        print("Worker inspection timed out - workers may be overloaded")
    elif 'connection' in str(e).lower():
        print("Cannot connect to broker - check broker status")
    else:
        print(f"Inspection failed: {e}")

# Handle control command errors
try:
    await worker_shutdown('celery@worker1')
except Exception as e:
    if 'no such worker' in str(e).lower():
        print("Worker not found or already offline")
    else:
        print(f"Control command failed: {e}")

Performance Considerations

  • Worker inspection can be expensive with many workers - use appropriate timeouts
  • Cache inspection results when possible to reduce broker load
  • Monitor inspection latency to detect broker or network issues
  • Use specific worker names when possible to reduce inspection scope
  • Consider inspection frequency based on cluster size and requirements

Install with Tessl CLI

npx tessl i tessl/pypi-flower

docs

application.md

authentication.md

broker.md

command-line.md

events.md

index.md

rest-api.md

tasks.md

utilities.md

web-interface.md

workers.md

tile.json