An implementation of the WebSocket Protocol (RFC 6455 & 7692)
Core data structures for HTTP headers, WebSocket frames, close codes, opcodes, and type definitions used throughout the WebSocket implementation. These provide the foundation for protocol handling and message processing.
Case-insensitive HTTP headers container with dict-like interface for WebSocket handshake processing.
class Headers:
"""
Case-insensitive HTTP headers container.
Provides dict-like interface for managing HTTP headers used in
WebSocket handshake with proper case-insensitive handling.
"""
def __init__(self, *args, **kwargs):
"""
Initialize headers container.
Parameters:
- *args: Dict, list of tuples, or Headers instance
- **kwargs: Header key-value pairs
Examples:
Headers({"Content-Type": "text/html"})
Headers([("Host", "example.com"), ("Origin", "https://app.com")])
Headers(host="example.com", origin="https://app.com")
"""
def __getitem__(self, key: str) -> str:
"""
Get header value by name (case-insensitive).
Parameters:
- key: Header name
Returns:
str: Header value
Raises:
- KeyError: If header doesn't exist
"""
def __setitem__(self, key: str, value: str) -> None:
"""
Set header value by name (case-insensitive).
Parameters:
- key: Header name
- value: Header value
"""
def __delitem__(self, key: str) -> None:
"""
Delete header by name (case-insensitive).
Parameters:
- key: Header name
Raises:
- KeyError: If header doesn't exist
"""
def __contains__(self, key: str) -> bool:
"""
Check if header exists (case-insensitive).
Parameters:
- key: Header name
Returns:
bool: True if header exists
"""
def __iter__(self):
"""Iterate over header names."""
def __len__(self) -> int:
"""Get number of headers."""
def get(self, key: str, default: str = None) -> str | None:
"""
Get header value with default.
Parameters:
- key: Header name (case-insensitive)
- default: Default value if header doesn't exist
Returns:
str | None: Header value or default
"""
def get_all(self, key: str) -> List[str]:
"""
Get all values for a header name.
Parameters:
- key: Header name (case-insensitive)
Returns:
List[str]: List of all values for the header
"""
def add(self, key: str, value: str) -> None:
"""
Add header value (allows multiple values for same name).
Parameters:
- key: Header name
- value: Header value to add
"""
def items(self):
"""Iterate over (name, value) pairs."""
def keys(self):
"""Get header names."""
def values(self):
"""Get header values."""
def copy(self) -> Headers:
"""Create a copy of the headers."""
def update(self, *args, **kwargs) -> None:
"""
Update headers with new values.
Parameters same as __init__()
"""
# Type alias for header input formats: a Headers instance, a plain
# str -> str dict, or a list of (name, value) tuples.
HeadersLike = Union[Headers, Dict[str, str], List[Tuple[str, str]]]
class MultipleValuesError(Exception):
    """
    Raised when single-value header has multiple values.

    Some HTTP headers should only have one value, but the headers
    container received multiple values for the same header name.

    Attributes set by the constructor:
    - key: the header name passed in
    - values: the list of values passed in
    """

    def __init__(self, key: str, values: List[str]):
        """
        Initialize multiple values error.

        Parameters:
        - key: Header name that has multiple values
        - values: List of values found for the header
        """
        self.key = key
        self.values = values
        # The formatted text becomes the exception's only positional argument,
        # so str(error) yields this message.
        super().__init__(f"Multiple values for header {key}: {values}")


# Low-level WebSocket frame representation and frame opcodes for protocol implementation.
class Frame:
    """
    WebSocket frame representation.

    Represents a single WebSocket frame with opcode, payload data,
    and control flags according to RFC 6455.

    Only the constructor has a body here; __repr__ and the two properties
    are declared with a docstring and no implementation.
    """

    def __init__(
        self,
        # Quoted: the Opcode enum is declared after this class in the file,
        # so the bare name is not yet bound when this signature is evaluated.
        opcode: "Opcode",
        data: bytes,
        fin: bool = True,
        rsv1: bool = False,
        rsv2: bool = False,
        rsv3: bool = False,
    ):
        """
        Initialize WebSocket frame.

        Parameters:
        - opcode: Frame opcode (text, binary, control, etc.)
        - data: Frame payload data
        - fin: Final fragment flag (True for complete messages)
        - rsv1: Reserved bit 1 (used by extensions)
        - rsv2: Reserved bit 2 (used by extensions)
        - rsv3: Reserved bit 3 (used by extensions)
        """
        # Arguments are stored as given; nothing is validated or copied.
        self.opcode = opcode
        self.data = data
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3

    def __repr__(self) -> str:
        """String representation of frame."""

    @property
    def is_data_frame(self) -> bool:
        """Check if frame carries data (text/binary)."""

    @property
    def is_control_frame(self) -> bool:
        """Check if frame is control frame (close/ping/pong)."""
class Opcode(Enum):
    """
    WebSocket frame opcodes as defined in RFC 6455.

    Specifies the type of WebSocket frame and how the
    payload should be interpreted.
    """

    CONT = 0  # Continuation frame
    TEXT = 1  # Text frame (UTF-8 data)
    BINARY = 2  # Binary frame (arbitrary data)
    # 3-7 reserved for future data frames
    CLOSE = 8  # Close connection frame
    PING = 9  # Ping frame
    PONG = 10  # Pong frame
    # 11-15 reserved for future control frames
# Opcode constants for convenience: module-level aliases for the enum members.
OP_CONT = Opcode.CONT
OP_TEXT = Opcode.TEXT
OP_BINARY = Opcode.BINARY
OP_CLOSE = Opcode.CLOSE
OP_PING = Opcode.PING
OP_PONG = Opcode.PONG

# Opcode groups: immutable sets partitioning the six members above into
# data opcodes and control opcodes.
DATA_OPCODES = frozenset([Opcode.CONT, Opcode.TEXT, Opcode.BINARY])
CTRL_OPCODES = frozenset([Opcode.CLOSE, Opcode.PING, Opcode.PONG])

# WebSocket connection close frame structure and standardized close codes.
class Close:
    """
    WebSocket close frame representation.

    Represents close frame with close code and optional reason,
    used for graceful connection termination.

    Only the constructor has a body here; __repr__, parse and serialize are
    declared with a docstring and no implementation.
    """

    def __init__(self, code: int, reason: str = ""):
        """
        Initialize close frame.

        Parameters:
        - code: Close code (see CloseCode enum)
        - reason: Optional human-readable close reason (max 123 bytes UTF-8)

        Raises:
        - ValueError: If code is invalid or reason is too long
        """
        # NOTE(review): the ValueError contract above is documented only; this
        # body stores both arguments without checking them.
        self.code = code
        self.reason = reason

    def __repr__(self) -> str:
        """String representation of close frame."""

    # Quoted return type: `Close` is not bound yet while its own class body
    # is executing, so the bare name fails where annotations are evaluated
    # eagerly.
    @classmethod
    def parse(cls, data: bytes) -> "Close":
        """
        Parse close frame payload.

        Parameters:
        - data: Close frame payload bytes

        Returns:
            Close: Parsed close frame

        Raises:
        - ValueError: If payload format is invalid
        """

    def serialize(self) -> bytes:
        """
        Serialize close frame to bytes.

        Returns:
            bytes: Close frame payload for transmission
        """
class CloseCode(Enum):
    """
    Standard WebSocket close codes as defined in RFC 6455.

    Indicates the reason for closing the WebSocket connection.
    """

    NORMAL_CLOSURE = 1000  # Normal closure
    GOING_AWAY = 1001  # Endpoint going away
    PROTOCOL_ERROR = 1002  # Protocol error
    UNSUPPORTED_DATA = 1003  # Unsupported data type
    # 1004 reserved
    NO_STATUS_RCVD = 1005  # No status received (reserved)
    ABNORMAL_CLOSURE = 1006  # Abnormal closure (reserved)
    INVALID_FRAME_PAYLOAD_DATA = 1007  # Invalid UTF-8 or extension data
    POLICY_VIOLATION = 1008  # Policy violation
    MESSAGE_TOO_BIG = 1009  # Message too big
    MANDATORY_EXTENSION = 1010  # Missing mandatory extension
    INTERNAL_ERROR = 1011  # Internal server error
    SERVICE_RESTART = 1012  # Service restart
    TRY_AGAIN_LATER = 1013  # Try again later
    BAD_GATEWAY = 1014  # Bad gateway
    TLS_HANDSHAKE = 1015  # TLS handshake failure (reserved)
# Close code constants for convenience: numeric close code -> short
# human-readable label. There is no entry for 1004.
CLOSE_CODES = {
    1000: "Normal Closure",
    1001: "Going Away",
    1002: "Protocol Error",
    1003: "Unsupported Data",
    1005: "No Status Received",
    1006: "Abnormal Closure",
    1007: "Invalid Frame Payload Data",
    1008: "Policy Violation",
    1009: "Message Too Big",
    1010: "Mandatory Extension",
    1011: "Internal Error",
    1012: "Service Restart",
    1013: "Try Again Later",
    1014: "Bad Gateway",
    1015: "TLS Handshake"
}

# Common type aliases and definitions used throughout the WebSocket implementation.
# Core data types
Data = Union[str, bytes]  # WebSocket message content (text or binary)

# Header-related types
# NOTE(review): identical to the HeadersLike alias declared right after the
# Headers class earlier in this file.
HeadersLike = Union[Headers, Dict[str, str], List[Tuple[str, str]]]

# WebSocket protocol types
Origin = str  # Origin header value
Subprotocol = str  # WebSocket subprotocol name
ExtensionName = str  # WebSocket extension name
ExtensionParameter = Tuple[str, Optional[str]]  # Extension parameter (name, value)

# HTTP-related types
StatusLike = Union[int, HTTPStatus]  # HTTP status code types

# Logging types
LoggerLike = Union[logging.Logger, logging.LoggerAdapter]  # Logger compatibility

# Async types (for type hints)
# NOTE(review): the aliases below are assumed to live inside the
# TYPE_CHECKING guard, because Callable and Awaitable are imported only
# there — confirm against the original layout.
if TYPE_CHECKING:
    from typing import AsyncIterator, Awaitable, Callable, Coroutine

    # Handler function types
    AsyncHandler = Callable[[Any], Awaitable[None]]
    SyncHandler = Callable[[Any], None]

    # Connection types (string forward references to the connection classes)
    ConnectionType = Union['ClientConnection', 'ServerConnection']

from websockets.datastructures import Headers, MultipleValuesError
def headers_examples():
    """Walk through the Headers API: construction, lookup, multi-values, copy/update."""
    # The three accepted constructor forms; the results are not used further.
    from_mapping = Headers({"Host": "example.com", "Origin": "https://app.com"})
    from_pairs = Headers([("Content-Type", "text/html"), ("Accept", "text/html")])
    from_kwargs = Headers(host="example.com", origin="https://app.com")

    print("=== Basic Headers Operations ===")

    hdrs = Headers({"Content-Type": "application/json"})

    # The same header is read back under two different spellings.
    lower_lookup = hdrs["content-type"]
    print(f"Content-Type: {lower_lookup}")
    upper_lookup = hdrs["CONTENT-TYPE"]
    print(f"CONTENT-TYPE: {upper_lookup}")

    # Membership test
    present = "content-type" in hdrs
    print(f"Has Content-Type: {present}")

    # Lookup with a fallback for a header that was never set
    authorization = hdrs.get("authorization", "None")
    print(f"Authorization: {authorization}")

    # Two values registered under the same name
    for accepted in ("text/html", "application/json"):
        hdrs.add("Accept", accepted)
    accept_values = hdrs.get_all("accept")
    print(f"All Accept values: {accept_values}")

    # Dump every (name, value) pair
    print("\nAll headers:")
    for pair in hdrs.items():
        header_name, header_value = pair
        print(f" {header_name}: {header_value}")

    # Work on a copy so the additions do not touch `hdrs`
    extended = hdrs.copy()
    extended.update({"User-Agent": "MyApp/1.0", "Accept-Language": "en-US"})
    print(f"\nUpdated headers count: {len(extended)}")

    # Plain indexing of the header that was added twice above; a
    # MultipleValuesError, if raised, is reported instead of propagating.
    try:
        lone_accept = hdrs["accept"]
    except MultipleValuesError as exc:
        print(f"Multiple values error: {exc}")
    else:
        print(f"Single Accept value: {lone_accept}")
headers_examples()
from websockets.datastructures import Frame, Opcode, Close, CloseCode
def frame_examples():
    """Show Frame construction, Close serialize/parse round-trips, and the close-code table."""
    print("=== WebSocket Frame Examples ===")

    # One frame per opcode kind; the two data frames pass fin explicitly.
    sample_frames = [
        Frame(Opcode.TEXT, b"Hello, WebSocket!", fin=True),
        Frame(Opcode.BINARY, b"\x00\x01\x02\x03", fin=True),
        Frame(Opcode.PING, b"ping-data"),
        Frame(Opcode.PONG, b"pong-data"),
    ]
    for current in sample_frames:
        print(f"Frame: {current}")
        print(f" Opcode: {current.opcode}")
        payload_size = len(current.data)
        print(f" Data length: {payload_size}")
        print(f" Is data frame: {current.is_data_frame}")
        print(f" Is control frame: {current.is_control_frame}")
        print()

    print("=== Close Frame Examples ===")

    graceful = Close(CloseCode.NORMAL_CLOSURE, "Normal shutdown")
    failed = Close(CloseCode.PROTOCOL_ERROR, "Invalid frame received")
    for closing in (graceful, failed):
        print(f"Close frame: {closing}")
        print(f" Code: {closing.code}")
        print(f" Reason: {closing.reason}")

        # Round-trip: encode to payload bytes, then decode them again.
        wire_bytes = closing.serialize()
        decoded = Close.parse(wire_bytes)
        print(f" Serialized: {wire_bytes.hex()}")
        print(f" Parsed back: {decoded}")
        print()

    # NOTE(review): CLOSE_CODES is not named in this snippet's import line;
    # it resolves only through the module-level table defined earlier in
    # this file.
    print("=== Close Codes ===")
    for numeric_code, label in CLOSE_CODES.items():
        print(f" {numeric_code}: {label}")
frame_examples()
from websockets.datastructures import Data, HeadersLike, LoggerLike
import logging
from typing import Union, List, Tuple, Dict
def type_examples():
    """Exercise the Data, HeadersLike and LoggerLike aliases with small helper functions."""
    print("=== Type Usage Examples ===")

    def process_message(data: Data) -> str:
        """Describe a message; values that are neither str nor bytes yield None."""
        if isinstance(data, str):
            return f"Text message: {data}"
        if isinstance(data, bytes):
            return f"Binary message: {len(data)} bytes"
        return None

    # One text and one binary sample
    for sample in ("Hello, World!", b"\x00\x01\x02\x03"):
        print(process_message(sample))

    def process_headers(headers: HeadersLike) -> Headers:
        """Return a Headers object for any of the supported input shapes."""
        # An existing Headers instance is handed back unchanged.
        if isinstance(headers, Headers):
            return headers
        # dict and list inputs go through the same constructor call.
        if isinstance(headers, (dict, list)):
            return Headers(headers)
        raise ValueError("Invalid headers format")

    # One input per supported shape: dict, list of pairs, Headers instance
    candidate_inputs = [
        {"Host": "example.com"},
        [("Host", "example.com"), ("Origin", "https://app.com")],
        Headers(host="example.com")
    ]
    for candidate in candidate_inputs:
        normalized = process_headers(candidate)
        print(f"Processed headers: {dict(normalized.items())}")

    def setup_logging(logger: LoggerLike) -> None:
        """Emit one info record when the object exposes an `info` attribute."""
        if not hasattr(logger, 'info'):
            return
        logger.info("Logger setup complete")

    # A plain logger and an adapter wrapping it
    plain_logger = logging.getLogger("test")
    wrapped_logger = logging.LoggerAdapter(plain_logger, {"context": "websocket"})
    for candidate_logger in (plain_logger, wrapped_logger):
        setup_logging(candidate_logger)
type_examples()
from websockets.datastructures import Headers, Frame, Opcode
from typing import NamedTuple, Optional
import json
class WebSocketMessage(NamedTuple):
    """Custom message structure for application-level messages."""

    # Message kind label (the usage example passes "chat_message").
    type: str
    # Payload: text, raw bytes, or a dict. Written as a PEP 604 union so this
    # snippet does not depend on `Union`, which its own import lines
    # (NamedTuple, Optional) do not provide.
    data: str | bytes | dict
    # Numeric timestamp (the usage example passes time.time()).
    timestamp: float
    # Optional per-message headers; omitted means None.
    headers: Optional[Headers] = None
class MessageFrame:
"""Custom frame wrapper with application metadata."""
def __init__(self, frame: Frame, message_id: str = None):
self.frame = frame
self.message_id = message_id
self.processed_at = time.time()
@property
def is_text(self) -> bool:
return self.frame.opcode == Opcode.TEXT
@property
def is_binary(self) -> bool:
return self.frame.opcode == Opcode.BINARY
def to_json(self) -> str:
"""Convert frame to JSON representation."""
return json.dumps({
"message_id": self.message_id,
"opcode": self.frame.opcode.name,
"data_length": len(self.frame.data),
"processed_at": self.processed_at,
"fin": self.frame.fin
})
def custom_structures_example():
    """Build a WebSocketMessage and a MessageFrame wrapper, printing their contents."""
    import time

    print("=== Custom Data Structures ===")

    # Application-level message; field values are prepared in declaration order.
    chat_payload = {"user": "Alice", "text": "Hello everyone!"}
    created_at = time.time()
    routing = Headers({"Room": "general", "Priority": "normal"})
    msg = WebSocketMessage(
        type="chat_message",
        data=chat_payload,
        timestamp=created_at,
        headers=routing,
    )
    print(f"Message: {msg}")
    print(f"Message type: {msg.type}")
    header_map = dict(msg.headers.items())
    print(f"Message headers: {header_map}")

    # The dict payload is JSON-encoded to bytes, carried in a TEXT frame,
    # and that frame is wrapped with an identifier.
    encoded_payload = json.dumps(msg.data).encode()
    text_frame = Frame(Opcode.TEXT, encoded_payload)
    envelope = MessageFrame(text_frame, message_id="msg_001")
    print(f"\nWrapped frame: {envelope.to_json()}")
    print(f"Is text frame: {envelope.is_text}")
    print(f"Is binary frame: {envelope.is_binary}")
custom_structures_example()
from websockets.datastructures import Opcode, CloseCode, DATA_OPCODES, CTRL_OPCODES
def protocol_constants_example():
    """Print the opcode groups, classify sample opcodes, and bucket sample close codes."""
    print("=== WebSocket Protocol Constants ===")

    def _show_group(heading, members):
        """Print a heading followed by one 'NAME (value)' line per opcode."""
        print(heading)
        for member in members:
            print(f" {member.name} ({member.value})")

    _show_group("Data Opcodes:", DATA_OPCODES)
    _show_group("\nControl Opcodes:", CTRL_OPCODES)

    def classify_frame(opcode: Opcode) -> str:
        """Return 'data', 'control' or 'unknown' according to group membership."""
        if opcode in DATA_OPCODES:
            return "data"
        if opcode in CTRL_OPCODES:
            return "control"
        return "unknown"

    print("\nFrame Classification:")
    for sample in (Opcode.TEXT, Opcode.BINARY, Opcode.CLOSE, Opcode.PING):
        verdict = classify_frame(sample)
        print(f" {sample.name}: {verdict}")

    # Inclusive (low, high, label) ranges, checked top to bottom.
    close_code_ranges = (
        (1000, 1003, "normal"),
        (1004, 1006, "reserved"),
        (1007, 1011, "error"),
        (1012, 1014, "service"),
        (1015, 1015, "tls"),
        (3000, 3999, "library"),
        (4000, 4999, "application"),
    )

    def categorize_close_code(code: int) -> str:
        """Return the label of the first range containing `code`, else 'invalid'."""
        for low, high, label in close_code_ranges:
            if low <= code <= high:
                return label
        return "invalid"

    # NOTE(review): CLOSE_CODES is not named in this snippet's import line;
    # it resolves only through the module-level table defined earlier in
    # this file.
    print("\nClose Code Categories:")
    for numeric_code in (1000, 1002, 1006, 1011, 3000, 4000, 9999):
        bucket = categorize_close_code(numeric_code)
        label_text = CLOSE_CODES.get(numeric_code, "Unknown")
        print(f" {numeric_code} ({label_text}): {bucket}")
protocol_constants_example()
Install with Tessl CLI:
npx tessl i tessl/pypi-websockets