An implementation of the WebSocket Protocol (RFC 6455 & 7692)
WebSocket server routing with werkzeug integration for URL pattern matching, parameter extraction, and request routing to different handlers based on URL patterns. Supports both asyncio and synchronous implementations.
Create routing WebSocket servers that dispatch connections to different handlers based on URL patterns.
async def route(
    router: Router,
    host: str,
    port: int,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start an asyncio WebSocket server with routing.

    Every option after ``*`` must be passed by keyword.

    Parameters:
    - router: Router instance with registered routes
    - host: Server host address
    - port: Server port number
    - Other parameters same as websockets.serve()

    Returns:
    Server: WebSocket server with routing capabilities

    Raises:
    - OSError: If server cannot bind to address/port
    """
async def unix_route(
    router: Router,
    path: str,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start an asyncio WebSocket server with routing on Unix domain socket.

    Every option after ``*`` must be passed by keyword.

    Parameters:
    - router: Router instance with registered routes
    - path: Unix domain socket path
    - Other parameters same as route()

    Returns:
    Server: WebSocket server with routing on Unix socket
    """

Create synchronous routing WebSocket servers for traditional Python applications.
def route(
    router: Router,
    host: str,
    port: int,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start a synchronous WebSocket server with routing.

    Every option after ``*`` must be passed by keyword.

    Parameters:
    - router: Router instance with registered routes
    - host: Server host address
    - port: Server port number
    - Other parameters same as websockets.sync.serve()

    Returns:
    Server: Synchronous WebSocket server with routing capabilities
    """
def unix_route(
    router: Router,
    path: str,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start a synchronous WebSocket server with routing on Unix domain socket.

    Every option after ``*`` must be passed by keyword.

    Parameters:
    - router: Router instance with registered routes
    - path: Unix domain socket path
    - Other parameters same as sync route()

    Returns:
    Server: Synchronous WebSocket server with routing on Unix socket
    """

The Router class manages URL patterns and dispatches WebSocket connections to appropriate handlers.
class Router:
    """
    WebSocket request router with werkzeug integration.

    Supports URL pattern matching, parameter extraction, and
    handler dispatching based on request paths.
    """

    def __init__(self):
        """Initialize empty router."""

    def route(
        self,
        path: str,
        *,
        methods: List[str] = None,
        host: str = None,
        subdomain: str = None,
        strict_slashes: bool = None,
        merge_slashes: bool = None,
        websocket: bool = True
    ) -> Callable:
        """
        Decorator to register a route handler.

        Every option after ``path`` must be passed by keyword.

        Parameters:
        - path: URL pattern (supports werkzeug URL rules)
        - methods: HTTP methods (for initial handshake)
        - host: Host matching pattern
        - subdomain: Subdomain matching pattern
        - strict_slashes: Strict slash handling
        - merge_slashes: Merge consecutive slashes
        - websocket: Enable WebSocket upgrade (default True)

        Returns:
        Callable: Decorator function

        Usage:
            @router.route("/chat/<room_id>")
            async def chat_handler(websocket, room_id):
                # Handler implementation
                pass
        """

    def add_route(
        self,
        handler: Callable,
        path: str,
        *,
        methods: List[str] = None,
        host: str = None,
        subdomain: str = None,
        strict_slashes: bool = None,
        merge_slashes: bool = None,
        websocket: bool = True
    ) -> None:
        """
        Register a route handler programmatically.

        Parameters:
        - handler: WebSocket handler function
        - path: URL pattern
        - Other parameters same as route() decorator

        Raises:
        - ValueError: If handler or path is invalid
        """

    def match(self, path: str, method: str = "GET") -> Tuple[Callable, Dict[str, Any]]:
        """
        Match a request path to a handler.

        Parameters:
        - path: Request path to match
        - method: HTTP method (defaults to "GET")

        Returns:
        Tuple[Callable, Dict]: Handler function and extracted parameters

        Raises:
        - NotFound: If no route matches the path
        """

    @property
    def routes(self) -> List[str]:
        """Get list of registered route patterns."""


import asyncio
from websockets.asyncio import route, Router
# Create router
router = Router()
@router.route("/")
async def root_handler(websocket):
    """Greet a client connected on "/" and echo back each message it sends."""
    await websocket.send("Welcome to WebSocket server!")
    async for incoming in websocket:
        reply = f"Root echo: {incoming}"
        await websocket.send(reply)
@router.route("/chat")
async def chat_handler(websocket):
    """Confirm that the client joined the general chat, then answer each message.

    Simplified demo: every reply is sent back to the sending client only;
    this handler keeps no registry of other chat clients to broadcast to.
    """
    await websocket.send("Joined general chat")
    async for text in websocket:
        chat_line = f"Chat: {text}"
        await websocket.send(chat_line)
@router.route("/echo")
async def echo_handler(websocket):
    """Send every received message back to the client, prefixed with "Echo: "."""
    async for received in websocket:
        echoed = f"Echo: {received}"
        await websocket.send(echoed)
async def main():
    """Serve the registered routes on localhost:8765 and keep running."""
    host, port = "localhost", 8765
    async with route(router, host, port):
        print("Routing WebSocket server started on ws://localhost:8765")
        print("Routes: /", "/chat", "/echo")
        # Awaiting a future that nobody resolves keeps the server context open.
        never_done = asyncio.Future()
        await never_done
asyncio.run(main())

import asyncio
from websockets.asyncio import route, Router
import json
router = Router()
# Global room storage (simplified)
rooms = {}
@router.route("/room/<room_id>")
async def room_handler(websocket, room_id):
    """Relay JSON chat messages between the clients connected to one room.

    Parameters:
    - websocket: Connection of the client that opened this route
    - room_id: Room identifier extracted from the URL path

    Each incoming message is parsed as JSON and re-sent to every client
    currently registered in the room. Messages that are not valid JSON, or
    that are valid JSON but not an object, are answered with an error frame
    sent to the sender only.
    """
    # Create the room on first use and register this client in it.
    rooms.setdefault(room_id, set()).add(websocket)
    try:
        await websocket.send(json.dumps({
            "type": "joined",
            "room": room_id,
            "message": f"Joined room {room_id}"
        }))
        async for message in websocket:
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                await websocket.send(json.dumps({
                    "type": "error",
                    "message": "Invalid JSON"
                }))
                continue
            if not isinstance(data, dict):
                # Valid JSON such as 42 or [1, 2] has no "user"/"message"
                # fields; reject it instead of failing on data.get().
                await websocket.send(json.dumps({
                    "type": "error",
                    "message": "Expected a JSON object"
                }))
                continue
            broadcast_message = json.dumps({
                "type": "message",
                "room": room_id,
                "user": data.get("user", "Anonymous"),
                "message": data.get("message", "")
            })
            # Iterate over a snapshot: members can be discarded while we await,
            # and the room entry itself may already have been deleted.
            for client in list(rooms.get(room_id, ())):
                try:
                    await client.send(broadcast_message)
                except Exception:
                    # Best effort: drop clients that can no longer be reached.
                    # Not a bare ``except`` so that task cancellation
                    # (asyncio.CancelledError) still propagates.
                    rooms.get(room_id, set()).discard(client)
    except Exception as e:
        print(f"Room handler error: {e}")
    finally:
        # Remove this client and delete the room once it is empty.
        members = rooms.get(room_id)
        if members is not None:
            members.discard(websocket)
            if not members:
                del rooms[room_id]
@router.route("/user/<user_id>/notifications")
async def notification_handler(websocket, user_id):
    """Open a per-user notification channel and echo messages with user context.

    Parameters:
    - websocket: Connection of the client that opened this route
    - user_id: User identifier extracted from the URL path
    """
    greeting = {
        "type": "connected",
        "user_id": user_id,
        "message": f"Notification channel for user {user_id}"
    }
    await websocket.send(json.dumps(greeting))
    # Demo behaviour only: a real application would subscribe to
    # user-specific events here instead of echoing.
    async for payload in websocket:
        notification = {
            "type": "notification",
            "user_id": user_id,
            "data": payload
        }
        await websocket.send(json.dumps(notification))
@router.route("/api/v<int:version>/ws")
async def versioned_api_handler(websocket, version):
    """Serve a small JSON command API for one API version.

    Parameters:
    - websocket: Connection of the client that opened this route
    - version: API version extracted from the URL path

    Supported commands (``{"command": ...}``): ``"status"`` and ``"info"``.
    Unknown commands, invalid JSON and JSON values that are not objects are
    answered with an ``{"type": "error", ...}`` frame; the connection stays
    open in all of these cases.
    """
    await websocket.send(json.dumps({
        "type": "api_connected",
        "version": version,
        "message": f"Connected to API v{version}"
    }))
    async for message in websocket:
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            await websocket.send(json.dumps({
                "type": "error",
                "message": "Invalid JSON"
            }))
            continue
        if not isinstance(data, dict):
            # Valid JSON such as 42 or [1, 2] has no "command" field; reject it
            # instead of letting data.get() raise and end the handler.
            await websocket.send(json.dumps({
                "type": "error",
                "message": "Expected a JSON object"
            }))
            continue
        command = data.get("command")
        if command == "status":
            response = {
                "type": "status",
                "version": version,
                "status": "healthy"
            }
        elif command == "info":
            response = {
                "type": "info",
                "version": version,
                "features": ["chat", "notifications", "api"]
            }
        else:
            response = {
                "type": "error",
                "message": f"Unknown command: {command}"
            }
        await websocket.send(json.dumps(response))
async def main():
    """Start the advanced routing server on localhost:8765 and list its routes."""
    banner = (
        "Advanced routing server started on ws://localhost:8765",
        "Routes:",
        " /room/<room_id>",
        " /user/<user_id>/notifications",
        " /api/v<version>/ws",
    )
    async with route(router, "localhost", 8765):
        for line in banner:
            print(line)
        # Awaiting a future that nobody resolves keeps the server context open.
        await asyncio.Future()
asyncio.run(main())

from websockets.sync import route, Router
import json
import threading
router = Router()
# Thread-safe storage
rooms = {}
rooms_lock = threading.Lock()
@router.route("/")
def root_handler(websocket):
    """Welcome the client, then echo each received message (blocking calls)."""
    websocket.send("Welcome to sync WebSocket server!")
    for incoming in websocket:
        reply = f"Sync echo: {incoming}"
        websocket.send(reply)
@router.route("/room/<room_id>")
def room_handler(websocket, room_id):
    """Relay JSON chat messages between the clients of one room (threaded).

    Parameters:
    - websocket: Connection of the client that opened this route
    - room_id: Room identifier extracted from the URL path

    All reads and writes of the shared ``rooms`` mapping happen while holding
    ``rooms_lock``. Messages that are not valid JSON, or that are valid JSON
    but not an object, are answered with an error frame sent to the sender
    only.
    """
    # Create the room on first use and register this client in it.
    with rooms_lock:
        rooms.setdefault(room_id, set()).add(websocket)
    try:
        websocket.send(json.dumps({
            "type": "joined",
            "room": room_id
        }))
        for message in websocket:
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                websocket.send(json.dumps({
                    "type": "error",
                    "message": "Invalid JSON"
                }))
                continue
            if not isinstance(data, dict):
                # Valid JSON such as 42 or [1, 2] has no "user"/"message"
                # fields; reject it instead of failing on data.get().
                websocket.send(json.dumps({
                    "type": "error",
                    "message": "Expected a JSON object"
                }))
                continue
            # Build the payload before taking the lock to keep the critical
            # section as short as possible.
            broadcast_message = json.dumps({
                "type": "message",
                "room": room_id,
                "user": data.get("user", "Anonymous"),
                "message": data.get("message", "")
            })
            # Snapshot the members under the lock; the room may already have
            # been deleted by another thread, hence .get() rather than [].
            with rooms_lock:
                room_clients = list(rooms.get(room_id, ()))
            # Send outside the lock so other threads are not blocked on it
            # while this thread is sending.
            for client in room_clients:
                try:
                    client.send(broadcast_message)
                except Exception:
                    # Best effort: drop clients that can no longer be reached.
                    # Not a bare ``except`` so KeyboardInterrupt/SystemExit
                    # still propagate.
                    with rooms_lock:
                        rooms.get(room_id, set()).discard(client)
    finally:
        # Remove this client and delete the room once it is empty.
        with rooms_lock:
            members = rooms.get(room_id)
            if members is not None:
                members.discard(websocket)
                if not members:
                    del rooms[room_id]
def main():
    """Run the synchronous routing server on localhost:8765."""
    host, port = "localhost", 8765
    with route(router, host, port) as server:
        for line in (
            "Sync routing server started on ws://localhost:8765",
            "Routes: /, /room/<room_id>",
        ):
            print(line)
        server.serve_forever()
if __name__ == "__main__":
    main()

import asyncio
from websockets.asyncio import route, Router
from websockets import Request, Response
import time
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
router = Router()
def rate_limit_middleware(max_requests=10, window_seconds=60):
    """Build a sliding-window rate limiter keyed by client IP.

    Parameters:
    - max_requests: Requests accepted per client IP within one window
    - window_seconds: Length of the sliding window, in seconds

    Returns:
    Callable: ``middleware(connection, request)`` that returns ``None`` to
    let the request through, or an HTTP 429 response to reject it.
    """
    # Maps client IP -> timestamps of its accepted requests still in the window.
    clients = {}

    def middleware(connection, request: Request):
        client_ip = connection.remote_address[0]
        # Monotonic clock: the timestamps are only compared with each other,
        # and wall-clock adjustments must not stretch or shrink the window.
        current_time = time.monotonic()

        # Drop timestamps that have fallen out of the window.
        recent = [
            req_time for req_time in clients.get(client_ip, ())
            if current_time - req_time < window_seconds
        ]
        clients[client_ip] = recent

        if len(recent) >= max_requests:
            logger.warning("Rate limit exceeded for %s", client_ip)
            # NOTE(review): websockets' Response takes
            # (status_code, reason_phrase, headers, body), so passing the body
            # as the third argument is wrong; respond() builds a well-formed
            # plain-text response — confirm against the installed version.
            return connection.respond(429, "Rate limit exceeded\n")

        recent.append(current_time)
        return None  # Allow request

    return middleware
def auth_middleware(api_keys):
    """Build an API-key check based on the ``X-API-Key`` request header.

    Parameters:
    - api_keys: Collection of accepted API keys (membership is tested with ``in``)

    Returns:
    Callable: ``middleware(connection, request)`` that returns ``None`` when
    the key is accepted, or an HTTP 401 response when it is missing or unknown.
    """
    def middleware(connection, request: Request):
        api_key = request.headers.get("X-API-Key")
        if not api_key or api_key not in api_keys:
            logger.warning("Invalid API key from %s", connection.remote_address)
            # NOTE(review): websockets' Response takes
            # (status_code, reason_phrase, headers, body), so passing the body
            # as the third argument is wrong; respond() builds a well-formed
            # plain-text response — confirm against the installed version.
            return connection.respond(401, "Invalid API key\n")
        return None

    return middleware
# Apply middleware
rate_limiter = rate_limit_middleware(max_requests=5, window_seconds=30)
auth_checker = auth_middleware({"valid-key-1", "valid-key-2"})
@router.route("/public")
async def public_handler(websocket):
    """Serve the unauthenticated endpoint: announce it, then echo messages."""
    await websocket.send("Public endpoint - no auth required")
    async for incoming in websocket:
        reply = f"Public: {incoming}"
        await websocket.send(reply)
@router.route("/protected")
async def protected_handler(websocket):
    """Serve the endpoint meant for authenticated clients and echo messages.

    The handler itself performs no credential check.
    """
    await websocket.send("Protected endpoint - authenticated")
    async for incoming in websocket:
        reply = f"Protected: {incoming}"
        await websocket.send(reply)
async def process_request_with_middleware(connection, request: Request):
    """Run a handshake request through the middleware chain.

    Rate limiting applies to every path; the API-key check applies only to
    paths starting with ``/protected``.

    Returns:
    The response of the first middleware that rejects the request, or
    ``None`` when the request may proceed.
    """
    # Apply rate limiting to all routes
    response = rate_limiter(connection, request)
    if response is not None:
        return response

    # Apply authentication to protected routes
    if request.path.startswith("/protected"):
        response = auth_checker(connection, request)
        if response is not None:
            return response

    # NOTE(review): websockets' handshake Request documents only ``path`` and
    # ``headers``; reading ``request.method`` directly would raise on every
    # accepted request. WebSocket opening handshakes use GET (RFC 6455), so
    # fall back to that when the attribute is absent — confirm.
    method = getattr(request, "method", "GET")
    logger.info(
        "Request: %s %s from %s", method, request.path, connection.remote_address
    )
    return None
async def main():
    """Start the routing server with the middleware chain installed."""
    server_options = {
        "process_request": process_request_with_middleware,
        "extra_headers": {"Server": "WebSocket-Router/1.0"},
    }
    banner = (
        "Middleware routing server started on ws://localhost:8765",
        "Routes:",
        " /public (no auth)",
        " /protected (requires X-API-Key header)",
        "Rate limit: 5 requests per 30 seconds",
    )
    async with route(router, "localhost", 8765, **server_options):
        for line in banner:
            print(line)
        # Awaiting a future that nobody resolves keeps the server context open.
        await asyncio.Future()
asyncio.run(main())

from websockets.asyncio import Router
from websockets.exceptions import NotFound
def test_router():
    """Test router functionality.

    Registers three routes, resolves a set of sample paths through
    ``router.match`` and checks the handler results, then checks that an
    unregistered path raises ``NotFound``. A ✓/✗ line is printed per check.

    Raises:
    - AssertionError: If any check failed (all checks run before raising)
    """
    router = Router()

    # Register test routes
    @router.route("/")
    def root():
        return "root"

    @router.route("/user/<user_id>")
    def user_profile(user_id):
        return f"user_{user_id}"

    @router.route("/api/v<int:version>/data")
    def api_data(version):
        return f"api_v{version}"

    # Test route matching
    test_cases = [
        ("/", "root"),
        ("/user/123", "user_123"),
        ("/api/v1/data", "api_v1"),
        ("/api/v2/data", "api_v2"),
    ]
    # Failures are collected rather than swallowed, so one bad route does not
    # hide the others and the test still fails at the end.
    failures = []
    for path, expected in test_cases:
        try:
            handler, params = router.match(path)
            result = handler(**(params or {}))
        except Exception as e:
            print(f"✗ {path} -> Error: {e}")
            failures.append(f"{path}: {e!r}")
            continue
        if result == expected:
            print(f"✓ {path} -> {result} (expected {expected})")
        else:
            print(f"✗ {path} -> {result} (expected {expected})")
            failures.append(f"{path}: got {result!r}, expected {expected!r}")

    # Test non-matching routes
    try:
        router.match("/nonexistent")
    except NotFound:
        print("✓ Non-existent route correctly raises NotFound")
    else:
        print("✗ Should have raised NotFound")
        failures.append("/nonexistent: NotFound not raised")

    print(f"Registered routes: {router.routes}")

    if failures:
        raise AssertionError("Router checks failed: " + "; ".join(failures))
# Uncomment to run tests
# test_router()

Install with Tessl CLI
npx tessl i tessl/pypi-websockets