CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-websockets

An implementation of the WebSocket Protocol (RFC 6455 & 7692)

Overview
Eval results
Files

sync-server.mddocs/

Synchronous Server Operations

Complete threading-based synchronous WebSocket server functionality for creating WebSocket servers using blocking operations, suitable for traditional Python applications that don't use asyncio.

Core Imports

from websockets.sync.server import serve, unix_serve, ServerConnection, Server, basic_auth

Capabilities

Server Creation Functions

Start synchronous WebSocket servers with customizable handlers, authentication, and connection management using threading.

def serve(
    handler: Callable[[ServerConnection], None],
    host: str | None = None,
    port: int | None = None,
    *,
    # TCP/TLS
    sock: socket.socket | None = None,
    ssl: ssl.SSLContext | None = None,
    # WebSocket
    origins: Sequence[Origin | re.Pattern[str] | None] | None = None,
    extensions: Sequence[ServerExtensionFactory] | None = None,
    subprotocols: Sequence[Subprotocol] | None = None,
    select_subprotocol: Callable[[ServerConnection, Sequence[Subprotocol]], Subprotocol | None] | None = None,
    compression: str | None = "deflate",
    # HTTP
    process_request: Callable[[ServerConnection, Request], Response | None] | None = None,
    process_response: Callable[[ServerConnection, Request, Response], Response | None] | None = None,
    server_header: str | None = SERVER,
    # Timeouts
    open_timeout: float | None = 10,
    ping_interval: float | None = 20,
    ping_timeout: float | None = 20,
    close_timeout: float | None = 10,
    # Limits
    max_size: int | None = 2**20,
    max_queue: int | None | tuple[int | None, int | None] = 16,
    # Logging
    logger: LoggerLike | None = None,
    # Escape hatch for advanced customization
    create_connection: type[ServerConnection] | None = None,
    **kwargs: Any
) -> Server:
    """
    Start a synchronous WebSocket server.

    Parameters:
    - handler: Function to handle each WebSocket connection (runs in separate thread)
    - host: Server host address (None for all interfaces)
    - port: Server port number (None for automatic assignment)
    - sock: Preexisting TCP socket to use for server
    - ssl: SSL context for TLS configuration
    - origins: List of allowed origins (None allows all)
    - extensions: List of supported extensions in negotiation order
    - subprotocols: List of supported subprotocols in preference order
    - select_subprotocol: Function to select subprotocol from client list
    - compression: Compression mode ("deflate" or None)
    - process_request: Function to process HTTP request before WebSocket upgrade
    - process_response: Function to process HTTP response after WebSocket upgrade
    - server_header: Server header value (None to omit)
    - open_timeout: Timeout for connection establishment (seconds)
    - ping_interval: Interval between ping frames (seconds)
    - ping_timeout: Timeout for ping/pong exchange (seconds)
    - close_timeout: Timeout for connection closure (seconds)
    - max_size: Maximum message size (bytes)
    - max_queue: Maximum number of queued messages (int or tuple)
    - logger: Logger instance for server logging
    - create_connection: Custom connection class factory

    Returns:
    Server: Context manager for server lifecycle management

    Raises:
    - OSError: If server cannot bind to address/port
    """

def unix_serve(
    handler: Callable[[ServerConnection], None],
    path: str,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start a synchronous WebSocket server on Unix domain socket.

    Parameters:
    - handler: Function to handle each WebSocket connection
    - path: Unix domain socket path
    - Other parameters same as serve()

    Returns:
    Server: WebSocket server bound to Unix socket

    Raises:
    - OSError: If server cannot bind to Unix socket
    """

Server Connection Management

The synchronous ServerConnection class represents individual client connections with blocking operations.

class ServerConnection:
    """Synchronous WebSocket server connection representing a client."""
    
    @property
    def closed(self) -> bool:
        """Check if connection is closed."""
    
    @property
    def local_address(self) -> Tuple[str, int]:
        """Get server socket address."""
    
    @property
    def remote_address(self) -> Tuple[str, int]:
        """Get client socket address."""
    
    @property
    def subprotocol(self) -> Subprotocol | None:
        """Get negotiated subprotocol."""
    
    @property
    def request_headers(self) -> Headers:
        """Get HTTP request headers from handshake."""
    
    @property
    def response_headers(self) -> Headers:
        """Get HTTP response headers from handshake."""
    
    def send(self, message: Data, timeout: float = None) -> None:
        """
        Send a message to the WebSocket client.

        Parameters:
        - message: Text (str) or binary (bytes) message to send
        - timeout: Optional timeout for send operation (seconds)

        Raises:
        - ConnectionClosed: If connection is closed
        - TimeoutError: If timeout is exceeded
        """
    
    def recv(self, timeout: float = None) -> Data:
        """
        Receive a message from the WebSocket client.

        Parameters:
        - timeout: Optional timeout for receive operation (seconds)

        Returns:
        str | bytes: Received message (text or binary)

        Raises:
        - ConnectionClosed: If connection is closed
        - TimeoutError: If timeout is exceeded
        """
    
    def ping(self, data: bytes = b"", timeout: float = None) -> float:
        """
        Send a ping frame and wait for pong response.

        Parameters:
        - data: Optional payload for ping frame
        - timeout: Optional timeout for ping/pong exchange (seconds)

        Returns:
        float: Round-trip time in seconds

        Raises:
        - ConnectionClosed: If connection is closed
        - TimeoutError: If timeout is exceeded
        """
    
    def pong(self, data: bytes = b"") -> None:
        """
        Send a pong frame.

        Parameters:
        - data: Payload for pong frame

        Raises:
        - ConnectionClosed: If connection is closed
        """
    
    def close(self, code: int = 1000, reason: str = "") -> None:
        """
        Close the WebSocket connection.

        Parameters:
        - code: Close code (default 1000 for normal closure)
        - reason: Human-readable close reason

        Raises:
        - ProtocolError: If code is invalid
        """
    
    # Iterator support for receiving messages
    def __iter__(self) -> Iterator[Data]:
        """Return iterator for receiving messages."""
        return self
    
    def __next__(self) -> Data:
        """Get next message from iterator."""
        try:
            return self.recv()
        except ConnectionClosed:
            raise StopIteration

Server Management

The synchronous Server class manages the WebSocket server lifecycle with blocking operations.

class Server:
    """Synchronous WebSocket server management."""
    
    @property
    def socket(self) -> socket.socket:
        """Get server socket."""
    
    def close(self) -> None:
        """
        Stop accepting new connections and close existing ones.
        
        This initiates server shutdown and blocks until all connections are closed.
        """
    
    def serve_forever(self) -> None:
        """
        Run the server until interrupted.
        
        This method blocks and handles incoming connections until
        KeyboardInterrupt or server.close() is called.
        """
    
    # Context manager support
    def __enter__(self) -> Server:
        """Enter context manager."""
        return self
    
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Exit context manager and close server."""
        self.close()

Authentication Support

Built-in HTTP basic authentication decorator for synchronous WebSocket handlers.

def basic_auth(
    username: str,
    password: str,
    realm: str = "WebSocket"
) -> Callable:
    """
    Create HTTP basic authentication decorator for synchronous WebSocket handlers.

    Parameters:
    - username: Expected username
    - password: Expected password
    - realm: Authentication realm name

    Returns:
    Callable: Decorator that adds basic authentication to handler

    Usage:
    @basic_auth("admin", "secret")
    def protected_handler(websocket):
        # Handler code here
        pass
    """

Usage Examples

Basic Echo Server

from websockets.sync import serve
import logging

def echo_handler(websocket):
    """Simple echo handler that returns received messages."""
    print(f"Client connected: {websocket.remote_address}")
    
    try:
        for message in websocket:
            print(f"Received: {message}")
            websocket.send(f"Echo: {message}")
    except Exception as e:
        print(f"Handler error: {e}")
    finally:
        print(f"Client disconnected: {websocket.remote_address}")

def main():
    # Start server
    with serve(echo_handler, "localhost", 8765) as server:
        print("WebSocket server started on ws://localhost:8765")
        server.serve_forever()

if __name__ == "__main__":
    main()

Multi-threaded Chat Server

from websockets.sync import serve
import threading
import json
import time
from typing import Set

# Thread-safe set for connected clients
clients: Set = set()
clients_lock = threading.Lock()

def broadcast_message(message: str, sender=None):
    """Broadcast message to all connected clients except sender."""
    with clients_lock:
        for client in clients.copy():  # Copy to avoid modification during iteration
            if client != sender:
                try:
                    client.send(message)
                except Exception as e:
                    print(f"Failed to send to client: {e}")
                    clients.discard(client)

def chat_handler(websocket):
    """Chat server handler with broadcasting."""
    # Register client
    with clients_lock:
        clients.add(websocket)
    
    print(f"Client connected: {websocket.remote_address} (Total: {len(clients)})")
    
    try:
        # Send welcome message
        websocket.send(json.dumps({
            "type": "system",
            "message": "Welcome to the chat server!"
        }))
        
        # Handle messages
        for message in websocket:
            try:
                data = json.loads(message)
                chat_message = {
                    "type": "chat",
                    "user": data.get("user", "Anonymous"),
                    "message": data.get("message", ""),
                    "timestamp": time.time()
                }
                
                # Broadcast to all clients
                broadcast_message(json.dumps(chat_message), websocket)
                
            except json.JSONDecodeError:
                error = {"type": "error", "message": "Invalid JSON format"}
                websocket.send(json.dumps(error))
                
    except Exception as e:
        print(f"Chat handler error: {e}")
    finally:
        # Unregister client
        with clients_lock:
            clients.discard(websocket)
        print(f"Client disconnected: {websocket.remote_address} (Total: {len(clients)})")

def main():
    with serve(
        chat_handler,
        "localhost",
        8765,
        ping_interval=30,
        ping_timeout=10
    ) as server:
        print("Chat server started on ws://localhost:8765")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\nShutting down server...")

if __name__ == "__main__":
    main()

Authenticated Server with Request Processing

from websockets.sync import serve, basic_auth
from websockets import Request, Response
import base64

def process_request(connection, request: Request):
    """Custom request processing with API key validation."""
    # Check for API key in headers
    api_key = request.headers.get("X-API-Key")
    if not api_key or api_key != "secret-api-key":
        return Response(401, "Unauthorized", b"Missing or invalid API key")
    
    # Check for valid client certificate (example)
    user_agent = request.headers.get("User-Agent", "")
    if "TrustedClient" not in user_agent:
        return Response(403, "Forbidden", b"Untrusted client")
    
    # Allow WebSocket upgrade
    return None

@basic_auth("admin", "password123")
def protected_handler(websocket):
    """Handler that requires both API key and basic auth."""
    websocket.send("Authentication successful!")
    
    try:
        for message in websocket:
            if message.startswith("/"):
                # Handle commands
                command = message[1:].strip().lower()
                
                if command == "status":
                    websocket.send("Server is running normally")
                elif command == "clients":
                    websocket.send(f"Connected clients: 1")  # Simplified
                elif command == "time":
                    import datetime
                    websocket.send(f"Server time: {datetime.datetime.now()}")
                else:
                    websocket.send(f"Unknown command: {command}")
            else:
                # Echo regular messages
                websocket.send(f"Received: {message}")
                
    except Exception as e:
        print(f"Protected handler error: {e}")

def main():
    with serve(
        protected_handler,
        "localhost",
        8765,
        process_request=process_request,
        extra_headers={"Server": "SecureWebSocketServer/1.0"}
    ) as server:
        print("Secure server started on ws://localhost:8765")
        print("Requires: X-API-Key: secret-api-key")
        print("And Basic Auth: admin/password123")
        server.serve_forever()

if __name__ == "__main__":
    main()

File Transfer Server

from websockets.sync import serve
import os
import json
import base64

def file_server_handler(websocket):
    """File transfer server handler."""
    websocket.send(json.dumps({
        "type": "welcome",
        "message": "File server ready. Send 'list' or 'get <filename>'"
    }))
    
    try:
        for message in websocket:
            try:
                data = json.loads(message)
                command = data.get("command", "").lower()
                
                if command == "list":
                    # List files in current directory
                    files = [f for f in os.listdir(".") if os.path.isfile(f)]
                    response = {
                        "type": "file_list",
                        "files": files
                    }
                    websocket.send(json.dumps(response))
                    
                elif command == "get":
                    filename = data.get("filename", "")
                    if not filename:
                        websocket.send(json.dumps({
                            "type": "error",
                            "message": "Filename required"
                        }))
                        continue
                    
                    try:
                        with open(filename, "rb") as f:
                            file_data = f.read()
                            encoded_data = base64.b64encode(file_data).decode()
                            
                        response = {
                            "type": "file_data",
                            "filename": filename,
                            "size": len(file_data),
                            "data": encoded_data
                        }
                        websocket.send(json.dumps(response))
                        
                    except FileNotFoundError:
                        websocket.send(json.dumps({
                            "type": "error",
                            "message": f"File not found: {filename}"
                        }))
                        
                else:
                    websocket.send(json.dumps({
                        "type": "error",
                        "message": f"Unknown command: {command}"
                    }))
                    
            except json.JSONDecodeError:
                websocket.send(json.dumps({
                    "type": "error",
                    "message": "Invalid JSON format"
                }))
                
    except Exception as e:
        print(f"File server error: {e}")

def main():
    with serve(
        file_server_handler,
        "localhost",
        8765,
        max_size=10*1024*1024  # 10MB max message size for file transfers
    ) as server:
        print("File server started on ws://localhost:8765")
        server.serve_forever()

if __name__ == "__main__":
    main()

Unix Domain Socket Server

from websockets.sync import unix_serve
import os
import tempfile

def unix_handler(websocket):
    """Handler for Unix socket connections."""
    print(f"Client connected via Unix socket")
    
    try:
        websocket.send("Connected to Unix WebSocket server")
        
        for message in websocket:
            print(f"Received: {message}")
            websocket.send(f"Unix echo: {message}")
            
    except Exception as e:
        print(f"Unix handler error: {e}")
    finally:
        print("Unix client disconnected")

def main():
    # Use temporary directory for socket
    socket_path = os.path.join(tempfile.gettempdir(), "websocket.sock")
    
    # Remove existing socket file
    if os.path.exists(socket_path):
        os.unlink(socket_path)
    
    try:
        with unix_serve(unix_handler, socket_path) as server:
            print(f"Unix WebSocket server started on {socket_path}")
            server.serve_forever()
    finally:
        # Clean up socket file
        if os.path.exists(socket_path):
            os.unlink(socket_path)

if __name__ == "__main__":
    main()

Server with Graceful Shutdown

from websockets.sync import serve
import signal
import sys
import threading
import time

# Global server reference for signal handler
server_instance = None

def signal_handler(signum, frame):
    """Handle shutdown signals gracefully."""
    print(f"\nReceived signal {signum}, shutting down gracefully...")
    if server_instance:
        server_instance.close()
    sys.exit(0)

def heartbeat_handler(websocket):
    """Handler that sends periodic heartbeats."""
    print(f"Client connected: {websocket.remote_address}")
    
    try:
        # Send welcome message
        websocket.send("Connected to heartbeat server")
        
        # Start heartbeat thread
        stop_heartbeat = threading.Event()
        
        def send_heartbeat():
            while not stop_heartbeat.is_set():
                try:
                    websocket.send(f"Heartbeat: {time.time()}")
                    time.sleep(5)
                except:
                    break
        
        heartbeat_thread = threading.Thread(target=send_heartbeat)
        heartbeat_thread.daemon = True
        heartbeat_thread.start()
        
        # Handle incoming messages
        for message in websocket:
            print(f"Received: {message}")
            if message.lower() == "stop heartbeat":
                stop_heartbeat.set()
                websocket.send("Heartbeat stopped")
            elif message.lower() == "start heartbeat":
                if stop_heartbeat.is_set():
                    stop_heartbeat.clear()
                    heartbeat_thread = threading.Thread(target=send_heartbeat)
                    heartbeat_thread.daemon = True
                    heartbeat_thread.start()
                    websocket.send("Heartbeat started")
            else:
                websocket.send(f"Echo: {message}")
        
        stop_heartbeat.set()
        
    except Exception as e:
        print(f"Heartbeat handler error: {e}")
    finally:
        print(f"Client disconnected: {websocket.remote_address}")

def main():
    global server_instance
    
    # Set up signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    try:
        with serve(heartbeat_handler, "localhost", 8765) as server:
            server_instance = server
            print("Heartbeat server started on ws://localhost:8765")
            print("Press Ctrl+C to shutdown gracefully")
            server.serve_forever()
    except KeyboardInterrupt:
        print("\nServer interrupted")
    finally:
        print("Server shutdown complete")

if __name__ == "__main__":
    main()

Install with Tessl CLI

npx tessl i tessl/pypi-websockets

docs

asyncio-client.md

asyncio-server.md

data-structures.md

exceptions.md

extensions.md

index.md

protocol.md

routing.md

sync-client.md

sync-server.md

tile.json