RQ is a simple, lightweight, library for creating background jobs, and processing them.
—
Comprehensive worker process management for job execution with support for multiple queues, different execution strategies, monitoring, and flexible deployment options. RQ workers handle job lifecycle, error recovery, and provide robust distributed processing capabilities.
Create and configure workers with various execution strategies and options.
class Worker:
def __init__(
self,
queues,
name: str = None,
default_result_ttl=500,
connection=None,
exc_handler=None,
exception_handlers=None,
maintenance_interval: int = 600,
default_worker_ttl: int = None,
worker_ttl: int = None,
job_class=None,
queue_class=None,
log_job_description: bool = True,
job_monitoring_interval=30,
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True,
serializer=None,
work_horse_killed_handler=None
):
"""
Initialize a Worker instance.
Args:
queues: Queue instances or names to process.
name (str): Worker name. Auto-generated if None.
default_result_ttl (int): Default result TTL in seconds.
connection: Redis connection instance.
exc_handler: Legacy exception handler (deprecated).
exception_handlers (list): List of exception handler functions.
maintenance_interval (int): Maintenance task interval in seconds.
default_worker_ttl (int): Default worker TTL.
worker_ttl (int): Worker TTL in seconds.
job_class: Custom Job class.
queue_class: Custom Queue class.
log_job_description (bool): Log job descriptions.
job_monitoring_interval (int): Job monitoring interval in seconds.
disable_default_exception_handler (bool): Disable default exception handling.
prepare_for_work (bool): Prepare worker for work immediately.
serializer: Custom serializer.
work_horse_killed_handler: Handler for work horse termination.
"""
class SimpleWorker(Worker):
"""Worker that executes jobs in the same process/thread."""
def execute_job(self, job: 'Job', queue: 'Queue'):
"""
Execute job in the same process without forking.
Args:
job (Job): Job to execute.
queue (Queue): Queue the job came from.
"""
class SpawnWorker(Worker):
"""Worker that spawns child processes for job execution."""
def fork_work_horse(self, job: 'Job', queue: 'Queue'):
"""
Spawn work horse process using os.spawn().
Args:
job (Job): Job to execute.
queue (Queue): Queue the job came from.
"""Find and manage worker instances across the system.
@classmethod
def all(
cls,
connection=None,
job_class=None,
queue_class=None,
queue: 'Queue' = None,
serializer=None
) -> list['Worker']:
"""
Get all workers.
Args:
connection: Redis connection.
job_class: Job class for deserialization.
queue_class: Queue class for deserialization.
queue (Queue): Filter by specific queue.
serializer: Custom serializer.
Returns:
list[Worker]: All worker instances.
"""
@classmethod
def all_keys(cls, connection=None, queue: 'Queue' = None) -> list[str]:
"""
Get all worker keys.
Args:
connection: Redis connection.
queue (Queue): Filter by specific queue.
Returns:
list[str]: Worker Redis keys.
"""
@classmethod
def count(cls, connection=None, queue: 'Queue' = None) -> int:
"""
Count active workers.
Args:
connection: Redis connection.
queue (Queue): Filter by specific queue.
Returns:
int: Number of active workers.
"""
@classmethod
def find_by_key(
cls,
worker_key: str,
connection,
job_class=None,
queue_class=None,
serializer=None
) -> 'Worker | None':
"""
Find worker by its Redis key.
Args:
worker_key (str): Worker Redis key.
connection: Redis connection.
job_class: Job class.
queue_class: Queue class.
serializer: Custom serializer.
Returns:
Worker | None: Worker instance or None if not found.
"""Core worker execution loop with comprehensive job processing capabilities.
def work(
self,
burst: bool = False,
logging_level: str = None,
date_format: str = '%H:%M:%S',
log_format: str = '%(asctime)s %(message)s',
max_jobs: int = None,
max_idle_time: int = None,
with_scheduler: bool = False,
dequeue_strategy: 'DequeueStrategy' = 'default'
) -> bool:
"""
Main worker loop for processing jobs.
Args:
burst (bool): Exit after processing available jobs.
logging_level (str): Logging level ('DEBUG', 'INFO', etc.).
date_format (str): Log date format.
log_format (str): Log message format.
max_jobs (int): Maximum jobs to process before exiting.
max_idle_time (int): Maximum idle time before exiting.
with_scheduler (bool): Run scheduler in the same process.
dequeue_strategy (DequeueStrategy): Queue dequeuing strategy.
Returns:
bool: True if worker exited cleanly, False otherwise.
"""
def execute_job(self, job: 'Job', queue: 'Queue'):
"""
Execute a single job (abstract method implemented by subclasses).
Args:
job (Job): Job to execute.
queue (Queue): Queue the job came from.
"""Monitor worker status, statistics, and health information.
@property
def name(self) -> str:
"""Worker name/identifier."""
@property
def key(self) -> str:
"""Redis key for this worker."""
@property
def connection(self):
"""Redis connection instance."""
@property
def queues(self) -> list['Queue']:
"""List of queues this worker processes."""
@property
def version(self) -> str:
"""RQ version."""
@property
def python_version(self) -> str:
"""Python version string."""
@property
def hostname(self) -> str | None:
"""Worker hostname."""
@property
def ip_address(self) -> str:
"""Worker IP address."""
@property
def pid(self) -> int | None:
"""Worker process ID."""
@property
def birth_date(self) -> datetime | None:
"""When worker was created."""
@property
def last_heartbeat(self) -> datetime | None:
"""Last heartbeat timestamp."""
@property
def successful_job_count(self) -> int:
"""Number of successfully completed jobs."""
@property
def failed_job_count(self) -> int:
"""Number of failed jobs."""
@property
def total_working_time(self) -> float:
"""Total time spent working (seconds)."""
@property
def current_job_working_time(self) -> float:
"""Time spent on current job (seconds)."""
def refresh(self):
"""Refresh worker data from Redis."""
def queue_names(self) -> list[str]:
"""
Get queue names this worker processes.
Returns:
list[str]: Queue names.
"""
def queue_keys(self) -> list[str]:
"""
Get queue Redis keys this worker processes.
Returns:
list[str]: Queue keys.
"""Control worker lifecycle with graceful shutdown and signal handling.
def request_stop(self, signum=None, frame=None):
"""
Request graceful worker shutdown.
Args:
signum: Signal number (for signal handlers).
frame: Frame object (for signal handlers).
"""
def request_force_stop(self, signum: int, frame=None):
"""
Request immediate worker shutdown (abstract method).
Args:
signum (int): Signal number.
frame: Frame object.
"""
def kill_horse(self, sig=15):
"""
Kill the work horse process (for Worker class).
Args:
sig (int): Signal to send to work horse.
"""
def wait_for_horse(self) -> tuple[int | None, int | None, Any]:
"""
Wait for work horse process to complete (for Worker class).
Returns:
tuple: (exit_code, signal, resource_usage).
"""Maintain worker health with registry cleanup and monitoring.
def clean_registries(self):
"""Clean job registries of expired entries."""
def validate_queues(self):
"""Validate that all queues are valid Queue instances."""
def get_redis_server_version(self) -> tuple[int, int, int]:
"""
Get Redis server version.
Returns:
tuple[int, int, int]: (major, minor, patch) version.
"""
@property
def should_run_maintenance_tasks(self) -> bool:
"""True if it's time to run maintenance tasks."""
@property
def dequeue_timeout(self) -> int:
"""Timeout for dequeue operations."""
@property
def connection_timeout(self) -> int:
"""Redis connection timeout."""Advanced work horse process management for fork-based execution.
# Worker class specific methods
@property
def is_horse(self) -> bool:
"""True if this is the work horse process."""
@property
def horse_pid(self) -> int:
"""Work horse process ID."""
def fork_work_horse(self, job: 'Job', queue: 'Queue'):
"""
Fork a work horse process to execute the job.
Args:
job (Job): Job to execute.
queue (Queue): Queue the job came from.
"""
def monitor_work_horse(self, job: 'Job', queue: 'Queue'):
"""
Monitor work horse process execution.
Args:
job (Job): Job being executed.
queue (Queue): Queue the job came from.
"""
def get_heartbeat_ttl(self, job: 'Job') -> int:
"""
Get heartbeat TTL for job monitoring.
Args:
job (Job): Job being executed.
Returns:
int: Heartbeat TTL in seconds.
"""Configure how workers dequeue jobs from multiple queues.
from enum import Enum
class DequeueStrategy(str, Enum):
DEFAULT = 'default' # Process queues in order
ROUND_ROBIN = 'round_robin' # Rotate between queues
RANDOM = 'random' # Random queue selection
class WorkerStatus(str, Enum):
STARTED = 'started'
SUSPENDED = 'suspended'
BUSY = 'busy'
IDLE = 'idle'import redis
from rq import Queue, Worker
# Connect to Redis
conn = redis.Redis()
# Create queues
high_priority = Queue('high', connection=conn)
normal_priority = Queue('normal', connection=conn)
# Create worker for multiple queues
worker = Worker([high_priority, normal_priority], connection=conn)
# Add some jobs
def process_data(data):
import time
time.sleep(2)
return f"Processed: {data}"
high_priority.enqueue(process_data, "urgent_data")
normal_priority.enqueue(process_data, "regular_data")
print(f"Worker: {worker.name}")
print(f"Processing queues: {worker.queue_names()}")
# Start processing (this blocks)
worker.work()from rq import Worker, Queue
import redis
import logging
conn = redis.Redis()
q = Queue('configured_worker', connection=conn)
# Custom exception handler
def handle_failed_job(job, exc_type, exc_value, traceback):
print(f"Job {job.id} failed: {exc_value}")
# Could send alerts, log to external service, etc.
# Create configured worker
worker = Worker(
[q],
connection=conn,
name='custom_worker_001',
exception_handlers=[handle_failed_job],
default_result_ttl=3600, # Keep results for 1 hour
job_monitoring_interval=15, # Check job progress every 15s
maintenance_interval=300, # Run maintenance every 5 minutes
log_job_description=True
)
# Set up logging
logging.basicConfig(level=logging.INFO)
# Work with specific options
worker.work(
burst=False, # Keep running
max_jobs=100, # Stop after 100 jobs
logging_level='INFO',
dequeue_strategy='round_robin'
)from rq import Worker, SimpleWorker, SpawnWorker, Queue
import redis
conn = redis.Redis()
q = Queue('worker_types', connection=conn)
def cpu_intensive_task(n):
# Simulate CPU intensive work
total = sum(i * i for i in range(n))
return total
# Add jobs
for i in range(5):
q.enqueue(cpu_intensive_task, 10000 * (i + 1))
# Standard worker (forks for each job)
standard_worker = Worker([q], connection=conn, name='standard')
# Simple worker (no forking, same process)
simple_worker = SimpleWorker([q], connection=conn, name='simple')
# Spawn worker (uses os.spawn instead of fork)
spawn_worker = SpawnWorker([q], connection=conn, name='spawn')
print("Worker types created:")
print(f"Standard: {standard_worker.name}")
print(f"Simple: {simple_worker.name}")
print(f"Spawn: {spawn_worker.name}")
# Use simple worker for this example (no forking)
simple_worker.work(burst=True)from rq import Worker, Queue
import redis
import time
conn = redis.Redis()
q = Queue('monitoring', connection=conn)
# Add a long-running job
def long_running_job():
import time
for i in range(10):
time.sleep(1)
print(f"Working... step {i+1}/10")
return "Completed long job"
job = q.enqueue(long_running_job)
# Create worker
worker = Worker([q], connection=conn, name='monitored_worker')
# Monitor worker in separate process/thread
def monitor_worker():
while True:
worker.refresh() # Get latest data from Redis
print(f"Worker: {worker.name}")
print(f"Status: {'busy' if worker.current_job else 'idle'}")
print(f"Successful jobs: {worker.successful_job_count}")
print(f"Failed jobs: {worker.failed_job_count}")
print(f"Total working time: {worker.total_working_time:.2f}s")
if worker.current_job:
print(f"Current job: {worker.current_job.id}")
print(f"Job working time: {worker.current_job_working_time:.2f}s")
print("---")
time.sleep(2)
# Get all workers
all_workers = Worker.all(connection=conn)
print(f"Total workers: {len(all_workers)}")
for w in all_workers:
print(f"Worker {w.name}: {w.successful_job_count} successful jobs")
# Find specific worker
found_worker = Worker.find_by_key(worker.key, connection=conn)
if found_worker:
print(f"Found worker: {found_worker.name}")from rq import Worker, Queue
import redis
import signal
import os
import time
conn = redis.Redis()
q = Queue('lifecycle', connection=conn)
def interruptible_job():
import time
for i in range(20):
time.sleep(1)
print(f"Job progress: {i+1}/20")
return "Job completed"
# Add job
job = q.enqueue(interruptible_job)
# Create worker
worker = Worker([q], connection=conn)
# Set up signal handlers for graceful shutdown
def signal_handler(signum, frame):
print(f"Received signal {signum}, requesting worker stop...")
worker.request_stop(signum, frame)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
try:
print(f"Starting worker {worker.name} (PID: {os.getpid()})")
print("Press Ctrl+C for graceful shutdown")
# Start worker with limits
worker.work(
burst=False,
max_idle_time=30, # Exit if idle for 30 seconds
logging_level='INFO'
)
except KeyboardInterrupt:
print("Worker stopped by user")
except Exception as e:
print(f"Worker error: {e}")
finally:
print("Worker shutdown complete")from rq import Worker, Queue
import redis
from multiprocessing import Process
import time
conn = redis.Redis()
# Create multiple queues for different priorities
high_q = Queue('high_priority', connection=conn)
normal_q = Queue('normal_priority', connection=conn)
low_q = Queue('low_priority', connection=conn)
def process_item(item_id, priority):
processing_time = {'high': 1, 'normal': 2, 'low': 3}
time.sleep(processing_time[priority])
return f"Processed item {item_id} with {priority} priority"
# Add jobs to different queues
for i in range(3):
high_q.enqueue(process_item, f"H{i}", 'high')
normal_q.enqueue(process_item, f"N{i}", 'normal')
low_q.enqueue(process_item, f"L{i}", 'low')
def start_worker(worker_name, queues):
"""Function to start a worker in a separate process."""
worker = Worker(queues, connection=conn, name=worker_name)
print(f"Starting worker {worker_name}")
worker.work(burst=True) # Process all available jobs then exit
print(f"Worker {worker_name} finished")
# Start multiple workers
processes = []
# High priority worker (only processes high priority queue)
p1 = Process(target=start_worker, args=('worker_high', [high_q]))
# General workers (process all queues in priority order)
p2 = Process(target=start_worker, args=('worker_general_1', [high_q, normal_q, low_q]))
p3 = Process(target=start_worker, args=('worker_general_2', [high_q, normal_q, low_q]))
processes = [p1, p2, p3]
# Start all workers
for p in processes:
p.start()
# Wait for completion
for p in processes:
p.join()
print("All workers completed")
# Check final queue states
print(f"High priority queue: {high_q.count} jobs remaining")
print(f"Normal priority queue: {normal_q.count} jobs remaining")
print(f"Low priority queue: {low_q.count} jobs remaining")Install with Tessl CLI
npx tessl i tessl/pypi-rq