# Job Management

Comprehensive job lifecycle management including creation, execution tracking, status monitoring, and control operations. Jobs in RQ encapsulate function calls with rich metadata, status tracking, and support for advanced patterns like callbacks, retries, and dependencies.

## Capabilities

### Job Creation and Retrieval

Create new jobs and retrieve existing ones by ID with full serialization support.

```python { .api }
class Job:
    def __init__(self, id: str = None, connection=None, serializer=None):
        """
        Initialize a Job instance.

        Args:
            id (str, optional): Job identifier. Generated if not provided.
            connection: Redis connection instance.
            serializer: Custom serializer for job data.
        """

    @classmethod
    def create(
        cls,
        func,
        args=None,
        kwargs=None,
        connection=None,
        result_ttl=None,
        ttl=None,
        status=None,
        description=None,
        depends_on=None,
        timeout=None,
        id=None,
        origin='',
        meta=None,
        failure_ttl=None,
        serializer=None,
        group_id=None,
        on_success=None,
        on_failure=None,
        on_stopped=None
    ) -> 'Job':
        """
        Create a new job instance.

        Args:
            func: Function to execute or function reference string.
            args (tuple): Positional arguments for the function.
            kwargs (dict): Keyword arguments for the function.
            connection: Redis connection.
            result_ttl (int): Result time-to-live in seconds.
            ttl (int): Job time-to-live in seconds.
            status (JobStatus): Initial job status.
            description (str): Human-readable job description.
            depends_on: Job dependencies.
            timeout (int): Job execution timeout in seconds.
            id (str): Custom job ID.
            origin (str): Queue name where job originated.
            meta (dict): Additional job metadata.
            failure_ttl (int): Failure info time-to-live in seconds.
            serializer: Custom serializer.
            group_id (str): Job group identifier.
            on_success (Callback): Success callback.
            on_failure (Callback): Failure callback.
            on_stopped (Callback): Stopped callback.

        Returns:
            Job: New job instance.
        """

    @classmethod
    def fetch(cls, id: str, connection, serializer=None) -> 'Job':
        """
        Retrieve an existing job by ID.

        Args:
            id (str): Job identifier.
            connection: Redis connection.
            serializer: Custom serializer.

        Returns:
            Job: Retrieved job instance.

        Raises:
            NoSuchJobError: If job doesn't exist.
        """

    @classmethod
    def exists(cls, job_id: str, connection) -> bool:
        """
        Check if a job exists.

        Args:
            job_id (str): Job identifier.
            connection: Redis connection.

        Returns:
            bool: True if job exists, False otherwise.
        """

    @classmethod
    def fetch_many(cls, job_ids: list[str], connection, serializer=None) -> list['Job | None']:
        """
        Fetch multiple jobs by their IDs.

        Args:
            job_ids (list[str]): List of job identifiers.
            connection: Redis connection.
            serializer: Custom serializer.

        Returns:
            list[Job | None]: List of jobs (None for non-existent jobs).
        """
```

### Job Status and Lifecycle

Monitor and control job execution status throughout its lifecycle.

```python { .api }
def get_status(self, refresh: bool = True) -> JobStatus:
    """
    Get current job status.

    Args:
        refresh (bool): Whether to refresh from Redis before returning.

    Returns:
        JobStatus: Current job status.
    """

def set_status(self, status: JobStatus, pipeline=None):
    """
    Set job status.

    Args:
        status (JobStatus): New status to set.
        pipeline: Redis pipeline for batched operations.
    """

def refresh(self):
    """Refresh job data from Redis."""

def save(self, pipeline=None, include_meta: bool = True, include_result: bool = True):
    """
    Save job to Redis.

    Args:
        pipeline: Redis pipeline for batched operations.
        include_meta (bool): Whether to save metadata.
        include_result (bool): Whether to save result.
    """

def delete(self, pipeline=None, remove_from_queue: bool = True, delete_dependents: bool = False):
    """
    Delete job from Redis.

    Args:
        pipeline: Redis pipeline for batched operations.
        remove_from_queue (bool): Remove from queue if queued.
        delete_dependents (bool): Delete dependent jobs.
    """

def cleanup(self, ttl: int = None, pipeline=None, remove_from_queue: bool = True):
    """
    Clean up job data.

    Args:
        ttl (int): Time-to-live for cleanup.
        pipeline: Redis pipeline.
        remove_from_queue (bool): Remove from queue.
    """
```

### Job Execution and Results

Execute jobs and manage execution results with comprehensive error handling.

```python { .api }
def perform(self) -> Any:
    """
    Execute the job function.

    Returns:
        Any: Function return value.

    Raises:
        Various: Any exception raised by the job function.
    """

def return_value(self, refresh: bool = False) -> Any:
    """
    Get job return value.

    Args:
        refresh (bool): Whether to refresh from Redis.

    Returns:
        Any: Job return value if completed, None otherwise.
    """

def latest_result(self, timeout: int = 0) -> 'Result | None':
    """
    Get the latest job result.

    Args:
        timeout (int): Maximum wait time for result.

    Returns:
        Result | None: Latest result or None.
    """

def results(self) -> list['Result']:
    """
    Get all job results.

    Returns:
        list[Result]: All job results.
    """

def get_call_string(self) -> str | None:
    """
    Get string representation of the function call.

    Returns:
        str | None: Function call string.
    """
```

### Job Control Operations

Control job execution with cancellation, requeuing, and retry mechanisms.

```python { .api }
def cancel(self, pipeline=None, enqueue_dependents: bool = False, remove_from_dependencies: bool = False):
    """
    Cancel job execution.

    Args:
        pipeline: Redis pipeline for batched operations.
        enqueue_dependents (bool): Enqueue dependent jobs.
        remove_from_dependencies (bool): Remove from dependency lists.
    """

def requeue(self, at_front: bool = False) -> 'Job':
    """
    Requeue job for execution.

    Args:
        at_front (bool): Add to front of queue.

    Returns:
        Job: The requeued job.
    """

def retry(self, queue: 'Queue', pipeline=None):
    """
    Retry job execution.

    Args:
        queue (Queue): Queue to retry in.
        pipeline: Redis pipeline.
    """

def get_retry_interval(self) -> int:
    """
    Get retry interval for this job.

    Returns:
        int: Retry interval in seconds.
    """
```
275
276
### Standalone Job Functions
277
278
Module-level functions for job operations without requiring job instances.
279
280
```python { .api }
def get_current_job(connection=None, job_class=None) -> 'Job | None':
    """
    Get the currently executing job within a worker context.

    Args:
        connection: Redis connection. Uses default if None.
        job_class: Job class to use for deserialization.

    Returns:
        Job | None: Current job if in worker context, None otherwise.
    """

def cancel_job(job_id: str, connection, serializer=None, enqueue_dependents: bool = False):
    """
    Cancel a job by its ID.

    Args:
        job_id (str): Job identifier to cancel.
        connection: Redis connection.
        serializer: Custom serializer.
        enqueue_dependents (bool): Enqueue dependent jobs after cancellation.

    Raises:
        NoSuchJobError: If job doesn't exist.
    """

def requeue_job(job_id: str, connection, serializer=None) -> 'Job':
    """
    Requeue a job by its ID.

    Args:
        job_id (str): Job identifier to requeue.
        connection: Redis connection.
        serializer: Custom serializer.

    Returns:
        Job: The requeued job.

    Raises:
        NoSuchJobError: If job doesn't exist.
    """
```
323
324
### Job Properties and Metadata
325
326
Access and modify job properties, metadata, and execution information.
327
328
```python { .api }
# Core Properties
@property
def id(self) -> str:
    """Job identifier."""

@property
def key(self) -> bytes:
    """Redis key for this job."""

@property
def func(self):
    """Function to execute."""

@property
def args(self) -> tuple:
    """Function positional arguments."""

@property
def kwargs(self) -> dict:
    """Function keyword arguments."""

@property
def description(self) -> str | None:
    """Job description."""

@property
def origin(self) -> str:
    """Queue where job originated."""

@property
def timeout(self) -> float | None:
    """Job timeout in seconds."""

@property
def result_ttl(self) -> int | None:
    """Result time-to-live in seconds."""

@property
def failure_ttl(self) -> int | None:
    """Failure info time-to-live in seconds."""

@property
def ttl(self) -> int | None:
    """Job time-to-live in seconds."""

# Timing Properties
@property
def enqueued_at(self) -> datetime | None:
    """When job was enqueued."""

@property
def started_at(self) -> datetime | None:
    """When job execution started."""

@property
def ended_at(self) -> datetime | None:
    """When job execution ended."""

@property
def last_heartbeat(self) -> datetime | None:
    """Last worker heartbeat timestamp."""

# Status Properties
@property
def is_finished(self) -> bool:
    """True if job finished successfully."""

@property
def is_queued(self) -> bool:
    """True if job is queued for execution."""

@property
def is_failed(self) -> bool:
    """True if job failed."""

@property
def is_started(self) -> bool:
    """True if job execution started."""

@property
def is_deferred(self) -> bool:
    """True if job is deferred (waiting for dependencies)."""

@property
def is_canceled(self) -> bool:
    """True if job was canceled."""

@property
def is_scheduled(self) -> bool:
    """True if job is scheduled for future execution."""

@property
def is_stopped(self) -> bool:
    """True if job was stopped."""

# Metadata Properties
@property
def meta(self) -> dict:
    """Job metadata dictionary."""

@property
def worker_name(self) -> str | None:
    """Name of worker that executed/is executing the job."""

@property
def group_id(self) -> str | None:
    """Job group identifier."""

def get_meta(self, refresh: bool = True) -> dict:
    """
    Get job metadata.

    Args:
        refresh (bool): Refresh from Redis before returning.

    Returns:
        dict: Job metadata.
    """

def save_meta(self):
    """Save metadata to Redis."""
```

### Job Dependencies

Manage job dependencies and execution ordering.

```python { .api }
@property
def dependency(self) -> 'Job | None':
    """First job dependency."""

@property
def dependency_ids(self) -> list[bytes]:
    """List of dependency job keys."""

@property
def dependent_ids(self) -> list[str]:
    """List of dependent job IDs."""

def fetch_dependencies(self, watch: bool = False, pipeline=None) -> list['Job']:
    """
    Fetch all job dependencies.

    Args:
        watch (bool): Watch dependencies for changes.
        pipeline: Redis pipeline.

    Returns:
        list[Job]: List of dependency jobs.
    """

def register_dependency(self, pipeline=None):
    """
    Register job dependencies in Redis.

    Args:
        pipeline: Redis pipeline for batched operations.
    """

def dependencies_are_met(
    self,
    parent_job: 'Job' = None,
    pipeline=None,
    exclude_job_id: str = None,
    refresh_job_status: bool = True
) -> bool:
    """
    Check if all job dependencies are satisfied.

    Args:
        parent_job (Job): Parent job context.
        pipeline: Redis pipeline.
        exclude_job_id (str): Job ID to exclude from check.
        refresh_job_status (bool): Refresh dependency status from Redis.

    Returns:
        bool: True if dependencies are met, False otherwise.
    """

def delete_dependents(self, pipeline=None):
    """
    Delete all dependent jobs.

    Args:
        pipeline: Redis pipeline for batched operations.
    """
```

## Usage Examples

### Basic Job Creation and Monitoring

```python
import redis
from rq import Job

# Connect to Redis
conn = redis.Redis()

# Define a job function
def process_data(data_id, options=None):
    # Simulate processing
    import time
    time.sleep(2)
    return f"Processed data {data_id}"

# Create a job
job = Job.create(
    func=process_data,
    args=('data_123',),
    kwargs={'options': {'fast': True}},
    connection=conn,
    timeout=300,
    description="Process data batch 123"
)

# Save job to Redis
job.save()

print(f"Created job: {job.id}")
print(f"Status: {job.get_status()}")
print(f"Description: {job.description}")

# Later, retrieve and check the job
retrieved_job = Job.fetch(job.id, connection=conn)
print(f"Retrieved job status: {retrieved_job.get_status()}")
```

### Job Metadata and Results

```python
from rq import Job, get_current_job
import redis
import time

conn = redis.Redis()

def tracked_function(item_count):
    # Get current job to update metadata
    job = get_current_job()

    if job:
        job.meta['progress'] = 0
        job.save_meta()

    # Simulate processing with progress updates
    for i in range(item_count):
        # Do work...

        if job:
            job.meta['progress'] = (i + 1) / item_count * 100
            job.meta['current_item'] = i + 1
            job.save_meta()

    return f"Completed processing {item_count} items"

# Create job with initial metadata
job = Job.create(
    func=tracked_function,
    args=(100,),
    connection=conn,
    meta={'stage': 'queued', 'priority': 'high'}
)

job.save()

# Monitor progress (from another process/thread)
while not job.is_finished and not job.is_failed:
    job.refresh()
    meta = job.get_meta()
    print(f"Progress: {meta.get('progress', 0)}%")
    time.sleep(1)

# Get final result
if job.is_finished:
    print(f"Result: {job.return_value()}")
elif job.is_failed:
    print("Job failed")
```

### Job Cancellation and Requeuing

```python
from rq import Job, cancel_job, requeue_job
import redis

conn = redis.Redis()

# Create a long-running job
def long_task():
    import time
    time.sleep(60)
    return "Done"

job = Job.create(func=long_task, connection=conn)
job.save()

# Cancel the job
cancel_job(job.id, connection=conn)
print(f"Job {job.id} canceled")

# Create another job and then requeue it
job2 = Job.create(func=long_task, connection=conn)
job2.save()

# Requeue (useful after failures or for retrying)
requeued_job = requeue_job(job2.id, connection=conn)
print(f"Job requeued: {requeued_job.id}")
```