CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-litestar

Litestar is a powerful, flexible yet opinionated ASGI web framework specifically focused on building high-performance APIs.

Pending
Overview
Eval results
Files

docs/websocket.md

WebSocket Support

WebSocket connection handling with decorators for WebSocket routes, listeners, and streaming. Litestar provides comprehensive WebSocket support for real-time bidirectional communication between client and server.

Capabilities

WebSocket Route Decorators

Decorators for creating WebSocket route handlers with different patterns of communication.

def websocket(
    path: str | Sequence[str] | None = None,
    *,
    dependencies: Dependencies | None = None,
    exception_handlers: ExceptionHandlersMap | None = None,
    guards: Sequence[Guard] | None = None,
    middleware: Sequence[Middleware] | None = None,
    name: str | None = None,
    opt: dict[str, Any] | None = None,
    signature_namespace: dict[str, Any] | None = None,
    websocket_class: type[WebSocket] | None = None,
    connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
) -> Callable[[AnyCallable], WebsocketRouteHandler]:
    """
    Create a WebSocket route handler.

    Every argument other than ``path`` is keyword-only, and every argument
    defaults to ``None``.

    Parameters:
    - path: WebSocket route path(s); a single string or a sequence of strings
    - dependencies: Route-specific dependency providers
    - exception_handlers: Route-specific exception handlers
    - guards: Authorization guards
    - middleware: Route-specific middleware
    - name: Route name for URL generation
    - opt: Arbitrary options dictionary
    - signature_namespace: Additional namespace for signature inspection
    - websocket_class: Custom WebSocket class (a subclass of WebSocket)
    - connection_lifespan: Lifespan managers for the connection; callables
      that return async context managers

    Returns:
    Decorator function that takes the handler callable and creates a
    WebsocketRouteHandler
    """

def websocket_listener(
    path: str | Sequence[str] | None = None,
    **kwargs: Any
) -> Callable[[AnyCallable], WebsocketListenerRouteHandler]:
    """
    Create a WebSocket listener route handler.

    Listener handlers automatically accept connections and continuously
    listen for messages, calling the handler function for each message.

    Parameters:
    - path: WebSocket route path(s); a single string or a sequence of strings
    - kwargs: Additional keyword options. They are not enumerated in this
      reference; presumably the route options accepted by ``websocket`` —
      TODO confirm against the library

    Returns:
    Decorator function that creates a WebsocketListenerRouteHandler
    """

def websocket_stream(
    path: str | Sequence[str] | None = None,
    **kwargs: Any
) -> Callable[[AnyCallable], WebsocketRouteHandler]:
    """
    Create a WebSocket streaming route handler.

    Stream handlers send data to the client using an async generator function.

    Parameters:
    - path: WebSocket route path(s); a single string or a sequence of strings
    - kwargs: Additional keyword options. They are not enumerated in this
      reference; presumably the route options accepted by ``websocket`` —
      TODO confirm against the library

    Returns:
    Decorator function that creates a WebsocketRouteHandler
    """

WebSocket Connection Object

The WebSocket connection object provides methods for bidirectional communication.

class WebSocket(ASGIConnection):
    """
    Bidirectional WebSocket connection.

    Groups the connection lifecycle (accept/close), typed send and receive
    helpers for text, bytes and JSON, raw ASGI message access, connection
    state properties and async iterators over incoming messages.
    """

    def __init__(self, scope: Scope, receive: Receive, send: Send):
        """
        Initialize a WebSocket connection.

        Parameters:
        - scope: ASGI scope dictionary
        - receive: ASGI receive callable
        - send: ASGI send callable
        """

    # Connection management
    async def accept(
        self,
        subprotocols: str | Sequence[str] | None = None,
        headers: Sequence[tuple[str, str]] | None = None,
    ) -> None:
        """
        Accept the WebSocket connection.

        Parameters:
        - subprotocols: Supported subprotocols; a single name or a sequence
        - headers: Additional headers to send, as (name, value) string pairs
        """

    async def close(self, code: int = 1000, reason: str | None = None) -> None:
        """
        Close the WebSocket connection.

        Parameters:
        - code: Close code (default 1000 for normal closure)
        - reason: Reason for closing
        """

    # Sending data
    async def send_text(self, data: str) -> None:
        """
        Send text data to the client.

        Parameters:
        - data: Text payload to send
        """

    async def send_bytes(self, data: bytes) -> None:
        """
        Send binary data to the client.

        Parameters:
        - data: Binary payload to send
        """

    async def send_json(self, data: Any, mode: Literal["text", "binary"] = "text") -> None:
        """
        Send JSON data to the client.

        Parameters:
        - data: Data to serialize as JSON
        - mode: Send as text or binary message (default "text")
        """

    # Receiving data
    async def receive_text(self) -> str:
        """
        Receive text data from the client.

        Returns:
        The received text
        """

    async def receive_bytes(self) -> bytes:
        """
        Receive binary data from the client.

        Returns:
        The received bytes
        """

    async def receive_json(self, mode: Literal["text", "binary"] = "text") -> Any:
        """
        Receive JSON data from the client.

        Parameters:
        - mode: Expect text or binary JSON message (default "text")

        Returns:
        Deserialized JSON data
        """

    async def receive(self) -> Message:
        """
        Receive raw ASGI message from the client.

        Returns:
        The raw ASGI message
        """

    async def send(self, message: Message) -> None:
        """
        Send raw ASGI message to the client.

        Parameters:
        - message: Raw ASGI message to send
        """

    # Connection state
    @property
    def connection_state(self) -> WebSocketState:
        """Get current connection state as a WebSocketState value."""

    @property
    def client_state(self) -> WebSocketState:
        """Get client connection state as a WebSocketState value."""

    # Iterator support
    def iter_text(self) -> AsyncIterator[str]:
        """
        Iterate over incoming text messages.

        Returns:
        Async iterator yielding each text message
        """

    def iter_bytes(self) -> AsyncIterator[bytes]:
        """
        Iterate over incoming binary messages.

        Returns:
        Async iterator yielding each binary message
        """

    def iter_json(self, mode: Literal["text", "binary"] = "text") -> AsyncIterator[Any]:
        """
        Iterate over incoming JSON messages.

        Parameters:
        - mode: Expect text or binary JSON messages (default "text")

        Returns:
        Async iterator yielding each deserialized JSON message
        """

WebSocket Route Handlers

Route handler classes for different WebSocket patterns.

class WebsocketRouteHandler(BaseRouteHandler):
    """Route handler for WebSocket routes."""

    def __init__(
        self,
        fn: AnyCallable,
        *,
        path: str | Sequence[str] | None = None,
        connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
        **kwargs: Any,
    ):
        """
        Create a WebSocket route handler.

        Every argument other than ``fn`` is keyword-only.

        Parameters:
        - fn: Handler callable for the route
        - path: WebSocket route path(s); a single string or a sequence of strings
        - connection_lifespan: Lifespan managers for the connection; callables
          that return async context managers
        - kwargs: Additional keyword options, not enumerated in this reference
        """

class WebsocketListenerRouteHandler(WebsocketRouteHandler):
    """
    Route handler that automatically listens for messages.

    Subclass of WebsocketRouteHandler; no additional members are listed in
    this reference.
    """

class WebsocketListener:
    """Class-based WebSocket listener."""

    def __init__(
        self,
        path: str,
        *,
        connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
        **kwargs: Any,
    ):
        """
        Create a WebSocket listener.

        Every argument other than ``path`` is keyword-only.

        Parameters:
        - path: WebSocket route path (required; a single string)
        - connection_lifespan: Lifespan managers for the connection; callables
          that return async context managers
        - kwargs: Additional keyword options, not enumerated in this reference
        """

WebSocket Utilities

Utility functions for WebSocket streaming and message handling.

async def send_websocket_stream(
    websocket: WebSocket,
    stream: AsyncIterator[str | bytes | dict],
    *,
    mode: Literal["text", "binary"] = "text",
) -> None:
    """
    Send a stream of data over WebSocket.

    Parameters:
    - websocket: WebSocket connection
    - stream: Async iterator yielding data to send; items may be str, bytes
      or dict
    - mode: Send as text or binary messages (keyword-only, default "text")
    """

Usage Examples

Basic WebSocket Handler

from litestar import websocket, WebSocket
from litestar.exceptions import WebSocketDisconnect
import asyncio

@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
    """Echo every text message back to the client until it disconnects.

    Litestar injects the connection through the reserved ``socket`` keyword
    argument, so the parameter has to carry exactly that name.
    """
    await socket.accept()

    try:
        while True:
            message = await socket.receive_text()
            # Echo the message back
            await socket.send_text(f"Echo: {message}")
    except WebSocketDisconnect:
        # The client has already gone away, so there is nothing left to
        # close; trying to send a close frame now would raise again.
        return
    except Exception:
        # Unexpected failure while the connection is still open: shut it down.
        await socket.close()

WebSocket Listener

from litestar import websocket_listener, WebSocket

@websocket_listener("/chat")
async def chat_handler(socket: WebSocket, data: str) -> None:
    """Handler is called for each message received.

    ``data`` is the received message. The connection itself is injected
    through the reserved ``socket`` keyword argument, which is the only name
    Litestar recognises for it.
    """
    # Process the message
    response = f"Received: {data}"
    await socket.send_text(response)

WebSocket Streaming

from litestar import websocket_stream, WebSocket
from collections.abc import AsyncGenerator
import asyncio
import json
import time

@websocket_stream("/stream")
async def stream_handler(socket: WebSocket) -> AsyncGenerator[str, None]:
    """Stream data to the client using an async generator.

    Yields one JSON-encoded object per second holding an incrementing
    ``counter`` and the current ``timestamp``. The connection is injected
    through the reserved ``socket`` keyword argument.
    """
    counter = 0
    while True:
        data = {"counter": counter, "timestamp": time.time()}
        yield json.dumps(data)
        counter += 1
        await asyncio.sleep(1)

JSON Message Handling

from litestar import websocket, WebSocket
from litestar.exceptions import WebSocketDisconnect
from typing import Dict, Any

@websocket("/api/ws")
async def api_websocket(socket: WebSocket) -> None:
    """Answer JSON messages according to their ``type`` field.

    ``ping`` is answered with ``pong``, ``echo`` returns the message's
    ``data``, and anything else gets an ``error`` reply. The connection is
    injected through the reserved ``socket`` keyword argument.
    """
    await socket.accept()

    try:
        while True:
            # Receive JSON message
            message: Dict[str, Any] = await socket.receive_json()

            # Process based on message type
            message_type = message.get("type")
            if message_type == "ping":
                await socket.send_json({"type": "pong"})
            elif message_type == "echo":
                await socket.send_json({
                    "type": "echo_response",
                    "data": message.get("data")
                })
            else:
                await socket.send_json({
                    "type": "error",
                    "message": "Unknown message type"
                })

    except WebSocketDisconnect:
        # The client has already gone away; there is nothing left to close.
        return
    except Exception:
        # Use a fixed reason: the exception text could expose internals to the
        # client and may not fit into the size-limited close frame.
        await socket.close(code=1011, reason="Internal server error")

Connection Management with Authentication

from litestar import websocket, WebSocket
from litestar.di import Provide
from litestar.exceptions import WebSocketDisconnect, WebSocketException

async def authenticate_websocket(socket: WebSocket) -> dict:
    """Authenticate WebSocket connection.

    Reads the ``token`` query parameter of the connection and returns the
    user record for it.

    Raises:
        WebSocketException: with code 4001 when no token was supplied, or
            with code 4003 when the token is not the expected one.
    """
    token = socket.query_params.get("token")
    if not token:
        raise WebSocketException("Authentication required", code=4001)

    # Validate token (simplified)
    if token != "valid_token":
        raise WebSocketException("Invalid token", code=4003)

    return {"user_id": 123, "username": "alice"}

# Dependencies are registered by wrapping the provider callable in ``Provide``.
@websocket("/secure-ws", dependencies={"user": Provide(authenticate_websocket)})
async def secure_websocket(socket: WebSocket, user: dict) -> None:
    """Greet the authenticated user, then echo each JSON message back.

    ``user`` is supplied by the ``authenticate_websocket`` dependency; the
    connection is injected through the reserved ``socket`` keyword argument.
    """
    await socket.accept()

    # Send welcome message
    await socket.send_json({
        "type": "welcome",
        "user": user["username"]
    })

    try:
        while True:
            message = await socket.receive_json()
            # Process authenticated user messages
            await socket.send_json({
                "type": "response",
                "user_id": user["user_id"],
                "echo": message
            })
    except WebSocketDisconnect:
        # The client has already gone away; there is nothing left to close.
        return
    except Exception:
        await socket.close()

Broadcasting to Multiple Connections

from litestar import Litestar, websocket, WebSocket
from litestar.exceptions import WebSocketDisconnect
import asyncio
from typing import Set

# Store active connections
active_connections: Set[WebSocket] = set()

async def add_connection(websocket: WebSocket) -> None:
    """Register a connection so that it receives broadcasts."""
    active_connections.add(websocket)

async def remove_connection(websocket: WebSocket) -> None:
    """Unregister a connection; a connection that is not registered is ignored."""
    active_connections.discard(websocket)

async def broadcast_message(message: str) -> None:
    """Broadcast message to all active connections.

    Sending is best-effort: a failure on one connection does not prevent
    delivery to the others. Connections whose send failed are dropped from
    the registry so later broadcasts do not keep retrying them.
    """
    # Snapshot the registry: it may change while the sends are in flight.
    recipients = list(active_connections)
    if not recipients:
        return

    results = await asyncio.gather(
        *(ws.send_text(message) for ws in recipients),
        return_exceptions=True,
    )
    for ws, result in zip(recipients, results):
        if isinstance(result, Exception):
            active_connections.discard(ws)

@websocket("/broadcast")
async def broadcast_websocket(socket: WebSocket) -> None:
    """Relay every text message from this client to all active connections.

    The connection is injected through the reserved ``socket`` keyword
    argument and stays registered for broadcasts until the handler exits.
    """
    await socket.accept()
    await add_connection(socket)

    try:
        while True:
            message = await socket.receive_text()
            # Broadcast to all connections
            await broadcast_message(f"User says: {message}")
    except WebSocketDisconnect:
        # The client has already gone away; there is nothing left to close.
        pass
    except Exception:
        # Unexpected failure while the connection is still open: shut it down.
        await socket.close()
    finally:
        # Always unregister, whichever way the loop ended.
        await remove_connection(socket)

WebSocket with Background Tasks

from litestar import websocket, WebSocket
from litestar.concurrency import sync_to_thread
from litestar.exceptions import WebSocketDisconnect
import asyncio
import queue
import threading

# Message queue for background processing
message_queue = queue.Queue()

def background_processor():
    """Background thread that processes messages.

    Polls the queue with a one-second timeout so the loop wakes up regularly
    instead of blocking forever on an empty queue.
    """
    while True:
        # Only the queue read can raise queue.Empty, so only it is guarded.
        try:
            message = message_queue.get(timeout=1)
        except queue.Empty:
            continue

        # Process message (simulate work)
        processed = f"Processed: {message}"
        # In real app, you'd send this back to specific connections
        print(processed)

# Start background thread
threading.Thread(target=background_processor, daemon=True).start()

@websocket("/background")
async def background_websocket(socket: WebSocket) -> None:
    """Queue each received text message for the background thread and acknowledge it.

    The connection is injected through the reserved ``socket`` keyword
    argument.
    """
    await socket.accept()

    try:
        while True:
            message = await socket.receive_text()

            # Add to background processing queue
            await sync_to_thread(message_queue.put, message)

            # Acknowledge receipt
            await socket.send_text("Message queued for processing")

    except WebSocketDisconnect:
        # The client has already gone away; there is nothing left to close.
        return
    except Exception:
        await socket.close()

WebSocket with Path Parameters

from litestar import websocket, WebSocket
from litestar.exceptions import WebSocketDisconnect

@websocket("/rooms/{room_id:int}/ws")
async def room_websocket(socket: WebSocket, room_id: int) -> None:
    """Join the room named by the path parameter and echo its JSON messages.

    ``room_id`` comes from the ``{room_id:int}`` path segment; the connection
    is injected through the reserved ``socket`` keyword argument.
    """
    await socket.accept()

    # Send room info
    await socket.send_json({
        "type": "room_joined",
        "room_id": room_id,
        "message": f"Welcome to room {room_id}"
    })

    try:
        while True:
            message = await socket.receive_json()
            # Handle room-specific messages
            await socket.send_json({
                "type": "room_message",
                "room_id": room_id,
                "data": message
            })
    except WebSocketDisconnect:
        # The client has already gone away; there is nothing left to close.
        return
    except Exception:
        await socket.close()

Types

# WebSocket states
class WebSocketState(Enum):
    """
    Connection states, as returned by ``WebSocket.connection_state`` and
    ``WebSocket.client_state``. Each member's value is its own name as a string.
    """
    CONNECTING = "CONNECTING"
    CONNECTED = "CONNECTED"
    DISCONNECTED = "DISCONNECTED"

# WebSocket message types
WebSocketMessage = dict[str, Any]

# Close codes (RFC 6455)
# Each constant is named WS_<code>_<MEANING> and holds the integer close code.
# NOTE(review): 1012 and 1013 appear to come from the IANA WebSocket close-code
# registry rather than from RFC 6455 itself — confirm before citing the RFC.
WS_1000_NORMAL_CLOSURE = 1000
WS_1001_GOING_AWAY = 1001
WS_1002_PROTOCOL_ERROR = 1002
WS_1003_UNSUPPORTED_DATA = 1003
WS_1007_INVALID_FRAME_PAYLOAD_DATA = 1007
WS_1008_POLICY_VIOLATION = 1008
WS_1009_MESSAGE_TOO_BIG = 1009
WS_1010_MANDATORY_EXTENSION = 1010
WS_1011_INTERNAL_ERROR = 1011
WS_1012_SERVICE_RESTART = 1012
WS_1013_TRY_AGAIN_LATER = 1013

# Custom close codes for application use (4000-4999)
# 4001 and 4003 are the codes raised by the authentication example.
WS_4001_UNAUTHORIZED = 4001
WS_4003_FORBIDDEN = 4003
WS_4004_NOT_FOUND = 4004

# Iterator types
# Aliases for the async iterators over text, binary and JSON messages.
AsyncTextIterator = AsyncIterator[str]
AsyncBytesIterator = AsyncIterator[bytes]
AsyncJSONIterator = AsyncIterator[Any]

Install with Tessl CLI

npx tessl i tessl/pypi-litestar

docs

application-routing.md

configuration.md

dto.md

exceptions.md

http-handlers.md

index.md

middleware.md

openapi.md

plugins.md

request-response.md

security.md

testing.md

websocket.md

tile.json