A high-performance ASGI and WSGI web server implementation that provides comprehensive support for modern web protocols including HTTP/1, HTTP/2, WebSockets, and experimental HTTP/3
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Event-driven architecture for connection lifecycle management, providing structured event handling for connection state changes and data flow. Hypercorn's event system enables efficient processing of network events and connection management.
Abstract base class that defines the event interface for all event types in Hypercorn's event system.
class Event(ABC):
"""
Abstract base class for all events.
Defines the common interface for events in Hypercorn's
event-driven architecture. All concrete event types
inherit from this base class.
"""
passEvent representing raw network data received from or to be sent to a client connection.
@dataclass
class RawData(Event):
"""
Event containing raw network data.
Represents data received from or to be sent to a network
connection, along with optional address information for
datagram protocols.
"""
data: bytes # Raw network data
address: tuple | None = None # Optional address for datagram protocols
# Usage patterns:
# - TCP connections: address is typically None
# - UDP/QUIC: address contains (host, port) tuple
# - Unix sockets: address format depends on socket typeEvent indicating that a network connection has been closed or terminated.
@dataclass
class Closed(Event):
"""
Event indicating connection closure.
Signals that a network connection has been closed,
either gracefully by the client/server or due to
network errors or timeouts.
"""
pass
# Triggered when:
# - Client closes connection gracefully
# - Server closes connection
# - Network error causes connection termination
# - Connection timeout expiresEvent indicating changes in connection state, particularly idle status for connection management and keep-alive handling.
@dataclass
class Updated(Event):
"""
Event indicating connection state update.
Signals changes in connection state, particularly
transitions between active and idle states for
connection management and resource optimization.
"""
idle: bool # Whether connection is currently idle
# Connection states:
# - idle=False: Connection is actively processing requests
# - idle=True: Connection is idle and available for new requests
#
# Used for:
# - Keep-alive connection management
# - Connection pooling decisions
# - Resource cleanup timing
# - Load balancing algorithmsfrom hypercorn.events import Event, RawData, Closed, Updated
class ProtocolHandler:
def __init__(self):
self.connection_active = True
self.idle_timeout = 30.0
async def handle_event(self, event: Event):
"""Handle different event types."""
if isinstance(event, RawData):
await self.handle_raw_data(event)
elif isinstance(event, Closed):
await self.handle_connection_closed(event)
elif isinstance(event, Updated):
await self.handle_connection_updated(event)
else:
print(f"Unknown event type: {type(event)}")
async def handle_raw_data(self, event: RawData):
"""Process incoming raw data."""
data = event.data
address = event.address
print(f"Received {len(data)} bytes from {address}")
# Process the data (HTTP parsing, WebSocket frames, etc.)
await self.process_protocol_data(data)
async def handle_connection_closed(self, event: Closed):
"""Handle connection closure."""
print("Connection closed")
self.connection_active = False
# Cleanup resources
await self.cleanup_connection()
async def handle_connection_updated(self, event: Updated):
"""Handle connection state updates."""
if event.idle:
print("Connection became idle")
# Start idle timeout timer
await self.start_idle_timer()
else:
print("Connection became active")
# Cancel idle timeout timer
await self.cancel_idle_timer()import asyncio
from hypercorn.events import RawData, Closed, Updated
class EventProcessor:
def __init__(self):
self.event_queue = asyncio.Queue()
self.running = True
async def process_events(self):
"""Main event processing loop."""
while self.running:
try:
event = await self.event_queue.get()
await self.dispatch_event(event)
self.event_queue.task_done()
except Exception as e:
print(f"Error processing event: {e}")
async def dispatch_event(self, event):
"""Dispatch event to appropriate handler."""
if isinstance(event, RawData):
await self.on_data_received(event.data, event.address)
elif isinstance(event, Closed):
await self.on_connection_closed()
elif isinstance(event, Updated):
await self.on_connection_updated(event.idle)
async def on_data_received(self, data: bytes, address: tuple | None):
"""Handle received data."""
print(f"Data: {len(data)} bytes from {address}")
# Process data...
async def on_connection_closed(self):
"""Handle connection closure."""
print("Connection closed")
# Cleanup...
async def on_connection_updated(self, idle: bool):
"""Handle connection state update."""
print(f"Connection {'idle' if idle else 'active'}")
# Update state...
# Usage
processor = EventProcessor()
# Add events to queue
await processor.event_queue.put(RawData(b"GET / HTTP/1.1\r\n\r\n"))
await processor.event_queue.put(Updated(idle=False))
await processor.event_queue.put(Closed())
# Process events
await processor.process_events()from hypercorn.events import Updated
from enum import Enum
import time
class ConnectionState(Enum):
CONNECTING = "connecting"
ACTIVE = "active"
IDLE = "idle"
CLOSING = "closing"
CLOSED = "closed"
class ConnectionManager:
def __init__(self):
self.connections = {}
self.idle_timeout = 60.0 # seconds
async def handle_connection_update(self, conn_id: str, event: Updated):
"""Handle connection state updates."""
connection = self.connections.get(conn_id)
if not connection:
return
if event.idle:
# Connection became idle
connection['state'] = ConnectionState.IDLE
connection['idle_since'] = time.time()
print(f"Connection {conn_id} is now idle")
# Schedule idle timeout check
await self.schedule_idle_check(conn_id)
else:
# Connection became active
if connection['state'] == ConnectionState.IDLE:
idle_duration = time.time() - connection['idle_since']
print(f"Connection {conn_id} active after {idle_duration:.2f}s idle")
connection['state'] = ConnectionState.ACTIVE
connection.pop('idle_since', None)
async def schedule_idle_check(self, conn_id: str):
"""Schedule check for idle timeout."""
await asyncio.sleep(self.idle_timeout)
connection = self.connections.get(conn_id)
if (connection and
connection['state'] == ConnectionState.IDLE and
time.time() - connection['idle_since'] >= self.idle_timeout):
print(f"Connection {conn_id} idle timeout - closing")
await self.close_connection(conn_id)
async def close_connection(self, conn_id: str):
"""Close idle connection."""
connection = self.connections.get(conn_id)
if connection:
connection['state'] = ConnectionState.CLOSING
# Trigger connection close...from hypercorn.events import RawData, Closed, Updated
from enum import Enum
class ProtocolState(Enum):
WAITING_REQUEST = "waiting_request"
RECEIVING_HEADERS = "receiving_headers"
RECEIVING_BODY = "receiving_body"
PROCESSING = "processing"
SENDING_RESPONSE = "sending_response"
KEEP_ALIVE = "keep_alive"
class HTTPProtocolStateMachine:
def __init__(self):
self.state = ProtocolState.WAITING_REQUEST
self.buffer = b""
async def handle_event(self, event):
"""Handle events based on current protocol state."""
if isinstance(event, RawData):
await self.handle_data(event.data)
elif isinstance(event, Closed):
await self.handle_close()
elif isinstance(event, Updated):
await self.handle_update(event.idle)
async def handle_data(self, data: bytes):
"""Handle incoming data based on current state."""
self.buffer += data
if self.state == ProtocolState.WAITING_REQUEST:
if b"\r\n" in self.buffer:
await self.parse_request_line()
self.state = ProtocolState.RECEIVING_HEADERS
elif self.state == ProtocolState.RECEIVING_HEADERS:
if b"\r\n\r\n" in self.buffer:
await self.parse_headers()
# Determine if body is expected
if await self.expects_body():
self.state = ProtocolState.RECEIVING_BODY
else:
self.state = ProtocolState.PROCESSING
await self.process_request()
elif self.state == ProtocolState.RECEIVING_BODY:
if await self.body_complete():
self.state = ProtocolState.PROCESSING
await self.process_request()
async def handle_close(self):
"""Handle connection closure."""
print(f"Connection closed in state: {self.state}")
# Cleanup based on current state
async def handle_update(self, idle: bool):
"""Handle connection state updates."""
if idle and self.state == ProtocolState.KEEP_ALIVE:
print("Keep-alive connection is idle")
elif not idle and self.state == ProtocolState.KEEP_ALIVE:
print("Keep-alive connection received new request")
self.state = ProtocolState.WAITING_REQUESTfrom hypercorn.events import Event, RawData, Closed, Updated
import time
from collections import defaultdict
class EventMetrics:
def __init__(self):
self.event_counts = defaultdict(int)
self.data_bytes = 0
self.connection_count = 0
self.start_time = time.time()
async def record_event(self, event: Event):
"""Record event metrics."""
event_type = type(event).__name__
self.event_counts[event_type] += 1
if isinstance(event, RawData):
self.data_bytes += len(event.data)
print(f"Data event: {len(event.data)} bytes")
elif isinstance(event, Closed):
self.connection_count -= 1
print(f"Connection closed (active: {self.connection_count})")
elif isinstance(event, Updated):
state = "idle" if event.idle else "active"
print(f"Connection updated: {state}")
def get_stats(self):
"""Get current statistics."""
uptime = time.time() - self.start_time
return {
'uptime': uptime,
'event_counts': dict(self.event_counts),
'total_data_bytes': self.data_bytes,
'active_connections': self.connection_count,
'events_per_second': sum(self.event_counts.values()) / uptime
}
# Usage
metrics = EventMetrics()
# Record events
await metrics.record_event(RawData(b"HTTP data", ("127.0.0.1", 54321)))
await metrics.record_event(Updated(idle=False))
await metrics.record_event(Closed())
# Get statistics
stats = metrics.get_stats()
print(f"Statistics: {stats}")The event system provides the foundation for Hypercorn's efficient connection management and protocol handling, enabling clean separation of concerns and reactive programming patterns.
Install with Tessl CLI
npx tessl i tessl/pypi-hypercorn