or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdjob-management.mdjob-patterns.mdqueue-operations.mdregistries-monitoring.mdworker-management.md

job-patterns.mddocs/

0

# Job Patterns

1

2

Advanced job patterns including callbacks, retries, repetition, and dependencies for complex workflow orchestration and error handling strategies. These patterns enable sophisticated job processing workflows with automatic recovery, scheduled repetition, and event-driven processing.

3

4

## Capabilities

5

6

### Job Callbacks

7

8

Execute callback functions on job completion, failure, or termination.

9

10

```python { .api }

11

class Callback:

12

def __init__(self, func, timeout: int = None):

13

"""

14

Initialize a callback.

15

16

Args:

17

func: Callback function or function reference string.

18

timeout (int): Callback timeout in seconds. Defaults to CALLBACK_TIMEOUT (60).

19

"""

20

21

@property

22

def name(self) -> str:

23

"""Function name or path for the callback."""

24

25

@property

26

def func(self):

27

"""Callback function or function reference."""

28

29

@property

30

def timeout(self) -> int:

31

"""Callback timeout in seconds."""

32

```

33

34

### Success Callbacks

35

36

Handle successful job completion with result processing.

37

38

```python { .api }

39

def success_callback_example(job: 'Job', connection, result):

40

"""

41

Example success callback function signature.

42

43

Args:

44

job (Job): The completed job.

45

connection: Redis connection.

46

result: Job return value.

47

48

Returns:

49

Any: Callback return value (optional).

50

"""

51

52

# Usage in job creation

53

def process_data(data):

54

return f"Processed {len(data)} items"

55

56

def on_success(job, connection, result):

57

print(f"Job {job.id} completed successfully: {result}")

58

# Could send notifications, update databases, etc.

59

60

job = queue.enqueue(

61

process_data,

62

['item1', 'item2', 'item3'],

63

on_success=Callback(on_success, timeout=30)

64

)

65

```

66

67

### Failure Callbacks

68

69

Handle job failures with error information and recovery logic.

70

71

```python { .api }

72

def failure_callback_example(job: 'Job', connection, exc_type, exc_value, traceback):

73

"""

74

Example failure callback function signature.

75

76

Args:

77

job (Job): The failed job.

78

connection: Redis connection.

79

exc_type: Exception type.

80

exc_value: Exception instance.

81

traceback: Exception traceback.

82

83

Returns:

84

Any: Callback return value (optional).

85

"""

86

87

# Usage in job creation

88

def risky_operation(data):

89

if not data:

90

raise ValueError("No data provided")

91

return f"Processed {data}"

92

93

def on_failure(job, connection, exc_type, exc_value, traceback):

94

error_msg = f"Job {job.id} failed: {exc_value}"

95

print(error_msg)

96

# Could log to external service, send alerts, etc.

97

98

# Optionally requeue with different parameters

99

if isinstance(exc_value, ValueError):

100

# Requeue with default data

101

queue.enqueue(risky_operation, "default_data")

102

103

job = queue.enqueue(

104

risky_operation,

105

None, # This will cause failure

106

on_failure=Callback(on_failure)

107

)

108

```

109

110

### Stopped Callbacks

111

112

Handle jobs that are stopped or interrupted during execution.

113

114

```python { .api }

115

def stopped_callback_example(job: 'Job', connection):

116

"""

117

Example stopped callback function signature.

118

119

Args:

120

job (Job): The stopped job.

121

connection: Redis connection.

122

123

Returns:

124

Any: Callback return value (optional).

125

"""

126

127

# Usage in job creation

128

def long_running_task():

129

import time

130

for i in range(100):

131

time.sleep(1) # Could be interrupted

132

return "Completed"

133

134

def on_stopped(job, connection):

135

print(f"Job {job.id} was stopped before completion")

136

# Could clean up resources, send notifications, etc.

137

138

job = queue.enqueue(

139

long_running_task,

140

on_stopped=Callback(on_stopped)

141

)

142

```

143

144

### Job Retry Patterns

145

146

Configure automatic retry behavior for failed jobs.

147

148

```python { .api }

149

class Retry:

150

def __init__(self, max: int, interval: int | list[int] = 0):

151

"""

152

Initialize retry configuration.

153

154

Args:

155

max (int): Maximum number of retry attempts.

156

interval (int | list[int]): Retry interval(s) in seconds.

157

- int: Fixed interval between retries.

158

- list[int]: Sequence of intervals for each retry attempt.

159

"""

160

161

@property

162

def max(self) -> int:

163

"""Maximum retry attempts."""

164

165

@property

166

def intervals(self) -> list[int]:

167

"""Retry interval sequence."""

168

169

@classmethod

170

def get_interval(cls, count: int, intervals: list[int] | None) -> int:

171

"""

172

Get retry interval for attempt count.

173

174

Args:

175

count (int): Current retry attempt number (0-based).

176

intervals (list[int] | None): Configured intervals.

177

178

Returns:

179

int: Interval in seconds for this retry attempt.

180

"""

181

```

182

183

### Job Repetition Patterns

184

185

Schedule jobs to repeat automatically with configurable intervals.

186

187

```python { .api }

188

class Repeat:

189

def __init__(self, times: int, interval: int | list[int] = 0):

190

"""

191

Initialize repeat configuration.

192

193

Args:

194

times (int): Number of times to repeat the job.

195

interval (int | list[int]): Interval(s) between repetitions in seconds.

196

- int: Fixed interval between repetitions.

197

- list[int]: Sequence of intervals for each repetition.

198

"""

199

200

@property

201

def times(self) -> int:

202

"""Number of repetitions."""

203

204

@property

205

def intervals(self) -> list[int]:

206

"""Repeat interval sequence."""

207

208

@classmethod

209

def get_interval(cls, count: int, intervals: list[int]) -> int:

210

"""

211

Get repeat interval for repetition count.

212

213

Args:

214

count (int): Current repetition number (0-based).

215

intervals (list[int]): Configured intervals.

216

217

Returns:

218

int: Interval in seconds for this repetition.

219

"""

220

221

@classmethod

222

def schedule(cls, job: 'Job', queue: 'Queue', pipeline=None):

223

"""

224

Schedule the next repetition of a job.

225

226

Args:

227

job (Job): Job to repeat.

228

queue (Queue): Queue to schedule in.

229

pipeline: Redis pipeline for batched operations.

230

"""

231

```

232

233

### Job Dependencies

234

235

Create dependencies between jobs for workflow orchestration.

236

237

```python { .api }

238

class Dependency:

239

def __init__(

240

self,

241

jobs,

242

allow_failure: bool = False,

243

enqueue_at_front: bool = False

244

):

245

"""

246

Initialize job dependency.

247

248

Args:

249

jobs: Job instances, job IDs, or sequence of jobs/IDs.

250

allow_failure (bool): Allow dependent job to run even if dependencies fail.

251

enqueue_at_front (bool): Enqueue dependent job at front of queue.

252

"""

253

254

@property

255

def dependencies(self):

256

"""Sequence of dependency jobs or job IDs."""

257

258

@property

259

def allow_failure(self) -> bool:

260

"""Whether to allow dependency failures."""

261

262

@property

263

def enqueue_at_front(self) -> bool:

264

"""Whether to enqueue at front of queue."""

265

```

266

267

## Usage Examples

268

269

### Basic Callback Usage

270

271

```python

272

from rq import Queue, Callback

273

import redis

274

275

conn = redis.Redis()

276

queue = Queue(connection=conn)

277

278

def process_order(order_id):

279

# Simulate order processing

280

import time

281

time.sleep(2)

282

return f"Order {order_id} processed successfully"

283

284

def send_confirmation_email(job, connection, result):

285

"""Success callback: send confirmation email."""

286

order_id = job.args[0]

287

print(f"Sending confirmation email for order {order_id}: {result}")

288

# In real app: send actual email

289

290

def handle_order_failure(job, connection, exc_type, exc_value, traceback):

291

"""Failure callback: handle processing failure."""

292

order_id = job.args[0]

293

print(f"Order {order_id} processing failed: {exc_value}")

294

# In real app: notify customer service, log error, etc.

295

296

# Enqueue job with callbacks

297

job = queue.enqueue(

298

process_order,

299

"ORD-12345",

300

on_success=Callback(send_confirmation_email),

301

on_failure=Callback(handle_order_failure),

302

description="Process customer order"

303

)

304

305

print(f"Enqueued order processing job: {job.id}")

306

```

307

308

### Retry Patterns

309

310

```python

311

from rq import Queue, Retry

312

import redis

313

import random

314

315

conn = redis.Redis()

316

queue = Queue(connection=conn)

317

318

def unreliable_api_call(endpoint):

319

"""Simulates an unreliable API that fails sometimes."""

320

if random.random() < 0.7: # 70% failure rate

321

raise ConnectionError(f"Failed to connect to {endpoint}")

322

return f"Successfully called {endpoint}"

323

324

# Simple retry: 3 attempts with 5 second intervals

325

simple_retry = Retry(max=3, interval=5)

326

327

job1 = queue.enqueue(

328

unreliable_api_call,

329

"/api/users",

330

retry=simple_retry,

331

description="API call with simple retry"

332

)

333

334

# Exponential backoff: increasing intervals

335

exponential_retry = Retry(max=4, interval=[1, 2, 4, 8]) # 1s, 2s, 4s, 8s

336

337

job2 = queue.enqueue(

338

unreliable_api_call,

339

"/api/orders",

340

retry=exponential_retry,

341

description="API call with exponential backoff"

342

)

343

344

# Custom retry intervals

345

custom_retry = Retry(max=3, interval=[10, 30, 60]) # 10s, 30s, 1min

346

347

job3 = queue.enqueue(

348

unreliable_api_call,

349

"/api/payments",

350

retry=custom_retry,

351

description="API call with custom intervals"

352

)

353

354

print("Enqueued jobs with different retry strategies")

355

```

356

357

### Repetition Patterns

358

359

```python

360

from rq import Queue, Repeat

361

import redis

362

from datetime import datetime

363

364

conn = redis.Redis()

365

queue = Queue(connection=conn)

366

367

def generate_report(report_type):

368

"""Generate a periodic report."""

369

timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

370

print(f"Generating {report_type} report at {timestamp}")

371

return f"{report_type} report generated at {timestamp}"

372

373

def cleanup_temp_files():

374

"""Clean up temporary files."""

375

import os

376

temp_count = len([f for f in os.listdir('/tmp') if f.startswith('temp_')])

377

print(f"Cleaned up {temp_count} temporary files")

378

return f"Cleaned {temp_count} files"

379

380

# Repeat 5 times every hour (3600 seconds)

381

hourly_repeat = Repeat(times=5, interval=3600)

382

383

report_job = queue.enqueue(

384

generate_report,

385

"sales_summary",

386

repeat=hourly_repeat,

387

description="Hourly sales report"

388

)

389

390

# Repeat with increasing intervals (daily, then weekly)

391

increasing_repeat = Repeat(times=3, interval=[86400, 604800, 604800]) # 1 day, 1 week, 1 week

392

393

cleanup_job = queue.enqueue(

394

cleanup_temp_files,

395

repeat=increasing_repeat,

396

description="Progressive cleanup schedule"

397

)

398

399

print(f"Scheduled repeating jobs: {report_job.id}, {cleanup_job.id}")

400

```

401

402

### Job Dependencies

403

404

```python

405

from rq import Queue, Job

406

import redis

407

408

conn = redis.Redis()

409

queue = Queue(connection=conn)

410

411

def download_data(source):

412

"""Download data from source."""

413

import time

414

time.sleep(2)

415

print(f"Downloaded data from {source}")

416

return f"data_{source}.csv"

417

418

def validate_data(filename):

419

"""Validate downloaded data."""

420

import time

421

time.sleep(1)

422

print(f"Validated {filename}")

423

return f"validated_{filename}"

424

425

def process_data(filename):

426

"""Process validated data."""

427

import time

428

time.sleep(3)

429

print(f"Processed {filename}")

430

return f"results_{filename}"

431

432

def generate_report(filenames):

433

"""Generate report from processed data."""

434

print(f"Generated report from {len(filenames)} files")

435

return f"report_from_{len(filenames)}_sources.pdf"

436

437

# Create initial download jobs

438

download1 = queue.enqueue(download_data, "api_source_1")

439

download2 = queue.enqueue(download_data, "api_source_2")

440

download3 = queue.enqueue(download_data, "database_dump")

441

442

# Create validation jobs that depend on downloads

443

validate1 = queue.enqueue(

444

validate_data,

445

depends_on=download1,

446

description="Validate API source 1 data"

447

)

448

449

validate2 = queue.enqueue(

450

validate_data,

451

depends_on=download2,

452

description="Validate API source 2 data"

453

)

454

455

validate3 = queue.enqueue(

456

validate_data,

457

depends_on=download3,

458

description="Validate database dump"

459

)

460

461

# Create processing jobs that depend on validation

462

process1 = queue.enqueue(process_data, depends_on=validate1)

463

process2 = queue.enqueue(process_data, depends_on=validate2)

464

process3 = queue.enqueue(process_data, depends_on=validate3)

465

466

# Create final report job that depends on all processing

467

report_job = queue.enqueue(

468

generate_report,

469

depends_on=[process1, process2, process3],

470

description="Generate final report"

471

)

472

473

print("Created dependency chain:")

474

print(f"Downloads: {download1.id}, {download2.id}, {download3.id}")

475

print(f"Validations: {validate1.id}, {validate2.id}, {validate3.id}")

476

print(f"Processing: {process1.id}, {process2.id}, {process3.id}")

477

print(f"Report: {report_job.id}")

478

```

479

480

### Complex Workflow Example

481

482

```python

483

from rq import Queue, Job, Retry, Repeat, Callback

484

import redis

485

486

conn = redis.Redis()

487

queue = Queue(connection=conn)

488

489

# Define workflow functions

490

def extract_data(source_id):

491

"""First step: extract data."""

492

if source_id == "unreliable_source":

493

import random

494

if random.random() < 0.3: # 30% failure rate

495

raise ConnectionError("Source temporarily unavailable")

496

return f"extracted_data_{source_id}"

497

498

def transform_data(data):

499

"""Second step: transform data."""

500

return f"transformed_{data}"

501

502

def load_data(data):

503

"""Third step: load data."""

504

return f"loaded_{data}"

505

506

def notify_completion(job, connection, result):

507

"""Success callback: notify stakeholders."""

508

print(f"Data pipeline completed successfully: {result}")

509

510

def handle_failure(job, connection, exc_type, exc_value, tb):

511

"""Failure callback: handle pipeline failures."""

512

print(f"Pipeline step failed: {exc_value}")

513

# Could trigger alternative workflow

514

515

def periodic_health_check():

516

"""Periodic system health check."""

517

import random

518

health_score = random.randint(70, 100)

519

print(f"System health: {health_score}%")

520

return health_score

521

522

# Create ETL pipeline with error handling

523

extract_job = queue.enqueue(

524

extract_data,

525

"unreliable_source",

526

retry=Retry(max=3, interval=[5, 15, 30]), # Retry with backoff

527

on_failure=Callback(handle_failure),

528

description="Extract data from unreliable source"

529

)

530

531

transform_job = queue.enqueue(

532

transform_data,

533

depends_on=extract_job,

534

on_failure=Callback(handle_failure),

535

description="Transform extracted data"

536

)

537

538

load_job = queue.enqueue(

539

load_data,

540

depends_on=transform_job,

541

on_success=Callback(notify_completion),

542

on_failure=Callback(handle_failure),

543

description="Load transformed data"

544

)

545

546

# Schedule periodic health checks

547

health_check_job = queue.enqueue(

548

periodic_health_check,

549

repeat=Repeat(times=24, interval=3600), # Every hour for 24 hours

550

description="Hourly system health check"

551

)

552

553

print("Complex workflow created:")

554

print(f"ETL Pipeline: {extract_job.id} -> {transform_job.id} -> {load_job.id}")

555

print(f"Health Check: {health_check_job.id} (repeating)")

556

```

557

558

### Advanced Callback Patterns

559

560

```python

561

from rq import Queue, Job, Callback

562

import redis

563

import json

564

565

conn = redis.Redis()

566

queue = Queue(connection=conn)

567

568

def audit_job_completion(job, connection, result):

569

"""Audit callback: log job completion details."""

570

audit_data = {

571

'job_id': job.id,

572

'function': job.func_name,

573

'args': job.args,

574

'kwargs': job.kwargs,

575

'result': str(result)[:100], # Truncate long results

576

'duration': (job.ended_at - job.started_at).total_seconds(),

577

'worker': job.worker_name

578

}

579

580

# In real app: send to audit service

581

print(f"AUDIT: {json.dumps(audit_data, indent=2)}")

582

583

def cascade_failure_handler(job, connection, exc_type, exc_value, tb):

584

"""Handle failures with cascading cleanup."""

585

print(f"Job {job.id} failed, starting cleanup cascade")

586

587

# Cancel related jobs

588

if hasattr(job, 'meta') and 'related_jobs' in job.meta:

589

for related_job_id in job.meta['related_jobs']:

590

try:

591

related_job = Job.fetch(related_job_id, connection=connection)

592

if related_job.get_status() in ['queued', 'started']:

593

related_job.cancel()

594

print(f"Cancelled related job: {related_job_id}")

595

except:

596

pass

597

598

def business_logic_processor(data_type, data):

599

"""Business logic with metadata."""

600

current_job = Job.get_current_job()

601

if current_job:

602

current_job.meta['processing_stage'] = 'started'

603

current_job.save_meta()

604

605

# Simulate processing

606

import time

607

time.sleep(2)

608

609

if current_job:

610

current_job.meta['processing_stage'] = 'completed'

611

current_job.save_meta()

612

613

return f"Processed {data_type}: {data}"

614

615

# Create jobs with advanced callback patterns

616

main_job = queue.enqueue(

617

business_logic_processor,

618

"customer_data",

619

{"records": 1000},

620

on_success=Callback(audit_job_completion, timeout=30),

621

on_failure=Callback(cascade_failure_handler, timeout=60),

622

meta={'related_jobs': [], 'priority': 'high'},

623

description="Main business logic processing"

624

)

625

626

# Create related jobs

627

related_jobs = []

628

for i in range(3):

629

related_job = queue.enqueue(

630

business_logic_processor,

631

f"related_data_{i}",

632

{"records": 100},

633

depends_on=main_job,

634

on_success=Callback(audit_job_completion),

635

description=f"Related processing {i}"

636

)

637

related_jobs.append(related_job.id)

638

639

# Update main job metadata with related job IDs

640

main_job.meta['related_jobs'] = related_jobs

641

main_job.save_meta()

642

643

print(f"Created job chain with advanced callbacks: {main_job.id}")

644

print(f"Related jobs: {related_jobs}")

645

```