WebSockets state-machine based protocol implementation
Direct frame protocol access for advanced use cases requiring fine-grained control over WebSocket frame generation and parsing. This module provides the underlying protocol implementation that powers the high-level WSConnection interface.
Core WebSocket frame protocol implementation handling frame parsing, generation, and message assembly.
class FrameProtocol:
"""
Complete frame protocol implementation for WebSocket connections.
"""
def __init__(self, client: bool, extensions: List[Extension]) -> None:
"""
Initialize frame protocol.
Args:
client: True if this is a client connection, False for server
extensions: List of enabled extensions
"""
def receive_bytes(self, data: bytes) -> None:
"""
Feed received bytes into the protocol for parsing.
Args:
data: Raw bytes received from network
"""
def received_frames(self) -> Generator[Frame, None, None]:
"""
Generator yielding parsed frames from received data.
Yields:
Frame objects representing complete WebSocket frames
"""
def send_data(
self, payload: Union[bytes, bytearray, str] = b"", fin: bool = True
) -> bytes:
"""
Generate data frame bytes for transmission.
Args:
payload: Data to send (bytes for binary, str for text)
fin: Whether this completes the message
Returns:
Raw frame bytes to send over network
Raises:
ValueError: If payload type is invalid
TypeError: If data type changes within a fragmented message
"""
def ping(self, payload: bytes = b"") -> bytes:
"""
Generate ping frame bytes.
Args:
payload: Optional ping payload (max 125 bytes)
Returns:
Raw ping frame bytes to send
"""
def pong(self, payload: bytes = b"") -> bytes:
"""
Generate pong frame bytes.
Args:
payload: Optional pong payload (max 125 bytes)
Returns:
Raw pong frame bytes to send
"""
def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> bytes:
"""
Generate close frame bytes.
Args:
code: Close code (see CloseReason enum)
reason: Optional close reason string
Returns:
Raw close frame bytes to send
Raises:
TypeError: If reason provided without code
"""Core data structures representing WebSocket frames and their components.
class Frame(NamedTuple):
"""
Complete frame information.
"""
opcode: Opcode # Frame opcode (TEXT, BINARY, CLOSE, etc.)
payload: Union[bytes, str, Tuple[int, str]] # Frame payload data
frame_finished: bool # Whether this frame is complete
message_finished: bool # Whether this completes the message
class Header(NamedTuple):
"""
Frame header information.
"""
fin: bool # FIN bit - whether frame completes message
rsv: RsvBits # Reserved bits (used by extensions)
opcode: Opcode # Frame opcode
payload_len: int # Payload length
masking_key: Optional[bytes] # Masking key (client frames only)
class RsvBits(NamedTuple):
"""
Reserved bits in frame header.
"""
rsv1: bool # RSV1 bit (used by extensions like permessage-deflate)
rsv2: bool # RSV2 bit (reserved for future use)
rsv3: bool # RSV3 bit (reserved for future use)Enumeration of WebSocket frame opcodes as defined in RFC 6455.
class Opcode(IntEnum):
"""
RFC 6455, Section 5.2 - Base Framing Protocol opcodes.
"""
CONTINUATION = 0x0 # Continuation frame for fragmented messages
TEXT = 0x1 # Text message frame
BINARY = 0x2 # Binary message frame
CLOSE = 0x8 # Connection close frame
PING = 0x9 # Ping control frame
PONG = 0xA # Pong control frame
def iscontrol(self) -> bool:
"""
Check if this opcode represents a control frame.
Returns:
True if this is a control frame (CLOSE, PING, PONG)
"""Standard WebSocket close codes as defined in RFC 6455 Section 7.4.1.
class CloseReason(IntEnum):
"""
RFC 6455, Section 7.4.1 - Defined Status Codes.
"""
# Standard close codes
NORMAL_CLOSURE = 1000 # Normal closure
GOING_AWAY = 1001 # Endpoint going away
PROTOCOL_ERROR = 1002 # Protocol error
UNSUPPORTED_DATA = 1003 # Unsupported data type
NO_STATUS_RCVD = 1005 # No status received (reserved)
ABNORMAL_CLOSURE = 1006 # Abnormal closure (reserved)
INVALID_FRAME_PAYLOAD_DATA = 1007 # Invalid frame payload
POLICY_VIOLATION = 1008 # Policy violation
MESSAGE_TOO_BIG = 1009 # Message too big
MANDATORY_EXT = 1010 # Mandatory extension missing
INTERNAL_ERROR = 1011 # Internal server error
SERVICE_RESTART = 1012 # Service restart (non-RFC)
TRY_AGAIN_LATER = 1013 # Try again later (non-RFC)
TLS_HANDSHAKE_FAILED = 1015 # TLS handshake failed (reserved)Low-level frame decoder for parsing raw WebSocket frame data.
class FrameDecoder:
"""
Low-level WebSocket frame decoder.
"""
def __init__(
self, client: bool, extensions: Optional[List[Extension]] = None
) -> None:
"""
Initialize frame decoder.
Args:
client: True if decoding client frames, False for server frames
extensions: List of extensions for frame processing
"""
def receive_bytes(self, data: bytes) -> None:
"""
Feed bytes to the decoder.
Args:
data: Raw bytes from network
"""
def process_buffer(self) -> Optional[Frame]:
"""
Process buffered data and return a frame if complete.
Returns:
Complete Frame object or None if more data needed
Raises:
ParseFailed: If frame parsing fails
"""Exceptions raised during frame protocol operations.
class ParseFailed(Exception):
"""
Exception raised when frame parsing fails.
"""
def __init__(
self, msg: str, code: CloseReason = CloseReason.PROTOCOL_ERROR
) -> None:
"""
Initialize parse failure exception.
Args:
msg: Error message
code: Associated close code for the error
"""
self.code: CloseReason # Close code associated with the errorHelper class for assembling fragmented WebSocket messages from individual frames.
class MessageDecoder:
"""
Decoder for assembling WebSocket messages from frames.
Handles fragmented messages by buffering frame data and reconstructing
complete messages, including UTF-8 decoding for text messages.
"""
def __init__(self) -> None:
"""Initialize message decoder."""
def process_frame(self, frame: Frame) -> Frame:
"""
Process a frame and return assembled message data.
Handles continuation frames and message assembly, including
UTF-8 decoding for text messages.
Args:
frame: Frame to process
Returns:
Frame with assembled message data
Raises:
ParseFailed: If frame sequence is invalid or UTF-8 decoding fails
"""import struct
# Payload length constants
PAYLOAD_LENGTH_TWO_BYTE = 126 # Indicator for 2-byte length
PAYLOAD_LENGTH_EIGHT_BYTE = 127 # Indicator for 8-byte length
MAX_PAYLOAD_NORMAL = 125 # Maximum single-byte payload length
MAX_PAYLOAD_TWO_BYTE = 65535 # Maximum 2-byte payload length (2^16 - 1)
MAX_PAYLOAD_EIGHT_BYTE = 2**64 - 1 # Maximum 8-byte payload length
MAX_FRAME_PAYLOAD = MAX_PAYLOAD_EIGHT_BYTE # Alias for maximum frame payload
# Frame header bit masks
FIN_MASK = 0x80 # FIN bit mask
RSV1_MASK = 0x40 # RSV1 bit mask
RSV2_MASK = 0x20 # RSV2 bit mask
RSV3_MASK = 0x10 # RSV3 bit mask
OPCODE_MASK = 0x0F # Opcode mask
MASK_MASK = 0x80 # Mask bit mask
PAYLOAD_LEN_MASK = 0x7F # Payload length mask
# WebSocket protocol version
WEBSOCKET_VERSION = b"13" # RFC 6455 WebSocket version
# Close code ranges
MIN_CLOSE_REASON = 1000 # Minimum valid close code
MIN_PROTOCOL_CLOSE_REASON = 1000 # Minimum protocol-defined close code
MAX_PROTOCOL_CLOSE_REASON = 2999 # Maximum protocol-defined close code
MIN_LIBRARY_CLOSE_REASON = 3000 # Minimum library-defined close code
MAX_LIBRARY_CLOSE_REASON = 3999 # Maximum library-defined close code
MIN_PRIVATE_CLOSE_REASON = 4000 # Minimum private close code
MAX_PRIVATE_CLOSE_REASON = 4999 # Maximum private close code
MAX_CLOSE_REASON = 4999 # Maximum valid close code
LOCAL_ONLY_CLOSE_REASONS = ( # Codes that must not appear on wire
CloseReason.NO_STATUS_RCVD,
CloseReason.ABNORMAL_CLOSURE,
CloseReason.TLS_HANDSHAKE_FAILED,
)
# WebSocket accept GUID
ACCEPT_GUID = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11" # RFC 6455 magic string
# Null mask for server frames
NULL_MASK = struct.pack("!I", 0)from wsproto.frame_protocol import FrameProtocol, Opcode, CloseReason
from wsproto.extensions import PerMessageDeflate
# Create frame protocol instance
extensions = [PerMessageDeflate()]
protocol = FrameProtocol(client=True, extensions=extensions)
# Send text data
text_frame_bytes = protocol.send_data("Hello, WebSocket!", fin=True)
print(f"Text frame: {len(text_frame_bytes)} bytes")
# Send binary data
binary_frame_bytes = protocol.send_data(b"Binary data", fin=True)
print(f"Binary frame: {len(binary_frame_bytes)} bytes")
# Send fragmented message
fragment1_bytes = protocol.send_data("Start of ", fin=False)
fragment2_bytes = protocol.send_data("fragmented message", fin=True)
# Send control frames
ping_bytes = protocol.ping(b"ping-payload")
pong_bytes = protocol.pong(b"pong-payload")
close_bytes = protocol.close(CloseReason.NORMAL_CLOSURE, "Goodbye")from wsproto.frame_protocol import FrameProtocol, FrameDecoder, Opcode
# Create frame decoder
decoder = FrameDecoder(client=False, extensions=[])
# Process incoming frame data
decoder.receive_bytes(frame_data_chunk1)
decoder.receive_bytes(frame_data_chunk2)
# Try to parse complete frames
frame = decoder.process_buffer()
if frame:
print(f"Received frame: opcode={frame.opcode}")
print(f"Payload: {frame.payload}")
print(f"Frame finished: {frame.frame_finished}")
print(f"Message finished: {frame.message_finished}")
# Handle different frame types
if frame.opcode == Opcode.TEXT:
print(f"Text message: {frame.payload}")
elif frame.opcode == Opcode.BINARY:
print(f"Binary message: {len(frame.payload)} bytes")
elif frame.opcode == Opcode.CLOSE:
code, reason = frame.payload
print(f"Close frame: code={code}, reason='{reason}'")
elif frame.opcode == Opcode.PING:
print(f"Ping frame: payload={frame.payload}")
elif frame.opcode == Opcode.PONG:
print(f"Pong frame: payload={frame.payload}")from wsproto.frame_protocol import FrameProtocol, MessageDecoder, Opcode
# Assemble fragmented messages
message_decoder = MessageDecoder()
text_buffer = ""
binary_buffer = b""
protocol = FrameProtocol(client=False, extensions=[])
protocol.receive_bytes(fragmented_frame_data)
for frame in protocol.received_frames():
if frame.opcode in (Opcode.TEXT, Opcode.BINARY, Opcode.CONTINUATION):
# Process data frames through message decoder
message_frame = message_decoder.process_frame(frame)
if message_frame.opcode == Opcode.TEXT:
text_buffer += message_frame.payload
if message_frame.message_finished:
print(f"Complete text message: {text_buffer}")
text_buffer = ""
elif message_frame.opcode == Opcode.BINARY:
binary_buffer += message_frame.payload
if message_frame.message_finished:
print(f"Complete binary message: {len(binary_buffer)} bytes")
binary_buffer = b""import struct
from wsproto.frame_protocol import Opcode, RsvBits
def create_custom_frame(opcode: Opcode, payload: bytes, fin: bool = True, masked: bool = True):
"""Create a custom WebSocket frame."""
# Build first byte (FIN + RSV + OPCODE)
first_byte = (int(fin) << 7) | int(opcode)
# Build second byte (MASK + payload length)
payload_len = len(payload)
if payload_len <= 125:
second_byte = (int(masked) << 7) | payload_len
extended_length = b""
elif payload_len <= 65535:
second_byte = (int(masked) << 7) | 126
extended_length = struct.pack("!H", payload_len)
else:
second_byte = (int(masked) << 7) | 127
extended_length = struct.pack("!Q", payload_len)
# Build frame
frame = bytes([first_byte, second_byte]) + extended_length
if masked:
import os
mask = os.urandom(4)
frame += mask
# Apply mask to payload
masked_payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
frame += masked_payload
else:
frame += payload
return frame
# Create custom frames
text_frame = create_custom_frame(Opcode.TEXT, b"Hello", fin=True, masked=True)
binary_frame = create_custom_frame(Opcode.BINARY, b"\x01\x02\x03", fin=True, masked=True)
ping_frame = create_custom_frame(Opcode.PING, b"ping", fin=True, masked=True)from wsproto.frame_protocol import FrameProtocol, ParseFailed, CloseReason
protocol = FrameProtocol(client=False, extensions=[])
try:
protocol.receive_bytes(malformed_frame_data)
for frame in protocol.received_frames():
print(f"Received frame: {frame}")
except ParseFailed as e:
print(f"Frame parsing failed: {e}")
print(f"Suggested close code: {e.code}")
# Generate appropriate close frame
close_frame = protocol.close(e.code, str(e))
# Handle specific error types
if e.code == CloseReason.PROTOCOL_ERROR:
print("Protocol violation detected")
elif e.code == CloseReason.INVALID_FRAME_PAYLOAD_DATA:
print("Invalid payload data")
elif e.code == CloseReason.MESSAGE_TOO_BIG:
print("Message exceeds size limit")from wsproto.frame_protocol import FrameProtocol
import time
class MonitoredFrameProtocol(FrameProtocol):
"""Frame protocol with performance monitoring."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.frame_count = 0
self.bytes_sent = 0
self.bytes_received = 0
self.start_time = time.time()
def send_data(self, payload=b"", fin=True):
frame_bytes = super().send_data(payload, fin)
self.frame_count += 1
self.bytes_sent += len(frame_bytes)
return frame_bytes
def receive_bytes(self, data):
self.bytes_received += len(data)
super().receive_bytes(data)
def get_stats(self):
elapsed = time.time() - self.start_time
return {
'frames_sent': self.frame_count,
'bytes_sent': self.bytes_sent,
'bytes_received': self.bytes_received,
'elapsed_time': elapsed,
'send_rate': self.bytes_sent / elapsed if elapsed > 0 else 0,
'receive_rate': self.bytes_received / elapsed if elapsed > 0 else 0,
}
# Use monitored protocol
protocol = MonitoredFrameProtocol(client=True, extensions=[])
# Send some data
protocol.send_data("Hello")
protocol.send_data(b"Binary data")
protocol.ping(b"ping")
# Check performance
stats = protocol.get_stats()
print(f"Performance stats: {stats}")Install with Tessl CLI
npx tessl i tessl/pypi-wsproto