Provides job scheduling capabilities to RQ (Redis Queue)
—
RQ Scheduler provides multiple methods for scheduling jobs to run at specific times or intervals. All scheduling methods support comprehensive job configuration options and return RQ Job instances that can be tracked and managed.
Schedule jobs to run at specific datetime values. All times should be in UTC for consistent behavior across timezones.
def enqueue_at(self, scheduled_time, func, *args, **kwargs):
"""
Schedule a job to run at a specific datetime.
Parameters:
- scheduled_time: datetime, when to run the job (should be UTC)
- func: callable, function to execute
- *args: arguments to pass to func
- **kwargs: keyword arguments and scheduling options
Scheduling Options (passed as **kwargs):
- timeout: int, job timeout in seconds
- job_id: str, custom job ID
- job_ttl: int, job time-to-live in seconds
- job_result_ttl: int, result storage time in seconds
- job_description: str, human-readable job description
- depends_on: Job or job_id, job dependency
- meta: dict, custom metadata
- queue_name: str, target queue name (overrides scheduler default)
- on_success: callable, success callback
- on_failure: callable, failure callback
- at_front: bool, enqueue at front of queue when scheduled
Returns:
Job instance with scheduled status
"""
Usage Examples:
from datetime import datetime
from rq_scheduler import Scheduler
from redis import Redis
scheduler = Scheduler(connection=Redis())
# Basic scheduling
job = scheduler.enqueue_at(
datetime(2025, 12, 31, 23, 59),
print_message,
"Happy New Year!"
)
# With options
job = scheduler.enqueue_at(
datetime(2025, 6, 15, 10, 30),
process_data,
data_id=12345,
timeout=300,
job_description="Monthly data processing",
meta={'department': 'analytics'},
queue_name='high_priority'
)
Schedule jobs to run after a specific time delay from the current moment.
def enqueue_in(self, time_delta, func, *args, **kwargs):
"""
Schedule a job to run after a time delay.
Parameters:
- time_delta: timedelta, delay before job execution
- func: callable, function to execute
- *args: arguments to pass to func
- **kwargs: keyword arguments and scheduling options (same as enqueue_at)
Returns:
Job instance with scheduled status
"""
Usage Examples:
from datetime import timedelta
# Run in 10 minutes
scheduler.enqueue_in(timedelta(minutes=10), send_reminder, user_id=123)
# Run in 2 hours with a failure callback
scheduler.enqueue_in(
timedelta(hours=2),
backup_database,
timeout=1800,
on_failure=handle_backup_failure
)
# Run tomorrow
scheduler.enqueue_in(timedelta(days=1), daily_report)
Schedule jobs to repeat at regular intervals. Supports limited repetitions or infinite recurring jobs.
def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None,
repeat=None, result_ttl=None, ttl=None, timeout=None, id=None,
description=None, queue_name=None, meta=None, depends_on=None,
on_success=None, on_failure=None, at_front=None):
"""
Schedule a periodic or repeated job.
Parameters:
- scheduled_time: datetime, first execution time
- func: callable, function to execute
- args: tuple, positional arguments for func
- kwargs: dict, keyword arguments for func
- interval: int, seconds between repetitions
- repeat: int, number of times to repeat (None for infinite)
- result_ttl: int, result storage time in seconds (defaults to -1, i.e. never expire, for periodic jobs)
- ttl: int, job time-to-live
- timeout: int, job timeout in seconds
- id: str, custom job ID
- description: str, job description
- queue_name: str, target queue name
- meta: dict, custom metadata
- depends_on: Job or job_id, job dependency
- on_success: callable, success callback
- on_failure: callable, failure callback
- at_front: bool, enqueue at front of queue
Returns:
Job instance configured for repetition
Raises:
ValueError: if repeat is specified without interval
"""
Usage Examples:
from datetime import datetime, timedelta
# Run every hour starting in 5 minutes
scheduler.schedule(
scheduled_time=datetime.utcnow() + timedelta(minutes=5),
func=check_system_health,
interval=3600 # 1 hour
)
# Run every 30 minutes, but only 10 times
scheduler.schedule(
scheduled_time=datetime.utcnow() + timedelta(minutes=1),
func=poll_api,
args=['https://api.example.com/status'],
interval=1800, # 30 minutes
repeat=10
)
# Daily backup at 2 AM
scheduler.schedule(
scheduled_time=datetime(2025, 1, 2, 2, 0),  # First run: 2025-01-02 at 2 AM (UTC)
func=perform_backup,
kwargs={'backup_type': 'incremental'},
interval=86400, # Daily
description="Daily incremental backup"
)
Schedule jobs using cron expressions for complex recurring patterns. Supports standard cron syntax with optional timezone handling.
def cron(self, cron_string, func, args=None, kwargs=None, repeat=None,
queue_name=None, result_ttl=-1, ttl=None, id=None, timeout=None,
description=None, meta=None, use_local_timezone=False, depends_on=None,
on_success=None, on_failure=None, at_front=False):
"""
Schedule a job using cron syntax.
Parameters:
- cron_string: str, cron expression (minute hour day month weekday)
- func: callable, function to execute
- args: tuple, positional arguments for func
- kwargs: dict, keyword arguments for func
- repeat: int, number of times to repeat (None for infinite)
- queue_name: str, target queue name
- result_ttl: int, result storage time (default -1)
- ttl: int, job time-to-live
- id: str, custom job ID
- timeout: int, job timeout in seconds
- description: str, job description
- meta: dict, custom metadata
- use_local_timezone: bool, use local timezone instead of UTC
- depends_on: Job or job_id, job dependency
- on_success: callable, success callback
- on_failure: callable, failure callback
- at_front: bool, enqueue at front of queue
Returns:
Job instance configured for cron scheduling
"""
Usage Examples:
# Every weekday at 9 AM
scheduler.cron(
"0 9 * * 1-5", # minute=0, hour=9, any day, any month, Mon-Fri
send_daily_report,
description="Weekday morning report"
)
# Every 15 minutes
scheduler.cron("*/15 * * * *", check_queue_status)
# First day of every month at midnight
scheduler.cron(
"0 0 1 * *",
monthly_cleanup,
args=['logs', 'temp_files'],
description="Monthly cleanup task"
)
# Every Sunday at 3 AM, but only run 12 times (weekly for about 12 weeks, i.e. roughly one quarter)
scheduler.cron(
"0 3 * * 0",
quarterly_report,
repeat=12,
timeout=7200 # 2 hours
)
# Using local timezone
scheduler.cron(
"30 14 * * *", # 2:30 PM local time
afternoon_task,
use_local_timezone=True
)
Cron expressions use the standard five-field format:
* * * * *
| | | | |
| | | | └─── day of week (0-7, Sunday = 0 or 7)
| | | └───── month (1-12)
| | └─────── day of month (1-31)
| └───────── hour (0-23)
└─────────── minute (0-59)
Special Characters:
`*` - any value
`,` - value list separator (e.g., 1,3,5)
`-` - range of values (e.g., 1-5)
`/` - step values (e.g., */15 for every 15 minutes)
Common Patterns:
`0 * * * *` - every hour at minute 0
`0 0 * * *` - daily at midnight
`0 0 * * 0` - weekly on Sunday at midnight
`0 0 1 * *` - monthly on the 1st at midnight
`*/30 * * * *` - every 30 minutes
Install with Tessl CLI
npx tessl i tessl/pypi-rq-scheduler