0
# Job Management
1
2
RQ Scheduler provides comprehensive tools for managing scheduled jobs including querying, canceling, modifying execution times, and checking job status. These management functions work with job instances or job IDs for flexible job control.
3
4
## Capabilities
5
6
### Job Cancellation
7
8
Remove scheduled jobs from the scheduler queue to prevent execution.
9
10
```python { .api }
11
def cancel(self, job):
12
"""
13
Cancel a scheduled job by removing it from the scheduler queue.
14
15
Parameters:
16
- job: Job instance or str (job_id) to cancel
17
18
Returns:
19
None
20
21
Note:
22
- Does not affect jobs already moved to execution queues
23
- Safe to call on non-existent jobs (no error raised)
24
"""
25
```
26
27
**Usage Examples:**
28
29
```python
30
from rq_scheduler import Scheduler
31
from redis import Redis
32
33
scheduler = Scheduler(connection=Redis())
34
35
# Schedule a job and get reference
36
job = scheduler.enqueue_in(timedelta(hours=1), process_data, data_id=123)
37
38
# Cancel using job instance
39
scheduler.cancel(job)
40
41
# Cancel using job ID string
42
scheduler.cancel('job-id-string')
43
44
# Cancel multiple jobs
45
scheduled_jobs = scheduler.get_jobs()
46
for job in scheduled_jobs:
47
if job.description == 'cleanup_task':
48
scheduler.cancel(job)
49
```
50
51
### Job Querying
52
53
Retrieve scheduled jobs based on time criteria with flexible filtering and pagination options.
54
55
```python { .api }
56
def get_jobs(self, until=None, with_times=False, offset=None, length=None):
57
"""
58
Get scheduled jobs iterator with optional filtering and pagination.
59
60
Parameters:
61
- until: datetime, timedelta, int (epoch), or None - only jobs scheduled before this time
62
- with_times: bool, if True returns (job, scheduled_time) tuples
63
- offset: int, zero-based starting position for pagination
64
- length: int, maximum number of jobs to return
65
66
Returns:
67
Iterator of Job instances or (Job, datetime) tuples if with_times=True
68
69
Note:
70
- If offset or length specified, both must be provided
71
- Jobs are automatically cleaned up if they no longer exist in Redis
72
- Times returned are datetime objects in UTC
73
"""
74
```
75
76
**Usage Examples:**
77
78
```python
79
from datetime import datetime, timedelta
80
81
# Get all scheduled jobs
82
all_jobs = list(scheduler.get_jobs())
83
84
# Get jobs scheduled in the next hour
85
next_hour = datetime.utcnow() + timedelta(hours=1)
86
upcoming_jobs = list(scheduler.get_jobs(until=next_hour))
87
88
# Get jobs with their scheduled times
89
jobs_with_times = list(scheduler.get_jobs(with_times=True))
90
for job, scheduled_time in jobs_with_times:
91
print(f"Job {job.id} scheduled for {scheduled_time}")
92
93
# Paginated results - get first 10 jobs
94
first_batch = list(scheduler.get_jobs(offset=0, length=10))
95
96
# Get next 10 jobs
97
second_batch = list(scheduler.get_jobs(offset=10, length=10))
98
99
# Find specific jobs
100
for job in scheduler.get_jobs():
101
if job.meta.get('priority') == 'high':
102
print(f"High priority job: {job.description}")
103
```
104
105
### Jobs Ready for Execution
106
107
Get jobs that are due to be moved to execution queues.
108
109
```python { .api }
110
def get_jobs_to_queue(self, with_times=False):
111
"""
112
Get jobs that should be queued for execution (scheduled time has passed).
113
114
Parameters:
115
- with_times: bool, if True returns (job, scheduled_time) tuples
116
117
Returns:
118
List of Job instances or (Job, datetime) tuples ready for execution
119
"""
120
```
121
122
**Usage Examples:**
123
124
```python
125
# Check what jobs are ready to run
126
ready_jobs = scheduler.get_jobs_to_queue()
127
print(f"{len(ready_jobs)} jobs ready for execution")
128
129
# Get ready jobs with their original scheduled times
130
ready_with_times = scheduler.get_jobs_to_queue(with_times=True)
131
for job, scheduled_time in ready_with_times:
132
delay = datetime.utcnow() - scheduled_time
133
print(f"Job {job.id} was scheduled for {scheduled_time} (delay: {delay})")
134
```
135
136
### Job Count
137
138
Count scheduled jobs with time-based filtering.
139
140
```python { .api }
141
def count(self, until=None):
142
"""
143
Count scheduled jobs, optionally filtered by time.
144
145
Parameters:
146
- until: datetime, timedelta, int (epoch), or None - count jobs scheduled before this time
147
148
Returns:
149
int, number of scheduled jobs matching criteria
150
"""
151
```
152
153
**Usage Examples:**
154
155
```python
156
# Total scheduled jobs
157
total_jobs = scheduler.count()
158
159
# Jobs in the next 24 hours
160
tomorrow = datetime.utcnow() + timedelta(days=1)
161
upcoming_count = scheduler.count(until=tomorrow)
162
163
# Jobs overdue (should have run already)
164
overdue_count = scheduler.count(until=datetime.utcnow())
165
166
print(f"Total: {total_jobs}, Upcoming: {upcoming_count}, Overdue: {overdue_count}")
167
```
168
169
### Job Existence Check
170
171
Check if a job is currently scheduled using the `in` operator.
172
173
```python { .api }
174
def __contains__(self, item):
175
"""
176
Check if a job is currently scheduled.
177
178
Parameters:
179
- item: Job instance or str (job_id) to check
180
181
Returns:
182
bool, True if job is scheduled, False otherwise
183
"""
184
```
185
186
**Usage Examples:**
187
188
```python
189
# Check using job instance
190
job = scheduler.enqueue_in(timedelta(hours=1), my_function)
191
if job in scheduler:
192
print("Job is scheduled")
193
194
# Check using job ID
195
job_id = "my-custom-job-id"
196
if job_id in scheduler:
197
print("Job with ID exists")
198
199
# Conditional cancellation
200
if job in scheduler:
201
scheduler.cancel(job)
202
print("Job was cancelled")
203
else:
204
print("Job not found or already executed")
205
```
206
207
### Execution Time Modification
208
209
Change the scheduled execution time of existing jobs with atomic Redis operations.
210
211
```python { .api }
212
def change_execution_time(self, job, date_time):
213
"""
214
Change the execution time of a scheduled job.
215
216
Parameters:
217
- job: Job instance to modify
218
- date_time: datetime, new execution time (should be UTC)
219
220
Returns:
221
None
222
223
Raises:
224
ValueError: if job is not in scheduled jobs queue
225
226
Note:
227
- Uses Redis transactions to prevent race conditions
228
- Will retry automatically if job is modified during update
229
"""
230
```
231
232
**Usage Examples:**
233
234
```python
235
from datetime import datetime, timedelta
236
237
# Schedule a job
238
job = scheduler.enqueue_at(
239
datetime(2025, 6, 15, 10, 0),
240
send_notification,
241
"Meeting reminder"
242
)
243
244
# Move it earlier by 30 minutes
245
new_time = datetime(2025, 6, 15, 9, 30)
246
try:
247
scheduler.change_execution_time(job, new_time)
248
print("Job rescheduled successfully")
249
except ValueError as e:
250
print(f"Failed to reschedule: {e}")
251
252
# Reschedule based on business logic
253
for job in scheduler.get_jobs():
254
if job.meta.get('priority') == 'urgent':
255
# Move urgent jobs to run 10 minutes earlier
256
jobs_with_times = scheduler.get_jobs(with_times=True)
257
for j, scheduled_time in jobs_with_times:
258
if j.id == job.id:
259
earlier_time = scheduled_time - timedelta(minutes=10)
260
scheduler.change_execution_time(job, earlier_time)
261
break
262
```
263
264
## Job Status and Lifecycle
265
266
Scheduled jobs in RQ Scheduler follow this lifecycle:
267
268
1. **Scheduled**: Job created and stored in Redis sorted set by execution time
269
2. **Ready**: Job's scheduled time has passed, eligible for queue movement
270
3. **Queued**: Job moved to RQ execution queue (no longer in scheduler)
271
4. **Executing/Completed/Failed**: Standard RQ job lifecycle
272
273
**Important Notes:**
274
275
- Jobs removed from scheduler queue cannot be managed by scheduler methods
276
- Periodic and cron jobs automatically reschedule themselves after execution
277
- Job metadata persists through the lifecycle for tracking and debugging
278
- Failed jobs may be retried according to RQ configuration, not scheduler settings