CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-websockets

An implementation of the WebSocket Protocol (RFC 6455 & 7692)

Overview
Eval results
Files

docs/asyncio-client.md

Asyncio Client Operations

High-level asyncio-based WebSocket client functionality for connecting to WebSocket servers, managing connection lifecycle, and handling message communication using Python's async/await syntax.

Capabilities

Connection Functions

Create WebSocket client connections with support for various protocols, authentication, compression, and connection options.

async def connect(
    uri: str,
    *,
    additional_headers: HeadersLike | None = None,
    user_agent_header: str | None = None,
    compression: str | None = "deflate",
    logger: LoggerLike | None = None,
    subprotocols: List[Subprotocol] | None = None,
    extra_headers: HeadersLike | None = None,
    open_timeout: float = 10,
    close_timeout: float = 10,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ClientExtensionFactory] | None = None,
    **kwargs
) -> ClientConnection:
    """
    Connect to a WebSocket server.

    All options after ``uri`` are keyword-only.

    Parameters:
    - uri: WebSocket URI (ws:// or wss://)
    - additional_headers: Extra HTTP headers sent with the opening handshake
    - user_agent_header: Custom value for the User-Agent header
    - compression: Compression mode; "deflate" (the default) or None
    - logger: Logger instance used for connection logging
    - subprotocols: List of supported subprotocols
    - extra_headers: Additional headers (deprecated, use additional_headers)
    - open_timeout: Timeout for connection establishment, in seconds (default 10)
    - close_timeout: Timeout for connection closure, in seconds (default 10)
    - ping_interval: Interval between ping frames, in seconds (default 20)
    - ping_timeout: Timeout for a ping/pong exchange, in seconds (default 20)
    - max_size: Maximum message size in bytes (default 2**20 = 1,048,576)
    - max_queue: Maximum number of queued messages (default 2**5 = 32)
    - read_limit: Buffer size for reading, in bytes (default 2**16 = 65,536)
    - write_limit: Buffer size for writing, in bytes (default 2**16 = 65,536)
    - extensions: List of WebSocket extension factories
    - **kwargs: Extra keyword arguments; presumably forwarded to the
      underlying connection machinery -- TODO confirm against the library

    Returns:
    ClientConnection: Async context manager and iterator for WebSocket communication

    NOTE(review): this is declared as a coroutine function, yet the result is
    described as an async context manager; presumably the library also accepts
    ``async with connect(...)`` directly -- confirm against the websockets
    reference.
    """

async def unix_connect(
    path: str,
    *,
    additional_headers: HeadersLike | None = None,
    user_agent_header: str | None = None,
    compression: str | None = "deflate",
    logger: LoggerLike | None = None,
    subprotocols: List[Subprotocol] | None = None,
    extra_headers: HeadersLike | None = None,
    open_timeout: float = 10,
    close_timeout: float = 10,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ClientExtensionFactory] | None = None,
    **kwargs
) -> ClientConnection:
    """
    Connect to a WebSocket server over a Unix domain socket.

    The signature mirrors connect(), except that the first argument is a
    filesystem path to the socket instead of a WebSocket URI.

    Parameters:
    - path: Unix domain socket path
    - Other parameters: same names, defaults and meaning as in connect()

    Returns:
    ClientConnection: WebSocket connection over the Unix socket
    """

Client Connection Management

The ClientConnection class provides the main interface for WebSocket client operations with support for async context management and message iteration.

class ClientConnection:
    """
    WebSocket client connection with asyncio support.

    Exposes connection state and addresses as read-only properties, coroutine
    methods for sending, receiving, pinging and closing, and implements both
    the async context manager and the async iterator protocols.
    """

    @property
    def closed(self) -> bool:
        """Whether the connection is closed."""

    @property
    def local_address(self) -> Tuple[str, int]:
        """Address of the local end of the socket."""

    @property
    def remote_address(self) -> Tuple[str, int]:
        """Address of the remote end of the socket."""

    @property
    def subprotocol(self) -> Subprotocol | None:
        """Negotiated subprotocol, or None when there is none."""

    async def send(self, message: Data) -> None:
        """
        Send a message to the WebSocket server.

        Parameters:
        - message: Text (str) or binary (bytes) message to send

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def recv(self) -> Data:
        """
        Receive a message from the WebSocket server.

        Returns:
        str | bytes: Received message (text or binary)

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def ping(self, data: bytes = b"") -> Awaitable[float]:
        """
        Send a ping frame.

        This is a two-step API: awaiting ping() itself yields a second
        awaitable, and awaiting that one yields the round-trip time once the
        matching pong has been received. Callers that only await ping() have
        not waited for the pong.

        Parameters:
        - data: Optional payload for the ping frame

        Returns:
        Awaitable[float]: Awaitable that resolves to the round-trip time

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def pong(self, data: bytes = b"") -> None:
        """
        Send a pong frame.

        Parameters:
        - data: Payload for the pong frame

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def close(self, code: int = 1000, reason: str = "") -> None:
        """
        Close the WebSocket connection.

        Parameters:
        - code: Close code (default 1000 for normal closure)
        - reason: Human-readable close reason

        Raises:
        - ProtocolError: If code is invalid
        """

    async def wait_closed(self) -> None:
        """Wait until the connection is closed."""

    # Async context manager protocol: `async with` binds the connection itself.
    async def __aenter__(self) -> "ClientConnection":
        """Enter the async context manager; returns this connection."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Exit the async context manager and close the connection."""

    # Async iterator protocol: the connection is its own iterator.
    def __aiter__(self) -> AsyncIterator[Data]:
        """Return the async iterator used to receive messages (this object)."""
        return self

    async def __anext__(self) -> Data:
        """
        Return the next received message.

        NOTE(review): this stub does not show how iteration ends; presumably
        StopAsyncIteration is raised once the connection closes -- confirm.
        """

Error Classification

Utility function for classifying connection errors and determining appropriate handling strategies.

def process_exception(exception: Exception) -> Exception:
    """
    Classify an exception raised while connecting.

    Parameters:
    - exception: Exception that occurred during connection

    Returns:
    Exception: Classified exception (may be the same or a different type)

    NOTE(review): the upstream websockets reference appears to describe this
    hook as returning None for transient errors (meaning "retry") and an
    exception only for fatal ones. If so, the return annotation should be
    ``Exception | None`` -- confirm before relying on a non-None result.
    """

Usage Examples

Basic Client Connection

import asyncio
import websockets

async def basic_client():
    """Connect with default settings, send one greeting and print the reply."""
    connection = websockets.connect("ws://localhost:8765")
    async with connection as ws:
        await ws.send("Hello, Server!")
        reply = await ws.recv()
        print(f"Server response: {reply}")

asyncio.run(basic_client())

Client with Custom Configuration

import asyncio
import json
import logging

import websockets

async def configured_client():
    """
    Connect with custom headers, compression, logging and keepalive settings,
    subscribe to a channel, then print incoming JSON messages until one with
    ``"type": "close"`` arrives or the connection ends.
    """
    headers = {"Authorization": "Bearer token123"}
    logger = logging.getLogger("websocket_client")

    async with websockets.connect(
        "wss://api.example.com/ws",
        additional_headers=headers,
        compression="deflate",
        logger=logger,
        ping_interval=30,
        ping_timeout=10,
        max_size=1024 * 1024,  # 1 MiB max message size
    ) as websocket:

        # Send the subscription request as JSON
        subscription = {"type": "subscribe", "channel": "updates"}
        await websocket.send(json.dumps(subscription))

        # Listen for messages
        async for message in websocket:
            event = json.loads(message)
            print(f"Received: {event}")

            # Stop on the server's "close" message. Valid JSON is not always
            # an object, so guard before calling .get().
            if isinstance(event, dict) and event.get("type") == "close":
                break

asyncio.run(configured_client())

Connection with Error Handling

import asyncio

import websockets
from websockets import ConnectionClosed, InvalidHandshake, InvalidURI

async def robust_client():
    """
    Connect with short open/close timeouts, measure latency with a ping,
    then run ten send/receive round trips, reporting failures by category.

    Errors are reported as:
    - InvalidURI: the URI is not a valid WebSocket URI
    - ConnectionClosed, InvalidHandshake, OSError, asyncio.TimeoutError:
      the connection could not be established or was lost
    - any other Exception: unexpected error
    """
    try:
        async with websockets.connect(
            "ws://localhost:8765",
            open_timeout=5,
            close_timeout=5,
        ) as websocket:

            # ping() returns an awaitable; awaiting that one waits for the
            # pong and yields the round-trip time.
            pong_waiter = await websocket.ping()
            latency = await pong_waiter
            print(f"Connection latency: {latency:.3f}s")

            # Send messages with error handling
            for i in range(10):
                try:
                    await websocket.send(f"Message {i}")
                    response = await websocket.recv()
                except ConnectionClosed:
                    # The base class covers both a normal close
                    # (ConnectionClosedOK) and an abnormal one
                    # (ConnectionClosedError).
                    print("Connection closed by server")
                    break
                else:
                    print(f"Echo {i}: {response}")

    except InvalidURI as e:
        print(f"Invalid WebSocket URI: {e}")
    except (ConnectionClosed, InvalidHandshake, OSError, asyncio.TimeoutError) as e:
        # Failure to establish the connection (TCP error, rejected handshake,
        # open timeout) or loss of it outside the message loop.
        print(f"Connection failed: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

asyncio.run(robust_client())

Unix Domain Socket Client

import asyncio
import websockets

async def unix_client():
    """Exchange one message with a server listening on a Unix domain socket."""
    socket_path = "/tmp/websocket.sock"
    async with websockets.unix_connect(socket_path) as ws:
        await ws.send("Hello via Unix socket!")
        reply = await ws.recv()
        print(f"Response: {reply}")

asyncio.run(unix_client())

Install with Tessl CLI

npx tessl i tessl/pypi-websockets

docs

asyncio-client.md

asyncio-server.md

data-structures.md

exceptions.md

extensions.md

index.md

protocol.md

routing.md

sync-client.md

sync-server.md

tile.json