FastAPI framework, high performance, easy to learn, fast to code, ready for production - slim version without standard dependencies
—
FastAPI provides comprehensive WebSocket support for real-time bidirectional communication between clients and servers. WebSocket connections enable live updates, chat applications, streaming data, and interactive features with full integration into FastAPI's dependency injection and validation systems.
Object representing an active WebSocket connection with methods for sending and receiving messages.
class WebSocket:
    """
    Handle for a single WebSocket connection, usable in both directions.

    Attributes (read-only):
    - url: Full WebSocket URL (protocol, host, path, and query parameters)
    - headers: HTTP headers sent with the WebSocket handshake
    - query_params: Query parameters taken from the WebSocket URL
    - path_params: Path parameters captured by the URL pattern
    - cookies: HTTP cookies sent with the handshake request
    - client: Client connection information (host, port)
    - state: Application state for keeping data across the connection
    """

    url: URL
    headers: Headers
    query_params: QueryParams
    path_params: Dict[str, Any]
    cookies: Dict[str, str]
    client: Optional[Address]
    state: State

    async def accept(
        self,
        subprotocol: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Complete the handshake and accept the connection.

        Parameters:
        - subprotocol: WebSocket subprotocol to select
        - headers: Extra headers to include in the handshake response

        Note: Call this before sending or receiving any message
        """

    async def close(
        self,
        code: int = 1000,
        reason: Optional[str] = None
    ) -> None:
        """
        Shut the connection down.

        Parameters:
        - code: WebSocket close code (1000 = normal closure)
        - reason: Optional human-readable reason for closing
        """

    async def send_text(self, data: str) -> None:
        """
        Deliver a text message to the client.

        Parameters:
        - data: Text string to send
        """

    async def send_bytes(self, data: bytes) -> None:
        """
        Deliver a binary message to the client.

        Parameters:
        - data: Binary data to send
        """

    async def send_json(self, data: Any, mode: str = "text") -> None:
        """
        Serialize an object as JSON and deliver it to the client.

        Parameters:
        - data: Python object to serialize as JSON
        - mode: Send as a "text" or "binary" message
        """

    async def receive_text(self) -> str:
        """
        Wait for a text message from the client.

        Returns:
        Text string sent by the client

        Raises:
        WebSocketDisconnect: If connection is closed
        """

    async def receive_bytes(self) -> bytes:
        """
        Wait for a binary message from the client.

        Returns:
        Binary data sent by the client

        Raises:
        WebSocketDisconnect: If connection is closed
        """

    async def receive_json(self, mode: str = "text") -> Any:
        """
        Wait for a message from the client and parse it as JSON.

        Parameters:
        - mode: Expect a "text" or "binary" message

        Returns:
        Parsed JSON data (dict, list, or primitive)

        Raises:
        WebSocketDisconnect: If connection is closed
        JSONDecodeError: If message is not valid JSON
        """

    async def iter_text(self) -> AsyncIterator[str]:
        """
        Iterate asynchronously over incoming text messages.

        Yields:
        Text messages from the client until the connection closes
        """

    async def iter_bytes(self) -> AsyncIterator[bytes]:
        """
        Iterate asynchronously over incoming binary messages.

        Yields:
        Binary messages from the client until the connection closes
        """

    async def iter_json(self) -> AsyncIterator[Any]:
        """
        Iterate asynchronously over incoming JSON messages.

        Yields:
        Parsed JSON messages from the client until the connection closes
        """


# Exception raised when WebSocket connection is closed by client or due to network issues.
class WebSocketDisconnect(Exception):
    def __init__(self, code: int = 1000, reason: Optional[str] = None) -> None:
        """
        Signal that the WebSocket connection has been closed.

        Parameters:
        - code: WebSocket close code reported by the client
        - reason: Optional reason string reported by the client

        Standard close codes:
        - 1000: Normal closure
        - 1001: Going away (page refresh, navigation)
        - 1002: Protocol error
        - 1003: Unsupported data type
        - 1006: Abnormal closure (no close frame)
        - 1011: Server error
        """
        # Both values are stored as-is so handlers can inspect them.
        self.code, self.reason = code, reason


# Enumeration of WebSocket connection states.
class WebSocketState(Enum):
    """
    Lifecycle states of a WebSocket connection.

    Values:
    - CONNECTING: Handshake still in progress
    - CONNECTED: Connection established and ready for use
    - DISCONNECTED: Connection has been closed
    """

    CONNECTING = 0
    CONNECTED = 1
    DISCONNECTED = 2


# Decorator for defining WebSocket endpoints with dependency injection support.
# The placeholder in the route must carry the same name as the function
# parameter, otherwise the value is never bound as a path parameter.
@app.websocket("/ws/{path_param}")
async def websocket_endpoint(
    websocket: WebSocket,
    path_param: str,
    query_param: str = Query(...),
    dependency_result = Depends(some_dependency)
):
    """
    WebSocket endpoint function.

    Parameters:
    - websocket: WebSocket connection object (automatically injected)
    - path_param: Path parameter, bound from the `{path_param}` route segment
      the same way as in HTTP routes
    - query_param: Query parameter with validation support
    - dependency_result: Dependencies work the same as HTTP routes

    Note: Must accept WebSocket as first parameter
    """


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and echo each text message back with a prefix.

    Runs until the client disconnects, at which point a notice is printed.
    """
    await websocket.accept()
    try:
        while True:
            incoming = await websocket.receive_text()
            reply = f"Message text was: {incoming}"
            await websocket.send_text(reply)
    except WebSocketDisconnect:
        print("Client disconnected")


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json
app = FastAPI()
class ConnectionManager:
    """Keep track of accepted WebSocket connections and fan messages out to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking a connection; does nothing if it is not tracked."""
        # broadcast() may already have pruned this connection, so a second
        # removal must not raise ValueError.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send a text message to one specific connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send a text message to every tracked connection.

        Connections whose send fails are dropped from the tracked list.
        """
        # Iterate over a snapshot: removing from the live list while looping
        # over it would skip the element that follows each removed one, and
        # other coroutines may connect/disconnect while we await a send.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Remove broken connections. Exception (not a bare except) so
                # task cancellation still propagates.
                self.disconnect(connection)
manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Chat endpoint: relay every message from this client to all connections.

    Announces the client when it joins, and again when it disconnects.
    """
    await manager.connect(websocket)
    # Let everyone know a new client arrived
    await manager.broadcast(f"Client #{client_id} joined the chat")
    try:
        while True:
            text = await websocket.receive_text()
            await manager.broadcast(f"Client #{client_id}: {text}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")


from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Query, HTTPException
from fastapi.security import HTTPBearer
import json
import jwt
app = FastAPI()
# Security scheme for WebSocket authentication
security = HTTPBearer()
def verify_websocket_token(token: str = Query(...)):
    """Verify WebSocket authentication token from query parameter.

    Parameters:
    - token: JWT passed as the `token` query parameter

    Returns:
    The decoded token payload

    Raises:
    WebSocketException: With close code 1008 (policy violation) when the
    token cannot be decoded or verified
    """
    # Imported here because this example's own import line does not list it.
    from fastapi import WebSocketException

    try:
        # NOTE(review): "secret" is a placeholder signing key; load the real
        # key from configuration before using this outside a demo.
        return jwt.decode(token, "secret", algorithms=["HS256"])
    except jwt.InvalidTokenError as exc:
        # An HTTP error response has no meaning on a WebSocket route; reject
        # the connection with a WebSocket close code instead.
        raise WebSocketException(code=1008, reason="Invalid token") from exc
@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    user: dict = Depends(verify_websocket_token)
):
    """
    Authenticated JSON endpoint.

    Parameters:
    - websocket: WebSocket connection object
    - user: Decoded token payload supplied by verify_websocket_token; the
      handler reads its "username" and "user_id" entries

    Protocol: every incoming message must be a JSON object with a "type"
    field. "ping" is answered with "pong", "message" (which needs a
    "content" field) is echoed back with sender info, anything else is
    answered with an error message.
    """
    from datetime import datetime, timezone

    await websocket.accept()
    try:
        # Send welcome message with user info. Done inside the try so that a
        # payload missing the expected claims is reported like any other error.
        await websocket.send_json({
            "type": "welcome",
            "message": f"Welcome {user['username']}!",
            "user_id": user["user_id"]
        })
        while True:
            # Receive JSON messages
            data = await websocket.receive_json()
            # Validate message structure. Any JSON value can arrive (list,
            # number, null, ...); only an object can carry a 'type' field.
            if not isinstance(data, dict) or "type" not in data:
                await websocket.send_json({
                    "type": "error",
                    "message": "Message must include 'type' field"
                })
                continue
            # Handle different message types
            if data["type"] == "ping":
                await websocket.send_json({"type": "pong"})
            elif data["type"] == "message":
                if "content" not in data:
                    await websocket.send_json({
                        "type": "error",
                        "message": "Message content is required"
                    })
                    continue
                # Echo message with user info, stamped with the current UTC time
                await websocket.send_json({
                    "type": "message_received",
                    "content": data["content"],
                    "from_user": user["username"],
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })
            else:
                await websocket.send_json({
                    "type": "error",
                    "message": f"Unknown message type: {data['type']}"
                })
    except WebSocketDisconnect:
        print(f"User {user['username']} disconnected")
    except Exception as e:
        print(f"Error in WebSocket connection: {e}")
        await websocket.close(code=1011, reason="Internal server error")


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json
import random
from datetime import datetime
app = FastAPI()
async def generate_stock_data():
    """Generate mock stock price data.

    Endless async generator. Each item is a dict with the keys "symbol",
    "price" (random, 100-1000), "change" (random, -5 to 5) and "timestamp"
    (current local time in ISO format).
    """
    symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
    while True:
        for symbol in symbols:
            quote = {
                "symbol": symbol,
                "price": round(random.uniform(100, 1000), 2),
                "change": round(random.uniform(-5, 5), 2),
                "timestamp": datetime.now().isoformat()
            }
            yield quote
        # NOTE(review): the source lost its indentation; the pause is assumed
        # to follow each full pass over the symbols - confirm.
        await asyncio.sleep(1)
@app.websocket("/ws/stocks")
async def stock_stream(websocket: WebSocket):
    """Push mock stock updates to the client until it disconnects.

    After a one-off "connected" message, every item produced by
    generate_stock_data() is forwarded as a "stock_update" message.
    """
    await websocket.accept()
    try:
        # Greeting sent once, before the stream starts
        greeting = {
            "type": "connected",
            "message": "Stock price stream connected"
        }
        await websocket.send_json(greeting)
        # Forward each generated quote
        async for stock_data in generate_stock_data():
            update = {
                "type": "stock_update",
                "data": stock_data
            }
            await websocket.send_json(update)
            # Probe for a disconnect by attempting a receive with a very
            # short timeout; a timeout just means the client sent nothing.
            try:
                await asyncio.wait_for(websocket.receive_text(), timeout=0.001)
            except asyncio.TimeoutError:
                continue
            except WebSocketDisconnect:
                break
    except WebSocketDisconnect:
        print("Stock stream client disconnected")


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List, Set
import json
app = FastAPI()
class RoomManager:
    """Group WebSocket connections into rooms and broadcast JSON within a room."""

    def __init__(self):
        # rooms[room_id] = set of websocket connections
        self.rooms: Dict[str, Set[WebSocket]] = {}
        # websocket_to_room[websocket] = room_id
        self.websocket_to_room: Dict[WebSocket, str] = {}
        # Strong references to in-flight notification tasks; the event loop
        # itself only keeps weak references to tasks.
        self._pending_tasks = set()

    async def join_room(self, websocket: WebSocket, room_id: str):
        """Accept the connection, add it to the room and notify the other members."""
        await websocket.accept()
        self.rooms.setdefault(room_id, set()).add(websocket)
        self.websocket_to_room[websocket] = room_id
        # Notify room about new member
        await self.broadcast_to_room(
            room_id,
            {"type": "user_joined", "room_id": room_id, "member_count": len(self.rooms[room_id])},
            exclude=websocket
        )

    def leave_room(self, websocket: WebSocket):
        """Remove the connection from its room.

        An emptied room is deleted; otherwise the remaining members are
        notified by a background task, so this must be called while an event
        loop is running.
        """
        # Imported here because this example's import lines do not include it.
        import asyncio

        room_id = self.websocket_to_room.pop(websocket, None)
        if not room_id or room_id not in self.rooms:
            return
        members = self.rooms[room_id]
        members.discard(websocket)
        # Remove room if empty
        if not members:
            del self.rooms[room_id]
            return
        # Notify remaining members
        task = asyncio.create_task(
            self.broadcast_to_room(
                room_id,
                {"type": "user_left", "room_id": room_id, "member_count": len(members)}
            )
        )
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None):
        """Send a JSON message to every member of a room.

        Parameters:
        - room_id: Room to broadcast to; unknown rooms are ignored
        - message: JSON-serializable payload
        - exclude: Connection to skip (for example the sender)

        Members whose send fails are removed from the room.
        """
        members = self.rooms.get(room_id)
        if members is None:
            return
        disconnected = set()
        # Iterate over a snapshot: other coroutines may join or leave while
        # we are suspended in send_json, and a set must not change size
        # during iteration.
        for websocket in tuple(members):
            if websocket == exclude:
                continue
            try:
                await websocket.send_json(message)
            except Exception:
                # Exception (not a bare except) so cancellation still propagates.
                disconnected.add(websocket)
        # Clean up disconnected websockets
        for websocket in disconnected:
            members.discard(websocket)
            self.websocket_to_room.pop(websocket, None)
        # Drop the room if cleanup emptied it (and it is still the same room).
        if not members and self.rooms.get(room_id) is members:
            del self.rooms[room_id]
room_manager = RoomManager()


@app.websocket("/ws/room/{room_id}")
async def room_websocket(websocket: WebSocket, room_id: str):
    """
    Room chat endpoint.

    Parameters:
    - websocket: WebSocket connection object
    - room_id: Room to join, taken from the URL path

    Joins the room, sends a welcome message, then relays "message" payloads
    to the whole room and answers "ping" with "pong". The connection is
    removed from the room however the handler exits.
    """
    from datetime import datetime, timezone

    await room_manager.join_room(websocket, room_id)
    try:
        # Send welcome message
        await websocket.send_json({
            "type": "welcome",
            "room_id": room_id,
            "message": f"Welcome to room {room_id}"
        })
        while True:
            data = await websocket.receive_json()
            # Any JSON value can arrive; only objects carry a "type" field.
            # Other payloads are ignored, like unknown message types.
            if not isinstance(data, dict):
                continue
            # Handle different message types
            if data.get("type") == "message":
                # Broadcast message to all room members, stamped with UTC time
                await room_manager.broadcast_to_room(room_id, {
                    "type": "room_message",
                    "room_id": room_id,
                    "message": data.get("content", ""),
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })
            elif data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})
    except WebSocketDisconnect:
        print(f"Client left room {room_id}")
    finally:
        # Always leave the room, not only on a clean disconnect; otherwise an
        # unexpected error would keep a dead connection in the room.
        room_manager.leave_room(websocket)


from fastapi import FastAPI, WebSocket, WebSocketDisconnect, WebSocketException
import asyncio
import json
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
class WebSocketHandler:
    """Wrap a WebSocket with connection-state tracking and logged error handling."""

    def __init__(self, websocket: WebSocket):
        self.websocket = websocket
        self.is_connected = False

    async def connect(self):
        """Accept the connection and mark the handler as connected."""
        await self.websocket.accept()
        self.is_connected = True
        logger.info("WebSocket connected")

    async def disconnect(self, code: int = 1000, reason: str = None):
        """Close the connection if it is still marked as connected.

        Parameters:
        - code: WebSocket close code
        - reason: Optional reason string
        """
        if not self.is_connected:
            return
        try:
            await self.websocket.close(code=code, reason=reason)
        finally:
            # Even when close() fails the connection is no longer usable, so
            # the flag must not stay True.
            self.is_connected = False
        logger.info("WebSocket disconnected: %s - %s", code, reason)

    async def send_safe(self, message: dict):
        """Send message with error handling.

        Returns:
        True when the message was sent; False when the handler is not
        connected or the send failed (the handler is then marked disconnected)
        """
        if not self.is_connected:
            return False
        try:
            await self.websocket.send_json(message)
            return True
        except Exception as e:
            logger.error("Failed to send message: %s", e)
            self.is_connected = False
            return False

    async def receive_safe(self):
        """Receive message with error handling.

        Returns:
        The parsed JSON message

        Raises:
        WebSocketDisconnect: Re-raised when the client disconnects
        WebSocketException: With code 1002 after any other receive failure;
        the connection is closed first
        """
        try:
            return await self.websocket.receive_json()
        except WebSocketDisconnect as e:
            logger.info("WebSocket disconnected: %s - %s", e.code, e.reason)
            self.is_connected = False
            raise
        except Exception as e:
            logger.error("Failed to receive message: %s", e)
            await self.disconnect(code=1002, reason="Protocol error")
            # Chain the original error so the root cause stays in tracebacks.
            raise WebSocketException(code=1002, reason="Protocol error") from e
@app.websocket("/ws/robust")
async def robust_websocket(websocket: WebSocket):
    """
    WebSocket endpoint with a heartbeat and layered error handling.

    Parameters:
    - websocket: WebSocket connection object

    Sends connection info, starts a heartbeat every 30 seconds, then
    processes "heartbeat_response", "echo" and "error_test" messages until
    the client disconnects or a WebSocketException ends the loop. The
    heartbeat task is cancelled and the connection closed on every exit path.
    """
    handler = WebSocketHandler(websocket)
    heartbeat_task = None
    # Close code/reason for the final disconnect; replaced when a
    # WebSocketException terminates the message loop.
    close_code, close_reason = 1000, None
    try:
        await handler.connect()
        # Send connection info
        await handler.send_safe({
            "type": "connection_info",
            "message": "Connected successfully",
            "heartbeat_interval": 30
        })

        # Set up heartbeat task
        async def heartbeat():
            while handler.is_connected:
                await asyncio.sleep(30)
                if not await handler.send_safe({"type": "heartbeat"}):
                    break

        heartbeat_task = asyncio.create_task(heartbeat())

        # Main message loop
        while handler.is_connected:
            try:
                data = await handler.receive_safe()
                # Handle different message types
                if data.get("type") == "heartbeat_response":
                    logger.debug("Received heartbeat response")
                    continue
                elif data.get("type") == "echo":
                    await handler.send_safe({
                        "type": "echo_response",
                        "original_message": data.get("message", "")
                    })
                elif data.get("type") == "error_test":
                    # Simulate different error conditions
                    error_type = data.get("error_type", "generic")
                    if error_type == "protocol_error":
                        raise WebSocketException(code=1002, reason="Test protocol error")
                    elif error_type == "invalid_data":
                        raise WebSocketException(code=1003, reason="Test invalid data")
                    else:
                        raise Exception("Test generic error")
                else:
                    await handler.send_safe({
                        "type": "error",
                        "message": f"Unknown message type: {data.get('type')}"
                    })
            except WebSocketDisconnect:
                break
            except WebSocketException as exc:
                # Remember the error's close code so the client sees it
                # instead of a "normal closure".
                close_code, close_reason = exc.code, exc.reason
                break
            except Exception as e:
                logger.error(f"Unexpected error in WebSocket handler: {e}")
                await handler.send_safe({
                    "type": "error",
                    "message": "An unexpected error occurred"
                })
    except Exception as e:
        logger.error(f"Fatal error in WebSocket connection: {e}")
    finally:
        # Clean up heartbeat task. Done in finally so the task cannot outlive
        # the handler on any exit path, including cancellation.
        if heartbeat_task is not None:
            heartbeat_task.cancel()
            try:
                await heartbeat_task
            except asyncio.CancelledError:
                pass
        if handler.is_connected:
            await handler.disconnect(code=close_code, reason=close_reason)


from fastapi import FastAPI, WebSocket
from fastapi.testclient import TestClient
import pytest
app = FastAPI()
# NOTE(review): the `test_` prefix makes pytest try to collect this endpoint
# as a test function; consider renaming it where that is safe.
@app.websocket("/ws/test")
async def test_websocket(websocket: WebSocket):
    """Echo every text message back upper-cased until the client disconnects."""
    # This example's import line only brings in FastAPI and WebSocket, so the
    # exception type caught below is imported here.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    # Echo messages with uppercase transformation
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(data.upper())
    except WebSocketDisconnect:
        # A client disconnect is the normal way for this loop to end.
        pass
# Test client usage
def test_websocket_echo():
    """Each text message sent to /ws/test comes back upper-cased, in order."""
    exchanges = (("hello", "HELLO"), ("world", "WORLD"))
    with TestClient(app).websocket_connect("/ws/test") as websocket:
        for outgoing, expected in exchanges:
            websocket.send_text(outgoing)
            assert websocket.receive_text() == expected
def test_websocket_json():
    """A JSON payload sent to /ws/test comes back as upper-cased JSON text.

    The TestClient API is synchronous, so this is a plain test function; as a
    coroutine it would only run with an asyncio pytest plugin installed.
    """
    import json

    client = TestClient(app)
    with client.websocket_connect("/ws/test") as websocket:
        test_data = {"message": "test"}
        websocket.send_json(test_data)
        # The endpoint treats the frame as text, so the whole serialized
        # JSON document (keys included) is upper-cased.
        response = websocket.receive_text()
    assert json.loads(response) == {"MESSAGE": "TEST"}


# Install with Tessl CLI
npx tessl i tessl/pypi-fastapi-slim