Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.
npx @tessl/cli install tessl/pypi-flower@2.0.0Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management. Flower provides comprehensive visibility into distributed task queues, enabling effective monitoring of worker status, task execution, and cluster operations through both REST API endpoints and a rich web interface.
pip install flowerfrom flower.command import flower
from flower.app import FlowerFor Celery integration:
from celery.bin.celery import main, celery
from flower.command import flower
# Add flower command to celery
celery.add_command(flower)Run Flower as a Celery command:
celery flowerWith options:
celery flower --port=5555 --broker=redis://localhost:6379from flower.app import Flower
from flower.options import default_options
import celery
# Create Celery app
capp = celery.Celery('myapp', broker='redis://localhost:6379')
# Create Flower app
flower_app = Flower(capp=capp, options=default_options)
# Start monitoring
flower_app.start()Flower follows a multi-layer architecture built on Tornado web framework:
Core application lifecycle management including server startup, configuration, and shutdown.
class Flower(tornado.web.Application):
def __init__(self, options=None, capp=None, events=None, io_loop=None, **kwargs): ...
def start(self): ...
def stop(self): ...
def update_workers(self, workername=None): ...
@property
def transport(self): ...
@property
def workers(self): ...Command-line interface for starting and configuring Flower with extensive configuration options.
@click.command(cls=CeleryCommand)
def flower(ctx, tornado_argv): ...
def apply_env_options(): ...
def apply_options(prog_name, argv): ...
def setup_logging(): ...
def extract_settings(): ...Real-time event processing with persistent storage, state management, and Prometheus metrics collection.
class Events(threading.Thread):
def __init__(self, capp, io_loop, db=None, persistent=False, enable_events=False, **kwargs): ...
def start(self): ...
def stop(self): ...
def run(self): ...
def save_state(self): ...
def on_enable_events(self): ...
def on_event(self, event): ...
class PrometheusMetrics:
events: Counter
runtime: Histogram
prefetch_time: Gauge
worker_online: GaugeWorker inspection, status monitoring, and remote control operations.
class Inspector:
def __init__(self, io_loop, capp, timeout): ...
def inspect(self, workername=None): ...
@property
def workers(self): ...
methods: List[str] # Available inspection methodsComplete task lifecycle management including execution, monitoring, filtering, and control operations.
def iter_tasks(events, limit=None, offset=None, type=None, worker=None,
state=None, sort_by=None, received_start=None, received_end=None,
started_start=None, started_end=None, search=None): ...
def sort_tasks(tasks, sort_by): ...
def get_task_by_id(events, task_id): ...
def as_dict(task): ...Complete REST API providing programmatic access to all Flower functionality including worker control, task management, and monitoring.
class BaseApiHandler(BaseHandler):
def prepare(self): ...
def write_error(self, status_code, **kwargs): ...API Endpoints:
/api/workers, /api/worker/*/api/tasks, /api/task/*Web-based user interface providing interactive monitoring and management capabilities with authentication support.
class BaseHandler(tornado.web.RequestHandler):
def render(self, *args, **kwargs): ...
def get_current_user(self): ...
def get_argument(self, name, default=None, strip=True, type=str): ...
def format_task(self, task): ...
def get_active_queue_names(self): ...
@property
def capp(self): ...Multiple authentication methods including Basic Auth and OAuth2 integration with various providers.
def authenticate(pattern, email): ...
def validate_auth_option(pattern): ...
class GoogleAuth2LoginHandler: ...
class GithubLoginHandler: ...
class GitLabLoginHandler: ...
class OktaLoginHandler: ...Multi-broker support for monitoring message queues across different broker types.
class Broker:
@staticmethod
def queues(names): ...
class BrokerBase:
def __init__(self, broker_url, *_, **__): ...
def queues(self, names): ...
class RabbitMQ(BrokerBase): ...
class Redis(BrokerBase): ...
class RedisSsl(Redis): ...
class RedisSocket(Redis): ...
class RedisSentinel(Redis): ...Utility functions for search, template formatting, and general operations.
def parse_search_terms(raw_search_value): ...
def satisfies_search_terms(task, search_terms): ...
def format_time(time, tz): ...
def humanize(obj, type, length): ...
def gen_cookie_secret(): ...
def abs_path(path): ...
def strtobool(val): ...Flower provides extensive configuration through command-line options, environment variables, and configuration files. Key options include:
Flower includes comprehensive error handling for:
# Configuration types
OptionsNamespace = tornado.options.OptionParser
# Event types (from Celery)
TaskEvent = Dict[str, Any]
WorkerEvent = Dict[str, Any]
# Task state types
TaskState = Literal['PENDING', 'STARTED', 'SUCCESS', 'FAILURE', 'RETRY', 'REVOKED']
# Authentication types
AuthProvider = Literal['google', 'github', 'gitlab', 'okta']