# REST API

Complete REST API providing programmatic access to all Flower functionality including worker control, task management, and monitoring operations.

## Capabilities

### Base API Handler

Foundation class for all REST API endpoints with authentication and error handling.

```python { .api }
class BaseApiHandler(BaseHandler):
    """
    Base class for all REST API endpoints.

    Provides authentication, error handling, and common functionality
    for all API endpoints.
    """

    def prepare(self):
        """
        Prepare request with authentication check.

        Validates authentication requirements before processing requests.
        Requires FLOWER_UNAUTHENTICATED_API environment variable or
        authentication to be configured.
        """

    def write_error(self, status_code, **kwargs):
        """
        Handle API error responses.

        Args:
            status_code (int): HTTP status code
            **kwargs: Additional error context

        Formats error responses appropriately for API consumption.
        """

class ControlHandler(BaseApiHandler):
    """
    Base handler for worker and task control operations.

    Extends BaseApiHandler with remote control capabilities
    for managing workers and tasks through Celery's control interface.
    """
```

## Worker API Endpoints

### Worker Information

```python { .api }
# GET /api/workers
class ListWorkers(ControlHandler):
    """List and inspect all workers with optional refresh."""

    async def get(self):
        """
        Get worker information.

        Query Parameters:
            refresh (bool): Force worker info refresh
            status (bool): Return only status information
            workername (str): Filter by specific worker

        Returns:
            dict: Worker information keyed by worker name
        """
```
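
As a rough illustration of how the query parameters above combine, the following sketch builds a `/api/workers` URL with the standard library only. The helper name `workers_url` and the base URL `http://localhost:5555` are assumptions made for this example and are not part of Flower. Sending boolean flags as `1` mirrors the `refresh=1` form used in the usage examples and is likewise an assumption.

```python
from urllib.parse import urlencode


def workers_url(base="http://localhost:5555", refresh=False, status=False, workername=None):
    """Build a GET /api/workers URL (hypothetical helper, not part of Flower)."""
    params = {}
    if refresh:
        params["refresh"] = 1  # assumption: truthy flags are sent as 1
    if status:
        params["status"] = 1
    if workername:
        params["workername"] = workername  # urlencode percent-encodes the '@'
    query = urlencode(params)
    return f"{base}/api/workers" + (f"?{query}" if query else "")
```

The returned string can be passed directly to an HTTP client such as `requests.get`.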

### Worker Control

```python { .api }
# POST /api/worker/shutdown/{workername}
class WorkerShutDown(ControlHandler):
    """Shut down a specific worker."""

# POST /api/worker/pool/restart/{workername}
class WorkerPoolRestart(ControlHandler):
    """Restart worker process pool."""

# POST /api/worker/pool/grow/{workername}?n=1
class WorkerPoolGrow(ControlHandler):
    """Increase worker pool size."""

# POST /api/worker/pool/shrink/{workername}?n=1
class WorkerPoolShrink(ControlHandler):
    """Decrease worker pool size."""

# POST /api/worker/pool/autoscale/{workername}?min=1&max=10
class WorkerPoolAutoscale(ControlHandler):
    """Configure worker pool autoscaling."""

# POST /api/worker/queue/add-consumer/{workername}?queue=queue_name
class WorkerQueueAddConsumer(ControlHandler):
    """Add queue consumer to worker."""

# POST /api/worker/queue/cancel-consumer/{workername}?queue=queue_name
class WorkerQueueCancelConsumer(ControlHandler):
    """Remove queue consumer from worker."""
```
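
The control endpoints above share one URL shape: an action path, the worker name, and optional query parameters. A minimal sketch of a URL builder for them follows. The `WORKER_ACTIONS` table, the `worker_control_url` function, and the default base URL are hypothetical names introduced for illustration; only the path fragments are taken from the endpoint comments above.

```python
from urllib.parse import quote, urlencode

# Path fragments copied from the endpoint comments above; the dictionary keys
# are made-up labels for this example.
WORKER_ACTIONS = {
    "shutdown": "shutdown",
    "pool_restart": "pool/restart",
    "pool_grow": "pool/grow",
    "pool_shrink": "pool/shrink",
    "pool_autoscale": "pool/autoscale",
    "add_consumer": "queue/add-consumer",
    "cancel_consumer": "queue/cancel-consumer",
}


def worker_control_url(action, workername, base="http://localhost:5555", **params):
    """Return the POST URL for a worker control action (hypothetical helper)."""
    path = WORKER_ACTIONS[action]  # raises KeyError for an unknown action
    # Keep '@' readable in names such as celery@worker1; escape everything else.
    url = f"{base}/api/worker/{path}/{quote(workername, safe='@')}"
    if params:
        url += "?" + urlencode(params)
    return url
```

For example, `worker_control_url("pool_grow", "celery@worker1", n=2)` yields a URL that can be handed to `requests.post`.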

## Task API Endpoints

### Task Information

```python { .api }
# GET /api/tasks
class ListTasks(BaseApiHandler):
    """
    List tasks with filtering and pagination.

    Query Parameters:
        limit (int): Maximum tasks to return
        offset (int): Tasks to skip (pagination)
        sort_by (str): Sort field
        workername (str): Filter by worker
        taskname (str): Filter by task name
        state (str): Filter by task state
        received_start (str): Start date filter
        received_end (str): End date filter
        search (str): Search term
    """

# GET /api/task/types
class ListTaskTypes(BaseApiHandler):
    """List all available task types."""

# GET /api/task/info/{taskid}
class TaskInfo(BaseApiHandler):
    """Get detailed information for specific task."""

# GET /api/task/result/{taskid}?timeout=10
class TaskResult(BaseApiHandler):
    """Get task result with optional timeout."""
```
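
The `limit` and `offset` parameters of `GET /api/tasks` lend themselves to a simple paging loop. The sketch below is one way to write it, under the assumption that the caller supplies a `fetch_page(limit, offset)` function returning a list of tasks for that window. Both `iter_tasks` and `fetch_page` are hypothetical names; how the task list is extracted from the HTTP response is left to `fetch_page`.

```python
def iter_tasks(fetch_page, page_size=100):
    """Yield tasks one by one, requesting successive limit/offset windows.

    fetch_page is a caller-supplied callable (an assumption of this sketch)
    that accepts limit and offset keyword arguments and returns a list.
    """
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        yield from page
        # A page shorter than page_size means there is nothing left to fetch.
        if len(page) < page_size:
            break
        offset += page_size
```

In practice `fetch_page` might wrap `requests.get('http://localhost:5555/api/tasks', params={'limit': limit, 'offset': offset})` and return the tasks from the decoded response.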

### Task Execution

```python { .api }
# POST /api/task/apply/{taskname}
class TaskApply(BaseTaskHandler):
    """
    Execute task synchronously.

    Request Body:
        {
            "args": [arg1, arg2],
            "kwargs": {"key": "value"},
            "queue": "queue_name",
            "countdown": 10,
            "eta": "2023-01-01T00:00:00"
        }
    """

# POST /api/task/async-apply/{taskname}
class TaskAsyncApply(BaseTaskHandler):
    """Execute task asynchronously without waiting."""

# POST /api/task/send-task/{taskname}
class TaskSend(BaseTaskHandler):
    """Send task without requiring local task definition."""

# POST /api/task/abort/{taskid}
class TaskAbort(BaseTaskHandler):
    """Abort running task (requires AbortableTask)."""
```
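
To make the request body above concrete, here is a small sketch that assembles it as JSON while leaving out options that were not set. The function name `build_apply_body` is hypothetical, and the field names are taken from the `TaskApply` docstring; serializing a `datetime` for `eta` with `isoformat()` is an assumption based on the example value shown there.

```python
import json
from datetime import datetime


def build_apply_body(args=None, kwargs=None, queue=None, countdown=None, eta=None):
    """Serialize a task request body to JSON (hypothetical helper)."""
    body = {"args": list(args or []), "kwargs": dict(kwargs or {})}
    if queue is not None:
        body["queue"] = queue
    if countdown is not None:
        body["countdown"] = countdown
    if eta is not None:
        # Accept either a ready-made string or a datetime object.
        body["eta"] = eta.isoformat() if isinstance(eta, datetime) else eta
    return json.dumps(body)
```

The resulting string can be sent as the POST body; with `requests`, passing the equivalent dictionary through `json=` achieves the same result.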

### Task Control

```python { .api }
# POST /api/task/revoke/{taskid}?terminate=false&signal=SIGTERM
class TaskRevoke(ControlHandler):
    """Revoke task with optional termination."""

# POST /api/task/timeout/{taskname}
class TaskTimeout(ControlHandler):
    """
    Set task timeout limits.

    Form Data:
        soft (float): Soft timeout in seconds
        hard (float): Hard timeout in seconds
        workername (str): Specific worker to apply
    """

# POST /api/task/rate-limit/{taskname}
class TaskRateLimit(ControlHandler):
    """
    Set task rate limit.

    Form Data:
        ratelimit (str): Rate limit (e.g., '10/m', '1/s')
        workername (str): Specific worker to apply
    """
```
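
Unlike the task execution endpoints, the timeout and rate-limit endpoints above are documented as taking form data. The sketch below shows how such a form-encoded body could be produced with the standard library. `timeout_form` and `rate_limit_form` are hypothetical helper names, and the field names come from the docstrings above.

```python
from urllib.parse import urlencode


def timeout_form(soft=None, hard=None, workername=None):
    """Form-encode the fields for POST /api/task/timeout/{taskname}."""
    fields = {"soft": soft, "hard": hard, "workername": workername}
    # Omit fields that were not provided rather than sending the string 'None'.
    return urlencode({key: value for key, value in fields.items() if value is not None})


def rate_limit_form(ratelimit, workername=None):
    """Form-encode the fields for POST /api/task/rate-limit/{taskname}."""
    fields = {"ratelimit": ratelimit, "workername": workername}
    return urlencode({key: value for key, value in fields.items() if value is not None})
```

With `requests`, passing a dictionary through `data=` produces the same encoding, so these helpers are mainly useful with lower-level HTTP clients.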

### Queue Information

```python { .api }
# GET /api/queues/length
class GetQueueLengths(BaseApiHandler):
    """
    Get message counts for all active queues.

    Returns:
        dict: Queue names mapped to message counts
    """
```
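
One common use of queue lengths is spotting backlogs. The following sketch assumes the data has already been reduced to the shape described above, a dictionary mapping queue names to message counts; the exact JSON layout returned by a given Flower version should be checked before relying on it. The function name `backlogged_queues` and the default threshold are illustrative choices.

```python
def backlogged_queues(lengths, threshold=100):
    """Return (queue, count) pairs at or above threshold, largest backlog first.

    lengths is assumed to map queue names to message counts.
    """
    over = [(name, count) for name, count in lengths.items() if count >= threshold]
    return sorted(over, key=lambda item: item[1], reverse=True)
```

For example, `backlogged_queues({"celery": 5, "high_priority": 250})` reports only `high_priority`.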

## Usage Examples

### Worker Management

```python
import requests

# List all workers
response = requests.get('http://localhost:5555/api/workers')
workers = response.json()

# Refresh worker information
response = requests.get('http://localhost:5555/api/workers?refresh=1')

# Shutdown worker
requests.post('http://localhost:5555/api/worker/shutdown/celery@worker1')

# Scale worker pool
requests.post('http://localhost:5555/api/worker/pool/grow/celery@worker1?n=2')
```

### Task Operations

```python
import requests

# List recent failed tasks
response = requests.get('http://localhost:5555/api/tasks?state=FAILURE&limit=10')
failed_tasks = response.json()

# Execute task
task_data = {
    "args": [1, 2, 3],
    "kwargs": {"timeout": 30},
    "queue": "high_priority"
}
response = requests.post('http://localhost:5555/api/task/apply/my_task', json=task_data)
result = response.json()
task_id = result['task-id']  # ID of the task that was just submitted

# Get task result
response = requests.get(f'http://localhost:5555/api/task/result/{task_id}')
task_result = response.json()

# Revoke task
requests.post(f'http://localhost:5555/api/task/revoke/{task_id}?terminate=true')
```

### Authentication

```python
# With basic authentication
import requests
from requests.auth import HTTPBasicAuth

auth = HTTPBasicAuth('admin', 'password')
response = requests.get('http://localhost:5555/api/workers', auth=auth)

# With API key (if configured)
headers = {'Authorization': 'Bearer your-api-key'}
response = requests.get('http://localhost:5555/api/workers', headers=headers)
```

## Response Formats

### Worker Information Response

```json
{
  "celery@worker1": {
    "status": "online",
    "active": 2,
    "processed": 150,
    "load": [0.5, 0.4, 0.3],
    "registered": ["task1", "task2"],
    "stats": {...},
    "active_queues": [...]
  }
}
```

### Task List Response

```json
{
  "tasks": [
    {
      "uuid": "task-uuid",
      "name": "my_task",
      "state": "SUCCESS",
      "received": 1640995200.0,
      "started": 1640995201.0,
      "runtime": 2.5,
      "worker": "celery@worker1",
      "args": [1, 2, 3],
      "result": "task_result"
    }
  ],
  "total": 1000,
  "offset": 0,
  "limit": 10
}
```

### Error Response

```json
{
  "error": "Task not found",
  "status": 404
}
```
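
A client will usually want to turn an error body like the one above into an exception. The sketch below shows one way to do that, assuming the body follows the `error`/`status` shape documented here and falling back to the raw text when it does not. `FlowerApiError` and `raise_for_api_error` are hypothetical names, not part of Flower or `requests`.

```python
import json


class FlowerApiError(Exception):
    """Raised when the API answers with an error status (hypothetical class)."""

    def __init__(self, status, message):
        super().__init__(f"{status}: {message}")
        self.status = status
        self.message = message


def raise_for_api_error(status_code, body_text):
    """Raise FlowerApiError for 4xx/5xx responses; return None otherwise."""
    if status_code < 400:
        return
    try:
        payload = json.loads(body_text)
    except ValueError:
        payload = None  # body was not JSON; use the raw text instead
    if isinstance(payload, dict):
        message = payload.get("error", body_text)
    else:
        message = body_text
    raise FlowerApiError(status_code, message)
```

With `requests`, this could be called as `raise_for_api_error(response.status_code, response.text)` right after each request.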

## Authentication Requirements

The REST API rejects requests unless an authentication method is configured or `FLOWER_UNAUTHENTICATED_API=true` is set:

- **Basic Auth**: HTTP Basic Authentication
- **OAuth2**: Various OAuth2 providers (Google, GitHub, GitLab, Okta)
- **Environment Variable**: set `FLOWER_UNAUTHENTICATED_API=true` to allow unauthenticated API access

## Rate Limiting and Performance

- API endpoints are designed for moderate usage patterns
- Large task listings should use pagination (`limit`/`offset`)
- Worker control operations travel through the broker to reach the worker, so they may have higher latency than read-only requests
- Consider caching for frequently accessed data
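
The caching suggestion above can be sketched as a small time-based cache that wraps any fetch function. This is an illustration under stated assumptions, not something Flower provides: `TTLCache`, its parameters, and the five-second default are all hypothetical, and the `fetch` callable is expected to perform the actual API request.

```python
import time


class TTLCache:
    """Cache the result of fetch() for ttl_seconds (hypothetical helper)."""

    def __init__(self, fetch, ttl_seconds=5.0, clock=time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so the cache can be tested without sleeping
        self._value = None
        self._expires_at = None

    def get(self):
        """Return the cached value, calling fetch() again once it has expired."""
        now = self._clock()
        if self._expires_at is None or now >= self._expires_at:
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value
```

A dashboard polling worker status could, for instance, wrap its `requests.get(...).json()` call in a `TTLCache` so that repeated reads within the TTL do not reach the broker.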