# Registries and Monitoring

Job registries for tracking job states and comprehensive monitoring capabilities. Registries provide visibility into job execution, failure analysis, system health monitoring, and operational insights for distributed job processing systems.

## Capabilities

### Base Registry Operations

Common functionality shared across all job registries.

```python { .api }

class BaseRegistry:
    """Common behavior shared by every job registry."""

    def __init__(
        self,
        name: str = 'default',
        connection=None,
        job_class=None,
        queue: 'Queue' = None,
        serializer=None,
        death_penalty_class=None
    ):
        """
        Create a registry, typically bound to a queue.

        Args:
            name (str): Registry name; usually the same as the queue name.
            connection: Redis connection to operate against.
            job_class: Class used when deserializing jobs.
            queue (Queue): Queue this registry belongs to.
            serializer: Custom serializer for job payloads.
            death_penalty_class: Class that enforces timeout penalties.
        """

    @property
    def name(self) -> str:
        """Name of this registry."""

    @property
    def key(self) -> str:
        """Redis key under which this registry is stored."""

    @property
    def connection(self):
        """Redis connection used by this registry."""

    @property
    def count(self) -> int:
        """How many jobs the registry currently holds."""

    def get_job_count(self, cleanup: bool = True) -> int:
        """
        Return the number of jobs in the registry.

        Args:
            cleanup (bool): When True, drop expired jobs before counting.

        Returns:
            int: Job count after the optional cleanup.
        """

    def get_job_ids(self, start: int = 0, end: int = -1) -> list[str]:
        """
        Return a slice of job IDs stored in the registry.

        Args:
            start (int): Start index.
            end (int): End index (-1 for all).

        Returns:
            list[str]: Job IDs in the registry.
        """

    def get_jobs(self, start: int = 0, end: int = -1) -> list['Job']:
        """
        Return a slice of jobs stored in the registry.

        Args:
            start (int): Start index.
            end (int): End index (-1 for all).

        Returns:
            list[Job]: Jobs in the registry.
        """

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add a job to the registry.

        Args:
            job (Job): Job to add.
            ttl (int): Time-to-live in seconds.
            pipeline: Redis pipeline for batched operations.
        """

    def remove(self, job: 'Job', pipeline=None):
        """
        Remove a job from the registry.

        Args:
            job (Job): Job to remove.
            pipeline: Redis pipeline for batched operations.
        """

    def cleanup(self, timestamp: datetime = None) -> int:
        """
        Purge expired jobs from the registry.

        Args:
            timestamp (datetime): Reference timestamp for expiration.

        Returns:
            int: Number of jobs removed.
        """

```

### Started Job Registry

Track jobs currently being executed by workers.

```python { .api }

class StartedJobRegistry(BaseRegistry):
    """Registry tracking jobs that workers are actively executing."""

    def cleanup(self, timestamp: datetime = None) -> int:
        """
        Remove stale entries for jobs whose workers have died.

        Args:
            timestamp (datetime): Reference timestamp.

        Returns:
            int: Number of stale jobs cleaned up.
        """

    def get_expiration_time(self, job: 'Job') -> datetime:
        """
        Return the moment at which a running job counts as stale.

        Args:
            job (Job): Job to check.

        Returns:
            datetime: Expiration timestamp.
        """

# Usage
started_registry = queue.started_job_registry
print(f"Jobs currently running: {started_registry.count}")

# Inspect each running job in detail
running_jobs = started_registry.get_jobs()
for job in running_jobs:
    print(f"Job {job.id}: {job.description} (started: {job.started_at})")

```

### Finished Job Registry

Track successfully completed jobs with results.

```python { .api }

class FinishedJobRegistry(BaseRegistry):
    """Registry tracking jobs that completed successfully."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Record a successfully completed job.

        Args:
            job (Job): Completed job.
            ttl (int): Retention period for the job record.
            pipeline: Redis pipeline.
        """

    def get_job_results(self, start: int = 0, end: int = -1) -> list[tuple['Job', Any]]:
        """
        Return jobs paired with their results.

        Args:
            start (int): Start index.
            end (int): End index.

        Returns:
            list[tuple[Job, Any]]: (job, result) pairs.
        """

# Usage
finished_registry = queue.finished_job_registry
print(f"Completed jobs: {finished_registry.count}")

# Look at the most recent successes
recent_jobs = finished_registry.get_jobs(start=0, end=10)
for job in recent_jobs:
    print(f"Job {job.id}: {job.description} -> {job.result}")

```

### Failed Job Registry

Track failed jobs with error information and retry capabilities.

```python { .api }

class FailedJobRegistry(BaseRegistry):
    """Registry tracking failed jobs together with their error details."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None, exc_string: str = ''):
        """
        Record a failed job.

        Args:
            job (Job): Failed job.
            ttl (int): Retention period for the failure record.
            pipeline: Redis pipeline.
            exc_string (str): Exception information string.
        """

    def requeue(self, job_or_id) -> 'Job':
        """
        Put a failed job back on its queue for another attempt.

        Args:
            job_or_id: Job instance or job ID.

        Returns:
            Job: The requeued job.
        """

    def remove(self, job: 'Job', delete_job: bool = False):
        """
        Drop a job from the failed registry.

        Args:
            job (Job): Job to remove.
            delete_job (bool): Also delete the job data.
        """

    def get_job_failures(self) -> list[dict]:
        """
        Return failure details for every failed job.

        Returns:
            list[dict]: Failure details including exceptions.
        """

# Usage
failed_registry = queue.failed_job_registry
print(f"Failed jobs: {failed_registry.count}")

# Analyze failures
failed_jobs = failed_registry.get_jobs()
for job in failed_jobs:
    print(f"Failed job {job.id}: {job.exc_info}")

    # Optionally requeue for retry
    if should_retry(job):  # Your logic here
        failed_registry.requeue(job)
        print(f"Requeued job {job.id}")

```

### Deferred Job Registry

Track jobs waiting for dependencies to complete.

```python { .api }

class DeferredJobRegistry(BaseRegistry):
    """Registry tracking jobs blocked on unfinished dependencies."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Record a job that is waiting for its dependencies.

        Args:
            job (Job): Job to defer.
            ttl (int): How long to keep it deferred.
            pipeline: Redis pipeline.
        """

    def requeue_dependents(self, dependency_job: 'Job') -> list['Job']:
        """
        Release jobs that were blocked on a now-finished dependency.

        Args:
            dependency_job (Job): Dependency that was completed.

        Returns:
            list[Job]: Jobs that were requeued.
        """

# Usage
deferred_registry = queue.deferred_job_registry
print(f"Jobs waiting for dependencies: {deferred_registry.count}")

# Check which jobs are waiting
waiting_jobs = deferred_registry.get_jobs()
for job in waiting_jobs:
    deps = job.fetch_dependencies()
    print(f"Job {job.id} waiting for {len(deps)} dependencies")

```

### Scheduled Job Registry

Track jobs scheduled for future execution.

```python { .api }

class ScheduledJobRegistry(BaseRegistry):
    """Registry tracking jobs that will run at a future time."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Record a scheduled job.

        Args:
            job (Job): Job to schedule.
            ttl (int): Time-to-live.
            pipeline: Redis pipeline.
        """

    def get_scheduled_time(self, job: 'Job') -> datetime:
        """
        Return the planned execution time of a job.

        Args:
            job (Job): Scheduled job.

        Returns:
            datetime: When the job is scheduled to run.
        """

    def get_jobs_to_schedule(self, timestamp: datetime = None) -> list['Job']:
        """
        Return jobs whose scheduled time has arrived.

        Args:
            timestamp (datetime): Current time reference.

        Returns:
            list[Job]: Jobs ready for execution.
        """

    def schedule_job(self, job: 'Job', scheduled_time: datetime, pipeline=None):
        """
        Register a job for execution at a later time.

        Args:
            job (Job): Job to schedule.
            scheduled_time (datetime): When to execute.
            pipeline: Redis pipeline.
        """

# Usage
scheduled_registry = queue.scheduled_job_registry
print(f"Scheduled jobs: {scheduled_registry.count}")

# Check upcoming jobs
upcoming_jobs = scheduled_registry.get_jobs()
for job in upcoming_jobs:
    scheduled_time = scheduled_registry.get_scheduled_time(job)
    print(f"Job {job.id} scheduled for {scheduled_time}")

```

### Canceled Job Registry

Track jobs that have been canceled.

```python { .api }

class CanceledJobRegistry(BaseRegistry):
    """Registry tracking jobs that were canceled."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Record a canceled job.

        Args:
            job (Job): Canceled job.
            ttl (int): Retention period for the cancellation record.
            pipeline: Redis pipeline.
        """

# Usage
canceled_registry = queue.canceled_job_registry
print(f"Canceled jobs: {canceled_registry.count}")

# Analyze cancellation patterns
canceled_jobs = canceled_registry.get_jobs()
for job in canceled_jobs:
    print(f"Canceled job {job.id}: {job.description}")

```

### Execution Registry

Track detailed execution records for jobs including multiple attempts.

```python { .api }

class ExecutionRegistry(BaseRegistry):
    """Registry keeping per-attempt execution records for jobs."""

    def add_execution(
        self,
        job: 'Job',
        status: str,
        started_at: datetime = None,
        ended_at: datetime = None,
        result=None,
        exc_info: str = None
    ):
        """
        Store one execution record for a job.

        Args:
            job (Job): Job that was executed.
            status (str): Execution status.
            started_at (datetime): Execution start time.
            ended_at (datetime): Execution end time.
            result: Execution result.
            exc_info (str): Exception information if failed.
        """

    def get_executions(self, job: 'Job') -> list['Execution']:
        """
        Fetch every recorded execution attempt of a job.

        Args:
            job (Job): Job to get executions for.

        Returns:
            list[Execution]: Execution records.
        """

class Execution:
    """One execution attempt of a job."""

    @property
    def job_id(self) -> str:
        """Identifier of the job this attempt belongs to."""

    @property
    def status(self) -> str:
        """Status of this attempt."""

    @property
    def started_at(self) -> datetime:
        """When this attempt started."""

    @property
    def ended_at(self) -> datetime:
        """When this attempt ended."""

    @property
    def result(self):
        """Result produced by this attempt."""

    @property
    def exc_info(self) -> str:
        """Exception information if this attempt failed."""

# Usage
execution_registry = queue.execution_registry
executions = execution_registry.get_executions(job)
print(f"Job {job.id} has {len(executions)} execution attempts")

```

## Monitoring and Analytics

### System Health Monitoring

```python

from rq import Queue, Worker
import redis
from datetime import datetime, timedelta

conn = redis.Redis()

def system_health_check(queue_names: list[str]) -> dict:
    """
    Build a comprehensive health report for queues and workers.

    Args:
        queue_names (list[str]): Queues to monitor.

    Returns:
        dict: System health metrics.
    """
    health_report = {
        'timestamp': datetime.now().isoformat(),
        'queues': {},
        'workers': {},
        'overall_status': 'healthy',
    }

    # Examine every queue individually
    for queue_name in queue_names:
        queue = Queue(queue_name, connection=conn)

        registry_counts = {
            'started': queue.started_job_registry.count,
            'finished': queue.finished_job_registry.count,
            'failed': queue.failed_job_registry.count,
            'deferred': queue.deferred_job_registry.count,
            'scheduled': queue.scheduled_job_registry.count,
            'canceled': queue.canceled_job_registry.count,
        }
        queue_health = {
            'name': queue_name,
            'queued_jobs': queue.count,
            'is_empty': queue.is_empty,
            'registries': registry_counts,
        }

        # Derive a health score from the failure ratio
        job_total = registry_counts['started'] + registry_counts['finished'] \
            + registry_counts['failed'] + registry_counts['deferred'] \
            + registry_counts['scheduled'] + registry_counts['canceled'] \
            + queue_health['queued_jobs']
        if job_total == 0:
            queue_health['status'] = 'idle'
        else:
            failure_rate = registry_counts['failed'] / job_total
            if failure_rate > 0.1:  # more than 10% failure rate
                queue_health['status'] = 'unhealthy'
                health_report['overall_status'] = 'degraded'
            elif failure_rate > 0.05:  # more than 5% failure rate
                queue_health['status'] = 'warning'
                if health_report['overall_status'] == 'healthy':
                    health_report['overall_status'] = 'warning'
            else:
                queue_health['status'] = 'healthy'

        health_report['queues'][queue_name] = queue_health

    # Collect the worker census
    health_report['workers'] = {
        'active_count': Worker.count(connection=conn),
        'workers': [
            {
                'name': worker.name,
                'queues': worker.queue_names(),
                'successful_jobs': worker.successful_job_count,
                'failed_jobs': worker.failed_job_count,
                'current_job': worker.current_job.id if worker.current_job else None,
                'last_heartbeat': worker.last_heartbeat.isoformat() if worker.last_heartbeat else None,
            }
            for worker in Worker.all(connection=conn)
        ],
    }

    return health_report

# Generate health report
queues_to_monitor = ['high_priority', 'normal', 'low_priority']
health = system_health_check(queues_to_monitor)

print(f"System Status: {health['overall_status']}")
print(f"Active Workers: {health['workers']['active_count']}")

for queue_name, queue_info in health['queues'].items():
    print(f"\nQueue: {queue_name} ({queue_info['status']})")
    print(f" Queued: {queue_info['queued_jobs']}")
    print(f" Running: {queue_info['registries']['started']}")
    print(f" Failed: {queue_info['registries']['failed']}")
    print(f" Completed: {queue_info['registries']['finished']}")

```

### Job Analytics and Reporting

```python

from rq import Queue
import redis
from collections import defaultdict
from datetime import datetime, timedelta

conn = redis.Redis()

def generate_job_analytics(queue_name: str, hours: int = 24) -> dict:
    """
    Generate analytics for job processing over a time period.

    Args:
        queue_name (str): Queue to analyze.
        hours (int): Hours to look back.

    Returns:
        dict: Analytics report with summary, current state, per-function
            statistics, and an hourly completed/failed breakdown.
    """
    queue = Queue(queue_name, connection=conn)
    cutoff_time = datetime.now() - timedelta(hours=hours)

    # Get jobs from different registries, limited to the analysis window
    finished_jobs = [
        job for job in queue.finished_job_registry.get_jobs()
        if job.ended_at and job.ended_at > cutoff_time
    ]

    failed_jobs = [
        job for job in queue.failed_job_registry.get_jobs()
        if job.ended_at and job.ended_at > cutoff_time
    ]

    # Calculate metrics
    total_completed = len(finished_jobs)
    total_failed = len(failed_jobs)
    total_processed = total_completed + total_failed

    # Success rate (guard against an empty window)
    success_rate = (total_completed / total_processed * 100) if total_processed > 0 else 0

    # Processing times: only jobs with both timestamps recorded
    processing_times = [
        (job.ended_at - job.started_at).total_seconds()
        for job in finished_jobs
        if job.started_at and job.ended_at
    ]

    avg_processing_time = sum(processing_times) / len(processing_times) if processing_times else 0

    # Function analysis. Use a set of failed job IDs for O(1) membership
    # checks instead of scanning the failed_jobs list per job (was O(n^2)).
    failed_job_ids = {job.id for job in failed_jobs}
    function_stats = defaultdict(lambda: {'count': 0, 'failures': 0})

    for job in finished_jobs + failed_jobs:
        func_name = job.func_name or 'unknown'
        function_stats[func_name]['count'] += 1
        if job.id in failed_job_ids:
            function_stats[func_name]['failures'] += 1

    # Hourly breakdown of completions and failures
    hourly_stats = defaultdict(lambda: {'completed': 0, 'failed': 0})

    for job in finished_jobs:
        if job.ended_at:
            hour_key = job.ended_at.strftime('%Y-%m-%d %H:00')
            hourly_stats[hour_key]['completed'] += 1

    for job in failed_jobs:
        if job.ended_at:
            hour_key = job.ended_at.strftime('%Y-%m-%d %H:00')
            hourly_stats[hour_key]['failed'] += 1

    return {
        'queue_name': queue_name,
        'period_hours': hours,
        'summary': {
            'total_processed': total_processed,
            'total_completed': total_completed,
            'total_failed': total_failed,
            'success_rate': round(success_rate, 2),
            'avg_processing_time': round(avg_processing_time, 2)
        },
        'current_state': {
            'queued': queue.count,
            'running': queue.started_job_registry.count,
            'scheduled': queue.scheduled_job_registry.count
        },
        'function_stats': dict(function_stats),
        'hourly_breakdown': dict(hourly_stats)
    }

# Generate analytics report
analytics = generate_job_analytics('data_processing', hours=24)

print(f"Analytics for {analytics['queue_name']} (last {analytics['period_hours']} hours)")
print(f"Success Rate: {analytics['summary']['success_rate']}%")
print(f"Average Processing Time: {analytics['summary']['avg_processing_time']}s")
print(f"Total Processed: {analytics['summary']['total_processed']}")

print("\nFunction Performance:")
for func, stats in analytics['function_stats'].items():
    failure_rate = (stats['failures'] / stats['count'] * 100) if stats['count'] > 0 else 0
    print(f" {func}: {stats['count']} jobs, {failure_rate:.1f}% failure rate")

print(f"\nCurrent State:")
print(f" Queued: {analytics['current_state']['queued']}")
print(f" Running: {analytics['current_state']['running']}")
print(f" Scheduled: {analytics['current_state']['scheduled']}")

```

### Real-time Monitoring Dashboard

```python

from rq import Queue, Worker
import redis
import time
import json
from datetime import datetime  # fix: datetime.now() was used below without an import

conn = redis.Redis()

class RQMonitor:
    """Real-time RQ monitoring dashboard."""

    def __init__(self, queue_names: list[str]):
        """
        Args:
            queue_names (list[str]): Names of the queues to watch.
        """
        self.queue_names = queue_names
        self.queues = {name: Queue(name, connection=conn) for name in queue_names}

    def get_current_snapshot(self) -> dict:
        """Get current system snapshot (queues, workers, system totals)."""
        snapshot = {
            'timestamp': datetime.now().isoformat(),
            'queues': {},
            'workers': self._get_worker_info(),
            'system': self._get_system_info()
        }

        for name, queue in self.queues.items():
            snapshot['queues'][name] = {
                'queued': queue.count,
                'started': queue.started_job_registry.count,
                'finished': queue.finished_job_registry.count,
                'failed': queue.failed_job_registry.count,
                'deferred': queue.deferred_job_registry.count,
                'scheduled': queue.scheduled_job_registry.count,
                'canceled': queue.canceled_job_registry.count
            }

        return snapshot

    def _get_worker_info(self) -> dict:
        """Get per-worker information for every active worker."""
        workers = Worker.all(connection=conn)
        return {
            'count': len(workers),
            'details': [
                {
                    'name': w.name,
                    'queues': w.queue_names(),
                    'current_job': w.current_job.id if w.current_job else None,
                    'successful': w.successful_job_count,
                    'failed': w.failed_job_count
                }
                for w in workers
            ]
        }

    def _get_system_info(self) -> dict:
        """Get system-level totals across all monitored queues."""
        total_queued = sum(q.count for q in self.queues.values())
        total_running = sum(q.started_job_registry.count for q in self.queues.values())
        total_failed = sum(q.failed_job_registry.count for q in self.queues.values())

        return {
            'total_queued': total_queued,
            'total_running': total_running,
            'total_failed': total_failed,
            'redis_info': self._get_redis_info()
        }

    def _get_redis_info(self) -> dict:
        """Get Redis server information via INFO."""
        info = conn.info()
        return {
            'version': info.get('redis_version'),
            'memory_used': info.get('used_memory_human'),
            'connected_clients': info.get('connected_clients'),
            'uptime': info.get('uptime_in_seconds')
        }

    def start_monitoring(self, interval: int = 5):
        """Start real-time monitoring, refreshing every `interval` seconds."""
        print("Starting RQ Monitor...")
        print("Press Ctrl+C to stop")

        try:
            while True:
                snapshot = self.get_current_snapshot()
                self._display_snapshot(snapshot)
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\nMonitoring stopped")

    def _display_snapshot(self, snapshot: dict):
        """Display a monitoring snapshot on stdout."""
        print("\n" + "="*50)
        print(f"RQ System Status - {snapshot['timestamp']}")
        print("="*50)

        # Workers
        print(f"Workers: {snapshot['workers']['count']} active")
        for worker in snapshot['workers']['details']:
            status = f"processing {worker['current_job']}" if worker['current_job'] else "idle"
            print(f" {worker['name']}: {status} (✓{worker['successful']} ✗{worker['failed']})")

        # Queues
        print("\nQueues:")
        for name, stats in snapshot['queues'].items():
            print(f" {name}:")
            print(f" Queued: {stats['queued']}, Running: {stats['started']}")
            print(f" ✓ {stats['finished']}, ✗ {stats['failed']}, ⏰ {stats['scheduled']}")

        # System
        sys_info = snapshot['system']
        print(f"\nSystem: {sys_info['total_queued']} queued, {sys_info['total_running']} running")
        print(f"Redis: {sys_info['redis_info']['version']}, {sys_info['redis_info']['memory_used']} memory")

# Usage
monitor = RQMonitor(['high_priority', 'normal', 'background'])
monitor.start_monitoring(interval=10)  # Update every 10 seconds

```