WebSocket client for Python with low-level API options
Comprehensive exception hierarchy for WebSocket-specific error conditions, connection management, and protocol violations. All exceptions inherit from the base WebSocketException class for consistent error handling patterns.
Root exception class for all WebSocket-related errors.
class WebSocketException(Exception):
    """Root of the WebSocket exception hierarchy.

    Every WebSocket-specific error derives from this class, so a single
    ``except WebSocketException`` clause catches all of them.
    """


# Exceptions raised when WebSocket protocol rules are violated.
class WebSocketProtocolException(WebSocketException):
    """Signal a violation of the WebSocket protocol.

    Covers frame format errors, invalid opcodes, reserved bit usage, and
    other protocol-level violations that prevent proper communication.
    """
class WebSocketPayloadException(WebSocketException):
    """Signal an invalid WebSocket payload.

    Covers UTF-8 encoding errors in text frames, invalid close frame
    payloads, and other data-specific validation failures.
    """


# Exceptions related to WebSocket connection lifecycle and state management.
class WebSocketConnectionClosedException(WebSocketException):
    """Signal an operation attempted on a closed connection.

    Raised when the remote host closed the connection, the network
    connection was lost, or an operation was attempted after closure.
    """
class WebSocketTimeoutException(WebSocketException):
    """Signal that a socket operation exceeded its timeout limit.

    Can occur during read/write operations, connection establishment, or
    other time-sensitive WebSocket operations.
    """


# Exceptions related to network connectivity and proxy configuration.
class WebSocketProxyException(WebSocketException):
    """Signal a proxy-related error.

    Covers proxy connection failures, authentication errors, and proxy
    configuration problems.
    """
class WebSocketAddressException(WebSocketException):
    """Signal that a WebSocket address could not be resolved.

    Covers DNS resolution failures and invalid URL formats that prevent
    connection establishment.
    """


# Exception for WebSocket handshake failures with detailed HTTP response information.
class WebSocketBadStatusException(WebSocketException):
    """Raised when the WebSocket handshake receives a bad HTTP status code.

    Carries the HTTP response details so callers can debug handshake
    failures and server-side errors.

    Attributes:
        status_code: HTTP status code from the handshake response.
        status_message: HTTP status message, or None when not supplied.
        resp_headers: HTTP response headers, or None when not supplied.
        resp_body: HTTP response body content, or None when not supplied.
    """

    def __init__(
        self,
        message: str,
        status_code: int,
        status_message=None,
        resp_headers=None,
        resp_body=None,
    ):
        """Initialize the exception with HTTP response details.

        Args:
            message: Error description, passed to the base exception.
            status_code: HTTP status code from the handshake response.
            status_message: HTTP status message (optional).
            resp_headers: HTTP response headers dictionary (optional).
            resp_body: HTTP response body content (optional).
        """
        super().__init__(message)
        self.status_code = status_code
        # This argument used to be accepted and then discarded; keep it so
        # handlers can report the server's reason phrase.
        self.status_message = status_message
        self.resp_headers = resp_headers
        self.resp_body = resp_body


from websocket import (
    create_connection,
    WebSocketException,
    WebSocketConnectionClosedException,
    WebSocketTimeoutException,
)
# Basic send/receive round trip with one handler per failure category,
# ordered from most specific to most general.
ws = None
try:
    ws = create_connection("ws://echo.websocket.events/", timeout=5)
    ws.send("Hello")
    response = ws.recv()
    print(f"Received: {response}")
except WebSocketConnectionClosedException:
    print("Connection was closed unexpectedly")
except WebSocketTimeoutException:
    print("Connection timed out")
except WebSocketException as e:
    print(f"WebSocket error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    # Close on every path: previously a failure in send()/recv() skipped
    # close() and leaked the socket.
    if ws is not None:
        ws.close()


from websocket import create_connection, WebSocketBadStatusException
# Inspect the HTTP response details attached to a failed handshake.
try:
    ws = create_connection("ws://example.com/protected-endpoint")
except WebSocketBadStatusException as e:
    print(f"Handshake failed: {e}")
    print(f"HTTP Status: {e.status_code}")
    if e.resp_headers:
        print("Response headers:")
        for header, value in e.resp_headers.items():
            print(f" {header}: {value}")
    if e.resp_body:
        print(f"Response body: {e.resp_body}")
    # Handle specific status codes
    if e.status_code == 401:
        print("Authentication required")
    elif e.status_code == 403:
        print("Access forbidden")
    elif e.status_code == 404:
        print("WebSocket endpoint not found")
    elif e.status_code >= 500:
        print("Server error occurred")
except Exception as e:
    print(f"Other error: {e}")
else:
    # The handshake succeeded after all: release the connection instead of
    # leaving it open.
    ws.close()


from websocket import (
    WebSocket,
    WebSocketProtocolException,
    WebSocketPayloadException,
    ABNF,
)
ws = WebSocket()
ws.connect("ws://echo.websocket.events/")
try:
    try:
        # Build a frame with a reserved bit set and validate it locally
        # (the frame is never sent).
        invalid_frame = ABNF(
            fin=1,
            rsv1=1,  # Invalid - reserved bit set
            opcode=ABNF.OPCODE_TEXT,
            data="test",
        )
        invalid_frame.validate()
    except WebSocketProtocolException as e:
        print(f"Protocol violation: {e}")

    try:
        invalid_utf8 = b'\xff\xfe'  # Invalid UTF-8 sequence
        # Decode strictly. With errors='ignore' the invalid bytes were
        # silently dropped, an empty string was sent, and the
        # UnicodeDecodeError handler below could never run.
        ws.send(invalid_utf8.decode('utf-8'), ABNF.OPCODE_TEXT)
    except WebSocketPayloadException as e:
        print(f"Payload error: {e}")
    except UnicodeDecodeError:
        print("Cannot decode invalid UTF-8")
finally:
    # Close the connection even if one of the demos raises something
    # unexpected.
    ws.close()


from websocket import WebSocket, WebSocketConnectionClosedException
import threading
import time
def message_sender(ws, count=10, interval=1):
    """Send numbered messages in a background thread.

    Args:
        ws: Connection object exposing ``send(text)``.
        count: Number of messages to send (default 10, as before).
        interval: Seconds to sleep before each send (default 1, as before).

    Stops quietly, after printing a notice, if the connection is closed
    while sending.
    """
    try:
        for i in range(count):
            time.sleep(interval)
            ws.send(f"Message {i}")
            print(f"Sent message {i}")
    except WebSocketConnectionClosedException:
        print("Connection closed while sending")
def message_receiver(ws):
    """Print every message read from ``ws`` until the connection closes.

    Intended to run in a background thread; the loop only ends when
    ``ws.recv()`` raises ``WebSocketConnectionClosedException``.
    """
    try:
        while True:
            print(f"Received: {ws.recv()}")
    except WebSocketConnectionClosedException:
        print("Connection closed while receiving")
# Open one connection shared by both worker threads.
ws = WebSocket(enable_multithread=True)
ws.connect("ws://echo.websocket.events/")

# Launch the sender and the receiver in the background.
sender_thread = threading.Thread(target=message_sender, args=(ws,))
receiver_thread = threading.Thread(target=message_receiver, args=(ws,))
workers = (sender_thread, receiver_thread)
for worker in workers:
    worker.start()

# Simulate a connection interruption after 3 seconds.
time.sleep(3)
ws.close()

# Give both threads the chance to react to the closure.
for worker in workers:
    worker.join()
print("All threads completed")


from websocket import create_connection, WebSocketTimeoutException
import time
# Test different timeout scenarios
timeout_configs = [
    {"name": "Fast timeout", "timeout": 1},
    {"name": "Medium timeout", "timeout": 5},
    {"name": "No timeout", "timeout": None},
]

for config in timeout_configs:
    print(f"\nTesting {config['name']} (timeout={config['timeout']})")
    ws = None
    try:
        ws = create_connection(
            "ws://echo.websocket.events/",
            timeout=config['timeout'],
        )
        # Monotonic clock: an elapsed-time measurement must not be affected
        # by wall-clock adjustments.
        start_time = time.monotonic()
        ws.send("ping")
        response = ws.recv()
        elapsed = time.monotonic() - start_time
        print(f" Success: received '{response}' in {elapsed:.2f}s")
    except WebSocketTimeoutException:
        print(f" Timeout occurred after {config['timeout']}s")
    except Exception as e:
        print(f" Unexpected error: {e}")
    finally:
        # Close on every path: previously a timeout in send()/recv(), the
        # case this example demonstrates, left the socket open.
        if ws is not None:
            ws.close()


from websocket import create_connection, WebSocketProxyException
# Each scenario names a proxy endpoint that is expected to fail.
# None of them supplies "http_proxy_auth"; the last one needs it.
proxy_configs = [
    {"name": label, "http_proxy_host": host, "http_proxy_port": port}
    for label, host, port in (
        ("Invalid proxy host", "nonexistent.proxy.com", 8080),
        ("Wrong proxy port", "proxy.company.com", 9999),
        ("Authentication required", "authenticated.proxy.com", 8080),
    )
]

for config in proxy_configs:
    print(f"\nTesting {config['name']}")
    try:
        proxy_options = {
            "http_proxy_host": config["http_proxy_host"],
            "http_proxy_port": config["http_proxy_port"],
            # Absent from every scenario above, so this resolves to None.
            "http_proxy_auth": config.get("http_proxy_auth"),
        }
        ws = create_connection("ws://echo.websocket.events/", **proxy_options)
        print(" Proxy connection successful")
        ws.close()
    except WebSocketProxyException as e:
        print(f" Proxy error: {e}")
    except Exception as e:
        print(f" Other error: {e}")


from websocket import (
    WebSocketApp,
    WebSocketException,
    WebSocketConnectionClosedException,
    WebSocketTimeoutException,
    WebSocketBadStatusException,
)
import time
class RobustWebSocketClient:
    """Example client that retries a ``WebSocketApp`` connection with backoff.

    Attributes:
        url: WebSocket URL to connect to.
        ws: The current ``WebSocketApp``, or None before the first attempt.
        reconnect_attempts: Failed attempts since the last successful open.
        max_reconnect_attempts: Attempts allowed before giving up.
    """

    # Handshake statuses treated as permanent failures (no retry).
    PERMANENT_STATUS_CODES = (401, 403, 404)
    # Cap, in seconds, on the exponential backoff between attempts.
    MAX_BACKOFF_SECONDS = 30

    def __init__(self, url, max_reconnect_attempts=5):
        """Store the target URL and reset the retry bookkeeping.

        Args:
            url: WebSocket URL to connect to.
            max_reconnect_attempts: Attempts allowed before giving up
                (default 5, the previously hard-coded value).
        """
        self.url = url
        self.ws = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = max_reconnect_attempts

    def _backoff_delay(self):
        """Return the capped exponential delay for the current attempt count."""
        return min(self.MAX_BACKOFF_SECONDS, 2 ** self.reconnect_attempts)

    def connect(self):
        """Connect with comprehensive error handling.

        Loops until ``run_forever`` returns, a permanent handshake error is
        seen, or the attempt limit is reached; sleeps between attempts.
        """
        while self.reconnect_attempts < self.max_reconnect_attempts:
            try:
                print(f"Connection attempt {self.reconnect_attempts + 1}")
                self.ws = WebSocketApp(
                    self.url,
                    on_open=self.on_open,
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                )
                # NOTE(review): run_forever appears to report most failures
                # through on_error rather than raising - confirm that the
                # handlers below are reachable in practice.
                self.ws.run_forever(
                    ping_interval=30,
                    ping_timeout=10,
                    reconnect=5,
                )
                break
            except WebSocketBadStatusException as e:
                print(f"Handshake failed (HTTP {e.status_code}): {e}")
                if e.status_code in self.PERMANENT_STATUS_CODES:
                    print("Permanent error, not retrying")
                    break
            except WebSocketConnectionClosedException:
                print("Connection closed, will retry")
            except WebSocketTimeoutException:
                print("Connection timeout, will retry")
            except WebSocketException as e:
                print(f"WebSocket error: {e}")
            except Exception as e:
                print(f"Unexpected error: {e}")

            # Only reached after a retryable failure.
            self.reconnect_attempts += 1
            if self.reconnect_attempts < self.max_reconnect_attempts:
                wait_time = self._backoff_delay()
                print(f"Waiting {wait_time}s before retry...")
                time.sleep(wait_time)

        if self.reconnect_attempts >= self.max_reconnect_attempts:
            print("Max reconnection attempts reached, giving up")

    def on_open(self, ws):
        """Report the open connection and reset the retry counter."""
        print("Connection established")
        self.reconnect_attempts = 0  # Reset on successful connection

    def on_message(self, ws, message):
        """Print each received message."""
        print(f"Received: {message}")

    def on_error(self, ws, error):
        """Print errors reported by the WebSocketApp."""
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Print the close status code and message."""
        print(f"Connection closed: {close_status_code} - {close_msg}")
# Usage: build the client for the echo endpoint and start its connect loop.
client = RobustWebSocketClient("ws://echo.websocket.events/")
client.connect()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-websocket-client