0
# Task Management
1
2
Thread pool management and task dispatching system for processing WSGI requests efficiently across multiple worker threads.
3
4
## Capabilities
5
6
### Task Dispatcher
7
8
Central coordinator for managing worker threads and distributing WSGI request processing tasks.
9
10
```python { .api }
11
class ThreadedTaskDispatcher:
12
"""
13
Manages worker thread pool for WSGI request processing.
14
15
Coordinates task distribution across multiple threads while
16
maintaining proper WSGI application isolation and error handling.
17
"""
18
19
def __init__(self):
20
"""Initialize task dispatcher with default configuration."""
21
22
def set_thread_count(self, count):
23
"""
24
Set number of worker threads.
25
26
Parameters:
27
- count (int): Number of worker threads (typically 1-50)
28
29
Notes:
30
- Changes take effect on next server start
31
- More threads allow higher concurrency but use more memory
32
- Optimal count depends on application characteristics
33
"""
34
35
def add_task(self, task):
36
"""
37
Add task to processing queue.
38
39
Parameters:
40
- task: Task instance (WSGITask, ErrorTask, etc.)
41
42
Notes:
43
- Tasks are processed FIFO by available worker threads
44
- Blocks if queue is full (backpressure mechanism)
45
"""
46
47
def shutdown(self):
48
"""
49
Shutdown thread pool cleanly.
50
51
Waits for running tasks to complete before terminating threads.
52
Called automatically during server shutdown.
53
"""
54
```
55
56
### Task Base Classes
57
58
Foundation classes for implementing different types of processing tasks.
59
60
```python { .api }
61
class Task:
62
"""
63
Base class for all task types.
64
65
Provides common task lifecycle management and error handling
66
framework for specialized task implementations.
67
"""
68
69
def __init__(self, channel, request):
70
"""
71
Initialize task with channel and request context.
72
73
Parameters:
74
- channel: HTTPChannel instance for response transmission
75
- request: Parsed HTTP request data
76
"""
77
78
def service(self):
79
"""
80
Execute task processing.
81
82
Called by worker thread to perform actual task work.
83
Must be implemented by subclasses.
84
"""
85
86
def cancel(self):
87
"""Cancel task execution if possible."""
88
89
def defer(self):
90
"""Defer task execution to later time."""
91
92
class WSGITask(Task):
93
"""
94
WSGI application execution task.
95
96
Handles execution of WSGI applications in worker threads,
97
including proper environ setup, application calling,
98
and response handling.
99
"""
100
101
def __init__(self, channel, request):
102
"""
103
Initialize WSGI task.
104
105
Parameters:
106
- channel: HTTPChannel for response transmission
107
- request: Complete HTTP request with parsed environ
108
"""
109
110
def service(self):
111
"""
112
Execute WSGI application.
113
114
Calls WSGI application with proper environ and start_response,
115
handles response iteration, and manages connection state.
116
"""
117
118
def build_response_header(self, status, headers):
119
"""Build HTTP response header from WSGI status and headers."""
120
121
class ErrorTask(Task):
122
"""
123
Error response generation task.
124
125
Generates appropriate HTTP error responses for
126
various error conditions (400, 500, etc.).
127
"""
128
129
def __init__(self, channel, request, status, reason, body):
130
"""
131
Initialize error task.
132
133
Parameters:
134
- channel: HTTPChannel for response transmission
135
- request: Original request context
136
- status (str): HTTP status code (e.g., "500")
137
- reason (str): HTTP reason phrase (e.g., "Internal Server Error")
138
- body (bytes): Error response body
139
"""
140
141
def service(self):
142
"""Generate and send error response."""
143
```
144
145
### Threading Model
146
147
Waitress uses a hybrid threading model for optimal performance.
148
149
```python { .api }
150
# Threading architecture:
151
MAIN_THREAD_ROLE = "I/O and connection management" # asyncore event loop
152
WORKER_THREAD_ROLE = "WSGI application execution" # Task processing
153
154
# Thread safety:
155
WSGI_THREAD_SAFETY = "Thread-safe" # Each request in separate thread
156
SHARED_STATE_SAFETY = "Minimal" # Limited shared state between threads
157
APPLICATION_ISOLATION = "Complete" # WSGI apps isolated per request
158
159
# Performance characteristics:
160
DEFAULT_THREAD_COUNT = 4 # Suitable for most applications
161
RECOMMENDED_RANGE = (1, 50) # Practical thread count limits
162
MEMORY_PER_THREAD = "~8MB" # Approximate memory overhead
163
```
164
165
### Task Processing Examples
166
167
Common patterns for working with the task management system.
168
169
#### Custom Task Implementation
170
171
```python
172
from waitress.task import Task
173
174
class CustomTask(Task):
175
"""Custom task with specialized processing."""
176
177
def __init__(self, channel, request, custom_data):
178
super().__init__(channel, request)
179
self.custom_data = custom_data
180
181
def service(self):
182
"""Custom task processing logic."""
183
try:
184
# Perform custom processing
185
result = self.process_custom_data()
186
187
# Generate response
188
status = '200 OK'
189
headers = [('Content-Type', 'application/json')]
190
body = json.dumps(result).encode('utf-8')
191
192
# Send response via channel
193
self.channel.build_response_header(status, headers)
194
self.channel.write_soon(body)
195
196
except Exception as e:
197
# Error handling
198
self.handle_error(e)
199
200
def process_custom_data(self):
201
# Custom processing logic
202
return {"status": "processed", "data": self.custom_data}
203
```
204
205
#### Thread Pool Configuration
206
207
```python
208
from waitress import create_server
209
from waitress.task import ThreadedTaskDispatcher
210
211
# Custom dispatcher configuration
212
dispatcher = ThreadedTaskDispatcher()
213
dispatcher.set_thread_count(8) # 8 worker threads
214
215
# Create server with custom dispatcher
216
server = create_server(
217
app,
218
_dispatcher=dispatcher,
219
host='0.0.0.0',
220
port=8080
221
)
222
223
# Server will use the configured thread pool
224
server.run()
225
```
226
227
#### Monitoring Task Queue
228
229
```python
230
class MonitoredTaskDispatcher(ThreadedTaskDispatcher):
231
"""Task dispatcher with monitoring capabilities."""
232
233
def __init__(self):
234
super().__init__()
235
self.task_count = 0
236
self.completed_count = 0
237
238
def add_task(self, task):
239
"""Add task with monitoring."""
240
self.task_count += 1
241
print(f"Queued task #{self.task_count}")
242
super().add_task(task)
243
244
def task_completed(self, task):
245
"""Called when task completes."""
246
self.completed_count += 1
247
print(f"Completed task #{self.completed_count}")
248
```
249
250
### WSGI Application Integration
251
252
The task system provides proper WSGI application isolation and execution.
253
254
```python { .api }
255
# WSGI environ preparation:
256
def prepare_environ(self, request):
257
"""
258
Prepare WSGI environ dictionary from HTTP request.
259
260
Returns complete environ with:
261
- All required WSGI keys
262
- HTTP headers as HTTP_* keys
263
- Server and connection information
264
- Request body stream (wsgi.input)
265
"""
266
267
# WSGI application calling:
268
def call_application(self, environ):
269
"""
270
Call WSGI application with proper error handling.
271
272
Parameters:
273
- environ: Complete WSGI environ dict
274
275
Returns:
276
WSGI response iterator
277
278
Handles:
279
- start_response callback management
280
- Exception propagation and logging
281
- Response iterator lifecycle
282
"""
283
284
# Response processing:
285
def process_response(self, response_iter, status, headers):
286
"""
287
Process WSGI response iterator.
288
289
Parameters:
290
- response_iter: WSGI response iterator
291
- status: HTTP status from start_response
292
- headers: HTTP headers from start_response
293
294
Handles:
295
- Streaming response data
296
- Connection keep-alive management
297
- Resource cleanup
298
"""
299
```
300
301
### Error Handling in Tasks
302
303
Comprehensive error handling for task execution failures.
304
305
```python { .api }
306
# Exception handling levels:
307
APPLICATION_ERRORS = "Caught and converted to 500 responses"
308
TASK_ERRORS = "Logged and connection closed"
309
SYSTEM_ERRORS = "Server continues running"
310
311
# Error task creation:
312
def create_error_task(self, channel, request, exc_info):
313
"""
314
Create error task for exception handling.
315
316
Parameters:
317
- channel: HTTPChannel for response
318
- request: Original request context
319
- exc_info: Exception information tuple
320
321
Returns:
322
ErrorTask configured for appropriate error response
323
"""
324
325
# Common error scenarios:
326
WSGI_APP_EXCEPTION = "500 Internal Server Error"
327
MALFORMED_REQUEST = "400 Bad Request"
328
REQUEST_TOO_LARGE = "413 Request Entity Too Large"
329
TIMEOUT_EXCEEDED = "408 Request Timeout"
330
CLIENT_DISCONNECT = "Connection closed, no response"
331
```
332
333
### Performance Tuning
334
335
Guidelines for optimizing task management performance.
336
337
```python
338
# Thread count tuning:
339
CPU_BOUND_APPS = "threads = CPU_cores" # CPU-intensive work
340
IO_BOUND_APPS = "threads = 2-4 * CPU_cores" # Database/API calls
341
MIXED_WORKLOAD = "threads = 1.5 * CPU_cores" # Typical web apps
342
343
# Memory considerations:
344
THREAD_STACK_SIZE = "8MB default" # Per-thread memory
345
TOTAL_MEMORY = "threads * 8MB + application" # Memory planning
346
LARGE_RESPONSES = "Consider streaming" # Memory efficiency
347
348
# Queue management:
349
QUEUE_SIZE = "Unlimited by default" # Task backlog
350
BACKPRESSURE = "Automatic via blocking" # Flow control
351
MONITORING = "Log queue depth if needed" # Operational visibility
352
353
# Example optimal configurations:
354
DEVELOPMENT = {"threads": 1} # Easy debugging
355
PRODUCTION_SMALL = {"threads": 4} # Small server
356
PRODUCTION_LARGE = {"threads": 8-16} # Large server
357
HIGH_CONCURRENCY = {"threads": 20-50} # High traffic
358
```