A pure-Python, bring-your-own-I/O implementation of HTTP/1.1
—
Core connection handling and state management for HTTP/1.1 protocol implementation. The Connection class serves as the central state machine that manages the HTTP/1.1 protocol flow for both client and server roles.
The main class encapsulating HTTP connection state and protocol logic. Manages parsing, generation, and state transitions for HTTP/1.1 messages.
class Connection:
def __init__(self, our_role, max_incomplete_event_size=16384):
"""
Create a new HTTP/1.1 connection state machine.
Args:
our_role: Either h11.CLIENT or h11.SERVER
max_incomplete_event_size (int): Maximum bytes to buffer for incomplete events (default: 16384)
"""Usage Example:
import h11
# Create client connection
client_conn = h11.Connection(h11.CLIENT)
# Create server connection
server_conn = h11.Connection(h11.SERVER, max_incomplete_event_size=8192)Parse incoming data into HTTP events and convert events into bytes for transmission.
def next_event(self):
"""
Parse the next event from the receive buffer.
Returns:
Union[Event, Type[NEED_DATA], Type[PAUSED]]:
- An Event object if a complete event was parsed
- NEED_DATA if more data is needed from the socket
- PAUSED if connection is paused and needs start_next_cycle()
"""
def send(self, event):
"""
Convert an event to bytes for transmission over the wire.
Args:
event (Event): Event object to serialize
Returns:
Optional[bytes]: Bytes to send, or None if event should not be sent
Raises:
LocalProtocolError: If event is not valid for current connection state
"""
def send_with_data_passthrough(self, event):
"""
Like send() but preserves Data event objects in the output.
Args:
event (Event): Event object to serialize
Returns:
Optional[List[bytes]]: List of byte chunks to send, or None
Note:
Data events are returned as-is rather than serialized to bytes.
"""Manage incoming data and connection lifecycle.
def receive_data(self, data):
"""
Add incoming data to the internal receive buffer.
Args:
data (bytes): Raw bytes received from socket
Note:
Call next_event() after this to parse events from the buffer.
"""
def start_next_cycle(self):
"""
Reset the connection state for a new request/response cycle.
Note:
Only call when both sides have finished the current cycle.
Required after receiving PAUSED from next_event().
"""
def send_failed(self):
"""
Notify the state machine that sending failed.
Note:
Call this if you were unable to send data returned by send().
Transitions connection to ERROR state.
"""Access connection state and metadata.
@property
def states(self):
"""
Dict mapping roles to their current states.
Returns:
Dict[Role, State]: Current states for CLIENT and SERVER roles
"""
@property
def our_state(self):
"""
Current state of our role.
Returns:
State: Current state (IDLE, SEND_BODY, DONE, etc.)
"""
@property
def their_state(self):
"""
Current state of the peer's role.
Returns:
State: Peer's current state
"""
our_role: Role
"""
Our role in the connection - either CLIENT or SERVER.
Attribute:
Role: Either CLIENT or SERVER
"""
their_role: Role
"""
The peer's role in the connection - either CLIENT or SERVER (opposite of our_role).
Attribute:
Role: Either CLIENT or SERVER
"""
their_http_version: Optional[bytes]
"""
HTTP version used by the peer.
Attribute:
Optional[bytes]: HTTP version like b"1.1" or b"1.0", or None if not determined yet
"""
@property
def they_are_waiting_for_100_continue(self):
"""
Whether peer is waiting for 100-continue response.
Returns:
bool: True if peer sent Expect: 100-continue header
"""
client_is_waiting_for_100_continue: bool
"""
Whether client is waiting for 100-continue response.
Attribute:
bool: True if client is waiting for 100-continue
"""
@property
def trailing_data(self):
"""
Get any unprocessed data after connection closed.
Returns:
Tuple[bytes, bool]:
- bytes: Unprocessed data
- bool: Whether connection was closed
"""Special sentinel values returned by next_event() to indicate connection state.
class NEED_DATA:
"""
Sentinel returned by next_event() when more data is needed.
Usage:
event = conn.next_event()
if event is h11.NEED_DATA:
# Read more data from socket and call receive_data()
"""
class PAUSED:
"""
Sentinel returned by next_event() when connection is paused.
Usage:
event = conn.next_event()
if event is h11.PAUSED:
# Call start_next_cycle() to begin new request/response
"""import h11
conn = h11.Connection(h11.CLIENT)
# Send request
req = h11.Request(method=b'GET', target=b'/', headers=[(b'host', b'example.com')])
data = conn.send(req)
send_to_socket(data)
# End request
eom = h11.EndOfMessage()
data = conn.send(eom)
send_to_socket(data)
# Receive response
while True:
data = receive_from_socket()
conn.receive_data(data)
event = conn.next_event()
if event is h11.NEED_DATA:
continue
elif isinstance(event, h11.Response):
print(f"Status: {event.status_code}")
elif isinstance(event, h11.EndOfMessage):
breakimport h11
conn = h11.Connection(h11.SERVER)
# Receive request
while True:
data = receive_from_socket()
conn.receive_data(data)
event = conn.next_event()
if event is h11.NEED_DATA:
continue
elif isinstance(event, h11.Request):
print(f"Method: {event.method}, Target: {event.target}")
elif isinstance(event, h11.EndOfMessage):
break
# Send response
resp = h11.Response(status_code=200, headers=[(b'content-type', b'text/plain')])
data = conn.send(resp)
send_to_socket(data)
# Send body
body_data = h11.Data(data=b'Hello, World!')
data = conn.send(body_data)
send_to_socket(data)
# End response
eom = h11.EndOfMessage()
data = conn.send(eom)
send_to_socket(data)Install with Tessl CLI
npx tessl i tessl/pypi-h11