# Queue Operations

Comprehensive queue management for job scheduling, enqueueing, and batch operations. RQ queues provide flexible job submission with support for immediate execution, delayed scheduling, priority queuing, bulk operations, and queue monitoring.

## Capabilities

### Queue Creation and Configuration

Create and configure queues with various options for job processing behavior.

```python { .api }
class Queue:
    def __init__(
        self,
        name: str = 'default',
        connection=None,
        default_timeout: int = None,
        is_async: bool = True,
        job_class=None,
        serializer=None,
        death_penalty_class=None,
        **kwargs
    ):
        """
        Initialize a Queue instance.

        Args:
            name (str): Queue name. Defaults to 'default'.
            connection: Redis connection instance.
            default_timeout (int): Default job timeout in seconds.
            is_async (bool): Whether to process jobs asynchronously.
            job_class: Custom Job class to use.
            serializer: Custom serializer for job data.
            death_penalty_class: Custom death penalty class for timeouts.
            **kwargs: Additional queue configuration options.
        """

    @classmethod
    def all(cls, connection, job_class=None, serializer=None, death_penalty_class=None) -> list['Queue']:
        """
        Get all existing queues.

        Args:
            connection: Redis connection.
            job_class: Job class for deserialization.
            serializer: Custom serializer.
            death_penalty_class: Death penalty class.

        Returns:
            list[Queue]: All queues in Redis.
        """

    @classmethod
    def from_queue_key(
        cls,
        queue_key: str,
        connection,
        job_class=None,
        serializer=None,
        death_penalty_class=None
    ) -> 'Queue':
        """
        Create Queue instance from Redis queue key.

        Args:
            queue_key (str): Redis queue key.
            connection: Redis connection.
            job_class: Job class for deserialization.
            serializer: Custom serializer.
            death_penalty_class: Death penalty class.

        Returns:
            Queue: Queue instance.
        """
```

### Basic Job Enqueueing

Core methods for adding jobs to queues with various execution options.

```python { .api }
def enqueue(self, f, *args, **kwargs) -> 'Job':
    """
    Enqueue a function call for execution.

    Args:
        f: Function to execute.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function and job options.

    Job options in kwargs:
        timeout (int): Job timeout in seconds.
        result_ttl (int): Result time-to-live in seconds.
        ttl (int): Job time-to-live in seconds.
        failure_ttl (int): Failure info time-to-live in seconds.
        description (str): Job description.
        depends_on: Job dependencies.
        job_id (str): Custom job ID.
        at_front (bool): Add to front of queue.
        meta (dict): Job metadata.
        retry (Retry): Retry configuration.
        repeat (Repeat): Repeat configuration.
        on_success (Callback): Success callback.
        on_failure (Callback): Failure callback.
        on_stopped (Callback): Stopped callback.

    Returns:
        Job: The enqueued job.
    """

def enqueue_call(
    self,
    func,
    args=None,
    kwargs=None,
    timeout=None,
    result_ttl=None,
    ttl=None,
    failure_ttl=None,
    description=None,
    depends_on=None,
    job_id=None,
    at_front=False,
    meta=None,
    retry=None,
    repeat=None,
    on_success=None,
    on_failure=None,
    on_stopped=None,
    pipeline=None
) -> 'Job':
    """
    Enqueue a function call with explicit parameters.

    Args:
        func: Function to execute.
        args (tuple): Function positional arguments.
        kwargs (dict): Function keyword arguments.
        timeout (int): Job timeout in seconds.
        result_ttl (int): Result time-to-live in seconds.
        ttl (int): Job time-to-live in seconds.
        failure_ttl (int): Failure info time-to-live in seconds.
        description (str): Job description.
        depends_on: Job dependencies.
        job_id (str): Custom job ID.
        at_front (bool): Add to front of queue.
        meta (dict): Job metadata.
        retry (Retry): Retry configuration.
        repeat (Repeat): Repeat configuration.
        on_success (Callback): Success callback.
        on_failure (Callback): Failure callback.
        on_stopped (Callback): Stopped callback.
        pipeline: Redis pipeline for batched operations.

    Returns:
        Job: The enqueued job.
    """

def enqueue_job(self, job: 'Job', pipeline=None, at_front: bool = False) -> 'Job':
    """
    Enqueue an existing job.

    Args:
        job (Job): Job to enqueue.
        pipeline: Redis pipeline for batched operations.
        at_front (bool): Add to front of queue.

    Returns:
        Job: The enqueued job.
    """
```

### Scheduled Job Enqueueing

Schedule jobs for future execution with precise timing control.

```python { .api }
def enqueue_at(self, datetime, f, *args, **kwargs) -> 'Job':
    """
    Schedule a job for execution at a specific datetime.

    Args:
        datetime (datetime): When to execute the job.
        f: Function to execute.
        *args: Function positional arguments.
        **kwargs: Function keyword arguments and job options.

    Returns:
        Job: The scheduled job.
    """

def enqueue_in(self, time_delta, func, *args, **kwargs) -> 'Job':
    """
    Schedule a job for execution after a time delay.

    Args:
        time_delta (timedelta): Delay before execution.
        func: Function to execute.
        *args: Function positional arguments.
        **kwargs: Function keyword arguments and job options.

    Returns:
        Job: The scheduled job.
    """

def schedule_job(self, job: 'Job', datetime, pipeline=None):
    """
    Schedule an existing job for future execution.

    Args:
        job (Job): Job to schedule.
        datetime (datetime): When to execute the job.
        pipeline: Redis pipeline for batched operations.
    """
```

### Batch Operations

Efficiently handle multiple jobs with batch enqueueing and processing.

```python { .api }
def enqueue_many(self, job_datas, pipeline=None, group_id: str = None) -> list['Job']:
    """
    Enqueue multiple jobs in a single operation.

    Args:
        job_datas: Iterable of EnqueueData instances or job specifications.
        pipeline: Redis pipeline for batched operations.
        group_id (str): Group identifier for related jobs.

    Returns:
        list[Job]: List of enqueued jobs.
    """

@classmethod
def prepare_data(
    cls,
    func,
    args=None,
    kwargs=None,
    timeout=None,
    result_ttl=None,
    ttl=None,
    failure_ttl=None,
    description=None,
    depends_on=None,
    job_id=None,
    at_front=False,
    meta=None,
    retry=None,
    on_success=None,
    on_failure=None,
    on_stopped=None,
    repeat=None
):
    """
    Prepare job data for batch enqueueing.

    Args:
        func: Function to execute.
        args (tuple): Function arguments.
        kwargs (dict): Function keyword arguments.
        timeout (int): Job timeout.
        result_ttl (int): Result TTL.
        ttl (int): Job TTL.
        failure_ttl (int): Failure TTL.
        description (str): Job description.
        depends_on: Job dependencies.
        job_id (str): Custom job ID.
        at_front (bool): Priority enqueueing.
        meta (dict): Job metadata.
        retry (Retry): Retry configuration.
        on_success (Callback): Success callback.
        on_failure (Callback): Failure callback.
        on_stopped (Callback): Stopped callback.
        repeat (Repeat): Repeat configuration.

    Returns:
        EnqueueData: Prepared job data for batch operations.
    """
```

### Queue Monitoring and Management

Monitor queue state and manage queue lifecycle.

```python { .api }
@property
def count(self) -> int:
    """Number of jobs in the queue."""

@property
def is_empty(self) -> bool:
    """True if queue has no jobs."""

@property
def job_ids(self) -> list[str]:
    """List of all job IDs in the queue."""

@property
def jobs(self) -> list['Job']:
    """List of all valid jobs in the queue."""

def get_job_ids(self, offset: int = 0, length: int = -1) -> list[str]:
    """
    Get a slice of job IDs from the queue.

    Args:
        offset (int): Starting position.
        length (int): Number of IDs to return (-1 for all).

    Returns:
        list[str]: Job IDs.
    """

def get_jobs(self, offset: int = 0, length: int = -1) -> list['Job']:
    """
    Get a slice of jobs from the queue.

    Args:
        offset (int): Starting position.
        length (int): Number of jobs to return (-1 for all).

    Returns:
        list[Job]: Jobs in the queue.
    """

def fetch_job(self, job_id: str) -> 'Job | None':
    """
    Fetch a specific job from the queue.

    Args:
        job_id (str): Job identifier.

    Returns:
        Job | None: Job if found, None otherwise.
    """

def get_job_position(self, job_or_id) -> int | None:
    """
    Get position of a job in the queue.

    Args:
        job_or_id: Job instance or job ID.

    Returns:
        int | None: Position in queue (0-based) or None if not found.
    """
```

### Queue Maintenance

Maintain queue health with cleanup and management operations.

```python { .api }
def empty(self):
    """Remove all jobs from the queue."""

def delete(self, delete_jobs: bool = True):
    """
    Delete the queue.

    Args:
        delete_jobs (bool): Whether to delete associated jobs.
    """

def compact(self):
    """Remove invalid job references while preserving FIFO order."""

def remove(self, job_or_id, pipeline=None):
    """
    Remove a specific job from the queue.

    Args:
        job_or_id: Job instance or job ID to remove.
        pipeline: Redis pipeline for batched operations.
    """

def push_job_id(self, job_id: str, pipeline=None, at_front: bool = False):
    """
    Push a job ID onto the queue.

    Args:
        job_id (str): Job identifier.
        pipeline: Redis pipeline.
        at_front (bool): Add to front of queue.
    """

def pop_job_id(self) -> str | None:
    """
    Pop a job ID from the front of the queue.

    Returns:
        str | None: Job ID or None if queue is empty.
    """
```

### Multi-Queue Operations

Handle operations across multiple queues for load balancing and priority processing.

```python { .api }
@classmethod
def dequeue_any(
    cls,
    queues,
    timeout: int = None,
    connection=None,
    job_class=None,
    serializer=None,
    death_penalty_class=None
) -> tuple['Job', 'Queue'] | None:
    """
    Dequeue a job from any of the given queues.

    Args:
        queues: Iterable of Queue instances.
        timeout (int): Timeout in seconds for blocking dequeue.
        connection: Redis connection.
        job_class: Job class for deserialization.
        serializer: Custom serializer.
        death_penalty_class: Death penalty class.

    Returns:
        tuple[Job, Queue] | None: (Job, Queue) tuple or None if timeout.
    """

@classmethod
def lpop(cls, queue_keys, timeout: int = None, connection=None):
    """
    Pop from multiple queue keys using Redis BLPOP.

    Args:
        queue_keys: List of queue key strings.
        timeout (int): Timeout in seconds.
        connection: Redis connection.

    Returns:
        tuple: (queue_key, job_id) or None if timeout.
    """

@classmethod
def lmove(cls, connection, queue_key: str, timeout: int = None):
    """
    Move job using Redis BLMOVE operation.

    Args:
        connection: Redis connection.
        queue_key (str): Source queue key.
        timeout (int): Timeout in seconds.

    Returns:
        Job data or None if timeout.
    """
```

### Queue Properties and Configuration

Access queue configuration and runtime properties.

```python { .api }
@property
def name(self) -> str:
    """Queue name."""

@property
def key(self) -> str:
    """Redis key for the queue."""

@property
def connection(self):
    """Redis connection instance."""

@property
def serializer(self):
    """Serializer used for job data."""

@property
def is_async(self) -> bool:
    """Whether queue processes jobs asynchronously."""

@property
def intermediate_queue_key(self) -> str:
    """Redis key for intermediate queue."""

@property
def intermediate_queue(self):
    """IntermediateQueue instance for this queue."""

def get_redis_server_version(self) -> tuple[int, int, int]:
    """
    Get Redis server version.

    Returns:
        tuple[int, int, int]: (major, minor, patch) version numbers.
    """
```

### Registry Access

Access job registries for monitoring different job states.

```python { .api }
@property
def failed_job_registry(self):
    """Registry of failed jobs."""

@property
def started_job_registry(self):
    """Registry of jobs currently being executed."""

@property
def finished_job_registry(self):
    """Registry of successfully completed jobs."""

@property
def deferred_job_registry(self):
    """Registry of jobs waiting for dependencies."""

@property
def scheduled_job_registry(self):
    """Registry of scheduled jobs."""

@property
def canceled_job_registry(self):
    """Registry of canceled jobs."""
```

## Usage Examples

### Basic Queue Operations

```python
import redis
from rq import Queue

# Connect to Redis
conn = redis.Redis()

# Create a queue
q = Queue('data_processing', connection=conn)

# Simple function
def process_item(item_id, priority='normal'):
    return f"Processed item {item_id} with {priority} priority"

# Enqueue jobs
job1 = q.enqueue(process_item, 'item_001')
job2 = q.enqueue(process_item, 'item_002', priority='high')

# Enqueue with options
job3 = q.enqueue(
    process_item,
    'item_003',
    timeout=300,
    result_ttl=3600,
    description="Process critical item",
    meta={'department': 'sales', 'urgent': True}
)

print(f"Enqueued {len([job1, job2, job3])} jobs")
print(f"Queue count: {q.count}")
```

### Scheduled Job Enqueueing

```python
from datetime import datetime, timedelta
from rq import Queue
import redis

conn = redis.Redis()
q = Queue('scheduled_tasks', connection=conn)

def send_reminder(user_id, message):
    return f"Sent reminder to user {user_id}: {message}"

# Schedule for specific time
future_time = datetime.now() + timedelta(hours=1)
scheduled_job = q.enqueue_at(
    future_time,
    send_reminder,
    user_id=123,
    message="Don't forget your appointment!"
)

# Schedule with delay
delayed_job = q.enqueue_in(
    timedelta(minutes=30),
    send_reminder,
    user_id=456,
    message="Meeting starts in 30 minutes"
)

print(f"Scheduled job: {scheduled_job.id}")
print(f"Delayed job: {delayed_job.id}")
```

### Batch Job Enqueueing

```python
from rq import Queue
import redis

conn = redis.Redis()
q = Queue('batch_processing', connection=conn)

def process_data_chunk(data_chunk, config=None):
    return f"Processed {len(data_chunk)} items"

# Prepare multiple jobs
job_data_list = []
data_chunks = [list(range(i, i+10)) for i in range(0, 100, 10)]

for i, chunk in enumerate(data_chunks):
    job_data = Queue.prepare_data(
        func=process_data_chunk,
        args=(chunk,),
        kwargs={'config': {'batch_id': i}},
        description=f"Process chunk {i}",
        meta={'chunk_size': len(chunk)}
    )
    job_data_list.append(job_data)

# Enqueue all jobs at once
jobs = q.enqueue_many(job_data_list, group_id='batch_001')

print(f"Enqueued {len(jobs)} jobs in batch")
print(f"Queue count: {q.count}")
```

### Queue Monitoring and Management

```python
from rq import Queue
import redis

conn = redis.Redis()
q = Queue('monitoring_example', connection=conn)

# Add some jobs
for i in range(5):
    q.enqueue(lambda x: x * 2, i)

# Monitor queue
print(f"Queue: {q.name}")
print(f"Total jobs: {q.count}")
print(f"Is empty: {q.is_empty}")

# Get job information
job_ids = q.get_job_ids()
print(f"Job IDs: {job_ids}")

# Get first 3 jobs
first_jobs = q.get_jobs(offset=0, length=3)
for job in first_jobs:
    print(f"Job {job.id}: {job.description}")

# Find specific job
if job_ids:
    specific_job = q.fetch_job(job_ids[0])
    position = q.get_job_position(specific_job)
    print(f"Job {specific_job.id} is at position {position}")

# Queue maintenance
print("Before compact:", q.count)
q.compact()  # Remove any invalid job references
print("After compact:", q.count)

# Registry access
print(f"Failed jobs: {q.failed_job_registry.count}")
print(f"Finished jobs: {q.finished_job_registry.count}")
```

### Multi-Queue Processing

```python
from rq import Queue
import redis

conn = redis.Redis()

# Create multiple queues
high_priority = Queue('high_priority', connection=conn)
normal_priority = Queue('normal', connection=conn)
low_priority = Queue('low_priority', connection=conn)

def important_task():
    return "Completed important task"

def regular_task():
    return "Completed regular task"

# Add jobs to different queues
high_priority.enqueue(important_task)
normal_priority.enqueue(regular_task)
low_priority.enqueue(regular_task)

# Dequeue from multiple queues (priority order)
queues = [high_priority, normal_priority, low_priority]
result = Queue.dequeue_any(queues, timeout=1, connection=conn)

if result:
    job, queue = result
    print(f"Dequeued job {job.id} from queue {queue.name}")
else:
    print("No jobs available")

# Get all queues
all_queues = Queue.all(connection=conn)
print(f"Total queues: {len(all_queues)}")
for queue in all_queues:
    print(f"Queue {queue.name}: {queue.count} jobs")
```