# Registries and Monitoring

Job registries for tracking job states and comprehensive monitoring capabilities. Registries provide visibility into job execution, failure analysis, system health monitoring, and operational insights for distributed job processing systems.

## Capabilities

### Base Registry Operations

Common functionality shared across all job registries.

```python { .api }
class BaseRegistry:
    def __init__(
        self,
        name: str = 'default',
        connection=None,
        job_class=None,
        queue: 'Queue' = None,
        serializer=None,
        death_penalty_class=None
    ):
        """
        Initialize a job registry.

        Args:
            name (str): Registry name, typically matches queue name.
            connection: Redis connection instance.
            job_class: Job class for deserialization.
            queue (Queue): Associated queue instance.
            serializer: Custom serializer for job data.
            death_penalty_class: Death penalty class for timeouts.
        """

    @property
    def name(self) -> str:
        """Registry name."""

    @property
    def key(self) -> str:
        """Redis key for this registry."""

    @property
    def connection(self):
        """Redis connection instance."""

    @property
    def count(self) -> int:
        """Number of jobs in the registry."""

    def get_job_count(self, cleanup: bool = True) -> int:
        """
        Get count of jobs in registry.

        Args:
            cleanup (bool): Remove expired jobs before counting.

        Returns:
            int: Number of jobs in registry.
        """

    def get_job_ids(self, start: int = 0, end: int = -1) -> list[str]:
        """
        Get job IDs from registry.

        Args:
            start (int): Start index.
            end (int): End index (-1 for all).

        Returns:
            list[str]: Job IDs in registry.
        """

    def get_jobs(self, start: int = 0, end: int = -1) -> list['Job']:
        """
        Get jobs from registry.

        Args:
            start (int): Start index.
            end (int): End index (-1 for all).

        Returns:
            list[Job]: Jobs in registry.
        """

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add job to registry.

        Args:
            job (Job): Job to add.
            ttl (int): Time-to-live in seconds.
            pipeline: Redis pipeline for batched operations.
        """

    def remove(self, job: 'Job', pipeline=None):
        """
        Remove job from registry.

        Args:
            job (Job): Job to remove.
            pipeline: Redis pipeline for batched operations.
        """

    def cleanup(self, timestamp: datetime = None) -> int:
        """
        Remove expired jobs from registry.

        Args:
            timestamp (datetime): Reference timestamp for expiration.

        Returns:
            int: Number of jobs removed.
        """
```

### Started Job Registry

Track jobs currently being executed by workers.

```python { .api }
class StartedJobRegistry(BaseRegistry):
    """Registry for jobs currently being processed by workers."""

    def cleanup(self, timestamp: datetime = None) -> int:
        """
        Clean up stale started jobs (jobs whose workers have died).

        Args:
            timestamp (datetime): Reference timestamp.

        Returns:
            int: Number of stale jobs cleaned up.
        """

    def get_expiration_time(self, job: 'Job') -> datetime:
        """
        Get when a started job should be considered stale.

        Args:
            job (Job): Job to check.

        Returns:
            datetime: Expiration timestamp.
        """

# Usage
started_registry = queue.started_job_registry
print(f"Jobs currently running: {started_registry.count}")

# Get detailed information about running jobs
running_jobs = started_registry.get_jobs()
for job in running_jobs:
    print(f"Job {job.id}: {job.description} (started: {job.started_at})")
```

### Finished Job Registry

Track successfully completed jobs with results.

```python { .api }
class FinishedJobRegistry(BaseRegistry):
    """Registry for successfully completed jobs."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add completed job to finished registry.

        Args:
            job (Job): Completed job.
            ttl (int): How long to keep the job record.
            pipeline: Redis pipeline.
        """

    def get_job_results(self, start: int = 0, end: int = -1) -> list[tuple['Job', Any]]:
        """
        Get jobs with their results.

        Args:
            start (int): Start index.
            end (int): End index.

        Returns:
            list[tuple[Job, Any]]: List of (job, result) tuples.
        """

# Usage
finished_registry = queue.finished_job_registry
print(f"Completed jobs: {finished_registry.count}")

# Get recent successful jobs
recent_jobs = finished_registry.get_jobs(start=0, end=10)
for job in recent_jobs:
    print(f"Job {job.id}: {job.description} -> {job.result}")
```

### Failed Job Registry

Track failed jobs with error information and retry capabilities.

```python { .api }
class FailedJobRegistry(BaseRegistry):
    """Registry for failed jobs with error information."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None, exc_string: str = ''):
        """
        Add failed job to registry.

        Args:
            job (Job): Failed job.
            ttl (int): How long to keep failure information.
            pipeline: Redis pipeline.
            exc_string (str): Exception information string.
        """

    def requeue(self, job_or_id) -> 'Job':
        """
        Requeue a failed job for retry.

        Args:
            job_or_id: Job instance or job ID.

        Returns:
            Job: The requeued job.
        """

    def remove(self, job: 'Job', delete_job: bool = False):
        """
        Remove job from failed registry.

        Args:
            job (Job): Job to remove.
            delete_job (bool): Also delete the job data.
        """

    def get_job_failures(self) -> list[dict]:
        """
        Get failure information for all failed jobs.

        Returns:
            list[dict]: Failure details including exceptions.
        """

# Usage
failed_registry = queue.failed_job_registry
print(f"Failed jobs: {failed_registry.count}")

# Analyze failures
failed_jobs = failed_registry.get_jobs()
for job in failed_jobs:
    print(f"Failed job {job.id}: {job.exc_info}")

    # Optionally requeue for retry -- inside the loop so each failed
    # job is evaluated, not just the last loop value of `job`
    if should_retry(job):  # Your logic here
        failed_registry.requeue(job)
        print(f"Requeued job {job.id}")
```

### Deferred Job Registry

Track jobs waiting for dependencies to complete.

```python { .api }
class DeferredJobRegistry(BaseRegistry):
    """Registry for jobs waiting for dependencies."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add job waiting for dependencies.

        Args:
            job (Job): Job to defer.
            ttl (int): How long to keep deferred.
            pipeline: Redis pipeline.
        """

    def requeue_dependents(self, dependency_job: 'Job') -> list['Job']:
        """
        Requeue jobs that were waiting for a dependency.

        Args:
            dependency_job (Job): Dependency that was completed.

        Returns:
            list[Job]: Jobs that were requeued.
        """

# Usage
deferred_registry = queue.deferred_job_registry
print(f"Jobs waiting for dependencies: {deferred_registry.count}")

# Check which jobs are waiting
waiting_jobs = deferred_registry.get_jobs()
for job in waiting_jobs:
    deps = job.fetch_dependencies()
    print(f"Job {job.id} waiting for {len(deps)} dependencies")
```

### Scheduled Job Registry

Track jobs scheduled for future execution.

```python { .api }
class ScheduledJobRegistry(BaseRegistry):
    """Registry for jobs scheduled for future execution."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add scheduled job to registry.

        Args:
            job (Job): Job to schedule.
            ttl (int): Time-to-live.
            pipeline: Redis pipeline.
        """

    def get_scheduled_time(self, job: 'Job') -> datetime:
        """
        Get scheduled execution time for a job.

        Args:
            job (Job): Scheduled job.

        Returns:
            datetime: When job is scheduled to run.
        """

    def get_jobs_to_schedule(self, timestamp: datetime = None) -> list['Job']:
        """
        Get jobs that should be moved to queue for execution.

        Args:
            timestamp (datetime): Current time reference.

        Returns:
            list[Job]: Jobs ready for execution.
        """

    def schedule_job(self, job: 'Job', scheduled_time: datetime, pipeline=None):
        """
        Schedule a job for future execution.

        Args:
            job (Job): Job to schedule.
            scheduled_time (datetime): When to execute.
            pipeline: Redis pipeline.
        """

# Usage
scheduled_registry = queue.scheduled_job_registry
print(f"Scheduled jobs: {scheduled_registry.count}")

# Check upcoming jobs
upcoming_jobs = scheduled_registry.get_jobs()
for job in upcoming_jobs:
    scheduled_time = scheduled_registry.get_scheduled_time(job)
    print(f"Job {job.id} scheduled for {scheduled_time}")
```

### Canceled Job Registry

Track jobs that have been canceled.

```python { .api }
class CanceledJobRegistry(BaseRegistry):
    """Registry for canceled jobs."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add canceled job to registry.

        Args:
            job (Job): Canceled job.
            ttl (int): How long to keep cancellation record.
            pipeline: Redis pipeline.
        """

# Usage
canceled_registry = queue.canceled_job_registry
print(f"Canceled jobs: {canceled_registry.count}")

# Analyze cancellation patterns
canceled_jobs = canceled_registry.get_jobs()
for job in canceled_jobs:
    print(f"Canceled job {job.id}: {job.description}")
```

### Execution Registry

Track detailed execution records for jobs including multiple attempts.

```python { .api }
class ExecutionRegistry(BaseRegistry):
    """Registry for detailed job execution records."""

    def add_execution(
        self,
        job: 'Job',
        status: str,
        started_at: datetime = None,
        ended_at: datetime = None,
        result=None,
        exc_info: str = None
    ):
        """
        Add execution record for a job.

        Args:
            job (Job): Job that was executed.
            status (str): Execution status.
            started_at (datetime): Execution start time.
            ended_at (datetime): Execution end time.
            result: Execution result.
            exc_info (str): Exception information if failed.
        """

    def get_executions(self, job: 'Job') -> list['Execution']:
        """
        Get all execution records for a job.

        Args:
            job (Job): Job to get executions for.

        Returns:
            list[Execution]: Execution records.
        """

class Execution:
    """Represents a single job execution attempt."""

    @property
    def job_id(self) -> str:
        """Job identifier."""

    @property
    def status(self) -> str:
        """Execution status."""

    @property
    def started_at(self) -> datetime:
        """When execution started."""

    @property
    def ended_at(self) -> datetime:
        """When execution ended."""

    @property
    def result(self):
        """Execution result."""

    @property
    def exc_info(self) -> str:
        """Exception information if failed."""

# Usage
execution_registry = queue.execution_registry
executions = execution_registry.get_executions(job)
print(f"Job {job.id} has {len(executions)} execution attempts")
```

## Monitoring and Analytics

### System Health Monitoring

```python
from rq import Queue, Worker
import redis
from datetime import datetime, timedelta

conn = redis.Redis()

def system_health_check(queue_names: list[str]) -> dict:
    """
    Comprehensive system health check.

    Args:
        queue_names (list[str]): Queues to monitor.

    Returns:
        dict: System health metrics.
    """
    health_report = {
        'timestamp': datetime.now().isoformat(),
        'queues': {},
        'workers': {},
        'overall_status': 'healthy'
    }

    # Check each queue
    for queue_name in queue_names:
        queue = Queue(queue_name, connection=conn)

        queue_health = {
            'name': queue_name,
            'queued_jobs': queue.count,
            'is_empty': queue.is_empty,
            'registries': {
                'started': queue.started_job_registry.count,
                'finished': queue.finished_job_registry.count,
                'failed': queue.failed_job_registry.count,
                'deferred': queue.deferred_job_registry.count,
                'scheduled': queue.scheduled_job_registry.count,
                'canceled': queue.canceled_job_registry.count
            }
        }

        # Calculate health score from the ratio of failed jobs to all
        # jobs the queue has seen (queued + every registry)
        total_jobs = sum(queue_health['registries'].values()) + queue_health['queued_jobs']
        if total_jobs > 0:
            failure_rate = queue_health['registries']['failed'] / total_jobs
            if failure_rate > 0.1:  # More than 10% failure rate
                queue_health['status'] = 'unhealthy'
                health_report['overall_status'] = 'degraded'
            elif failure_rate > 0.05:  # More than 5% failure rate
                queue_health['status'] = 'warning'
                if health_report['overall_status'] == 'healthy':
                    health_report['overall_status'] = 'warning'
            else:
                queue_health['status'] = 'healthy'
        else:
            queue_health['status'] = 'idle'

        health_report['queues'][queue_name] = queue_health

    # Check workers
    workers_info = {
        'active_count': Worker.count(connection=conn),
        'workers': []
    }

    for worker in Worker.all(connection=conn):
        worker_info = {
            'name': worker.name,
            'queues': worker.queue_names(),
            'successful_jobs': worker.successful_job_count,
            'failed_jobs': worker.failed_job_count,
            'current_job': worker.current_job.id if worker.current_job else None,
            'last_heartbeat': worker.last_heartbeat.isoformat() if worker.last_heartbeat else None
        }
        workers_info['workers'].append(worker_info)

    health_report['workers'] = workers_info

    return health_report

# Generate health report
queues_to_monitor = ['high_priority', 'normal', 'low_priority']
health = system_health_check(queues_to_monitor)

print(f"System Status: {health['overall_status']}")
print(f"Active Workers: {health['workers']['active_count']}")

for queue_name, queue_info in health['queues'].items():
    print(f"\nQueue: {queue_name} ({queue_info['status']})")
    print(f"  Queued: {queue_info['queued_jobs']}")
    print(f"  Running: {queue_info['registries']['started']}")
    print(f"  Failed: {queue_info['registries']['failed']}")
    print(f"  Completed: {queue_info['registries']['finished']}")
```

### Job Analytics and Reporting

```python
from rq import Queue
import redis
from collections import defaultdict
from datetime import datetime, timedelta

conn = redis.Redis()

def generate_job_analytics(queue_name: str, hours: int = 24) -> dict:
    """
    Generate analytics for job processing over time period.

    Args:
        queue_name (str): Queue to analyze.
        hours (int): Hours to look back.

    Returns:
        dict: Analytics report.
    """
    queue = Queue(queue_name, connection=conn)
    # NOTE(review): RQ job timestamps are typically UTC -- confirm that
    # datetime.now() matches the timezone of job.ended_at in your setup.
    cutoff_time = datetime.now() - timedelta(hours=hours)

    # Get jobs from different registries
    finished_jobs = [
        job for job in queue.finished_job_registry.get_jobs()
        if job.ended_at and job.ended_at > cutoff_time
    ]

    failed_jobs = [
        job for job in queue.failed_job_registry.get_jobs()
        if job.ended_at and job.ended_at > cutoff_time
    ]

    # Calculate metrics
    total_completed = len(finished_jobs)
    total_failed = len(failed_jobs)
    total_processed = total_completed + total_failed

    # Success rate
    success_rate = (total_completed / total_processed * 100) if total_processed > 0 else 0

    # Processing times
    processing_times = [
        (job.ended_at - job.started_at).total_seconds()
        for job in finished_jobs
        if job.started_at and job.ended_at
    ]

    avg_processing_time = sum(processing_times) / len(processing_times) if processing_times else 0

    # Function analysis
    function_stats = defaultdict(lambda: {'count': 0, 'failures': 0})

    for job in finished_jobs + failed_jobs:
        func_name = job.func_name or 'unknown'
        function_stats[func_name]['count'] += 1
        if job in failed_jobs:
            function_stats[func_name]['failures'] += 1

    # Hourly breakdown
    hourly_stats = defaultdict(lambda: {'completed': 0, 'failed': 0})

    for job in finished_jobs:
        if job.ended_at:
            hour_key = job.ended_at.strftime('%Y-%m-%d %H:00')
            hourly_stats[hour_key]['completed'] += 1

    for job in failed_jobs:
        if job.ended_at:
            hour_key = job.ended_at.strftime('%Y-%m-%d %H:00')
            hourly_stats[hour_key]['failed'] += 1

    return {
        'queue_name': queue_name,
        'period_hours': hours,
        'summary': {
            'total_processed': total_processed,
            'total_completed': total_completed,
            'total_failed': total_failed,
            'success_rate': round(success_rate, 2),
            'avg_processing_time': round(avg_processing_time, 2)
        },
        'current_state': {
            'queued': queue.count,
            'running': queue.started_job_registry.count,
            'scheduled': queue.scheduled_job_registry.count
        },
        'function_stats': dict(function_stats),
        'hourly_breakdown': dict(hourly_stats)
    }

# Generate analytics report
analytics = generate_job_analytics('data_processing', hours=24)

print(f"Analytics for {analytics['queue_name']} (last {analytics['period_hours']} hours)")
print(f"Success Rate: {analytics['summary']['success_rate']}%")
print(f"Average Processing Time: {analytics['summary']['avg_processing_time']}s")
print(f"Total Processed: {analytics['summary']['total_processed']}")

print("\nFunction Performance:")
for func, stats in analytics['function_stats'].items():
    failure_rate = (stats['failures'] / stats['count'] * 100) if stats['count'] > 0 else 0
    print(f"  {func}: {stats['count']} jobs, {failure_rate:.1f}% failure rate")

print(f"\nCurrent State:")
print(f"  Queued: {analytics['current_state']['queued']}")
print(f"  Running: {analytics['current_state']['running']}")
print(f"  Scheduled: {analytics['current_state']['scheduled']}")
```

### Real-time Monitoring Dashboard

```python
from rq import Queue, Worker
import redis
import time
import json
from datetime import datetime  # fix: datetime.now() is used below but was not imported

conn = redis.Redis()

class RQMonitor:
    """Real-time RQ monitoring dashboard."""

    def __init__(self, queue_names: list[str]):
        self.queue_names = queue_names
        self.queues = {name: Queue(name, connection=conn) for name in queue_names}

    def get_current_snapshot(self) -> dict:
        """Get current system snapshot."""
        snapshot = {
            'timestamp': datetime.now().isoformat(),
            'queues': {},
            'workers': self._get_worker_info(),
            'system': self._get_system_info()
        }

        for name, queue in self.queues.items():
            snapshot['queues'][name] = {
                'queued': queue.count,
                'started': queue.started_job_registry.count,
                'finished': queue.finished_job_registry.count,
                'failed': queue.failed_job_registry.count,
                'deferred': queue.deferred_job_registry.count,
                'scheduled': queue.scheduled_job_registry.count,
                'canceled': queue.canceled_job_registry.count
            }

        return snapshot

    def _get_worker_info(self) -> dict:
        """Get worker information."""
        workers = Worker.all(connection=conn)
        return {
            'count': len(workers),
            'details': [
                {
                    'name': w.name,
                    'queues': w.queue_names(),
                    'current_job': w.current_job.id if w.current_job else None,
                    'successful': w.successful_job_count,
                    'failed': w.failed_job_count
                }
                for w in workers
            ]
        }

    def _get_system_info(self) -> dict:
        """Get system-level information."""
        total_queued = sum(q.count for q in self.queues.values())
        total_running = sum(q.started_job_registry.count for q in self.queues.values())
        total_failed = sum(q.failed_job_registry.count for q in self.queues.values())

        return {
            'total_queued': total_queued,
            'total_running': total_running,
            'total_failed': total_failed,
            'redis_info': self._get_redis_info()
        }

    def _get_redis_info(self) -> dict:
        """Get Redis server information."""
        info = conn.info()
        return {
            'version': info.get('redis_version'),
            'memory_used': info.get('used_memory_human'),
            'connected_clients': info.get('connected_clients'),
            'uptime': info.get('uptime_in_seconds')
        }

    def start_monitoring(self, interval: int = 5):
        """Start real-time monitoring with specified interval."""
        print("Starting RQ Monitor...")
        print("Press Ctrl+C to stop")

        try:
            while True:
                snapshot = self.get_current_snapshot()
                self._display_snapshot(snapshot)
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\nMonitoring stopped")

    def _display_snapshot(self, snapshot: dict):
        """Display monitoring snapshot."""
        print("\n" + "="*50)
        print(f"RQ System Status - {snapshot['timestamp']}")
        print("="*50)

        # Workers
        print(f"Workers: {snapshot['workers']['count']} active")
        for worker in snapshot['workers']['details']:
            status = f"processing {worker['current_job']}" if worker['current_job'] else "idle"
            print(f"  {worker['name']}: {status} (✓{worker['successful']} ✗{worker['failed']})")

        # Queues
        print("\nQueues:")
        for name, stats in snapshot['queues'].items():
            print(f"  {name}:")
            print(f"    Queued: {stats['queued']}, Running: {stats['started']}")
            print(f"    ✓ {stats['finished']}, ✗ {stats['failed']}, ⏰ {stats['scheduled']}")

        # System
        sys_info = snapshot['system']
        print(f"\nSystem: {sys_info['total_queued']} queued, {sys_info['total_running']} running")
        print(f"Redis: {sys_info['redis_info']['version']}, {sys_info['redis_info']['memory_used']} memory")

# Usage
monitor = RQMonitor(['high_priority', 'normal', 'background'])
monitor.start_monitoring(interval=10)  # Update every 10 seconds
```