CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-sanic

A modern Python web server and web framework designed for high performance and speed using async/await syntax.

Pending
Overview
Eval results
Files

docs/websockets.md

WebSocket Support

Sanic provides native WebSocket support for real-time, bidirectional communication between client and server. This includes WebSocket routing, connection management, message handling, and integration with the broader Sanic ecosystem.

Capabilities

WebSocket Routes

Define WebSocket endpoints using decorators and programmatic registration.

def websocket(
    uri: str,
    host: str = None,
    strict_slashes: bool = None,
    subprotocols: list = None,
    name: str = None,
    apply: list = None,
    version: int = None,
    version_prefix: str = "/v",
    error_format: str = None,
):
    """
    Decorator that registers the decorated handler as a WebSocket route.

    Only ``uri`` is required. Every other parameter defaults to None,
    except ``version_prefix`` which defaults to "/v".

    Parameters:
    - uri: WebSocket endpoint path
    - host: Host restriction pattern
    - strict_slashes: Override global strict slash setting
    - subprotocols: Supported WebSocket subprotocols
    - name: Route name for url_for
    - apply: List of decorators to apply
      NOTE(review): upstream Sanic appears to declare ``apply`` as a bool
      (default True) rather than a list -- confirm against the Sanic
      routing API reference before relying on this description.
    - version: API version number
    - version_prefix: Version URL prefix
    - error_format: Error response format

    Usage:
    @app.websocket('/ws')
    async def websocket_handler(request, ws):
        await ws.send("Hello WebSocket!")
        msg = await ws.recv()
        print(f"Received: {msg}")
    """

def add_websocket_route(
    handler,
    uri: str,
    **kwargs
):
    """
    Register a WebSocket route without using the decorator form.

    Parameters:
    - handler: WebSocket handler function
    - uri: WebSocket endpoint path
    - **kwargs: Additional route options
      (presumably the same options the ``websocket`` decorator
      accepts -- TODO confirm)
    """

WebSocket Protocol Interface

The WebSocket connection interface for message handling and connection management.

class Websocket:
    """WebSocket connection protocol interface.

    Groups the message operations (send/recv), the control-frame
    operations (ping/pong/close) and the read-only state properties of a
    single connection.
    """

    async def send(
        self,
        data,
        encoding: str = "text"
    ):
        """
        Send data to WebSocket client.

        Parameters:
        - data: Data to send (str, bytes, or dict for JSON)
        - encoding: Data encoding type ("text", "bytes", or "json");
          defaults to "text"
          NOTE(review): confirm that the installed Sanic version's
          ``send`` actually accepts an ``encoding`` argument.

        Raises:
        - WebsocketClosed: If connection is closed
        - ConnectionError: If send fails
        """

    async def recv(self, timeout: float = None):
        """
        Receive data from WebSocket client.

        Parameters:
        - timeout: Receive timeout in seconds; defaults to None
          (presumably meaning "wait indefinitely" -- TODO confirm)

        Returns:
        - Received data (str or bytes)

        Raises:
        - WebsocketClosed: If connection is closed
        - asyncio.TimeoutError: If timeout exceeded
        """

    async def ping(self, data: bytes = b""):
        """
        Send ping frame to client.

        Parameters:
        - data: Optional ping data; defaults to empty bytes

        Returns:
        - Pong waiter coroutine
        """

    async def pong(self, data: bytes = b""):
        """
        Send pong frame to client.

        Parameters:
        - data: Optional pong data; defaults to empty bytes
        """

    async def close(
        self,
        code: int = 1000,
        reason: str = ""
    ):
        """
        Close WebSocket connection.

        Parameters:
        - code: Close status code; defaults to 1000
        - reason: Close reason string; defaults to the empty string
        """

    async def wait_closed(self):
        """Wait for connection to be closed."""

    @property
    def closed(self) -> bool:
        """Whether the connection is closed."""

    @property
    def open(self) -> bool:
        """Whether the connection is open."""

    @property
    def client_state(self):
        """Client connection state."""

    @property
    def server_state(self):
        """Server connection state."""

    @property
    def subprotocol(self) -> str:
        """Negotiated subprotocol."""

WebSocket Handler Function

The signature and structure for WebSocket handler functions.

async def websocket_handler(request, ws):
    """
    WebSocket handler function signature.

    A handler is a coroutine that takes the handshake request first and
    the connection second.

    Parameters:
    - request: HTTP request object (for initial handshake)
    - ws: WebSocket connection object

    The handler should:
    - Handle the WebSocket connection lifecycle
    - Process incoming messages
    - Send outgoing messages
    - Handle connection errors and cleanup
    - Manage connection state
    """

WebSocket Configuration

Configuration options for WebSocket behavior and limits.

# WebSocket timeout settings
WEBSOCKET_TIMEOUT: int = 10           # Connection timeout (unit not stated; presumably seconds -- TODO confirm)
WEBSOCKET_PING_INTERVAL: int = 20     # Ping interval in seconds
WEBSOCKET_PING_TIMEOUT: int = 20      # Ping timeout in seconds

# WebSocket message limits
WEBSOCKET_MAX_SIZE: int = 1048576     # Maximum message size; 1048576 == 2**20 (1 MiB if the unit is bytes -- TODO confirm)
WEBSOCKET_MAX_QUEUE: int = 32         # Maximum queued messages

# WebSocket compression
WEBSOCKET_COMPRESSION: str = None     # Compression algorithm; None by default (presumably "no compression" -- TODO confirm)

WebSocket Subprotocols

Support for WebSocket subprotocols for application-specific protocols.

@app.websocket('/ws', subprotocols=['chat', 'notification'])
async def websocket_with_subprotocols(request, ws):
    """
    Route a connection to the handler for its negotiated subprotocol.

    The route advertises the 'chat' and 'notification' subprotocols.
    The client can request specific subprotocols during handshake.
    The server selects the first supported subprotocol.
    """
    negotiated = ws.subprotocol

    if negotiated == 'chat':
        await handle_chat_protocol(ws)
        return

    if negotiated == 'notification':
        await handle_notification_protocol(ws)
        return

    # Neither advertised subprotocol was negotiated: refuse the connection.
    await ws.close(code=1002, reason="Unsupported subprotocol")

WebSocket Authentication

Authenticate WebSocket connections using request data and custom logic.

@app.websocket('/ws/authenticated')
async def authenticated_websocket(request, ws):
    """
    Gate a WebSocket connection on the handshake's Authorization header.

    The connection is closed with code 1008 when the header is missing
    or when token validation raises AuthenticationError. Otherwise the
    validated user is stored on ``request.ctx.user`` and control passes
    to the authenticated handler.
    """
    credentials = request.headers.get('Authorization')
    if not credentials:
        await ws.close(code=1008, reason="Authentication required")
        return

    try:
        # Keep the validated user on the request context for later use.
        request.ctx.user = await validate_websocket_token(credentials)
    except AuthenticationError:
        await ws.close(code=1008, reason="Invalid authentication")
        return

    # Authentication succeeded; run the real handler.
    await handle_authenticated_websocket(request, ws)

Usage Examples

Basic WebSocket Echo Server

from sanic import Sanic
from sanic.response import html

# Application instance that the echo route and the test page route register on.
app = Sanic("WebSocketApp")

@app.websocket('/ws')
async def websocket_echo(request, ws):
    """Greet the client, then echo back every message it sends."""
    greeting = "Welcome to WebSocket echo server!"
    await ws.send(greeting)

    # Each iteration handles one incoming message from the connection.
    async for incoming in ws:
        print(f"Received: {incoming}")
        reply = f"Echo: {incoming}"
        await ws.send(reply)

@app.route('/')
async def index(request):
    """Return the HTML page used to exercise the /ws echo endpoint.

    The page opens a WebSocket to ws://localhost:8000/ws, appends every
    received message to the #messages element, and sends the text box
    content when the Send button is clicked.
    """
    page = '''
    <!DOCTYPE html>
    <html>
    <head><title>WebSocket Test</title></head>
    <body>
        <div id="messages"></div>
        <input type="text" id="messageInput" placeholder="Type a message...">
        <button onclick="sendMessage()">Send</button>
        
        <script>
            const ws = new WebSocket('ws://localhost:8000/ws');
            const messages = document.getElementById('messages');
            
            ws.onmessage = function(event) {
                const div = document.createElement('div');
                div.textContent = event.data;
                messages.appendChild(div);
            };
            
            function sendMessage() {
                const input = document.getElementById('messageInput');
                ws.send(input.value);
                input.value = '';
            }
        </script>
    </body>
    </html>
    '''
    return html(page)

# Start the server only when this file is run as a script, not on import.
# Port 8000 matches the ws://localhost:8000/ws URL hard-coded in the test page.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

Chat Room WebSocket Server

import asyncio
from sanic import Sanic
from sanic.response import json
import json as json_lib

# Application instance for the chat example.
app = Sanic("ChatApp")

# Store active connections
# connected_clients: every open chat connection, regardless of room.
connected_clients = set()
# chat_rooms: maps a room id to the set of connections currently in that room.
chat_rooms = {}

@app.websocket('/ws/chat/<room_id>')
async def chat_room(request, ws, room_id):
    """Multi-room chat WebSocket handler.

    Adds the connection to ``chat_rooms[room_id]`` (creating the room on
    first join) and to ``connected_clients``, greets the client, announces
    the join to the other members, then relays every ``{"type": "chat"}``
    message to the whole room until the connection ends.

    Parameters:
    - request: handshake request (unused here)
    - ws: WebSocket connection of the joining client
    - room_id: room identifier taken from the URL path

    Messages that are not valid JSON, or whose top-level value is not a
    JSON object, are answered with an error message instead of ending
    the connection. On exit the connection is removed from both
    registries, the remaining members are told a user left, and the room
    is deleted once it is empty.
    """
    # setdefault creates the room on first join and returns its member set.
    chat_rooms.setdefault(room_id, set()).add(ws)
    connected_clients.add(ws)

    try:
        # Send welcome message
        await ws.send(json_lib.dumps({
            "type": "system",
            "message": f"Welcome to chat room {room_id}",
            "room": room_id
        }))

        # Broadcast user joined
        await broadcast_to_room(room_id, {
            "type": "user_joined",
            "message": "A user joined the room",
            "room": room_id
        }, exclude=ws)

        # Handle messages
        async for message in ws:
            try:
                data = json_lib.loads(message)
            except json_lib.JSONDecodeError:
                data = None

            # Valid JSON that is not an object (e.g. "[1]" or "42") has no
            # .get(); treat it like malformed input rather than letting an
            # AttributeError end the connection.
            if not isinstance(data, dict):
                await ws.send(json_lib.dumps({
                    "type": "error",
                    "message": "Invalid JSON format"
                }))
                continue

            if data.get("type") == "chat":
                # Broadcast chat message to room
                await broadcast_to_room(room_id, {
                    "type": "chat",
                    "message": data.get("message", ""),
                    "user": data.get("user", "Anonymous"),
                    "room": room_id
                })

    except Exception as e:
        print(f"WebSocket error: {e}")

    finally:
        # Clean up when client disconnects
        connected_clients.discard(ws)

        members = chat_rooms.get(room_id)
        if members is not None and ws in members:
            members.remove(ws)

            # Broadcast user left
            await broadcast_to_room(room_id, {
                "type": "user_left",
                "message": "A user left the room",
                "room": room_id
            })

            # Remove empty rooms. Look the room up again: the broadcast
            # above awaits, so another handler may already have deleted
            # the room (or a new client may have re-created it).
            remaining = chat_rooms.get(room_id)
            if remaining is not None and not remaining:
                del chat_rooms[room_id]

async def broadcast_to_room(room_id, message, exclude=None):
    """Broadcast message to all clients in a room.

    Parameters:
    - room_id: key into ``chat_rooms``; unknown rooms are ignored
    - message: JSON-serialisable object, encoded once and sent to everyone
    - exclude: optional connection that must not receive the message

    Clients whose ``send`` raises are treated as disconnected and are
    removed from the room afterwards.
    """
    members = chat_rooms.get(room_id)
    if members is None:
        return

    message_str = json_lib.dumps(message)
    disconnected = set()

    # Iterate over a snapshot: every send() awaits, and other handlers may
    # add or remove members meanwhile. Iterating the live set would raise
    # "RuntimeError: Set changed size during iteration".
    for client in tuple(members):
        if client == exclude:
            continue

        try:
            await client.send(message_str)
        except Exception:
            # Mark disconnected clients for removal
            disconnected.add(client)

    if disconnected:
        # The room may have been deleted while we were awaiting sends, so
        # look it up again instead of indexing unconditionally.
        current = chat_rooms.get(room_id)
        if current is not None:
            current.difference_update(disconnected)

@app.route('/api/rooms')
async def get_active_rooms(request):
    """Return the ids of all existing rooms and the total client count as JSON."""
    room_ids = [room_id for room_id in chat_rooms]
    summary = {
        "rooms": room_ids,
        "total_clients": len(connected_clients),
    }
    return json(summary)

Real-time Data Streaming

import asyncio
import random
from datetime import datetime

# Application instance for the data-streaming example.
app = Sanic("DataStreamApp")

@app.websocket('/ws/data-stream')
async def data_stream(request, ws):
    """Stream real-time data to WebSocket clients.

    Sends a "connected" confirmation, then runs two tasks concurrently:
    ``stream_data`` pushes readings to the client and
    ``handle_client_messages`` answers what the client sends. The handler
    returns as soon as either task finishes.

    Parameters:
    - request: handshake request (unused here)
    - ws: WebSocket connection to stream on

    Both tasks are always cancelled and awaited before the handler exits,
    including when the handler itself is cancelled or fails, so that no
    task keeps running against a finished connection.
    """
    # Send initial connection confirmation
    await ws.send(json_lib.dumps({
        "type": "connected",
        "message": "Data stream connected",
        "timestamp": datetime.utcnow().isoformat()
    }))

    # Create background tasks for streaming and for receiving
    tasks = [
        asyncio.create_task(stream_data(ws)),
        asyncio.create_task(handle_client_messages(ws)),
    ]

    try:
        # Wait for either task to complete
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    except Exception as e:
        print(f"WebSocket streaming error: {e}")

    finally:
        # Cancelling a task that already finished is a no-op, so cancel
        # both unconditionally. This also covers the path where this
        # handler is cancelled while waiting.
        for task in tasks:
            task.cancel()

        # Await the tasks so cancellation is actually processed and any
        # exception is retrieved instead of being reported later as
        # "Task exception was never retrieved".
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            # CancelledError is not an Exception subclass, so only real
            # failures are reported here.
            if isinstance(outcome, Exception):
                print(f"WebSocket streaming error: {outcome}")

        if not ws.closed:
            await ws.close()

async def stream_data(ws):
    """Push one sample sensor reading per second until the socket closes.

    Each reading is sent as a JSON object. Any exception while building
    or sending a reading is printed and ends the loop.
    """
    while not ws.closed:
        try:
            # Keyword order fixes the key order of the encoded JSON.
            reading = dict(
                type="data",
                timestamp=datetime.utcnow().isoformat(),
                value=random.randint(1, 100),
                sensor_id="sensor_001",
                temperature=round(random.uniform(20.0, 30.0), 2),
                humidity=round(random.uniform(40.0, 80.0), 2),
            )
            payload = json_lib.dumps(reading)

            await ws.send(payload)
            # Pause one second between readings.
            await asyncio.sleep(1)

        except Exception as e:
            print(f"Data streaming error: {e}")
            return

async def handle_client_messages(ws):
    """Answer control messages sent by the client.

    A "ping" message gets a timestamped "pong". A "subscribe" message
    gets a "subscribed" acknowledgement naming the requested sensor.
    Other message types are ignored, and text that is not valid JSON
    gets an error reply.
    """
    async for raw in ws:
        try:
            request_data = json_lib.loads(raw)
            kind = request_data.get("type")

            if kind == "ping":
                reply = {
                    "type": "pong",
                    "timestamp": datetime.utcnow().isoformat()
                }
            elif kind == "subscribe":
                # Handle subscription requests
                sensor_id = request_data.get("sensor_id")
                reply = {
                    "type": "subscribed",
                    "sensor_id": sensor_id,
                    "message": f"Subscribed to {sensor_id}"
                }
            else:
                # Unrecognised message types get no reply.
                continue

            await ws.send(json_lib.dumps(reply))

        except json_lib.JSONDecodeError:
            await ws.send(json_lib.dumps({
                "type": "error",
                "message": "Invalid message format"
            }))

WebSocket with Authentication and Rate Limiting

import time
from collections import defaultdict

# Application instance for the authenticated, rate-limited example.
app = Sanic("SecureWebSocketApp")

# Rate limiting storage
# Maps a client id to the list of timestamps of its accepted messages.
client_requests = defaultdict(list)
RATE_LIMIT = 10  # 10 messages per minute
RATE_WINDOW = 60  # 1 minute window

@app.websocket('/ws/secure')
async def secure_websocket(request, ws):
    """Authenticated echo endpoint with per-client rate limiting.

    The token is read from the ``token`` query argument or, failing that,
    from the Authorization header. A missing or rejected token closes the
    connection with code 1008. After that, each message within the rate
    limit is echoed back with the user's name and a timestamp. Messages
    over the limit, or not valid JSON, get an error reply. The client's
    rate-limit history is discarded when the connection ends.
    """
    # Authenticate connection
    token = request.args.get('token') or request.headers.get('Authorization')
    if not token:
        await ws.close(code=1008, reason="Authentication token required")
        return

    try:
        user = await validate_token(token)
        client_id = user['id']
    except Exception:
        await ws.close(code=1008, reason="Invalid authentication token")
        return

    welcome = {
        "type": "authenticated",
        "user": user['username'],
        "message": "Authentication successful"
    }
    await ws.send(json_lib.dumps(welcome))

    try:
        async for raw in ws:
            if check_rate_limit(client_id):
                # Process message
                try:
                    payload = json_lib.loads(raw)

                    # Echo message back with user info
                    echo = {
                        "type": "message",
                        "user": user['username'],
                        "message": payload.get("message", ""),
                        "timestamp": datetime.utcnow().isoformat()
                    }

                    await ws.send(json_lib.dumps(echo))

                except json_lib.JSONDecodeError:
                    await ws.send(json_lib.dumps({
                        "type": "error",
                        "message": "Invalid JSON format"
                    }))
            else:
                # Over the limit: tell the client and skip this message.
                await ws.send(json_lib.dumps({
                    "type": "error",
                    "message": "Rate limit exceeded. Please slow down."
                }))

    except Exception as e:
        print(f"Secure WebSocket error: {e}")

    finally:
        # Clean up rate limiting data
        client_requests.pop(client_id, None)

def check_rate_limit(client_id):
    """Return True and record the request if the client is under its limit.

    Timestamps older than RATE_WINDOW seconds are dropped first. If
    RATE_LIMIT or more remain, the request is rejected and not recorded.
    """
    current = time.time()

    # Keep only the timestamps that still fall inside the window.
    recent = [
        stamp for stamp in client_requests[client_id]
        if current - stamp < RATE_WINDOW
    ]
    client_requests[client_id] = recent

    allowed = len(recent) < RATE_LIMIT
    if allowed:
        # Only accepted requests count towards the limit.
        recent.append(current)
    return allowed

async def validate_token(token):
    """Return the user record for a token, or raise ValueError.

    Placeholder implementation: only the literal "valid_token" is
    accepted. Replace with real token validation logic.
    """
    if token != "valid_token":
        raise ValueError("Invalid token")
    return {"id": "user123", "username": "testuser"}

Install with Tessl CLI

npx tessl i tessl/pypi-sanic

docs

blueprints.md

configuration.md

core-application.md

exceptions.md

index.md

middleware-signals.md

request-response.md

server-deployment.md

websockets.md

tile.json