tessl/pypi-waitress

A production-quality pure-Python WSGI server with robust HTTP protocol support and comprehensive configuration options

docs/task-management.md

Task Management

Thread pool management and task dispatching system for processing WSGI requests efficiently across multiple worker threads.

Capabilities

Task Dispatcher

Central coordinator for managing worker threads and distributing WSGI request processing tasks.

class ThreadedTaskDispatcher:
    """
    Manages worker thread pool for WSGI request processing.
    
    Coordinates task distribution across multiple threads while
    maintaining proper WSGI application isolation and error handling.
    """
    
    def __init__(self):
        """Initialize task dispatcher with default configuration."""
    
    def set_thread_count(self, count):
        """
        Set number of worker threads.
        
        Parameters:
        - count (int): Number of worker threads (typically 1-50)
        
        Notes:
        - Changes take effect on next server start
        - More threads allow higher concurrency but use more memory
        - Optimal count depends on application characteristics
        """
    
    def add_task(self, task):
        """
        Add task to processing queue.
        
        Parameters:
        - task: Task instance (WSGITask, ErrorTask, etc.)
        
        Notes:
        - Tasks are processed FIFO by available worker threads
        - Blocks if queue is full (backpressure mechanism)
        """
    
    def shutdown(self):
        """
        Shutdown thread pool cleanly.
        
        Waits for running tasks to complete before terminating threads.
        Called automatically during server shutdown.
        """

Task Base Classes

Foundation classes for implementing different types of processing tasks.

class Task:
    """
    Base class for all task types.
    
    Provides common task lifecycle management and error handling
    framework for specialized task implementations.
    """
    
    def __init__(self, channel, request):
        """
        Initialize task with channel and request context.
        
        Parameters:
        - channel: HTTPChannel instance for response transmission
        - request: Parsed HTTP request data
        """
    
    def service(self):
        """
        Execute task processing.
        
        Called by worker thread to perform actual task work.
        Must be implemented by subclasses.
        """
    
    def cancel(self):
        """Cancel task execution if possible."""
    
    def defer(self):
        """Defer task execution to later time."""

class WSGITask(Task):
    """
    WSGI application execution task.
    
    Handles execution of WSGI applications in worker threads,
    including proper environ setup, application calling,
    and response handling.
    """
    
    def __init__(self, channel, request):
        """
        Initialize WSGI task.
        
        Parameters:
        - channel: HTTPChannel for response transmission
        - request: Complete HTTP request with parsed environ
        """
    
    def service(self):
        """
        Execute WSGI application.
        
        Calls WSGI application with proper environ and start_response,
        handles response iteration, and manages connection state.
        """
    
    def build_response_header(self, status, headers):
        """Build HTTP response header from WSGI status and headers."""

class ErrorTask(Task):
    """
    Error response generation task.
    
    Generates appropriate HTTP error responses for
    various error conditions (400, 500, etc.).
    """
    
    def __init__(self, channel, request, status, reason, body):
        """
        Initialize error task.
        
        Parameters:
        - channel: HTTPChannel for response transmission
        - request: Original request context
        - status (str): HTTP status code (e.g., "500")
        - reason (str): HTTP reason phrase (e.g., "Internal Server Error")
        - body (bytes): Error response body
        """
    
    def service(self):
        """Generate and send error response."""

Threading Model

Waitress uses a hybrid threading model: a single main thread handles I/O and connection management, while a pool of worker threads executes WSGI applications.

# Threading architecture:
MAIN_THREAD_ROLE = "I/O and connection management"  # event loop (asyncore, vendored as wasyncore)
WORKER_THREAD_ROLE = "WSGI application execution"   # Task processing

# Thread safety:
WSGI_THREAD_SAFETY = "Thread-safe"     # Each request in separate thread
SHARED_STATE_SAFETY = "Minimal"        # Limited shared state between threads
APPLICATION_ISOLATION = "Complete"      # WSGI apps isolated per request

# Performance characteristics:
DEFAULT_THREAD_COUNT = 4               # Suitable for most applications
RECOMMENDED_RANGE = (1, 50)           # Practical thread count limits
MEMORY_PER_THREAD = "~8MB"            # Approximate memory overhead
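
The model above can be sketched with the standard library alone. This is not waitress's actual implementation, just a minimal illustration of the same pattern: one thread enqueues tasks (the I/O role) while worker threads drain the queue FIFO, with a sentinel-based clean shutdown like `shutdown()`.

```python
import queue
import threading

task_queue = queue.Queue()
results = []
results_lock = threading.Lock()

def worker():
    """Worker-thread role: pull tasks off the queue and service them."""
    while True:
        task = task_queue.get()
        if task is None:              # shutdown sentinel
            task_queue.task_done()
            break
        with results_lock:
            results.append(task())    # "service" the task
        task_queue.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

# Main-thread role: only queue work, never execute it.
for n in range(10):
    task_queue.put(lambda n=n: n * n)

task_queue.join()                     # wait for all queued tasks to finish
for _ in threads:
    task_queue.put(None)              # one sentinel per worker, then join
for t in threads:
    t.join()
```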

Task Processing Examples

Common patterns for working with the task management system.

Custom Task Implementation

import json

from waitress.task import Task

class CustomTask(Task):
    """Custom task with specialized processing."""
    
    def __init__(self, channel, request, custom_data):
        super().__init__(channel, request)
        self.custom_data = custom_data
    
    def service(self):
        """Custom task processing logic."""
        try:
            # Perform custom processing
            result = self.process_custom_data()
            
            # Generate response
            status = '200 OK'
            headers = [('Content-Type', 'application/json')]
            body = json.dumps(result).encode('utf-8')
            
            # Build the response header, then queue header and body
            # for transmission on the channel
            header = self.build_response_header(status, headers)
            self.channel.write_soon(header)
            self.channel.write_soon(body)
            
        except Exception as e:
            # Error handling
            self.handle_error(e)
    
    def process_custom_data(self):
        # Custom processing logic
        return {"status": "processed", "data": self.custom_data}

Thread Pool Configuration

from waitress import create_server
from waitress.task import ThreadedTaskDispatcher

# Custom dispatcher configuration
dispatcher = ThreadedTaskDispatcher()
dispatcher.set_thread_count(8)  # 8 worker threads

# Create server with custom dispatcher
server = create_server(
    app,
    _dispatcher=dispatcher,
    host='0.0.0.0',
    port=8080
)

# Server will use the configured thread pool; note that _dispatcher is an
# internal hook — ordinary deployments use the public option, e.g.
# serve(app, threads=8)
server.run()

Monitoring Task Queue

class MonitoredTaskDispatcher(ThreadedTaskDispatcher):
    """Task dispatcher with monitoring capabilities."""
    
    def __init__(self):
        super().__init__()
        self.task_count = 0
        self.completed_count = 0
    
    def add_task(self, task):
        """Add task with monitoring."""
        self.task_count += 1
        print(f"Queued task #{self.task_count}")
        super().add_task(task)
    
    def task_completed(self, task):
        """Illustrative completion hook; the stock dispatcher does not
        call this — invoke it from a Task subclass's service() method."""
        self.completed_count += 1
        print(f"Completed task #{self.completed_count}")

WSGI Application Integration

The task system provides proper WSGI application isolation and execution.

# WSGI environ preparation:
def prepare_environ(self, request):
    """
    Prepare WSGI environ dictionary from HTTP request.
    
    Returns complete environ with:
    - All required WSGI keys
    - HTTP headers as HTTP_* keys  
    - Server and connection information
    - Request body stream (wsgi.input)
    """

# WSGI application calling:
def call_application(self, environ):
    """
    Call WSGI application with proper error handling.
    
    Parameters:
    - environ: Complete WSGI environ dict
    
    Returns:
    WSGI response iterator
    
    Handles:
    - start_response callback management
    - Exception propagation and logging
    - Response iterator lifecycle
    """

# Response processing:
def process_response(self, response_iter, status, headers):
    """
    Process WSGI response iterator.
    
    Parameters:
    - response_iter: WSGI response iterator
    - status: HTTP status from start_response
    - headers: HTTP headers from start_response
    
    Handles:
    - Streaming response data
    - Connection keep-alive management
    - Resource cleanup
    """

Error Handling in Tasks

Comprehensive error handling for task execution failures.

# Exception handling levels:
APPLICATION_ERRORS = "Caught and converted to 500 responses"
TASK_ERRORS = "Logged and connection closed"
SYSTEM_ERRORS = "Server continues running"

# Error task creation:
def create_error_task(self, channel, request, exc_info):
    """
    Create error task for exception handling.
    
    Parameters:
    - channel: HTTPChannel for response
    - request: Original request context  
    - exc_info: Exception information tuple
    
    Returns:
    ErrorTask configured for appropriate error response
    """

# Common error scenarios:
WSGI_APP_EXCEPTION = "500 Internal Server Error"
MALFORMED_REQUEST = "400 Bad Request"  
REQUEST_TOO_LARGE = "413 Request Entity Too Large"
TIMEOUT_EXCEEDED = "408 Request Timeout"
CLIENT_DISCONNECT = "Connection closed, no response"
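
The "caught and converted to 500 responses" behavior happens inside waitress; the wrapper below merely illustrates the same idea as plain WSGI middleware, which you can also use if you want to customize the error body before waitress sees the exception.

```python
import sys

def error_converting(app):
    """Middleware sketch: convert application exceptions to 500 responses."""
    def wrapped(environ, start_response):
        try:
            return app(environ, start_response)
        except Exception:
            exc_info = sys.exc_info()
            body = b"Internal Server Error"
            # Passing exc_info lets start_response re-raise if headers
            # were already sent, per the WSGI spec.
            start_response(
                "500 Internal Server Error",
                [("Content-Type", "text/plain"),
                 ("Content-Length", str(len(body)))],
                exc_info,
            )
            return [body]
    return wrapped

def broken_app(environ, start_response):
    raise RuntimeError("boom")

statuses = []

def start_response(status, headers, exc_info=None):
    statuses.append(status)

result = error_converting(broken_app)({}, start_response)
```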

Performance Tuning

Guidelines for optimizing task management performance.

# Thread count tuning:
CPU_BOUND_APPS = "threads = CPU_cores"           # CPU-intensive work
IO_BOUND_APPS = "threads = 2-4 * CPU_cores"     # Database/API calls  
MIXED_WORKLOAD = "threads = 1.5 * CPU_cores"    # Typical web apps

# Memory considerations:
THREAD_STACK_SIZE = "8MB default"               # Per-thread memory
TOTAL_MEMORY = "threads * 8MB + application"    # Memory planning
LARGE_RESPONSES = "Consider streaming"           # Memory efficiency

# Queue management:
QUEUE_SIZE = "Unlimited by default"             # Task backlog
BACKPRESSURE = "Automatic via blocking"         # Flow control
MONITORING = "Log queue depth if needed"        # Operational visibility

# Example optimal configurations:
DEVELOPMENT = {"threads": 1}                     # Easy debugging
PRODUCTION_SMALL = {"threads": 4}               # Small server
PRODUCTION_LARGE = {"threads": 16}              # Large server (8-16)
HIGH_CONCURRENCY = {"threads": 32}              # High traffic (20-50)
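
The rules of thumb above can be turned into a small helper. The multipliers are the heuristics from this section, not waitress defaults; `suggest_threads` is a hypothetical name, and the result is clamped to the practical 1-50 range noted earlier.

```python
import os

def suggest_threads(workload, cores=None):
    """Suggest a worker thread count for a given workload type.

    workload: "cpu" (CPU-bound), "io" (database/API calls), or "mixed".
    """
    cores = cores or os.cpu_count() or 1
    multiplier = {"cpu": 1.0, "mixed": 1.5, "io": 3.0}[workload]
    # Clamp to the practical 1-50 range.
    return max(1, min(50, round(cores * multiplier)))
```

A suggested value can then be passed to the public API, e.g. `waitress.serve(app, threads=suggest_threads("mixed"))`.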

Install with Tessl CLI

npx tessl i tessl/pypi-waitress
