or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.mdbrokers.mdcomposition.mdindex.mdmessages.mdmiddleware.mdrate-limiting.mdresults.mdworkers.md

middleware.mddocs/

0

# Middleware

1

2

The middleware system in Dramatiq provides a powerful plugin architecture for extending message processing functionality. Middleware components can intercept and modify message processing at various stages, enabling features like retries, time limits, rate limiting, monitoring, and custom processing logic.

3

4

## Capabilities

5

6

### Middleware Base Class

7

8

The foundation for all middleware components, defining hook methods for different processing stages.

9

10

```python { .api }

11

class Middleware:

12

"""

13

Base class for middleware components.

14

15

Middleware can intercept message processing at various stages

16

and modify behavior through hook methods.

17

"""

18

19

@property

20

def actor_options(self) -> Set[str]:

21

"""

22

Set of actor options this middleware supports.

23

24

Returns:

25

Set of option names that actors can use with this middleware

26

"""

27

return set()

28

29

@property

30

def forks(self) -> List[Callable]:

31

"""

32

List of fork functions for process-based middleware.

33

34

Returns:

35

List of functions to call when worker process forks

36

"""

37

return []

38

39

# Message acknowledgment hooks

40

def before_ack(self, broker: Broker, message: Message):

41

"""Called before message acknowledgment."""

42

43

def after_ack(self, broker: Broker, message: Message):

44

"""Called after message acknowledgment."""

45

46

def before_nack(self, broker: Broker, message: Message):

47

"""Called before message negative acknowledgment."""

48

49

def after_nack(self, broker: Broker, message: Message):

50

"""Called after message negative acknowledgment."""

51

52

# Actor lifecycle hooks

53

def before_declare_actor(self, broker: Broker, actor: Actor):

54

"""Called before actor is declared to broker."""

55

56

def after_declare_actor(self, broker: Broker, actor: Actor):

57

"""Called after actor is declared to broker."""

58

59

# Message enqueue hooks

60

def before_enqueue(self, broker: Broker, message: Message, delay: int):

61

"""Called before message is enqueued."""

62

63

def after_enqueue(self, broker: Broker, message: Message, delay: int):

64

"""Called after message is enqueued."""

65

66

# Message processing hooks

67

def before_process_message(self, broker: Broker, message: Message):

68

"""

69

Called before message processing.

70

71

Can raise SkipMessage to skip processing this message.

72

"""

73

74

def after_process_message(

75

self,

76

broker: Broker,

77

message: Message,

78

*,

79

result=None,

80

exception=None

81

):

82

"""

83

Called after message processing completes.

84

85

Parameters:

86

- result: Result from successful processing (if no exception)

87

- exception: Exception from failed processing (if failed)

88

"""

89

90

def after_skip_message(self, broker: Broker, message: Message):

91

"""Called when message processing is skipped."""

92

93

# Worker lifecycle hooks

94

def before_worker_boot(self, broker: Broker, worker: Worker):

95

"""Called before worker starts processing."""

96

97

def after_worker_boot(self, broker: Broker, worker: Worker):

98

"""Called after worker starts processing."""

99

100

def before_worker_shutdown(self, broker: Broker, worker: Worker):

101

"""Called before worker shuts down."""

102

103

def after_worker_shutdown(self, broker: Broker, worker: Worker):

104

"""Called after worker shuts down."""

105

```

106

107

### Built-in Middleware Components

108

109

#### Retries Middleware

110

111

Automatically retry failed messages with exponential backoff.

112

113

```python { .api }

114

class Retries(Middleware):

115

def __init__(

116

self, *,

117

max_retries: int = 20,

118

min_backoff: int = 15000,

119

max_backoff: int = 604800000,

120

retry_when: Callable = None

121

):

122

"""

123

Initialize retry middleware.

124

125

Parameters:

126

- max_retries: Maximum number of retry attempts

127

- min_backoff: Minimum backoff time in milliseconds

128

- max_backoff: Maximum backoff time in milliseconds

129

- retry_when: Function to determine if retry should occur

130

"""

131

132

@property

133

def actor_options(self) -> Set[str]:

134

return {"max_retries", "min_backoff", "max_backoff", "retry_when"}

135

```

136

137

**Usage:**

138

139

```python

140

# Default retries

141

retries = Retries()

142

143

# Custom retry configuration

144

retries = Retries(

145

max_retries=5,

146

min_backoff=1000, # 1 second

147

max_backoff=300000, # 5 minutes

148

)

149

150

# Custom retry logic

151

def should_retry(retries_so_far, exception):

152

# Only retry on specific exceptions

153

return isinstance(exception, (ConnectionError, TimeoutError)) and retries_so_far < 3

154

155

retries = Retries(retry_when=should_retry)

156

157

broker.add_middleware(retries)

158

159

# Actor-specific retry settings

160

@dramatiq.actor(max_retries=3, min_backoff=5000)

161

def fragile_task(data):

162

if random.random() < 0.5:

163

raise Exception("Random failure")

164

return "Success"

165

```

166

167

#### Time Limit Middleware

168

169

Enforce maximum execution time for tasks.

170

171

```python { .api }

172

class TimeLimit(Middleware):

173

def __init__(self, *, time_limit: int = 600000, interval: int = 1000):

174

"""

175

Initialize time limit middleware.

176

177

Parameters:

178

- time_limit: Maximum execution time in milliseconds (default: 10 minutes)

179

- interval: Check interval in milliseconds (default: 1 second)

180

"""

181

182

@property

183

def actor_options(self) -> Set[str]:

184

return {"time_limit"}

185

186

class TimeLimitExceeded(Exception):

187

"""Raised when task execution exceeds time limit."""

188

```

189

190

**Usage:**

191

192

```python

193

time_limit = TimeLimit(time_limit=30000) # 30 seconds

194

broker.add_middleware(time_limit)

195

196

@dramatiq.actor(time_limit=60000) # 1 minute limit

197

def long_running_task(data):

198

# Long-running processing

199

time.sleep(120) # Will be interrupted after 1 minute

200

return "Finished"

201

202

try:

203

long_running_task.send({"data": "test"})

204

except TimeLimitExceeded:

205

print("Task exceeded time limit")

206

```

207

208

#### Age Limit Middleware

209

210

Reject messages that are too old.

211

212

```python { .api }

213

class AgeLimit(Middleware):

214

def __init__(self, *, max_age: int = None):

215

"""

216

Initialize age limit middleware.

217

218

Parameters:

219

- max_age: Maximum message age in milliseconds

220

"""

221

222

@property

223

def actor_options(self) -> Set[str]:

224

return {"max_age"}

225

```

226

227

**Usage:**

228

229

```python

230

age_limit = AgeLimit(max_age=3600000) # 1 hour

231

broker.add_middleware(age_limit)

232

233

@dramatiq.actor(max_age=1800000) # 30 minutes

234

def time_sensitive_task(data):

235

return f"Processing {data}"

236

```

237

238

#### Callbacks Middleware

239

240

Execute callback functions on task success or failure.

241

242

```python { .api }

243

class Callbacks(Middleware):

244

def __init__(self):

245

"""Initialize callbacks middleware."""

246

247

@property

248

def actor_options(self) -> Set[str]:

249

return {"on_success", "on_failure"}

250

```

251

252

**Usage:**

253

254

```python

255

callbacks = Callbacks()

256

broker.add_middleware(callbacks)

257

258

@dramatiq.actor

259

def success_callback(message_data, result):

260

print(f"Task {message_data.message_id} succeeded with result: {result}")

261

262

@dramatiq.actor

263

def failure_callback(message_data, exception_data):

264

print(f"Task {message_data.message_id} failed: {exception_data}")

265

266

@dramatiq.actor(

267

on_success="success_callback",

268

on_failure="failure_callback"

269

)

270

def monitored_task(data):

271

if data == "fail":

272

raise ValueError("Intentional failure")

273

return f"Processed: {data}"

274

```

275

276

#### Pipelines Middleware

277

278

Enable pipeline composition functionality.

279

280

```python { .api }

281

class Pipelines(Middleware):

282

def __init__(self):

283

"""Initialize pipelines middleware."""

284

```

285

286

#### Group Callbacks Middleware

287

288

Handle group completion callbacks and coordination.

289

290

```python { .api }

291

class GroupCallbacks(Middleware):

292

def __init__(self, rate_limiter_backend):

293

"""

294

Initialize group callbacks middleware.

295

296

Parameters:

297

- rate_limiter_backend: Backend for coordination

298

"""

299

```

300

301

#### Prometheus Middleware

302

303

Export metrics to Prometheus for monitoring.

304

305

```python { .api }

306

class Prometheus(Middleware):

307

def __init__(

308

self, *,

309

http_host: str = "127.0.0.1",

310

http_port: int = 9191,

311

registry = None

312

):

313

"""

314

Initialize Prometheus metrics middleware.

315

316

Parameters:

317

- http_host: HTTP server host for metrics endpoint

318

- http_port: HTTP server port for metrics endpoint

319

- registry: Prometheus registry (uses default if None)

320

"""

321

```

322

323

**Usage:**

324

325

```python

326

prometheus = Prometheus(http_host="0.0.0.0", http_port=8000)

327

broker.add_middleware(prometheus)

328

329

# Metrics available at http://localhost:8000/metrics

330

# - dramatiq_messages_total: Total messages processed

331

# - dramatiq_message_errors_total: Total message errors

332

# - dramatiq_message_duration_seconds: Message processing duration

333

# - dramatiq_workers_total: Number of active workers

334

```

335

336

#### Results Middleware

337

338

Store and retrieve task results.

339

340

```python { .api }

341

class Results(Middleware):

342

def __init__(self, *, backend: ResultBackend = None, store_results: bool = False):

343

"""

344

Initialize results middleware.

345

346

Parameters:

347

- backend: Result storage backend

348

- store_results: Whether to store results by default

349

"""

350

351

@property

352

def actor_options(self) -> Set[str]:

353

return {"store_results"}

354

```

355

356

**Usage:**

357

358

```python

359

from dramatiq.results.backends import RedisBackend

360

361

result_backend = RedisBackend()

362

results = Results(backend=result_backend, store_results=True)

363

broker.add_middleware(results)

364

365

@dramatiq.actor(store_results=True)

366

def task_with_result(data):

367

return {"processed": data, "timestamp": time.time()}

368

369

message = task_with_result.send("test_data")

370

result = message.get_result(block=True, timeout=30000)

371

print(f"Task result: {result}")

372

```

373

374

#### Current Message Middleware

375

376

Provide access to current message in actors.

377

378

```python { .api }

379

class CurrentMessage(Middleware):

380

def __init__(self):

381

"""Initialize current message middleware."""

382

383

# Access current message in actors

384

from dramatiq.middleware import CurrentMessage

385

386

def get_current_message() -> Message:

387

"""Get the currently processing message."""

388

```

389

390

**Usage:**

391

392

```python

393

current_message = CurrentMessage()

394

broker.add_middleware(current_message)

395

396

@dramatiq.actor

397

def message_aware_task(data):

398

from dramatiq.middleware import get_current_message

399

400

current = get_current_message()

401

print(f"Processing message {current.message_id} with data: {data}")

402

403

return {

404

"data": data,

405

"message_id": current.message_id,

406

"retry_count": current.options.get("retries", 0)

407

}

408

```

409

410

#### Shutdown Middleware

411

412

Handle graceful worker shutdown.

413

414

```python { .api }

415

class Shutdown(Middleware):

416

def __init__(self):

417

"""Initialize shutdown middleware."""

418

419

class ShutdownNotifications(Middleware):

420

def __init__(self, notify_shutdown: Callable = None):

421

"""

422

Initialize shutdown notifications middleware.

423

424

Parameters:

425

- notify_shutdown: Function to call on shutdown

426

"""

427

```

428

429

#### AsyncIO Middleware

430

431

Support for async actors.

432

433

```python { .api }

434

class AsyncIO(Middleware):

435

def __init__(self):

436

"""Initialize AsyncIO middleware for async actors."""

437

```

438

439

**Usage:**

440

441

```python

442

asyncio_middleware = AsyncIO()

443

broker.add_middleware(asyncio_middleware)

444

445

@dramatiq.actor

446

async def async_task(data):

447

await asyncio.sleep(1) # Async operation

448

return f"Async processed: {data}"

449

450

# Send async task

451

async_task.send("test_data")

452

```

453

454

### Middleware Errors

455

456

```python { .api }

457

class MiddlewareError(Exception):

458

"""Base exception for middleware errors."""

459

460

class SkipMessage(Exception):

461

"""

462

Exception raised to skip message processing.

463

464

When raised in before_process_message, the message

465

will be acknowledged without processing.

466

"""

467

```

468

469

### Threading Utilities

470

471

Utilities for thread-based middleware operations.

472

473

```python { .api }

474

class Interrupt(Exception):

475

"""Exception used to interrupt thread execution."""

476

477

def raise_thread_exception(thread_id: int, exception: Exception):

478

"""

479

Raise an exception in a specific thread.

480

481

Parameters:

482

- thread_id: Target thread ID

483

- exception: Exception to raise in the thread

484

"""

485

```

486

487

### Default Middleware Stack

488

489

```python { .api }

490

default_middleware = [

491

Prometheus,

492

AgeLimit,

493

TimeLimit,

494

ShutdownNotifications,

495

Callbacks,

496

Pipelines,

497

Retries

498

]

499

```

500

501

### Custom Middleware Development

502

503

#### Basic Custom Middleware

504

505

```python

506

class LoggingMiddleware(dramatiq.Middleware):

507

def __init__(self, log_level="INFO"):

508

self.logger = logging.getLogger("dramatiq.custom")

509

self.logger.setLevel(log_level)

510

511

def before_process_message(self, broker, message):

512

self.logger.info(f"Starting processing: {message.actor_name}")

513

514

def after_process_message(self, broker, message, *, result=None, exception=None):

515

if exception:

516

self.logger.error(f"Failed processing {message.actor_name}: {exception}")

517

else:

518

self.logger.info(f"Completed processing: {message.actor_name}")

519

520

# Add custom middleware

521

logging_middleware = LoggingMiddleware()

522

broker.add_middleware(logging_middleware)

523

```

524

525

#### Advanced Custom Middleware with Options

526

527

```python

528

class RateLimitingMiddleware(dramatiq.Middleware):

529

def __init__(self, default_limit=100):

530

self.default_limit = default_limit

531

self.counters = {}

532

533

@property

534

def actor_options(self):

535

return {"rate_limit"}

536

537

def before_process_message(self, broker, message):

538

actor_name = message.actor_name

539

rate_limit = message.options.get("rate_limit", self.default_limit)

540

541

# Simple in-memory rate limiting (use Redis in production)

542

current_count = self.counters.get(actor_name, 0)

543

if current_count >= rate_limit:

544

raise dramatiq.RateLimitExceeded(f"Rate limit {rate_limit} exceeded for {actor_name}")

545

546

self.counters[actor_name] = current_count + 1

547

548

def after_process_message(self, broker, message, *, result=None, exception=None):

549

# Reset counter after processing

550

actor_name = message.actor_name

551

if actor_name in self.counters:

552

self.counters[actor_name] -= 1

553

554

# Usage

555

rate_limiting = RateLimitingMiddleware(default_limit=50)

556

broker.add_middleware(rate_limiting)

557

558

@dramatiq.actor(rate_limit=10)

559

def rate_limited_task(data):

560

return f"Processed: {data}"

561

```

562

563

#### Middleware with External Dependencies

564

565

```python

566

class DatabaseLoggingMiddleware(dramatiq.Middleware):

567

def __init__(self, database_url):

568

self.db = database.connect(database_url)

569

570

def after_process_message(self, broker, message, *, result=None, exception=None):

571

# Log to database

572

self.db.execute(

573

"INSERT INTO task_log (message_id, actor_name, success, error) VALUES (?, ?, ?, ?)",

574

(message.message_id, message.actor_name, exception is None, str(exception) if exception else None)

575

)

576

self.db.commit()

577

578

def before_worker_shutdown(self, broker, worker):

579

self.db.close()

580

581

# Usage

582

db_logging = DatabaseLoggingMiddleware("sqlite:///tasks.db")

583

broker.add_middleware(db_logging)

584

```

585

586

### Middleware Ordering

587

588

Middleware order matters as each middleware can modify message processing:

589

590

```python

591

# Careful ordering for proper functionality

592

broker = RedisBroker(middleware=[

593

Prometheus(), # Metrics first

594

AgeLimit(), # Filter old messages early

595

TimeLimit(), # Set time limits

596

Results(), # Store results before retries

597

Retries(), # Retry logic

598

Callbacks(), # Callbacks after retries

599

Pipelines(), # Pipeline support

600

])

601

602

# Add middleware with specific positioning

603

broker.add_middleware(CustomMiddleware(), after=TimeLimit)

604

broker.add_middleware(AnotherMiddleware(), before=Retries)

605

```