# Event Monitoring

Real-time event processing system with persistent storage, state management, and Prometheus metrics collection for comprehensive Celery cluster monitoring.

## Capabilities

### Events Class

Main event monitoring class that captures and processes Celery events in real-time.

```python { .api }
class Events(threading.Thread):
    """
    Real-time Celery event monitoring with persistent storage and metrics.

    Runs in a separate thread to capture events from Celery's event system,
    process them for state management, and optionally persist them to disk.
    """

    def __init__(self, capp, io_loop, db=None, persistent=False,
                 enable_events=True, state_save_interval=0, **kwargs):
        """
        Initialize the event monitoring system.

        Args:
            capp: Celery application instance
            io_loop: Tornado IOLoop for async operations
            db (str, optional): Database file path for persistence
            persistent (bool): Enable persistent storage (default: False)
            enable_events (bool): Auto-enable events on workers (default: True)
            state_save_interval (int): Save interval in milliseconds
                (default: 0, which disables periodic saving)
            **kwargs: Additional configuration options
        """

    def start(self):
        """
        Start the event monitoring thread.

        Begins the event capture loop and state management.
        """

    def stop(self):
        """
        Stop the event monitoring thread.

        Gracefully shuts down event capture and saves the final state.
        """

    def run(self):
        """
        Main event capture loop.

        Continuously captures events from the Celery broker and processes
        them. This method runs in the event monitoring thread.
        """

    def save_state(self):
        """
        Save the current state to persistent storage.

        Serializes worker and task state to the configured database file.
        """

    def on_enable_events(self):
        """
        Enable event monitoring on all workers.

        Sends the enable_events control command to all active workers.
        """

    def on_event(self, event):
        """
        Process an incoming Celery event.

        Args:
            event (dict): Celery event data

        Updates internal state and triggers any necessary notifications.
        """

    # State access properties
    @property
    def state(self):
        """EventsState instance containing all worker and task data."""

    @property
    def workers(self):
        """Dict of worker information keyed by worker name."""

    @property
    def tasks(self):
        """Dict of task information keyed by task UUID."""

    events_enable_interval = 5000  # Interval for enabling events on workers (ms)
```

### Enhanced Events State

Extended state management class with metrics and improved event processing.

```python { .api }
class EventsState(celery.events.state.State):
    """
    Enhanced Celery state management with metrics collection.

    Extends Celery's built-in State class to add Prometheus metrics
    and event counting capabilities.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize enhanced state management.

        Args:
            *args: Arguments passed to the parent State class
            **kwargs: Keyword arguments passed to the parent State class

        Creates internal event counters and the metrics instance.
        """

    def event(self, event):
        """
        Process and store an event with metrics collection.

        Args:
            event (dict): Celery event data

        Processes the event through the parent class, updates counters,
        and collects Prometheus metrics for various event types.

        Handles:
        - Task events: received, started, succeeded, failed, etc.
        - Worker events: online, offline, heartbeat
        - Metrics: runtime, prefetch time, worker status
        """

    # Enhanced attributes
    counter: collections.defaultdict  # Event counters per worker
    metrics: PrometheusMetrics        # Prometheus metrics instance
```

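The per-worker counters can be read directly. A minimal sketch, assuming `events` is a started `Events` instance (see the usage examples below) and that `counter` maps worker hostname to per-event-type counts:

```python
# Inspect per-worker event counts accumulated by EventsState.
for worker_name, event_counts in events.state.counter.items():
    for event_type, count in event_counts.items():
        print(f"{worker_name}: {event_type} = {count}")
```
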
### Prometheus Metrics

Comprehensive metrics collection for monitoring cluster performance and health.

```python { .api }
class PrometheusMetrics:
    """
    Prometheus metrics collection for Celery cluster monitoring.

    Provides various metrics for tracking task execution, worker status,
    and system performance.
    """

    def __init__(self):
        """Initialize Prometheus metrics with proper labels and buckets."""

    # Core metrics (as defined in the Flower source)
    events: PrometheusCounter = PrometheusCounter(
        'flower_events_total',
        "Number of events",
        ['worker', 'type', 'task']
    )

    runtime: Histogram = Histogram(
        'flower_task_runtime_seconds',
        "Task runtime",
        ['worker', 'task'],
        buckets=options.task_runtime_metric_buckets
    )

    prefetch_time: Gauge = Gauge(
        'flower_task_prefetch_time_seconds',
        "The time the task spent waiting at the celery worker to be executed.",
        ['worker', 'task']
    )

    number_of_prefetched_tasks: Gauge = Gauge(
        'flower_worker_prefetched_tasks',
        'Number of tasks of given type prefetched at a worker',
        ['worker', 'task']
    )

    worker_online: Gauge = Gauge(
        'flower_worker_online',
        "Worker online status",
        ['worker']
    )

    worker_number_of_currently_executing_tasks: Gauge = Gauge(
        'flower_worker_number_of_currently_executing_tasks',
        "Number of tasks currently executing at a worker",
        ['worker']
    )


# Module-level accessor
def get_prometheus_metrics():
    """
    Get the singleton PrometheusMetrics instance.

    Returns:
        PrometheusMetrics: Global metrics instance

    Creates the metrics instance on first call and returns the same
    instance on subsequent calls.
    """
```

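The metric objects follow the standard `prometheus_client` label API. The sketch below is illustrative only (the worker and task names are made up); `EventsState.event()` performs equivalent updates internally as events arrive:

```python
from flower.events import get_prometheus_metrics

metrics = get_prometheus_metrics()

# Count an event, mark a worker online, and record a task runtime,
# using the standard Counter/Gauge/Histogram label API.
metrics.events.labels(worker='celery@host1', type='task-succeeded',
                      task='myapp.tasks.add').inc()
metrics.worker_online.labels(worker='celery@host1').set(1)
metrics.runtime.labels(worker='celery@host1', task='myapp.tasks.add').observe(0.42)
```
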
## Event Types

### Task Events

Events related to task lifecycle and execution.

```python { .api }
# Task event types
TASK_EVENTS = [
    'task-sent',       # Task was sent to broker
    'task-received',   # Worker received task
    'task-started',    # Worker started executing task
    'task-succeeded',  # Task completed successfully
    'task-failed',     # Task execution failed
    'task-retried',    # Task was retried
    'task-revoked',    # Task was revoked/cancelled
]

# Task event data structure
TaskEvent = {
    'type': str,         # Event type
    'uuid': str,         # Task UUID
    'name': str,         # Task name
    'hostname': str,     # Worker hostname
    'timestamp': float,  # Event timestamp
    'args': list,        # Task arguments (if available)
    'kwargs': dict,      # Task keyword arguments (if available)
    'retries': int,      # Number of retries
    'eta': str,          # Estimated time of arrival
    'expires': str,      # Expiration time
    'result': Any,       # Task result (for success events)
    'traceback': str,    # Error traceback (for failure events)
    'runtime': float,    # Execution time (for completion events)
}
```

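These fields are what an event handler typically switches on. Below is a hypothetical dispatch helper (not part of Flower) showing how the structure above is consumed; it can be paired with a custom `on_event` override as in the usage examples:

```python
import logging

logger = logging.getLogger(__name__)

def handle_task_event(event: dict) -> None:
    """Illustrative dispatch over the task event fields documented above."""
    if event['type'] == 'task-failed':
        logger.error("Task %s [%s] failed:\n%s",
                     event.get('name'), event['uuid'], event.get('traceback'))
    elif event['type'] == 'task-succeeded':
        logger.info("Task %s finished in %.3fs",
                    event['uuid'], event.get('runtime', 0.0))
```
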
### Worker Events

Events related to worker status and lifecycle.

```python { .api }
# Worker event types
WORKER_EVENTS = [
    'worker-online',     # Worker came online
    'worker-offline',    # Worker went offline
    'worker-heartbeat',  # Worker heartbeat
]

# Worker event data structure
WorkerEvent = {
    'type': str,        # Event type
    'hostname': str,    # Worker hostname
    'timestamp': float, # Event timestamp
    'active': int,      # Number of active tasks
    'processed': int,   # Total processed tasks
    'load': list,       # System load averages
    'freq': float,      # Heartbeat frequency (seconds)
    'sw_ident': str,    # Software identifier
    'sw_ver': str,      # Software version
    'sw_sys': str,      # System information
}
```

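Since `worker-heartbeat` events arrive at a fixed frequency, a common pattern is to flag workers whose last heartbeat is too old. A minimal sketch, assuming heartbeat timestamps are collected into a plain dict mapping hostname to the last `timestamp` seen:

```python
import time

HEARTBEAT_TIMEOUT = 60.0  # seconds without a heartbeat before a worker is considered stale

def stale_workers(latest_heartbeats: dict) -> list:
    """Return hostnames whose last heartbeat is older than the timeout."""
    now = time.time()
    return [hostname for hostname, ts in latest_heartbeats.items()
            if now - ts > HEARTBEAT_TIMEOUT]
```
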
## Usage Examples

### Basic Event Monitoring

```python
from flower.events import Events
from tornado.ioloop import IOLoop
import celery

# Create Celery app
celery_app = celery.Celery('myapp', broker='redis://localhost:6379')

# Create event monitor
io_loop = IOLoop.current()
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    enable_events=True  # Auto-enable events on workers
)

# Start monitoring
events.start()

# Access state
print(f"Active workers: {len(events.workers)}")
print(f"Total tasks: {len(events.tasks)}")

# Stop monitoring
events.stop()
```

### Persistent Event Storage

```python
from flower.events import Events

# Enable persistence
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    persistent=True,
    db='/var/lib/flower/events.db',
    state_save_interval=30000  # Save every 30 seconds (interval is in milliseconds)
)

events.start()

# State is automatically saved and restored
```

### Memory Management

```python
# Configure memory limits (forwarded via **kwargs to the underlying Celery State)
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    max_workers_in_memory=1000,  # Keep at most 1000 workers
    max_tasks_in_memory=50000    # Keep at most 50000 tasks
)

events.start()
```

### Custom Event Processing

```python
from flower.events import Events

class CustomEvents(Events):
    def on_event(self, event):
        # Custom event processing
        if event['type'] == 'task-failed':
            print(f"Task {event['uuid']} failed: {event.get('traceback')}")

        # Call parent processing
        super().on_event(event)

# Use the custom event processor
events = CustomEvents(capp=celery_app, io_loop=io_loop)
events.start()
```

### Metrics Integration

```python
from flower.events import Events

# Events automatically collect Prometheus metrics
events = Events(capp=celery_app, io_loop=io_loop)
events.start()

# Access metrics through events.state.metrics
metrics = events.state.metrics

# Metrics are exposed at the /metrics endpoint when using the Flower web interface
```

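Outside the web interface, the same metrics can be rendered manually. A minimal sketch, assuming Flower's metrics are registered with `prometheus_client`'s default registry (which plain `Counter`/`Gauge`/`Histogram` instances are by default):

```python
from prometheus_client import generate_latest

# Render all registered metrics in the Prometheus text exposition format.
print(generate_latest().decode('utf-8'))
```
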
## State Persistence

### Database Format

Flower uses pickle serialization for state persistence:

```python
# State file structure
{
    'workers': {
        'worker_name': {
            'hostname': str,
            'active': int,
            'processed': int,
            'load': list,
            'timestamp': float,
            # ... additional worker data
        }
    },
    'tasks': {
        'task_uuid': {
            'name': str,
            'state': str,
            'hostname': str,
            'timestamp': float,
            'args': list,
            'kwargs': dict,
            'result': Any,
            'runtime': float,
            # ... additional task data
        }
    }
}
```

### Manual State Management

```python
# Manually save state
events.save_state()

# Check if persistence is enabled
if events.persistent:
    print(f"State saved to: {events.db}")

# Load state on startup (automatic)
events = Events(capp=celery_app, io_loop=io_loop, persistent=True, db='state.db')
# Previous state is automatically loaded
```

## Performance Considerations

### Memory Usage

- Configure appropriate limits for `max_workers_in_memory` and `max_tasks_in_memory`
- Monitor memory usage through Prometheus metrics
- Use persistent storage to avoid data loss on restart
- Regularly clean up old task data, as sketched below

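Celery's `State` already evicts the oldest tasks once `max_tasks_in_memory` is exceeded. For explicit cleanup, a minimal sketch; it assumes `clear_tasks` behaves as in `celery.events.state.State`, where `ready=True` drops only tasks in a ready (finished) state:

```python
# Drop completed tasks from the in-memory state, keeping pending/running ones.
# Assumes `events` is a running Events instance.
events.state.clear_tasks(ready=True)
```
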
### Event Processing

- Event processing is asynchronous, so it does not block the web interface
- Large event volumes may require tuning of processing parameters
- Consider using a Redis broker for better event throughput
- Monitor event processing lag through metrics

### Persistence Performance

- State saving runs in the background to avoid blocking event capture
- Adjust `state_save_interval` based on data volume and durability requirements
- Use SSDs for better I/O performance when persistence is enabled
- Consider database storage for very large deployments