# Worker Management

Worker inspection, status monitoring, and remote control operations for managing Celery worker processes across the cluster.

## Capabilities

### Inspector Class

Core worker inspection and management functionality using Celery's remote control system.

```python { .api }
class Inspector:
    """
    Celery cluster inspection and worker management.

    Provides asynchronous worker inspection and remote control capabilities
    using Celery's built-in management commands.
    """

    def __init__(self, io_loop, capp, timeout):
        """
        Initialize worker inspector.

        Args:
            io_loop: Tornado IOLoop for async operations
            capp: Celery application instance
            timeout (float): Inspection timeout in seconds
        """

    def inspect(self, workername=None):
        """
        Inspect workers asynchronously.

        Args:
            workername (str, optional): Specific worker name, or None for all workers

        Returns:
            list: List of futures for inspection results

        Performs inspection of worker status, active tasks, registered tasks,
        and other worker information using Celery's inspection system.
        """

    # Internal methods for inspection
    def _inspect(self, method, workername):
        """
        Internal method that performs the actual worker inspection.

        Args:
            method (str): Inspection method to execute
            workername (str): Target worker name, or None for all workers

        Executes the specified inspection method using Celery's control
        interface and handles the response processing.
        """

    def _on_update(self, workername, method, response):
        """
        Handle an inspection response and update worker information.

        Args:
            workername (str): Worker name that responded
            method (str): Inspection method that was executed
            response (dict): Response data from the worker

        Updates the internal workers dictionary with the latest information.
        """

    # Available inspection methods (from source code)
    methods = ('stats', 'active_queues', 'registered', 'scheduled',
               'active', 'reserved', 'revoked', 'conf')

    @property
    def workers(self):
        """
        Worker information dictionary.

        Returns:
            collections.defaultdict: Worker data keyed by worker name

        Structure:
            {
                'worker_name': {
                    'stats': dict,          # Worker statistics
                    'active_queues': list,  # Active queue names
                    'registered': list,     # Registered task names
                    'scheduled': list,      # Scheduled tasks
                    'active': list,         # Currently executing tasks
                    'reserved': list,       # Reserved tasks
                    'revoked': set,         # Revoked task IDs
                    'conf': dict,           # Worker configuration
                    'timestamp': float,     # Last update timestamp
                }
            }
        """
```

### Worker Control Operations

Remote control operations for managing worker processes and their behavior.

```python { .api }
# Worker lifecycle control
def worker_shutdown(workername):
    """
    Shut down a specific worker.

    Args:
        workername (str): Name of worker to shut down

    Sends a shutdown signal to the specified worker process.
    """

def worker_pool_restart(workername):
    """
    Restart a worker's process pool.

    Args:
        workername (str): Name of worker whose pool should be restarted

    Restarts the worker's process pool without shutting down the worker.
    """

def worker_pool_grow(workername, n=1):
    """
    Increase worker pool size.

    Args:
        workername (str): Name of worker to modify
        n (int): Number of processes to add (default: 1)

    Dynamically increases the worker's process pool size.
    """

def worker_pool_shrink(workername, n=1):
    """
    Decrease worker pool size.

    Args:
        workername (str): Name of worker to modify
        n (int): Number of processes to remove (default: 1)

    Dynamically decreases the worker's process pool size.
    """

def worker_pool_autoscale(workername, min_workers, max_workers):
    """
    Configure worker pool autoscaling.

    Args:
        workername (str): Name of worker to configure
        min_workers (int): Minimum number of worker processes
        max_workers (int): Maximum number of worker processes

    Sets autoscaling parameters for the worker's process pool.
    """
```
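
Under the hood these operations map onto Celery's remote-control commands, so the same effects can be achieved directly through a Celery app's `control` interface. A brief sketch (the worker name and pool sizes are illustrative):

```python
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379')
dest = ['celery@worker1']  # illustrative target worker(s)

app.control.pool_grow(n=1, destination=dest)    # add one pool process
app.control.pool_shrink(n=1, destination=dest)  # remove one pool process
app.control.autoscale(10, 2, destination=dest)  # max=10, min=2
app.control.pool_restart(destination=dest)      # requires worker_pool_restarts=True
app.control.shutdown(destination=dest)          # graceful shutdown
```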

### Queue Management

Operations for managing worker queue consumption and routing.

```python { .api }
def worker_queue_add_consumer(workername, queue_name):
    """
    Add a queue consumer to a worker.

    Args:
        workername (str): Name of worker to modify
        queue_name (str): Name of queue to start consuming

    Instructs the worker to start consuming from the specified queue.
    """

def worker_queue_cancel_consumer(workername, queue_name):
    """
    Remove a queue consumer from a worker.

    Args:
        workername (str): Name of worker to modify
        queue_name (str): Name of queue to stop consuming

    Instructs the worker to stop consuming from the specified queue.
    """

def get_active_queue_names():
    """
    Get the names of all active queues across the cluster.

    Returns:
        list: Names of all queues currently being consumed by workers
    """
```
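
For illustration, the cluster-wide queue list can be derived by aggregating each worker's `active_queues` entries. The sketch below assumes inspection data shaped like the `Inspector.workers` structure above; `collect_queue_names` is a hypothetical helper, not Flower's implementation:

```python
def collect_queue_names(workers):
    """Collect the distinct queue names consumed anywhere in the cluster."""
    names = set()
    for info in workers.values():
        # Each entry is a queue description dict with at least a 'name' key
        for queue in info.get('active_queues') or []:
            names.add(queue['name'])
    return sorted(names)

# Example: two workers sharing the default queue
workers = {
    'celery@worker1': {'active_queues': [{'name': 'celery'}, {'name': 'email'}]},
    'celery@worker2': {'active_queues': [{'name': 'celery'}]},
}
print(collect_queue_names(workers))  # ['celery', 'email']
```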

## Worker Information Structure

### Worker Status Data

Comprehensive worker information structure returned by inspection operations.

```python { .api }
WorkerInfo = {
    # Basic identification
    'hostname': str,                  # Worker hostname/identifier
    'status': str,                    # 'online' or 'offline'
    'timestamp': float,               # Last update timestamp

    # Task statistics
    'active': int,                    # Number of currently active tasks
    'processed': int,                 # Total number of processed tasks
    'load': [float, float, float],    # System load averages (1m, 5m, 15m)

    # Process information
    'pool': {
        'max-concurrency': int,       # Maximum concurrent tasks
        'processes': [int],           # Process IDs in pool
        'max-tasks-per-child': int,   # Max tasks per child process
        'put-guarded-by-semaphore': bool,
        'timeouts': [float, float],   # Soft and hard timeouts
        'writes': {
            'total': int,             # Total writes
            'avg': float,             # Average write time
            'all': str,               # Write time details
        }
    },

    # System information (resource usage)
    'rusage': {
        'utime': float,               # User CPU time
        'stime': float,               # System CPU time
        'maxrss': int,                # Maximum resident set size
        'ixrss': int,                 # Integral shared memory size
        'idrss': int,                 # Integral unshared data size
        'isrss': int,                 # Integral unshared stack size
        'minflt': int,                # Page reclaims
        'majflt': int,                # Page faults
        'nswap': int,                 # Swaps
        'inblock': int,               # Block input operations
        'oublock': int,               # Block output operations
        'msgsnd': int,                # Messages sent
        'msgrcv': int,                # Messages received
        'nsignals': int,              # Signals received
        'nvcsw': int,                 # Voluntary context switches
        'nivcsw': int,                # Involuntary context switches
    },

    # Registered tasks
    'registered': [str],              # List of registered task names

    # Active queues
    'active_queues': [
        {
            'name': str,              # Queue name
            'exchange': {
                'name': str,          # Exchange name
                'type': str,          # Exchange type
                'durable': bool,      # Exchange durability
                'auto_delete': bool,  # Auto-delete setting
                'arguments': dict,    # Exchange arguments
            },
            'routing_key': str,       # Routing key
            'durable': bool,          # Queue durability
            'exclusive': bool,        # Queue exclusivity
            'auto_delete': bool,      # Auto-delete setting
            'no_ack': bool,           # No-acknowledgment setting
            'alias': str,             # Queue alias
            'bindings': [dict],       # Queue bindings
            'no_declare': bool,       # No-declaration flag
            'expires': int,           # Queue expiration
            'message_ttl': int,       # Message TTL
            'max_length': int,        # Maximum queue length
            'max_length_bytes': int,  # Maximum queue size in bytes
            'max_priority': int,      # Maximum message priority
        }
    ],

    # Clock information
    'clock': int,                     # Logical clock value

    # Software information
    'sw_ident': str,                  # Software identifier
    'sw_ver': str,                    # Software version
    'sw_sys': str,                    # Operating system information
}
```
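
As a quick illustration of consuming this structure, the sketch below reduces a `WorkerInfo`-shaped dict to a one-line health summary; the `summarize` helper and sample values are hypothetical:

```python
def summarize(info):
    """Render a one-line summary from a WorkerInfo-shaped dict."""
    pool = info.get('pool', {})
    concurrency = pool.get('max-concurrency') or 1
    active = info.get('active', 0)
    load1 = (info.get('load') or [0.0])[0]  # 1-minute load average
    return (f"{info.get('hostname', '?')}: {info.get('status', 'unknown')}, "
            f"{active}/{concurrency} slots busy ({active / concurrency:.0%}), "
            f"load {load1:.2f}")

sample = {
    'hostname': 'celery@worker1',
    'status': 'online',
    'active': 3,
    'load': [0.42, 0.38, 0.35],
    'pool': {'max-concurrency': 4},
}
print(summarize(sample))
# celery@worker1: online, 3/4 slots busy (75%), load 0.42
```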

### Active Task Information

Structure for active task information returned by worker inspection.

```python { .api }
ActiveTask = {
    'id': str,                  # Task UUID
    'name': str,                # Task name
    'args': list,               # Task arguments
    'kwargs': dict,             # Task keyword arguments
    'type': str,                # Task type
    'hostname': str,            # Worker hostname
    'time_start': float,        # Task start timestamp
    'acknowledged': bool,       # Task acknowledgment status
    'delivery_info': {
        'exchange': str,        # Exchange name
        'routing_key': str,     # Routing key
        'priority': int,        # Message priority
        'redelivered': bool,    # Redelivery flag
    },
    'worker_pid': int,          # Worker process ID
}
```
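
For example, `time_start` makes it straightforward to flag long-running tasks. The sketch below assumes a list of `ActiveTask`-shaped dicts; the helper name and threshold are illustrative:

```python
import time

def long_running(active_tasks, threshold=300.0):
    """Return (id, name, runtime) for tasks running longer than `threshold` seconds."""
    now = time.time()
    return [
        (task['id'], task['name'], now - task['time_start'])
        for task in active_tasks
        if now - task['time_start'] > threshold
    ]

sample = [{'id': 'abc-123', 'name': 'reports.build', 'time_start': time.time() - 900}]
for task_id, name, runtime in long_running(sample):
    print(f"{name} ({task_id}) has been running for {runtime:.0f}s")
```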

## Usage Examples

### Basic Worker Inspection

```python
import asyncio

import celery
from tornado.ioloop import IOLoop

from flower.inspector import Inspector

# Create inspector
celery_app = celery.Celery('myapp', broker='redis://localhost:6379')
io_loop = IOLoop.current()
inspector = Inspector(io_loop, celery_app, timeout=10.0)

# Inspect all workers
async def inspect_workers():
    # inspect() returns a list of futures, one per inspection method;
    # wait for all of them, then read the merged results from `workers`.
    await asyncio.gather(*inspector.inspect())
    workers = inspector.workers
    print(f"Found {len(workers)} workers")

    for worker_name, worker_info in workers.items():
        print(f"Worker: {worker_name}")
        print(f"  Active tasks: {len(worker_info.get('active') or [])}")
        print(f"  Reserved tasks: {len(worker_info.get('reserved') or [])}")
        print(f"  Registered tasks: {len(worker_info.get('registered') or [])}")

# Run inspection
io_loop.run_sync(inspect_workers)
```

### Inspect Specific Worker

```python
# Inspect a single worker
async def inspect_single_worker():
    await asyncio.gather(*inspector.inspect(workername='celery@worker1'))
    worker_info = inspector.workers.get('celery@worker1')

    if worker_info:
        print("Worker celery@worker1:")
        print(f"  Registered tasks: {worker_info.get('registered', [])}")
        print(f"  Active queues: {len(worker_info.get('active_queues') or [])}")
        print(f"  Configuration keys: {len(worker_info.get('conf') or {})}")

io_loop.run_sync(inspect_single_worker)
```

### Worker Control Operations

The control operations are implemented by Tornado handler classes in `flower.api.control` (`WorkerShutDown`, `WorkerPoolRestart`, `WorkerPoolGrow`, `WorkerPoolShrink`, `WorkerPoolAutoscale`, `WorkerQueueAddConsumer`, `WorkerQueueCancelConsumer`), so they are invoked through Flower's HTTP API rather than instantiated directly:

```python
import requests

FLOWER_URL = 'http://localhost:5555'  # adjust to your Flower instance

# Shutdown worker (WorkerShutDown)
requests.post(f'{FLOWER_URL}/api/worker/shutdown/celery@worker1')

# Restart worker pool (WorkerPoolRestart)
requests.post(f'{FLOWER_URL}/api/worker/pool/restart/celery@worker1')

# Grow worker pool by 2 processes (WorkerPoolGrow)
requests.post(f'{FLOWER_URL}/api/worker/pool/grow/celery@worker1',
              params={'n': 2})

# Configure autoscaling (WorkerPoolAutoscale)
requests.post(f'{FLOWER_URL}/api/worker/pool/autoscale/celery@worker1',
              params={'min': 2, 'max': 10})
```

### Queue Consumer Management

```python
# Add queue consumer (WorkerQueueAddConsumer); uses FLOWER_URL from above
requests.post(f'{FLOWER_URL}/api/worker/queue/add-consumer/celery@worker1',
              params={'queue': 'high_priority'})

# Remove queue consumer (WorkerQueueCancelConsumer)
requests.post(f'{FLOWER_URL}/api/worker/queue/cancel-consumer/celery@worker1',
              params={'queue': 'low_priority'})
```

### Real-time Worker Monitoring

```python
import asyncio

from flower.inspector import Inspector

class WorkerMonitor:
    def __init__(self, inspector):
        self.inspector = inspector
        self.running = False

    async def start_monitoring(self, interval=30):
        """Monitor workers every `interval` seconds."""
        self.running = True

        while self.running:
            try:
                # inspect() returns a list of futures; wait for all of
                # them, then read the merged results from `workers`.
                await asyncio.gather(*self.inspector.inspect())
                await self.process_worker_updates(self.inspector.workers)
                await asyncio.sleep(interval)
            except Exception as e:
                print(f"Monitoring error: {e}")
                await asyncio.sleep(5)

    async def process_worker_updates(self, workers):
        """Process worker status updates."""
        for worker_name, worker_info in workers.items():
            active = len(worker_info.get('active') or [])
            reserved = len(worker_info.get('reserved') or [])

            print(f"{worker_name}: {active} active, {reserved} reserved")

            # Alert when a worker looks saturated (the fixed threshold
            # here is illustrative; compare against pool size in practice)
            if active > 50:
                print(f"HIGH LOAD WARNING: {worker_name} has {active} active tasks")

    def stop_monitoring(self):
        """Stop the monitoring loop."""
        self.running = False

# Usage
monitor = WorkerMonitor(inspector)
asyncio.create_task(monitor.start_monitoring(interval=10))
```

## Integration with Events

Worker information from the inspector is combined with real-time event data for comprehensive monitoring.

```python
import asyncio

from flower.events import Events
from flower.inspector import Inspector

# Combine inspector and events data
class WorkerManager:
    def __init__(self, celery_app, io_loop):
        self.inspector = Inspector(io_loop, celery_app, timeout=10.0)
        # Events maintains an in-memory copy of the cluster's event state
        self.events = Events(celery_app, io_loop=io_loop)

    async def get_complete_worker_info(self):
        """Get combined worker information from inspection and events."""
        # Get inspection data (wait for the inspection futures first)
        await asyncio.gather(*self.inspector.inspect())
        inspection_data = self.inspector.workers

        # Get event-based worker state (celery.events.state.Worker objects)
        event_workers = self.events.state.workers

        # Combine data
        combined = {}
        for worker_name in set(inspection_data) | set(event_workers):
            event_worker = event_workers.get(worker_name)
            heartbeats = getattr(event_worker, 'heartbeats', None)
            combined[worker_name] = {
                'inspection': inspection_data.get(worker_name, {}),
                'events': event_worker,
                'online': worker_name in inspection_data,
                'last_heartbeat': heartbeats[-1] if heartbeats else None,
            }

        return combined
```

## Error Handling

Worker management operations should handle the common failure scenarios: broker outages, inspection timeouts, and commands sent to workers that no longer exist.

```python
# Handle worker inspection errors
async def safe_inspect():
    try:
        await asyncio.gather(*inspector.inspect())
        return inspector.workers
    except Exception as e:
        if 'timeout' in str(e).lower():
            print("Worker inspection timed out - workers may be overloaded")
        elif 'connection' in str(e).lower():
            print("Cannot connect to broker - check broker status")
        else:
            print(f"Inspection failed: {e}")

# Handle control command errors
async def safe_shutdown():
    try:
        await worker_shutdown('celery@worker1')
    except Exception as e:
        if 'no such worker' in str(e).lower():
            print("Worker not found or already offline")
        else:
            print(f"Control command failed: {e}")
```

## Performance Considerations

- Worker inspection can be expensive with many workers; use appropriate timeouts
- Cache inspection results when possible to reduce broker load (see the sketch below)
- Monitor inspection latency to detect broker or network issues
- Use specific worker names when possible to reduce inspection scope
- Choose an inspection frequency appropriate to cluster size and requirements
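
As one way to implement the caching suggestion above, the sketch below wraps inspection results in a simple time-based cache; `CachedInspection` and its TTL are illustrative, not part of Flower:

```python
import asyncio
import time

class CachedInspection:
    """Reuse inspection results for `ttl` seconds so frequent readers
    do not trigger repeated broadcasts to the broker."""

    def __init__(self, inspector, ttl=30.0):
        self.inspector = inspector
        self.ttl = ttl
        self._fetched_at = 0.0

    async def workers(self):
        if time.time() - self._fetched_at > self.ttl:
            await asyncio.gather(*self.inspector.inspect())
            self._fetched_at = time.time()
        return self.inspector.workers

# Usage: hits the broker at most once every 30 seconds
# cached = CachedInspection(inspector, ttl=30.0)
# workers = await cached.workers()
```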