# Middleware

The middleware system provides an extensible pipeline for cross-cutting concerns such as retries, monitoring, authentication, and custom processing logic. Middleware components can intercept and modify messages at various stages of the task lifecycle.

## Capabilities

### Middleware Interface

Base middleware class that defines hooks for message processing at different lifecycle stages.

```python { .api }
class TaskiqMiddleware:
    """
    Abstract base class for implementing middleware components.

    Middleware can intercept and modify messages at four points:
    - Before sending to broker (pre_send)
    - After sending to broker (post_send)
    - Before task execution (pre_execute)
    - After task execution (post_execute)
    """

    def __init__(self) -> None: ...

    def set_broker(self, broker: AsyncBroker) -> None:
        """Called when middleware is added to a broker."""

    async def startup(self) -> None:
        """Called during broker startup."""

    async def shutdown(self) -> None:
        """Called during broker shutdown."""

    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
        """
        Process message before sending to broker.

        Args:
            message: Task message to be sent

        Returns:
            Modified message (can return same message if no changes)
        """

    async def post_send(self, message: TaskiqMessage) -> None:
        """
        Process message after successful send to broker.

        Args:
            message: Task message that was sent
        """

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        """
        Process message before task execution.

        Args:
            message: Task message to be executed

        Returns:
            Modified message (can return same message if no changes)
        """

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """
        Process result after task execution.

        Args:
            message: Original task message
            result: Task execution result
        """
```
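
To make the hook contract concrete, here is a minimal, self-contained sketch of how a pipeline might chain `pre_send` hooks, with each hook receiving the message returned by the previous one. It does not import taskiq: `Message`, `AddTraceId`, `AddTenant`, and `run_pre_send` are hypothetical stand-ins used only for illustration, not library code.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Message:
    """Hypothetical stand-in for a task message (not taskiq's TaskiqMessage)."""

    task_name: str
    labels: Dict[str, Any] = field(default_factory=dict)


class AddTraceId:
    """Hypothetical middleware: attaches a trace id before sending."""

    async def pre_send(self, message: Message) -> Message:
        message.labels["trace_id"] = "abc123"
        return message


class AddTenant:
    """Hypothetical middleware: attaches a tenant label before sending."""

    async def pre_send(self, message: Message) -> Message:
        message.labels["tenant"] = "demo"
        return message


async def run_pre_send(middlewares: List[Any], message: Message) -> Message:
    # Each hook receives the message returned by the previous hook.
    for middleware in middlewares:
        message = await middleware.pre_send(message)
    return message


sent = asyncio.run(run_pre_send([AddTraceId(), AddTenant()], Message("send_email")))
print(sent.labels)
```

Because each `pre_*` hook returns the message, a hook that forgets its `return` would pass `None` to the next stage, which is why the interface documents the return value explicitly.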

### Simple Retry Middleware

Basic retry mechanism that retries failed tasks a fixed number of times.

```python { .api }
class SimpleRetryMiddleware(TaskiqMiddleware):
    """
    Simple retry middleware with fixed retry count.

    Retries failed tasks up to max_retries times with no delay between attempts.
    Uses task labels to track retry count and prevent infinite loops.
    """

    def __init__(
        self,
        max_retries: int = 3,
        ignore_errors: Optional[List[Type[Exception]]] = None,
    ) -> None:
        """
        Initialize simple retry middleware.

        Args:
            max_retries: Maximum number of retry attempts
            ignore_errors: Exception types that should not trigger retries
        """

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Retry task if it failed and retries are available."""
```
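
The label-based bookkeeping mentioned in the docstring can be sketched without the library. The helper below is a hypothetical illustration: the label name `_retries` and the function `next_retry_labels` are assumptions made for this example, not the middleware's actual internals.

```python
from typing import Any, Dict, Optional


def next_retry_labels(
    labels: Dict[str, Any],
    max_retries: int = 3,
    counter_label: str = "_retries",  # assumed label name, for illustration only
) -> Optional[Dict[str, Any]]:
    """Return labels for the next attempt, or None when retries are exhausted."""
    # Labels may arrive as strings after serialization, so coerce to int.
    used = int(labels.get(counter_label, 0))
    if used >= max_retries:
        return None
    updated = dict(labels)  # copy so the caller's labels are not mutated
    updated[counter_label] = used + 1
    return updated


labels: Optional[Dict[str, Any]] = {}
attempts = 0
while labels is not None:
    attempts += 1  # one execution: the initial run or a retry
    labels = next_retry_labels(labels, max_retries=3)

print(attempts)
```

Storing the counter in the message rather than in the middleware keeps the retry state attached to the task itself, so it survives a round trip through the broker and bounds the number of re-sends.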

### Smart Retry Middleware

Advanced retry mechanism with exponential backoff, jitter, and configurable retry conditions.

```python { .api }
class SmartRetryMiddleware(TaskiqMiddleware):
    """
    Advanced retry middleware with exponential backoff.

    Features:
    - Exponential backoff with configurable base delay
    - Jitter to prevent thundering herd
    - Maximum retry attempts
    - Exception type filtering
    - Custom retry condition functions
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        ignore_errors: Optional[List[Type[Exception]]] = None,
        retry_on: Optional[Callable[[Exception], bool]] = None,
    ) -> None:
        """
        Initialize smart retry middleware.

        Args:
            max_retries: Maximum number of retry attempts
            base_delay: Initial delay between retries in seconds
            max_delay: Maximum delay between retries in seconds
            exponential_base: Base for exponential backoff calculation
            jitter: Whether to add random jitter to delays
            ignore_errors: Exception types that should not trigger retries
            retry_on: Custom function to determine if exception should trigger retry
        """

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Retry task with exponential backoff if conditions are met."""
```
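
This section does not spell out the exact delay formula, so the following is only a sketch of one common formulation: the delay grows as `base_delay * exponential_base ** attempt`, is capped at `max_delay`, and is optionally randomized. The function `backoff_delay` and the choice of "full jitter" (a uniform draw between zero and the capped delay) are assumptions for illustration, not the middleware's confirmed behavior.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay in seconds before retry number `attempt` (0-based)."""
    # Exponential growth, capped at max_delay.
    delay = min(base_delay * exponential_base ** attempt, max_delay)
    if jitter:
        # "Full jitter": pick uniformly between 0 and the capped delay.
        rng = rng or random.Random()
        delay = rng.uniform(0.0, delay)
    return delay


schedule = [
    backoff_delay(n, base_delay=2.0, max_delay=120.0, jitter=False)
    for n in range(8)
]
print(schedule)
```

With jitter disabled, the schedule doubles from 2 seconds until it reaches the 120 second cap. Enabling jitter spreads retries from many failing tasks over time instead of letting them hit a recovering service at the same instant.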

### Prometheus Middleware

Monitoring middleware that collects and exports Prometheus metrics for task execution.

```python { .api }
class PrometheusMiddleware(TaskiqMiddleware):
    """
    Prometheus metrics collection middleware.

    Collects metrics for:
    - Task execution counts by status (success/failure)
    - Task execution duration histograms
    - Active task counts
    - Queue size metrics
    """

    def __init__(
        self,
        registry: Optional[CollectorRegistry] = None,
        label_names: Optional[List[str]] = None,
    ) -> None:
        """
        Initialize Prometheus middleware.

        Args:
            registry: Prometheus registry for metric collection
            label_names: Additional label names for metrics
        """

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        """Start timing and increment active task counter."""

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Record execution metrics and update counters."""
```
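
The division of work between the two hooks can be illustrated with a standard-library stand-in. The class `InMemoryMetrics` below is hypothetical and does not use `prometheus_client`; it only mirrors the bookkeeping the docstrings describe: start a timer and bump the active count before execution, then record status and duration afterwards.

```python
import time
from collections import Counter
from typing import Dict, List


class InMemoryMetrics:
    """Hypothetical stand-in for a metrics backend (not prometheus_client)."""

    def __init__(self) -> None:
        self.executions: Counter = Counter()  # (task_name, status) -> count
        self.durations: Dict[str, List[float]] = {}  # task_name -> seconds
        self.active = 0
        self._started: Dict[str, float] = {}  # task_id -> start timestamp

    def on_pre_execute(self, task_id: str) -> None:
        # Mirrors pre_execute: mark the task active and start timing.
        self.active += 1
        self._started[task_id] = time.perf_counter()

    def on_post_execute(self, task_id: str, task_name: str, is_err: bool) -> None:
        # Mirrors post_execute: stop timing and record the outcome.
        elapsed = time.perf_counter() - self._started.pop(task_id)
        self.active -= 1
        status = "failure" if is_err else "success"
        self.executions[(task_name, status)] += 1
        self.durations.setdefault(task_name, []).append(elapsed)


metrics = InMemoryMetrics()
metrics.on_pre_execute("id-1")
metrics.on_post_execute("id-1", "send_email", is_err=False)
metrics.on_pre_execute("id-2")
metrics.on_post_execute("id-2", "send_email", is_err=True)
print(dict(metrics.executions), metrics.active)
```

Keying the start time by task id rather than storing a single timestamp keeps measurements correct when several tasks run concurrently in one worker.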

## Usage Examples

### Basic Middleware Setup

```python
import random

from taskiq import InMemoryBroker
from taskiq.middlewares import SimpleRetryMiddleware

# Add middleware to broker
broker = InMemoryBroker()
broker.add_middlewares(SimpleRetryMiddleware(max_retries=5))

# Alternative: using builder pattern
broker = InMemoryBroker().with_middlewares(
    SimpleRetryMiddleware(max_retries=5)
)

@broker.task
async def unreliable_task(data: str) -> str:
    # Task that might fail and need retries
    if random.random() < 0.3:  # 30% failure rate
        raise ValueError("Random failure")
    return f"Processed: {data}"
```

### Advanced Retry Configuration

```python
import httpx

from taskiq.middlewares import SmartRetryMiddleware

# Configure smart retry with custom settings
smart_retry = SmartRetryMiddleware(
    max_retries=5,
    base_delay=2.0,              # Start with 2 second delay
    max_delay=120.0,             # Cap at 2 minutes
    exponential_base=2.0,        # Double delay each time
    jitter=True,                 # Add randomness
    ignore_errors=[ValueError],  # Don't retry ValueError
)

# `broker` is the broker instance created in the basic setup
broker.add_middlewares(smart_retry)

@broker.task
async def api_call_task(url: str) -> dict:
    # Task that benefits from smart retry: transient HTTP failures raise
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()
```

### Custom Middleware Implementation

```python
import time

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

class LoggingMiddleware(TaskiqMiddleware):
    """Custom middleware for detailed task logging."""

    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
        print(f"Sending task: {message.task_name} (ID: {message.task_id})")
        return message

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        print(f"Executing task: {message.task_name}")
        # Add execution start time to labels so post_execute can read it
        message.labels["execution_started"] = time.time()
        return message

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        # Fall back to "now" so a missing label yields ~0s, not time since epoch
        start_time = message.labels.get("execution_started", time.time())
        duration = time.time() - start_time

        status = "SUCCESS" if not result.is_err else "FAILED"
        print(f"Task {message.task_name} {status} in {duration:.2f}s")

        if result.is_err:
            print(f"Error: {result.error}")

# Use custom middleware (`broker` is an existing broker instance)
broker.add_middlewares(LoggingMiddleware())
```

### Multiple Middleware Pipeline

```python
from taskiq import InMemoryBroker
from taskiq.middlewares import (
    SimpleRetryMiddleware,
    PrometheusMiddleware,
)

# Middlewares are executed in the order they are added.
# LoggingMiddleware is the custom class from the previous example.
broker = InMemoryBroker().with_middlewares(
    LoggingMiddleware(),                    # First: logging
    SimpleRetryMiddleware(max_retries=3),   # Second: retries
    PrometheusMiddleware(),                 # Third: metrics
)

# Execution order:
# 1. pre_send: LoggingMiddleware -> SimpleRetryMiddleware -> PrometheusMiddleware
# 2. post_send: PrometheusMiddleware -> SimpleRetryMiddleware -> LoggingMiddleware
# 3. pre_execute: LoggingMiddleware -> SimpleRetryMiddleware -> PrometheusMiddleware
# 4. post_execute: PrometheusMiddleware -> SimpleRetryMiddleware -> LoggingMiddleware
```
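
The ordering listed in the comments above, with `pre_*` hooks running in registration order and `post_*` hooks in reverse, can be reproduced with a small trace. This sketch does not exercise taskiq: `Recorder` and `run_task` are hypothetical stand-ins that simply implement the order as described, so it is worth confirming the behavior against the installed library version.

```python
import asyncio
from typing import List

calls: List[str] = []


class Recorder:
    """Hypothetical middleware that records which hook ran."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def pre_execute(self) -> None:
        calls.append(f"pre_execute:{self.name}")

    async def post_execute(self) -> None:
        calls.append(f"post_execute:{self.name}")


async def run_task(middlewares: List[Recorder]) -> None:
    # Pre hooks run in registration order.
    for middleware in middlewares:
        await middleware.pre_execute()
    calls.append("task")
    # Post hooks run in reverse order, as the comments above describe.
    for middleware in reversed(middlewares):
        await middleware.post_execute()


asyncio.run(run_task([Recorder("logging"), Recorder("retry"), Recorder("metrics")]))
print(calls)
```

Under this ordering the first middleware added wraps all the others, so a logging middleware registered first sees the task both before and after every other middleware has acted on it.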

### Conditional Middleware

```python
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

class ConditionalRetryMiddleware(TaskiqMiddleware):
    """Retry middleware that only applies to specific tasks."""

    def __init__(self, max_retries: int = 3) -> None:
        super().__init__()
        self.max_retries = max_retries

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        # Only retry tasks with 'retryable' label
        if not message.labels.get("retryable", False):
            return

        if result.is_err:
            # Labels may be serialized as strings, so convert before comparing
            retry_count = int(message.labels.get("retry_count", 0))
            if retry_count < self.max_retries:
                message.labels["retry_count"] = str(retry_count + 1)
                # Re-send the same message through the broker
                await self.broker.kick(self.broker.formatter.dumps(message))

# Register the middleware (`broker` is an existing broker instance)
broker.add_middlewares(ConditionalRetryMiddleware(max_retries=3))

# Use with labeled tasks; the process_* helpers stand for application code
@broker.task(retryable=True)
async def important_task(data: str) -> str:
    # This task will be retried on failure
    return process_important_data(data)

@broker.task(retryable=False)
async def simple_task(data: str) -> str:
    # This task won't be retried
    return process_simple_data(data)
```