A production-quality pure-Python WSGI server with robust HTTP protocol support and comprehensive configuration options
—
Thread pool management and task dispatching system for processing WSGI requests efficiently across multiple worker threads.
Central coordinator for managing worker threads and distributing WSGI request processing tasks.
class ThreadedTaskDispatcher:
"""
Manages worker thread pool for WSGI request processing.
Coordinates task distribution across multiple threads while
maintaining proper WSGI application isolation and error handling.
"""
def __init__(self):
"""Initialize task dispatcher with default configuration."""
def set_thread_count(self, count):
"""
Set number of worker threads.
Parameters:
- count (int): Number of worker threads (typically 1-50)
Notes:
- Changes take effect on next server start
- More threads allow higher concurrency but use more memory
- Optimal count depends on application characteristics
"""
def add_task(self, task):
"""
Add task to processing queue.
Parameters:
- task: Task instance (WSGITask, ErrorTask, etc.)
Notes:
- Tasks are processed FIFO by available worker threads
- Blocks if queue is full (backpressure mechanism)
"""
def shutdown(self):
"""
Shutdown thread pool cleanly.
Waits for running tasks to complete before terminating threads.
Called automatically during server shutdown.
"""Foundation classes for implementing different types of processing tasks.
class Task:
"""
Base class for all task types.
Provides common task lifecycle management and error handling
framework for specialized task implementations.
"""
def __init__(self, channel, request):
"""
Initialize task with channel and request context.
Parameters:
- channel: HTTPChannel instance for response transmission
- request: Parsed HTTP request data
"""
def service(self):
"""
Execute task processing.
Called by worker thread to perform actual task work.
Must be implemented by subclasses.
"""
def cancel(self):
"""Cancel task execution if possible."""
def defer(self):
"""Defer task execution to later time."""
class WSGITask(Task):
"""
WSGI application execution task.
Handles execution of WSGI applications in worker threads,
including proper environ setup, application calling,
and response handling.
"""
def __init__(self, channel, request):
"""
Initialize WSGI task.
Parameters:
- channel: HTTPChannel for response transmission
- request: Complete HTTP request with parsed environ
"""
def service(self):
"""
Execute WSGI application.
Calls WSGI application with proper environ and start_response,
handles response iteration, and manages connection state.
"""
def build_response_header(self, status, headers):
"""Build HTTP response header from WSGI status and headers."""
class ErrorTask(Task):
"""
Error response generation task.
Generates appropriate HTTP error responses for
various error conditions (400, 500, etc.).
"""
def __init__(self, channel, request, status, reason, body):
"""
Initialize error task.
Parameters:
- channel: HTTPChannel for response transmission
- request: Original request context
- status (str): HTTP status code (e.g., "500")
- reason (str): HTTP reason phrase (e.g., "Internal Server Error")
- body (bytes): Error response body
"""
def service(self):
"""Generate and send error response."""Waitress uses a hybrid threading model for optimal performance.
# Threading architecture:
MAIN_THREAD_ROLE = "I/O and connection management" # asyncore event loop
WORKER_THREAD_ROLE = "WSGI application execution" # Task processing
# Thread safety:
WSGI_THREAD_SAFETY = "Thread-safe" # Each request in separate thread
SHARED_STATE_SAFETY = "Minimal" # Limited shared state between threads
APPLICATION_ISOLATION = "Complete" # WSGI apps isolated per request
# Performance characteristics:
DEFAULT_THREAD_COUNT = 4 # Suitable for most applications
RECOMMENDED_RANGE = (1, 50) # Practical thread count limits
MEMORY_PER_THREAD = "~8MB" # Approximate memory overheadCommon patterns for working with the task management system.
from waitress.task import Task
class CustomTask(Task):
"""Custom task with specialized processing."""
def __init__(self, channel, request, custom_data):
super().__init__(channel, request)
self.custom_data = custom_data
def service(self):
"""Custom task processing logic."""
try:
# Perform custom processing
result = self.process_custom_data()
# Generate response
status = '200 OK'
headers = [('Content-Type', 'application/json')]
body = json.dumps(result).encode('utf-8')
# Send response via channel
self.channel.build_response_header(status, headers)
self.channel.write_soon(body)
except Exception as e:
# Error handling
self.handle_error(e)
def process_custom_data(self):
# Custom processing logic
return {"status": "processed", "data": self.custom_data}from waitress import create_server
from waitress.task import ThreadedTaskDispatcher
# Custom dispatcher configuration
dispatcher = ThreadedTaskDispatcher()
dispatcher.set_thread_count(8) # 8 worker threads
# Create server with custom dispatcher
server = create_server(
app,
_dispatcher=dispatcher,
host='0.0.0.0',
port=8080
)
# Server will use the configured thread pool
server.run()class MonitoredTaskDispatcher(ThreadedTaskDispatcher):
"""Task dispatcher with monitoring capabilities."""
def __init__(self):
super().__init__()
self.task_count = 0
self.completed_count = 0
def add_task(self, task):
"""Add task with monitoring."""
self.task_count += 1
print(f"Queued task #{self.task_count}")
super().add_task(task)
def task_completed(self, task):
"""Called when task completes."""
self.completed_count += 1
print(f"Completed task #{self.completed_count}")The task system provides proper WSGI application isolation and execution.
# WSGI environ preparation:
def prepare_environ(self, request):
"""
Prepare WSGI environ dictionary from HTTP request.
Returns complete environ with:
- All required WSGI keys
- HTTP headers as HTTP_* keys
- Server and connection information
- Request body stream (wsgi.input)
"""
# WSGI application calling:
def call_application(self, environ):
"""
Call WSGI application with proper error handling.
Parameters:
- environ: Complete WSGI environ dict
Returns:
WSGI response iterator
Handles:
- start_response callback management
- Exception propagation and logging
- Response iterator lifecycle
"""
# Response processing:
def process_response(self, response_iter, status, headers):
"""
Process WSGI response iterator.
Parameters:
- response_iter: WSGI response iterator
- status: HTTP status from start_response
- headers: HTTP headers from start_response
Handles:
- Streaming response data
- Connection keep-alive management
- Resource cleanup
"""Comprehensive error handling for task execution failures.
# Exception handling levels:
APPLICATION_ERRORS = "Caught and converted to 500 responses"
TASK_ERRORS = "Logged and connection closed"
SYSTEM_ERRORS = "Server continues running"
# Error task creation:
def create_error_task(self, channel, request, exc_info):
"""
Create error task for exception handling.
Parameters:
- channel: HTTPChannel for response
- request: Original request context
- exc_info: Exception information tuple
Returns:
ErrorTask configured for appropriate error response
"""
# Common error scenarios:
WSGI_APP_EXCEPTION = "500 Internal Server Error"
MALFORMED_REQUEST = "400 Bad Request"
REQUEST_TOO_LARGE = "413 Request Entity Too Large"
TIMEOUT_EXCEEDED = "408 Request Timeout"
CLIENT_DISCONNECT = "Connection closed, no response"Guidelines for optimizing task management performance.
# Thread count tuning:
CPU_BOUND_APPS = "threads = CPU_cores" # CPU-intensive work
IO_BOUND_APPS = "threads = 2-4 * CPU_cores" # Database/API calls
MIXED_WORKLOAD = "threads = 1.5 * CPU_cores" # Typical web apps
# Memory considerations:
THREAD_STACK_SIZE = "8MB default" # Per-thread memory
TOTAL_MEMORY = "threads * 8MB + application" # Memory planning
LARGE_RESPONSES = "Consider streaming" # Memory efficiency
# Queue management:
QUEUE_SIZE = "Unlimited by default" # Task backlog
BACKPRESSURE = "Automatic via blocking" # Flow control
MONITORING = "Log queue depth if needed" # Operational visibility
# Example optimal configurations:
DEVELOPMENT = {"threads": 1} # Easy debugging
PRODUCTION_SMALL = {"threads": 4} # Small server
PRODUCTION_LARGE = {"threads": 8-16} # Large server
HIGH_CONCURRENCY = {"threads": 20-50} # High trafficInstall with Tessl CLI
npx tessl i tessl/pypi-waitress