An implementation of the WebSocket Protocol (RFC 6455 & 7692)
Comprehensive exception hierarchy for precise error handling covering connection lifecycle, protocol violations, handshake failures, validation errors, and network issues in WebSocket operations.
Foundation exception classes that serve as the root of the WebSocket exception hierarchy.
class WebSocketException(Exception):
"""
Base class for all WebSocket-specific exceptions.
All other WebSocket exceptions inherit from this class,
allowing catch-all exception handling for WebSocket operations.
"""
def __init__(self, message: str = None):
"""
Initialize WebSocket exception.
Parameters:
- message: Human-readable error description
"""
super().__init__(message)Exceptions related to WebSocket connection establishment, maintenance, and closure.
class ConnectionClosed(WebSocketException):
"""
Base exception for closed WebSocket connections.
Raised when attempting operations on a closed connection
or when connection closure is detected.
"""
def __init__(self, rcvd: Close = None, sent: Close = None):
"""
Initialize connection closed exception.
Parameters:
- rcvd: Close frame received from peer (if any)
- sent: Close frame sent to peer (if any)
"""
self.rcvd = rcvd
self.sent = sent
if rcvd:
super().__init__(f"Connection closed: {rcvd.code} {rcvd.reason}")
else:
super().__init__("Connection closed")
@property
def code(self) -> int | None:
"""Get close code from received close frame."""
return self.rcvd.code if self.rcvd else None
@property
def reason(self) -> str | None:
"""Get close reason from received close frame."""
return self.rcvd.reason if self.rcvd else None
class ConnectionClosedOK(ConnectionClosed):
"""
Normal WebSocket connection closure (close code 1000).
Raised when connection is closed normally without errors,
typically when client or server explicitly closes connection.
"""
pass
class ConnectionClosedError(ConnectionClosed):
"""
Abnormal WebSocket connection closure (close code != 1000).
Raised when connection is closed due to errors, protocol violations,
or unexpected network conditions.
"""
passExceptions that occur during the WebSocket handshake process, including validation and negotiation failures.
class InvalidHandshake(WebSocketException):
"""
Base exception for WebSocket handshake failures.
Raised when handshake process fails due to invalid headers,
unsupported protocols, or security policy violations.
"""
pass
class SecurityError(InvalidHandshake):
"""
Security policy violation during handshake.
Raised when connection attempt violates security policies
such as origin restrictions or certificate validation.
"""
pass
class InvalidMessage(InvalidHandshake):
"""
Malformed handshake message.
Raised when HTTP handshake message is malformed or
doesn't conform to WebSocket upgrade requirements.
"""
pass
class InvalidStatus(InvalidHandshake):
"""
WebSocket upgrade rejection.
Raised when server rejects WebSocket upgrade request
with non-101 HTTP status code.
"""
def __init__(self, status_code: int, headers: Headers = None):
"""
Initialize invalid status exception.
Parameters:
- status_code: HTTP status code received
- headers: HTTP response headers
"""
self.status_code = status_code
self.headers = headers or Headers()
super().__init__(f"HTTP {status_code}")
class InvalidHeader(InvalidHandshake):
"""
Invalid HTTP header in handshake.
Raised when handshake contains invalid or missing
required HTTP headers.
"""
def __init__(self, name: str, value: str = None):
"""
Initialize invalid header exception.
Parameters:
- name: Header name that is invalid
- value: Header value (if applicable)
"""
self.name = name
self.value = value
if value:
super().__init__(f"Invalid header {name}: {value}")
else:
super().__init__(f"Invalid header: {name}")
class InvalidHeaderFormat(InvalidHeader):
"""
Header parsing failure.
Raised when HTTP header cannot be parsed due to
incorrect format or encoding issues.
"""
pass
class InvalidHeaderValue(InvalidHeader):
"""
Semantically invalid header value.
Raised when header value is syntactically valid but
semantically incorrect for WebSocket handshake.
"""
pass
class InvalidOrigin(InvalidHandshake):
"""
Disallowed origin header.
Raised when Origin header doesn't match server's
allowed origins policy.
"""
def __init__(self, origin: str):
"""
Initialize invalid origin exception.
Parameters:
- origin: The disallowed origin value
"""
self.origin = origin
super().__init__(f"Invalid origin: {origin}")
class InvalidUpgrade(InvalidHandshake):
"""
Incorrect WebSocket upgrade headers.
Raised when required WebSocket upgrade headers
are missing or have incorrect values.
"""
pass
class NegotiationError(InvalidHandshake):
"""
Extension or subprotocol negotiation failure.
Raised when client and server cannot agree on
WebSocket extensions or subprotocols.
"""
passExceptions related to WebSocket extension processing and parameter validation.
class DuplicateParameter(InvalidHandshake):
"""
Duplicate extension parameter.
Raised when WebSocket extension contains
duplicate parameter names.
"""
def __init__(self, name: str):
"""
Initialize duplicate parameter exception.
Parameters:
- name: Parameter name that is duplicated
"""
self.name = name
super().__init__(f"Duplicate parameter: {name}")
class InvalidParameterName(InvalidHandshake):
"""
Invalid extension parameter name.
Raised when extension parameter name doesn't
conform to specification requirements.
"""
def __init__(self, name: str):
"""
Initialize invalid parameter name exception.
Parameters:
- name: Invalid parameter name
"""
self.name = name
super().__init__(f"Invalid parameter name: {name}")
class InvalidParameterValue(InvalidHandshake):
"""
Invalid extension parameter value.
Raised when extension parameter value is
invalid for the specific parameter.
"""
def __init__(self, name: str, value: str):
"""
Initialize invalid parameter value exception.
Parameters:
- name: Parameter name
- value: Invalid parameter value
"""
self.name = name
self.value = value
super().__init__(f"Invalid parameter value {name}: {value}")Exceptions related to network connectivity, proxy handling, and URI validation.
class InvalidURI(WebSocketException):
"""
Invalid WebSocket URI format.
Raised when WebSocket URI is malformed or
uses unsupported scheme or format.
"""
def __init__(self, uri: str, reason: str = None):
"""
Initialize invalid URI exception.
Parameters:
- uri: The invalid URI
- reason: Specific reason for invalidity
"""
self.uri = uri
self.reason = reason
if reason:
super().__init__(f"Invalid URI {uri}: {reason}")
else:
super().__init__(f"Invalid URI: {uri}")
class ProxyError(InvalidHandshake):
"""
Proxy connection failure.
Raised when connection through HTTP/SOCKS proxy
fails or proxy rejects the request.
"""
pass
class InvalidProxy(ProxyError):
"""
Invalid proxy configuration.
Raised when proxy settings are malformed or
proxy server information is invalid.
"""
pass
class InvalidProxyMessage(ProxyError):
"""
Malformed proxy response.
Raised when proxy server returns malformed
HTTP response during connection establishment.
"""
pass
class InvalidProxyStatus(ProxyError):
"""
Proxy rejection or error status.
Raised when proxy server rejects connection
request with error status code.
"""
def __init__(self, status_code: int, reason: str = None):
"""
Initialize invalid proxy status exception.
Parameters:
- status_code: HTTP status code from proxy
- reason: Optional reason phrase
"""
self.status_code = status_code
self.reason = reason
if reason:
super().__init__(f"Proxy error {status_code}: {reason}")
else:
super().__init__(f"Proxy error: {status_code}")Exceptions for WebSocket protocol violations and operational errors.
class ProtocolError(WebSocketException):
"""
WebSocket protocol violation.
Raised when received data violates WebSocket protocol
specification or when invalid operations are attempted.
"""
pass
class PayloadTooBig(ProtocolError):
"""
Message exceeds size limits.
Raised when received message or frame exceeds
configured maximum size limits.
"""
def __init__(self, size: int, max_size: int):
"""
Initialize payload too big exception.
Parameters:
- size: Actual payload size
- max_size: Maximum allowed size
"""
self.size = size
self.max_size = max_size
super().__init__(f"Payload too big: {size} > {max_size}")
class InvalidState(ProtocolError):
"""
Invalid operation for current connection state.
Raised when attempting operations that are not
valid for the current WebSocket connection state.
"""
def __init__(self, operation: str, state: str):
"""
Initialize invalid state exception.
Parameters:
- operation: Operation that was attempted
- state: Current connection state
"""
self.operation = operation
self.state = state
super().__init__(f"Cannot {operation} in state {state}")
class ConcurrencyError(ProtocolError):
"""
Concurrent read/write operations detected.
Raised when multiple coroutines attempt to read from
or write to the same WebSocket connection simultaneously.
"""
def __init__(self, operation: str):
"""
Initialize concurrency error exception.
Parameters:
- operation: Operation that detected concurrency issue
"""
self.operation = operation
super().__init__(f"Concurrent {operation} operations")import asyncio
from websockets import connect, ConnectionClosed, InvalidURI, ProtocolError
async def basic_exception_handling():
"""Demonstrate basic WebSocket exception handling."""
try:
async with connect("ws://localhost:8765") as websocket:
# Send and receive messages
await websocket.send("Hello, Server!")
response = await websocket.recv()
print(f"Received: {response}")
except ConnectionClosed as e:
print(f"Connection closed: code={e.code}, reason={e.reason}")
# Check if closure was normal or error
if isinstance(e, ConnectionClosedOK):
print("Connection closed normally")
elif isinstance(e, ConnectionClosedError):
print("Connection closed with error")
except InvalidURI as e:
print(f"Invalid WebSocket URI: {e.uri}")
if e.reason:
print(f"Reason: {e.reason}")
except ProtocolError as e:
print(f"Protocol error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
asyncio.run(basic_exception_handling())import asyncio
from websockets import connect
from websockets import (
InvalidHandshake, SecurityError, InvalidStatus, InvalidOrigin,
NegotiationError, InvalidHeader
)
async def handshake_exception_handling():
"""Handle various handshake failures."""
test_cases = [
("ws://localhost:8765", None), # Normal case
("ws://invalid-server:9999", "Connection failed"),
("wss://expired-cert.example.com", "Certificate error"),
]
for uri, expected_error in test_cases:
try:
print(f"Connecting to {uri}...")
async with connect(
uri,
additional_headers={"Origin": "https://myapp.com"},
subprotocols=["chat", "notifications"],
open_timeout=5
) as websocket:
await websocket.send("Connection successful")
response = await websocket.recv()
print(f"Success: {response}")
except SecurityError as e:
print(f"Security error: {e}")
except InvalidStatus as e:
print(f"Server rejected connection: HTTP {e.status_code}")
print(f"Response headers: {dict(e.headers)}")
except InvalidOrigin as e:
print(f"Origin rejected: {e.origin}")
except InvalidHeader as e:
print(f"Invalid header: {e.name} = {e.value}")
except NegotiationError as e:
print(f"Failed to negotiate extensions/subprotocols: {e}")
except InvalidHandshake as e:
print(f"Handshake failed: {e}")
except Exception as e:
print(f"Other error: {e}")
print()
asyncio.run(handshake_exception_handling())import asyncio
from websockets import connect, InvalidState, ConcurrencyError, PayloadTooBig
async def state_exception_handling():
"""Handle connection state and operational errors."""
try:
async with connect(
"ws://localhost:8765",
max_size=1024 # 1KB max message size
) as websocket:
# Test large message (should raise PayloadTooBig)
try:
large_message = "x" * 2048 # 2KB message
await websocket.send(large_message)
except PayloadTooBig as e:
print(f"Message too large: {e.size} > {e.max_size}")
# Test concurrent operations
try:
# This would cause concurrency error if attempted
# await asyncio.gather(
# websocket.recv(),
# websocket.recv() # Concurrent recv operations
# )
pass
except ConcurrencyError as e:
print(f"Concurrency error: {e.operation}")
# Normal operation
await websocket.send("Hello")
response = await websocket.recv()
print(f"Normal operation: {response}")
# Test operation after close
await websocket.close()
try:
await websocket.send("This should fail")
except InvalidState as e:
print(f"Invalid state: {e.operation} in {e.state}")
except Exception as e:
print(f"Connection error: {e}")
asyncio.run(state_exception_handling())import asyncio
import logging
from websockets import connect, WebSocketException, ConnectionClosed
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def robust_client_with_recovery():
"""Client with comprehensive error handling and recovery."""
uri = "ws://localhost:8765"
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
logger.info(f"Connection attempt {attempt + 1}")
async with connect(uri, ping_interval=20, ping_timeout=10) as websocket:
logger.info("Connected successfully")
# Main message loop with error recovery
message_count = 0
error_count = 0
while error_count < 5: # Max 5 consecutive errors
try:
# Send periodic messages
message = f"Message {message_count}"
await websocket.send(message)
message_count += 1
# Wait for response with timeout
try:
response = await asyncio.wait_for(
websocket.recv(),
timeout=30
)
logger.info(f"Received: {response}")
error_count = 0 # Reset error count on success
except asyncio.TimeoutError:
logger.warning("Response timeout")
error_count += 1
continue
# Wait before next message
await asyncio.sleep(5)
except ConnectionClosed as e:
logger.warning(f"Connection closed: {e.code} {e.reason}")
break
except WebSocketException as e:
logger.error(f"WebSocket error: {e}")
error_count += 1
if error_count < 5:
await asyncio.sleep(1) # Brief pause before retry
except Exception as e:
logger.error(f"Unexpected error: {e}")
error_count += 1
if error_count >= 5:
logger.error("Too many consecutive errors, giving up")
break
except WebSocketException as e:
logger.error(f"WebSocket connection failed: {e}")
except Exception as e:
logger.error(f"Unexpected connection error: {e}")
# Retry logic
if attempt < max_retries - 1:
logger.info(f"Retrying in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
logger.error("Max retries exceeded")
asyncio.run(robust_client_with_recovery())import asyncio
import logging
from websockets import connect
from websockets import (
WebSocketException, ConnectionClosed, InvalidHandshake, ProtocolError,
SecurityError, InvalidURI, ProxyError
)
class WebSocketErrorHandler:
"""Centralized WebSocket error handling and logging."""
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
def classify_error(self, exception: Exception) -> str:
"""Classify exception into error categories."""
if isinstance(exception, ConnectionClosed):
return "connection_closed"
elif isinstance(exception, SecurityError):
return "security_error"
elif isinstance(exception, InvalidHandshake):
return "handshake_error"
elif isinstance(exception, ProtocolError):
return "protocol_error"
elif isinstance(exception, ProxyError):
return "proxy_error"
elif isinstance(exception, InvalidURI):
return "uri_error"
elif isinstance(exception, WebSocketException):
return "websocket_error"
else:
return "unknown_error"
def should_retry(self, exception: Exception) -> bool:
"""Determine if operation should be retried."""
category = self.classify_error(exception)
# Don't retry on these error types
no_retry_categories = {
"security_error", "uri_error", "handshake_error"
}
return category not in no_retry_categories
def log_error(self, exception: Exception, context: str = ""):
"""Log error with appropriate level and details."""
category = self.classify_error(exception)
if category == "connection_closed":
if isinstance(exception, ConnectionClosedOK):
self.logger.info(f"{context}: Connection closed normally")
else:
self.logger.warning(f"{context}: {exception}")
elif category in ["security_error", "protocol_error"]:
self.logger.error(f"{context}: {category}: {exception}")
elif category in ["handshake_error", "proxy_error"]:
self.logger.warning(f"{context}: {category}: {exception}")
else:
self.logger.error(f"{context}: {category}: {exception}")
async def error_handling_example():
"""Demonstrate centralized error handling."""
error_handler = WebSocketErrorHandler()
test_uris = [
"ws://localhost:8765",
"ws://invalid-host:9999",
"wss://expired-cert.example.com",
"invalid-uri",
]
for uri in test_uris:
try:
context = f"Connecting to {uri}"
async with connect(uri, open_timeout=5) as websocket:
await websocket.send("Test message")
response = await websocket.recv()
print(f"Success: {response}")
except Exception as e:
error_handler.log_error(e, context)
if error_handler.should_retry(e):
print(f"Error is retryable: {error_handler.classify_error(e)}")
else:
print(f"Error is not retryable: {error_handler.classify_error(e)}")
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
asyncio.run(error_handling_example())Install with Tessl CLI
npx tessl i tessl/pypi-websockets