CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-websockets

An implementation of the WebSocket Protocol (RFC 6455 & 7692)

Overview
Eval results
Files

sync-client.mddocs/

Synchronous Client Operations

Threading-based synchronous WebSocket client functionality for connecting to WebSocket servers using blocking operations, suitable for traditional Python applications that don't use asyncio.

Core Imports

from websockets.sync.client import connect, unix_connect, ClientConnection

Capabilities

Connection Functions

Create synchronous WebSocket client connections with blocking operations for environments where asyncio is not suitable or preferred.

def connect(
    uri: str,
    *,
    # TCP/TLS
    sock: socket.socket | None = None,
    ssl: ssl.SSLContext | None = None,
    server_hostname: str | None = None,
    # WebSocket
    origin: Origin | None = None,
    extensions: Sequence[ClientExtensionFactory] | None = None,
    subprotocols: Sequence[Subprotocol] | None = None,
    compression: str | None = "deflate",
    # HTTP
    additional_headers: HeadersLike | None = None,
    user_agent_header: str | None = USER_AGENT,
    proxy: str | Literal[True] | None = True,
    proxy_ssl: ssl.SSLContext | None = None,
    proxy_server_hostname: str | None = None,
    # Timeouts
    open_timeout: float | None = 10,
    ping_interval: float | None = 20,
    ping_timeout: float | None = 20,
    close_timeout: float | None = 10,
    # Limits
    max_size: int | None = 2**20,
    max_queue: int | None | tuple[int | None, int | None] = 16,
    # Logging
    logger: LoggerLike | None = None,
    # Escape hatch for advanced customization
    create_connection: type[ClientConnection] | None = None,
    **kwargs: Any
) -> ClientConnection:
    """
    Connect to a WebSocket server synchronously.

    Parameters:
    - uri: WebSocket URI (ws:// or wss://)
    - sock: Preexisting TCP socket (overrides host/port from URI)
    - ssl: SSL context for TLS configuration
    - server_hostname: Host name for TLS handshake (overrides URI hostname)
    - origin: Value of Origin header for servers that require it
    - extensions: List of supported extensions in negotiation order
    - subprotocols: List of supported subprotocols in preference order
    - compression: Compression mode ("deflate" or None)
    - additional_headers: Extra HTTP headers for handshake
    - user_agent_header: Custom User-Agent header
    - proxy: Proxy configuration (True for auto-detect, str for URL, None to disable)
    - proxy_ssl: SSL context for proxy connection
    - proxy_server_hostname: Server hostname for proxy TLS handshake
    - open_timeout: Timeout for connection establishment (seconds)
    - ping_interval: Interval between ping frames (seconds)
    - ping_timeout: Timeout for ping/pong exchange (seconds)
    - close_timeout: Timeout for connection closure (seconds)
    - max_size: Maximum message size (bytes)
    - max_queue: Maximum number of queued messages (int or tuple)
    - logger: Logger instance for connection logging
    - create_connection: Custom connection class factory

    Returns:
    ClientConnection: Synchronous WebSocket connection

    Raises:
    - InvalidURI: If URI format is invalid
    - InvalidHandshake: If handshake fails
    - ConnectionClosedError: If connection fails
    """

def unix_connect(
    path: str | None = None,
    uri: str | None = None,
    **kwargs: Any
) -> ClientConnection:
    """
    Connect to a WebSocket server over Unix domain socket synchronously.

    Parameters:
    - path: Unix socket path for connection (mutually exclusive with uri)
    - uri: WebSocket URI with unix:// scheme (mutually exclusive with path)
    - **kwargs: Same keyword arguments as connect()

    Returns:
    ClientConnection: Synchronous WebSocket connection

    Note:
    Only available on Unix-like systems (Linux, macOS).
    Accepts the same keyword arguments as connect().
    """

Client Connection Management

The synchronous ClientConnection class provides blocking operations for WebSocket communication.

class ClientConnection:
    """Synchronous WebSocket client connection."""
    
    @property
    def closed(self) -> bool:
        """Check if connection is closed."""
    
    @property
    def local_address(self) -> Tuple[str, int]:
        """Get local socket address."""
    
    @property
    def remote_address(self) -> Tuple[str, int]:
        """Get remote socket address."""
    
    @property
    def subprotocol(self) -> Subprotocol | None:
        """Get negotiated subprotocol."""
    
    def send(self, message: Data, timeout: float = None) -> None:
        """
        Send a message to the WebSocket server.

        Parameters:
        - message: Text (str) or binary (bytes) message to send
        - timeout: Optional timeout for send operation (seconds)

        Raises:
        - ConnectionClosed: If connection is closed
        - TimeoutError: If timeout is exceeded
        """
    
    def recv(self, timeout: float = None) -> Data:
        """
        Receive a message from the WebSocket server.

        Parameters:
        - timeout: Optional timeout for receive operation (seconds)

        Returns:
        str | bytes: Received message (text or binary)

        Raises:
        - ConnectionClosed: If connection is closed
        - TimeoutError: If timeout is exceeded
        """
    
    def ping(self, data: bytes = b"", timeout: float = None) -> float:
        """
        Send a ping frame and wait for pong response.

        Parameters:
        - data: Optional payload for ping frame
        - timeout: Optional timeout for ping/pong exchange (seconds)

        Returns:
        float: Round-trip time in seconds

        Raises:
        - ConnectionClosed: If connection is closed
        - TimeoutError: If timeout is exceeded
        """
    
    def pong(self, data: bytes = b"") -> None:
        """
        Send a pong frame.

        Parameters:
        - data: Payload for pong frame

        Raises:
        - ConnectionClosed: If connection is closed
        """
    
    def close(self, code: int = 1000, reason: str = "") -> None:
        """
        Close the WebSocket connection.

        Parameters:
        - code: Close code (default 1000 for normal closure)
        - reason: Human-readable close reason

        Raises:
        - ProtocolError: If code is invalid
        """
    
    # Context manager support
    def __enter__(self) -> ClientConnection:
        """Enter context manager."""
        return self
    
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Exit context manager and close connection."""
        self.close()
    
    # Iterator support for receiving messages
    def __iter__(self) -> Iterator[Data]:
        """Return iterator for receiving messages."""
        return self
    
    def __next__(self) -> Data:
        """Get next message from iterator."""
        try:
            return self.recv()
        except ConnectionClosed:
            raise StopIteration

Usage Examples

Basic Synchronous Client

from websockets.sync import connect

def basic_client():
    """Simple synchronous WebSocket client."""
    with connect("ws://localhost:8765") as websocket:
        # Send a message
        websocket.send("Hello, Server!")
        
        # Receive response
        response = websocket.recv()
        print(f"Server response: {response}")

# Run the client
basic_client()

Client with Timeout Handling

from websockets.sync import connect
from websockets import ConnectionClosed, TimeoutError

def timeout_client():
    """Client with timeout handling."""
    try:
        with connect(
            "ws://localhost:8765", 
            open_timeout=5,
            close_timeout=3
        ) as websocket:
            
            # Send with timeout
            websocket.send("Hello with timeout!", timeout=2.0)
            
            # Receive with timeout  
            try:
                response = websocket.recv(timeout=5.0)
                print(f"Response: {response}")
            except TimeoutError:
                print("No response received within timeout")
                
            # Test connection with ping
            try:
                latency = websocket.ping(timeout=3.0)
                print(f"Connection latency: {latency:.3f}s")
            except TimeoutError:
                print("Ping timeout - connection may be stale")
                
    except ConnectionClosed as e:
        print(f"Connection closed: {e}")
    except Exception as e:
        print(f"Connection error: {e}")

timeout_client()

Message Iterator Client

from websockets.sync import connect
from websockets import ConnectionClosed

def iterator_client():
    """Client using message iteration."""
    try:
        with connect("ws://localhost:8765") as websocket:
            # Send initial message
            websocket.send("Start streaming")
            
            # Iterate over received messages
            for message in websocket:
                print(f"Received: {message}")
                
                # Send acknowledgment
                websocket.send("ACK")
                
                # Break on specific message
                if message.lower() == "stop":
                    break
                    
    except ConnectionClosed:
        print("Connection closed by server")

iterator_client()

Multi-threaded Client Example

import threading
import time
from websockets.sync import connect
from websockets import ConnectionClosed

def worker_thread(thread_id: int, uri: str):
    """Worker thread with independent WebSocket connection."""
    try:
        with connect(uri) as websocket:
            print(f"Thread {thread_id}: Connected")
            
            # Send periodic messages
            for i in range(5):
                message = f"Thread {thread_id}, Message {i}"
                websocket.send(message)
                
                try:
                    response = websocket.recv(timeout=2.0)
                    print(f"Thread {thread_id}: {response}")
                except Exception as e:
                    print(f"Thread {thread_id}: Error - {e}")
                
                time.sleep(1)
                
    except ConnectionClosed as e:
        print(f"Thread {thread_id}: Connection closed - {e}")

def multi_threaded_client():
    """Example using multiple threads with sync connections."""
    threads = []
    uri = "ws://localhost:8765"
    
    # Create multiple worker threads
    for i in range(3):
        thread = threading.Thread(
            target=worker_thread, 
            args=(i, uri)
        )
        threads.append(thread)
        thread.start()
    
    # Wait for all threads to complete
    for thread in threads:
        thread.join()
    
    print("All threads completed")

multi_threaded_client()

Client with Custom Headers and Authentication

from websockets.sync import connect
import base64

def authenticated_client():
    """Client with custom headers and authentication."""
    # Prepare authentication header
    credentials = base64.b64encode(b"username:password").decode()
    
    headers = {
        "Authorization": f"Basic {credentials}",
        "X-Client-Version": "1.0",
        "X-Client-Type": "Python-Sync"
    }
    
    try:
        with connect(
            "ws://localhost:8765",
            additional_headers=headers,
            subprotocols=["chat", "notifications"],
            compression="deflate"
        ) as websocket:
            
            print(f"Connected with subprotocol: {websocket.subprotocol}")
            
            # Send authentication verification
            websocket.send("VERIFY_AUTH")
            auth_response = websocket.recv()
            
            if auth_response == "AUTH_OK":
                print("Authentication successful")
                
                # Normal operation
                websocket.send("GET_STATUS")
                status = websocket.recv()
                print(f"Status: {status}")
            else:
                print("Authentication failed")
                
    except Exception as e:
        print(f"Authentication error: {e}")

authenticated_client()

Unix Domain Socket Client

from websockets.sync import unix_connect

def unix_client():
    """Client connecting via Unix domain socket."""
    try:
        with unix_connect("/tmp/websocket.sock") as websocket:
            print("Connected via Unix socket")
            
            # Send message
            websocket.send("Hello via Unix socket!")
            
            # Receive response
            response = websocket.recv()
            print(f"Response: {response}")
            
    except FileNotFoundError:
        print("Unix socket not found - is the server running?")
    except Exception as e:
        print(f"Unix socket error: {e}")

unix_client()

Error Handling and Reconnection

import time
from websockets.sync import connect
from websockets import ConnectionClosed, InvalidURI

def robust_client_with_reconnect():
    """Client with automatic reconnection logic."""
    uri = "ws://localhost:8765"
    max_retries = 5
    retry_delay = 2
    
    for attempt in range(max_retries):
        try:
            print(f"Connection attempt {attempt + 1}")
            
            with connect(uri, open_timeout=5) as websocket:
                print("Connected successfully")
                
                # Normal operation
                try:
                    while True:
                        websocket.send("ping")
                        response = websocket.recv(timeout=10)
                        
                        if response == "pong":
                            print("Connection healthy")
                            time.sleep(5)
                        else:
                            print(f"Unexpected response: {response}")
                            
                except ConnectionClosed:
                    print("Connection lost")
                    break
                except KeyboardInterrupt:
                    print("Interrupted by user")
                    return
                    
        except (ConnectionClosed, InvalidURI) as e:
            print(f"Connection failed: {e}")
            
            if attempt < max_retries - 1:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            else:
                print("Max retries exceeded")
                break
                
        except Exception as e:
            print(f"Unexpected error: {e}")
            break

robust_client_with_reconnect()

Install with Tessl CLI

npx tessl i tessl/pypi-websockets

docs

asyncio-client.md

asyncio-server.md

data-structures.md

exceptions.md

extensions.md

index.md

protocol.md

routing.md

sync-client.md

sync-server.md

tile.json