RQ is a simple, lightweight library for creating and processing background jobs.
—
Advanced job patterns: callbacks, retries, repetition, and dependencies for workflow orchestration and error handling. These patterns support automatic recovery, scheduled repetition, and event-driven processing.
Execute callback functions on job completion, failure, or termination.
class Callback:
def __init__(self, func, timeout: int | None = None):
"""
Initialize a callback.
Args:
func: Callback function or function reference string.
timeout (int): Callback timeout in seconds. Defaults to CALLBACK_TIMEOUT (60).
"""
@property
def name(self) -> str:
"""Function name or path for the callback."""
@property
def func(self):
"""Callback function or function reference."""
@property
def timeout(self) -> int:
"""Callback timeout in seconds."""Handle successful job completion with result processing.
def success_callback_example(job: 'Job', connection, result):
"""
Example success callback function signature.
Args:
job (Job): The completed job.
connection: Redis connection.
result: Job return value.
Returns:
Any: Callback return value (optional).
"""
# Usage in job creation (assumes an existing RQ queue and Callback import)
def process_data(data):
return f"Processed {len(data)} items"
def on_success(job, connection, result):
print(f"Job {job.id} completed successfully: {result}")
# Could send notifications, update databases, etc.
job = queue.enqueue(
process_data,
['item1', 'item2', 'item3'],
on_success=Callback(on_success, timeout=30)
)
Handle job failures with error information and recovery logic.
def failure_callback_example(job: 'Job', connection, exc_type, exc_value, traceback):
"""
Example failure callback function signature.
Args:
job (Job): The failed job.
connection: Redis connection.
exc_type: Exception type.
exc_value: Exception instance.
traceback: Exception traceback.
Returns:
Any: Callback return value (optional).
"""
# Usage in job creation (assumes an existing RQ queue and Callback import)
def risky_operation(data):
if not data:
raise ValueError("No data provided")
return f"Processed {data}"
def on_failure(job, connection, exc_type, exc_value, traceback):
error_msg = f"Job {job.id} failed: {exc_value}"
print(error_msg)
# Could log to external service, send alerts, etc.
# Optionally requeue with different parameters
if isinstance(exc_value, ValueError):
# Requeue with default data
queue.enqueue(risky_operation, "default_data")
job = queue.enqueue(
risky_operation,
None, # This will cause failure
on_failure=Callback(on_failure)
)
Handle jobs that are stopped or interrupted during execution.
def stopped_callback_example(job: 'Job', connection):
"""
Example stopped callback function signature.
Args:
job (Job): The stopped job.
connection: Redis connection.
Returns:
Any: Callback return value (optional).
"""
# Usage in job creation (assumes an existing RQ queue and Callback import)
def long_running_task():
import time
for i in range(100):
time.sleep(1) # Could be interrupted
return "Completed"
def on_stopped(job, connection):
print(f"Job {job.id} was stopped before completion")
# Could clean up resources, send notifications, etc.
job = queue.enqueue(
long_running_task,
on_stopped=Callback(on_stopped)
)
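A job usually reaches the stopped state when it is halted from outside, for example via RQ's stop-job command; the worker executing the job should then run the registered on_stopped callback. A minimal sketch of stopping the job above from another process:
import redis
from rq.command import send_stop_job_command

connection = redis.Redis()
# Ask the worker currently executing the job to stop it.
send_stop_job_command(connection, job.id)
Configure automatic retry behavior for failed jobs.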
class Retry:
def __init__(self, max: int, interval: int | list[int] = 0):
"""
Initialize retry configuration.
Args:
max (int): Maximum number of retry attempts.
interval (int | list[int]): Retry interval(s) in seconds.
- int: Fixed interval between retries.
- list[int]: Sequence of intervals for each retry attempt.
"""
@property
def max(self) -> int:
"""Maximum retry attempts."""
@property
def intervals(self) -> list[int]:
"""Retry interval sequence."""
@classmethod
def get_interval(cls, count: int, intervals: list[int] | None) -> int:
"""
Get retry interval for attempt count.
Args:
count (int): Current retry attempt number (0-based).
intervals (list[int] | None): Configured intervals.
Returns:
int: Interval in seconds for this retry attempt.
"""Schedule jobs to repeat automatically with configurable intervals.
class Repeat:
def __init__(self, times: int, interval: int | list[int] = 0):
"""
Initialize repeat configuration.
Args:
times (int): Number of times to repeat the job.
interval (int | list[int]): Interval(s) between repetitions in seconds.
- int: Fixed interval between repetitions.
- list[int]: Sequence of intervals for each repetition.
"""
@property
def times(self) -> int:
"""Number of repetitions."""
@property
def intervals(self) -> list[int]:
"""Repeat interval sequence."""
@classmethod
def get_interval(cls, count: int, intervals: list[int]) -> int:
"""
Get repeat interval for repetition count.
Args:
count (int): Current repetition number (0-based).
intervals (list[int]): Configured intervals.
Returns:
int: Interval in seconds for this repetition.
"""
@classmethod
def schedule(cls, job: 'Job', queue: 'Queue', pipeline=None):
"""
Schedule the next repetition of a job.
Args:
job (Job): Job to repeat.
queue (Queue): Queue to schedule in.
pipeline: Redis pipeline for batched operations.
"""Create dependencies between jobs for workflow orchestration.
class Dependency:
def __init__(
self,
jobs,
allow_failure: bool = False,
enqueue_at_front: bool = False
):
"""
Initialize job dependency.
Args:
jobs: Job instances, job IDs, or sequence of jobs/IDs.
allow_failure (bool): Allow dependent job to run even if dependencies fail.
enqueue_at_front (bool): Enqueue dependent job at front of queue.
"""
@property
def dependencies(self):
"""Sequence of dependency jobs or job IDs."""
@property
def allow_failure(self) -> bool:
"""Whether to allow dependency failures."""
@property
def enqueue_at_front(self) -> bool:
"""Whether to enqueue at front of queue."""from rq import Queue, Callback
import redis
conn = redis.Redis()
queue = Queue(connection=conn)
def process_order(order_id):
# Simulate order processing
import time
time.sleep(2)
return f"Order {order_id} processed successfully"
def send_confirmation_email(job, connection, result):
"""Success callback: send confirmation email."""
order_id = job.args[0]
print(f"Sending confirmation email for order {order_id}: {result}")
# In real app: send actual email
def handle_order_failure(job, connection, exc_type, exc_value, traceback):
"""Failure callback: handle processing failure."""
order_id = job.args[0]
print(f"Order {order_id} processing failed: {exc_value}")
# In real app: notify customer service, log error, etc.
# Enqueue job with callbacks
job = queue.enqueue(
process_order,
"ORD-12345",
on_success=Callback(send_confirmation_email),
on_failure=Callback(handle_order_failure),
description="Process customer order"
)
print(f"Enqueued order processing job: {job.id}")from rq import Queue, Retry
import redis
import random
conn = redis.Redis()
queue = Queue(connection=conn)
def unreliable_api_call(endpoint):
"""Simulates an unreliable API that fails sometimes."""
if random.random() < 0.7: # 70% failure rate
raise ConnectionError(f"Failed to connect to {endpoint}")
return f"Successfully called {endpoint}"
# Simple retry: 3 attempts with 5 second intervals
simple_retry = Retry(max=3, interval=5)
job1 = queue.enqueue(
unreliable_api_call,
"/api/users",
retry=simple_retry,
description="API call with simple retry"
)
# Exponential backoff: increasing intervals
exponential_retry = Retry(max=4, interval=[1, 2, 4, 8]) # 1s, 2s, 4s, 8s
job2 = queue.enqueue(
unreliable_api_call,
"/api/orders",
retry=exponential_retry,
description="API call with exponential backoff"
)
# Custom retry intervals
custom_retry = Retry(max=3, interval=[10, 30, 60]) # 10s, 30s, 1min
job3 = queue.enqueue(
unreliable_api_call,
"/api/payments",
retry=custom_retry,
description="API call with custom intervals"
)
print("Enqueued jobs with different retry strategies")from rq import Queue, Repeat
import redis
from datetime import datetime
conn = redis.Redis()
queue = Queue(connection=conn)
def generate_report(report_type):
"""Generate a periodic report."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Generating {report_type} report at {timestamp}")
return f"{report_type} report generated at {timestamp}"
def cleanup_temp_files():
"""Clean up temporary files."""
import os
    # Simulation: count matching files rather than actually deleting them
    temp_count = len([f for f in os.listdir('/tmp') if f.startswith('temp_')])
    print(f"Cleaned up {temp_count} temporary files")
return f"Cleaned {temp_count} files"
# Repeat 5 times every hour (3600 seconds)
hourly_repeat = Repeat(times=5, interval=3600)
report_job = queue.enqueue(
generate_report,
"sales_summary",
repeat=hourly_repeat,
description="Hourly sales report"
)
# Repeat with increasing intervals (daily, then weekly)
increasing_repeat = Repeat(times=3, interval=[86400, 604800, 604800]) # 1 day, 1 week, 1 week
cleanup_job = queue.enqueue(
cleanup_temp_files,
repeat=increasing_repeat,
description="Progressive cleanup schedule"
)
print(f"Scheduled repeating jobs: {report_job.id}, {cleanup_job.id}")from rq import Queue, Job
import redis
conn = redis.Redis()
queue = Queue(connection=conn)
def download_data(source):
"""Download data from source."""
import time
time.sleep(2)
print(f"Downloaded data from {source}")
return f"data_{source}.csv"
def validate_data(filename):
"""Validate downloaded data."""
import time
time.sleep(1)
print(f"Validated {filename}")
return f"validated_{filename}"
def process_data(filename):
"""Process validated data."""
import time
time.sleep(3)
print(f"Processed {filename}")
return f"results_{filename}"
def generate_report(filenames):
"""Generate report from processed data."""
print(f"Generated report from {len(filenames)} files")
return f"report_from_{len(filenames)}_sources.pdf"
# Create initial download jobs
download1 = queue.enqueue(download_data, "api_source_1")
download2 = queue.enqueue(download_data, "api_source_2")
download3 = queue.enqueue(download_data, "database_dump")
# Create validation jobs that depend on downloads
# RQ does not pass a parent job's return value to a dependent job
# automatically, so the expected filenames are passed explicitly here.
validate1 = queue.enqueue(
    validate_data,
    "data_api_source_1.csv",
    depends_on=download1,
    description="Validate API source 1 data"
)
validate2 = queue.enqueue(
    validate_data,
    "data_api_source_2.csv",
    depends_on=download2,
    description="Validate API source 2 data"
)
validate3 = queue.enqueue(
    validate_data,
    "data_database_dump.csv",
    depends_on=download3,
    description="Validate database dump"
)
# Create processing jobs that depend on validation
process1 = queue.enqueue(process_data, "validated_data_api_source_1.csv", depends_on=validate1)
process2 = queue.enqueue(process_data, "validated_data_api_source_2.csv", depends_on=validate2)
process3 = queue.enqueue(process_data, "validated_data_database_dump.csv", depends_on=validate3)
# Create final report job that depends on all processing
report_job = queue.enqueue(
    generate_report,
    [
        "results_validated_data_api_source_1.csv",
        "results_validated_data_api_source_2.csv",
        "results_validated_data_database_dump.csv",
    ],
    depends_on=[process1, process2, process3],
    description="Generate final report"
)
print("Created dependency chain:")
print(f"Downloads: {download1.id}, {download2.id}, {download3.id}")
print(f"Validations: {validate1.id}, {validate2.id}, {validate3.id}")
print(f"Processing: {process1.id}, {process2.id}, {process3.id}")
print(f"Report: {report_job.id}")from rq import Queue, Job, Retry, Repeat, Callback
import redis
conn = redis.Redis()
queue = Queue(connection=conn)
# Define workflow functions
def extract_data(source_id):
"""First step: extract data."""
if source_id == "unreliable_source":
import random
if random.random() < 0.3: # 30% failure rate
raise ConnectionError("Source temporarily unavailable")
return f"extracted_data_{source_id}"
def transform_data(data):
"""Second step: transform data."""
return f"transformed_{data}"
def load_data(data):
"""Third step: load data."""
return f"loaded_{data}"
def notify_completion(job, connection, result):
"""Success callback: notify stakeholders."""
print(f"Data pipeline completed successfully: {result}")
def handle_failure(job, connection, exc_type, exc_value, tb):
"""Failure callback: handle pipeline failures."""
print(f"Pipeline step failed: {exc_value}")
# Could trigger alternative workflow
def periodic_health_check():
"""Periodic system health check."""
import random
health_score = random.randint(70, 100)
print(f"System health: {health_score}%")
return health_score
# Create ETL pipeline with error handling
extract_job = queue.enqueue(
extract_data,
"unreliable_source",
retry=Retry(max=3, interval=[5, 15, 30]), # Retry with backoff
on_failure=Callback(handle_failure),
description="Extract data from unreliable source"
)
transform_job = queue.enqueue(
    transform_data,
    "extracted_data_unreliable_source",  # matches extract_data's return value
    depends_on=extract_job,
    on_failure=Callback(handle_failure),
    description="Transform extracted data"
)
load_job = queue.enqueue(
    load_data,
    "transformed_extracted_data_unreliable_source",  # matches transform_data's return value
    depends_on=transform_job,
    on_success=Callback(notify_completion),
    on_failure=Callback(handle_failure),
    description="Load transformed data"
)
# Schedule periodic health checks
health_check_job = queue.enqueue(
periodic_health_check,
repeat=Repeat(times=24, interval=3600), # Every hour for 24 hours
description="Hourly system health check"
)
print("Complex workflow created:")
print(f"ETL Pipeline: {extract_job.id} -> {transform_job.id} -> {load_job.id}")
print(f"Health Check: {health_check_job.id} (repeating)")from rq import Queue, Job, Callback
import redis
import json
conn = redis.Redis()
queue = Queue(connection=conn)
def audit_job_completion(job, connection, result):
"""Audit callback: log job completion details."""
audit_data = {
'job_id': job.id,
'function': job.func_name,
'args': job.args,
'kwargs': job.kwargs,
'result': str(result)[:100], # Truncate long results
'duration': (job.ended_at - job.started_at).total_seconds(),
'worker': job.worker_name
}
# In real app: send to audit service
print(f"AUDIT: {json.dumps(audit_data, indent=2)}")
def cascade_failure_handler(job, connection, exc_type, exc_value, tb):
"""Handle failures with cascading cleanup."""
print(f"Job {job.id} failed, starting cleanup cascade")
# Cancel related jobs
if hasattr(job, 'meta') and 'related_jobs' in job.meta:
for related_job_id in job.meta['related_jobs']:
try:
related_job = Job.fetch(related_job_id, connection=connection)
if related_job.get_status() in ['queued', 'started']:
related_job.cancel()
print(f"Cancelled related job: {related_job_id}")
            except Exception:
                # Ignore jobs that no longer exist or cannot be cancelled.
                pass
def business_logic_processor(data_type, data):
"""Business logic with metadata."""
    current_job = get_current_job()
if current_job:
current_job.meta['processing_stage'] = 'started'
current_job.save_meta()
# Simulate processing
import time
time.sleep(2)
if current_job:
current_job.meta['processing_stage'] = 'completed'
current_job.save_meta()
return f"Processed {data_type}: {data}"
# Create jobs with advanced callback patterns
main_job = queue.enqueue(
business_logic_processor,
"customer_data",
{"records": 1000},
on_success=Callback(audit_job_completion, timeout=30),
on_failure=Callback(cascade_failure_handler, timeout=60),
meta={'related_jobs': [], 'priority': 'high'},
description="Main business logic processing"
)
# Create related jobs
related_jobs = []
for i in range(3):
related_job = queue.enqueue(
business_logic_processor,
f"related_data_{i}",
{"records": 100},
depends_on=main_job,
on_success=Callback(audit_job_completion),
description=f"Related processing {i}"
)
related_jobs.append(related_job.id)
# Update main job metadata with related job IDs
main_job.meta['related_jobs'] = related_jobs
main_job.save_meta()
print(f"Created job chain with advanced callbacks: {main_job.id}")
print(f"Related jobs: {related_jobs}")Install with Tessl CLI
npx tessl i tessl/pypi-rq