CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-blacksheep

Fast web framework for Python asyncio

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

websockets.mddocs/

WebSocket Support

BlackSheep provides comprehensive WebSocket support for real-time, full-duplex communication between clients and servers. The framework handles connection management, message routing, and error handling with a clean, async-first API.

WebSocket Overview

WebSockets enable persistent connections for real-time applications like chat systems, live updates, gaming, and collaborative tools.

Basic WebSocket Setup

from blacksheep import Application, WebSocket, WebSocketState
from blacksheep.server.websocket import WebSocketError, WebSocketDisconnectError

app = Application()

# Simple WebSocket endpoint
@app.ws("/ws")
async def websocket_handler(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Echo: {message}")
    except WebSocketDisconnectError:
        print("Client disconnected")

WebSocket Class

The WebSocket class manages connection lifecycle, message sending/receiving, and state tracking.

Connection Management

from blacksheep.server.websocket import WebSocketState, MessageMode

@app.ws("/chat")
async def chat_handler(websocket: WebSocket):
    # Accept the WebSocket connection
    await websocket.accept()
    print(f"Connection state: {websocket.state}")  # WebSocketState.CONNECTED
    
    # Connection information
    client_ip = websocket.scope.get("client", ["unknown"])[0]
    print(f"Client connected from: {client_ip}")
    
    try:
        # Connection loop
        while websocket.state == WebSocketState.CONNECTED:
            message = await websocket.receive_text()
            await websocket.send_text(f"Received: {message}")
            
    except WebSocketDisconnectError as e:
        print(f"Client disconnected with code: {e.code}")
        
    finally:
        # Cleanup when connection ends
        print("Connection closed")

Message Types

@app.ws("/messages")
async def message_handler(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            # Receive different message types
            message_type = await websocket.receive()
            
            if message_type["type"] == "websocket.receive":
                if "text" in message_type:
                    # Text message
                    text = message_type["text"]
                    await websocket.send_text(f"Got text: {text}")
                    
                elif "bytes" in message_type:
                    # Binary message
                    data = message_type["bytes"]
                    await websocket.send_bytes(data)
                    
    except WebSocketDisconnectError:
        pass

Sending Messages

import json
from typing import Dict, Any

@app.ws("/updates")
async def update_handler(websocket: WebSocket):
    await websocket.accept()
    
    try:
        # Send text messages
        await websocket.send_text("Welcome to updates!")
        
        # Send JSON messages
        data = {"type": "notification", "message": "Server started"}
        await websocket.send_json(data)
        
        # Send binary data
        binary_data = b"Binary data here"
        await websocket.send_bytes(binary_data)
        
        # Custom JSON serializer
        complex_data = {"timestamp": datetime.now(), "users": [1, 2, 3]}
        await websocket.send_json(complex_data, dumps=custom_json_dumps)
        
        # Keep connection alive
        while True:
            await websocket.receive_text()
            
    except WebSocketDisconnectError:
        pass

Receiving Messages

@app.ws("/receiver")
async def receive_handler(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            # Receive text
            text_message = await websocket.receive_text()
            print(f"Received text: {text_message}")
            
            # Receive JSON with automatic parsing
            json_data = await websocket.receive_json()
            print(f"Received JSON: {json_data}")
            
            # Receive binary data
            binary_data = await websocket.receive_bytes()
            print(f"Received {len(binary_data)} bytes")
            
            # Custom JSON deserializer
            custom_data = await websocket.receive_json(loads=custom_json_loads)
            
    except WebSocketDisconnectError:
        pass

WebSocket Routing

WebSocket endpoints support the same routing features as HTTP endpoints, including parameter extraction.

Route Parameters

# WebSocket with route parameters
@app.ws("/chat/{room_id}")
async def chat_room(websocket: WebSocket, room_id: str):
    await websocket.accept()
    
    # Access route parameters
    room = websocket.route_values.get("room_id", "default")
    print(f"Joined room: {room}")
    
    try:
        await websocket.send_text(f"Welcome to room {room}!")
        
        while True:
            message = await websocket.receive_text()
            # Broadcast to room (implementation depends on your architecture)
            await broadcast_to_room(room, f"{message}")
            
    except WebSocketDisconnectError:
        print(f"User left room {room}")

# Typed route parameters
@app.ws("/user/{user_id:int}/notifications")
async def user_notifications(websocket: WebSocket, user_id: int):
    await websocket.accept()
    
    # user_id is automatically converted to int
    print(f"Notifications for user {user_id}")
    
    try:
        # Send user-specific notifications
        notifications = await get_user_notifications(user_id)
        for notification in notifications:
            await websocket.send_json(notification)
            
        # Listen for real-time updates
        while True:
            await websocket.receive_text()  # Keep alive
            
    except WebSocketDisconnectError:
        pass

WebSocket State Management

Track connection states and handle state transitions properly.

Connection States

from blacksheep.server.websocket import WebSocketState

@app.ws("/stateful")
async def stateful_handler(websocket: WebSocket):
    print(f"Initial state: {websocket.state}")  # WebSocketState.CONNECTING
    
    await websocket.accept()
    print(f"After accept: {websocket.state}")   # WebSocketState.CONNECTED
    
    try:
        while websocket.state == WebSocketState.CONNECTED:
            message = await websocket.receive_text()
            
            # Check state before sending
            if websocket.state == WebSocketState.CONNECTED:
                await websocket.send_text(f"Echo: {message}")
            else:
                break
                
    except WebSocketDisconnectError:
        pass
    
    print(f"Final state: {websocket.state}")     # WebSocketState.DISCONNECTED

State Validation

from blacksheep.server.websocket import InvalidWebSocketStateError

@app.ws("/validated")
async def validated_handler(websocket: WebSocket):
    try:
        await websocket.accept()
        
        while True:
            message = await websocket.receive_text()
            
            # State validation before operations
            if websocket.state != WebSocketState.CONNECTED:
                raise InvalidWebSocketStateError(
                    party="server",
                    current_state=websocket.state,
                    expected_state=WebSocketState.CONNECTED
                )
            
            await websocket.send_text(f"Processed: {message}")
            
    except InvalidWebSocketStateError as e:
        print(f"Invalid state: {e.current_state}, expected: {e.expected_state}")
        
    except WebSocketDisconnectError:
        pass

Error Handling

Comprehensive error handling for WebSocket connections and operations.

WebSocket Exceptions

from blacksheep.server.websocket import (
    WebSocketError, 
    WebSocketDisconnectError,
    InvalidWebSocketStateError
)

@app.ws("/error-handling")
async def error_handler(websocket: WebSocket):
    try:
        await websocket.accept()
        
        while True:
            try:
                message = await websocket.receive_text()
                
                # Process message (might raise application errors)
                result = await process_message(message)
                await websocket.send_json({"result": result})
                
            except ValueError as e:
                # Application error - send error message but keep connection
                await websocket.send_json({
                    "error": "validation_error",
                    "message": str(e)
                })
                
            except Exception as e:
                # Unexpected error - send error and close connection
                await websocket.send_json({
                    "error": "internal_error",
                    "message": "An unexpected error occurred"
                })
                await websocket.close(code=1011)  # Internal server error
                break
                
    except WebSocketDisconnectError as e:
        print(f"Client disconnected: code={e.code}")
        
    except InvalidWebSocketStateError as e:
        print(f"Invalid WebSocket state: {e}")
        
    except WebSocketError as e:
        print(f"WebSocket error: {e}")
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        # Ensure connection is closed
        if websocket.state == WebSocketState.CONNECTED:
            await websocket.close(code=1011)

Connection Close Codes

@app.ws("/close-codes")
async def close_codes_handler(websocket: WebSocket):
    await websocket.accept()
    
    try:
        message = await websocket.receive_text()
        
        if message == "invalid":
            # Close with specific code
            await websocket.close(code=1003)  # Unsupported data type
            
        elif message == "policy":
            await websocket.close(code=1008)  # Policy violation
            
        elif message == "size":
            await websocket.close(code=1009)  # Message too large
            
        else:
            await websocket.send_text("Message accepted")
            
    except WebSocketDisconnectError as e:
        # Standard close codes:
        # 1000 = Normal closure
        # 1001 = Going away  
        # 1002 = Protocol error
        # 1003 = Unsupported data
        # 1007 = Invalid data
        # 1008 = Policy violation
        # 1009 = Message too large
        # 1011 = Internal server error
        print(f"Connection closed with code: {e.code}")

Real-time Applications

Examples of common real-time application patterns.

Chat Application

import asyncio
from typing import Dict, Set
from dataclasses import dataclass

@dataclass
class ChatRoom:
    room_id: str
    connections: Set[WebSocket]
    message_history: list

# Global room registry
chat_rooms: Dict[str, ChatRoom] = {}

async def get_or_create_room(room_id: str) -> ChatRoom:
    if room_id not in chat_rooms:
        chat_rooms[room_id] = ChatRoom(room_id, set(), [])
    return chat_rooms[room_id]

async def broadcast_to_room(room: ChatRoom, message: dict, sender: WebSocket = None):
    """Broadcast message to all connections in room except sender"""
    disconnected = set()
    
    for connection in room.connections:
        if connection != sender:
            try:
                await connection.send_json(message)
            except WebSocketDisconnectError:
                disconnected.add(connection)
    
    # Remove disconnected clients
    room.connections -= disconnected

@app.ws("/chat/{room_id}")
async def chat_room(websocket: WebSocket, room_id: str):
    room = await get_or_create_room(room_id)
    room.connections.add(websocket)
    
    await websocket.accept()
    
    # Send chat history
    for message in room.message_history[-50:]:  # Last 50 messages
        await websocket.send_json(message)
    
    # Announce user joined
    join_message = {
        "type": "user_joined",
        "room": room_id,
        "timestamp": time.time()
    }
    await broadcast_to_room(room, join_message, websocket)
    
    try:
        while True:
            data = await websocket.receive_json()
            
            # Create message
            message = {
                "type": "chat_message",
                "room": room_id,
                "user": data.get("user", "Anonymous"),
                "message": data.get("message", ""),
                "timestamp": time.time()
            }
            
            # Store in history
            room.message_history.append(message)
            if len(room.message_history) > 1000:
                room.message_history = room.message_history[-1000:]
            
            # Broadcast to room
            await broadcast_to_room(room, message, websocket)
            
    except WebSocketDisconnectError:
        pass
    finally:
        # Remove from room
        room.connections.discard(websocket)
        
        # Announce user left
        leave_message = {
            "type": "user_left",
            "room": room_id,
            "timestamp": time.time()
        }
        await broadcast_to_room(room, leave_message)

Live Updates

import asyncio
from typing import Dict, List

# Global subscribers registry
update_subscribers: Dict[str, List[WebSocket]] = {}

async def add_subscriber(topic: str, websocket: WebSocket):
    if topic not in update_subscribers:
        update_subscribers[topic] = []
    update_subscribers[topic].append(websocket)

async def remove_subscriber(topic: str, websocket: WebSocket):
    if topic in update_subscribers:
        try:
            update_subscribers[topic].remove(websocket)
        except ValueError:
            pass

async def broadcast_update(topic: str, data: dict):
    """Broadcast update to all subscribers of a topic"""
    if topic not in update_subscribers:
        return
    
    disconnected = []
    
    for websocket in update_subscribers[topic]:
        try:
            await websocket.send_json({
                "topic": topic,
                "data": data,
                "timestamp": time.time()
            })
        except WebSocketDisconnectError:
            disconnected.append(websocket)
    
    # Remove disconnected subscribers
    for ws in disconnected:
        await remove_subscriber(topic, ws)

@app.ws("/updates/{topic}")
async def live_updates(websocket: WebSocket, topic: str):
    await websocket.accept()
    await add_subscriber(topic, websocket)
    
    # Send initial data
    initial_data = await get_topic_data(topic)
    await websocket.send_json({
        "type": "initial_data",
        "topic": topic,
        "data": initial_data
    })
    
    try:
        # Keep connection alive and handle client messages
        while True:
            message = await websocket.receive_json()
            
            if message.get("type") == "subscribe_additional":
                additional_topic = message.get("topic")
                if additional_topic:
                    await add_subscriber(additional_topic, websocket)
                    
            elif message.get("type") == "unsubscribe":
                unsub_topic = message.get("topic") 
                if unsub_topic:
                    await remove_subscriber(unsub_topic, websocket)
                    
    except WebSocketDisconnectError:
        pass
    finally:
        # Remove from all subscriptions
        for subscribers in update_subscribers.values():
            if websocket in subscribers:
                subscribers.remove(websocket)

# Trigger updates from other parts of application
async def notify_data_change(topic: str, data: dict):
    """Call this function when data changes to notify subscribers"""
    await broadcast_update(topic, data)

# Example: Update triggered by HTTP endpoint
@app.post("/api/data/{topic}")
async def update_data(topic: str, data: FromJSON[dict]):
    # Update data in database
    await save_data(topic, data.value)
    
    # Notify WebSocket subscribers
    await notify_data_change(topic, data.value)
    
    return json({"updated": True})

Authentication with WebSockets

Secure WebSocket connections with authentication.

WebSocket Authentication

from blacksheep import auth
from guardpost import Identity

# Authenticated WebSocket endpoint
@app.ws("/secure-chat")
@auth()  # Require authentication
async def secure_chat(websocket: WebSocket, request: Request):
    # Authentication happens before WebSocket upgrade
    identity: Identity = request.identity
    user_id = identity.id
    
    await websocket.accept()
    
    # Send welcome message with user info
    await websocket.send_json({
        "type": "welcome",
        "user_id": user_id,
        "username": identity.claims.get("name", "Unknown")
    })
    
    try:
        while True:
            message = await websocket.receive_json()
            
            # Add user context to messages
            message["user_id"] = user_id
            message["timestamp"] = time.time()
            
            # Process authenticated message
            await process_user_message(message)
            
    except WebSocketDisconnectError:
        print(f"User {user_id} disconnected")

Token-Based WebSocket Auth

@app.ws("/token-auth")
async def token_auth_websocket(websocket: WebSocket):
    # Get token from query parameters or headers during handshake
    query_params = websocket.scope.get("query_string", b"").decode()
    token = None
    
    # Parse token from query string
    if "token=" in query_params:
        for param in query_params.split("&"):
            if param.startswith("token="):
                token = param.split("=", 1)[1]
                break
    
    if not token:
        # Reject connection
        await websocket.close(code=1008)  # Policy violation
        return
    
    # Validate token
    try:
        user_data = await validate_token(token)
    except Exception:
        await websocket.close(code=1008)
        return
    
    await websocket.accept()
    
    # Continue with authenticated connection
    await websocket.send_json({
        "type": "authenticated",
        "user": user_data
    })
    
    try:
        while True:
            message = await websocket.receive_json()
            # Process message with user context
            await process_authenticated_message(message, user_data)
            
    except WebSocketDisconnectError:
        pass

Performance and Scaling

Optimize WebSocket performance for high-concurrency scenarios.

Connection Management

import weakref
from typing import WeakSet

# Use weak references to avoid memory leaks
active_connections: WeakSet[WebSocket] = weakref.WeakSet()

@app.ws("/optimized")
async def optimized_handler(websocket: WebSocket):
    await websocket.accept()
    active_connections.add(websocket)
    
    try:
        # Efficient message processing
        while True:
            message = await websocket.receive_text()
            
            # Process in background to avoid blocking
            asyncio.create_task(process_message_async(message, websocket))
            
    except WebSocketDisconnectError:
        pass
    # WeakSet automatically removes disconnected websockets

async def process_message_async(message: str, websocket: WebSocket):
    """Process message without blocking the receive loop"""
    try:
        result = await heavy_processing(message)
        
        if websocket.state == WebSocketState.CONNECTED:
            await websocket.send_json({"result": result})
            
    except Exception as e:
        if websocket.state == WebSocketState.CONNECTED:
            await websocket.send_json({"error": str(e)})

Broadcast Optimization

import asyncio
from typing import List

async def efficient_broadcast(connections: List[WebSocket], message: dict):
    """Efficiently broadcast to multiple connections"""
    
    # Serialize message once
    serialized = json.dumps(message).encode()
    
    async def send_to_connection(websocket: WebSocket):
        try:
            if websocket.state == WebSocketState.CONNECTED:
                await websocket.send_bytes(serialized)
        except WebSocketDisconnectError:
            pass  # Connection already closed
    
    # Send to all connections concurrently
    tasks = [send_to_connection(ws) for ws in connections]
    await asyncio.gather(*tasks, return_exceptions=True)

BlackSheep's WebSocket support provides a robust foundation for building real-time applications with proper error handling, authentication, and performance optimization capabilities.

Install with Tessl CLI

npx tessl i tessl/pypi-blacksheep

docs

additional.md

auth.md

client.md

core-server.md

index.md

request-response.md

testing.md

websockets.md

tile.json