or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdjob-management.mdjob-scheduling.mdscheduler-control.mdutilities.md

index.mddocs/

0

# RQ Scheduler

1

2

RQ Scheduler provides job scheduling capabilities to RQ (Redis Queue), enabling developers to schedule jobs to run at specific times, dates, or intervals. It supports one-time scheduled jobs, periodic jobs, and cron-style scheduling patterns with Redis-based persistence and distributed scheduler coordination.

3

4

## Package Information

5

6

- **Package Name**: rq-scheduler

7

- **Language**: Python

8

- **Installation**: `pip install rq-scheduler`

9

10

## Core Imports

11

12

```python

13

from rq_scheduler import Scheduler

14

```

15

16

Utility functions:

17

18

```python

19

from rq_scheduler.utils import from_unix, to_unix, get_next_scheduled_time

20

```

21

22

## Basic Usage

23

24

```python

25

from datetime import datetime, timedelta

26

from redis import Redis

27

from rq_scheduler import Scheduler

28

29

# Create scheduler for default queue

30

redis_conn = Redis()

31

scheduler = Scheduler(connection=redis_conn)

32

33

# Schedule a job to run at a specific time

34

def my_job(name):

35

print(f"Hello, {name}!")

36

37

# Schedule job to run on January 1st, 2025 at 3:00 AM UTC

38

scheduler.enqueue_at(datetime(2025, 1, 1, 3, 0), my_job, 'World')

39

40

# Schedule job to run in 10 minutes

41

scheduler.enqueue_in(timedelta(minutes=10), my_job, 'Future')

42

43

# Schedule periodic job every hour

44

scheduler.schedule(

45

scheduled_time=datetime.utcnow() + timedelta(minutes=5),

46

func=my_job,

47

args=['Recurring'],

48

interval=3600 # Every hour

49

)

50

51

# Run the scheduler daemon

52

scheduler.run()

53

```

54

55

## Architecture

56

57

RQ Scheduler extends RQ's job queue system with time-based scheduling:

58

59

- **Scheduler**: Main class managing scheduled jobs using Redis sorted sets

60

- **Redis Storage**: Jobs stored by execution time for efficient time-based retrieval

61

- **Lock Coordination**: Distributed locking prevents multiple scheduler instances from conflicting

62

- **Queue Integration**: Scheduled jobs move to RQ queues when execution time arrives

63

64

## Capabilities

65

66

### Job Scheduling

67

68

Core scheduling methods for one-time and recurring jobs including datetime-based scheduling, time delta scheduling, and cron-style patterns.

69

70

```python { .api }

71

def enqueue_at(self, scheduled_time, func, *args, **kwargs):

72

"""Schedule a job to run at a specific datetime."""

73

74

def enqueue_in(self, time_delta, func, *args, **kwargs):

75

"""Schedule a job to run after a time delay."""

76

77

def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, **options):

78

"""Schedule a periodic or repeated job."""

79

80

def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, **options):

81

"""Schedule a job using cron syntax."""

82

```

83

84

[Job Scheduling](./job-scheduling.md)

85

86

### Job Management

87

88

Functions for managing scheduled jobs including querying, canceling, and modifying execution times.

89

90

```python { .api }

91

def cancel(self, job):

92

"""Cancel a scheduled job by removing it from the scheduler queue."""

93

94

def get_jobs(self, until=None, with_times=False, offset=None, length=None):

95

"""Get scheduled jobs iterator with optional filtering and pagination."""

96

97

def count(self, until=None):

98

"""Count scheduled jobs, optionally filtered by time."""

99

100

def change_execution_time(self, job, date_time):

101

"""Change the execution time of a scheduled job."""

102

103

def __contains__(self, item):

104

"""Check if a job is currently scheduled."""

105

```

106

107

[Job Management](./job-management.md)

108

109

### Scheduler Control

110

111

Methods for running and controlling the scheduler daemon including locking, heartbeats, and job processing.

112

113

```python { .api }

114

def run(self, burst=False):

115

"""Start the scheduler daemon to process scheduled jobs."""

116

117

def acquire_lock(self):

118

"""Acquire distributed lock for scheduler coordination."""

119

120

def remove_lock(self):

121

"""Release previously acquired distributed lock."""

122

123

def heartbeat(self):

124

"""Send heartbeat to maintain scheduler registration."""

125

126

def enqueue_jobs(self):

127

"""Move all ready scheduled jobs to their target execution queues."""

128

129

def enqueue_job(self, job):

130

"""Move a scheduled job to a queue and handle periodic job rescheduling."""

131

132

def get_queue_for_job(self, job):

133

"""Get the appropriate queue instance for a job."""

134

```

135

136

[Scheduler Control](./scheduler-control.md)

137

138

### Utility Functions

139

140

Time conversion and scheduling utility functions for working with timestamps and cron expressions.

141

142

```python { .api }

143

def from_unix(string):

144

"""Convert Unix timestamp to UTC datetime object."""

145

146

def to_unix(dt):

147

"""Convert datetime object to Unix timestamp."""

148

149

def get_next_scheduled_time(cron_string, use_local_timezone=False):

150

"""Calculate the next scheduled execution time from a cron expression."""

151

152

def rationalize_until(until=None):

153

"""Process 'until' parameter for time-based queries."""

154

155

def setup_loghandlers(level='INFO'):

156

"""Configure logging for RQ Scheduler with colorized output."""

157

```

158

159

[Utilities](./utilities.md)

160

161

## Types

162

163

```python { .api }

164

class Scheduler:

165

"""Main scheduler class for managing scheduled jobs."""

166

167

def __init__(self, queue_name='default', queue=None, interval=60,

168

connection=None, job_class=None, queue_class=None, name=None):

169

"""

170

Initialize scheduler.

171

172

Parameters:

173

- queue_name: str, name of queue for scheduled jobs

174

- queue: RQ Queue instance (optional, overrides queue_name)

175

- interval: int, polling interval in seconds

176

- connection: Redis connection (required)

177

- job_class: Custom job class

178

- queue_class: Custom queue class

179

- name: Scheduler instance name for coordination

180

"""

181

182

VERSION: tuple = (0, 14, 0)

183

```

184

185

## Console Scripts

186

187

The package provides a command-line interface for running scheduler daemons:

188

189

```bash

190

# Run scheduler with default settings

191

rqscheduler

192

193

# Redis connection options

194

rqscheduler --host redis.example.com --port 6380 --db 1

195

rqscheduler --password mypassword

196

rqscheduler --url redis://user:pass@host:port/db

197

198

# Execution modes

199

rqscheduler --burst # Process all jobs then exit

200

rqscheduler --interval 30 # Custom polling interval (seconds)

201

202

# Logging options

203

rqscheduler --verbose # Debug level logging

204

rqscheduler --quiet # Warning level logging only

205

206

# Advanced options

207

rqscheduler --path /custom/path # Set import path

208

rqscheduler --pid /var/run/rq.pid # Write PID to file

209

rqscheduler --job-class myapp.CustomJob # Custom job class

210

rqscheduler --queue-class myapp.CustomQueue # Custom queue class

211

212

# Show all options

213

rqscheduler --help

214

```

215

216

**Environment Variables:**

217

- `RQ_REDIS_HOST` - Redis host (default: localhost)

218

- `RQ_REDIS_PORT` - Redis port (default: 6379)

219

- `RQ_REDIS_DB` - Redis database (default: 0)

220

- `RQ_REDIS_PASSWORD` - Redis password

221

- `RQ_REDIS_URL` - Complete Redis URL (overrides other connection settings)