tessl/pypi-flower

Web-based tool for monitoring and administering Celery clusters with real-time task tracking and worker management.

Event Monitoring

Real-time event processing system with persistent storage, state management, and Prometheus metrics collection for comprehensive Celery cluster monitoring.

Capabilities

Events Class

Main event monitoring class that captures and processes Celery events in real-time.

class Events(threading.Thread):
    """
    Real-time Celery event monitoring with persistent storage and metrics.
    
    Runs in a separate thread to capture events from Celery's event system,
    process them for state management, and optionally persist to disk.
    """
    
    def __init__(self, capp, io_loop, db=None, persistent=False,
                 enable_events=True, state_save_interval=0, **kwargs):
        """
        Initialize event monitoring system.
        
        Args:
            capp: Celery application instance
            io_loop: Tornado IOLoop for async operations
            db (str, optional): Database file path for persistence
            persistent (bool): Enable persistent storage (default: False)
            enable_events (bool): Auto-enable events on workers (default: True)
            state_save_interval (int): Save interval in seconds (default: 0)
            **kwargs: Additional configuration options
        """
    
    def start(self):
        """
        Start event monitoring thread.
        
        Begins the event capture loop and state management.
        """
    
    def stop(self):
        """
        Stop event monitoring thread.
        
        Gracefully shuts down event capture and saves final state.
        """
    
    def run(self):
        """
        Main event capture loop.
        
        Continuously captures events from Celery broker and processes them.
        This method runs in the event monitoring thread.
        """
    
    def save_state(self):
        """
        Save current state to persistent storage.
        
        Serializes worker and task state to the configured database file.
        """
    
    def on_enable_events(self):
        """
        Enable event monitoring on all workers.
        
        Sends enable_events control command to all active workers.
        """
    
    def on_event(self, event):
        """
        Process incoming Celery event.
        
        Args:
            event (dict): Celery event data
            
        Updates internal state and triggers any necessary notifications.
        """
    
    # State access properties
    @property
    def state(self):
        """EventsState instance containing all worker and task data."""
    
    @property
    def workers(self):
        """Dict of worker information keyed by worker name."""
    
    @property
    def tasks(self):
        """Dict of task information keyed by task UUID."""
    
    events_enable_interval = 5000  # Interval for enabling events on workers (ms)

Enhanced Events State

Extended state management class with metrics and improved event processing.

class EventsState(celery.events.state.State):
    """
    Enhanced Celery state management with metrics collection.
    
    Extends Celery's built-in State class to add Prometheus metrics
    and event counting capabilities.
    """
    
    def __init__(self, *args, **kwargs):
        """
        Initialize enhanced state management.
        
        Args:
            *args: Arguments passed to parent State class
            **kwargs: Keyword arguments passed to parent State class
            
        Creates internal event counters and metrics instance.
        """
    
    def event(self, event):
        """
        Process and store event with metrics collection.
        
        Args:
            event (dict): Celery event data
            
        Processes the event through the parent class, updates counters,
        and collects Prometheus metrics for various event types.
        
        Handles:
        - Task events: received, started, succeeded, failed, etc.
        - Worker events: online, offline, heartbeat
        - Metrics: runtime, prefetch time, worker status
        """
    
    # Enhanced attributes
    counter: collections.defaultdict  # Event counters per worker
    metrics: PrometheusMetrics       # Prometheus metrics instance
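
The counter attribute is a nested mapping keyed first by worker hostname, then by event type. A minimal stdlib sketch of that shape (illustrative only; the worker names below are invented):

```python
import collections

# Nested counters: counter[worker][event_type] -> count,
# mirroring the defaultdict shape of EventsState.counter.
counter = collections.defaultdict(collections.Counter)

# Simulate a few incoming events for two hypothetical workers.
for worker, event_type in [
    ("worker1@host", "task-received"),
    ("worker1@host", "task-succeeded"),
    ("worker2@host", "task-failed"),
]:
    counter[worker][event_type] += 1

print(counter["worker1@host"]["task-succeeded"])  # 1
```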

Prometheus Metrics

Comprehensive metrics collection for monitoring cluster performance and health.

class PrometheusMetrics:
    """
    Prometheus metrics collection for Celery cluster monitoring.
    
    Provides various metrics for tracking task execution, worker status,
    and system performance.
    """
    
    def __init__(self):
        """Initialize Prometheus metrics with proper labels and buckets."""
    
    # Core metrics (actual implementation from source code)
    events: PrometheusCounter = PrometheusCounter(
        'flower_events_total', 
        "Number of events", 
        ['worker', 'type', 'task']
    )
    
    runtime: Histogram = Histogram(
        'flower_task_runtime_seconds',
        "Task runtime",
        ['worker', 'task'],
        buckets=options.task_runtime_metric_buckets
    )
    
    prefetch_time: Gauge = Gauge(
        'flower_task_prefetch_time_seconds',
        "The time the task spent waiting at the celery worker to be executed.",
        ['worker', 'task']
    )
    
    number_of_prefetched_tasks: Gauge = Gauge(
        'flower_worker_prefetched_tasks',
        'Number of tasks of given type prefetched at a worker',
        ['worker', 'task']
    )
    
    worker_online: Gauge = Gauge(
        'flower_worker_online', 
        "Worker online status", 
        ['worker']
    )
    
    worker_number_of_currently_executing_tasks: Gauge = Gauge(
        'flower_worker_number_of_currently_executing_tasks',
        "Number of tasks currently executing at a worker",
        ['worker']
    )

def get_prometheus_metrics():
    """
    Get singleton PrometheusMetrics instance.
    
    Returns:
        PrometheusMetrics: Global metrics instance
        
    Creates the metrics instance on first call and returns the same
    instance on subsequent calls.
    """

Event Types

Task Events

Events related to task lifecycle and execution.

# Task event types
TASK_EVENTS = [
    'task-sent',        # Task was sent to broker
    'task-received',    # Worker received task
    'task-started',     # Worker started executing task
    'task-succeeded',   # Task completed successfully
    'task-failed',      # Task execution failed
    'task-retried',     # Task was retried
    'task-revoked',     # Task was revoked/cancelled
]
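
Because every event carries its type in the 'type' field, handlers can dispatch on it. A small hypothetical helper (not part of Flower's API) that groups events by their type prefix:

```python
def classify_event(event):
    """Group a Celery event into a coarse category by its type prefix.

    Illustrative helper only; Flower dispatches events internally.
    """
    event_type = event.get("type", "")
    if event_type.startswith("task-"):
        return "task"
    if event_type.startswith("worker-"):
        return "worker"
    return "unknown"

print(classify_event({"type": "task-succeeded"}))   # task
print(classify_event({"type": "worker-heartbeat"})) # worker
```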

# Task event data structure
TaskEvent = {
    'type': str,           # Event type
    'uuid': str,           # Task UUID
    'name': str,           # Task name
    'hostname': str,       # Worker hostname
    'timestamp': float,    # Event timestamp
    'args': list,          # Task arguments (if available)
    'kwargs': dict,        # Task keyword arguments (if available)
    'retries': int,        # Number of retries
    'eta': str,            # Estimated time of arrival
    'expires': str,        # Expiration time
    'result': Any,         # Task result (for success events)
    'traceback': str,      # Error traceback (for failure events)
    'runtime': float,      # Execution time (for completion events)
}
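
Not every field is present on every event, so optional keys should be read defensively. A hypothetical task-failed event following the structure above (all values invented for illustration):

```python
# Hypothetical task-failed event; uuid, name, and hostname are made up.
event = {
    "type": "task-failed",
    "uuid": "d9078da5-9915-4a4b-ae61-1d3b8f1fb6d9",
    "name": "myapp.tasks.resize_image",
    "hostname": "worker1@host",
    "timestamp": 1700000000.0,
    "retries": 2,
    "traceback": "Traceback (most recent call last): ...",
}

# Fields like 'result' or 'runtime' appear only on completion events,
# so access optional keys with .get() rather than indexing.
runtime = event.get("runtime")  # None for a failure event
summary = f"{event['name']} ({event['uuid'][:8]}) failed after {event['retries']} retries"
print(summary)
```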

Worker Events

Events related to worker status and lifecycle.

# Worker event types
WORKER_EVENTS = [
    'worker-online',     # Worker came online
    'worker-offline',    # Worker went offline  
    'worker-heartbeat',  # Worker heartbeat
]

# Worker event data structure
WorkerEvent = {
    'type': str,         # Event type
    'hostname': str,     # Worker hostname
    'timestamp': float,  # Event timestamp
    'active': int,       # Number of active tasks
    'processed': int,    # Total processed tasks
    'load': list,        # System load averages
    'freq': float,       # CPU frequency
    'sw_ident': str,     # Software identifier
    'sw_ver': str,       # Software version
    'sw_sys': str,       # System information
}

Usage Examples

Basic Event Monitoring

from flower.events import Events
from tornado.ioloop import IOLoop
import celery

# Create Celery app
celery_app = celery.Celery('myapp', broker='redis://localhost:6379')

# Create event monitor
io_loop = IOLoop.current()
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    enable_events=True  # Auto-enable events on workers
)

# Start monitoring
events.start()

# Access state
print(f"Active workers: {len(events.workers)}")
print(f"Total tasks: {len(events.tasks)}")

# Stop monitoring
events.stop()

Persistent Event Storage

from flower.events import Events

# Enable persistence
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    persistent=True,
    db='/var/lib/flower/events.db',
    state_save_interval=30  # Save every 30 seconds
)

events.start()

# State is automatically saved and restored

Memory Management

# Configure memory limits
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    max_workers_in_memory=1000,   # Keep max 1000 workers
    max_tasks_in_memory=50000     # Keep max 50000 tasks
)

events.start()

Custom Event Processing

from flower.events import Events

class CustomEvents(Events):
    def on_event(self, event):
        # Custom event processing
        if event['type'] == 'task-failed':
            print(f"Task {event['uuid']} failed: {event.get('traceback')}")
        
        # Call parent processing
        super().on_event(event)

# Use custom event processor
events = CustomEvents(capp=celery_app, io_loop=io_loop)
events.start()

Metrics Integration

from flower.events import Events

# Events automatically collect Prometheus metrics
events = Events(capp=celery_app, io_loop=io_loop)
events.start()

# Access metrics through the events.state.metrics
metrics = events.state.metrics

# Metrics are available at the /metrics endpoint when using the Flower web interface

State Persistence

Database Format

Flower uses pickle serialization for state persistence:

# State file structure
{
    'workers': {
        'worker_name': {
            'hostname': str,
            'active': int,
            'processed': int,
            'load': list,
            'timestamp': float,
            # ... additional worker data
        }
    },
    'tasks': {
        'task_uuid': {
            'name': str,
            'state': str,
            'hostname': str,
            'timestamp': float,
            'args': list,
            'kwargs': dict,
            'result': Any,
            'runtime': float,
            # ... additional task data
        }
    }
}
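
A minimal sketch of the pickle round-trip for such a snapshot (structure only; Flower's actual serialization details may differ):

```python
import pickle
import tempfile

# Minimal state snapshot following the structure above; values invented.
state = {
    "workers": {"worker1@host": {"hostname": "worker1@host", "active": 2}},
    "tasks": {"abc-123": {"name": "myapp.tasks.add", "state": "SUCCESS"}},
}

# Save the state to a database file on disk.
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
    pickle.dump(state, f)
    path = f.name

# Load it back, as Flower does on startup when persistence is enabled.
with open(path, "rb") as f:
    restored = pickle.load(f)

print(restored["tasks"]["abc-123"]["state"])  # SUCCESS
```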

Manual State Management

# Manually save state
events.save_state()

# Check if persistence is enabled
if events.persistent:
    print(f"State saved to: {events.db}")

# Load state on startup (automatic)
events = Events(capp=celery_app, io_loop=io_loop, persistent=True, db='state.db')
# Previous state is automatically loaded

Performance Considerations

Memory Usage

  • Configure appropriate limits for max_workers_in_memory and max_tasks_in_memory
  • Monitor memory usage through Prometheus metrics
  • Use persistent storage to avoid data loss on restart
  • Regularly clean up old task data
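
Old task data can be pruned by timestamp. A hypothetical stdlib sketch (prune_old_tasks is not a Flower API):

```python
import time

def prune_old_tasks(tasks, max_age_seconds, now=None):
    """Return only the tasks whose timestamp is within max_age_seconds."""
    now = time.time() if now is None else now
    return {
        uuid: info
        for uuid, info in tasks.items()
        if now - info.get("timestamp", now) <= max_age_seconds
    }

# Two invented task entries: one stale, one recent.
tasks = {
    "old": {"timestamp": 0.0},
    "new": {"timestamp": 1000.0},
}
print(sorted(prune_old_tasks(tasks, max_age_seconds=500, now=1200.0)))  # ['new']
```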

Event Processing

  • Event processing is asynchronous to avoid blocking
  • Large event volumes may require tuning of processing parameters
  • Consider using Redis broker for better event performance
  • Monitor event processing lag through metrics
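
Processing lag can be estimated as the gap between wall-clock time and the event's timestamp. A hedged sketch (event_lag_seconds is an illustrative helper, not a Flower function):

```python
import time

def event_lag_seconds(event, now=None):
    """Seconds between an event's timestamp and now; clamped at zero
    to tolerate minor clock skew between broker and monitor."""
    now = time.time() if now is None else now
    return max(0.0, now - event["timestamp"])

# Invented event with a fixed timestamp for a deterministic example.
event = {"type": "task-started", "timestamp": 100.0}
print(event_lag_seconds(event, now=100.75))  # 0.75
```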

Persistence Performance

  • State saving is performed in background to avoid blocking
  • Adjust state_save_interval based on data volume and requirements
  • Use SSDs for better I/O performance with persistence
  • Consider database storage for very large deployments
