
tessl/pypi-hypercorn

A high-performance ASGI and WSGI web server implementation that provides comprehensive support for modern web protocols including HTTP/1, HTTP/2, WebSockets, and experimental HTTP/3


Events

Hypercorn represents connection lifecycle changes as event objects: raw data received or to be sent, connection closure, and changes in idle state. Protocol implementations react to these events to process network traffic and manage connections.

Capabilities

Base Event Class

Abstract base class that defines the event interface for all event types in Hypercorn's event system.

class Event(ABC):
    """
    Abstract base class for all events.
    
    Defines the common interface for events in Hypercorn's
    event-driven architecture. All concrete event types
    inherit from this base class.
    """
    pass
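
Because every event shares the `Event` base class, a consumer can dispatch on the concrete type. The following is a minimal sketch of one way to do that with `functools.singledispatch`. The classes are local stand-ins that mirror the definitions on this page so the snippet runs without Hypercorn installed, and `describe` is a hypothetical helper, not part of Hypercorn.

```python
from abc import ABC
from dataclasses import dataclass
from functools import singledispatch
from typing import Optional, Tuple


# Local stand-ins mirroring the classes documented on this page.
class Event(ABC):
    pass


@dataclass
class RawData(Event):
    data: bytes
    address: Optional[Tuple[str, int]] = None


@dataclass
class Closed(Event):
    pass


@dataclass
class Updated(Event):
    idle: bool


@singledispatch
def describe(event: Event) -> str:
    # Fallback for event types without a registered handler.
    return f"unhandled {type(event).__name__}"


@describe.register
def _(event: RawData) -> str:
    return f"data: {len(event.data)} bytes"


@describe.register
def _(event: Closed) -> str:
    return "closed"


@describe.register
def _(event: Updated) -> str:
    return "idle" if event.idle else "active"
```

Compared with an `isinstance` chain, this keeps each handler separate and gives unknown event types a single fallback path.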

Raw Data Event

Event representing raw network data received from or to be sent to a client connection.

@dataclass
class RawData(Event):
    """
    Event containing raw network data.
    
    Represents data received from or to be sent to a network
    connection, along with optional address information for
    datagram protocols.
    """
    data: bytes  # Raw network data
    address: tuple | None = None  # Optional address for datagram protocols
    
    # Usage patterns:
    # - TCP connections: address is typically None
    # - UDP/QUIC: address contains (host, port) tuple
    # - Unix sockets: address format depends on socket type
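
The usage patterns above can be sketched as follows. `RawData` here is a local stand-in for the class shown above, and `peer_label` is a hypothetical helper introduced only for illustration.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


# Local stand-in mirroring the RawData definition above.
@dataclass
class RawData:
    data: bytes
    address: Optional[Tuple[str, int]] = None


def peer_label(event: RawData) -> str:
    """Hypothetical helper: label where a chunk of data came from."""
    if event.address is None:
        # Stream transports (e.g. TCP) already know their peer.
        return "stream peer"
    host, port = event.address[:2]
    return f"{host}:{port}"


tcp_event = RawData(b"GET / HTTP/1.1\r\n\r\n")
udp_event = RawData(b"\x00\x01", address=("127.0.0.1", 4433))
print(peer_label(tcp_event))
print(peer_label(udp_event))
```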

Connection Closed Event

Event indicating that a network connection has been closed or terminated.

@dataclass
class Closed(Event):
    """
    Event indicating connection closure.
    
    Signals that a network connection has been closed,
    either gracefully by the client/server or due to
    network errors or timeouts.
    """
    pass
    
    # Triggered when:
    # - Client closes connection gracefully
    # - Server closes connection
    # - Network error causes connection termination
    # - Connection timeout expires
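
Since closure can have several causes, a handler may want its cleanup to be safe to run more than once. The sketch below assumes a hypothetical `ConnectionResources` holder and uses a local stand-in for `Closed`. It only illustrates the idea of idempotent cleanup and makes no claim about how often Hypercorn emits the event.

```python
from dataclasses import dataclass


@dataclass
class Closed:  # local stand-in for the Closed event above
    pass


class ConnectionResources:
    """Hypothetical holder for per-connection state that needs cleanup."""

    def __init__(self) -> None:
        self.closed = False
        self.cleanup_calls = 0

    def handle(self, event: object) -> None:
        if isinstance(event, Closed) and not self.closed:
            # Run cleanup only for the first Closed event seen.
            self.closed = True
            self.cleanup_calls += 1


resources = ConnectionResources()
resources.handle(Closed())
resources.handle(Closed())  # a repeated close is ignored
```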

Connection Updated Event

Event indicating changes in connection state, particularly idle status for connection management and keep-alive handling.

@dataclass
class Updated(Event):
    """
    Event indicating connection state update.
    
    Signals changes in connection state, particularly
    transitions between active and idle states for
    connection management and resource optimization.
    """
    idle: bool  # Whether connection is currently idle
    
    # Connection states:
    # - idle=False: Connection is actively processing requests
    # - idle=True: Connection is idle and available for new requests
    # 
    # Used for:
    # - Keep-alive connection management
    # - Connection pooling decisions
    # - Resource cleanup timing
    # - Load balancing algorithms
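
One way to act on idle transitions is a timer that starts when a connection becomes idle and is cancelled when it becomes active again. The sketch below assumes a hypothetical `IdleTimer` helper built on `loop.call_later`, with a local stand-in for `Updated`. The short timeouts are chosen only to keep the demonstration fast.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Updated:  # local stand-in for the Updated event above
    idle: bool


class IdleTimer:
    """Hypothetical helper: fire a callback after a connection stays idle."""

    def __init__(self, timeout: float, on_timeout: Callable[[], None]) -> None:
        self.timeout = timeout
        self.on_timeout = on_timeout
        self._handle: Optional[asyncio.TimerHandle] = None

    def handle(self, event: Updated) -> None:
        # Any update invalidates a previously scheduled timeout.
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None
        if event.idle:
            loop = asyncio.get_running_loop()
            self._handle = loop.call_later(self.timeout, self.on_timeout)


async def demo() -> list:
    fired = []
    timer = IdleTimer(0.05, lambda: fired.append("timeout"))
    timer.handle(Updated(idle=True))
    timer.handle(Updated(idle=False))  # activity cancels the timer
    await asyncio.sleep(0.2)
    after_cancel = list(fired)
    timer.handle(Updated(idle=True))
    await asyncio.sleep(0.2)  # stays idle long enough to time out
    return [after_cancel, fired]


results = asyncio.run(demo())
```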

Usage Examples

Event Handling in Protocol Implementation

from hypercorn.events import Event, RawData, Closed, Updated

class ProtocolHandler:
    def __init__(self):
        self.connection_active = True
        self.idle_timeout = 30.0
        
    async def handle_event(self, event: Event):
        """Handle different event types."""
        if isinstance(event, RawData):
            await self.handle_raw_data(event)
        elif isinstance(event, Closed):
            await self.handle_connection_closed(event)
        elif isinstance(event, Updated):
            await self.handle_connection_updated(event)
        else:
            print(f"Unknown event type: {type(event)}")
    
    async def handle_raw_data(self, event: RawData):
        """Process incoming raw data."""
        data = event.data
        address = event.address
        
        print(f"Received {len(data)} bytes from {address}")
        
        # Process the data (HTTP parsing, WebSocket frames, etc.)
        await self.process_protocol_data(data)
        
    async def handle_connection_closed(self, event: Closed):
        """Handle connection closure."""
        print("Connection closed")
        self.connection_active = False
        
        # Cleanup resources
        await self.cleanup_connection()
        
    async def handle_connection_updated(self, event: Updated):
        """Handle connection state updates."""
        if event.idle:
            print("Connection became idle")
            # Start idle timeout timer
            await self.start_idle_timer()
        else:
            print("Connection became active")
            # Cancel idle timeout timer
            await self.cancel_idle_timer()

Custom Event Loop Integration

import asyncio
from hypercorn.events import RawData, Closed, Updated

class EventProcessor:
    def __init__(self):
        self.event_queue = asyncio.Queue()
        self.running = True
        
    async def process_events(self):
        """Main event processing loop."""
        while self.running:
            try:
                event = await self.event_queue.get()
                await self.dispatch_event(event)
                self.event_queue.task_done()
            except Exception as e:
                print(f"Error processing event: {e}")
    
    async def dispatch_event(self, event):
        """Dispatch event to appropriate handler."""
        if isinstance(event, RawData):
            await self.on_data_received(event.data, event.address)
        elif isinstance(event, Closed):
            await self.on_connection_closed()
        elif isinstance(event, Updated):
            await self.on_connection_updated(event.idle)
    
    async def on_data_received(self, data: bytes, address: tuple | None):
        """Handle received data."""
        print(f"Data: {len(data)} bytes from {address}")
        # Process data...
    
    async def on_connection_closed(self):
        """Handle connection closure."""
        print("Connection closed")
        # Cleanup...
    
    async def on_connection_updated(self, idle: bool):
        """Handle connection state update."""
        print(f"Connection {'idle' if idle else 'active'}")
        # Update state...

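# Usage
async def main():
    processor = EventProcessor()

    # Add events to the queue
    await processor.event_queue.put(RawData(b"GET / HTTP/1.1\r\n\r\n"))
    await processor.event_queue.put(Updated(idle=False))
    await processor.event_queue.put(Closed())

    # Process events in the background, wait until the queue is drained,
    # then stop the otherwise endless processing loop
    worker = asyncio.create_task(processor.process_events())
    await processor.event_queue.join()
    processor.running = False
    worker.cancel()

asyncio.run(main())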

Connection State Management

import asyncio
import time
from enum import Enum

from hypercorn.events import Updated

class ConnectionState(Enum):
    CONNECTING = "connecting"
    ACTIVE = "active"
    IDLE = "idle"
    CLOSING = "closing"
    CLOSED = "closed"

class ConnectionManager:
    def __init__(self):
        self.connections = {}
        self.idle_timeout = 60.0  # seconds
        
    async def handle_connection_update(self, conn_id: str, event: Updated):
        """Handle connection state updates."""
        connection = self.connections.get(conn_id)
        if not connection:
            return
            
        if event.idle:
            # Connection became idle
            connection['state'] = ConnectionState.IDLE
            connection['idle_since'] = time.time()
            print(f"Connection {conn_id} is now idle")
            
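            # Schedule the idle timeout check in the background so this
            # handler does not block for the full timeout; keep a reference
            # so the task is not garbage-collected before it runs
            connection['idle_task'] = asyncio.create_task(
                self.schedule_idle_check(conn_id)
            )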
        else:
            # Connection became active
            if connection['state'] == ConnectionState.IDLE:
                idle_duration = time.time() - connection['idle_since']
                print(f"Connection {conn_id} active after {idle_duration:.2f}s idle")
            
            connection['state'] = ConnectionState.ACTIVE
            connection.pop('idle_since', None)
    
    async def schedule_idle_check(self, conn_id: str):
        """Schedule check for idle timeout."""
        await asyncio.sleep(self.idle_timeout)
        
        connection = self.connections.get(conn_id)
        if (connection and 
            connection['state'] == ConnectionState.IDLE and
            time.time() - connection['idle_since'] >= self.idle_timeout):
            
            print(f"Connection {conn_id} idle timeout - closing")
            await self.close_connection(conn_id)
    
    async def close_connection(self, conn_id: str):
        """Close idle connection."""
        connection = self.connections.get(conn_id)
        if connection:
            connection['state'] = ConnectionState.CLOSING
            # Trigger connection close...
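
An alternative to one sleeping check per connection is a periodic sweep over all idle timestamps. The sketch below assumes a hypothetical `find_expired` function and a simple mapping of connection ID to the `time.monotonic()` reading taken when the connection became idle. A monotonic clock is used because it is not affected by wall-clock adjustments.

```python
from typing import Dict, List


def find_expired(idle_since: Dict[str, float], now: float, timeout: float) -> List[str]:
    """Return IDs of connections idle for at least `timeout` seconds.

    `idle_since` maps a connection ID to the time.monotonic() reading
    taken when that connection became idle (a hypothetical layout).
    """
    return sorted(
        conn_id
        for conn_id, since in idle_since.items()
        if now - since >= timeout
    )


# Fixed timestamps keep the example deterministic; real code would pass
# time.monotonic() for `now`.
idle_since = {"a": 100.0, "b": 130.0, "c": 155.0}
expired = find_expired(idle_since, now=160.0, timeout=30.0)
print(expired)
```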

Event-Driven Protocol State Machine

from hypercorn.events import RawData, Closed, Updated
from enum import Enum

class ProtocolState(Enum):
    WAITING_REQUEST = "waiting_request"
    RECEIVING_HEADERS = "receiving_headers"
    RECEIVING_BODY = "receiving_body"
    PROCESSING = "processing"
    SENDING_RESPONSE = "sending_response"
    KEEP_ALIVE = "keep_alive"

class HTTPProtocolStateMachine:
    def __init__(self):
        self.state = ProtocolState.WAITING_REQUEST
        self.buffer = b""
        
    async def handle_event(self, event):
        """Handle events based on current protocol state."""
        if isinstance(event, RawData):
            await self.handle_data(event.data)
        elif isinstance(event, Closed):
            await self.handle_close()
        elif isinstance(event, Updated):
            await self.handle_update(event.idle)
    
    async def handle_data(self, data: bytes):
        """Handle incoming data based on current state."""
        self.buffer += data
        
        # Use independent `if` checks (not `elif`) so that a single chunk
        # containing the request line and the headers advances through
        # every state it satisfies instead of stalling until more data arrives
        if self.state == ProtocolState.WAITING_REQUEST:
            if b"\r\n" in self.buffer:
                await self.parse_request_line()
                self.state = ProtocolState.RECEIVING_HEADERS
                
        if self.state == ProtocolState.RECEIVING_HEADERS:
            if b"\r\n\r\n" in self.buffer:
                await self.parse_headers()
                # Determine if body is expected
                if await self.expects_body():
                    self.state = ProtocolState.RECEIVING_BODY
                else:
                    self.state = ProtocolState.PROCESSING
                    await self.process_request()
                    
        if self.state == ProtocolState.RECEIVING_BODY:
            if await self.body_complete():
                self.state = ProtocolState.PROCESSING
                await self.process_request()
    
    async def handle_close(self):
        """Handle connection closure."""
        print(f"Connection closed in state: {self.state}")
        # Cleanup based on current state
        
    async def handle_update(self, idle: bool):
        """Handle connection state updates."""
        if idle and self.state == ProtocolState.KEEP_ALIVE:
            print("Keep-alive connection is idle")
        elif not idle and self.state == ProtocolState.KEEP_ALIVE:
            print("Keep-alive connection received new request")
            self.state = ProtocolState.WAITING_REQUEST
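
The state machine above leaves its parsing helpers undefined. As a rough illustration of the buffering idea only, the sketch below shows a hypothetical `split_head` function that waits for the blank line ending the headers before splitting the buffer. It is deliberately simplified (no validation, no duplicate-header handling) and is not Hypercorn's parser.

```python
from typing import Dict, Optional, Tuple


def split_head(buffer: bytes) -> Optional[Tuple[str, Dict[str, str], bytes]]:
    """Split a buffer into (request line, headers, remaining bytes).

    Returns None until the blank line that ends the headers has arrived.
    Simplified: no validation, duplicate headers overwrite each other.
    """
    head, separator, rest = buffer.partition(b"\r\n\r\n")
    if not separator:
        return None
    lines = head.decode("latin-1").split("\r\n")
    headers = {}
    for line in lines[1:]:
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    return lines[0], headers, rest


partial = split_head(b"GET / HTTP/1.1\r\nHost: example")
complete = split_head(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\nbody")
```

Returning `None` for an incomplete head lets the caller keep appending `RawData.data` to its buffer and retry on the next event.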

Event Metrics and Monitoring

from hypercorn.events import Event, RawData, Closed, Updated
import time
from collections import defaultdict

class EventMetrics:
    def __init__(self):
        self.event_counts = defaultdict(int)
        self.data_bytes = 0
        self.connection_count = 0
        self.start_time = time.time()
        
    async def record_event(self, event: Event):
        """Record event metrics."""
        event_type = type(event).__name__
        self.event_counts[event_type] += 1
        
        if isinstance(event, RawData):
            self.data_bytes += len(event.data)
            print(f"Data event: {len(event.data)} bytes")
            
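        elif isinstance(event, Closed):
            # No event on this page signals a new connection, so the caller
            # must increment connection_count on accept; clamp at zero so an
            # unmatched Closed event cannot produce a negative count
            self.connection_count = max(0, self.connection_count - 1)
            print(f"Connection closed (active: {self.connection_count})")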
            
        elif isinstance(event, Updated):
            state = "idle" if event.idle else "active"
            print(f"Connection updated: {state}")
    
    def get_stats(self):
        """Get current statistics."""
        uptime = time.time() - self.start_time
        return {
            'uptime': uptime,
            'event_counts': dict(self.event_counts),
            'total_data_bytes': self.data_bytes,
            'active_connections': self.connection_count,
            # Guard against a zero uptime on clocks with coarse resolution
            'events_per_second': sum(self.event_counts.values()) / uptime if uptime > 0 else 0.0
        }

# Usage
import asyncio

async def main():
    metrics = EventMetrics()

    # Record events
    await metrics.record_event(RawData(b"HTTP data", ("127.0.0.1", 54321)))
    await metrics.record_event(Updated(idle=False))
    await metrics.record_event(Closed())

    # Get statistics
    stats = metrics.get_stats()
    print(f"Statistics: {stats}")

asyncio.run(main())

The event system provides the foundation for Hypercorn's efficient connection management and protocol handling, enabling clean separation of concerns and reactive programming patterns.

Install with Tessl CLI

npx tessl i tessl/pypi-hypercorn
