CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-websockets

An implementation of the WebSocket Protocol (RFC 6455 & 7692)

Overview
Eval results
Files

asyncio-server.mddocs/

Asyncio Server Operations

Complete asyncio-based WebSocket server functionality for creating WebSocket servers, managing connections, handling client requests, and broadcasting messages to multiple clients.

Capabilities

Server Creation Functions

Start WebSocket servers with customizable handlers, authentication, compression, and connection management.

async def serve(
    handler: Callable[[ServerConnection], Awaitable[None]],
    host: str,
    port: int,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate", 
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start a WebSocket server.

    Parameters:
    - handler: Coroutine to handle each WebSocket connection
    - host: Server host address
    - port: Server port number
    - logger: Logger instance for server logging
    - compression: Compression mode ("deflate" or None)
    - subprotocols: List of supported subprotocols
    - extra_headers: Additional response headers
    - process_request: Function to process HTTP request before upgrade
    - select_subprotocol: Function to select subprotocol from client list
    - ping_interval: Interval between ping frames (seconds)
    - ping_timeout: Timeout for ping/pong exchange (seconds)
    - close_timeout: Timeout for connection closure (seconds)
    - max_size: Maximum message size (bytes)
    - max_queue: Maximum number of queued messages
    - read_limit: Buffer size for reading (bytes)
    - write_limit: Buffer size for writing (bytes)
    - extensions: List of WebSocket extensions

    Returns:
    Server: Async context manager for server lifecycle management
    """

async def unix_serve(
    handler: Callable[[ServerConnection], Awaitable[None]],
    path: str,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start a WebSocket server on Unix domain socket.

    Parameters:
    - handler: Coroutine to handle each WebSocket connection
    - path: Unix domain socket path
    - Other parameters same as serve()

    Returns:
    Server: WebSocket server bound to Unix socket
    """

Server Connection Management

The ServerConnection class represents individual client connections on the server side.

class ServerConnection:
    """WebSocket server connection representing a client."""
    
    @property
    def closed(self) -> bool:
        """Check if connection is closed."""
    
    @property
    def local_address(self) -> Tuple[str, int]:
        """Get server socket address."""
    
    @property
    def remote_address(self) -> Tuple[str, int]:
        """Get client socket address."""
    
    @property
    def subprotocol(self) -> Subprotocol | None:
        """Get negotiated subprotocol."""
    
    @property 
    def request_headers(self) -> Headers:
        """Get HTTP request headers from handshake."""
    
    @property
    def response_headers(self) -> Headers:
        """Get HTTP response headers from handshake."""
    
    async def send(self, message: Data) -> None:
        """
        Send a message to the WebSocket client.

        Parameters:
        - message: Text (str) or binary (bytes) message to send

        Raises:
        - ConnectionClosed: If connection is closed
        """
    
    async def recv(self) -> Data:
        """
        Receive a message from the WebSocket client.

        Returns:
        str | bytes: Received message (text or binary)

        Raises:
        - ConnectionClosed: If connection is closed
        """
    
    async def ping(self, data: bytes = b"") -> Awaitable[float]:
        """
        Send a ping frame and wait for pong response.

        Parameters:
        - data: Optional payload for ping frame

        Returns:
        Awaitable[float]: Coroutine that resolves to round-trip time

        Raises:
        - ConnectionClosed: If connection is closed
        """
    
    async def pong(self, data: bytes = b"") -> None:
        """
        Send a pong frame.

        Parameters:
        - data: Payload for pong frame

        Raises:
        - ConnectionClosed: If connection is closed
        """
    
    async def close(self, code: int = 1000, reason: str = "") -> None:
        """
        Close the WebSocket connection.

        Parameters:
        - code: Close code (default 1000 for normal closure)
        - reason: Human-readable close reason

        Raises:
        - ProtocolError: If code is invalid
        """
    
    async def wait_closed(self) -> None:
        """Wait until the connection is closed."""
    
    # Async iterator support for receiving messages
    def __aiter__(self) -> AsyncIterator[Data]:
        """Return async iterator for receiving messages."""
        return self
    
    async def __anext__(self) -> Data:
        """Get next message from async iterator."""

Server Management

The Server class manages the WebSocket server lifecycle and provides access to connected clients.

class Server:
    """WebSocket server management."""
    
    @property
    def sockets(self) -> Set[socket.socket]:
        """Get server sockets."""
    
    @property
    def websockets(self) -> Set[ServerConnection]:
        """Get active WebSocket connections."""
    
    def close(self) -> None:
        """
        Stop accepting new connections and close existing ones.
        
        This initiates server shutdown but doesn't wait for completion.
        Use wait_closed() to wait for full shutdown.
        """
    
    async def wait_closed(self) -> None:
        """Wait until server and all connections are closed."""
    
    # Context manager support
    async def __aenter__(self) -> Server:
        """Enter async context manager."""
        return self
    
    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Exit async context manager and close server."""

Message Broadcasting

Utility functions for sending messages to multiple WebSocket connections concurrently.

def broadcast(
    websockets: Iterable[ServerConnection], 
    message: Data,
    raise_exceptions: bool = False
) -> None:
    """
    Broadcast a message to multiple WebSocket connections.

    Parameters:
    - websockets: Iterable of ServerConnection objects
    - message: Text (str) or binary (bytes) message to broadcast
    - raise_exceptions: Whether to raise exceptions from failed sends

    Note:
    This function sends messages concurrently and handles connection failures gracefully.
    Failed connections are silently ignored unless raise_exceptions=True.
    """

Authentication Support

Built-in HTTP basic authentication decorator for WebSocket handlers.

def basic_auth(
    username: str, 
    password: str,
    realm: str = "WebSocket"
) -> Callable:
    """
    Create HTTP basic authentication decorator for WebSocket handlers.

    Parameters:
    - username: Expected username
    - password: Expected password  
    - realm: Authentication realm name

    Returns:
    Callable: Decorator that adds basic authentication to handler

    Usage:
    @basic_auth("admin", "secret")
    async def protected_handler(websocket):
        # Handler code here
        pass
    """

Usage Examples

Basic Echo Server

import asyncio
import websockets

async def echo_handler(websocket):
    """Simple echo handler that returns received messages."""
    async for message in websocket:
        await websocket.send(f"Echo: {message}")

async def main():
    # Start server on localhost:8765
    async with websockets.serve(echo_handler, "localhost", 8765):
        print("WebSocket server started on ws://localhost:8765")
        await asyncio.Future()  # Run forever

asyncio.run(main())

Chat Server with Broadcasting

import asyncio
import websockets
import json
from typing import Set

# Keep track of connected clients
clients: Set[websockets.ServerConnection] = set()

async def chat_handler(websocket):
    """Chat server handler with broadcasting."""
    # Register client
    clients.add(websocket)
    print(f"Client connected: {websocket.remote_address}")
    
    try:
        async for message in websocket:
            try:
                # Parse JSON message
                data = json.loads(message)
                chat_message = {
                    "user": data.get("user", "Anonymous"),
                    "message": data.get("message", ""),
                    "timestamp": asyncio.get_event_loop().time()
                }
                
                # Broadcast to all connected clients
                broadcast_message = json.dumps(chat_message)
                websockets.broadcast(clients, broadcast_message)
                
            except json.JSONDecodeError:
                # Send error response
                error = {"error": "Invalid JSON format"}
                await websocket.send(json.dumps(error))
                
    except websockets.ConnectionClosedError:
        pass
    finally:
        # Unregister client
        clients.discard(websocket)
        print(f"Client disconnected: {websocket.remote_address}")

async def main():
    async with websockets.serve(
        chat_handler, 
        "localhost", 
        8765,
        ping_interval=30,
        ping_timeout=10,
        max_size=1024*10  # 10KB max message
    ):
        print("Chat server started on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())

Authenticated Server

import asyncio
import websockets
from websockets import basic_auth

@basic_auth("admin", "secret123")
async def protected_handler(websocket):
    """Handler that requires authentication."""
    await websocket.send("Welcome! You are authenticated.")
    
    async for message in websocket:
        if message.lower() == "status":
            await websocket.send("Server is running normally")
        else:
            await websocket.send(f"Received: {message}")

async def main():
    async with websockets.serve(
        protected_handler,
        "localhost", 
        8765,
        extra_headers={"Server": "MyWebSocketServer/1.0"}
    ):
        print("Protected server started on ws://localhost:8765")
        print("Use Authorization: Basic YWRtaW46c2VjcmV0MTIz")
        await asyncio.Future()

asyncio.run(main())

Server with Custom Request Processing

import asyncio
import websockets
from websockets import Request, Response

async def process_request(connection, request: Request):
    """Custom request processing before WebSocket upgrade."""
    # Check custom header
    api_key = request.headers.get("X-API-Key")
    if not api_key or api_key != "valid-api-key":
        return Response(403, "Forbidden", b"Invalid API key")
    
    # Allow upgrade to proceed
    return None

async def api_handler(websocket):
    """API handler for authenticated clients."""
    await websocket.send("API connection established")
    
    async for message in websocket:
        # Process API commands
        if message.startswith("GET /"):
            await websocket.send('{"status": "success", "data": "..."}')
        else:
            await websocket.send('{"error": "Unknown command"}')

async def main():
    async with websockets.serve(
        api_handler,
        "localhost",
        8765,
        process_request=process_request,
        compression="deflate"
    ):
        print("API server started on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())

Unix Domain Socket Server

import asyncio
import websockets
import os

async def unix_handler(websocket):
    """Handler for Unix socket connections."""
    await websocket.send("Connected via Unix socket")
    
    async for message in websocket:
        await websocket.send(f"Unix echo: {message}")

async def main():
    socket_path = "/tmp/websocket.sock"
    
    # Remove existing socket file
    if os.path.exists(socket_path):
        os.unlink(socket_path)
    
    async with websockets.unix_serve(unix_handler, socket_path):
        print(f"Unix WebSocket server started on {socket_path}")
        await asyncio.Future()

asyncio.run(main())

Install with Tessl CLI

npx tessl i tessl/pypi-websockets

docs

asyncio-client.md

asyncio-server.md

data-structures.md

exceptions.md

extensions.md

index.md

protocol.md

routing.md

sync-client.md

sync-server.md

tile.json