
# Worker Management

Worker inspection, status monitoring, and remote control operations for managing Celery worker processes across the cluster.

## Capabilities

### Inspector Class

Core worker inspection and management functionality using Celery's remote control system.

```python { .api }
class Inspector:
    """
    Celery cluster inspection and worker management.

    Provides asynchronous worker inspection and remote control capabilities
    using Celery's built-in management commands.
    """

    # Available inspection methods (from source code)
    methods = ('stats', 'active_queues', 'registered', 'scheduled',
               'active', 'reserved', 'revoked', 'conf')

    def __init__(self, io_loop, capp, timeout):
        """
        Initialize the worker inspector.

        Args:
            io_loop: Tornado IOLoop for async operations
            capp: Celery application instance
            timeout (float): Inspection timeout in seconds
        """

    def inspect(self, workername=None):
        """
        Inspect workers asynchronously.

        Args:
            workername (str, optional): Specific worker name, or None for all workers

        Returns:
            list: List of futures for inspection results

        Performs inspection of worker status, active tasks, registered tasks,
        and other worker information using Celery's inspection system.
        """

    @property
    def workers(self):
        """
        Worker information dictionary.

        Returns:
            collections.defaultdict: Worker data keyed by worker name

        Structure:
            {
                'worker_name': {
                    'stats': dict,          # Worker statistics
                    'active_queues': list,  # Active queue names
                    'registered': list,     # Registered task names
                    'scheduled': list,      # Scheduled tasks
                    'active': list,         # Currently executing tasks
                    'reserved': list,       # Reserved tasks
                    'revoked': set,         # Revoked task IDs
                    'conf': dict,           # Worker configuration
                    'timestamp': float,     # Last update timestamp
                }
            }
        """

    # Internal methods for inspection
    def _inspect(self, method, workername):
        """
        Internal method that performs the actual worker inspection.

        Args:
            method (str): Inspection method to execute
            workername (str): Target worker name, or None for all workers

        Executes the specified inspection method using Celery's control
        interface and handles the response processing.
        """

    def _on_update(self, workername, method, response):
        """
        Handle an inspection response and update worker information.

        Args:
            workername (str): Worker name that responded
            method (str): Inspection method that was executed
            response (dict): Response data from the worker

        Updates the internal workers dictionary with the latest information.
        """
```
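
For orientation, here is a minimal sketch of how these inspection methods map onto Celery's own control API, which `Inspector` builds on. The app name, broker URL, and worker name are illustrative, and this bypasses the Tornado event loop entirely:

```python
from celery import Celery

capp = Celery('myapp', broker='redis://localhost:6379')

# Scope the inspection to one worker, or pass destination=None for all
insp = capp.control.inspect(destination=['celery@worker1'], timeout=10.0)

# Each name in Inspector.methods corresponds to a Celery inspect call
print(insp.stats())          # per-worker statistics
print(insp.active_queues())  # queues each worker consumes from
print(insp.registered())     # registered task names
print(insp.active())         # currently executing tasks
```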

### Worker Control Operations

Remote control operations for managing worker processes and their behavior.

```python { .api }
# Worker lifecycle control
def worker_shutdown(workername):
    """
    Shut down a specific worker.

    Args:
        workername (str): Name of the worker to shut down

    Sends a shutdown signal to the specified worker process.
    """

def worker_pool_restart(workername):
    """
    Restart a worker's process pool.

    Args:
        workername (str): Name of the worker whose pool to restart

    Restarts the worker's process pool without shutting down the worker.
    """

def worker_pool_grow(workername, n=1):
    """
    Increase worker pool size.

    Args:
        workername (str): Name of the worker to modify
        n (int): Number of processes to add (default: 1)

    Dynamically increases the worker's process pool size.
    """

def worker_pool_shrink(workername, n=1):
    """
    Decrease worker pool size.

    Args:
        workername (str): Name of the worker to modify
        n (int): Number of processes to remove (default: 1)

    Dynamically decreases the worker's process pool size.
    """

def worker_pool_autoscale(workername, min_workers, max_workers):
    """
    Configure worker pool autoscaling.

    Args:
        workername (str): Name of the worker to configure
        min_workers (int): Minimum number of pool processes
        max_workers (int): Maximum number of pool processes

    Sets autoscaling parameters for the worker's process pool.
    """
```
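
These operations correspond closely to Celery's built-in control commands. A minimal sketch using the Celery control API directly (broker URL and worker name are illustrative):

```python
from celery import Celery

capp = Celery('myapp', broker='redis://localhost:6379')
dest = ['celery@worker1']

capp.control.pool_grow(n=2, destination=dest)    # add 2 pool processes
capp.control.pool_shrink(n=1, destination=dest)  # remove 1 pool process

# Note: pool_restart is honored only if the worker was started with
# the worker_pool_restarts setting enabled.
capp.control.pool_restart(destination=dest)

capp.control.autoscale(max=10, min=2, destination=dest)  # autoscale bounds
capp.control.shutdown(destination=dest)                  # graceful shutdown
```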

### Queue Management

Operations for managing worker queue consumption and routing.

```python { .api }
def worker_queue_add_consumer(workername, queue_name):
    """
    Add a queue consumer to a worker.

    Args:
        workername (str): Name of the worker to modify
        queue_name (str): Name of the queue to start consuming from

    Instructs the worker to start consuming from the specified queue.
    """

def worker_queue_cancel_consumer(workername, queue_name):
    """
    Remove a queue consumer from a worker.

    Args:
        workername (str): Name of the worker to modify
        queue_name (str): Name of the queue to stop consuming from

    Instructs the worker to stop consuming from the specified queue.
    """

def get_active_queue_names():
    """
    Get the names of all active queues across the cluster.

    Returns:
        list: Names of all queues being consumed by workers
    """
```
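
The underlying Celery calls are `add_consumer` and `cancel_consumer`; a sketch, with the cluster-wide queue listing derived from the inspect API (queue and worker names illustrative):

```python
from celery import Celery

capp = Celery('myapp', broker='redis://localhost:6379')

# Start/stop consuming a queue on one worker
capp.control.add_consumer('high_priority', destination=['celery@worker1'])
capp.control.cancel_consumer('low_priority', destination=['celery@worker1'])

# Aggregate active queue names across the cluster
replies = capp.control.inspect().active_queues() or {}
queue_names = sorted({q['name'] for queues in replies.values() for q in queues})
print(queue_names)
```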

## Worker Information Structure

### Worker Status Data

Comprehensive worker information structure returned by inspection operations.

```python { .api }
WorkerInfo = {
    # Basic identification
    'hostname': str,     # Worker hostname/identifier
    'status': str,       # 'online' or 'offline'
    'timestamp': float,  # Last update timestamp

    # Task statistics
    'active': int,       # Number of currently active tasks
    'processed': int,    # Total number of processed tasks
    'load': [float, float, float],  # System load averages (1m, 5m, 15m)

    # Process information
    'pool': {
        'max-concurrency': int,      # Maximum concurrent tasks
        'processes': [int],          # Process IDs in pool
        'max-tasks-per-child': int,  # Max tasks per child process
        'put-guarded-by-semaphore': bool,
        'timeouts': [float, float],  # Soft and hard timeouts
        'writes': {
            'total': int,  # Total writes
            'avg': float,  # Average write time
            'all': str,    # Write time details
        }
    },

    # System information
    'rusage': {
        'utime': float,    # User CPU time
        'stime': float,    # System CPU time
        'maxrss': int,     # Maximum resident set size
        'ixrss': int,      # Integral shared memory size
        'idrss': int,      # Integral unshared data size
        'isrss': int,      # Integral unshared stack size
        'minflt': int,     # Page reclaims
        'majflt': int,     # Page faults
        'nswap': int,      # Swaps
        'inblock': int,    # Block input operations
        'oublock': int,    # Block output operations
        'msgsnd': int,     # Messages sent
        'msgrcv': int,     # Messages received
        'nsignals': int,   # Signals received
        'nvcsw': int,      # Voluntary context switches
        'nivcsw': int,     # Involuntary context switches
    },

    # Registered tasks
    'registered': [str],  # List of registered task names

    # Active queues
    'active_queues': [
        {
            'name': str,              # Queue name
            'exchange': {
                'name': str,          # Exchange name
                'type': str,          # Exchange type
                'durable': bool,      # Exchange durability
                'auto_delete': bool,  # Auto-delete setting
                'arguments': dict,    # Exchange arguments
            },
            'routing_key': str,       # Routing key
            'durable': bool,          # Queue durability
            'exclusive': bool,        # Queue exclusivity
            'auto_delete': bool,      # Auto-delete setting
            'no_ack': bool,           # No acknowledgment setting
            'alias': str,             # Queue alias
            'bindings': [dict],       # Queue bindings
            'no_declare': bool,       # No declaration flag
            'expires': int,           # Queue expiration
            'message_ttl': int,       # Message TTL
            'max_length': int,        # Maximum queue length
            'max_length_bytes': int,  # Maximum queue size in bytes
            'max_priority': int,      # Maximum message priority
        }
    ],

    # Clock information
    'clock': int,      # Logical clock value

    # Software information
    'sw_ident': str,   # Software identifier
    'sw_ver': str,     # Software version
    'sw_sys': str,     # System information
}
```
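
As a usage sketch, a small helper that condenses this structure into a one-line status summary (field access follows the schema above; the formatting choices are illustrative):

```python
def summarize_worker(info: dict) -> str:
    """Render a one-line summary from a WorkerInfo-shaped dict."""
    load_1m = (info.get('load') or [0.0])[0]
    concurrency = info.get('pool', {}).get('max-concurrency', '?')
    return (f"{info.get('hostname', 'unknown')}: "
            f"{info.get('status', 'unknown')}, "
            f"{info.get('active', 0)} active / {concurrency} slots, "
            f"load {load_1m:.2f}, "
            f"{info.get('processed', 0)} processed")
```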

### Active Task Information

Structure for active task information returned by worker inspection.

```python { .api }
ActiveTask = {
    'id': str,             # Task UUID
    'name': str,           # Task name
    'args': list,          # Task arguments
    'kwargs': dict,        # Task keyword arguments
    'type': str,           # Task type
    'hostname': str,       # Worker hostname
    'time_start': float,   # Task start timestamp
    'acknowledged': bool,  # Task acknowledgment status
    'delivery_info': {
        'exchange': str,      # Exchange name
        'routing_key': str,   # Routing key
        'priority': int,      # Message priority
        'redelivered': bool,  # Redelivery flag
    },
    'worker_pid': int,     # Worker process ID
}
```
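
Since `time_start` is a Unix timestamp, runtime checks are straightforward; a sketch (the 300-second threshold is illustrative):

```python
import time

def find_stuck_tasks(active_tasks: list, max_runtime: float = 300.0) -> list:
    """Return active tasks running longer than max_runtime seconds."""
    now = time.time()
    return [t for t in active_tasks
            if now - t.get('time_start', now) > max_runtime]
```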

## Usage Examples

### Basic Worker Inspection

```python
import asyncio

import celery
from tornado.ioloop import IOLoop

from flower.inspector import Inspector

# Create inspector
celery_app = celery.Celery('myapp', broker='redis://localhost:6379')
io_loop = IOLoop.current()
inspector = Inspector(io_loop, celery_app, timeout=10.0)

# Inspect all workers
async def inspect_workers():
    # inspect() returns a list of futures; wait for all of them,
    # then read the aggregated results from the workers property
    await asyncio.gather(*inspector.inspect())

    print(f"Found {len(inspector.workers)} workers")
    for worker_name, worker_info in inspector.workers.items():
        print(f"Worker: {worker_name}")
        print(f"  Active tasks: {len(worker_info.get('active', []))}")
        print(f"  Registered tasks: {len(worker_info.get('registered', []))}")

# Run inspection
io_loop.run_sync(inspect_workers)
```

### Inspect Specific Worker

```python
# Inspect a single worker
async def inspect_single_worker():
    await asyncio.gather(*inspector.inspect(workername='celery@worker1'))

    worker_info = inspector.workers.get('celery@worker1')
    if worker_info:
        print("Worker celery@worker1:")
        print(f"  Registered tasks: {worker_info.get('registered', [])}")
        print(f"  Active queues: {len(worker_info.get('active_queues', []))}")
        print(f"  Stats: {worker_info.get('stats', {})}")

io_loop.run_sync(inspect_single_worker)
```

### Worker Control Operations

```python
from flower.api.control import (
    WorkerShutDown, WorkerPoolRestart, WorkerPoolGrow,
    WorkerPoolShrink, WorkerPoolAutoscale,
    WorkerQueueAddConsumer, WorkerQueueCancelConsumer
)

# Note: these are Tornado request handlers; in a running Flower instance
# they are dispatched by the web application rather than instantiated
# directly. The calls below illustrate which handler maps to which
# operation and the parameters each expects.

# Shut down a worker
async def shutdown_worker():
    handler = WorkerShutDown()
    await handler.post('celery@worker1')

# Restart a worker's pool
async def restart_pool():
    handler = WorkerPoolRestart()
    await handler.post('celery@worker1')

# Grow a worker's pool
async def grow_pool():
    handler = WorkerPoolGrow()
    await handler.post('celery@worker1', n=2)  # add 2 processes

# Configure autoscaling
async def configure_autoscale():
    handler = WorkerPoolAutoscale()
    await handler.post('celery@worker1', min=2, max=10)
```
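
In practice, these handlers are usually exercised through Flower's HTTP API rather than in-process. A sketch with `requests`; the endpoint paths below match Flower's commonly documented REST routes, but verify them against your Flower version:

```python
import requests

FLOWER_URL = 'http://localhost:5555'
WORKER = 'celery@worker1'

# Shut down a worker
requests.post(f'{FLOWER_URL}/api/worker/shutdown/{WORKER}')

# Grow the pool by 2 processes
requests.post(f'{FLOWER_URL}/api/worker/pool/grow/{WORKER}', params={'n': 2})

# Configure autoscaling bounds
requests.post(f'{FLOWER_URL}/api/worker/pool/autoscale/{WORKER}',
              params={'min': 2, 'max': 10})
```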

### Queue Consumer Management

```python
# Add a queue consumer
async def add_queue_consumer():
    handler = WorkerQueueAddConsumer()
    await handler.post('celery@worker1', queue='high_priority')

# Remove a queue consumer
async def remove_queue_consumer():
    handler = WorkerQueueCancelConsumer()
    await handler.post('celery@worker1', queue='low_priority')
```
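
The HTTP equivalents follow the same pattern (paths assumed from Flower's REST API; confirm against your version):

```python
import requests

FLOWER_URL = 'http://localhost:5555'

requests.post(f'{FLOWER_URL}/api/worker/queue/add-consumer/celery@worker1',
              params={'queue': 'high_priority'})
requests.post(f'{FLOWER_URL}/api/worker/queue/cancel-consumer/celery@worker1',
              params={'queue': 'low_priority'})
```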

### Real-time Worker Monitoring

```python
import asyncio

from flower.inspector import Inspector

class WorkerMonitor:
    def __init__(self, inspector):
        self.inspector = inspector
        self.running = False

    async def start_monitoring(self, interval=30):
        """Monitor workers every `interval` seconds."""
        self.running = True

        while self.running:
            try:
                # Wait for all inspection futures, then read the results
                await asyncio.gather(*self.inspector.inspect())
                await self.process_worker_updates(self.inspector.workers)
                await asyncio.sleep(interval)
            except Exception as e:
                print(f"Monitoring error: {e}")
                await asyncio.sleep(5)

    async def process_worker_updates(self, workers):
        """Process worker status updates."""
        for worker_name, worker_info in workers.items():
            active = len(worker_info.get('active', []))
            reserved = len(worker_info.get('reserved', []))

            print(f"{worker_name}: {active} active, {reserved} reserved")

            # Alert on a large backlog (threshold is illustrative)
            if active + reserved > 50:
                print(f"HIGH LOAD WARNING: {worker_name} has "
                      f"{active + reserved} outstanding tasks")

    def stop_monitoring(self):
        """Stop the monitoring loop."""
        self.running = False

# Usage
monitor = WorkerMonitor(inspector)
asyncio.create_task(monitor.start_monitoring(interval=10))
```

## Integration with Events

Worker information from the inspector can be combined with real-time event data for comprehensive monitoring.

```python
import asyncio

from flower.events import Events
from flower.inspector import Inspector

# Combine inspector and events data
class WorkerManager:
    def __init__(self, celery_app, io_loop):
        self.inspector = Inspector(io_loop, celery_app, timeout=10.0)
        self.events = Events(celery_app, io_loop)

    async def get_complete_worker_info(self):
        """Get combined worker information from inspection and events."""
        # Get inspection data
        await asyncio.gather(*self.inspector.inspect())
        inspection_data = self.inspector.workers

        # Get event data
        event_workers = self.events.workers

        # Combine data
        combined = {}
        for worker_name in set(inspection_data.keys()) | set(event_workers.keys()):
            combined[worker_name] = {
                'inspection': inspection_data.get(worker_name, {}),
                'events': event_workers.get(worker_name, {}),
                'online': worker_name in inspection_data,
                'last_heartbeat': event_workers.get(worker_name, {}).get('timestamp'),
            }

        return combined
```

## Error Handling

Worker management operations should handle the common failure scenarios:

```python
# Handle worker inspection errors
async def safe_inspect():
    try:
        await asyncio.gather(*inspector.inspect())
    except Exception as e:
        if 'timeout' in str(e).lower():
            print("Worker inspection timed out - workers may be overloaded")
        elif 'connection' in str(e).lower():
            print("Cannot connect to broker - check broker status")
        else:
            print(f"Inspection failed: {e}")

# Handle control command errors
async def safe_shutdown():
    try:
        await worker_shutdown('celery@worker1')
    except Exception as e:
        if 'no such worker' in str(e).lower():
            print("Worker not found or already offline")
        else:
            print(f"Control command failed: {e}")
```

## Performance Considerations

- Worker inspection can be expensive with many workers; use appropriate timeouts.
- Cache inspection results when possible to reduce broker load (see the sketch below).
- Monitor inspection latency to detect broker or network issues.
- Use specific worker names when possible to reduce inspection scope.
- Choose an inspection frequency that matches your cluster size and monitoring requirements.
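
A minimal sketch of result caching with a time-to-live, assuming the `Inspector` interface documented above (the 30-second TTL is illustrative):

```python
import asyncio
import time

class CachedInspector:
    """Wrap an Inspector so reads within `ttl` seconds reuse cached results."""

    def __init__(self, inspector, ttl=30.0):
        self.inspector = inspector
        self.ttl = ttl
        self._last_refresh = 0.0

    async def workers(self):
        # Only hit the broker when the cached snapshot is stale
        if time.time() - self._last_refresh > self.ttl:
            await asyncio.gather(*self.inspector.inspect())
            self._last_refresh = time.time()
        return self.inspector.workers
```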