or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.mdbrokers.mdcomposition.mdindex.mdmessages.mdmiddleware.mdrate-limiting.mdresults.mdworkers.md

workers.mddocs/

0

# Workers

1

2

Workers in Dramatiq are the components that consume messages from brokers and execute the corresponding actor functions. They handle the runtime execution environment, provide thread management, implement graceful shutdown mechanisms, and integrate with the middleware system for comprehensive task processing.

3

4

## Capabilities

5

6

### Worker Class

7

8

The core worker implementation that processes messages from brokers.

9

10

```python { .api }

11

class Worker:

12

def __init__(

13

self,

14

broker: Broker,

15

*,

16

queues: List[str] = None,

17

worker_timeout: int = 1000,

18

worker_threads: int = 8

19

):

20

"""

21

Create a worker instance.

22

23

Parameters:

24

- broker: Broker instance to consume messages from

25

- queues: List of queue names to process (processes all if None)

26

- worker_timeout: Timeout for message consumption in milliseconds

27

- worker_threads: Number of worker threads for parallel processing

28

"""

29

30

def start(self):

31

"""

32

Start the worker to begin processing messages.

33

34

This method starts the worker threads and begins consuming

35

messages from the broker queues.

36

"""

37

38

def stop(self):

39

"""

40

Stop the worker gracefully.

41

42

Signals the worker to stop processing new messages and

43

wait for current messages to complete.

44

"""

45

46

def join(self):

47

"""

48

Wait for the worker to finish processing and shut down.

49

50

This method blocks until all worker threads have completed

51

their current tasks and shut down.

52

"""

53

54

# Properties

55

broker: Broker # Associated broker

56

queues: List[str] # Queues being processed

57

worker_timeout: int # Message consumption timeout

58

worker_threads: int # Number of worker threads

59

```

60

61

**Usage:**

62

63

```python

64

import dramatiq

65

from dramatiq.brokers.redis import RedisBroker

66

67

# Set up broker and actors

68

broker = RedisBroker()

69

dramatiq.set_broker(broker)

70

71

@dramatiq.actor

72

def example_task(data):

73

print(f"Processing: {data}")

74

return f"Processed: {data}"

75

76

# Create and start worker

77

worker = dramatiq.Worker(

78

broker,

79

worker_threads=4, # 4 concurrent threads

80

worker_timeout=5000 # 5 second timeout

81

)

82

83

# Start processing

84

worker.start()

85

86

# Send some tasks

87

for i in range(10):

88

example_task.send(f"task_{i}")

89

90

# Let it process for a while

91

import time

92

time.sleep(10)

93

94

# Graceful shutdown

95

worker.stop()

96

worker.join()

97

```

98

99

### Command Line Interface

100

101

Dramatiq provides a CLI for running workers in production environments.

102

103

**Basic CLI Usage:**

104

105

```bash

106

# Run worker for specific module

107

dramatiq my_module

108

109

# Run worker with specific settings

110

dramatiq my_module --processes 4 --threads 8

111

112

# Run worker for specific queues

113

dramatiq my_module --queues high_priority,normal

114

115

# Run worker with custom broker URL

116

dramatiq my_module --broker-url redis://localhost:6379/0

117

118

# Run worker with verbose logging

119

dramatiq my_module --verbose

120

121

# Watch for code changes and reload

122

dramatiq my_module --watch /path/to/code

123

```

124

125

**CLI Options:**

126

127

```python { .api }

128

# Common CLI options

129

CLI_OPTIONS = {

130

"--processes": int, # Number of worker processes

131

"--threads": int, # Number of threads per process

132

"--path": str, # Add path to Python path

133

"--queues": str, # Comma-separated queue names

134

"--pid-file": str, # PID file path

135

"--broker-url": str, # Broker connection URL

136

"--verbose": bool, # Verbose logging

137

"--watch": str, # Watch directory for changes

138

"--reload": bool, # Auto-reload on changes

139

}

140

```

141

142

### Advanced Worker Configuration

143

144

#### Multi-Queue Processing

145

146

```python

147

# Worker processing specific queues with different priorities

148

high_priority_worker = dramatiq.Worker(

149

broker,

150

queues=["critical", "high_priority"],

151

worker_threads=6,

152

worker_timeout=2000

153

)

154

155

normal_worker = dramatiq.Worker(

156

broker,

157

queues=["normal", "low_priority"],

158

worker_threads=4,

159

worker_timeout=10000

160

)

161

162

# Start both workers

163

high_priority_worker.start()

164

normal_worker.start()

165

166

# Define actors with different queue assignments

167

@dramatiq.actor(queue_name="critical", priority=0)

168

def critical_task(data):

169

return handle_critical_operation(data)

170

171

@dramatiq.actor(queue_name="normal", priority=5)

172

def normal_task(data):

173

return handle_normal_operation(data)

174

```

175

176

#### Worker with Custom Middleware

177

178

```python

179

from dramatiq.middleware import Middleware

180

181

class CustomWorkerMiddleware(Middleware):

182

def before_worker_boot(self, broker, worker):

183

print(f"Worker starting with {worker.worker_threads} threads")

184

# Initialize worker-specific resources

185

worker.custom_resource = initialize_worker_resource()

186

187

def after_worker_boot(self, broker, worker):

188

print("Worker fully initialized and ready")

189

190

def before_worker_shutdown(self, broker, worker):

191

print("Worker shutting down gracefully")

192

# Cleanup worker-specific resources

193

cleanup_worker_resource(worker.custom_resource)

194

195

def after_worker_shutdown(self, broker, worker):

196

print("Worker shutdown complete")

197

198

# Add custom middleware

199

broker.add_middleware(CustomWorkerMiddleware())

200

201

worker = dramatiq.Worker(broker)

202

worker.start()

203

```

204

205

#### Production Worker Configuration

206

207

```python

208

import os

209

import signal

210

import sys

211

212

def create_production_worker():

213

"""Create worker with production-ready configuration"""

214

215

# Get configuration from environment

216

worker_threads = int(os.getenv("DRAMATIQ_THREADS", "8"))

217

worker_timeout = int(os.getenv("DRAMATIQ_TIMEOUT", "1000"))

218

queues = os.getenv("DRAMATIQ_QUEUES", "").split(",") if os.getenv("DRAMATIQ_QUEUES") else None

219

220

# Create worker

221

worker = dramatiq.Worker(

222

broker,

223

queues=queues,

224

worker_threads=worker_threads,

225

worker_timeout=worker_timeout

226

)

227

228

# Set up signal handlers for graceful shutdown

229

def signal_handler(signum, frame):

230

print(f"Received signal {signum}, shutting down...")

231

worker.stop()

232

sys.exit(0)

233

234

signal.signal(signal.SIGINT, signal_handler)

235

signal.signal(signal.SIGTERM, signal_handler)

236

237

return worker

238

239

# Usage

240

if __name__ == "__main__":

241

worker = create_production_worker()

242

print("Starting production worker...")

243

worker.start()

244

worker.join()

245

```

246

247

### Worker Lifecycle and Monitoring

248

249

#### Worker State Monitoring

250

251

```python

252

import threading

253

import time

254

255

def monitor_worker(worker, check_interval=5):

256

"""Monitor worker health and performance"""

257

258

def monitoring_loop():

259

while worker.is_running: # Hypothetical property

260

# Collect worker metrics

261

stats = {

262

"threads": worker.worker_threads,

263

"queues": worker.queues,

264

"processed_messages": getattr(worker, 'processed_count', 0),

265

"failed_messages": getattr(worker, 'failed_count', 0),

266

"uptime": time.time() - worker.start_time

267

}

268

269

print(f"Worker stats: {stats}")

270

271

# Check worker health

272

if stats["failed_messages"] > 100:

273

print("WARNING: High failure rate detected")

274

275

time.sleep(check_interval)

276

277

# Start monitoring in separate thread

278

monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)

279

monitor_thread.start()

280

281

return monitor_thread

282

283

# Usage with monitoring

284

worker = dramatiq.Worker(broker)

285

worker.start_time = time.time()

286

monitor_thread = monitor_worker(worker)

287

288

worker.start()

289

```

290

291

#### Graceful Shutdown Handling

292

293

```python

294

import atexit

295

import signal

296

297

class GracefulWorker:

298

def __init__(self, broker, **kwargs):

299

self.worker = dramatiq.Worker(broker, **kwargs)

300

self.shutdown_event = threading.Event()

301

self.setup_shutdown_handlers()

302

303

def setup_shutdown_handlers(self):

304

"""Set up handlers for graceful shutdown"""

305

306

def shutdown_handler(signum=None, frame=None):

307

print(f"Shutdown signal received: {signum}")

308

self.shutdown_event.set()

309

self.worker.stop()

310

311

# Register signal handlers

312

signal.signal(signal.SIGINT, shutdown_handler)

313

signal.signal(signal.SIGTERM, shutdown_handler)

314

315

# Register exit handler

316

atexit.register(shutdown_handler)

317

318

def start(self):

319

"""Start worker with shutdown monitoring"""

320

self.worker.start()

321

322

# Monitor for shutdown signal

323

try:

324

while not self.shutdown_event.is_set():

325

time.sleep(1)

326

except KeyboardInterrupt:

327

pass

328

finally:

329

print("Initiating graceful shutdown...")

330

self.worker.stop()

331

self.worker.join()

332

print("Worker shutdown complete")

333

334

# Usage

335

graceful_worker = GracefulWorker(broker, worker_threads=6)

336

graceful_worker.start()

337

```

338

339

### Worker Performance Optimization

340

341

#### Thread Pool Tuning

342

343

```python

344

import psutil

345

346

def calculate_optimal_threads():

347

"""Calculate optimal thread count based on system resources"""

348

cpu_count = psutil.cpu_count()

349

memory_gb = psutil.virtual_memory().total / (1024**3)

350

351

# I/O bound tasks: more threads than CPU cores

352

# CPU bound tasks: threads ≈ CPU cores

353

354

if memory_gb > 8:

355

# High memory system: can handle more threads

356

optimal_threads = min(cpu_count * 2, 16)

357

else:

358

# Limited memory: conservative thread count

359

optimal_threads = max(cpu_count, 4)

360

361

return optimal_threads

362

363

# Create optimized worker

364

optimal_threads = calculate_optimal_threads()

365

optimized_worker = dramatiq.Worker(

366

broker,

367

worker_threads=optimal_threads,

368

worker_timeout=5000

369

)

370

371

print(f"Using {optimal_threads} worker threads")

372

optimized_worker.start()

373

```

374

375

#### Memory Management

376

377

```python

378

import gc

379

import psutil

380

import threading

381

382

class MemoryManagedWorker:

383

def __init__(self, broker, memory_limit_mb=1000, **kwargs):

384

self.worker = dramatiq.Worker(broker, **kwargs)

385

self.memory_limit = memory_limit_mb * 1024 * 1024 # Convert to bytes

386

self.monitoring = True

387

388

def start_memory_monitoring(self):

389

"""Monitor memory usage and trigger garbage collection"""

390

391

def memory_monitor():

392

while self.monitoring:

393

process = psutil.Process()

394

memory_usage = process.memory_info().rss

395

396

if memory_usage > self.memory_limit:

397

print(f"Memory limit exceeded: {memory_usage / 1024 / 1024:.1f}MB")

398

print("Triggering garbage collection...")

399

gc.collect()

400

401

# Check again after GC

402

new_usage = psutil.Process().memory_info().rss

403

print(f"Memory after GC: {new_usage / 1024 / 1024:.1f}MB")

404

405

time.sleep(30) # Check every 30 seconds

406

407

monitor_thread = threading.Thread(target=memory_monitor, daemon=True)

408

monitor_thread.start()

409

return monitor_thread

410

411

def start(self):

412

self.start_memory_monitoring()

413

self.worker.start()

414

415

def stop(self):

416

self.monitoring = False

417

self.worker.stop()

418

419

def join(self):

420

self.worker.join()

421

422

# Usage

423

memory_worker = MemoryManagedWorker(

424

broker,

425

memory_limit_mb=500,

426

worker_threads=4

427

)

428

memory_worker.start()

429

```

430

431

### Multi-Process Worker Setup

432

433

#### Process Pool Worker

434

435

```python

436

import multiprocessing

437

import os

438

439

def worker_process(broker_config, queues, worker_id):

440

"""Worker process function"""

441

442

# Set process title for monitoring

443

try:

444

import setproctitle

445

setproctitle.setproctitle(f"dramatiq-worker-{worker_id}")

446

except ImportError:

447

pass

448

449

# Initialize broker in each process

450

if broker_config["type"] == "redis":

451

from dramatiq.brokers.redis import RedisBroker

452

broker = RedisBroker(**broker_config["params"])

453

else:

454

raise ValueError(f"Unsupported broker type: {broker_config['type']}")

455

456

dramatiq.set_broker(broker)

457

458

# Create and start worker

459

worker = dramatiq.Worker(

460

broker,

461

queues=queues,

462

worker_threads=2 # Fewer threads per process

463

)

464

465

print(f"Worker process {worker_id} starting...")

466

worker.start()

467

worker.join()

468

469

def start_worker_pool(num_processes=4):

470

"""Start multiple worker processes"""

471

472

broker_config = {

473

"type": "redis",

474

"params": {"host": "localhost", "port": 6379, "db": 0}

475

}

476

477

queues = ["high_priority", "normal", "low_priority"]

478

479

processes = []

480

481

for i in range(num_processes):

482

process = multiprocessing.Process(

483

target=worker_process,

484

args=(broker_config, queues, i)

485

)

486

process.start()

487

processes.append(process)

488

print(f"Started worker process {i} (PID: {process.pid})")

489

490

try:

491

# Wait for all processes

492

for process in processes:

493

process.join()

494

except KeyboardInterrupt:

495

print("Shutting down worker processes...")

496

for process in processes:

497

process.terminate()

498

499

for process in processes:

500

process.join()

501

502

# Usage

503

if __name__ == "__main__":

504

start_worker_pool(num_processes=4)

505

```

506

507

### Docker and Container Deployment

508

509

#### Dockerfile for Worker

510

511

```dockerfile

512

# Example Dockerfile for dramatiq worker

513

FROM python:3.11-slim

514

515

WORKDIR /app

516

517

# Install dependencies

518

COPY requirements.txt .

519

RUN pip install -r requirements.txt

520

521

# Copy application code

522

COPY . .

523

524

# Set environment variables

525

ENV DRAMATIQ_THREADS=8

526

ENV DRAMATIQ_TIMEOUT=10000

527

ENV DRAMATIQ_QUEUES=default,high_priority

528

529

# Health check

530

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \

531

CMD python -c "import dramatiq; print('Worker healthy')" || exit 1

532

533

# Run worker

534

CMD ["python", "-m", "dramatiq", "my_app.tasks"]

535

```

536

537

#### Docker Compose for Scaling

538

539

```yaml

540

# docker-compose.yml

541

version: '3.8'

542

543

services:

544

redis:

545

image: redis:7-alpine

546

ports:

547

- "6379:6379"

548

549

worker-high-priority:

550

build: .

551

environment:

552

- DRAMATIQ_THREADS=4

553

- DRAMATIQ_QUEUES=critical,high_priority

554

- REDIS_URL=redis://redis:6379/0

555

depends_on:

556

- redis

557

scale: 2

558

559

worker-normal:

560

build: .

561

environment:

562

- DRAMATIQ_THREADS=6

563

- DRAMATIQ_QUEUES=normal,low_priority

564

- REDIS_URL=redis://redis:6379/0

565

depends_on:

566

- redis

567

scale: 4

568

```

569

570

### Worker Debugging and Troubleshooting

571

572

#### Debug Worker

573

574

```python

575

import logging

576

import traceback

577

578

class DebugWorker:

579

def __init__(self, broker, **kwargs):

580

# Enable debug logging

581

logging.basicConfig(level=logging.DEBUG)

582

logger = logging.getLogger("dramatiq")

583

logger.setLevel(logging.DEBUG)

584

585

self.worker = dramatiq.Worker(broker, **kwargs)

586

self.message_count = 0

587

self.error_count = 0

588

589

def create_debug_middleware(self):

590

"""Create middleware for debugging"""

591

592

class DebugMiddleware(dramatiq.Middleware):

593

def __init__(self, debug_worker):

594

self.debug_worker = debug_worker

595

596

def before_process_message(self, broker, message):

597

self.debug_worker.message_count += 1

598

print(f"[DEBUG] Processing message {self.debug_worker.message_count}: {message.actor_name}")

599

print(f"[DEBUG] Message args: {message.args}")

600

print(f"[DEBUG] Message kwargs: {message.kwargs}")

601

602

def after_process_message(self, broker, message, *, result=None, exception=None):

603

if exception:

604

self.debug_worker.error_count += 1

605

print(f"[ERROR] Message failed: {exception}")

606

print(f"[ERROR] Traceback:")

607

traceback.print_exc()

608

else:

609

print(f"[DEBUG] Message completed successfully: {result}")

610

611

return DebugMiddleware(self)

612

613

def start(self):

614

# Add debug middleware

615

debug_middleware = self.create_debug_middleware()

616

self.worker.broker.add_middleware(debug_middleware)

617

618

print(f"[DEBUG] Starting worker with {self.worker.worker_threads} threads")

619

self.worker.start()

620

621

def print_stats(self):

622

print(f"[STATS] Messages processed: {self.message_count}")

623

print(f"[STATS] Errors: {self.error_count}")

624

if self.message_count > 0:

625

error_rate = (self.error_count / self.message_count) * 100

626

print(f"[STATS] Error rate: {error_rate:.1f}%")

627

628

# Usage for debugging

629

debug_worker = DebugWorker(broker, worker_threads=2)

630

debug_worker.start()

631

632

# Print stats periodically

633

import threading

634

def stats_printer():

635

while True:

636

time.sleep(30)

637

debug_worker.print_stats()

638

639

stats_thread = threading.Thread(target=stats_printer, daemon=True)

640

stats_thread.start()

641

```

642

643

### Gevent Integration

644

645

For high-concurrency scenarios, Dramatiq supports gevent:

646

647

```python

648

# Install: pip install dramatiq[gevent]

649

650

# Use gevent launcher script

651

# dramatiq-gevent my_module

652

653

# Or programmatically with gevent

654

import gevent

655

from gevent import monkey

656

monkey.patch_all()

657

658

import dramatiq

659

from dramatiq.brokers.redis import RedisBroker

660

661

@dramatiq.actor

662

def io_bound_task(url):

663

"""I/O bound task that benefits from gevent"""

664

import requests

665

response = requests.get(url)

666

return {"url": url, "status": response.status_code}

667

668

# Gevent-compatible worker setup

669

broker = RedisBroker()

670

dramatiq.set_broker(broker)

671

672

# Worker will use gevent for concurrency

673

worker = dramatiq.Worker(broker, worker_threads=100) # Many lightweight threads

674

worker.start()

675

```

676

677

This comprehensive worker documentation covers all aspects of running and managing Dramatiq workers, from basic usage to advanced production deployments with monitoring, optimization, and debugging capabilities.