or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md, job-management.md, job-scheduling.md, scheduler-control.md, utilities.md

docs/utilities.md

0

# Utilities

1

2

RQ Scheduler provides utility functions for time conversion, cron expression handling, and logging configuration. These functions support the scheduler's time-based operations and provide tools for working with scheduling data.

3

4

## Capabilities

5

6

### Time Conversion

7

8

Convert between datetime objects and Unix timestamps for Redis storage and time calculations.

9

10

```python { .api }

11

def from_unix(string):

12

"""

13

Convert Unix timestamp to UTC datetime object.

14

15

Parameters:

16

- string: str or numeric, Unix timestamp (seconds since epoch)

17

18

Returns:

19

datetime, UTC datetime object

20

21

Note:

22

- Input can be string or numeric type

23

- Always returns UTC datetime regardless of local timezone

24

- Used internally by scheduler for time deserialization

25

"""

26

27

def to_unix(dt):

28

"""

29

Convert datetime object to Unix timestamp.

30

31

Parameters:

32

- dt: datetime, datetime object to convert

33

34

Returns:

35

int, Unix timestamp (seconds since epoch)

36

37

Note:

38

- Assumes input datetime is UTC if timezone-naive

39

- Uses calendar.timegm for UTC conversion

40

- Used internally by scheduler for time serialization

41

"""

42

```

43

44

**Usage Examples:**

45

46

```python

47

from datetime import datetime

48

from rq_scheduler.utils import from_unix, to_unix

49

50

# Convert datetime to timestamp

51

dt = datetime(2025, 6, 15, 14, 30, 0)

52

timestamp = to_unix(dt)

53

print(f"Timestamp: {timestamp}") # 1749997800

54

55

# Convert timestamp back to datetime

56

restored_dt = from_unix(timestamp)

57

print(f"DateTime: {restored_dt}") # 2025-06-15 14:30:00

58

59

# Working with scheduler internals

60

restored_dt2 = from_unix("1749997800") # String input works too

61

assert restored_dt == restored_dt2

62

63

# Timezone handling

64

import pytz

65

utc_dt = datetime(2025, 6, 15, 14, 30, 0, tzinfo=pytz.UTC)

66

naive_dt = datetime(2025, 6, 15, 14, 30, 0)

67

68

# Both produce same timestamp (treated as UTC)

69

assert to_unix(utc_dt) == to_unix(naive_dt)

70

```

71

72

### Cron Expression Processing

73

74

Calculate next execution times for cron-style job scheduling.

75

76

```python { .api }

77

def get_next_scheduled_time(cron_string, use_local_timezone=False):

78

"""

79

Calculate the next scheduled execution time from a cron expression.

80

81

Parameters:

82

- cron_string: str, cron expression (minute hour day month weekday)

83

- use_local_timezone: bool, use local timezone instead of UTC

84

85

Returns:

86

datetime, next execution time as timezone-aware datetime

87

88

Note:

89

- Uses croniter library for cron parsing

90

- Returns timezone-aware datetime object

91

- UTC timezone used by default for consistency

92

- Local timezone option for user-facing scheduling

93

"""

94

```

95

96

**Usage Examples:**

97

98

```python

99

from rq_scheduler.utils import get_next_scheduled_time

100

from datetime import datetime

101

102

# Every day at 2:30 AM UTC

103

next_time = get_next_scheduled_time("30 2 * * *")

104

print(f"Next execution: {next_time}")

105

106

# Every weekday at 9 AM local time

107

next_local = get_next_scheduled_time("0 9 * * 1-5", use_local_timezone=True)

108

print(f"Next weekday 9 AM: {next_local}")

109

110

# Every 15 minutes

111

next_quarter = get_next_scheduled_time("*/15 * * * *")

112

print(f"Next 15-minute mark: {next_quarter}")

113

114

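# Complex scheduling - noon on days 1-7 of each month and on every Monday
# (cron ORs day-of-month with day-of-week when both are restricted, so this is NOT "first Monday only")

115

next_complex = get_next_scheduled_time("0 12 1-7 * 1")

116

print(f"Next run (days 1-7 or any Monday): {next_complex}")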

117

118

# Working with scheduler

119

from rq_scheduler import Scheduler

120

from redis import Redis

121

122

scheduler = Scheduler(connection=Redis())

123

124

# Schedule using calculated time

125

cron_expr = "0 0 * * 0" # Every Sunday at midnight

126

next_sunday = get_next_scheduled_time(cron_expr)

127

scheduler.enqueue_at(next_sunday, weekly_cleanup)

128

```

129

130

### Time Parameter Processing

131

132

Process and normalize time parameters for scheduler queries.

133

134

```python { .api }

135

def rationalize_until(until=None):

136

"""

137

Process 'until' parameter for time-based queries.

138

139

Parameters:

140

- until: None, datetime, timedelta, or numeric - time constraint

141

142

Returns:

143

str or numeric, normalized time value for Redis queries

144

145

Behavior:

146

- None -> "+inf" (no time limit)

147

- datetime -> Unix timestamp

148

- timedelta -> Unix timestamp (current time + delta)

149

- numeric -> passed through unchanged

150

151

Note:

152

- Used internally by count() and get_jobs() methods

153

- Handles various time input formats consistently

154

"""

155

```

156

157

**Usage Examples:**

158

159

```python

160

from datetime import datetime, timedelta

161

from rq_scheduler.utils import rationalize_until

162

163

# Various input types

164

print(rationalize_until(None)) # "+inf"

165

print(rationalize_until(datetime(2025, 6, 15))) # Unix timestamp

166

print(rationalize_until(timedelta(hours=1))) # Current time + 1 hour timestamp

167

print(rationalize_until(1750262400)) # 1750262400 (unchanged)

168

169

# Usage in custom queries (internal scheduler pattern)

170

from rq_scheduler import Scheduler

171

from redis import Redis

172

173

scheduler = Scheduler(connection=Redis())

174

175

# These calls use rationalize_until internally:

176

scheduler.count(until=datetime(2025, 12, 31))

177

scheduler.count(until=timedelta(days=7))

178

scheduler.count(until=None) # All jobs

179

180

# Manual usage for custom Redis queries

181

until_value = rationalize_until(timedelta(hours=2))

182

# Use until_value in custom Redis ZRANGEBYSCORE operations

183

```

184

185

### Logging Configuration

186

187

Set up logging handlers optimized for scheduler operations.

188

189

```python { .api }

190

def setup_loghandlers(level='INFO'):

191

"""

192

Configure logging for RQ Scheduler with colorized output.

193

194

Parameters:

195

- level: str, logging level ('DEBUG', 'INFO', 'WARNING', 'ERROR')

196

197

Returns:

198

None

199

200

Behavior:

201

- Configures 'rq_scheduler.scheduler' logger

202

- Uses ColorizingStreamHandler for colored console output

203

- Sets timestamp format optimized for scheduler monitoring

204

- Idempotent - safe to call multiple times

205

"""

206

```

207

208

**Usage Examples:**

209

210

```python

211

from rq_scheduler.utils import setup_loghandlers

212

213

# Basic setup with INFO level

214

setup_loghandlers()

215

216

# Debug level for troubleshooting

217

setup_loghandlers('DEBUG')

218

219

# Warning level for production

220

setup_loghandlers('WARNING')

221

222

# Integration with scheduler

223

from rq_scheduler import Scheduler

224

from redis import Redis

225

226

# Setup logging before creating scheduler

227

setup_loghandlers('DEBUG')

228

229

scheduler = Scheduler(connection=Redis())

230

scheduler.run() # Will output detailed logs

231

232

# Custom logging configuration

233

import logging

234

from rq_scheduler.utils import setup_loghandlers

235

236

# Setup scheduler logging first

237

setup_loghandlers('INFO')

238

239

# Add custom application logging

240

app_logger = logging.getLogger('myapp')

241

app_logger.setLevel(logging.INFO)

242

handler = logging.StreamHandler()

243

handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(message)s'))

244

app_logger.addHandler(handler)

245

246

# Both loggers will work together

247

scheduler = Scheduler(connection=Redis())

248

app_logger.info("Starting scheduler")

249

scheduler.run()

250

```

251

252

## Integration with Scheduler Internals

253

254

These utilities are used internally by the scheduler but can be helpful for custom scheduling logic:

255

256

```python

257

from datetime import datetime, timedelta

258

from rq_scheduler import Scheduler

259

from rq_scheduler.utils import to_unix, from_unix, get_next_scheduled_time

260

from redis import Redis

261

262

# Custom scheduler with utility functions

263

class CustomScheduler(Scheduler):

264

def schedule_with_validation(self, cron_expr, func, *args, **kwargs):

265

"""Schedule job with cron validation and logging."""

266

267

# Validate cron expression

268

try:

269

next_time = get_next_scheduled_time(cron_expr)

270

print(f"Job will first run at: {next_time}")

271

except Exception as e:

272

raise ValueError(f"Invalid cron expression: {e}")

273

274

# Schedule the job

275

return self.cron(cron_expr, func, *args, **kwargs)

276

277

def get_job_timestamps(self):

278

"""Get all scheduled jobs with Unix timestamps."""

279

jobs_with_times = self.get_jobs(with_times=True)

280

return [(job, to_unix(scheduled_time)) for job, scheduled_time in jobs_with_times]

281

282

# Usage

283

scheduler = CustomScheduler(connection=Redis())

284

285

# Use custom methods with utilities

286

job = scheduler.schedule_with_validation("0 */6 * * *", cleanup_task)

287

timestamps = scheduler.get_job_timestamps()

288

289

for job, timestamp in timestamps:

290

dt = from_unix(timestamp)

291

print(f"Job {job.id} scheduled for {dt}")

292

```