tessl/pypi-rq

RQ is a simple, lightweight library for creating background jobs and processing them.

docs/queue-operations.md

Queue Operations

Comprehensive queue management for job scheduling, enqueueing, and batch operations. RQ queues provide flexible job submission with support for immediate execution, delayed scheduling, priority queuing, bulk operations, and queue monitoring.

Capabilities

Queue Creation and Configuration

Create and configure queues with various options for job processing behavior.

class Queue:
    def __init__(
        self,
        name: str = 'default',
        connection=None,
        default_timeout: int = None,
        is_async: bool = True,
        job_class=None,
        serializer=None,
        death_penalty_class=None,
        **kwargs
    ):
        """
        Initialize a Queue instance.
        
        Args:
            name (str): Queue name. Defaults to 'default'.
            connection: Redis connection instance.
            default_timeout (int): Default job timeout in seconds.
            is_async (bool): Whether to process jobs asynchronously.
            job_class: Custom Job class to use.
            serializer: Custom serializer for job data.
            death_penalty_class: Custom death penalty class for timeouts.
            **kwargs: Additional queue configuration options.
        """

    @classmethod
    def all(cls, connection, job_class=None, serializer=None, death_penalty_class=None) -> list['Queue']:
        """
        Get all existing queues.
        
        Args:
            connection: Redis connection.
            job_class: Job class for deserialization.
            serializer: Custom serializer.
            death_penalty_class: Death penalty class.
            
        Returns:
            list[Queue]: All queues in Redis.
        """

    @classmethod
    def from_queue_key(
        cls, 
        queue_key: str, 
        connection, 
        job_class=None, 
        serializer=None,
        death_penalty_class=None
    ) -> 'Queue':
        """
        Create Queue instance from Redis queue key.
        
        Args:
            queue_key (str): Redis queue key.
            connection: Redis connection.
            job_class: Job class for deserialization.
            serializer: Custom serializer.
            death_penalty_class: Death penalty class.
            
        Returns:
            Queue: Queue instance.
        """

Basic Job Enqueueing

Core methods for adding jobs to queues with various execution options.

def enqueue(self, f, *args, **kwargs) -> 'Job':
    """
    Enqueue a function call for execution.
    
    Args:
        f: Function to execute.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function and job options.
        
    Job options in kwargs:
        timeout (int): Job timeout in seconds.
        result_ttl (int): Result time-to-live in seconds.
        ttl (int): Job time-to-live in seconds.
        failure_ttl (int): Failure info time-to-live in seconds.
        description (str): Job description.
        depends_on: Job dependencies.
        job_id (str): Custom job ID.
        at_front (bool): Add to front of queue.
        meta (dict): Job metadata.
        retry (Retry): Retry configuration.
        repeat (Repeat): Repeat configuration.
        on_success (Callback): Success callback.
        on_failure (Callback): Failure callback.
        on_stopped (Callback): Stopped callback.
        
    Returns:
        Job: The enqueued job.
    """

def enqueue_call(
    self,
    func,
    args=None,
    kwargs=None,
    timeout=None,
    result_ttl=None,
    ttl=None,
    failure_ttl=None,
    description=None,
    depends_on=None,
    job_id=None,
    at_front=False,
    meta=None,
    retry=None,
    repeat=None,
    on_success=None,
    on_failure=None,
    on_stopped=None,
    pipeline=None
) -> 'Job':
    """
    Enqueue a function call with explicit parameters.
    
    Args:
        func: Function to execute.
        args (tuple): Function positional arguments.
        kwargs (dict): Function keyword arguments.
        timeout (int): Job timeout in seconds.
        result_ttl (int): Result time-to-live in seconds.
        ttl (int): Job time-to-live in seconds.
        failure_ttl (int): Failure info time-to-live in seconds.
        description (str): Job description.
        depends_on: Job dependencies.
        job_id (str): Custom job ID.
        at_front (bool): Add to front of queue.
        meta (dict): Job metadata.
        retry (Retry): Retry configuration.
        repeat (Repeat): Repeat configuration.
        on_success (Callback): Success callback.
        on_failure (Callback): Failure callback.
        on_stopped (Callback): Stopped callback.
        pipeline: Redis pipeline for batched operations.
        
    Returns:
        Job: The enqueued job.
    """

def enqueue_job(self, job: 'Job', pipeline=None, at_front: bool = False) -> 'Job':
    """
    Enqueue an existing job.
    
    Args:
        job (Job): Job to enqueue.
        pipeline: Redis pipeline for batched operations.
        at_front (bool): Add to front of queue.
        
    Returns:
        Job: The enqueued job.
    """

Scheduled Job Enqueueing

Schedule jobs for future execution with precise timing control.

def enqueue_at(self, datetime, f, *args, **kwargs) -> 'Job':
    """
    Schedule a job for execution at a specific datetime.
    
    Args:
        datetime (datetime): When to execute the job.
        f: Function to execute.
        *args: Function positional arguments.
        **kwargs: Function keyword arguments and job options.
        
    Returns:
        Job: The scheduled job.
    """

def enqueue_in(self, time_delta, func, *args, **kwargs) -> 'Job':
    """
    Schedule a job for execution after a time delay.
    
    Args:
        time_delta (timedelta): Delay before execution.
        func: Function to execute.
        *args: Function positional arguments.
        **kwargs: Function keyword arguments and job options.
        
    Returns:
        Job: The scheduled job.
    """

def schedule_job(self, job: 'Job', datetime, pipeline=None):
    """
    Schedule an existing job for future execution.
    
    Args:
        job (Job): Job to schedule.
        datetime (datetime): When to execute the job.
        pipeline: Redis pipeline for batched operations.
    """

Batch Operations

Efficiently handle multiple jobs with batch enqueueing and processing.

def enqueue_many(self, job_datas, pipeline=None, group_id: str = None) -> list['Job']:
    """
    Enqueue multiple jobs in a single operation.
    
    Args:
        job_datas: Iterable of EnqueueData instances or job specifications.
        pipeline: Redis pipeline for batched operations.
        group_id (str): Group identifier for related jobs.
        
    Returns:
        list[Job]: List of enqueued jobs.
    """

@classmethod
def prepare_data(
    cls,
    func,
    args=None,
    kwargs=None,
    timeout=None,
    result_ttl=None,
    ttl=None,
    failure_ttl=None,
    description=None,
    depends_on=None,
    job_id=None,
    at_front=False,
    meta=None,
    retry=None,
    on_success=None,
    on_failure=None,
    on_stopped=None,
    repeat=None
):
    """
    Prepare job data for batch enqueueing.
    
    Args:
        func: Function to execute.
        args (tuple): Function arguments.
        kwargs (dict): Function keyword arguments.
        timeout (int): Job timeout.
        result_ttl (int): Result TTL.
        ttl (int): Job TTL.
        failure_ttl (int): Failure TTL.
        description (str): Job description.
        depends_on: Job dependencies.
        job_id (str): Custom job ID.
        at_front (bool): Priority enqueueing.
        meta (dict): Job metadata.
        retry (Retry): Retry configuration.
        on_success (Callback): Success callback.
        on_failure (Callback): Failure callback.
        on_stopped (Callback): Stopped callback.
        repeat (Repeat): Repeat configuration.
        
    Returns:
        EnqueueData: Prepared job data for batch operations.
    """

Queue Monitoring and Management

Monitor queue state and manage queue lifecycle.

@property
def count(self) -> int:
    """Number of jobs in the queue."""

@property
def is_empty(self) -> bool:
    """True if queue has no jobs."""

@property
def job_ids(self) -> list[str]:
    """List of all job IDs in the queue."""

@property
def jobs(self) -> list['Job']:
    """List of all valid jobs in the queue."""

def get_job_ids(self, offset: int = 0, length: int = -1) -> list[str]:
    """
    Get a slice of job IDs from the queue.
    
    Args:
        offset (int): Starting position.
        length (int): Number of IDs to return (-1 for all).
        
    Returns:
        list[str]: Job IDs.
    """

def get_jobs(self, offset: int = 0, length: int = -1) -> list['Job']:
    """
    Get a slice of jobs from the queue.
    
    Args:
        offset (int): Starting position.
        length (int): Number of jobs to return (-1 for all).
        
    Returns:
        list[Job]: Jobs in the queue.
    """

def fetch_job(self, job_id: str) -> 'Job | None':
    """
    Fetch a specific job from the queue.
    
    Args:
        job_id (str): Job identifier.
        
    Returns:
        Job | None: Job if found, None otherwise.
    """

def get_job_position(self, job_or_id) -> int | None:
    """
    Get position of a job in the queue.
    
    Args:
        job_or_id: Job instance or job ID.
        
    Returns:
        int | None: Position in queue (0-based) or None if not found.
    """

Queue Maintenance

Maintain queue health with cleanup and management operations.

def empty(self):
    """Remove all jobs from the queue."""

def delete(self, delete_jobs: bool = True):
    """
    Delete the queue.
    
    Args:
        delete_jobs (bool): Whether to delete associated jobs.
    """

def compact(self):
    """Remove invalid job references while preserving FIFO order."""

def remove(self, job_or_id, pipeline=None):
    """
    Remove a specific job from the queue.
    
    Args:
        job_or_id: Job instance or job ID to remove.
        pipeline: Redis pipeline for batched operations.
    """

def push_job_id(self, job_id: str, pipeline=None, at_front: bool = False):
    """
    Push a job ID onto the queue.
    
    Args:
        job_id (str): Job identifier.
        pipeline: Redis pipeline.
        at_front (bool): Add to front of queue.
    """

def pop_job_id(self) -> str | None:
    """
    Pop a job ID from the front of the queue.
    
    Returns:
        str | None: Job ID or None if queue is empty.
    """

Multi-Queue Operations

Handle operations across multiple queues for load balancing and priority processing.

@classmethod
def dequeue_any(
    cls,
    queues,
    timeout: int = None,
    connection=None,
    job_class=None,
    serializer=None,
    death_penalty_class=None
) -> tuple['Job', 'Queue'] | None:
    """
    Dequeue a job from any of the given queues.
    
    Args:
        queues: Iterable of Queue instances.
        timeout (int): Timeout in seconds for blocking dequeue.
        connection: Redis connection.
        job_class: Job class for deserialization.
        serializer: Custom serializer.
        death_penalty_class: Death penalty class.
        
    Returns:
        tuple[Job, Queue] | None: (Job, Queue) tuple or None if timeout.
    """

@classmethod
def lpop(cls, queue_keys, timeout: int = None, connection=None):
    """
    Pop from multiple queue keys using Redis BLPOP.
    
    Args:
        queue_keys: List of queue key strings.
        timeout (int): Timeout in seconds.
        connection: Redis connection.
        
    Returns:
        tuple: (queue_key, job_id) or None if timeout.
    """

@classmethod
def lmove(cls, connection, queue_key: str, timeout: int = None):
    """
    Move job using Redis BLMOVE operation.
    
    Args:
        connection: Redis connection.
        queue_key (str): Source queue key.
        timeout (int): Timeout in seconds.
        
    Returns:
        Job data or None if timeout.
    """

Queue Properties and Configuration

Access queue configuration and runtime properties.

@property
def name(self) -> str:
    """Queue name."""

@property
def key(self) -> str:
    """Redis key for the queue."""

@property
def connection(self):
    """Redis connection instance."""

@property
def serializer(self):
    """Serializer used for job data."""

@property
def is_async(self) -> bool:
    """Whether queue processes jobs asynchronously."""

@property
def intermediate_queue_key(self) -> str:
    """Redis key for intermediate queue."""

@property
def intermediate_queue(self):
    """IntermediateQueue instance for this queue."""

def get_redis_server_version(self) -> tuple[int, int, int]:
    """
    Get Redis server version.
    
    Returns:
        tuple[int, int, int]: (major, minor, patch) version numbers.
    """

Registry Access

Access job registries for monitoring different job states.

@property
def failed_job_registry(self):
    """Registry of failed jobs."""

@property
def started_job_registry(self):
    """Registry of jobs currently being executed."""

@property
def finished_job_registry(self):
    """Registry of successfully completed jobs."""

@property
def deferred_job_registry(self):
    """Registry of jobs waiting for dependencies."""

@property
def scheduled_job_registry(self):
    """Registry of scheduled jobs."""

@property
def canceled_job_registry(self):
    """Registry of canceled jobs."""

Usage Examples

Basic Queue Operations

import redis
from rq import Queue

# Connect to Redis
conn = redis.Redis()

# Create a queue
q = Queue('data_processing', connection=conn)

# Simple function
def process_item(item_id, priority='normal'):
    return f"Processed item {item_id} with {priority} priority"

# Enqueue jobs
job1 = q.enqueue(process_item, 'item_001')
job2 = q.enqueue(process_item, 'item_002', priority='high')

# Enqueue with options
job3 = q.enqueue(
    process_item,
    'item_003',
    timeout=300,
    result_ttl=3600,
    description="Process critical item",
    meta={'department': 'sales', 'urgent': True}
)

print(f"Enqueued {len([job1, job2, job3])} jobs")
print(f"Queue count: {q.count}")

Scheduled Job Enqueueing

from datetime import datetime, timedelta
from rq import Queue
import redis

conn = redis.Redis()
q = Queue('scheduled_tasks', connection=conn)

def send_reminder(user_id, message):
    return f"Sent reminder to user {user_id}: {message}"

# Schedule for specific time
future_time = datetime.now() + timedelta(hours=1)
scheduled_job = q.enqueue_at(
    future_time,
    send_reminder,
    user_id=123,
    message="Don't forget your appointment!"
)

# Schedule with delay
delayed_job = q.enqueue_in(
    timedelta(minutes=30),
    send_reminder,
    user_id=456,
    message="Meeting starts in 30 minutes"
)

print(f"Scheduled job: {scheduled_job.id}")
print(f"Delayed job: {delayed_job.id}")

Batch Job Enqueueing

from rq import Queue
import redis

conn = redis.Redis()
q = Queue('batch_processing', connection=conn)

def process_data_chunk(data_chunk, config=None):
    return f"Processed {len(data_chunk)} items"

# Prepare multiple jobs
job_data_list = []
data_chunks = [list(range(i, i+10)) for i in range(0, 100, 10)]

for i, chunk in enumerate(data_chunks):
    job_data = Queue.prepare_data(
        func=process_data_chunk,
        args=(chunk,),
        kwargs={'config': {'batch_id': i}},
        description=f"Process chunk {i}",
        meta={'chunk_size': len(chunk)}
    )
    job_data_list.append(job_data)

# Enqueue all jobs at once
jobs = q.enqueue_many(job_data_list, group_id='batch_001')

print(f"Enqueued {len(jobs)} jobs in batch")
print(f"Queue count: {q.count}")

Queue Monitoring and Management

from rq import Queue
import redis

conn = redis.Redis()
q = Queue('monitoring_example', connection=conn)

# Jobs must reference an importable function; lambdas cannot be enqueued
def double_value(x):
    return x * 2

# Add some jobs
for i in range(5):
    q.enqueue(double_value, i)

# Monitor queue
print(f"Queue: {q.name}")
print(f"Total jobs: {q.count}")
print(f"Is empty: {q.is_empty}")

# Get job information
job_ids = q.get_job_ids()
print(f"Job IDs: {job_ids}")

# Get first 3 jobs
first_jobs = q.get_jobs(offset=0, length=3)
for job in first_jobs:
    print(f"Job {job.id}: {job.description}")

# Find specific job
if job_ids:
    specific_job = q.fetch_job(job_ids[0])
    position = q.get_job_position(specific_job)
    print(f"Job {specific_job.id} is at position {position}")

# Queue maintenance
print("Before compact:", q.count)
q.compact()  # Remove any invalid job references
print("After compact:", q.count)

# Registry access
print(f"Failed jobs: {q.failed_job_registry.count}")
print(f"Finished jobs: {q.finished_job_registry.count}")

Multi-Queue Processing

from rq import Queue
import redis

conn = redis.Redis()

# Create multiple queues
high_priority = Queue('high_priority', connection=conn)
normal_priority = Queue('normal', connection=conn)
low_priority = Queue('low_priority', connection=conn)

def important_task():
    return "Completed important task"

def regular_task():
    return "Completed regular task"

# Add jobs to different queues
high_priority.enqueue(important_task)
normal_priority.enqueue(regular_task)
low_priority.enqueue(regular_task)

# Dequeue from multiple queues (priority order)
queues = [high_priority, normal_priority, low_priority]
result = Queue.dequeue_any(queues, timeout=1, connection=conn)

if result:
    job, queue = result
    print(f"Dequeued job {job.id} from queue {queue.name}")
else:
    print("No jobs available")

# Get all queues
all_queues = Queue.all(connection=conn)
print(f"Total queues: {len(all_queues)}")
for queue in all_queues:
    print(f"Queue {queue.name}: {queue.count} jobs")

Install with Tessl CLI

npx tessl i tessl/pypi-rq
