CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-rq

RQ is a simple, lightweight library for creating background jobs and processing them.

Pending
Overview
Eval results
Files

worker-management.mddocs/

Worker Management

Comprehensive worker process management for job execution with support for multiple queues, different execution strategies, monitoring, and flexible deployment options. RQ workers handle job lifecycle, error recovery, and provide robust distributed processing capabilities.

Capabilities

Worker Creation and Configuration

Create and configure workers with various execution strategies and options.

class Worker:
    """Base worker: processes jobs from one or more queues, forking a
    work horse child process for each job (see fork_work_horse)."""

    def __init__(
        self,
        queues,
        name: 'str | None' = None,
        default_result_ttl=500,
        connection=None,
        exc_handler=None,
        exception_handlers=None,
        maintenance_interval: int = 600,
        default_worker_ttl: 'int | None' = None,
        worker_ttl: 'int | None' = None,
        job_class=None,
        queue_class=None,
        log_job_description: bool = True,
        job_monitoring_interval=30,
        disable_default_exception_handler: bool = False,
        prepare_for_work: bool = True,
        serializer=None,
        work_horse_killed_handler=None
    ):
        """
        Initialize a Worker instance.

        Args:
            queues: Queue instances or names to process.
            name (str | None): Worker name. Auto-generated if None.
            default_result_ttl (int): Default result TTL in seconds.
            connection: Redis connection instance.
            exc_handler: Legacy exception handler (deprecated); prefer
                exception_handlers.
            exception_handlers (list): List of exception handler functions.
            maintenance_interval (int): Maintenance task interval in seconds.
            default_worker_ttl (int | None): Default worker TTL
                (presumably a legacy alias for worker_ttl — confirm).
            worker_ttl (int | None): Worker TTL in seconds.
            job_class: Custom Job class.
            queue_class: Custom Queue class.
            log_job_description (bool): Log job descriptions.
            job_monitoring_interval (int): Job monitoring interval in seconds.
            disable_default_exception_handler (bool): Disable default exception handling.
            prepare_for_work (bool): Prepare worker for work immediately.
            serializer: Custom serializer.
            work_horse_killed_handler: Handler for work horse termination.
        """

class SimpleWorker(Worker):
    """Worker that executes jobs in the same process/thread — no forking.

    Useful for debugging and for environments where forking is unavailable.
    """

    def execute_job(self, job: 'Job', queue: 'Queue'):
        """
        Execute job in the same process without forking.

        Args:
            job (Job): Job to execute.
            queue (Queue): Queue the job came from.
        """

class SpawnWorker(Worker):
    """Worker that spawns child processes (instead of forking) for job
    execution."""

    def fork_work_horse(self, job: 'Job', queue: 'Queue'):
        """
        Spawn work horse process using os.spawn().

        NOTE(review): Python's os module has no plain ``os.spawn()`` —
        it exposes ``os.spawnv()``/``os.posix_spawn()``; confirm the exact
        call against the RQ source.

        Args:
            job (Job): Job to execute.
            queue (Queue): Queue the job came from.
        """

Worker Discovery and Management

Find and manage worker instances across the system.

@classmethod
def all(
    cls,
    connection=None,
    job_class=None,
    queue_class=None,
    queue: 'Queue | None' = None,
    serializer=None
) -> list['Worker']:
    """
    Get all workers.

    Args:
        connection: Redis connection.
        job_class: Job class for deserialization.
        queue_class: Queue class for deserialization.
        queue (Queue | None): Filter by specific queue.
        serializer: Custom serializer.

    Returns:
        list[Worker]: All worker instances.
    """

@classmethod
def all_keys(cls, connection=None, queue: 'Queue | None' = None) -> list[str]:
    """
    Get all worker keys.

    Args:
        connection: Redis connection.
        queue (Queue | None): Filter by specific queue.

    Returns:
        list[str]: Worker Redis keys.
    """

@classmethod
def count(cls, connection=None, queue: 'Queue | None' = None) -> int:
    """
    Count active workers.

    Args:
        connection: Redis connection.
        queue (Queue | None): Filter by specific queue.

    Returns:
        int: Number of active workers.
    """

@classmethod
def find_by_key(
    cls,
    worker_key: str,
    connection,
    job_class=None,
    queue_class=None,
    serializer=None
) -> 'Worker | None':
    """
    Find worker by its Redis key.

    Args:
        worker_key (str): Worker Redis key.
        connection: Redis connection.
        job_class: Job class.
        queue_class: Queue class.
        serializer: Custom serializer.

    Returns:
        Worker | None: Worker instance or None if not found.
    """

Main Work Loop

Core worker execution loop with comprehensive job processing capabilities.

def work(
    self,
    burst: bool = False,
    logging_level: 'str | None' = None,
    date_format: str = '%H:%M:%S',
    log_format: str = '%(asctime)s %(message)s',
    max_jobs: 'int | None' = None,
    max_idle_time: 'int | None' = None,
    with_scheduler: bool = False,
    dequeue_strategy: 'DequeueStrategy' = 'default'
) -> bool:
    """
    Main worker loop for processing jobs.

    Args:
        burst (bool): Exit after processing available jobs.
        logging_level (str | None): Logging level ('DEBUG', 'INFO', etc.).
        date_format (str): Log date format.
        log_format (str): Log message format.
        max_jobs (int | None): Maximum jobs to process before exiting.
        max_idle_time (int | None): Maximum idle time before exiting.
        with_scheduler (bool): Run scheduler in the same process.
        dequeue_strategy (DequeueStrategy): Queue dequeuing strategy
            (see the DequeueStrategy enum).

    Returns:
        bool: True if worker exited cleanly, False otherwise.
    """

def execute_job(self, job: 'Job', queue: 'Queue'):
    """
    Execute a single job (abstract method implemented by subclasses,
    e.g. SimpleWorker runs in-process).

    Args:
        job (Job): Job to execute.
        queue (Queue): Queue the job came from.
    """

Worker State and Monitoring

Monitor worker status, statistics, and health information.

# Worker identity, statistics, and health accessors.

@property
def name(self) -> str:
    """Worker name/identifier."""

@property
def key(self) -> str:
    """Redis key for this worker."""

@property
def connection(self):
    """Redis connection instance."""

@property
def queues(self) -> list['Queue']:
    """List of queues this worker processes."""

@property
def version(self) -> str:
    """RQ version."""

@property
def python_version(self) -> str:
    """Python version string."""

@property
def hostname(self) -> str | None:
    """Worker hostname."""

@property
def ip_address(self) -> str:
    """Worker IP address."""

@property
def pid(self) -> int | None:
    """Worker process ID."""

@property
def birth_date(self) -> datetime | None:
    """When worker was created."""

@property
def last_heartbeat(self) -> datetime | None:
    """Last heartbeat timestamp."""

@property
def successful_job_count(self) -> int:
    """Number of successfully completed jobs."""

@property
def failed_job_count(self) -> int:
    """Number of failed jobs."""

@property
def total_working_time(self) -> float:
    """Total time spent working (seconds)."""

@property
def current_job_working_time(self) -> float:
    """Time spent on current job (seconds)."""

def refresh(self) -> None:
    """Refresh this worker's cached state from Redis."""

def queue_names(self) -> list[str]:
    """
    Get queue names this worker processes.

    Returns:
        list[str]: Queue names.
    """

def queue_keys(self) -> list[str]:
    """
    Get queue Redis keys this worker processes.

    Returns:
        list[str]: Queue keys.
    """

Worker Lifecycle Control

Control worker lifecycle with graceful shutdown and signal handling.

def request_stop(self, signum=None, frame=None):
    """
    Request graceful worker shutdown.

    The (signum, frame) parameters match the signal-handler callback
    signature, so this method can be registered directly as a handler.

    Args:
        signum: Signal number (for signal handlers).
        frame: Frame object (for signal handlers).
    """

def request_force_stop(self, signum: int, frame=None):
    """
    Request immediate worker shutdown (abstract method).

    Args:
        signum (int): Signal number.
        frame: Frame object.
    """

def kill_horse(self, sig=15):
    """
    Kill the work horse process (for Worker class).

    Args:
        sig (int): Signal to send to work horse (default 15 = SIGTERM).
    """

def wait_for_horse(self) -> tuple[int | None, int | None, Any]:
    """
    Wait for work horse process to complete (for Worker class).

    Returns:
        tuple: (exit_code, signal, resource_usage).
    """

Worker Maintenance and Health

Maintain worker health with registry cleanup and monitoring.

def clean_registries(self):
    """Clean job registries of expired entries."""

def validate_queues(self):
    """Validate that all queues are valid Queue instances."""

def get_redis_server_version(self) -> tuple[int, int, int]:
    """
    Get Redis server version.

    Returns:
        tuple[int, int, int]: (major, minor, patch) version.
    """

@property
def should_run_maintenance_tasks(self) -> bool:
    """True if it's time to run maintenance tasks (paced by the
    maintenance_interval constructor argument)."""

@property
def dequeue_timeout(self) -> int:
    """Timeout for dequeue operations."""

@property
def connection_timeout(self) -> int:
    """Redis connection timeout."""

Work Horse Management

Advanced work horse process management for fork-based execution.

# Worker class specific methods (fork-based execution)

@property
def is_horse(self) -> bool:
    """True if this is the work horse (forked child) process."""

@property
def horse_pid(self) -> int:
    """Work horse process ID."""

def fork_work_horse(self, job: 'Job', queue: 'Queue'):
    """
    Fork a work horse process to execute the job.

    Args:
        job (Job): Job to execute.
        queue (Queue): Queue the job came from.
    """

def monitor_work_horse(self, job: 'Job', queue: 'Queue'):
    """
    Monitor work horse process execution from the parent worker.

    Args:
        job (Job): Job being executed.
        queue (Queue): Queue the job came from.
    """

def get_heartbeat_ttl(self, job: 'Job') -> int:
    """
    Get heartbeat TTL for job monitoring.

    Args:
        job (Job): Job being executed.

    Returns:
        int: Heartbeat TTL in seconds.
    """

Dequeue Strategies

Configure how workers dequeue jobs from multiple queues.

from enum import Enum

class DequeueStrategy(str, Enum):
    """How a worker orders multiple queues when pulling the next job."""
    DEFAULT = 'default'        # Process queues in order
    ROUND_ROBIN = 'round_robin'  # Rotate between queues
    RANDOM = 'random'          # Random queue selection

class WorkerStatus(str, Enum):
    """Activity states a worker reports to Redis."""
    STARTED = 'started'
    SUSPENDED = 'suspended'
    BUSY = 'busy'
    IDLE = 'idle'

Usage Examples

Basic Worker Usage

import redis
from rq import Queue, Worker

# Connect to Redis
conn = redis.Redis()

# Create queues
high_priority = Queue('high', connection=conn)
normal_priority = Queue('normal', connection=conn)

# Create worker for multiple queues — with the default dequeue strategy,
# queues listed first are drained first (order implies priority)
worker = Worker([high_priority, normal_priority], connection=conn)

# Add some jobs
def process_data(data):
    # Simulate a couple of seconds of work
    import time
    time.sleep(2)
    return f"Processed: {data}"

high_priority.enqueue(process_data, "urgent_data")
normal_priority.enqueue(process_data, "regular_data")

print(f"Worker: {worker.name}")
print(f"Processing queues: {worker.queue_names()}")

# Start processing (this blocks)
worker.work()

Worker with Configuration

from rq import Worker, Queue
import redis
import logging

conn = redis.Redis()
q = Queue('configured_worker', connection=conn)

# Custom exception handler — called with the job plus exc_info triple
def handle_failed_job(job, exc_type, exc_value, traceback):
    print(f"Job {job.id} failed: {exc_value}")
    # Could send alerts, log to external service, etc.

# Create configured worker
worker = Worker(
    [q],
    connection=conn,
    name='custom_worker_001',
    exception_handlers=[handle_failed_job],
    default_result_ttl=3600,        # Keep results for 1 hour
    job_monitoring_interval=15,      # Check job progress every 15s
    maintenance_interval=300,        # Run maintenance every 5 minutes
    log_job_description=True
)

# Set up logging
logging.basicConfig(level=logging.INFO)

# Work with specific options
worker.work(
    burst=False,                    # Keep running (don't exit when idle)...
    max_jobs=100,                  # ...but stop after 100 jobs regardless
    logging_level='INFO',
    dequeue_strategy='round_robin'
)

Different Worker Types

from rq import Worker, SimpleWorker, SpawnWorker, Queue
import redis

conn = redis.Redis()
q = Queue('worker_types', connection=conn)

def cpu_intensive_task(n):
    # Simulate CPU intensive work
    total = sum(i * i for i in range(n))
    return total

# Add jobs
for i in range(5):
    q.enqueue(cpu_intensive_task, 10000 * (i + 1))

# Standard worker (forks for each job)
standard_worker = Worker([q], connection=conn, name='standard')

# Simple worker (no forking, same process)
simple_worker = SimpleWorker([q], connection=conn, name='simple')

# Spawn worker (uses os.spawn instead of fork)
spawn_worker = SpawnWorker([q], connection=conn, name='spawn')

# NOTE: standard_worker and spawn_worker are created for illustration only
# and never started below.
print("Worker types created:")
print(f"Standard: {standard_worker.name}")
print(f"Simple: {simple_worker.name}")
print(f"Spawn: {spawn_worker.name}")

# Use simple worker for this example (no forking)
simple_worker.work(burst=True)

Worker Monitoring and Management

from rq import Worker, Queue
import redis
import time

conn = redis.Redis()
q = Queue('monitoring', connection=conn)

# Add a long-running job
def long_running_job():
    import time
    for i in range(10):
        time.sleep(1)
        print(f"Working... step {i+1}/10")
    return "Completed long job"

job = q.enqueue(long_running_job)

# Create worker
worker = Worker([q], connection=conn, name='monitored_worker')

# Monitor worker in separate process/thread.
# NOTE(review): this function is only defined here, never started — run it
# in a thread/process alongside worker.work() for live monitoring.
# NOTE(review): `worker.current_job` — confirm attribute name; RQ exposes
# a `get_current_job()` method on workers.
def monitor_worker():
    while True:
        worker.refresh()  # Get latest data from Redis
        print(f"Worker: {worker.name}")
        print(f"Status: {'busy' if worker.current_job else 'idle'}")
        print(f"Successful jobs: {worker.successful_job_count}")
        print(f"Failed jobs: {worker.failed_job_count}")
        print(f"Total working time: {worker.total_working_time:.2f}s")
        
        if worker.current_job:
            print(f"Current job: {worker.current_job.id}")
            print(f"Job working time: {worker.current_job_working_time:.2f}s")
        
        print("---")
        time.sleep(2)

# Get all workers
all_workers = Worker.all(connection=conn)
print(f"Total workers: {len(all_workers)}")

for w in all_workers:
    print(f"Worker {w.name}: {w.successful_job_count} successful jobs")

# Find specific worker
found_worker = Worker.find_by_key(worker.key, connection=conn)
if found_worker:
    print(f"Found worker: {found_worker.name}")

Worker Lifecycle Management

from rq import Worker, Queue
import redis
import signal
import os
import time

conn = redis.Redis()
q = Queue('lifecycle', connection=conn)

def interruptible_job():
    import time
    for i in range(20):
        time.sleep(1)
        print(f"Job progress: {i+1}/20")
    return "Job completed"

# Add job
job = q.enqueue(interruptible_job)

# Create worker
worker = Worker([q], connection=conn)

# Set up signal handlers for graceful shutdown.
# NOTE(review): Worker.work() typically installs its own SIGINT/SIGTERM
# handlers when it starts — confirm these custom handlers remain in effect.
def signal_handler(signum, frame):
    print(f"Received signal {signum}, requesting worker stop...")
    worker.request_stop(signum, frame)

signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

try:
    print(f"Starting worker {worker.name} (PID: {os.getpid()})")
    print("Press Ctrl+C for graceful shutdown")
    
    # Start worker with limits
    worker.work(
        burst=False,
        max_idle_time=30,  # Exit if idle for 30 seconds
        logging_level='INFO'
    )
    
except KeyboardInterrupt:
    print("Worker stopped by user")
except Exception as e:
    print(f"Worker error: {e}")
finally:
    print("Worker shutdown complete")

Batch Processing with Multiple Workers

from rq import Worker, Queue
import redis
from multiprocessing import Process
import time

conn = redis.Redis()

# Create multiple queues for different priorities
high_q = Queue('high_priority', connection=conn)
normal_q = Queue('normal_priority', connection=conn)
low_q = Queue('low_priority', connection=conn)

def process_item(item_id, priority):
    processing_time = {'high': 1, 'normal': 2, 'low': 3}
    time.sleep(processing_time[priority])
    return f"Processed item {item_id} with {priority} priority"

# Add jobs to different queues
for i in range(3):
    high_q.enqueue(process_item, f"H{i}", 'high')
    normal_q.enqueue(process_item, f"N{i}", 'normal')
    low_q.enqueue(process_item, f"L{i}", 'low')

def start_worker(worker_name, queues):
    """Function to start a worker in a separate process.

    NOTE(review): this closes over `conn` from the parent process; Redis
    connections are generally not safe to share across processes — prefer
    creating a fresh redis.Redis() inside the child.
    """
    worker = Worker(queues, connection=conn, name=worker_name)
    print(f"Starting worker {worker_name}")
    worker.work(burst=True)  # Process all available jobs then exit
    print(f"Worker {worker_name} finished")

# Start multiple workers
processes = []

# High priority worker (only processes high priority queue)
p1 = Process(target=start_worker, args=('worker_high', [high_q]))

# General workers (process all queues in priority order)
p2 = Process(target=start_worker, args=('worker_general_1', [high_q, normal_q, low_q]))
p3 = Process(target=start_worker, args=('worker_general_2', [high_q, normal_q, low_q]))

processes = [p1, p2, p3]

# Start all workers
for p in processes:
    p.start()

# Wait for completion
for p in processes:
    p.join()

print("All workers completed")

# Check final queue states
print(f"High priority queue: {high_q.count} jobs remaining")
print(f"Normal priority queue: {normal_q.count} jobs remaining")
print(f"Low priority queue: {low_q.count} jobs remaining")

Install with Tessl CLI

npx tessl i tessl/pypi-rq

docs

index.md

job-management.md

job-patterns.md

queue-operations.md

registries-monitoring.md

worker-management.md

tile.json