Registries and Monitoring

Job registries track jobs through each state of their lifecycle and provide visibility into job execution, failure analysis, system health, and operational insight for distributed job processing systems.

Capabilities

Base Registry Operations

Common functionality shared across all job registries.

class BaseRegistry:
    def __init__(
        self,
        name: str = 'default',
        connection=None,
        job_class=None,
        queue: 'Queue' = None,
        serializer=None,
        death_penalty_class=None
    ):
        """
        Initialize a job registry.
        
        Args:
            name (str): Registry name, typically matches queue name.
            connection: Redis connection instance.
            job_class: Job class for deserialization.
            queue (Queue): Associated queue instance.
            serializer: Custom serializer for job data.
            death_penalty_class: Death penalty class for timeouts.
        """

    @property
    def name(self) -> str:
        """Registry name."""

    @property
    def key(self) -> str:
        """Redis key for this registry."""

    @property
    def connection(self):
        """Redis connection instance."""

    @property
    def count(self) -> int:
        """Number of jobs in the registry."""

    def get_job_count(self, cleanup: bool = True) -> int:
        """
        Get count of jobs in registry.
        
        Args:
            cleanup (bool): Remove expired jobs before counting.
            
        Returns:
            int: Number of jobs in registry.
        """

    def get_job_ids(self, start: int = 0, end: int = -1) -> list[str]:
        """
        Get job IDs from registry.
        
        Args:
            start (int): Start index.
            end (int): End index (-1 for all).
            
        Returns:
            list[str]: Job IDs in registry.
        """

    def get_jobs(self, start: int = 0, end: int = -1) -> list['Job']:
        """
        Get jobs from registry.
        
        Args:
            start (int): Start index.
            end (int): End index (-1 for all).
            
        Returns:
            list[Job]: Jobs in registry.
        """

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add job to registry.
        
        Args:
            job (Job): Job to add.
            ttl (int): Time-to-live in seconds.
            pipeline: Redis pipeline for batched operations.
        """

    def remove(self, job: 'Job', pipeline=None):
        """
        Remove job from registry.
        
        Args:
            job (Job): Job to remove.
            pipeline: Redis pipeline for batched operations.
        """

    def cleanup(self, timestamp: datetime = None) -> int:
        """
        Remove expired jobs from registry.
        
        Args:
            timestamp (datetime): Reference timestamp for expiration.
            
        Returns:
            int: Number of jobs removed.
        """

Started Job Registry

Track jobs currently being executed by workers.

class StartedJobRegistry(BaseRegistry):
    """Registry for jobs currently being processed by workers."""

    def cleanup(self, timestamp: datetime = None) -> int:
        """
        Clean up stale started jobs (jobs whose workers have died).
        
        Args:
            timestamp (datetime): Reference timestamp.
            
        Returns:
            int: Number of stale jobs cleaned up.
        """

    def get_expiration_time(self, job: 'Job') -> datetime:
        """
        Get when a started job should be considered stale.
        
        Args:
            job (Job): Job to check.
            
        Returns:
            datetime: Expiration timestamp.
        """

# Usage
started_registry = queue.started_job_registry
print(f"Jobs currently running: {started_registry.count}")

# Get detailed information about running jobs
running_jobs = started_registry.get_jobs()
for job in running_jobs:
    print(f"Job {job.id}: {job.description} (started: {job.started_at})")

Finished Job Registry

Track successfully completed jobs with results.

class FinishedJobRegistry(BaseRegistry):
    """Registry for successfully completed jobs."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add completed job to finished registry.
        
        Args:
            job (Job): Completed job.
            ttl (int): How long to keep the job record.
            pipeline: Redis pipeline.
        """

    def get_job_results(self, start: int = 0, end: int = -1) -> list[tuple['Job', Any]]:
        """
        Get jobs with their results.
        
        Args:
            start (int): Start index.
            end (int): End index.
            
        Returns:
            list[tuple[Job, Any]]: List of (job, result) tuples.
        """

# Usage
finished_registry = queue.finished_job_registry
print(f"Completed jobs: {finished_registry.count}")

# Get recent successful jobs
recent_jobs = finished_registry.get_jobs(start=0, end=10)
for job in recent_jobs:
    print(f"Job {job.id}: {job.description} -> {job.result}")

Failed Job Registry

Track failed jobs with error information and retry capabilities.

class FailedJobRegistry(BaseRegistry):
    """Registry for failed jobs with error information."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None, exc_string: str = ''):
        """
        Add failed job to registry.
        
        Args:
            job (Job): Failed job.
            ttl (int): How long to keep failure information.
            pipeline: Redis pipeline.
            exc_string (str): Exception information string.
        """

    def requeue(self, job_or_id) -> 'Job':
        """
        Requeue a failed job for retry.
        
        Args:
            job_or_id: Job instance or job ID.
            
        Returns:
            Job: The requeued job.
        """

    def remove(self, job: 'Job', delete_job: bool = False):
        """
        Remove job from failed registry.
        
        Args:
            job (Job): Job to remove.
            delete_job (bool): Also delete the job data.
        """

    def get_job_failures(self) -> list[dict]:
        """
        Get failure information for all failed jobs.
        
        Returns:
            list[dict]: Failure details including exceptions.
        """

# Usage
failed_registry = queue.failed_job_registry
print(f"Failed jobs: {failed_registry.count}")

# Analyze failures
failed_jobs = failed_registry.get_jobs()
for job in failed_jobs:
    print(f"Failed job {job.id}: {job.exc_info}")
    
    # Optionally requeue for retry
    if should_retry(job):  # Your logic here
        failed_registry.requeue(job)
        print(f"Requeued job {job.id}")

Deferred Job Registry

Track jobs waiting for dependencies to complete.

class DeferredJobRegistry(BaseRegistry):
    """Registry for jobs waiting for dependencies."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add job waiting for dependencies.
        
        Args:
            job (Job): Job to defer.
            ttl (int): How long to keep deferred.
            pipeline: Redis pipeline.
        """

    def requeue_dependents(self, dependency_job: 'Job') -> list['Job']:
        """
        Requeue jobs that were waiting for a dependency.
        
        Args:
            dependency_job (Job): Dependency that was completed.
            
        Returns:
            list[Job]: Jobs that were requeued.
        """

# Usage
deferred_registry = queue.deferred_job_registry
print(f"Jobs waiting for dependencies: {deferred_registry.count}")

# Check which jobs are waiting
waiting_jobs = deferred_registry.get_jobs()
for job in waiting_jobs:
    deps = job.fetch_dependencies()
    print(f"Job {job.id} waiting for {len(deps)} dependencies")

Scheduled Job Registry

Track jobs scheduled for future execution.

class ScheduledJobRegistry(BaseRegistry):
    """Registry for jobs scheduled for future execution."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add scheduled job to registry.
        
        Args:
            job (Job): Job to schedule.
            ttl (int): Time-to-live.
            pipeline: Redis pipeline.
        """

    def get_scheduled_time(self, job: 'Job') -> datetime:
        """
        Get scheduled execution time for a job.
        
        Args:
            job (Job): Scheduled job.
            
        Returns:
            datetime: When job is scheduled to run.
        """

    def get_jobs_to_schedule(self, timestamp: datetime = None) -> list['Job']:
        """
        Get jobs that should be moved to queue for execution.
        
        Args:
            timestamp (datetime): Current time reference.
            
        Returns:
            list[Job]: Jobs ready for execution.
        """

    def schedule_job(self, job: 'Job', scheduled_time: datetime, pipeline=None):
        """
        Schedule a job for future execution.
        
        Args:
            job (Job): Job to schedule.
            scheduled_time (datetime): When to execute.
            pipeline: Redis pipeline.
        """

# Usage
scheduled_registry = queue.scheduled_job_registry
print(f"Scheduled jobs: {scheduled_registry.count}")

# Check upcoming jobs
upcoming_jobs = scheduled_registry.get_jobs()
for job in upcoming_jobs:
    scheduled_time = scheduled_registry.get_scheduled_time(job)
    print(f"Job {job.id} scheduled for {scheduled_time}")

Canceled Job Registry

Track jobs that have been canceled.

class CanceledJobRegistry(BaseRegistry):
    """Registry for canceled jobs."""

    def add(self, job: 'Job', ttl: int = None, pipeline=None):
        """
        Add canceled job to registry.
        
        Args:
            job (Job): Canceled job.
            ttl (int): How long to keep cancellation record.
            pipeline: Redis pipeline.
        """

# Usage
canceled_registry = queue.canceled_job_registry
print(f"Canceled jobs: {canceled_registry.count}")

# Analyze cancellation patterns
canceled_jobs = canceled_registry.get_jobs()
for job in canceled_jobs:
    print(f"Canceled job {job.id}: {job.description}")

Execution Registry

Track detailed execution records for jobs, including multiple execution attempts.

class ExecutionRegistry(BaseRegistry):
    """Registry for detailed job execution records."""

    def add_execution(
        self,
        job: 'Job',
        status: str,
        started_at: datetime = None,
        ended_at: datetime = None,
        result=None,
        exc_info: str = None
    ):
        """
        Add execution record for a job.
        
        Args:
            job (Job): Job that was executed.
            status (str): Execution status.
            started_at (datetime): Execution start time.
            ended_at (datetime): Execution end time.
            result: Execution result.
            exc_info (str): Exception information if failed.
        """

    def get_executions(self, job: 'Job') -> list['Execution']:
        """
        Get all execution records for a job.
        
        Args:
            job (Job): Job to get executions for.
            
        Returns:
            list[Execution]: Execution records.
        """

class Execution:
    """Represents a single job execution attempt."""
    
    @property
    def job_id(self) -> str:
        """Job identifier."""
    
    @property
    def status(self) -> str:
        """Execution status."""
    
    @property
    def started_at(self) -> datetime:
        """When execution started."""
    
    @property
    def ended_at(self) -> datetime:
        """When execution ended."""
    
    @property
    def result(self):
        """Execution result."""
    
    @property
    def exc_info(self) -> str:
        """Exception information if failed."""

# Usage
execution_registry = queue.execution_registry
executions = execution_registry.get_executions(job)
print(f"Job {job.id} has {len(executions)} execution attempts")

Monitoring and Analytics

System Health Monitoring

from rq import Queue, Worker
import redis
from datetime import datetime, timedelta

conn = redis.Redis()

def system_health_check(queue_names: list[str]) -> dict:
    """
    Comprehensive system health check.
    
    Args:
        queue_names (list[str]): Queues to monitor.
        
    Returns:
        dict: System health metrics.
    """
    health_report = {
        'timestamp': datetime.now().isoformat(),
        'queues': {},
        'workers': {},
        'overall_status': 'healthy'
    }
    
    # Check each queue
    for queue_name in queue_names:
        queue = Queue(queue_name, connection=conn)
        
        queue_health = {
            'name': queue_name,
            'queued_jobs': queue.count,
            'is_empty': queue.is_empty(),
            'registries': {
                'started': queue.started_job_registry.count,
                'finished': queue.finished_job_registry.count,
                'failed': queue.failed_job_registry.count,
                'deferred': queue.deferred_job_registry.count,
                'scheduled': queue.scheduled_job_registry.count,
                'canceled': queue.canceled_job_registry.count
            }
        }
        
        # Calculate health score
        total_jobs = sum(queue_health['registries'].values()) + queue_health['queued_jobs']
        if total_jobs > 0:
            failure_rate = queue_health['registries']['failed'] / total_jobs
            if failure_rate > 0.1:  # More than 10% failure rate
                queue_health['status'] = 'unhealthy'
                health_report['overall_status'] = 'degraded'
            elif failure_rate > 0.05:  # More than 5% failure rate
                queue_health['status'] = 'warning'
                if health_report['overall_status'] == 'healthy':
                    health_report['overall_status'] = 'warning'
            else:
                queue_health['status'] = 'healthy'
        else:
            queue_health['status'] = 'idle'
        
        health_report['queues'][queue_name] = queue_health
    
    # Check workers
    workers_info = {
        'active_count': Worker.count(connection=conn),
        'workers': []
    }
    
    for worker in Worker.all(connection=conn):
        current_job = worker.get_current_job()
        worker_info = {
            'name': worker.name,
            'queues': worker.queue_names(),
            'successful_jobs': worker.successful_job_count,
            'failed_jobs': worker.failed_job_count,
            'current_job': current_job.id if current_job else None,
            'last_heartbeat': worker.last_heartbeat.isoformat() if worker.last_heartbeat else None
        }
        workers_info['workers'].append(worker_info)
    
    health_report['workers'] = workers_info
    
    return health_report

# Generate health report
queues_to_monitor = ['high_priority', 'normal', 'low_priority']
health = system_health_check(queues_to_monitor)

print(f"System Status: {health['overall_status']}")
print(f"Active Workers: {health['workers']['active_count']}")

for queue_name, queue_info in health['queues'].items():
    print(f"\nQueue: {queue_name} ({queue_info['status']})")
    print(f"  Queued: {queue_info['queued_jobs']}")
    print(f"  Running: {queue_info['registries']['started']}")
    print(f"  Failed: {queue_info['registries']['failed']}")
    print(f"  Completed: {queue_info['registries']['finished']}")

Job Analytics and Reporting

from rq import Queue
import redis
from collections import defaultdict
from datetime import datetime, timedelta

conn = redis.Redis()

def generate_job_analytics(queue_name: str, hours: int = 24) -> dict:
    """
    Generate analytics for job processing over time period.
    
    Args:
        queue_name (str): Queue to analyze.
        hours (int): Hours to look back.
        
    Returns:
        dict: Analytics report.
    """
    queue = Queue(queue_name, connection=conn)
    cutoff_time = datetime.now() - timedelta(hours=hours)
    
    # Get jobs from different registries
    finished_jobs = [
        job for job in queue.finished_job_registry.get_jobs()
        if job.ended_at and job.ended_at > cutoff_time
    ]
    
    failed_jobs = [
        job for job in queue.failed_job_registry.get_jobs()
        if job.ended_at and job.ended_at > cutoff_time
    ]
    
    # Calculate metrics
    total_completed = len(finished_jobs)
    total_failed = len(failed_jobs)
    total_processed = total_completed + total_failed
    
    # Success rate
    success_rate = (total_completed / total_processed * 100) if total_processed > 0 else 0
    
    # Processing times
    processing_times = [
        (job.ended_at - job.started_at).total_seconds()
        for job in finished_jobs
        if job.started_at and job.ended_at
    ]
    
    avg_processing_time = sum(processing_times) / len(processing_times) if processing_times else 0
    
    # Function analysis
    function_stats = defaultdict(lambda: {'count': 0, 'failures': 0})
    
    for job in finished_jobs + failed_jobs:
        func_name = job.func_name or 'unknown'
        function_stats[func_name]['count'] += 1
        if job in failed_jobs:
            function_stats[func_name]['failures'] += 1
    
    # Hourly breakdown
    hourly_stats = defaultdict(lambda: {'completed': 0, 'failed': 0})
    
    for job in finished_jobs:
        if job.ended_at:
            hour_key = job.ended_at.strftime('%Y-%m-%d %H:00')
            hourly_stats[hour_key]['completed'] += 1
    
    for job in failed_jobs:
        if job.ended_at:
            hour_key = job.ended_at.strftime('%Y-%m-%d %H:00')
            hourly_stats[hour_key]['failed'] += 1
    
    return {
        'queue_name': queue_name,
        'period_hours': hours,
        'summary': {
            'total_processed': total_processed,
            'total_completed': total_completed,
            'total_failed': total_failed,
            'success_rate': round(success_rate, 2),
            'avg_processing_time': round(avg_processing_time, 2)
        },
        'current_state': {
            'queued': queue.count,
            'running': queue.started_job_registry.count,
            'scheduled': queue.scheduled_job_registry.count
        },
        'function_stats': dict(function_stats),
        'hourly_breakdown': dict(hourly_stats)
    }

# Generate analytics report
analytics = generate_job_analytics('data_processing', hours=24)

print(f"Analytics for {analytics['queue_name']} (last {analytics['period_hours']} hours)")
print(f"Success Rate: {analytics['summary']['success_rate']}%")
print(f"Average Processing Time: {analytics['summary']['avg_processing_time']}s")
print(f"Total Processed: {analytics['summary']['total_processed']}")

print("\nFunction Performance:")
for func, stats in analytics['function_stats'].items():
    failure_rate = (stats['failures'] / stats['count'] * 100) if stats['count'] > 0 else 0
    print(f"  {func}: {stats['count']} jobs, {failure_rate:.1f}% failure rate")

print(f"\nCurrent State:")
print(f"  Queued: {analytics['current_state']['queued']}")
print(f"  Running: {analytics['current_state']['running']}")
print(f"  Scheduled: {analytics['current_state']['scheduled']}")

Real-time Monitoring Dashboard

from rq import Queue, Worker
from datetime import datetime
import redis
import time

conn = redis.Redis()

class RQMonitor:
    """Real-time RQ monitoring dashboard."""
    
    def __init__(self, queue_names: list[str]):
        self.queue_names = queue_names
        self.queues = {name: Queue(name, connection=conn) for name in queue_names}
    
    def get_current_snapshot(self) -> dict:
        """Get current system snapshot."""
        snapshot = {
            'timestamp': datetime.now().isoformat(),
            'queues': {},
            'workers': self._get_worker_info(),
            'system': self._get_system_info()
        }
        
        for name, queue in self.queues.items():
            snapshot['queues'][name] = {
                'queued': queue.count,
                'started': queue.started_job_registry.count,
                'finished': queue.finished_job_registry.count,
                'failed': queue.failed_job_registry.count,
                'deferred': queue.deferred_job_registry.count,
                'scheduled': queue.scheduled_job_registry.count,
                'canceled': queue.canceled_job_registry.count
            }
        
        return snapshot
    
    def _get_worker_info(self) -> dict:
        """Get worker information."""
        workers = Worker.all(connection=conn)
        return {
            'count': len(workers),
            'details': [
                {
                    'name': w.name,
                    'queues': w.queue_names(),
                    'current_job': w.get_current_job_id(),
                    'successful': w.successful_job_count,
                    'failed': w.failed_job_count
                }
                for w in workers
            ]
        }
    
    def _get_system_info(self) -> dict:
        """Get system-level information."""
        total_queued = sum(q.count for q in self.queues.values())
        total_running = sum(q.started_job_registry.count for q in self.queues.values())
        total_failed = sum(q.failed_job_registry.count for q in self.queues.values())
        
        return {
            'total_queued': total_queued,
            'total_running': total_running,
            'total_failed': total_failed,
            'redis_info': self._get_redis_info()
        }
    
    def _get_redis_info(self) -> dict:
        """Get Redis server information."""
        info = conn.info()
        return {
            'version': info.get('redis_version'),
            'memory_used': info.get('used_memory_human'),
            'connected_clients': info.get('connected_clients'),
            'uptime': info.get('uptime_in_seconds')
        }
    
    def start_monitoring(self, interval: int = 5):
        """Start real-time monitoring with specified interval."""
        print("Starting RQ Monitor...")
        print("Press Ctrl+C to stop")
        
        try:
            while True:
                snapshot = self.get_current_snapshot()
                self._display_snapshot(snapshot)
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\nMonitoring stopped")
    
    def _display_snapshot(self, snapshot: dict):
        """Display monitoring snapshot."""
        print("\n" + "="*50)
        print(f"RQ System Status - {snapshot['timestamp']}")
        print("="*50)
        
        # Workers
        print(f"Workers: {snapshot['workers']['count']} active")
        for worker in snapshot['workers']['details']:
            status = f"processing {worker['current_job']}" if worker['current_job'] else "idle"
            print(f"  {worker['name']}: {status} (✓{worker['successful']} ✗{worker['failed']})")
        
        # Queues
        print("\nQueues:")
        for name, stats in snapshot['queues'].items():
            print(f"  {name}:")
            print(f"    Queued: {stats['queued']}, Running: {stats['started']}")
            print(f"    ✓ {stats['finished']}, ✗ {stats['failed']}, ⏰ {stats['scheduled']}")
        
        # System
        sys_info = snapshot['system']
        print(f"\nSystem: {sys_info['total_queued']} queued, {sys_info['total_running']} running")
        print(f"Redis: {sys_info['redis_info']['version']}, {sys_info['redis_info']['memory_used']} memory")

# Usage
monitor = RQMonitor(['high_priority', 'normal', 'background'])
monitor.start_monitoring(interval=10)  # Update every 10 seconds
