0
# WebSocket Support
1
2
FastAPI provides comprehensive WebSocket support for real-time bidirectional communication between clients and servers. WebSocket connections enable live updates, chat applications, streaming data, and interactive features with full integration into FastAPI's dependency injection and validation systems.
3
4
## Capabilities
5
6
### WebSocket Connection Object
7
8
Object representing an active WebSocket connection with methods for sending and receiving messages.
9
10
```python { .api }
11
class WebSocket:
12
"""
13
WebSocket connection object providing bidirectional communication.
14
15
Attributes (read-only):
16
- url: WebSocket URL with protocol, host, path, and query parameters
17
- headers: HTTP headers from the WebSocket handshake
18
- query_params: Query parameters from the WebSocket URL
19
- path_params: Path parameters extracted from URL pattern
20
- cookies: HTTP cookies from the handshake request
21
- client: Client connection information (host, port)
22
- state: Application state for storing data across the connection
23
"""
24
25
url: URL
26
headers: Headers
27
query_params: QueryParams
28
path_params: Dict[str, Any]
29
cookies: Dict[str, str]
30
client: Optional[Address]
31
state: State
32
33
async def accept(
34
self,
35
subprotocol: Optional[str] = None,
36
headers: Optional[Dict[str, str]] = None
37
) -> None:
38
"""
39
Accept the WebSocket connection.
40
41
Parameters:
42
- subprotocol: WebSocket subprotocol to use
43
- headers: Additional headers to send in handshake response
44
45
Note: Must be called before sending or receiving messages
46
"""
47
48
async def close(
49
self,
50
code: int = 1000,
51
reason: Optional[str] = None
52
) -> None:
53
"""
54
Close the WebSocket connection.
55
56
Parameters:
57
- code: WebSocket close code (1000 = normal closure)
58
- reason: Optional reason string for the close
59
"""
60
61
async def send_text(self, data: str) -> None:
62
"""
63
Send text message to client.
64
65
Parameters:
66
- data: Text string to send
67
"""
68
69
async def send_bytes(self, data: bytes) -> None:
70
"""
71
Send binary message to client.
72
73
Parameters:
74
- data: Binary data to send
75
"""
76
77
async def send_json(self, data: Any, mode: str = "text") -> None:
78
"""
79
Send JSON message to client.
80
81
Parameters:
82
- data: Python object to serialize as JSON
83
- mode: Send as "text" or "binary" message
84
"""
85
86
async def receive_text(self) -> str:
87
"""
88
Receive text message from client.
89
90
Returns:
91
Text string from client
92
93
Raises:
94
WebSocketDisconnect: If connection is closed
95
"""
96
97
async def receive_bytes(self) -> bytes:
98
"""
99
Receive binary message from client.
100
101
Returns:
102
Binary data from client
103
104
Raises:
105
WebSocketDisconnect: If connection is closed
106
"""
107
108
async def receive_json(self, mode: str = "text") -> Any:
109
"""
110
Receive and parse JSON message from client.
111
112
Parameters:
113
- mode: Expect "text" or "binary" message
114
115
Returns:
116
Parsed JSON data (dict, list, or primitive)
117
118
Raises:
119
WebSocketDisconnect: If connection is closed
120
JSONDecodeError: If message is not valid JSON
121
"""
122
123
async def iter_text(self) -> AsyncIterator[str]:
124
"""
125
Async iterator for receiving text messages.
126
127
Yields:
128
Text messages from client until connection closes
129
"""
130
131
async def iter_bytes(self) -> AsyncIterator[bytes]:
132
"""
133
Async iterator for receiving binary messages.
134
135
Yields:
136
Binary messages from client until connection closes
137
"""
138
139
async def iter_json(self) -> AsyncIterator[Any]:
140
"""
141
Async iterator for receiving JSON messages.
142
143
Yields:
144
Parsed JSON messages from client until connection closes
145
"""
146
```
147
148
### WebSocket Disconnect Exception
149
150
Exception raised when WebSocket connection is closed by client or due to network issues.
151
152
```python { .api }
153
class WebSocketDisconnect(Exception):
154
def __init__(self, code: int = 1000, reason: Optional[str] = None) -> None:
155
"""
156
WebSocket disconnection exception.
157
158
Parameters:
159
- code: WebSocket close code from client
160
- reason: Optional reason string from client
161
162
Standard close codes:
163
- 1000: Normal closure
164
- 1001: Going away (page refresh, navigation)
165
- 1002: Protocol error
166
- 1003: Unsupported data type
167
- 1006: Abnormal closure (no close frame)
168
- 1011: Server error
169
"""
170
self.code = code
171
self.reason = reason
172
```
173
174
### WebSocket State Enum
175
176
Enumeration of WebSocket connection states.
177
178
```python { .api }
179
class WebSocketState(Enum):
180
"""
181
WebSocket connection state enumeration.
182
183
Values:
184
- CONNECTING: Connection handshake in progress
185
- CONNECTED: Connection established and ready
186
- DISCONNECTED: Connection closed
187
"""
188
CONNECTING = 0
189
CONNECTED = 1
190
DISCONNECTED = 2
191
```
192
193
## WebSocket Routing
194
195
### WebSocket Route Decorator
196
197
Decorator for defining WebSocket endpoints with dependency injection support.
198
199
```python { .api }
200
@app.websocket("/ws/{path}")
201
async def websocket_endpoint(
202
websocket: WebSocket,
203
path_param: str,
204
query_param: str = Query(...),
205
dependency_result = Depends(some_dependency)
206
):
207
"""
208
WebSocket endpoint function.
209
210
Parameters:
211
- websocket: WebSocket connection object (automatically injected)
212
- path_param: Path parameters work the same as HTTP routes
213
- query_param: Query parameters with validation support
214
- dependency_result: Dependencies work the same as HTTP routes
215
216
Note: Must accept WebSocket as first parameter
217
"""
218
```
219
220
## Usage Examples
221
222
### Basic WebSocket Echo Server
223
224
```python
225
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
226
227
app = FastAPI()
228
229
@app.websocket("/ws")
230
async def websocket_endpoint(websocket: WebSocket):
231
await websocket.accept()
232
try:
233
while True:
234
data = await websocket.receive_text()
235
await websocket.send_text(f"Message text was: {data}")
236
except WebSocketDisconnect:
237
print("Client disconnected")
238
```
239
240
### WebSocket Chat Room
241
242
```python
243
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
244
from typing import List
245
import json
246
247
app = FastAPI()
248
249
class ConnectionManager:
250
def __init__(self):
251
self.active_connections: List[WebSocket] = []
252
253
async def connect(self, websocket: WebSocket):
254
await websocket.accept()
255
self.active_connections.append(websocket)
256
257
def disconnect(self, websocket: WebSocket):
258
self.active_connections.remove(websocket)
259
260
async def send_personal_message(self, message: str, websocket: WebSocket):
261
await websocket.send_text(message)
262
263
async def broadcast(self, message: str):
264
for connection in self.active_connections:
265
try:
266
await connection.send_text(message)
267
except:
268
# Remove broken connections
269
self.active_connections.remove(connection)
270
271
manager = ConnectionManager()
272
273
@app.websocket("/ws/{client_id}")
274
async def websocket_endpoint(websocket: WebSocket, client_id: str):
275
await manager.connect(websocket)
276
277
# Notify others about new connection
278
await manager.broadcast(f"Client #{client_id} joined the chat")
279
280
try:
281
while True:
282
data = await websocket.receive_text()
283
message = f"Client #{client_id}: {data}"
284
await manager.broadcast(message)
285
286
except WebSocketDisconnect:
287
manager.disconnect(websocket)
288
await manager.broadcast(f"Client #{client_id} left the chat")
289
```
290
291
### WebSocket with Authentication and Validation
292
293
```python
294
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Query, HTTPException
295
from fastapi.security import HTTPBearer
296
import json
297
import jwt
298
299
app = FastAPI()
300
301
# Security scheme for WebSocket authentication
302
security = HTTPBearer()
303
304
def verify_websocket_token(token: str = Query(...)):
305
"""Verify WebSocket authentication token from query parameter."""
306
try:
307
payload = jwt.decode(token, "secret", algorithms=["HS256"])
308
return payload
309
except jwt.InvalidTokenError:
310
raise HTTPException(status_code=401, detail="Invalid token")
311
312
@app.websocket("/ws")
313
async def websocket_endpoint(
314
websocket: WebSocket,
315
user: dict = Depends(verify_websocket_token)
316
):
317
await websocket.accept()
318
319
# Send welcome message with user info
320
await websocket.send_json({
321
"type": "welcome",
322
"message": f"Welcome {user['username']}!",
323
"user_id": user["user_id"]
324
})
325
326
try:
327
while True:
328
# Receive JSON messages
329
data = await websocket.receive_json()
330
331
# Validate message structure
332
if "type" not in data:
333
await websocket.send_json({
334
"type": "error",
335
"message": "Message must include 'type' field"
336
})
337
continue
338
339
# Handle different message types
340
if data["type"] == "ping":
341
await websocket.send_json({"type": "pong"})
342
343
elif data["type"] == "message":
344
if "content" not in data:
345
await websocket.send_json({
346
"type": "error",
347
"message": "Message content is required"
348
})
349
continue
350
351
# Echo message with user info
352
await websocket.send_json({
353
"type": "message_received",
354
"content": data["content"],
355
"from_user": user["username"],
356
"timestamp": "2023-01-01T00:00:00Z"
357
})
358
359
else:
360
await websocket.send_json({
361
"type": "error",
362
"message": f"Unknown message type: {data['type']}"
363
})
364
365
except WebSocketDisconnect:
366
print(f"User {user['username']} disconnected")
367
368
except Exception as e:
369
print(f"Error in WebSocket connection: {e}")
370
await websocket.close(code=1011, reason="Internal server error")
371
```
372
373
### Real-time Data Streaming
374
375
```python
376
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
377
import asyncio
378
import json
379
import random
380
from datetime import datetime
381
382
app = FastAPI()
383
384
async def generate_stock_data():
385
"""Generate mock stock price data."""
386
stocks = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
387
388
while True:
389
for stock in stocks:
390
price = round(random.uniform(100, 1000), 2)
391
change = round(random.uniform(-5, 5), 2)
392
yield {
393
"symbol": stock,
394
"price": price,
395
"change": change,
396
"timestamp": datetime.now().isoformat()
397
}
398
await asyncio.sleep(1)
399
400
@app.websocket("/ws/stocks")
401
async def stock_stream(websocket: WebSocket):
402
await websocket.accept()
403
404
try:
405
# Send initial message
406
await websocket.send_json({
407
"type": "connected",
408
"message": "Stock price stream connected"
409
})
410
411
# Stream stock data
412
async for stock_data in generate_stock_data():
413
await websocket.send_json({
414
"type": "stock_update",
415
"data": stock_data
416
})
417
418
# Check if client is still connected by trying to receive
419
# (with a very short timeout)
420
try:
421
await asyncio.wait_for(websocket.receive_text(), timeout=0.001)
422
except asyncio.TimeoutError:
423
# No message received, continue streaming
424
pass
425
except WebSocketDisconnect:
426
break
427
428
except WebSocketDisconnect:
429
print("Stock stream client disconnected")
430
```
431
432
### WebSocket with Room Management
433
434
```python
435
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
436
from typing import Dict, List, Set
437
import json
438
439
app = FastAPI()
440
441
class RoomManager:
442
def __init__(self):
443
# rooms[room_id] = set of websocket connections
444
self.rooms: Dict[str, Set[WebSocket]] = {}
445
# websocket_to_room[websocket] = room_id
446
self.websocket_to_room: Dict[WebSocket, str] = {}
447
448
async def join_room(self, websocket: WebSocket, room_id: str):
449
await websocket.accept()
450
451
if room_id not in self.rooms:
452
self.rooms[room_id] = set()
453
454
self.rooms[room_id].add(websocket)
455
self.websocket_to_room[websocket] = room_id
456
457
# Notify room about new member
458
await self.broadcast_to_room(
459
room_id,
460
{"type": "user_joined", "room_id": room_id, "member_count": len(self.rooms[room_id])},
461
exclude=websocket
462
)
463
464
def leave_room(self, websocket: WebSocket):
465
room_id = self.websocket_to_room.get(websocket)
466
if room_id and room_id in self.rooms:
467
self.rooms[room_id].discard(websocket)
468
469
# Remove room if empty
470
if not self.rooms[room_id]:
471
del self.rooms[room_id]
472
else:
473
# Notify remaining members
474
asyncio.create_task(
475
self.broadcast_to_room(
476
room_id,
477
{"type": "user_left", "room_id": room_id, "member_count": len(self.rooms[room_id])}
478
)
479
)
480
481
self.websocket_to_room.pop(websocket, None)
482
483
async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None):
484
if room_id not in self.rooms:
485
return
486
487
disconnected = set()
488
for websocket in self.rooms[room_id]:
489
if websocket == exclude:
490
continue
491
492
try:
493
await websocket.send_json(message)
494
except:
495
disconnected.add(websocket)
496
497
# Clean up disconnected websockets
498
for websocket in disconnected:
499
self.rooms[room_id].discard(websocket)
500
self.websocket_to_room.pop(websocket, None)
501
502
room_manager = RoomManager()
503
504
@app.websocket("/ws/room/{room_id}")
505
async def room_websocket(websocket: WebSocket, room_id: str):
506
await room_manager.join_room(websocket, room_id)
507
508
try:
509
# Send welcome message
510
await websocket.send_json({
511
"type": "welcome",
512
"room_id": room_id,
513
"message": f"Welcome to room {room_id}"
514
})
515
516
while True:
517
data = await websocket.receive_json()
518
519
# Handle different message types
520
if data.get("type") == "message":
521
# Broadcast message to all room members
522
await room_manager.broadcast_to_room(room_id, {
523
"type": "room_message",
524
"room_id": room_id,
525
"message": data.get("content", ""),
526
"timestamp": "2023-01-01T00:00:00Z"
527
})
528
529
elif data.get("type") == "ping":
530
await websocket.send_json({"type": "pong"})
531
532
except WebSocketDisconnect:
533
room_manager.leave_room(websocket)
534
print(f"Client left room {room_id}")
535
```
536
537
### WebSocket Error Handling and Reconnection
538
539
```python
540
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, WebSocketException
541
import asyncio
542
import json
543
import logging
544
545
app = FastAPI()
546
logger = logging.getLogger(__name__)
547
548
class WebSocketHandler:
549
def __init__(self, websocket: WebSocket):
550
self.websocket = websocket
551
self.is_connected = False
552
553
async def connect(self):
554
await self.websocket.accept()
555
self.is_connected = True
556
logger.info("WebSocket connected")
557
558
async def disconnect(self, code: int = 1000, reason: str = None):
559
if self.is_connected:
560
await self.websocket.close(code=code, reason=reason)
561
self.is_connected = False
562
logger.info(f"WebSocket disconnected: {code} - {reason}")
563
564
async def send_safe(self, message: dict):
565
"""Send message with error handling."""
566
if not self.is_connected:
567
return False
568
569
try:
570
await self.websocket.send_json(message)
571
return True
572
except Exception as e:
573
logger.error(f"Failed to send message: {e}")
574
self.is_connected = False
575
return False
576
577
async def receive_safe(self):
578
"""Receive message with error handling."""
579
try:
580
return await self.websocket.receive_json()
581
except WebSocketDisconnect as e:
582
logger.info(f"WebSocket disconnected: {e.code} - {e.reason}")
583
self.is_connected = False
584
raise
585
except Exception as e:
586
logger.error(f"Failed to receive message: {e}")
587
await self.disconnect(code=1002, reason="Protocol error")
588
raise WebSocketException(code=1002, reason="Protocol error")
589
590
@app.websocket("/ws/robust")
591
async def robust_websocket(websocket: WebSocket):
592
handler = WebSocketHandler(websocket)
593
594
try:
595
await handler.connect()
596
597
# Send connection info
598
await handler.send_safe({
599
"type": "connection_info",
600
"message": "Connected successfully",
601
"heartbeat_interval": 30
602
})
603
604
# Set up heartbeat task
605
async def heartbeat():
606
while handler.is_connected:
607
await asyncio.sleep(30)
608
if not await handler.send_safe({"type": "heartbeat"}):
609
break
610
611
heartbeat_task = asyncio.create_task(heartbeat())
612
613
# Main message loop
614
while handler.is_connected:
615
try:
616
data = await handler.receive_safe()
617
618
# Handle different message types
619
if data.get("type") == "heartbeat_response":
620
logger.debug("Received heartbeat response")
621
continue
622
623
elif data.get("type") == "echo":
624
await handler.send_safe({
625
"type": "echo_response",
626
"original_message": data.get("message", "")
627
})
628
629
elif data.get("type") == "error_test":
630
# Simulate different error conditions
631
error_type = data.get("error_type", "generic")
632
633
if error_type == "protocol_error":
634
raise WebSocketException(code=1002, reason="Test protocol error")
635
elif error_type == "invalid_data":
636
raise WebSocketException(code=1003, reason="Test invalid data")
637
else:
638
raise Exception("Test generic error")
639
640
else:
641
await handler.send_safe({
642
"type": "error",
643
"message": f"Unknown message type: {data.get('type')}"
644
})
645
646
except (WebSocketDisconnect, WebSocketException):
647
break
648
except Exception as e:
649
logger.error(f"Unexpected error in WebSocket handler: {e}")
650
await handler.send_safe({
651
"type": "error",
652
"message": "An unexpected error occurred"
653
})
654
655
# Clean up heartbeat task
656
heartbeat_task.cancel()
657
try:
658
await heartbeat_task
659
except asyncio.CancelledError:
660
pass
661
662
except Exception as e:
663
logger.error(f"Fatal error in WebSocket connection: {e}")
664
665
finally:
666
if handler.is_connected:
667
await handler.disconnect()
668
```
669
670
### WebSocket Testing Support
671
672
```python
673
from fastapi import FastAPI, WebSocket
674
from fastapi.testclient import TestClient
675
import pytest
676
677
app = FastAPI()
678
679
@app.websocket("/ws/test")
680
async def test_websocket(websocket: WebSocket):
681
await websocket.accept()
682
683
# Echo messages with uppercase transformation
684
try:
685
while True:
686
data = await websocket.receive_text()
687
await websocket.send_text(data.upper())
688
except WebSocketDisconnect:
689
pass
690
691
# Test client usage
692
def test_websocket_echo():
693
client = TestClient(app)
694
695
with client.websocket_connect("/ws/test") as websocket:
696
websocket.send_text("hello")
697
data = websocket.receive_text()
698
assert data == "HELLO"
699
700
websocket.send_text("world")
701
data = websocket.receive_text()
702
assert data == "WORLD"
703
704
@pytest.mark.asyncio
705
async def test_websocket_json():
706
client = TestClient(app)
707
708
with client.websocket_connect("/ws/test") as websocket:
709
test_data = {"message": "test"}
710
websocket.send_json(test_data)
711
712
# Since our endpoint expects text, this would need
713
# a JSON-aware endpoint in practice
714
response = websocket.receive_text()
715
assert response is not None
716
```