0
# WebSocket Support
1
2
WebSocket connection handling with decorators for WebSocket routes, listeners, and streaming. Litestar provides comprehensive WebSocket support for real-time bidirectional communication between client and server.
3
4
## Capabilities
5
6
### WebSocket Route Decorators
7
8
Decorators for creating WebSocket route handlers with different patterns of communication.
9
10
```python { .api }
11
def websocket(
    path: str | Sequence[str] | None = None,
    *,
    dependencies: Dependencies | None = None,
    exception_handlers: ExceptionHandlersMap | None = None,
    guards: Sequence[Guard] | None = None,
    middleware: Sequence[Middleware] | None = None,
    name: str | None = None,
    opt: dict[str, Any] | None = None,
    signature_namespace: dict[str, Any] | None = None,
    websocket_class: type[WebSocket] | None = None,
    connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
) -> Callable[[AnyCallable], WebsocketRouteHandler]:
    """
    Create a WebSocket route handler.

    The decorated function must be ``async``, return ``None`` and declare the
    connection as a parameter named ``socket``.

    Parameters:
    - path: WebSocket route path(s)
    - dependencies: Route-specific dependency providers
    - exception_handlers: Route-specific exception handlers
    - guards: Authorization guards
    - middleware: Route-specific middleware
    - name: Route name for URL generation
    - opt: Arbitrary options dictionary
    - signature_namespace: Additional namespace for signature inspection
    - websocket_class: Custom WebSocket class
    - connection_lifespan: Lifespan managers for the connection

    Returns:
        Decorator function that creates a WebsocketRouteHandler
    """
42
43
def websocket_listener(
    path: str | Sequence[str] | None = None,
    **kwargs: Any
) -> Callable[[AnyCallable], WebsocketListenerRouteHandler]:
    """
    Create a WebSocket listener route handler.

    Listener handlers automatically accept connections and continuously
    listen for messages, calling the handler function for each message.

    Parameters:
    - path: WebSocket route path(s)
    - kwargs: Additional handler options

    Returns:
        Decorator function that creates a WebsocketListenerRouteHandler
    """
53
54
def websocket_stream(
    path: str | Sequence[str] | None = None,
    **kwargs: Any
) -> Callable[[AnyCallable], WebsocketRouteHandler]:
    """
    Create a WebSocket streaming route handler.

    Stream handlers send data to the client using an async generator function.

    Parameters:
    - path: WebSocket route path(s)
    - kwargs: Additional handler options

    Returns:
        Decorator function that creates a WebsocketRouteHandler
    """
63
```
64
65
### WebSocket Connection Object
66
67
The WebSocket connection object provides methods for bidirectional communication.
68
69
```python { .api }
70
class WebSocket(ASGIConnection):
    """Connection object used for bidirectional communication with a client."""

    def __init__(self, scope: Scope, receive: Receive, send: Send):
        """
        Initialize a WebSocket connection.

        Parameters:
        - scope: ASGI scope dictionary
        - receive: ASGI receive callable
        - send: ASGI send callable
        """

    # Connection management
    async def accept(
        self,
        subprotocols: str | Sequence[str] | None = None,
        headers: Sequence[tuple[str, str]] | None = None,
    ) -> None:
        """
        Accept the WebSocket connection.

        Parameters:
        - subprotocols: Supported subprotocols
        - headers: Additional headers to send
        """

    async def close(self, code: int = 1000, reason: str | None = None) -> None:
        """
        Close the WebSocket connection.

        Parameters:
        - code: Close code (default 1000 for normal closure)
        - reason: Reason for closing
        """

    # Sending data
    async def send_text(self, data: str) -> None:
        """Send text data to the client."""

    async def send_bytes(self, data: bytes) -> None:
        """Send binary data to the client."""

    async def send_json(self, data: Any, mode: Literal["text", "binary"] = "text") -> None:
        """
        Send JSON data to the client.

        Parameters:
        - data: Data to serialize as JSON
        - mode: Send as text or binary message
        """

    # Receiving data
    async def receive_text(self) -> str:
        """Receive text data from the client."""

    async def receive_bytes(self) -> bytes:
        """Receive binary data from the client."""

    async def receive_json(self, mode: Literal["text", "binary"] = "text") -> Any:
        """
        Receive JSON data from the client.

        Parameters:
        - mode: Expect text or binary JSON message

        Returns:
            Deserialized JSON data
        """

    async def receive(self) -> Message:
        """Receive raw ASGI message from the client."""

    async def send(self, message: Message) -> None:
        """Send raw ASGI message to the client."""

    # Connection state
    @property
    def connection_state(self) -> WebSocketState:
        """Get current connection state."""

    @property
    def client_state(self) -> WebSocketState:
        """Get client connection state."""

    # Iterator support
    def iter_text(self) -> AsyncIterator[str]:
        """Iterate over incoming text messages."""

    def iter_bytes(self) -> AsyncIterator[bytes]:
        """Iterate over incoming binary messages."""

    def iter_json(self, mode: Literal["text", "binary"] = "text") -> AsyncIterator[Any]:
        """Iterate over incoming JSON messages."""
162
```
163
164
### WebSocket Route Handlers
165
166
Route handler classes for different WebSocket patterns.
167
168
```python { .api }
169
class WebsocketRouteHandler(BaseRouteHandler):
    """Route handler for a WebSocket endpoint."""

    def __init__(
        self,
        fn: AnyCallable,
        *,
        path: str | Sequence[str] | None = None,
        connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
        **kwargs: Any,
    ):
        """
        Create a WebSocket route handler.

        Parameters:
        - fn: Handler function to wrap
        - path: WebSocket route path(s)
        - connection_lifespan: Lifespan managers for the connection
        - kwargs: Additional handler options
        """
179
180
class WebsocketListenerRouteHandler(WebsocketRouteHandler):
    """Route handler that automatically listens for messages."""
182
183
class WebsocketListener:
    """Class-based WebSocket listener."""

    def __init__(
        self,
        path: str,
        *,
        connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,
        **kwargs: Any,
    ):
        """
        Create a WebSocket listener.

        Parameters:
        - path: WebSocket route path
        - connection_lifespan: Lifespan managers for the connection
        - kwargs: Additional listener options
        """
192
```
193
194
### WebSocket Utilities
195
196
Utility functions for WebSocket streaming and message handling.
197
198
```python { .api }
199
async def send_websocket_stream(
    websocket: WebSocket,
    stream: AsyncIterator[str | bytes | dict],
    *,
    mode: Literal["text", "binary"] = "text",
) -> None:
    """
    Send a stream of data over WebSocket.

    Parameters:
    - websocket: WebSocket connection
    - stream: Async iterator yielding data to send
    - mode: Send as text or binary messages
    """
213
```
214
215
## Usage Examples
216
217
### Basic WebSocket Handler
218
219
```python
220
import asyncio

from litestar import websocket, WebSocket
from litestar.exceptions import WebSocketDisconnect


@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
    """Echo every text message back to the client until it disconnects."""
    # Litestar injects the connection under the reserved parameter name
    # ``socket``; any other name is rejected when the handler is registered.
    await socket.accept()

    try:
        while True:
            message = await socket.receive_text()
            # Echo the message back
            await socket.send_text(f"Echo: {message}")
    except WebSocketDisconnect:
        # The client already closed the connection, so there is nothing left
        # to close on our side.
        pass
    except Exception:
        await socket.close()
234
```
235
236
### WebSocket Listener
237
238
```python
239
from litestar import websocket_listener, WebSocket


@websocket_listener("/chat")
async def chat_handler(socket: WebSocket, data: str) -> None:
    """Handler is called for each message received."""
    # ``data`` is the incoming message; the connection itself is injected
    # under the reserved parameter name ``socket``.
    # Process the message
    response = f"Received: {data}"
    await socket.send_text(response)
247
```
248
249
### WebSocket Streaming
250
251
```python
252
import asyncio
import json
import time
from collections.abc import AsyncGenerator

from litestar import websocket_stream


@websocket_stream("/stream")
async def stream_handler() -> AsyncGenerator[str, None]:
    """Stream data to the client using an async generator."""
    counter = 0
    while True:
        data = {"counter": counter, "timestamp": time.time()}
        yield json.dumps(data)
        counter += 1
        # Pace the stream at one message per second.
        await asyncio.sleep(1)
265
```
266
267
### JSON Message Handling
268
269
```python
270
from typing import Any, Dict

from litestar import websocket, WebSocket
from litestar.exceptions import WebSocketDisconnect


@websocket("/api/ws")
async def api_websocket(socket: WebSocket) -> None:
    """Answer JSON messages according to their ``type`` field."""
    await socket.accept()

    try:
        while True:
            # Receive JSON message
            message: Dict[str, Any] = await socket.receive_json()

            # Process based on message type
            message_type = message.get("type")
            if message_type == "ping":
                await socket.send_json({"type": "pong"})
            elif message_type == "echo":
                await socket.send_json({
                    "type": "echo_response",
                    "data": message.get("data"),
                })
            else:
                await socket.send_json({
                    "type": "error",
                    "message": "Unknown message type",
                })

    except WebSocketDisconnect:
        # The client went away; the connection is already closed.
        pass
    except Exception as e:
        # 1011 = internal error; report the failure to the client.
        await socket.close(code=1011, reason=str(e))
298
```
299
300
### Connection Management with Authentication
301
302
```python
303
from litestar import websocket, WebSocket
from litestar.di import Provide
from litestar.exceptions import WebSocketDisconnect, WebSocketException


async def authenticate_websocket(socket: WebSocket) -> dict:
    """Authenticate WebSocket connection."""
    token = socket.query_params.get("token")
    if not token:
        raise WebSocketException("Authentication required", code=4001)

    # Validate token (simplified)
    if token != "valid_token":
        raise WebSocketException("Invalid token", code=4003)

    return {"user_id": 123, "username": "alice"}


# Dependency providers are registered with ``Provide``; the authenticated
# user is then injected into the handler as the ``user`` parameter.
@websocket("/secure-ws", dependencies={"user": Provide(authenticate_websocket)})
async def secure_websocket(socket: WebSocket, user: dict) -> None:
    """Echo JSON messages back to an authenticated user."""
    await socket.accept()

    # Send welcome message
    await socket.send_json({
        "type": "welcome",
        "user": user["username"],
    })

    try:
        while True:
            message = await socket.receive_json()
            # Process authenticated user messages
            await socket.send_json({
                "type": "response",
                "user_id": user["user_id"],
                "echo": message,
            })
    except WebSocketDisconnect:
        # The client went away; the connection is already closed.
        pass
    except Exception:
        await socket.close()
339
```
340
341
### Broadcasting to Multiple Connections
342
343
```python
344
import asyncio
from typing import Set

from litestar import Litestar, websocket, WebSocket
from litestar.exceptions import WebSocketDisconnect

# Store active connections
active_connections: Set[WebSocket] = set()


async def add_connection(socket: WebSocket) -> None:
    """Register a connection as a broadcast recipient."""
    active_connections.add(socket)


async def remove_connection(socket: WebSocket) -> None:
    """Stop broadcasting to a connection; unknown connections are ignored."""
    active_connections.discard(socket)


async def broadcast_message(message: str) -> None:
    """Broadcast message to all active connections."""
    # Snapshot the set so connections joining or leaving mid-broadcast do not
    # change what is being iterated.
    recipients = list(active_connections)
    if not recipients:
        return

    results = await asyncio.gather(
        *(ws.send_text(message) for ws in recipients),
        return_exceptions=True,
    )
    # A failed send means the connection is unusable; drop it so later
    # broadcasts do not keep retrying it.
    for ws, result in zip(recipients, results):
        if isinstance(result, Exception):
            active_connections.discard(ws)


@websocket("/broadcast")
async def broadcast_websocket(socket: WebSocket) -> None:
    """Relay every message from this client to all connected clients."""
    await socket.accept()
    await add_connection(socket)

    try:
        while True:
            message = await socket.receive_text()
            # Broadcast to all connections
            await broadcast_message(f"User says: {message}")
    except WebSocketDisconnect:
        # The client went away; the connection is already closed.
        pass
    except Exception:
        # 1011 = internal error
        await socket.close(code=1011)
    finally:
        await remove_connection(socket)
380
```
381
382
### WebSocket with Background Tasks
383
384
```python
385
import asyncio
import queue
import threading

from litestar import websocket, WebSocket
from litestar.concurrency import sync_to_thread
from litestar.exceptions import WebSocketDisconnect

# Message queue for background processing
message_queue: "queue.Queue[str]" = queue.Queue()


def background_processor() -> None:
    """Background thread that processes messages."""
    while True:
        try:
            # The timeout keeps the loop from blocking forever on an empty queue.
            message = message_queue.get(timeout=1)
        except queue.Empty:
            continue
        # Process message (simulate work)
        processed = f"Processed: {message}"
        # In real app, you'd send this back to specific connections
        print(processed)


# Start background thread
threading.Thread(target=background_processor, daemon=True).start()


@websocket("/background")
async def background_websocket(socket: WebSocket) -> None:
    """Queue each received message for background processing."""
    await socket.accept()

    try:
        while True:
            message = await socket.receive_text()

            # Add to background processing queue
            await sync_to_thread(message_queue.put, message)

            # Acknowledge receipt
            await socket.send_text("Message queued for processing")

    except WebSocketDisconnect:
        # The client went away; the connection is already closed.
        pass
    except Exception:
        await socket.close()
425
```
426
427
### WebSocket with Path Parameters
428
429
```python
430
from litestar import websocket, WebSocket
from litestar.exceptions import WebSocketDisconnect


@websocket("/rooms/{room_id:int}/ws")
async def room_websocket(socket: WebSocket, room_id: int) -> None:
    """Echo JSON messages within the room named by the ``room_id`` path parameter."""
    await socket.accept()

    # Send room info
    await socket.send_json({
        "type": "room_joined",
        "room_id": room_id,
        "message": f"Welcome to room {room_id}",
    })

    try:
        while True:
            message = await socket.receive_json()
            # Handle room-specific messages
            await socket.send_json({
                "type": "room_message",
                "room_id": room_id,
                "data": message,
            })
    except WebSocketDisconnect:
        # The client went away; the connection is already closed.
        pass
    except Exception:
        await socket.close()
454
```
455
456
## Types
457
458
```python { .api }
459
# WebSocket states
class WebSocketState(Enum):
    CONNECTING = "CONNECTING"
    CONNECTED = "CONNECTED"
    DISCONNECTED = "DISCONNECTED"


# WebSocket message types
WebSocketMessage = dict[str, Any]

# Close codes (RFC 6455; 1012 and 1013 come from the IANA close code registry)
WS_1000_NORMAL_CLOSURE = 1000
WS_1001_GOING_AWAY = 1001
WS_1002_PROTOCOL_ERROR = 1002
WS_1003_UNSUPPORTED_DATA = 1003
WS_1007_INVALID_FRAME_PAYLOAD_DATA = 1007
WS_1008_POLICY_VIOLATION = 1008
WS_1009_MESSAGE_TOO_BIG = 1009
WS_1010_MANDATORY_EXTENSION = 1010
WS_1011_INTERNAL_ERROR = 1011
WS_1012_SERVICE_RESTART = 1012
WS_1013_TRY_AGAIN_LATER = 1013

# Custom close codes for application use (4000-4999)
WS_4001_UNAUTHORIZED = 4001
WS_4003_FORBIDDEN = 4003
WS_4004_NOT_FOUND = 4004

# Iterator types
AsyncTextIterator = AsyncIterator[str]
AsyncBytesIterator = AsyncIterator[bytes]
AsyncJSONIterator = AsyncIterator[Any]
490
```