Pure-Python HTTP/2 protocol implementation providing low-level connection and stream management
Structured events representing HTTP/2 protocol actions and state changes. All H2Connection operations generate events that applications process to handle requests, responses, data transmission, and protocol state changes.
All events inherit from the base Event class.
class Event:
    """Base class for all h2 events."""

Events related to HTTP request and response processing.
Event generated when request headers are received (optionally with END_STREAM).
class RequestReceived(Event):
    """
    Request received from remote peer.

    Attributes:
        stream_id: Stream ID for the request
        headers: List of header objects
        stream_ended: Associated StreamEnded event if END_STREAM flag set
        priority_updated: Associated PriorityUpdated event if priority info present
    """
    stream_id: int
    headers: list[Header]
    stream_ended: StreamEnded | None
    priority_updated: PriorityUpdated | None

Event generated when response headers are received (optionally with END_STREAM).
class ResponseReceived(Event):
    """
    Response received from remote peer.

    Attributes:
        stream_id: Stream ID for the response
        headers: List of header objects
        stream_ended: Associated StreamEnded event if END_STREAM flag set
        priority_updated: Associated PriorityUpdated event if priority info present
    """
    stream_id: int
    headers: list[Header]
    stream_ended: StreamEnded | None
    priority_updated: PriorityUpdated | None

Event generated when HTTP trailers are received (always includes END_STREAM).
class TrailersReceived(Event):
    """
    Trailers received from remote peer.

    Attributes:
        stream_id: Stream ID for the trailers
        headers: List of header objects
        stream_ended: Associated StreamEnded event (always present for trailers)
        priority_updated: Associated PriorityUpdated event if priority info present
    """
    stream_id: int
    headers: list[Header]
    stream_ended: StreamEnded | None
    priority_updated: PriorityUpdated | None

Event generated when a 1xx informational response is received.
class InformationalResponseReceived(Event):
    """
    Informational response (1xx) received from remote peer.

    Attributes:
        stream_id: Stream ID for the informational response
        headers: List of header objects
        priority_updated: Associated PriorityUpdated event if priority info present
    """
    stream_id: int
    headers: list[Header]
    priority_updated: PriorityUpdated | None

Events related to data transmission and reception.
Event generated when data is received on a stream.
class DataReceived(Event):
    """
    Data received from remote peer.

    Attributes:
        stream_id: Stream ID the data was received on
        data: Raw data bytes received
        flow_controlled_length: Number of flow-controlled bytes (may differ from len(data) due to padding)
        stream_ended: Associated StreamEnded event if END_STREAM flag set
    """
    stream_id: int
    data: bytes
    flow_controlled_length: int
    stream_ended: StreamEnded | None

Events related to HTTP/2 flow control window management.
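The `WindowUpdated` event in this group identifies the affected window through `stream_id`, with 0 meaning the connection-level window. As a rough sketch of acting on that distinction, the helper below decides which blocked streams are worth retrying after a window grows. `PendingSends` and its methods are hypothetical names, not part of h2, and the event is assumed to be any object with a `stream_id` attribute.

```python
from collections import defaultdict
from types import SimpleNamespace


class PendingSends:
    """Hypothetical helper (not part of h2): data waiting for window space."""

    def __init__(self):
        self._pending = defaultdict(bytearray)  # stream_id -> unsent bytes

    def queue(self, stream_id, data):
        """Remember data that could not be sent yet on a stream."""
        self._pending[stream_id] += data

    def streams_to_retry(self, event):
        """Return stream IDs worth retrying after a WindowUpdated-like event."""
        if event.stream_id == 0:
            # Connection window grew: any stream with queued data may proceed.
            return sorted(sid for sid, buf in self._pending.items() if buf)
        if self._pending.get(event.stream_id):
            # Only this stream's window grew.
            return [event.stream_id]
        return []


pending = PendingSends()
pending.queue(1, b"abc")
pending.queue(3, b"def")
# Stand-in for a WindowUpdated event on the connection window.
print(pending.streams_to_retry(SimpleNamespace(stream_id=0, delta=1024)))
```

Whether a retried stream can actually send still depends on both its own window and the connection window, so a real sender would check the available window before writing.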
Event generated when a flow control window changes size.
class WindowUpdated(Event):
    """
    Flow control window updated.

    Attributes:
        stream_id: Stream ID whose window was updated (0 for connection window)
        delta: Number of bytes the window was increased by
    """
    stream_id: int
    delta: int

Events related to stream creation, termination, and management.
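The events in this group signal different things: `StreamEnded` means the remote peer has finished sending on a stream, while `StreamReset` means the stream was aborted. A minimal sketch of bookkeeping that respects this difference follows. `StreamTable` is a hypothetical class, not part of h2, and the events are assumed to be any objects with a `stream_id` attribute.

```python
from types import SimpleNamespace


class StreamTable:
    """Hypothetical per-stream bookkeeping (not part of h2)."""

    def __init__(self):
        self.streams = {}  # stream_id -> state dict

    def open(self, stream_id):
        self.streams[stream_id] = {"remote_done": False}

    def on_stream_ended(self, event):
        # The remote side finished sending. The local side may still have
        # data to send, so the entry is kept and only marked.
        state = self.streams.get(event.stream_id)
        if state is not None:
            state["remote_done"] = True

    def on_stream_reset(self, event):
        # A reset aborts the stream in both directions, so drop its state.
        # Returns the discarded state, or None for an unknown stream.
        return self.streams.pop(event.stream_id, None)


table = StreamTable()
table.open(1)
table.on_stream_ended(SimpleNamespace(stream_id=1))
print(table.streams)
```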
Event generated when a stream is ended by a remote party.
class StreamEnded(Event):
    """
    Stream ended by remote peer.

    Attributes:
        stream_id: Stream ID that was ended
    """
    stream_id: int

Event generated when a stream is reset by either party.
class StreamReset(Event):
    """
    Stream reset by local or remote peer.

    Attributes:
        stream_id: Stream ID that was reset
        error_code: HTTP/2 error code for reset reason
        remote_reset: True if reset by remote peer, False if reset locally
    """
    stream_id: int
    error_code: ErrorCodes | int
    remote_reset: bool

Event generated when a pushed stream is received from a server (client-side only).
class PushedStreamReceived(Event):
    """
    Pushed stream received from server.

    Attributes:
        pushed_stream_id: Stream ID for the pushed response
        parent_stream_id: Stream ID that initiated the push
        headers: List of header objects for the pushed request
    """
    pushed_stream_id: int
    parent_stream_id: int
    headers: list[Header]

Events related to connection-level control operations.
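The PING events in this group carry 8 bytes of opaque data, and `PingAckReceived` echoes the data of the PING it answers, which is what makes round-trip timing possible. The sketch below shows one way that correlation might be done. `PingTimer` is a hypothetical helper, not part of h2, and it accepts any object with a `ping_data` attribute in place of a real event.

```python
import os
import time


class PingTimer:
    """Hypothetical helper (not part of h2) that times PING round trips."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._sent = {}  # opaque ping data -> timestamp when it was sent

    def new_ping(self):
        """Return 8 random bytes to use as PING payload and record the time."""
        data = os.urandom(8)
        self._sent[data] = self._clock()
        return data

    def on_ack(self, event):
        """Return the RTT in seconds for a PingAckReceived-like event.

        Returns None when the acknowledged data was never recorded.
        """
        started = self._sent.pop(event.ping_data, None)
        if started is None:
            return None
        return self._clock() - started
```

With h2, the bytes returned by `new_ping()` would be passed to `H2Connection.ping()`, and `on_ack()` would be called from the branch that handles `PingAckReceived`.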
Event generated when a PING frame is received.
class PingReceived(Event):
    """
    PING frame received from remote peer.

    A ping acknowledgment with the same opaque data is automatically emitted
    after receiving a ping.

    Attributes:
        ping_data: 8 bytes of opaque data from the PING frame

    .. versionadded:: 3.1.0
    """
    ping_data: bytes

Event generated when a PING acknowledgment is received.
class PingAckReceived(Event):
    """
    PING acknowledgment received from remote peer.

    Contains the opaque data of the PING+ACK frame, allowing correlation
    of PINGs and calculation of RTT.

    Attributes:
        ping_data: 8 bytes of opaque data from the PING+ACK frame

    .. versionadded:: 3.1.0
    .. versionchanged:: 4.0.0
       Removed deprecated but equivalent PingAcknowledged
    """
    ping_data: bytes

Event generated when a connection is terminated.
class ConnectionTerminated(Event):
    """
    Connection terminated by local or remote peer.

    Attributes:
        error_code: HTTP/2 error code for termination reason
        last_stream_id: Last stream ID processed before termination
        additional_data: Additional debug information
    """
    error_code: ErrorCodes | int | None
    last_stream_id: int | None
    additional_data: bytes | None

Events related to HTTP/2 settings negotiation and acknowledgment.
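Both settings events in this group expose a `changed_settings` mapping from setting code to a `ChangedSetting` object. As a small sketch of consuming that mapping, the function below turns it into log lines. The name `describe_setting_changes` is hypothetical, and the sketch assumes each value exposes `original_value` and `new_value` attributes.

```python
from types import SimpleNamespace


def describe_setting_changes(changed_settings):
    """Return one readable line per changed setting, ordered by setting code.

    Hypothetical helper (not part of h2). Assumes each mapping value has
    `original_value` and `new_value` attributes.
    """
    lines = []
    for code, change in sorted(changed_settings.items(), key=lambda kv: int(kv[0])):
        lines.append(
            f"setting {int(code):#x}: {change.original_value} -> {change.new_value}"
        )
    return lines


# Stand-in for event.changed_settings; 0x4 is SETTINGS_INITIAL_WINDOW_SIZE.
example = {4: SimpleNamespace(original_value=65535, new_value=131072)}
for line in describe_setting_changes(example):
    print(line)
```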
Event generated when the remote peer changes its settings.
class RemoteSettingsChanged(Event):
    """
    Remote peer settings changed.

    Attributes:
        changed_settings: Dictionary of setting codes to ChangedSetting objects
    """
    changed_settings: dict[int, ChangedSetting]

    @classmethod
    def from_settings(
        cls,
        old_settings: Settings | dict[int, int],
        new_settings: dict[int, int],
    ) -> RemoteSettingsChanged:
        """
        Create event from old and new settings.

        Args:
            old_settings: Previous settings state
            new_settings: New settings state

        Returns:
            RemoteSettingsChanged event with populated changed_settings
        """

Event generated when a settings acknowledgment is received.
class SettingsAcknowledged(Event):
    """
    Settings acknowledged by remote peer.

    Attributes:
        changed_settings: Dictionary of setting codes to ChangedSetting objects
    """
    changed_settings: dict[SettingCodes | int, ChangedSetting]

Events related to stream priority and dependency management.
Event generated when stream priority information is updated.
class PriorityUpdated(Event):
    """
    Stream priority updated.

    Attributes:
        stream_id: Stream ID whose priority was updated
        weight: New stream weight (1-256)
        depends_on: Stream ID this stream depends on
        exclusive: Whether dependency is exclusive
    """
    stream_id: int | None
    weight: int | None
    depends_on: int | None
    exclusive: bool | None

Events related to HTTP/2 extensions and unknown frames.
Event generated when an alternative service is advertised.
class AlternativeServiceAvailable(Event):
    """
    Alternative service advertised by remote peer.

    Attributes:
        origin: Origin for which alternative service is available
        field_value: Alt-Svc field value as defined in RFC 7838
    """
    origin: bytes | None
    field_value: bytes | None

Event generated when an unknown or unsupported frame is received.
class UnknownFrameReceived(Event):
    """
    Unknown frame received from remote peer.

    This occurs primarily when the remote peer is employing HTTP/2 extensions
    that h2 doesn't know anything about.

    Attributes:
        frame: The unknown frame object

    .. versionadded:: 2.7.0
    """
    frame: Frame

For events that include headers, the Header type represents HTTP/2 headers:
from typing import Union
from hpack.struct import Header

# Header is a union type that can be:
Header = Union[
    tuple[bytes, bytes],  # (name, value) as bytes
    tuple[str, str],      # (name, value) as strings
    tuple[bytes, str],    # Mixed types
    tuple[str, bytes],    # Mixed types
]

Simple event processing loop for handling HTTP/2 events:
from h2.connection import H2Connection
from h2.events import (
    RequestReceived, ResponseReceived, DataReceived,
    StreamEnded, ConnectionTerminated
)

def process_events(conn: H2Connection, data: bytes):
    events = conn.receive_data(data)
    for event in events:
        if isinstance(event, RequestReceived):
            print(f"Request on stream {event.stream_id}")
            for name, value in event.headers:
                print(f"  {name}: {value}")
        elif isinstance(event, DataReceived):
            print(f"Data on stream {event.stream_id}: {len(event.data)} bytes")
            # Acknowledge received data for flow control
            conn.acknowledge_received_data(
                acknowledged_size=event.flow_controlled_length,
                stream_id=event.stream_id
            )
        elif isinstance(event, StreamEnded):
            print(f"Stream {event.stream_id} ended")
        elif isinstance(event, ConnectionTerminated):
            print(f"Connection terminated: {event.error_code}")
            break

    # Send any response data
    data_to_send = conn.data_to_send()
    if data_to_send:
        # Send data_to_send over your socket
        pass

More sophisticated event handling with error handling and stream management:
from h2.connection import H2Connection
from h2.events import DataReceived, PingReceived, RequestReceived
from h2.exceptions import ProtocolError

class HTTP2EventHandler:
    def __init__(self, conn: H2Connection):
        self.conn = conn
        self.streams = {}  # Track active streams

    def handle_events(self, data: bytes):
        try:
            events = self.conn.receive_data(data)
            for event in events:
                # Dispatch to a method named after the event class, if one exists
                handler = getattr(self, f"handle_{event.__class__.__name__}", None)
                if handler:
                    handler(event)
                else:
                    print(f"Unhandled event: {event.__class__.__name__}")
        except ProtocolError as e:
            print(f"Protocol error: {e}")
            self.conn.close_connection()
        return self.conn.data_to_send()

    def handle_RequestReceived(self, event: RequestReceived):
        self.streams[event.stream_id] = {
            'headers': event.headers,
            'method': next((v for n, v in event.headers if n == b':method'), None),
            'path': next((v for n, v in event.headers if n == b':path'), None)
        }
        if event.stream_ended:
            self.handle_complete_request(event.stream_id)

    def handle_DataReceived(self, event: DataReceived):
        if event.stream_id in self.streams:
            stream_data = self.streams[event.stream_id].setdefault('data', b'')
            self.streams[event.stream_id]['data'] = stream_data + event.data

        # Acknowledge for flow control
        self.conn.acknowledge_received_data(
            acknowledged_size=event.flow_controlled_length,
            stream_id=event.stream_id
        )

        if event.stream_ended:
            self.handle_complete_request(event.stream_id)

    def handle_PingReceived(self, event: PingReceived):
        print(f"Ping received with data: {event.ping_data.hex()}")
        # Ping acknowledgment is sent automatically

    def handle_complete_request(self, stream_id: int):
        if stream_id in self.streams:
            stream = self.streams[stream_id]
            print(f"Complete request on stream {stream_id}: {stream.get('method')} {stream.get('path')}")
            # Process complete request...
            del self.streams[stream_id]
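The request handler above looks up `:method` and `:path` by comparing header names against bytes literals, which only matches when names arrive as `bytes`. As a sketch of a lookup that tolerates both `bytes` and `str`, the helper below separates pseudo-headers (names starting with `:`) from regular headers. `split_headers` is a hypothetical name, not part of h2, and the sketch assumes that any bytes are valid UTF-8.

```python
def _to_text(value):
    """Decode bytes as UTF-8; pass str through unchanged."""
    return value.decode("utf-8") if isinstance(value, bytes) else value


def split_headers(headers):
    """Split (name, value) pairs into pseudo-headers and regular headers.

    Hypothetical helper (not part of h2). Returns a dict of pseudo-headers
    and a list of regular (name, value) pairs, all as str.
    """
    pseudo = {}
    regular = []
    for name, value in headers:
        name, value = _to_text(name), _to_text(value)
        if name.startswith(":"):
            pseudo[name] = value
        else:
            # A list is kept because regular header names may repeat.
            regular.append((name, value))
    return pseudo, regular


pseudo, regular = split_headers([
    (b":method", b"GET"),
    (b":path", b"/index.html"),
    (b"accept", b"text/html"),
])
print(pseudo.get(":method"), pseudo.get(":path"), regular)
```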