or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- index.md
- job-management.md
- job-patterns.md
- queue-operations.md
- registries-monitoring.md
- worker-management.md

docs/job-management.md

0

# Job Management

1

2

Comprehensive job lifecycle management including creation, execution tracking, status monitoring, and control operations. Jobs in RQ encapsulate function calls with rich metadata, status tracking, and support for advanced patterns like callbacks, retries, and dependencies.

3

4

## Capabilities

5

6

### Job Creation and Retrieval

7

8

Create new jobs and retrieve existing ones by ID with full serialization support.

9

10

```python { .api }

11

class Job:

12

def __init__(self, id: str = None, connection = None, serializer=None):

13

"""

14

Initialize a Job instance.

15

16

Args:

17

id (str, optional): Job identifier. Generated if not provided.

18

connection: Redis connection instance.

19

serializer: Custom serializer for job data.

20

"""

21

22

@classmethod

23

def create(

24

cls,

25

func,

26

args=None,

27

kwargs=None,

28

connection=None,

29

result_ttl=None,

30

ttl=None,

31

status=None,

32

description=None,

33

depends_on=None,

34

timeout=None,

35

id=None,

36

origin='',

37

meta=None,

38

failure_ttl=None,

39

serializer=None,

40

group_id=None,

41

on_success=None,

42

on_failure=None,

43

on_stopped=None

44

) -> 'Job':

45

"""

46

Create a new job instance.

47

48

Args:

49

func: Function to execute or function reference string.

50

args (tuple): Positional arguments for the function.

51

kwargs (dict): Keyword arguments for the function.

52

connection: Redis connection.

53

result_ttl (int): Result time-to-live in seconds.

54

ttl (int): Job time-to-live in seconds.

55

status (JobStatus): Initial job status.

56

description (str): Human-readable job description.

57

depends_on: Job dependencies.

58

timeout (int): Job execution timeout in seconds.

59

id (str): Custom job ID.

60

origin (str): Queue name where job originated.

61

meta (dict): Additional job metadata.

62

failure_ttl (int): Failure info time-to-live in seconds.

63

serializer: Custom serializer.

64

group_id (str): Job group identifier.

65

on_success (Callback): Success callback.

66

on_failure (Callback): Failure callback.

67

on_stopped (Callback): Stopped callback.

68

69

Returns:

70

Job: New job instance.

71

"""

72

73

@classmethod

74

def fetch(cls, id: str, connection, serializer=None) -> 'Job':

75

"""

76

Retrieve an existing job by ID.

77

78

Args:

79

id (str): Job identifier.

80

connection: Redis connection.

81

serializer: Custom serializer.

82

83

Returns:

84

Job: Retrieved job instance.

85

86

Raises:

87

NoSuchJobError: If job doesn't exist.

88

"""

89

90

@classmethod

91

def exists(cls, job_id: str, connection) -> bool:

92

"""

93

Check if a job exists.

94

95

Args:

96

job_id (str): Job identifier.

97

connection: Redis connection.

98

99

Returns:

100

bool: True if job exists, False otherwise.

101

"""

102

103

@classmethod

104

def fetch_many(cls, job_ids: list[str], connection, serializer=None) -> list['Job | None']:

105

"""

106

Fetch multiple jobs by their IDs.

107

108

Args:

109

job_ids (list[str]): List of job identifiers.

110

connection: Redis connection.

111

serializer: Custom serializer.

112

113

Returns:

114

list[Job | None]: List of jobs (None for non-existent jobs).

115

"""

116

```

117

118

### Job Status and Lifecycle

119

120

Monitor and control job execution status throughout its lifecycle.

121

122

```python { .api }

123

def get_status(self, refresh: bool = True) -> JobStatus:

124

"""

125

Get current job status.

126

127

Args:

128

refresh (bool): Whether to refresh from Redis before returning.

129

130

Returns:

131

JobStatus: Current job status.

132

"""

133

134

def set_status(self, status: JobStatus, pipeline=None):

135

"""

136

Set job status.

137

138

Args:

139

status (JobStatus): New status to set.

140

pipeline: Redis pipeline for batched operations.

141

"""

142

143

def refresh(self):

144

"""Refresh job data from Redis."""

145

146

def save(self, pipeline=None, include_meta: bool = True, include_result: bool = True):

147

"""

148

Save job to Redis.

149

150

Args:

151

pipeline: Redis pipeline for batched operations.

152

include_meta (bool): Whether to save metadata.

153

include_result (bool): Whether to save result.

154

"""

155

156

def delete(self, pipeline=None, remove_from_queue: bool = True, delete_dependents: bool = False):

157

"""

158

Delete job from Redis.

159

160

Args:

161

pipeline: Redis pipeline for batched operations.

162

remove_from_queue (bool): Remove from queue if queued.

163

delete_dependents (bool): Delete dependent jobs.

164

"""

165

166

def cleanup(self, ttl: int = None, pipeline=None, remove_from_queue: bool = True):

167

"""

168

Clean up job data.

169

170

Args:

171

ttl (int): Seconds until the job data expires; 0 deletes the job immediately.

172

pipeline: Redis pipeline.

173

remove_from_queue (bool): Remove from queue.

174

"""

175

```

176

177

### Job Execution and Results

178

179

Execute jobs and manage execution results with comprehensive error handling.

180

181

```python { .api }

182

def perform(self) -> Any:

183

"""

184

Execute the job function.

185

186

Returns:

187

Any: Function return value.

188

189

Raises:

190

Exception: Any exception raised by the job function.

191

"""

192

193

def return_value(self, refresh: bool = False) -> Any:

194

"""

195

Get job return value.

196

197

Args:

198

refresh (bool): Whether to refresh from Redis.

199

200

Returns:

201

Any: Job return value if completed, None otherwise.

202

"""

203

204

def latest_result(self, timeout: int = 0) -> 'Result | None':

205

"""

206

Get the latest job result.

207

208

Args:

209

timeout (int): Maximum wait time for result.

210

211

Returns:

212

Result | None: Latest result or None.

213

"""

214

215

def results(self) -> list['Result']:

216

"""

217

Get all job results.

218

219

Returns:

220

list[Result]: All job results.

221

"""

222

223

def get_call_string(self) -> str | None:

224

"""

225

Get string representation of the function call.

226

227

Returns:

228

str | None: Function call string.

229

"""

230

```

231

232

### Job Control Operations

233

234

Control job execution with cancellation, requeuing, and retry mechanisms.

235

236

```python { .api }

237

def cancel(self, pipeline=None, enqueue_dependents: bool = False, remove_from_dependencies: bool = False):

238

"""

239

Cancel job execution.

240

241

Args:

242

pipeline: Redis pipeline for batched operations.

243

enqueue_dependents (bool): Enqueue dependent jobs.

244

remove_from_dependencies (bool): Remove from dependency lists.

245

"""

246

247

def requeue(self, at_front: bool = False) -> 'Job':

248

"""

249

Requeue job for execution.

250

251

Args:

252

at_front (bool): Add to front of queue.

253

254

Returns:

255

Job: The requeued job.

256

"""

257

258

def retry(self, queue: 'Queue', pipeline=None):

259

"""

260

Retry job execution.

261

262

Args:

263

queue (Queue): Queue to retry in.

264

pipeline: Redis pipeline.

265

"""

266

267

def get_retry_interval(self) -> int:

268

"""

269

Get retry interval for this job.

270

271

Returns:

272

int: Retry interval in seconds.

273

"""

274

```

275

276

### Standalone Job Functions

277

278

Module-level functions for job operations without requiring job instances.

279

280

```python { .api }

281

def get_current_job(connection=None, job_class=None) -> 'Job | None':

282

"""

283

Get the currently executing job within a worker context.

284

285

Args:

286

connection: Redis connection. Uses default if None.

287

job_class: Job class to use for deserialization.

288

289

Returns:

290

Job | None: Current job if in worker context, None otherwise.

291

"""

292

293

def cancel_job(job_id: str, connection, serializer=None, enqueue_dependents: bool = False):

294

"""

295

Cancel a job by its ID.

296

297

Args:

298

job_id (str): Job identifier to cancel.

299

connection: Redis connection.

300

serializer: Custom serializer.

301

enqueue_dependents (bool): Enqueue dependent jobs after cancellation.

302

303

Raises:

304

NoSuchJobError: If job doesn't exist.

305

"""

306

307

def requeue_job(job_id: str, connection, serializer=None) -> 'Job':

308

"""

309

Requeue a job by its ID.

310

311

Args:

312

job_id (str): Job identifier to requeue.

313

connection: Redis connection.

314

serializer: Custom serializer.

315

316

Returns:

317

Job: The requeued job.

318

319

Raises:

320

NoSuchJobError: If job doesn't exist.

321

"""

322

```

323

324

### Job Properties and Metadata

325

326

Access and modify job properties, metadata, and execution information.

327

328

```python { .api }

329

# Core Properties

330

@property

331

def id(self) -> str:

332

"""Job identifier."""

333

334

@property

335

def key(self) -> bytes:

336

"""Redis key for this job."""

337

338

@property

339

def func(self):

340

"""Function to execute."""

341

342

@property

343

def args(self) -> tuple:

344

"""Function positional arguments."""

345

346

@property

347

def kwargs(self) -> dict:

348

"""Function keyword arguments."""

349

350

@property

351

def description(self) -> str | None:

352

"""Job description."""

353

354

@property

355

def origin(self) -> str:

356

"""Queue where job originated."""

357

358

@property

359

def timeout(self) -> float | None:

360

"""Job timeout in seconds."""

361

362

@property

363

def result_ttl(self) -> int | None:

364

"""Result time-to-live in seconds."""

365

366

@property

367

def failure_ttl(self) -> int | None:

368

"""Failure info time-to-live in seconds."""

369

370

@property

371

def ttl(self) -> int | None:

372

"""Job time-to-live in seconds."""

373

374

# Timing Properties

375

@property

376

def enqueued_at(self) -> datetime | None:

377

"""When job was enqueued."""

378

379

@property

380

def started_at(self) -> datetime | None:

381

"""When job execution started."""

382

383

@property

384

def ended_at(self) -> datetime | None:

385

"""When job execution ended."""

386

387

@property

388

def last_heartbeat(self) -> datetime | None:

389

"""Last worker heartbeat timestamp."""

390

391

# Status Properties

392

@property

393

def is_finished(self) -> bool:

394

"""True if job finished successfully."""

395

396

@property

397

def is_queued(self) -> bool:

398

"""True if job is queued for execution."""

399

400

@property

401

def is_failed(self) -> bool:

402

"""True if job failed."""

403

404

@property

405

def is_started(self) -> bool:

406

"""True if job execution started."""

407

408

@property

409

def is_deferred(self) -> bool:

410

"""True if job is deferred (waiting for dependencies)."""

411

412

@property

413

def is_canceled(self) -> bool:

414

"""True if job was canceled."""

415

416

@property

417

def is_scheduled(self) -> bool:

418

"""True if job is scheduled for future execution."""

419

420

@property

421

def is_stopped(self) -> bool:

422

"""True if job was stopped."""

423

424

# Metadata Properties

425

@property

426

def meta(self) -> dict:

427

"""Job metadata dictionary."""

428

429

@property

430

def worker_name(self) -> str | None:

431

"""Name of worker that executed/is executing the job."""

432

433

@property

434

def group_id(self) -> str | None:

435

"""Job group identifier."""

436

437

def get_meta(self, refresh: bool = True) -> dict:

438

"""

439

Get job metadata.

440

441

Args:

442

refresh (bool): Refresh from Redis before returning.

443

444

Returns:

445

dict: Job metadata.

446

"""

447

448

def save_meta(self):

449

"""Save metadata to Redis."""

450

```

451

452

### Job Dependencies

453

454

Manage job dependencies and execution ordering.

455

456

```python { .api }

457

@property

458

def dependency(self) -> 'Job | None':

459

"""First job dependency."""

460

461

@property

462

def dependency_ids(self) -> list[bytes]:

463

"""List of dependency job keys."""

464

465

@property

466

def dependent_ids(self) -> list[str]:

467

"""List of dependent job IDs."""

468

469

def fetch_dependencies(self, watch: bool = False, pipeline=None) -> list['Job']:

470

"""

471

Fetch all job dependencies.

472

473

Args:

474

watch (bool): Watch dependencies for changes.

475

pipeline: Redis pipeline.

476

477

Returns:

478

list[Job]: List of dependency jobs.

479

"""

480

481

def register_dependency(self, pipeline=None):

482

"""

483

Register job dependencies in Redis.

484

485

Args:

486

pipeline: Redis pipeline for batched operations.

487

"""

488

489

def dependencies_are_met(

490

self,

491

parent_job: 'Job' = None,

492

pipeline=None,

493

exclude_job_id: str = None,

494

refresh_job_status: bool = True

495

) -> bool:

496

"""

497

Check if all job dependencies are satisfied.

498

499

Args:

500

parent_job (Job): Parent job context.

501

pipeline: Redis pipeline.

502

exclude_job_id (str): Job ID to exclude from check.

503

refresh_job_status (bool): Refresh dependency status from Redis.

504

505

Returns:

506

bool: True if dependencies are met, False otherwise.

507

"""

508

509

def delete_dependents(self, pipeline=None):

510

"""

511

Delete all dependent jobs.

512

513

Args:

514

pipeline: Redis pipeline for batched operations.

515

"""

516

```

517

518

## Usage Examples

519

520

### Basic Job Creation and Monitoring

521

522

```python

523

import redis

524

from rq.job import Job

525

526

# Connect to Redis

527

conn = redis.Redis()

528

529

# Define a job function

530

def process_data(data_id, options=None):

531

# Simulate processing

532

import time

533

time.sleep(2)

534

return f"Processed data {data_id}"

535

536

# Create a job

537

job = Job.create(

538

func=process_data,

539

args=('data_123',),

540

kwargs={'options': {'fast': True}},

541

connection=conn,

542

timeout=300,

543

description="Process data batch 123"

544

)

545

546

# Save job to Redis

547

job.save()

548

549

print(f"Created job: {job.id}")

550

print(f"Status: {job.get_status()}")

551

print(f"Description: {job.description}")

552

553

# Later, retrieve and check the job

554

retrieved_job = Job.fetch(job.id, connection=conn)

555

print(f"Retrieved job status: {retrieved_job.get_status()}")

556

```

557

558

### Job Metadata and Results

559

560

```python

561

from rq import get_current_job
from rq.job import Job

562

import time

import redis

563

564

conn = redis.Redis()

565

566

def tracked_function(item_count):

567

# Get current job to update metadata

568

job = get_current_job()

569

570

if job:

571

job.meta['progress'] = 0

572

job.save_meta()

573

574

# Simulate processing with progress updates

575

for i in range(item_count):

576

# Do work...

577

578

if job:

579

job.meta['progress'] = (i + 1) / item_count * 100

580

job.meta['current_item'] = i + 1

581

job.save_meta()

582

583

return f"Completed processing {item_count} items"

584

585

# Create job with initial metadata

586

job = Job.create(

587

func=tracked_function,

588

args=(100,),

589

connection=conn,

590

meta={'stage': 'queued', 'priority': 'high'}

591

)

592

593

job.save()

594

595

# Monitor progress (from another process/thread)

596

while not job.is_finished and not job.is_failed:

597

job.refresh()

598

meta = job.get_meta()

599

print(f"Progress: {meta.get('progress', 0)}%")

600

time.sleep(1)

601

602

# Get final result

603

if job.is_finished:

604

print(f"Result: {job.return_value()}")

605

elif job.is_failed:

606

print("Job failed")

607

```

608

609

### Job Cancellation and Requeuing

610

611

```python

612

from rq import cancel_job, requeue_job
from rq.job import Job

613

import redis

614

615

conn = redis.Redis()

616

617

# Create a long-running job

618

def long_task():

619

import time

620

time.sleep(60)

621

return "Done"

622

623

job = Job.create(func=long_task, connection=conn)

624

job.save()

625

626

# Cancel the job

627

cancel_job(job.id, connection=conn)

628

print(f"Job {job.id} canceled")

629

630

# Create another job and then requeue it

631

job2 = Job.create(func=long_task, connection=conn)

632

job2.save()

633

634

# Requeue (useful after failures or for retrying)

635

requeued_job = requeue_job(job2.id, connection=conn)

636

print(f"Job requeued: {requeued_job.id}")

637

```