CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-websockets

An implementation of the WebSocket Protocol (RFC 6455 & 7692)

Overview
Eval results
Files

docs/exceptions.md

Exception Handling

A comprehensive exception hierarchy for precise error handling in WebSocket operations. It covers the connection lifecycle, protocol violations, handshake failures, validation errors, and network issues.

Capabilities

Base Exception Classes

Foundation exception classes that serve as the root of the WebSocket exception hierarchy.

class WebSocketException(Exception):
    """
    Base class for all WebSocket-specific exceptions.
    
    All other WebSocket exceptions inherit from this class,
    allowing catch-all exception handling for WebSocket operations.
    """
    
    def __init__(self, message: str = None):
        """
        Initialize WebSocket exception.

        Parameters:
        - message: Human-readable error description
        """
        super().__init__(message)

Connection Lifecycle Exceptions

Exceptions related to WebSocket connection establishment, maintenance, and closure.

class ConnectionClosed(WebSocketException):
    """
    Base exception for closed WebSocket connections.

    Raised when attempting operations on a closed connection
    or when connection closure is detected.
    """

    def __init__(self, rcvd: Close | None = None, sent: Close | None = None):
        """
        Initialize connection closed exception.

        Parameters:
        - rcvd: Close frame received from peer, or None if none was received
        - sent: Close frame sent to peer, or None if none was sent

        The message includes the received frame's code and reason when a
        frame was received; ``sent`` is stored but not used in the message.
        """
        self.rcvd = rcvd
        self.sent = sent

        # Compare against None explicitly: "no frame" is the only case that
        # should take the generic message, regardless of how a frame object
        # evaluates in a boolean context.
        if rcvd is not None:
            super().__init__(f"Connection closed: {rcvd.code} {rcvd.reason}")
        else:
            super().__init__("Connection closed")

    @property
    def code(self) -> int | None:
        """Close code of the received close frame, or None if none was received."""
        return self.rcvd.code if self.rcvd is not None else None

    @property
    def reason(self) -> str | None:
        """Close reason of the received close frame, or None if none was received."""
        return self.rcvd.reason if self.rcvd is not None else None

class ConnectionClosedOK(ConnectionClosed):
    """
    Connection closed normally (close code 1000).

    Signals a clean shutdown without errors, typically because the client
    or the server explicitly closed the connection.
    """

class ConnectionClosedError(ConnectionClosed):
    """
    Connection closed abnormally (close code other than 1000).

    Signals a closure caused by an error, a protocol violation,
    or an unexpected network condition.
    """

Handshake Exceptions

Exceptions that occur during the WebSocket handshake process, including validation and negotiation failures.

class InvalidHandshake(WebSocketException):
    """
    Root of all WebSocket handshake failures.

    Signals that the handshake could not complete because of invalid
    headers, unsupported protocols, or a security policy violation.
    """

class SecurityError(InvalidHandshake):
    """
    Handshake rejected for violating a security policy.

    Signals that the connection attempt broke a rule such as an
    origin restriction or certificate validation.
    """

class InvalidMessage(InvalidHandshake):
    """
    Handshake message that is malformed.

    Signals that the HTTP handshake message could not be understood or
    does not meet the WebSocket upgrade requirements.
    """

class InvalidStatus(InvalidHandshake):
    """
    WebSocket upgrade rejection.

    Raised when server rejects WebSocket upgrade request
    with non-101 HTTP status code.
    """

    def __init__(self, status_code: int, headers: Headers | None = None):
        """
        Initialize invalid status exception.

        Parameters:
        - status_code: HTTP status code received
        - headers: HTTP response headers; a fresh empty ``Headers()`` is
          created only when this argument is None
        """
        self.status_code = status_code
        # Only substitute a default when no object was supplied. An ``or``
        # test would also discard a caller-provided headers object that
        # happens to be falsy (presumably an empty mapping; verify against
        # the Headers type).
        self.headers = headers if headers is not None else Headers()
        super().__init__(f"HTTP {status_code}")

class InvalidHeader(InvalidHandshake):
    """
    Invalid HTTP header in handshake.

    Raised when handshake contains invalid or missing
    required HTTP headers.
    """

    def __init__(self, name: str, value: str | None = None):
        """
        Initialize invalid header exception.

        Parameters:
        - name: Header name that is invalid
        - value: Header value, or None when the header is missing

        An empty string is treated as a value that was present, so the
        message keeps the "name: value" form for it.
        """
        self.name = name
        self.value = value

        # Distinguish "no value" (None) from "empty value" (""); a plain
        # truthiness test would report both as a missing header.
        if value is not None:
            super().__init__(f"Invalid header {name}: {value}")
        else:
            super().__init__(f"Invalid header: {name}")

class InvalidHeaderFormat(InvalidHeader):
    """
    Header that could not be parsed.

    Signals that an HTTP header has an incorrect format
    or an encoding problem.
    """

class InvalidHeaderValue(InvalidHeader):
    """
    Header value that is well-formed but not acceptable.

    Signals that a header parsed correctly yet carries a value that
    is semantically wrong for the WebSocket handshake.
    """

class InvalidOrigin(InvalidHandshake):
    """
    Origin header that the server does not allow.

    Signals that the Origin header falls outside the server's
    allowed-origins policy.
    """

    def __init__(self, origin: str):
        """
        Build the exception for a rejected origin.

        Parameters:
        - origin: The origin value that was rejected
        """
        self.origin = origin
        description = f"Invalid origin: {origin}"
        super().__init__(description)

class InvalidUpgrade(InvalidHandshake):
    """
    Wrong or missing WebSocket upgrade headers.

    Signals that the headers required for the WebSocket upgrade
    are absent or carry incorrect values.
    """

class NegotiationError(InvalidHandshake):
    """
    Failure to negotiate extensions or subprotocols.

    Signals that client and server found no WebSocket extensions
    or subprotocols they both accept.
    """

Extension and Parameter Exceptions

Exceptions related to WebSocket extension processing and parameter validation.

class DuplicateParameter(InvalidHandshake):
    """
    Extension parameter that appears more than once.

    Signals that a WebSocket extension lists the same
    parameter name repeatedly.
    """

    def __init__(self, name: str):
        """
        Build the exception for a repeated parameter.

        Parameters:
        - name: The parameter name that was repeated
        """
        self.name = name
        description = f"Duplicate parameter: {name}"
        super().__init__(description)

class InvalidParameterName(InvalidHandshake):
    """
    Extension parameter name that is not allowed.

    Signals that a parameter name does not meet the
    specification's requirements.
    """

    def __init__(self, name: str):
        """
        Build the exception for a bad parameter name.

        Parameters:
        - name: The offending parameter name
        """
        self.name = name
        description = f"Invalid parameter name: {name}"
        super().__init__(description)

class InvalidParameterValue(InvalidHandshake):
    """
    Extension parameter value that is not allowed.

    Signals that the value supplied for a given extension
    parameter is not valid for that parameter.
    """

    def __init__(self, name: str, value: str):
        """
        Build the exception for a bad parameter value.

        Parameters:
        - name: The parameter the value belongs to
        - value: The offending value
        """
        self.name, self.value = name, value
        description = f"Invalid parameter value {name}: {value}"
        super().__init__(description)

Network and URI Exceptions

Exceptions related to network connectivity, proxy handling, and URI validation.

class InvalidURI(WebSocketException):
    """
    WebSocket URI that cannot be used.

    Signals that the URI is malformed or relies on an
    unsupported scheme or format.
    """

    def __init__(self, uri: str, reason: str = None):
        """
        Build the exception for a rejected URI.

        Parameters:
        - uri: The URI that was rejected
        - reason: Why it was rejected; omitted from the message when falsy
        """
        self.uri = uri
        self.reason = reason

        description = (
            f"Invalid URI {uri}: {reason}" if reason else f"Invalid URI: {uri}"
        )
        super().__init__(description)

class ProxyError(InvalidHandshake):
    """
    Failure while connecting through a proxy.

    Signals that a connection via an HTTP/SOCKS proxy failed
    or that the proxy refused the request.
    """

class InvalidProxy(ProxyError):
    """
    Proxy configuration that cannot be used.

    Signals that the proxy settings are malformed or that the
    proxy server information is invalid.
    """

class InvalidProxyMessage(ProxyError):
    """
    Proxy response that is malformed.

    Signals that the proxy server answered with a malformed HTTP
    response while the connection was being established.
    """

class InvalidProxyStatus(ProxyError):
    """
    Proxy answered with a rejection or error status.

    Signals that the proxy server refused the connection
    request with an error status code.
    """

    def __init__(self, status_code: int, reason: str = None):
        """
        Build the exception for a proxy error status.

        Parameters:
        - status_code: HTTP status code returned by the proxy
        - reason: Reason phrase; omitted from the message when falsy
        """
        self.status_code = status_code
        self.reason = reason

        description = (
            f"Proxy error {status_code}: {reason}"
            if reason
            else f"Proxy error: {status_code}"
        )
        super().__init__(description)

Protocol Violations

Exceptions for WebSocket protocol violations and operational errors.

class ProtocolError(WebSocketException):
    """
    Violation of the WebSocket protocol.

    Signals that received data breaks the WebSocket protocol
    specification or that an invalid operation was attempted.
    """

class PayloadTooBig(ProtocolError):
    """
    Message larger than the permitted size.

    Signals that a received message or frame is bigger than
    the configured maximum.
    """

    def __init__(self, size: int, max_size: int):
        """
        Build the exception for an oversized payload.

        Parameters:
        - size: Size of the payload
        - max_size: Largest size that is allowed
        """
        self.size, self.max_size = size, max_size
        description = f"Payload too big: {size} > {max_size}"
        super().__init__(description)

class InvalidState(ProtocolError):
    """
    Operation not permitted in the current connection state.

    Signals that the attempted operation is not valid for the
    state the WebSocket connection is currently in.
    """

    def __init__(self, operation: str, state: str):
        """
        Build the exception for a disallowed operation.

        Parameters:
        - operation: The operation that was attempted
        - state: The connection state at that moment
        """
        self.operation, self.state = operation, state
        description = f"Cannot {operation} in state {state}"
        super().__init__(description)

class ConcurrencyError(ProtocolError):
    """
    Overlapping read or write operations on one connection.

    Signals that several coroutines tried to read from or write to
    the same WebSocket connection at the same time.
    """

    def __init__(self, operation: str):
        """
        Build the exception for a concurrent operation.

        Parameters:
        - operation: The operation on which the conflict was detected
        """
        self.operation = operation
        description = f"Concurrent {operation} operations"
        super().__init__(description)

Usage Examples

Basic Exception Handling

import asyncio
from websockets import connect, ConnectionClosed, InvalidURI, ProtocolError

async def basic_exception_handling():
    """
    Demonstrate basic WebSocket exception handling.

    Connects to a local server, exchanges one message, and prints a
    description of whichever exception is raised along the way.
    """
    # Imported locally because the snippet's top-level import does not
    # bring these two subclasses into scope; without them the isinstance
    # checks below raise NameError.
    from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK

    try:
        async with connect("ws://localhost:8765") as websocket:
            # Send and receive messages
            await websocket.send("Hello, Server!")
            response = await websocket.recv()
            print(f"Received: {response}")

    except ConnectionClosed as e:
        print(f"Connection closed: code={e.code}, reason={e.reason}")

        # Check if closure was normal or error
        if isinstance(e, ConnectionClosedOK):
            print("Connection closed normally")
        elif isinstance(e, ConnectionClosedError):
            print("Connection closed with error")

    except InvalidURI as e:
        print(f"Invalid WebSocket URI: {e.uri}")
        if e.reason:
            print(f"Reason: {e.reason}")

    except ProtocolError as e:
        print(f"Protocol error: {e}")

    except Exception as e:
        # Last-resort handler so the demo always prints something.
        print(f"Unexpected error: {e}")

# Script entry point: run the example coroutine to completion.
asyncio.run(basic_exception_handling())

Handshake Exception Handling

import asyncio
from websockets import connect
from websockets import (
    InvalidHandshake, SecurityError, InvalidStatus, InvalidOrigin,
    NegotiationError, InvalidHeader
)

async def handshake_exception_handling():
    """Connect to several endpoints and report any handshake failure."""
    # Each entry pairs a URI with a note on the outcome it is meant to
    # illustrate; the note is informational and is not checked.
    scenarios = [
        ("ws://localhost:8765", None),  # Normal case
        ("ws://invalid-server:9999", "Connection failed"),
        ("wss://expired-cert.example.com", "Certificate error"),
    ]

    for uri, _expected_error in scenarios:
        try:
            print(f"Connecting to {uri}...")

            # Built per attempt so no objects are shared between connections.
            options = {
                "additional_headers": {"Origin": "https://myapp.com"},
                "subprotocols": ["chat", "notifications"],
                "open_timeout": 5,
            }
            async with connect(uri, **options) as websocket:
                await websocket.send("Connection successful")
                response = await websocket.recv()
                print(f"Success: {response}")

        # Specific handshake failures are listed before InvalidHandshake so
        # that each one reaches its own handler.
        except SecurityError as e:
            print(f"Security error: {e}")

        except InvalidStatus as e:
            print(f"Server rejected connection: HTTP {e.status_code}")
            print(f"Response headers: {dict(e.headers)}")

        except InvalidOrigin as e:
            print(f"Origin rejected: {e.origin}")

        except InvalidHeader as e:
            print(f"Invalid header: {e.name} = {e.value}")

        except NegotiationError as e:
            print(f"Failed to negotiate extensions/subprotocols: {e}")

        except InvalidHandshake as e:
            print(f"Handshake failed: {e}")

        except Exception as e:
            print(f"Other error: {e}")

        # Blank line between scenarios
        print()

# Script entry point: run the example coroutine to completion.
asyncio.run(handshake_exception_handling())

Connection State Exception Handling

import asyncio
from websockets import connect, InvalidState, ConcurrencyError, PayloadTooBig

async def state_exception_handling():
    """Exercise size-limit, concurrency, and closed-connection errors."""
    endpoint = "ws://localhost:8765"
    size_limit = 1024  # 1KB max message size

    try:
        async with connect(endpoint, max_size=size_limit) as websocket:

            # Oversized message: 2KB against the 1KB limit.
            # NOTE(review): confirm that send() is where PayloadTooBig is
            # raised; the limit may only apply to incoming messages.
            oversized = "x" * 2048
            try:
                await websocket.send(oversized)
            except PayloadTooBig as e:
                print(f"Message too large: {e.size} > {e.max_size}")

            # Concurrent operations (illustration only; nothing runs here)
            try:
                # Enabling the lines below would issue two recv() calls at once:
                # await asyncio.gather(
                #     websocket.recv(),
                #     websocket.recv()  # Concurrent recv operations
                # )
                pass
            except ConcurrencyError as e:
                print(f"Concurrency error: {e.operation}")

            # Regular request/response round trip
            await websocket.send("Hello")
            response = await websocket.recv()
            print(f"Normal operation: {response}")

            # Close the connection, then try to use it again
            await websocket.close()

            try:
                await websocket.send("This should fail")
            except InvalidState as e:
                print(f"Invalid state: {e.operation} in {e.state}")

    except Exception as e:
        # Anything not handled above, including failure to connect
        print(f"Connection error: {e}")

# Script entry point: run the example coroutine to completion.
asyncio.run(state_exception_handling())

Robust Error Recovery

import asyncio
import logging
from websockets import connect, WebSocketException, ConnectionClosed

# Configure the root logger to emit INFO and above, and create a logger
# named after this module for the example below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def robust_client_with_recovery(
    uri: str = "ws://localhost:8765",
    max_retries: int = 3,
    retry_delay: float = 1,
    max_consecutive_errors: int = 5,
):
    """
    Client with comprehensive error handling and recovery.

    Connects to ``uri`` and repeatedly sends a message and waits for a
    reply. Connection attempts are retried with exponential backoff.

    Parameters:
    - uri: WebSocket server to connect to
    - max_retries: Maximum number of connection attempts
    - retry_delay: Seconds to wait before the second attempt; doubled
      after every further attempt
    - max_consecutive_errors: Number of consecutive failed exchanges
      (timeouts or errors) after which the client gives up entirely

    With the default arguments the behavior matches the original
    hard-coded example.
    """
    for attempt in range(max_retries):
        try:
            logger.info(f"Connection attempt {attempt + 1}")

            async with connect(uri, ping_interval=20, ping_timeout=10) as websocket:
                logger.info("Connected successfully")

                # Main message loop with error recovery
                message_count = 0
                error_count = 0

                while error_count < max_consecutive_errors:
                    try:
                        # Send periodic messages
                        message = f"Message {message_count}"
                        await websocket.send(message)
                        message_count += 1

                        # Wait for response with timeout
                        try:
                            response = await asyncio.wait_for(
                                websocket.recv(),
                                timeout=30
                            )
                            logger.info(f"Received: {response}")
                            error_count = 0  # Reset error count on success

                        except asyncio.TimeoutError:
                            logger.warning("Response timeout")
                            error_count += 1
                            # Skip the pause and send the next message at once
                            continue

                        # Wait before next message
                        await asyncio.sleep(5)

                    except ConnectionClosed as e:
                        # Leave the message loop; the outer loop reconnects
                        logger.warning(f"Connection closed: {e.code} {e.reason}")
                        break

                    except WebSocketException as e:
                        logger.error(f"WebSocket error: {e}")
                        error_count += 1

                        if error_count < max_consecutive_errors:
                            await asyncio.sleep(1)  # Brief pause before retry

                    except Exception as e:
                        logger.error(f"Unexpected error: {e}")
                        error_count += 1

                if error_count >= max_consecutive_errors:
                    logger.error("Too many consecutive errors, giving up")
                    # Abandons the retry loop as well, not just this connection
                    break

        except WebSocketException as e:
            logger.error(f"WebSocket connection failed: {e}")

        except Exception as e:
            logger.error(f"Unexpected connection error: {e}")

        # Retry logic
        if attempt < max_retries - 1:
            logger.info(f"Retrying in {retry_delay} seconds...")
            await asyncio.sleep(retry_delay)
            retry_delay *= 2  # Exponential backoff
        else:
            logger.error("Max retries exceeded")

# Script entry point: run the example coroutine to completion.
asyncio.run(robust_client_with_recovery())

Exception Classification and Logging

import asyncio
import logging
from websockets import connect
from websockets import (
    WebSocketException, ConnectionClosed, InvalidHandshake, ProtocolError,
    SecurityError, InvalidURI, ProxyError
)

class WebSocketErrorHandler:
    """Centralized WebSocket error classification, retry policy, and logging."""

    # Categories for which a retry is not attempted.
    _NO_RETRY_CATEGORIES = frozenset(
        {"security_error", "uri_error", "handshake_error"}
    )

    def __init__(self):
        # Logger is named after the concrete class.
        self.logger = logging.getLogger(self.__class__.__name__)

    def classify_error(self, exception: Exception) -> str:
        """
        Classify exception into error categories.

        Parameters:
        - exception: The exception to classify

        Returns one of "connection_closed", "security_error",
        "proxy_error", "handshake_error", "protocol_error", "uri_error",
        "websocket_error", or "unknown_error".
        """
        # Order matters: subclasses are tested before their base classes.
        # SecurityError and ProxyError both derive from InvalidHandshake,
        # so testing InvalidHandshake first would hide them.
        if isinstance(exception, ConnectionClosed):
            return "connection_closed"
        if isinstance(exception, SecurityError):
            return "security_error"
        if isinstance(exception, ProxyError):
            return "proxy_error"
        if isinstance(exception, InvalidHandshake):
            return "handshake_error"
        if isinstance(exception, ProtocolError):
            return "protocol_error"
        if isinstance(exception, InvalidURI):
            return "uri_error"
        if isinstance(exception, WebSocketException):
            return "websocket_error"
        return "unknown_error"

    def should_retry(self, exception: Exception) -> bool:
        """
        Determine if operation should be retried.

        Returns False for security, URI, and handshake errors and True
        for every other category.
        """
        return self.classify_error(exception) not in self._NO_RETRY_CATEGORIES

    def log_error(self, exception: Exception, context: str = ""):
        """
        Log error with appropriate level and details.

        Parameters:
        - exception: The exception to report
        - context: Text placed in front of the log message
        """
        # Imported locally because the snippet's top-level import does not
        # bring ConnectionClosedOK into scope.
        from websockets.exceptions import ConnectionClosedOK

        category = self.classify_error(exception)

        if category == "connection_closed":
            if isinstance(exception, ConnectionClosedOK):
                self.logger.info(f"{context}: Connection closed normally")
            else:
                self.logger.warning(f"{context}: {exception}")

        elif category in ("handshake_error", "proxy_error"):
            self.logger.warning(f"{context}: {category}: {exception}")

        else:
            # Security, protocol, URI, generic WebSocket, and unknown errors
            self.logger.error(f"{context}: {category}: {exception}")

async def error_handling_example():
    """Run several connection attempts through the shared error handler."""
    error_handler = WebSocketErrorHandler()

    # Endpoints to try, one after another
    candidate_uris = (
        "ws://localhost:8765",
        "ws://invalid-host:9999",
        "wss://expired-cert.example.com",
        "invalid-uri",
    )

    for uri in candidate_uris:
        # Label attached to any log record for this attempt
        context = f"Connecting to {uri}"
        try:
            async with connect(uri, open_timeout=5) as websocket:
                await websocket.send("Test message")
                response = await websocket.recv()
                print(f"Success: {response}")

        except Exception as e:
            # Every failure goes through the handler for logging and for
            # the retry decision.
            error_handler.log_error(e, context)

            if not error_handler.should_retry(e):
                print(f"Error is not retryable: {error_handler.classify_error(e)}")
            else:
                print(f"Error is retryable: {error_handler.classify_error(e)}")

# Configure root logging so each record shows its timestamp, logger name,
# and level ahead of the message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Script entry point: run the example coroutine to completion.
asyncio.run(error_handling_example())

Install with Tessl CLI

npx tessl i tessl/pypi-websockets

docs

asyncio-client.md

asyncio-server.md

data-structures.md

exceptions.md

extensions.md

index.md

protocol.md

routing.md

sync-client.md

sync-server.md

tile.json