RQ is a simple, lightweight library for creating background jobs and processing them.
```bash
npx @tessl/cli install tessl/pypi-rq@2.5.0
```

RQ is a simple, lightweight Python library for creating background jobs and processing them with workers backed by Redis. It has a low barrier to entry yet scales well for large applications, covering job scheduling, worker management, result storage, monitoring, and failure handling with retry mechanisms.
```bash
pip install rq
```

Requires `redis>=3.5,!=6`, `click>=5`, and `croniter`.

```python
import rq
```

Common pattern for job management:

```python
from rq import Queue, Worker, Job
```

Worker and job utilities:

```python
from rq import get_current_job, cancel_job, requeue_job
```

Callback and retry functionality:

```python
from rq import Callback, Retry, Repeat
```

Basic usage:

```python
import redis
from rq import Queue, Worker
# Connect to Redis
redis_conn = redis.Redis()
# Create a queue
q = Queue(connection=redis_conn)
# Define a job function
def add_numbers(a, b):
    return a + b
# Enqueue a job
job = q.enqueue(add_numbers, 5, 3)
print(f"Job {job.id} enqueued")
# Create and start a worker
worker = Worker([q], connection=redis_conn)
worker.work()  # This blocks and processes jobs
```

Advanced usage with job monitoring:

```python
from rq import Queue, Job, Retry
import redis
redis_conn = redis.Redis()
q = Queue(connection=redis_conn)
# Enqueue with options (add_numbers as defined above)
job = q.enqueue(
    add_numbers,
    10, 20,
    job_timeout=300,      # 5 minute timeout
    result_ttl=3600,      # Keep result for 1 hour
    failure_ttl=86400,    # Keep failure info for 1 day
    retry=Retry(max=3),   # Retry up to 3 times
    description="Adding two numbers",
)
# Check job status
print(f"Status: {job.get_status()}")
print(f"Result: {job.result}") # None until completed
# Get job by ID
retrieved_job = Job.fetch(job.id, connection=redis_conn)
```

RQ's architecture consists of four main components: jobs, queues, workers, and job registries.
The system supports distributed processing with multiple workers across multiple machines, comprehensive job lifecycle management, and flexible scheduling patterns including immediate execution, delayed execution, and recurring jobs.
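A minimal sketch of these scheduling patterns, reusing the `q` queue and `add_numbers` function from the basic example above:

```python
from datetime import datetime, timedelta, timezone

# Immediate execution: picked up by the next available worker
q.enqueue(add_numbers, 1, 2)

# Delayed execution: run roughly 10 minutes from now
q.enqueue_in(timedelta(minutes=10), add_numbers, 3, 4)

# Scheduled execution: run at a specific UTC time
q.enqueue_at(datetime(2030, 1, 1, tzinfo=timezone.utc), add_numbers, 5, 6)
```

Note that delayed and scheduled jobs are only moved onto the queue once a worker runs with its scheduler enabled (e.g. `rq worker --with-scheduler`).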
Core job operations including creation, execution tracking, status management, and lifecycle control. Jobs encapsulate function calls with comprehensive metadata and support callbacks, retries, and dependencies.

```python
def get_current_job(connection=None, job_class=None) -> Job | None: ...
def cancel_job(job_id: str, connection, serializer=None, enqueue_dependents: bool = False): ...
def requeue_job(job_id: str, connection, serializer=None) -> Job: ...

class Job:
    def __init__(self, id: str = None, connection=None, serializer=None): ...

    @classmethod
    def create(cls, func, args=None, kwargs=None, **options) -> 'Job': ...

    @classmethod
    def fetch(cls, id: str, connection, serializer=None) -> 'Job': ...

    def get_status(self, refresh: bool = True) -> JobStatus: ...
    def perform(self) -> Any: ...
    def save(self, pipeline=None, include_meta: bool = True, include_result: bool = True): ...
    def cancel(self, pipeline=None, enqueue_dependents: bool = False): ...
    def requeue(self, at_front: bool = False) -> 'Job': ...
    def delete(self, pipeline=None, remove_from_queue: bool = True): ...
```
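A hedged sketch of the job lifecycle; the `'myapp.tasks.send_email'` dotted path is hypothetical, standing in for any function a worker can import:

```python
import redis
from rq import Queue, Job

redis_conn = redis.Redis()
q = Queue(connection=redis_conn)

# Enqueue by dotted path (hypothetical function the worker can import)
job = q.enqueue('myapp.tasks.send_email', 'user@example.com')

# Fetch the same job later by ID, e.g. from another process
fetched = Job.fetch(job.id, connection=redis_conn)
print(fetched.get_status())  # e.g. JobStatus.QUEUED

# Cancel a queued job before a worker picks it up
fetched.cancel()

# requeue() puts a failed job back on its queue:
# fetched.requeue()

# delete() removes the job and its data from Redis
fetched.delete()
```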
Queue management for job scheduling, enqueueing, and batch operations. Queues provide flexible job submission with support for immediate execution, delayed scheduling, priority queuing, and bulk operations.

```python
class Queue:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
    def enqueue(self, f, *args, **kwargs) -> Job: ...
    def enqueue_call(self, func, args=None, kwargs=None, **options) -> Job: ...
    def enqueue_at(self, datetime, f, *args, **kwargs) -> Job: ...
    def enqueue_in(self, time_delta, func, *args, **kwargs) -> Job: ...
    def enqueue_many(self, job_datas, pipeline=None, group_id=None) -> list[Job]: ...
    def schedule_job(self, job: Job, datetime, pipeline=None): ...
    def empty(self): ...
    def delete(self, delete_jobs: bool = True): ...
    def get_jobs(self, offset: int = 0, length: int = -1) -> list[Job]: ...
```
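A short inspection-and-cleanup sketch, reusing the `q` queue from the examples above:

```python
# List the first ten jobs currently waiting on the queue
for queued_job in q.get_jobs(offset=0, length=10):
    print(queued_job.id, queued_job.description)

# Remove all pending jobs but keep the queue itself
q.empty()

# Remove the queue and (by default) its jobs entirely
q.delete(delete_jobs=True)
```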
Worker processes for job execution with support for multiple queues, different execution strategies, and comprehensive monitoring. Workers handle job lifecycle, error recovery, and provide flexible deployment options.

```python
class Worker:
    def __init__(self, queues, name: str = None, connection=None, **kwargs): ...
    def work(self, burst: bool = False, logging_level: str = None, **options) -> bool: ...
    def execute_job(self, job: Job, queue: Queue): ...
    def request_stop(self, signum=None, frame=None): ...
    def clean_registries(self): ...

class SimpleWorker(Worker):
    def execute_job(self, job: Job, queue: Queue): ...

class SpawnWorker(Worker):
    def fork_work_horse(self, job: Job, queue: Queue): ...
```
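A sketch of a multi-queue worker in burst mode, reusing `redis_conn`; the queue names `high` and `low` are illustrative:

```python
import redis
from rq import Queue, Worker

redis_conn = redis.Redis()
high = Queue('high', connection=redis_conn)
low = Queue('low', connection=redis_conn)

# Queue order sets priority: 'high' is drained before 'low'
worker = Worker([high, low], connection=redis_conn)

# burst=True processes the available jobs, then returns instead of blocking
worker.work(burst=True)
```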
Advanced job patterns including callbacks, retries, repetition, and dependencies for complex workflow orchestration and error handling strategies.

```python
class Callback:
    def __init__(self, func, timeout: int = None): ...

class Retry:
    def __init__(self, max: int, interval: int | list[int] = 0): ...

    @classmethod
    def get_interval(cls, count: int, intervals) -> int: ...

class Repeat:
    def __init__(self, times: int, interval: int | list[int] = 0): ...

    @classmethod
    def schedule(cls, job: Job, queue: Queue, pipeline=None): ...
```
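A sketch combining a success callback with backoff retries, reusing `q` and `add_numbers` from earlier; `report_success` is a hypothetical function (callbacks, like job functions, must be importable by the worker):

```python
from rq import Callback, Retry

def report_success(job, connection, result, *args, **kwargs):
    # Hypothetical callback: runs on the worker after the job succeeds
    print(f"Job {job.id} finished with result {result!r}")

job = q.enqueue(
    add_numbers, 2, 3,
    # Retry up to 3 times, waiting 10s, 30s, then 60s between attempts
    retry=Retry(max=3, interval=[10, 30, 60]),
    on_success=Callback(report_success, timeout=10),
)
```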
Job registries for tracking job states and comprehensive monitoring capabilities. Registries provide visibility into job execution, failure analysis, and system health monitoring.

```python
class StartedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
    def get_job_count(self, cleanup: bool = True) -> int: ...

class FinishedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...

class FailedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...

class ScheduledJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
```
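A monitoring sketch, reusing `redis_conn`; it assumes the registry classes are importable from `rq.registry` and that the count method shown above for StartedJobRegistry is available on FailedJobRegistry as well:

```python
from rq.registry import FailedJobRegistry, StartedJobRegistry

started = StartedJobRegistry('default', connection=redis_conn)
print(f"Jobs in flight: {started.get_job_count()}")

# Registries share a common base in RQ, so the same count method
# is assumed to work here for jobs that exhausted their retries
failed = FailedJobRegistry('default', connection=redis_conn)
print(f"Failed jobs: {failed.get_job_count()}")
```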
```python
from enum import Enum
from typing import Callable, Any, Union, Optional
from datetime import datetime, timedelta

class JobStatus(str, Enum):
    CREATED = 'created'
    QUEUED = 'queued'
    FINISHED = 'finished'
    FAILED = 'failed'
    STARTED = 'started'
    DEFERRED = 'deferred'
    SCHEDULED = 'scheduled'
    STOPPED = 'stopped'
    CANCELED = 'canceled'

class WorkerStatus(str, Enum):
    STARTED = 'started'
    SUSPENDED = 'suspended'
    BUSY = 'busy'
    IDLE = 'idle'

class DequeueStrategy(str, Enum):
    DEFAULT = 'default'
    ROUND_ROBIN = 'round_robin'
    RANDOM = 'random'
# Type aliases
FunctionReferenceType = Union[str, Callable[..., Any]]
JobDependencyType = Union['Dependency', 'Job', str, list[Union['Dependency', 'Job', str]]]
SuccessCallbackType = Callable[['Job', Any, Any], Any]
FailureCallbackType = Callable[['Job', Any, Optional[type], Optional[Exception], Any], Any]
```

```python
# Default timeouts and TTLs (in seconds)
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500
DEFAULT_FAILURE_TTL = 31536000 # 1 year
DEFAULT_MAINTENANCE_TASK_INTERVAL = 600 # 10 minutes
CALLBACK_TIMEOUT = 60
# Logging configuration
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
```

```python
class NoSuchJobError(Exception):
"""Raised when a job cannot be found."""
class DequeueTimeout(Exception):
"""Raised when dequeue operation times out."""
class InvalidJobOperation(Exception):
"""Raised when an invalid operation is performed on a job."""
class DeserializationError(Exception):
"""Raised when job data cannot be deserialized."""
class AbandonedJobError(Exception):
"""Raised when a job is abandoned by its worker."""
class ShutDownImminentException(Exception):
"""Raised when worker shutdown is imminent."""