CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware

Pending
Overview
Eval results
Files

docs/workers.md

Workers

Workers in Dramatiq are the components that consume messages from brokers and execute the corresponding actor functions. They handle the runtime execution environment, provide thread management, implement graceful shutdown mechanisms, and integrate with the middleware system for comprehensive task processing.

Capabilities

Worker Class

The core worker implementation that processes messages from brokers.

class Worker:
    """
    API summary of the worker that consumes messages from a broker.

    The methods below carry docstrings only; this is a signature
    reference, not the implementation.
    """

    def __init__(
        self,
        broker: Broker,
        *,
        queues: Optional[List[str]] = None,
        worker_timeout: int = 1000,
        worker_threads: int = 8
    ):
        """
        Create a worker instance.

        All arguments after ``broker`` are keyword-only.

        Parameters:
        - broker: Broker instance to consume messages from
        - queues: List of queue names to process (processes all if None)
        - worker_timeout: Timeout for message consumption in milliseconds
        - worker_threads: Number of worker threads for parallel processing
        """

    def start(self):
        """
        Start the worker to begin processing messages.

        This method starts the worker threads and begins consuming
        messages from the broker queues.
        """

    def stop(self):
        """
        Stop the worker gracefully.

        Signals the worker to stop processing new messages and
        wait for current messages to complete.
        """

    def join(self):
        """
        Wait for the worker to finish processing and shut down.

        This method blocks until all worker threads have completed
        their current tasks and shut down.
        """

    # Attributes (mirror the constructor arguments of the same name)
    broker: Broker              # Associated broker
    queues: List[str]          # Queues being processed
    worker_timeout: int        # Message consumption timeout, in milliseconds
    worker_threads: int        # Number of worker threads

Usage:

import time

import dramatiq
from dramatiq.brokers.redis import RedisBroker

# Set up broker and actors
broker = RedisBroker()
dramatiq.set_broker(broker)


@dramatiq.actor
def example_task(data):
    """Print the payload and return a short confirmation string."""
    print(f"Processing: {data}")
    return f"Processed: {data}"


# Create and start worker
worker = dramatiq.Worker(
    broker,
    worker_threads=4,           # 4 concurrent threads
    worker_timeout=5000         # 5 second timeout (value is in milliseconds)
)

# Start processing
worker.start()

# try/finally guarantees the graceful shutdown below still runs if sending
# fails or the script is interrupted (e.g. Ctrl-C) during the sleep.
try:
    # Send some tasks
    for i in range(10):
        example_task.send(f"task_{i}")

    # Let it process for a while
    time.sleep(10)
finally:
    # Graceful shutdown
    worker.stop()
    worker.join()

Command Line Interface

Dramatiq provides a CLI for running workers in production environments.

Basic CLI Usage:

# Run worker for specific module
dramatiq my_module

# Run worker with specific settings
dramatiq my_module --processes 4 --threads 8

# Run worker for specific queues
# (queue names are separate, space-delimited arguments; a comma-joined
# value would be treated as one queue name)
dramatiq my_module --queues high_priority normal

# Run worker against a specific broker instance.
# There is no --broker-url flag: the connection URL is configured in code
# on the broker object, and the CLI selects that object as module:attribute.
dramatiq my_module:broker

# Run worker with verbose logging
dramatiq my_module --verbose

# Watch for code changes and reload
# (requires the optional watch dependencies: pip install 'dramatiq[watch]')
dramatiq my_module --watch /path/to/code

CLI Options:

# Common CLI options, mapped to the type of value each flag takes.
# Only flags the `dramatiq` command actually accepts are listed: there is
# no `--broker-url` (the broker is chosen by the positional `module:attr`
# argument) and no `--reload` (reloading is what `--watch` does).
CLI_OPTIONS = {
    "--processes": int,         # Number of worker processes
    "--threads": int,           # Number of threads per process
    "--path": str,              # Add path to Python path
    "--queues": str,            # Space-separated queue names
    "--pid-file": str,          # PID file path
    "--log-file": str,          # Log file path
    "--verbose": bool,          # Verbose logging
    "--watch": str,             # Watch directory and reload on changes
}

Advanced Worker Configuration

Multi-Queue Processing

# Worker processing specific queues with different priorities
# NOTE(review): `broker` is not created in this snippet; it is assumed to be
# the broker configured earlier on the page.
# Two workers share one broker; each is restricted to its own queue subset.
# The high-priority worker gets more threads and a shorter consume timeout
# (worker_timeout is in milliseconds).
high_priority_worker = dramatiq.Worker(
    broker,
    queues=["critical", "high_priority"],
    worker_threads=6,
    worker_timeout=2000
)

normal_worker = dramatiq.Worker(
    broker,
    queues=["normal", "low_priority"],
    worker_threads=4,
    worker_timeout=10000
)

# Start both workers
high_priority_worker.start()
normal_worker.start()

# Define actors with different queue assignments
# NOTE(review): the actors are declared after the workers were started, and
# handle_critical_operation / handle_normal_operation are not defined in this
# snippet — confirm both are intentional.
# NOTE(review): presumably a lower `priority` number is served first — confirm
# against the actor documentation.
@dramatiq.actor(queue_name="critical", priority=0)
def critical_task(data):
    return handle_critical_operation(data)

@dramatiq.actor(queue_name="normal", priority=5)
def normal_task(data):
    return handle_normal_operation(data)

Worker with Custom Middleware

from dramatiq.middleware import Middleware

class CustomWorkerMiddleware(Middleware):
    """Tie a per-worker resource to the worker boot/shutdown hooks.

    The resource is created in ``before_worker_boot`` and stored on the
    worker as ``custom_resource``; it is released in
    ``before_worker_shutdown``.
    """

    def before_worker_boot(self, broker, worker):
        """Announce the boot and attach the worker-specific resource."""
        print(f"Worker starting with {worker.worker_threads} threads")
        # Initialize worker-specific resources
        worker.custom_resource = initialize_worker_resource()

    def after_worker_boot(self, broker, worker):
        """Report that the worker finished booting."""
        print("Worker fully initialized and ready")

    def before_worker_shutdown(self, broker, worker):
        """Release the worker-specific resource, if one was attached."""
        print("Worker shutting down gracefully")
        # The attribute is missing when the boot hook never ran (or failed),
        # and after a previous shutdown; skip cleanup instead of raising
        # AttributeError in the middle of the shutdown sequence.
        if hasattr(worker, "custom_resource"):
            cleanup_worker_resource(worker.custom_resource)
            # Drop the reference so a repeated shutdown cannot clean up twice.
            del worker.custom_resource

    def after_worker_shutdown(self, broker, worker):
        """Report that the worker finished shutting down."""
        print("Worker shutdown complete")

# Add custom middleware
# The middleware is registered on the broker before the worker is created
# and started.
broker.add_middleware(CustomWorkerMiddleware())

worker = dramatiq.Worker(broker)
worker.start()

Production Worker Configuration

import os
import signal
import sys

def create_production_worker():
    """Create a worker configured from environment variables.

    Environment:
    - DRAMATIQ_THREADS: number of worker threads (default 8)
    - DRAMATIQ_TIMEOUT: message consumption timeout in ms (default 1000)
    - DRAMATIQ_QUEUES: comma-separated queue names; unset or empty means
      all queues

    Installs SIGINT/SIGTERM handlers that stop the worker and exit the
    process with status 0.

    Returns:
        The configured (not yet started) worker.

    Raises:
        ValueError: if DRAMATIQ_THREADS or DRAMATIQ_TIMEOUT is not an integer.
    """

    # Get configuration from environment
    worker_threads = int(os.getenv("DRAMATIQ_THREADS", "8"))
    worker_timeout = int(os.getenv("DRAMATIQ_TIMEOUT", "1000"))

    # Read the variable once; strip whitespace and drop empty entries so
    # values like "a, b" or "a,,b," do not yield queue names such as " b"
    # or "". An empty result falls back to None (all queues).
    raw_queues = os.getenv("DRAMATIQ_QUEUES", "")
    queues = [name.strip() for name in raw_queues.split(",") if name.strip()] or None

    # Create worker
    worker = dramatiq.Worker(
        broker,
        queues=queues,
        worker_threads=worker_threads,
        worker_timeout=worker_timeout
    )

    # Set up signal handlers for graceful shutdown
    def signal_handler(signum, frame):
        print(f"Received signal {signum}, shutting down...")
        worker.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    return worker

# Usage
# Script entry point: build the worker, start it, then block in join()
# until the worker has shut down.
if __name__ == "__main__":
    worker = create_production_worker()
    print("Starting production worker...")
    worker.start()
    worker.join()

Worker Lifecycle and Monitoring

Worker State Monitoring

import threading
import time

def monitor_worker(worker, check_interval=5, stop_event=None):
    """Periodically print basic stats for ``worker`` from a daemon thread.

    Parameters:
    - worker: object exposing ``worker_threads`` and ``queues``. The optional
      attributes ``processed_count``, ``failed_count``, ``start_time`` and
      ``is_running`` are used when present.
    - check_interval: seconds between reports.
    - stop_event: optional ``threading.Event``; set it to end the loop
      promptly. Without one, the loop runs until the process exits or
      until ``worker.is_running`` (if the worker has it) becomes false.

    Returns:
        The started monitoring thread.
    """
    if stop_event is None:
        stop_event = threading.Event()

    # Used for uptime when the caller did not set worker.start_time.
    monitor_started = time.time()

    def monitoring_loop():
        # `is_running` is not guaranteed to exist on the worker, so treat a
        # missing attribute as "still running" instead of crashing the thread.
        while not stop_event.is_set() and getattr(worker, "is_running", True):
            # Collect worker metrics
            stats = {
                "threads": worker.worker_threads,
                "queues": worker.queues,
                "processed_messages": getattr(worker, 'processed_count', 0),
                "failed_messages": getattr(worker, 'failed_count', 0),
                "uptime": time.time() - getattr(worker, "start_time", monitor_started)
            }

            print(f"Worker stats: {stats}")

            # Check worker health
            if stats["failed_messages"] > 100:
                print("WARNING: High failure rate detected")

            # Sleep for the interval, but wake immediately on stop_event.
            stop_event.wait(check_interval)

    # Start monitoring in separate thread
    monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)
    monitor_thread.start()

    return monitor_thread

# Usage with monitoring
worker = dramatiq.Worker(broker)
# start_time is attached by hand here; monitor_worker reads it to compute
# uptime.
worker.start_time = time.time()
# NOTE(review): the monitor thread is started before worker.start() — confirm
# this ordering is intended.
monitor_thread = monitor_worker(worker)

worker.start()

Graceful Shutdown Handling

import atexit
import signal

class GracefulWorker:
    """Wrap a worker so SIGINT/SIGTERM trigger exactly one graceful shutdown.

    NOTE: this class uses ``threading`` and ``time`` in addition to
    ``atexit`` and ``signal``; all four must be imported by the module.
    """

    def __init__(self, broker, **kwargs):
        """Create the wrapped worker and install the shutdown handlers.

        Parameters:
        - broker: broker passed to ``dramatiq.Worker``
        - **kwargs: forwarded unchanged to ``dramatiq.Worker``
        """
        self.worker = dramatiq.Worker(broker, **kwargs)
        self.shutdown_event = threading.Event()
        # Guards the stop/join sequence so it runs at most once, no matter
        # whether it is reached from start() or from the atexit hook.
        self._stop_lock = threading.Lock()
        self._stopped = False
        self.setup_shutdown_handlers()

    def _stop_once(self):
        """Stop and join the worker once; return True if this call did it."""
        with self._stop_lock:
            if self._stopped:
                return False
            self._stopped = True
        self.worker.stop()
        self.worker.join()
        return True

    def setup_shutdown_handlers(self):
        """Set up handlers for graceful shutdown"""

        def shutdown_handler(signum=None, frame=None):
            # Keep the signal handler minimal: only flag the request. The
            # blocking stop/join runs in start(), outside signal context.
            print(f"Shutdown signal received: {signum}")
            self.shutdown_event.set()

        # Register signal handlers
        signal.signal(signal.SIGINT, shutdown_handler)
        signal.signal(signal.SIGTERM, shutdown_handler)

        # Register exit handler; a no-op if start() already shut down.
        atexit.register(self._stop_once)

    def start(self):
        """Start worker with shutdown monitoring"""
        self.worker.start()

        # Monitor for shutdown signal
        try:
            while not self.shutdown_event.is_set():
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            print("Initiating graceful shutdown...")
            self._stop_once()
            print("Worker shutdown complete")

# Usage
# Extra keyword arguments (here worker_threads) are forwarded to
# dramatiq.Worker; start() blocks until a shutdown is requested.
graceful_worker = GracefulWorker(broker, worker_threads=6)
graceful_worker.start()

Worker Performance Optimization

Thread Pool Tuning

import psutil

def calculate_optimal_threads():
    """Calculate optimal thread count based on system resources

    Returns:
        int: ``min(2 * cpus, 16)`` when more than 8 GiB of RAM is installed,
        otherwise ``max(cpus, 4)``.
    """
    # psutil.cpu_count() returns None when the count cannot be determined;
    # fall back to 1 so the arithmetic below cannot raise TypeError.
    cpu_count = psutil.cpu_count() or 1
    memory_gb = psutil.virtual_memory().total / (1024**3)

    # I/O bound tasks: more threads than CPU cores
    # CPU bound tasks: threads ≈ CPU cores

    if memory_gb > 8:
        # High memory system: can handle more threads
        optimal_threads = min(cpu_count * 2, 16)
    else:
        # Limited memory: conservative thread count
        # NOTE(review): max() makes 4 a floor, not a cap, so a many-core
        # low-memory host gets more threads than the branch above allows —
        # confirm this is intended.
        optimal_threads = max(cpu_count, 4)

    return optimal_threads

# Create optimized worker
# The thread count is computed once, at startup, from the host's CPU and
# memory figures.
optimal_threads = calculate_optimal_threads()
optimized_worker = dramatiq.Worker(
    broker,
    worker_threads=optimal_threads,
    worker_timeout=5000
)

print(f"Using {optimal_threads} worker threads")
optimized_worker.start()

Memory Management

import gc
import psutil
import threading

class MemoryManagedWorker:
    """Worker wrapper that watches process RSS and forces GC over a limit."""

    def __init__(self, broker, memory_limit_mb=1000, check_interval=30, **kwargs):
        """
        Parameters:
        - broker: broker passed to ``dramatiq.Worker``
        - memory_limit_mb: RSS threshold in megabytes
        - check_interval: seconds between memory checks
        - **kwargs: forwarded unchanged to ``dramatiq.Worker``
        """
        self.worker = dramatiq.Worker(broker, **kwargs)
        self.memory_limit = memory_limit_mb * 1024 * 1024  # Convert to bytes
        self.check_interval = check_interval
        self.monitoring = True
        # Lets stop() wake the monitor immediately instead of waiting out
        # the rest of a sleep interval.
        self._stop_event = threading.Event()

    def start_memory_monitoring(self):
        """Monitor memory usage and trigger garbage collection

        Returns the started daemon thread.
        """
        # One handle for the current process, reused on every check.
        process = psutil.Process()

        def memory_monitor():
            while self.monitoring:
                memory_usage = process.memory_info().rss

                if memory_usage > self.memory_limit:
                    print(f"Memory limit exceeded: {memory_usage / 1024 / 1024:.1f}MB")
                    print("Triggering garbage collection...")
                    gc.collect()

                    # Check again after GC
                    new_usage = process.memory_info().rss
                    print(f"Memory after GC: {new_usage / 1024 / 1024:.1f}MB")

                # Wait for the next check; returns True as soon as stop()
                # sets the event.
                if self._stop_event.wait(self.check_interval):
                    break

        monitor_thread = threading.Thread(target=memory_monitor, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def start(self):
        """Start the memory monitor, then the wrapped worker."""
        self.start_memory_monitoring()
        self.worker.start()

    def stop(self):
        """Stop the memory monitor and the wrapped worker."""
        self.monitoring = False
        self._stop_event.set()
        self.worker.stop()

    def join(self):
        """Block until the wrapped worker has shut down."""
        self.worker.join()

# Usage
# memory_limit_mb is handled by the wrapper; worker_threads is forwarded to
# dramatiq.Worker through **kwargs.
memory_worker = MemoryManagedWorker(
    broker, 
    memory_limit_mb=500,
    worker_threads=4
)
memory_worker.start()

Multi-Process Worker Setup

Process Pool Worker

import multiprocessing
import os

def worker_process(broker_config, queues, worker_id):
    """Entry point for one worker process.

    Parameters:
    - broker_config: dict with a "type" key and a "params" dict of broker
      constructor arguments; only type "redis" is supported
    - queues: queue names this process should consume
    - worker_id: number used in the process title and log line

    Raises:
        ValueError: if the broker type is not "redis".

    Blocks in ``worker.join()`` after starting the worker.
    """
    # Naming the process is best-effort: skipped when setproctitle is absent.
    try:
        import setproctitle
        title = f"dramatiq-worker-{worker_id}"
        setproctitle.setproctitle(title)
    except ImportError:
        pass

    # Each process builds its own broker from the plain-dict config.
    if broker_config["type"] != "redis":
        raise ValueError(f"Unsupported broker type: {broker_config['type']}")

    from dramatiq.brokers.redis import RedisBroker
    connection_params = broker_config["params"]
    broker = RedisBroker(**connection_params)
    dramatiq.set_broker(broker)

    # Fewer threads per process
    worker_options = {"queues": queues, "worker_threads": 2}
    worker = dramatiq.Worker(broker, **worker_options)

    print(f"Worker process {worker_id} starting...")
    worker.start()
    worker.join()

def start_worker_pool(num_processes=4, broker_config=None, queues=None):
    """Start multiple worker processes and wait for them to exit.

    Parameters:
    - num_processes: number of child processes to spawn
    - broker_config: dict handed to each child (``{"type": ..., "params":
      {...}}``); defaults to a local Redis on port 6379, db 0
    - queues: queue names every child consumes; defaults to
      ``["high_priority", "normal", "low_priority"]``

    On KeyboardInterrupt the children still running are terminated and
    then joined.
    """

    if broker_config is None:
        broker_config = {
            "type": "redis",
            "params": {"host": "localhost", "port": 6379, "db": 0}
        }

    if queues is None:
        queues = ["high_priority", "normal", "low_priority"]

    processes = []

    for i in range(num_processes):
        process = multiprocessing.Process(
            target=worker_process,
            args=(broker_config, queues, i)
        )
        process.start()
        processes.append(process)
        print(f"Started worker process {i} (PID: {process.pid})")

    try:
        # Wait for all processes
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        print("Shutting down worker processes...")
        for process in processes:
            # Children that already exited need no signal.
            if process.is_alive():
                process.terminate()

        for process in processes:
            process.join()

# Usage
# Guarded entry point: the pool is only started when this file is run as a
# script, not when it is imported.
if __name__ == "__main__":
    start_worker_pool(num_processes=4)

Docker and Container Deployment

Dockerfile for Worker

# Example Dockerfile for dramatiq worker
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer stays cached until
# requirements.txt changes; --no-cache-dir keeps pip's cache out of the image
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables
# DRAMATIQ_THREADS and DRAMATIQ_QUEUES are passed to the CLI by CMD below.
# The dramatiq CLI has no matching flag for DRAMATIQ_TIMEOUT; it is kept for
# application code that builds its own Worker.
ENV DRAMATIQ_THREADS=8
ENV DRAMATIQ_TIMEOUT=10000
ENV DRAMATIQ_QUEUES=default,high_priority

# Health check
# NOTE: this only proves that the dramatiq package can be imported inside the
# container; it does not show that the worker process is alive or consuming.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import dramatiq; print('Worker healthy')" || exit 1

# Run worker
# Shell form so the environment variables are expanded; `exec` makes the
# worker PID 1 so it receives the container's stop signal directly. The
# comma-separated queue list is converted to the space-separated form the
# CLI expects.
CMD exec python -m dramatiq my_app.tasks --threads "$DRAMATIQ_THREADS" --queues $(echo "$DRAMATIQ_QUEUES" | tr ',' ' ')

Docker Compose for Scaling

# docker-compose.yml
# One Redis service plus two worker services built from the same image; the
# worker services differ only in thread count, queue list and replica count.
# NOTE(review): `scale` is not part of the 3.x file format declared here
# (3.x uses deploy.replicas) — confirm against the Compose version in use.
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  
  # NOTE(review): nothing in this file shows what reads DRAMATIQ_* or
  # REDIS_URL; presumably the application code does — confirm.
  worker-high-priority:
    build: .
    environment:
      - DRAMATIQ_THREADS=4
      - DRAMATIQ_QUEUES=critical,high_priority
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    scale: 2
  
  worker-normal:
    build: .
    environment:
      - DRAMATIQ_THREADS=6
      - DRAMATIQ_QUEUES=normal,low_priority
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    scale: 4

Worker Debugging and Troubleshooting

Debug Worker

import logging
import traceback

class DebugWorker:
    """Worker wrapper that logs every message and keeps simple counters.

    NOTE(review): the counters are incremented from middleware hooks without
    a lock; with several worker threads the totals are presumably
    approximate — confirm if exact counts matter.
    """

    def __init__(self, broker, **kwargs):
        """
        Parameters:
        - broker: broker passed to ``dramatiq.Worker``
        - **kwargs: forwarded unchanged to ``dramatiq.Worker``
        """
        # Enable debug logging
        logging.basicConfig(level=logging.DEBUG)
        logger = logging.getLogger("dramatiq")
        logger.setLevel(logging.DEBUG)

        self.worker = dramatiq.Worker(broker, **kwargs)
        self.message_count = 0
        self.error_count = 0
        # Set on first start() so the middleware is registered only once.
        self._debug_middleware = None

    def create_debug_middleware(self):
        """Create middleware for debugging"""

        class DebugMiddleware(dramatiq.Middleware):
            def __init__(self, debug_worker):
                self.debug_worker = debug_worker

            def before_process_message(self, broker, message):
                self.debug_worker.message_count += 1
                print(f"[DEBUG] Processing message {self.debug_worker.message_count}: {message.actor_name}")
                print(f"[DEBUG] Message args: {message.args}")
                print(f"[DEBUG] Message kwargs: {message.kwargs}")

            def after_process_message(self, broker, message, *, result=None, exception=None):
                # Compare against None: truthiness is the wrong test for
                # "an exception was passed".
                if exception is not None:
                    self.debug_worker.error_count += 1
                    print(f"[ERROR] Message failed: {exception}")
                    print("[ERROR] Traceback:")
                    # Print the traceback of the exception we were handed,
                    # rather than whatever exception happens to be "current".
                    traceback.print_exception(
                        type(exception), exception, exception.__traceback__
                    )
                else:
                    print(f"[DEBUG] Message completed successfully: {result}")

        return DebugMiddleware(self)

    def start(self):
        """Register the debug middleware (once) and start the worker."""
        # Add debug middleware
        if self._debug_middleware is None:
            self._debug_middleware = self.create_debug_middleware()
            self.worker.broker.add_middleware(self._debug_middleware)

        print(f"[DEBUG] Starting worker with {self.worker.worker_threads} threads")
        self.worker.start()

    def print_stats(self):
        """Print message and error totals, plus the error rate if non-empty."""
        print(f"[STATS] Messages processed: {self.message_count}")
        print(f"[STATS] Errors: {self.error_count}")
        if self.message_count > 0:
            error_rate = (self.error_count / self.message_count) * 100
            print(f"[STATS] Error rate: {error_rate:.1f}%")

# Usage for debugging
debug_worker = DebugWorker(broker, worker_threads=2)
debug_worker.start()

# Print stats periodically
# NOTE(review): `time` is used below but not imported in this snippet;
# presumably it is imported earlier in the module — confirm.
import threading
def stats_printer():
    """Print the debug worker's counters every 30 seconds, forever."""
    while True:
        time.sleep(30)
        debug_worker.print_stats()

# Daemon thread: it does not keep the process alive on its own.
stats_thread = threading.Thread(target=stats_printer, daemon=True)
stats_thread.start()

Gevent Integration

For high-concurrency scenarios, Dramatiq supports gevent:

# Install: pip install dramatiq[gevent]

# Use gevent launcher script
# dramatiq-gevent my_module

# Or programmatically with gevent
# The monkey patch is applied here, before dramatiq and the Redis broker are
# imported below; keep it ahead of those imports.
import gevent
from gevent import monkey
monkey.patch_all()

import dramatiq
from dramatiq.brokers.redis import RedisBroker

@dramatiq.actor
def io_bound_task(url, timeout=10):
    """I/O bound task that benefits from gevent

    Parameters:
    - url: address to fetch with an HTTP GET
    - timeout: seconds to wait for the server before giving up

    Returns:
        dict with the requested "url" and the response "status" code.
    """
    import requests
    # requests never times out by default; without an explicit timeout a
    # stalled server would block this task (and its worker slot) forever.
    response = requests.get(url, timeout=timeout)
    return {"url": url, "status": response.status_code}

# Gevent-compatible worker setup
broker = RedisBroker()
dramatiq.set_broker(broker)

# Worker will use gevent for concurrency
# NOTE(review): presumably the monkey patch above turns these 100 threads
# into greenlets, which is why the count is so much higher than in the
# thread-based examples — confirm.
worker = dramatiq.Worker(broker, worker_threads=100)  # Many lightweight threads
worker.start()

This comprehensive worker documentation covers all aspects of running and managing Dramatiq workers, from basic usage to advanced production deployments with monitoring, optimization, and debugging capabilities.

Install with Tessl CLI

npx tessl i tessl/pypi-dramatiq

docs

actors.md

brokers.md

composition.md

index.md

messages.md

middleware.md

rate-limiting.md

results.md

workers.md

tile.json