An implementation of the WebSocket Protocol (RFC 6455 & 7692)
Complete asyncio-based WebSocket server functionality for creating WebSocket servers, managing connections, handling client requests, and broadcasting messages to multiple clients.
Start WebSocket servers with customizable handlers, authentication, compression, and connection management.
async def serve(
    handler: Callable[[ServerConnection], Awaitable[None]],
    host: str,
    port: int,
    *,
    logger: LoggerLike | None = None,
    compression: str | None = "deflate",
    subprotocols: List[Subprotocol] | None = None,
    extra_headers: HeadersLike | None = None,
    process_request: Callable | None = None,
    select_subprotocol: Callable | None = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] | None = None,
    **kwargs
) -> Server:
    """
    Start a WebSocket server bound to ``host`` and ``port``.

    Everything after ``port`` is keyword-only.

    Parameters:
    - handler: Coroutine to handle each WebSocket connection; it receives
      the ServerConnection for that client
    - host: Server host address
    - port: Server port number
    - logger: Logger instance for server logging (default: None)
    - compression: Compression mode ("deflate" or None; default: "deflate")
    - subprotocols: List of supported subprotocols (default: None)
    - extra_headers: Additional response headers (default: None)
    - process_request: Function to process HTTP request before upgrade
    - select_subprotocol: Function to select subprotocol from client list
    - ping_interval: Interval between ping frames (seconds; default: 20)
    - ping_timeout: Timeout for ping/pong exchange (seconds; default: 20)
    - close_timeout: Timeout for connection closure (seconds; default: 10)
    - max_size: Maximum message size (bytes; default: 2**20)
    - max_queue: Maximum number of queued messages (default: 2**5)
    - read_limit: Buffer size for reading (bytes; default: 2**16)
    - write_limit: Buffer size for writing (bytes; default: 2**16)
    - extensions: List of WebSocket extensions (default: None)
    - **kwargs: Additional keyword arguments; their meaning is not
      described here (presumably passed through to the underlying server
      creation call -- TODO confirm)

    Returns:
    Server: Async context manager for server lifecycle management

    NOTE(review): the usage examples in this document enter the result
    directly with ``async with websockets.serve(...)`` instead of awaiting
    it first; confirm against the library that both forms are supported.
    """
async def unix_serve(
    handler: Callable[[ServerConnection], Awaitable[None]],
    path: str,
    *,
    logger: LoggerLike | None = None,
    compression: str | None = "deflate",
    subprotocols: List[Subprotocol] | None = None,
    extra_headers: HeadersLike | None = None,
    process_request: Callable | None = None,
    select_subprotocol: Callable | None = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] | None = None,
    **kwargs
) -> Server:
    """
    Start a WebSocket server on a Unix domain socket.

    Takes a filesystem ``path`` in place of the ``host``/``port`` pair
    accepted by serve(); the keyword-only parameters and their defaults
    are identical to those of serve().

    Parameters:
    - handler: Coroutine to handle each WebSocket connection; it receives
      the ServerConnection for that client
    - path: Unix domain socket path
    - Other parameters same as serve()

    Returns:
    Server: WebSocket server bound to Unix socket
    """

The ServerConnection class represents individual client connections on the server side.
class ServerConnection:
    """
    WebSocket server connection representing a client.

    Exposes connection state and handshake details as read-only
    properties, coroutines for sending and receiving frames, and the
    async-iterator protocol so a handler can consume incoming messages
    with ``async for message in connection``.
    """

    @property
    def closed(self) -> bool:
        """Check if connection is closed."""

    @property
    def local_address(self) -> Tuple[str, int]:
        """Get server socket address."""

    @property
    def remote_address(self) -> Tuple[str, int]:
        """Get client socket address."""

    @property
    def subprotocol(self) -> Subprotocol | None:
        """Get negotiated subprotocol (annotated as possibly None)."""

    @property
    def request_headers(self) -> Headers:
        """Get HTTP request headers from handshake."""

    @property
    def response_headers(self) -> Headers:
        """Get HTTP response headers from handshake."""

    async def send(self, message: Data) -> None:
        """
        Send a message to the WebSocket client.

        Parameters:
        - message: Text (str) or binary (bytes) message to send

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def recv(self) -> Data:
        """
        Receive a message from the WebSocket client.

        Returns:
        str | bytes: Received message (text or binary)

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def ping(self, data: bytes = b"") -> Awaitable[float]:
        """
        Send a ping frame and wait for pong response.

        Parameters:
        - data: Optional payload for ping frame (default: empty bytes)

        Returns:
        Awaitable[float]: Coroutine that resolves to round-trip time.
        Per the annotated return type, ``await connection.ping()`` yields
        this awaitable, which must itself be awaited to get the value.

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def pong(self, data: bytes = b"") -> None:
        """
        Send a pong frame.

        Parameters:
        - data: Payload for pong frame (default: empty bytes)

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def close(self, code: int = 1000, reason: str = "") -> None:
        """
        Close the WebSocket connection.

        Parameters:
        - code: Close code (default 1000 for normal closure)
        - reason: Human-readable close reason (default: empty string)

        Raises:
        - ProtocolError: If code is invalid
        """

    async def wait_closed(self) -> None:
        """Wait until the connection is closed."""

    # Async iterator support for receiving messages
    def __aiter__(self) -> AsyncIterator[Data]:
        """Return async iterator for receiving messages."""
        # The connection is its own iterator; __anext__ supplies the items.
        return self

    async def __anext__(self) -> Data:
        """Get next message from async iterator."""

The Server class manages the WebSocket server lifecycle and provides access to connected clients.
class Server:
    """
    WebSocket server management.

    Gives access to the listening sockets and the active connections,
    provides close() / wait_closed() for shutdown, and implements the
    async context manager protocol.
    """

    @property
    def sockets(self) -> Set[socket.socket]:
        """Get server sockets."""

    @property
    def websockets(self) -> Set[ServerConnection]:
        """Get active WebSocket connections."""

    def close(self) -> None:
        """
        Stop accepting new connections and close existing ones.

        This initiates server shutdown but doesn't wait for completion.
        Use wait_closed() to wait for full shutdown.
        """

    async def wait_closed(self) -> None:
        """Wait until server and all connections are closed."""

    # Context manager support
    async def __aenter__(self) -> "Server":
        """Enter async context manager; the server itself is the target."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Exit async context manager and close server."""

Utility functions for sending messages to multiple WebSocket connections concurrently.
def broadcast(
    websockets: Iterable[ServerConnection],
    message: Data,
    raise_exceptions: bool = False
) -> None:
    """
    Broadcast a message to multiple WebSocket connections.

    This is a plain function, not a coroutine: it is called without
    ``await`` and returns None.

    Parameters:
    - websockets: Iterable of ServerConnection objects
    - message: Text (str) or binary (bytes) message to broadcast
    - raise_exceptions: Whether to raise exceptions from failed sends
      (default: False)

    Note:
    This function sends messages concurrently and handles connection failures gracefully.
    Failed connections are silently ignored unless raise_exceptions=True.
    """

Built-in HTTP basic authentication decorator for WebSocket handlers.
def basic_auth(
    username: str,
    password: str,
    realm: str = "WebSocket"
) -> Callable:
    """
    Create HTTP basic authentication decorator for WebSocket handlers.

    Parameters:
    - username: Expected username
    - password: Expected password
    - realm: Authentication realm name (default: "WebSocket")

    Returns:
    Callable: Decorator that adds basic authentication to handler

    Usage:
        @basic_auth("admin", "secret")
        async def protected_handler(websocket):
            # Handler code here
            pass

    NOTE(review): confirm this signature and the decorator-style usage
    against the websockets release being documented -- TODO verify.
    """


import asyncio
import websockets
async def echo_handler(websocket):
    """Answer every incoming message with the same text prefixed by "Echo: ".

    Runs until iteration over ``websocket`` ends.
    """
    async for incoming in websocket:
        reply = f"Echo: {incoming}"
        await websocket.send(reply)
async def main():
    """Serve ``echo_handler`` on localhost:8765 and keep running indefinitely."""
    # Start server on localhost:8765
    server = websockets.serve(echo_handler, "localhost", 8765)
    async with server:
        print("WebSocket server started on ws://localhost:8765")
        # Nothing ever resolves this future, so the await never returns.
        never_done = asyncio.Future()
        await never_done  # Run forever
# Script entry point: run the echo server coroutine.
asyncio.run(main())


import asyncio
import json
import time
from typing import Set

import websockets
# Keep track of connected clients.
# chat_handler adds each connection on entry and discards it on exit;
# the same set is what gets passed to websockets.broadcast().
clients: Set[websockets.ServerConnection] = set()
async def chat_handler(websocket):
    """Chat server handler with broadcasting.

    Registers the connection in the module-level ``clients`` set, then
    treats every incoming message as a JSON object with optional "user"
    and "message" keys and broadcasts a normalized copy to all registered
    clients. A message that cannot be decoded as JSON, or that decodes to
    something other than an object, is answered with an error payload on
    the sender's connection only; the loop keeps running.

    Parameters:
    - websocket: Connection for this client; must support async iteration,
      ``send()`` and ``remote_address``

    The connection is always removed from ``clients`` on exit.
    ``websockets.ConnectionClosedError`` is swallowed deliberately so that
    an abrupt disconnect still ends the handler quietly.
    """
    # Register client
    clients.add(websocket)
    print(f"Client connected: {websocket.remote_address}")
    try:
        async for message in websocket:
            # Parse JSON message. Binary frames that are not valid UTF-8
            # raise UnicodeDecodeError rather than JSONDecodeError.
            try:
                data = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Send error response
                error = {"error": "Invalid JSON format"}
                await websocket.send(json.dumps(error))
                continue

            # Valid JSON such as `[1]` or `"hi"` has no .get(); reject it
            # instead of letting AttributeError terminate the handler.
            if not isinstance(data, dict):
                error = {"error": "Expected a JSON object"}
                await websocket.send(json.dumps(error))
                continue

            chat_message = {
                "user": data.get("user", "Anonymous"),
                "message": data.get("message", ""),
                # Wall-clock seconds since the epoch; the event loop clock
                # is monotonic and meaningless to other processes.
                "timestamp": time.time(),
            }
            # Broadcast to all connected clients
            broadcast_message = json.dumps(chat_message)
            websockets.broadcast(clients, broadcast_message)
    except websockets.ConnectionClosedError:
        pass
    finally:
        # Unregister client
        clients.discard(websocket)
        print(f"Client disconnected: {websocket.remote_address}")
async def main():
    """Serve ``chat_handler`` on localhost:8765 with custom keepalive and size limits."""
    options = {
        "ping_interval": 30,
        "ping_timeout": 10,
        "max_size": 1024*10,  # 10KB max message
    }
    server = websockets.serve(chat_handler, "localhost", 8765, **options)
    async with server:
        print("Chat server started on ws://localhost:8765")
        # Never resolved: keeps the server alive.
        await asyncio.Future()
# Script entry point: run the chat server coroutine.
asyncio.run(main())


import asyncio
import websockets
from websockets import basic_auth
@basic_auth("admin", "secret123")
async def protected_handler(websocket):
    """Greet the client, then answer each message it sends.

    A message whose lowercase form equals "status" gets a fixed status
    line; anything else is echoed back with a "Received: " prefix. The
    handler is wrapped with ``basic_auth("admin", "secret123")``.
    """
    await websocket.send("Welcome! You are authenticated.")
    async for incoming in websocket:
        if incoming.lower() != "status":
            reply = f"Received: {incoming}"
        else:
            reply = "Server is running normally"
        await websocket.send(reply)
async def main():
    """Serve ``protected_handler`` on localhost:8765 with an extra "Server" header."""
    response_headers = {"Server": "MyWebSocketServer/1.0"}
    server = websockets.serve(
        protected_handler,
        "localhost",
        8765,
        extra_headers=response_headers,
    )
    async with server:
        print("Protected server started on ws://localhost:8765")
        print("Use Authorization: Basic YWRtaW46c2VjcmV0MTIz")
        # Never resolved: keeps the server alive.
        await asyncio.Future()
# Script entry point: run the authenticated server coroutine.
asyncio.run(main())


import asyncio
import websockets
from websockets import Request, Response
async def process_request(connection, request: Request):
    """Custom request processing before WebSocket upgrade.

    Parameters:
    - connection: Server connection handling this handshake; its
      ``respond(status, text)`` method is used to build the rejection
    - request: Parsed HTTP request; only ``request.headers`` is read

    Returns:
    None to let the WebSocket upgrade proceed, or an HTTP 403 response
    when the "X-API-Key" header is missing or not equal to the expected
    key.
    """
    # Check custom header. A missing header yields None, which also fails
    # the equality test, so no separate emptiness check is needed.
    api_key = request.headers.get("X-API-Key")
    if api_key != "valid-api-key":
        # Build the response through the connection so that status line and
        # headers are filled in correctly; passing the body bytes as the
        # third positional argument of Response() puts them in the headers
        # slot.
        return connection.respond(403, "Invalid API key")
    # Allow upgrade to proceed
    return None
async def api_handler(websocket):
    """API handler for authenticated clients.

    Sends a greeting, then answers each incoming message: text messages
    starting with "GET /" get a success payload, everything else --
    including binary messages -- gets an "Unknown command" error payload.

    Parameters:
    - websocket: Connection for this client; must support async iteration
      and ``send()``
    """
    await websocket.send("API connection established")
    async for message in websocket:
        # Process API commands. Binary frames arrive as bytes, and
        # bytes.startswith() rejects a str prefix with TypeError, so only
        # text frames are matched against the command prefix.
        if isinstance(message, str) and message.startswith("GET /"):
            await websocket.send('{"status": "success", "data": "..."}')
        else:
            await websocket.send('{"error": "Unknown command"}')
async def main():
    """Serve ``api_handler`` on localhost:8765, gated by ``process_request``."""
    server = websockets.serve(
        api_handler,
        "localhost",
        8765,
        compression="deflate",
        process_request=process_request,
    )
    async with server:
        print("API server started on ws://localhost:8765")
        # Never resolved: keeps the server alive.
        await asyncio.Future()
# Script entry point: run the API server coroutine.
asyncio.run(main())


import asyncio
import websockets
import os
async def unix_handler(websocket):
    """Greet the client, then echo each message with a "Unix echo: " prefix."""
    await websocket.send("Connected via Unix socket")
    async for incoming in websocket:
        reply = f"Unix echo: {incoming}"
        await websocket.send(reply)
async def main():
    """Serve ``unix_handler`` on the Unix socket /tmp/websocket.sock.

    Any file already present at the socket path is removed first, then the
    server runs until the surrounding task is cancelled.
    """
    socket_path = "/tmp/websocket.sock"
    # Remove existing socket file. Attempt the unlink and ignore "not
    # found" instead of testing for existence first: this closes the
    # check-then-delete race and also clears a dangling symlink, which
    # os.path.exists() reports as absent.
    try:
        os.unlink(socket_path)
    except FileNotFoundError:
        pass
    async with websockets.unix_serve(unix_handler, socket_path):
        print(f"Unix WebSocket server started on {socket_path}")
        # Never resolved: keeps the server alive.
        await asyncio.Future()
asyncio.run(main())

Install with Tessl CLI:

npx tessl i tessl/pypi-websockets