CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-rq

RQ is a simple, lightweight library for creating background jobs and processing them.

Pending
Overview
Eval results
Files

job-management.mddocs/

Job Management

Comprehensive job lifecycle management including creation, execution tracking, status monitoring, and control operations. Jobs in RQ encapsulate function calls with rich metadata, status tracking, and support for advanced patterns like callbacks, retries, and dependencies.

Capabilities

Job Creation and Retrieval

Create new jobs and retrieve existing ones by ID with full serialization support.

class Job:
    """A background job: a function call plus its metadata, stored in Redis."""

    def __init__(self, id: str | None = None, connection = None, serializer=None):
        """
        Initialize a Job instance.
        
        Args:
            id (str, optional): Job identifier. Generated if not provided.
            connection: Redis connection instance.
            serializer: Custom serializer for job data.
        """

    @classmethod
    def create(
        cls, 
        func, 
        args=None, 
        kwargs=None, 
        connection=None,
        result_ttl=None,
        ttl=None,
        status=None,
        description=None,
        depends_on=None,
        timeout=None,
        id=None,
        origin='',
        meta=None,
        failure_ttl=None,
        serializer=None,
        group_id=None,
        on_success=None,
        on_failure=None,
        on_stopped=None
    ) -> 'Job':
        """
        Create a new job instance.
        
        Args:
            func: Function to execute or function reference string.
            args (tuple): Positional arguments for the function.
            kwargs (dict): Keyword arguments for the function.
            connection: Redis connection.
            result_ttl (int): Result time-to-live in seconds.
            ttl (int): Job time-to-live in seconds.
            status (JobStatus): Initial job status.
            description (str): Human-readable job description.
            depends_on: Job dependencies.
            timeout (int): Job execution timeout in seconds.
            id (str): Custom job ID.
            origin (str): Queue name where job originated.
            meta (dict): Additional job metadata.
            failure_ttl (int): Failure info time-to-live in seconds.
            serializer: Custom serializer.
            group_id (str): Job group identifier.
            on_success (Callback): Success callback.
            on_failure (Callback): Failure callback.
            on_stopped (Callback): Stopped callback.
            
        Returns:
            Job: New job instance.
        """

    @classmethod
    def fetch(cls, id: str, connection, serializer=None) -> 'Job':
        """
        Retrieve an existing job by ID.
        
        Args:
            id (str): Job identifier.
            connection: Redis connection.
            serializer: Custom serializer.
            
        Returns:
            Job: Retrieved job instance.
            
        Raises:
            NoSuchJobError: If job doesn't exist.
        """

    @classmethod
    def exists(cls, job_id: str, connection) -> bool:
        """
        Check if a job exists.
        
        Args:
            job_id (str): Job identifier.
            connection: Redis connection.
            
        Returns:
            bool: True if job exists, False otherwise.
        """

    @classmethod
    def fetch_many(cls, job_ids: list[str], connection, serializer=None) -> list['Job | None']:
        """
        Fetch multiple jobs by their IDs.
        
        Args:
            job_ids (list[str]): List of job identifiers.
            connection: Redis connection.
            serializer: Custom serializer.
            
        Returns:
            list[Job | None]: List of jobs (None for non-existent jobs).
        """

Job Status and Lifecycle

Monitor and control job execution status throughout its lifecycle.

def get_status(self, refresh: bool = True) -> JobStatus:
    """
    Get the job's current status.
    
    Args:
        refresh (bool): If True, re-read the status from Redis before
            returning; if False, return the locally known value.
        
    Returns:
        JobStatus: Current job status.
    """

def set_status(self, status: JobStatus, pipeline=None):
    """
    Set the job's status.
    
    Args:
        status (JobStatus): New status to set.
        pipeline: Optional Redis pipeline for batching this write with
            other operations.
    """

def refresh(self):
    """Re-read this job's stored data from Redis."""

def save(self, pipeline=None, include_meta: bool = True, include_result: bool = True):
    """
    Persist the job to Redis.
    
    Args:
        pipeline: Optional Redis pipeline for batched operations.
        include_meta (bool): Whether to also save the metadata dict.
        include_result (bool): Whether to also save the result.
    """

def delete(self, pipeline=None, remove_from_queue: bool = True, delete_dependents: bool = False):
    """
    Delete the job's data from Redis.
    
    Args:
        pipeline: Optional Redis pipeline for batched operations.
        remove_from_queue (bool): Also remove the job from its queue if queued.
        delete_dependents (bool): Also delete jobs that depend on this one.
    """

def cleanup(self, ttl: int | None = None, pipeline=None, remove_from_queue: bool = True):
    """
    Clean up job data.
    
    Args:
        ttl (int, optional): Time-to-live applied during cleanup.
        pipeline: Optional Redis pipeline.
        remove_from_queue (bool): Also remove the job from its queue.
    """

Job Execution and Results

Execute jobs and manage execution results with comprehensive error handling.

def perform(self) -> Any:
    """
    Execute the job's function with its stored arguments.
    
    Returns:
        Any: The function's return value.
        
    Raises:
        Exception: Whatever the job function itself raises is propagated.
    """

def return_value(self, refresh: bool = False) -> Any:
    """
    Get the job's return value.
    
    Args:
        refresh (bool): If True, re-read from Redis first.
        
    Returns:
        Any: The return value if the job completed, None otherwise.
    """

def latest_result(self, timeout: int = 0) -> 'Result | None':
    """
    Get the most recent result for this job.
    
    Args:
        timeout (int): Maximum wait time for a result.
        
    Returns:
        Result | None: Latest result, or None if there is none.
    """

def results(self) -> list['Result']:
    """
    Get all stored results for this job.
    
    Returns:
        list[Result]: All job results.
    """

def get_call_string(self) -> str | None:
    """
    Get a human-readable representation of the job's function call.
    
    Returns:
        str | None: Function call string, if available.
    """

Job Control Operations

Control job execution with cancellation, requeuing, and retry mechanisms.

def cancel(self, pipeline=None, enqueue_dependents: bool = False, remove_from_dependencies: bool = False):
    """
    Cancel the job's execution.
    
    Args:
        pipeline: Optional Redis pipeline for batched operations.
        enqueue_dependents (bool): Enqueue dependent jobs after cancellation.
        remove_from_dependencies (bool): Remove this job from dependency lists.
    """

def requeue(self, at_front: bool = False) -> 'Job':
    """
    Put the job back on its queue for execution.
    
    Args:
        at_front (bool): If True, add to the front of the queue.
        
    Returns:
        Job: The requeued job.
    """

def retry(self, queue: 'Queue', pipeline=None):
    """
    Retry the job on the given queue.
    
    Args:
        queue (Queue): Queue to retry the job in.
        pipeline: Optional Redis pipeline.
    """

def get_retry_interval(self) -> int:
    """
    Get the interval to wait before this job's next retry.
    
    Returns:
        int: Retry interval in seconds.
    """

Standalone Job Functions

Module-level functions for job operations without requiring job instances.

def get_current_job(connection=None, job_class=None) -> 'Job | None':
    """
    Get the job currently being executed, when called inside a worker.
    
    Args:
        connection: Redis connection. Uses the default if None.
        job_class: Job class to use for deserialization.
        
    Returns:
        Job | None: The current job inside a worker context, else None.
    """

def cancel_job(job_id: str, connection, serializer=None, enqueue_dependents: bool = False):
    """
    Cancel a job by its ID.
    
    Args:
        job_id (str): Identifier of the job to cancel.
        connection: Redis connection.
        serializer: Custom serializer.
        enqueue_dependents (bool): Enqueue dependent jobs after cancellation.
        
    Raises:
        NoSuchJobError: If no job with this ID exists.
    """

def requeue_job(job_id: str, connection, serializer=None) -> 'Job':
    """
    Requeue a job by its ID.
    
    Args:
        job_id (str): Identifier of the job to requeue.
        connection: Redis connection.
        serializer: Custom serializer.
        
    Returns:
        Job: The requeued job.
        
    Raises:
        NoSuchJobError: If no job with this ID exists.
    """

Job Properties and Metadata

Access and modify job properties, metadata, and execution information.

# Core Properties
@property
def id(self) -> str:
    """Job identifier (generated at creation if not supplied)."""

@property  
def key(self) -> bytes:
    """Redis key (bytes) under which this job is stored."""

@property
def func(self):
    """Function to execute."""

@property
def args(self) -> tuple:
    """Function positional arguments."""

@property
def kwargs(self) -> dict:
    """Function keyword arguments."""

@property
def description(self) -> str | None:
    """Human-readable job description, if set."""

@property
def origin(self) -> str:
    """Name of the queue where the job originated."""

@property
def timeout(self) -> float | None:
    """Job execution timeout in seconds, if set."""

@property
def result_ttl(self) -> int | None:
    """Result time-to-live in seconds, if set."""

@property
def failure_ttl(self) -> int | None:
    """Failure info time-to-live in seconds, if set."""

@property
def ttl(self) -> int | None:
    """Job time-to-live in seconds, if set."""

# Timing Properties (None until the corresponding event has happened)
@property
def enqueued_at(self) -> datetime | None:
    """When the job was enqueued."""

@property
def started_at(self) -> datetime | None:
    """When job execution started."""

@property
def ended_at(self) -> datetime | None:
    """When job execution ended."""

@property
def last_heartbeat(self) -> datetime | None:
    """Timestamp of the last heartbeat from the executing worker."""

# Status Properties (each is True when the job is in that state)
@property
def is_finished(self) -> bool:
    """True if the job finished successfully."""

@property
def is_queued(self) -> bool:
    """True if the job is queued for execution."""

@property
def is_failed(self) -> bool:
    """True if the job failed."""

@property
def is_started(self) -> bool:
    """True if job execution has started."""

@property
def is_deferred(self) -> bool:
    """True if the job is deferred (waiting for its dependencies)."""

@property
def is_canceled(self) -> bool:
    """True if the job was canceled."""

@property
def is_scheduled(self) -> bool:
    """True if the job is scheduled for future execution."""

@property
def is_stopped(self) -> bool:
    """True if the job was stopped."""

# Metadata Properties
@property
def meta(self) -> dict:
    """Arbitrary user-supplied job metadata dictionary."""

@property
def worker_name(self) -> str | None:
    """Name of the worker that executed / is executing the job, if any."""

@property
def group_id(self) -> str | None:
    """Job group identifier, if the job belongs to a group."""

def get_meta(self, refresh: bool = True) -> dict:
    """
    Get the job's metadata dictionary.
    
    Args:
        refresh (bool): If True, re-read the metadata from Redis first.
        
    Returns:
        dict: Job metadata.
    """

def save_meta(self):
    """Persist the metadata dictionary to Redis."""

Job Dependencies

Manage job dependencies and execution ordering.

@property
def dependency(self) -> 'Job | None':
    """The first dependency job, or None if there are no dependencies."""

@property
def dependency_ids(self) -> list[bytes]:
    """Redis keys (bytes) of the jobs this job depends on."""

@property
def dependent_ids(self) -> list[str]:
    """IDs of the jobs that depend on this job."""

def fetch_dependencies(self, watch: bool = False, pipeline=None) -> list['Job']:
    """
    Fetch all jobs this job depends on.
    
    Args:
        watch (bool): Watch the dependencies for changes.
        pipeline: Optional Redis pipeline.
        
    Returns:
        list[Job]: The dependency jobs.
    """

def register_dependency(self, pipeline=None):
    """
    Register this job's dependencies in Redis.
    
    Args:
        pipeline: Optional Redis pipeline for batched operations.
    """

def dependencies_are_met(
    self, 
    parent_job: 'Job | None' = None, 
    pipeline=None, 
    exclude_job_id: str | None = None,
    refresh_job_status: bool = True
) -> bool:
    """
    Check whether all of this job's dependencies are satisfied.
    
    Args:
        parent_job (Job, optional): Parent job context.
        pipeline: Optional Redis pipeline.
        exclude_job_id (str, optional): Job ID to exclude from the check.
        refresh_job_status (bool): Re-read dependency statuses from Redis.
        
    Returns:
        bool: True if all dependencies are met, False otherwise.
    """

def delete_dependents(self, pipeline=None):
    """
    Delete all jobs that depend on this job.
    
    Args:
        pipeline: Optional Redis pipeline for batched operations.
    """

Usage Examples

Basic Job Creation and Monitoring

import redis
from rq import Job

# Connect to Redis
conn = redis.Redis()

# Define a job function
def process_data(data_id, options=None):
    # Simulate processing
    import time
    time.sleep(2)
    return f"Processed data {data_id}"

# Create a job
job = Job.create(
    func=process_data,
    args=('data_123',),
    kwargs={'options': {'fast': True}},
    connection=conn,
    timeout=300,
    description="Process data batch 123"
)

# Save job to Redis
# NOTE(review): create/save only persist the job; it appears enqueuing on a
# Queue is needed for a worker to actually run it — confirm against queue docs.
job.save()

print(f"Created job: {job.id}")
print(f"Status: {job.get_status()}")
print(f"Description: {job.description}")

# Later, retrieve and check the job (raises NoSuchJobError if it is gone)
retrieved_job = Job.fetch(job.id, connection=conn)
print(f"Retrieved job status: {retrieved_job.get_status()}")

Job Metadata and Results

from rq import Job, get_current_job
import redis
import time  # required for the polling loop's sleep below

conn = redis.Redis()

def tracked_function(item_count):
    """Process item_count items, reporting progress via job metadata."""
    # Get the current job to update metadata (None when run outside a worker)
    job = get_current_job()
    
    if job:
        job.meta['progress'] = 0
        job.save_meta()
    
    # Simulate processing with progress updates
    for i in range(item_count):
        # Do work...
        
        if job:
            job.meta['progress'] = (i + 1) / item_count * 100
            job.meta['current_item'] = i + 1
            job.save_meta()
    
    return f"Completed processing {item_count} items"

# Create job with initial metadata
job = Job.create(
    func=tracked_function,
    args=(100,),
    connection=conn,
    meta={'stage': 'queued', 'priority': 'high'}
)

job.save()

# Monitor progress (from another process/thread)
while not job.is_finished and not job.is_failed:
    job.refresh()
    meta = job.get_meta()
    print(f"Progress: {meta.get('progress', 0)}%")
    time.sleep(1)

# Get final result
if job.is_finished:
    print(f"Result: {job.return_value()}")
elif job.is_failed:
    print("Job failed")

Job Cancellation and Requeuing

from rq import Job, cancel_job, requeue_job
import redis

conn = redis.Redis()

# Create a long-running job
def long_task():
    import time
    time.sleep(60)
    return "Done"

job = Job.create(func=long_task, connection=conn)
job.save()

# Cancel the job by ID (raises NoSuchJobError for an unknown ID)
cancel_job(job.id, connection=conn)
print(f"Job {job.id} canceled")

# Create another job and then requeue it
job2 = Job.create(func=long_task, connection=conn)
job2.save()

# Requeue by ID (useful after failures or for retrying)
requeued_job = requeue_job(job2.id, connection=conn)
print(f"Job requeued: {requeued_job.id}")

Install with Tessl CLI

npx tessl i tessl/pypi-rq

docs

index.md

job-management.md

job-patterns.md

queue-operations.md

registries-monitoring.md

worker-management.md

tile.json