Provides job scheduling capabilities to RQ (Redis Queue)
npx @tessl/cli install tessl/pypi-rq-scheduler@0.14.0RQ Scheduler provides job scheduling capabilities to RQ (Redis Queue), enabling developers to schedule jobs to run at specific times, dates, or intervals. It supports one-time scheduled jobs, periodic jobs, and cron-style scheduling patterns with Redis-based persistence and distributed scheduler coordination.
pip install rq-schedulerfrom rq_scheduler import SchedulerUtility functions:
from rq_scheduler.utils import from_unix, to_unix, get_next_scheduled_timefrom datetime import datetime, timedelta
from redis import Redis
from rq_scheduler import Scheduler
# Create scheduler for default queue
redis_conn = Redis()
scheduler = Scheduler(connection=redis_conn)
# Schedule a job to run at a specific time
def my_job(name):
print(f"Hello, {name}!")
# Schedule job to run on January 1st, 2025 at 3:00 AM UTC
scheduler.enqueue_at(datetime(2025, 1, 1, 3, 0), my_job, 'World')
# Schedule job to run in 10 minutes
scheduler.enqueue_in(timedelta(minutes=10), my_job, 'Future')
# Schedule periodic job every hour
scheduler.schedule(
scheduled_time=datetime.utcnow() + timedelta(minutes=5),
func=my_job,
args=['Recurring'],
interval=3600 # Every hour
)
# Run the scheduler daemon
scheduler.run()RQ Scheduler extends RQ's job queue system with time-based scheduling:
Core scheduling methods for one-time and recurring jobs including datetime-based scheduling, time delta scheduling, and cron-style patterns.
def enqueue_at(self, scheduled_time, func, *args, **kwargs):
"""Schedule a job to run at a specific datetime."""
def enqueue_in(self, time_delta, func, *args, **kwargs):
"""Schedule a job to run after a time delay."""
def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, **options):
"""Schedule a periodic or repeated job."""
def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, **options):
"""Schedule a job using cron syntax."""Functions for managing scheduled jobs including querying, canceling, and modifying execution times.
def cancel(self, job):
"""Cancel a scheduled job by removing it from the scheduler queue."""
def get_jobs(self, until=None, with_times=False, offset=None, length=None):
"""Get scheduled jobs iterator with optional filtering and pagination."""
def count(self, until=None):
"""Count scheduled jobs, optionally filtered by time."""
def change_execution_time(self, job, date_time):
"""Change the execution time of a scheduled job."""
def __contains__(self, item):
"""Check if a job is currently scheduled."""Methods for running and controlling the scheduler daemon including locking, heartbeats, and job processing.
def run(self, burst=False):
"""Start the scheduler daemon to process scheduled jobs."""
def acquire_lock(self):
"""Acquire distributed lock for scheduler coordination."""
def remove_lock(self):
"""Release previously acquired distributed lock."""
def heartbeat(self):
"""Send heartbeat to maintain scheduler registration."""
def enqueue_jobs(self):
"""Move all ready scheduled jobs to their target execution queues."""
def enqueue_job(self, job):
"""Move a scheduled job to a queue and handle periodic job rescheduling."""
def get_queue_for_job(self, job):
"""Get the appropriate queue instance for a job."""Time conversion and scheduling utility functions for working with timestamps and cron expressions.
def from_unix(string):
"""Convert Unix timestamp to UTC datetime object."""
def to_unix(dt):
"""Convert datetime object to Unix timestamp."""
def get_next_scheduled_time(cron_string, use_local_timezone=False):
"""Calculate the next scheduled execution time from a cron expression."""
def rationalize_until(until=None):
"""Process 'until' parameter for time-based queries."""
def setup_loghandlers(level='INFO'):
"""Configure logging for RQ Scheduler with colorized output."""class Scheduler:
"""Main scheduler class for managing scheduled jobs."""
def __init__(self, queue_name='default', queue=None, interval=60,
connection=None, job_class=None, queue_class=None, name=None):
"""
Initialize scheduler.
Parameters:
- queue_name: str, name of queue for scheduled jobs
- queue: RQ Queue instance (optional, overrides queue_name)
- interval: int, polling interval in seconds
- connection: Redis connection (required)
- job_class: Custom job class
- queue_class: Custom queue class
- name: Scheduler instance name for coordination
"""
VERSION: tuple = (0, 14, 0)The package provides a command-line interface for running scheduler daemons:
# Run scheduler with default settings
rqscheduler
# Redis connection options
rqscheduler --host redis.example.com --port 6380 --db 1
rqscheduler --password mypassword
rqscheduler --url redis://user:pass@host:port/db
# Execution modes
rqscheduler --burst # Process all jobs then exit
rqscheduler --interval 30 # Custom polling interval (seconds)
# Logging options
rqscheduler --verbose # Debug level logging
rqscheduler --quiet # Warning level logging only
# Advanced options
rqscheduler --path /custom/path # Set import path
rqscheduler --pid /var/run/rq.pid # Write PID to file
rqscheduler --job-class myapp.CustomJob # Custom job class
rqscheduler --queue-class myapp.CustomQueue # Custom queue class
# Show all options
rqscheduler --helpEnvironment Variables:
RQ_REDIS_HOST - Redis host (default: localhost)RQ_REDIS_PORT - Redis port (default: 6379)RQ_REDIS_DB - Redis database (default: 0)RQ_REDIS_PASSWORD - Redis passwordRQ_REDIS_URL - Complete Redis URL (overrides other connection settings)