WebSocket client for Python with low level API options
Low-level socket configuration, timeout management, and network-level options for advanced connection control and performance tuning. These functions provide fine-grained control over the underlying network socket behavior.
Pre-configured socket options for optimal WebSocket performance and reliability.
DEFAULT_SOCKET_OPTION: list
"""
Default socket configuration for WebSocket connections.
Includes:
- TCP_NODELAY: Disable Nagle's algorithm for low latency
- SO_KEEPALIVE: Enable TCP keepalive (if supported)
- TCP_KEEPIDLE: Keepalive idle time (if supported)
- TCP_KEEPINTVL: Keepalive probe interval (if supported)
- TCP_KEEPCNT: Keepalive probe count (if supported)
These options optimize for WebSocket's real-time communication patterns.
"""Container class for managing socket and SSL configuration options.
class sock_opt:
def __init__(self, sockopt: list, sslopt: dict) -> None:
"""
Initialize socket options container.
Parameters:
- sockopt: List of socket options for socket.setsockopt()
Format: [(level, optname, value), ...]
Example: [(socket.SOL_TCP, socket.TCP_NODELAY, 1)]
- sslopt: Dictionary of SSL options for SSL context
Example: {"cert_reqs": ssl.CERT_REQUIRED, "ca_certs": "/path/to/ca.pem"}
Attributes:
- sockopt: Socket options list
- sslopt: SSL options dictionary
- timeout: Socket timeout value
"""Configure default timeout behavior for all WebSocket connections.
def setdefaulttimeout(timeout: Union[int, float, None]) -> None:
"""
Set global default timeout for WebSocket connections.
Parameters:
- timeout: Default timeout in seconds (int/float) or None for blocking
Note: This affects all new WebSocket connections that don't specify
a timeout explicitly. Existing connections are not affected.
"""
def getdefaulttimeout() -> Union[int, float, None]:
"""
Get current global default timeout setting.
Returns:
Union[int, float, None]: Current default timeout in seconds or None
"""Direct socket I/O operations with WebSocket-specific error handling and timeout support.
def recv(sock: socket.socket, bufsize: int) -> bytes:
"""
Receive data from socket with WebSocket-specific error handling.
Parameters:
- sock: Socket object to receive from
- bufsize: Maximum number of bytes to receive
Returns:
bytes: Received data
Raises:
WebSocketConnectionClosedException: If socket is closed or connection lost
WebSocketTimeoutException: If receive operation times out
"""
def recv_line(sock: socket.socket) -> bytes:
"""
Receive single line from socket (reads until newline).
Parameters:
- sock: Socket object to receive from
Returns:
bytes: Received line including newline character
Note: Used primarily for HTTP header parsing during handshake
"""
def send(sock: socket.socket, data: Union[bytes, str]) -> int:
"""
Send data through socket with proper encoding and error handling.
Parameters:
- sock: Socket object to send through
- data: Data to send (automatically UTF-8 encoded if string)
Returns:
int: Number of bytes sent
Raises:
WebSocketConnectionClosedException: If socket is closed
WebSocketTimeoutException: If send operation times out
"""import websocket
import socket
# Define custom socket options
custom_sockopt = [
# Disable Nagle's algorithm for low latency
(socket.SOL_TCP, socket.TCP_NODELAY, 1),
# Set socket buffer sizes
(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536),
(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536),
# Enable keepalive with custom settings
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
]
# Add platform-specific keepalive options
if hasattr(socket, 'TCP_KEEPIDLE'):
custom_sockopt.append((socket.SOL_TCP, socket.TCP_KEEPIDLE, 60))
if hasattr(socket, 'TCP_KEEPINTVL'):
custom_sockopt.append((socket.SOL_TCP, socket.TCP_KEEPINTVL, 30))
if hasattr(socket, 'TCP_KEEPCNT'):
custom_sockopt.append((socket.SOL_TCP, socket.TCP_KEEPCNT, 3))
# Use custom socket options with WebSocket
ws = websocket.create_connection(
"ws://echo.websocket.events/",
sockopt=custom_sockopt
)
print("Connected with custom socket options")
ws.send("Test message")
response = ws.recv()
print(f"Response: {response}")
ws.close()import websocket
import ssl
# Comprehensive SSL configuration
ssl_options = {
# Certificate verification
"cert_reqs": ssl.CERT_REQUIRED,
"ca_certs": "/etc/ssl/certs/ca-certificates.crt",
# Client certificate authentication
"certfile": "/path/to/client.crt",
"keyfile": "/path/to/client.key",
# Protocol and cipher selection
"ssl_version": ssl.PROTOCOL_TLS,
"ciphers": "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS",
# Hostname verification
"check_hostname": True,
# Security options
"do_handshake_on_connect": True,
"suppress_ragged_eofs": True,
}
try:
ws = websocket.create_connection(
"wss://secure.example.com/websocket",
sslopt=ssl_options
)
print("Secure connection established")
print(f"SSL cipher: {ws.sock.cipher()}")
ws.close()
except ssl.SSLError as e:
print(f"SSL error: {e}")
except Exception as e:
print(f"Connection error: {e}")import websocket
import time
def test_timeouts():
# Set global default timeout
websocket.setdefaulttimeout(10.0)
print(f"Default timeout: {websocket.getdefaulttimeout()}")
# Test connection with default timeout
try:
ws = websocket.create_connection("ws://echo.websocket.events/")
print(f"Connection timeout: {ws.gettimeout()}")
# Change timeout for this connection
ws.settimeout(5.0)
print(f"Updated timeout: {ws.gettimeout()}")
ws.send("Test")
response = ws.recv()
print(f"Response: {response}")
ws.close()
except websocket.WebSocketTimeoutException:
print("Operation timed out")
# Test with explicit timeout override
try:
ws = websocket.create_connection(
"ws://echo.websocket.events/",
timeout=2.0 # Override default
)
print(f"Override timeout: {ws.gettimeout()}")
ws.close()
except Exception as e:
print(f"Error: {e}")
# Reset to no timeout
websocket.setdefaulttimeout(None)
print(f"Reset timeout: {websocket.getdefaulttimeout()}")
test_timeouts()import websocket
import socket
def create_high_performance_connection(url):
"""Create WebSocket connection optimized for high throughput."""
# High-performance socket options
perf_sockopt = [
# Disable Nagle's algorithm
(socket.SOL_TCP, socket.TCP_NODELAY, 1),
# Large socket buffers for high throughput
(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024), # 1MB receive buffer
(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 1024), # 1MB send buffer
# Enable keepalive
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
]
# Add TCP_USER_TIMEOUT if available (Linux)
if hasattr(socket, 'TCP_USER_TIMEOUT'):
perf_sockopt.append((socket.SOL_TCP, socket.TCP_USER_TIMEOUT, 30000)) # 30s
# Create connection with performance options
ws = websocket.create_connection(
url,
sockopt=perf_sockopt,
skip_utf8_validation=True, # Skip validation for binary data
enable_multithread=True # Enable thread safety
)
return ws
# Usage
ws = create_high_performance_connection("ws://echo.websocket.events/")
print("High-performance connection established")
# Test large message throughput
large_message = "x" * 10000
start_time = time.time()
for i in range(100):
ws.send(large_message)
response = ws.recv()
elapsed = time.time() - start_time
print(f"Sent/received 100 large messages in {elapsed:.2f}s")
ws.close()import websocket
import select
import socket
def manual_socket_handling():
"""Demonstrate direct socket access for advanced use cases."""
ws = websocket.create_connection("ws://echo.websocket.events/")
# Get underlying socket
raw_socket = ws.sock
print(f"Socket type: {type(raw_socket)}")
print(f"Socket family: {raw_socket.family}")
print(f"Socket type: {raw_socket.type}")
# Use socket for select operations
ws.send("async test")
# Wait for data with select
ready = select.select([raw_socket], [], [], 5.0)
if ready[0]:
print("Data available for reading")
response = ws.recv()
print(f"Response: {response}")
else:
print("No data received within timeout")
# Get socket statistics (platform dependent)
try:
sockopt_info = {
"SO_RCVBUF": raw_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF),
"SO_SNDBUF": raw_socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF),
"SO_KEEPALIVE": raw_socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE),
}
if hasattr(socket, 'TCP_NODELAY'):
sockopt_info["TCP_NODELAY"] = raw_socket.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY)
print("Socket options:")
for opt, value in sockopt_info.items():
print(f" {opt}: {value}")
except Exception as e:
print(f"Could not read socket options: {e}")
ws.close()
manual_socket_handling()import websocket
import socket
import errno
def robust_socket_operations():
"""Demonstrate robust socket error handling."""
try:
ws = websocket.create_connection("ws://echo.websocket.events/")
# Test various socket error conditions
test_cases = [
{"name": "Normal operation", "data": "Hello"},
{"name": "Large message", "data": "x" * 65536},
{"name": "Binary data", "data": b"\x00\x01\x02\x03"},
]
for test in test_cases:
try:
print(f"Testing: {test['name']}")
if isinstance(test['data'], bytes):
ws.send_bytes(test['data'])
else:
ws.send(test['data'])
response = ws.recv()
print(f" Success: received {len(response)} bytes")
except websocket.WebSocketConnectionClosedException:
print(f" Connection closed during {test['name']}")
break
except websocket.WebSocketTimeoutException:
print(f" Timeout during {test['name']}")
except socket.error as e:
if e.errno == errno.ECONNRESET:
print(f" Connection reset during {test['name']}")
elif e.errno == errno.EPIPE:
print(f" Broken pipe during {test['name']}")
else:
print(f" Socket error during {test['name']}: {e}")
ws.close()
except Exception as e:
print(f"Failed to establish connection: {e}")
robust_socket_operations()import websocket
import socket
import ssl
class CustomSocketFactory:
"""Factory for creating pre-configured sockets."""
@staticmethod
def create_optimized_socket():
"""Create socket with optimal settings for WebSocket."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Apply optimizations
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Set large buffers
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 262144)
return sock
@staticmethod
def create_ssl_socket(hostname):
"""Create SSL socket with secure configuration."""
context = ssl.create_default_context()
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
# Security hardening
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM')
sock = CustomSocketFactory.create_optimized_socket()
ssl_sock = context.wrap_socket(sock, server_hostname=hostname)
return ssl_sock
def use_custom_socket():
"""Demonstrate using pre-configured socket with WebSocket."""
# Create optimized socket
custom_sock = CustomSocketFactory.create_optimized_socket()
# Use with WebSocket connection
ws = websocket.create_connection(
"ws://echo.websocket.events/",
socket=custom_sock
)
print("Connected with custom socket")
ws.send("Custom socket test")
response = ws.recv()
print(f"Response: {response}")
ws.close()
use_custom_socket()# Socket option tuple format
SocketOption = tuple[int, int, int] # (level, optname, value)
SocketOptionList = list[SocketOption]
# SSL option dictionary keys (common options)
SSLOptions = dict[str, Union[str, int, bool]]
# Timeout types
TimeoutValue = Union[int, float, None]
# Socket operation return types
SocketBytes = bytes
SocketSendResult = intInstall with Tessl CLI
npx tessl i tessl/pypi-websocket-client