
tessl/pypi-flower

Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.


REST API

Flower exposes a REST API that gives programmatic access to its functionality, including worker control, task management, and monitoring operations.

Capabilities

Base API Handler

Foundation class for all REST API endpoints with authentication and error handling.

class BaseApiHandler(BaseHandler):
    """
    Base class for all REST API endpoints.
    
    Provides authentication, error handling, and common functionality
    for all API endpoints.
    """
    
    def prepare(self):
        """
        Check authentication before the request is processed.
        
        Rejects the request unless authentication is configured or the
        FLOWER_UNAUTHENTICATED_API environment variable is set.
        """
    
    def write_error(self, status_code, **kwargs):
        """
        Handle API error responses.
        
        Args:
            status_code (int): HTTP status code
            **kwargs: Additional error context
            
        Formats error responses appropriately for API consumption.
        """

class ControlHandler(BaseApiHandler):
    """
    Base handler for worker and task control operations.
    
    Extends BaseApiHandler with remote control capabilities
    for managing workers and tasks through Celery's control interface.
    """
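The check described in `prepare` can be pictured as a small predicate. The following is a minimal sketch rather than Flower's actual code: the function name `api_access_allowed`, the `auth_configured` flag, and the set of accepted truthy spellings are all assumptions made for illustration.

```python
import os


def api_access_allowed(auth_configured, environ=None):
    """Return True when an API request should be let through.

    auth_configured: whether some authentication method is set up
                     (hypothetical flag, supplied by the caller).
    environ:         mapping to read the variable from; defaults to os.environ.
    """
    env = os.environ if environ is None else environ
    flag = env.get("FLOWER_UNAUTHENTICATED_API", "").strip().lower()
    # Assumption: these spellings count as "enabled".
    unauthenticated_ok = flag in {"1", "true", "yes", "on"}
    return auth_configured or unauthenticated_ok
```

With no authentication and no environment variable the predicate returns False, which corresponds to the request being rejected.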

Worker API Endpoints

Worker Information

# GET /api/workers
class ListWorkers(ControlHandler):
    """List and inspect all workers with optional refresh."""
    
    async def get(self):
        """
        Get worker information.
        
        Query Parameters:
            refresh (bool): Force worker info refresh
            status (bool): Return only status information
            workername (str): Filter by specific worker
            
        Returns:
            dict: Worker information keyed by worker name
        """
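To show how the three query parameters combine, here is a small sketch that only builds the request URL. The helper name `workers_url` is hypothetical, and sending boolean flags as `1` mirrors the `refresh=1` usage example rather than a documented requirement.

```python
from urllib.parse import urlencode

BASE_URL = "http://localhost:5555"  # same host and port as the usage examples


def workers_url(refresh=False, status=False, workername=None, base_url=BASE_URL):
    """Build a GET /api/workers URL from the documented query parameters."""
    params = {}
    if refresh:
        params["refresh"] = "1"
    if status:
        params["status"] = "1"
    if workername:
        params["workername"] = workername
    query = urlencode(params)  # percent-encodes characters such as '@'
    return f"{base_url}/api/workers" + (f"?{query}" if query else "")
```

The resulting string can be passed to any HTTP client, for example `requests.get(workers_url(refresh=True))`.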

Worker Control

# POST /api/worker/shutdown/{workername}
class WorkerShutDown(ControlHandler):
    """Shut down a specific worker."""

# POST /api/worker/pool/restart/{workername}
class WorkerPoolRestart(ControlHandler):
    """Restart the worker's process pool."""

# POST /api/worker/pool/grow/{workername}?n=1
class WorkerPoolGrow(ControlHandler):
    """Increase worker pool size."""

# POST /api/worker/pool/shrink/{workername}?n=1
class WorkerPoolShrink(ControlHandler):
    """Decrease worker pool size."""

# POST /api/worker/pool/autoscale/{workername}?min=1&max=10
class WorkerPoolAutoscale(ControlHandler):
    """Configure worker pool autoscaling."""

# POST /api/worker/queue/add-consumer/{workername}?queue=queue_name
class WorkerQueueAddConsumer(ControlHandler):
    """Add queue consumer to worker."""

# POST /api/worker/queue/cancel-consumer/{workername}?queue=queue_name
class WorkerQueueCancelConsumer(ControlHandler):
    """Remove queue consumer from worker."""
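All of the worker control endpoints share one shape: a POST to `/api/worker/{action}/{workername}` with optional query parameters. The sketch below builds such requests with the standard library so it runs without extra dependencies. The helper `worker_control_request` is hypothetical, and nothing is sent until the request is passed to `urllib.request.urlopen`.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request

BASE_URL = "http://localhost:5555"  # same host and port as the usage examples


def worker_control_request(action, workername, base_url=BASE_URL, **params):
    """Build (but do not send) a POST request for a worker control endpoint."""
    # Keep '@' readable in the path; other unsafe characters are percent-encoded.
    path = f"/api/worker/{action}/{quote(workername, safe='@')}"
    query = urlencode(params)
    url = base_url + path + (f"?{query}" if query else "")
    return Request(url, method="POST")


grow = worker_control_request("pool/grow", "celery@worker1", n=2)
autoscale = worker_control_request("pool/autoscale", "celery@worker1", min=1, max=10)
add_queue = worker_control_request("queue/add-consumer", "celery@worker1", queue="queue_name")
```

Keeping construction separate from sending makes it easy to log or review a control operation before it reaches a live worker.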

Task API Endpoints

Task Information

# GET /api/tasks
class ListTasks(BaseApiHandler):
    """
    List tasks with filtering and pagination.
    
    Query Parameters:
        limit (int): Maximum tasks to return
        offset (int): Tasks to skip (pagination)
        sort_by (str): Sort field
        workername (str): Filter by worker
        taskname (str): Filter by task name
        state (str): Filter by task state
        received_start (str): Start date filter
        received_end (str): End date filter
        search (str): Search term
    """

# GET /api/task/types
class ListTaskTypes(BaseApiHandler):
    """List all available task types."""

# GET /api/task/info/{taskid}
class TaskInfo(BaseApiHandler):
    """Get detailed information for specific task."""

# GET /api/task/result/{taskid}?timeout=10
class TaskResult(BaseApiHandler):
    """Get task result with optional timeout."""
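The `limit` and `offset` parameters of `/api/tasks` lend themselves to a paging loop. The sketch below assumes a caller-supplied `fetch_page` function (hypothetical) that takes the query parameters and returns one page as a list of tasks; `fake_fetch` stands in for a real HTTP call so the example runs offline.

```python
def iter_tasks(fetch_page, page_size=100, **filters):
    """Yield tasks page by page until a short page signals the end."""
    offset = 0
    while True:
        page = fetch_page({**filters, "limit": page_size, "offset": offset})
        yield from page
        if len(page) < page_size:
            break
        offset += page_size


def fake_fetch(params):
    """Stand-in for an HTTP call: serves slices of five fake tasks."""
    data = [f"task-{i}" for i in range(5)]
    return data[params["offset"]:params["offset"] + params["limit"]]


all_tasks = list(iter_tasks(fake_fetch, page_size=2))
```

In real use, `fetch_page` would call the endpoint with the given parameters and convert the response into a list before returning it.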

Task Execution

# POST /api/task/apply/{taskname}
class TaskApply(BaseTaskHandler):
    """
    Execute a task synchronously and wait for its result.
    
    Request Body:
        {
            "args": [arg1, arg2],
            "kwargs": {"key": "value"},
            "queue": "queue_name",
            "countdown": 10,
            "eta": "2023-01-01T00:00:00"
        }
    """

# POST /api/task/async-apply/{taskname}
class TaskAsyncApply(BaseTaskHandler):
    """Execute task asynchronously without waiting."""

# POST /api/task/send-task/{taskname}
class TaskSend(BaseTaskHandler):
    """Send task without requiring local task definition."""

# POST /api/task/abort/{taskid}
class TaskAbort(BaseTaskHandler):
    """Abort running task (requires AbortableTask)."""
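The request body shown for `TaskApply` can be assembled programmatically. This is a sketch under assumptions: `build_apply_body` is a hypothetical helper, optional fields are simply omitted when unset, and a `datetime` passed as `eta` is serialized to the ISO 8601 form used in the example body.

```python
import json
from datetime import datetime


def build_apply_body(args=(), kwargs=None, queue=None, countdown=None, eta=None):
    """Return the JSON request body for the task execution endpoints."""
    body = {"args": list(args), "kwargs": dict(kwargs or {})}
    if queue is not None:
        body["queue"] = queue
    if countdown is not None:
        body["countdown"] = countdown
    if eta is not None:
        # Accept either a datetime or an already formatted string.
        body["eta"] = eta.isoformat() if isinstance(eta, datetime) else eta
    return json.dumps(body)


payload = build_apply_body([1, 2], {"key": "value"}, queue="queue_name",
                           eta=datetime(2023, 1, 1))
```

The returned string can be sent as the POST body, for example with `requests.post(url, data=payload)`.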

Task Control

# POST /api/task/revoke/{taskid}?terminate=false&signal=SIGTERM
class TaskRevoke(ControlHandler):
    """Revoke task with optional termination."""

# POST /api/task/timeout/{taskname}
class TaskTimeout(ControlHandler):
    """
    Set task timeout limits.
    
    Form Data:
        soft (float): Soft timeout in seconds
        hard (float): Hard timeout in seconds
        workername (str): Specific worker to apply
    """

# POST /api/task/rate-limit/{taskname}
class TaskRateLimit(ControlHandler):
    """
    Set task rate limit.
    
    Form Data:
        ratelimit (str): Rate limit (e.g., '10/m', '1/s')
        workername (str): Specific worker to apply
    """
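Unlike the execution endpoints, the timeout and rate-limit endpoints are documented as taking form data. The following sketch encodes the documented fields as a form body using the standard library. The helper `task_control_request` and the task name `tasks.add` are hypothetical.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:5555"  # same host and port as the usage examples


def task_control_request(kind, taskname, base_url=BASE_URL, **form):
    """Build (but do not send) a form-encoded POST for timeout or rate-limit."""
    fields = {key: value for key, value in form.items() if value is not None}
    data = urlencode(fields).encode("ascii")
    return Request(f"{base_url}/api/task/{kind}/{taskname}", data=data, method="POST")


timeout_req = task_control_request("timeout", "tasks.add",
                                   soft=30.0, hard=60.0, workername="celery@worker1")
rate_req = task_control_request("rate-limit", "tasks.add", ratelimit="10/m")
```

Fields left as `None` are dropped, so omitting `workername` leaves the choice of target workers to the server.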

Queue Information

# GET /api/queues/length
class GetQueueLengths(BaseApiHandler):
    """
    Get message counts for all active queues.
    
    Returns:
        dict: Queue names mapped to message counts
    """
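A common use of queue lengths is spotting a backlog. The sketch below is defensive about the response shape: it accepts the plain name-to-count mapping described above, and also a list of `name`/`messages` records under an `active_queues` key, which is an assumption about what some Flower versions return. Both function names are hypothetical.

```python
def normalize_queue_lengths(payload):
    """Return {queue_name: message_count} from either supported shape."""
    queues = payload.get("active_queues")
    if isinstance(queues, list):
        # Assumed alternative shape: [{"name": ..., "messages": ...}, ...]
        return {entry["name"]: entry["messages"] for entry in queues}
    return dict(payload)


def backlog(payload, threshold):
    """Return the sorted names of queues holding more than `threshold` messages."""
    lengths = normalize_queue_lengths(payload)
    return sorted(name for name, count in lengths.items() if count > threshold)
```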

Usage Examples

Worker Management

import requests

# List all workers
response = requests.get('http://localhost:5555/api/workers')
workers = response.json()

# Refresh worker information
response = requests.get('http://localhost:5555/api/workers?refresh=1')

# Shutdown worker
requests.post('http://localhost:5555/api/worker/shutdown/celery@worker1')

# Scale worker pool
requests.post('http://localhost:5555/api/worker/pool/grow/celery@worker1?n=2')

Task Operations

import requests

# List recent failed tasks
response = requests.get('http://localhost:5555/api/tasks?state=FAILURE&limit=10')
failed_tasks = response.json()

# Execute task and wait for it to finish
task_data = {
    "args": [1, 2, 3],
    "kwargs": {"timeout": 30},
    "queue": "high_priority"
}
response = requests.post('http://localhost:5555/api/task/apply/my_task', json=task_data)
result = response.json()

# ID of the task just applied (assumes the response carries it under 'task-id')
task_id = result['task-id']

# Get task result
response = requests.get(f'http://localhost:5555/api/task/result/{task_id}')
task_result = response.json()

# Revoke task
requests.post(f'http://localhost:5555/api/task/revoke/{task_id}?terminate=true')

Authentication

# With basic authentication
import requests
from requests.auth import HTTPBasicAuth

auth = HTTPBasicAuth('admin', 'password')
response = requests.get('http://localhost:5555/api/workers', auth=auth)

# With a bearer token (only if your deployment is configured to accept one)
headers = {'Authorization': 'Bearer your-api-key'}
response = requests.get('http://localhost:5555/api/workers', headers=headers)

Response Formats

Worker Information Response

{
  "celery@worker1": {
    "status": "online",
    "active": 2,
    "processed": 150,
    "load": [0.5, 0.4, 0.3],
    "registered": ["task1", "task2"],
    "stats": {...},
    "active_queues": [...]
  }
}

Task List Response

{
  "tasks": [
    {
      "uuid": "task-uuid",
      "name": "my_task",
      "state": "SUCCESS",
      "received": 1640995200.0,
      "started": 1640995201.0,
      "runtime": 2.5,
      "worker": "celery@worker1",
      "args": [1, 2, 3],
      "result": "task_result"
    }
  ],
  "total": 1000,
  "offset": 0,
  "limit": 10
}
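As an illustration of consuming the task list, the sketch below tallies tasks by state. It assumes the response has exactly the shape shown above, with a `tasks` list whose items carry a `state` field; the function name and the sample data are hypothetical.

```python
from collections import Counter


def count_states(task_list_response):
    """Count tasks per state in a task list response."""
    return Counter(task["state"] for task in task_list_response["tasks"])


# Hypothetical sample in the shape documented above.
sample = {
    "tasks": [
        {"uuid": "a", "name": "my_task", "state": "SUCCESS"},
        {"uuid": "b", "name": "my_task", "state": "FAILURE"},
        {"uuid": "c", "name": "my_task", "state": "SUCCESS"},
    ],
    "total": 3,
    "offset": 0,
    "limit": 10,
}
state_counts = count_states(sample)
```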

Error Response

{
  "error": "Task not found",
  "status": 404
}
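Client code usually wants one place that turns an error response into a readable message. This sketch assumes the JSON error shape shown above and falls back to the raw body when the response is not JSON, since the server may not always return that shape. The function name `api_error_message` is hypothetical.

```python
import json


def api_error_message(status_code, body_text):
    """Return an error message for failed responses, or None on success."""
    if status_code < 400:
        return None
    try:
        payload = json.loads(body_text)
    except ValueError:
        payload = None
    if isinstance(payload, dict) and "error" in payload:
        return f"{status_code}: {payload['error']}"
    # Fall back to the raw body for plain-text or unexpected responses.
    return f"{status_code}: {body_text.strip()}"
```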

Authentication Requirements

The REST API rejects requests unless an authentication method is configured or FLOWER_UNAUTHENTICATED_API=true is set:

  • Basic Auth: HTTP Basic Authentication
  • OAuth2: OAuth2 providers (Google, GitHub, GitLab, Okta)
  • No authentication: set the environment variable FLOWER_UNAUTHENTICATED_API=true to allow unauthenticated API access

Rate Limiting and Performance

  • API endpoints are designed for moderate usage patterns
  • Page through large task listings with limit and offset instead of requesting everything at once
  • Worker control operations communicate through the broker, so they may have higher latency than read-only endpoints
  • Consider caching frequently accessed data
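The caching suggestion above can be as simple as a small time-based cache on the client side. The class below is a minimal sketch, not part of Flower: `TTLCache` and its `get(key, loader)` interface are hypothetical, and the clock is injectable so the behaviour can be tested without waiting.

```python
import time


class TTLCache:
    """Cache values for a fixed number of seconds."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (stored_at, value)

    def get(self, key, loader):
        """Return the cached value for key, calling loader() when stale or missing."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = loader()
        self._entries[key] = (now, value)
        return value
```

A call such as `cache.get("workers", lambda: requests.get(url).json())` would then reach the server at most once per TTL window for that key.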

Install with Tessl CLI

npx tessl i tessl/pypi-flower
