RQ is a simple, lightweight, library for creating background jobs, and processing them.
—
Comprehensive job lifecycle management including creation, execution tracking, status monitoring, and control operations. Jobs in RQ encapsulate function calls with rich metadata, status tracking, and support for advanced patterns like callbacks, retries, and dependencies.
Create new jobs and retrieve existing ones by ID with full serialization support.
class Job:
def __init__(self, id: str = None, connection = None, serializer=None):
"""
Initialize a Job instance.
Args:
id (str, optional): Job identifier. Generated if not provided.
connection: Redis connection instance.
serializer: Custom serializer for job data.
"""
@classmethod
def create(
cls,
func,
args=None,
kwargs=None,
connection=None,
result_ttl=None,
ttl=None,
status=None,
description=None,
depends_on=None,
timeout=None,
id=None,
origin='',
meta=None,
failure_ttl=None,
serializer=None,
group_id=None,
on_success=None,
on_failure=None,
on_stopped=None
) -> 'Job':
"""
Create a new job instance.
Args:
func: Function to execute or function reference string.
args (tuple): Positional arguments for the function.
kwargs (dict): Keyword arguments for the function.
connection: Redis connection.
result_ttl (int): Result time-to-live in seconds.
ttl (int): Job time-to-live in seconds.
status (JobStatus): Initial job status.
description (str): Human-readable job description.
depends_on: Job dependencies.
timeout (int): Job execution timeout in seconds.
id (str): Custom job ID.
origin (str): Queue name where job originated.
meta (dict): Additional job metadata.
failure_ttl (int): Failure info time-to-live in seconds.
serializer: Custom serializer.
group_id (str): Job group identifier.
on_success (Callback): Success callback.
on_failure (Callback): Failure callback.
on_stopped (Callback): Stopped callback.
Returns:
Job: New job instance.
"""
@classmethod
def fetch(cls, id: str, connection, serializer=None) -> 'Job':
"""
Retrieve an existing job by ID.
Args:
id (str): Job identifier.
connection: Redis connection.
serializer: Custom serializer.
Returns:
Job: Retrieved job instance.
Raises:
NoSuchJobError: If job doesn't exist.
"""
@classmethod
def exists(cls, job_id: str, connection) -> bool:
"""
Check if a job exists.
Args:
job_id (str): Job identifier.
connection: Redis connection.
Returns:
bool: True if job exists, False otherwise.
"""
@classmethod
def fetch_many(cls, job_ids: list[str], connection, serializer=None) -> list['Job | None']:
"""
Fetch multiple jobs by their IDs.
Args:
job_ids (list[str]): List of job identifiers.
connection: Redis connection.
serializer: Custom serializer.
Returns:
list[Job | None]: List of jobs (None for non-existent jobs).
"""Monitor and control job execution status throughout its lifecycle.
def get_status(self, refresh: bool = True) -> JobStatus:
"""
Get current job status.
Args:
refresh (bool): Whether to refresh from Redis before returning.
Returns:
JobStatus: Current job status.
"""
def set_status(self, status: JobStatus, pipeline=None):
"""
Set job status.
Args:
status (JobStatus): New status to set.
pipeline: Redis pipeline for batched operations.
"""
def refresh(self):
"""Refresh job data from Redis."""
def save(self, pipeline=None, include_meta: bool = True, include_result: bool = True):
"""
Save job to Redis.
Args:
pipeline: Redis pipeline for batched operations.
include_meta (bool): Whether to save metadata.
include_result (bool): Whether to save result.
"""
def delete(self, pipeline=None, remove_from_queue: bool = True, delete_dependents: bool = False):
"""
Delete job from Redis.
Args:
pipeline: Redis pipeline for batched operations.
remove_from_queue (bool): Remove from queue if queued.
delete_dependents (bool): Delete dependent jobs.
"""
def cleanup(self, ttl: int = None, pipeline=None, remove_from_queue: bool = True):
"""
Clean up job data.
Args:
ttl (int): Time-to-live for cleanup.
pipeline: Redis pipeline.
remove_from_queue (bool): Remove from queue.
"""Execute jobs and manage execution results with comprehensive error handling.
def perform(self) -> Any:
"""
Execute the job function.
Returns:
Any: Function return value.
Raises:
Various: Any exception raised by the job function.
"""
def return_value(self, refresh: bool = False) -> Any:
"""
Get job return value.
Args:
refresh (bool): Whether to refresh from Redis.
Returns:
Any: Job return value if completed, None otherwise.
"""
def latest_result(self, timeout: int = 0) -> 'Result | None':
"""
Get the latest job result.
Args:
timeout (int): Maximum wait time for result.
Returns:
Result | None: Latest result or None.
"""
def results(self) -> list['Result']:
"""
Get all job results.
Returns:
list[Result]: All job results.
"""
def get_call_string(self) -> str | None:
"""
Get string representation of the function call.
Returns:
str | None: Function call string.
"""Control job execution with cancellation, requeuing, and retry mechanisms.
def cancel(self, pipeline=None, enqueue_dependents: bool = False, remove_from_dependencies: bool = False):
"""
Cancel job execution.
Args:
pipeline: Redis pipeline for batched operations.
enqueue_dependents (bool): Enqueue dependent jobs.
remove_from_dependencies (bool): Remove from dependency lists.
"""
def requeue(self, at_front: bool = False) -> 'Job':
"""
Requeue job for execution.
Args:
at_front (bool): Add to front of queue.
Returns:
Job: The requeued job.
"""
def retry(self, queue: 'Queue', pipeline=None):
"""
Retry job execution.
Args:
queue (Queue): Queue to retry in.
pipeline: Redis pipeline.
"""
def get_retry_interval(self) -> int:
"""
Get retry interval for this job.
Returns:
int: Retry interval in seconds.
"""Module-level functions for job operations without requiring job instances.
def get_current_job(connection=None, job_class=None) -> 'Job | None':
"""
Get the currently executing job within a worker context.
Args:
connection: Redis connection. Uses default if None.
job_class: Job class to use for deserialization.
Returns:
Job | None: Current job if in worker context, None otherwise.
"""
def cancel_job(job_id: str, connection, serializer=None, enqueue_dependents: bool = False):
"""
Cancel a job by its ID.
Args:
job_id (str): Job identifier to cancel.
connection: Redis connection.
serializer: Custom serializer.
enqueue_dependents (bool): Enqueue dependent jobs after cancellation.
Raises:
NoSuchJobError: If job doesn't exist.
"""
def requeue_job(job_id: str, connection, serializer=None) -> 'Job':
"""
Requeue a job by its ID.
Args:
job_id (str): Job identifier to requeue.
connection: Redis connection.
serializer: Custom serializer.
Returns:
Job: The requeued job.
Raises:
NoSuchJobError: If job doesn't exist.
"""Access and modify job properties, metadata, and execution information.
# Core Properties
@property
def id(self) -> str:
"""Job identifier."""
@property
def key(self) -> bytes:
"""Redis key for this job."""
@property
def func(self):
"""Function to execute."""
@property
def args(self) -> tuple:
"""Function positional arguments."""
@property
def kwargs(self) -> dict:
"""Function keyword arguments."""
@property
def description(self) -> str | None:
"""Job description."""
@property
def origin(self) -> str:
"""Queue where job originated."""
@property
def timeout(self) -> float | None:
"""Job timeout in seconds."""
@property
def result_ttl(self) -> int | None:
"""Result time-to-live in seconds."""
@property
def failure_ttl(self) -> int | None:
"""Failure info time-to-live in seconds."""
@property
def ttl(self) -> int | None:
"""Job time-to-live in seconds."""
# Timing Properties
@property
def enqueued_at(self) -> datetime | None:
"""When job was enqueued."""
@property
def started_at(self) -> datetime | None:
"""When job execution started."""
@property
def ended_at(self) -> datetime | None:
"""When job execution ended."""
@property
def last_heartbeat(self) -> datetime | None:
"""Last worker heartbeat timestamp."""
# Status Properties
@property
def is_finished(self) -> bool:
"""True if job finished successfully."""
@property
def is_queued(self) -> bool:
"""True if job is queued for execution."""
@property
def is_failed(self) -> bool:
"""True if job failed."""
@property
def is_started(self) -> bool:
"""True if job execution started."""
@property
def is_deferred(self) -> bool:
"""True if job is deferred (waiting for dependencies)."""
@property
def is_canceled(self) -> bool:
"""True if job was canceled."""
@property
def is_scheduled(self) -> bool:
"""True if job is scheduled for future execution."""
@property
def is_stopped(self) -> bool:
"""True if job was stopped."""
# Metadata Properties
@property
def meta(self) -> dict:
"""Job metadata dictionary."""
@property
def worker_name(self) -> str | None:
"""Name of worker that executed/is executing the job."""
@property
def group_id(self) -> str | None:
"""Job group identifier."""
def get_meta(self, refresh: bool = True) -> dict:
"""
Get job metadata.
Args:
refresh (bool): Refresh from Redis before returning.
Returns:
dict: Job metadata.
"""
def save_meta(self):
"""Save metadata to Redis."""Manage job dependencies and execution ordering.
@property
def dependency(self) -> 'Job | None':
"""First job dependency."""
@property
def dependency_ids(self) -> list[bytes]:
"""List of dependency job keys."""
@property
def dependent_ids(self) -> list[str]:
"""List of dependent job IDs."""
def fetch_dependencies(self, watch: bool = False, pipeline=None) -> list['Job']:
"""
Fetch all job dependencies.
Args:
watch (bool): Watch dependencies for changes.
pipeline: Redis pipeline.
Returns:
list[Job]: List of dependency jobs.
"""
def register_dependency(self, pipeline=None):
"""
Register job dependencies in Redis.
Args:
pipeline: Redis pipeline for batched operations.
"""
def dependencies_are_met(
self,
parent_job: 'Job' = None,
pipeline=None,
exclude_job_id: str = None,
refresh_job_status: bool = True
) -> bool:
"""
Check if all job dependencies are satisfied.
Args:
parent_job (Job): Parent job context.
pipeline: Redis pipeline.
exclude_job_id (str): Job ID to exclude from check.
refresh_job_status (bool): Refresh dependency status from Redis.
Returns:
bool: True if dependencies are met, False otherwise.
"""
def delete_dependents(self, pipeline=None):
"""
Delete all dependent jobs.
Args:
pipeline: Redis pipeline for batched operations.
"""import redis
from rq import Job
# Connect to Redis
conn = redis.Redis()
# Define a job function
def process_data(data_id, options=None):
# Simulate processing
import time
time.sleep(2)
return f"Processed data {data_id}"
# Create a job
job = Job.create(
func=process_data,
args=('data_123',),
kwargs={'options': {'fast': True}},
connection=conn,
timeout=300,
description="Process data batch 123"
)
# Save job to Redis
job.save()
print(f"Created job: {job.id}")
print(f"Status: {job.get_status()}")
print(f"Description: {job.description}")
# Later, retrieve and check the job
retrieved_job = Job.fetch(job.id, connection=conn)
print(f"Retrieved job status: {retrieved_job.get_status()}")from rq import Job, get_current_job
import redis
conn = redis.Redis()
def tracked_function(item_count):
# Get current job to update metadata
job = get_current_job()
if job:
job.meta['progress'] = 0
job.save_meta()
# Simulate processing with progress updates
for i in range(item_count):
# Do work...
if job:
job.meta['progress'] = (i + 1) / item_count * 100
job.meta['current_item'] = i + 1
job.save_meta()
return f"Completed processing {item_count} items"
# Create job with initial metadata
job = Job.create(
func=tracked_function,
args=(100,),
connection=conn,
meta={'stage': 'queued', 'priority': 'high'}
)
job.save()
# Monitor progress (from another process/thread)
while not job.is_finished and not job.is_failed:
job.refresh()
meta = job.get_meta()
print(f"Progress: {meta.get('progress', 0)}%")
time.sleep(1)
# Get final result
if job.is_finished:
print(f"Result: {job.return_value()}")
elif job.is_failed:
print("Job failed")from rq import Job, cancel_job, requeue_job
import redis
conn = redis.Redis()
# Create a long-running job
def long_task():
import time
time.sleep(60)
return "Done"
job = Job.create(func=long_task, connection=conn)
job.save()
# Cancel the job
cancel_job(job.id, connection=conn)
print(f"Job {job.id} canceled")
# Create another job and then requeue it
job2 = Job.create(func=long_task, connection=conn)
job2.save()
# Requeue (useful after failures or for retrying)
requeued_job = requeue_job(job2.id, connection=conn)
print(f"Job requeued: {requeued_job.id}")Install with Tessl CLI
npx tessl i tessl/pypi-rq