CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-h2

Pure-Python HTTP/2 protocol implementation providing low-level connection and stream management

Pending
Overview
Eval results
Files

connection.mddocs/

Connection Management

Core HTTP/2 connection handling providing the primary interface for all HTTP/2 protocol operations. The H2Connection class manages connection state, frame processing, stream lifecycle, and data transmission.

Capabilities

H2Connection Class

The main connection object that handles HTTP/2 protocol operations, maintains connection and stream state, and processes frames according to the HTTP/2 specification.

class H2Connection:
    def __init__(self, config: H2Configuration | None = None):
        """
        Initialize HTTP/2 connection.
        
        Args:
            config: Connection configuration object
        """

Properties

Connection state properties providing information about open streams and flow control.

@property
def open_outbound_streams(self) -> int:
    """The current number of open outbound streams."""

@property
def open_inbound_streams(self) -> int:
    """The current number of open inbound streams."""

@property  
def inbound_flow_control_window(self) -> int:
    """The size of the inbound flow control window for the connection."""

Connection Initialization

Methods for establishing HTTP/2 connections in different scenarios.

def initiate_connection(self) -> None:
    """
    Provides any data that needs to be sent at the start of the connection.
    Must be called for both clients and servers.
    """

def initiate_upgrade_connection(self, settings_header: bytes | None = None) -> bytes | None:
    """
    Call to initialise the connection object for use with an upgraded HTTP/2 
    connection (i.e. a connection negotiated using the Upgrade: h2c HTTP header).
    
    Args:
        settings_header: HTTP2-Settings header value from HTTP/1.1 upgrade request
        
    Returns:
        Connection preface to send to the client, or None if not applicable
    """

Stream Management

Methods for creating, managing, and controlling HTTP/2 streams.

def get_next_available_stream_id(self) -> int:
    """
    Returns an integer suitable for use as the stream ID for the next stream 
    created by this endpoint. For server endpoints, this stream ID will be even. 
    For client endpoints, this stream ID will be odd.
    """

def send_headers(
    self, 
    stream_id: int, 
    headers: Iterable[HeaderWeaklyTyped], 
    end_stream: bool = False, 
    priority_weight: int | None = None, 
    priority_depends_on: int | None = None, 
    priority_exclusive: bool | None = None
) -> None:
    """
    Send headers on a given stream. This function can be used to send 
    request or response headers.
    
    Args:
        stream_id: Stream ID to send headers on
        headers: List of (name, value) header tuples  
        end_stream: Whether to close stream after sending headers
        priority_weight: Stream priority weight (1-256)
        priority_depends_on: Stream ID this stream depends on
        priority_exclusive: Whether dependency is exclusive
    """

def send_data(
    self, 
    stream_id: int, 
    data: bytes | memoryview, 
    end_stream: bool = False, 
    pad_length: Any = None
) -> None:
    """
    Send data on a given stream. This method does no breaking up of data.
    
    Args:
        stream_id: Stream ID to send data on
        data: Data bytes to send
        end_stream: Whether to close stream after sending data
        pad_length: Padding length for data frame
    """

def end_stream(self, stream_id: int) -> None:
    """
    Cleanly end a given stream. This method ends a stream by sending an 
    empty DATA frame on that stream with the END_STREAM flag set.
    
    Args:
        stream_id: Stream ID to end
    """

def push_stream(
    self, 
    stream_id: int, 
    promised_stream_id: int, 
    request_headers: Iterable[HeaderWeaklyTyped]
) -> None:
    """
    Push a response to the client by sending a PUSH_PROMISE frame.
    
    Args:
        stream_id: Stream ID to send push promise on
        promised_stream_id: Stream ID for the promised response
        request_headers: Headers for the promised request
    """

def reset_stream(self, stream_id: int, error_code: ErrorCodes | int = 0) -> None:
    """
    Reset a stream. This method forcibly closes a stream by sending a 
    RST_STREAM frame for a given stream.
    
    Args:
        stream_id: Stream ID to reset
        error_code: HTTP/2 error code for reset reason
    """

Flow Control

Methods for managing HTTP/2 flow control windows at connection and stream levels.

def increment_flow_control_window(self, increment: int, stream_id: int | None = None) -> None:
    """
    Increment a flow control window, optionally for a single stream. 
    Allows the remote peer to send more data.
    
    Args:
        increment: Number of bytes to increment window by
        stream_id: Stream ID to increment window for, or None for connection window
    """

def local_flow_control_window(self, stream_id: int) -> int:
    """
    Returns the maximum amount of data that can be sent on stream stream_id.
    
    Args:
        stream_id: Stream ID to check window for
        
    Returns:
        Number of bytes that can be sent
    """

def remote_flow_control_window(self, stream_id: int) -> int:
    """
    Returns the maximum amount of data the remote peer can send on stream stream_id.
    
    Args:
        stream_id: Stream ID to check window for
        
    Returns:
        Number of bytes remote peer can send
    """

def acknowledge_received_data(self, acknowledged_size: int, stream_id: int) -> None:
    """
    Inform the H2Connection that a certain number of flow-controlled bytes have 
    been processed, and that the space should be handed back to the remote peer 
    at an opportune time.
    
    Args:
        acknowledged_size: Number of bytes processed
        stream_id: Stream ID the data was received on
    """

Connection Control

Methods for connection-level operations including ping, settings, and connection termination.

def ping(self, opaque_data: bytes | str) -> None:
    """
    Send a PING frame.
    
    Args:
        opaque_data: 8 bytes of opaque data to include in ping
    """

def close_connection(
    self, 
    error_code: ErrorCodes | int = 0, 
    additional_data: bytes | None = None, 
    last_stream_id: int | None = None
) -> None:
    """
    Close a connection, emitting a GOAWAY frame.
    
    Args:
        error_code: HTTP/2 error code for connection close reason
        additional_data: Additional debug data
        last_stream_id: Last stream ID processed
    """

def update_settings(self, new_settings: dict[SettingCodes | int, int]) -> None:
    """
    Update the local settings. This will prepare and emit the appropriate SETTINGS frame.
    
    Args:
        new_settings: Dictionary of setting codes to values
    """

def advertise_alternative_service(
    self, 
    field_value: bytes | str, 
    origin: bytes | None = None, 
    stream_id: int | None = None
) -> None:
    """
    Notify a client about an available Alternative Service.
    
    Args:
        field_value: Alt-Svc field value as defined in RFC 7838
        origin: Origin to advertise alt-svc for 
        stream_id: Stream ID to send alt-svc on, or None for connection level
    """

def prioritize(
    self, 
    stream_id: int, 
    weight: int | None = None, 
    depends_on: int | None = None, 
    exclusive: bool | None = None
) -> None:
    """
    Notify a server about the priority of a stream.
    
    Args:
        stream_id: Stream ID to set priority for
        weight: Stream priority weight (1-256)
        depends_on: Stream ID this stream depends on
        exclusive: Whether dependency is exclusive
    """

Data Handling

Methods for sending and receiving data through the connection.

def data_to_send(self, amount: int | None = None) -> bytes:
    """
    Returns some data for sending out of the internal data buffer. This method is 
    analogous to read on a file-like object, but it doesn't block.
    
    Args:
        amount: Maximum number of bytes to return, or None for all available
        
    Returns:
        Bytes to send over the network connection
    """

def clear_outbound_data_buffer(self) -> None:
    """
    Clears the outbound data buffer, such that if this call was immediately 
    followed by a call to data_to_send, that call would return no data.
    """

def receive_data(self, data: bytes) -> list[Event]:
    """
    Pass some received HTTP/2 data to the connection for handling.
    
    Args:
        data: Raw bytes received from the network
        
    Returns:
        List of events generated from processing the data
    """

Usage Examples

Basic Client Connection

from h2.connection import H2Connection
from h2.config import H2Configuration

# Create client connection
config = H2Configuration(client_side=True)
conn = H2Connection(config=config)

# Initialize connection
conn.initiate_connection()
data_to_send = conn.data_to_send()
# Send data_to_send over your socket

# Send request
conn.send_headers(
    stream_id=1,
    headers=[
        (':method', 'GET'),
        (':path', '/api/data'),
        (':scheme', 'https'), 
        (':authority', 'example.com'),
    ]
)
data_to_send = conn.data_to_send()
# Send data_to_send over your socket

Flow Control Management

# Process received data and manage flow control
received_data = socket.recv(65536)
events = conn.receive_data(received_data)

for event in events:
    if isinstance(event, DataReceived):
        # Process the data
        process_data(event.data)
        
        # Acknowledge received data for flow control
        conn.acknowledge_received_data(
            acknowledged_size=len(event.data),
            stream_id=event.stream_id
        )
        
        # Send acknowledgment
        data_to_send = conn.data_to_send()
        socket.send(data_to_send)

Install with Tessl CLI

npx tessl i tessl/pypi-h2

docs

configuration.md

connection.md

events.md

exceptions.md

index.md

settings.md

tile.json