CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-rq-scheduler

Provides job scheduling capabilities to RQ (Redis Queue)

Pending
Overview
Eval results
Files

job-management.mddocs/

Job Management

RQ Scheduler provides comprehensive tools for managing scheduled jobs including querying, canceling, modifying execution times, and checking job status. These management functions work with job instances or job IDs for flexible job control.

Capabilities

Job Cancellation

Remove scheduled jobs from the scheduler queue to prevent execution.

def cancel(self, job):
    """
    Cancel a scheduled job by removing it from the scheduler queue.
    
    Parameters:
    - job: Job instance or str (job_id) to cancel
    
    Returns:
    None
    
    Note:
    - Does not affect jobs already moved to execution queues
    - Safe to call on non-existent jobs (no error raised)
    """

Usage Examples:

from rq_scheduler import Scheduler
from redis import Redis

scheduler = Scheduler(connection=Redis())

# Schedule a job and get reference
job = scheduler.enqueue_in(timedelta(hours=1), process_data, data_id=123)

# Cancel using job instance
scheduler.cancel(job)

# Cancel using job ID string
scheduler.cancel('job-id-string')

# Cancel multiple jobs
scheduled_jobs = scheduler.get_jobs()
for job in scheduled_jobs:
    if job.description == 'cleanup_task':
        scheduler.cancel(job)

Job Querying

Retrieve scheduled jobs based on time criteria with flexible filtering and pagination options.

def get_jobs(self, until=None, with_times=False, offset=None, length=None):
    """
    Get scheduled jobs iterator with optional filtering and pagination.
    
    Parameters:
    - until: datetime, timedelta, int (epoch), or None - only jobs scheduled before this time
    - with_times: bool, if True returns (job, scheduled_time) tuples
    - offset: int, zero-based starting position for pagination
    - length: int, maximum number of jobs to return
    
    Returns:
    Iterator of Job instances or (Job, datetime) tuples if with_times=True
    
    Note:
    - If offset or length specified, both must be provided
    - Jobs are automatically cleaned up if they no longer exist in Redis
    - Times returned are datetime objects in UTC
    """

Usage Examples:

from datetime import datetime, timedelta

# Get all scheduled jobs
all_jobs = list(scheduler.get_jobs())

# Get jobs scheduled in the next hour
next_hour = datetime.utcnow() + timedelta(hours=1)
upcoming_jobs = list(scheduler.get_jobs(until=next_hour))

# Get jobs with their scheduled times
jobs_with_times = list(scheduler.get_jobs(with_times=True))
for job, scheduled_time in jobs_with_times:
    print(f"Job {job.id} scheduled for {scheduled_time}")

# Paginated results - get first 10 jobs
first_batch = list(scheduler.get_jobs(offset=0, length=10))

# Get next 10 jobs
second_batch = list(scheduler.get_jobs(offset=10, length=10))

# Find specific jobs
for job in scheduler.get_jobs():
    if job.meta.get('priority') == 'high':
        print(f"High priority job: {job.description}")

Jobs Ready for Execution

Get jobs that are due to be moved to execution queues.

def get_jobs_to_queue(self, with_times=False):
    """
    Get jobs that should be queued for execution (scheduled time has passed).
    
    Parameters:
    - with_times: bool, if True returns (job, scheduled_time) tuples
    
    Returns:
    List of Job instances or (Job, datetime) tuples ready for execution
    """

Usage Examples:

# Check what jobs are ready to run
ready_jobs = scheduler.get_jobs_to_queue()
print(f"{len(ready_jobs)} jobs ready for execution")

# Get ready jobs with their original scheduled times
ready_with_times = scheduler.get_jobs_to_queue(with_times=True)
for job, scheduled_time in ready_with_times:
    delay = datetime.utcnow() - scheduled_time
    print(f"Job {job.id} was scheduled for {scheduled_time} (delay: {delay})")

Job Count

Count scheduled jobs with time-based filtering.

def count(self, until=None):
    """
    Count scheduled jobs, optionally filtered by time.
    
    Parameters:
    - until: datetime, timedelta, int (epoch), or None - count jobs scheduled before this time
    
    Returns:
    int, number of scheduled jobs matching criteria
    """

Usage Examples:

# Total scheduled jobs
total_jobs = scheduler.count()

# Jobs in the next 24 hours
tomorrow = datetime.utcnow() + timedelta(days=1)
upcoming_count = scheduler.count(until=tomorrow)

# Jobs overdue (should have run already)
overdue_count = scheduler.count(until=datetime.utcnow())

print(f"Total: {total_jobs}, Upcoming: {upcoming_count}, Overdue: {overdue_count}")

Job Existence Check

Check if a job is currently scheduled using the in operator.

def __contains__(self, item):
    """
    Check if a job is currently scheduled.
    
    Parameters:
    - item: Job instance or str (job_id) to check
    
    Returns:
    bool, True if job is scheduled, False otherwise
    """

Usage Examples:

# Check using job instance
job = scheduler.enqueue_in(timedelta(hours=1), my_function)
if job in scheduler:
    print("Job is scheduled")

# Check using job ID
job_id = "my-custom-job-id"
if job_id in scheduler:
    print("Job with ID exists")

# Conditional cancellation
if job in scheduler:
    scheduler.cancel(job)
    print("Job was cancelled")
else:
    print("Job not found or already executed")

Execution Time Modification

Change the scheduled execution time of existing jobs with atomic Redis operations.

def change_execution_time(self, job, date_time):
    """
    Change the execution time of a scheduled job.
    
    Parameters:
    - job: Job instance to modify
    - date_time: datetime, new execution time (should be UTC)
    
    Returns:
    None
    
    Raises:
    ValueError: if job is not in scheduled jobs queue
    
    Note:
    - Uses Redis transactions to prevent race conditions
    - Will retry automatically if job is modified during update
    """

Usage Examples:

from datetime import datetime, timedelta

# Schedule a job
job = scheduler.enqueue_at(
    datetime(2025, 6, 15, 10, 0),
    send_notification,
    "Meeting reminder"
)

# Move it earlier by 30 minutes
new_time = datetime(2025, 6, 15, 9, 30)
try:
    scheduler.change_execution_time(job, new_time)
    print("Job rescheduled successfully")
except ValueError as e:
    print(f"Failed to reschedule: {e}")

# Reschedule based on business logic
for job in scheduler.get_jobs():
    if job.meta.get('priority') == 'urgent':
        # Move urgent jobs to run 10 minutes earlier
        jobs_with_times = scheduler.get_jobs(with_times=True)
        for j, scheduled_time in jobs_with_times:
            if j.id == job.id:
                earlier_time = scheduled_time - timedelta(minutes=10)
                scheduler.change_execution_time(job, earlier_time)
                break

Job Status and Lifecycle

Scheduled jobs in RQ Scheduler follow this lifecycle:

  1. Scheduled: Job created and stored in Redis sorted set by execution time
  2. Ready: Job's scheduled time has passed, eligible for queue movement
  3. Queued: Job moved to RQ execution queue (no longer in scheduler)
  4. Executing/Completed/Failed: Standard RQ job lifecycle

Important Notes:

  • Jobs removed from scheduler queue cannot be managed by scheduler methods
  • Periodic and cron jobs automatically reschedule themselves after execution
  • Job metadata persists through the lifecycle for tracking and debugging
  • Failed jobs may be retried according to RQ configuration, not scheduler settings

Install with Tessl CLI

npx tessl i tessl/pypi-rq-scheduler

docs

index.md

job-management.md

job-scheduling.md

scheduler-control.md

utilities.md

tile.json