# Events

Event-driven architecture for connection lifecycle management, providing structured event handling for connection state changes and data flow. Hypercorn's event system enables efficient processing of network events and connection management.

## Capabilities

### Base Event Class

Abstract base class that defines the event interface for all event types in Hypercorn's event system.

```python { .api }
class Event(ABC):
    """
    Abstract base class for all events.

    Defines the common interface for events in Hypercorn's
    event-driven architecture. All concrete event types
    inherit from this base class.
    """
    pass
```
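
Because every concrete event inherits from `Event`, a handler can dispatch on the event's type. The sketch below shows one way to do that with `functools.singledispatch` from the standard library. The classes are local stand-ins that mirror the definitions documented here, so the example runs without Hypercorn installed; `describe` is a hypothetical helper, not part of Hypercorn.

```python
from abc import ABC
from dataclasses import dataclass
from functools import singledispatch
from typing import Optional, Tuple


# Local stand-ins mirroring the documented classes (assumption: used here
# only so the sketch is self-contained).
class Event(ABC):
    pass


@dataclass
class RawData(Event):
    data: bytes
    address: Optional[Tuple[str, int]] = None


@dataclass
class Closed(Event):
    pass


@singledispatch
def describe(event: Event) -> str:
    # Fallback for any event type without a registered handler.
    return f"unhandled {type(event).__name__}"


@describe.register
def _(event: RawData) -> str:
    return f"{len(event.data)} bytes"


@describe.register
def _(event: Closed) -> str:
    return "closed"


print(describe(RawData(b"abc")))
print(describe(Closed()))
```

Compared with an `isinstance` chain, this keeps each handler in its own function, and the fallback makes unhandled event types explicit.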

### Raw Data Event

Event representing raw network data received from or to be sent to a client connection.

```python { .api }
@dataclass
class RawData(Event):
    """
    Event containing raw network data.

    Represents data received from or to be sent to a network
    connection, along with optional address information for
    datagram protocols.
    """
    data: bytes  # Raw network data
    address: tuple | None = None  # Optional address for datagram protocols

# Usage patterns:
# - TCP connections: address is typically None
# - UDP/QUIC: address contains (host, port) tuple
# - Unix sockets: address format depends on socket type
```
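
The usage patterns above can be illustrated with a short sketch. `RawData` here is a local stand-in with the same two fields, and `echo_reply` is a hypothetical helper that shows why the address matters for datagram protocols: a reply has to carry the sender's address, while stream data does not need one.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


# Local stand-in with the documented fields (assumption: mirrors
# hypercorn.events.RawData for illustration only).
@dataclass
class RawData:
    data: bytes
    address: Optional[Tuple[str, int]] = None


def echo_reply(received: RawData) -> RawData:
    """Build a reply addressed to whoever sent the datagram."""
    return RawData(data=received.data.upper(), address=received.address)


# Stream connection: the peer is implied by the connection, so no address.
stream_chunk = RawData(b"GET / HTTP/1.1\r\n\r\n")

# Datagram: each event carries the (host, port) of its sender.
datagram = RawData(b"ping", ("203.0.113.7", 4433))
reply = echo_reply(datagram)

print(stream_chunk.address)
print(reply)
```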

### Connection Closed Event

Event indicating that a network connection has been closed or terminated.

```python { .api }
@dataclass
class Closed(Event):
    """
    Event indicating connection closure.

    Signals that a network connection has been closed,
    either gracefully by the client/server or due to
    network errors or timeouts.
    """
    pass

# Triggered when:
# - Client closes connection gracefully
# - Server closes connection
# - Network error causes connection termination
# - Connection timeout expires
```
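
Since `Closed` carries no fields, a consumer typically treats it as an end-of-stream marker. The following sketch, using local stand-in classes and a hypothetical `read_until_closed` helper, collects data from an `asyncio.Queue` until a `Closed` event arrives and ignores anything queued after it.

```python
import asyncio
from dataclasses import dataclass


# Local stand-ins (assumption: simplified copies of the documented events).
class Event:
    pass


@dataclass
class RawData(Event):
    data: bytes


@dataclass
class Closed(Event):
    pass


async def read_until_closed(queue: asyncio.Queue) -> bytes:
    """Accumulate RawData payloads until a Closed event is seen."""
    received = bytearray()
    while True:
        event = await queue.get()
        if isinstance(event, Closed):
            return bytes(received)
        if isinstance(event, RawData):
            received += event.data


async def demo() -> bytes:
    queue: asyncio.Queue = asyncio.Queue()
    # The final RawData is queued after Closed and is never read.
    for event in (RawData(b"hello "), RawData(b"world"), Closed(), RawData(b"late")):
        queue.put_nowait(event)
    return await read_until_closed(queue)


result = asyncio.run(demo())
print(result)
```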

### Connection Updated Event

Event indicating changes in connection state, particularly idle status for connection management and keep-alive handling.

```python { .api }
@dataclass
class Updated(Event):
    """
    Event indicating connection state update.

    Signals changes in connection state, particularly
    transitions between active and idle states for
    connection management and resource optimization.
    """
    idle: bool  # Whether connection is currently idle

# Connection states:
# - idle=False: Connection is actively processing requests
# - idle=True: Connection is idle and available for new requests
#
# Used for:
# - Keep-alive connection management
# - Connection pooling decisions
# - Resource cleanup timing
# - Load balancing algorithms
```
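
One way to turn `Updated` events into a keep-alive decision is to record when the connection went idle and compare that against a timeout. The sketch below is a minimal illustration under that assumption: `IdleTracker` is a hypothetical class, `Updated` is a local stand-in, and the clock is injectable so the behaviour can be checked without real waiting.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


# Local stand-in with the documented field (assumption: illustration only).
@dataclass
class Updated:
    idle: bool


class IdleTracker:
    """Tracks how long a connection has been idle (hypothetical helper)."""

    def __init__(self, timeout: float, clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self.idle_since: Optional[float] = None

    def on_updated(self, event: Updated) -> None:
        if event.idle:
            # Keep the earliest timestamp if idle is reported twice in a row.
            if self.idle_since is None:
                self.idle_since = self.clock()
        else:
            self.idle_since = None

    def expired(self) -> bool:
        return (
            self.idle_since is not None
            and self.clock() - self.idle_since >= self.timeout
        )


# Drive the tracker with a fake clock so the result is deterministic.
now = [0.0]
tracker = IdleTracker(timeout=5.0, clock=lambda: now[0])
tracker.on_updated(Updated(idle=True))
now[0] = 4.0
before_timeout = tracker.expired()
now[0] = 5.0
after_timeout = tracker.expired()
tracker.on_updated(Updated(idle=False))
after_activity = tracker.expired()
```

A monotonic clock is used by default because wall-clock time can jump, which would distort the measured idle duration.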

## Usage Examples

### Event Handling in Protocol Implementation

```python
from hypercorn.events import Event, RawData, Closed, Updated

class ProtocolHandler:
    # process_protocol_data, cleanup_connection, start_idle_timer and
    # cancel_idle_timer are application-specific hooks that are not shown here.

    def __init__(self):
        self.connection_active = True
        self.idle_timeout = 30.0

    async def handle_event(self, event: Event):
        """Handle different event types."""
        if isinstance(event, RawData):
            await self.handle_raw_data(event)
        elif isinstance(event, Closed):
            await self.handle_connection_closed(event)
        elif isinstance(event, Updated):
            await self.handle_connection_updated(event)
        else:
            print(f"Unknown event type: {type(event)}")

    async def handle_raw_data(self, event: RawData):
        """Process incoming raw data."""
        data = event.data
        address = event.address

        print(f"Received {len(data)} bytes from {address}")

        # Process the data (HTTP parsing, WebSocket frames, etc.)
        await self.process_protocol_data(data)

    async def handle_connection_closed(self, event: Closed):
        """Handle connection closure."""
        print("Connection closed")
        self.connection_active = False

        # Cleanup resources
        await self.cleanup_connection()

    async def handle_connection_updated(self, event: Updated):
        """Handle connection state updates."""
        if event.idle:
            print("Connection became idle")
            # Start idle timeout timer
            await self.start_idle_timer()
        else:
            print("Connection became active")
            # Cancel idle timeout timer
            await self.cancel_idle_timer()
```

### Custom Event Loop Integration

```python
import asyncio
from hypercorn.events import RawData, Closed, Updated

class EventProcessor:
    def __init__(self):
        self.event_queue = asyncio.Queue()
        self.running = True

    async def process_events(self):
        """Main event processing loop."""
        while self.running:
            event = await self.event_queue.get()
            try:
                await self.dispatch_event(event)
            except Exception as e:
                print(f"Error processing event: {e}")
            finally:
                # Always mark the item done, even if the handler failed
                self.event_queue.task_done()

    async def dispatch_event(self, event):
        """Dispatch event to appropriate handler."""
        if isinstance(event, RawData):
            await self.on_data_received(event.data, event.address)
        elif isinstance(event, Closed):
            await self.on_connection_closed()
            # Stop the loop once the connection is closed
            self.running = False
        elif isinstance(event, Updated):
            await self.on_connection_updated(event.idle)

    async def on_data_received(self, data: bytes, address: tuple | None):
        """Handle received data."""
        print(f"Data: {len(data)} bytes from {address}")
        # Process data...

    async def on_connection_closed(self):
        """Handle connection closure."""
        print("Connection closed")
        # Cleanup...

    async def on_connection_updated(self, idle: bool):
        """Handle connection state update."""
        print(f"Connection {'idle' if idle else 'active'}")
        # Update state...

# Usage
async def main():
    processor = EventProcessor()

    # Add events to queue
    await processor.event_queue.put(RawData(b"GET / HTTP/1.1\r\n\r\n"))
    await processor.event_queue.put(Updated(idle=False))
    await processor.event_queue.put(Closed())

    # Process events until the Closed event stops the loop
    await processor.process_events()

asyncio.run(main())
```

### Connection State Management

```python
import asyncio
import time
from enum import Enum

from hypercorn.events import Updated

class ConnectionState(Enum):
    CONNECTING = "connecting"
    ACTIVE = "active"
    IDLE = "idle"
    CLOSING = "closing"
    CLOSED = "closed"

class ConnectionManager:
    def __init__(self):
        self.connections = {}
        self.idle_timeout = 60.0  # seconds

    async def handle_connection_update(self, conn_id: str, event: Updated):
        """Handle connection state updates."""
        connection = self.connections.get(conn_id)
        if not connection:
            return

        if event.idle:
            # Connection became idle
            connection['state'] = ConnectionState.IDLE
            # Monotonic time is unaffected by wall-clock adjustments
            connection['idle_since'] = time.monotonic()
            print(f"Connection {conn_id} is now idle")

            # Run the idle check in the background so this handler
            # is not blocked for the whole timeout
            connection['idle_task'] = asyncio.create_task(
                self.schedule_idle_check(conn_id)
            )
        else:
            # Connection became active
            if connection['state'] == ConnectionState.IDLE:
                idle_duration = time.monotonic() - connection['idle_since']
                print(f"Connection {conn_id} active after {idle_duration:.2f}s idle")

            connection['state'] = ConnectionState.ACTIVE
            connection.pop('idle_since', None)

            # Cancel any pending idle check
            idle_task = connection.pop('idle_task', None)
            if idle_task:
                idle_task.cancel()

    async def schedule_idle_check(self, conn_id: str):
        """Schedule check for idle timeout."""
        await asyncio.sleep(self.idle_timeout)

        connection = self.connections.get(conn_id)
        if (connection and
                connection['state'] == ConnectionState.IDLE and
                time.monotonic() - connection['idle_since'] >= self.idle_timeout):

            print(f"Connection {conn_id} idle timeout - closing")
            await self.close_connection(conn_id)

    async def close_connection(self, conn_id: str):
        """Close idle connection."""
        connection = self.connections.get(conn_id)
        if connection:
            connection['state'] = ConnectionState.CLOSING
            # Trigger connection close...
```

### Event-Driven Protocol State Machine

```python
from hypercorn.events import RawData, Closed, Updated
from enum import Enum

class ProtocolState(Enum):
    WAITING_REQUEST = "waiting_request"
    RECEIVING_HEADERS = "receiving_headers"
    RECEIVING_BODY = "receiving_body"
    PROCESSING = "processing"
    SENDING_RESPONSE = "sending_response"
    KEEP_ALIVE = "keep_alive"

class HTTPProtocolStateMachine:
    # parse_request_line, parse_headers, expects_body, body_complete and
    # process_request are protocol-specific hooks that are not shown here.

    def __init__(self):
        self.state = ProtocolState.WAITING_REQUEST
        self.buffer = b""

    async def handle_event(self, event):
        """Handle events based on current protocol state."""
        if isinstance(event, RawData):
            await self.handle_data(event.data)
        elif isinstance(event, Closed):
            await self.handle_close()
        elif isinstance(event, Updated):
            await self.handle_update(event.idle)

    async def handle_data(self, data: bytes):
        """Handle incoming data based on current state."""
        self.buffer += data

        if self.state == ProtocolState.WAITING_REQUEST:
            if b"\r\n" in self.buffer:
                await self.parse_request_line()
                self.state = ProtocolState.RECEIVING_HEADERS

        elif self.state == ProtocolState.RECEIVING_HEADERS:
            if b"\r\n\r\n" in self.buffer:
                await self.parse_headers()
                # Determine if body is expected
                if await self.expects_body():
                    self.state = ProtocolState.RECEIVING_BODY
                else:
                    self.state = ProtocolState.PROCESSING
                    await self.process_request()

        elif self.state == ProtocolState.RECEIVING_BODY:
            if await self.body_complete():
                self.state = ProtocolState.PROCESSING
                await self.process_request()

    async def handle_close(self):
        """Handle connection closure."""
        print(f"Connection closed in state: {self.state}")
        # Cleanup based on current state

    async def handle_update(self, idle: bool):
        """Handle connection state updates."""
        if idle and self.state == ProtocolState.KEEP_ALIVE:
            print("Keep-alive connection is idle")
        elif not idle and self.state == ProtocolState.KEEP_ALIVE:
            print("Keep-alive connection received new request")
            self.state = ProtocolState.WAITING_REQUEST
```

### Event Metrics and Monitoring

```python
import asyncio
import time
from collections import defaultdict

from hypercorn.events import Event, RawData, Closed, Updated

class EventMetrics:
    def __init__(self):
        self.event_counts = defaultdict(int)
        self.data_bytes = 0
        self.connection_count = 0
        self.start_time = time.time()

    def connection_opened(self):
        """Count a new connection. No event signals this, so call it on accept."""
        self.connection_count += 1

    async def record_event(self, event: Event):
        """Record event metrics."""
        event_type = type(event).__name__
        self.event_counts[event_type] += 1

        if isinstance(event, RawData):
            self.data_bytes += len(event.data)
            print(f"Data event: {len(event.data)} bytes")

        elif isinstance(event, Closed):
            self.connection_count -= 1
            print(f"Connection closed (active: {self.connection_count})")

        elif isinstance(event, Updated):
            state = "idle" if event.idle else "active"
            print(f"Connection updated: {state}")

    def get_stats(self):
        """Get current statistics."""
        uptime = time.time() - self.start_time
        total_events = sum(self.event_counts.values())
        return {
            'uptime': uptime,
            'event_counts': dict(self.event_counts),
            'total_data_bytes': self.data_bytes,
            'active_connections': self.connection_count,
            # Guard against a zero uptime immediately after start
            'events_per_second': total_events / uptime if uptime > 0 else 0.0,
        }

# Usage
async def main():
    metrics = EventMetrics()
    metrics.connection_opened()

    # Record events
    await metrics.record_event(RawData(b"HTTP data", ("127.0.0.1", 54321)))
    await metrics.record_event(Updated(idle=False))
    await metrics.record_event(Closed())

    # Get statistics
    stats = metrics.get_stats()
    print(f"Statistics: {stats}")

asyncio.run(main())
```

The event system provides the foundation for Hypercorn's efficient connection management and protocol handling, enabling clean separation of concerns and reactive programming patterns.