CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-wsproto

WebSockets state-machine based protocol implementation

Overview
Eval results
Files

event-system.mddocs/

Event System

Comprehensive event-driven API covering all WebSocket operations including handshake, messages, control frames, and connection lifecycle. The event system provides a type-safe, structured approach to WebSocket protocol handling.

Capabilities

Base Event Class

All wsproto events inherit from the base Event class, providing a common interface for protocol operations.

class Event(ABC):
    """
    Base class for wsproto events.
    """
    pass

Handshake Events

Events related to WebSocket connection establishment and rejection.

@dataclass(frozen=True)
class Request(Event):
    """
    The beginning of a WebSocket connection, the HTTP Upgrade request.
    
    This event is fired when a SERVER connection receives a WebSocket
    handshake request (HTTP with upgrade header).
    """
    host: str  # Required hostname or host header value
    target: str  # Required request target (path and query string)
    extensions: Union[Sequence[Extension], Sequence[str]] = field(default_factory=list)  # Proposed extensions
    extra_headers: Headers = field(default_factory=list)  # Additional request headers
    subprotocols: List[str] = field(default_factory=list)  # Proposed subprotocols list

@dataclass(frozen=True)
class AcceptConnection(Event):
    """
    The acceptance of a WebSocket upgrade request.
    
    This event is fired when a CLIENT receives an acceptance response
    from a server. It is also used to accept an upgrade request when
    acting as a SERVER.
    """
    subprotocol: Optional[str] = None  # The accepted subprotocol to use
    extensions: List[Extension] = field(default_factory=list)  # Accepted extensions
    extra_headers: Headers = field(default_factory=list)  # Additional response headers

@dataclass(frozen=True)
class RejectConnection(Event):
    """
    The rejection of a WebSocket upgrade request, the HTTP response.
    
    The RejectConnection event sends appropriate HTTP headers to communicate
    that the handshake has been rejected. You may send an HTTP body by setting
    has_body to True and following with RejectData events.
    """
    status_code: int = 400  # HTTP response status code
    headers: Headers = field(default_factory=list)  # Response headers
    has_body: bool = False  # Whether response has body (see RejectData)

@dataclass(frozen=True)
class RejectData(Event):
    """
    The rejection HTTP response body.
    
    The caller may send multiple RejectData events. The final event should
    have the body_finished attribute set to True.
    """
    data: bytes  # Required raw body data
    body_finished: bool = True  # True if this is the final chunk of body data

Message Events

Events for WebSocket data messages, supporting both text and binary data with fragmentation control.

@dataclass(frozen=True)
class Message(Event, Generic[T]):
    """
    The WebSocket data message base class.
    """
    data: T  # Required message data (str for text, bytes for binary)
    frame_finished: bool = True  # Whether frame is finished (for fragmentation)
    message_finished: bool = True  # True if this is the last frame of message

@dataclass(frozen=True)
class TextMessage(Message[str]):
    """
    This event is fired when a data frame with TEXT payload is received.
    
    The data represents a single chunk and may not be a complete WebSocket 
    message. You need to buffer and reassemble chunks using message_finished
    to get the full message.
    """
    data: str  # Text message data as string

@dataclass(frozen=True)
class BytesMessage(Message[bytes]):
    """
    This event is fired when a data frame with BINARY payload is received.
    
    The data represents a single chunk and may not be a complete WebSocket
    message. You need to buffer and reassemble chunks using message_finished
    to get the full message.
    """
    data: bytes  # Binary message data as bytes

Control Frame Events

Events for WebSocket control frames including connection close, ping, and pong.

@dataclass(frozen=True)
class CloseConnection(Event):
    """
    The end of a WebSocket connection, represents a closure frame.
    
    wsproto does not automatically send a response to a close event. To comply
    with the RFC you MUST send a close event back to the remote WebSocket if
    you have not already sent one.
    """
    code: int  # Required integer close code indicating why connection closed
    reason: Optional[str] = None  # Optional additional reasoning for closure
    
    def response(self) -> "CloseConnection":
        """
        Generate an RFC-compliant close frame to send back to the peer.
        
        Returns:
            CloseConnection event with same code and reason
        """

@dataclass(frozen=True)
class Ping(Event):
    """
    The Ping event can be sent to trigger a ping frame and is fired when received.
    
    wsproto does not automatically send a pong response to a ping event. To comply
    with the RFC you MUST send a pong event as soon as practical.
    """
    payload: bytes = b""  # Optional payload to emit with the ping frame
    
    def response(self) -> "Pong":
        """
        Generate an RFC-compliant Pong response to this ping.
        
        Returns:
            Pong event with same payload
        """

@dataclass(frozen=True)
class Pong(Event):
    """
    The Pong event is fired when a Pong is received.
    """
    payload: bytes = b""  # Optional payload from the pong frame

Usage Examples

Handling Handshake Events

from wsproto import WSConnection, ConnectionType
from wsproto.events import Request, AcceptConnection, RejectConnection

# Server handling handshake
ws = WSConnection(ConnectionType.SERVER)
ws.receive_data(handshake_data)

for event in ws.events():
    if isinstance(event, Request):
        print(f"WebSocket request for {event.target} from {event.host}")
        print(f"Subprotocols: {event.subprotocols}")
        print(f"Extensions: {event.extensions}")
        
        # Accept the connection
        if event.target == '/chat':
            accept_data = ws.send(AcceptConnection(
                subprotocol='chat.v1' if 'chat.v1' in event.subprotocols else None
            ))
        else:
            # Reject the connection
            reject_data = ws.send(RejectConnection(
                status_code=404,
                headers=[(b'content-type', b'text/plain')],
                has_body=True
            ))
            reject_body = ws.send(RejectData(
                data=b'Path not found',
                body_finished=True
            ))

Handling Message Events

from wsproto.events import TextMessage, BytesMessage
import json

# Process different message types
for event in ws.events():
    if isinstance(event, TextMessage):
        print(f"Received text: {event.data}")
        
        # Handle JSON messages
        try:
            json_data = json.loads(event.data)
            print(f"JSON payload: {json_data}")
        except json.JSONDecodeError:
            print("Not valid JSON")
            
        # Handle fragmented messages
        if not event.message_finished:
            # Buffer this chunk and wait for more
            text_buffer += event.data
        else:
            # Complete message received
            complete_message = text_buffer + event.data
            text_buffer = ""
            
    elif isinstance(event, BytesMessage):
        print(f"Received {len(event.data)} bytes")
        
        # Handle binary data
        if event.data.startswith(b'\x89PNG'):
            print("Received PNG image")
        
        # Handle fragmented binary messages
        if not event.message_finished:
            binary_buffer += event.data
        else:
            complete_binary = binary_buffer + event.data
            binary_buffer = b""

Handling Control Frame Events

from wsproto.events import Ping, Pong, CloseConnection
from wsproto.frame_protocol import CloseReason

# Handle control frames
for event in ws.events():
    if isinstance(event, Ping):
        print(f"Received ping with payload: {event.payload}")
        # Must respond with pong
        pong_data = ws.send(event.response())
        socket.send(pong_data)
        
    elif isinstance(event, Pong):
        print(f"Received pong with payload: {event.payload}")
        # Handle pong response (e.g., measure round-trip time)
        
    elif isinstance(event, CloseConnection):
        print(f"Connection closing: code={event.code}, reason={event.reason}")
        
        # Must respond to close frame
        if ws.state != ConnectionState.LOCAL_CLOSING:
            close_response = ws.send(event.response())
            socket.send(close_response)
        
        # Handle different close codes
        if event.code == CloseReason.NORMAL_CLOSURE:
            print("Normal closure")
        elif event.code == CloseReason.GOING_AWAY:
            print("Server going away")
        elif event.code == CloseReason.PROTOCOL_ERROR:
            print("Protocol error occurred")

Sending Events

from wsproto.events import TextMessage, BytesMessage, CloseConnection, Ping

# Send text message
text_data = ws.send(TextMessage(data="Hello, WebSocket!"))
socket.send(text_data)

# Send binary message
binary_data = ws.send(BytesMessage(data=b"Binary payload"))
socket.send(binary_data)

# Send fragmented message
fragment1 = ws.send(TextMessage(data="Start of ", message_finished=False))
fragment2 = ws.send(TextMessage(data="long message", message_finished=True))
socket.send(fragment1 + fragment2)

# Send ping
ping_data = ws.send(Ping(payload=b"ping-payload"))
socket.send(ping_data)

# Close connection
close_data = ws.send(CloseConnection(code=1000, reason="Goodbye"))
socket.send(close_data)

Event-Driven WebSocket Echo Server

import socket
from wsproto import WSConnection, ConnectionType
from wsproto.events import (
    Request, AcceptConnection, TextMessage, BytesMessage, 
    CloseConnection, Ping
)

def handle_client(client_socket):
    ws = WSConnection(ConnectionType.SERVER)
    
    while True:
        try:
            data = client_socket.recv(4096)
            if not data:
                break
                
            ws.receive_data(data)
            
            for event in ws.events():
                if isinstance(event, Request):
                    # Accept all connections
                    response = ws.send(AcceptConnection())
                    client_socket.send(response)
                    
                elif isinstance(event, TextMessage):
                    # Echo text messages
                    echo_data = ws.send(TextMessage(data=f"Echo: {event.data}"))
                    client_socket.send(echo_data)
                    
                elif isinstance(event, BytesMessage):
                    # Echo binary messages
                    echo_data = ws.send(BytesMessage(data=b"Echo: " + event.data))
                    client_socket.send(echo_data)
                    
                elif isinstance(event, Ping):
                    # Respond to pings
                    pong_data = ws.send(event.response())
                    client_socket.send(pong_data)
                    
                elif isinstance(event, CloseConnection):
                    # Respond to close
                    close_data = ws.send(event.response())
                    client_socket.send(close_data)
                    return
                    
        except Exception as e:
            print(f"Error: {e}")
            break
    
    client_socket.close()

# Server setup
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 8765))
server.listen(1)

while True:
    client, addr = server.accept()
    handle_client(client)

Install with Tessl CLI

npx tessl i tessl/pypi-wsproto

docs

connection-management.md

event-system.md

extensions.md

index.md

low-level-protocol.md

tile.json