Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete REST API providing programmatic access to all Flower functionality including worker control, task management, and monitoring operations.
Foundation class for all REST API endpoints with authentication and error handling.
class BaseApiHandler(BaseHandler):
"""
Base class for all REST API endpoints.
Provides authentication, error handling, and common functionality
for all API endpoints.
"""
def prepare(self):
"""
Prepare request with authentication check.
Validates authentication requirements before processing requests.
Requires FLOWER_UNAUTHENTICATED_API environment variable or
authentication to be configured.
"""
def write_error(self, status_code, **kwargs):
"""
Handle API error responses.
Args:
status_code (int): HTTP status code
**kwargs: Additional error context
Formats error responses appropriately for API consumption.
"""
class ControlHandler(BaseApiHandler):
"""
Base handler for worker and task control operations.
Extends BaseApiHandler with remote control capabilities
for managing workers and tasks through Celery's control interface.
"""

# GET /api/workers
class ListWorkers(ControlHandler):
"""List and inspect all workers with optional refresh."""
async def get(self):
"""
Get worker information.
Query Parameters:
refresh (bool): Force worker info refresh
status (bool): Return only status information
workername (str): Filter by specific worker
Returns:
dict: Worker information keyed by worker name
"""

# POST /api/worker/shutdown/{workername}
class WorkerShutDown(ControlHandler):
"""Shutdown specific worker."""
# POST /api/worker/pool/restart/{workername}
class WorkerPoolRestart(ControlHandler):
"""Restart worker process pool."""
# POST /api/worker/pool/grow/{workername}?n=1
class WorkerPoolGrow(ControlHandler):
"""Increase worker pool size."""
# POST /api/worker/pool/shrink/{workername}?n=1
class WorkerPoolShrink(ControlHandler):
"""Decrease worker pool size."""
# POST /api/worker/pool/autoscale/{workername}?min=1&max=10
class WorkerPoolAutoscale(ControlHandler):
"""Configure worker pool autoscaling."""
# POST /api/worker/queue/add-consumer/{workername}?queue=queue_name
class WorkerQueueAddConsumer(ControlHandler):
"""Add queue consumer to worker."""
# POST /api/worker/queue/cancel-consumer/{workername}?queue=queue_name
class WorkerQueueCancelConsumer(ControlHandler):
"""Remove queue consumer from worker."""

# GET /api/tasks
class ListTasks(BaseApiHandler):
"""
List tasks with filtering and pagination.
Query Parameters:
limit (int): Maximum tasks to return
offset (int): Tasks to skip (pagination)
sort_by (str): Sort field
workername (str): Filter by worker
taskname (str): Filter by task name
state (str): Filter by task state
received_start (str): Start date filter
received_end (str): End date filter
search (str): Search term
"""
# GET /api/task/types
class ListTaskTypes(BaseApiHandler):
"""List all available task types."""
# GET /api/task/info/{taskid}
class TaskInfo(BaseApiHandler):
"""Get detailed information for specific task."""
# GET /api/task/result/{taskid}?timeout=10
class TaskResult(BaseApiHandler):
"""Get task result with optional timeout."""

# POST /api/task/apply/{taskname}
class TaskApply(BaseTaskHandler):
"""
Execute task synchronously.
Request Body:
{
"args": [arg1, arg2],
"kwargs": {"key": "value"},
"queue": "queue_name",
"countdown": 10,
"eta": "2023-01-01T00:00:00"
}
"""
# POST /api/task/async-apply/{taskname}
class TaskAsyncApply(BaseTaskHandler):
"""Execute task asynchronously without waiting."""
# POST /api/task/send-task/{taskname}
class TaskSend(BaseTaskHandler):
"""Send task without requiring local task definition."""
# POST /api/task/abort/{taskid}
class TaskAbort(BaseTaskHandler):
"""Abort running task (requires AbortableTask)."""

# POST /api/task/revoke/{taskid}?terminate=false&signal=SIGTERM
class TaskRevoke(ControlHandler):
"""Revoke task with optional termination."""
# POST /api/task/timeout/{taskname}
class TaskTimeout(ControlHandler):
"""
Set task timeout limits.
Form Data:
soft (float): Soft timeout in seconds
hard (float): Hard timeout in seconds
workername (str): Specific worker to apply
"""
# POST /api/task/rate-limit/{taskname}
class TaskRateLimit(ControlHandler):
"""
Set task rate limit.
Form Data:
ratelimit (str): Rate limit (e.g., '10/m', '1/s')
workername (str): Specific worker to apply
"""

# GET /api/queues/length
class GetQueueLengths(BaseApiHandler):
"""
Get message counts for all active queues.
Returns:
dict: Queue names mapped to message counts
"""

import requests
# List all workers
response = requests.get('http://localhost:5555/api/workers')
workers = response.json()
# Refresh worker information
response = requests.get('http://localhost:5555/api/workers?refresh=1')
# Shutdown worker
requests.post('http://localhost:5555/api/worker/shutdown/celery@worker1')
# Scale worker pool
requests.post('http://localhost:5555/api/worker/pool/grow/celery@worker1?n=2')

# List recent failed tasks
response = requests.get('http://localhost:5555/api/tasks?state=FAILURE&limit=10')
failed_tasks = response.json()
# Execute task
task_data = {
"args": [1, 2, 3],
"kwargs": {"timeout": 30},
"queue": "high_priority"
}
response = requests.post('http://localhost:5555/api/task/apply/my_task', json=task_data)
result = response.json()
# Get task result
response = requests.get(f'http://localhost:5555/api/task/result/{task_id}')
task_result = response.json()
# Revoke task
requests.post(f'http://localhost:5555/api/task/revoke/{task_id}?terminate=true')

# With basic authentication
import requests
from requests.auth import HTTPBasicAuth
auth = HTTPBasicAuth('admin', 'password')
response = requests.get('http://localhost:5555/api/workers', auth=auth)
# With API key (if configured)
headers = {'Authorization': 'Bearer your-api-key'}
response = requests.get('http://localhost:5555/api/workers', headers=headers)

{
"celery@worker1": {
"status": "online",
"active": 2,
"processed": 150,
"load": [0.5, 0.4, 0.3],
"registered": ["task1", "task2"],
"stats": {...},
"active_queues": [...]
}
}

{
"tasks": [
{
"uuid": "task-uuid",
"name": "my_task",
"state": "SUCCESS",
"received": 1640995200.0,
"started": 1640995201.0,
"runtime": 2.5,
"worker": "celery@worker1",
"args": [1, 2, 3],
"result": "task_result"
}
],
"total": 1000,
"offset": 0,
"limit": 10
}

{
"error": "Task not found",
"status": 404
}

The REST API requires authentication unless FLOWER_UNAUTHENTICATED_API=true is set:
- Set FLOWER_UNAUTHENTICATED_API=true to disable authentication
- Paginate large task lists (limit/offset)

Install with Tessl CLI
npx tessl i tessl/pypi-flower