Provides job scheduling capabilities to RQ (Redis Queue)
—
RQ Scheduler provides comprehensive tools for managing scheduled jobs including querying, canceling, modifying execution times, and checking job status. These management functions work with job instances or job IDs for flexible job control.
Remove scheduled jobs from the scheduler queue to prevent execution.
def cancel(self, job):
"""
Cancel a scheduled job by removing it from the scheduler queue.
Parameters:
- job: Job instance or str (job_id) to cancel
Returns:
None
Note:
- Does not affect jobs already moved to execution queues
- Safe to call on non-existent jobs (no error raised)
"""Usage Examples:
from rq_scheduler import Scheduler
from redis import Redis
scheduler = Scheduler(connection=Redis())
# Schedule a job and get reference
job = scheduler.enqueue_in(timedelta(hours=1), process_data, data_id=123)
# Cancel using job instance
scheduler.cancel(job)
# Cancel using job ID string
scheduler.cancel('job-id-string')
# Cancel multiple jobs
scheduled_jobs = scheduler.get_jobs()
for job in scheduled_jobs:
    if job.description == 'cleanup_task':
        scheduler.cancel(job)

Retrieve scheduled jobs based on time criteria with flexible filtering and pagination options.
def get_jobs(self, until=None, with_times=False, offset=None, length=None):
"""
Get scheduled jobs iterator with optional filtering and pagination.
Parameters:
- until: datetime, timedelta, int (epoch), or None - only jobs scheduled before this time
- with_times: bool, if True returns (job, scheduled_time) tuples
- offset: int, zero-based starting position for pagination
- length: int, maximum number of jobs to return
Returns:
Iterator of Job instances or (Job, datetime) tuples if with_times=True
Note:
- If offset or length specified, both must be provided
- Jobs are automatically cleaned up if they no longer exist in Redis
- Times returned are datetime objects in UTC
"""Usage Examples:
from datetime import datetime, timedelta
# Get all scheduled jobs
all_jobs = list(scheduler.get_jobs())
# Get jobs scheduled in the next hour
next_hour = datetime.utcnow() + timedelta(hours=1)
upcoming_jobs = list(scheduler.get_jobs(until=next_hour))
# Get jobs with their scheduled times
jobs_with_times = list(scheduler.get_jobs(with_times=True))
for job, scheduled_time in jobs_with_times:
    print(f"Job {job.id} scheduled for {scheduled_time}")
# Paginated results - get first 10 jobs
first_batch = list(scheduler.get_jobs(offset=0, length=10))
# Get next 10 jobs
second_batch = list(scheduler.get_jobs(offset=10, length=10))
# Find specific jobs
for job in scheduler.get_jobs():
    if job.meta.get('priority') == 'high':
        print(f"High priority job: {job.description}")

Get jobs that are due to be moved to execution queues.
def get_jobs_to_queue(self, with_times=False):
"""
Get jobs that should be queued for execution (scheduled time has passed).
Parameters:
- with_times: bool, if True returns (job, scheduled_time) tuples
Returns:
List of Job instances or (Job, datetime) tuples ready for execution
"""Usage Examples:
# Check what jobs are ready to run
ready_jobs = scheduler.get_jobs_to_queue()
print(f"{len(ready_jobs)} jobs ready for execution")
# Get ready jobs with their original scheduled times
ready_with_times = scheduler.get_jobs_to_queue(with_times=True)
for job, scheduled_time in ready_with_times:
    delay = datetime.utcnow() - scheduled_time
    print(f"Job {job.id} was scheduled for {scheduled_time} (delay: {delay})")

Count scheduled jobs with time-based filtering.
def count(self, until=None):
"""
Count scheduled jobs, optionally filtered by time.
Parameters:
- until: datetime, timedelta, int (epoch), or None - count jobs scheduled before this time
Returns:
int, number of scheduled jobs matching criteria
"""Usage Examples:
# Total scheduled jobs
total_jobs = scheduler.count()
# Jobs in the next 24 hours
tomorrow = datetime.utcnow() + timedelta(days=1)
upcoming_count = scheduler.count(until=tomorrow)
# Jobs overdue (should have run already)
overdue_count = scheduler.count(until=datetime.utcnow())
print(f"Total: {total_jobs}, Upcoming: {upcoming_count}, Overdue: {overdue_count}")

Check if a job is currently scheduled using the `in` operator.
def __contains__(self, item):
"""
Check if a job is currently scheduled.
Parameters:
- item: Job instance or str (job_id) to check
Returns:
bool, True if job is scheduled, False otherwise
"""Usage Examples:
# Check using job instance
job = scheduler.enqueue_in(timedelta(hours=1), my_function)
if job in scheduler:
    print("Job is scheduled")
# Check using job ID
job_id = "my-custom-job-id"
if job_id in scheduler:
    print("Job with ID exists")
# Conditional cancellation
if job in scheduler:
    scheduler.cancel(job)
    print("Job was cancelled")
else:
    print("Job not found or already executed")

Change the scheduled execution time of existing jobs with atomic Redis operations.
def change_execution_time(self, job, date_time):
"""
Change the execution time of a scheduled job.
Parameters:
- job: Job instance to modify
- date_time: datetime, new execution time (should be UTC)
Returns:
None
Raises:
ValueError: if job is not in scheduled jobs queue
Note:
- Uses Redis transactions to prevent race conditions
- Will retry automatically if job is modified during update
"""Usage Examples:
from datetime import datetime, timedelta
# Schedule a job
job = scheduler.enqueue_at(
    datetime(2025, 6, 15, 10, 0),
    send_notification,
    "Meeting reminder"
)
# Move it earlier by 30 minutes
new_time = datetime(2025, 6, 15, 9, 30)
try:
    scheduler.change_execution_time(job, new_time)
    print("Job rescheduled successfully")
except ValueError as e:
    print(f"Failed to reschedule: {e}")
# Reschedule based on business logic
for job in scheduler.get_jobs():
    if job.meta.get('priority') == 'urgent':
        # Move urgent jobs to run 10 minutes earlier
        jobs_with_times = scheduler.get_jobs(with_times=True)
        for j, scheduled_time in jobs_with_times:
            if j.id == job.id:
                earlier_time = scheduled_time - timedelta(minutes=10)
                scheduler.change_execution_time(job, earlier_time)
                break

Scheduled jobs in RQ Scheduler follow this lifecycle:
Important Notes:
Install with Tessl CLI
npx tessl i tessl/pypi-rq-scheduler