WebSockets state-machine based protocol implementation
WebSocket extensions support including RFC 7692 permessage-deflate compression with configurable parameters and negotiation. The extension system is designed to be pluggable and allows for custom extensions while providing a complete implementation of the standard compression extension.
Abstract base class for implementing WebSocket extensions with the standard negotiation and processing lifecycle.
class Extension:
    """
    Base class for WebSocket extensions.

    Lists the negotiation hooks (``offer``, ``accept``, ``finalize``) and the
    per-frame hooks (``frame_inbound_header``, ``frame_inbound_payload_data``,
    ``frame_inbound_complete``, ``frame_outbound``) of an extension. Only the
    signatures and their contracts are shown; method bodies are omitted from
    this listing.
    """

    name: str  # Extension name for negotiation

    def enabled(self) -> bool:
        """
        Check if the extension is enabled.

        Returns:
            True if extension is enabled, False otherwise
        """

    def offer(self) -> Union[bool, str]:
        """
        Generate an extension offer for the client to send.

        Returns:
            Extension parameters as string, or boolean indicating simple offer
        """

    def accept(self, offer: str) -> Optional[Union[bool, str]]:
        """
        Accept an extension offer from the client (server-side).

        Args:
            offer: The extension offer string from client

        Returns:
            Extension response parameters, True for simple accept,
            None to reject the offer
        """

    def finalize(self, offer: str) -> None:
        """
        Finalize extension negotiation (client-side).

        Args:
            offer: The accepted extension parameters from server
        """

    def frame_inbound_header(
        self,
        proto: Union[FrameDecoder, FrameProtocol],
        opcode: Opcode,
        rsv: RsvBits,
        payload_length: int,
    ) -> Union[CloseReason, RsvBits]:
        """
        Process inbound frame header.

        Args:
            proto: The frame protocol instance
            opcode: Frame opcode
            rsv: Reserved bits from frame header
            payload_length: Length of frame payload

        Returns:
            Modified RSV bits or CloseReason for error
        """

    def frame_inbound_payload_data(
        self, proto: Union[FrameDecoder, FrameProtocol], data: bytes
    ) -> Union[bytes, CloseReason]:
        """
        Process inbound frame payload data.

        Args:
            proto: The frame protocol instance
            data: Payload data chunk

        Returns:
            Processed payload data or CloseReason for error
        """

    def frame_inbound_complete(
        self, proto: Union[FrameDecoder, FrameProtocol], fin: bool
    ) -> Union[bytes, CloseReason, None]:
        """
        Process inbound frame completion.

        Args:
            proto: The frame protocol instance
            fin: Whether this completes the frame

        Returns:
            Additional payload data, CloseReason for error, or None
        """

    def frame_outbound(
        self,
        proto: Union[FrameDecoder, FrameProtocol],
        opcode: Opcode,
        rsv: RsvBits,
        data: bytes,
        fin: bool,
    ) -> Tuple[RsvBits, bytes]:
        """
        Process outbound frame.

        Args:
            proto: The frame protocol instance
            opcode: Frame opcode
            rsv: Reserved bits for frame
            data: Frame payload data
            fin: Whether this completes the frame

        Returns:
            Tuple of (modified_rsv_bits, processed_payload_data)
        """

Complete implementation of RFC 7692 permessage-deflate compression extension with configurable compression parameters.
class PerMessageDeflate(Extension):
    """
    RFC 7692 permessage-deflate WebSocket compression extension.

    Exposes the four negotiable parameters (context takeover and maximum
    window bits, for client and for server). Only the signatures and their
    contracts are shown; method bodies are omitted from this listing.
    """

    name = "permessage-deflate"  # Standard extension name
    DEFAULT_CLIENT_MAX_WINDOW_BITS = 15  # Default client compression window size
    DEFAULT_SERVER_MAX_WINDOW_BITS = 15  # Default server compression window size

    def __init__(
        self,
        client_no_context_takeover: bool = False,
        client_max_window_bits: Optional[int] = None,
        server_no_context_takeover: bool = False,
        server_max_window_bits: Optional[int] = None,
    ) -> None:
        """
        Initialize permessage-deflate extension.

        Args:
            client_no_context_takeover: Client resets compression context each message
            client_max_window_bits: Client compression window size (9-15)
            server_no_context_takeover: Server resets compression context each message
            server_max_window_bits: Server compression window size (9-15)

        NOTE(review): a window-bits argument of None presumably falls back to
        the matching DEFAULT_*_MAX_WINDOW_BITS constant; the body is not
        shown here, so confirm against the implementation.
        """

    @property
    def client_max_window_bits(self) -> int:
        """
        Get client maximum window bits.

        Returns:
            Window size between 9 and 15 inclusive
        """

    @client_max_window_bits.setter
    def client_max_window_bits(self, value: int) -> None:
        """
        Set client maximum window bits.

        Args:
            value: Window size between 9 and 15 inclusive

        Raises:
            ValueError: If value is not between 9 and 15
        """

    @property
    def server_max_window_bits(self) -> int:
        """
        Get server maximum window bits.

        Returns:
            Window size between 9 and 15 inclusive
        """

    @server_max_window_bits.setter
    def server_max_window_bits(self, value: int) -> None:
        """
        Set server maximum window bits.

        Args:
            value: Window size between 9 and 15 inclusive

        Raises:
            ValueError: If value is not between 9 and 15
        """


# Dictionary mapping all supported extension names to their class
SUPPORTED_EXTENSIONS = {PerMessageDeflate.name: PerMessageDeflate}

from wsproto import WSConnection, ConnectionType
from wsproto.extensions import PerMessageDeflate
from wsproto.events import Request, AcceptConnection
# Client with compression
compression = PerMessageDeflate()
ws = WSConnection(ConnectionType.CLIENT)

# Send request with compression offered; the value returned by ws.send()
# is what gets fed into the server connection below.
request_data = ws.send(Request(
    host='example.com',
    target='/ws',
    extensions=[compression]
))

# Server accepting compression
ws_server = WSConnection(ConnectionType.SERVER)
ws_server.receive_data(request_data)
for event in ws_server.events():
    if isinstance(event, Request):
        # Accept with compression, using the server's own extension instance
        # rather than the client's `compression` object.
        server_compression = PerMessageDeflate()
        response_data = ws_server.send(AcceptConnection(
            extensions=[server_compression]
        ))

from wsproto.extensions import PerMessageDeflate
# Configure compression parameters
compression = PerMessageDeflate(
    client_no_context_takeover=True,  # Reset client context each message
    client_max_window_bits=12,  # Smaller window for less memory
    server_no_context_takeover=False,  # Keep server context between messages
    server_max_window_bits=15,  # Maximum compression for server
)

# Read the configured values back through the public properties.
print(f"Client window bits: {compression.client_max_window_bits}")
print(f"Server window bits: {compression.server_max_window_bits}")
print(f"Extension enabled: {compression.enabled()}")

# Use in connection
# NOTE: WSConnection, ConnectionType and Request are not imported by this
# example's own import line (it only imports PerMessageDeflate); they come
# from the `wsproto` / `wsproto.events` imports of the first example.
ws = WSConnection(ConnectionType.CLIENT)
request_data = ws.send(Request(
    host='example.com',
    target='/ws',
    extensions=[compression]
))

from wsproto import WSConnection, ConnectionType
from wsproto.extensions import PerMessageDeflate, SUPPORTED_EXTENSIONS
from wsproto.events import Request, AcceptConnection
def handle_extensions(requested_extensions):
    """Handle extension negotiation on server side.

    For every offer string, look up the extension class registered under the
    offered name in ``SUPPORTED_EXTENSIONS``, instantiate it with default
    arguments and ask it to ``accept()`` the offer. The outcome of each offer
    is reported with ``print``.

    Args:
        requested_extensions: Iterable of extension offer strings, each of
            the form ``"name"`` or ``"name; param; param=value"``.

    Returns:
        List of extension instances whose ``accept()`` did not reject the
        offer, in the order the offers were processed.
    """
    accepted_extensions = []
    for ext_offer in requested_extensions:
        # The extension name is everything before the first ";" of the offer.
        ext_name = ext_offer.split(';', 1)[0].strip()

        ext_class = SUPPORTED_EXTENSIONS.get(ext_name)
        if ext_class is None:
            print(f"Unsupported extension: {ext_name}")
            continue

        extension = ext_class()
        # Try to accept the offer. accept() is the server-side hook; the
        # client-side finalize() hook is deliberately not called here, since
        # it expects the server's accepted parameters, not the client's offer.
        result = extension.accept(ext_offer)

        # accept() may return a bool as well as a parameter string or None,
        # so an explicit False must count as a rejection too.
        if result is None or result is False:
            print(f"Rejected extension: {ext_name}")
            continue

        accepted_extensions.append(extension)
        print(f"Accepted extension: {ext_name}")
    return accepted_extensions
# Server handling extensions
ws = WSConnection(ConnectionType.SERVER)
# NOTE(review): handshake_data is never defined in this example; presumably
# it is the client's opening handshake as received from the transport -
# confirm and supply it before running.
ws.receive_data(handshake_data)
for event in ws.events():
    if isinstance(event, Request):
        print(f"Requested extensions: {event.extensions}")
        # Negotiate extensions
        accepted_extensions = handle_extensions(event.extensions)
        # Accept connection with negotiated extensions
        response_data = ws.send(AcceptConnection(
            extensions=accepted_extensions
        ))

from wsproto.extensions import Extension
from wsproto.frame_protocol import Opcode, RsvBits, CloseReason
from typing import Optional, Union  # needed by the annotations below


class SimpleLoggingExtension(Extension):
    """Example custom extension that logs frame information.

    Every negotiation hook succeeds unconditionally and every frame hook
    prints a short description of the frame and passes the data through
    unchanged.
    """

    name = "simple-logging"

    def __init__(self) -> None:
        # Becomes True once accept() or finalize() has been called.
        self._enabled = False

    def enabled(self) -> bool:
        """Return True once the extension has been accepted or finalized."""
        return self._enabled

    def offer(self) -> Union[bool, str]:
        """Offer the extension without any parameters."""
        return True  # Simple offer with no parameters

    def accept(self, offer: str) -> Optional[Union[bool, str]]:
        """Accept any offer and mark the extension as enabled."""
        self._enabled = True
        return True  # Accept the offer

    def finalize(self, offer: str) -> None:
        """Mark the extension as enabled; the parameters are ignored."""
        self._enabled = True

    def frame_inbound_header(self, proto, opcode, rsv, payload_length):
        """Log the opcode and payload length of an inbound frame."""
        print(f"Inbound frame: opcode={opcode}, length={payload_length}")
        return RsvBits(False, False, False)  # Don't modify RSV bits

    def frame_inbound_payload_data(self, proto, data):
        """Log the size of an inbound payload chunk and return it as is."""
        print(f"Inbound payload: {len(data)} bytes")
        return data  # Pass through unchanged

    def frame_outbound(self, proto, opcode, rsv, data, fin):
        """Log an outbound frame and return its RSV bits and data as is."""
        print(f"Outbound frame: opcode={opcode}, length={len(data)}, fin={fin}")
        return (rsv, data)  # Pass through unchanged
# Use custom extension
# NOTE: WSConnection, ConnectionType and Request are not imported by this
# example's own import lines (they import Extension, Opcode, RsvBits and
# CloseReason only); import them from `wsproto` / `wsproto.events` first.
custom_ext = SimpleLoggingExtension()
ws = WSConnection(ConnectionType.CLIENT)
# Offer the custom extension in the client's opening request.
request_data = ws.send(Request(
    host='example.com',
    target='/ws',
    extensions=[custom_ext]
))

from wsproto.extensions import PerMessageDeflate
# Memory-conscious compression (smaller windows)
low_memory_compression = PerMessageDeflate(
    client_max_window_bits=9,  # Minimum window size
    server_max_window_bits=9,
    client_no_context_takeover=True,  # Reset context to save memory
    server_no_context_takeover=True,
)

# High-compression setup (larger windows)
high_compression = PerMessageDeflate(
    client_max_window_bits=15,  # Maximum window size
    server_max_window_bits=15,
    client_no_context_takeover=False,  # Keep context for better compression
    server_no_context_takeover=False,
)

# Test different configurations
extensions_to_test = [
    ("Low Memory", low_memory_compression),
    ("High Compression", high_compression),
]

# NOTE: WSConnection, ConnectionType and Request are not imported by this
# example's own import line (it only imports PerMessageDeflate); import them
# from `wsproto` / `wsproto.events` first.
for name, ext in extensions_to_test:
    # A fresh client connection is created for each configuration.
    ws = WSConnection(ConnectionType.CLIENT)
    print(f"Testing {name} configuration:")
    print(f" Client window: {ext.client_max_window_bits}")
    print(f" Server window: {ext.server_max_window_bits}")
    request_data = ws.send(Request(
        host='example.com',
        target='/ws',
        extensions=[ext]
    ))

from wsproto.extensions import PerMessageDeflate
# The window-bits setters are documented to raise ValueError outside 9-15;
# NOTE(review): these examples assume the constructor applies the same
# validation - confirm against the implementation.
try:
    # Invalid window size (above the documented maximum of 15)
    bad_compression = PerMessageDeflate(client_max_window_bits=16)
except ValueError as e:
    print(f"Configuration error: {e}")

try:
    # Another invalid configuration (below the documented minimum of 9)
    bad_compression = PerMessageDeflate(server_max_window_bits=8)
except ValueError as e:
    print(f"Configuration error: {e}")

# Valid configuration (both values inside the documented 9-15 range)
good_compression = PerMessageDeflate(
    client_max_window_bits=12,
    server_max_window_bits=14,
)
print(f"Valid compression created: {good_compression}")

Install with Tessl CLI
npx tessl i tessl/pypi-wsproto