An implementation of the WebSocket Protocol (RFC 6455 & 7692)
Complete threading-based synchronous WebSocket server functionality for creating WebSocket servers using blocking operations, suitable for traditional Python applications that don't use asyncio.
from websockets.sync.server import serve, unix_serve, ServerConnection, Server, basic_authStart synchronous WebSocket servers with customizable handlers, authentication, and connection management using threading.
def serve(
handler: Callable[[ServerConnection], None],
host: str | None = None,
port: int | None = None,
*,
# TCP/TLS
sock: socket.socket | None = None,
ssl: ssl.SSLContext | None = None,
# WebSocket
origins: Sequence[Origin | re.Pattern[str] | None] | None = None,
extensions: Sequence[ServerExtensionFactory] | None = None,
subprotocols: Sequence[Subprotocol] | None = None,
select_subprotocol: Callable[[ServerConnection, Sequence[Subprotocol]], Subprotocol | None] | None = None,
compression: str | None = "deflate",
# HTTP
process_request: Callable[[ServerConnection, Request], Response | None] | None = None,
process_response: Callable[[ServerConnection, Request, Response], Response | None] | None = None,
server_header: str | None = SERVER,
# Timeouts
open_timeout: float | None = 10,
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | None | tuple[int | None, int | None] = 16,
# Logging
logger: LoggerLike | None = None,
# Escape hatch for advanced customization
create_connection: type[ServerConnection] | None = None,
**kwargs: Any
) -> Server:
"""
Start a synchronous WebSocket server.
Parameters:
- handler: Function to handle each WebSocket connection (runs in separate thread)
- host: Server host address (None for all interfaces)
- port: Server port number (None for automatic assignment)
- sock: Preexisting TCP socket to use for server
- ssl: SSL context for TLS configuration
- origins: List of allowed origins (None allows all)
- extensions: List of supported extensions in negotiation order
- subprotocols: List of supported subprotocols in preference order
- select_subprotocol: Function to select subprotocol from client list
- compression: Compression mode ("deflate" or None)
- process_request: Function to process HTTP request before WebSocket upgrade
- process_response: Function to process HTTP response after WebSocket upgrade
- server_header: Server header value (None to omit)
- open_timeout: Timeout for connection establishment (seconds)
- ping_interval: Interval between ping frames (seconds)
- ping_timeout: Timeout for ping/pong exchange (seconds)
- close_timeout: Timeout for connection closure (seconds)
- max_size: Maximum message size (bytes)
- max_queue: Maximum number of queued messages (int or tuple)
- logger: Logger instance for server logging
- create_connection: Custom connection class factory
Returns:
Server: Context manager for server lifecycle management
Raises:
- OSError: If server cannot bind to address/port
"""
def unix_serve(
handler: Callable[[ServerConnection], None],
path: str,
*,
logger: LoggerLike = None,
compression: str = "deflate",
subprotocols: List[Subprotocol] = None,
extra_headers: HeadersLike = None,
process_request: Callable = None,
select_subprotocol: Callable = None,
ping_interval: float = 20,
ping_timeout: float = 20,
close_timeout: float = 10,
max_size: int = 2**20,
max_queue: int = 2**5,
read_limit: int = 2**16,
write_limit: int = 2**16,
extensions: List[ServerExtensionFactory] = None,
**kwargs
) -> Server:
"""
Start a synchronous WebSocket server on Unix domain socket.
Parameters:
- handler: Function to handle each WebSocket connection
- path: Unix domain socket path
- Other parameters same as serve()
Returns:
Server: WebSocket server bound to Unix socket
Raises:
- OSError: If server cannot bind to Unix socket
"""The synchronous ServerConnection class represents individual client connections with blocking operations.
class ServerConnection:
"""Synchronous WebSocket server connection representing a client."""
@property
def closed(self) -> bool:
"""Check if connection is closed."""
@property
def local_address(self) -> Tuple[str, int]:
"""Get server socket address."""
@property
def remote_address(self) -> Tuple[str, int]:
"""Get client socket address."""
@property
def subprotocol(self) -> Subprotocol | None:
"""Get negotiated subprotocol."""
@property
def request_headers(self) -> Headers:
"""Get HTTP request headers from handshake."""
@property
def response_headers(self) -> Headers:
"""Get HTTP response headers from handshake."""
def send(self, message: Data, timeout: float = None) -> None:
"""
Send a message to the WebSocket client.
Parameters:
- message: Text (str) or binary (bytes) message to send
- timeout: Optional timeout for send operation (seconds)
Raises:
- ConnectionClosed: If connection is closed
- TimeoutError: If timeout is exceeded
"""
def recv(self, timeout: float = None) -> Data:
"""
Receive a message from the WebSocket client.
Parameters:
- timeout: Optional timeout for receive operation (seconds)
Returns:
str | bytes: Received message (text or binary)
Raises:
- ConnectionClosed: If connection is closed
- TimeoutError: If timeout is exceeded
"""
def ping(self, data: bytes = b"", timeout: float = None) -> float:
"""
Send a ping frame and wait for pong response.
Parameters:
- data: Optional payload for ping frame
- timeout: Optional timeout for ping/pong exchange (seconds)
Returns:
float: Round-trip time in seconds
Raises:
- ConnectionClosed: If connection is closed
- TimeoutError: If timeout is exceeded
"""
def pong(self, data: bytes = b"") -> None:
"""
Send a pong frame.
Parameters:
- data: Payload for pong frame
Raises:
- ConnectionClosed: If connection is closed
"""
def close(self, code: int = 1000, reason: str = "") -> None:
"""
Close the WebSocket connection.
Parameters:
- code: Close code (default 1000 for normal closure)
- reason: Human-readable close reason
Raises:
- ProtocolError: If code is invalid
"""
# Iterator support for receiving messages
def __iter__(self) -> Iterator[Data]:
"""Return iterator for receiving messages."""
return self
def __next__(self) -> Data:
"""Get next message from iterator."""
try:
return self.recv()
except ConnectionClosed:
raise StopIterationThe synchronous Server class manages the WebSocket server lifecycle with blocking operations.
class Server:
"""Synchronous WebSocket server management."""
@property
def socket(self) -> socket.socket:
"""Get server socket."""
def close(self) -> None:
"""
Stop accepting new connections and close existing ones.
This initiates server shutdown and blocks until all connections are closed.
"""
def serve_forever(self) -> None:
"""
Run the server until interrupted.
This method blocks and handles incoming connections until
KeyboardInterrupt or server.close() is called.
"""
# Context manager support
def __enter__(self) -> Server:
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Exit context manager and close server."""
self.close()Built-in HTTP basic authentication decorator for synchronous WebSocket handlers.
def basic_auth(
username: str,
password: str,
realm: str = "WebSocket"
) -> Callable:
"""
Create HTTP basic authentication decorator for synchronous WebSocket handlers.
Parameters:
- username: Expected username
- password: Expected password
- realm: Authentication realm name
Returns:
Callable: Decorator that adds basic authentication to handler
Usage:
@basic_auth("admin", "secret")
def protected_handler(websocket):
# Handler code here
pass
"""from websockets.sync import serve
import logging
def echo_handler(websocket):
"""Simple echo handler that returns received messages."""
print(f"Client connected: {websocket.remote_address}")
try:
for message in websocket:
print(f"Received: {message}")
websocket.send(f"Echo: {message}")
except Exception as e:
print(f"Handler error: {e}")
finally:
print(f"Client disconnected: {websocket.remote_address}")
def main():
# Start server
with serve(echo_handler, "localhost", 8765) as server:
print("WebSocket server started on ws://localhost:8765")
server.serve_forever()
if __name__ == "__main__":
main()from websockets.sync import serve
import threading
import json
import time
from typing import Set
# Thread-safe set for connected clients
clients: Set = set()
clients_lock = threading.Lock()
def broadcast_message(message: str, sender=None):
"""Broadcast message to all connected clients except sender."""
with clients_lock:
for client in clients.copy(): # Copy to avoid modification during iteration
if client != sender:
try:
client.send(message)
except Exception as e:
print(f"Failed to send to client: {e}")
clients.discard(client)
def chat_handler(websocket):
"""Chat server handler with broadcasting."""
# Register client
with clients_lock:
clients.add(websocket)
print(f"Client connected: {websocket.remote_address} (Total: {len(clients)})")
try:
# Send welcome message
websocket.send(json.dumps({
"type": "system",
"message": "Welcome to the chat server!"
}))
# Handle messages
for message in websocket:
try:
data = json.loads(message)
chat_message = {
"type": "chat",
"user": data.get("user", "Anonymous"),
"message": data.get("message", ""),
"timestamp": time.time()
}
# Broadcast to all clients
broadcast_message(json.dumps(chat_message), websocket)
except json.JSONDecodeError:
error = {"type": "error", "message": "Invalid JSON format"}
websocket.send(json.dumps(error))
except Exception as e:
print(f"Chat handler error: {e}")
finally:
# Unregister client
with clients_lock:
clients.discard(websocket)
print(f"Client disconnected: {websocket.remote_address} (Total: {len(clients)})")
def main():
with serve(
chat_handler,
"localhost",
8765,
ping_interval=30,
ping_timeout=10
) as server:
print("Chat server started on ws://localhost:8765")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nShutting down server...")
if __name__ == "__main__":
main()from websockets.sync import serve, basic_auth
from websockets import Request, Response
import base64
def process_request(connection, request: Request):
"""Custom request processing with API key validation."""
# Check for API key in headers
api_key = request.headers.get("X-API-Key")
if not api_key or api_key != "secret-api-key":
return Response(401, "Unauthorized", b"Missing or invalid API key")
# Check for valid client certificate (example)
user_agent = request.headers.get("User-Agent", "")
if "TrustedClient" not in user_agent:
return Response(403, "Forbidden", b"Untrusted client")
# Allow WebSocket upgrade
return None
@basic_auth("admin", "password123")
def protected_handler(websocket):
"""Handler that requires both API key and basic auth."""
websocket.send("Authentication successful!")
try:
for message in websocket:
if message.startswith("/"):
# Handle commands
command = message[1:].strip().lower()
if command == "status":
websocket.send("Server is running normally")
elif command == "clients":
websocket.send(f"Connected clients: 1") # Simplified
elif command == "time":
import datetime
websocket.send(f"Server time: {datetime.datetime.now()}")
else:
websocket.send(f"Unknown command: {command}")
else:
# Echo regular messages
websocket.send(f"Received: {message}")
except Exception as e:
print(f"Protected handler error: {e}")
def main():
with serve(
protected_handler,
"localhost",
8765,
process_request=process_request,
extra_headers={"Server": "SecureWebSocketServer/1.0"}
) as server:
print("Secure server started on ws://localhost:8765")
print("Requires: X-API-Key: secret-api-key")
print("And Basic Auth: admin/password123")
server.serve_forever()
if __name__ == "__main__":
main()from websockets.sync import serve
import os
import json
import base64
def file_server_handler(websocket):
"""File transfer server handler."""
websocket.send(json.dumps({
"type": "welcome",
"message": "File server ready. Send 'list' or 'get <filename>'"
}))
try:
for message in websocket:
try:
data = json.loads(message)
command = data.get("command", "").lower()
if command == "list":
# List files in current directory
files = [f for f in os.listdir(".") if os.path.isfile(f)]
response = {
"type": "file_list",
"files": files
}
websocket.send(json.dumps(response))
elif command == "get":
filename = data.get("filename", "")
if not filename:
websocket.send(json.dumps({
"type": "error",
"message": "Filename required"
}))
continue
try:
with open(filename, "rb") as f:
file_data = f.read()
encoded_data = base64.b64encode(file_data).decode()
response = {
"type": "file_data",
"filename": filename,
"size": len(file_data),
"data": encoded_data
}
websocket.send(json.dumps(response))
except FileNotFoundError:
websocket.send(json.dumps({
"type": "error",
"message": f"File not found: {filename}"
}))
else:
websocket.send(json.dumps({
"type": "error",
"message": f"Unknown command: {command}"
}))
except json.JSONDecodeError:
websocket.send(json.dumps({
"type": "error",
"message": "Invalid JSON format"
}))
except Exception as e:
print(f"File server error: {e}")
def main():
with serve(
file_server_handler,
"localhost",
8765,
max_size=10*1024*1024 # 10MB max message size for file transfers
) as server:
print("File server started on ws://localhost:8765")
server.serve_forever()
if __name__ == "__main__":
main()from websockets.sync import unix_serve
import os
import tempfile
def unix_handler(websocket):
"""Handler for Unix socket connections."""
print(f"Client connected via Unix socket")
try:
websocket.send("Connected to Unix WebSocket server")
for message in websocket:
print(f"Received: {message}")
websocket.send(f"Unix echo: {message}")
except Exception as e:
print(f"Unix handler error: {e}")
finally:
print("Unix client disconnected")
def main():
# Use temporary directory for socket
socket_path = os.path.join(tempfile.gettempdir(), "websocket.sock")
# Remove existing socket file
if os.path.exists(socket_path):
os.unlink(socket_path)
try:
with unix_serve(unix_handler, socket_path) as server:
print(f"Unix WebSocket server started on {socket_path}")
server.serve_forever()
finally:
# Clean up socket file
if os.path.exists(socket_path):
os.unlink(socket_path)
if __name__ == "__main__":
main()from websockets.sync import serve
import signal
import sys
import threading
import time
# Global server reference for signal handler
server_instance = None
def signal_handler(signum, frame):
"""Handle shutdown signals gracefully."""
print(f"\nReceived signal {signum}, shutting down gracefully...")
if server_instance:
server_instance.close()
sys.exit(0)
def heartbeat_handler(websocket):
"""Handler that sends periodic heartbeats."""
print(f"Client connected: {websocket.remote_address}")
try:
# Send welcome message
websocket.send("Connected to heartbeat server")
# Start heartbeat thread
stop_heartbeat = threading.Event()
def send_heartbeat():
while not stop_heartbeat.is_set():
try:
websocket.send(f"Heartbeat: {time.time()}")
time.sleep(5)
except:
break
heartbeat_thread = threading.Thread(target=send_heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
# Handle incoming messages
for message in websocket:
print(f"Received: {message}")
if message.lower() == "stop heartbeat":
stop_heartbeat.set()
websocket.send("Heartbeat stopped")
elif message.lower() == "start heartbeat":
if stop_heartbeat.is_set():
stop_heartbeat.clear()
heartbeat_thread = threading.Thread(target=send_heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
websocket.send("Heartbeat started")
else:
websocket.send(f"Echo: {message}")
stop_heartbeat.set()
except Exception as e:
print(f"Heartbeat handler error: {e}")
finally:
print(f"Client disconnected: {websocket.remote_address}")
def main():
global server_instance
# Set up signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
with serve(heartbeat_handler, "localhost", 8765) as server:
server_instance = server
print("Heartbeat server started on ws://localhost:8765")
print("Press Ctrl+C to shutdown gracefully")
server.serve_forever()
except KeyboardInterrupt:
print("\nServer interrupted")
finally:
print("Server shutdown complete")
if __name__ == "__main__":
main()Install with Tessl CLI
npx tessl i tessl/pypi-websockets