or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · job-management.md · job-patterns.md · queue-operations.md · registries-monitoring.md · worker-management.md

docs/worker-management.md

0

# Worker Management

1

2

Comprehensive worker process management for job execution, with support for multiple queues, different execution strategies, monitoring, and flexible deployment options. RQ workers manage the job lifecycle, handle error recovery, and provide robust distributed processing capabilities.

3

4

## Capabilities

5

6

### Worker Creation and Configuration

7

8

Create and configure workers with various execution strategies and options.

9

10

```python { .api }

11

class Worker:

12

def __init__(

13

self,

14

queues,

15

name: str = None,

16

default_result_ttl=500,

17

connection=None,

18

exc_handler=None,

19

exception_handlers=None,

20

maintenance_interval: int = 600,

21

default_worker_ttl: int = None,

22

worker_ttl: int = None,

23

job_class=None,

24

queue_class=None,

25

log_job_description: bool = True,

26

job_monitoring_interval=30,

27

disable_default_exception_handler: bool = False,

28

prepare_for_work: bool = True,

29

serializer=None,

30

work_horse_killed_handler=None

31

):

32

"""

33

Initialize a Worker instance.

34

35

Args:

36

queues: Queue instances or names to process.

37

name (str): Worker name. Auto-generated if None.

38

default_result_ttl (int): Default result TTL in seconds.

39

connection: Redis connection instance.

40

exc_handler: Legacy exception handler (deprecated).

41

exception_handlers (list): List of exception handler functions.

42

maintenance_interval (int): Maintenance task interval in seconds.

43

default_worker_ttl (int): Deprecated alias for worker_ttl (worker TTL in seconds); prefer worker_ttl.

44

worker_ttl (int): Worker TTL in seconds.

45

job_class: Custom Job class.

46

queue_class: Custom Queue class.

47

log_job_description (bool): Log job descriptions.

48

job_monitoring_interval (int): Job monitoring interval in seconds.

49

disable_default_exception_handler (bool): Disable default exception handling.

50

prepare_for_work (bool): Prepare worker for work immediately.

51

serializer: Custom serializer.

52

work_horse_killed_handler: Handler for work horse termination.

53

"""

54

55

class SimpleWorker(Worker):

56

"""Worker that executes jobs in the same process/thread."""

57

58

def execute_job(self, job: 'Job', queue: 'Queue'):

59

"""

60

Execute job in the same process without forking.

61

62

Args:

63

job (Job): Job to execute.

64

queue (Queue): Queue the job came from.

65

"""

66

67

class SpawnWorker(Worker):

68

"""Worker that spawns child processes for job execution."""

69

70

def fork_work_horse(self, job: 'Job', queue: 'Queue'):

71

"""

72

Spawn work horse process using os.spawn().

73

74

Args:

75

job (Job): Job to execute.

76

queue (Queue): Queue the job came from.

77

"""

78

```

79

80

### Worker Discovery and Management

81

82

Find and manage worker instances across the system.

83

84

```python { .api }

85

@classmethod

86

def all(

87

cls,

88

connection=None,

89

job_class=None,

90

queue_class=None,

91

queue: 'Queue' = None,

92

serializer=None

93

) -> list['Worker']:

94

"""

95

Get all workers.

96

97

Args:

98

connection: Redis connection.

99

job_class: Job class for deserialization.

100

queue_class: Queue class for deserialization.

101

queue (Queue): Filter by specific queue.

102

serializer: Custom serializer.

103

104

Returns:

105

list[Worker]: All worker instances.

106

"""

107

108

@classmethod

109

def all_keys(cls, connection=None, queue: 'Queue' = None) -> list[str]:

110

"""

111

Get all worker keys.

112

113

Args:

114

connection: Redis connection.

115

queue (Queue): Filter by specific queue.

116

117

Returns:

118

list[str]: Worker Redis keys.

119

"""

120

121

@classmethod

122

def count(cls, connection=None, queue: 'Queue' = None) -> int:

123

"""

124

Count active workers.

125

126

Args:

127

connection: Redis connection.

128

queue (Queue): Filter by specific queue.

129

130

Returns:

131

int: Number of active workers.

132

"""

133

134

@classmethod

135

def find_by_key(

136

cls,

137

worker_key: str,

138

connection,

139

job_class=None,

140

queue_class=None,

141

serializer=None

142

) -> 'Worker | None':

143

"""

144

Find worker by its Redis key.

145

146

Args:

147

worker_key (str): Worker Redis key.

148

connection: Redis connection.

149

job_class: Job class.

150

queue_class: Queue class.

151

serializer: Custom serializer.

152

153

Returns:

154

Worker | None: Worker instance or None if not found.

155

"""

156

```

157

158

### Main Work Loop

159

160

Core worker execution loop with comprehensive job processing capabilities.

161

162

```python { .api }

163

def work(

164

self,

165

burst: bool = False,

166

logging_level: str = None,

167

date_format: str = '%H:%M:%S',

168

log_format: str = '%(asctime)s %(message)s',

169

max_jobs: int = None,

170

max_idle_time: int = None,

171

with_scheduler: bool = False,

172

dequeue_strategy: 'DequeueStrategy' = 'default'

173

) -> bool:

174

"""

175

Main worker loop for processing jobs.

176

177

Args:

178

burst (bool): Exit after processing available jobs.

179

logging_level (str): Logging level ('DEBUG', 'INFO', etc.).

180

date_format (str): Log date format.

181

log_format (str): Log message format.

182

max_jobs (int): Maximum jobs to process before exiting.

183

max_idle_time (int): Maximum idle time in seconds before exiting.

184

with_scheduler (bool): Run scheduler in the same process.

185

dequeue_strategy (DequeueStrategy): Queue dequeuing strategy.

186

187

Returns:

188

bool: True if at least one job was processed, False otherwise.

189

"""

190

191

def execute_job(self, job: 'Job', queue: 'Queue'):

192

"""

193

Execute a single job (abstract method implemented by subclasses).

194

195

Args:

196

job (Job): Job to execute.

197

queue (Queue): Queue the job came from.

198

"""

199

```

200

201

### Worker State and Monitoring

202

203

Monitor worker status, statistics, and health information.

204

205

```python { .api }

206

@property

207

def name(self) -> str:

208

"""Worker name/identifier."""

209

210

@property

211

def key(self) -> str:

212

"""Redis key for this worker."""

213

214

@property

215

def connection(self):

216

"""Redis connection instance."""

217

218

@property

219

def queues(self) -> list['Queue']:

220

"""List of queues this worker processes."""

221

222

@property

223

def version(self) -> str:

224

"""RQ version."""

225

226

@property

227

def python_version(self) -> str:

228

"""Python version string."""

229

230

@property

231

def hostname(self) -> str | None:

232

"""Worker hostname."""

233

234

@property

235

def ip_address(self) -> str:

236

"""Worker IP address."""

237

238

@property

239

def pid(self) -> int | None:

240

"""Worker process ID."""

241

242

@property

243

def birth_date(self) -> datetime | None:

244

"""When worker was created."""

245

246

@property

247

def last_heartbeat(self) -> datetime | None:

248

"""Last heartbeat timestamp."""

249

250

@property

251

def successful_job_count(self) -> int:

252

"""Number of successfully completed jobs."""

253

254

@property

255

def failed_job_count(self) -> int:

256

"""Number of failed jobs."""

257

258

@property

259

def total_working_time(self) -> float:

260

"""Total time spent working (seconds)."""

261

262

@property

263

def current_job_working_time(self) -> float:

264

"""Time spent on current job (seconds)."""

265

266

def refresh(self):

267

"""Refresh worker data from Redis."""

268

269

def queue_names(self) -> list[str]:

270

"""

271

Get queue names this worker processes.

272

273

Returns:

274

list[str]: Queue names.

275

"""

276

277

def queue_keys(self) -> list[str]:

278

"""

279

Get queue Redis keys this worker processes.

280

281

Returns:

282

list[str]: Queue keys.

283

"""

284

```

285

286

### Worker Lifecycle Control

287

288

Control worker lifecycle with graceful shutdown and signal handling.

289

290

```python { .api }

291

def request_stop(self, signum=None, frame=None):

292

"""

293

Request graceful worker shutdown.

294

295

Args:

296

signum: Signal number (for signal handlers).

297

frame: Frame object (for signal handlers).

298

"""

299

300

def request_force_stop(self, signum: int, frame=None):

301

"""

302

Request immediate worker shutdown (abstract method).

303

304

Args:

305

signum (int): Signal number.

306

frame: Frame object.

307

"""

308

309

def kill_horse(self, sig=15):

310

"""

311

Kill the work horse process (for Worker class).

312

313

Args:

314

sig (int): Signal to send to work horse.

315

"""

316

317

def wait_for_horse(self) -> tuple[int | None, int | None, Any]:

318

"""

319

Wait for work horse process to complete (for Worker class).

320

321

Returns:

322

tuple: (pid, status, resource_usage) as returned by os.wait4(); each element may be None.

323

"""

324

```

325

326

### Worker Maintenance and Health

327

328

Maintain worker health with registry cleanup and monitoring.

329

330

```python { .api }

331

def clean_registries(self):

332

"""Clean job registries of expired entries."""

333

334

def validate_queues(self):

335

"""Validate that all queues are valid Queue instances."""

336

337

def get_redis_server_version(self) -> tuple[int, int, int]:

338

"""

339

Get Redis server version.

340

341

Returns:

342

tuple[int, int, int]: (major, minor, patch) version.

343

"""

344

345

@property

346

def should_run_maintenance_tasks(self) -> bool:

347

"""True if it's time to run maintenance tasks."""

348

349

@property

350

def dequeue_timeout(self) -> int:

351

"""Timeout for dequeue operations."""

352

353

@property

354

def connection_timeout(self) -> int:

355

"""Redis connection timeout."""

356

```

357

358

### Work Horse Management

359

360

Advanced work horse process management for fork-based execution.

361

362

```python { .api }

363

# Worker class specific methods

364

365

@property

366

def is_horse(self) -> bool:

367

"""True if this is the work horse process."""

368

369

@property

370

def horse_pid(self) -> int:

371

"""Work horse process ID."""

372

373

def fork_work_horse(self, job: 'Job', queue: 'Queue'):

374

"""

375

Fork a work horse process to execute the job.

376

377

Args:

378

job (Job): Job to execute.

379

queue (Queue): Queue the job came from.

380

"""

381

382

def monitor_work_horse(self, job: 'Job', queue: 'Queue'):

383

"""

384

Monitor work horse process execution.

385

386

Args:

387

job (Job): Job being executed.

388

queue (Queue): Queue the job came from.

389

"""

390

391

def get_heartbeat_ttl(self, job: 'Job') -> int:

392

"""

393

Get heartbeat TTL for job monitoring.

394

395

Args:

396

job (Job): Job being executed.

397

398

Returns:

399

int: Heartbeat TTL in seconds.

400

"""

401

```

402

403

### Dequeue Strategies

404

405

Configure how workers dequeue jobs from multiple queues.

406

407

```python { .api }

408

from enum import Enum

409

410

class DequeueStrategy(str, Enum):

411

DEFAULT = 'default' # Process queues in order

412

ROUND_ROBIN = 'round_robin' # Rotate between queues

413

RANDOM = 'random' # Random queue selection

414

415

class WorkerStatus(str, Enum):

416

STARTED = 'started'

417

SUSPENDED = 'suspended'

418

BUSY = 'busy'

419

IDLE = 'idle'

420

```

421

422

## Usage Examples

423

424

### Basic Worker Usage

425

426

```python

427

import redis

428

from rq import Queue, Worker

429

430

# Connect to Redis

431

conn = redis.Redis()

432

433

# Create queues

434

high_priority = Queue('high', connection=conn)

435

normal_priority = Queue('normal', connection=conn)

436

437

# Create worker for multiple queues

438

worker = Worker([high_priority, normal_priority], connection=conn)

439

440

# Add some jobs

441

def process_data(data):

442

import time

443

time.sleep(2)

444

return f"Processed: {data}"

445

446

high_priority.enqueue(process_data, "urgent_data")

447

normal_priority.enqueue(process_data, "regular_data")

448

449

print(f"Worker: {worker.name}")

450

print(f"Processing queues: {worker.queue_names()}")

451

452

# Start processing (this blocks)

453

worker.work()

454

```

455

456

### Worker with Configuration

457

458

```python

459

from rq import Worker, Queue

460

import redis

461

import logging

462

463

conn = redis.Redis()

464

q = Queue('configured_worker', connection=conn)

465

466

# Custom exception handler

467

def handle_failed_job(job, exc_type, exc_value, traceback):

468

print(f"Job {job.id} failed: {exc_value}")

469

# Could send alerts, log to external service, etc.

470

471

# Create configured worker

472

worker = Worker(

473

[q],

474

connection=conn,

475

name='custom_worker_001',

476

exception_handlers=[handle_failed_job],

477

default_result_ttl=3600, # Keep results for 1 hour

478

job_monitoring_interval=15, # Check job progress every 15s

479

maintenance_interval=300, # Run maintenance every 5 minutes

480

log_job_description=True

481

)

482

483

# Set up logging

484

logging.basicConfig(level=logging.INFO)

485

486

# Work with specific options

487

worker.work(

488

burst=False, # Keep running

489

max_jobs=100, # Stop after 100 jobs

490

logging_level='INFO',

491

dequeue_strategy='round_robin'

492

)

493

```

494

495

### Different Worker Types

496

497

```python

498

from rq import Worker, SimpleWorker, SpawnWorker, Queue

499

import redis

500

501

conn = redis.Redis()

502

q = Queue('worker_types', connection=conn)

503

504

def cpu_intensive_task(n):

505

# Simulate CPU intensive work

506

total = sum(i * i for i in range(n))

507

return total

508

509

# Add jobs

510

for i in range(5):

511

q.enqueue(cpu_intensive_task, 10000 * (i + 1))

512

513

# Standard worker (forks for each job)

514

standard_worker = Worker([q], connection=conn, name='standard')

515

516

# Simple worker (no forking, same process)

517

simple_worker = SimpleWorker([q], connection=conn, name='simple')

518

519

# Spawn worker (uses os.spawn instead of fork)

520

spawn_worker = SpawnWorker([q], connection=conn, name='spawn')

521

522

print("Worker types created:")

523

print(f"Standard: {standard_worker.name}")

524

print(f"Simple: {simple_worker.name}")

525

print(f"Spawn: {spawn_worker.name}")

526

527

# Use simple worker for this example (no forking)

528

simple_worker.work(burst=True)

529

```

530

531

### Worker Monitoring and Management

532

533

```python

534

from rq import Worker, Queue

535

import redis

536

import time

537

538

conn = redis.Redis()

539

q = Queue('monitoring', connection=conn)

540

541

# Add a long-running job

542

def long_running_job():

543

import time

544

for i in range(10):

545

time.sleep(1)

546

print(f"Working... step {i+1}/10")

547

return "Completed long job"

548

549

job = q.enqueue(long_running_job)

550

551

# Create worker

552

worker = Worker([q], connection=conn, name='monitored_worker')

553

554

# Monitoring helper: run this in a separate process/thread while the worker is working (it is not started in this example)

555

def monitor_worker():

556

while True:

557

worker.refresh() # Get latest data from Redis

558

print(f"Worker: {worker.name}")

559

print(f"Status: {'busy' if worker.current_job else 'idle'}")

560

print(f"Successful jobs: {worker.successful_job_count}")

561

print(f"Failed jobs: {worker.failed_job_count}")

562

print(f"Total working time: {worker.total_working_time:.2f}s")

563

564

if worker.current_job:

565

print(f"Current job: {worker.current_job.id}")

566

print(f"Job working time: {worker.current_job_working_time:.2f}s")

567

568

print("---")

569

time.sleep(2)

570

571

# Get all workers

572

all_workers = Worker.all(connection=conn)

573

print(f"Total workers: {len(all_workers)}")

574

575

for w in all_workers:

576

print(f"Worker {w.name}: {w.successful_job_count} successful jobs")

577

578

# Find specific worker

579

found_worker = Worker.find_by_key(worker.key, connection=conn)

580

if found_worker:

581

print(f"Found worker: {found_worker.name}")

582

```

583

584

### Worker Lifecycle Management

585

586

```python

587

from rq import Worker, Queue

588

import redis

589

import signal

590

import os

591

import time

592

593

conn = redis.Redis()

594

q = Queue('lifecycle', connection=conn)

595

596

def interruptible_job():

597

import time

598

for i in range(20):

599

time.sleep(1)

600

print(f"Job progress: {i+1}/20")

601

return "Job completed"

602

603

# Add job

604

job = q.enqueue(interruptible_job)

605

606

# Create worker

607

worker = Worker([q], connection=conn)

608

609

# Set up signal handlers for graceful shutdown

610

def signal_handler(signum, frame):

611

print(f"Received signal {signum}, requesting worker stop...")

612

worker.request_stop(signum, frame)

613

614

signal.signal(signal.SIGTERM, signal_handler)

615

signal.signal(signal.SIGINT, signal_handler)

616

617

try:

618

print(f"Starting worker {worker.name} (PID: {os.getpid()})")

619

print("Press Ctrl+C for graceful shutdown")

620

621

# Start worker with limits

622

worker.work(

623

burst=False,

624

max_idle_time=30, # Exit if idle for 30 seconds

625

logging_level='INFO'

626

)

627

628

except KeyboardInterrupt:

629

print("Worker stopped by user")

630

except Exception as e:

631

print(f"Worker error: {e}")

632

finally:

633

print("Worker shutdown complete")

634

```

635

636

### Batch Processing with Multiple Workers

637

638

```python

639

from rq import Worker, Queue

640

import redis

641

from multiprocessing import Process

642

import time

643

644

conn = redis.Redis()

645

646

# Create multiple queues for different priorities

647

high_q = Queue('high_priority', connection=conn)

648

normal_q = Queue('normal_priority', connection=conn)

649

low_q = Queue('low_priority', connection=conn)

650

651

def process_item(item_id, priority):

652

processing_time = {'high': 1, 'normal': 2, 'low': 3}

653

time.sleep(processing_time[priority])

654

return f"Processed item {item_id} with {priority} priority"

655

656

# Add jobs to different queues

657

for i in range(3):

658

high_q.enqueue(process_item, f"H{i}", 'high')

659

normal_q.enqueue(process_item, f"N{i}", 'normal')

660

low_q.enqueue(process_item, f"L{i}", 'low')

661

662

def start_worker(worker_name, queues):

663

"""Function to start a worker in a separate process."""

664

worker = Worker(queues, connection=conn, name=worker_name)

665

print(f"Starting worker {worker_name}")

666

worker.work(burst=True) # Process all available jobs then exit

667

print(f"Worker {worker_name} finished")

668

669

# Start multiple workers

670

processes = []

671

672

# High priority worker (only processes high priority queue)

673

p1 = Process(target=start_worker, args=('worker_high', [high_q]))

674

675

# General workers (process all queues in priority order)

676

p2 = Process(target=start_worker, args=('worker_general_1', [high_q, normal_q, low_q]))

677

p3 = Process(target=start_worker, args=('worker_general_2', [high_q, normal_q, low_q]))

678

679

processes = [p1, p2, p3]

680

681

# Start all workers

682

for p in processes:

683

p.start()

684

685

# Wait for completion

686

for p in processes:

687

p.join()

688

689

print("All workers completed")

690

691

# Check final queue states

692

print(f"High priority queue: {high_q.count} jobs remaining")

693

print(f"Normal priority queue: {normal_q.count} jobs remaining")

694

print(f"Low priority queue: {low_q.count} jobs remaining")

695

```