Pure-Python HTTP/2 protocol implementation providing low-level connection and stream management
—
Core HTTP/2 connection handling providing the primary interface for all HTTP/2 protocol operations. The H2Connection class manages connection state, frame processing, stream lifecycle, and data transmission.
The main connection object that handles HTTP/2 protocol operations, maintains connection and stream state, and processes frames according to the HTTP/2 specification.
class H2Connection:
def __init__(self, config: H2Configuration | None = None):
"""
Initialize HTTP/2 connection.
Args:
config: Connection configuration object
"""

Connection state properties providing information about open streams and flow control.
@property
def open_outbound_streams(self) -> int:
"""The current number of open outbound streams."""
@property
def open_inbound_streams(self) -> int:
"""The current number of open inbound streams."""
@property
def inbound_flow_control_window(self) -> int:
"""The size of the inbound flow control window for the connection."""

Methods for establishing HTTP/2 connections in different scenarios.
def initiate_connection(self) -> None:
"""
Provides any data that needs to be sent at the start of the connection.
Must be called for both clients and servers.
"""
def initiate_upgrade_connection(self, settings_header: bytes | None = None) -> bytes | None:
"""
Call to initialise the connection object for use with an upgraded HTTP/2
connection (i.e. a connection negotiated using the Upgrade: h2c HTTP header).
Args:
settings_header: HTTP2-Settings header value from HTTP/1.1 upgrade request
Returns:
For clients, the bytestring to place in the HTTP2-Settings header of the
upgrade request; for servers, None
"""

Methods for creating, managing, and controlling HTTP/2 streams.
def get_next_available_stream_id(self) -> int:
"""
Returns an integer suitable for use as the stream ID for the next stream
created by this endpoint. For server endpoints, this stream ID will be even.
For client endpoints, this stream ID will be odd.
"""
def send_headers(
self,
stream_id: int,
headers: Iterable[HeaderWeaklyTyped],
end_stream: bool = False,
priority_weight: int | None = None,
priority_depends_on: int | None = None,
priority_exclusive: bool | None = None
) -> None:
"""
Send headers on a given stream. This function can be used to send
request or response headers.
Args:
stream_id: Stream ID to send headers on
headers: List of (name, value) header tuples
end_stream: Whether to close stream after sending headers
priority_weight: Stream priority weight (1-256)
priority_depends_on: Stream ID this stream depends on
priority_exclusive: Whether dependency is exclusive
"""
def send_data(
self,
stream_id: int,
data: bytes | memoryview,
end_stream: bool = False,
pad_length: Any = None
) -> None:
"""
Send data on a given stream. This method does no breaking up of data.
Args:
stream_id: Stream ID to send data on
data: Data bytes to send
end_stream: Whether to close stream after sending data
pad_length: Padding length for data frame
"""
def end_stream(self, stream_id: int) -> None:
"""
Cleanly end a given stream. This method ends a stream by sending an
empty DATA frame on that stream with the END_STREAM flag set.
Args:
stream_id: Stream ID to end
"""
def push_stream(
self,
stream_id: int,
promised_stream_id: int,
request_headers: Iterable[HeaderWeaklyTyped]
) -> None:
"""
Push a response to the client by sending a PUSH_PROMISE frame.
Args:
stream_id: Stream ID to send push promise on
promised_stream_id: Stream ID for the promised response
request_headers: Headers for the promised request
"""
def reset_stream(self, stream_id: int, error_code: ErrorCodes | int = 0) -> None:
"""
Reset a stream. This method forcibly closes a stream by sending a
RST_STREAM frame for a given stream.
Args:
stream_id: Stream ID to reset
error_code: HTTP/2 error code for reset reason
"""

Methods for managing HTTP/2 flow control windows at connection and stream levels.
def increment_flow_control_window(self, increment: int, stream_id: int | None = None) -> None:
"""
Increment a flow control window, optionally for a single stream.
Allows the remote peer to send more data.
Args:
increment: Number of bytes to increment window by
stream_id: Stream ID to increment window for, or None for connection window
"""
def local_flow_control_window(self, stream_id: int) -> int:
"""
Returns the maximum amount of data that can be sent on stream stream_id.
Args:
stream_id: Stream ID to check window for
Returns:
Number of bytes that can be sent
"""
def remote_flow_control_window(self, stream_id: int) -> int:
"""
Returns the maximum amount of data the remote peer can send on stream stream_id.
Args:
stream_id: Stream ID to check window for
Returns:
Number of bytes remote peer can send
"""
def acknowledge_received_data(self, acknowledged_size: int, stream_id: int) -> None:
"""
Inform the H2Connection that a certain number of flow-controlled bytes have
been processed, and that the space should be handed back to the remote peer
at an opportune time.
Args:
acknowledged_size: Number of bytes processed
stream_id: Stream ID the data was received on
"""

Methods for connection-level operations including ping, settings, and connection termination.
def ping(self, opaque_data: bytes | str) -> None:
"""
Send a PING frame.
Args:
opaque_data: 8 bytes of opaque data to include in ping
"""
def close_connection(
self,
error_code: ErrorCodes | int = 0,
additional_data: bytes | None = None,
last_stream_id: int | None = None
) -> None:
"""
Close a connection, emitting a GOAWAY frame.
Args:
error_code: HTTP/2 error code for connection close reason
additional_data: Additional debug data
last_stream_id: Last stream ID processed
"""
def update_settings(self, new_settings: dict[SettingCodes | int, int]) -> None:
"""
Update the local settings. This will prepare and emit the appropriate SETTINGS frame.
Args:
new_settings: Dictionary of setting codes to values
"""
def advertise_alternative_service(
self,
field_value: bytes | str,
origin: bytes | None = None,
stream_id: int | None = None
) -> None:
"""
Notify a client about an available Alternative Service.
Args:
field_value: Alt-Svc field value as defined in RFC 7838
origin: Origin to advertise alt-svc for
stream_id: Stream ID to send alt-svc on, or None for connection level
"""
def prioritize(
self,
stream_id: int,
weight: int | None = None,
depends_on: int | None = None,
exclusive: bool | None = None
) -> None:
"""
Notify a server about the priority of a stream.
Args:
stream_id: Stream ID to set priority for
weight: Stream priority weight (1-256)
depends_on: Stream ID this stream depends on
exclusive: Whether dependency is exclusive
"""

Methods for sending and receiving data through the connection.
def data_to_send(self, amount: int | None = None) -> bytes:
"""
Returns some data for sending out of the internal data buffer. This method is
analogous to read on a file-like object, but it doesn't block.
Args:
amount: Maximum number of bytes to return, or None for all available
Returns:
Bytes to send over the network connection
"""
def clear_outbound_data_buffer(self) -> None:
"""
Clears the outbound data buffer, such that if this call was immediately
followed by a call to data_to_send, that call would return no data.
"""
def receive_data(self, data: bytes) -> list[Event]:
"""
Pass some received HTTP/2 data to the connection for handling.
Args:
data: Raw bytes received from the network
Returns:
List of events generated from processing the data
"""

from h2.connection import H2Connection
from h2.config import H2Configuration
# Create a client-side connection object.
config = H2Configuration(client_side=True)
conn = H2Connection(config=config)

# Initialize the connection. This buffers whatever must be sent at the start
# of the connection; nothing reaches the network until the buffered bytes are
# pulled out with data_to_send() and written to the socket.
conn.initiate_connection()
data_to_send = conn.data_to_send()
# Send data_to_send over your socket

# Send the request headers. A GET request has no body, so end_stream=True
# closes our side of the stream right away; without it the stream stays open
# and the server keeps waiting for DATA frames that never arrive.
conn.send_headers(
    stream_id=1,
    headers=[
        (':method', 'GET'),
        (':path', '/api/data'),
        (':scheme', 'https'),
        (':authority', 'example.com'),
    ],
    end_stream=True,
)
data_to_send = conn.data_to_send()
# Send data_to_send over your socket

# Process received data and manage flow control
from h2.events import DataReceived

# Read raw bytes from the network and let the connection turn them into events.
received_data = socket.recv(65536)
events = conn.receive_data(received_data)

for event in events:
    if isinstance(event, DataReceived):
        # Process the data
        process_data(event.data)

        # Acknowledge received data for flow control. Use
        # flow_controlled_length rather than len(event.data): frame padding
        # counts against the flow control window, so acknowledging only the
        # payload length would slowly shrink the window on padded frames.
        conn.acknowledge_received_data(
            acknowledged_size=event.flow_controlled_length,
            stream_id=event.stream_id,
        )

# Send acknowledgment: collect any frames the connection has buffered
# in response to the acknowledgements above.
data_to_send = conn.data_to_send()
socket.send(data_to_send)

Install with Tessl CLI
npx tessl i tessl/pypi-h2