Litestar is a powerful, flexible yet opinionated ASGI web framework specifically focused on building high-performance APIs.
—
WebSocket connection handling with decorators for WebSocket routes, listeners, and streaming. Litestar provides comprehensive WebSocket support for real-time bidirectional communication between client and server.
Decorators for creating WebSocket route handlers with different patterns of communication.
def websocket(
    path: str | Sequence[str] | None = None,
    *,
    dependencies: Dependencies | None = None,
    exception_handlers: ExceptionHandlersMap | None = None,
    guards: Sequence[Guard] | None = None,
    middleware: Sequence[Middleware] | None = None,
    name: str | None = None,
    opt: dict[str, Any] | None = None,
    signature_namespace: dict[str, Any] | None = None,
    websocket_class: type[WebSocket] | None = None,
    connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
) -> Callable[[AnyCallable], WebsocketRouteHandler]:
    """
    Build a decorator that turns a function into a WebSocket route handler.

    Everything after ``path`` is keyword-only and defaults to ``None``.

    Parameters:
    - path: One WebSocket route path, or a sequence of paths
    - dependencies: Dependency providers scoped to this route
    - exception_handlers: Exception handlers scoped to this route
    - guards: Authorization guards
    - middleware: Middleware scoped to this route
    - name: Name of the route, used for URL generation
    - opt: Dictionary of arbitrary options
    - signature_namespace: Extra namespace used when inspecting the signature
    - websocket_class: Custom WebSocket class to use
    - connection_lifespan: Lifespan managers for the connection

    Returns:
    A decorator function that creates a WebsocketRouteHandler
    """
def websocket_listener(
    path: str | Sequence[str] | None = None,
    **kwargs: Any,
) -> Callable[[AnyCallable], WebsocketListenerRouteHandler]:
    """
    Build a decorator for a listener-style WebSocket route handler.

    A listener handler accepts the connection automatically and keeps
    listening; the decorated function is invoked once per incoming message.
    """
def websocket_stream(
path: str | Sequence[str] | None = None,
**kwargs: Any
) -> Callable[[AnyCallable], WebsocketRouteHandler]:
"""
Create a WebSocket streaming route handler.
Stream handlers send data to the client using an async generator function.
"""The WebSocket connection object provides methods for bidirectional communication.
class WebSocket(ASGIConnection):
def __init__(self, scope: Scope, receive: Receive, send: Send):
"""
Initialize a WebSocket connection.
Parameters:
- scope: ASGI scope dictionary
- receive: ASGI receive callable
- send: ASGI send callable
"""
# Connection management
async def accept(
self,
subprotocols: str | Sequence[str] | None = None,
headers: Sequence[tuple[str, str]] | None = None,
) -> None:
"""
Accept the WebSocket connection.
Parameters:
- subprotocols: Supported subprotocols
- headers: Additional headers to send
"""
async def close(self, code: int = 1000, reason: str | None = None) -> None:
"""
Close the WebSocket connection.
Parameters:
- code: Close code (default 1000 for normal closure)
- reason: Reason for closing
"""
# Sending data
async def send_text(self, data: str) -> None:
"""Send text data to the client."""
async def send_bytes(self, data: bytes) -> None:
"""Send binary data to the client."""
async def send_json(self, data: Any, mode: Literal["text", "binary"] = "text") -> None:
"""
Send JSON data to the client.
Parameters:
- data: Data to serialize as JSON
- mode: Send as text or binary message
"""
# Receiving data
async def receive_text(self) -> str:
"""Receive text data from the client."""
async def receive_bytes(self) -> bytes:
"""Receive binary data from the client."""
async def receive_json(self, mode: Literal["text", "binary"] = "text") -> Any:
"""
Receive JSON data from the client.
Parameters:
- mode: Expect text or binary JSON message
Returns:
Deserialized JSON data
"""
async def receive(self) -> Message:
"""Receive raw ASGI message from the client."""
async def send(self, message: Message) -> None:
"""Send raw ASGI message to the client."""
# Connection state
@property
def connection_state(self) -> WebSocketState:
"""Get current connection state."""
@property
def client_state(self) -> WebSocketState:
"""Get client connection state."""
# Iterator support
def iter_text(self) -> AsyncIterator[str]:
"""Iterate over incoming text messages."""
def iter_bytes(self) -> AsyncIterator[bytes]:
"""Iterate over incoming binary messages."""
def iter_json(self, mode: Literal["text", "binary"] = "text") -> AsyncIterator[Any]:
"""Iterate over incoming JSON messages."""Route handler classes for different WebSocket patterns.
class WebsocketRouteHandler(BaseRouteHandler):
    def __init__(
        self,
        fn: AnyCallable,
        *,
        path: str | Sequence[str] | None = None,
        connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
        **kwargs: Any,
    ):
        """
        Create a WebSocket route handler.

        Parameters:
        - fn: The callable that handles the connection
        - path: One route path, or a sequence of paths
        - connection_lifespan: Lifespan managers for the connection
        - kwargs: Any further keyword arguments
        """
class WebsocketListenerRouteHandler(WebsocketRouteHandler):
    """WebSocket route handler that listens for incoming messages automatically."""
class WebsocketListener:
def __init__(
self,
path: str,
*,
connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
**kwargs: Any,
):
"""Create a WebSocket listener."""Utility functions for WebSocket streaming and message handling.
async def send_websocket_stream(
    websocket: WebSocket,
    stream: AsyncIterator[str | bytes | dict],
    *,
    mode: Literal["text", "binary"] = "text",
) -> None:
    """
    Send every item of an async stream over a WebSocket.

    Parameters:
    - websocket: The WebSocket connection to send on
    - stream: Async iterator that yields the data to send
    - mode: Whether to send text or binary messages
    """


from litestar import websocket, WebSocket
import asyncio

from litestar.exceptions import WebSocketDisconnect


@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
    """
    Echo every text message back to the client until it disconnects.

    Parameters:
    - socket: The WebSocket connection. Litestar requires WebSocket handlers
      to declare this keyword argument under the name ``socket``.
    """
    await socket.accept()
    try:
        while True:
            message = await socket.receive_text()
            # Echo the message back
            await socket.send_text(f"Echo: {message}")
    except WebSocketDisconnect:
        # The client is already gone; sending a close frame now would raise again.
        return
    except Exception:
        await socket.close()


from litestar import websocket_listener, WebSocket
@websocket_listener("/chat")
async def chat_handler(data: str) -> str:
    """
    Handle one received message; called once per message.

    Parameters:
    - data: The text received from the client

    Returns:
    The reply text. A listener sends the handler's return value back to the
    client, so no explicit send call is needed.
    """
    # Process the message
    return f"Received: {data}"


from litestar import websocket_stream, WebSocket
import asyncio
import json
import time
from collections.abc import AsyncGenerator


@websocket_stream("/stream")
async def stream_handler() -> AsyncGenerator[str, None]:
    """
    Stream data to the client using an async generator.

    Yields one JSON-encoded object per second holding an incrementing
    ``counter`` and the current ``timestamp``. The loop never ends on its own.
    """
    counter = 0
    while True:
        data = {"counter": counter, "timestamp": time.time()}
        yield json.dumps(data)
        counter += 1
        await asyncio.sleep(1)


from litestar import websocket, WebSocket
from typing import Dict, Any

from litestar.exceptions import WebSocketDisconnect


@websocket("/api/ws")
async def api_websocket(socket: WebSocket) -> None:
    """
    Serve a small JSON protocol keyed on each message's ``type`` field.

    ``ping`` is answered with ``pong``, ``echo`` returns the message's
    ``data``, and any other type gets an ``error`` reply.

    Parameters:
    - socket: The WebSocket connection (Litestar requires the name ``socket``)
    """
    await socket.accept()
    try:
        while True:
            # Receive JSON message
            message: Dict[str, Any] = await socket.receive_json()
            message_type = message.get("type")
            # Process based on message type
            if message_type == "ping":
                await socket.send_json({"type": "pong"})
            elif message_type == "echo":
                await socket.send_json({
                    "type": "echo_response",
                    "data": message.get("data"),
                })
            else:
                await socket.send_json({
                    "type": "error",
                    "message": "Unknown message type",
                })
    except WebSocketDisconnect:
        # The client is already gone; there is nothing left to close.
        return
    except Exception:
        # Use a fixed reason: exception text can leak internals, and a close
        # frame's payload is limited to 125 bytes.
        await socket.close(code=1011, reason="Internal server error")


from litestar import websocket, WebSocket
from litestar.di import Provide
from litestar.exceptions import WebSocketDisconnect, WebSocketException


async def authenticate_websocket(socket: WebSocket) -> dict:
    """
    Authenticate WebSocket connection.

    Parameters:
    - socket: The connection whose ``token`` query parameter is checked

    Returns:
    A dictionary describing the authenticated user

    Raises:
    WebSocketException with code 4001 when no token is given, and with
    code 4003 when the token is not valid.
    """
    token = socket.query_params.get("token")
    if not token:
        raise WebSocketException("Authentication required", code=4001)
    # Validate token (simplified)
    if token != "valid_token":
        raise WebSocketException("Invalid token", code=4003)
    return {"user_id": 123, "username": "alice"}


# Dependencies are registered with Provide; Dependency only marks handler parameters.
@websocket("/secure-ws", dependencies={"user": Provide(authenticate_websocket)})
async def secure_websocket(socket: WebSocket, user: dict) -> None:
    """
    Greet the authenticated user, then echo each JSON message back.

    Parameters:
    - socket: The WebSocket connection (Litestar requires the name ``socket``)
    - user: The user dictionary supplied by the ``user`` dependency
    """
    await socket.accept()
    # Send welcome message
    await socket.send_json({
        "type": "welcome",
        "user": user["username"],
    })
    try:
        while True:
            message = await socket.receive_json()
            # Process authenticated user messages
            await socket.send_json({
                "type": "response",
                "user_id": user["user_id"],
                "echo": message,
            })
    except WebSocketDisconnect:
        # The client is already gone; there is nothing left to close.
        return
    except Exception:
        await socket.close()


from litestar import Litestar, websocket, WebSocket
import asyncio
from typing import Set
# Store active connections
# Module-level set, shared by every handler invocation in this process.
active_connections: Set[WebSocket] = set()
async def add_connection(websocket: WebSocket) -> None:
    """Register ``websocket`` in the module-level ``active_connections`` set."""
    active_connections.add(websocket)
async def remove_connection(websocket: WebSocket) -> None:
    """Drop ``websocket`` from ``active_connections``; a no-op if it is absent."""
    active_connections.discard(websocket)
async def broadcast_message(message: str) -> None:
    """
    Broadcast message to all active connections.

    Sends run concurrently, and a failure on one connection is collected
    rather than raised, so it does not interrupt the others.

    Parameters:
    - message: The text to send to every connection
    """
    if not active_connections:
        return
    # Snapshot the set so changes made while awaiting do not affect iteration.
    recipients = active_connections.copy()
    pending_sends = [peer.send_text(message) for peer in recipients]
    await asyncio.gather(*pending_sends, return_exceptions=True)
@websocket("/broadcast")
async def broadcast_websocket(socket: WebSocket) -> None:
    """
    Relay every text message from this client to all active connections.

    The connection is registered on accept and always unregistered on exit.

    Parameters:
    - socket: The WebSocket connection (Litestar requires the name ``socket``)
    """
    from litestar.exceptions import WebSocketDisconnect

    await socket.accept()
    await add_connection(socket)
    try:
        while True:
            message = await socket.receive_text()
            # Broadcast to all connections
            await broadcast_message(f"User says: {message}")
    except WebSocketDisconnect:
        # Normal end of session; the socket is already closed by the client.
        pass
    except Exception:
        # Close only here: closing after a client disconnect would raise again.
        await socket.close(code=1011)
    finally:
        await remove_connection(socket)


from litestar import websocket, WebSocket
from litestar.concurrency import sync_to_thread
import asyncio
import queue
import threading
# Message queue for background processing
message_queue = queue.Queue()


def background_processor():
    """
    Background thread that processes messages.

    Loops forever: waits up to one second for a queued message, prints the
    processed form, and polls again whenever the queue stays empty.
    """
    while True:
        try:
            pending = message_queue.get(timeout=1)
        except queue.Empty:
            # Nothing arrived within the timeout; poll again.
            continue
        # Process message (simulate work)
        # In real app, you'd send this back to specific connections
        print(f"Processed: {pending}")


# Start background thread
threading.Thread(target=background_processor, daemon=True).start()
@websocket("/background")
async def background_websocket(socket: WebSocket) -> None:
    """
    Queue each received text message for background processing and acknowledge it.

    Parameters:
    - socket: The WebSocket connection (Litestar requires the name ``socket``)
    """
    from litestar.exceptions import WebSocketDisconnect

    await socket.accept()
    try:
        while True:
            message = await socket.receive_text()
            # Add to background processing queue
            await sync_to_thread(message_queue.put, message)
            # Acknowledge receipt
            await socket.send_text("Message queued for processing")
    except WebSocketDisconnect:
        # The client is already gone; sending a close frame now would raise again.
        return
    except Exception:
        await socket.close()


from litestar import websocket, WebSocket
@websocket("/rooms/{room_id:int}/ws")
async def room_websocket(socket: WebSocket, room_id: int) -> None:
    """
    Join the client to a room and echo its JSON messages tagged with the room id.

    Parameters:
    - socket: The WebSocket connection (Litestar requires the name ``socket``)
    - room_id: The integer room id taken from the path
    """
    from litestar.exceptions import WebSocketDisconnect

    await socket.accept()
    # Send room info
    await socket.send_json({
        "type": "room_joined",
        "room_id": room_id,
        "message": f"Welcome to room {room_id}",
    })
    try:
        while True:
            message = await socket.receive_json()
            # Handle room-specific messages
            await socket.send_json({
                "type": "room_message",
                "room_id": room_id,
                "data": message,
            })
    except WebSocketDisconnect:
        # The client is already gone; sending a close frame now would raise again.
        return
    except Exception:
        await socket.close()


# WebSocket states
class WebSocketState(Enum):
    """States of a WebSocket connection; each member's value equals its name."""

    CONNECTING = "CONNECTING"
    CONNECTED = "CONNECTED"
    DISCONNECTED = "DISCONNECTED"
# WebSocket message types
# A message is represented as a plain string-keyed dictionary.
WebSocketMessage = dict[str, Any]
# Close codes (RFC 6455)
# Each constant's name carries its numeric value followed by its meaning.
# NOTE(review): 1012 and 1013 appear to come from the IANA close-code registry
# rather than RFC 6455 itself — confirm before citing the RFC for them.
WS_1000_NORMAL_CLOSURE = 1000
WS_1001_GOING_AWAY = 1001
WS_1002_PROTOCOL_ERROR = 1002
WS_1003_UNSUPPORTED_DATA = 1003
WS_1007_INVALID_FRAME_PAYLOAD_DATA = 1007
WS_1008_POLICY_VIOLATION = 1008
WS_1009_MESSAGE_TOO_BIG = 1009
WS_1010_MANDATORY_EXTENSION = 1010
WS_1011_INTERNAL_ERROR = 1011
WS_1012_SERVICE_RESTART = 1012
WS_1013_TRY_AGAIN_LATER = 1013
# Custom close codes for application use (4000-4999)
# 4001 and 4003 are the codes raised by the authentication example.
WS_4001_UNAUTHORIZED = 4001
WS_4003_FORBIDDEN = 4003
WS_4004_NOT_FOUND = 4004
# Iterator types
# Aliases for async iterators over text and binary messages.
AsyncTextIterator = AsyncIterator[str]
AsyncBytesIterator = AsyncIterator[bytes]
AsyncJSONIterator = AsyncIterator[Any]

Install with Tessl CLI:

npx tessl i tessl/pypi-litestar