# Event Monitoring

Real-time event processing system with persistent storage, state management, and Prometheus metrics collection for comprehensive Celery cluster monitoring.

## Capabilities

### Events Class

Main event monitoring class that captures and processes Celery events in real-time.

```python { .api }
class Events(threading.Thread):
    """
    Real-time Celery event monitoring with persistent storage and metrics.

    Runs in a separate thread to capture events from Celery's event system,
    process them for state management, and optionally persist to disk.
    """

    def __init__(self, capp, io_loop, db=None, persistent=False,
                 enable_events=True, state_save_interval=0, **kwargs):
        """
        Initialize the event monitoring system.

        Args:
            capp: Celery application instance
            io_loop: Tornado IOLoop for async operations
            db (str, optional): Database file path for persistence
            persistent (bool): Enable persistent storage (default: False)
            enable_events (bool): Auto-enable events on workers (default: True)
            state_save_interval (int): Save interval in seconds (default: 0)
            **kwargs: Additional configuration options
        """

    def start(self):
        """
        Start the event monitoring thread.

        Begins the event capture loop and state management.
        """

    def stop(self):
        """
        Stop the event monitoring thread.

        Gracefully shuts down event capture and saves final state.
        """

    def run(self):
        """
        Main event capture loop.

        Continuously captures events from the Celery broker and processes them.
        This method runs in the event monitoring thread.
        """

    def save_state(self):
        """
        Save current state to persistent storage.

        Serializes worker and task state to the configured database file.
        """

    def on_enable_events(self):
        """
        Enable event monitoring on all workers.

        Sends the enable_events control command to all active workers.
        """

    def on_event(self, event):
        """
        Process an incoming Celery event.

        Args:
            event (dict): Celery event data

        Updates internal state and triggers any necessary notifications.
        """

    # State access properties
    @property
    def state(self):
        """EventsState instance containing all worker and task data."""

    @property
    def workers(self):
        """Dict of worker information keyed by worker name."""

    @property
    def tasks(self):
        """Dict of task information keyed by task UUID."""

    events_enable_interval = 5000  # Interval for enabling events on workers (ms)
```

### Enhanced Events State

Extended state management class with metrics and improved event processing.

```python { .api }
class EventsState(celery.events.state.State):
    """
    Enhanced Celery state management with metrics collection.

    Extends Celery's built-in State class to add Prometheus metrics
    and event counting capabilities.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize enhanced state management.

        Args:
            *args: Arguments passed to the parent State class
            **kwargs: Keyword arguments passed to the parent State class

        Creates internal event counters and a metrics instance.
        """

    def event(self, event):
        """
        Process and store an event with metrics collection.

        Args:
            event (dict): Celery event data

        Processes the event through the parent class, updates counters,
        and collects Prometheus metrics for various event types.

        Handles:
        - Task events: received, started, succeeded, failed, etc.
        - Worker events: online, offline, heartbeat
        - Metrics: runtime, prefetch time, worker status
        """

    # Enhanced attributes
    counter: collections.defaultdict  # Event counters per worker
    metrics: PrometheusMetrics        # Prometheus metrics instance
```
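
The per-worker `counter` attribute can be pictured as a nested tally keyed by worker and event type. A minimal sketch, assuming plain dict-style event payloads; this is illustrative, not Flower's actual implementation:

```python
import collections

# Per-worker event counters: counter[worker][event_type] -> count
# (illustrative stand-in for the `counter` attribute described above)
counter = collections.defaultdict(collections.Counter)

def record(event):
    """Tally an event by worker hostname and event type."""
    counter[event["hostname"]][event["type"]] += 1

record({"type": "task-succeeded", "hostname": "worker1"})
record({"type": "task-succeeded", "hostname": "worker1"})
record({"type": "worker-heartbeat", "hostname": "worker1"})

print(counter["worker1"]["task-succeeded"])  # prints 2
```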

### Prometheus Metrics

Comprehensive metrics collection for monitoring cluster performance and health.

```python { .api }
class PrometheusMetrics:
    """
    Prometheus metrics collection for Celery cluster monitoring.

    Provides various metrics for tracking task execution, worker status,
    and system performance.
    """

    def __init__(self):
        """Initialize Prometheus metrics with proper labels and buckets."""

    # Core metrics (actual implementation from source code)
    events: PrometheusCounter = PrometheusCounter(
        'flower_events_total',
        "Number of events",
        ['worker', 'type', 'task']
    )

    runtime: Histogram = Histogram(
        'flower_task_runtime_seconds',
        "Task runtime",
        ['worker', 'task'],
        buckets=options.task_runtime_metric_buckets
    )

    prefetch_time: Gauge = Gauge(
        'flower_task_prefetch_time_seconds',
        "The time the task spent waiting at the celery worker to be executed.",
        ['worker', 'task']
    )

    number_of_prefetched_tasks: Gauge = Gauge(
        'flower_worker_prefetched_tasks',
        'Number of tasks of given type prefetched at a worker',
        ['worker', 'task']
    )

    worker_online: Gauge = Gauge(
        'flower_worker_online',
        "Worker online status",
        ['worker']
    )

    worker_number_of_currently_executing_tasks: Gauge = Gauge(
        'flower_worker_number_of_currently_executing_tasks',
        "Number of tasks currently executing at a worker",
        ['worker']
    )


def get_prometheus_metrics():
    """
    Get singleton PrometheusMetrics instance.

    Returns:
        PrometheusMetrics: Global metrics instance

    Creates the metrics instance on first call and returns the same
    instance on subsequent calls.
    """
```
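
The singleton behaviour of `get_prometheus_metrics` can be sketched with a module-level cache; this is an illustrative pattern (the class body is stubbed out), not the verbatim Flower source:

```python
# Module-level cache backing the singleton accessor
_metrics = None

class PrometheusMetrics:
    """Stand-in for the metrics class above."""

def get_prometheus_metrics():
    """Create the metrics instance on first call; return the same one afterwards."""
    global _metrics
    if _metrics is None:
        _metrics = PrometheusMetrics()
    return _metrics

# Both calls return the identical instance
print(get_prometheus_metrics() is get_prometheus_metrics())  # True
```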

## Event Types

### Task Events

Events related to task lifecycle and execution.

```python { .api }
# Task event types
TASK_EVENTS = [
    'task-sent',       # Task was sent to broker
    'task-received',   # Worker received task
    'task-started',    # Worker started executing task
    'task-succeeded',  # Task completed successfully
    'task-failed',     # Task execution failed
    'task-retried',    # Task was retried
    'task-revoked',    # Task was revoked/cancelled
]

# Task event data structure
TaskEvent = {
    'type': str,         # Event type
    'uuid': str,         # Task UUID
    'name': str,         # Task name
    'hostname': str,     # Worker hostname
    'timestamp': float,  # Event timestamp
    'args': list,        # Task arguments (if available)
    'kwargs': dict,      # Task keyword arguments (if available)
    'retries': int,      # Number of retries
    'eta': str,          # Estimated time of arrival
    'expires': str,      # Expiration time
    'result': Any,       # Task result (for success events)
    'traceback': str,    # Error traceback (for failure events)
    'runtime': float,    # Execution time (for completion events)
}
```
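
For illustration, a stream of task-event dicts with the shape above can be tallied and timed in a few lines; the sample events here are made up:

```python
from collections import Counter

# Made-up sample events following the TaskEvent structure above
events = [
    {"type": "task-received", "uuid": "a1", "hostname": "worker1", "timestamp": 1.0},
    {"type": "task-started", "uuid": "a1", "hostname": "worker1", "timestamp": 1.2},
    {"type": "task-succeeded", "uuid": "a1", "hostname": "worker1",
     "timestamp": 2.0, "runtime": 0.8},
]

# Count events by type and collect runtimes of completed tasks
by_type = Counter(e["type"] for e in events)
runtimes = [e["runtime"] for e in events if e["type"] == "task-succeeded"]

print(by_type["task-succeeded"], runtimes)  # 1 [0.8]
```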

### Worker Events

Events related to worker status and lifecycle.

```python { .api }
# Worker event types
WORKER_EVENTS = [
    'worker-online',     # Worker came online
    'worker-offline',    # Worker went offline
    'worker-heartbeat',  # Worker heartbeat
]

# Worker event data structure
WorkerEvent = {
    'type': str,         # Event type
    'hostname': str,     # Worker hostname
    'timestamp': float,  # Event timestamp
    'active': int,       # Number of active tasks
    'processed': int,    # Total processed tasks
    'load': list,        # System load averages
    'freq': float,       # Heartbeat frequency
    'sw_ident': str,     # Software identifier
    'sw_ver': str,       # Software version
    'sw_sys': str,       # System information
}
```
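
Worker liveness is typically derived from the gap since the last `worker-heartbeat`. A minimal sketch with an assumed 10-second timeout (the timeout value is illustrative, not a Flower constant):

```python
import time

# Assumed: seconds without a heartbeat before a worker counts as offline
HEARTBEAT_TIMEOUT = 10.0

def is_alive(last_heartbeat, now=None, timeout=HEARTBEAT_TIMEOUT):
    """True if the worker's last heartbeat falls within the timeout window."""
    now = time.time() if now is None else now
    return (now - last_heartbeat) <= timeout

print(is_alive(100.0, now=105.0), is_alive(100.0, now=120.0))  # True False
```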

## Usage Examples

### Basic Event Monitoring

```python
from flower.events import Events
from tornado.ioloop import IOLoop
import celery

# Create Celery app
celery_app = celery.Celery('myapp', broker='redis://localhost:6379')

# Create event monitor
io_loop = IOLoop.current()
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    enable_events=True  # Auto-enable events on workers
)

# Start monitoring
events.start()

# Access state
print(f"Active workers: {len(events.workers)}")
print(f"Total tasks: {len(events.tasks)}")

# Stop monitoring
events.stop()
```

### Persistent Event Storage

```python
from flower.events import Events

# Enable persistence
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    persistent=True,
    db='/var/lib/flower/events.db',
    state_save_interval=30  # Save every 30 seconds
)

events.start()

# State is automatically saved and restored
```

### Memory Management

```python
# Configure memory limits
events = Events(
    capp=celery_app,
    io_loop=io_loop,
    max_workers_in_memory=1000,  # Keep max 1000 workers
    max_tasks_in_memory=50000    # Keep max 50000 tasks
)

events.start()
```

### Custom Event Processing

```python
from flower.events import Events

class CustomEvents(Events):
    def on_event(self, event):
        # Custom event processing
        if event['type'] == 'task-failed':
            print(f"Task {event['uuid']} failed: {event.get('traceback')}")

        # Call parent processing
        super().on_event(event)

# Use the custom event processor
events = CustomEvents(capp=celery_app, io_loop=io_loop)
events.start()
```

### Metrics Integration

```python
from flower.events import Events, PrometheusMetrics

# Events automatically collect Prometheus metrics
events = Events(capp=celery_app, io_loop=io_loop)
events.start()

# Access metrics through events.state.metrics
metrics = events.state.metrics

# Metrics are available at the /metrics endpoint when using the Flower web interface
```

## State Persistence

### Database Format

Flower uses pickle serialization for state persistence:

```python
# State file structure
{
    'workers': {
        'worker_name': {
            'hostname': str,
            'active': int,
            'processed': int,
            'load': list,
            'timestamp': float,
            # ... additional worker data
        }
    },
    'tasks': {
        'task_uuid': {
            'name': str,
            'state': str,
            'hostname': str,
            'timestamp': float,
            'args': list,
            'kwargs': dict,
            'result': Any,
            'runtime': float,
            # ... additional task data
        }
    }
}
```
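
As a sketch of the layout above, a state dict in this shape round-trips through pickle. The file path and field values here are made up, and Flower's own persistence code may differ in detail:

```python
import os
import pickle
import tempfile

# A state dict matching the structure shown above (sample values)
state = {
    "workers": {"worker1": {"hostname": "worker1", "active": 2,
                            "processed": 100, "load": [0.1, 0.2, 0.3],
                            "timestamp": 1700000000.0}},
    "tasks": {"uuid-1": {"name": "myapp.add", "state": "SUCCESS",
                         "hostname": "worker1", "timestamp": 1700000001.0,
                         "args": [2, 3], "kwargs": {}, "result": 5,
                         "runtime": 0.05}},
}

# Serialize to a temporary file, then restore it
path = os.path.join(tempfile.mkdtemp(), "events.db")
with open(path, "wb") as f:
    pickle.dump(state, f)
with open(path, "rb") as f:
    restored = pickle.load(f)

print(restored["tasks"]["uuid-1"]["result"])  # 5
```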

### Manual State Management

```python
# Manually save state
events.save_state()

# Check if persistence is enabled
if events.persistent:
    print(f"State saved to: {events.db}")

# Load state on startup (automatic)
events = Events(capp=celery_app, io_loop=io_loop, persistent=True, db='state.db')
# Previous state is automatically loaded
```

## Performance Considerations

### Memory Usage

- Configure appropriate limits for `max_workers_in_memory` and `max_tasks_in_memory`
- Monitor memory usage through Prometheus metrics
- Use persistent storage to avoid data loss on restart
- Regularly clean up old task data

### Event Processing

- Event processing is asynchronous to avoid blocking
- Large event volumes may require tuning of processing parameters
- Consider using a Redis broker for better event performance
- Monitor event processing lag through metrics

### Persistence Performance

- State saving is performed in the background to avoid blocking
- Adjust `state_save_interval` based on data volume and requirements
- Use SSDs for better I/O performance with persistence
- Consider database storage for very large deployments
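
The clean-up advice above can be sketched as a timestamp-based prune over the task dict; the field names follow the state layout earlier in this document, but the function itself is a hypothetical helper, not part of Flower's API:

```python
def prune_tasks(tasks, cutoff_timestamp):
    """Keep only tasks whose last event timestamp is at or after the cutoff."""
    return {uuid: t for uuid, t in tasks.items()
            if t.get("timestamp", 0.0) >= cutoff_timestamp}

# Sample task data (made up)
tasks = {
    "uuid-old": {"name": "myapp.add", "timestamp": 100.0},
    "uuid-new": {"name": "myapp.add", "timestamp": 500.0},
}

print(sorted(prune_tasks(tasks, cutoff_timestamp=200.0)))  # ['uuid-new']
```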