An implementation of the WebSocket Protocol (RFC 6455 & 7692)
Sans-I/O WebSocket protocol implementation providing the core WebSocket functionality independent of I/O handling. This enables custom integrations, advanced use cases, and provides the foundation for both asyncio and synchronous implementations.
Core protocol implementation that handles WebSocket frame processing, state management, and message assembly without performing any I/O operations.
class Protocol:
"""
Base WebSocket protocol implementation (Sans-I/O).
Handles WebSocket frame processing, connection state management,
and message assembly/disassembly without performing I/O operations.
"""
def __init__(
self,
side: Side,
*,
logger: LoggerLike = None,
max_size: int = 2**20,
extensions: List[ExtensionFactory] = None
):
"""
Initialize protocol handler.
Parameters:
- side: Side.CLIENT or Side.SERVER
- logger: Logger instance for protocol logging
- max_size: Maximum message size (bytes)
- extensions: List of WebSocket extensions
"""
@property
def state(self) -> State:
"""Get current connection state."""
@property
def side(self) -> Side:
"""Get protocol side (CLIENT or SERVER)."""
@property
def close_code(self) -> int | None:
"""Get close code if connection is closed."""
@property
def close_reason(self) -> str | None:
"""Get close reason if connection is closed."""
def send_text(self, data: str) -> List[bytes]:
"""
Create frames for sending text message.
Parameters:
- data: Text message to send
Returns:
List[bytes]: List of frame bytes to transmit
Raises:
- ProtocolError: If connection state doesn't allow sending
"""
def send_binary(self, data: bytes) -> List[bytes]:
"""
Create frames for sending binary message.
Parameters:
- data: Binary message to send
Returns:
List[bytes]: List of frame bytes to transmit
Raises:
- ProtocolError: If connection state doesn't allow sending
"""
def send_ping(self, data: bytes = b"") -> List[bytes]:
"""
Create ping frame.
Parameters:
- data: Optional ping payload
Returns:
List[bytes]: List of frame bytes to transmit
Raises:
- ProtocolError: If connection state doesn't allow ping
"""
def send_pong(self, data: bytes = b"") -> List[bytes]:
"""
Create pong frame.
Parameters:
- data: Pong payload (should match received ping)
Returns:
List[bytes]: List of frame bytes to transmit
Raises:
- ProtocolError: If connection state doesn't allow pong
"""
def send_close(self, code: int = 1000, reason: str = "") -> List[bytes]:
"""
Create close frame and update connection state.
Parameters:
- code: Close code (1000 for normal closure)
- reason: Human-readable close reason
Returns:
List[bytes]: List of frame bytes to transmit
Raises:
- ProtocolError: If close code is invalid
"""
def receive_data(self, data: bytes) -> List[Event]:
"""
Process received data and return events.
Parameters:
- data: Raw bytes received from network
Returns:
List[Event]: List of protocol events (messages, pings, pongs, close)
Raises:
- ProtocolError: If data violates WebSocket protocol
"""
def receive_eof(self) -> List[Event]:
"""
Handle end-of-file condition.
Returns:
List[Event]: List of protocol events (typically ConnectionClosed)
"""
class ClientProtocol(Protocol):
"""
WebSocket client protocol implementation.
Handles client-specific protocol behavior including handshake validation
and client-side frame masking.
"""
def __init__(
self,
*,
logger: LoggerLike = None,
max_size: int = 2**20,
extensions: List[ClientExtensionFactory] = None
):
"""
Initialize client protocol.
Parameters:
- logger: Logger instance for protocol logging
- max_size: Maximum message size (bytes)
- extensions: List of client WebSocket extensions
"""
class ServerProtocol(Protocol):
"""
WebSocket server protocol implementation.
Handles server-specific protocol behavior including handshake processing
and server-side frame handling (no masking).
"""
def __init__(
self,
*,
logger: LoggerLike = None,
max_size: int = 2**20,
extensions: List[ServerExtensionFactory] = None
):
"""
Initialize server protocol.
Parameters:
- logger: Logger instance for protocol logging
- max_size: Maximum message size (bytes)
- extensions: List of server WebSocket extensions
"""Enumerations and utilities for managing WebSocket connection state and side identification.
class Side(Enum):
"""WebSocket connection side identification."""
CLIENT = "client"
SERVER = "server"
class State(Enum):
"""WebSocket connection states."""
CONNECTING = "connecting" # Initial state during handshake
OPEN = "open" # Connection established and ready
CLOSING = "closing" # Close frame sent, waiting for close response
CLOSED = "closed" # Connection fully closedEvent types returned by the protocol when processing received data.
class Event:
"""Base class for protocol events."""
pass
class TextMessage(Event):
"""Text message received event."""
def __init__(self, data: str):
self.data = data
class BinaryMessage(Event):
"""Binary message received event."""
def __init__(self, data: bytes):
self.data = data
class Ping(Event):
"""Ping frame received event."""
def __init__(self, data: bytes):
self.data = data
class Pong(Event):
"""Pong frame received event."""
def __init__(self, data: bytes):
self.data = data
class Close(Event):
"""Close frame received event."""
def __init__(self, code: int, reason: str):
self.code = code
self.reason = reason
class ConnectionClosed(Event):
"""Connection closed event (EOF or error)."""
def __init__(self, exception: Exception = None):
self.exception = exceptionfrom websockets.protocol import ClientProtocol, Side, State
from websockets.frames import Frame, Opcode
def basic_protocol_example():
"""Demonstrate basic protocol usage."""
# Create client protocol
protocol = ClientProtocol(max_size=1024*1024) # 1MB max message
print(f"Initial state: {protocol.state}")
print(f"Protocol side: {protocol.side}")
# Send a text message
frames_to_send = protocol.send_text("Hello, WebSocket!")
print(f"Frames to send: {len(frames_to_send)}")
# Send a ping
ping_frames = protocol.send_ping(b"ping-data")
print(f"Ping frames: {len(ping_frames)}")
# Simulate receiving data (this would come from network)
# In real usage, you'd receive actual bytes from socket
received_data = b"..." # Raw frame bytes
# events = protocol.receive_data(received_data)
# Close connection
close_frames = protocol.send_close(1000, "Normal closure")
print(f"Close frames: {len(close_frames)}")
print(f"Final state: {protocol.state}")
basic_protocol_example()import socket
from websockets.protocol import ServerProtocol
from websockets.exceptions import ProtocolError
class CustomWebSocketServer:
"""Example of custom WebSocket server using Sans-I/O protocol."""
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.socket = None
def start(self):
"""Start the custom server."""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(5)
print(f"Custom WebSocket server listening on {self.host}:{self.port}")
while True:
try:
client_socket, address = self.socket.accept()
print(f"Client connected: {address}")
# Handle client in separate method
self.handle_client(client_socket, address)
except KeyboardInterrupt:
break
except Exception as e:
print(f"Server error: {e}")
self.socket.close()
def handle_client(self, client_socket: socket.socket, address):
"""Handle individual client connection."""
protocol = ServerProtocol(max_size=64*1024) # 64KB max message
try:
while True:
# Receive data from client
data = client_socket.recv(4096)
if not data:
# Client disconnected
events = protocol.receive_eof()
break
# Process data through protocol
try:
events = protocol.receive_data(data)
except ProtocolError as e:
print(f"Protocol error: {e}")
break
# Handle protocol events
for event in events:
response_frames = self.process_event(event, protocol)
# Send response frames
for frame_bytes in response_frames:
client_socket.send(frame_bytes)
# Check if connection should close
if protocol.state == State.CLOSED:
break
except Exception as e:
print(f"Client error: {e}")
finally:
client_socket.close()
print(f"Client disconnected: {address}")
def process_event(self, event, protocol):
"""Process protocol events and return response frames."""
from websockets.protocol import TextMessage, BinaryMessage, Ping, Close
if isinstance(event, TextMessage):
# Echo text messages
return protocol.send_text(f"Echo: {event.data}")
elif isinstance(event, BinaryMessage):
# Echo binary messages
return protocol.send_binary(b"Echo: " + event.data)
elif isinstance(event, Ping):
# Respond to pings with pongs
return protocol.send_pong(event.data)
elif isinstance(event, Close):
# Respond to close frames
return protocol.send_close(event.code, event.reason)
return []
# Example usage
def run_custom_server():
server = CustomWebSocketServer("localhost", 8765)
try:
server.start()
except KeyboardInterrupt:
print("\nServer stopped")
# Uncomment to run
# run_custom_server()from websockets.protocol import ClientProtocol, State
from websockets.exceptions import ProtocolError
def protocol_state_machine_example():
"""Demonstrate protocol state transitions."""
protocol = ClientProtocol()
def print_state():
print(f"Current state: {protocol.state}")
print("=== Protocol State Machine Example ===")
# Initial state
print_state() # Should be CONNECTING initially
# Simulate successful handshake (this would be done by connection layer)
# protocol._state = State.OPEN # Private attribute, don't do this in real code
try:
# Try to send message in OPEN state
frames = protocol.send_text("Hello")
print(f"Sent text message: {len(frames)} frames")
print_state()
# Send ping
ping_frames = protocol.send_ping(b"test")
print(f"Sent ping: {len(ping_frames)} frames")
print_state()
# Start closing
close_frames = protocol.send_close(1000, "Normal closure")
print(f"Sent close frame: {len(close_frames)} frames")
print_state() # Should be CLOSING
# Try to send after close (should fail)
try:
protocol.send_text("This should fail")
except ProtocolError as e:
print(f"Expected error: {e}")
# Simulate receiving close response
# In real usage, this would come from receive_data()
# protocol._state = State.CLOSED
print_state()
except ProtocolError as e:
print(f"Protocol error: {e}")
protocol_state_machine_example()from websockets.protocol import ServerProtocol
from websockets.frames import Frame, Opcode
import struct
def frame_processing_example():
"""Demonstrate low-level frame processing."""
protocol = ServerProtocol()
# Create a simple text frame manually (for demonstration)
message = "Hello, WebSocket!"
message_bytes = message.encode('utf-8')
# WebSocket frame format (simplified)
frame_data = bytearray()
# First byte: FIN=1, RSV=000, Opcode=TEXT(1)
frame_data.append(0x81) # 10000001
# Payload length
if len(message_bytes) < 126:
frame_data.append(len(message_bytes))
elif len(message_bytes) < 65536:
frame_data.append(126)
frame_data.extend(struct.pack('!H', len(message_bytes)))
else:
frame_data.append(127)
frame_data.extend(struct.pack('!Q', len(message_bytes)))
# Payload data
frame_data.extend(message_bytes)
print(f"Created frame: {len(frame_data)} bytes")
print(f"Frame hex: {frame_data.hex()}")
# Process frame through protocol
try:
events = protocol.receive_data(bytes(frame_data))
print(f"Generated {len(events)} events")
for event in events:
if hasattr(event, 'data'):
print(f"Event data: {event.data}")
except Exception as e:
print(f"Frame processing error: {e}")
frame_processing_example()from websockets.protocol import ClientProtocol
from websockets.extensions.base import Extension
class SimpleExtension(Extension):
"""Example custom extension."""
def __init__(self, name: str):
super().__init__()
self.name = name
def encode(self, frame):
"""Process outgoing frames."""
# Simple example: add prefix to text frames
if frame.opcode == 1: # TEXT frame
modified_data = f"[{self.name}] ".encode() + frame.data
return frame._replace(data=modified_data)
return frame
def decode(self, frame):
"""Process incoming frames."""
# Simple example: remove prefix from text frames
if frame.opcode == 1: # TEXT frame
prefix = f"[{self.name}] ".encode()
if frame.data.startswith(prefix):
modified_data = frame.data[len(prefix):]
return frame._replace(data=modified_data)
return frame
def extension_example():
"""Demonstrate protocol with custom extensions."""
# Create extension
extension = SimpleExtension("MyExt")
# Create protocol with extension
# Note: Real extension integration is more complex
protocol = ClientProtocol()
print("Extension integration example")
print(f"Extension name: {extension.name}")
# In a real implementation, extensions would be negotiated
# during handshake and integrated into the protocol
extension_example()Install with Tessl CLI
npx tessl i tessl/pypi-websockets