CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-websockets

An implementation of the WebSocket Protocol (RFC 6455 & 7692)

Overview
Eval results
Files

docs/protocol.md

Protocol Implementation

Sans-I/O WebSocket protocol implementation providing the core WebSocket functionality independent of I/O handling. This enables custom integrations and advanced use cases, and provides the foundation for both the asyncio and synchronous implementations.

Capabilities

Protocol Base Classes

Core protocol implementation that handles WebSocket frame processing, state management, and message assembly without performing any I/O operations.

class Protocol:
    """
    Base WebSocket protocol implementation (Sans-I/O).

    Handles WebSocket frame processing, connection state management,
    and message assembly/disassembly without performing I/O operations.
    The ``send_*`` methods return frame bytes for the caller to transmit,
    and ``receive_data``/``receive_eof`` turn network input into events.
    """

    # NOTE(review): this is an interface listing -- every method body below is
    # only a docstring. The upstream websockets sans-I/O API appears to differ
    # (e.g. it exposes data_to_send()/events_received()); confirm signatures
    # against the installed release before relying on them.

    def __init__(
        self,
        side: Side,
        *,
        logger: LoggerLike | None = None,
        max_size: int = 2**20,
        extensions: List[ExtensionFactory] | None = None
    ) -> None:
        """
        Initialize protocol handler.

        Parameters:
        - side: Side.CLIENT or Side.SERVER
        - logger: Logger instance for protocol logging (optional, keyword-only)
        - max_size: Maximum message size in bytes (defaults to 2**20, i.e. 1 MiB)
        - extensions: List of WebSocket extensions (optional, keyword-only)
        """
    
    @property
    def state(self) -> State:
        """Get current connection state (a ``State`` member)."""
    
    @property
    def side(self) -> Side:
        """Get protocol side (CLIENT or SERVER)."""
    
    @property
    def close_code(self) -> int | None:
        """Get close code if connection is closed; may be None."""
    
    @property
    def close_reason(self) -> str | None:
        """Get close reason if connection is closed; may be None."""
    
    def send_text(self, data: str) -> List[bytes]:
        """
        Create frames for sending text message.

        Nothing is written to the network; the caller transmits the result.

        Parameters:
        - data: Text message to send

        Returns:
        List[bytes]: List of frame bytes to transmit

        Raises:
        - ProtocolError: If connection state doesn't allow sending
        """
    
    def send_binary(self, data: bytes) -> List[bytes]:
        """
        Create frames for sending binary message.

        Nothing is written to the network; the caller transmits the result.

        Parameters:
        - data: Binary message to send

        Returns:
        List[bytes]: List of frame bytes to transmit

        Raises:
        - ProtocolError: If connection state doesn't allow sending
        """
    
    def send_ping(self, data: bytes = b"") -> List[bytes]:
        """
        Create ping frame.

        Parameters:
        - data: Optional ping payload (defaults to empty)

        Returns:
        List[bytes]: List of frame bytes to transmit

        Raises:
        - ProtocolError: If connection state doesn't allow ping
        """
    
    def send_pong(self, data: bytes = b"") -> List[bytes]:
        """
        Create pong frame.

        Parameters:
        - data: Pong payload (should match received ping; defaults to empty)

        Returns:
        List[bytes]: List of frame bytes to transmit

        Raises:
        - ProtocolError: If connection state doesn't allow pong
        """
    
    def send_close(self, code: int = 1000, reason: str = "") -> List[bytes]:
        """
        Create close frame and update connection state.

        Parameters:
        - code: Close code (defaults to 1000, normal closure)
        - reason: Human-readable close reason (defaults to empty)

        Returns:
        List[bytes]: List of frame bytes to transmit

        Raises:
        - ProtocolError: If close code is invalid
        """
    
    def receive_data(self, data: bytes) -> List[Event]:
        """
        Process received data and return events.

        Parameters:
        - data: Raw bytes received from network

        Returns:
        List[Event]: List of protocol events (messages, pings, pongs, close)

        Raises:
        - ProtocolError: If data violates WebSocket protocol
        """
    
    def receive_eof(self) -> List[Event]:
        """
        Handle end-of-file condition.

        Call this when the peer has closed the connection and no more bytes
        will arrive.

        Returns:
        List[Event]: List of protocol events (typically ConnectionClosed)
        """

class ClientProtocol(Protocol):
    """
    WebSocket client protocol implementation.
    
    Handles client-specific protocol behavior including handshake validation
    and client-side frame masking.
    """

    # NOTE(review): unlike Protocol.__init__, no ``side`` argument is accepted
    # here -- presumably it is fixed to Side.CLIENT; confirm. The upstream
    # constructor may also require further arguments (e.g. the target URI).

    def __init__(
        self,
        *,
        logger: LoggerLike | None = None,
        max_size: int = 2**20,
        extensions: List[ClientExtensionFactory] | None = None
    ) -> None:
        """
        Initialize client protocol.

        All parameters are keyword-only.

        Parameters:
        - logger: Logger instance for protocol logging (optional)
        - max_size: Maximum message size in bytes (defaults to 2**20, i.e. 1 MiB)
        - extensions: List of client WebSocket extensions (optional)
        """

class ServerProtocol(Protocol):
    """
    WebSocket server protocol implementation.
    
    Handles server-specific protocol behavior including handshake processing
    and server-side frame handling (no masking).
    """

    # NOTE(review): unlike Protocol.__init__, no ``side`` argument is accepted
    # here -- presumably it is fixed to Side.SERVER; confirm.

    def __init__(
        self,
        *,
        logger: LoggerLike | None = None,
        max_size: int = 2**20,
        extensions: List[ServerExtensionFactory] | None = None
    ) -> None:
        """
        Initialize server protocol.

        All parameters are keyword-only.

        Parameters:
        - logger: Logger instance for protocol logging (optional)
        - max_size: Maximum message size in bytes (defaults to 2**20, i.e. 1 MiB)
        - extensions: List of server WebSocket extensions (optional)
        """

Protocol State Management

Enumerations and utilities for managing WebSocket connection state and side identification.

class Side(Enum):
    """WebSocket connection side identification.

    Passed as the ``side`` argument of ``Protocol.__init__``.
    """
    # NOTE(review): upstream websockets appears to define Side as an IntEnum
    # rather than with string values -- confirm before relying on ``.value``.
    CLIENT = "client"
    SERVER = "server"

class State(Enum):
    """WebSocket connection states, listed in the order a connection moves through them."""
    # NOTE(review): upstream websockets appears to define State as an IntEnum
    # rather than with string values -- confirm before relying on ``.value``.
    CONNECTING = "connecting"  # Initial state during handshake
    OPEN = "open"             # Connection established and ready
    CLOSING = "closing"       # Close frame sent, waiting for close response
    CLOSED = "closed"         # Connection fully closed

Protocol Events

Event types returned by the protocol when processing received data.

class Event:
    """Common ancestor of every event the protocol reports for received data."""

class TextMessage(Event):
    """Text message received event.

    Attributes:
    - data: The message text, stored exactly as passed to the constructor.
    """
    def __init__(self, data: str) -> None:
        # Payload of the received text message.
        self.data = data

class BinaryMessage(Event):
    """Binary message received event.

    Attributes:
    - data: The message bytes, stored exactly as passed to the constructor.
    """
    def __init__(self, data: bytes) -> None:
        # Payload of the received binary message.
        self.data = data

class Ping(Event):
    """Ping frame received event.

    Attributes:
    - data: The ping payload; ``Protocol.send_pong`` documents that a pong
      should carry the payload of the ping it answers.
    """
    def __init__(self, data: bytes) -> None:
        # Payload carried by the ping frame.
        self.data = data

class Pong(Event):
    """Pong frame received event.

    Attributes:
    - data: The pong payload, stored exactly as passed to the constructor.
    """
    def __init__(self, data: bytes) -> None:
        # Payload carried by the pong frame.
        self.data = data

class Close(Event):
    """Close frame received event.

    Attributes:
    - code: Close code sent by the peer (``Protocol.send_close`` uses 1000
      for normal closure).
    - reason: Human-readable close reason sent by the peer.
    """
    def __init__(self, code: int, reason: str) -> None:
        self.code = code
        self.reason = reason

class ConnectionClosed(Event):
    """Connection closed event (EOF or error).

    Attributes:
    - exception: The error associated with the closure, or None when none
      was supplied (the default).
    """
    def __init__(self, exception: Exception | None = None) -> None:
        self.exception = exception

Usage Examples

Basic Protocol Usage

from websockets.protocol import ClientProtocol, Side, State
from websockets.frames import Frame, Opcode

def basic_protocol_example():
    """Walk a client protocol through a text send, a ping, and a close.

    Each step prints how many frames the protocol produced; nothing is
    actually written to a network connection.
    """
    # Cap incoming messages at 1 MiB.
    one_mebibyte = 1024 * 1024
    client = ClientProtocol(max_size=one_mebibyte)

    print(f"Initial state: {client.state}")
    print(f"Protocol side: {client.side}")

    # Outgoing text message.
    text_frames = client.send_text("Hello, WebSocket!")
    print(f"Frames to send: {len(text_frames)}")

    # Outgoing ping with a small payload.
    outgoing_ping = client.send_ping(b"ping-data")
    print(f"Ping frames: {len(outgoing_ping)}")

    # Placeholder for bytes that would normally be read from a socket and
    # handed to the protocol:
    #     events = client.receive_data(incoming)
    incoming = b"..."

    # Begin an orderly shutdown and report where that leaves the protocol.
    closing_frames = client.send_close(1000, "Normal closure")
    print(f"Close frames: {len(closing_frames)}")
    print(f"Final state: {client.state}")

basic_protocol_example()

Custom Protocol Integration

import socket
from websockets.protocol import ServerProtocol
from websockets.exceptions import ProtocolError

class CustomWebSocketServer:
    """Example of custom WebSocket server using Sans-I/O protocol.

    A blocking TCP echo server: bytes read from the socket are fed to a
    ``ServerProtocol`` and the frames it returns are written back. Clients
    are served one at a time, because ``start`` does not accept the next
    connection until ``handle_client`` returns.
    """

    def __init__(self, host: str, port: int):
        """Remember the listening address; no socket is opened until ``start``.

        Parameters:
        - host: Interface or host name to bind to
        - port: TCP port to listen on
        """
        self.host = host
        self.port = port
        self.socket = None  # Listening socket, created by start()

    def start(self):
        """Bind, listen, and serve clients until interrupted with Ctrl-C.

        The listening socket is always closed on the way out, including when
        ``bind`` or ``listen`` raise; such errors still propagate to the caller.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind((self.host, self.port))
            self.socket.listen(5)

            print(f"Custom WebSocket server listening on {self.host}:{self.port}")

            while True:
                try:
                    client_socket, address = self.socket.accept()
                    print(f"Client connected: {address}")

                    # Served inline: the next accept() waits for this client.
                    self.handle_client(client_socket, address)

                except KeyboardInterrupt:
                    break
                except Exception as e:
                    # Keep serving after a per-connection failure.
                    print(f"Server error: {e}")
        finally:
            self.socket.close()

    def handle_client(self, client_socket: socket.socket, address):
        """Handle individual client connection.

        Reads until the peer disconnects, the protocol reports an error, or
        the protocol reaches ``State.CLOSED``. The client socket is always
        closed before returning.

        Parameters:
        - client_socket: Connected socket returned by ``accept``
        - address: Peer address, used only for log output
        """
        # Imported locally, like the event classes in process_event: the
        # snippet's top-level imports do not bring State into scope.
        from websockets.protocol import State

        protocol = ServerProtocol(max_size=64*1024)  # 64KB max message

        try:
            while True:
                # Receive data from client
                data = client_socket.recv(4096)
                if not data:
                    # Client disconnected: let the protocol observe EOF.
                    protocol.receive_eof()
                    break

                # Process data through protocol
                try:
                    events = protocol.receive_data(data)
                except ProtocolError as e:
                    print(f"Protocol error: {e}")
                    break

                # Handle protocol events
                for event in events:
                    for frame_bytes in self.process_event(event, protocol):
                        # sendall() retries until the whole frame is written;
                        # send() may transmit only part of it.
                        client_socket.sendall(frame_bytes)

                # Check if connection should close
                if protocol.state == State.CLOSED:
                    break

        except Exception as e:
            print(f"Client error: {e}")
        finally:
            client_socket.close()
            print(f"Client disconnected: {address}")

    def process_event(self, event, protocol):
        """Process protocol events and return response frames.

        Parameters:
        - event: Event produced by ``protocol.receive_data``
        - protocol: Protocol instance used to build the reply

        Returns:
        Frame bytes to transmit; an empty list for events that need no reply.
        """
        from websockets.protocol import TextMessage, BinaryMessage, Ping, Close

        if isinstance(event, TextMessage):
            # Echo text messages
            return protocol.send_text(f"Echo: {event.data}")

        elif isinstance(event, BinaryMessage):
            # Echo binary messages
            return protocol.send_binary(b"Echo: " + event.data)

        elif isinstance(event, Ping):
            # Respond to pings with pongs carrying the same payload
            return protocol.send_pong(event.data)

        elif isinstance(event, Close):
            # Respond to close frames by echoing the peer's code and reason
            return protocol.send_close(event.code, event.reason)

        return []

# Example usage
def run_custom_server():
    """Run the example echo server on localhost:8765 until interrupted."""
    echo_server = CustomWebSocketServer("localhost", 8765)
    try:
        echo_server.start()
    except KeyboardInterrupt:
        # Ctrl-C arrived outside the server's own accept loop.
        print("\nServer stopped")

# Uncomment to run
# run_custom_server()

Protocol State Machine

from websockets.protocol import ClientProtocol, State
from websockets.exceptions import ProtocolError

def protocol_state_machine_example():
    """Print the protocol state after each of a series of send calls.

    Sends a text message, a ping, and a close frame, then attempts one more
    send, reporting ``protocol.state`` along the way. Any ``ProtocolError``
    raised by the first three sends ends the walk-through with a message.
    """
    client = ClientProtocol()

    def report_state():
        print(f"Current state: {client.state}")

    print("=== Protocol State Machine Example ===")

    # Freshly constructed protocol: expected to report CONNECTING.
    report_state()

    # Moving to OPEN is the connection layer's job once the handshake
    # succeeds. The shortcut below pokes a private attribute -- never do
    # this in real code:
    # client._state = State.OPEN

    try:
        # NOTE(review): because the transition above is commented out, this
        # send is attempted in whatever state the constructor left -- confirm
        # whether the protocol accepts it.
        hello_frames = client.send_text("Hello")
        print(f"Sent text message: {len(hello_frames)} frames")
        report_state()

        # Ping with a short payload.
        probe_frames = client.send_ping(b"test")
        print(f"Sent ping: {len(probe_frames)} frames")
        report_state()

        # Initiate the closing handshake; CLOSING is expected afterwards.
        goodbye_frames = client.send_close(1000, "Normal closure")
        print(f"Sent close frame: {len(goodbye_frames)} frames")
        report_state()

        # A further data frame after close is expected to be rejected.
        try:
            client.send_text("This should fail")
        except ProtocolError as rejected:
            print(f"Expected error: {rejected}")

        # The peer's close reply would normally arrive via receive_data();
        # forcing it would again mean touching a private attribute:
        # client._state = State.CLOSED
        report_state()

    except ProtocolError as failure:
        print(f"Protocol error: {failure}")

protocol_state_machine_example()

Frame Processing Example

from websockets.protocol import ServerProtocol
from websockets.frames import Frame, Opcode
import struct

def build_masked_text_frame(payload: bytes, masking_key: bytes) -> bytes:
    """Encode *payload* as one final, masked TEXT frame.

    Frames travelling from client to server must be masked (RFC 6455,
    section 5.3), so this is the shape a server-side protocol expects.

    Parameters:
    - payload: UTF-8 encoded message body
    - masking_key: Exactly 4 bytes, XOR-ed cyclically over the payload

    Returns:
    bytes: header, extended length (if any), masking key, masked payload

    Raises:
    - ValueError: If masking_key is not 4 bytes long
    """
    if len(masking_key) != 4:
        raise ValueError("masking_key must be exactly 4 bytes")

    # First byte: FIN=1, RSV=000, Opcode=TEXT(1)
    frame = bytearray([0x81])  # 10000001

    # Second byte: MASK bit (0x80) plus the 7-bit length or a length marker.
    size = len(payload)
    if size < 126:
        frame.append(0x80 | size)
    elif size < 65536:
        frame.append(0x80 | 126)
        frame += struct.pack('!H', size)
    else:
        frame.append(0x80 | 127)
        frame += struct.pack('!Q', size)

    # Masking key, then payload byte i XOR key[i mod 4].
    frame += masking_key
    frame += bytes(b ^ masking_key[i % 4] for i, b in enumerate(payload))
    return bytes(frame)


def frame_processing_example():
    """Demonstrate low-level frame processing.

    Builds a masked text frame by hand, as a client would send it, feeds it
    to a ``ServerProtocol`` and prints the data of each resulting event.
    Errors raised while processing are printed rather than propagated.
    """
    # Local import: only this example needs a random masking key.
    import secrets

    protocol = ServerProtocol()

    # Create a simple text frame manually (for demonstration)
    message = "Hello, WebSocket!"
    message_bytes = message.encode('utf-8')

    # A fresh, unpredictable key is required for every client frame.
    frame_data = build_masked_text_frame(message_bytes, secrets.token_bytes(4))

    print(f"Created frame: {len(frame_data)} bytes")
    print(f"Frame hex: {frame_data.hex()}")

    # Process frame through protocol
    try:
        events = protocol.receive_data(frame_data)
        print(f"Generated {len(events)} events")

        for event in events:
            # Not every event type carries a payload.
            if hasattr(event, 'data'):
                print(f"Event data: {event.data}")

    except Exception as e:
        print(f"Frame processing error: {e}")

frame_processing_example()

Extension Integration

from websockets.protocol import ClientProtocol
from websockets.extensions.base import Extension

class SimpleExtension(Extension):
    """Example custom extension.

    Tags outgoing text frames with a ``[name] `` prefix and strips that
    prefix from incoming text frames. Frames of any other opcode pass
    through unchanged.
    """

    # Opcode value that identifies a text frame.
    _TEXT_OPCODE = 1

    def __init__(self, name: str):
        """Store *name*, which determines the prefix added to text frames."""
        super().__init__()
        self.name = name

    @property
    def _prefix(self) -> bytes:
        """Encoded ``[name] `` marker shared by encode() and decode()."""
        return f"[{self.name}] ".encode()

    @staticmethod
    def _with_data(frame, data):
        """Return a copy of *frame* whose payload is *data*.

        Dataclass frames are copied with ``dataclasses.replace``; frames that
        are not dataclasses fall back to the NamedTuple-style ``_replace``.
        """
        import dataclasses

        if dataclasses.is_dataclass(frame):
            return dataclasses.replace(frame, data=data)
        return frame._replace(data=data)

    def encode(self, frame):
        """Process outgoing frames: prepend the prefix to text frames."""
        if frame.opcode == self._TEXT_OPCODE:
            return self._with_data(frame, self._prefix + frame.data)
        return frame

    def decode(self, frame, *, max_size=None):
        """Process incoming frames: strip the prefix from text frames.

        Parameters:
        - frame: Incoming frame
        - max_size: Accepted so the method can be called with a size limit;
          unused here because decoding never grows the payload.

        Returns:
        The frame without the prefix, or the original frame object when it is
        not a text frame or does not start with the prefix.
        """
        if frame.opcode == self._TEXT_OPCODE:
            prefix = self._prefix
            if frame.data.startswith(prefix):
                return self._with_data(frame, frame.data[len(prefix):])
        return frame

def extension_example():
    """Show a custom extension object being created alongside a protocol.

    The extension is only instantiated and described; it is not attached to
    the protocol, because real extensions are negotiated during the
    handshake and wired in by the connection layer.
    """
    custom_ext = SimpleExtension("MyExt")

    # Built without the extension: genuine integration takes more than
    # passing the object in.
    client = ClientProtocol()

    print("Extension integration example")
    print(f"Extension name: {custom_ext.name}")

extension_example()

Install with Tessl CLI

npx tessl i tessl/pypi-websockets

docs

asyncio-client.md

asyncio-server.md

data-structures.md

exceptions.md

extensions.md

index.md

protocol.md

routing.md

sync-client.md

sync-server.md

tile.json