
# RQ (Redis Queue)

RQ is a lightweight Python library for queueing background jobs and processing them with workers, backed by Redis. It has a low barrier to entry and scales to large applications. It covers job scheduling, worker management, result storage, monitoring, and failure handling with retries.


## Package Information

- **Package Name**: rq
- **Language**: Python
- **Installation**: `pip install rq`
- **Dependencies**: `redis>=3.5,!=6`, `click>=5`, `croniter`
- **Python Version**: >=3.9

## Core Imports

```python
import rq
```

Common pattern for job management:

```python
from rq import Queue, Worker
from rq.job import Job
```

Worker and job utilities:

```python
from rq import get_current_job, cancel_job, requeue_job
```

Callback and retry functionality:

```python
from rq import Callback, Retry, Repeat
```

## Basic Usage

```python
import redis
from rq import Queue, Worker

# Connect to Redis (defaults to localhost:6379)
redis_conn = redis.Redis()

# Create a queue (named 'default' unless a name is given)
q = Queue(connection=redis_conn)

# Define a job function. Workers import it by module path, so in real use
# keep it in an importable module rather than in the script being run.
def add_numbers(a, b):
    return a + b

# Enqueue a job
job = q.enqueue(add_numbers, 5, 3)
print(f"Job {job.id} enqueued")

# Create and start a worker
worker = Worker([q], connection=redis_conn)
worker.work()  # This blocks and processes jobs
```


Advanced usage with job monitoring:

```python
import redis
from rq import Queue, Retry
from rq.job import Job

redis_conn = redis.Redis()
q = Queue(connection=redis_conn)

# Enqueue with options (add_numbers is the job function from the previous example)
job = q.enqueue(
    add_numbers,
    10, 20,
    job_timeout=300,     # 5 minute timeout
    result_ttl=3600,     # Keep result for 1 hour
    failure_ttl=86400,   # Keep failure info for 1 day
    retry=Retry(max=3),  # Retry up to 3 times
    description="Adding two numbers",
)

# Check job status
print(f"Status: {job.get_status()}")
print(f"Result: {job.result}")  # None until completed

# Get job by ID
retrieved_job = Job.fetch(job.id, connection=redis_conn)
```


## Architecture

RQ's architecture consists of four main components:

- **Queue**: Redis-backed FIFO queues that hold jobs waiting to be processed
- **Job**: Work units containing function calls, parameters, metadata, and execution state
- **Worker**: Processes that fetch jobs from queues and execute them, with support for multiple workers and fork-based isolation
- **Registry**: Collections that track jobs by status (started, finished, failed, scheduled, etc.) for monitoring and management

The system supports distributed processing with multiple workers across multiple machines, comprehensive job lifecycle management, and flexible scheduling patterns including immediate execution, delayed execution, and recurring jobs.
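
To make the division of labor between the four components concrete, here is a minimal in-memory sketch. It is a toy model, not RQ's implementation: `ToyJob`, `ToyQueue`, and `run_worker` are hypothetical names, nothing is stored in Redis, and the "registries" are a plain dict keyed by final status.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToyJob:
    """A unit of work: a function call plus its execution state."""
    func: Callable[..., Any]
    args: tuple = ()
    status: str = "queued"
    result: Any = None


@dataclass
class ToyQueue:
    """FIFO queue: jobs leave in the order they arrived."""
    jobs: deque = field(default_factory=deque)

    def enqueue(self, func, *args):
        job = ToyJob(func, args)
        self.jobs.append(job)
        return job


def run_worker(queue, registries):
    """Drain the queue, filing each job under its final status."""
    while queue.jobs:
        job = queue.jobs.popleft()  # oldest job first
        try:
            job.result = job.func(*job.args)
            job.status = "finished"
        except Exception:
            job.status = "failed"
        registries.setdefault(job.status, []).append(job)


queue = ToyQueue()
ok = queue.enqueue(pow, 2, 5)               # succeeds
bad = queue.enqueue(int, "not a number")    # raises ValueError
registries = {}
run_worker(queue, registries)
```

After the run, `ok` sits in the "finished" bucket with its result and `bad` sits in the "failed" bucket, which mirrors how registries give visibility into what happened to each job.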


## Capabilities


### Job Management

Core job operations including creation, execution tracking, status management, and lifecycle control. Jobs encapsulate function calls with comprehensive metadata and support callbacks, retries, and dependencies.

```python { .api }
def get_current_job(connection=None, job_class=None) -> Job | None: ...
def cancel_job(job_id: str, connection, serializer=None, enqueue_dependents: bool = False): ...
def requeue_job(job_id: str, connection, serializer=None) -> Job: ...

class Job:
    def __init__(self, id: str = None, connection=None, serializer=None): ...
    @classmethod
    def create(cls, func, args=None, kwargs=None, **options) -> 'Job': ...
    @classmethod
    def fetch(cls, id: str, connection, serializer=None) -> 'Job': ...
    def get_status(self, refresh: bool = True) -> JobStatus: ...
    def perform(self) -> Any: ...
    def save(self, pipeline=None, include_meta: bool = True, include_result: bool = True): ...
    def cancel(self, pipeline=None, enqueue_dependents: bool = False): ...
    def requeue(self, at_front: bool = False) -> 'Job': ...
    def delete(self, pipeline=None, remove_from_queue: bool = True): ...
```


[Job Management](./job-management.md)


### Queue Operations

Queue management for job scheduling, enqueueing, and batch operations. Queues provide flexible job submission with support for immediate execution, delayed scheduling, priority queuing, and bulk operations.

```python { .api }
class Queue:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
    def enqueue(self, f, *args, **kwargs) -> Job: ...
    def enqueue_call(self, func, args=None, kwargs=None, **options) -> Job: ...
    def enqueue_at(self, datetime, f, *args, **kwargs) -> Job: ...
    def enqueue_in(self, time_delta, func, *args, **kwargs) -> Job: ...
    def enqueue_many(self, job_datas, pipeline=None, group_id=None) -> list[Job]: ...
    def schedule_job(self, job: Job, datetime, pipeline=None): ...
    def empty(self): ...
    def delete(self, delete_jobs: bool = True): ...
    def get_jobs(self, offset: int = 0, length: int = -1) -> list[Job]: ...
```


[Queue Operations](./queue-operations.md)
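
In the listing above, `enqueue_at` takes an absolute time and `enqueue_in` a relative delay. The sketch below shows how the two relate, assuming a delay is resolved against the current UTC time. `resolve_run_time` is a hypothetical helper for illustration and is not part of RQ.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def resolve_run_time(delay: timedelta, now: Optional[datetime] = None) -> datetime:
    """Convert a relative delay into an absolute, timezone-aware UTC time."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + delay


# A fixed "now" keeps the example deterministic.
fixed_now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
run_at = resolve_run_time(timedelta(minutes=10), now=fixed_now)
```

Under that assumption, scheduling with a 10 minute delay and scheduling at `run_at` describe the same moment. Using timezone-aware datetimes avoids ambiguity when workers run on machines in different time zones.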


### Worker Management

Worker processes for job execution with support for multiple queues, different execution strategies, and comprehensive monitoring. Workers handle job lifecycle, error recovery, and provide flexible deployment options.

```python { .api }
class Worker:
    def __init__(self, queues, name: str = None, connection=None, **kwargs): ...
    def work(self, burst: bool = False, logging_level: str = None, **options) -> bool: ...
    def execute_job(self, job: Job, queue: Queue): ...
    def request_stop(self, signum=None, frame=None): ...
    def clean_registries(self): ...

class SimpleWorker(Worker):
    def execute_job(self, job: Job, queue: Queue): ...

class SpawnWorker(Worker):
    def fork_work_horse(self, job: Job, queue: Queue): ...
```


[Worker Management](./worker-management.md)
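
As a rough illustration of the `burst` flag on `work()`, the helper below processes whatever is currently queued and then returns instead of blocking. It is a sketch under stated assumptions: `drain_queue` is a hypothetical name, the Redis URL is a placeholder for a local server, and `SimpleWorker` is used so jobs run in the calling process, which tends to suit tests and scripts.

```python
def drain_queue(queue_name="default", redis_url="redis://localhost:6379/0"):
    """Process every job currently on the queue, then return."""
    # Imports are deferred so this module loads even where rq/redis are absent.
    from redis import Redis
    from rq import Queue, SimpleWorker

    connection = Redis.from_url(redis_url)
    queue = Queue(queue_name, connection=connection)

    # SimpleWorker runs jobs in this process instead of forking a work horse.
    worker = SimpleWorker([queue], connection=connection)

    # burst=True: stop once the queue is empty rather than waiting for more jobs.
    return worker.work(burst=True)
```

Calling `drain_queue()` requires a reachable Redis server. For long-running production workers, the default `Worker` with its per-job process isolation is usually the safer choice.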


### Job Patterns

Advanced job patterns including callbacks, retries, repetition, and dependencies for complex workflow orchestration and error handling strategies.

```python { .api }
class Callback:
    def __init__(self, func, timeout: int = None): ...

class Retry:
    def __init__(self, max: int, interval: int | list[int] = 0): ...
    @classmethod
    def get_interval(cls, count: int, intervals) -> int: ...

class Repeat:
    def __init__(self, times: int, interval: int | list[int] = 0): ...
    @classmethod
    def schedule(cls, job: Job, queue: Queue, pipeline=None): ...
```


[Job Patterns](./job-patterns.md)
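
The `interval` parameter of `Retry` and `Repeat` accepts either a single number or a list. The sketch below shows one plausible way a per-attempt wait could be chosen from such a value. `pick_interval` is a hypothetical stand-in, not RQ's `get_interval`, and the rule that the last list entry is reused once the list runs out is an assumption to check against the library.

```python
from typing import List, Union


def pick_interval(attempt: int, interval: Union[int, List[int]]) -> int:
    """Return the wait in seconds before retry number `attempt` (0-based)."""
    intervals = [interval] if isinstance(interval, int) else list(interval)
    # Assumption: once the list is exhausted, the last value is reused.
    return intervals[min(attempt, len(intervals) - 1)]


# Waits for four consecutive retries with a three-entry interval list.
schedule = [pick_interval(n, [10, 30, 60]) for n in range(4)]
```

A growing list such as `[10, 30, 60]` gives a simple backoff: quick first retry, longer waits if the failure persists.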


### Registries and Monitoring

Job registries for tracking job states and comprehensive monitoring capabilities. Registries provide visibility into job execution, failure analysis, and system health monitoring.

```python { .api }
class StartedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
    def get_job_count(self, cleanup: bool = True) -> int: ...

class FinishedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...

class FailedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...

class ScheduledJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
```


[Registries and Monitoring](./registries-monitoring.md)
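
The `cleanup` flag on `get_job_count` suggests that registry entries can go stale. The toy model below illustrates that idea, assuming each entry carries an expiry time. `ToyRegistry` is hypothetical and kept in memory; RQ's registries live in Redis and may differ in detail. The explicit `now` argument is an addition here so the example is deterministic.

```python
class ToyRegistry:
    """Maps job IDs to the timestamp after which the entry is stale."""

    def __init__(self):
        self._expires_at = {}

    def add(self, job_id: str, ttl: int, now: float) -> None:
        self._expires_at[job_id] = now + ttl

    def cleanup(self, now: float) -> list:
        """Drop entries whose expiry has passed and return their IDs."""
        expired = sorted(j for j, t in self._expires_at.items() if t <= now)
        for job_id in expired:
            del self._expires_at[job_id]
        return expired

    def get_job_count(self, now: float, cleanup: bool = True) -> int:
        if cleanup:
            self.cleanup(now)
        return len(self._expires_at)


registry = ToyRegistry()
registry.add("job-a", ttl=500, now=0)
registry.add("job-b", ttl=3600, now=0)
count_early = registry.get_job_count(now=100)   # both entries still live
count_late = registry.get_job_count(now=1000)   # job-a has expired
```

Counting without cleanup would be cheaper but could include entries that are no longer meaningful, which is the trade-off the flag exposes.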


## Types


### Core Types

```python { .api }
from enum import Enum
from typing import Callable, Any, Union, Optional
from datetime import datetime, timedelta

class JobStatus(str, Enum):
    CREATED = 'created'
    QUEUED = 'queued'
    FINISHED = 'finished'
    FAILED = 'failed'
    STARTED = 'started'
    DEFERRED = 'deferred'
    SCHEDULED = 'scheduled'
    STOPPED = 'stopped'
    CANCELED = 'canceled'

class WorkerStatus(str, Enum):
    STARTED = 'started'
    SUSPENDED = 'suspended'
    BUSY = 'busy'
    IDLE = 'idle'

class DequeueStrategy(str, Enum):
    DEFAULT = 'default'
    ROUND_ROBIN = 'round_robin'
    RANDOM = 'random'

# Type aliases
FunctionReferenceType = Union[str, Callable[..., Any]]
JobDependencyType = Union['Dependency', 'Job', str, list[Union['Dependency', 'Job', str]]]
SuccessCallbackType = Callable[['Job', Any, Any], Any]
FailureCallbackType = Callable[['Job', Any, Optional[type], Optional[Exception], Any], Any]
```
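
Because the status enums above subclass both `str` and `Enum`, members can be built from and compared with plain strings. The snippet below demonstrates this with a trimmed, local copy of `JobStatus` so it runs without RQ installed. The copy is for illustration only.

```python
from enum import Enum


class JobStatus(str, Enum):
    """Trimmed copy of the enum above, for illustration only."""
    QUEUED = 'queued'
    FINISHED = 'finished'
    FAILED = 'failed'


# A raw string (for example, one read back from storage) maps to a member.
status = JobStatus('finished')

# Members are also str instances, so they compare equal to plain strings.
is_done = status == 'finished'
is_terminal = status in (JobStatus.FINISHED, JobStatus.FAILED)
```

Comparing against the enum members rather than string literals is usually the safer habit, since a typo in a member name fails loudly while a typo in a string silently compares unequal.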


## Constants

```python { .api }
# Default timeouts and TTLs (in seconds)
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500
DEFAULT_FAILURE_TTL = 31536000  # 1 year
DEFAULT_MAINTENANCE_TASK_INTERVAL = 600  # 10 minutes
CALLBACK_TIMEOUT = 60

# Logging configuration
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
```


## Exceptions

```python { .api }
class NoSuchJobError(Exception):
    """Raised when a job cannot be found."""

class DequeueTimeout(Exception):
    """Raised when dequeue operation times out."""

class InvalidJobOperation(Exception):
    """Raised when an invalid operation is performed on a job."""

class DeserializationError(Exception):
    """Raised when job data cannot be deserialized."""

class AbandonedJobError(Exception):
    """Raised when a job is abandoned by its worker."""

class ShutDownImminentException(Exception):
    """Raised when worker shutdown is imminent."""
```
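
As one way these exceptions might be used, the helper below turns a missing job into `None` instead of an error. This is a sketch: `fetch_job_or_none` is a hypothetical name, and it assumes the exception is importable from `rq.exceptions` and is raised by `Job.fetch` when the ID is unknown, for example after the result TTL has expired.

```python
def fetch_job_or_none(job_id, connection):
    """Return the job with this ID, or None if Redis no longer has it."""
    # Imports are deferred so this module loads even where rq is absent.
    from rq.exceptions import NoSuchJobError
    from rq.job import Job

    try:
        return Job.fetch(job_id, connection=connection)
    except NoSuchJobError:
        # The ID was never valid, or the job's data has already expired.
        return None
```

Callers can then branch on `None` rather than wrapping every lookup in its own `try` block.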