Provides job scheduling capabilities to RQ (Redis Queue)
npx @tessl/cli install tessl/pypi-rq-scheduler@0.14.00
# RQ Scheduler
1
2
RQ Scheduler provides job scheduling capabilities to RQ (Redis Queue), enabling developers to schedule jobs to run at specific times, dates, or intervals. It supports one-time scheduled jobs, periodic jobs, and cron-style scheduling patterns with Redis-based persistence and distributed scheduler coordination.
3
4
## Package Information
5
6
- **Package Name**: rq-scheduler
7
- **Language**: Python
8
- **Installation**: `pip install rq-scheduler`
9
10
## Core Imports
11
12
```python
13
from rq_scheduler import Scheduler
14
```
15
16
Utility functions:
17
18
```python
19
from rq_scheduler.utils import from_unix, to_unix, get_next_scheduled_time
20
```
21
22
## Basic Usage
23
24
```python
25
from datetime import datetime, timedelta
26
from redis import Redis
27
from rq_scheduler import Scheduler
28
29
# Create scheduler for default queue
30
redis_conn = Redis()
31
scheduler = Scheduler(connection=redis_conn)
32
33
# Schedule a job to run at a specific time
34
def my_job(name):
35
print(f"Hello, {name}!")
36
37
# Schedule job to run on January 1st, 2025 at 3:00 AM UTC
38
scheduler.enqueue_at(datetime(2025, 1, 1, 3, 0), my_job, 'World')
39
40
# Schedule job to run in 10 minutes
41
scheduler.enqueue_in(timedelta(minutes=10), my_job, 'Future')
42
43
# Schedule periodic job every hour
44
scheduler.schedule(
45
scheduled_time=datetime.utcnow() + timedelta(minutes=5),
46
func=my_job,
47
args=['Recurring'],
48
interval=3600 # Every hour
49
)
50
51
# Run the scheduler daemon
52
scheduler.run()
53
```
54
55
## Architecture
56
57
RQ Scheduler extends RQ's job queue system with time-based scheduling:
58
59
- **Scheduler**: Main class managing scheduled jobs using Redis sorted sets
60
- **Redis Storage**: Jobs stored by execution time for efficient time-based retrieval
61
- **Lock Coordination**: Distributed locking prevents multiple scheduler instances from conflicting
62
- **Queue Integration**: Scheduled jobs move to RQ queues when execution time arrives
63
64
## Capabilities
65
66
### Job Scheduling
67
68
Core scheduling methods for one-time and recurring jobs including datetime-based scheduling, time delta scheduling, and cron-style patterns.
69
70
```python { .api }
71
def enqueue_at(self, scheduled_time, func, *args, **kwargs):
72
"""Schedule a job to run at a specific datetime."""
73
74
def enqueue_in(self, time_delta, func, *args, **kwargs):
75
"""Schedule a job to run after a time delay."""
76
77
def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, **options):
78
"""Schedule a periodic or repeated job."""
79
80
def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, **options):
81
"""Schedule a job using cron syntax."""
82
```
83
84
[Job Scheduling](./job-scheduling.md)
85
86
### Job Management
87
88
Functions for managing scheduled jobs including querying, canceling, and modifying execution times.
89
90
```python { .api }
91
def cancel(self, job):
92
"""Cancel a scheduled job by removing it from the scheduler queue."""
93
94
def get_jobs(self, until=None, with_times=False, offset=None, length=None):
95
"""Get scheduled jobs iterator with optional filtering and pagination."""
96
97
def count(self, until=None):
98
"""Count scheduled jobs, optionally filtered by time."""
99
100
def change_execution_time(self, job, date_time):
101
"""Change the execution time of a scheduled job."""
102
103
def __contains__(self, item):
104
"""Check if a job is currently scheduled."""
105
```
106
107
[Job Management](./job-management.md)
108
109
### Scheduler Control
110
111
Methods for running and controlling the scheduler daemon including locking, heartbeats, and job processing.
112
113
```python { .api }
114
def run(self, burst=False):
115
"""Start the scheduler daemon to process scheduled jobs."""
116
117
def acquire_lock(self):
118
"""Acquire distributed lock for scheduler coordination."""
119
120
def remove_lock(self):
121
"""Release previously acquired distributed lock."""
122
123
def heartbeat(self):
124
"""Send heartbeat to maintain scheduler registration."""
125
126
def enqueue_jobs(self):
127
"""Move all ready scheduled jobs to their target execution queues."""
128
129
def enqueue_job(self, job):
130
"""Move a scheduled job to a queue and handle periodic job rescheduling."""
131
132
def get_queue_for_job(self, job):
133
"""Get the appropriate queue instance for a job."""
134
```
135
136
[Scheduler Control](./scheduler-control.md)
137
138
### Utility Functions
139
140
Time conversion and scheduling utility functions for working with timestamps and cron expressions.
141
142
```python { .api }
143
def from_unix(string):
144
"""Convert Unix timestamp to UTC datetime object."""
145
146
def to_unix(dt):
147
"""Convert datetime object to Unix timestamp."""
148
149
def get_next_scheduled_time(cron_string, use_local_timezone=False):
150
"""Calculate the next scheduled execution time from a cron expression."""
151
152
def rationalize_until(until=None):
153
"""Process 'until' parameter for time-based queries."""
154
155
def setup_loghandlers(level='INFO'):
156
"""Configure logging for RQ Scheduler with colorized output."""
157
```
158
159
[Utilities](./utilities.md)
160
161
## Types
162
163
```python { .api }
164
class Scheduler:
165
"""Main scheduler class for managing scheduled jobs."""
166
167
def __init__(self, queue_name='default', queue=None, interval=60,
168
connection=None, job_class=None, queue_class=None, name=None):
169
"""
170
Initialize scheduler.
171
172
Parameters:
173
- queue_name: str, name of queue for scheduled jobs
174
- queue: RQ Queue instance (optional, overrides queue_name)
175
- interval: int, polling interval in seconds
176
- connection: Redis connection (required)
177
- job_class: Custom job class
178
- queue_class: Custom queue class
179
- name: Scheduler instance name for coordination
180
"""
181
182
VERSION: tuple = (0, 14, 0)
183
```
184
185
## Console Scripts
186
187
The package provides a command-line interface for running scheduler daemons:
188
189
```bash
190
# Run scheduler with default settings
191
rqscheduler
192
193
# Redis connection options
194
rqscheduler --host redis.example.com --port 6380 --db 1
195
rqscheduler --password mypassword
196
rqscheduler --url redis://user:pass@host:port/db
197
198
# Execution modes
199
rqscheduler --burst # Process all jobs then exit
200
rqscheduler --interval 30 # Custom polling interval (seconds)
201
202
# Logging options
203
rqscheduler --verbose # Debug level logging
204
rqscheduler --quiet # Warning level logging only
205
206
# Advanced options
207
rqscheduler --path /custom/path # Set import path
208
rqscheduler --pid /var/run/rq.pid # Write PID to file
209
rqscheduler --job-class myapp.CustomJob # Custom job class
210
rqscheduler --queue-class myapp.CustomQueue # Custom queue class
211
212
# Show all options
213
rqscheduler --help
214
```
215
216
**Environment Variables:**
217
- `RQ_REDIS_HOST` - Redis host (default: localhost)
218
- `RQ_REDIS_PORT` - Redis port (default: 6379)
219
- `RQ_REDIS_DB` - Redis database (default: 0)
220
- `RQ_REDIS_PASSWORD` - Redis password
221
- `RQ_REDIS_URL` - Complete Redis URL (overrides other connection settings)