CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-websockets

An implementation of the WebSocket Protocol (RFC 6455 & 7692)

Overview
Eval results
Files

routing.mddocs/

Routing

WebSocket server routing with werkzeug integration for URL pattern matching, parameter extraction, and request routing to different handlers based on URL patterns. Supports both asyncio and synchronous implementations.

Capabilities

Asyncio Routing Functions

Create routing WebSocket servers that dispatch connections to different handlers based on URL patterns.

async def route(
    router: Router,
    host: str,
    port: int,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start an asyncio WebSocket server with routing.

    Parameters:
    - router: Router instance with registered routes
    - host: Server host address
    - port: Server port number
    - Other parameters same as websockets.serve()

    Returns:
    Server: WebSocket server with routing capabilities

    Raises:
    - OSError: If server cannot bind to address/port
    """

async def unix_route(
    router: Router,
    path: str,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start an asyncio WebSocket server with routing on Unix domain socket.

    Parameters:
    - router: Router instance with registered routes
    - path: Unix domain socket path
    - Other parameters same as unix_route()

    Returns:
    Server: WebSocket server with routing on Unix socket
    """

Synchronous Routing Functions

Create synchronous routing WebSocket servers for traditional Python applications.

def route(
    router: Router,
    host: str,
    port: int,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start a synchronous WebSocket server with routing.

    Parameters:
    - router: Router instance with registered routes
    - host: Server host address
    - port: Server port number
    - Other parameters same as websockets.sync.serve()

    Returns:
    Server: Synchronous WebSocket server with routing capabilities
    """

def unix_route(
    router: Router,
    path: str,
    *,
    logger: LoggerLike = None,
    compression: str = "deflate",
    subprotocols: List[Subprotocol] = None,
    extra_headers: HeadersLike = None,
    process_request: Callable = None,
    select_subprotocol: Callable = None,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    close_timeout: float = 10,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ServerExtensionFactory] = None,
    **kwargs
) -> Server:
    """
    Start a synchronous WebSocket server with routing on Unix domain socket.

    Parameters:
    - router: Router instance with registered routes
    - path: Unix domain socket path
    - Other parameters same as sync route()

    Returns:
    Server: Synchronous WebSocket server with routing on Unix socket
    """

Router Class

The Router class manages URL patterns and dispatches WebSocket connections to appropriate handlers.

class Router:
    """
    WebSocket request router with werkzeug integration.
    
    Supports URL pattern matching, parameter extraction, and
    handler dispatching based on request paths.
    """
    
    def __init__(self):
        """Initialize empty router."""
    
    def route(
        self, 
        path: str,
        *,
        methods: List[str] = None,
        host: str = None,
        subdomain: str = None,
        strict_slashes: bool = None,
        merge_slashes: bool = None,
        websocket: bool = True
    ) -> Callable:
        """
        Decorator to register a route handler.

        Parameters:
        - path: URL pattern (supports werkzeug URL rules)
        - methods: HTTP methods (for initial handshake)
        - host: Host matching pattern
        - subdomain: Subdomain matching pattern
        - strict_slashes: Strict slash handling
        - merge_slashes: Merge consecutive slashes
        - websocket: Enable WebSocket upgrade (default True)

        Returns:
        Callable: Decorator function

        Usage:
        @router.route("/chat/<room_id>")
        async def chat_handler(websocket, room_id):
            # Handler implementation
            pass
        """
    
    def add_route(
        self,
        handler: Callable,
        path: str,
        *,
        methods: List[str] = None,
        host: str = None,
        subdomain: str = None,
        strict_slashes: bool = None,
        merge_slashes: bool = None,
        websocket: bool = True
    ) -> None:
        """
        Register a route handler programmatically.

        Parameters:
        - handler: WebSocket handler function
        - path: URL pattern
        - Other parameters same as route() decorator

        Raises:
        - ValueError: If handler or path is invalid
        """
    
    def match(self, path: str, method: str = "GET") -> Tuple[Callable, Dict[str, Any]]:
        """
        Match a request path to a handler.

        Parameters:
        - path: Request path to match
        - method: HTTP method

        Returns:
        Tuple[Callable, Dict]: Handler function and extracted parameters

        Raises:
        - NotFound: If no route matches the path
        """
    
    @property
    def routes(self) -> List[str]:
        """Get list of registered route patterns."""

Usage Examples

Basic Asyncio Routing

import asyncio
from websockets.asyncio import route, Router

# Create router
router = Router()

@router.route("/")
async def root_handler(websocket):
    """Handle root path connections."""
    await websocket.send("Welcome to WebSocket server!")
    
    async for message in websocket:
        await websocket.send(f"Root echo: {message}")

@router.route("/chat")
async def chat_handler(websocket):
    """Handle chat connections."""
    await websocket.send("Joined general chat")
    
    async for message in websocket:
        # Broadcast to all chat clients (simplified)
        await websocket.send(f"Chat: {message}")

@router.route("/echo")
async def echo_handler(websocket):
    """Handle echo connections."""
    async for message in websocket:
        await websocket.send(f"Echo: {message}")

async def main():
    """Start routing server."""
    async with route(router, "localhost", 8765):
        print("Routing WebSocket server started on ws://localhost:8765")
        print("Routes: /", "/chat", "/echo")
        await asyncio.Future()  # Run forever

asyncio.run(main())

Routing with Parameters

import asyncio
from websockets.asyncio import route, Router
import json

router = Router()

# Global room storage (simplified)
rooms = {}

@router.route("/room/<room_id>")
async def room_handler(websocket, room_id):
    """Handle room-specific connections."""
    # Initialize room if doesn't exist
    if room_id not in rooms:
        rooms[room_id] = set()
    
    # Add client to room
    rooms[room_id].add(websocket)
    
    try:
        await websocket.send(json.dumps({
            "type": "joined",
            "room": room_id,
            "message": f"Joined room {room_id}"
        }))
        
        async for message in websocket:
            try:
                data = json.loads(message)
                
                # Broadcast to all clients in room
                broadcast_message = json.dumps({
                    "type": "message",
                    "room": room_id,
                    "user": data.get("user", "Anonymous"),
                    "message": data.get("message", "")
                })
                
                # Send to all clients in room
                for client in rooms[room_id].copy():
                    try:
                        await client.send(broadcast_message)
                    except:
                        rooms[room_id].discard(client)
                        
            except json.JSONDecodeError:
                await websocket.send(json.dumps({
                    "type": "error",
                    "message": "Invalid JSON"
                }))
                
    except Exception as e:
        print(f"Room handler error: {e}")
    finally:
        # Remove client from room
        if room_id in rooms:
            rooms[room_id].discard(websocket)
            if not rooms[room_id]:  # Remove empty room
                del rooms[room_id]

@router.route("/user/<user_id>/notifications")
async def notification_handler(websocket, user_id):
    """Handle user-specific notification connections."""
    await websocket.send(json.dumps({
        "type": "connected",
        "user_id": user_id,
        "message": f"Notification channel for user {user_id}"
    }))
    
    # In real application, you'd subscribe to user-specific events
    # For demo, just echo messages with user context
    async for message in websocket:
        await websocket.send(json.dumps({
            "type": "notification",
            "user_id": user_id,
            "data": message
        }))

@router.route("/api/v<int:version>/ws")
async def versioned_api_handler(websocket, version):
    """Handle versioned API connections."""
    await websocket.send(json.dumps({
        "type": "api_connected",
        "version": version,
        "message": f"Connected to API v{version}"
    }))
    
    async for message in websocket:
        try:
            data = json.loads(message)
            command = data.get("command")
            
            if command == "status":
                response = {
                    "type": "status",
                    "version": version,
                    "status": "healthy"
                }
            elif command == "info":
                response = {
                    "type": "info",
                    "version": version,
                    "features": ["chat", "notifications", "api"]
                }
            else:
                response = {
                    "type": "error",
                    "message": f"Unknown command: {command}"
                }
            
            await websocket.send(json.dumps(response))
            
        except json.JSONDecodeError:
            await websocket.send(json.dumps({
                "type": "error",
                "message": "Invalid JSON"
            }))

async def main():
    async with route(router, "localhost", 8765):
        print("Advanced routing server started on ws://localhost:8765")
        print("Routes:")
        print("  /room/<room_id>")
        print("  /user/<user_id>/notifications")
        print("  /api/v<version>/ws")
        await asyncio.Future()

asyncio.run(main())

Synchronous Routing

from websockets.sync import route, Router
import json
import threading

router = Router()

# Thread-safe storage
rooms = {}
rooms_lock = threading.Lock()

@router.route("/")
def root_handler(websocket):
    """Synchronous root handler."""
    websocket.send("Welcome to sync WebSocket server!")
    
    for message in websocket:
        websocket.send(f"Sync echo: {message}")

@router.route("/room/<room_id>")
def room_handler(websocket, room_id):
    """Synchronous room handler."""
    # Thread-safe room management
    with rooms_lock:
        if room_id not in rooms:
            rooms[room_id] = set()
        rooms[room_id].add(websocket)
    
    try:
        websocket.send(json.dumps({
            "type": "joined",
            "room": room_id
        }))
        
        for message in websocket:
            try:
                data = json.loads(message)
                
                # Broadcast to room (thread-safe)
                with rooms_lock:
                    room_clients = rooms[room_id].copy()
                
                broadcast_message = json.dumps({
                    "type": "message",
                    "room": room_id,
                    "user": data.get("user", "Anonymous"),
                    "message": data.get("message", "")
                })
                
                for client in room_clients:
                    try:
                        client.send(broadcast_message)
                    except:
                        with rooms_lock:
                            rooms[room_id].discard(client)
                            
            except json.JSONDecodeError:
                websocket.send(json.dumps({
                    "type": "error",
                    "message": "Invalid JSON"
                }))
    finally:
        # Clean up
        with rooms_lock:
            if room_id in rooms:
                rooms[room_id].discard(websocket)
                if not rooms[room_id]:
                    del rooms[room_id]

def main():
    """Start synchronous routing server."""
    with route(router, "localhost", 8765) as server:
        print("Sync routing server started on ws://localhost:8765")
        print("Routes: /, /room/<room_id>")
        server.serve_forever()

if __name__ == "__main__":
    main()

Advanced Routing with Middleware

import asyncio
from websockets.asyncio import route, Router
from websockets import Request, Response
import time
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

router = Router()

def rate_limit_middleware(max_requests=10, window_seconds=60):
    """Rate limiting middleware."""
    clients = {}
    
    def middleware(connection, request: Request):
        client_ip = connection.remote_address[0]
        current_time = time.time()
        
        # Clean old entries
        if client_ip in clients:
            clients[client_ip] = [
                req_time for req_time in clients[client_ip]
                if current_time - req_time < window_seconds
            ]
        else:
            clients[client_ip] = []
        
        # Check rate limit
        if len(clients[client_ip]) >= max_requests:
            logger.warning(f"Rate limit exceeded for {client_ip}")
            return Response(429, "Too Many Requests", b"Rate limit exceeded")
        
        # Add current request
        clients[client_ip].append(current_time)
        return None  # Allow request
    
    return middleware

def auth_middleware(api_keys):
    """API key authentication middleware."""
    def middleware(connection, request: Request):
        api_key = request.headers.get("X-API-Key")
        if not api_key or api_key not in api_keys:
            logger.warning(f"Invalid API key from {connection.remote_address}")
            return Response(401, "Unauthorized", b"Invalid API key")
        return None
    return middleware

# Apply middleware
rate_limiter = rate_limit_middleware(max_requests=5, window_seconds=30)
auth_checker = auth_middleware({"valid-key-1", "valid-key-2"})

@router.route("/public")
async def public_handler(websocket):
    """Public endpoint (no auth required)."""
    await websocket.send("Public endpoint - no auth required")
    
    async for message in websocket:
        await websocket.send(f"Public: {message}")

@router.route("/protected")
async def protected_handler(websocket):
    """Protected endpoint (auth required)."""
    await websocket.send("Protected endpoint - authenticated")
    
    async for message in websocket:
        await websocket.send(f"Protected: {message}")

async def process_request_with_middleware(connection, request: Request):
    """Process request through middleware chain."""
    # Apply rate limiting to all routes
    response = rate_limiter(connection, request)
    if response:
        return response
    
    # Apply authentication to protected routes
    if request.path.startswith("/protected"):
        response = auth_checker(connection, request)
        if response:
            return response
    
    # Log successful requests
    logger.info(f"Request: {request.method} {request.path} from {connection.remote_address}")
    return None

async def main():
    """Start server with middleware."""
    async with route(
        router,
        "localhost",
        8765,
        process_request=process_request_with_middleware,
        extra_headers={"Server": "WebSocket-Router/1.0"}
    ):
        print("Middleware routing server started on ws://localhost:8765")
        print("Routes:")
        print("  /public (no auth)")
        print("  /protected (requires X-API-Key header)")
        print("Rate limit: 5 requests per 30 seconds")
        await asyncio.Future()

asyncio.run(main())

Route Testing

from websockets.asyncio import Router
from websockets.exceptions import NotFound

def test_router():
    """Test router functionality."""
    router = Router()
    
    # Register test routes
    @router.route("/")
    def root():
        return "root"
    
    @router.route("/user/<user_id>")
    def user_profile(user_id):
        return f"user_{user_id}"
    
    @router.route("/api/v<int:version>/data")
    def api_data(version):
        return f"api_v{version}"
    
    # Test route matching
    test_cases = [
        ("/", "root"),
        ("/user/123", "user_123"),
        ("/api/v1/data", "api_v1"),
        ("/api/v2/data", "api_v2"),
    ]
    
    for path, expected in test_cases:
        try:
            handler, params = router.match(path)
            result = handler(**params) if params else handler()
            print(f"✓ {path} -> {result} (expected {expected})")
            assert result == expected
        except Exception as e:
            print(f"✗ {path} -> Error: {e}")
    
    # Test non-matching routes
    try:
        router.match("/nonexistent")
        print("✗ Should have raised NotFound")
    except NotFound:
        print("✓ Non-existent route correctly raises NotFound")
    
    print(f"Registered routes: {router.routes}")

# Uncomment to run tests
# test_router()

Install with Tessl CLI

npx tessl i tessl/pypi-websockets

docs

asyncio-client.md

asyncio-server.md

data-structures.md

exceptions.md

extensions.md

index.md

protocol.md

routing.md

sync-client.md

sync-server.md

tile.json