
# Scheduler Control

RQ Scheduler provides methods for running and controlling the scheduler daemon process. These functions handle distributed coordination, job processing, and lifecycle management for scheduler instances.

## Capabilities

### Scheduler Daemon

Run the scheduler daemon to continuously monitor scheduled jobs and move them to execution queues when their time arrives.

```python { .api }
def run(self, burst=False):
    """
    Start the scheduler daemon to process scheduled jobs.

    Parameters:
    - burst: bool; if True, run once and exit; if False, run continuously

    Returns:
    None (runs until interrupted or burst mode completes)

    Behavior:
    - Registers scheduler birth and installs signal handlers
    - Runs a polling loop at the configured interval
    - Acquires the distributed lock before processing jobs
    - Automatically handles scheduler cleanup on exit
    - Supports graceful shutdown via SIGINT/SIGTERM
    """
```

**Usage Examples:**

```python
from rq_scheduler import Scheduler
from redis import Redis

# Basic daemon mode (runs forever)
scheduler = Scheduler(connection=Redis())
scheduler.run()  # Blocks until interrupted

# Burst mode (process all ready jobs, then exit)
scheduler = Scheduler(connection=Redis())
scheduler.run(burst=True)

# Custom polling interval
scheduler = Scheduler(connection=Redis(), interval=30)  # Check every 30 seconds
scheduler.run()

# With signal handling in a script
import signal
import sys

def signal_handler(sig, frame):
    print('Shutting down scheduler...')
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
scheduler.run()
```
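Conceptually, `run()` boils down to a poll-and-sleep loop around the job-enqueueing step. The sketch below models that control flow only; `run_loop`, `enqueue_ready_jobs`, and `max_cycles` are illustrative names, not part of the rq-scheduler API:

```python
import time

def run_loop(enqueue_ready_jobs, interval=60, burst=False, max_cycles=None):
    """Simplified model of the scheduler's polling loop.

    enqueue_ready_jobs: callable that moves due jobs to queues and returns
    the number moved. max_cycles exists only so this sketch can terminate
    outside burst mode.
    """
    total = 0
    cycles = 0
    while True:
        total += enqueue_ready_jobs()
        if burst:
            return total  # burst mode: one pass over ready jobs, then exit
        cycles += 1
        if max_cycles is not None and cycles >= max_cycles:
            return total
        time.sleep(interval)  # continuous mode: wait one interval, poll again

# Burst mode performs a single pass
print(run_loop(lambda: 2, burst=True))  # 2
# Continuous mode keeps polling (capped here for demonstration)
print(run_loop(lambda: 1, interval=0, max_cycles=3))  # 3
```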

### Distributed Locking

Coordinate multiple scheduler instances to prevent duplicate job processing using Redis-based distributed locking.

```python { .api }
def acquire_lock(self):
    """
    Acquire the distributed lock for scheduler coordination.

    Returns:
    bool; True if the lock was acquired successfully, False if another scheduler holds it

    Behavior:
    - The lock expires automatically after interval + 10 seconds
    - Only one scheduler instance can hold the lock at a time
    - The lock prevents duplicate job processing across instances
    """

def remove_lock(self):
    """
    Release a previously acquired distributed lock.

    Returns:
    None

    Behavior:
    - Only removes the lock if this instance acquired it
    - Safe to call even if the lock is not held
    - Automatically called during scheduler shutdown
    """
```

**Usage Examples:**

```python
# Manual lock management for custom scheduler logic
scheduler = Scheduler(connection=Redis())

if scheduler.acquire_lock():
    try:
        # Process jobs while holding the lock
        jobs_processed = scheduler.enqueue_jobs()
        print(f"Processed {len(jobs_processed)} jobs")
    finally:
        scheduler.remove_lock()
else:
    print("Another scheduler is active")

# Coordination between multiple schedulers
scheduler1 = Scheduler(connection=Redis(), name="scheduler-1")
scheduler2 = Scheduler(connection=Redis(), name="scheduler-2")

print(f"Scheduler 1 lock: {scheduler1.acquire_lock()}")  # True
print(f"Scheduler 2 lock: {scheduler2.acquire_lock()}")  # False

scheduler1.remove_lock()
print(f"Scheduler 2 lock: {scheduler2.acquire_lock()}")  # True
```
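The lock semantics described above match Redis's atomic set-if-not-exists-with-expiry pattern (`SET key value NX EX ttl`). To make those semantics concrete without a running Redis, here is an in-memory stand-in; `FakeRedis` and its method names are purely illustrative:

```python
import time

class FakeRedis:
    """Tiny in-memory stand-in for Redis SET NX EX semantics (illustration only)."""
    def __init__(self):
        self.store = {}  # key -> (value, expires_at)

    def set_nx_ex(self, key, value, ttl):
        entry = self.store.get(key)
        if entry is not None and entry[1] > time.monotonic():
            return False  # another holder, and the lock has not expired
        self.store[key] = (value, time.monotonic() + ttl)
        return True

    def delete_if_owner(self, key, value):
        # Only the instance that acquired the lock may release it
        entry = self.store.get(key)
        if entry is not None and entry[0] == value:
            del self.store[key]

r = FakeRedis()
interval = 60
ttl = interval + 10  # matches the "interval + 10 seconds" expiry rule above

print(r.set_nx_ex("rq:scheduler_lock", "scheduler-1", ttl))  # True
print(r.set_nx_ex("rq:scheduler_lock", "scheduler-2", ttl))  # False
r.delete_if_owner("rq:scheduler_lock", "scheduler-1")
print(r.set_nx_ex("rq:scheduler_lock", "scheduler-2", ttl))  # True
```

The automatic expiry is what makes the lock safe: a scheduler that crashes while holding it blocks others only until the TTL elapses.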

### Job Processing

Move scheduled jobs that are ready for execution into RQ queues.

```python { .api }
def enqueue_jobs(self):
    """
    Move all ready scheduled jobs to their target execution queues.

    Returns:
    list; Job instances that were moved to queues

    Behavior:
    - Only processes jobs whose scheduled time is <= the current time
    - Handles periodic job rescheduling automatically
    - Manages cron job next-execution calculation
    - Decrements repeat counters for jobs with limited repetitions
    """
```

**Usage Examples:**

```python
# Manual job processing (useful for testing or custom logic)
scheduler = Scheduler(connection=Redis())

# Process ready jobs once
processed_jobs = scheduler.enqueue_jobs()
print(f"Moved {len(processed_jobs)} jobs to execution queues")

for job in processed_jobs:
    print(f"Processed job: {job.description} -> queue: {job.origin}")

# Custom processing loop with additional logic
import time
from datetime import datetime

while True:
    if scheduler.acquire_lock():
        try:
            jobs = scheduler.enqueue_jobs()
            if jobs:
                print(f"{datetime.now()}: Processed {len(jobs)} jobs")

            # Custom business logic here
            check_scheduler_health()
        finally:
            scheduler.remove_lock()

    time.sleep(60)  # Wait 1 minute
```
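The readiness rule ("scheduled time <= current time") is easy to model. rq-scheduler keeps pending jobs in a Redis sorted set scored by timestamp, but the selection logic is the same idea; `select_ready` is a hypothetical helper, not a library function:

```python
from datetime import datetime, timedelta, timezone

def select_ready(scheduled, now):
    """Return the ids of jobs whose scheduled time is due (<= now)."""
    return [job_id for job_id, when in scheduled if when <= now]

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
scheduled = [
    ("report", now - timedelta(minutes=5)),  # overdue  -> ready
    ("cleanup", now),                        # due now  -> ready
    ("backup", now + timedelta(hours=1)),    # future   -> not ready
]
print(select_ready(scheduled, now))  # ['report', 'cleanup']
```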

### Individual Job Processing

Process individual scheduled jobs and determine the queues they should be assigned to.

```python { .api }
def enqueue_job(self, job):
    """
    Move a scheduled job to a queue and handle periodic job rescheduling.

    Parameters:
    - job: Job instance to process

    Returns:
    None

    Behavior:
    - Moves the job from the scheduler queue to the appropriate execution queue
    - Handles periodic job rescheduling automatically
    - Manages cron job next-execution calculation
    - Decrements repeat counters for jobs with limited repetitions
    """

def get_queue_for_job(self, job):
    """
    Get the appropriate queue instance for a job.

    Parameters:
    - job: Job instance to get the queue for

    Returns:
    Queue instance where the job should be executed

    Behavior:
    - Uses job.origin to determine the target queue
    - Respects a custom queue_class from job metadata
    - Creates the queue instance with the proper connection and job_class
    """
```

**Usage Examples:**

```python
# Manual job processing with custom logic
scheduler = Scheduler(connection=Redis())

# Get specific jobs and process them individually
for job in scheduler.get_jobs_to_queue():
    # Custom validation before processing
    if validate_job_conditions(job):
        target_queue = scheduler.get_queue_for_job(job)
        print(f"Processing job {job.id} to queue {target_queue.name}")
        scheduler.enqueue_job(job)
    else:
        print(f"Skipping job {job.id} - conditions not met")

# Custom queue routing logic
def custom_job_processor(scheduler, job):
    """Process a job with custom queue selection."""
    if job.meta.get('priority') == 'high':
        # High priority jobs go to a fast queue
        job.origin = 'high_priority'

    queue = scheduler.get_queue_for_job(job)
    scheduler.enqueue_job(job)
    return queue

# Process jobs with custom routing
ready_jobs = scheduler.get_jobs_to_queue()
for job in ready_jobs:
    queue = custom_job_processor(scheduler, job)
    print(f"Job {job.id} sent to {queue.name}")
```
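The rescheduling bookkeeping mentioned above (computing the next run, decrementing a finite repeat counter) can be modeled in a few lines. `reschedule` and its return convention are illustrative only, not the library's internals:

```python
from datetime import datetime, timedelta

def reschedule(scheduled_time, interval_seconds, repeat):
    """Model of periodic-job bookkeeping after a job is enqueued:
    compute the next run time and decrement a finite repeat counter.
    Returns (next_time, remaining_repeat), or None when the job is exhausted.
    repeat=None means the job repeats forever.
    """
    if repeat is not None:
        if repeat <= 0:
            return None  # no repetitions left; job is not rescheduled
        repeat -= 1
    return scheduled_time + timedelta(seconds=interval_seconds), repeat

t = datetime(2024, 1, 1, 12, 0)
print(reschedule(t, 3600, 2))     # (datetime(2024, 1, 1, 13, 0), 1)
print(reschedule(t, 3600, 0))     # None: counter exhausted
print(reschedule(t, 3600, None))  # (datetime(2024, 1, 1, 13, 0), None): repeats forever
```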

### Scheduler Heartbeat

Maintain scheduler registration and prevent registration timeouts in distributed setups.

```python { .api }
def heartbeat(self):
    """
    Send a heartbeat to maintain scheduler registration.

    Returns:
    None

    Behavior:
    - Extends the scheduler key's expiration time
    - Prevents the scheduler from appearing inactive
    - Called automatically during the run() loop
    """
```

**Usage Examples:**

```python
import time

# Manual heartbeat for custom scheduler loops
scheduler = Scheduler(connection=Redis())
scheduler.register_birth()

try:
    while True:
        scheduler.heartbeat()  # Keep scheduler registered

        # Custom processing logic
        if custom_condition():
            process_special_jobs()

        time.sleep(30)

finally:
    scheduler.register_death()
```
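The heartbeat/expiry interaction can be pictured as follows: each heartbeat pushes the key's expiry forward, and a scheduler that stops heartbeating eventually looks dead. `Liveness` is a toy model for intuition, not rq-scheduler code:

```python
class Liveness:
    """Model of heartbeat-based liveness: each heartbeat extends the
    expiry; a scheduler that misses heartbeats appears inactive."""
    def __init__(self, ttl):
        self.ttl = ttl
        self.expires_at = 0.0

    def heartbeat(self, now):
        # Like extending the Redis key's expiration time
        self.expires_at = now + self.ttl

    def is_alive(self, now):
        return now < self.expires_at

s = Liveness(ttl=70)  # e.g. interval + 10 seconds
s.heartbeat(now=0)
print(s.is_alive(now=60))   # True: still within the TTL
print(s.is_alive(now=100))  # False: missed heartbeats, key expired
```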

### Instance Registration

Manage scheduler instance lifecycle for distributed coordination and monitoring.

```python { .api }
def register_birth(self):
    """
    Register scheduler instance startup.

    Returns:
    None

    Raises:
    ValueError: if a scheduler with the same name is already active

    Behavior:
    - Creates the scheduler instance key in Redis
    - Sets automatic expiration based on the polling interval
    - Prevents duplicate scheduler names
    """

def register_death(self):
    """
    Register scheduler instance shutdown.

    Returns:
    None

    Behavior:
    - Marks the scheduler as inactive in Redis
    - Allows other schedulers to detect the shutdown
    - Called automatically during graceful shutdown
    """
```

**Usage Examples:**

```python
# Manual instance lifecycle management
scheduler = Scheduler(connection=Redis(), name="worker-1")

try:
    scheduler.register_birth()
    print("Scheduler registered successfully")

    # Run scheduler logic
    while running:
        process_jobs()

except ValueError as e:
    print(f"Registration failed: {e}")
    # Handle duplicate scheduler name

finally:
    scheduler.register_death()
    print("Scheduler shutdown registered")

# Avoid clashing with existing schedulers by using a unique name
import uuid

unique_name = f"scheduler-{uuid.uuid4().hex[:8]}"
scheduler = Scheduler(connection=Redis(), name=unique_name)

try:
    scheduler.run()  # run() registers birth itself, raising ValueError on a name conflict
except ValueError:
    print("Scheduler name conflict - using burst mode instead")
    scheduler.run(burst=True)
```

## Scheduler Properties

Access scheduler instance information and configuration:

```python { .api }
@property
def key(self):
    """
    Returns the scheduler's Redis hash key.

    Returns:
    str; the Redis key for this scheduler instance
    """

@property
def pid(self):
    """
    Returns the current process ID.

    Returns:
    int; the process ID of the scheduler
    """
```

**Usage Examples:**

```python
scheduler = Scheduler(connection=Redis(), name="main-scheduler")

print(f"Scheduler key: {scheduler.key}")
print(f"Process ID: {scheduler.pid}")

# Useful for monitoring and debugging
import os
print(f"Scheduler {scheduler.name} running as PID {scheduler.pid}")
assert scheduler.pid == os.getpid()
```

## Signal Handling

The scheduler automatically installs signal handlers for graceful shutdown:

- **SIGINT** (Ctrl+C): Triggers the clean shutdown sequence
- **SIGTERM**: Triggers the clean shutdown sequence

**Shutdown Sequence:**

1. Stop the polling loop
2. Release the distributed lock
3. Register scheduler death
4. Exit the process

**Custom Signal Handling:**

```python
import signal
import sys

def custom_shutdown(signum, frame):
    print(f"Received signal {signum}")
    scheduler.remove_lock()
    scheduler.register_death()
    print("Custom cleanup completed")
    sys.exit(0)

# Override default handlers if needed
signal.signal(signal.SIGINT, custom_shutdown)
signal.signal(signal.SIGTERM, custom_shutdown)

scheduler.run()  # Will use custom handlers
```