Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.
Quality: Pending — best-practices conformance has not yet been assessed.
Impact: Pending — no eval scenarios have been run.
Worker inspection, status monitoring, and remote control operations for managing Celery worker processes across the cluster.
Core worker inspection and management functionality using Celery's remote control system.
class Inspector:
    """
    Celery cluster inspection and worker management.

    Provides asynchronous worker inspection and remote control capabilities
    using Celery's built-in management commands.
    """

    # Available inspection methods (from source code).
    methods = ('stats', 'active_queues', 'registered', 'scheduled',
               'active', 'reserved', 'revoked', 'conf')

    def __init__(self, io_loop, capp, timeout):
        """
        Initialize worker inspector.

        Args:
            io_loop: Tornado IOLoop for async operations
            capp: Celery application instance
            timeout (float): Inspection timeout in seconds
        """

    def inspect(self, workername=None):
        """
        Inspect workers asynchronously.

        Args:
            workername (str, optional): Specific worker name, or None for all workers

        Returns:
            list: List of futures for inspection results

        Performs inspection of worker status, active tasks, registered tasks,
        and other worker information using Celery's inspection system.
        """

    @property
    def workers(self):
        """
        Worker information dictionary.

        Returns:
            collections.defaultdict: Worker data keyed by worker name.

        Structure (one entry per inspection method, plus bookkeeping):
            {
                'worker_name': {
                    'stats': dict,          # Worker statistics (status, load, processed, heartbeats, ...)
                    'active_queues': list,  # Active queue information
                    'registered': list,     # Registered task names
                    'scheduled': list,      # Scheduled tasks
                    'active': list,         # Currently executing tasks
                    'reserved': list,       # Reserved tasks
                    'revoked': set,         # Revoked task IDs
                    'conf': dict,           # Worker configuration
                    'timestamp': float,     # Last update timestamp
                }
            }
        """

    # -- Internal methods for inspection --

    def _inspect(self, method, workername):
        """
        Internal method to perform actual worker inspection.

        Args:
            method (str): Inspection method to execute
            workername (str): Target worker name or None for all workers

        Executes the specified inspection method using Celery's control interface
        and handles the response processing.
        """

    def _on_update(self, workername, method, response):
        """
        Handle inspection response and update worker information.

        Args:
            workername (str): Worker name that responded
            method (str): Inspection method that was executed
            response (dict): Response data from the worker

        Updates the internal workers dictionary with the latest information.
        """

# Remote control operations for managing worker processes and their behavior.
# Worker lifecycle control
def worker_shutdown(workername):
    """
    Shut down a single worker.

    Args:
        workername (str): Name of the worker to shut down.

    Sends a shutdown signal to the targeted worker process.
    """
def worker_pool_restart(workername):
    """
    Restart a worker's process pool.

    Args:
        workername (str): Name of the worker whose pool is restarted.

    Restarts the pool in place; the worker itself is not shut down.
    """
def worker_pool_grow(workername, n=1):
    """
    Grow a worker's process pool.

    Args:
        workername (str): Name of the worker to modify.
        n (int): Number of processes to add (default: 1).

    Dynamically increases the size of the worker's process pool.
    """
def worker_pool_shrink(workername, n=1):
    """
    Shrink a worker's process pool.

    Args:
        workername (str): Name of the worker to modify.
        n (int): Number of processes to remove (default: 1).

    Dynamically decreases the size of the worker's process pool.
    """
def worker_pool_autoscale(workername, min_workers, max_workers):
    """
    Configure autoscaling for a worker's pool.

    Args:
        workername (str): Name of the worker to configure.
        min_workers (int): Lower bound on pool processes.
        max_workers (int): Upper bound on pool processes.

    Applies the autoscale bounds to the worker's process pool.
    """

# Operations for managing worker queue consumption and routing.
def worker_queue_add_consumer(workername, queue_name):
    """
    Tell a worker to start consuming from a queue.

    Args:
        workername (str): Name of the worker to modify.
        queue_name (str): Queue the worker should begin consuming.
    """
def worker_queue_cancel_consumer(workername, queue_name):
    """
    Tell a worker to stop consuming from a queue.

    Args:
        workername (str): Name of the worker to modify.
        queue_name (str): Queue the worker should stop consuming.
    """
def get_active_queue_names():
    """
    Get the names of every queue being consumed across the cluster.

    Returns:
        list: Names of all queues being consumed by workers.
    """

# Comprehensive worker information structure returned by inspection operations.
WorkerInfo = {
    # Basic identification
    'hostname': str,                 # Worker hostname/identifier
    'status': str,                   # 'online' or 'offline'
    'timestamp': float,              # Last update timestamp

    # Task statistics
    'active': int,                   # Number of currently active tasks
    'processed': int,                # Total number of processed tasks
    'load': [float, float, float],   # System load averages (1m, 5m, 15m)

    # Process information
    'pool': {
        'max-concurrency': int,      # Maximum concurrent tasks
        'processes': [int],          # Process IDs in pool
        'max-tasks-per-child': int,  # Max tasks per child process
        'put-guarded-by-semaphore': bool,
        'timeouts': [float, float],  # Soft and hard timeouts
        'writes': {
            'total': int,            # Total writes
            'avg': float,            # Average write time
            'all': str,              # Write time details
        },
    },

    # System resource usage (getrusage-style counters)
    'rusage': {
        'utime': float,    # User CPU time
        'stime': float,    # System CPU time
        'maxrss': int,     # Maximum resident set size
        'ixrss': int,      # Integral shared memory size
        'idrss': int,      # Integral unshared data size
        'isrss': int,      # Integral unshared stack size
        'minflt': int,     # Page reclaims
        'majflt': int,     # Page faults
        'nswap': int,      # Swaps
        'inblock': int,    # Block input operations
        'oublock': int,    # Block output operations
        'msgsnd': int,     # Messages sent
        'msgrcv': int,     # Messages received
        'nsignals': int,   # Signals received
        'nvcsw': int,      # Voluntary context switches
        'nivcsw': int,     # Involuntary context switches
    },

    # Registered tasks
    'registered': [str],   # List of registered task names

    # Active queues
    'active_queues': [
        {
            'name': str,              # Queue name
            'exchange': {
                'name': str,          # Exchange name
                'type': str,          # Exchange type
                'durable': bool,      # Exchange durability
                'auto_delete': bool,  # Auto-delete setting
                'arguments': dict,    # Exchange arguments
            },
            'routing_key': str,       # Routing key
            'durable': bool,          # Queue durability
            'exclusive': bool,        # Queue exclusivity
            'auto_delete': bool,      # Auto-delete setting
            'no_ack': bool,           # No acknowledgment setting
            'alias': str,             # Queue alias
            'bindings': [dict],       # Queue bindings
            'no_declare': bool,       # No declaration flag
            'expires': int,           # Queue expiration
            'message_ttl': int,       # Message TTL
            'max_length': int,        # Maximum queue length
            'max_length_bytes': int,  # Maximum queue size in bytes
            'max_priority': int,      # Maximum message priority
        }
    ],

    # Clock information
    'clock': int,      # Logical clock value

    # Software information
    'sw_ident': str,   # Software identifier
    'sw_ver': str,     # Software version
    'sw_sys': str,     # System information
}

# Structure for active task information returned by worker inspection.
ActiveTask = {
    'id': str,                # Task UUID
    'name': str,              # Task name
    'args': list,             # Task arguments
    'kwargs': dict,           # Task keyword arguments
    'type': str,              # Task type
    'hostname': str,          # Worker hostname
    'time_start': float,      # Task start timestamp
    'acknowledged': bool,     # Task acknowledgment status
    'delivery_info': {
        'exchange': str,      # Exchange name
        'routing_key': str,   # Routing key
        'priority': int,      # Message priority
        'redelivered': bool,  # Redelivery flag
    },
    'worker_pid': int,        # Worker process ID
}

from flower.inspector import Inspector
from tornado.ioloop import IOLoop
import celery
# Create inspector
# Celery app bound to a local Redis broker; adjust the URL for your deployment.
celery_app = celery.Celery('myapp', broker='redis://localhost:6379')
io_loop = IOLoop.current()
# 10-second timeout applies to each remote-control inspection round.
inspector = Inspector(io_loop, celery_app, timeout=10.0)
# Inspect all workers
async def inspect_workers():
    """Inspect every worker in the cluster and print a short summary."""
    info_by_worker = await inspector.inspect()
    print(f"Found {len(info_by_worker)} workers")
    for worker_name, worker_info in info_by_worker.items():
        print(f"Worker: {worker_name}")
        print(f" Status: {worker_info.get('status', 'unknown')}")
        print(f" Active tasks: {worker_info.get('active', 0)}")
        print(f" Processed: {worker_info.get('processed', 0)}")

# Run inspection
io_loop.run_sync(inspect_workers)

# Inspect single worker
async def inspect_single_worker():
    """Inspect one named worker and print selected details."""
    result = await inspector.inspect(workername='celery@worker1')
    if not result:
        return
    worker_info = result['celery@worker1']
    print(f"Worker celery@worker1:")
    print(f" Registered tasks: {worker_info.get('registered', [])}")
    print(f" Active queues: {len(worker_info.get('active_queues', []))}")
    print(f" Load: {worker_info.get('load', [])}")

io_loop.run_sync(inspect_single_worker)

from flower.api.control import (
    WorkerShutDown, WorkerPoolRestart, WorkerPoolGrow,
    WorkerPoolShrink, WorkerPoolAutoscale,
    WorkerQueueAddConsumer, WorkerQueueCancelConsumer
)
# Shutdown worker
async def shutdown_worker():
    """Send a shutdown command to worker 'celery@worker1'."""
    await WorkerShutDown().post('celery@worker1')
# Restart worker pool
async def restart_pool():
    """Restart the process pool of worker 'celery@worker1'."""
    await WorkerPoolRestart().post('celery@worker1')
# Grow worker pool
async def grow_pool():
    """Grow the pool of worker 'celery@worker1' by two processes."""
    await WorkerPoolGrow().post('celery@worker1', n=2)  # Add 2 processes
# Configure autoscaling
async def configure_autoscale():
    """Set autoscale bounds (min=2, max=10) on worker 'celery@worker1'."""
    await WorkerPoolAutoscale().post('celery@worker1', min=2, max=10)

# Add queue consumer
async def add_queue_consumer():
    """Start consuming 'high_priority' on worker 'celery@worker1'."""
    await WorkerQueueAddConsumer().post('celery@worker1', queue='high_priority')
# Remove queue consumer
async def remove_queue_consumer():
    """Stop consuming 'low_priority' on worker 'celery@worker1'."""
    await WorkerQueueCancelConsumer().post('celery@worker1', queue='low_priority')

import asyncio
from flower.inspector import Inspector
class WorkerMonitor:
    """Poll a Flower Inspector on a fixed interval and report worker health."""

    def __init__(self, inspector):
        # Inspector used for polling; monitor starts in the stopped state.
        self.inspector = inspector
        self.running = False

    async def start_monitoring(self, interval=30):
        """Monitor workers every `interval` seconds."""
        self.running = True
        while self.running:
            try:
                snapshot = await self.inspector.inspect()
                await self.process_worker_updates(snapshot)
                await asyncio.sleep(interval)
            except Exception as e:
                # Back off briefly on any failure, then keep polling.
                print(f"Monitoring error: {e}")
                await asyncio.sleep(5)

    async def process_worker_updates(self, workers):
        """Process worker status updates."""
        for worker_name, worker_info in workers.items():
            load = worker_info.get('load', [0, 0, 0])
            active = worker_info.get('active', 0)
            status = worker_info.get('status', 'unknown')
            print(f"{worker_name}: {status}, {active} active, load: {load[0]:.2f}")
            # Alert on high load
            if load[0] > 5.0:
                print(f"HIGH LOAD WARNING: {worker_name} load: {load[0]:.2f}")

    def stop_monitoring(self):
        """Stop monitoring loop."""
        self.running = False
# Usage: start a background monitoring task on the running event loop.
monitor = WorkerMonitor(inspector)
asyncio.create_task(monitor.start_monitoring(interval=10))

# Worker information from the inspector is combined with real-time event data for comprehensive monitoring.
from flower.events import Events
from flower.inspector import Inspector
# Combine inspector and events data
class WorkerManager:
    """Merge Inspector poll results with Flower event-stream worker data."""

    def __init__(self, celery_app, io_loop):
        # One inspection client and one event source per manager.
        self.inspector = Inspector(io_loop, celery_app, timeout=10.0)
        self.events = Events(celery_app, io_loop)

    async def get_complete_worker_info(self):
        """Get combined worker information from inspection and events."""
        # Poll workers via the remote-control inspection API.
        inspection_data = await self.inspector.inspect()
        # Snapshot of workers as seen on the event stream.
        event_workers = self.events.workers
        combined = {}
        # Union of worker names from both sources.
        for name in set(inspection_data.keys()) | set(event_workers.keys()):
            event_info = event_workers.get(name, {})
            combined[name] = {
                'inspection': inspection_data.get(name, {}),
                'events': event_info,
                'online': name in inspection_data,
                'last_heartbeat': event_info.get('timestamp'),
            }
        return combined

# Worker management operations include comprehensive error handling for various failure scenarios:
# Handle worker inspection errors
try:
result = await inspector.inspect()
except Exception as e:
if 'timeout' in str(e).lower():
print("Worker inspection timed out - workers may be overloaded")
elif 'connection' in str(e).lower():
print("Cannot connect to broker - check broker status")
else:
print(f"Inspection failed: {e}")
# Handle control command errors
try:
await worker_shutdown('celery@worker1')
except Exception as e:
if 'no such worker' in str(e).lower():
print("Worker not found or already offline")
else:
print(f"Control command failed: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-flower