# Task Management

Thread pool management and task dispatching system for processing WSGI requests efficiently across multiple worker threads.

## Capabilities

### Task Dispatcher

Central coordinator for managing worker threads and distributing WSGI request processing tasks.

```python { .api }
class ThreadedTaskDispatcher:
    """
    Manages worker thread pool for WSGI request processing.

    Coordinates task distribution across multiple threads while
    maintaining proper WSGI application isolation and error handling.
    """

    def __init__(self):
        """Initialize task dispatcher with default configuration."""

    def set_thread_count(self, count):
        """
        Set number of worker threads.

        Parameters:
        - count (int): Number of worker threads (typically 1-50)

        Notes:
        - Changes take effect on next server start
        - More threads allow higher concurrency but use more memory
        - Optimal count depends on application characteristics
        """

    def add_task(self, task):
        """
        Add task to processing queue.

        Parameters:
        - task: Task instance (WSGITask, ErrorTask, etc.)

        Notes:
        - Tasks are processed FIFO by available worker threads
        - Blocks if queue is full (backpressure mechanism)
        """

    def shutdown(self):
        """
        Shutdown thread pool cleanly.

        Waits for running tasks to complete before terminating threads.
        Called automatically during server shutdown.
        """
```

### Task Base Classes

Foundation classes for implementing different types of processing tasks.

```python { .api }
class Task:
    """
    Base class for all task types.

    Provides common task lifecycle management and error handling
    framework for specialized task implementations.
    """

    def __init__(self, channel, request):
        """
        Initialize task with channel and request context.

        Parameters:
        - channel: HTTPChannel instance for response transmission
        - request: Parsed HTTP request data
        """

    def service(self):
        """
        Execute task processing.

        Called by worker thread to perform actual task work.
        Must be implemented by subclasses.
        """

    def cancel(self):
        """Cancel task execution if possible."""

    def defer(self):
        """Defer task execution to a later time."""

class WSGITask(Task):
    """
    WSGI application execution task.

    Handles execution of WSGI applications in worker threads,
    including proper environ setup, application calling,
    and response handling.
    """

    def __init__(self, channel, request):
        """
        Initialize WSGI task.

        Parameters:
        - channel: HTTPChannel for response transmission
        - request: Complete HTTP request with parsed environ
        """

    def service(self):
        """
        Execute the WSGI application.

        Calls the WSGI application with proper environ and start_response,
        handles response iteration, and manages connection state.
        """

    def build_response_header(self, status, headers):
        """Build HTTP response header from WSGI status and headers."""

class ErrorTask(Task):
    """
    Error response generation task.

    Generates appropriate HTTP error responses for
    various error conditions (400, 500, etc.).
    """

    def __init__(self, channel, request, status, reason, body):
        """
        Initialize error task.

        Parameters:
        - channel: HTTPChannel for response transmission
        - request: Original request context
        - status (str): HTTP status code (e.g., "500")
        - reason (str): HTTP reason phrase (e.g., "Internal Server Error")
        - body (bytes): Error response body
        """

    def service(self):
        """Generate and send error response."""
```

### Threading Model

Waitress uses a hybrid threading model for optimal performance.

```python { .api }
# Threading architecture:
MAIN_THREAD_ROLE = "I/O and connection management"  # asyncore event loop
WORKER_THREAD_ROLE = "WSGI application execution"   # Task processing

# Thread safety:
WSGI_THREAD_SAFETY = "Thread-safe"      # Each request in separate thread
SHARED_STATE_SAFETY = "Minimal"         # Limited shared state between threads
APPLICATION_ISOLATION = "Complete"      # WSGI apps isolated per request

# Performance characteristics:
DEFAULT_THREAD_COUNT = 4                # Suitable for most applications
RECOMMENDED_RANGE = (1, 50)             # Practical thread count limits
MEMORY_PER_THREAD = "~8MB"              # Approximate memory overhead
```
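The main-thread/worker-thread split can be sketched with stdlib primitives (a simplified model for illustration, not Waitress's actual implementation): one thread enqueues work, a fixed pool services it FIFO, and a bounded queue supplies backpressure.

```python
import queue
import threading

task_queue = queue.Queue(maxsize=100)  # bounded: put() blocks when full (backpressure)
results = []

def worker():
    while True:
        task = task_queue.get()        # FIFO: oldest task first
        if task is None:               # sentinel -> shut this worker down
            task_queue.task_done()
            break
        results.append(task())         # "service" the task
        task_queue.task_done()

# Fixed worker pool, analogous to set_thread_count(4)
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

# "Main thread" enqueues work, analogous to add_task()
for i in range(10):
    task_queue.put(lambda i=i: i * i)

task_queue.join()                      # wait for all queued tasks to finish

# Clean shutdown: one sentinel per worker, then join the threads
for _ in threads:
    task_queue.put(None)
for t in threads:
    t.join()

print(sorted(results))                 # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```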

### Task Processing Examples

Common patterns for working with the task management system.

#### Custom Task Implementation

```python
import json

from waitress.task import Task

class CustomTask(Task):
    """Custom task with specialized processing."""

    def __init__(self, channel, request, custom_data):
        super().__init__(channel, request)
        self.custom_data = custom_data

    def service(self):
        """Custom task processing logic."""
        try:
            # Perform custom processing
            result = self.process_custom_data()

            # Generate response
            status = '200 OK'
            headers = [('Content-Type', 'application/json')]
            body = json.dumps(result).encode('utf-8')

            # Send response via channel
            self.channel.build_response_header(status, headers)
            self.channel.write_soon(body)
        except Exception as e:
            # Error handling
            self.handle_error(e)

    def process_custom_data(self):
        # Custom processing logic
        return {"status": "processed", "data": self.custom_data}
```

#### Thread Pool Configuration

```python
from waitress import create_server
from waitress.task import ThreadedTaskDispatcher

# Custom dispatcher configuration
dispatcher = ThreadedTaskDispatcher()
dispatcher.set_thread_count(8)  # 8 worker threads

# Create server with custom dispatcher
server = create_server(
    app,
    _dispatcher=dispatcher,
    host='0.0.0.0',
    port=8080,
)

# The server will use the configured thread pool
server.run()
```

#### Monitoring Task Queue

```python
from waitress.task import ThreadedTaskDispatcher

class MonitoredTaskDispatcher(ThreadedTaskDispatcher):
    """Task dispatcher with monitoring capabilities."""

    def __init__(self):
        super().__init__()
        self.task_count = 0
        self.completed_count = 0

    def add_task(self, task):
        """Add task with monitoring."""
        self.task_count += 1
        print(f"Queued task #{self.task_count}")
        super().add_task(task)

    def task_completed(self, task):
        """Called when a task completes."""
        self.completed_count += 1
        print(f"Completed task #{self.completed_count}")
```

### WSGI Application Integration

The task system provides proper WSGI application isolation and execution.

```python { .api }
# WSGI environ preparation:
def prepare_environ(self, request):
    """
    Prepare WSGI environ dictionary from HTTP request.

    Returns complete environ with:
    - All required WSGI keys
    - HTTP headers as HTTP_* keys
    - Server and connection information
    - Request body stream (wsgi.input)
    """

# WSGI application calling:
def call_application(self, environ):
    """
    Call WSGI application with proper error handling.

    Parameters:
    - environ: Complete WSGI environ dict

    Returns:
    WSGI response iterator

    Handles:
    - start_response callback management
    - Exception propagation and logging
    - Response iterator lifecycle
    """

# Response processing:
def process_response(self, response_iter, status, headers):
    """
    Process WSGI response iterator.

    Parameters:
    - response_iter: WSGI response iterator
    - status: HTTP status from start_response
    - headers: HTTP headers from start_response

    Handles:
    - Streaming response data
    - Connection keep-alive management
    - Resource cleanup
    """
```
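The calling convention itself is plain WSGI (PEP 3333) and can be exercised without a server. The sketch below hand-builds a minimal environ and captures `start_response` output, the same handshake `WSGITask.service` performs; the environ keys shown are the required WSGI set, simplified from what Waitress derives from a parsed request.

```python
import io
import sys

def app(environ, start_response):
    # A trivial WSGI app that echoes the request body
    body = environ['wsgi.input'].read()
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'echo: ' + body]

# Hand-built environ with the required WSGI keys (simplified)
environ = {
    'REQUEST_METHOD': 'POST',
    'PATH_INFO': '/',
    'SERVER_NAME': 'localhost',
    'SERVER_PORT': '8080',
    'SERVER_PROTOCOL': 'HTTP/1.1',
    'wsgi.version': (1, 0),
    'wsgi.url_scheme': 'http',
    'wsgi.input': io.BytesIO(b'hello'),
    'wsgi.errors': sys.stderr,
    'wsgi.multithread': True,
    'wsgi.multiprocess': False,
    'wsgi.run_once': False,
}

captured = {}

def start_response(status, headers, exc_info=None):
    # Record status/headers; a real server builds the response header here
    captured['status'] = status
    captured['headers'] = headers

chunks = list(app(environ, start_response))
print(captured['status'], b''.join(chunks))  # 200 OK b'echo: hello'
```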

### Error Handling in Tasks

Comprehensive error handling for task execution failures.

```python { .api }
# Exception handling levels:
APPLICATION_ERRORS = "Caught and converted to 500 responses"
TASK_ERRORS = "Logged and connection closed"
SYSTEM_ERRORS = "Server continues running"

# Error task creation:
def create_error_task(self, channel, request, exc_info):
    """
    Create error task for exception handling.

    Parameters:
    - channel: HTTPChannel for response
    - request: Original request context
    - exc_info: Exception information tuple

    Returns:
    ErrorTask configured for appropriate error response
    """

# Common error scenarios:
WSGI_APP_EXCEPTION = "500 Internal Server Error"
MALFORMED_REQUEST = "400 Bad Request"
REQUEST_TOO_LARGE = "413 Request Entity Too Large"
TIMEOUT_EXCEEDED = "408 Request Timeout"
CLIENT_DISCONNECT = "Connection closed, no response"
```
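The "application error becomes a 500" path can be sketched in isolation (an illustrative model, not Waitress's internal code): wrap the application call, log the traceback, and emit a generic error response so internal details never reach the client.

```python
import sys
import traceback

def failing_app(environ, start_response):
    raise ValueError("boom")  # simulated application failure

def run_with_error_handling(app, environ, start_response):
    """Call the app; on failure, emit a 500 the way an error task would."""
    try:
        return app(environ, start_response)
    except Exception:
        # Log the traceback server-side, then send a generic 500
        traceback.print_exc(file=sys.stderr)
        body = b'Internal Server Error'
        start_response('500 Internal Server Error',
                       [('Content-Type', 'text/plain'),
                        ('Content-Length', str(len(body)))],
                       sys.exc_info())
        return [body]

captured = {}

def start_response(status, headers, exc_info=None):
    captured['status'] = status

result = run_with_error_handling(failing_app, {}, start_response)
print(captured['status'])  # 500 Internal Server Error
```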

### Performance Tuning

Guidelines for optimizing task management performance.

```python
# Thread count tuning:
CPU_BOUND_APPS = "threads = CPU_cores"        # CPU-intensive work
IO_BOUND_APPS = "threads = 2-4 * CPU_cores"   # Database/API calls
MIXED_WORKLOAD = "threads = 1.5 * CPU_cores"  # Typical web apps

# Memory considerations:
THREAD_STACK_SIZE = "8MB default"             # Per-thread memory
TOTAL_MEMORY = "threads * 8MB + application"  # Memory planning
LARGE_RESPONSES = "Consider streaming"        # Memory efficiency

# Queue management:
QUEUE_SIZE = "Unlimited by default"           # Task backlog
BACKPRESSURE = "Automatic via blocking"       # Flow control
MONITORING = "Log queue depth if needed"      # Operational visibility

# Example configurations:
DEVELOPMENT = {"threads": 1}        # Easy debugging
PRODUCTION_SMALL = {"threads": 4}   # Small server
PRODUCTION_LARGE = {"threads": 16}  # Large server (8-16 typical)
HIGH_CONCURRENCY = {"threads": 32}  # High traffic (20-50 typical)
```
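The rules of thumb above can be turned into starting-point numbers with `os.cpu_count()`. These are heuristics only, not an official formula; profile your own workload before settling on a count.

```python
import os

cores = os.cpu_count() or 1  # cpu_count() can return None

# Heuristic starting points (illustrative, derived from the guidelines above)
cpu_bound = cores                      # CPU-intensive work
io_bound = 4 * cores                   # mostly waiting on databases/APIs
mixed = max(1, round(1.5 * cores))     # typical web apps

print(f"cores={cores}: cpu_bound={cpu_bound}, io_bound={io_bound}, mixed={mixed}")
```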