CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fastapi-slim

FastAPI framework, high performance, easy to learn, fast to code, ready for production - slim version without standard dependencies

Pending
Overview
Eval results
Files

websocket-support.mddocs/

WebSocket Support

FastAPI provides comprehensive WebSocket support for real-time bidirectional communication between clients and servers. WebSocket connections enable live updates, chat applications, streaming data, and interactive features with full integration into FastAPI's dependency injection and validation systems.

Capabilities

WebSocket Connection Object

Object representing an active WebSocket connection with methods for sending and receiving messages.

class WebSocket:
    """
    WebSocket connection object providing bidirectional communication.
    
    Attributes (read-only):
    - url: WebSocket URL with protocol, host, path, and query parameters
    - headers: HTTP headers from the WebSocket handshake
    - query_params: Query parameters from the WebSocket URL
    - path_params: Path parameters extracted from URL pattern
    - cookies: HTTP cookies from the handshake request
    - client: Client connection information (host, port)
    - state: Application state for storing data across the connection
    """
    
    url: URL
    headers: Headers
    query_params: QueryParams
    path_params: Dict[str, Any]
    cookies: Dict[str, str]
    client: Optional[Address]
    state: State
    
    async def accept(
        self,
        subprotocol: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Accept the WebSocket connection.
        
        Parameters:
        - subprotocol: WebSocket subprotocol to use
        - headers: Additional headers to send in handshake response
        
        Note: Must be called before sending or receiving messages
        """
    
    async def close(
        self,
        code: int = 1000,
        reason: Optional[str] = None
    ) -> None:
        """
        Close the WebSocket connection.
        
        Parameters:
        - code: WebSocket close code (1000 = normal closure)
        - reason: Optional reason string for the close
        """
    
    async def send_text(self, data: str) -> None:
        """
        Send text message to client.
        
        Parameters:
        - data: Text string to send
        """
    
    async def send_bytes(self, data: bytes) -> None:
        """
        Send binary message to client.
        
        Parameters:
        - data: Binary data to send
        """
    
    async def send_json(self, data: Any, mode: str = "text") -> None:
        """
        Send JSON message to client.
        
        Parameters:
        - data: Python object to serialize as JSON
        - mode: Send as "text" or "binary" message
        """
    
    async def receive_text(self) -> str:
        """
        Receive text message from client.
        
        Returns:
        Text string from client
        
        Raises:
        WebSocketDisconnect: If connection is closed
        """
    
    async def receive_bytes(self) -> bytes:
        """
        Receive binary message from client.
        
        Returns:
        Binary data from client
        
        Raises:
        WebSocketDisconnect: If connection is closed
        """
    
    async def receive_json(self, mode: str = "text") -> Any:
        """
        Receive and parse JSON message from client.
        
        Parameters:
        - mode: Expect "text" or "binary" message
        
        Returns:
        Parsed JSON data (dict, list, or primitive)
        
        Raises:
        WebSocketDisconnect: If connection is closed
        JSONDecodeError: If message is not valid JSON
        """
    
    async def iter_text(self) -> AsyncIterator[str]:
        """
        Async iterator for receiving text messages.
        
        Yields:
        Text messages from client until connection closes
        """
    
    async def iter_bytes(self) -> AsyncIterator[bytes]:
        """
        Async iterator for receiving binary messages.
        
        Yields:
        Binary messages from client until connection closes
        """
    
    async def iter_json(self) -> AsyncIterator[Any]:
        """
        Async iterator for receiving JSON messages.
        
        Yields:
        Parsed JSON messages from client until connection closes
        """

WebSocket Disconnect Exception

Exception raised when WebSocket connection is closed by client or due to network issues.

class WebSocketDisconnect(Exception):
    def __init__(self, code: int = 1000, reason: Optional[str] = None) -> None:
        """
        WebSocket disconnection exception.
        
        Parameters:
        - code: WebSocket close code from client
        - reason: Optional reason string from client
        
        Standard close codes:
        - 1000: Normal closure
        - 1001: Going away (page refresh, navigation)
        - 1002: Protocol error
        - 1003: Unsupported data type
        - 1006: Abnormal closure (no close frame)
        - 1011: Server error
        """
        self.code = code
        self.reason = reason

WebSocket State Enum

Enumeration of WebSocket connection states.

class WebSocketState(Enum):
    """
    WebSocket connection state enumeration.
    
    Values:
    - CONNECTING: Connection handshake in progress
    - CONNECTED: Connection established and ready
    - DISCONNECTED: Connection closed
    """
    CONNECTING = 0
    CONNECTED = 1
    DISCONNECTED = 2

WebSocket Routing

WebSocket Route Decorator

Decorator for defining WebSocket endpoints with dependency injection support.

@app.websocket("/ws/{path}")
async def websocket_endpoint(
    websocket: WebSocket,
    path_param: str,
    query_param: str = Query(...),
    dependency_result = Depends(some_dependency)
):
    """
    WebSocket endpoint function.
    
    Parameters:
    - websocket: WebSocket connection object (automatically injected)
    - path_param: Path parameters work the same as HTTP routes
    - query_param: Query parameters with validation support
    - dependency_result: Dependencies work the same as HTTP routes
    
    Note: Must accept WebSocket as first parameter
    """

Usage Examples

Basic WebSocket Echo Server

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

WebSocket Chat Room

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
    
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except:
                # Remove broken connections
                self.active_connections.remove(connection)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    
    # Notify others about new connection
    await manager.broadcast(f"Client #{client_id} joined the chat")
    
    try:
        while True:
            data = await websocket.receive_text()
            message = f"Client #{client_id}: {data}"
            await manager.broadcast(message)
    
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")

WebSocket with Authentication and Validation

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Query, HTTPException
from fastapi.security import HTTPBearer
import json
import jwt

app = FastAPI()

# Security scheme for WebSocket authentication
security = HTTPBearer()

def verify_websocket_token(token: str = Query(...)):
    """Verify WebSocket authentication token from query parameter."""
    try:
        payload = jwt.decode(token, "secret", algorithms=["HS256"])
        return payload
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    user: dict = Depends(verify_websocket_token)
):
    await websocket.accept()
    
    # Send welcome message with user info
    await websocket.send_json({
        "type": "welcome",
        "message": f"Welcome {user['username']}!",
        "user_id": user["user_id"]
    })
    
    try:
        while True:
            # Receive JSON messages
            data = await websocket.receive_json()
            
            # Validate message structure
            if "type" not in data:
                await websocket.send_json({
                    "type": "error",
                    "message": "Message must include 'type' field"
                })
                continue
            
            # Handle different message types
            if data["type"] == "ping":
                await websocket.send_json({"type": "pong"})
            
            elif data["type"] == "message":
                if "content" not in data:
                    await websocket.send_json({
                        "type": "error",
                        "message": "Message content is required"
                    })
                    continue
                
                # Echo message with user info
                await websocket.send_json({
                    "type": "message_received",
                    "content": data["content"],
                    "from_user": user["username"],
                    "timestamp": "2023-01-01T00:00:00Z"
                })
            
            else:
                await websocket.send_json({
                    "type": "error",
                    "message": f"Unknown message type: {data['type']}"
                })
    
    except WebSocketDisconnect:
        print(f"User {user['username']} disconnected")
    
    except Exception as e:
        print(f"Error in WebSocket connection: {e}")
        await websocket.close(code=1011, reason="Internal server error")

Real-time Data Streaming

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json
import random
from datetime import datetime

app = FastAPI()

async def generate_stock_data():
    """Generate mock stock price data."""
    stocks = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
    
    while True:
        for stock in stocks:
            price = round(random.uniform(100, 1000), 2)
            change = round(random.uniform(-5, 5), 2)
            yield {
                "symbol": stock,
                "price": price,
                "change": change,
                "timestamp": datetime.now().isoformat()
            }
        await asyncio.sleep(1)

@app.websocket("/ws/stocks")
async def stock_stream(websocket: WebSocket):
    await websocket.accept()
    
    try:
        # Send initial message
        await websocket.send_json({
            "type": "connected",
            "message": "Stock price stream connected"
        })
        
        # Stream stock data
        async for stock_data in generate_stock_data():
            await websocket.send_json({
                "type": "stock_update",
                "data": stock_data
            })
            
            # Check if client is still connected by trying to receive
            # (with a very short timeout)
            try:
                await asyncio.wait_for(websocket.receive_text(), timeout=0.001)
            except asyncio.TimeoutError:
                # No message received, continue streaming
                pass
            except WebSocketDisconnect:
                break
    
    except WebSocketDisconnect:
        print("Stock stream client disconnected")

WebSocket with Room Management

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List, Set
import json

app = FastAPI()

class RoomManager:
    def __init__(self):
        # rooms[room_id] = set of websocket connections
        self.rooms: Dict[str, Set[WebSocket]] = {}
        # websocket_to_room[websocket] = room_id
        self.websocket_to_room: Dict[WebSocket, str] = {}
    
    async def join_room(self, websocket: WebSocket, room_id: str):
        await websocket.accept()
        
        if room_id not in self.rooms:
            self.rooms[room_id] = set()
        
        self.rooms[room_id].add(websocket)
        self.websocket_to_room[websocket] = room_id
        
        # Notify room about new member
        await self.broadcast_to_room(
            room_id,
            {"type": "user_joined", "room_id": room_id, "member_count": len(self.rooms[room_id])},
            exclude=websocket
        )
    
    def leave_room(self, websocket: WebSocket):
        room_id = self.websocket_to_room.get(websocket)
        if room_id and room_id in self.rooms:
            self.rooms[room_id].discard(websocket)
            
            # Remove room if empty
            if not self.rooms[room_id]:
                del self.rooms[room_id]
            else:
                # Notify remaining members
                asyncio.create_task(
                    self.broadcast_to_room(
                        room_id,
                        {"type": "user_left", "room_id": room_id, "member_count": len(self.rooms[room_id])}
                    )
                )
        
        self.websocket_to_room.pop(websocket, None)
    
    async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None):
        if room_id not in self.rooms:
            return
        
        disconnected = set()
        for websocket in self.rooms[room_id]:
            if websocket == exclude:
                continue
            
            try:
                await websocket.send_json(message)
            except:
                disconnected.add(websocket)
        
        # Clean up disconnected websockets
        for websocket in disconnected:
            self.rooms[room_id].discard(websocket)
            self.websocket_to_room.pop(websocket, None)

room_manager = RoomManager()

@app.websocket("/ws/room/{room_id}")
async def room_websocket(websocket: WebSocket, room_id: str):
    await room_manager.join_room(websocket, room_id)
    
    try:
        # Send welcome message
        await websocket.send_json({
            "type": "welcome",
            "room_id": room_id,
            "message": f"Welcome to room {room_id}"
        })
        
        while True:
            data = await websocket.receive_json()
            
            # Handle different message types
            if data.get("type") == "message":
                # Broadcast message to all room members
                await room_manager.broadcast_to_room(room_id, {
                    "type": "room_message",
                    "room_id": room_id,
                    "message": data.get("content", ""),
                    "timestamp": "2023-01-01T00:00:00Z"
                })
            
            elif data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})
    
    except WebSocketDisconnect:
        room_manager.leave_room(websocket)
        print(f"Client left room {room_id}")

WebSocket Error Handling and Reconnection

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, WebSocketException
import asyncio
import json
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

class WebSocketHandler:
    def __init__(self, websocket: WebSocket):
        self.websocket = websocket
        self.is_connected = False
    
    async def connect(self):
        await self.websocket.accept()
        self.is_connected = True
        logger.info("WebSocket connected")
    
    async def disconnect(self, code: int = 1000, reason: str = None):
        if self.is_connected:
            await self.websocket.close(code=code, reason=reason)
            self.is_connected = False
            logger.info(f"WebSocket disconnected: {code} - {reason}")
    
    async def send_safe(self, message: dict):
        """Send message with error handling."""
        if not self.is_connected:
            return False
        
        try:
            await self.websocket.send_json(message)
            return True
        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            self.is_connected = False
            return False
    
    async def receive_safe(self):
        """Receive message with error handling."""
        try:
            return await self.websocket.receive_json()
        except WebSocketDisconnect as e:
            logger.info(f"WebSocket disconnected: {e.code} - {e.reason}")
            self.is_connected = False
            raise
        except Exception as e:
            logger.error(f"Failed to receive message: {e}")
            await self.disconnect(code=1002, reason="Protocol error")
            raise WebSocketException(code=1002, reason="Protocol error")

@app.websocket("/ws/robust")
async def robust_websocket(websocket: WebSocket):
    handler = WebSocketHandler(websocket)
    
    try:
        await handler.connect()
        
        # Send connection info
        await handler.send_safe({
            "type": "connection_info",
            "message": "Connected successfully",
            "heartbeat_interval": 30
        })
        
        # Set up heartbeat task
        async def heartbeat():
            while handler.is_connected:
                await asyncio.sleep(30)
                if not await handler.send_safe({"type": "heartbeat"}):
                    break
        
        heartbeat_task = asyncio.create_task(heartbeat())
        
        # Main message loop
        while handler.is_connected:
            try:
                data = await handler.receive_safe()
                
                # Handle different message types
                if data.get("type") == "heartbeat_response":
                    logger.debug("Received heartbeat response")
                    continue
                
                elif data.get("type") == "echo":
                    await handler.send_safe({
                        "type": "echo_response",
                        "original_message": data.get("message", "")
                    })
                
                elif data.get("type") == "error_test":
                    # Simulate different error conditions
                    error_type = data.get("error_type", "generic")
                    
                    if error_type == "protocol_error":
                        raise WebSocketException(code=1002, reason="Test protocol error")
                    elif error_type == "invalid_data":
                        raise WebSocketException(code=1003, reason="Test invalid data")
                    else:
                        raise Exception("Test generic error")
                
                else:
                    await handler.send_safe({
                        "type": "error",
                        "message": f"Unknown message type: {data.get('type')}"
                    })
            
            except (WebSocketDisconnect, WebSocketException):
                break
            except Exception as e:
                logger.error(f"Unexpected error in WebSocket handler: {e}")
                await handler.send_safe({
                    "type": "error",
                    "message": "An unexpected error occurred"
                })
        
        # Clean up heartbeat task
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except asyncio.CancelledError:
            pass
    
    except Exception as e:
        logger.error(f"Fatal error in WebSocket connection: {e}")
    
    finally:
        if handler.is_connected:
            await handler.disconnect()

WebSocket Testing Support

from fastapi import FastAPI, WebSocket
from fastapi.testclient import TestClient
import pytest

app = FastAPI()

@app.websocket("/ws/test")
async def test_websocket(websocket: WebSocket):
    await websocket.accept()
    
    # Echo messages with uppercase transformation
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(data.upper())
    except WebSocketDisconnect:
        pass

# Test client usage
def test_websocket_echo():
    client = TestClient(app)
    
    with client.websocket_connect("/ws/test") as websocket:
        websocket.send_text("hello")
        data = websocket.receive_text()
        assert data == "HELLO"
        
        websocket.send_text("world")
        data = websocket.receive_text()
        assert data == "WORLD"

@pytest.mark.asyncio
async def test_websocket_json():
    client = TestClient(app)
    
    with client.websocket_connect("/ws/test") as websocket:
        test_data = {"message": "test"}
        websocket.send_json(test_data)
        
        # Since our endpoint expects text, this would need
        # a JSON-aware endpoint in practice
        response = websocket.receive_text()
        assert response is not None

Install with Tessl CLI

npx tessl i tessl/pypi-fastapi-slim

docs

api-routing.md

background-tasks.md

core-application.md

dependency-injection.md

exception-handling.md

file-handling.md

index.md

parameter-declaration.md

request-response.md

websocket-support.md

tile.json