# Worker Management

Comprehensive worker process management for job execution with support for multiple queues, different execution strategies, monitoring, and flexible deployment options. RQ workers handle job lifecycle, error recovery, and provide robust distributed processing capabilities.

## Capabilities

### Worker Creation and Configuration

Create and configure workers with various execution strategies and options.

```python { .api }
class Worker:
    def __init__(
        self,
        queues,
        name: str = None,
        default_result_ttl=500,
        connection=None,
        exc_handler=None,
        exception_handlers=None,
        maintenance_interval: int = 600,
        default_worker_ttl: int = None,
        worker_ttl: int = None,
        job_class=None,
        queue_class=None,
        log_job_description: bool = True,
        job_monitoring_interval=30,
        disable_default_exception_handler: bool = False,
        prepare_for_work: bool = True,
        serializer=None,
        work_horse_killed_handler=None
    ):
        """
        Initialize a Worker instance.

        Args:
            queues: Queue instances or names to process.
            name (str): Worker name. Auto-generated if None.
            default_result_ttl (int): Default result TTL in seconds.
            connection: Redis connection instance.
            exc_handler: Legacy exception handler (deprecated).
            exception_handlers (list): List of exception handler functions.
            maintenance_interval (int): Maintenance task interval in seconds.
            default_worker_ttl (int): Default worker TTL.
            worker_ttl (int): Worker TTL in seconds.
            job_class: Custom Job class.
            queue_class: Custom Queue class.
            log_job_description (bool): Log job descriptions.
            job_monitoring_interval (int): Job monitoring interval in seconds.
            disable_default_exception_handler (bool): Disable default exception handling.
            prepare_for_work (bool): Prepare worker for work immediately.
            serializer: Custom serializer.
            work_horse_killed_handler: Handler for work horse termination.
        """

class SimpleWorker(Worker):
    """Worker that executes jobs in the same process/thread."""

    def execute_job(self, job: 'Job', queue: 'Queue'):
        """
        Execute job in the same process without forking.

        Args:
            job (Job): Job to execute.
            queue (Queue): Queue the job came from.
        """

class SpawnWorker(Worker):
    """Worker that spawns child processes for job execution."""

    def fork_work_horse(self, job: 'Job', queue: 'Queue'):
        """
        Spawn work horse process using os.posix_spawn().

        Args:
            job (Job): Job to execute.
            queue (Queue): Queue the job came from.
        """
```

### Worker Discovery and Management

Find and manage worker instances across the system.

```python { .api }
@classmethod
def all(
    cls,
    connection=None,
    job_class=None,
    queue_class=None,
    queue: 'Queue' = None,
    serializer=None
) -> list['Worker']:
    """
    Get all workers.

    Args:
        connection: Redis connection.
        job_class: Job class for deserialization.
        queue_class: Queue class for deserialization.
        queue (Queue): Filter by specific queue.
        serializer: Custom serializer.

    Returns:
        list[Worker]: All worker instances.
    """

@classmethod
def all_keys(cls, connection=None, queue: 'Queue' = None) -> list[str]:
    """
    Get all worker keys.

    Args:
        connection: Redis connection.
        queue (Queue): Filter by specific queue.

    Returns:
        list[str]: Worker Redis keys.
    """

@classmethod
def count(cls, connection=None, queue: 'Queue' = None) -> int:
    """
    Count active workers.

    Args:
        connection: Redis connection.
        queue (Queue): Filter by specific queue.

    Returns:
        int: Number of active workers.
    """

@classmethod
def find_by_key(
    cls,
    worker_key: str,
    connection,
    job_class=None,
    queue_class=None,
    serializer=None
) -> 'Worker | None':
    """
    Find worker by its Redis key.

    Args:
        worker_key (str): Worker Redis key.
        connection: Redis connection.
        job_class: Job class.
        queue_class: Queue class.
        serializer: Custom serializer.

    Returns:
        Worker | None: Worker instance or None if not found.
    """
```

### Main Work Loop

Core worker execution loop with comprehensive job processing capabilities.

```python { .api }
def work(
    self,
    burst: bool = False,
    logging_level: str = None,
    date_format: str = '%H:%M:%S',
    log_format: str = '%(asctime)s %(message)s',
    max_jobs: int = None,
    max_idle_time: int = None,
    with_scheduler: bool = False,
    dequeue_strategy: 'DequeueStrategy' = 'default'
) -> bool:
    """
    Main worker loop for processing jobs.

    Args:
        burst (bool): Exit after processing available jobs.
        logging_level (str): Logging level ('DEBUG', 'INFO', etc.).
        date_format (str): Log date format.
        log_format (str): Log message format.
        max_jobs (int): Maximum jobs to process before exiting.
        max_idle_time (int): Maximum idle time before exiting.
        with_scheduler (bool): Run scheduler in the same process.
        dequeue_strategy (DequeueStrategy): Queue dequeuing strategy.

    Returns:
        bool: True if worker exited cleanly, False otherwise.
    """

def execute_job(self, job: 'Job', queue: 'Queue'):
    """
    Execute a single job (abstract method implemented by subclasses).

    Args:
        job (Job): Job to execute.
        queue (Queue): Queue the job came from.
    """
```

### Worker State and Monitoring

Monitor worker status, statistics, and health information.

```python { .api }
@property
def name(self) -> str:
    """Worker name/identifier."""

@property
def key(self) -> str:
    """Redis key for this worker."""

@property
def connection(self):
    """Redis connection instance."""

@property
def queues(self) -> list['Queue']:
    """List of queues this worker processes."""

@property
def version(self) -> str:
    """RQ version."""

@property
def python_version(self) -> str:
    """Python version string."""

@property
def hostname(self) -> str | None:
    """Worker hostname."""

@property
def ip_address(self) -> str:
    """Worker IP address."""

@property
def pid(self) -> int | None:
    """Worker process ID."""

@property
def birth_date(self) -> datetime | None:
    """When worker was created."""

@property
def last_heartbeat(self) -> datetime | None:
    """Last heartbeat timestamp."""

@property
def successful_job_count(self) -> int:
    """Number of successfully completed jobs."""

@property
def failed_job_count(self) -> int:
    """Number of failed jobs."""

@property
def total_working_time(self) -> float:
    """Total time spent working (seconds)."""

@property
def current_job_working_time(self) -> float:
    """Time spent on current job (seconds)."""

def refresh(self):
    """Refresh worker data from Redis."""

def queue_names(self) -> list[str]:
    """
    Get queue names this worker processes.

    Returns:
        list[str]: Queue names.
    """

def queue_keys(self) -> list[str]:
    """
    Get queue Redis keys this worker processes.

    Returns:
        list[str]: Queue keys.
    """
```

### Worker Lifecycle Control

Control worker lifecycle with graceful shutdown and signal handling.

```python { .api }
def request_stop(self, signum=None, frame=None):
    """
    Request graceful worker shutdown.

    Args:
        signum: Signal number (for signal handlers).
        frame: Frame object (for signal handlers).
    """

def request_force_stop(self, signum: int, frame=None):
    """
    Request immediate worker shutdown (abstract method).

    Args:
        signum (int): Signal number.
        frame: Frame object.
    """

def kill_horse(self, sig=15):
    """
    Kill the work horse process (for Worker class).

    Args:
        sig (int): Signal to send to work horse.
    """

def wait_for_horse(self) -> tuple[int | None, int | None, Any]:
    """
    Wait for work horse process to complete (for Worker class).

    Returns:
        tuple: (exit_code, signal, resource_usage).
    """
```

### Worker Maintenance and Health

Maintain worker health with registry cleanup and monitoring.

```python { .api }
def clean_registries(self):
    """Clean job registries of expired entries."""

def validate_queues(self):
    """Validate that all queues are valid Queue instances."""

def get_redis_server_version(self) -> tuple[int, int, int]:
    """
    Get Redis server version.

    Returns:
        tuple[int, int, int]: (major, minor, patch) version.
    """

@property
def should_run_maintenance_tasks(self) -> bool:
    """True if it's time to run maintenance tasks."""

@property
def dequeue_timeout(self) -> int:
    """Timeout for dequeue operations."""

@property
def connection_timeout(self) -> int:
    """Redis connection timeout."""
```

### Work Horse Management

Advanced work horse process management for fork-based execution.

```python { .api }
# Worker class specific methods

@property
def is_horse(self) -> bool:
    """True if this is the work horse process."""

@property
def horse_pid(self) -> int:
    """Work horse process ID."""

def fork_work_horse(self, job: 'Job', queue: 'Queue'):
    """
    Fork a work horse process to execute the job.

    Args:
        job (Job): Job to execute.
        queue (Queue): Queue the job came from.
    """

def monitor_work_horse(self, job: 'Job', queue: 'Queue'):
    """
    Monitor work horse process execution.

    Args:
        job (Job): Job being executed.
        queue (Queue): Queue the job came from.
    """

def get_heartbeat_ttl(self, job: 'Job') -> int:
    """
    Get heartbeat TTL for job monitoring.

    Args:
        job (Job): Job being executed.

    Returns:
        int: Heartbeat TTL in seconds.
    """
```

### Dequeue Strategies

Configure how workers dequeue jobs from multiple queues.

```python { .api }
from enum import Enum

class DequeueStrategy(str, Enum):
    DEFAULT = 'default'          # Process queues in order
    ROUND_ROBIN = 'round_robin'  # Rotate between queues
    RANDOM = 'random'            # Random queue selection

class WorkerStatus(str, Enum):
    STARTED = 'started'
    SUSPENDED = 'suspended'
    BUSY = 'busy'
    IDLE = 'idle'
```

## Usage Examples

### Basic Worker Usage

```python
import redis
from rq import Queue, Worker

# Connect to Redis
conn = redis.Redis()

# Create queues
high_priority = Queue('high', connection=conn)
normal_priority = Queue('normal', connection=conn)

# Create worker for multiple queues
worker = Worker([high_priority, normal_priority], connection=conn)

# Add some jobs
def process_data(data):
    import time
    time.sleep(2)
    return f"Processed: {data}"

high_priority.enqueue(process_data, "urgent_data")
normal_priority.enqueue(process_data, "regular_data")

print(f"Worker: {worker.name}")
print(f"Processing queues: {worker.queue_names()}")

# Start processing (this blocks)
worker.work()
```

### Worker with Configuration

```python
from rq import Worker, Queue
import redis
import logging

conn = redis.Redis()
q = Queue('configured_worker', connection=conn)

# Custom exception handler
def handle_failed_job(job, exc_type, exc_value, traceback):
    print(f"Job {job.id} failed: {exc_value}")
    # Could send alerts, log to external service, etc.

# Create configured worker
worker = Worker(
    [q],
    connection=conn,
    name='custom_worker_001',
    exception_handlers=[handle_failed_job],
    default_result_ttl=3600,     # Keep results for 1 hour
    job_monitoring_interval=15,  # Check job progress every 15s
    maintenance_interval=300,    # Run maintenance every 5 minutes
    log_job_description=True
)

# Set up logging
logging.basicConfig(level=logging.INFO)

# Work with specific options
worker.work(
    burst=False,    # Keep running
    max_jobs=100,   # Stop after 100 jobs
    logging_level='INFO',
    dequeue_strategy='round_robin'
)
```

### Different Worker Types

```python
from rq import Worker, SimpleWorker, SpawnWorker, Queue
import redis

conn = redis.Redis()
q = Queue('worker_types', connection=conn)

def cpu_intensive_task(n):
    # Simulate CPU intensive work
    total = sum(i * i for i in range(n))
    return total

# Add jobs
for i in range(5):
    q.enqueue(cpu_intensive_task, 10000 * (i + 1))

# Standard worker (forks for each job)
standard_worker = Worker([q], connection=conn, name='standard')

# Simple worker (no forking, same process)
simple_worker = SimpleWorker([q], connection=conn, name='simple')

# Spawn worker (uses os.posix_spawn instead of fork)
spawn_worker = SpawnWorker([q], connection=conn, name='spawn')

print("Worker types created:")
print(f"Standard: {standard_worker.name}")
print(f"Simple: {simple_worker.name}")
print(f"Spawn: {spawn_worker.name}")

# Use simple worker for this example (no forking)
simple_worker.work(burst=True)
```

### Worker Monitoring and Management

```python
from rq import Worker, Queue
import redis
import time

conn = redis.Redis()
q = Queue('monitoring', connection=conn)

# Add a long-running job
def long_running_job():
    import time
    for i in range(10):
        time.sleep(1)
        print(f"Working... step {i+1}/10")
    return "Completed long job"

job = q.enqueue(long_running_job)

# Create worker
worker = Worker([q], connection=conn, name='monitored_worker')

# Monitor worker in separate process/thread
def monitor_worker():
    while True:
        worker.refresh()  # Get latest data from Redis
        print(f"Worker: {worker.name}")
        print(f"Status: {'busy' if worker.current_job else 'idle'}")
        print(f"Successful jobs: {worker.successful_job_count}")
        print(f"Failed jobs: {worker.failed_job_count}")
        print(f"Total working time: {worker.total_working_time:.2f}s")

        if worker.current_job:
            print(f"Current job: {worker.current_job.id}")
            print(f"Job working time: {worker.current_job_working_time:.2f}s")

        print("---")
        time.sleep(2)

# Get all workers
all_workers = Worker.all(connection=conn)
print(f"Total workers: {len(all_workers)}")

for w in all_workers:
    print(f"Worker {w.name}: {w.successful_job_count} successful jobs")

# Find specific worker
found_worker = Worker.find_by_key(worker.key, connection=conn)
if found_worker:
    print(f"Found worker: {found_worker.name}")
```

### Worker Lifecycle Management

```python
from rq import Worker, Queue
import redis
import signal
import os
import time

conn = redis.Redis()
q = Queue('lifecycle', connection=conn)

def interruptible_job():
    import time
    for i in range(20):
        time.sleep(1)
        print(f"Job progress: {i+1}/20")
    return "Job completed"

# Add job
job = q.enqueue(interruptible_job)

# Create worker
worker = Worker([q], connection=conn)

# Set up signal handlers for graceful shutdown
def signal_handler(signum, frame):
    print(f"Received signal {signum}, requesting worker stop...")
    worker.request_stop(signum, frame)

signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

try:
    print(f"Starting worker {worker.name} (PID: {os.getpid()})")
    print("Press Ctrl+C for graceful shutdown")

    # Start worker with limits
    worker.work(
        burst=False,
        max_idle_time=30,  # Exit if idle for 30 seconds
        logging_level='INFO'
    )

except KeyboardInterrupt:
    print("Worker stopped by user")
except Exception as e:
    print(f"Worker error: {e}")
finally:
    print("Worker shutdown complete")
```

### Batch Processing with Multiple Workers

```python
from rq import Worker, Queue
import redis
from multiprocessing import Process
import time

conn = redis.Redis()

# Create multiple queues for different priorities
high_q = Queue('high_priority', connection=conn)
normal_q = Queue('normal_priority', connection=conn)
low_q = Queue('low_priority', connection=conn)

def process_item(item_id, priority):
    processing_time = {'high': 1, 'normal': 2, 'low': 3}
    time.sleep(processing_time[priority])
    return f"Processed item {item_id} with {priority} priority"

# Add jobs to different queues
for i in range(3):
    high_q.enqueue(process_item, f"H{i}", 'high')
    normal_q.enqueue(process_item, f"N{i}", 'normal')
    low_q.enqueue(process_item, f"L{i}", 'low')

def start_worker(worker_name, queues):
    """Function to start a worker in a separate process."""
    worker = Worker(queues, connection=conn, name=worker_name)
    print(f"Starting worker {worker_name}")
    worker.work(burst=True)  # Process all available jobs then exit
    print(f"Worker {worker_name} finished")

# Start multiple workers
processes = []

# High priority worker (only processes high priority queue)
p1 = Process(target=start_worker, args=('worker_high', [high_q]))

# General workers (process all queues in priority order)
p2 = Process(target=start_worker, args=('worker_general_1', [high_q, normal_q, low_q]))
p3 = Process(target=start_worker, args=('worker_general_2', [high_q, normal_q, low_q]))

processes = [p1, p2, p3]

# Start all workers
for p in processes:
    p.start()

# Wait for completion
for p in processes:
    p.join()

print("All workers completed")

# Check final queue states
print(f"High priority queue: {high_q.count} jobs remaining")
print(f"Normal priority queue: {normal_q.count} jobs remaining")
print(f"Low priority queue: {low_q.count} jobs remaining")
```