Fast web framework for Python asyncio
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
BlackSheep provides comprehensive WebSocket support for real-time, full-duplex communication between clients and servers. The framework handles connection management, message routing, and error handling with a clean, async-first API.
WebSockets enable persistent connections for real-time applications like chat systems, live updates, gaming, and collaborative tools.
from blacksheep import Application, WebSocket, WebSocketState
from blacksheep.server.websocket import WebSocketError, WebSocketDisconnectError
app = Application()
# Simple WebSocket endpoint
@app.ws("/ws")
async def websocket_handler(websocket: WebSocket):
await websocket.accept()
try:
while True:
message = await websocket.receive_text()
await websocket.send_text(f"Echo: {message}")
except WebSocketDisconnectError:
print("Client disconnected")

The WebSocket class manages connection lifecycle, message sending/receiving, and state tracking.
from blacksheep.server.websocket import WebSocketState, MessageMode
@app.ws("/chat")
async def chat_handler(websocket: WebSocket):
# Accept the WebSocket connection
await websocket.accept()
print(f"Connection state: {websocket.state}") # WebSocketState.CONNECTED
# Connection information
client_ip = websocket.scope.get("client", ["unknown"])[0]
print(f"Client connected from: {client_ip}")
try:
# Connection loop
while websocket.state == WebSocketState.CONNECTED:
message = await websocket.receive_text()
await websocket.send_text(f"Received: {message}")
except WebSocketDisconnectError as e:
print(f"Client disconnected with code: {e.code}")
finally:
# Cleanup when connection ends
print("Connection closed")

@app.ws("/messages")
async def message_handler(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Receive different message types
message_type = await websocket.receive()
if message_type["type"] == "websocket.receive":
if "text" in message_type:
# Text message
text = message_type["text"]
await websocket.send_text(f"Got text: {text}")
elif "bytes" in message_type:
# Binary message
data = message_type["bytes"]
await websocket.send_bytes(data)
except WebSocketDisconnectError:
pass

import json
from typing import Dict, Any
@app.ws("/updates")
async def update_handler(websocket: WebSocket):
await websocket.accept()
try:
# Send text messages
await websocket.send_text("Welcome to updates!")
# Send JSON messages
data = {"type": "notification", "message": "Server started"}
await websocket.send_json(data)
# Send binary data
binary_data = b"Binary data here"
await websocket.send_bytes(binary_data)
# Custom JSON serializer
complex_data = {"timestamp": datetime.now(), "users": [1, 2, 3]}
await websocket.send_json(complex_data, dumps=custom_json_dumps)
# Keep connection alive
while True:
await websocket.receive_text()
except WebSocketDisconnectError:
pass

@app.ws("/receiver")
async def receive_handler(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Receive text
text_message = await websocket.receive_text()
print(f"Received text: {text_message}")
# Receive JSON with automatic parsing
json_data = await websocket.receive_json()
print(f"Received JSON: {json_data}")
# Receive binary data
binary_data = await websocket.receive_bytes()
print(f"Received {len(binary_data)} bytes")
# Custom JSON deserializer
custom_data = await websocket.receive_json(loads=custom_json_loads)
except WebSocketDisconnectError:
pass

WebSocket endpoints support the same routing features as HTTP endpoints, including parameter extraction.
# WebSocket with route parameters
@app.ws("/chat/{room_id}")
async def chat_room(websocket: WebSocket, room_id: str):
    """Accept the connection, greet the client and relay its text frames.

    The room name is looked up in ``websocket.route_values`` under
    ``"room_id"`` (``"default"`` when absent). Every received text message is
    handed to ``broadcast_to_room`` until the client disconnects.
    """
    await websocket.accept()

    # Access route parameters
    joined_room = websocket.route_values.get("room_id", "default")
    print(f"Joined room: {joined_room}")

    try:
        await websocket.send_text(f"Welcome to room {joined_room}!")
        while True:
            incoming = await websocket.receive_text()
            # Broadcast to room (implementation depends on your architecture)
            await broadcast_to_room(joined_room, f"{incoming}")
    except WebSocketDisconnectError:
        print(f"User left room {joined_room}")
# Typed route parameters
@app.ws("/user/{user_id:int}/notifications")
async def user_notifications(websocket: WebSocket, user_id: int):
await websocket.accept()
# user_id is automatically converted to int
print(f"Notifications for user {user_id}")
try:
# Send user-specific notifications
notifications = await get_user_notifications(user_id)
for notification in notifications:
await websocket.send_json(notification)
# Listen for real-time updates
while True:
await websocket.receive_text() # Keep alive
except WebSocketDisconnectError:
pass

Track connection states and handle state transitions properly.
from blacksheep.server.websocket import WebSocketState
@app.ws("/stateful")
async def stateful_handler(websocket: WebSocket):
print(f"Initial state: {websocket.state}") # WebSocketState.CONNECTING
await websocket.accept()
print(f"After accept: {websocket.state}") # WebSocketState.CONNECTED
try:
while websocket.state == WebSocketState.CONNECTED:
message = await websocket.receive_text()
# Check state before sending
if websocket.state == WebSocketState.CONNECTED:
await websocket.send_text(f"Echo: {message}")
else:
break
except WebSocketDisconnectError:
pass
print(f"Final state: {websocket.state}") # WebSocketState.DISCONNECTED

from blacksheep.server.websocket import InvalidWebSocketStateError
@app.ws("/validated")
async def validated_handler(websocket: WebSocket):
try:
await websocket.accept()
while True:
message = await websocket.receive_text()
# State validation before operations
if websocket.state != WebSocketState.CONNECTED:
raise InvalidWebSocketStateError(
party="server",
current_state=websocket.state,
expected_state=WebSocketState.CONNECTED
)
await websocket.send_text(f"Processed: {message}")
except InvalidWebSocketStateError as e:
print(f"Invalid state: {e.current_state}, expected: {e.expected_state}")
except WebSocketDisconnectError:
pass

Comprehensive error handling for WebSocket connections and operations.
from blacksheep.server.websocket import (
WebSocketError,
WebSocketDisconnectError,
InvalidWebSocketStateError
)
@app.ws("/error-handling")
async def error_handler(websocket: WebSocket):
try:
await websocket.accept()
while True:
try:
message = await websocket.receive_text()
# Process message (might raise application errors)
result = await process_message(message)
await websocket.send_json({"result": result})
except ValueError as e:
# Application error - send error message but keep connection
await websocket.send_json({
"error": "validation_error",
"message": str(e)
})
except Exception as e:
# Unexpected error - send error and close connection
await websocket.send_json({
"error": "internal_error",
"message": "An unexpected error occurred"
})
await websocket.close(code=1011) # Internal server error
break
except WebSocketDisconnectError as e:
print(f"Client disconnected: code={e.code}")
except InvalidWebSocketStateError as e:
print(f"Invalid WebSocket state: {e}")
except WebSocketError as e:
print(f"WebSocket error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
# Ensure connection is closed
if websocket.state == WebSocketState.CONNECTED:
await websocket.close(code=1011)

@app.ws("/close-codes")
async def close_codes_handler(websocket: WebSocket):
await websocket.accept()
try:
message = await websocket.receive_text()
if message == "invalid":
# Close with specific code
await websocket.close(code=1003) # Unsupported data type
elif message == "policy":
await websocket.close(code=1008) # Policy violation
elif message == "size":
await websocket.close(code=1009) # Message too large
else:
await websocket.send_text("Message accepted")
except WebSocketDisconnectError as e:
# Standard close codes:
# 1000 = Normal closure
# 1001 = Going away
# 1002 = Protocol error
# 1003 = Unsupported data
# 1007 = Invalid data
# 1008 = Policy violation
# 1009 = Message too large
# 1011 = Internal server error
print(f"Connection closed with code: {e.code}")

Examples of common real-time application patterns.
import asyncio
from typing import Dict, Set
from dataclasses import dataclass
@dataclass
class ChatRoom:
    """State kept for a single chat room."""

    # Key the room is stored under in the room registry.
    room_id: str
    # Sockets currently attached to the room.
    connections: Set[WebSocket]
    # Chat messages recorded for the room, appended as they arrive.
    message_history: list
# Global room registry
chat_rooms: Dict[str, ChatRoom] = {}
async def get_or_create_room(room_id: str) -> ChatRoom:
    """Return the registered room for ``room_id``, creating it on first use.

    A new room starts with no connections and an empty message history and is
    stored in ``chat_rooms`` before being returned.
    """
    try:
        return chat_rooms[room_id]
    except KeyError:
        fresh_room = ChatRoom(room_id, set(), [])
        chat_rooms[room_id] = fresh_room
        return fresh_room
async def broadcast_to_room(room: ChatRoom, message: dict, sender: WebSocket = None):
    """Broadcast message to all connections in room except sender.

    Args:
        room: Room whose ``connections`` set is broadcast to.
        message: Payload handed to each connection's ``send_json``.
        sender: Connection to skip; with the default ``None`` nobody is skipped.

    Connections whose send raises ``WebSocketDisconnectError`` are removed
    from ``room.connections`` after the loop has finished.
    """
    disconnected = set()
    # Iterate over a snapshot. Each ``await`` below hands control back to the
    # event loop, where other handlers may add to or discard from
    # ``room.connections``; changing a set while it is being iterated raises
    # ``RuntimeError: Set changed size during iteration``.
    for connection in tuple(room.connections):
        if connection != sender:
            try:
                await connection.send_json(message)
            except WebSocketDisconnectError:
                disconnected.add(connection)
    # Remove disconnected clients
    room.connections -= disconnected
@app.ws("/chat/{room_id}")
async def chat_room(websocket: WebSocket, room_id: str):
    """Serve one client of the chat room identified by ``room_id``.

    Flow: accept the connection, register it in the room, replay the last 50
    stored messages, announce the join, then turn every received JSON message
    into a ``chat_message`` that is stored (history capped at 1000 entries)
    and broadcast to the other members. On exit the connection is removed
    from the room and a ``user_left`` message is broadcast.
    """
    room = await get_or_create_room(room_id)
    # Accept before registering: once the socket is in ``room.connections``
    # other members' broadcasts will try to send to it.
    await websocket.accept()
    room.connections.add(websocket)
    # Everything after registration runs under try/finally so that a client
    # dropping during the history replay or the join announcement is still
    # removed from the room.
    try:
        # Send chat history
        for message in room.message_history[-50:]:  # Last 50 messages
            await websocket.send_json(message)
        # Announce user joined
        join_message = {
            "type": "user_joined",
            "room": room_id,
            "timestamp": time.time()
        }
        await broadcast_to_room(room, join_message, websocket)
        while True:
            data = await websocket.receive_json()
            # Create message
            message = {
                "type": "chat_message",
                "room": room_id,
                "user": data.get("user", "Anonymous"),
                "message": data.get("message", ""),
                "timestamp": time.time()
            }
            # Store in history
            room.message_history.append(message)
            if len(room.message_history) > 1000:
                room.message_history = room.message_history[-1000:]
            # Broadcast to room
            await broadcast_to_room(room, message, websocket)
    except WebSocketDisconnectError:
        # The client closing the connection is the normal way out of the loop.
        pass
    finally:
        # Remove from room
        room.connections.discard(websocket)
        # Announce user left
        leave_message = {
            "type": "user_left",
            "room": room_id,
            "timestamp": time.time()
        }
        await broadcast_to_room(room, leave_message)


import asyncio
from typing import Dict, List
# Global subscribers registry
update_subscribers: Dict[str, List[WebSocket]] = {}
async def add_subscriber(topic: str, websocket: WebSocket):
    """Register ``websocket`` as a subscriber of ``topic``.

    The topic's list in ``update_subscribers`` is created on first use.
    Subscribing a socket that is already listed for the topic is a no-op.
    """
    subscribers = update_subscribers.setdefault(topic, [])
    # A client may request the same topic more than once (for instance a
    # "subscribe_additional" message naming the topic it connected with).
    # A second entry would deliver every update twice and would survive a
    # single remove_subscriber() call, so each socket is listed only once.
    if websocket not in subscribers:
        subscribers.append(websocket)
async def remove_subscriber(topic: str, websocket: WebSocket):
    """Drop one entry for ``websocket`` from ``topic``'s subscriber list.

    Unknown topics and sockets that are not subscribed are ignored.
    """
    subscribers = update_subscribers.get(topic)
    if subscribers is None:
        return
    if websocket in subscribers:
        subscribers.remove(websocket)
async def broadcast_update(topic: str, data: dict):
    """Broadcast update to all subscribers of a topic.

    Args:
        topic: Key into ``update_subscribers``; a topic with no entry is a
            no-op.
        data: Value sent under the ``"data"`` key of each message.

    Each subscriber receives ``{"topic", "data", "timestamp"}`` as JSON.
    Subscribers whose send raises ``WebSocketDisconnectError`` are
    unsubscribed from ``topic`` once the loop has finished.
    """
    if topic not in update_subscribers:
        return
    disconnected = []
    # Walk a copy of the list. Every ``await`` yields to the event loop, where
    # other handlers may append to or remove from the live list; mutating a
    # list during iteration makes the loop skip or repeat subscribers.
    for websocket in list(update_subscribers[topic]):
        try:
            await websocket.send_json({
                "topic": topic,
                "data": data,
                "timestamp": time.time()
            })
        except WebSocketDisconnectError:
            disconnected.append(websocket)
    # Remove disconnected subscribers
    for ws in disconnected:
        await remove_subscriber(topic, ws)
@app.ws("/updates/{topic}")
async def live_updates(websocket: WebSocket, topic: str):
    """Stream updates for ``topic`` to one client.

    The socket is accepted and subscribed to ``topic``, receives an
    ``initial_data`` message, and may then send ``subscribe_additional`` or
    ``unsubscribe`` requests (each naming a ``topic``) until it disconnects.
    On exit the socket is removed from every subscriber list.
    """
    await websocket.accept()
    await add_subscriber(topic, websocket)
    # The initial send sits inside the try block as well: if loading the data
    # fails or the client drops before the first message, the ``finally``
    # clause still unsubscribes the socket.
    try:
        # Send initial data
        initial_data = await get_topic_data(topic)
        await websocket.send_json({
            "type": "initial_data",
            "topic": topic,
            "data": initial_data
        })
        # Keep connection alive and handle client messages
        while True:
            message = await websocket.receive_json()
            if message.get("type") == "subscribe_additional":
                additional_topic = message.get("topic")
                if additional_topic:
                    await add_subscriber(additional_topic, websocket)
            elif message.get("type") == "unsubscribe":
                unsub_topic = message.get("topic")
                if unsub_topic:
                    await remove_subscriber(unsub_topic, websocket)
    except WebSocketDisconnectError:
        # The client closing the connection is the normal way out of the loop.
        pass
    finally:
        # Remove from all subscriptions. ``list.remove`` drops one occurrence
        # at a time, so repeat until the socket is gone from each list.
        for subscribers in update_subscribers.values():
            while websocket in subscribers:
                subscribers.remove(websocket)
# Trigger updates from other parts of application
async def notify_data_change(topic: str, data: dict):
    """Notify the WebSocket subscribers of ``topic`` that its data changed.

    Entry point for the rest of the application; it simply forwards both
    arguments to ``broadcast_update``.
    """
    await broadcast_update(topic=topic, data=data)
# Example: Update triggered by HTTP endpoint
@app.post("/api/data/{topic}")
async def update_data(topic: str, data: FromJSON[dict]):
# Update data in database
await save_data(topic, data.value)
# Notify WebSocket subscribers
await notify_data_change(topic, data.value)
return json({"updated": True})

Secure WebSocket connections with authentication.
from blacksheep import auth
from guardpost import Identity
# Authenticated WebSocket endpoint
@app.ws("/secure-chat")
@auth() # Require authentication
async def secure_chat(websocket: WebSocket, request: Request):
# Authentication happens before WebSocket upgrade
identity: Identity = request.identity
user_id = identity.id
await websocket.accept()
# Send welcome message with user info
await websocket.send_json({
"type": "welcome",
"user_id": user_id,
"username": identity.claims.get("name", "Unknown")
})
try:
while True:
message = await websocket.receive_json()
# Add user context to messages
message["user_id"] = user_id
message["timestamp"] = time.time()
# Process authenticated message
await process_user_message(message)
except WebSocketDisconnectError:
print(f"User {user_id} disconnected")

@app.ws("/token-auth")
async def token_auth_websocket(websocket: WebSocket):
# Get token from query parameters or headers during handshake
query_params = websocket.scope.get("query_string", b"").decode()
token = None
# Parse token from query string
if "token=" in query_params:
for param in query_params.split("&"):
if param.startswith("token="):
token = param.split("=", 1)[1]
break
if not token:
# Reject connection
await websocket.close(code=1008) # Policy violation
return
# Validate token
try:
user_data = await validate_token(token)
except Exception:
await websocket.close(code=1008)
return
await websocket.accept()
# Continue with authenticated connection
await websocket.send_json({
"type": "authenticated",
"user": user_data
})
try:
while True:
message = await websocket.receive_json()
# Process message with user context
await process_authenticated_message(message, user_data)
except WebSocketDisconnectError:
pass

Optimize WebSocket performance for high-concurrency scenarios.
import weakref

# Use weak references to avoid memory leaks.
# The ``typing`` module has no ``WeakSet``, so the annotation names the
# ``weakref`` class directly; it is quoted so the subscript is never evaluated
# at runtime.
active_connections: "weakref.WeakSet[WebSocket]" = weakref.WeakSet()
# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_background_tasks = set()


@app.ws("/optimized")
async def optimized_handler(websocket: WebSocket):
    """Receive text messages and hand each one to a background task.

    The socket is recorded in ``active_connections`` after it is accepted.
    Each message is passed to ``process_message_async`` in its own task so
    the receive loop is never blocked by processing.
    """
    await websocket.accept()
    active_connections.add(websocket)
    try:
        # Efficient message processing
        while True:
            message = await websocket.receive_text()
            # Process in background to avoid blocking
            task = asyncio.create_task(process_message_async(message, websocket))
            # Hold the task until it completes, then let it go.
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
    except WebSocketDisconnectError:
        # The client closing the connection is the normal way out of the loop.
        pass
    # No explicit removal from active_connections: a WeakSet drops an entry
    # once the websocket object itself is garbage-collected.
async def process_message_async(message: str, websocket: WebSocket):
    """Process message without blocking the receive loop.

    Runs ``heavy_processing(message)`` and, while the socket is still
    connected, replies with ``{"result": ...}``; if processing raises, replies
    with ``{"error": str(exc)}`` instead. A client that disconnects while a
    reply is being sent is ignored.
    """
    try:
        result = await heavy_processing(message)
        if websocket.state == WebSocketState.CONNECTED:
            await websocket.send_json({"result": result})
    except WebSocketDisconnectError:
        # The client left while the result was being sent; there is nobody to
        # report to, and this must not be treated as a processing error.
        return
    except Exception as e:
        if websocket.state == WebSocketState.CONNECTED:
            try:
                await websocket.send_json({"error": str(e)})
            except WebSocketDisconnectError:
                # This coroutine runs as a background task, so an exception
                # escaping here would only surface as an unretrieved task
                # error. A vanished client needs no error report.
                pass


import asyncio
from typing import List
async def efficient_broadcast(connections: List[WebSocket], message: dict):
"""Efficiently broadcast to multiple connections"""
# Serialize message once
serialized = json.dumps(message).encode()
async def send_to_connection(websocket: WebSocket):
try:
if websocket.state == WebSocketState.CONNECTED:
await websocket.send_bytes(serialized)
except WebSocketDisconnectError:
pass # Connection already closed
# Send to all connections concurrently
tasks = [send_to_connection(ws) for ws in connections]
await asyncio.gather(*tasks, return_exceptions=True)

BlackSheep's WebSocket support provides a robust foundation for building real-time applications with proper error handling, authentication, and performance optimization capabilities.
Install with Tessl CLI
npx tessl i tessl/pypi-blacksheep