or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdjob-management.mdjob-scheduling.mdscheduler-control.mdutilities.md

job-management.mddocs/

0

# Job Management

1

2

RQ Scheduler provides comprehensive tools for managing scheduled jobs including querying, canceling, modifying execution times, and checking job status. These management functions work with job instances or job IDs for flexible job control.

3

4

## Capabilities

5

6

### Job Cancellation

7

8

Remove scheduled jobs from the scheduler queue to prevent execution.

9

10

```python { .api }

11

def cancel(self, job):

12

"""

13

Cancel a scheduled job by removing it from the scheduler queue.

14

15

Parameters:

16

- job: Job instance or str (job_id) to cancel

17

18

Returns:

19

None

20

21

Note:

22

- Does not affect jobs already moved to execution queues

23

- Safe to call on non-existent jobs (no error raised)

24

"""

25

```

26

27

**Usage Examples:**

28

29

```python

30

from rq_scheduler import Scheduler

31

from redis import Redis

32

33

scheduler = Scheduler(connection=Redis())

34

35

# Schedule a job and get reference

36

job = scheduler.enqueue_in(timedelta(hours=1), process_data, data_id=123)

37

38

# Cancel using job instance

39

scheduler.cancel(job)

40

41

# Cancel using job ID string

42

scheduler.cancel('job-id-string')

43

44

# Cancel multiple jobs

45

scheduled_jobs = scheduler.get_jobs()

46

for job in scheduled_jobs:

47

if job.description == 'cleanup_task':

48

scheduler.cancel(job)

49

```

50

51

### Job Querying

52

53

Retrieve scheduled jobs based on time criteria with flexible filtering and pagination options.

54

55

```python { .api }

56

def get_jobs(self, until=None, with_times=False, offset=None, length=None):

57

"""

58

Get scheduled jobs iterator with optional filtering and pagination.

59

60

Parameters:

61

- until: datetime, timedelta, int (epoch), or None - only jobs scheduled before this time

62

- with_times: bool, if True returns (job, scheduled_time) tuples

63

- offset: int, zero-based starting position for pagination

64

- length: int, maximum number of jobs to return

65

66

Returns:

67

Iterator of Job instances or (Job, datetime) tuples if with_times=True

68

69

Note:

70

- If offset or length specified, both must be provided

71

- Jobs are automatically cleaned up if they no longer exist in Redis

72

- Times returned are datetime objects in UTC

73

"""

74

```

75

76

**Usage Examples:**

77

78

```python

79

from datetime import datetime, timedelta

80

81

# Get all scheduled jobs

82

all_jobs = list(scheduler.get_jobs())

83

84

# Get jobs scheduled in the next hour

85

next_hour = datetime.utcnow() + timedelta(hours=1)

86

upcoming_jobs = list(scheduler.get_jobs(until=next_hour))

87

88

# Get jobs with their scheduled times

89

jobs_with_times = list(scheduler.get_jobs(with_times=True))

90

for job, scheduled_time in jobs_with_times:

91

print(f"Job {job.id} scheduled for {scheduled_time}")

92

93

# Paginated results - get first 10 jobs

94

first_batch = list(scheduler.get_jobs(offset=0, length=10))

95

96

# Get next 10 jobs

97

second_batch = list(scheduler.get_jobs(offset=10, length=10))

98

99

# Find specific jobs

100

for job in scheduler.get_jobs():

101

if job.meta.get('priority') == 'high':

102

print(f"High priority job: {job.description}")

103

```

104

105

### Jobs Ready for Execution

106

107

Get jobs that are due to be moved to execution queues.

108

109

```python { .api }

110

def get_jobs_to_queue(self, with_times=False):

111

"""

112

Get jobs that should be queued for execution (scheduled time has passed).

113

114

Parameters:

115

- with_times: bool, if True returns (job, scheduled_time) tuples

116

117

Returns:

118

List of Job instances or (Job, datetime) tuples ready for execution

119

"""

120

```

121

122

**Usage Examples:**

123

124

```python

125

# Check what jobs are ready to run

126

ready_jobs = scheduler.get_jobs_to_queue()

127

print(f"{len(ready_jobs)} jobs ready for execution")

128

129

# Get ready jobs with their original scheduled times

130

ready_with_times = scheduler.get_jobs_to_queue(with_times=True)

131

for job, scheduled_time in ready_with_times:

132

delay = datetime.utcnow() - scheduled_time

133

print(f"Job {job.id} was scheduled for {scheduled_time} (delay: {delay})")

134

```

135

136

### Job Count

137

138

Count scheduled jobs with time-based filtering.

139

140

```python { .api }

141

def count(self, until=None):

142

"""

143

Count scheduled jobs, optionally filtered by time.

144

145

Parameters:

146

- until: datetime, timedelta, int (epoch), or None - count jobs scheduled before this time

147

148

Returns:

149

int, number of scheduled jobs matching criteria

150

"""

151

```

152

153

**Usage Examples:**

154

155

```python

156

# Total scheduled jobs

157

total_jobs = scheduler.count()

158

159

# Jobs in the next 24 hours

160

tomorrow = datetime.utcnow() + timedelta(days=1)

161

upcoming_count = scheduler.count(until=tomorrow)

162

163

# Jobs overdue (should have run already)

164

overdue_count = scheduler.count(until=datetime.utcnow())

165

166

print(f"Total: {total_jobs}, Upcoming: {upcoming_count}, Overdue: {overdue_count}")

167

```

168

169

### Job Existence Check

170

171

Check if a job is currently scheduled using the `in` operator.

172

173

```python { .api }

174

def __contains__(self, item):

175

"""

176

Check if a job is currently scheduled.

177

178

Parameters:

179

- item: Job instance or str (job_id) to check

180

181

Returns:

182

bool, True if job is scheduled, False otherwise

183

"""

184

```

185

186

**Usage Examples:**

187

188

```python

189

# Check using job instance

190

job = scheduler.enqueue_in(timedelta(hours=1), my_function)

191

if job in scheduler:

192

print("Job is scheduled")

193

194

# Check using job ID

195

job_id = "my-custom-job-id"

196

if job_id in scheduler:

197

print("Job with ID exists")

198

199

# Conditional cancellation

200

if job in scheduler:

201

scheduler.cancel(job)

202

print("Job was cancelled")

203

else:

204

print("Job not found or already executed")

205

```

206

207

### Execution Time Modification

208

209

Change the scheduled execution time of existing jobs with atomic Redis operations.

210

211

```python { .api }

212

def change_execution_time(self, job, date_time):

213

"""

214

Change the execution time of a scheduled job.

215

216

Parameters:

217

- job: Job instance to modify

218

- date_time: datetime, new execution time (should be UTC)

219

220

Returns:

221

None

222

223

Raises:

224

ValueError: if job is not in scheduled jobs queue

225

226

Note:

227

- Uses Redis transactions to prevent race conditions

228

- Will retry automatically if job is modified during update

229

"""

230

```

231

232

**Usage Examples:**

233

234

```python

235

from datetime import datetime, timedelta

236

237

# Schedule a job

238

job = scheduler.enqueue_at(

239

datetime(2025, 6, 15, 10, 0),

240

send_notification,

241

"Meeting reminder"

242

)

243

244

# Move it earlier by 30 minutes

245

new_time = datetime(2025, 6, 15, 9, 30)

246

try:

247

scheduler.change_execution_time(job, new_time)

248

print("Job rescheduled successfully")

249

except ValueError as e:

250

print(f"Failed to reschedule: {e}")

251

252

# Reschedule based on business logic

253

for job in scheduler.get_jobs():

254

if job.meta.get('priority') == 'urgent':

255

# Move urgent jobs to run 10 minutes earlier

256

jobs_with_times = scheduler.get_jobs(with_times=True)

257

for j, scheduled_time in jobs_with_times:

258

if j.id == job.id:

259

earlier_time = scheduled_time - timedelta(minutes=10)

260

scheduler.change_execution_time(job, earlier_time)

261

break

262

```

263

264

## Job Status and Lifecycle

265

266

Scheduled jobs in RQ Scheduler follow this lifecycle:

267

268

1. **Scheduled**: Job created and stored in Redis sorted set by execution time

269

2. **Ready**: Job's scheduled time has passed, eligible for queue movement

270

3. **Queued**: Job moved to RQ execution queue (no longer in scheduler)

271

4. **Executing/Completed/Failed**: Standard RQ job lifecycle

272

273

**Important Notes:**

274

275

- Jobs removed from scheduler queue cannot be managed by scheduler methods

276

- Periodic and cron jobs automatically reschedule themselves after execution

277

- Job metadata persists through the lifecycle for tracking and debugging

278

- Failed jobs may be retried according to RQ configuration, not scheduler settings