CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-rq

RQ is a simple, lightweight library for creating background jobs and processing them.

Pending
Overview
Eval results
Files

job-patterns.mddocs/

Job Patterns

Advanced job patterns including callbacks, retries, repetition, and dependencies for complex workflow orchestration and error handling strategies. These patterns enable sophisticated job processing workflows with automatic recovery, scheduled repetition, and event-driven processing.

Capabilities

Job Callbacks

Execute callback functions on job completion, failure, or termination.

class Callback:
    """Pairs a callback function (or its dotted-path string) with an execution timeout."""

    def __init__(self, func, timeout: int | None = None):
        """
        Initialize a callback.
        
        Args:
            func: Callback function or function reference string (dotted import path).
            timeout (int | None): Callback timeout in seconds. Defaults to
                CALLBACK_TIMEOUT (60) when None is given.
        """

    @property
    def name(self) -> str:
        """Function name or dotted path for the callback."""

    @property
    def func(self):
        """Callback function or function reference string."""

    @property
    def timeout(self) -> int:
        """Callback timeout in seconds."""

Success Callbacks

Handle successful job completion with result processing.

def success_callback_example(job: 'Job', connection, result):
    """
    Example success callback function signature.

    Invoked after a job function returns without raising.
    
    Args:
        job (Job): The completed job.
        connection: Redis connection.
        result: Job return value.
        
    Returns:
        Any: Callback return value (optional).
    """

# Usage in job creation
def process_data(data):
    """Summarize how many items were handled."""
    item_count = len(data)
    return f"Processed {item_count} items"


def on_success(job, connection, result):
    """Success hook: report the finished job and its result."""
    # Could send notifications, update databases, etc.
    print(f"Job {job.id} completed successfully: {result}")


job = queue.enqueue(
    process_data, ['item1', 'item2', 'item3'],
    on_success=Callback(on_success, timeout=30),
)

Failure Callbacks

Handle job failures with error information and recovery logic.

def failure_callback_example(job: 'Job', connection, exc_type, exc_value, traceback):
    """
    Example failure callback function signature.

    Invoked when a job function raises an unhandled exception.
    
    Args:
        job (Job): The failed job.
        connection: Redis connection.
        exc_type: Exception type.
        exc_value: Exception instance.
        traceback: Exception traceback.
        
    Returns:
        Any: Callback return value (optional).
    """

# Usage in job creation
def risky_operation(data):
    """Fail fast on falsy input, otherwise report what was handled."""
    if data:
        return f"Processed {data}"
    raise ValueError("No data provided")


def on_failure(job, connection, exc_type, exc_value, traceback):
    """Failure hook: log the error, then retry ValueErrors with safe input."""
    print(f"Job {job.id} failed: {exc_value}")
    # Could log to external service, send alerts, etc.
    if isinstance(exc_value, ValueError):
        # Requeue with default data instead of the bad input
        queue.enqueue(risky_operation, "default_data")


job = queue.enqueue(
    risky_operation, None,  # None triggers the failure path
    on_failure=Callback(on_failure),
)

Stopped Callbacks

Handle jobs that are stopped or interrupted during execution.

def stopped_callback_example(job: 'Job', connection):
    """
    Example stopped callback function signature.

    Invoked when a job is stopped or interrupted during execution.
    
    Args:
        job (Job): The stopped job.
        connection: Redis connection.
        
    Returns:
        Any: Callback return value (optional).
    """

# Usage in job creation
def long_running_task():
    """Simulate ~100 seconds of interruptible work."""
    import time
    for _ in range(100):
        time.sleep(1)  # Could be interrupted
    return "Completed"


def on_stopped(job, connection):
    """Stopped hook: note that the job was halted before it finished."""
    # Could clean up resources, send notifications, etc.
    print(f"Job {job.id} was stopped before completion")


job = queue.enqueue(long_running_task, on_stopped=Callback(on_stopped))

Job Retry Patterns

Configure automatic retry behavior for failed jobs.

class Retry:
    """Retry policy attached to a job via `queue.enqueue(..., retry=Retry(...))`."""

    def __init__(self, max: int, interval: int | list[int] = 0):
        """
        Initialize retry configuration.
        
        Args:
            max (int): Maximum number of retry attempts.
                (Parameter name mirrors RQ's public API; it shadows the builtin.)
            interval (int | list[int]): Retry interval(s) in seconds.
                - int: Fixed interval between retries.
                - list[int]: Sequence of intervals for each retry attempt.
        """

    @property
    def max(self) -> int:
        """Maximum retry attempts."""

    @property
    def intervals(self) -> list[int]:
        """Retry interval sequence."""

    @classmethod
    def get_interval(cls, count: int, intervals: list[int] | None) -> int:
        """
        Get retry interval for attempt count.
        
        Args:
            count (int): Current retry attempt number (0-based).
            intervals (list[int] | None): Configured intervals.
            
        Returns:
            int: Interval in seconds for this retry attempt.
        """

Job Repetition Patterns

Schedule jobs to repeat automatically with configurable intervals.

class Repeat:
    """Repetition policy attached to a job via `queue.enqueue(..., repeat=Repeat(...))`."""

    def __init__(self, times: int, interval: int | list[int] = 0):
        """
        Initialize repeat configuration.
        
        Args:
            times (int): Number of times to repeat the job.
            interval (int | list[int]): Interval(s) between repetitions in seconds.
                - int: Fixed interval between repetitions.
                - list[int]: Sequence of intervals for each repetition.
        """

    @property
    def times(self) -> int:
        """Number of repetitions."""

    @property
    def intervals(self) -> list[int]:
        """Repeat interval sequence."""

    @classmethod
    def get_interval(cls, count: int, intervals: list[int]) -> int:
        """
        Get repeat interval for repetition count.
        
        Args:
            count (int): Current repetition number (0-based).
            intervals (list[int]): Configured intervals.
            
        Returns:
            int: Interval in seconds for this repetition.
        """

    @classmethod
    def schedule(cls, job: 'Job', queue: 'Queue', pipeline=None):
        """
        Schedule the next repetition of a job.
        
        Args:
            job (Job): Job to repeat.
            queue (Queue): Queue to schedule in.
            pipeline: Optional Redis pipeline for batched operations.
        """

Job Dependencies

Create dependencies between jobs for workflow orchestration.

class Dependency:
    """Dependency specification passed as `depends_on=` when enqueueing a job."""

    def __init__(
        self,
        jobs,
        allow_failure: bool = False,
        enqueue_at_front: bool = False
    ):
        """
        Initialize job dependency.
        
        Args:
            jobs: Job instances, job IDs, or a sequence of jobs/IDs.
            allow_failure (bool): Allow dependent job to run even if dependencies fail.
            enqueue_at_front (bool): Enqueue dependent job at front of queue.
        """

    @property
    def dependencies(self):
        """Sequence of dependency jobs or job IDs."""

    @property
    def allow_failure(self) -> bool:
        """Whether to allow dependency failures."""

    @property
    def enqueue_at_front(self) -> bool:
        """Whether to enqueue at front of queue."""

Usage Examples

Basic Callback Usage

from rq import Queue, Callback
import redis

conn = redis.Redis()
queue = Queue(connection=conn)


def process_order(order_id):
    """Simulate a couple of seconds of order processing."""
    import time
    time.sleep(2)
    return f"Order {order_id} processed successfully"


def send_confirmation_email(job, connection, result):
    """Success callback: send confirmation email."""
    order_id = job.args[0]
    # In real app: send actual email
    print(f"Sending confirmation email for order {order_id}: {result}")


def handle_order_failure(job, connection, exc_type, exc_value, traceback):
    """Failure callback: handle processing failure."""
    order_id = job.args[0]
    # In real app: notify customer service, log error, etc.
    print(f"Order {order_id} processing failed: {exc_value}")


# Enqueue job with callbacks
job = queue.enqueue(
    process_order, "ORD-12345",
    on_success=Callback(send_confirmation_email),
    on_failure=Callback(handle_order_failure),
    description="Process customer order",
)

print(f"Enqueued order processing job: {job.id}")

Retry Patterns

from rq import Queue, Retry
import redis
import random

conn = redis.Redis()
queue = Queue(connection=conn)


def unreliable_api_call(endpoint):
    """Simulates an unreliable API that fails sometimes."""
    if random.random() < 0.7:  # 70% failure rate
        raise ConnectionError(f"Failed to connect to {endpoint}")
    return f"Successfully called {endpoint}"


# Simple retry: 3 attempts with 5 second intervals
simple_retry = Retry(max=3, interval=5)
job1 = queue.enqueue(
    unreliable_api_call, "/api/users",
    retry=simple_retry,
    description="API call with simple retry",
)

# Exponential backoff: 1s, 2s, 4s, 8s between attempts
exponential_retry = Retry(max=4, interval=[1, 2, 4, 8])
job2 = queue.enqueue(
    unreliable_api_call, "/api/orders",
    retry=exponential_retry,
    description="API call with exponential backoff",
)

# Custom retry intervals: 10s, 30s, 1min
custom_retry = Retry(max=3, interval=[10, 30, 60])
job3 = queue.enqueue(
    unreliable_api_call, "/api/payments",
    retry=custom_retry,
    description="API call with custom intervals",
)

print("Enqueued jobs with different retry strategies")

Repetition Patterns

from rq import Queue, Repeat
import redis
from datetime import datetime

conn = redis.Redis()
queue = Queue(connection=conn)


def generate_report(report_type):
    """Generate a periodic report."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"Generating {report_type} report at {timestamp}")
    return f"{report_type} report generated at {timestamp}"


def cleanup_temp_files():
    """Clean up temporary files."""
    import os
    temp_count = sum(1 for f in os.listdir('/tmp') if f.startswith('temp_'))
    print(f"Cleaned up {temp_count} temporary files")
    return f"Cleaned {temp_count} files"


# Repeat 5 times every hour (3600 seconds)
hourly_repeat = Repeat(times=5, interval=3600)
report_job = queue.enqueue(
    generate_report, "sales_summary",
    repeat=hourly_repeat,
    description="Hourly sales report",
)

# Repeat with increasing intervals: 1 day, then 1 week, then 1 week
increasing_repeat = Repeat(times=3, interval=[86400, 604800, 604800])
cleanup_job = queue.enqueue(
    cleanup_temp_files,
    repeat=increasing_repeat,
    description="Progressive cleanup schedule",
)

print(f"Scheduled repeating jobs: {report_job.id}, {cleanup_job.id}")

Job Dependencies

from rq import Queue, Job
import redis

conn = redis.Redis()  # redis-py client with default connection settings
queue = Queue(connection=conn)  # RQ queue bound to this connection

def download_data(source):
    """Download data from source; the 2s sleep simulates network latency."""
    import time
    time.sleep(2)
    filename = f"data_{source}.csv"
    print(f"Downloaded data from {source}")
    return filename

def validate_data(filename):
    """
    Validate downloaded data (simulated with a 1s delay).

    Args:
        filename (str): Name of the downloaded file to validate.

    Returns:
        str: Name of the validated artifact ("validated_<filename>").
    """
    import time
    time.sleep(1)
    # Fix: the original f-strings had lost their {filename} placeholder
    # (rendered as literal "(unknown)"), so the argument was never used.
    print(f"Validated {filename}")
    return f"validated_{filename}"

def process_data(filename):
    """
    Process validated data (simulated with a 3s delay).

    Args:
        filename (str): Name of the validated file to process.

    Returns:
        str: Name of the results artifact ("results_<filename>").
    """
    import time
    time.sleep(3)
    # Fix: the original f-strings had lost their {filename} placeholder
    # (rendered as literal "(unknown)"), so the argument was never used.
    print(f"Processed {filename}")
    return f"results_{filename}"

def generate_report(filenames):
    """Generate a summary report covering every processed file."""
    file_count = len(filenames)
    print(f"Generated report from {file_count} files")
    return f"report_from_{file_count}_sources.pdf"

# Create initial download jobs
download1 = queue.enqueue(download_data, "api_source_1")
download2 = queue.enqueue(download_data, "api_source_2")
download3 = queue.enqueue(download_data, "database_dump")

# Create validation jobs that depend on downloads.
# Fix: validate_data(filename) requires an argument, and RQ does not pass a
# dependency's return value to the dependent job automatically — so pass the
# filename each download job is known to produce.
validate1 = queue.enqueue(
    validate_data,
    "data_api_source_1.csv",
    depends_on=download1,
    description="Validate API source 1 data"
)

validate2 = queue.enqueue(
    validate_data,
    "data_api_source_2.csv",
    depends_on=download2,
    description="Validate API source 2 data"
)

validate3 = queue.enqueue(
    validate_data,
    "data_database_dump.csv",
    depends_on=download3,
    description="Validate database dump"
)

# Create processing jobs that depend on validation (same fix: explicit args)
process1 = queue.enqueue(process_data, "validated_data_api_source_1.csv", depends_on=validate1)
process2 = queue.enqueue(process_data, "validated_data_api_source_2.csv", depends_on=validate2)
process3 = queue.enqueue(process_data, "validated_data_database_dump.csv", depends_on=validate3)

# Create final report job that depends on all processing (explicit arg list)
report_job = queue.enqueue(
    generate_report,
    [
        "results_validated_data_api_source_1.csv",
        "results_validated_data_api_source_2.csv",
        "results_validated_data_database_dump.csv",
    ],
    depends_on=[process1, process2, process3],
    description="Generate final report"
)

print("Created dependency chain:")
print(f"Downloads: {download1.id}, {download2.id}, {download3.id}")
print(f"Validations: {validate1.id}, {validate2.id}, {validate3.id}")
print(f"Processing: {process1.id}, {process2.id}, {process3.id}")
print(f"Report: {report_job.id}")

Complex Workflow Example

from rq import Queue, Job, Retry, Repeat, Callback
import redis

conn = redis.Redis()  # redis-py client with default connection settings
queue = Queue(connection=conn)  # RQ queue bound to this connection

# Define workflow functions
def extract_data(source_id):
    """First step: extract data; the unreliable source fails ~30% of the time."""
    if source_id != "unreliable_source":
        return f"extracted_data_{source_id}"
    import random
    if random.random() < 0.3:  # simulate a 30% outage rate
        raise ConnectionError("Source temporarily unavailable")
    return f"extracted_data_{source_id}"

def transform_data(data):
    """Second step: tag the payload with a 'transformed_' prefix."""
    transformed = f"transformed_{data}"
    return transformed

def load_data(data):
    """Third step: tag the payload with a 'loaded_' prefix."""
    loaded = f"loaded_{data}"
    return loaded

def notify_completion(job, connection, result):
    """Success callback: notify stakeholders that the pipeline finished."""
    message = f"Data pipeline completed successfully: {result}"
    print(message)

def handle_failure(job, connection, exc_type, exc_value, tb):
    """Failure callback: report a failed pipeline step."""
    # Could trigger alternative workflow
    print(f"Pipeline step failed: {exc_value}")

def periodic_health_check():
    """Periodic system health check returning a score in [70, 100]."""
    from random import randint
    health_score = randint(70, 100)
    print(f"System health: {health_score}%")
    return health_score

# Create ETL pipeline with error handling
extract_job = queue.enqueue(
    extract_data,
    "unreliable_source",
    retry=Retry(max=3, interval=[5, 15, 30]),  # Retry with backoff
    on_failure=Callback(handle_failure),
    description="Extract data from unreliable source"
)

# Fix: transform_data(data) and load_data(data) require an argument — RQ does
# not forward a dependency's return value to the dependent job automatically,
# so feed each step the value its predecessor is known to produce.
transform_job = queue.enqueue(
    transform_data,
    "extracted_data_unreliable_source",
    depends_on=extract_job,
    on_failure=Callback(handle_failure),
    description="Transform extracted data"
)

load_job = queue.enqueue(
    load_data,
    "transformed_extracted_data_unreliable_source",
    depends_on=transform_job,
    on_success=Callback(notify_completion),
    on_failure=Callback(handle_failure),
    description="Load transformed data"
)

# Schedule periodic health checks
health_check_job = queue.enqueue(
    periodic_health_check,
    repeat=Repeat(times=24, interval=3600),  # Every hour for 24 hours
    description="Hourly system health check"
)

print("Complex workflow created:")
print(f"ETL Pipeline: {extract_job.id} -> {transform_job.id} -> {load_job.id}")
print(f"Health Check: {health_check_job.id} (repeating)")

Advanced Callback Patterns

from rq import Queue, Job, Callback
import redis
import json

conn = redis.Redis()
queue = Queue(connection=conn)

def audit_job_completion(job, connection, result):
    """Audit callback: log job completion details."""
    audit_data = {
        'job_id': job.id,
        'function': job.func_name,
        'args': job.args,
        'kwargs': job.kwargs,
        'result': str(result)[:100],  # Truncate long results
        # NOTE(review): assumes started_at/ended_at are populated by the
        # worker when the success callback runs — confirm for edge cases.
        'duration': (job.ended_at - job.started_at).total_seconds(),
        'worker': job.worker_name
    }
    
    # In real app: send to audit service
    print(f"AUDIT: {json.dumps(audit_data, indent=2)}")

def cascade_failure_handler(job, connection, exc_type, exc_value, tb):
    """Handle failures with cascading cleanup of related jobs."""
    print(f"Job {job.id} failed, starting cleanup cascade")
    
    # Cancel related jobs (best-effort)
    if hasattr(job, 'meta') and 'related_jobs' in job.meta:
        for related_job_id in job.meta['related_jobs']:
            # Fix: the original bare `except: pass` silently swallowed every
            # error (including KeyboardInterrupt/SystemExit); keep the
            # best-effort behavior but catch only Exception and report it.
            try:
                related_job = Job.fetch(related_job_id, connection=connection)
                if related_job.get_status() in ['queued', 'started']:
                    related_job.cancel()
                    print(f"Cancelled related job: {related_job_id}")
            except Exception as cleanup_error:
                print(f"Could not cancel {related_job_id}: {cleanup_error}")

def business_logic_processor(data_type, data):
    """Business logic that records its progress in job metadata."""
    # Fix: RQ exposes get_current_job() as a module-level function
    # (`from rq import get_current_job`); Job has no get_current_job method.
    from rq import get_current_job
    current_job = get_current_job()
    if current_job:
        current_job.meta['processing_stage'] = 'started'
        current_job.save_meta()
    
    # Simulate processing
    import time
    time.sleep(2)
    
    if current_job:
        current_job.meta['processing_stage'] = 'completed'
        current_job.save_meta()
    
    return f"Processed {data_type}: {data}"

# Create jobs with advanced callback patterns
main_job = queue.enqueue(
    business_logic_processor,
    "customer_data",
    {"records": 1000},
    on_success=Callback(audit_job_completion, timeout=30),
    on_failure=Callback(cascade_failure_handler, timeout=60),
    meta={'related_jobs': [], 'priority': 'high'},
    description="Main business logic processing"
)

# Create related jobs
related_jobs = []
for i in range(3):
    related_job = queue.enqueue(
        business_logic_processor,
        f"related_data_{i}",
        {"records": 100},
        depends_on=main_job,
        on_success=Callback(audit_job_completion),
        description=f"Related processing {i}"
    )
    related_jobs.append(related_job.id)

# Update main job metadata with related job IDs
main_job.meta['related_jobs'] = related_jobs
main_job.save_meta()

print(f"Created job chain with advanced callbacks: {main_job.id}")
print(f"Related jobs: {related_jobs}")

Install with Tessl CLI

npx tessl i tessl/pypi-rq

docs

index.md

job-management.md

job-patterns.md

queue-operations.md

registries-monitoring.md

worker-management.md

tile.json