0
# Job Patterns
1
2
Advanced job patterns including callbacks, retries, repetition, and dependencies for complex workflow orchestration and error handling strategies. These patterns enable sophisticated job processing workflows with automatic recovery, scheduled repetition, and event-driven processing.
3
4
## Capabilities
5
6
### Job Callbacks
7
8
Execute callback functions on job completion, failure, or termination.
9
10
```python { .api }
11
class Callback:
12
def __init__(self, func, timeout: int = None):
13
"""
14
Initialize a callback.
15
16
Args:
17
func: Callback function or function reference string.
18
timeout (int): Callback timeout in seconds. Defaults to CALLBACK_TIMEOUT (60).
19
"""
20
21
@property
22
def name(self) -> str:
23
"""Function name or path for the callback."""
24
25
@property
26
def func(self):
27
"""Callback function or function reference."""
28
29
@property
30
def timeout(self) -> int:
31
"""Callback timeout in seconds."""
32
```
33
34
### Success Callbacks
35
36
Handle successful job completion with result processing.
37
38
```python { .api }
39
def success_callback_example(job: 'Job', connection, result):
40
"""
41
Example success callback function signature.
42
43
Args:
44
job (Job): The completed job.
45
connection: Redis connection.
46
result: Job return value.
47
48
Returns:
49
Any: Callback return value (optional).
50
"""
51
52
# Usage in job creation
53
def process_data(data):
54
return f"Processed {len(data)} items"
55
56
def on_success(job, connection, result):
57
print(f"Job {job.id} completed successfully: {result}")
58
# Could send notifications, update databases, etc.
59
60
job = queue.enqueue(
61
process_data,
62
['item1', 'item2', 'item3'],
63
on_success=Callback(on_success, timeout=30)
64
)
65
```
66
67
### Failure Callbacks
68
69
Handle job failures with error information and recovery logic.
70
71
```python { .api }
72
def failure_callback_example(job: 'Job', connection, exc_type, exc_value, traceback):
73
"""
74
Example failure callback function signature.
75
76
Args:
77
job (Job): The failed job.
78
connection: Redis connection.
79
exc_type: Exception type.
80
exc_value: Exception instance.
81
traceback: Exception traceback.
82
83
Returns:
84
Any: Callback return value (optional).
85
"""
86
87
# Usage in job creation
88
def risky_operation(data):
89
if not data:
90
raise ValueError("No data provided")
91
return f"Processed {data}"
92
93
def on_failure(job, connection, exc_type, exc_value, traceback):
94
error_msg = f"Job {job.id} failed: {exc_value}"
95
print(error_msg)
96
# Could log to external service, send alerts, etc.
97
98
# Optionally requeue with different parameters
99
if isinstance(exc_value, ValueError):
100
# Requeue with default data
101
queue.enqueue(risky_operation, "default_data")
102
103
job = queue.enqueue(
104
risky_operation,
105
None, # This will cause failure
106
on_failure=Callback(on_failure)
107
)
108
```
109
110
### Stopped Callbacks
111
112
Handle jobs that are stopped or interrupted during execution.
113
114
```python { .api }
115
def stopped_callback_example(job: 'Job', connection):
116
"""
117
Example stopped callback function signature.
118
119
Args:
120
job (Job): The stopped job.
121
connection: Redis connection.
122
123
Returns:
124
Any: Callback return value (optional).
125
"""
126
127
# Usage in job creation
128
def long_running_task():
129
import time
130
for i in range(100):
131
time.sleep(1) # Could be interrupted
132
return "Completed"
133
134
def on_stopped(job, connection):
135
print(f"Job {job.id} was stopped before completion")
136
# Could clean up resources, send notifications, etc.
137
138
job = queue.enqueue(
139
long_running_task,
140
on_stopped=Callback(on_stopped)
141
)
142
```
143
144
### Job Retry Patterns
145
146
Configure automatic retry behavior for failed jobs.
147
148
```python { .api }
149
class Retry:
150
def __init__(self, max: int, interval: int | list[int] = 0):
151
"""
152
Initialize retry configuration.
153
154
Args:
155
max (int): Maximum number of retry attempts.
156
interval (int | list[int]): Retry interval(s) in seconds.
157
- int: Fixed interval between retries.
158
- list[int]: Sequence of intervals for each retry attempt.
159
"""
160
161
@property
162
def max(self) -> int:
163
"""Maximum retry attempts."""
164
165
@property
166
def intervals(self) -> list[int]:
167
"""Retry interval sequence."""
168
169
@classmethod
170
def get_interval(cls, count: int, intervals: list[int] | None) -> int:
171
"""
172
Get retry interval for attempt count.
173
174
Args:
175
count (int): Current retry attempt number (0-based).
176
intervals (list[int] | None): Configured intervals.
177
178
Returns:
179
int: Interval in seconds for this retry attempt.
180
"""
181
```
182
183
### Job Repetition Patterns
184
185
Schedule jobs to repeat automatically with configurable intervals.
186
187
```python { .api }
188
class Repeat:
189
def __init__(self, times: int, interval: int | list[int] = 0):
190
"""
191
Initialize repeat configuration.
192
193
Args:
194
times (int): Number of times to repeat the job.
195
interval (int | list[int]): Interval(s) between repetitions in seconds.
196
- int: Fixed interval between repetitions.
197
- list[int]: Sequence of intervals for each repetition.
198
"""
199
200
@property
201
def times(self) -> int:
202
"""Number of repetitions."""
203
204
@property
205
def intervals(self) -> list[int]:
206
"""Repeat interval sequence."""
207
208
@classmethod
209
def get_interval(cls, count: int, intervals: list[int]) -> int:
210
"""
211
Get repeat interval for repetition count.
212
213
Args:
214
count (int): Current repetition number (0-based).
215
intervals (list[int]): Configured intervals.
216
217
Returns:
218
int: Interval in seconds for this repetition.
219
"""
220
221
@classmethod
222
def schedule(cls, job: 'Job', queue: 'Queue', pipeline=None):
223
"""
224
Schedule the next repetition of a job.
225
226
Args:
227
job (Job): Job to repeat.
228
queue (Queue): Queue to schedule in.
229
pipeline: Redis pipeline for batched operations.
230
"""
231
```
232
233
### Job Dependencies
234
235
Create dependencies between jobs for workflow orchestration.
236
237
```python { .api }
238
class Dependency:
239
def __init__(
240
self,
241
jobs,
242
allow_failure: bool = False,
243
enqueue_at_front: bool = False
244
):
245
"""
246
Initialize job dependency.
247
248
Args:
249
jobs: Job instances, job IDs, or sequence of jobs/IDs.
250
allow_failure (bool): Allow dependent job to run even if dependencies fail.
251
enqueue_at_front (bool): Enqueue dependent job at front of queue.
252
"""
253
254
@property
255
def dependencies(self):
256
"""Sequence of dependency jobs or job IDs."""
257
258
@property
259
def allow_failure(self) -> bool:
260
"""Whether to allow dependency failures."""
261
262
@property
263
def enqueue_at_front(self) -> bool:
264
"""Whether to enqueue at front of queue."""
265
```
266
267
## Usage Examples
268
269
### Basic Callback Usage
270
271
```python
272
from rq import Queue, Callback
273
import redis
274
275
conn = redis.Redis()
276
queue = Queue(connection=conn)
277
278
def process_order(order_id):
279
# Simulate order processing
280
import time
281
time.sleep(2)
282
return f"Order {order_id} processed successfully"
283
284
def send_confirmation_email(job, connection, result):
285
"""Success callback: send confirmation email."""
286
order_id = job.args[0]
287
print(f"Sending confirmation email for order {order_id}: {result}")
288
# In real app: send actual email
289
290
def handle_order_failure(job, connection, exc_type, exc_value, traceback):
291
"""Failure callback: handle processing failure."""
292
order_id = job.args[0]
293
print(f"Order {order_id} processing failed: {exc_value}")
294
# In real app: notify customer service, log error, etc.
295
296
# Enqueue job with callbacks
297
job = queue.enqueue(
298
process_order,
299
"ORD-12345",
300
on_success=Callback(send_confirmation_email),
301
on_failure=Callback(handle_order_failure),
302
description="Process customer order"
303
)
304
305
print(f"Enqueued order processing job: {job.id}")
306
```
307
308
### Retry Patterns
309
310
```python
311
from rq import Queue, Retry
312
import redis
313
import random
314
315
conn = redis.Redis()
316
queue = Queue(connection=conn)
317
318
def unreliable_api_call(endpoint):
319
"""Simulates an unreliable API that fails sometimes."""
320
if random.random() < 0.7: # 70% failure rate
321
raise ConnectionError(f"Failed to connect to {endpoint}")
322
return f"Successfully called {endpoint}"
323
324
# Simple retry: 3 attempts with 5 second intervals
325
simple_retry = Retry(max=3, interval=5)
326
327
job1 = queue.enqueue(
328
unreliable_api_call,
329
"/api/users",
330
retry=simple_retry,
331
description="API call with simple retry"
332
)
333
334
# Exponential backoff: increasing intervals
335
exponential_retry = Retry(max=4, interval=[1, 2, 4, 8]) # 1s, 2s, 4s, 8s
336
337
job2 = queue.enqueue(
338
unreliable_api_call,
339
"/api/orders",
340
retry=exponential_retry,
341
description="API call with exponential backoff"
342
)
343
344
# Custom retry intervals
345
custom_retry = Retry(max=3, interval=[10, 30, 60]) # 10s, 30s, 1min
346
347
job3 = queue.enqueue(
348
unreliable_api_call,
349
"/api/payments",
350
retry=custom_retry,
351
description="API call with custom intervals"
352
)
353
354
print("Enqueued jobs with different retry strategies")
355
```
356
357
### Repetition Patterns
358
359
```python
360
from rq import Queue, Repeat
361
import redis
362
from datetime import datetime
363
364
conn = redis.Redis()
365
queue = Queue(connection=conn)
366
367
def generate_report(report_type):
368
"""Generate a periodic report."""
369
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
370
print(f"Generating {report_type} report at {timestamp}")
371
return f"{report_type} report generated at {timestamp}"
372
373
def cleanup_temp_files():
374
"""Clean up temporary files."""
375
import os
376
temp_count = len([f for f in os.listdir('/tmp') if f.startswith('temp_')])
377
print(f"Cleaned up {temp_count} temporary files")
378
return f"Cleaned {temp_count} files"
379
380
# Repeat 5 times every hour (3600 seconds)
381
hourly_repeat = Repeat(times=5, interval=3600)
382
383
report_job = queue.enqueue(
384
generate_report,
385
"sales_summary",
386
repeat=hourly_repeat,
387
description="Hourly sales report"
388
)
389
390
# Repeat with increasing intervals (daily, then weekly)
391
increasing_repeat = Repeat(times=3, interval=[86400, 604800, 604800]) # 1 day, 1 week, 1 week
392
393
cleanup_job = queue.enqueue(
394
cleanup_temp_files,
395
repeat=increasing_repeat,
396
description="Progressive cleanup schedule"
397
)
398
399
print(f"Scheduled repeating jobs: {report_job.id}, {cleanup_job.id}")
400
```
401
402
### Job Dependencies
403
404
```python
405
from rq import Queue, Job
406
import redis
407
408
conn = redis.Redis()
409
queue = Queue(connection=conn)
410
411
def download_data(source):
412
"""Download data from source."""
413
import time
414
time.sleep(2)
415
print(f"Downloaded data from {source}")
416
return f"data_{source}.csv"
417
418
def validate_data(filename):
419
"""Validate downloaded data."""
420
import time
421
time.sleep(1)
422
    print(f"Validated {filename}")
423
    return f"validated_{filename}"
424
425
def process_data(filename):
426
"""Process validated data."""
427
import time
428
time.sleep(3)
429
    print(f"Processed {filename}")
430
    return f"results_{filename}"
431
432
def generate_report(filenames):
433
"""Generate report from processed data."""
434
print(f"Generated report from {len(filenames)} files")
435
return f"report_from_{len(filenames)}_sources.pdf"
436
437
# Create initial download jobs
438
download1 = queue.enqueue(download_data, "api_source_1")
439
download2 = queue.enqueue(download_data, "api_source_2")
440
download3 = queue.enqueue(download_data, "database_dump")
441
442
# Create validation jobs that depend on downloads
443
validate1 = queue.enqueue(
    validate_data,
    "data_api_source_1.csv",
    depends_on=download1,
    description="Validate API source 1 data"
)
448
449
validate2 = queue.enqueue(
    validate_data,
    "data_api_source_2.csv",
    depends_on=download2,
    description="Validate API source 2 data"
)
454
455
validate3 = queue.enqueue(
    validate_data,
    "data_database_dump.csv",
    depends_on=download3,
    description="Validate database dump"
)
460
461
# Create processing jobs that depend on validation
462
process1 = queue.enqueue(process_data, "validated_data_api_source_1.csv", depends_on=validate1)
463
process2 = queue.enqueue(process_data, "validated_data_api_source_2.csv", depends_on=validate2)
464
process3 = queue.enqueue(process_data, "validated_data_database_dump.csv", depends_on=validate3)
465
466
# Create final report job that depends on all processing
467
report_job = queue.enqueue(
    generate_report,
    [
        "results_validated_data_api_source_1.csv",
        "results_validated_data_api_source_2.csv",
        "results_validated_data_database_dump.csv",
    ],
    depends_on=[process1, process2, process3],
    description="Generate final report"
)
472
473
print("Created dependency chain:")
474
print(f"Downloads: {download1.id}, {download2.id}, {download3.id}")
475
print(f"Validations: {validate1.id}, {validate2.id}, {validate3.id}")
476
print(f"Processing: {process1.id}, {process2.id}, {process3.id}")
477
print(f"Report: {report_job.id}")
478
```
479
480
### Complex Workflow Example
481
482
```python
483
from rq import Queue, Job, Retry, Repeat, Callback
484
import redis
485
486
conn = redis.Redis()
487
queue = Queue(connection=conn)
488
489
# Define workflow functions
490
def extract_data(source_id):
491
"""First step: extract data."""
492
if source_id == "unreliable_source":
493
import random
494
if random.random() < 0.3: # 30% failure rate
495
raise ConnectionError("Source temporarily unavailable")
496
return f"extracted_data_{source_id}"
497
498
def transform_data(data):
499
"""Second step: transform data."""
500
return f"transformed_{data}"
501
502
def load_data(data):
503
"""Third step: load data."""
504
return f"loaded_{data}"
505
506
def notify_completion(job, connection, result):
507
"""Success callback: notify stakeholders."""
508
print(f"Data pipeline completed successfully: {result}")
509
510
def handle_failure(job, connection, exc_type, exc_value, tb):
511
"""Failure callback: handle pipeline failures."""
512
print(f"Pipeline step failed: {exc_value}")
513
# Could trigger alternative workflow
514
515
def periodic_health_check():
516
"""Periodic system health check."""
517
import random
518
health_score = random.randint(70, 100)
519
print(f"System health: {health_score}%")
520
return health_score
521
522
# Create ETL pipeline with error handling
523
extract_job = queue.enqueue(
524
extract_data,
525
"unreliable_source",
526
retry=Retry(max=3, interval=[5, 15, 30]), # Retry with backoff
527
on_failure=Callback(handle_failure),
528
description="Extract data from unreliable source"
529
)
530
531
transform_job = queue.enqueue(
532
transform_data,
533
depends_on=extract_job,
534
on_failure=Callback(handle_failure),
535
description="Transform extracted data"
536
)
537
538
load_job = queue.enqueue(
539
load_data,
540
depends_on=transform_job,
541
on_success=Callback(notify_completion),
542
on_failure=Callback(handle_failure),
543
description="Load transformed data"
544
)
545
546
# Schedule periodic health checks
547
health_check_job = queue.enqueue(
548
periodic_health_check,
549
repeat=Repeat(times=24, interval=3600), # Every hour for 24 hours
550
description="Hourly system health check"
551
)
552
553
print("Complex workflow created:")
554
print(f"ETL Pipeline: {extract_job.id} -> {transform_job.id} -> {load_job.id}")
555
print(f"Health Check: {health_check_job.id} (repeating)")
556
```
557
558
### Advanced Callback Patterns
559
560
```python
561
from rq import Queue, Job, Callback
562
import redis
563
import json
564
565
conn = redis.Redis()
566
queue = Queue(connection=conn)
567
568
def audit_job_completion(job, connection, result):
569
"""Audit callback: log job completion details."""
570
audit_data = {
571
'job_id': job.id,
572
'function': job.func_name,
573
'args': job.args,
574
'kwargs': job.kwargs,
575
'result': str(result)[:100], # Truncate long results
576
'duration': (job.ended_at - job.started_at).total_seconds(),
577
'worker': job.worker_name
578
}
579
580
# In real app: send to audit service
581
print(f"AUDIT: {json.dumps(audit_data, indent=2)}")
582
583
def cascade_failure_handler(job, connection, exc_type, exc_value, tb):
584
"""Handle failures with cascading cleanup."""
585
print(f"Job {job.id} failed, starting cleanup cascade")
586
587
# Cancel related jobs
588
if hasattr(job, 'meta') and 'related_jobs' in job.meta:
589
for related_job_id in job.meta['related_jobs']:
590
try:
591
related_job = Job.fetch(related_job_id, connection=connection)
592
if related_job.get_status() in ['queued', 'started']:
593
related_job.cancel()
594
print(f"Cancelled related job: {related_job_id}")
595
        except Exception:
596
pass
597
598
def business_logic_processor(data_type, data):
599
"""Business logic with metadata."""
600
current_job = Job.get_current_job()
601
if current_job:
602
current_job.meta['processing_stage'] = 'started'
603
current_job.save_meta()
604
605
# Simulate processing
606
import time
607
time.sleep(2)
608
609
if current_job:
610
current_job.meta['processing_stage'] = 'completed'
611
current_job.save_meta()
612
613
return f"Processed {data_type}: {data}"
614
615
# Create jobs with advanced callback patterns
616
main_job = queue.enqueue(
617
business_logic_processor,
618
"customer_data",
619
{"records": 1000},
620
on_success=Callback(audit_job_completion, timeout=30),
621
on_failure=Callback(cascade_failure_handler, timeout=60),
622
meta={'related_jobs': [], 'priority': 'high'},
623
description="Main business logic processing"
624
)
625
626
# Create related jobs
627
related_jobs = []
628
for i in range(3):
629
related_job = queue.enqueue(
630
business_logic_processor,
631
f"related_data_{i}",
632
{"records": 100},
633
depends_on=main_job,
634
on_success=Callback(audit_job_completion),
635
description=f"Related processing {i}"
636
)
637
related_jobs.append(related_job.id)
638
639
# Update main job metadata with related job IDs
640
main_job.meta['related_jobs'] = related_jobs
641
main_job.save_meta()
642
643
print(f"Created job chain with advanced callbacks: {main_job.id}")
644
print(f"Related jobs: {related_jobs}")
645
```