# RQ (Redis Queue)

RQ is a simple, lightweight Python library for creating background jobs and processing them with workers backed by Redis. It has a low barrier to entry yet scales well to large applications, covering the full background-job workflow: job scheduling, worker management, result storage, monitoring, and failure handling with retry mechanisms.

## Package Information

- **Package Name**: rq
- **Language**: Python
- **Installation**: `pip install rq`
- **Dependencies**: `redis>=3.5,!=6`, `click>=5`, `croniter`
- **Python Version**: >=3.9

## Core Imports

```python
import rq
```

Common pattern for job management:

```python
from rq import Queue, Worker, Job
```

Worker and job utilities:

```python
from rq import get_current_job, cancel_job, requeue_job
```

Callback and retry functionality:

```python
from rq import Callback, Retry, Repeat
```

## Basic Usage

```python
import redis
from rq import Queue, Worker

# Connect to Redis
redis_conn = redis.Redis()

# Create a queue
q = Queue(connection=redis_conn)

# Define a job function
def add_numbers(a, b):
    return a + b

# Enqueue a job
job = q.enqueue(add_numbers, 5, 3)
print(f"Job {job.id} enqueued")

# Create and start a worker
worker = Worker([q], connection=redis_conn)
worker.work()  # This blocks and processes jobs
```

Advanced usage with job monitoring:

```python
import redis
from rq import Queue, Retry
from rq.job import Job

redis_conn = redis.Redis()
q = Queue(connection=redis_conn)

# Enqueue with options
job = q.enqueue(
    add_numbers,
    10, 20,
    job_timeout=300,     # 5 minute execution timeout
    result_ttl=3600,     # Keep result for 1 hour
    failure_ttl=86400,   # Keep failure info for 1 day
    retry=Retry(max=3),  # Retry up to 3 times
    description="Adding two numbers"
)

# Check job status
print(f"Status: {job.get_status()}")
print(f"Result: {job.result}")  # None until completed

# Get job by ID
retrieved_job = Job.fetch(job.id, connection=redis_conn)
```

## Architecture

RQ's architecture consists of four main components:

- **Queue**: Redis-backed FIFO queues that hold jobs waiting to be processed
- **Job**: Work units containing function calls, parameters, metadata, and execution state
- **Worker**: Processes that fetch jobs from queues and execute them, with support for multiple workers and fork-based isolation
- **Registry**: Collections that track jobs by status (started, finished, failed, scheduled, etc.) for monitoring and management

The system supports distributed processing with multiple workers across multiple machines, comprehensive job lifecycle management, and flexible scheduling patterns including immediate execution, delayed execution, and recurring jobs.

## Capabilities

### Job Management

Core job operations including creation, execution tracking, status management, and lifecycle control. Jobs encapsulate function calls with comprehensive metadata and support callbacks, retries, and dependencies.

```python { .api }
def get_current_job(connection=None, job_class=None) -> Job | None: ...
def cancel_job(job_id: str, connection, serializer=None, enqueue_dependents: bool = False): ...
def requeue_job(job_id: str, connection, serializer=None) -> Job: ...

class Job:
    def __init__(self, id: str = None, connection=None, serializer=None): ...
    @classmethod
    def create(cls, func, args=None, kwargs=None, **options) -> 'Job': ...
    @classmethod
    def fetch(cls, id: str, connection, serializer=None) -> 'Job': ...
    def get_status(self, refresh: bool = True) -> JobStatus: ...
    def perform(self) -> Any: ...
    def save(self, pipeline=None, include_meta: bool = True, include_result: bool = True): ...
    def cancel(self, pipeline=None, enqueue_dependents: bool = False): ...
    def requeue(self, at_front: bool = False) -> 'Job': ...
    def delete(self, pipeline=None, remove_from_queue: bool = True): ...
```

[Job Management](./job-management.md)

### Queue Operations

Queue management for job scheduling, enqueueing, and batch operations. Queues provide flexible job submission with support for immediate execution, delayed scheduling, priority queuing, and bulk operations.

```python { .api }
class Queue:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
    def enqueue(self, f, *args, **kwargs) -> Job: ...
    def enqueue_call(self, func, args=None, kwargs=None, **options) -> Job: ...
    def enqueue_at(self, datetime, f, *args, **kwargs) -> Job: ...
    def enqueue_in(self, time_delta, func, *args, **kwargs) -> Job: ...
    def enqueue_many(self, job_datas, pipeline=None, group_id=None) -> list[Job]: ...
    def schedule_job(self, job: Job, datetime, pipeline=None): ...
    def empty(self): ...
    def delete(self, delete_jobs: bool = True): ...
    def get_jobs(self, offset: int = 0, length: int = -1) -> list[Job]: ...
```

[Queue Operations](./queue-operations.md)

### Worker Management

Worker processes for job execution with support for multiple queues, different execution strategies, and comprehensive monitoring. Workers handle job lifecycle, error recovery, and provide flexible deployment options.

```python { .api }
class Worker:
    def __init__(self, queues, name: str = None, connection=None, **kwargs): ...
    def work(self, burst: bool = False, logging_level: str = None, **options) -> bool: ...
    def execute_job(self, job: Job, queue: Queue): ...
    def request_stop(self, signum=None, frame=None): ...
    def clean_registries(self): ...

class SimpleWorker(Worker):
    def execute_job(self, job: Job, queue: Queue): ...

class SpawnWorker(Worker):
    def fork_work_horse(self, job: Job, queue: Queue): ...
```

[Worker Management](./worker-management.md)

### Job Patterns

Advanced job patterns including callbacks, retries, repetition, and dependencies for complex workflow orchestration and error handling strategies.

```python { .api }
class Callback:
    def __init__(self, func, timeout: int = None): ...

class Retry:
    def __init__(self, max: int, interval: int | list[int] = 0): ...
    @classmethod
    def get_interval(cls, count: int, intervals) -> int: ...

class Repeat:
    def __init__(self, times: int, interval: int | list[int] = 0): ...
    @classmethod
    def schedule(cls, job: Job, queue: Queue, pipeline=None): ...
```

[Job Patterns](./job-patterns.md)

### Registries and Monitoring

Job registries for tracking job states and comprehensive monitoring capabilities. Registries provide visibility into job execution, failure analysis, and system health monitoring.

```python { .api }
class StartedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
    def get_job_count(self, cleanup: bool = True) -> int: ...

class FinishedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...

class FailedJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...

class ScheduledJobRegistry:
    def __init__(self, name: str = 'default', connection=None, **kwargs): ...
```

[Registries and Monitoring](./registries-monitoring.md)

## Types

### Core Types

```python { .api }
from enum import Enum
from typing import Callable, Any, Union, Optional
from datetime import datetime, timedelta

class JobStatus(str, Enum):
    CREATED = 'created'
    QUEUED = 'queued'
    FINISHED = 'finished'
    FAILED = 'failed'
    STARTED = 'started'
    DEFERRED = 'deferred'
    SCHEDULED = 'scheduled'
    STOPPED = 'stopped'
    CANCELED = 'canceled'

class WorkerStatus(str, Enum):
    STARTED = 'started'
    SUSPENDED = 'suspended'
    BUSY = 'busy'
    IDLE = 'idle'

class DequeueStrategy(str, Enum):
    DEFAULT = 'default'
    ROUND_ROBIN = 'round_robin'
    RANDOM = 'random'

# Type aliases
FunctionReferenceType = Union[str, Callable[..., Any]]
JobDependencyType = Union['Dependency', 'Job', str, list[Union['Dependency', 'Job', str]]]
SuccessCallbackType = Callable[['Job', Any, Any], Any]
FailureCallbackType = Callable[['Job', Any, Optional[type], Optional[Exception], Any], Any]
```

## Constants

```python { .api }
# Default timeouts and TTLs (in seconds)
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500
DEFAULT_FAILURE_TTL = 31536000  # 1 year
DEFAULT_MAINTENANCE_TASK_INTERVAL = 600  # 10 minutes
CALLBACK_TIMEOUT = 60

# Logging configuration
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
```

## Exceptions

```python { .api }
class NoSuchJobError(Exception):
    """Raised when a job cannot be found."""

class DequeueTimeout(Exception):
    """Raised when a dequeue operation times out."""

class InvalidJobOperation(Exception):
    """Raised when an invalid operation is performed on a job."""

class DeserializationError(Exception):
    """Raised when job data cannot be deserialized."""

class AbandonedJobError(Exception):
    """Raised when a job is abandoned by its worker."""

class ShutDownImminentException(Exception):
    """Raised when worker shutdown is imminent."""
```