0
# Workers
1
2
Workers in Dramatiq are the components that consume messages from brokers and execute the corresponding actor functions. They handle the runtime execution environment, provide thread management, implement graceful shutdown mechanisms, and integrate with the middleware system for comprehensive task processing.
3
4
## Capabilities
5
6
### Worker Class
7
8
The core worker implementation that processes messages from brokers.
9
10
```python { .api }
11
class Worker:
    """Consume messages from a broker and execute the matching actors."""

    def __init__(
        self,
        broker: Broker,
        *,
        queues: Optional[List[str]] = None,
        worker_timeout: int = 1000,
        worker_threads: int = 8
    ):
        """
        Create a worker instance.

        Parameters:
        - broker: Broker instance to consume messages from
        - queues: List of queue names to process (processes all if None)
        - worker_timeout: Timeout for message consumption in milliseconds
        - worker_threads: Number of worker threads for parallel processing
        """

    def start(self):
        """
        Start the worker to begin processing messages.

        This method starts the worker threads and begins consuming
        messages from the broker queues.
        """

    def stop(self):
        """
        Stop the worker gracefully.

        Signals the worker to stop processing new messages and
        waits for current messages to complete.
        """

    def join(self):
        """
        Wait for the worker to finish its work in progress.

        This method blocks until the messages currently being handled
        have completed.

        NOTE(review): upstream Dramatiq describes join() as waiting for
        work in progress (mainly useful in tests); it may return as soon
        as the queues are idle rather than when threads shut down.
        Confirm against the installed version before relying on it to
        keep a process alive.
        """

    # Properties
    broker: Broker  # Associated broker
    queues: Optional[List[str]]  # Queues being processed (None means all)
    worker_timeout: int  # Message consumption timeout, in milliseconds
    worker_threads: int  # Number of worker threads
59
```
60
61
**Usage:**
62
63
```python
64
import dramatiq
65
from dramatiq.brokers.redis import RedisBroker
66
67
# Set up broker and actors
68
broker = RedisBroker()
69
dramatiq.set_broker(broker)
70
71
@dramatiq.actor
def example_task(data):
    """Print the payload being processed and return a confirmation string."""
    print(f"Processing: {data}")
    return f"Processed: {data}"
75
76
# Create and start worker
77
worker = dramatiq.Worker(
78
broker,
79
worker_threads=4, # 4 concurrent threads
80
worker_timeout=5000 # 5 second timeout
81
)
82
83
# Start processing
84
worker.start()
85
86
# Send some tasks
87
for i in range(10):
88
example_task.send(f"task_{i}")
89
90
# Let it process for a while
91
import time
92
time.sleep(10)
93
94
# Graceful shutdown
95
worker.stop()
96
worker.join()
97
```
98
99
### Command Line Interface
100
101
Dramatiq provides a CLI for running workers in production environments.
102
103
**Basic CLI Usage:**
104
105
```bash
106
# Run worker for specific module
107
dramatiq my_module
108
109
# Run worker with specific settings
110
dramatiq my_module --processes 4 --threads 8
111
112
# Run worker for specific queues
113
dramatiq my_module --queues high_priority normal
114
115
# Run worker with an explicitly named broker instance (module:attribute)
116
dramatiq my_module:broker
117
118
# Run worker with verbose logging
119
dramatiq my_module --verbose
120
121
# Watch for code changes and reload
122
dramatiq my_module --watch /path/to/code
123
```
124
125
**CLI Options:**
126
127
```python { .api }
128
# Common CLI options
129
CLI_OPTIONS = {
    "--processes": int,   # Number of worker processes
    "--threads": int,     # Number of threads per process
    "--path": str,        # Add path to Python path
    "--queues": str,      # Queue names, given as separate space-delimited values
    "--pid-file": str,    # PID file path
    # NOTE(review): --broker-url does not appear in `dramatiq --help`; the
    # broker is normally selected with the positional `module:broker` argument.
    "--broker-url": str,  # Broker connection URL
    "--verbose": bool,    # Verbose logging
    "--watch": str,       # Watch directory for changes (reloads on change)
    # NOTE(review): --reload does not appear in `dramatiq --help`; reloading
    # is what --watch provides. Confirm before documenting it as an option.
    "--reload": bool,     # Auto-reload on changes
}
140
```
141
142
### Advanced Worker Configuration
143
144
#### Multi-Queue Processing
145
146
```python
147
# Worker processing specific queues with different priorities
148
high_priority_worker = dramatiq.Worker(
149
broker,
150
queues=["critical", "high_priority"],
151
worker_threads=6,
152
worker_timeout=2000
153
)
154
155
normal_worker = dramatiq.Worker(
156
broker,
157
queues=["normal", "low_priority"],
158
worker_threads=4,
159
worker_timeout=10000
160
)
161
162
# Start both workers
163
high_priority_worker.start()
164
normal_worker.start()
165
166
# Define actors with different queue assignments
167
@dramatiq.actor(queue_name="critical", priority=0)
def critical_task(data):
    """Run the critical operation for ``data`` on the "critical" queue."""
    return handle_critical_operation(data)
170
171
@dramatiq.actor(queue_name="normal", priority=5)
def normal_task(data):
    """Run the normal operation for ``data`` on the "normal" queue."""
    return handle_normal_operation(data)
174
```
175
176
#### Worker with Custom Middleware
177
178
```python
179
from dramatiq.middleware import Middleware
180
181
class CustomWorkerMiddleware(Middleware):
    """Middleware that logs worker lifecycle events and manages a per-worker resource."""

    def before_worker_boot(self, broker, worker):
        """Log the thread count and attach a freshly initialized resource to the worker."""
        print(f"Worker starting with {worker.worker_threads} threads")
        # Initialize worker-specific resources
        worker.custom_resource = initialize_worker_resource()

    def after_worker_boot(self, broker, worker):
        """Log that the worker has finished booting."""
        print("Worker fully initialized and ready")

    def before_worker_shutdown(self, broker, worker):
        """Release the resource attached in before_worker_boot, if there is one."""
        print("Worker shutting down gracefully")
        # Cleanup worker-specific resources. The attribute is missing when
        # boot failed before the resource was created, so look it up safely.
        resource = getattr(worker, "custom_resource", None)
        if resource is not None:
            cleanup_worker_resource(resource)
            worker.custom_resource = None

    def after_worker_shutdown(self, broker, worker):
        """Log that shutdown has completed."""
        print("Worker shutdown complete")
197
198
# Add custom middleware
199
broker.add_middleware(CustomWorkerMiddleware())
200
201
worker = dramatiq.Worker(broker)
202
worker.start()
203
```
204
205
#### Production Worker Configuration
206
207
```python
208
import os
209
import signal
210
import sys
211
212
def create_production_worker():
    """Create worker with production-ready configuration.

    Reads DRAMATIQ_THREADS (default 8), DRAMATIQ_TIMEOUT (default 1000) and
    DRAMATIQ_QUEUES (comma-separated; unset or empty means all queues) from
    the environment, builds the worker, and installs SIGINT/SIGTERM handlers
    that stop the worker and exit.

    Returns:
        The configured (not yet started) worker.

    Raises:
        ValueError: if DRAMATIQ_THREADS or DRAMATIQ_TIMEOUT is not an integer.
    """
    # Get configuration from environment
    worker_threads = int(os.getenv("DRAMATIQ_THREADS", "8"))
    worker_timeout = int(os.getenv("DRAMATIQ_TIMEOUT", "1000"))

    # Read the variable once; drop surrounding whitespace and empty entries so
    # values such as "a, b" or "a,b," do not produce bogus queue names.
    raw_queues = os.getenv("DRAMATIQ_QUEUES", "")
    queue_names = [name.strip() for name in raw_queues.split(",")]
    queues = [name for name in queue_names if name] or None

    # Create worker
    worker = dramatiq.Worker(
        broker,
        queues=queues,
        worker_threads=worker_threads,
        worker_timeout=worker_timeout,
    )

    # Set up signal handlers for graceful shutdown
    def signal_handler(signum, frame):
        print(f"Received signal {signum}, shutting down...")
        worker.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    return worker
238
239
# Usage
240
if __name__ == "__main__":
241
worker = create_production_worker()
242
print("Starting production worker...")
243
worker.start()
244
worker.join()
245
```
246
247
### Worker Lifecycle and Monitoring
248
249
#### Worker State Monitoring
250
251
```python
252
import threading
253
import time
254
255
def monitor_worker(worker, check_interval=5, stop_event=None):
    """Monitor worker health and performance.

    Starts a daemon thread that prints a stats dictionary for ``worker``
    every ``check_interval`` seconds and warns when more than 100 failures
    have been recorded.

    Parameters:
    - worker: object exposing ``worker_threads`` and ``queues``; the optional
      attributes ``processed_count``, ``failed_count``, ``start_time`` and
      ``is_running`` are used when present
    - check_interval: seconds between two reports
    - stop_event: optional ``threading.Event``; setting it ends the loop.
      When omitted, one is created and exposed as ``thread.stop_event``.

    Returns:
        The started monitoring thread.
    """
    if stop_event is None:
        stop_event = threading.Event()

    # Fallback reference point for uptime when the caller set no start_time.
    monitor_started = time.time()

    def monitoring_loop():
        # ``is_running`` is not guaranteed to exist on the worker, so it is
        # only honoured when present; the event is the reliable stop signal.
        while not stop_event.is_set() and getattr(worker, "is_running", True):
            # Collect worker metrics
            stats = {
                "threads": worker.worker_threads,
                "queues": worker.queues,
                "processed_messages": getattr(worker, "processed_count", 0),
                "failed_messages": getattr(worker, "failed_count", 0),
                "uptime": time.time() - getattr(worker, "start_time", monitor_started),
            }

            print(f"Worker stats: {stats}")

            # Check worker health
            if stats["failed_messages"] > 100:
                print("WARNING: High failure rate detected")

            # Sleeps for the interval but wakes immediately on stop.
            stop_event.wait(check_interval)

    # Start monitoring in separate thread
    monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)
    monitor_thread.stop_event = stop_event
    monitor_thread.start()

    return monitor_thread
282
283
# Usage with monitoring
284
worker = dramatiq.Worker(broker)
285
worker.start_time = time.time()
286
monitor_thread = monitor_worker(worker)
287
288
worker.start()
289
```
290
291
#### Graceful Shutdown Handling
292
293
```python
294
import atexit
295
import signal
296
297
class GracefulWorker:
    """Wrap a dramatiq.Worker so that SIGINT/SIGTERM trigger one orderly shutdown."""

    def __init__(self, broker, **kwargs):
        """Create the wrapped worker and install the shutdown handlers.

        Parameters:
        - broker: broker passed through to dramatiq.Worker
        - **kwargs: extra keyword arguments for dramatiq.Worker
        """
        # Imported here because this sample's import list only has
        # atexit and signal.
        import threading

        self.worker = dramatiq.Worker(broker, **kwargs)
        self.shutdown_event = threading.Event()
        self._stopped = False  # guards against stopping the worker twice
        self.setup_shutdown_handlers()

    def setup_shutdown_handlers(self):
        """Set up handlers for graceful shutdown"""

        def shutdown_handler(signum=None, frame=None):
            # Only flag the request here: the actual stop runs in start(),
            # outside signal-handler context, and exactly once.
            print(f"Shutdown signal received: {signum}")
            self.shutdown_event.set()

        # Register signal handlers
        signal.signal(signal.SIGINT, shutdown_handler)
        signal.signal(signal.SIGTERM, shutdown_handler)

        # Register exit handler
        atexit.register(self._stop_worker_once)

    def _stop_worker_once(self):
        """Stop the wrapped worker unless it has already been stopped."""
        if self._stopped:
            return
        self._stopped = True
        self.shutdown_event.set()
        self.worker.stop()

    def start(self):
        """Start worker with shutdown monitoring"""
        self.worker.start()

        # Monitor for shutdown signal
        try:
            # A timed wait keeps the main thread responsive to signals.
            while not self.shutdown_event.wait(timeout=1):
                pass
        except KeyboardInterrupt:
            pass
        finally:
            print("Initiating graceful shutdown...")
            self._stop_worker_once()
            self.worker.join()
            print("Worker shutdown complete")
333
334
# Usage
335
graceful_worker = GracefulWorker(broker, worker_threads=6)
336
graceful_worker.start()
337
```
338
339
### Worker Performance Optimization
340
341
#### Thread Pool Tuning
342
343
```python
344
import psutil
345
346
def calculate_optimal_threads(max_threads=16, min_threads=4, high_memory_gb=8):
    """Calculate optimal thread count based on system resources.

    Parameters:
    - max_threads: upper bound on the returned thread count
    - min_threads: lower bound used on limited-memory systems
    - high_memory_gb: total RAM (GiB) above which two threads per core are allowed

    Returns:
        The suggested number of worker threads.
    """
    # psutil.cpu_count() returns None when the count cannot be determined.
    cpu_count = psutil.cpu_count() or 1
    memory_gb = psutil.virtual_memory().total / (1024**3)

    # I/O bound tasks: more threads than CPU cores
    # CPU bound tasks: threads ≈ CPU cores

    if memory_gb > high_memory_gb:
        # High memory system: can handle more threads
        optimal_threads = min(cpu_count * 2, max_threads)
    else:
        # Limited memory: conservative thread count. It is capped as well,
        # otherwise a many-core low-memory host would get more threads
        # than a high-memory one.
        optimal_threads = min(max(cpu_count, min_threads), max_threads)

    return optimal_threads
362
363
# Create optimized worker
364
optimal_threads = calculate_optimal_threads()
365
optimized_worker = dramatiq.Worker(
366
broker,
367
worker_threads=optimal_threads,
368
worker_timeout=5000
369
)
370
371
print(f"Using {optimal_threads} worker threads")
372
optimized_worker.start()
373
```
374
375
#### Memory Management
376
377
```python
378
import gc
379
import psutil
380
import threading
381
382
class MemoryManagedWorker:
    """Wrap a dramatiq.Worker with a background thread that watches process memory."""

    def __init__(self, broker, memory_limit_mb=1000, **kwargs):
        """Create the wrapped worker.

        Parameters:
        - broker: broker passed through to dramatiq.Worker
        - memory_limit_mb: RSS threshold in megabytes that triggers a collection
        - **kwargs: extra keyword arguments for dramatiq.Worker
        """
        self.worker = dramatiq.Worker(broker, **kwargs)
        self.memory_limit = memory_limit_mb * 1024 * 1024  # Convert to bytes
        self.monitoring = True
        # Lets stop() wake the monitor immediately rather than after its sleep.
        self._stop_monitoring = threading.Event()

    def start_memory_monitoring(self):
        """Monitor memory usage and trigger garbage collection"""

        def memory_monitor():
            process = psutil.Process()
            while self.monitoring:
                memory_usage = process.memory_info().rss

                if memory_usage > self.memory_limit:
                    print(f"Memory limit exceeded: {memory_usage / 1024 / 1024:.1f}MB")
                    print("Triggering garbage collection...")
                    gc.collect()

                    # Check again after GC
                    new_usage = process.memory_info().rss
                    print(f"Memory after GC: {new_usage / 1024 / 1024:.1f}MB")

                # Check every 30 seconds; returns True early when stopping.
                if self._stop_monitoring.wait(30):
                    break

        monitor_thread = threading.Thread(target=memory_monitor, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def start(self):
        """Start the memory monitor, then the wrapped worker."""
        self.start_memory_monitoring()
        self.worker.start()

    def stop(self):
        """Stop the memory monitor and the wrapped worker."""
        self.monitoring = False
        self._stop_monitoring.set()
        self.worker.stop()

    def join(self):
        """Delegate to the wrapped worker's join()."""
        self.worker.join()
421
422
# Usage
423
memory_worker = MemoryManagedWorker(
424
broker,
425
memory_limit_mb=500,
426
worker_threads=4
427
)
428
memory_worker.start()
429
```
430
431
### Multi-Process Worker Setup
432
433
#### Process Pool Worker
434
435
```python
436
import multiprocessing
437
import os
438
439
def worker_process(broker_config, queues, worker_id):
    """Worker process function.

    Builds a broker from ``broker_config``, runs a worker on ``queues`` and
    blocks until the process receives SIGTERM or SIGINT, then stops the
    worker gracefully.

    Parameters:
    - broker_config: dict with "type" (only "redis" is supported) and
      "params" (keyword arguments for the broker constructor)
    - queues: queue names this process should consume
    - worker_id: identifier used in the process title and log output

    Raises:
        ValueError: if the broker type is not supported.
    """
    import signal
    import threading

    # Set process title for monitoring
    try:
        import setproctitle
        setproctitle.setproctitle(f"dramatiq-worker-{worker_id}")
    except ImportError:
        pass

    # Initialize broker in each process
    if broker_config["type"] == "redis":
        from dramatiq.brokers.redis import RedisBroker
        broker = RedisBroker(**broker_config["params"])
    else:
        raise ValueError(f"Unsupported broker type: {broker_config['type']}")

    dramatiq.set_broker(broker)

    # Create and start worker
    worker = dramatiq.Worker(
        broker,
        queues=queues,
        worker_threads=2,  # Fewer threads per process
    )

    # The parent shuts children down with Process.terminate(), i.e. SIGTERM.
    # Without a handler the process would die without stopping the worker.
    shutdown_requested = threading.Event()

    def request_shutdown(signum, frame):
        shutdown_requested.set()

    signal.signal(signal.SIGTERM, request_shutdown)
    signal.signal(signal.SIGINT, request_shutdown)

    print(f"Worker process {worker_id} starting...")
    worker.start()

    try:
        # Block until asked to stop. worker.join() is not used to keep the
        # process alive because it only waits for work in progress
        # (NOTE(review): confirm against the installed Dramatiq version).
        while not shutdown_requested.wait(timeout=1):
            pass
    finally:
        worker.stop()
468
469
def start_worker_pool(num_processes=4, broker_config=None, queues=None):
    """Start multiple worker processes.

    Parameters:
    - num_processes: number of worker processes to spawn
    - broker_config: broker description handed to each process; defaults to
      a Redis broker on localhost:6379, database 0
    - queues: queue names each process consumes; defaults to
      ["high_priority", "normal", "low_priority"]

    Blocks until every process has exited. On KeyboardInterrupt, all
    processes are terminated and then joined.
    """
    if broker_config is None:
        broker_config = {
            "type": "redis",
            "params": {"host": "localhost", "port": 6379, "db": 0},
        }

    if queues is None:
        queues = ["high_priority", "normal", "low_priority"]

    processes = []

    for i in range(num_processes):
        process = multiprocessing.Process(
            target=worker_process,
            args=(broker_config, queues, i),
        )
        process.start()
        processes.append(process)
        print(f"Started worker process {i} (PID: {process.pid})")

    try:
        # Wait for all processes
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        print("Shutting down worker processes...")
        for process in processes:
            process.terminate()

        for process in processes:
            process.join()
501
502
# Usage
503
if __name__ == "__main__":
504
start_worker_pool(num_processes=4)
505
```
506
507
### Docker and Container Deployment
508
509
#### Dockerfile for Worker
510
511
```dockerfile
512
# Example Dockerfile for dramatiq worker
513
FROM python:3.11-slim
514
515
WORKDIR /app
516
517
# Install dependencies
518
COPY requirements.txt .
519
RUN pip install -r requirements.txt
520
521
# Copy application code
522
COPY . .
523
524
# Set environment variables
525
ENV DRAMATIQ_THREADS=8
526
ENV DRAMATIQ_TIMEOUT=10000
527
ENV DRAMATIQ_QUEUES=default,high_priority
528
529
# Health check
530
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
531
CMD python -c "import dramatiq; print('Worker healthy')" || exit 1
532
533
# Run worker
534
CMD ["python", "-m", "dramatiq", "my_app.tasks"]
535
```
536
537
#### Docker Compose for Scaling
538
539
```yaml
540
# docker-compose.yml
541
version: '3.8'
542
543
services:
544
redis:
545
image: redis:7-alpine
546
ports:
547
- "6379:6379"
548
549
worker-high-priority:
550
build: .
551
environment:
552
- DRAMATIQ_THREADS=4
553
- DRAMATIQ_QUEUES=critical,high_priority
554
- REDIS_URL=redis://redis:6379/0
555
depends_on:
556
- redis
557
scale: 2
558
559
worker-normal:
560
build: .
561
environment:
562
- DRAMATIQ_THREADS=6
563
- DRAMATIQ_QUEUES=normal,low_priority
564
- REDIS_URL=redis://redis:6379/0
565
depends_on:
566
- redis
567
scale: 4
568
```
569
570
### Worker Debugging and Troubleshooting
571
572
#### Debug Worker
573
574
```python
575
import logging
576
import traceback
577
578
class DebugWorker:
    """Wrap a dramatiq.Worker with debug logging and simple message/error counters."""

    def __init__(self, broker, **kwargs):
        """Enable DEBUG logging and create the wrapped worker.

        Parameters:
        - broker: broker passed through to dramatiq.Worker
        - **kwargs: extra keyword arguments for dramatiq.Worker
        """
        # Enable debug logging
        logging.basicConfig(level=logging.DEBUG)
        logger = logging.getLogger("dramatiq")
        logger.setLevel(logging.DEBUG)

        self.worker = dramatiq.Worker(broker, **kwargs)
        self.message_count = 0
        self.error_count = 0

    def create_debug_middleware(self):
        """Create middleware for debugging"""

        class DebugMiddleware(dramatiq.Middleware):
            def __init__(self, debug_worker):
                self.debug_worker = debug_worker

            def before_process_message(self, broker, message):
                self.debug_worker.message_count += 1
                print(f"[DEBUG] Processing message {self.debug_worker.message_count}: {message.actor_name}")
                print(f"[DEBUG] Message args: {message.args}")
                print(f"[DEBUG] Message kwargs: {message.kwargs}")

            def after_process_message(self, broker, message, *, result=None, exception=None):
                if exception:
                    self.debug_worker.error_count += 1
                    print(f"[ERROR] Message failed: {exception}")
                    print("[ERROR] Traceback:")
                    # This hook runs outside any ``except`` block, so
                    # print_exc() would have no active exception to show.
                    # Format the exception object that was passed in.
                    traceback.print_exception(
                        type(exception), exception, exception.__traceback__
                    )
                else:
                    print(f"[DEBUG] Message completed successfully: {result}")

        return DebugMiddleware(self)

    def start(self):
        """Attach the debug middleware to the broker and start the worker."""
        # Add debug middleware
        debug_middleware = self.create_debug_middleware()
        self.worker.broker.add_middleware(debug_middleware)

        print(f"[DEBUG] Starting worker with {self.worker.worker_threads} threads")
        self.worker.start()

    def print_stats(self):
        """Print message and error counts, plus the error rate once any message was seen."""
        print(f"[STATS] Messages processed: {self.message_count}")
        print(f"[STATS] Errors: {self.error_count}")
        if self.message_count > 0:
            error_rate = (self.error_count / self.message_count) * 100
            print(f"[STATS] Error rate: {error_rate:.1f}%")
627
628
# Usage for debugging
629
debug_worker = DebugWorker(broker, worker_threads=2)
630
debug_worker.start()
631
632
# Print stats periodically
633
import threading
634
def stats_printer():
    """Print the debug worker's statistics every 30 seconds, forever."""
    # Imported here because this sample's import list does not include time.
    import time

    while True:
        time.sleep(30)
        debug_worker.print_stats()
638
639
stats_thread = threading.Thread(target=stats_printer, daemon=True)
640
stats_thread.start()
641
```
642
643
### Gevent Integration
644
645
For high-concurrency scenarios, Dramatiq supports gevent:
646
647
```python
648
# Install: pip install dramatiq[gevent]
649
650
# Use gevent launcher script
651
# dramatiq-gevent my_module
652
653
# Or programmatically with gevent
654
import gevent
655
from gevent import monkey
656
monkey.patch_all()
657
658
import dramatiq
659
from dramatiq.brokers.redis import RedisBroker
660
661
@dramatiq.actor
def io_bound_task(url):
    """I/O bound task that benefits from gevent.

    Fetches ``url`` and returns a dict with the URL and the HTTP status code.
    """
    import requests

    # requests applies no timeout by default; without one, a stalled server
    # would block this worker slot indefinitely.
    response = requests.get(url, timeout=10)
    return {"url": url, "status": response.status_code}
667
668
# Gevent-compatible worker setup
669
broker = RedisBroker()
670
dramatiq.set_broker(broker)
671
672
# Worker will use gevent for concurrency
673
worker = dramatiq.Worker(broker, worker_threads=100) # Many lightweight threads
674
worker.start()
675
```
676
677
This guide covers running and managing Dramatiq workers, from basic usage through production deployment, monitoring, performance tuning, and debugging.