# Middleware

The middleware system provides an extensible pipeline for cross-cutting concerns such as retries, monitoring, authentication, and custom processing logic. Middleware components can intercept and modify messages at several stages of the task lifecycle.

## Capabilities


### Middleware Interface

The base middleware class defines hooks for message processing at each lifecycle stage.

```python { .api }
class TaskiqMiddleware:
    """
    Abstract base class for implementing middleware components.

    Middleware can intercept and modify messages during:
    - Before sending to broker (pre_send)
    - After sending to broker (post_send)
    - Before task execution (pre_execute)
    - After task execution (post_execute)
    """

    def __init__(self) -> None: ...

    def set_broker(self, broker: AsyncBroker) -> None:
        """Called when middleware is added to a broker."""

    async def startup(self) -> None:
        """Called during broker startup."""

    async def shutdown(self) -> None:
        """Called during broker shutdown."""

    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
        """
        Process message before sending to broker.

        Args:
            message: Task message to be sent

        Returns:
            Modified message (can return same message if no changes)
        """

    async def post_send(self, message: TaskiqMessage) -> None:
        """
        Process message after successful send to broker.

        Args:
            message: Task message that was sent
        """

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        """
        Process message before task execution.

        Args:
            message: Task message to be executed

        Returns:
            Modified message (can return same message if no changes)
        """

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """
        Process result after task execution.

        Args:
            message: Original task message
            result: Task execution result
        """
```
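
To make the hook sequence concrete, here is a minimal, library-free sketch of how a dispatcher could drive the four hooks. `Message`, `Middleware`, `TagMiddleware`, and `dispatch` are hypothetical stand-ins invented for this illustration. They are not taskiq classes, and taskiq's own dispatch logic may differ.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class Message:
    """Hypothetical stand-in for a task message."""

    task_name: str
    labels: Dict[str, Any] = field(default_factory=dict)


class Middleware:
    """Hypothetical base class: every hook is a no-op by default."""

    async def pre_send(self, message: Message) -> Message:
        return message

    async def post_send(self, message: Message) -> None:
        return None

    async def pre_execute(self, message: Message) -> Message:
        return message

    async def post_execute(self, message: Message, result: Any) -> None:
        return None


class TagMiddleware(Middleware):
    """Adds a label before sending and records what it saw after execution."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    async def pre_send(self, message: Message) -> Message:
        message.labels["tagged"] = True
        return message

    async def post_execute(self, message: Message, result: Any) -> None:
        self.seen.append(f"{message.task_name}={result}")


async def dispatch(
    middlewares: List[Middleware],
    message: Message,
    task: Callable[[], Awaitable[Any]],
) -> Any:
    """Run one message through all four hook stages (the send is simulated)."""
    for mw in middlewares:
        message = await mw.pre_send(message)
    for mw in middlewares:
        await mw.post_send(message)
    for mw in middlewares:
        message = await mw.pre_execute(message)
    result = await task()
    for mw in middlewares:
        await mw.post_execute(message, result)
    return result


async def _answer() -> int:
    return 42


tag = TagMiddleware()
msg = Message(task_name="demo")
outcome = asyncio.run(dispatch([tag], msg, _answer))
```

The `pre_*` hooks return the (possibly modified) message so each middleware can pass its changes to the next one, while the `post_*` hooks only observe.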


### Simple Retry Middleware

A basic retry mechanism that re-runs failed tasks up to a fixed number of times.

```python { .api }
class SimpleRetryMiddleware(TaskiqMiddleware):
    """
    Simple retry middleware with fixed retry count.

    Retries failed tasks up to max_retries times with no delay between attempts.
    Uses task labels to track retry count and prevent infinite loops.
    """

    def __init__(
        self,
        max_retries: int = 3,
        ignore_errors: Optional[List[Type[Exception]]] = None,
    ) -> None:
        """
        Initialize simple retry middleware.

        Args:
            max_retries: Maximum number of retry attempts
            ignore_errors: Exception types that should not trigger retries
        """

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Retry task if it failed and retries are available."""
```
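
The label-based retry counting described in the docstring can be sketched without the library. The helper below is a hypothetical illustration, not the middleware's actual code: `next_retry_labels` and the `_retries` label name are assumptions chosen for this example.

```python
from typing import Dict, List, Optional, Type


def next_retry_labels(
    labels: Dict[str, str],
    error: Optional[BaseException],
    max_retries: int = 3,
    ignore_errors: Optional[List[Type[Exception]]] = None,
    counter_label: str = "_retries",  # hypothetical label name
) -> Optional[Dict[str, str]]:
    """Return the labels for the re-sent message, or None if no retry is due."""
    if error is None:
        return None  # the task succeeded
    if ignore_errors and isinstance(error, tuple(ignore_errors)):
        return None  # this exception type is excluded from retries
    attempts = int(labels.get(counter_label, 0))
    if attempts >= max_retries:
        return None  # retry budget exhausted, which prevents infinite loops
    # Copy the labels so the original message is left untouched.
    return {**labels, counter_label: str(attempts + 1)}
```

Because the counter travels with the message in its labels, any worker that picks up the retried message can tell how many attempts have already been made.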


### Smart Retry Middleware

An advanced retry mechanism with exponential backoff, jitter, and configurable retry conditions.

```python { .api }
class SmartRetryMiddleware(TaskiqMiddleware):
    """
    Advanced retry middleware with exponential backoff.

    Features:
    - Exponential backoff with configurable base delay
    - Jitter to prevent thundering herd
    - Maximum retry attempts
    - Exception type filtering
    - Custom retry condition functions
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        ignore_errors: Optional[List[Type[Exception]]] = None,
        retry_on: Optional[Callable[[Exception], bool]] = None,
    ) -> None:
        """
        Initialize smart retry middleware.

        Args:
            max_retries: Maximum number of retry attempts
            base_delay: Initial delay between retries in seconds
            max_delay: Maximum delay between retries in seconds
            exponential_base: Base for exponential backoff calculation
            jitter: Whether to add random jitter to delays
            ignore_errors: Exception types that should not trigger retries
            retry_on: Custom function to determine if exception should trigger retry
        """

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Retry task with exponential backoff if conditions are met."""
```
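
As a rough illustration of how `base_delay`, `exponential_base`, `max_delay`, and `jitter` could combine, the sketch below computes a capped exponential delay. The function `backoff_delay`, its `rng` parameter, and the "full jitter" strategy (a uniform draw between zero and the capped delay) are assumptions made for this example. The source does not state which formula or jitter strategy the middleware uses.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay in seconds before retry number `attempt` (0-based)."""
    # Grow the delay geometrically, then cap it at max_delay.
    delay = min(base_delay * exponential_base ** attempt, max_delay)
    if jitter:
        # Assumed "full jitter": pick uniformly in [0, delay] so that many
        # failing tasks do not all retry at the same instant.
        delay = (rng or random).uniform(0.0, delay)
    return delay


# Deterministic schedule (jitter off) for base_delay=2.0 and max_delay=120.0.
schedule = [
    backoff_delay(n, base_delay=2.0, max_delay=120.0, jitter=False)
    for n in range(7)
]
```

With jitter disabled, the delay doubles on each attempt until it reaches the cap, after which every further retry waits `max_delay` seconds.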


### Prometheus Middleware

A monitoring middleware that collects and exports Prometheus metrics for task execution.

```python { .api }
class PrometheusMiddleware(TaskiqMiddleware):
    """
    Prometheus metrics collection middleware.

    Collects metrics for:
    - Task execution counts by status (success/failure)
    - Task execution duration histograms
    - Active task counts
    - Queue size metrics
    """

    def __init__(
        self,
        registry: Optional[CollectorRegistry] = None,
        label_names: Optional[List[str]] = None,
    ) -> None:
        """
        Initialize Prometheus middleware.

        Args:
            registry: Prometheus registry for metric collection
            label_names: Additional label names for metrics
        """

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        """Start timing and increment active task counter."""

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        """Record execution metrics and update counters."""
```
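
The bookkeeping split between `pre_execute` (start the timer, count the task as active) and `post_execute` (record status and duration) can be sketched with plain Python containers. `InMemoryMetrics` and its method names are hypothetical stand-ins for real Prometheus collectors and are used here only to show the pattern. They are not part of taskiq or `prometheus_client`.

```python
import time
from collections import Counter
from typing import Dict, List


class InMemoryMetrics:
    """Hypothetical stand-in for Prometheus collectors, kept in plain containers."""

    def __init__(self) -> None:
        self.executions: Counter = Counter()  # (task_name, status) -> count
        self.durations: Dict[str, List[float]] = {}  # task_name -> seconds
        self.active = 0  # tasks currently running
        self._started: Dict[str, float] = {}  # task_id -> start timestamp

    def on_pre_execute(self, task_id: str) -> None:
        """Mirror of pre_execute: start timing and mark the task active."""
        self.active += 1
        self._started[task_id] = time.perf_counter()

    def on_post_execute(self, task_id: str, task_name: str, is_err: bool) -> None:
        """Mirror of post_execute: record the outcome and elapsed time."""
        elapsed = time.perf_counter() - self._started.pop(task_id)
        self.active -= 1
        status = "failure" if is_err else "success"
        self.executions[(task_name, status)] += 1
        self.durations.setdefault(task_name, []).append(elapsed)


metrics = InMemoryMetrics()
metrics.on_pre_execute("id-1")
metrics.on_post_execute("id-1", "add", is_err=False)
metrics.on_pre_execute("id-2")
metrics.on_post_execute("id-2", "add", is_err=True)
```

Keying the start timestamp by task ID rather than storing one value on the instance keeps the timing correct when several tasks run concurrently through the same middleware.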


## Usage Examples


### Basic Middleware Setup


```python
import random

from taskiq import InMemoryBroker
from taskiq.middlewares import SimpleRetryMiddleware

# Add middleware to broker
broker = InMemoryBroker()
broker.add_middlewares(SimpleRetryMiddleware(max_retries=5))

# Alternative: using builder pattern
broker = InMemoryBroker().with_middlewares(
    SimpleRetryMiddleware(max_retries=5)
)

@broker.task
async def unreliable_task(data: str) -> str:
    # Task that might fail and need retries
    if random.random() < 0.3:  # 30% failure rate
        raise ValueError("Random failure")
    return f"Processed: {data}"
```


### Advanced Retry Configuration


```python
import httpx

from taskiq.middlewares import SmartRetryMiddleware

# Configure smart retry with custom settings
smart_retry = SmartRetryMiddleware(
    max_retries=5,
    base_delay=2.0,  # Start with 2 second delay
    max_delay=120.0,  # Cap at 2 minutes
    exponential_base=2.0,  # Double delay each time
    jitter=True,  # Add randomness
    ignore_errors=[ValueError],  # Don't retry ValueError
)

# `broker` is the broker instance created in the basic setup example
broker.add_middlewares(smart_retry)

@broker.task
async def api_call_task(url: str) -> dict:
    # Task that benefits from smart retry
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()
```


### Custom Middleware Implementation


```python
import time

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

class LoggingMiddleware(TaskiqMiddleware):
    """Custom middleware for detailed task logging."""

    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
        print(f"Sending task: {message.task_name} (ID: {message.task_id})")
        return message

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        print(f"Executing task: {message.task_name}")
        # Add execution start time to labels
        message.labels["execution_started"] = time.time()
        return message

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        # Falls back to 0 if pre_execute did not set the label
        start_time = message.labels.get("execution_started", 0)
        duration = time.time() - start_time

        status = "SUCCESS" if not result.is_err else "FAILED"
        print(f"Task {message.task_name} {status} in {duration:.2f}s")

        if result.is_err:
            print(f"Error: {result.error}")

# Use custom middleware (`broker` is an existing broker instance)
broker.add_middlewares(LoggingMiddleware())
```


### Multiple Middleware Pipeline


```python
from taskiq import InMemoryBroker
from taskiq.middlewares import (
    SimpleRetryMiddleware,
    PrometheusMiddleware,
)

# Multiple middleware are executed in order
# (LoggingMiddleware is the custom class defined in the previous example)
broker = InMemoryBroker().with_middlewares(
    LoggingMiddleware(),  # First: logging
    SimpleRetryMiddleware(max_retries=3),  # Second: retries
    PrometheusMiddleware(),  # Third: metrics
)

# Execution order:
# 1. pre_send: LoggingMiddleware -> SimpleRetryMiddleware -> PrometheusMiddleware
# 2. post_send: PrometheusMiddleware -> SimpleRetryMiddleware -> LoggingMiddleware
# 3. pre_execute: LoggingMiddleware -> SimpleRetryMiddleware -> PrometheusMiddleware
# 4. post_execute: PrometheusMiddleware -> SimpleRetryMiddleware -> LoggingMiddleware
```


### Conditional Middleware


```python
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

class ConditionalRetryMiddleware(TaskiqMiddleware):
    """Retry middleware that only applies to specific tasks."""

    def __init__(self, max_retries: int = 3) -> None:
        super().__init__()
        self.max_retries = max_retries

    async def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult,
    ) -> None:
        # Only retry tasks with 'retryable' label
        if not message.labels.get("retryable", False):
            return

        if result.is_err:
            retry_count = int(message.labels.get("retry_count", 0))
            if retry_count < self.max_retries:
                # Track attempts in a label so retries stay bounded
                message.labels["retry_count"] = str(retry_count + 1)
                await self.broker.kick(self.broker.formatter.dumps(message))

# Use with labeled tasks (`broker` is an existing broker instance;
# the process_* functions are placeholders for application code)
@broker.task(retryable=True)
async def important_task(data: str) -> str:
    # This task will be retried on failure
    return process_important_data(data)

@broker.task(retryable=False)
async def simple_task(data: str) -> str:
    # This task won't be retried
    return process_simple_data(data)
```