tessl/pypi-rq

RQ is a simple, lightweight library for creating background jobs and processing them.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/rq@2.5.x

To install, run

npx @tessl/cli install tessl/pypi-rq@2.5.0


RQ (Redis Queue)

RQ is a simple, lightweight Python library for creating background jobs and processing them with workers backed by Redis. It has a low barrier to entry yet scales to large applications, and it covers job scheduling, worker management, result storage, monitoring, and failure handling with retries.

Package Information

  • Package Name: rq
  • Language: Python
  • Installation: pip install rq
  • Dependencies: redis>=3.5,!=6, click>=5, croniter
  • Python Version: >=3.9

Core Imports

import rq

Common pattern for job management:

from rq import Queue, Worker, Job

Worker and job utilities:

from rq import get_current_job, cancel_job, requeue_job

Callback and retry functionality:

from rq import Callback, Retry, Repeat

Basic Usage

import redis
from rq import Queue, Worker

# Connect to Redis
redis_conn = redis.Redis()

# Create a queue
q = Queue(connection=redis_conn)

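# Define a job function. In a real project it must live in an importable
# module: RQ rejects functions defined in __main__, because workers could
# not import them.
def add_numbers(a, b):
    return a + b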

# Enqueue a job
job = q.enqueue(add_numbers, 5, 3)
print(f"Job {job.id} enqueued")

# Create and start a worker
worker = Worker([q], connection=redis_conn)
worker.work()  # This blocks and processes jobs

Advanced usage with job monitoring:

import redis
from rq import Queue, Retry
from rq.job import Job

redis_conn = redis.Redis()
q = Queue(connection=redis_conn)

# Enqueue with options (add_numbers is the job function from the previous example)
job = q.enqueue(
    add_numbers,
    10, 20,
    job_timeout=300,       # 5 minute timeout (a plain `timeout` would be passed to add_numbers)
    result_ttl=3600,       # Keep result for 1 hour
    failure_ttl=86400,     # Keep failure info for 1 day
    retry=Retry(max=3),    # Retry up to 3 times
    description="Adding two numbers"
)

# Check job status
print(f"Status: {job.get_status()}")
print(f"Result: {job.result}")  # None until completed

# Get job by ID
retrieved_job = Job.fetch(job.id, connection=redis_conn)

Architecture

RQ's architecture consists of four main components:

  • Queue: Redis-backed FIFO queues that hold jobs waiting to be processed
  • Job: Work units containing function calls, parameters, metadata, and execution state
  • Worker: Processes that fetch jobs from queues and execute them, with support for multiple workers and fork-based isolation
  • Registry: Collections that track jobs by status (started, finished, failed, scheduled, etc.) for monitoring and management

The system supports distributed processing with multiple workers across multiple machines, comprehensive job lifecycle management, and flexible scheduling patterns including immediate execution, delayed execution, and recurring jobs.
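To make the division of labor between the four components concrete, here is a toy in-memory model. It is only a sketch: `ToyJob`, `ToyQueue`, and `run_worker` are hypothetical names, no Redis is involved, and RQ's real classes do far more (serialization, process isolation, TTLs).

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToyJob:
    """A unit of work: a function call plus its execution state."""
    func: Callable[..., Any]
    args: tuple = ()
    status: str = "queued"
    result: Any = None


@dataclass
class ToyQueue:
    """FIFO holder for jobs that are waiting to be processed."""
    jobs: deque = field(default_factory=deque)

    def enqueue(self, func, *args):
        job = ToyJob(func, args)
        self.jobs.append(job)  # new jobs go to the back of the line
        return job


def run_worker(queue: ToyQueue, registries: dict) -> None:
    """Drain the queue once, then file each job under its final status."""
    while queue.jobs:
        job = queue.jobs.popleft()  # oldest job first
        job.status = "started"
        try:
            job.result = job.func(*job.args)
            job.status = "finished"
        except Exception:
            job.status = "failed"
        registries[job.status].append(job)


queue = ToyQueue()
registries = {"finished": [], "failed": []}
ok = queue.enqueue(lambda a, b: a + b, 5, 3)
bad = queue.enqueue(lambda: 1 / 0)
run_worker(queue, registries)
print(ok.status, ok.result)  # finished 8
print(bad.status)            # failed
```

The registries here are plain lists keyed by status, which mirrors the idea of tracking jobs by state without claiming anything about how RQ stores them in Redis.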

Capabilities

Job Management

Core job operations including creation, execution tracking, status management, and lifecycle control. Jobs encapsulate function calls with comprehensive metadata and support callbacks, retries, and dependencies.

def get_current_job(connection=None, job_class=None) -> Job | None: ...
def cancel_job(job_id: str, connection, serializer=None, enqueue_dependents: bool = False): ...
def requeue_job(job_id: str, connection, serializer=None) -> Job: ...

class Job:
    def __init__(self, id: str = None, connection = None, serializer=None): ...
    @classmethod
    def create(cls, func, args=None, kwargs=None, **options) -> 'Job': ...
    @classmethod  
    def fetch(cls, id: str, connection, serializer=None) -> 'Job': ...
    def get_status(self, refresh: bool = True) -> JobStatus: ...
    def perform(self) -> Any: ...
    def save(self, pipeline=None, include_meta: bool = True, include_result: bool = True): ...
    def cancel(self, pipeline=None, enqueue_dependents: bool = False): ...
    def requeue(self, at_front: bool = False) -> 'Job': ...
    def delete(self, pipeline=None, remove_from_queue: bool = True): ...
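As an illustration of job metadata, the sketch below records a progress percentage on a job. It assumes the job object exposes a `meta` dict and a `save_meta()` method, as RQ's `Job` does; inside a running worker the handle would typically come from `get_current_job()`. The helper name `record_progress` and the `StubJob` class are hypothetical and exist only so the example runs without Redis.

```python
def record_progress(job, done: int, total: int) -> int:
    """Store a completion percentage in job.meta and persist it."""
    percent = int(100 * done / total)
    job.meta["progress"] = percent
    job.save_meta()  # persist the updated meta dict
    return percent


class StubJob:
    """Stand-in for rq.job.Job so the sketch runs without a Redis server."""

    def __init__(self):
        self.meta = {}
        self.saves = 0

    def save_meta(self):
        self.saves += 1


job = StubJob()
record_progress(job, done=1, total=4)
print(job.meta)  # {'progress': 25}
```

A client holding the same job ID could later fetch the job and read `job.meta["progress"]` to display status.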


Queue Operations

Queue management for job scheduling, enqueueing, and batch operations. Queues provide flexible job submission with support for immediate execution, delayed scheduling, priority queuing, and bulk operations.

class Queue:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
    def enqueue(self, f, *args, **kwargs) -> Job: ...
    def enqueue_call(self, func, args=None, kwargs=None, **options) -> Job: ...
    def enqueue_at(self, datetime, f, *args, **kwargs) -> Job: ...
    def enqueue_in(self, time_delta, func, *args, **kwargs) -> Job: ...
    def enqueue_many(self, job_datas, pipeline=None, group_id=None) -> list[Job]: ...
    def schedule_job(self, job: Job, datetime, pipeline=None): ...
    def empty(self): ...
    def delete(self, delete_jobs: bool = True): ...
    def get_jobs(self, offset: int = 0, length: int = -1) -> list[Job]: ...
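The difference between relative and absolute scheduling can be sketched as follows. The helper `schedule_report` and the dotted path `"myapp.tasks.send_report"` are hypothetical; `RecordingQueue` is a stand-in that only records calls so the example runs without Redis. With a real `rq.Queue` the same two calls would apply, and scheduled jobs are only picked up when a worker runs with the scheduler enabled (for example `rq worker --with-scheduler`).

```python
from datetime import datetime, timedelta, timezone


def schedule_report(queue, func, run_at: datetime, *args):
    """Schedule func twice: after a fixed delay and at an absolute time."""
    queue.enqueue_in(timedelta(minutes=10), func, *args)  # relative delay
    queue.enqueue_at(run_at, func, *args)                 # absolute time


class RecordingQueue:
    """Stand-in for rq.Queue that records calls instead of talking to Redis."""

    def __init__(self):
        self.calls = []

    def enqueue_in(self, time_delta, func, *args, **kwargs):
        self.calls.append(("enqueue_in", time_delta, func, args))

    def enqueue_at(self, when, func, *args, **kwargs):
        self.calls.append(("enqueue_at", when, func, args))


q = RecordingQueue()
nine_am = datetime(2030, 1, 1, 9, 0, tzinfo=timezone.utc)
schedule_report(q, "myapp.tasks.send_report", nine_am, 42)
for name, when, func, args in q.calls:
    print(name, when, func, args)
```

Passing a timezone-aware `datetime` avoids ambiguity about which clock the absolute time refers to.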


Worker Management

Worker processes for job execution with support for multiple queues, different execution strategies, and comprehensive monitoring. Workers handle job lifecycle, error recovery, and provide flexible deployment options.

class Worker:
    def __init__(self, queues, name: str = None, connection=None, **kwargs): ...
    def work(self, burst: bool = False, logging_level: str = None, **options) -> bool: ...
    def execute_job(self, job: Job, queue: Queue): ...
    def request_stop(self, signum=None, frame=None): ...
    def clean_registries(self): ...

class SimpleWorker(Worker):
    def execute_job(self, job: Job, queue: Queue): ...

class SpawnWorker(Worker):
    def fork_work_horse(self, job: Job, queue: Queue): ...


Job Patterns

Advanced job patterns including callbacks, retries, repetition, and dependencies for complex workflow orchestration and error handling strategies.

class Callback:
    def __init__(self, func, timeout: int = None): ...

class Retry:
    def __init__(self, max: int, interval: int | list[int] = 0): ...
    @classmethod
    def get_interval(cls, count: int, intervals) -> int: ...

class Repeat:
    def __init__(self, times: int, interval: int | list[int] = 0): ...
    @classmethod
    def schedule(cls, job: Job, queue: Queue, pipeline=None): ...
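Since `interval` accepts either a single integer or a list, it helps to see how a per-attempt delay could be chosen from it. The function below is a minimal sketch under one assumption: once the list is exhausted, the last interval is reused. `pick_interval` is a hypothetical name, not RQ's own `get_interval` implementation.

```python
from typing import List, Union


def pick_interval(attempt: int, intervals: Union[int, List[int]]) -> int:
    """Return the delay in seconds before retry number `attempt` (0-based)."""
    if isinstance(intervals, int):
        return intervals  # one fixed delay for every retry
    if not intervals:
        return 0          # no intervals configured: retry immediately
    # Assumption: clamp to the final entry when attempts outnumber intervals.
    return intervals[min(attempt, len(intervals) - 1)]


delays = [pick_interval(i, [10, 30, 60]) for i in range(5)]
print(delays)  # [10, 30, 60, 60, 60]
```

A growing list such as `[10, 30, 60]` gives a simple backoff, so a briefly unavailable dependency gets more time to recover on each attempt.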


Registries and Monitoring

Job registries for tracking job states and comprehensive monitoring capabilities. Registries provide visibility into job execution, failure analysis, and system health monitoring.

class StartedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
    def get_job_count(self, cleanup: bool = True) -> int: ...

class FinishedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...

class FailedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...

class ScheduledJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
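A common use of the failed-job registry is bulk recovery after an outage. The sketch below assumes the registry offers `get_job_ids()` and `requeue(job_id)`, as RQ's `FailedJobRegistry` does; `requeue_failed` and `StubRegistry` are hypothetical names, and the stub only exists so the example runs without Redis.

```python
def requeue_failed(registry, limit: int = 100) -> list:
    """Requeue up to `limit` failed jobs and return the IDs that were requeued."""
    requeued = []
    for job_id in registry.get_job_ids()[:limit]:
        registry.requeue(job_id)  # puts the job back on its queue
        requeued.append(job_id)
    return requeued


class StubRegistry:
    """Stand-in for a failed-job registry (no Redis needed)."""

    def __init__(self, job_ids):
        self._ids = list(job_ids)

    def get_job_ids(self):
        return list(self._ids)

    def requeue(self, job_id):
        self._ids.remove(job_id)


registry = StubRegistry(["a1", "b2", "c3"])
print(requeue_failed(registry, limit=2))  # ['a1', 'b2']
print(registry.get_job_ids())             # ['c3']
```

The `limit` parameter is a design choice for this sketch: it keeps a single call from flooding the queue when thousands of jobs have failed.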


Types

Core Types

from enum import Enum
from typing import Callable, Any, Union, Optional
from datetime import datetime, timedelta

class JobStatus(str, Enum):
    CREATED = 'created'
    QUEUED = 'queued'
    FINISHED = 'finished'
    FAILED = 'failed'
    STARTED = 'started'
    DEFERRED = 'deferred'
    SCHEDULED = 'scheduled'
    STOPPED = 'stopped'
    CANCELED = 'canceled'

class WorkerStatus(str, Enum):
    STARTED = 'started'
    SUSPENDED = 'suspended'
    BUSY = 'busy'
    IDLE = 'idle'

class DequeueStrategy(str, Enum):
    DEFAULT = 'default'
    ROUND_ROBIN = 'round_robin'
    RANDOM = 'random'

# Type aliases
FunctionReferenceType = Union[str, Callable[..., Any]]
JobDependencyType = Union['Dependency', 'Job', str, list[Union['Dependency', 'Job', str]]]
SuccessCallbackType = Callable[['Job', Any, Any], Any]
FailureCallbackType = Callable[['Job', Any, Optional[type], Optional[Exception], Any], Any]
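Because `JobStatus` subclasses `str`, its members compare equal to plain strings, which is convenient when a status arrives from JSON or a log line. The sketch below uses a trimmed copy of the enum so it runs on its own; the `TERMINAL` set and the `is_done` helper are illustrative assumptions, not part of RQ.

```python
from enum import Enum


class JobStatus(str, Enum):  # trimmed copy of the enum listed above
    QUEUED = 'queued'
    STARTED = 'started'
    FINISHED = 'finished'
    FAILED = 'failed'


# Assumption for this sketch: these states mean the job will not run further.
TERMINAL = {JobStatus.FINISHED, JobStatus.FAILED}


def is_done(status) -> bool:
    """Accept either a JobStatus member or its raw string value."""
    return JobStatus(status) in TERMINAL


print(JobStatus.FINISHED == 'finished')  # True
print(is_done('failed'))                 # True
print(is_done(JobStatus.QUEUED))         # False
```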

Constants

# Default timeouts and TTLs (in seconds)
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500
DEFAULT_FAILURE_TTL = 31536000  # 1 year
DEFAULT_MAINTENANCE_TASK_INTERVAL = 600  # 10 minutes
CALLBACK_TIMEOUT = 60

# Logging configuration
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
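The raw second counts are easier to reason about as durations. This small sketch copies a few of the values listed above into a local dict (`DEFAULTS` is a hypothetical name) and formats them with `timedelta`.

```python
from datetime import timedelta

# Values copied from the constants listed above, in seconds.
DEFAULTS = {
    "DEFAULT_WORKER_TTL": 420,
    "DEFAULT_RESULT_TTL": 500,
    "DEFAULT_FAILURE_TTL": 31536000,
    "DEFAULT_MAINTENANCE_TASK_INTERVAL": 600,
}

readable = {name: str(timedelta(seconds=s)) for name, s in DEFAULTS.items()}
for name, duration in readable.items():
    print(f"{name}: {duration}")
# DEFAULT_WORKER_TTL: 0:07:00
# DEFAULT_RESULT_TTL: 0:08:20
# DEFAULT_FAILURE_TTL: 365 days, 0:00:00
# DEFAULT_MAINTENANCE_TASK_INTERVAL: 0:10:00
```

Note how short the default result TTL is compared with the failure TTL: results that must be read later than about eight minutes after completion need an explicit `result_ttl`.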

Exceptions

class NoSuchJobError(Exception):
    """Raised when a job cannot be found."""

class DequeueTimeout(Exception):
    """Raised when dequeue operation times out."""

class InvalidJobOperation(Exception):
    """Raised when an invalid operation is performed on a job."""

class DeserializationError(Exception):
    """Raised when job data cannot be deserialized."""

class AbandonedJobError(Exception):
    """Raised when a job is abandoned by its worker."""

class ShutDownImminentException(Exception):
    """Raised when worker shutdown is imminent."""