WebSockets state-machine based protocol implementation
Core connection establishment, state management, and lifecycle control for both client and server WebSocket connections. This module provides the main high-level interface for WebSocket operations.
The primary interface for WebSocket connections, handling both client and server modes with automatic handshake management.
class WSConnection:
"""
Represents the local end of a WebSocket connection to a remote peer.
"""
def __init__(self, connection_type: ConnectionType) -> None:
"""
Constructor for WebSocket connection.
Args:
connection_type: Controls whether the library behaves as a client or server
"""
@property
def state(self) -> ConnectionState:
"""
Returns the current connection state.
Returns:
Current connection state
"""
def send(self, event: Event) -> bytes:
"""
Generate network data for the specified event.
When you want to communicate with a WebSocket peer, construct an event
and pass it to this method. This method will return the bytes that you
should send to the peer.
Args:
event: The event to generate data for
Returns:
The data to send to the peer
"""
def receive_data(self, data: Optional[bytes]) -> None:
"""
Feed network data into the connection instance.
After calling this method, you should call events() to see if the
received data triggered any new events.
Args:
data: Data received from remote peer, or None to signal connection closed
"""
def events(self) -> Generator[Event, None, None]:
"""
A generator that yields pending events.
Each event is an instance of a subclass of Event.
Yields:
Event objects representing WebSocket protocol actions
"""
def initiate_upgrade_connection(self, headers: Headers, path: Union[bytes, str]) -> None:
"""
Initiate an upgrade connection (server only).
This should be used if the request has already been received and parsed.
Args:
headers: HTTP headers represented as a list of 2-tuples
path: A URL path
Raises:
LocalProtocolError: If called on a client connection
"""Enumeration controlling whether the connection acts as a client or server.
class ConnectionType(Enum):
"""An enumeration of connection types."""
CLIENT = 1 # This connection will act as client and talk to a remote server
SERVER = 2 # This connection will act as server and wait for client connections
# Convenience constants
CLIENT = ConnectionType.CLIENT
SERVER = ConnectionType.SERVEREnumeration representing the various states of a WebSocket connection lifecycle.
class ConnectionState(Enum):
"""
RFC 6455, Section 4 - Opening Handshake states.
"""
CONNECTING = 0 # The opening handshake is in progress
OPEN = 1 # The opening handshake is complete
REMOTE_CLOSING = 2 # The remote WebSocket has initiated a connection close
LOCAL_CLOSING = 3 # The local WebSocket has initiated a connection close
CLOSED = 4 # The closing handshake has completed
REJECTING = 5 # The connection was rejected during the opening handshakeDirect WebSocket connection object for advanced use cases requiring more control.
class Connection:
"""
A low-level WebSocket connection object.
This wraps WebSocket frame protocol functionality for exchanging messages
and control frames after the handshake is complete.
"""
def __init__(
self,
connection_type: ConnectionType,
extensions: Optional[List[Extension]] = None,
trailing_data: bytes = b"",
) -> None:
"""
Initialize a low-level connection.
Args:
connection_type: Whether this is a client or server connection
extensions: List of negotiated extensions
trailing_data: Any trailing data from handshake
"""
@property
def state(self) -> ConnectionState:
"""Get the current connection state."""
def send(self, event: Event) -> bytes:
"""
Send an event and return bytes to transmit.
Args:
event: Event to send (Message, Ping, Pong, or CloseConnection)
Returns:
Bytes to send over the network
Raises:
LocalProtocolError: If event cannot be sent in current state
"""
def receive_data(self, data: Optional[bytes]) -> None:
"""
Pass received data to the connection for handling.
Args:
data: Data received from remote peer, or None if connection closed
Raises:
LocalProtocolError: If connection is already closed
"""
def events(self) -> Generator[Event, None, None]:
"""
Return a generator that provides events generated by protocol activity.
Yields:
Event subclasses representing WebSocket protocol actions
"""Low-level HTTP/1.1 handshake implementation for WebSocket upgrade negotiation.
class H11Handshake:
"""
A Handshake implementation for HTTP/1.1 connections.
Handles the WebSocket upgrade handshake process using the h11 HTTP library,
managing the transition from HTTP to WebSocket protocol.
"""
def __init__(self, connection_type: ConnectionType) -> None:
"""
Initialize handshake handler.
Args:
connection_type: Whether this is a client or server handshake
"""
@property
def state(self) -> ConnectionState:
"""Get the current handshake state."""
@property
def connection(self) -> Optional[Connection]:
"""
Return the established connection.
Returns the WebSocket connection after successful handshake,
or None if handshake is not yet complete.
Returns:
Connection object or None
"""
def initiate_upgrade_connection(
self, headers: Headers, path: Union[bytes, str]
) -> None:
"""
Initiate an upgrade connection (server only).
Used when the HTTP request has already been received and parsed
externally, allowing the handshake to process the upgrade.
Args:
headers: HTTP headers as list of 2-tuples
path: URL path from the request
Raises:
LocalProtocolError: If called on a client connection
"""
def send(self, event: Event) -> bytes:
"""
Send an event and return bytes to transmit.
Args:
event: Event to send during handshake
Returns:
Bytes to send over the network
Raises:
LocalProtocolError: If event is not valid for current state
"""
def receive_data(self, data: Optional[bytes]) -> None:
"""
Process received handshake data.
Args:
data: Raw bytes from network, or None if connection closed
"""
def events(self) -> Generator[Event, None, None]:
"""
Generator yielding handshake events.
Yields:
Event objects representing handshake progress
"""from wsproto import WSConnection, ConnectionType
from wsproto.events import Request, AcceptConnection, TextMessage, CloseConnection
import socket
# Create client connection
ws = WSConnection(ConnectionType.CLIENT)
# Connect to server
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('echo.websocket.org', 80))
# Send handshake
request_data = ws.send(Request(host='echo.websocket.org', target='/'))
sock.send(request_data)
# Receive handshake response
response_data = sock.recv(4096)
ws.receive_data(response_data)
# Process events
for event in ws.events():
if isinstance(event, AcceptConnection):
print("Connected!")
# Send a message
message_data = ws.send(TextMessage(data="Hello WebSocket!"))
sock.send(message_data)
elif isinstance(event, TextMessage):
print(f"Received: {event.data}")
# Close connection
close_data = ws.send(CloseConnection(code=1000, reason="Done"))
sock.send(close_data)
sock.close()from wsproto import WSConnection, ConnectionType
from wsproto.events import Request, AcceptConnection, TextMessage, CloseConnection
import socket
# Create server socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(('localhost', 8765))
server_sock.listen(1)
client_sock, addr = server_sock.accept()
# Create server connection
ws = WSConnection(ConnectionType.SERVER)
# Receive handshake
request_data = client_sock.recv(4096)
ws.receive_data(request_data)
# Process events
for event in ws.events():
if isinstance(event, Request):
print(f"WebSocket request for {event.target}")
# Accept connection
response_data = ws.send(AcceptConnection())
client_sock.send(response_data)
elif isinstance(event, TextMessage):
print(f"Received: {event.data}")
# Echo message back
echo_data = ws.send(TextMessage(data=f"Echo: {event.data}"))
client_sock.send(echo_data)
elif isinstance(event, CloseConnection):
print("Connection closing")
# Respond to close
close_data = ws.send(event.response())
client_sock.send(close_data)
break
client_sock.close()
server_sock.close()from wsproto import WSConnection, ConnectionType
from wsproto.connection import ConnectionState
ws = WSConnection(ConnectionType.CLIENT)
# Check initial state
print(f"Initial state: {ws.state}") # ConnectionState.CONNECTING
# After successful handshake
# ... handshake process ...
print(f"After handshake: {ws.state}") # ConnectionState.OPEN
# After initiating close
# ... close process ...
print(f"After close: {ws.state}") # ConnectionState.LOCAL_CLOSING or CLOSEDInstall with Tessl CLI
npx tessl i tessl/pypi-wsproto