WebSockets state-machine based protocol implementation
Comprehensive event-driven API covering all WebSocket operations including handshake, messages, control frames, and connection lifecycle. The event system provides a type-safe, structured approach to WebSocket protocol handling.
All wsproto events inherit from the base Event class, providing a common interface for protocol operations.
class Event(ABC):
"""
Base class for wsproto events.
"""
passEvents related to WebSocket connection establishment and rejection.
@dataclass(frozen=True)
class Request(Event):
"""
The beginning of a WebSocket connection, the HTTP Upgrade request.
This event is fired when a SERVER connection receives a WebSocket
handshake request (HTTP with upgrade header).
"""
host: str # Required hostname or host header value
target: str # Required request target (path and query string)
extensions: Union[Sequence[Extension], Sequence[str]] = field(default_factory=list) # Proposed extensions
extra_headers: Headers = field(default_factory=list) # Additional request headers
subprotocols: List[str] = field(default_factory=list) # Proposed subprotocols list
@dataclass(frozen=True)
class AcceptConnection(Event):
"""
The acceptance of a WebSocket upgrade request.
This event is fired when a CLIENT receives an acceptance response
from a server. It is also used to accept an upgrade request when
acting as a SERVER.
"""
subprotocol: Optional[str] = None # The accepted subprotocol to use
extensions: List[Extension] = field(default_factory=list) # Accepted extensions
extra_headers: Headers = field(default_factory=list) # Additional response headers
@dataclass(frozen=True)
class RejectConnection(Event):
"""
The rejection of a WebSocket upgrade request, the HTTP response.
The RejectConnection event sends appropriate HTTP headers to communicate
that the handshake has been rejected. You may send an HTTP body by setting
has_body to True and following with RejectData events.
"""
status_code: int = 400 # HTTP response status code
headers: Headers = field(default_factory=list) # Response headers
has_body: bool = False # Whether response has body (see RejectData)
@dataclass(frozen=True)
class RejectData(Event):
"""
The rejection HTTP response body.
The caller may send multiple RejectData events. The final event should
have the body_finished attribute set to True.
"""
data: bytes # Required raw body data
body_finished: bool = True # True if this is the final chunk of body dataEvents for WebSocket data messages, supporting both text and binary data with fragmentation control.
@dataclass(frozen=True)
class Message(Event, Generic[T]):
"""
The WebSocket data message base class.
"""
data: T # Required message data (str for text, bytes for binary)
frame_finished: bool = True # Whether frame is finished (for fragmentation)
message_finished: bool = True # True if this is the last frame of message
@dataclass(frozen=True)
class TextMessage(Message[str]):
"""
This event is fired when a data frame with TEXT payload is received.
The data represents a single chunk and may not be a complete WebSocket
message. You need to buffer and reassemble chunks using message_finished
to get the full message.
"""
data: str # Text message data as string
@dataclass(frozen=True)
class BytesMessage(Message[bytes]):
"""
This event is fired when a data frame with BINARY payload is received.
The data represents a single chunk and may not be a complete WebSocket
message. You need to buffer and reassemble chunks using message_finished
to get the full message.
"""
data: bytes # Binary message data as bytesEvents for WebSocket control frames including connection close, ping, and pong.
@dataclass(frozen=True)
class CloseConnection(Event):
"""
The end of a WebSocket connection, represents a closure frame.
wsproto does not automatically send a response to a close event. To comply
with the RFC you MUST send a close event back to the remote WebSocket if
you have not already sent one.
"""
code: int # Required integer close code indicating why connection closed
reason: Optional[str] = None # Optional additional reasoning for closure
def response(self) -> "CloseConnection":
"""
Generate an RFC-compliant close frame to send back to the peer.
Returns:
CloseConnection event with same code and reason
"""
@dataclass(frozen=True)
class Ping(Event):
"""
The Ping event can be sent to trigger a ping frame and is fired when received.
wsproto does not automatically send a pong response to a ping event. To comply
with the RFC you MUST send a pong event as soon as practical.
"""
payload: bytes = b"" # Optional payload to emit with the ping frame
def response(self) -> "Pong":
"""
Generate an RFC-compliant Pong response to this ping.
Returns:
Pong event with same payload
"""
@dataclass(frozen=True)
class Pong(Event):
"""
The Pong event is fired when a Pong is received.
"""
payload: bytes = b"" # Optional payload from the pong framefrom wsproto import WSConnection, ConnectionType
from wsproto.events import Request, AcceptConnection, RejectConnection
# Server handling handshake
ws = WSConnection(ConnectionType.SERVER)
ws.receive_data(handshake_data)
for event in ws.events():
if isinstance(event, Request):
print(f"WebSocket request for {event.target} from {event.host}")
print(f"Subprotocols: {event.subprotocols}")
print(f"Extensions: {event.extensions}")
# Accept the connection
if event.target == '/chat':
accept_data = ws.send(AcceptConnection(
subprotocol='chat.v1' if 'chat.v1' in event.subprotocols else None
))
else:
# Reject the connection
reject_data = ws.send(RejectConnection(
status_code=404,
headers=[(b'content-type', b'text/plain')],
has_body=True
))
reject_body = ws.send(RejectData(
data=b'Path not found',
body_finished=True
))from wsproto.events import TextMessage, BytesMessage
import json
# Process different message types
for event in ws.events():
if isinstance(event, TextMessage):
print(f"Received text: {event.data}")
# Handle JSON messages
try:
json_data = json.loads(event.data)
print(f"JSON payload: {json_data}")
except json.JSONDecodeError:
print("Not valid JSON")
# Handle fragmented messages
if not event.message_finished:
# Buffer this chunk and wait for more
text_buffer += event.data
else:
# Complete message received
complete_message = text_buffer + event.data
text_buffer = ""
elif isinstance(event, BytesMessage):
print(f"Received {len(event.data)} bytes")
# Handle binary data
if event.data.startswith(b'\x89PNG'):
print("Received PNG image")
# Handle fragmented binary messages
if not event.message_finished:
binary_buffer += event.data
else:
complete_binary = binary_buffer + event.data
binary_buffer = b""from wsproto.events import Ping, Pong, CloseConnection
from wsproto.frame_protocol import CloseReason
# Handle control frames
for event in ws.events():
if isinstance(event, Ping):
print(f"Received ping with payload: {event.payload}")
# Must respond with pong
pong_data = ws.send(event.response())
socket.send(pong_data)
elif isinstance(event, Pong):
print(f"Received pong with payload: {event.payload}")
# Handle pong response (e.g., measure round-trip time)
elif isinstance(event, CloseConnection):
print(f"Connection closing: code={event.code}, reason={event.reason}")
# Must respond to close frame
if ws.state != ConnectionState.LOCAL_CLOSING:
close_response = ws.send(event.response())
socket.send(close_response)
# Handle different close codes
if event.code == CloseReason.NORMAL_CLOSURE:
print("Normal closure")
elif event.code == CloseReason.GOING_AWAY:
print("Server going away")
elif event.code == CloseReason.PROTOCOL_ERROR:
print("Protocol error occurred")from wsproto.events import TextMessage, BytesMessage, CloseConnection, Ping
# Send text message
text_data = ws.send(TextMessage(data="Hello, WebSocket!"))
socket.send(text_data)
# Send binary message
binary_data = ws.send(BytesMessage(data=b"Binary payload"))
socket.send(binary_data)
# Send fragmented message
fragment1 = ws.send(TextMessage(data="Start of ", message_finished=False))
fragment2 = ws.send(TextMessage(data="long message", message_finished=True))
socket.send(fragment1 + fragment2)
# Send ping
ping_data = ws.send(Ping(payload=b"ping-payload"))
socket.send(ping_data)
# Close connection
close_data = ws.send(CloseConnection(code=1000, reason="Goodbye"))
socket.send(close_data)import socket
from wsproto import WSConnection, ConnectionType
from wsproto.events import (
Request, AcceptConnection, TextMessage, BytesMessage,
CloseConnection, Ping
)
def handle_client(client_socket):
ws = WSConnection(ConnectionType.SERVER)
while True:
try:
data = client_socket.recv(4096)
if not data:
break
ws.receive_data(data)
for event in ws.events():
if isinstance(event, Request):
# Accept all connections
response = ws.send(AcceptConnection())
client_socket.send(response)
elif isinstance(event, TextMessage):
# Echo text messages
echo_data = ws.send(TextMessage(data=f"Echo: {event.data}"))
client_socket.send(echo_data)
elif isinstance(event, BytesMessage):
# Echo binary messages
echo_data = ws.send(BytesMessage(data=b"Echo: " + event.data))
client_socket.send(echo_data)
elif isinstance(event, Ping):
# Respond to pings
pong_data = ws.send(event.response())
client_socket.send(pong_data)
elif isinstance(event, CloseConnection):
# Respond to close
close_data = ws.send(event.response())
client_socket.send(close_data)
return
except Exception as e:
print(f"Error: {e}")
break
client_socket.close()
# Server setup
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 8765))
server.listen(1)
while True:
client, addr = server.accept()
handle_client(client)Install with Tessl CLI
npx tessl i tessl/pypi-wsproto