CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-wsproto

WebSockets state-machine based protocol implementation

Overview
Eval results
Files

low-level-protocol.mddocs/

Low-Level Protocol

Direct frame protocol access for advanced use cases requiring fine-grained control over WebSocket frame generation and parsing. This module provides the underlying protocol implementation that powers the high-level WSConnection interface.

Capabilities

Frame Protocol

Core WebSocket frame protocol implementation handling frame parsing, generation, and message assembly.

class FrameProtocol:
    """
    Complete frame protocol implementation for WebSocket connections.
    """
    
    def __init__(self, client: bool, extensions: List[Extension]) -> None:
        """
        Initialize frame protocol.
        
        Args:
            client: True if this is a client connection, False for server
            extensions: List of enabled extensions
        """
    
    def receive_bytes(self, data: bytes) -> None:
        """
        Feed received bytes into the protocol for parsing.
        
        Args:
            data: Raw bytes received from network
        """
    
    def received_frames(self) -> Generator[Frame, None, None]:
        """
        Generator yielding parsed frames from received data.
        
        Yields:
            Frame objects representing complete WebSocket frames
        """
    
    def send_data(
        self, payload: Union[bytes, bytearray, str] = b"", fin: bool = True
    ) -> bytes:
        """
        Generate data frame bytes for transmission.
        
        Args:
            payload: Data to send (bytes for binary, str for text)
            fin: Whether this completes the message
            
        Returns:
            Raw frame bytes to send over network
            
        Raises:
            ValueError: If payload type is invalid
            TypeError: If data type changes within a fragmented message
        """
    
    def ping(self, payload: bytes = b"") -> bytes:
        """
        Generate ping frame bytes.
        
        Args:
            payload: Optional ping payload (max 125 bytes)
            
        Returns:
            Raw ping frame bytes to send
        """
    
    def pong(self, payload: bytes = b"") -> bytes:
        """
        Generate pong frame bytes.
        
        Args:
            payload: Optional pong payload (max 125 bytes)
            
        Returns:
            Raw pong frame bytes to send
        """
    
    def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> bytes:
        """
        Generate close frame bytes.
        
        Args:
            code: Close code (see CloseReason enum)
            reason: Optional close reason string
            
        Returns:
            Raw close frame bytes to send
            
        Raises:
            TypeError: If reason provided without code
        """

Frame Data Structures

Core data structures representing WebSocket frames and their components.

class Frame(NamedTuple):
    """
    Complete frame information.
    """
    opcode: Opcode  # Frame opcode (TEXT, BINARY, CLOSE, etc.)
    payload: Union[bytes, str, Tuple[int, str]]  # Frame payload data
    frame_finished: bool  # Whether this frame is complete
    message_finished: bool  # Whether this completes the message

class Header(NamedTuple):
    """
    Frame header information.
    """
    fin: bool  # FIN bit - whether frame completes message
    rsv: RsvBits  # Reserved bits (used by extensions)
    opcode: Opcode  # Frame opcode
    payload_len: int  # Payload length
    masking_key: Optional[bytes]  # Masking key (client frames only)

class RsvBits(NamedTuple):
    """
    Reserved bits in frame header.
    """
    rsv1: bool  # RSV1 bit (used by extensions like permessage-deflate)
    rsv2: bool  # RSV2 bit (reserved for future use)
    rsv3: bool  # RSV3 bit (reserved for future use)

Frame Opcodes

Enumeration of WebSocket frame opcodes as defined in RFC 6455.

class Opcode(IntEnum):
    """
    RFC 6455, Section 5.2 - Base Framing Protocol opcodes.
    """
    
    CONTINUATION = 0x0  # Continuation frame for fragmented messages
    TEXT = 0x1          # Text message frame
    BINARY = 0x2        # Binary message frame
    CLOSE = 0x8         # Connection close frame
    PING = 0x9          # Ping control frame
    PONG = 0xA          # Pong control frame
    
    def iscontrol(self) -> bool:
        """
        Check if this opcode represents a control frame.
        
        Returns:
            True if this is a control frame (CLOSE, PING, PONG)
        """

Close Reason Codes

Standard WebSocket close codes as defined in RFC 6455 Section 7.4.1.

class CloseReason(IntEnum):
    """
    RFC 6455, Section 7.4.1 - Defined Status Codes.
    """
    
    # Standard close codes
    NORMAL_CLOSURE = 1000           # Normal closure
    GOING_AWAY = 1001              # Endpoint going away
    PROTOCOL_ERROR = 1002          # Protocol error
    UNSUPPORTED_DATA = 1003        # Unsupported data type
    NO_STATUS_RCVD = 1005          # No status received (reserved)
    ABNORMAL_CLOSURE = 1006        # Abnormal closure (reserved)
    INVALID_FRAME_PAYLOAD_DATA = 1007  # Invalid frame payload
    POLICY_VIOLATION = 1008        # Policy violation
    MESSAGE_TOO_BIG = 1009         # Message too big
    MANDATORY_EXT = 1010           # Mandatory extension missing
    INTERNAL_ERROR = 1011          # Internal server error
    SERVICE_RESTART = 1012         # Service restart (non-RFC)
    TRY_AGAIN_LATER = 1013         # Try again later (non-RFC)
    TLS_HANDSHAKE_FAILED = 1015    # TLS handshake failed (reserved)

Frame Decoder

Low-level frame decoder for parsing raw WebSocket frame data.

class FrameDecoder:
    """
    Low-level WebSocket frame decoder.
    """
    
    def __init__(
        self, client: bool, extensions: Optional[List[Extension]] = None
    ) -> None:
        """
        Initialize frame decoder.
        
        Args:
            client: True if decoding client frames, False for server frames
            extensions: List of extensions for frame processing
        """
    
    def receive_bytes(self, data: bytes) -> None:
        """
        Feed bytes to the decoder.
        
        Args:
            data: Raw bytes from network
        """
    
    def process_buffer(self) -> Optional[Frame]:
        """
        Process buffered data and return a frame if complete.
        
        Returns:
            Complete Frame object or None if more data needed
            
        Raises:
            ParseFailed: If frame parsing fails
        """

Protocol Exceptions

Exceptions raised during frame protocol operations.

class ParseFailed(Exception):
    """
    Exception raised when frame parsing fails.
    """
    
    def __init__(
        self, msg: str, code: CloseReason = CloseReason.PROTOCOL_ERROR
    ) -> None:
        """
        Initialize parse failure exception.
        
        Args:
            msg: Error message
            code: Associated close code for the error
        """
        self.code: CloseReason  # Close code associated with the error

Message Decoder

Helper class for assembling fragmented WebSocket messages from individual frames.

class MessageDecoder:
    """
    Decoder for assembling WebSocket messages from frames.
    
    Handles fragmented messages by buffering frame data and reconstructing
    complete messages, including UTF-8 decoding for text messages.
    """
    
    def __init__(self) -> None:
        """Initialize message decoder."""
    
    def process_frame(self, frame: Frame) -> Frame:
        """
        Process a frame and return assembled message data.
        
        Handles continuation frames and message assembly, including
        UTF-8 decoding for text messages.
        
        Args:
            frame: Frame to process
            
        Returns:
            Frame with assembled message data
            
        Raises:
            ParseFailed: If frame sequence is invalid or UTF-8 decoding fails
        """

Constants

import struct
# Payload length constants
PAYLOAD_LENGTH_TWO_BYTE = 126    # Indicator for 2-byte length
PAYLOAD_LENGTH_EIGHT_BYTE = 127  # Indicator for 8-byte length
MAX_PAYLOAD_NORMAL = 125         # Maximum single-byte payload length
MAX_PAYLOAD_TWO_BYTE = 65535     # Maximum 2-byte payload length (2^16 - 1)
MAX_PAYLOAD_EIGHT_BYTE = 2**64 - 1  # Maximum 8-byte payload length
MAX_FRAME_PAYLOAD = MAX_PAYLOAD_EIGHT_BYTE  # Alias for maximum frame payload

# Frame header bit masks
FIN_MASK = 0x80      # FIN bit mask
RSV1_MASK = 0x40     # RSV1 bit mask
RSV2_MASK = 0x20     # RSV2 bit mask
RSV3_MASK = 0x10     # RSV3 bit mask
OPCODE_MASK = 0x0F   # Opcode mask
MASK_MASK = 0x80     # Mask bit mask
PAYLOAD_LEN_MASK = 0x7F  # Payload length mask

# WebSocket protocol version
WEBSOCKET_VERSION = b"13"  # RFC 6455 WebSocket version

# Close code ranges
MIN_CLOSE_REASON = 1000              # Minimum valid close code
MIN_PROTOCOL_CLOSE_REASON = 1000     # Minimum protocol-defined close code
MAX_PROTOCOL_CLOSE_REASON = 2999     # Maximum protocol-defined close code
MIN_LIBRARY_CLOSE_REASON = 3000      # Minimum library-defined close code
MAX_LIBRARY_CLOSE_REASON = 3999      # Maximum library-defined close code
MIN_PRIVATE_CLOSE_REASON = 4000      # Minimum private close code
MAX_PRIVATE_CLOSE_REASON = 4999      # Maximum private close code
MAX_CLOSE_REASON = 4999              # Maximum valid close code

LOCAL_ONLY_CLOSE_REASONS = (         # Codes that must not appear on wire
    CloseReason.NO_STATUS_RCVD,
    CloseReason.ABNORMAL_CLOSURE,
    CloseReason.TLS_HANDSHAKE_FAILED,
)

# WebSocket accept GUID
ACCEPT_GUID = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"  # RFC 6455 magic string

# Null mask for server frames
NULL_MASK = struct.pack("!I", 0)

Usage Examples

Direct Frame Protocol Usage

from wsproto.frame_protocol import FrameProtocol, Opcode, CloseReason
from wsproto.extensions import PerMessageDeflate

# Create frame protocol instance
extensions = [PerMessageDeflate()]
protocol = FrameProtocol(client=True, extensions=extensions)

# Send text data
text_frame_bytes = protocol.send_data("Hello, WebSocket!", fin=True)
print(f"Text frame: {len(text_frame_bytes)} bytes")

# Send binary data
binary_frame_bytes = protocol.send_data(b"Binary data", fin=True)
print(f"Binary frame: {len(binary_frame_bytes)} bytes")

# Send fragmented message
fragment1_bytes = protocol.send_data("Start of ", fin=False)
fragment2_bytes = protocol.send_data("fragmented message", fin=True)

# Send control frames
ping_bytes = protocol.ping(b"ping-payload")
pong_bytes = protocol.pong(b"pong-payload")
close_bytes = protocol.close(CloseReason.NORMAL_CLOSURE, "Goodbye")

Frame Parsing

from wsproto.frame_protocol import FrameProtocol, FrameDecoder, Opcode

# Create frame decoder
decoder = FrameDecoder(client=False, extensions=[])

# Process incoming frame data
decoder.receive_bytes(frame_data_chunk1)
decoder.receive_bytes(frame_data_chunk2)

# Try to parse complete frames
frame = decoder.process_buffer()
if frame:
    print(f"Received frame: opcode={frame.opcode}")
    print(f"Payload: {frame.payload}")
    print(f"Frame finished: {frame.frame_finished}")
    print(f"Message finished: {frame.message_finished}")
    
    # Handle different frame types
    if frame.opcode == Opcode.TEXT:
        print(f"Text message: {frame.payload}")
    elif frame.opcode == Opcode.BINARY:
        print(f"Binary message: {len(frame.payload)} bytes")
    elif frame.opcode == Opcode.CLOSE:
        code, reason = frame.payload
        print(f"Close frame: code={code}, reason='{reason}'")
    elif frame.opcode == Opcode.PING:
        print(f"Ping frame: payload={frame.payload}")
    elif frame.opcode == Opcode.PONG:
        print(f"Pong frame: payload={frame.payload}")

Message Assembly

from wsproto.frame_protocol import FrameProtocol, MessageDecoder, Opcode

# Assemble fragmented messages
message_decoder = MessageDecoder()
text_buffer = ""
binary_buffer = b""

protocol = FrameProtocol(client=False, extensions=[])
protocol.receive_bytes(fragmented_frame_data)

for frame in protocol.received_frames():
    if frame.opcode in (Opcode.TEXT, Opcode.BINARY, Opcode.CONTINUATION):
        # Process data frames through message decoder
        message_frame = message_decoder.process_frame(frame)
        
        if message_frame.opcode == Opcode.TEXT:
            text_buffer += message_frame.payload
            if message_frame.message_finished:
                print(f"Complete text message: {text_buffer}")
                text_buffer = ""
                
        elif message_frame.opcode == Opcode.BINARY:
            binary_buffer += message_frame.payload
            if message_frame.message_finished:
                print(f"Complete binary message: {len(binary_buffer)} bytes")
                binary_buffer = b""

Custom Frame Generation

import struct
from wsproto.frame_protocol import Opcode, RsvBits

def create_custom_frame(opcode: Opcode, payload: bytes, fin: bool = True, masked: bool = True):
    """Create a custom WebSocket frame."""
    # Build first byte (FIN + RSV + OPCODE)
    first_byte = (int(fin) << 7) | int(opcode)
    
    # Build second byte (MASK + payload length)
    payload_len = len(payload)
    if payload_len <= 125:
        second_byte = (int(masked) << 7) | payload_len
        extended_length = b""
    elif payload_len <= 65535:
        second_byte = (int(masked) << 7) | 126
        extended_length = struct.pack("!H", payload_len)
    else:
        second_byte = (int(masked) << 7) | 127
        extended_length = struct.pack("!Q", payload_len)
    
    # Build frame
    frame = bytes([first_byte, second_byte]) + extended_length
    
    if masked:
        import os
        mask = os.urandom(4)
        frame += mask
        # Apply mask to payload
        masked_payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
        frame += masked_payload
    else:
        frame += payload
    
    return frame

# Create custom frames
text_frame = create_custom_frame(Opcode.TEXT, b"Hello", fin=True, masked=True)
binary_frame = create_custom_frame(Opcode.BINARY, b"\x01\x02\x03", fin=True, masked=True)
ping_frame = create_custom_frame(Opcode.PING, b"ping", fin=True, masked=True)

Error Handling

from wsproto.frame_protocol import FrameProtocol, ParseFailed, CloseReason

protocol = FrameProtocol(client=False, extensions=[])

try:
    protocol.receive_bytes(malformed_frame_data)
    for frame in protocol.received_frames():
        print(f"Received frame: {frame}")
        
except ParseFailed as e:
    print(f"Frame parsing failed: {e}")
    print(f"Suggested close code: {e.code}")
    
    # Generate appropriate close frame
    close_frame = protocol.close(e.code, str(e))
    
    # Handle specific error types
    if e.code == CloseReason.PROTOCOL_ERROR:
        print("Protocol violation detected")
    elif e.code == CloseReason.INVALID_FRAME_PAYLOAD_DATA:
        print("Invalid payload data")
    elif e.code == CloseReason.MESSAGE_TOO_BIG:
        print("Message exceeds size limit")

Performance Monitoring

from wsproto.frame_protocol import FrameProtocol
import time

class MonitoredFrameProtocol(FrameProtocol):
    """Frame protocol with performance monitoring."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.frame_count = 0
        self.bytes_sent = 0
        self.bytes_received = 0
        self.start_time = time.time()
    
    def send_data(self, payload=b"", fin=True):
        frame_bytes = super().send_data(payload, fin)
        self.frame_count += 1
        self.bytes_sent += len(frame_bytes)
        return frame_bytes
    
    def receive_bytes(self, data):
        self.bytes_received += len(data)
        super().receive_bytes(data)
    
    def get_stats(self):
        elapsed = time.time() - self.start_time
        return {
            'frames_sent': self.frame_count,
            'bytes_sent': self.bytes_sent,
            'bytes_received': self.bytes_received,
            'elapsed_time': elapsed,
            'send_rate': self.bytes_sent / elapsed if elapsed > 0 else 0,
            'receive_rate': self.bytes_received / elapsed if elapsed > 0 else 0,
        }

# Use monitored protocol
protocol = MonitoredFrameProtocol(client=True, extensions=[])

# Send some data
protocol.send_data("Hello")
protocol.send_data(b"Binary data")
protocol.ping(b"ping")

# Check performance
stats = protocol.get_stats()
print(f"Performance stats: {stats}")

Install with Tessl CLI

npx tessl i tessl/pypi-wsproto

docs

connection-management.md

event-system.md

extensions.md

index.md

low-level-protocol.md

tile.json