0
# WebSocket Support
1
2
Starlette provides first-class WebSocket support for real-time, bidirectional communication between clients and servers, enabling interactive applications like chat systems, live updates, and collaborative tools.
3
4
## WebSocket Class
5
6
```python { .api }
7
from starlette.websockets import WebSocket, WebSocketState, WebSocketDisconnect
8
from starlette.datastructures import URL, Headers, QueryParams
9
from starlette.types import Scope, Receive, Send
10
from typing import Any, AsyncIterator
11
12
class WebSocket:
13
"""
14
WebSocket connection handler providing bidirectional communication.
15
16
The WebSocket class manages:
17
- Connection lifecycle (accept, close)
18
- Message sending and receiving (text, binary, JSON)
19
- Connection state management
20
- Subprotocol negotiation
21
- Error handling and disconnection
22
"""
23
24
def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
25
"""
26
Initialize WebSocket from ASGI scope.
27
28
Args:
29
scope: ASGI WebSocket scope
30
receive: ASGI receive callable
31
send: ASGI send callable
32
"""
33
34
# Inherited from HTTPConnection
35
@property
36
def url(self) -> URL:
37
"""WebSocket connection URL."""
38
39
@property
40
def base_url(self) -> URL:
41
"""Base URL for generating absolute URLs."""
42
43
@property
44
def headers(self) -> Headers:
45
"""WebSocket handshake headers."""
46
47
@property
48
def query_params(self) -> QueryParams:
49
"""URL query parameters."""
50
51
@property
52
def path_params(self) -> dict[str, Any]:
53
"""Path parameters from URL pattern."""
54
55
@property
56
def cookies(self) -> dict[str, str]:
57
"""Cookies from handshake request."""
58
59
@property
60
def client(self) -> Address | None:
61
"""Client address information."""
62
63
@property
64
def session(self) -> dict[str, Any]:
65
"""Session data (requires SessionMiddleware)."""
66
67
@property
68
def auth(self) -> Any:
69
"""Authentication data (requires AuthenticationMiddleware)."""
70
71
@property
72
def user(self) -> Any:
73
"""User object (requires AuthenticationMiddleware)."""
74
75
@property
76
def state(self) -> State:
77
"""WebSocket-scoped state storage."""
78
79
# WebSocket-specific properties
80
@property
81
def client_state(self) -> WebSocketState:
82
"""Client connection state."""
83
84
@property
85
def application_state(self) -> WebSocketState:
86
"""Application connection state."""
87
88
# Connection management
89
async def accept(
90
self,
91
subprotocol: str | None = None,
92
headers: Sequence[tuple[bytes, bytes]] | None = None,
93
) -> None:
94
"""
95
Accept WebSocket connection.
96
97
Args:
98
subprotocol: Selected subprotocol from client's list
99
headers: Additional response headers
100
101
Raises:
102
RuntimeError: If connection already accepted/closed
103
"""
104
105
async def close(
106
self,
107
code: int = 1000,
108
reason: str | None = None
109
) -> None:
110
"""
111
Close WebSocket connection.
112
113
Args:
114
code: WebSocket close code (default: 1000 normal closure)
115
reason: Optional close reason string
116
"""
117
118
async def send_denial_response(self, response: Response) -> None:
119
"""
120
Send HTTP response instead of accepting WebSocket.
121
122
Used for authentication/authorization failures during handshake.
123
124
Args:
125
response: HTTP response to send
126
"""
127
128
# Message receiving
129
async def receive(self) -> dict[str, Any]:
130
"""
131
Receive raw WebSocket message.
132
133
Returns:
134
dict: ASGI WebSocket message
135
136
Raises:
137
WebSocketDisconnect: When connection closes
138
"""
139
140
async def receive_text(self) -> str:
141
"""
142
Receive text message.
143
144
Returns:
145
str: Text message content
146
147
Raises:
148
WebSocketDisconnect: When connection closes
149
RuntimeError: If message is not text
150
"""
151
152
async def receive_bytes(self) -> bytes:
153
"""
154
Receive binary message.
155
156
Returns:
157
bytes: Binary message content
158
159
Raises:
160
WebSocketDisconnect: When connection closes
161
RuntimeError: If message is not binary
162
"""
163
164
async def receive_json(self, mode: str = "text") -> Any:
165
"""
166
Receive JSON message.
167
168
Args:
169
mode: "text" or "binary" message mode
170
171
Returns:
172
Any: Parsed JSON data
173
174
Raises:
175
WebSocketDisconnect: When connection closes
176
ValueError: If message is not valid JSON
177
"""
178
179
# Message iteration
180
def iter_text(self) -> AsyncIterator[str]:
181
"""
182
Async iterator for text messages.
183
184
Yields:
185
str: Text messages until connection closes
186
"""
187
188
def iter_bytes(self) -> AsyncIterator[bytes]:
189
"""
190
Async iterator for binary messages.
191
192
Yields:
193
bytes: Binary messages until connection closes
194
"""
195
196
def iter_json(self, mode: str = "text") -> AsyncIterator[Any]:
197
"""
198
Async iterator for JSON messages.
199
200
Args:
201
mode: "text" or "binary" message mode
202
203
Yields:
204
Any: Parsed JSON data until connection closes
205
"""
206
207
# Message sending
208
async def send(self, message: dict[str, Any]) -> None:
209
"""
210
Send raw ASGI WebSocket message.
211
212
Args:
213
message: ASGI WebSocket message dict
214
"""
215
216
async def send_text(self, data: str) -> None:
217
"""
218
Send text message.
219
220
Args:
221
data: Text content to send
222
"""
223
224
async def send_bytes(self, data: bytes) -> None:
225
"""
226
Send binary message.
227
228
Args:
229
data: Binary content to send
230
"""
231
232
async def send_json(
233
self,
234
data: Any,
235
mode: str = "text"
236
) -> None:
237
"""
238
Send JSON message.
239
240
Args:
241
data: JSON-serializable data
242
mode: "text" or "binary" message mode
243
"""
244
245
def url_for(self, name: str, /, **path_params: Any) -> URL:
246
"""
247
Generate absolute URL for named route.
248
249
Args:
250
name: Route name
251
**path_params: Path parameter values
252
253
Returns:
254
URL: Absolute URL
255
"""
256
```
257
258
## WebSocket State and Exceptions
259
260
```python { .api }
261
from enum import Enum
262
263
class WebSocketState(Enum):
264
"""WebSocket connection states."""
265
266
CONNECTING = 0 # Initial state before accept/deny
267
CONNECTED = 1 # Connection accepted and active
268
DISCONNECTED = 2 # Connection closed
269
RESPONSE = 3 # HTTP response sent instead of WebSocket
270
271
class WebSocketDisconnect(Exception):
272
"""
273
Exception raised when WebSocket connection closes.
274
275
Contains close code and optional reason.
276
"""
277
278
def __init__(self, code: int = 1000, reason: str | None = None) -> None:
279
"""
280
Initialize disconnect exception.
281
282
Args:
283
code: WebSocket close code
284
reason: Optional close reason
285
"""
286
self.code = code
287
self.reason = reason
288
289
class WebSocketClose:
290
"""
291
ASGI application for closing WebSocket connections.
292
293
Can be used as a route endpoint to immediately close connections.
294
"""
295
296
def __init__(self, code: int = 1000, reason: str | None = None) -> None:
297
"""
298
Initialize WebSocket close handler.
299
300
Args:
301
code: Close code to send
302
reason: Close reason to send
303
"""
304
```
305
306
## Basic WebSocket Usage
307
308
### Simple Echo Server
309
310
```python { .api }
311
from starlette.applications import Starlette
312
from starlette.routing import WebSocketRoute
313
from starlette.websockets import WebSocket, WebSocketDisconnect
314
315
async def websocket_endpoint(websocket: WebSocket):
316
# Accept the connection
317
await websocket.accept()
318
319
try:
320
while True:
321
# Receive message from client
322
message = await websocket.receive_text()
323
324
# Echo message back
325
await websocket.send_text(f"Echo: {message}")
326
327
except WebSocketDisconnect:
328
print("Client disconnected")
329
330
app = Starlette(routes=[
331
WebSocketRoute("/ws", websocket_endpoint)
332
])
333
```
334
335
### JSON Message Handling
336
337
```python { .api }
338
async def json_websocket(websocket: WebSocket):
339
await websocket.accept()
340
341
try:
342
while True:
343
# Receive JSON data
344
data = await websocket.receive_json()
345
346
# Process message based on type
347
if data.get("type") == "ping":
348
await websocket.send_json({
349
"type": "pong",
350
"timestamp": time.time()
351
})
352
elif data.get("type") == "message":
353
await websocket.send_json({
354
"type": "response",
355
"original": data.get("content"),
356
"processed": data.get("content", "").upper()
357
})
358
else:
359
await websocket.send_json({
360
"type": "error",
361
"message": "Unknown message type"
362
})
363
364
except WebSocketDisconnect:
365
print("Client disconnected")
366
except ValueError as e:
367
# Invalid JSON
368
await websocket.send_json({
369
"type": "error",
370
"message": "Invalid JSON"
371
})
372
await websocket.close(code=1003) # Unsupported data
373
```
374
375
## WebSocket Routing
376
377
### Path Parameters
378
379
```python { .api }
380
from starlette.routing import WebSocketRoute
381
382
async def user_websocket(websocket: WebSocket):
383
# Extract path parameters
384
user_id = websocket.path_params["user_id"]
385
room_id = websocket.path_params.get("room_id")
386
387
await websocket.accept()
388
389
try:
390
# Send welcome message
391
await websocket.send_json({
392
"type": "welcome",
393
"user_id": int(user_id),
394
"room_id": room_id and int(room_id)
395
})
396
397
async for message in websocket.iter_json():
398
# Process user messages in room context
399
response = {
400
"type": "message",
401
"user_id": int(user_id),
402
"room_id": room_id and int(room_id),
403
"content": message.get("content")
404
}
405
await websocket.send_json(response)
406
407
except WebSocketDisconnect:
408
print(f"User {user_id} disconnected from room {room_id}")
409
410
routes = [
411
WebSocketRoute("/ws/user/{user_id:int}", user_websocket),
412
WebSocketRoute("/ws/room/{room_id:int}/user/{user_id:int}", user_websocket),
413
]
414
```
415
416
### Query Parameters and Headers
417
418
```python { .api }
419
async def websocket_with_auth(websocket: WebSocket):
420
# Check authentication in query params or headers
421
token = websocket.query_params.get("token")
422
auth_header = websocket.headers.get("authorization")
423
424
# Validate authentication
425
if not token and not auth_header:
426
await websocket.close(code=1008, reason="Authentication required")
427
return
428
429
# Extract user info from token
430
user = authenticate_token(token or auth_header.split(" ")[-1])
431
if not user:
432
await websocket.close(code=1008, reason="Invalid token")
433
return
434
435
# Store user in WebSocket state
436
websocket.state.user = user
437
438
await websocket.accept()
439
440
try:
441
await websocket.send_json({
442
"type": "authenticated",
443
"user": user.username
444
})
445
446
async for message in websocket.iter_json():
447
# Process authenticated user messages
448
await process_user_message(websocket.state.user, message)
449
450
except WebSocketDisconnect:
451
print(f"User {user.username} disconnected")
452
```
453
454
## Advanced WebSocket Patterns
455
456
### Connection Manager
457
458
```python { .api }
459
import json
460
from typing import Dict, Set
461
462
class ConnectionManager:
463
"""Manage multiple WebSocket connections."""
464
465
def __init__(self):
466
self.active_connections: Dict[str, Set[WebSocket]] = {}
467
self.user_connections: Dict[str, WebSocket] = {}
468
469
async def connect(self, websocket: WebSocket, room_id: str, user_id: str):
470
"""Add connection to room and user mapping."""
471
await websocket.accept()
472
473
# Add to room
474
if room_id not in self.active_connections:
475
self.active_connections[room_id] = set()
476
self.active_connections[room_id].add(websocket)
477
478
# Add to user mapping
479
self.user_connections[user_id] = websocket
480
481
# Notify room of new connection
482
await self.broadcast_to_room(room_id, {
483
"type": "user_joined",
484
"user_id": user_id,
485
"count": len(self.active_connections[room_id])
486
}, exclude=websocket)
487
488
def disconnect(self, websocket: WebSocket, room_id: str, user_id: str):
489
"""Remove connection from room and user mapping."""
490
if room_id in self.active_connections:
491
self.active_connections[room_id].discard(websocket)
492
if not self.active_connections[room_id]:
493
del self.active_connections[room_id]
494
495
if user_id in self.user_connections:
496
del self.user_connections[user_id]
497
498
async def send_personal_message(self, user_id: str, message: dict):
499
"""Send message to specific user."""
500
if user_id in self.user_connections:
501
websocket = self.user_connections[user_id]
502
await websocket.send_json(message)
503
504
async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None):
505
"""Broadcast message to all connections in room."""
506
if room_id in self.active_connections:
507
for connection in self.active_connections[room_id]:
508
if connection != exclude:
509
try:
510
await connection.send_json(message)
511
except:
512
# Connection is broken, remove it
513
self.active_connections[room_id].discard(connection)
514
515
# Global connection manager
516
manager = ConnectionManager()
517
518
async def chat_websocket(websocket: WebSocket):
519
room_id = websocket.path_params["room_id"]
520
user_id = websocket.query_params.get("user_id", "anonymous")
521
522
await manager.connect(websocket, room_id, user_id)
523
524
try:
525
async for data in websocket.iter_json():
526
message = {
527
"type": "message",
528
"user_id": user_id,
529
"room_id": room_id,
530
"content": data.get("content"),
531
"timestamp": time.time()
532
}
533
534
# Broadcast to room
535
await manager.broadcast_to_room(room_id, message)
536
537
except WebSocketDisconnect:
538
manager.disconnect(websocket, room_id, user_id)
539
540
# Notify room of disconnection
541
await manager.broadcast_to_room(room_id, {
542
"type": "user_left",
543
"user_id": user_id,
544
"count": len(manager.active_connections.get(room_id, set()))
545
})
546
```
547
548
### Subprotocol Handling
549
550
```python { .api }
551
async def subprotocol_websocket(websocket: WebSocket):
552
# Get requested subprotocols from client
553
subprotocols = websocket.headers.get("sec-websocket-protocol", "").split(",")
554
subprotocols = [p.strip() for p in subprotocols if p.strip()]
555
556
# Select supported subprotocol
557
supported = ["chat", "notification", "api.v1"]
558
selected = None
559
560
for protocol in subprotocols:
561
if protocol in supported:
562
selected = protocol
563
break
564
565
# Accept with selected subprotocol
566
await websocket.accept(subprotocol=selected)
567
568
try:
569
# Handle different protocols
570
if selected == "chat":
571
await handle_chat_protocol(websocket)
572
elif selected == "notification":
573
await handle_notification_protocol(websocket)
574
elif selected == "api.v1":
575
await handle_api_protocol(websocket)
576
else:
577
await handle_default_protocol(websocket)
578
579
except WebSocketDisconnect:
580
print(f"Client disconnected (protocol: {selected})")
581
582
async def handle_chat_protocol(websocket: WebSocket):
583
"""Handle chat-specific protocol."""
584
async for message in websocket.iter_text():
585
# Chat protocol: plain text messages
586
response = f"[CHAT] {message}"
587
await websocket.send_text(response)
588
589
async def handle_api_protocol(websocket: WebSocket):
590
"""Handle API-specific protocol."""
591
async for data in websocket.iter_json():
592
# API protocol: structured JSON commands
593
if data.get("command") == "list_users":
594
await websocket.send_json({
595
"type": "user_list",
596
"users": get_active_users()
597
})
598
elif data.get("command") == "send_message":
599
await websocket.send_json({
600
"type": "message_sent",
601
"id": data.get("message_id")
602
})
603
```
604
605
### Error Handling and Graceful Shutdown
606
607
```python { .api }
608
import asyncio
609
import signal
610
from typing import Set
611
612
class WebSocketServer:
613
def __init__(self):
614
self.connections: Set[WebSocket] = set()
615
self.shutdown_event = asyncio.Event()
616
617
async def websocket_endpoint(self, websocket: WebSocket):
618
await websocket.accept()
619
self.connections.add(websocket)
620
621
try:
622
# Send initial connection message
623
await websocket.send_json({
624
"type": "connected",
625
"server_time": time.time()
626
})
627
628
# Handle messages with timeout
629
while not self.shutdown_event.is_set():
630
try:
631
# Use timeout to allow checking shutdown event
632
message = await asyncio.wait_for(
633
websocket.receive_json(),
634
timeout=1.0
635
)
636
637
# Process message
638
await self.process_message(websocket, message)
639
640
except asyncio.TimeoutError:
641
# Timeout is normal, continue loop
642
continue
643
except ValueError:
644
# Invalid JSON
645
await websocket.send_json({
646
"type": "error",
647
"message": "Invalid JSON format"
648
})
649
except Exception as e:
650
# Unexpected error
651
await websocket.send_json({
652
"type": "error",
653
"message": "Server error occurred"
654
})
655
break
656
657
except WebSocketDisconnect:
658
pass
659
finally:
660
# Cleanup connection
661
self.connections.discard(websocket)
662
print(f"Connection closed. Active: {len(self.connections)}")
663
664
async def process_message(self, websocket: WebSocket, message: dict):
665
"""Process incoming message with error handling."""
666
try:
667
msg_type = message.get("type")
668
669
if msg_type == "ping":
670
await websocket.send_json({"type": "pong"})
671
elif msg_type == "echo":
672
await websocket.send_json({
673
"type": "echo_response",
674
"data": message.get("data")
675
})
676
else:
677
await websocket.send_json({
678
"type": "error",
679
"message": f"Unknown message type: {msg_type}"
680
})
681
682
except Exception as e:
683
print(f"Error processing message: {e}")
684
await websocket.send_json({
685
"type": "error",
686
"message": "Failed to process message"
687
})
688
689
async def broadcast_shutdown(self):
690
"""Notify all connections of shutdown."""
691
if self.connections:
692
message = {
693
"type": "server_shutdown",
694
"message": "Server is shutting down"
695
}
696
697
# Send to all connections
698
await asyncio.gather(
699
*[conn.send_json(message) for conn in self.connections],
700
return_exceptions=True
701
)
702
703
# Close all connections
704
await asyncio.gather(
705
*[conn.close(code=1001, reason="Server shutdown") for conn in self.connections],
706
return_exceptions=True
707
)
708
709
def setup_shutdown_handlers(self):
710
"""Setup graceful shutdown handlers."""
711
def signal_handler(sig, frame):
712
print(f"Received signal {sig}, starting graceful shutdown...")
713
self.shutdown_event.set()
714
715
signal.signal(signal.SIGTERM, signal_handler)
716
signal.signal(signal.SIGINT, signal_handler)
717
718
# Usage
719
server = WebSocketServer()
720
server.setup_shutdown_handlers()
721
722
async def websocket_endpoint(websocket: WebSocket):
723
await server.websocket_endpoint(websocket)
724
```
725
726
## Testing WebSocket Connections
727
728
```python { .api }
729
from starlette.testclient import TestClient
730
731
def test_websocket():
732
with TestClient(app) as client:
733
with client.websocket_connect("/ws") as websocket:
734
# Send message
735
websocket.send_text("Hello")
736
737
# Receive response
738
data = websocket.receive_text()
739
assert data == "Echo: Hello"
740
741
def test_websocket_json():
742
with TestClient(app) as client:
743
with client.websocket_connect("/ws/json") as websocket:
744
# Send JSON message
745
websocket.send_json({"type": "ping"})
746
747
# Receive JSON response
748
data = websocket.receive_json()
749
assert data["type"] == "pong"
750
751
def test_websocket_with_params():
752
with TestClient(app) as client:
753
with client.websocket_connect("/ws/user/123?token=abc") as websocket:
754
# Test authenticated connection
755
welcome = websocket.receive_json()
756
assert welcome["user_id"] == 123
757
758
def test_websocket_disconnect():
759
with TestClient(app) as client:
760
with client.websocket_connect("/ws") as websocket:
761
websocket.send_text("Hello")
762
websocket.close()
763
764
# Verify connection closed gracefully
765
assert websocket.client_state == WebSocketState.DISCONNECTED
766
```
767
768
## Real-time Applications
769
770
### Live Updates
771
772
```python { .api }
773
import asyncio
774
from datetime import datetime
775
776
async def live_updates_websocket(websocket: WebSocket):
777
await websocket.accept()
778
779
try:
780
# Send periodic updates
781
while True:
782
# Get latest data
783
data = {
784
"type": "update",
785
"timestamp": datetime.now().isoformat(),
786
"data": get_latest_data(),
787
"metrics": get_system_metrics()
788
}
789
790
await websocket.send_json(data)
791
792
# Wait before next update
793
await asyncio.sleep(5)
794
795
except WebSocketDisconnect:
796
print("Client disconnected from live updates")
797
798
def get_latest_data():
799
# Fetch real-time data
800
return {"value": random.randint(1, 100)}
801
802
def get_system_metrics():
803
# Get system metrics
804
return {
805
"cpu": psutil.cpu_percent(),
806
"memory": psutil.virtual_memory().percent
807
}
808
```
809
810
### Collaborative Editing
811
812
```python { .api }
813
class DocumentManager:
814
def __init__(self):
815
self.documents = {}
816
self.connections = {}
817
818
async def join_document(self, websocket: WebSocket, doc_id: str, user_id: str):
819
if doc_id not in self.documents:
820
self.documents[doc_id] = {"content": "", "version": 0}
821
822
if doc_id not in self.connections:
823
self.connections[doc_id] = {}
824
825
self.connections[doc_id][user_id] = websocket
826
827
# Send current document state
828
await websocket.send_json({
829
"type": "document_state",
830
"content": self.documents[doc_id]["content"],
831
"version": self.documents[doc_id]["version"]
832
})
833
834
# Notify other users
835
await self.broadcast_to_document(doc_id, {
836
"type": "user_joined",
837
"user_id": user_id
838
}, exclude=user_id)
839
840
async def handle_edit(self, doc_id: str, user_id: str, operation: dict):
841
# Apply operation to document
842
self.documents[doc_id]["content"] = apply_operation(
843
self.documents[doc_id]["content"],
844
operation
845
)
846
self.documents[doc_id]["version"] += 1
847
848
# Broadcast change to other users
849
await self.broadcast_to_document(doc_id, {
850
"type": "document_change",
851
"operation": operation,
852
"user_id": user_id,
853
"version": self.documents[doc_id]["version"]
854
}, exclude=user_id)
855
856
async def broadcast_to_document(self, doc_id: str, message: dict, exclude: str = None):
857
if doc_id in self.connections:
858
for user_id, websocket in self.connections[doc_id].items():
859
if user_id != exclude:
860
try:
861
await websocket.send_json(message)
862
except:
863
# Remove broken connection
864
del self.connections[doc_id][user_id]
865
```
866
867
Starlette's WebSocket support enables building real-time, interactive applications with robust connection management, error handling, and testing capabilities.