0
# Queue Operations
1
2
Comprehensive queue management for job scheduling, enqueueing, and batch operations. RQ queues provide flexible job submission with support for immediate execution, delayed scheduling, priority queuing, bulk operations, and queue monitoring.
3
4
## Capabilities
5
6
### Queue Creation and Configuration
7
8
Create and configure queues with various options for job processing behavior.
9
10
```python { .api }
11
class Queue:
12
def __init__(
13
self,
14
name: str = 'default',
15
connection=None,
16
default_timeout: int = None,
17
is_async: bool = True,
18
job_class=None,
19
serializer=None,
20
death_penalty_class=None,
21
**kwargs
22
):
23
"""
24
Initialize a Queue instance.
25
26
Args:
27
name (str): Queue name. Defaults to 'default'.
28
connection: Redis connection instance.
29
default_timeout (int): Default job timeout in seconds.
30
is_async (bool): Whether to process jobs asynchronously.
31
job_class: Custom Job class to use.
32
serializer: Custom serializer for job data.
33
death_penalty_class: Custom death penalty class for timeouts.
34
**kwargs: Additional queue configuration options.
35
"""
36
37
@classmethod
38
def all(cls, connection, job_class=None, serializer=None, death_penalty_class=None) -> list['Queue']:
39
"""
40
Get all existing queues.
41
42
Args:
43
connection: Redis connection.
44
job_class: Job class for deserialization.
45
serializer: Custom serializer.
46
death_penalty_class: Death penalty class.
47
48
Returns:
49
list[Queue]: All queues in Redis.
50
"""
51
52
@classmethod
53
def from_queue_key(
54
cls,
55
queue_key: str,
56
connection,
57
job_class=None,
58
serializer=None,
59
death_penalty_class=None
60
) -> 'Queue':
61
"""
62
Create Queue instance from Redis queue key.
63
64
Args:
65
queue_key (str): Redis queue key.
66
connection: Redis connection.
67
job_class: Job class for deserialization.
68
serializer: Custom serializer.
69
death_penalty_class: Death penalty class.
70
71
Returns:
72
Queue: Queue instance.
73
"""
74
```
75
76
### Basic Job Enqueueing
77
78
Core methods for adding jobs to queues with various execution options.
79
80
```python { .api }
81
def enqueue(self, f, *args, **kwargs) -> 'Job':
82
"""
83
Enqueue a function call for execution.
84
85
Args:
86
f: Function to execute.
87
*args: Positional arguments for the function.
88
**kwargs: Keyword arguments for the function and job options.
89
90
Job options in kwargs:
91
job_timeout (int): Job timeout in seconds. The keyword is `job_timeout`; a plain `timeout` is passed through to the function.
92
result_ttl (int): Result time-to-live in seconds.
93
ttl (int): Job time-to-live in seconds.
94
failure_ttl (int): Failure info time-to-live in seconds.
95
description (str): Job description.
96
depends_on: Job dependencies.
97
job_id (str): Custom job ID.
98
at_front (bool): Add to front of queue.
99
meta (dict): Job metadata.
100
retry (Retry): Retry configuration.
101
repeat (Repeat): Repeat configuration.
102
on_success (Callback): Success callback.
103
on_failure (Callback): Failure callback.
104
on_stopped (Callback): Stopped callback.
105
106
Returns:
107
Job: The enqueued job.
108
"""
109
110
def enqueue_call(
111
self,
112
func,
113
args=None,
114
kwargs=None,
115
timeout=None,
116
result_ttl=None,
117
ttl=None,
118
failure_ttl=None,
119
description=None,
120
depends_on=None,
121
job_id=None,
122
at_front=False,
123
meta=None,
124
retry=None,
125
repeat=None,
126
on_success=None,
127
on_failure=None,
128
on_stopped=None,
129
pipeline=None
130
) -> 'Job':
131
"""
132
Enqueue a function call with explicit parameters.
133
134
Args:
135
func: Function to execute.
136
args (tuple): Function positional arguments.
137
kwargs (dict): Function keyword arguments.
138
timeout (int): Job timeout in seconds.
139
result_ttl (int): Result time-to-live in seconds.
140
ttl (int): Job time-to-live in seconds.
141
failure_ttl (int): Failure info time-to-live in seconds.
142
description (str): Job description.
143
depends_on: Job dependencies.
144
job_id (str): Custom job ID.
145
at_front (bool): Add to front of queue.
146
meta (dict): Job metadata.
147
retry (Retry): Retry configuration.
148
repeat (Repeat): Repeat configuration.
149
on_success (Callback): Success callback.
150
on_failure (Callback): Failure callback.
151
on_stopped (Callback): Stopped callback.
152
pipeline: Redis pipeline for batched operations.
153
154
Returns:
155
Job: The enqueued job.
156
"""
157
158
def enqueue_job(self, job: 'Job', pipeline=None, at_front: bool = False) -> 'Job':
159
"""
160
Enqueue an existing job.
161
162
Args:
163
job (Job): Job to enqueue.
164
pipeline: Redis pipeline for batched operations.
165
at_front (bool): Add to front of queue.
166
167
Returns:
168
Job: The enqueued job.
169
"""
170
```
171
172
### Scheduled Job Enqueueing
173
174
Schedule jobs for future execution with precise timing control.
175
176
```python { .api }
177
def enqueue_at(self, datetime, f, *args, **kwargs) -> 'Job':
178
"""
179
Schedule a job for execution at a specific datetime.
180
181
Args:
182
datetime (datetime): When to execute the job.
183
f: Function to execute.
184
*args: Function positional arguments.
185
**kwargs: Function keyword arguments and job options.
186
187
Returns:
188
Job: The scheduled job.
189
"""
190
191
def enqueue_in(self, time_delta, func, *args, **kwargs) -> 'Job':
192
"""
193
Schedule a job for execution after a time delay.
194
195
Args:
196
time_delta (timedelta): Delay before execution.
197
func: Function to execute.
198
*args: Function positional arguments.
199
**kwargs: Function keyword arguments and job options.
200
201
Returns:
202
Job: The scheduled job.
203
"""
204
205
def schedule_job(self, job: 'Job', datetime, pipeline=None):
206
"""
207
Schedule an existing job for future execution.
208
209
Args:
210
job (Job): Job to schedule.
211
datetime (datetime): When to execute the job.
212
pipeline: Redis pipeline for batched operations.
213
"""
214
```
215
216
### Batch Operations
217
218
Efficiently handle multiple jobs with batch enqueueing and processing.
219
220
```python { .api }
221
def enqueue_many(self, job_datas, pipeline=None, group_id: str = None) -> list['Job']:
222
"""
223
Enqueue multiple jobs in a single operation.
224
225
Args:
226
job_datas: Iterable of EnqueueData instances or job specifications.
227
pipeline: Redis pipeline for batched operations.
228
group_id (str): Group identifier for related jobs.
229
230
Returns:
231
list[Job]: List of enqueued jobs.
232
"""
233
234
@classmethod
235
def prepare_data(
236
cls,
237
func,
238
args=None,
239
kwargs=None,
240
timeout=None,
241
result_ttl=None,
242
ttl=None,
243
failure_ttl=None,
244
description=None,
245
depends_on=None,
246
job_id=None,
247
at_front=False,
248
meta=None,
249
retry=None,
250
on_success=None,
251
on_failure=None,
252
on_stopped=None,
253
repeat=None
254
):
255
"""
256
Prepare job data for batch enqueueing.
257
258
Args:
259
func: Function to execute.
260
args (tuple): Function arguments.
261
kwargs (dict): Function keyword arguments.
262
timeout (int): Job timeout.
263
result_ttl (int): Result TTL.
264
ttl (int): Job TTL.
265
failure_ttl (int): Failure TTL.
266
description (str): Job description.
267
depends_on: Job dependencies.
268
job_id (str): Custom job ID.
269
at_front (bool): Priority enqueueing.
270
meta (dict): Job metadata.
271
retry (Retry): Retry configuration.
272
on_success (Callback): Success callback.
273
on_failure (Callback): Failure callback.
274
on_stopped (Callback): Stopped callback.
275
repeat (Repeat): Repeat configuration.
276
277
Returns:
278
EnqueueData: Prepared job data for batch operations.
279
"""
280
```
281
282
### Queue Monitoring and Management
283
284
Monitor queue state and manage queue lifecycle.
285
286
```python { .api }
287
@property
288
def count(self) -> int:
289
"""Number of jobs in the queue."""
290
291
@property
292
def is_empty(self) -> bool:
293
"""True if queue has no jobs."""
294
295
@property
296
def job_ids(self) -> list[str]:
297
"""List of all job IDs in the queue."""
298
299
@property
300
def jobs(self) -> list['Job']:
301
"""List of all valid jobs in the queue."""
302
303
def get_job_ids(self, offset: int = 0, length: int = -1) -> list[str]:
304
"""
305
Get a slice of job IDs from the queue.
306
307
Args:
308
offset (int): Starting position.
309
length (int): Number of IDs to return (-1 for all).
310
311
Returns:
312
list[str]: Job IDs.
313
"""
314
315
def get_jobs(self, offset: int = 0, length: int = -1) -> list['Job']:
316
"""
317
Get a slice of jobs from the queue.
318
319
Args:
320
offset (int): Starting position.
321
length (int): Number of jobs to return (-1 for all).
322
323
Returns:
324
list[Job]: Jobs in the queue.
325
"""
326
327
def fetch_job(self, job_id: str) -> 'Job | None':
328
"""
329
Fetch a specific job from the queue.
330
331
Args:
332
job_id (str): Job identifier.
333
334
Returns:
335
Job | None: Job if found, None otherwise.
336
"""
337
338
def get_job_position(self, job_or_id) -> int | None:
339
"""
340
Get position of a job in the queue.
341
342
Args:
343
job_or_id: Job instance or job ID.
344
345
Returns:
346
int | None: Position in queue (0-based) or None if not found.
347
"""
348
```
349
350
### Queue Maintenance
351
352
Maintain queue health with cleanup and management operations.
353
354
```python { .api }
355
def empty(self):
356
"""Remove all jobs from the queue."""
357
358
def delete(self, delete_jobs: bool = True):
359
"""
360
Delete the queue.
361
362
Args:
363
delete_jobs (bool): Whether to delete associated jobs.
364
"""
365
366
def compact(self):
367
"""Remove invalid job references while preserving FIFO order."""
368
369
def remove(self, job_or_id, pipeline=None):
370
"""
371
Remove a specific job from the queue.
372
373
Args:
374
job_or_id: Job instance or job ID to remove.
375
pipeline: Redis pipeline for batched operations.
376
"""
377
378
def push_job_id(self, job_id: str, pipeline=None, at_front: bool = False):
379
"""
380
Push a job ID onto the queue.
381
382
Args:
383
job_id (str): Job identifier.
384
pipeline: Redis pipeline.
385
at_front (bool): Add to front of queue.
386
"""
387
388
def pop_job_id(self) -> str | None:
389
"""
390
Pop a job ID from the front of the queue.
391
392
Returns:
393
str | None: Job ID or None if queue is empty.
394
"""
395
```
396
397
### Multi-Queue Operations
398
399
Handle operations across multiple queues for load balancing and priority processing.
400
401
```python { .api }
402
@classmethod
403
def dequeue_any(
404
cls,
405
queues,
406
timeout: int = None,
407
connection=None,
408
job_class=None,
409
serializer=None,
410
death_penalty_class=None
411
) -> tuple['Job', 'Queue'] | None:
412
"""
413
Dequeue a job from any of the given queues.
414
415
Args:
416
queues: Iterable of Queue instances.
417
timeout (int): Timeout in seconds for blocking dequeue.
418
connection: Redis connection.
419
job_class: Job class for deserialization.
420
serializer: Custom serializer.
421
death_penalty_class: Death penalty class.
422
423
Returns:
424
tuple[Job, Queue] | None: (Job, Queue) tuple or None if timeout.
425
"""
426
427
@classmethod
428
def lpop(cls, queue_keys, timeout: int = None, connection=None):
429
"""
430
Pop from multiple queue keys using Redis BLPOP.
431
432
Args:
433
queue_keys: List of queue key strings.
434
timeout (int): Timeout in seconds.
435
connection: Redis connection.
436
437
Returns:
438
tuple: (queue_key, job_id) or None if timeout.
439
"""
440
441
@classmethod
442
def lmove(cls, connection, queue_key: str, timeout: int = None):
443
"""
444
Move job using Redis BLMOVE operation.
445
446
Args:
447
connection: Redis connection.
448
queue_key (str): Source queue key.
449
timeout (int): Timeout in seconds.
450
451
Returns:
452
Job data or None if timeout.
453
"""
454
```
455
456
### Queue Properties and Configuration
457
458
Access queue configuration and runtime properties.
459
460
```python { .api }
461
@property
462
def name(self) -> str:
463
"""Queue name."""
464
465
@property
466
def key(self) -> str:
467
"""Redis key for the queue."""
468
469
@property
470
def connection(self):
471
"""Redis connection instance."""
472
473
@property
474
def serializer(self):
475
"""Serializer used for job data."""
476
477
@property
478
def is_async(self) -> bool:
479
"""Whether queue processes jobs asynchronously."""
480
481
@property
482
def intermediate_queue_key(self) -> str:
483
"""Redis key for intermediate queue."""
484
485
@property
486
def intermediate_queue(self):
487
"""IntermediateQueue instance for this queue."""
488
489
def get_redis_server_version(self) -> tuple[int, int, int]:
490
"""
491
Get Redis server version.
492
493
Returns:
494
tuple[int, int, int]: (major, minor, patch) version numbers.
495
"""
496
```
497
498
### Registry Access
499
500
Access job registries for monitoring different job states.
501
502
```python { .api }
503
@property
504
def failed_job_registry(self):
505
"""Registry of failed jobs."""
506
507
@property
508
def started_job_registry(self):
509
"""Registry of jobs currently being executed."""
510
511
@property
512
def finished_job_registry(self):
513
"""Registry of successfully completed jobs."""
514
515
@property
516
def deferred_job_registry(self):
517
"""Registry of jobs waiting for dependencies."""
518
519
@property
520
def scheduled_job_registry(self):
521
"""Registry of scheduled jobs."""
522
523
@property
524
def canceled_job_registry(self):
525
"""Registry of canceled jobs."""
526
```
527
528
## Usage Examples
529
530
### Basic Queue Operations
531
532
```python
533
import redis
from rq import Queue

# Connect to Redis
conn = redis.Redis()

# Create a queue
q = Queue('data_processing', connection=conn)


# Simple function
def process_item(item_id, priority='normal'):
    """Return a status string describing the processed item."""
    return f"Processed item {item_id} with {priority} priority"


# Enqueue jobs
job1 = q.enqueue(process_item, 'item_001')
job2 = q.enqueue(process_item, 'item_002', priority='high')

# Enqueue with options. The job timeout is given as `job_timeout`:
# enqueue() forwards keywords it does not recognise (including a bare
# `timeout`) to the job function, and process_item() accepts no such argument.
job3 = q.enqueue(
    process_item,
    'item_003',
    job_timeout=300,
    result_ttl=3600,
    description="Process critical item",
    meta={'department': 'sales', 'urgent': True},
)

print(f"Enqueued {len([job1, job2, job3])} jobs")
print(f"Queue count: {q.count}")
562
```
563
564
### Scheduled Job Enqueueing
565
566
```python
567
from datetime import datetime, timedelta, timezone

import redis
from rq import Queue

conn = redis.Redis()
q = Queue('scheduled_tasks', connection=conn)


def send_reminder(user_id, message):
    """Return a confirmation string for the reminder."""
    return f"Sent reminder to user {user_id}: {message}"


# Schedule for specific time. A timezone-aware datetime is used so the target
# instant is explicit instead of depending on how a naive value is interpreted.
future_time = datetime.now(timezone.utc) + timedelta(hours=1)
scheduled_job = q.enqueue_at(
    future_time,
    send_reminder,
    user_id=123,
    message="Don't forget your appointment!",
)

# Schedule with delay
delayed_job = q.enqueue_in(
    timedelta(minutes=30),
    send_reminder,
    user_id=456,
    message="Meeting starts in 30 minutes",
)

print(f"Scheduled job: {scheduled_job.id}")
print(f"Delayed job: {delayed_job.id}")
596
```
597
598
### Batch Job Enqueueing
599
600
```python
601
import redis
from rq import Queue

conn = redis.Redis()
q = Queue('batch_processing', connection=conn)


def process_data_chunk(data_chunk, config=None):
    """Report how many items the chunk contains."""
    return f"Processed {len(data_chunk)} items"


# Ten consecutive chunks of ten integers: 0-9, 10-19, ..., 90-99
data_chunks = [list(range(start, start + 10)) for start in range(0, 100, 10)]

# Prepare one job specification per chunk
job_data_list = [
    Queue.prepare_data(
        func=process_data_chunk,
        args=(chunk,),
        kwargs={'config': {'batch_id': index}},
        description=f"Process chunk {index}",
        meta={'chunk_size': len(chunk)},
    )
    for index, chunk in enumerate(data_chunks)
]

# Enqueue all jobs at once
jobs = q.enqueue_many(job_data_list, group_id='batch_001')

print(f"Enqueued {len(jobs)} jobs in batch")
print(f"Queue count: {q.count}")
629
```
630
631
### Queue Monitoring and Management
632
633
```python
634
import redis
from rq import Queue

conn = redis.Redis()
q = Queue('monitoring_example', connection=conn)


def double(x):
    """Return twice the input.

    A named function is used rather than a lambda: the worker has to look the
    callable up by name, and a lambda has no name it can resolve.
    """
    return x * 2


# Add some jobs
for i in range(5):
    q.enqueue(double, i)

# Monitor queue
print(f"Queue: {q.name}")
print(f"Total jobs: {q.count}")
print(f"Is empty: {q.is_empty}")

# Get job information
job_ids = q.get_job_ids()
print(f"Job IDs: {job_ids}")

# Get first 3 jobs
first_jobs = q.get_jobs(offset=0, length=3)
for job in first_jobs:
    print(f"Job {job.id}: {job.description}")

# Find specific job
if job_ids:
    specific_job = q.fetch_job(job_ids[0])
    # fetch_job() returns None when the job is not found, so check before use.
    if specific_job is not None:
        position = q.get_job_position(specific_job)
        print(f"Job {specific_job.id} is at position {position}")

# Queue maintenance
print("Before compact:", q.count)
q.compact()  # Remove any invalid job references
print("After compact:", q.count)

# Registry access
print(f"Failed jobs: {q.failed_job_registry.count}")
print(f"Finished jobs: {q.finished_job_registry.count}")
672
```
673
674
### Multi-Queue Processing
675
676
```python
677
import redis
from rq import Queue

conn = redis.Redis()

# Create multiple queues
high_priority = Queue('high_priority', connection=conn)
normal_priority = Queue('normal', connection=conn)
low_priority = Queue('low_priority', connection=conn)


def important_task():
    """Stand-in for high-priority work."""
    return "Completed important task"


def regular_task():
    """Stand-in for ordinary work."""
    return "Completed regular task"


# Add jobs to different queues
high_priority.enqueue(important_task)
normal_priority.enqueue(regular_task)
low_priority.enqueue(regular_task)

# Dequeue from multiple queues (priority order)
dequeued = Queue.dequeue_any(
    [high_priority, normal_priority, low_priority],
    timeout=1,
    connection=conn,
)

if not dequeued:
    print("No jobs available")
else:
    job, source_queue = dequeued
    print(f"Dequeued job {job.id} from queue {source_queue.name}")

# Get all queues
known_queues = Queue.all(connection=conn)
print(f"Total queues: {len(known_queues)}")
for listed_queue in known_queues:
    print(f"Queue {listed_queue.name}: {listed_queue.count} jobs")
713
```