Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.
—
Real-time event processing system with persistent storage, state management, and Prometheus metrics collection for comprehensive Celery cluster monitoring.
Main event monitoring class that captures and processes Celery events in real-time.
class Events(threading.Thread):
"""
Real-time Celery event monitoring with persistent storage and metrics.
Runs in a separate thread to capture events from Celery's event system,
process them for state management, and optionally persist to disk.
"""
def __init__(self, capp, io_loop, db=None, persistent=False,
enable_events=True, state_save_interval=0, **kwargs):
"""
Initialize event monitoring system.
Args:
capp: Celery application instance
io_loop: Tornado IOLoop for async operations
db (str, optional): Database file path for persistence
persistent (bool): Enable persistent storage (default: False)
enable_events (bool): Auto-enable events on workers (default: True)
state_save_interval (int): Save interval in seconds (default: 0)
**kwargs: Additional configuration options
"""
def start(self):
"""
Start event monitoring thread.
Begins the event capture loop and state management.
"""
def stop(self):
"""
Stop event monitoring thread.
Gracefully shuts down event capture and saves final state.
"""
def run(self):
"""
Main event capture loop.
Continuously captures events from Celery broker and processes them.
This method runs in the event monitoring thread.
"""
def save_state(self):
"""
Save current state to persistent storage.
Serializes worker and task state to the configured database file.
"""
def on_enable_events(self):
"""
Enable event monitoring on all workers.
Sends enable_events control command to all active workers.
"""
def on_event(self, event):
"""
Process incoming Celery event.
Args:
event (dict): Celery event data
Updates internal state and triggers any necessary notifications.
"""
# State access properties
@property
def state(self):
"""EventsState instance containing all worker and task data."""
@property
def workers(self):
"""Dict of worker information keyed by worker name."""
@property
def tasks(self):
"""Dict of task information keyed by task UUID."""
events_enable_interval = 5000  # Interval for enabling events on workers (ms)

Extended state management class with metrics and improved event processing.
class EventsState(celery.events.state.State):
"""
Enhanced Celery state management with metrics collection.
Extends Celery's built-in State class to add Prometheus metrics
and event counting capabilities.
"""
def __init__(self, *args, **kwargs):
"""
Initialize enhanced state management.
Args:
*args: Arguments passed to parent State class
**kwargs: Keyword arguments passed to parent State class
Creates internal event counters and metrics instance.
"""
def event(self, event):
"""
Process and store event with metrics collection.
Args:
event (dict): Celery event data
Processes the event through the parent class, updates counters,
and collects Prometheus metrics for various event types.
Handles:
- Task events: received, started, succeeded, failed, etc.
- Worker events: online, offline, heartbeat
- Metrics: runtime, prefetch time, worker status
"""
# Enhanced attributes
counter: collections.defaultdict # Event counters per worker
metrics: PrometheusMetrics  # Prometheus metrics instance

Comprehensive metrics collection for monitoring cluster performance and health.
class PrometheusMetrics:
"""
Prometheus metrics collection for Celery cluster monitoring.
Provides various metrics for tracking task execution, worker status,
and system performance.
"""
def __init__(self):
"""Initialize Prometheus metrics with proper labels and buckets."""
# Core metrics (actual implementation from source code)
events: PrometheusCounter = PrometheusCounter(
'flower_events_total',
"Number of events",
['worker', 'type', 'task']
)
runtime: Histogram = Histogram(
'flower_task_runtime_seconds',
"Task runtime",
['worker', 'task'],
buckets=options.task_runtime_metric_buckets
)
prefetch_time: Gauge = Gauge(
'flower_task_prefetch_time_seconds',
"The time the task spent waiting at the celery worker to be executed.",
['worker', 'task']
)
number_of_prefetched_tasks: Gauge = Gauge(
'flower_worker_prefetched_tasks',
'Number of tasks of given type prefetched at a worker',
['worker', 'task']
)
worker_online: Gauge = Gauge(
'flower_worker_online',
"Worker online status",
['worker']
)
worker_number_of_currently_executing_tasks: Gauge = Gauge(
'flower_worker_number_of_currently_executing_tasks',
"Number of tasks currently executing at a worker",
['worker']
)
def get_prometheus_metrics():
"""
Get singleton PrometheusMetrics instance.
Returns:
PrometheusMetrics: Global metrics instance
Creates the metrics instance on first call and returns the same
instance on subsequent calls.
"""

Events related to task lifecycle and execution.
# Task event types
TASK_EVENTS = [
'task-sent', # Task was sent to broker
'task-received', # Worker received task
'task-started', # Worker started executing task
'task-succeeded', # Task completed successfully
'task-failed', # Task execution failed
'task-retried', # Task was retried
'task-revoked', # Task was revoked/cancelled
]
# Task event data structure
TaskEvent = {
'type': str, # Event type
'uuid': str, # Task UUID
'name': str, # Task name
'hostname': str, # Worker hostname
'timestamp': float, # Event timestamp
'args': list, # Task arguments (if available)
'kwargs': dict, # Task keyword arguments (if available)
'retries': int, # Number of retries
'eta': str, # Estimated time of arrival
'expires': str, # Expiration time
'result': Any, # Task result (for success events)
'traceback': str, # Error traceback (for failure events)
'runtime': float, # Execution time (for completion events)
}

Events related to worker status and lifecycle.
# Worker event types
WORKER_EVENTS = [
'worker-online', # Worker came online
'worker-offline', # Worker went offline
'worker-heartbeat', # Worker heartbeat
]
# Worker event data structure
WorkerEvent = {
'type': str, # Event type
'hostname': str, # Worker hostname
'timestamp': float, # Event timestamp
'active': int, # Number of active tasks
'processed': int, # Total processed tasks
'load': list, # System load averages
'freq': float, # CPU frequency
'sw_ident': str, # Software identifier
'sw_ver': str, # Software version
'sw_sys': str, # System information
}

from flower.events import Events
from tornado.ioloop import IOLoop
import celery
# Create Celery app
celery_app = celery.Celery('myapp', broker='redis://localhost:6379')
# Create event monitor
io_loop = IOLoop.current()
events = Events(
capp=celery_app,
io_loop=io_loop,
enable_events=True # Auto-enable events on workers
)
# Start monitoring
events.start()
# Access state
print(f"Active workers: {len(events.workers)}")
print(f"Total tasks: {len(events.tasks)}")
# Stop monitoring
events.stop()

from flower.events import Events
# Enable persistence
events = Events(
capp=celery_app,
io_loop=io_loop,
persistent=True,
db='/var/lib/flower/events.db',
state_save_interval=30 # Save every 30 seconds
)
events.start()
# State is automatically saved and restored

# Configure memory limits
events = Events(
capp=celery_app,
io_loop=io_loop,
max_workers_in_memory=1000, # Keep max 1000 workers
max_tasks_in_memory=50000 # Keep max 50000 tasks
)
events.start()

from flower.events import Events
class CustomEvents(Events):
def on_event(self, event):
# Custom event processing
if event['type'] == 'task-failed':
print(f"Task {event['uuid']} failed: {event.get('traceback')}")
# Call parent processing
super().on_event(event)
# Use custom event processor
events = CustomEvents(capp=celery_app, io_loop=io_loop)
events.start()

from flower.events import Events, PrometheusMetrics
# Events automatically collect Prometheus metrics
events = Events(capp=celery_app, io_loop=io_loop)
events.start()
# Access metrics through the events.state.metrics
metrics = events.state.metrics
# Metrics are available at /metrics endpoint when using Flower web interface

Flower uses pickle serialization for state persistence:
# State file structure
{
'workers': {
'worker_name': {
'hostname': str,
'active': int,
'processed': int,
'load': list,
'timestamp': float,
# ... additional worker data
}
},
'tasks': {
'task_uuid': {
'name': str,
'state': str,
'hostname': str,
'timestamp': float,
'args': list,
'kwargs': dict,
'result': Any,
'runtime': float,
# ... additional task data
}
}
}

# Manually save state
events.save_state()
# Check if persistence is enabled
if events.persistent:
print(f"State saved to: {events.db}")
# Load state on startup (automatic)
events = Events(capp=celery_app, io_loop=io_loop, persistent=True, db='state.db')
# Previous state is automatically loaded

Bound memory usage with `max_workers_in_memory` and `max_tasks_in_memory`, and tune `state_save_interval` based on data volume and requirements.

Install with Tessl CLI
npx tessl i tessl/pypi-flower