Provides job scheduling capabilities to RQ (Redis Queue)
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
RQ Scheduler provides job scheduling capabilities to RQ (Redis Queue), enabling developers to schedule jobs to run at specific times, dates, or intervals. It supports one-time scheduled jobs, periodic jobs, and cron-style scheduling patterns with Redis-based persistence and distributed scheduler coordination.
pip install rq-scheduler

from rq_scheduler import Scheduler

Utility functions:

from rq_scheduler.utils import from_unix, to_unix, get_next_scheduled_time

from datetime import datetime, timedelta
from redis import Redis
from rq_scheduler import Scheduler
# Create scheduler for default queue
redis_conn = Redis()
scheduler = Scheduler(connection=redis_conn)
# Schedule a job to run at a specific time
def my_job(name):
print(f"Hello, {name}!")
# Schedule job to run on January 1st, 2025 at 3:00 AM UTC
scheduler.enqueue_at(datetime(2025, 1, 1, 3, 0), my_job, 'World')
# Schedule job to run in 10 minutes
scheduler.enqueue_in(timedelta(minutes=10), my_job, 'Future')
# Schedule periodic job every hour
scheduler.schedule(
scheduled_time=datetime.utcnow() + timedelta(minutes=5),
func=my_job,
args=['Recurring'],
interval=3600 # Every hour
)
# Run the scheduler daemon
scheduler.run()

RQ Scheduler extends RQ's job queue system with time-based scheduling:
Core scheduling methods for one-time and recurring jobs including datetime-based scheduling, time delta scheduling, and cron-style patterns.
def enqueue_at(self, scheduled_time, func, *args, **kwargs):
"""Schedule a job to run at a specific datetime."""
def enqueue_in(self, time_delta, func, *args, **kwargs):
"""Schedule a job to run after a time delay."""
def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, **options):
"""Schedule a periodic or repeated job."""
def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, **options):
"""Schedule a job using cron syntax."""

Functions for managing scheduled jobs including querying, canceling, and modifying execution times.
def cancel(self, job):
"""Cancel a scheduled job by removing it from the scheduler queue."""
def get_jobs(self, until=None, with_times=False, offset=None, length=None):
"""Get scheduled jobs iterator with optional filtering and pagination."""
def count(self, until=None):
"""Count scheduled jobs, optionally filtered by time."""
def change_execution_time(self, job, date_time):
"""Change the execution time of a scheduled job."""
def __contains__(self, item):
"""Check if a job is currently scheduled."""

Methods for running and controlling the scheduler daemon including locking, heartbeats, and job processing.
def run(self, burst=False):
"""Start the scheduler daemon to process scheduled jobs."""
def acquire_lock(self):
"""Acquire distributed lock for scheduler coordination."""
def remove_lock(self):
"""Release previously acquired distributed lock."""
def heartbeat(self):
"""Send heartbeat to maintain scheduler registration."""
def enqueue_jobs(self):
"""Move all ready scheduled jobs to their target execution queues."""
def enqueue_job(self, job):
"""Move a scheduled job to a queue and handle periodic job rescheduling."""
def get_queue_for_job(self, job):
"""Get the appropriate queue instance for a job."""

Time conversion and scheduling utility functions for working with timestamps and cron expressions.
def from_unix(string):
"""Convert Unix timestamp to UTC datetime object."""
def to_unix(dt):
"""Convert datetime object to Unix timestamp."""
def get_next_scheduled_time(cron_string, use_local_timezone=False):
"""Calculate the next scheduled execution time from a cron expression."""
def rationalize_until(until=None):
"""Process 'until' parameter for time-based queries."""
def setup_loghandlers(level='INFO'):
"""Configure logging for RQ Scheduler with colorized output."""

class Scheduler:
"""Main scheduler class for managing scheduled jobs."""
def __init__(self, queue_name='default', queue=None, interval=60,
connection=None, job_class=None, queue_class=None, name=None):
"""
Initialize scheduler.
Parameters:
- queue_name: str, name of queue for scheduled jobs
- queue: RQ Queue instance (optional, overrides queue_name)
- interval: int, polling interval in seconds
- connection: Redis connection (required)
- job_class: Custom job class
- queue_class: Custom queue class
- name: Scheduler instance name for coordination
"""
VERSION: tuple = (0, 14, 0)

The package provides a command-line interface for running scheduler daemons:
# Run scheduler with default settings
rqscheduler
# Redis connection options
rqscheduler --host redis.example.com --port 6380 --db 1
rqscheduler --password mypassword
rqscheduler --url redis://user:pass@host:port/db
# Execution modes
rqscheduler --burst # Process all jobs then exit
rqscheduler --interval 30 # Custom polling interval (seconds)
# Logging options
rqscheduler --verbose # Debug level logging
rqscheduler --quiet # Warning level logging only
# Advanced options
rqscheduler --path /custom/path # Set import path
rqscheduler --pid /var/run/rq.pid # Write PID to file
rqscheduler --job-class myapp.CustomJob # Custom job class
rqscheduler --queue-class myapp.CustomQueue # Custom queue class
# Show all options
rqscheduler --help

Environment Variables:
- RQ_REDIS_HOST - Redis host (default: localhost)
- RQ_REDIS_PORT - Redis port (default: 6379)
- RQ_REDIS_DB - Redis database (default: 0)
- RQ_REDIS_PASSWORD - Redis password
- RQ_REDIS_URL - Complete Redis URL (overrides other connection settings)