0
# Middleware
1
2
The middleware system in Dramatiq provides a powerful plugin architecture for extending message processing functionality. Middleware components can intercept and modify message processing at various stages, enabling features like retries, time limits, rate limiting, monitoring, and custom processing logic.
3
4
## Capabilities
5
6
### Middleware Base Class
7
8
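`Middleware` is the base class for all middleware components. It defines one hook method for each stage of message, actor, and worker processing; every hook is a no-op by default, so subclasses override only the hooks they need.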
9
10
```python { .api }
11
class Middleware:
12
"""
13
Base class for middleware components.
14
15
Middleware can intercept message processing at various stages
16
and modify behavior through hook methods.
17
"""
18
19
@property
20
def actor_options(self) -> Set[str]:
21
"""
22
Set of actor options this middleware supports.
23
24
Returns:
25
Set of option names that actors can use with this middleware
26
"""
27
return set()
28
29
@property
30
def forks(self) -> List[Callable]:
31
"""
32
List of fork functions for process-based middleware.
33
34
Returns:
35
List of functions to call when worker process forks
36
"""
37
return []
38
39
# Message acknowledgment hooks
40
def before_ack(self, broker: Broker, message: Message):
41
"""Called before message acknowledgment."""
42
43
def after_ack(self, broker: Broker, message: Message):
44
"""Called after message acknowledgment."""
45
46
def before_nack(self, broker: Broker, message: Message):
47
"""Called before message negative acknowledgment."""
48
49
def after_nack(self, broker: Broker, message: Message):
50
"""Called after message negative acknowledgment."""
51
52
# Actor lifecycle hooks
53
def before_declare_actor(self, broker: Broker, actor: Actor):
54
"""Called before actor is declared to broker."""
55
56
def after_declare_actor(self, broker: Broker, actor: Actor):
57
"""Called after actor is declared to broker."""
58
59
# Message enqueue hooks
60
def before_enqueue(self, broker: Broker, message: Message, delay: int):
61
"""Called before message is enqueued."""
62
63
def after_enqueue(self, broker: Broker, message: Message, delay: int):
64
"""Called after message is enqueued."""
65
66
# Message processing hooks
67
def before_process_message(self, broker: Broker, message: Message):
68
"""
69
Called before message processing.
70
71
Can raise SkipMessage to skip processing this message.
72
"""
73
74
def after_process_message(
75
self,
76
broker: Broker,
77
message: Message,
78
*,
79
result=None,
80
exception=None
81
):
82
"""
83
Called after message processing completes.
84
85
Parameters:
86
- result: Result from successful processing (if no exception)
87
- exception: Exception from failed processing (if failed)
88
"""
89
90
def after_skip_message(self, broker: Broker, message: Message):
91
"""Called when message processing is skipped."""
92
93
# Worker lifecycle hooks
94
def before_worker_boot(self, broker: Broker, worker: Worker):
95
"""Called before worker starts processing."""
96
97
def after_worker_boot(self, broker: Broker, worker: Worker):
98
"""Called after worker starts processing."""
99
100
def before_worker_shutdown(self, broker: Broker, worker: Worker):
101
"""Called before worker shuts down."""
102
103
def after_worker_shutdown(self, broker: Broker, worker: Worker):
104
"""Called after worker shuts down."""
105
```
106
107
### Built-in Middleware Components
108
109
#### Retries Middleware
110
111
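Automatically retry failed messages with exponential backoff. Backoff bounds are given in milliseconds, and every setting can be overridden per actor through the corresponding actor option.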
112
113
```python { .api }
114
class Retries(Middleware):
115
def __init__(
116
self, *,
117
max_retries: int = 20,
118
min_backoff: int = 15000,
119
max_backoff: int = 604800000,
120
retry_when: Callable = None
121
):
122
"""
123
Initialize retry middleware.
124
125
Parameters:
126
- max_retries: Maximum number of retry attempts
127
- min_backoff: Minimum backoff time in milliseconds
128
- max_backoff: Maximum backoff time in milliseconds
129
- retry_when: Function to determine if retry should occur
130
"""
131
132
@property
133
def actor_options(self) -> Set[str]:
134
return {"max_retries", "min_backoff", "max_backoff", "retry_when"}
135
```
136
137
**Usage:**
138
139
```python
140
# Default retries
141
retries = Retries()
142
143
# Custom retry configuration
144
retries = Retries(
145
max_retries=5,
146
min_backoff=1000, # 1 second
147
max_backoff=300000, # 5 minutes
148
)
149
150
# Custom retry logic
151
def should_retry(retries_so_far, exception):
152
# Only retry on specific exceptions
153
return isinstance(exception, (ConnectionError, TimeoutError)) and retries_so_far < 3
154
155
retries = Retries(retry_when=should_retry)
156
157
broker.add_middleware(retries)
158
159
# Actor-specific retry settings
160
@dramatiq.actor(max_retries=3, min_backoff=5000)
161
def fragile_task(data):
162
if random.random() < 0.5:
163
raise Exception("Random failure")
164
return "Success"
165
```
166
167
#### Time Limit Middleware
168
169
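Enforce a maximum execution time for tasks. Limits are given in milliseconds and can be overridden per actor with the `time_limit` option. When the limit is exceeded, `TimeLimitExceeded` is raised inside the actor running on the worker; it is not propagated to the code that called `send()`.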
170
171
```python { .api }
172
class TimeLimit(Middleware):
173
def __init__(self, *, time_limit: int = 600000, interval: int = 1000):
174
"""
175
Initialize time limit middleware.
176
177
Parameters:
178
- time_limit: Maximum execution time in milliseconds (default: 10 minutes)
179
- interval: Check interval in milliseconds (default: 1 second)
180
"""
181
182
@property
183
def actor_options(self) -> Set[str]:
184
return {"time_limit"}
185
186
class TimeLimitExceeded(Exception):
187
"""Raised when task execution exceeds time limit."""
188
```
189
190
**Usage:**
191
192
```python
193
time_limit = TimeLimit(time_limit=30000) # 30 seconds
194
broker.add_middleware(time_limit)
195
196
@dramatiq.actor(time_limit=60000) # 1 minute limit
197
def long_running_task(data):
198
# Long-running processing
199
time.sleep(120) # Will be interrupted after 1 minute
200
return "Finished"
201
202
try:
203
long_running_task.send({"data": "test"})
204
except TimeLimitExceeded:
205
print("Task exceeded time limit")
206
```
207
208
#### Age Limit Middleware
209
210
Reject messages that are older than `max_age` (in milliseconds) instead of processing them.
211
212
```python { .api }
213
class AgeLimit(Middleware):
214
def __init__(self, *, max_age: int = None):
215
"""
216
Initialize age limit middleware.
217
218
Parameters:
219
- max_age: Maximum message age in milliseconds
220
"""
221
222
@property
223
def actor_options(self) -> Set[str]:
224
return {"max_age"}
225
```
226
227
**Usage:**
228
229
```python
230
age_limit = AgeLimit(max_age=3600000) # 1 hour
231
broker.add_middleware(age_limit)
232
233
@dramatiq.actor(max_age=1800000) # 30 minutes
234
def time_sensitive_task(data):
235
return f"Processing {data}"
236
```
237
238
#### Callbacks Middleware
239
240
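Run callback actors when a task succeeds or fails. Callbacks are themselves actors and are referenced by actor name through the `on_success` and `on_failure` options.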
241
242
```python { .api }
243
class Callbacks(Middleware):
244
def __init__(self):
245
"""Initialize callbacks middleware."""
246
247
@property
248
def actor_options(self) -> Set[str]:
249
return {"on_success", "on_failure"}
250
```
251
252
**Usage:**
253
254
```python
255
callbacks = Callbacks()
256
broker.add_middleware(callbacks)
257
258
@dramatiq.actor
259
def success_callback(message_data, result):
260
print(f"Task {message_data.message_id} succeeded with result: {result}")
261
262
@dramatiq.actor
263
def failure_callback(message_data, exception_data):
264
print(f"Task {message_data.message_id} failed: {exception_data}")
265
266
@dramatiq.actor(
267
on_success="success_callback",
268
on_failure="failure_callback"
269
)
270
def monitored_task(data):
271
if data == "fail":
272
raise ValueError("Intentional failure")
273
return f"Processed: {data}"
274
```
275
276
#### Pipelines Middleware
277
278
Enable pipeline composition functionality.
279
280
```python { .api }
281
class Pipelines(Middleware):
282
def __init__(self):
283
"""Initialize pipelines middleware."""
284
```
285
286
#### Group Callbacks Middleware
287
288
Handle group completion callbacks and coordination.
289
290
```python { .api }
291
class GroupCallbacks(Middleware):
292
def __init__(self, rate_limiter_backend):
293
"""
294
Initialize group callbacks middleware.
295
296
Parameters:
297
- rate_limiter_backend: Backend for coordination
298
"""
299
```
300
301
#### Prometheus Middleware
302
303
Expose message-processing metrics on an HTTP endpoint that Prometheus can scrape.
304
305
```python { .api }
306
class Prometheus(Middleware):
307
def __init__(
308
self, *,
309
http_host: str = "127.0.0.1",
310
http_port: int = 9191,
311
registry = None
312
):
313
"""
314
Initialize Prometheus metrics middleware.
315
316
Parameters:
317
- http_host: HTTP server host for metrics endpoint
318
- http_port: HTTP server port for metrics endpoint
319
- registry: Prometheus registry (uses default if None)
320
"""
321
```
322
323
**Usage:**
324
325
```python
326
prometheus = Prometheus(http_host="0.0.0.0", http_port=8000)
327
broker.add_middleware(prometheus)
328
329
# Metrics available at http://localhost:8000/metrics
330
# - dramatiq_messages_total: Total messages processed
331
# - dramatiq_message_errors_total: Total message errors
332
# - dramatiq_message_duration_seconds: Message processing duration
333
# - dramatiq_workers_total: Number of active workers
334
```
335
336
#### Results Middleware
337
338
Store and retrieve task results.
339
340
```python { .api }
341
class Results(Middleware):
342
def __init__(self, *, backend: ResultBackend = None, store_results: bool = False):
343
"""
344
Initialize results middleware.
345
346
Parameters:
347
- backend: Result storage backend
348
- store_results: Whether to store results by default
349
"""
350
351
@property
352
def actor_options(self) -> Set[str]:
353
return {"store_results"}
354
```
355
356
**Usage:**
357
358
```python
359
from dramatiq.results.backends import RedisBackend
360
361
result_backend = RedisBackend()
362
results = Results(backend=result_backend, store_results=True)
363
broker.add_middleware(results)
364
365
@dramatiq.actor(store_results=True)
366
def task_with_result(data):
367
return {"processed": data, "timestamp": time.time()}
368
369
message = task_with_result.send("test_data")
370
result = message.get_result(block=True, timeout=30000)
371
print(f"Task result: {result}")
372
```
373
374
#### Current Message Middleware
375
376
Provide access to the message that is currently being processed from within an actor.
377
378
```python { .api }
379
class CurrentMessage(Middleware):
380
def __init__(self):
381
"""Initialize current message middleware."""
382
383
# Access current message in actors
384
from dramatiq.middleware import CurrentMessage
385
386
def get_current_message() -> Message:
387
"""Get the currently processing message."""
388
```
389
390
**Usage:**
391
392
```python
393
current_message = CurrentMessage()
394
broker.add_middleware(current_message)
395
396
@dramatiq.actor
397
def message_aware_task(data):
398
from dramatiq.middleware import get_current_message
399
400
current = get_current_message()
401
print(f"Processing message {current.message_id} with data: {data}")
402
403
return {
404
"data": data,
405
"message_id": current.message_id,
406
"retry_count": current.options.get("retries", 0)
407
}
408
```
409
410
#### Shutdown Middleware
411
412
Handle graceful worker shutdown.
413
414
```python { .api }
415
class Shutdown(Middleware):
416
def __init__(self):
417
"""Initialize shutdown middleware."""
418
419
class ShutdownNotifications(Middleware):
420
def __init__(self, notify_shutdown: Callable = None):
421
"""
422
Initialize shutdown notifications middleware.
423
424
Parameters:
425
- notify_shutdown: Function to call on shutdown
426
"""
427
```
428
429
#### AsyncIO Middleware
430
431
Add support for actors defined with `async def`.
432
433
```python { .api }
434
class AsyncIO(Middleware):
435
def __init__(self):
436
"""Initialize AsyncIO middleware for async actors."""
437
```
438
439
**Usage:**
440
441
```python
442
asyncio_middleware = AsyncIO()
443
broker.add_middleware(asyncio_middleware)
444
445
@dramatiq.actor
446
async def async_task(data):
447
await asyncio.sleep(1) # Async operation
448
return f"Async processed: {data}"
449
450
# Send async task
451
async_task.send("test_data")
452
```
453
454
### Middleware Errors
455
456
```python { .api }
457
class MiddlewareError(Exception):
458
"""Base exception for middleware errors."""
459
460
class SkipMessage(Exception):
461
"""
462
Exception raised to skip message processing.
463
464
When raised in before_process_message, the message
465
will be acknowledged without processing.
466
"""
467
```
468
469
### Threading Utilities
470
471
Utilities for thread-based middleware operations.
472
473
```python { .api }
474
class Interrupt(Exception):
475
"""Exception used to interrupt thread execution."""
476
477
def raise_thread_exception(thread_id: int, exception: Exception):
478
"""
479
Raise an exception in a specific thread.
480
481
Parameters:
482
- thread_id: Target thread ID
483
- exception: Exception to raise in the thread
484
"""
485
```
486
487
### Default Middleware Stack
488
489
```python { .api }
490
default_middleware = [
491
Prometheus,
492
AgeLimit,
493
TimeLimit,
494
ShutdownNotifications,
495
Callbacks,
496
Pipelines,
497
Retries
498
]
499
```
500
501
### Custom Middleware Development
502
503
#### Basic Custom Middleware
504
505
```python
506
class LoggingMiddleware(dramatiq.Middleware):
507
def __init__(self, log_level="INFO"):
508
self.logger = logging.getLogger("dramatiq.custom")
509
self.logger.setLevel(log_level)
510
511
def before_process_message(self, broker, message):
512
self.logger.info(f"Starting processing: {message.actor_name}")
513
514
def after_process_message(self, broker, message, *, result=None, exception=None):
515
if exception:
516
self.logger.error(f"Failed processing {message.actor_name}: {exception}")
517
else:
518
self.logger.info(f"Completed processing: {message.actor_name}")
519
520
# Add custom middleware
521
logging_middleware = LoggingMiddleware()
522
broker.add_middleware(logging_middleware)
523
```
524
525
#### Advanced Custom Middleware with Options
526
527
```python
528
class RateLimitingMiddleware(dramatiq.Middleware):
529
def __init__(self, default_limit=100):
530
self.default_limit = default_limit
531
self.counters = {}
532
533
@property
534
def actor_options(self):
535
return {"rate_limit"}
536
537
def before_process_message(self, broker, message):
538
actor_name = message.actor_name
539
rate_limit = message.options.get("rate_limit", self.default_limit)
540
541
# Simple in-memory rate limiting (use Redis in production)
542
current_count = self.counters.get(actor_name, 0)
543
if current_count >= rate_limit:
544
raise dramatiq.RateLimitExceeded(f"Rate limit {rate_limit} exceeded for {actor_name}")
545
546
self.counters[actor_name] = current_count + 1
547
548
def after_process_message(self, broker, message, *, result=None, exception=None):
549
# Reset counter after processing
550
actor_name = message.actor_name
551
if actor_name in self.counters:
552
self.counters[actor_name] -= 1
553
554
# Usage
555
rate_limiting = RateLimitingMiddleware(default_limit=50)
556
broker.add_middleware(rate_limiting)
557
558
@dramatiq.actor(rate_limit=10)
559
def rate_limited_task(data):
560
return f"Processed: {data}"
561
```
562
563
#### Middleware with External Dependencies
564
565
```python
566
class DatabaseLoggingMiddleware(dramatiq.Middleware):
567
def __init__(self, database_url):
568
self.db = database.connect(database_url)
569
570
def after_process_message(self, broker, message, *, result=None, exception=None):
571
# Log to database
572
self.db.execute(
573
"INSERT INTO task_log (message_id, actor_name, success, error) VALUES (?, ?, ?, ?)",
574
(message.message_id, message.actor_name, exception is None, str(exception) if exception else None)
575
)
576
self.db.commit()
577
578
def before_worker_shutdown(self, broker, worker):
579
self.db.close()
580
581
# Usage
582
db_logging = DatabaseLoggingMiddleware("sqlite:///tasks.db")
583
broker.add_middleware(db_logging)
584
```
585
586
### Middleware Ordering
587
588
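Middleware order matters because each middleware can modify message processing, so the effect of a hook depends on which middleware ran before it. Pass an explicit, ordered list when constructing the broker, or position an individual middleware with the `before` and `after` arguments of `add_middleware`: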
589
590
```python
591
# Careful ordering for proper functionality
592
broker = RedisBroker(middleware=[
593
Prometheus(), # Metrics first
594
AgeLimit(), # Filter old messages early
595
TimeLimit(), # Set time limits
596
Results(), # Store results before retries
597
Retries(), # Retry logic
598
Callbacks(), # Callbacks after retries
599
Pipelines(), # Pipeline support
600
])
601
602
# Add middleware with specific positioning
603
broker.add_middleware(CustomMiddleware(), after=TimeLimit)
604
broker.add_middleware(AnotherMiddleware(), before=Retries)
605
```