WebSocket client for Python with low level API options
WebSocket frame handling, message types, and protocol status codes for low-level protocol control and custom frame processing. The ABNF class implements the WebSocket frame format according to RFC 6455.
WebSocket frame implementation handling frame construction, validation, and binary serialization according to the ABNF specification.
class ABNF:
# Operation code constants
OPCODE_CONT = 0x0 # Continuation frame
OPCODE_TEXT = 0x1 # Text frame
OPCODE_BINARY = 0x2 # Binary frame
OPCODE_CLOSE = 0x8 # Close frame
OPCODE_PING = 0x9 # Ping frame
OPCODE_PONG = 0xa # Pong frame
# Valid opcodes tuple
OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG)
# Human readable opcode mapping
OPCODE_MAP = {
OPCODE_CONT: "cont",
OPCODE_TEXT: "text",
OPCODE_BINARY: "binary",
OPCODE_CLOSE: "close",
OPCODE_PING: "ping",
OPCODE_PONG: "pong",
}
# Frame length thresholds
LENGTH_7 = 0x7E # 126 - use 2-byte length
LENGTH_16 = 1 << 16 # 65536 - use 8-byte length
LENGTH_63 = 1 << 63 # Maximum frame size
def __init__(
self,
fin: int = 0,
rsv1: int = 0,
rsv2: int = 0,
rsv3: int = 0,
opcode: int = OPCODE_TEXT,
mask_value: int = 1,
data: Union[str, bytes, None] = "",
) -> None:
"""
Construct ABNF frame.
Parameters:
- fin: Final fragment flag (0 or 1)
- rsv1: Reserved bit 1 (must be 0)
- rsv2: Reserved bit 2 (must be 0)
- rsv3: Reserved bit 3 (must be 0)
- opcode: Frame opcode (see OPCODE_* constants)
- mask_value: Mask flag (1 for client frames, 0 for server)
- data: Frame payload data
"""Create, validate, and serialize WebSocket frames with proper protocol compliance.
def validate(self, skip_utf8_validation: bool = False) -> None:
"""
Validate frame according to WebSocket protocol rules.
Parameters:
- skip_utf8_validation: Skip UTF-8 validation for text frames
Raises:
WebSocketProtocolException: If frame violates protocol rules
"""
@staticmethod
def create_frame(data: Union[bytes, str], opcode: int, fin: int = 1) -> "ABNF":
"""
Create frame for sending data.
Parameters:
- data: Payload data (auto-encoded to UTF-8 if string and opcode is TEXT)
- opcode: Frame opcode (OPCODE_TEXT, OPCODE_BINARY, etc.)
- fin: Final fragment flag (1 for complete message, 0 for fragmented)
Returns:
ABNF: Configured frame ready for transmission
"""
def format(self) -> bytes:
"""
Serialize frame to wire format bytes.
Returns:
bytes: Complete frame in WebSocket wire format
Raises:
ValueError: If frame configuration is invalid
"""
@staticmethod
def mask(mask_key: Union[str, bytes], data: Union[str, bytes]) -> bytes:
"""
Apply XOR mask to data (mask or unmask).
Parameters:
- mask_key: 4-byte mask key
- data: Data to mask/unmask
Returns:
bytes: Masked/unmasked data
"""Standard close status codes for WebSocket connection termination.
# Normal closure codes
STATUS_NORMAL = 1000 # Normal closure
STATUS_GOING_AWAY = 1001 # Endpoint going away
STATUS_PROTOCOL_ERROR = 1002 # Protocol error
STATUS_UNSUPPORTED_DATA_TYPE = 1003 # Unsupported data type
# Special status codes
STATUS_STATUS_NOT_AVAILABLE = 1005 # No status code available
STATUS_ABNORMAL_CLOSED = 1006 # Abnormal closure (no close frame)
# Error status codes
STATUS_INVALID_PAYLOAD = 1007 # Invalid UTF-8 or other payload error
STATUS_POLICY_VIOLATION = 1008 # Policy violation
STATUS_MESSAGE_TOO_BIG = 1009 # Message too big
STATUS_INVALID_EXTENSION = 1010 # Extension negotiation failure
STATUS_UNEXPECTED_CONDITION = 1011 # Unexpected server condition
# Infrastructure status codes
STATUS_SERVICE_RESTART = 1012 # Service restart
STATUS_TRY_AGAIN_LATER = 1013 # Try again later
STATUS_BAD_GATEWAY = 1014 # Bad gateway
STATUS_TLS_HANDSHAKE_ERROR = 1015 # TLS handshake error
# Valid close status code range
VALID_CLOSE_STATUS = (
STATUS_NORMAL,
STATUS_GOING_AWAY,
STATUS_PROTOCOL_ERROR,
STATUS_UNSUPPORTED_DATA_TYPE,
STATUS_INVALID_PAYLOAD,
STATUS_POLICY_VIOLATION,
STATUS_MESSAGE_TOO_BIG,
STATUS_INVALID_EXTENSION,
STATUS_UNEXPECTED_CONDITION,
STATUS_SERVICE_RESTART,
STATUS_TRY_AGAIN_LATER,
STATUS_BAD_GATEWAY,
)Advanced frame handling classes for buffering and managing continuation frames.
class frame_buffer:
"""
Buffer incoming frame data from socket until complete frames are received.
Handles partial frame reception and frame reassembly from network packets.
"""
def __init__(self, recv_fn: Callable[[int], int], skip_utf8_validation: bool) -> None: ...
def recv_frame(self) -> ABNF: ...
def clear(self) -> None: ...
class continuous_frame:
"""
Handle WebSocket message fragmentation across multiple frames.
Manages continuation frames and message reassembly for large messages.
"""
def __init__(self, fire_cont_frame: bool, skip_utf8_validation: bool) -> None: ...
def validate(self, frame: ABNF) -> None: ...
def add(self, frame: ABNF) -> None: ...
def is_fire(self, frame: ABNF) -> Union[bool, int]: ...
def extract(self, frame: ABNF) -> tuple: ...from websocket import WebSocket, ABNF
ws = WebSocket()
ws.connect("ws://echo.websocket.events/")
# Create text frame
text_frame = ABNF.create_frame("Hello, World!", ABNF.OPCODE_TEXT)
ws.send_frame(text_frame)
# Create binary frame
binary_data = b"\x01\x02\x03\x04"
binary_frame = ABNF.create_frame(binary_data, ABNF.OPCODE_BINARY)
ws.send_frame(binary_frame)
# Create ping frame
ping_frame = ABNF.create_frame("ping-payload", ABNF.OPCODE_PING)
ws.send_frame(ping_frame)
ws.close()from websocket import WebSocket, ABNF
ws = WebSocket()
ws.connect("ws://echo.websocket.events/")
message = "This is a long message that will be sent in fragments"
chunks = [message[i:i+10] for i in range(0, len(message), 10)]
# Send first fragment (not final)
first_frame = ABNF.create_frame(chunks[0], ABNF.OPCODE_TEXT, fin=0)
ws.send_frame(first_frame)
# Send middle fragments (continuation, not final)
for chunk in chunks[1:-1]:
cont_frame = ABNF.create_frame(chunk, ABNF.OPCODE_CONT, fin=0)
ws.send_frame(cont_frame)
# Send final fragment
final_frame = ABNF.create_frame(chunks[-1], ABNF.OPCODE_CONT, fin=1)
ws.send_frame(final_frame)
ws.close()from websocket import WebSocket, ABNF
ws = WebSocket()
ws.connect("ws://echo.websocket.events/")
# Send a message to get echo back
ws.send("Test message")
# Receive and analyze frame
opcode, frame = ws.recv_data_frame()
print(f"Frame details:")
print(f" Opcode: {opcode} ({ABNF.OPCODE_MAP.get(opcode, 'unknown')})")
print(f" Final frame: {bool(frame.fin)}")
print(f" Reserved bits: RSV1={frame.rsv1}, RSV2={frame.rsv2}, RSV3={frame.rsv3}")
print(f" Masked: {bool(frame.mask_value)}")
print(f" Data length: {len(frame.data)}")
print(f" Data: {frame.data}")
# Validate frame
try:
frame.validate()
print("Frame is valid")
except Exception as e:
print(f"Frame validation error: {e}")
ws.close()from websocket import WebSocket, ABNF, STATUS_NORMAL, STATUS_GOING_AWAY
import struct
ws = WebSocket()
ws.connect("ws://echo.websocket.events/")
# Send custom close frame with reason
close_reason = "Custom shutdown"
close_data = struct.pack("!H", STATUS_GOING_AWAY) + close_reason.encode('utf-8')
close_frame = ABNF.create_frame(close_data, ABNF.OPCODE_CLOSE)
ws.send_frame(close_frame)
# Wait for close response
try:
opcode, response_frame = ws.recv_data_frame()
if opcode == ABNF.OPCODE_CLOSE:
if len(response_frame.data) >= 2:
status_code = struct.unpack("!H", response_frame.data[:2])[0]
reason = response_frame.data[2:].decode('utf-8')
print(f"Server closed with status {status_code}: {reason}")
else:
print("Server closed without status code")
except Exception as e:
print(f"Close handshake error: {e}")
ws.shutdown()from websocket import ABNF
import os
# Example of manual frame masking
data = b"Secret payload data"
mask_key = os.urandom(4) # Generate random 4-byte mask
# Mask the data
masked_data = ABNF.mask(mask_key, data)
print(f"Original: {data}")
print(f"Mask key: {mask_key.hex()}")
print(f"Masked: {masked_data.hex()}")
# Unmask the data (XOR is reversible)
unmasked_data = ABNF.mask(mask_key, masked_data)
print(f"Unmasked: {unmasked_data}")
print(f"Match original: {unmasked_data == data}")from websocket import WebSocket, STATUS_NORMAL, STATUS_GOING_AWAY, STATUS_PROTOCOL_ERROR
ws = WebSocket()
ws.connect("ws://echo.websocket.events/")
# Close with different status codes based on condition
try:
# Normal operation
ws.send("Hello")
response = ws.recv()
print(f"Got response: {response}")
# Normal close
ws.close(status=STATUS_NORMAL, reason=b"Completed successfully")
except Exception as e:
# Error close
print(f"Error occurred: {e}")
ws.close(status=STATUS_PROTOCOL_ERROR, reason=b"Protocol error occurred")# Frame operation types
OpcodeType = int # 0x0-0xf, see OPCODE_* constants
StatusCodeType = int # 1000-4999, see STATUS_* constants
MaskKeyType = Union[str, bytes] # 4-byte mask key
FrameDataType = Union[str, bytes, None] # Frame payload data
# Frame validation function signature
ValidationCallable = Callable[[bytes], bool]
# Frame masking function signature
MaskingCallable = Callable[[int], bytes]Install with Tessl CLI
npx tessl i tessl/pypi-websocket-client