# REST API

Complete REST API providing programmatic access to all Flower functionality including worker control, task management, and monitoring operations.

## Capabilities

### Base API Handler

Foundation class for all REST API endpoints with authentication and error handling.

```python { .api }
class BaseApiHandler(BaseHandler):
    """
    Base class for all REST API endpoints.

    Provides authentication, error handling, and common functionality
    for all API endpoints.
    """

    def prepare(self):
        """
        Prepare request with authentication check.

        Validates authentication requirements before processing requests.
        Requires FLOWER_UNAUTHENTICATED_API environment variable or
        authentication to be configured.
        """

    def write_error(self, status_code, **kwargs):
        """
        Handle API error responses.

        Args:
            status_code (int): HTTP status code
            **kwargs: Additional error context

        Formats error responses appropriately for API consumption.
        """

class ControlHandler(BaseApiHandler):
    """
    Base handler for worker and task control operations.

    Extends BaseApiHandler with remote control capabilities
    for managing workers and tasks through Celery's control interface.
    """
```

## Worker API Endpoints

### Worker Information

```python { .api }
# GET /api/workers
class ListWorkers(ControlHandler):
    """List and inspect all workers with optional refresh."""

    async def get(self):
        """
        Get worker information.

        Query Parameters:
            refresh (bool): Force worker info refresh
            status (bool): Return only status information
            workername (str): Filter by specific worker

        Returns:
            dict: Worker information keyed by worker name
        """
```

### Worker Control

```python { .api }
# POST /api/worker/shutdown/{workername}
class WorkerShutDown(ControlHandler):
    """Shutdown specific worker."""

# POST /api/worker/pool/restart/{workername}
class WorkerPoolRestart(ControlHandler):
    """Restart worker process pool."""

# POST /api/worker/pool/grow/{workername}?n=1
class WorkerPoolGrow(ControlHandler):
    """Increase worker pool size."""

# POST /api/worker/pool/shrink/{workername}?n=1
class WorkerPoolShrink(ControlHandler):
    """Decrease worker pool size."""

# POST /api/worker/pool/autoscale/{workername}?min=1&max=10
class WorkerPoolAutoscale(ControlHandler):
    """Configure worker pool autoscaling."""

# POST /api/worker/queue/add-consumer/{workername}?queue=queue_name
class WorkerQueueAddConsumer(ControlHandler):
    """Add queue consumer to worker."""

# POST /api/worker/queue/cancel-consumer/{workername}?queue=queue_name
class WorkerQueueCancelConsumer(ControlHandler):
    """Remove queue consumer from worker."""
```
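
The worker control routes above all follow the same pattern: an action path, a worker name, and optional query parameters. A minimal sketch of a URL builder for them follows; `worker_control_url` and `BASE_URL` are hypothetical names introduced for illustration, and the address assumes Flower is listening on `localhost:5555` as in the examples in this document.

```python
from urllib.parse import quote, urlencode

# Assumed Flower address; adjust to your deployment.
BASE_URL = "http://localhost:5555"


def worker_control_url(action, workername, **params):
    """Build the URL for a worker control endpoint such as 'pool/grow'."""
    # Percent-encode the worker name so characters like '/' or spaces
    # cannot change the path; '@' is kept readable.
    path = f"/api/worker/{action}/{quote(workername, safe='@')}"
    query = urlencode(params)
    return f"{BASE_URL}{path}?{query}" if query else f"{BASE_URL}{path}"


print(worker_control_url("pool/grow", "celery@worker1", n=2))
print(worker_control_url("pool/autoscale", "celery@worker1", min=1, max=10))
print(worker_control_url("shutdown", "celery@worker1"))
```

The resulting string can be passed to any HTTP client as the target of a `POST` request.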

## Task API Endpoints

### Task Information

```python { .api }
# GET /api/tasks
class ListTasks(BaseApiHandler):
    """
    List tasks with filtering and pagination.

    Query Parameters:
        limit (int): Maximum tasks to return
        offset (int): Tasks to skip (pagination)
        sort_by (str): Sort field
        workername (str): Filter by worker
        taskname (str): Filter by task name
        state (str): Filter by task state
        received_start (str): Start date filter
        received_end (str): End date filter
        search (str): Search term
    """

# GET /api/task/types
class ListTaskTypes(BaseApiHandler):
    """List all available task types."""

# GET /api/task/info/{taskid}
class TaskInfo(BaseApiHandler):
    """Get detailed information for specific task."""

# GET /api/task/result/{taskid}?timeout=10
class TaskResult(BaseApiHandler):
    """Get task result with optional timeout."""
```
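
Because `/api/tasks` takes many optional filters, it can help to assemble the query string programmatically. The sketch below is one possible way to do that; `task_list_url` is a hypothetical helper, the parameter names come from the list above, and the base URL is an assumption.

```python
from urllib.parse import urlencode


def task_list_url(base_url="http://localhost:5555", **filters):
    """Build a GET /api/tasks URL from the filters that were actually set."""
    # Drop filters left as None so only explicit parameters reach the server.
    params = {key: value for key, value in filters.items() if value is not None}
    query = urlencode(params)
    return f"{base_url}/api/tasks" + (f"?{query}" if query else "")


print(task_list_url(state="FAILURE", limit=10, workername=None))
print(task_list_url(workername="celery@worker1", taskname="my_task"))
```

`urlencode` percent-encodes values, so a worker name such as `celery@worker1` is sent as `celery%40worker1`.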

### Task Execution

```python { .api }
# POST /api/task/apply/{taskname}
class TaskApply(BaseTaskHandler):
    """
    Execute task synchronously.

    Request Body:
    {
        "args": [arg1, arg2],
        "kwargs": {"key": "value"},
        "queue": "queue_name",
        "countdown": 10,
        "eta": "2023-01-01T00:00:00"
    }
    """

# POST /api/task/async-apply/{taskname}
class TaskAsyncApply(BaseTaskHandler):
    """Execute task asynchronously without waiting."""

# POST /api/task/send-task/{taskname}
class TaskSend(BaseTaskHandler):
    """Send task without requiring local task definition."""

# POST /api/task/abort/{taskid}
class TaskAbort(BaseTaskHandler):
    """Abort running task (requires AbortableTask)."""
```
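
The request body documented for `TaskApply` can be built from Python values and serialized with the standard library. This is a minimal sketch: `build_apply_body` is a hypothetical helper, and it assumes the server accepts the ISO 8601 `eta` format shown in the example body above.

```python
import json
from datetime import datetime


def build_apply_body(args=(), kwargs=None, queue=None, countdown=None, eta=None):
    """Serialize task arguments and execution options as a JSON request body."""
    body = {"args": list(args), "kwargs": dict(kwargs or {})}
    # Only include options that were explicitly provided.
    if queue is not None:
        body["queue"] = queue
    if countdown is not None:
        body["countdown"] = countdown
    if eta is not None:
        # datetime.isoformat() yields e.g. "2023-01-01T00:00:00"
        body["eta"] = eta.isoformat()
    return json.dumps(body)


print(build_apply_body(args=[1, 2], kwargs={"key": "value"}, queue="queue_name"))
print(build_apply_body(args=[1], eta=datetime(2023, 1, 1)))
```

The returned string would be sent as the body of the `POST` request, with a JSON content type.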

### Task Control

```python { .api }
# POST /api/task/revoke/{taskid}?terminate=false&signal=SIGTERM
class TaskRevoke(ControlHandler):
    """Revoke task with optional termination."""

# POST /api/task/timeout/{taskname}
class TaskTimeout(ControlHandler):
    """
    Set task timeout limits.

    Form Data:
        soft (float): Soft timeout in seconds
        hard (float): Hard timeout in seconds
        workername (str): Specific worker to apply
    """

# POST /api/task/rate-limit/{taskname}
class TaskRateLimit(ControlHandler):
    """
    Set task rate limit.

    Form Data:
        ratelimit (str): Rate limit (e.g., '10/m', '1/s')
        workername (str): Specific worker to apply
    """
```
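
Rate limits are passed as strings such as `'10/m'`, so a client may want to validate them before submitting the form. The sketch below assumes the `count/unit` format from the examples above with the units `s`, `m`, and `h`; `tasks_per_second` and `rate_limit_form` are hypothetical helpers, not part of Flower.

```python
import re
from urllib.parse import urlencode

# Assumed format: an integer count, a slash, and a unit (second, minute, hour).
_RATE = re.compile(r"^(\d+)/([smh])$")
_SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600}


def tasks_per_second(ratelimit):
    """Convert a rate limit string such as '10/m' into tasks per second."""
    match = _RATE.match(ratelimit)
    if match is None:
        raise ValueError(f"unsupported rate limit: {ratelimit!r}")
    count, unit = match.groups()
    return int(count) / _SECONDS_PER_UNIT[unit]


def rate_limit_form(ratelimit, workername=None):
    """Encode the form fields for POST /api/task/rate-limit/{taskname}."""
    tasks_per_second(ratelimit)  # raises ValueError on malformed input
    fields = {"ratelimit": ratelimit}
    if workername is not None:
        fields["workername"] = workername
    return urlencode(fields)


print(tasks_per_second("1/s"))
print(rate_limit_form("10/m", workername="celery@worker1"))
```

Validating on the client side avoids a broker round trip for input that would be rejected anyway.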

### Queue Information

```python { .api }
# GET /api/queues/length
class GetQueueLengths(BaseApiHandler):
    """
    Get message counts for all active queues.

    Returns:
        dict: Queue names mapped to message counts
    """
```

## Usage Examples

### Worker Management

```python
import requests

# List all workers
response = requests.get('http://localhost:5555/api/workers')
workers = response.json()

# Refresh worker information
response = requests.get('http://localhost:5555/api/workers?refresh=1')

# Shutdown worker
requests.post('http://localhost:5555/api/worker/shutdown/celery@worker1')

# Scale worker pool
requests.post('http://localhost:5555/api/worker/pool/grow/celery@worker1?n=2')
```

### Task Operations

```python
import requests

# List recent failed tasks
response = requests.get('http://localhost:5555/api/tasks?state=FAILURE&limit=10')
failed_tasks = response.json()

# Execute task
task_data = {
    "args": [1, 2, 3],
    "kwargs": {"timeout": 30},
    "queue": "high_priority"
}
response = requests.post('http://localhost:5555/api/task/apply/my_task', json=task_data)
result = response.json()

# The apply response is expected to carry the new task's ID under "task-id"
task_id = result["task-id"]

# Get task result
response = requests.get(f'http://localhost:5555/api/task/result/{task_id}')
task_result = response.json()

# Revoke task
requests.post(f'http://localhost:5555/api/task/revoke/{task_id}?terminate=true')
```

### Authentication

```python
# With basic authentication
import requests
from requests.auth import HTTPBasicAuth

auth = HTTPBasicAuth('admin', 'password')
response = requests.get('http://localhost:5555/api/workers', auth=auth)

# With API key (if configured)
headers = {'Authorization': 'Bearer your-api-key'}
response = requests.get('http://localhost:5555/api/workers', headers=headers)
```

## Response Formats

### Worker Information Response

```json
{
  "celery@worker1": {
    "status": "online",
    "active": 2,
    "processed": 150,
    "load": [0.5, 0.4, 0.3],
    "registered": ["task1", "task2"],
    "stats": {...},
    "active_queues": [...]
  }
}
```
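
A response keyed by worker name is straightforward to query in client code. As an illustration, the sketch below finds the workers that have a given task registered; it assumes the response shape shown above, and `workers_registering` and the second worker in the sample data are hypothetical.

```python
def workers_registering(workers, task_name):
    """Return the sorted names of workers whose 'registered' list has task_name."""
    return sorted(
        name
        for name, info in workers.items()
        if task_name in info.get("registered", [])
    )


# Sample data modeled on the response above; "celery@worker2" is made up.
sample = {
    "celery@worker1": {"status": "online", "registered": ["task1", "task2"]},
    "celery@worker2": {"status": "online", "registered": ["task2"]},
}

print(workers_registering(sample, "task1"))
print(workers_registering(sample, "task2"))
```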

### Task List Response

```json
{
  "tasks": [
    {
      "uuid": "task-uuid",
      "name": "my_task",
      "state": "SUCCESS",
      "received": 1640995200.0,
      "started": 1640995201.0,
      "runtime": 2.5,
      "worker": "celery@worker1",
      "args": [1, 2, 3],
      "result": "task_result"
    }
  ],
  "total": 1000,
  "offset": 0,
  "limit": 10
}
```
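
A common use of the task list is a quick tally of task states. The following sketch counts states with `collections.Counter`. It assumes the payload shape shown above, and as a precaution it also accepts a plain mapping of task IDs to task records in case a deployment returns that shape instead. `count_states` is a hypothetical helper.

```python
from collections import Counter


def count_states(payload):
    """Count tasks per state in a task list payload."""
    if isinstance(payload.get("tasks"), list):
        # Shape shown in this document: {"tasks": [...], "total": ..., ...}
        tasks = payload["tasks"]
    else:
        # Fallback assumption: {"<task id>": {...task record...}, ...}
        tasks = payload.values()
    return Counter(task.get("state", "UNKNOWN") for task in tasks)


listed = {
    "tasks": [
        {"uuid": "a", "state": "SUCCESS"},
        {"uuid": "b", "state": "FAILURE"},
        {"uuid": "c", "state": "SUCCESS"},
    ],
    "total": 3,
}
print(count_states(listed))
```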

### Error Response

```json
{
  "error": "Task not found",
  "status": 404
}
```
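
Client code usually wants a single readable message from a failed call. The sketch below turns an error body into one, assuming the JSON shape shown above and falling back to the raw text when the body is not JSON. `describe_error` is a hypothetical helper.

```python
import json


def describe_error(status_code, body):
    """Return a one-line description of an API error response."""
    try:
        payload = json.loads(body)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        payload = None
    if isinstance(payload, dict) and "error" in payload:
        return f"HTTP {payload.get('status', status_code)}: {payload['error']}"
    # Not the documented JSON shape: report the raw body instead.
    return f"HTTP {status_code}: {body.strip() or 'no details'}"


print(describe_error(404, '{"error": "Task not found", "status": 404}'))
print(describe_error(500, "Internal Server Error"))
```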

## Authentication Requirements

The REST API rejects requests unless an authentication method is configured or `FLOWER_UNAUTHENTICATED_API=true` is set:

- **Basic Auth**: HTTP Basic Authentication
- **OAuth2**: Various OAuth2 providers (Google, GitHub, GitLab, Okta)
- **Environment Variable**: `FLOWER_UNAUTHENTICATED_API=true` to disable authentication
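
The rule above can be read as a simple gate: the API is reachable when some authentication method is configured, or when the environment variable explicitly opts out. The sketch below illustrates that logic only and is not Flower's actual implementation; `api_enabled` is a hypothetical function, and the set of strings treated as "true" is an assumption.

```python
import os

# Assumption: which strings count as "true" for the environment variable.
TRUTHY = {"1", "true", "yes", "on"}


def api_enabled(auth_configured, environ=None):
    """Decide whether API requests should be processed at all."""
    environ = os.environ if environ is None else environ
    if auth_configured:
        # Requests still have to pass the configured authentication method.
        return True
    flag = environ.get("FLOWER_UNAUTHENTICATED_API", "")
    return flag.strip().lower() in TRUTHY


print(api_enabled(False, {}))
print(api_enabled(False, {"FLOWER_UNAUTHENTICATED_API": "true"}))
print(api_enabled(True, {}))
```

Passing the environment as a parameter keeps the check easy to test without modifying `os.environ`.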

## Rate Limiting and Performance

- API endpoints are designed for moderate usage patterns
- Large task listings should use pagination (`limit`/`offset`)
- Worker operations may have higher latency due to broker communication
- Consider caching for frequently accessed data
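
The pagination advice above can be sketched as a generator that walks a listing with `limit` and `offset`. The HTTP call is injected as `fetch_page`, a hypothetical callable that returns one page of tasks as a list, so the paging logic stays independent of any client library; `iter_task_pages` and `fake_fetch` are illustrative names.

```python
def iter_task_pages(fetch_page, page_size=100):
    """Yield pages of tasks until the listing is exhausted."""
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        if not page:
            return
        yield page
        if len(page) < page_size:
            # A short page means there is nothing left to fetch.
            return
        offset += page_size


# Stand-in for a real request to /api/tasks?limit=...&offset=...
all_tasks = [{"uuid": str(i)} for i in range(250)]


def fake_fetch(limit, offset):
    return all_tasks[offset:offset + limit]


for page in iter_task_pages(fake_fetch, page_size=100):
    print(len(page))
```

In real use, `fetch_page` would issue the `GET /api/tasks` request and return the decoded task list.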