CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-websocket-client

WebSocket client for Python with low level API options

Overview
Eval results
Files

docs/socket-config.md

Socket Configuration

Low-level socket configuration, timeout management, and network-level options for advanced connection control and performance tuning. These functions provide fine-grained control over the underlying network socket behavior.

Capabilities

Default Socket Options

Pre-configured socket options for optimal WebSocket performance and reliability.

DEFAULT_SOCKET_OPTION: list
"""
Default socket configuration for WebSocket connections.

Includes:
- TCP_NODELAY: Disable Nagle's algorithm for low latency
- SO_KEEPALIVE: Enable TCP keepalive (if supported)
- TCP_KEEPIDLE: Keepalive idle time (if supported) 
- TCP_KEEPINTVL: Keepalive probe interval (if supported)
- TCP_KEEPCNT: Keepalive probe count (if supported)

These options optimize for WebSocket's real-time communication patterns.
"""

Socket Options Container

Container class for managing socket and SSL configuration options.

class sock_opt:
    """Container class for managing socket and SSL configuration options."""

    def __init__(self, sockopt: list, sslopt: dict) -> None:
        """
        Initialize socket options container.

        Parameters:
        - sockopt: List of socket options for socket.setsockopt()
          Format: [(level, optname, value), ...]
          Example: [(socket.SOL_TCP, socket.TCP_NODELAY, 1)]
        - sslopt: Dictionary of SSL options for SSL context
          Example: {"cert_reqs": ssl.CERT_REQUIRED, "ca_certs": "/path/to/ca.pem"}

        Attributes:
        - sockopt: Socket options list (the sockopt argument)
        - sslopt: SSL options dictionary (the sslopt argument)
        - timeout: Socket timeout value; note that it is not a constructor
          argument
        """

Global Timeout Management

Configure default timeout behavior for all WebSocket connections.

def setdefaulttimeout(timeout: Union[int, float, None]) -> None:
    """
    Set global default timeout for WebSocket connections.

    Parameters:
    - timeout: Default timeout in seconds (int/float), or None for blocking
      operation with no timeout

    Note: This affects all new WebSocket connections that don't specify
    a timeout explicitly (e.g. create_connection(..., timeout=2.0)).
    Existing connections are not affected; call settimeout() on the
    connection object to change those. The value set here can be read back
    with getdefaulttimeout().
    """

def getdefaulttimeout() -> Union[int, float, None]:
    """
    Get current global default timeout setting.

    Returns:
    Union[int, float, None]: Current default timeout in seconds, as last set
    with setdefaulttimeout(), or None when no default timeout is in effect
    """

Low-Level Socket Operations

Direct socket I/O operations with WebSocket-specific error handling and timeout support.

def recv(sock: socket.socket, bufsize: int) -> bytes:
    """
    Receive data from socket with WebSocket-specific error handling.

    Parameters:
    - sock: Socket object to receive from
    - bufsize: Maximum number of bytes to receive in this call

    Returns:
    bytes: Received data (at most bufsize bytes)

    Raises:
    WebSocketConnectionClosedException: If socket is closed or connection lost
    WebSocketTimeoutException: If receive operation times out
    """

def recv_line(sock: socket.socket) -> bytes:
    """
    Receive a single line from socket (reads until a newline is received).

    Parameters:
    - sock: Socket object to receive from

    Returns:
    bytes: Received line, including the terminating newline character

    Note: Used primarily for HTTP header parsing during handshake
    """

def send(sock: socket.socket, data: Union[bytes, str]) -> int:
    """
    Send data through socket with proper encoding and error handling.

    Parameters:
    - sock: Socket object to send through
    - data: Data to send; a str is UTF-8 encoded automatically before
      sending

    Returns:
    int: Number of bytes sent

    Raises:
    WebSocketConnectionClosedException: If socket is closed
    WebSocketTimeoutException: If send operation times out
    """

Usage Examples

Custom Socket Options

import websocket
import socket

# Baseline socket options that are applied on every platform
custom_sockopt = [
    (socket.SOL_TCP, socket.TCP_NODELAY, 1),       # Nagle's algorithm off (low latency)
    (socket.SOL_SOCKET, socket.SO_RCVBUF, 65536),  # receive buffer size
    (socket.SOL_SOCKET, socket.SO_SNDBUF, 65536),  # send buffer size
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),   # keepalive on
]

# The keepalive tuning constants are platform-specific, so only the ones this
# build of the socket module exposes are appended (idle time, probe interval,
# probe count -- in that order).
custom_sockopt.extend(
    (socket.SOL_TCP, getattr(socket, name), value)
    for name, value in (
        ("TCP_KEEPIDLE", 60),
        ("TCP_KEEPINTVL", 30),
        ("TCP_KEEPCNT", 3),
    )
    if hasattr(socket, name)
)

# Connect with the options above and do a single send/receive round trip
ws = websocket.create_connection("ws://echo.websocket.events/", sockopt=custom_sockopt)

print("Connected with custom socket options")
ws.send("Test message")
response = ws.recv()
print(f"Response: {response}")
ws.close()

SSL Configuration

import websocket
import ssl

# Comprehensive SSL configuration
ssl_options = {
    # Certificate verification
    "cert_reqs": ssl.CERT_REQUIRED,
    "ca_certs": "/etc/ssl/certs/ca-certificates.crt",

    # Client certificate authentication
    "certfile": "/path/to/client.crt",
    "keyfile": "/path/to/client.key",

    # Protocol and cipher selection.
    # PROTOCOL_TLS_CLIENT replaces the generic ssl.PROTOCOL_TLS, which is
    # deprecated since Python 3.10.
    "ssl_version": ssl.PROTOCOL_TLS_CLIENT,
    "ciphers": "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS",

    # Hostname verification
    "check_hostname": True,

    # Security options
    "do_handshake_on_connect": True,
    "suppress_ragged_eofs": True,
}

ws = None
try:
    ws = websocket.create_connection(
        "wss://secure.example.com/websocket",
        sslopt=ssl_options
    )
    print("Secure connection established")
    print(f"SSL cipher: {ws.sock.cipher()}")

except ssl.SSLError as e:
    print(f"SSL error: {e}")
except Exception as e:
    print(f"Connection error: {e}")
finally:
    # Close the connection even when something fails after it was opened;
    # ws stays None if create_connection() itself raised.
    if ws is not None:
        ws.close()

Timeout Configuration

import websocket
import time

def test_timeouts():
    """Demonstrate the global default timeout and per-connection overrides.

    Sets the default timeout to 10 seconds, opens one connection that picks
    it up (then lowers it to 5 seconds with settimeout()), opens a second
    connection with an explicit ``timeout=2.0``, and finally resets the
    default to None. Every connection is closed and the default is reset
    even if one of the steps raises.
    """
    # Set global default timeout
    websocket.setdefaulttimeout(10.0)
    print(f"Default timeout: {websocket.getdefaulttimeout()}")

    try:
        # Test connection with default timeout
        try:
            ws = websocket.create_connection("ws://echo.websocket.events/")
            try:
                print(f"Connection timeout: {ws.gettimeout()}")

                # Change timeout for this connection
                ws.settimeout(5.0)
                print(f"Updated timeout: {ws.gettimeout()}")

                ws.send("Test")
                response = ws.recv()
                print(f"Response: {response}")
            finally:
                # Also runs when send()/recv() time out, so the socket is
                # never left open
                ws.close()

        except websocket.WebSocketTimeoutException:
            print("Operation timed out")

        # Test with explicit timeout override
        try:
            ws = websocket.create_connection(
                "ws://echo.websocket.events/",
                timeout=2.0  # Override default
            )
            try:
                print(f"Override timeout: {ws.gettimeout()}")
            finally:
                ws.close()

        except Exception as e:
            print(f"Error: {e}")

    finally:
        # Reset to no timeout. The default is process-wide state, so it is
        # restored even when an unexpected exception escapes the demo.
        websocket.setdefaulttimeout(None)
        print(f"Reset timeout: {websocket.getdefaulttimeout()}")

test_timeouts()

Performance Tuning

import websocket
import socket

def create_high_performance_connection(url):
    """Open a WebSocket connection to *url* tuned for high throughput.

    Returns the connection object produced by websocket.create_connection().
    """
    buffer_bytes = 1024 * 1024  # 1MB, used for both directions

    tuned_sockopt = [
        (socket.SOL_TCP, socket.TCP_NODELAY, 1),              # Nagle's algorithm off
        (socket.SOL_SOCKET, socket.SO_RCVBUF, buffer_bytes),  # receive buffer
        (socket.SOL_SOCKET, socket.SO_SNDBUF, buffer_bytes),  # send buffer
        (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),          # keepalive on
    ]

    # TCP_USER_TIMEOUT is only present on some platforms (Linux)
    if hasattr(socket, 'TCP_USER_TIMEOUT'):
        tuned_sockopt += [(socket.SOL_TCP, socket.TCP_USER_TIMEOUT, 30000)]  # 30s

    return websocket.create_connection(
        url,
        sockopt=tuned_sockopt,
        skip_utf8_validation=True,  # Skip validation for binary data
        enable_multithread=True,    # Enable thread safety
    )

# Usage
import time  # required by the timing below; this example's imports omit it

ws = create_high_performance_connection("ws://echo.websocket.events/")
print("High-performance connection established")

try:
    # Test large message throughput
    large_message = "x" * 10000
    start_time = time.time()

    for _ in range(100):
        ws.send(large_message)
        ws.recv()  # read the reply so every iteration is a full round trip

    elapsed = time.time() - start_time
    print(f"Sent/received 100 large messages in {elapsed:.2f}s")
finally:
    # Close the connection even if a send/recv fails part-way through
    ws.close()

Low-Level Socket Access

import websocket
import select
import socket

def manual_socket_handling():
    """Demonstrate direct socket access for advanced use cases.

    Opens a connection, inspects the underlying socket (``ws.sock``), waits
    for readability with select() for up to 5 seconds, and prints a few
    socket options. The connection is closed even if a step raises.
    """

    ws = websocket.create_connection("ws://echo.websocket.events/")

    try:
        # Get underlying socket
        raw_socket = ws.sock
        # Python class of the socket object vs. its socket type constant:
        # labelled differently so the two lines are not confused
        print(f"Socket class: {type(raw_socket)}")
        print(f"Socket family: {raw_socket.family}")
        print(f"Socket type: {raw_socket.type}")

        # Use socket for select operations
        ws.send("async test")

        # Wait for data with select
        ready = select.select([raw_socket], [], [], 5.0)
        if ready[0]:
            print("Data available for reading")
            response = ws.recv()
            print(f"Response: {response}")
        else:
            print("No data received within timeout")

        # Get socket statistics (platform dependent)
        try:
            sockopt_info = {
                "SO_RCVBUF": raw_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF),
                "SO_SNDBUF": raw_socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF),
                "SO_KEEPALIVE": raw_socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE),
            }

            if hasattr(socket, 'TCP_NODELAY'):
                sockopt_info["TCP_NODELAY"] = raw_socket.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY)

            print("Socket options:")
            for opt, value in sockopt_info.items():
                print(f"  {opt}: {value}")

        except (OSError, AttributeError) as e:
            # OSError: getsockopt() rejected the option;
            # AttributeError: the constant is missing on this platform
            print(f"Could not read socket options: {e}")

    finally:
        ws.close()

manual_socket_handling()

Socket Error Handling

import websocket
import socket
import errno

def robust_socket_operations():
    """Demonstrate robust socket error handling.

    Connects, then sends a small text message, a large text message and a
    binary message, reporting the outcome of each. Connection failures are
    reported separately from errors that occur afterwards, and the
    connection is always closed once it has been opened. The function
    prints errors instead of raising them.
    """

    try:
        ws = websocket.create_connection("ws://echo.websocket.events/")
    except Exception as e:
        print(f"Failed to establish connection: {e}")
        return

    # Test various socket error conditions
    test_cases = [
        {"name": "Normal operation", "data": "Hello"},
        {"name": "Large message", "data": "x" * 65536},
        {"name": "Binary data", "data": b"\x00\x01\x02\x03"},
    ]

    try:
        for test in test_cases:
            try:
                print(f"Testing: {test['name']}")

                if isinstance(test['data'], bytes):
                    ws.send_bytes(test['data'])
                else:
                    ws.send(test['data'])

                response = ws.recv()
                print(f"  Success: received {len(response)} bytes")

            except websocket.WebSocketConnectionClosedException:
                # Nothing more can be sent on a closed connection
                print(f"  Connection closed during {test['name']}")
                break

            except websocket.WebSocketTimeoutException:
                print(f"  Timeout during {test['name']}")

            except OSError as e:  # socket.error is an alias of OSError
                if e.errno == errno.ECONNRESET:
                    print(f"  Connection reset during {test['name']}")
                elif e.errno == errno.EPIPE:
                    print(f"  Broken pipe during {test['name']}")
                else:
                    print(f"  Socket error during {test['name']}: {e}")

    except Exception as e:
        # Anything not handled per test case; reported separately so it is
        # not mistaken for a connection failure
        print(f"Unexpected error after connecting: {e}")
    finally:
        ws.close()

robust_socket_operations()

Custom Socket Factory

import websocket
import socket
import ssl

class CustomSocketFactory:
    """Factory for creating pre-configured sockets.

    Both factory methods return sockets that are configured but NOT yet
    connected; the caller is responsible for connecting and closing them.
    """

    @staticmethod
    def create_optimized_socket():
        """Create socket with optimal settings for WebSocket.

        Returns:
            An unconnected IPv4 TCP socket with TCP_NODELAY and SO_KEEPALIVE
            enabled and 256 KiB send/receive buffers requested.

        Raises:
            OSError: If the socket cannot be created or an option is
                rejected. The socket is closed before the error propagates.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            # Apply optimizations
            sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

            # Set large buffers
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 262144)
        except OSError:
            # Do not leak the descriptor when an option is rejected
            sock.close()
            raise

        return sock

    @staticmethod
    def create_ssl_socket(hostname):
        """Create SSL socket with secure configuration.

        Parameters:
            hostname: Server name passed to wrap_socket() as
                ``server_hostname``.

        Returns:
            An unconnected ssl.SSLSocket wrapping an optimized TCP socket,
            with certificate and hostname verification enabled and TLS 1.2
            as the minimum protocol version.
        """
        context = ssl.create_default_context()
        context.check_hostname = True
        context.verify_mode = ssl.CERT_REQUIRED

        # Security hardening
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM')

        sock = CustomSocketFactory.create_optimized_socket()
        try:
            ssl_sock = context.wrap_socket(sock, server_hostname=hostname)
        except Exception:
            # Wrapping failed, so the raw socket would otherwise be orphaned
            sock.close()
            raise

        return ssl_sock

def use_custom_socket():
    """Demonstrate using pre-configured socket with WebSocket.

    A socket handed to create_connection() through ``socket=`` is used as-is
    for the handshake, so it has to be connected to the server first.
    """

    # Create optimized socket
    custom_sock = CustomSocketFactory.create_optimized_socket()

    try:
        # Connect the raw socket to the host/port of the ws:// URL below
        custom_sock.connect(("echo.websocket.events", 80))

        # Use with WebSocket connection
        ws = websocket.create_connection(
            "ws://echo.websocket.events/",
            socket=custom_sock
        )
    except Exception:
        # Connecting or the handshake failed: release the socket we created
        custom_sock.close()
        raise

    try:
        print("Connected with custom socket")
        ws.send("Custom socket test")
        response = ws.recv()
        print(f"Response: {response}")
    finally:
        ws.close()

use_custom_socket()

Types

# Socket option tuple format; the three fields are the arguments given to
# socket.setsockopt()
SocketOption = tuple[int, int, int]  # (level, optname, value)
SocketOptionList = list[SocketOption]  # shape of the `sockopt` argument

# SSL option dictionary keys (common options), e.g. "cert_reqs", "ca_certs",
# "certfile", "keyfile", "check_hostname"
SSLOptions = dict[str, Union[str, int, bool]]

# Timeout types: seconds as int/float, or None for blocking
TimeoutValue = Union[int, float, None]

# Socket operation return types
SocketBytes = bytes  # data returned by recv() / recv_line()
SocketSendResult = int  # number of bytes sent, as returned by send()

Install with Tessl CLI

npx tessl i tessl/pypi-websocket-client

docs

abnf-protocol.md

exceptions.md

index.md

logging.md

socket-config.md

websocket-app.md

websocket-core.md

tile.json