Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.
—
Web-based user interface providing interactive monitoring and management capabilities with authentication support and comprehensive dashboard views.
Foundation class for all web interface handlers with authentication and template rendering.
class BaseHandler(tornado.web.RequestHandler):
    """
    Base class for all web view handlers.

    Provides authentication, template rendering, and common functionality
    for web interface pages.
    """

    def render(self, *args, **kwargs):
        """
        Render template with Flower-specific context.

        Args:
            *args: Template name and context arguments
            **kwargs: Template context variables

        Adds common context variables like current user, application state,
        and configuration options to all rendered templates.
        """

    def get_current_user(self):
        """
        Get current authenticated user.

        Returns:
            str or None: User identifier if authenticated, None otherwise

        Handles both Basic Auth and OAuth2 authentication methods.
        """

    def get_argument(self, name, default=None, strip=True, type=str):
        """
        Enhanced argument parsing with type conversion.

        Args:
            name (str): Argument name
            default: Default value if not present
            strip (bool): Strip whitespace from string values
            type: Type to convert to (str, int, float, bool)

        Returns:
            Parsed and converted argument value
        """

    def format_task(self, task):
        """
        Apply custom task formatting for display.

        Args:
            task (dict): Task object to format

        Returns:
            dict: Formatted task with display-friendly values
        """

    def get_active_queue_names(self):
        """
        Get list of active queue names across all workers.

        Returns:
            list: Queue names currently being consumed
        """

    @property
    def capp(self):
        """Access to Celery application instance."""


# GET / and GET /workers
class WorkersView(BaseHandler):
    """
    Main workers dashboard showing cluster overview.

    Query Parameters:
        refresh (bool): Force worker information refresh
        json (bool): Return JSON response instead of HTML

    Displays worker status, active tasks, load information,
    and provides controls for worker management.
    """
# GET /worker/{name}
class WorkerView(BaseHandler):
    """
    Individual worker details page.

    Shows detailed information for a specific worker including:
    - Active and processed task counts
    - System load and resource usage
    - Registered tasks and active queues
    - Process pool information
    """


# GET /tasks
class TasksView(BaseHandler):
    """
    Tasks dashboard with filtering and search capabilities.

    Provides interactive task management with:
    - Real-time task status updates
    - Advanced filtering by state, worker, time range
    - Search functionality across task content
    - Pagination for large task sets
    """
# GET /task/{task_id}
class TaskView(BaseHandler):
    """
    Individual task details page.

    Shows complete task information including:
    - Execution timeline and status
    - Arguments and result data
    - Error traceback if failed
    - Worker and routing information
    """
# GET /tasks/datatable
class TasksDataTable(BaseHandler):
    """
    DataTable-compatible task data endpoint.

    Query Parameters:
        draw (int): DataTable draw counter
        start (int): Starting record number
        length (int): Number of records to return
        search[value] (str): Search term
        order[0][column] (int): Sort column index
        order[0][dir] (str): Sort direction ('asc' or 'desc')

    Returns JSON data formatted for DataTables JavaScript library.
    """


# GET /broker
class BrokerView(BaseHandler):
    """
    Broker information and queue status page.

    Displays:
    - Queue lengths and message counts
    - Exchange information
    - Broker connection status
    - Queue consumer details
    """


# GET /login
class LoginHandler:
    """
    Dynamic login handler factory.

    Creates appropriate login handler based on configured
    authentication provider (Google, GitHub, GitLab, Okta).
    """
class GoogleAuth2LoginHandler(BaseHandler):
    """Google OAuth2 authentication handler."""

    def get(self):
        """Handle OAuth2 authentication flow."""

    def _on_auth(self, user):
        """Process authentication result."""
class GithubLoginHandler(BaseHandler):
    """GitHub OAuth2 authentication handler."""
class GitLabLoginHandler(BaseHandler):
    """GitLab OAuth2 authentication handler."""
class OktaLoginHandler(BaseHandler):
    """Okta OAuth2 authentication handler."""


# GET /metrics
class Metrics(BaseHandler):
    """
    Prometheus metrics endpoint.

    Returns:
        text/plain: Prometheus metrics in text format

    Provides comprehensive metrics for:
    - Task execution statistics
    - Worker status and performance
    - Queue lengths and throughput
    - System resource usage
    """
# GET /healthcheck
class Healthcheck(BaseHandler):
    """
    Health check endpoint.

    Returns:
        text/plain: "OK" if service is healthy

    Used for load balancer health checks and monitoring.
    """


# All web interface templates receive common context variables:
# Schema of the context passed to every template: each key maps to the
# type of the value the template receives, not to an actual value.
template_context = {
    'current_user': str,          # Authenticated user identifier
    'capp': celery.Celery,        # Celery application instance
    'events': Events,             # Events monitoring instance
    'broker_url': str,            # Broker connection URL
    'flower_version': str,        # Flower version string
    'celery_version': str,        # Celery version string
    'options': OptionsNamespace,  # Configuration options
    'url_prefix': str,            # URL prefix for reverse proxy
    'static_url': callable,       # Static file URL generator
    'reverse_url': callable,      # URL reverse lookup
}

from flower.views import BaseHandler
class CustomView(BaseHandler):
    """Example handler that renders a custom template with extra context."""

    def get(self):
        """Render ``custom_template.html`` with the custom context."""
        # Add custom context
        context = {
            'custom_data': self.get_custom_data(),
            'page_title': 'Custom Dashboard',
        }
        self.render('custom_template.html', **context)

    def get_custom_data(self):
        """Return the data exposed to the template as ``custom_data``."""
        # Custom data processing
        return {'key': 'value'}


class ProtectedView(BaseHandler):
    """Example handler restricted to authenticated, authorized users."""

    @tornado.web.authenticated
    def get(self):
        """
        Render ``protected.html`` for an authorized user.

        Raises:
            tornado.web.HTTPError: 403 when ``is_authorized`` rejects the user.
        """
        # ``current_user`` is the per-request cached result of
        # ``get_current_user()``; the ``authenticated`` decorator has already
        # populated it, so the lookup is not repeated here.
        user = self.current_user
        if not self.is_authorized(user):
            raise tornado.web.HTTPError(403)
        self.render('protected.html', user=user)

    def is_authorized(self, user):
        """
        Decide whether ``user`` may access this view.

        Args:
            user: User identifier as returned by ``get_current_user``.

        Returns:
            bool: True only for string identifiers ending in
            ``@company.com``; False for None or non-string values.
        """
        # Custom authorization logic. The isinstance guard keeps a missing
        # (None) or non-string identifier from raising AttributeError.
        return isinstance(user, str) and user.endswith('@company.com')


# The web interface supports real-time updates through:
Flower includes comprehensive static assets for the web interface:
# Default 404 handler
class NotFoundErrorHandler(BaseHandler):
    """Handle 404 errors with custom error page."""

    def get(self):
        """Raise 404 error with custom template."""

    def post(self):
        """Handle POST requests to invalid URLs."""


# The web interface is fully responsive and mobile-friendly:
# Override default templates: the custom template directory is handed to
# Flower through its template_path keyword argument.
# NOTE(review): Flower is not imported in this snippet — presumably the
# application class from the flower package; confirm the import path.
TEMPLATE_PATH = '/path/to/custom/templates'
flower_app = Flower(template_path=TEMPLATE_PATH)
# Custom template structure
templates/
├── base.html # Base template
├── workers.html # Workers dashboard
├── tasks.html # Tasks dashboard
├── task.html # Task details
├── broker.html # Broker information
└── login.html # Login page

# Custom static files
# The custom static asset directory is handed to Flower through its
# static_path keyword argument.
# NOTE(review): Flower is not imported in this snippet — presumably the
# application class from the flower package; confirm the import path.
STATIC_PATH = '/path/to/custom/static'
flower_app = Flower(static_path=STATIC_PATH)
# Custom CSS
static/
├── css/
│ └── custom.css # Additional styles
├── js/
│ └── custom.js # Additional JavaScript
└── images/
└── logo.png # Custom logo

Install with Tessl CLI
npx tessl i tessl/pypi-flower