Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware
—
Workers in Dramatiq are the components that consume messages from brokers and execute the corresponding actor functions. They handle the runtime execution environment, provide thread management, implement graceful shutdown mechanisms, and integrate with the middleware system for comprehensive task processing.
The core worker implementation that processes messages from brokers.
class Worker:
    """Consumes messages from a broker and executes the matching actors.

    The worker manages a pool of threads, pulls messages off the broker's
    queues, and runs middleware hooks around each message.
    """

    # Properties available after construction:
    broker: Broker                # Associated broker
    queues: Optional[List[str]]   # Queues being processed (None = all queues)
    worker_timeout: int           # Message consumption timeout (milliseconds)
    worker_threads: int           # Number of worker threads

    def __init__(
        self,
        broker: Broker,
        *,
        queues: Optional[List[str]] = None,
        worker_timeout: int = 1000,
        worker_threads: int = 8,
    ):
        """
        Create a worker instance.

        Parameters:
        - broker: Broker instance to consume messages from
        - queues: List of queue names to process (processes all if None)
        - worker_timeout: Timeout for message consumption in milliseconds
        - worker_threads: Number of worker threads for parallel processing
        """
        # The stub previously documented these attributes without ever
        # assigning them; set them so the documented properties exist.
        self.broker = broker
        self.queues = queues
        self.worker_timeout = worker_timeout
        self.worker_threads = worker_threads

    def start(self):
        """
        Start the worker to begin processing messages.

        Starts the worker threads and begins consuming messages from the
        broker queues.
        """

    def stop(self):
        """
        Stop the worker gracefully.

        Signals the worker to stop processing new messages and waits for
        in-flight messages to complete.
        """

    def join(self):
        """
        Wait for the worker to finish processing and shut down.

        Blocks until all worker threads have completed their current tasks
        and shut down.
        """
import dramatiq
from dramatiq.brokers.redis import RedisBroker

# Configure a Redis-backed broker as the global default.
broker = RedisBroker()
dramatiq.set_broker(broker)


@dramatiq.actor
def example_task(data):
    print(f"Processing: {data}")
    return f"Processed: {data}"


# Spin up a worker with four threads and a five-second consume timeout.
worker = dramatiq.Worker(
    broker,
    worker_threads=4,    # 4 concurrent threads
    worker_timeout=5000, # 5 second timeout
)
worker.start()

# Enqueue ten messages for the actor.
for i in range(10):
    example_task.send(f"task_{i}")

# Give the worker a while to drain the queue, then shut down.
import time
time.sleep(10)
worker.stop()
worker.join()

Dramatiq provides a CLI for running workers in production environments.
Basic CLI Usage:
# Run worker for specific module
dramatiq my_module
# Run worker with specific settings
dramatiq my_module --processes 4 --threads 8
# Run worker for specific queues
dramatiq my_module --queues high_priority,normal
# Run worker with custom broker URL
dramatiq my_module --broker-url redis://localhost:6379/0
# Run worker with verbose logging
dramatiq my_module --verbose
# Watch for code changes and reload
dramatiq my_module --watch /path/to/code

CLI Options:
# Options accepted by the `dramatiq` command line, each mapped to the
# type of value it expects.
CLI_OPTIONS = {
    "--processes": int,   # Number of worker processes
    "--threads": int,     # Number of threads per process
    "--path": str,        # Add path to Python path
    "--queues": str,      # Comma-separated queue names
    "--pid-file": str,    # PID file path
    "--broker-url": str,  # Broker connection URL
    "--verbose": bool,    # Verbose logging
    "--watch": str,       # Watch directory for changes
    "--reload": bool,     # Auto-reload on changes
}

# Worker processing specific queues with different priorities
# Two workers dedicated to different queue tiers: the first drains the
# urgent queues with more threads and a short consume timeout, the second
# handles everything else with a longer timeout.
high_priority_worker = dramatiq.Worker(
    broker,
    queues=["critical", "high_priority"],
    worker_threads=6,
    worker_timeout=2000,
)
normal_worker = dramatiq.Worker(
    broker,
    queues=["normal", "low_priority"],
    worker_threads=4,
    worker_timeout=10000,
)

high_priority_worker.start()
normal_worker.start()


# Route actors onto the matching queues.
@dramatiq.actor(queue_name="critical", priority=0)
def critical_task(data):
    return handle_critical_operation(data)


@dramatiq.actor(queue_name="normal", priority=5)
def normal_task(data):
    return handle_normal_operation(data)


from dramatiq.middleware import Middleware
class CustomWorkerMiddleware(Middleware):
    """Hooks into the worker lifecycle to manage a per-worker resource."""

    def before_worker_boot(self, broker, worker):
        print(f"Worker starting with {worker.worker_threads} threads")
        # Attach the resource before any message is consumed.
        worker.custom_resource = initialize_worker_resource()

    def after_worker_boot(self, broker, worker):
        print("Worker fully initialized and ready")

    def before_worker_shutdown(self, broker, worker):
        print("Worker shutting down gracefully")
        # Release the resource while the worker object is still alive.
        cleanup_worker_resource(worker.custom_resource)

    def after_worker_shutdown(self, broker, worker):
        print("Worker shutdown complete")


# Register the middleware, then boot a worker that uses it.
broker.add_middleware(CustomWorkerMiddleware())
worker = dramatiq.Worker(broker)
worker.start()
worker.start()

import os
import signal
import sys
def create_production_worker():
    """Create worker with production-ready configuration.

    Reads thread count, consume timeout and queue list from the
    DRAMATIQ_THREADS / DRAMATIQ_TIMEOUT / DRAMATIQ_QUEUES environment
    variables, installs SIGINT/SIGTERM handlers that stop the worker,
    and returns the (not yet started) worker.
    """
    worker_threads = int(os.getenv("DRAMATIQ_THREADS", "8"))
    worker_timeout = int(os.getenv("DRAMATIQ_TIMEOUT", "1000"))

    # Read the env var once (the original read it twice) and tolerate
    # whitespace and empty entries: "a, b," -> ["a", "b"]; unset/blank -> None.
    raw_queues = os.getenv("DRAMATIQ_QUEUES", "")
    queues = [q.strip() for q in raw_queues.split(",") if q.strip()] or None

    worker = dramatiq.Worker(
        broker,
        queues=queues,
        worker_threads=worker_threads,
        worker_timeout=worker_timeout,
    )

    # Graceful shutdown on Ctrl-C or `kill`.
    def signal_handler(signum, frame):
        print(f"Received signal {signum}, shutting down...")
        worker.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    return worker
# Usage
if __name__ == "__main__":
worker = create_production_worker()
print("Starting production worker...")
worker.start()
worker.join()

import threading
import time
def monitor_worker(worker, check_interval=5):
    """Monitor worker health and performance in a background daemon thread.

    Parameters:
    - worker: worker instance; must expose worker_threads, queues and an
      is_running flag (hypothetical property) that controls the loop
    - check_interval: seconds between health checks

    Returns the started monitoring thread.
    """
    # BUG FIX: the original read worker.start_time unconditionally, so a
    # caller that forgot to set it crashed the monitor thread with
    # AttributeError. Fall back to "now" recorded at call time.
    started_at = getattr(worker, "start_time", time.time())

    def monitoring_loop():
        while worker.is_running:  # Hypothetical property
            # Collect worker metrics; counters are optional attributes.
            stats = {
                "threads": worker.worker_threads,
                "queues": worker.queues,
                "processed_messages": getattr(worker, 'processed_count', 0),
                "failed_messages": getattr(worker, 'failed_count', 0),
                "uptime": time.time() - getattr(worker, "start_time", started_at),
            }
            print(f"Worker stats: {stats}")
            # Crude health check on the failure counter.
            if stats["failed_messages"] > 100:
                print("WARNING: High failure rate detected")
            time.sleep(check_interval)

    # Daemon thread so monitoring never blocks interpreter shutdown.
    monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)
    monitor_thread.start()
    return monitor_thread
# Usage with monitoring
worker = dramatiq.Worker(broker)
worker.start_time = time.time()
monitor_thread = monitor_worker(worker)
worker.start()

import atexit
import signal
class GracefulWorker:
    """Runs a dramatiq.Worker and shuts it down cleanly on signals or exit."""

    def __init__(self, broker, **kwargs):
        self.worker = dramatiq.Worker(broker, **kwargs)
        self.shutdown_event = threading.Event()
        self.setup_shutdown_handlers()

    def setup_shutdown_handlers(self):
        """Set up handlers for graceful shutdown"""

        def on_shutdown(signum=None, frame=None):
            print(f"Shutdown signal received: {signum}")
            self.shutdown_event.set()
            self.worker.stop()

        # SIGINT, SIGTERM and normal interpreter exit all funnel into the
        # same handler.
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, on_shutdown)
        atexit.register(on_shutdown)

    def start(self):
        """Start worker with shutdown monitoring"""
        self.worker.start()
        try:
            # Idle until a shutdown signal flips the event.
            while not self.shutdown_event.is_set():
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            print("Initiating graceful shutdown...")
            self.worker.stop()
            self.worker.join()
            print("Worker shutdown complete")
# Usage
graceful_worker = GracefulWorker(broker, worker_threads=6)
graceful_worker.start()

import psutil
def calculate_optimal_threads():
    """Calculate optimal thread count based on system resources.

    Uses psutil when available; otherwise falls back to os.cpu_count()
    and assumes limited memory. (psutil.cpu_count() can return None, which
    the original would crash on — guard with `or 1`.)

    Returns an int >= 2.
    """
    try:
        import psutil
        cpu_count = psutil.cpu_count() or 1
        memory_gb = psutil.virtual_memory().total / (1024**3)
    except ImportError:
        import os
        cpu_count = os.cpu_count() or 1
        memory_gb = 0.0  # unknown -> treat as a limited-memory system

    # I/O bound tasks: more threads than CPU cores
    # CPU bound tasks: threads ≈ CPU cores
    if memory_gb > 8:
        # High memory system: can handle more threads
        optimal_threads = min(cpu_count * 2, 16)
    else:
        # Limited memory: conservative thread count
        optimal_threads = max(cpu_count, 4)
    return optimal_threads
# Create optimized worker
optimal_threads = calculate_optimal_threads()
optimized_worker = dramatiq.Worker(
broker,
worker_threads=optimal_threads,
worker_timeout=5000
)
print(f"Using {optimal_threads} worker threads")
optimized_worker.start()

import gc
import psutil
import threading
class MemoryManagedWorker:
    """Wraps a dramatiq.Worker and forces garbage collection when resident
    memory exceeds a configurable limit.

    Parameters:
    - broker: broker the underlying worker consumes from
    - memory_limit_mb: RSS threshold in megabytes that triggers a GC pass
    - check_interval: seconds between memory checks (default keeps the
      original 30-second cadence)
    - kwargs: forwarded to dramatiq.Worker
    """

    def __init__(self, broker, memory_limit_mb=1000, check_interval=30, **kwargs):
        self.worker = dramatiq.Worker(broker, **kwargs)
        self.memory_limit = memory_limit_mb * 1024 * 1024  # Convert to bytes
        self.check_interval = check_interval
        self.monitoring = True
        # BUG FIX: the original slept via time.sleep(30) without importing
        # time in this snippet, and stop() left the monitor sleeping out a
        # full interval. An Event both paces the loop and lets stop() wake
        # the thread immediately.
        self._stop_event = threading.Event()

    def start_memory_monitoring(self):
        """Monitor memory usage and trigger garbage collection"""

        def memory_monitor():
            process = psutil.Process()  # hoisted: same process every check
            while self.monitoring and not self._stop_event.is_set():
                memory_usage = process.memory_info().rss
                if memory_usage > self.memory_limit:
                    print(f"Memory limit exceeded: {memory_usage / 1024 / 1024:.1f}MB")
                    print("Triggering garbage collection...")
                    gc.collect()
                    # Check again after GC
                    new_usage = psutil.Process().memory_info().rss
                    print(f"Memory after GC: {new_usage / 1024 / 1024:.1f}MB")
                self._stop_event.wait(self.check_interval)

        monitor_thread = threading.Thread(target=memory_monitor, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def start(self):
        self.start_memory_monitoring()
        self.worker.start()

    def stop(self):
        self.monitoring = False
        self._stop_event.set()  # wake the monitor so it exits promptly
        self.worker.stop()

    def join(self):
        self.worker.join()
# Usage
memory_worker = MemoryManagedWorker(
broker,
memory_limit_mb=500,
worker_threads=4
)
memory_worker.start()

import multiprocessing
import os
def worker_process(broker_config, queues, worker_id):
    """Entry point for one worker process in a multi-process pool."""
    # Give the process a recognizable name in `ps`/`top`, when possible.
    try:
        import setproctitle
        setproctitle.setproctitle(f"dramatiq-worker-{worker_id}")
    except ImportError:
        pass

    # Each process must build its own broker connection.
    if broker_config["type"] == "redis":
        from dramatiq.brokers.redis import RedisBroker
        broker = RedisBroker(**broker_config["params"])
    else:
        raise ValueError(f"Unsupported broker type: {broker_config['type']}")
    dramatiq.set_broker(broker)

    worker = dramatiq.Worker(
        broker,
        queues=queues,
        worker_threads=2,  # Fewer threads per process
    )
    print(f"Worker process {worker_id} starting...")
    worker.start()
    worker.join()
def start_worker_pool(num_processes=4):
    """Start multiple worker processes"""
    broker_config = {
        "type": "redis",
        "params": {"host": "localhost", "port": 6379, "db": 0},
    }
    queues = ["high_priority", "normal", "low_priority"]

    # Launch the pool.
    processes = []
    for idx in range(num_processes):
        proc = multiprocessing.Process(
            target=worker_process,
            args=(broker_config, queues, idx),
        )
        proc.start()
        processes.append(proc)
        print(f"Started worker process {idx} (PID: {proc.pid})")

    try:
        # Block until every child exits.
        for proc in processes:
            proc.join()
    except KeyboardInterrupt:
        print("Shutting down worker processes...")
        # Terminate everything first, then reap.
        for proc in processes:
            proc.terminate()
        for proc in processes:
            proc.join()
# Usage
if __name__ == "__main__":
start_worker_pool(num_processes=4)

# Example Dockerfile for dramatiq worker
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application code
COPY . .
# Set environment variables
ENV DRAMATIQ_THREADS=8
ENV DRAMATIQ_TIMEOUT=10000
ENV DRAMATIQ_QUEUES=default,high_priority
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import dramatiq; print('Worker healthy')" || exit 1
# Run worker
CMD ["python", "-m", "dramatiq", "my_app.tasks"]

# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
worker-high-priority:
build: .
environment:
- DRAMATIQ_THREADS=4
- DRAMATIQ_QUEUES=critical,high_priority
- REDIS_URL=redis://redis:6379/0
depends_on:
- redis
scale: 2
worker-normal:
build: .
environment:
- DRAMATIQ_THREADS=6
- DRAMATIQ_QUEUES=normal,low_priority
- REDIS_URL=redis://redis:6379/0
depends_on:
- redis
scale: 4

import logging
import traceback
class DebugWorker:
    """Wraps dramatiq.Worker with verbose logging and per-message debug output."""

    def __init__(self, broker, **kwargs):
        # Enable debug logging for dramatiq internals.
        logging.basicConfig(level=logging.DEBUG)
        logger = logging.getLogger("dramatiq")
        logger.setLevel(logging.DEBUG)
        self.worker = dramatiq.Worker(broker, **kwargs)
        self.message_count = 0
        self.error_count = 0

    def create_debug_middleware(self):
        """Create middleware for debugging"""

        class DebugMiddleware(dramatiq.Middleware):
            def __init__(self, debug_worker):
                self.debug_worker = debug_worker

            def before_process_message(self, broker, message):
                self.debug_worker.message_count += 1
                print(f"[DEBUG] Processing message {self.debug_worker.message_count}: {message.actor_name}")
                print(f"[DEBUG] Message args: {message.args}")
                print(f"[DEBUG] Message kwargs: {message.kwargs}")

            def after_process_message(self, broker, message, *, result=None, exception=None):
                if exception:
                    self.debug_worker.error_count += 1
                    print(f"[ERROR] Message failed: {exception}")
                    print(f"[ERROR] Traceback:")
                    # BUG FIX: traceback.print_exc() prints the *current*
                    # exception, which is None outside an except block here;
                    # print the exception we were handed instead.
                    traceback.print_exception(type(exception), exception, exception.__traceback__)
                else:
                    print(f"[DEBUG] Message completed successfully: {result}")

        return DebugMiddleware(self)

    def start(self):
        # Install the debug middleware before starting the worker.
        debug_middleware = self.create_debug_middleware()
        self.worker.broker.add_middleware(debug_middleware)
        print(f"[DEBUG] Starting worker with {self.worker.worker_threads} threads")
        self.worker.start()

    def print_stats(self):
        print(f"[STATS] Messages processed: {self.message_count}")
        print(f"[STATS] Errors: {self.error_count}")
        if self.message_count > 0:
            error_rate = (self.error_count / self.message_count) * 100
            print(f"[STATS] Error rate: {error_rate:.1f}%")
# Usage for debugging
debug_worker = DebugWorker(broker, worker_threads=2)
debug_worker.start()
# Print stats periodically
import threading
def stats_printer():
while True:
time.sleep(30)
debug_worker.print_stats()
stats_thread = threading.Thread(target=stats_printer, daemon=True)
stats_thread.start()

For high-concurrency scenarios, Dramatiq supports gevent:
# Install: pip install dramatiq[gevent]
# Use the gevent launcher script:
#   dramatiq-gevent my_module

# Or programmatically with gevent — monkey-patch BEFORE importing dramatiq
# so blocking I/O cooperates with the event loop.
import gevent
from gevent import monkey
monkey.patch_all()

import dramatiq
from dramatiq.brokers.redis import RedisBroker


@dramatiq.actor
def io_bound_task(url):
    """I/O bound task that benefits from gevent"""
    import requests
    response = requests.get(url)
    return {"url": url, "status": response.status_code}


# Gevent-compatible worker setup
broker = RedisBroker()
dramatiq.set_broker(broker)

# Worker will use gevent for concurrency
worker = dramatiq.Worker(broker, worker_threads=100)  # Many lightweight threads
worker.start()

This comprehensive worker documentation covers all aspects of running and managing Dramatiq workers, from basic usage to advanced production deployments with monitoring, optimization, and debugging capabilities.
Install with Tessl CLI
npx tessl i tessl/pypi-dramatiq