0
# Scheduler Control
1
2
RQ Scheduler provides methods for running and controlling the scheduler daemon process. These functions handle distributed coordination, job processing, and lifecycle management for scheduler instances.
3
4
## Capabilities
5
6
### Scheduler Daemon
7
8
Run the scheduler daemon to continuously monitor scheduled jobs and move them to execution queues when their time arrives.
9
10
```python { .api }
11
def run(self, burst=False):
12
"""
13
Start the scheduler daemon to process scheduled jobs.
14
15
Parameters:
16
- burst: bool, if True run once and exit, if False run continuously
17
18
Returns:
19
None (runs until interrupted or burst mode completes)
20
21
Behavior:
22
- Registers scheduler birth and installs signal handlers
23
- Runs polling loop at configured interval
24
- Acquires distributed lock before processing jobs
25
- Automatically handles scheduler cleanup on exit
26
- Supports graceful shutdown via SIGINT/SIGTERM
27
"""
28
```
29
30
**Usage Examples:**
31
32
```python
33
from rq_scheduler import Scheduler
34
from redis import Redis
35
36
# Basic daemon mode (runs forever)
37
scheduler = Scheduler(connection=Redis())
38
scheduler.run() # Blocks until interrupted
39
40
# Burst mode (process all ready jobs then exit)
41
scheduler = Scheduler(connection=Redis())
42
scheduler.run(burst=True)
43
44
# Custom polling interval
45
scheduler = Scheduler(connection=Redis(), interval=30) # Check every 30 seconds
46
scheduler.run()
47
48
# With signal handling in a script
49
import signal
50
import sys
51
52
def signal_handler(sig, frame):
53
print('Shutting down scheduler...')
54
sys.exit(0)
55
56
signal.signal(signal.SIGINT, signal_handler)
57
scheduler.run()
58
```
59
60
### Distributed Locking
61
62
Coordinate multiple scheduler instances to prevent duplicate job processing using Redis-based distributed locking.
63
64
```python { .api }
65
def acquire_lock(self):
66
"""
67
Acquire distributed lock for scheduler coordination.
68
69
Returns:
70
bool, True if lock acquired successfully, False if another scheduler holds it
71
72
Behavior:
73
- Lock expires automatically after interval + 10 seconds
74
- Only one scheduler instance can hold lock at a time
75
- Lock prevents duplicate job processing across instances
76
"""
77
78
def remove_lock(self):
79
"""
80
Release previously acquired distributed lock.
81
82
Returns:
83
None
84
85
Behavior:
86
- Only removes lock if this instance acquired it
87
- Safe to call even if lock not held
88
- Automatically called during scheduler shutdown
89
"""
90
```
91
92
**Usage Examples:**
93
94
```python
95
# Manual lock management for custom scheduler logic
96
scheduler = Scheduler(connection=Redis())
97
98
if scheduler.acquire_lock():
99
try:
100
# Process jobs while holding lock
101
jobs_processed = scheduler.enqueue_jobs()
102
print(f"Processed {len(jobs_processed)} jobs")
103
finally:
104
scheduler.remove_lock()
105
else:
106
print("Another scheduler is active")
107
108
# Check multiple schedulers coordination
109
scheduler1 = Scheduler(connection=Redis(), name="scheduler-1")
110
scheduler2 = Scheduler(connection=Redis(), name="scheduler-2")
111
112
print(f"Scheduler 1 lock: {scheduler1.acquire_lock()}") # True
113
print(f"Scheduler 2 lock: {scheduler2.acquire_lock()}") # False
114
115
scheduler1.remove_lock()
116
print(f"Scheduler 2 lock: {scheduler2.acquire_lock()}") # True
117
```
118
119
### Job Processing
120
121
Move scheduled jobs that are ready for execution into RQ queues.
122
123
```python { .api }
124
def enqueue_jobs(self):
125
"""
126
Move all ready scheduled jobs to their target execution queues.
127
128
Returns:
129
list, Job instances that were moved to queues
130
131
Behavior:
132
- Only processes jobs with scheduled time <= current time
133
- Handles periodic job rescheduling automatically
134
- Manages cron job next execution calculation
135
- Decrements repeat counters for limited repetition jobs
136
"""
137
```
138
139
**Usage Examples:**
140
141
```python
142
# Manual job processing (useful for testing or custom logic)
143
scheduler = Scheduler(connection=Redis())
144
145
# Process ready jobs once
146
processed_jobs = scheduler.enqueue_jobs()
147
print(f"Moved {len(processed_jobs)} jobs to execution queues")
148
149
for job in processed_jobs:
150
print(f"Processed job: {job.description} -> queue: {job.origin}")
151
152
# Custom processing loop with additional logic
153
import time
154
from datetime import datetime
155
156
while True:
157
if scheduler.acquire_lock():
158
try:
159
jobs = scheduler.enqueue_jobs()
160
if jobs:
161
print(f"{datetime.now()}: Processed {len(jobs)} jobs")
162
163
# Custom business logic here
164
check_scheduler_health()
165
166
finally:
167
scheduler.remove_lock()
168
169
time.sleep(60) # Wait 1 minute
170
```
171
172
### Individual Job Processing
173
174
Process single scheduled jobs and manage queue assignment for jobs.
175
176
```python { .api }
177
def enqueue_job(self, job):
178
"""
179
Move a scheduled job to a queue and handle periodic job rescheduling.
180
181
Parameters:
182
- job: Job instance to process
183
184
Returns:
185
None
186
187
Behavior:
188
- Moves job from scheduler queue to appropriate execution queue
189
- Handles periodic job rescheduling automatically
190
- Manages cron job next execution calculation
191
- Decrements repeat counters for limited repetition jobs
192
"""
193
194
def get_queue_for_job(self, job):
195
"""
196
Get the appropriate queue instance for a job.
197
198
Parameters:
199
- job: Job instance to get queue for
200
201
Returns:
202
Queue instance where job should be executed
203
204
Behavior:
205
- Uses job.origin to determine target queue
206
- Respects custom queue_class from job metadata
207
- Creates queue instance with proper connection and job_class
208
"""
209
```
210
211
**Usage Examples:**
212
213
```python
214
# Manual job processing with custom logic
215
scheduler = Scheduler(connection=Redis())
216
217
# Get specific jobs and process individually
218
for job in scheduler.get_jobs_to_queue():
219
# Custom validation before processing
220
if validate_job_conditions(job):
221
target_queue = scheduler.get_queue_for_job(job)
222
print(f"Processing job {job.id} to queue {target_queue.name}")
223
scheduler.enqueue_job(job)
224
else:
225
print(f"Skipping job {job.id} - conditions not met")
226
227
# Custom queue routing logic
228
def custom_job_processor(scheduler, job):
229
"""Process job with custom queue selection."""
230
if job.meta.get('priority') == 'high':
231
# High priority jobs go to fast queue
232
job.origin = 'high_priority'
233
234
queue = scheduler.get_queue_for_job(job)
235
scheduler.enqueue_job(job)
236
return queue
237
238
# Process jobs with custom routing
239
ready_jobs = scheduler.get_jobs_to_queue()
240
for job in ready_jobs:
241
queue = custom_job_processor(scheduler, job)
242
print(f"Job {job.id} sent to {queue.name}")
243
```
244
245
### Scheduler Heartbeat
246
247
Maintain scheduler registration and prevent timeout in distributed setups.
248
249
```python { .api }
250
def heartbeat(self):
251
"""
252
Send heartbeat to maintain scheduler registration.
253
254
Returns:
255
None
256
257
Behavior:
258
- Extends scheduler key expiration time
259
- Prevents scheduler from appearing inactive
260
- Called automatically during run() loop
261
"""
262
```
263
264
**Usage Examples:**
265
266
```python
267
# Manual heartbeat for custom scheduler loops
268
scheduler = Scheduler(connection=Redis())
269
scheduler.register_birth()
270
271
try:
272
while True:
273
scheduler.heartbeat() # Keep scheduler registered
274
275
# Custom processing logic
276
if custom_condition():
277
process_special_jobs()
278
279
time.sleep(30)
280
281
finally:
282
scheduler.register_death()
283
```
284
285
### Instance Registration
286
287
Manage scheduler instance lifecycle for distributed coordination and monitoring.
288
289
```python { .api }
290
def register_birth(self):
291
"""
292
Register scheduler instance startup.
293
294
Returns:
295
None
296
297
Raises:
298
ValueError: if scheduler with same name already active
299
300
Behavior:
301
- Creates scheduler instance key in Redis
302
- Sets automatic expiration based on polling interval
303
- Prevents duplicate scheduler names
304
"""
305
306
def register_death(self):
307
"""
308
Register scheduler instance shutdown.
309
310
Returns:
311
None
312
313
Behavior:
314
- Marks scheduler as inactive in Redis
315
- Allows other schedulers to detect shutdown
316
- Called automatically during graceful shutdown
317
"""
318
```
319
320
**Usage Examples:**
321
322
```python
323
# Manual instance lifecycle management
324
scheduler = Scheduler(connection=Redis(), name="worker-1")
325
326
try:
327
scheduler.register_birth()
328
print("Scheduler registered successfully")
329
330
# Run scheduler logic
331
while running:
332
process_jobs()
333
334
except ValueError as e:
335
print(f"Registration failed: {e}")
336
# Handle duplicate scheduler name
337
338
finally:
339
scheduler.register_death()
340
print("Scheduler shutdown registered")
341
342
# Check for existing schedulers before starting
343
import uuid
344
345
unique_name = f"scheduler-{uuid.uuid4().hex[:8]}"
346
scheduler = Scheduler(connection=Redis(), name=unique_name)
347
348
try:
349
scheduler.register_birth()
350
scheduler.run()
351
except ValueError:
352
print("Scheduler name conflict - using burst mode instead")
353
scheduler.run(burst=True)
354
```
355
356
## Scheduler Properties
357
358
Access scheduler instance information and configuration:
359
360
```python { .api }
361
@property
362
def key(self):
363
"""
364
Returns the scheduler's Redis hash key.
365
366
Returns:
367
str, Redis key for this scheduler instance
368
"""
369
370
@property
371
def pid(self):
372
"""
373
Returns the current process ID.
374
375
Returns:
376
int, process ID of scheduler
377
"""
378
```
379
380
**Usage Examples:**
381
382
```python
383
scheduler = Scheduler(connection=Redis(), name="main-scheduler")
384
385
print(f"Scheduler key: {scheduler.key}")
386
print(f"Process ID: {scheduler.pid}")
387
388
# Useful for monitoring and debugging
389
import os
390
print(f"Scheduler {scheduler.name} running as PID {scheduler.pid}")
391
assert scheduler.pid == os.getpid()
392
```
393
394
## Signal Handling
395
396
The scheduler automatically installs signal handlers for graceful shutdown:
397
398
- **SIGINT** (Ctrl+C): Triggers clean shutdown sequence
399
- **SIGTERM**: Triggers clean shutdown sequence
400
401
**Shutdown Sequence:**
402
1. Stop polling loop
403
2. Release distributed lock
404
3. Register scheduler death
405
4. Exit process
406
407
**Custom Signal Handling:**
408
409
```python
410
import signal
411
import sys
412
413
def custom_shutdown(signum, frame):
414
print(f"Received signal {signum}")
415
scheduler.remove_lock()
416
scheduler.register_death()
417
print("Custom cleanup completed")
418
sys.exit(0)
419
420
# Override default handlers if needed
421
signal.signal(signal.SIGINT, custom_shutdown)
422
signal.signal(signal.SIGTERM, custom_shutdown)
423
424
scheduler.run() # Will use custom handlers
425
```