A modern Python web server and web framework designed for high performance and speed using async/await syntax.
—
Sanic provides native WebSocket support for real-time, bidirectional communication between client and server. This includes WebSocket routing, connection management, message handling, and integration with the broader Sanic ecosystem.
Define WebSocket endpoints using decorators and programmatic registration.
from typing import Optional


def websocket(
    uri: str,
    host: Optional[str] = None,
    strict_slashes: Optional[bool] = None,
    subprotocols: Optional[list] = None,
    name: Optional[str] = None,
    apply: Optional[list] = None,
    version: Optional[int] = None,
    version_prefix: str = "/v",
    error_format: Optional[str] = None,
):
    """
    Decorator for WebSocket routes.

    This is a signature reference only; the stub has no body.

    Parameters:
    - uri: WebSocket endpoint path
    - host: Host restriction pattern
    - strict_slashes: Override global strict slash setting
    - subprotocols: Supported WebSocket subprotocols
    - name: Route name for url_for
    - apply: List of decorators to apply
    - version: API version number
    - version_prefix: Version URL prefix
    - error_format: Error response format

    Usage:
        @app.websocket('/ws')
        async def websocket_handler(request, ws):
            await ws.send("Hello WebSocket!")
            msg = await ws.recv()
            print(f"Received: {msg}")
    """
def add_websocket_route(
    handler,
    uri: str,
    **kwargs
):
    """
    Add WebSocket route programmatically.

    This is a signature reference only; the stub has no body.

    Parameters:
    - handler: WebSocket handler function
    - uri: WebSocket endpoint path
    - **kwargs: Additional route options
    """


# The WebSocket connection interface for message handling and connection management.
from typing import Optional


class Websocket:
    """WebSocket connection protocol interface.

    Reference stub: every member documents a signature and has no body.
    """

    async def send(
        self,
        data,
        encoding: str = "text"
    ):
        """
        Send data to WebSocket client.

        Parameters:
        - data: Data to send (str, bytes, or dict for JSON)
        - encoding: Data encoding type ("text", "bytes", or "json")

        Raises:
        - WebsocketClosed: If connection is closed
        - ConnectionError: If send fails
        """

    async def recv(self, timeout: Optional[float] = None):
        """
        Receive data from WebSocket client.

        Parameters:
        - timeout: Receive timeout in seconds

        Returns:
        - Received data (str or bytes)

        Raises:
        - WebsocketClosed: If connection is closed
        - asyncio.TimeoutError: If timeout exceeded
        """

    async def ping(self, data: bytes = b""):
        """
        Send ping frame to client.

        Parameters:
        - data: Optional ping data

        Returns:
        - Pong waiter coroutine
        """

    async def pong(self, data: bytes = b""):
        """
        Send pong frame to client.

        Parameters:
        - data: Optional pong data
        """

    async def close(
        self,
        code: int = 1000,
        reason: str = ""
    ):
        """
        Close WebSocket connection.

        Parameters:
        - code: Close status code
        - reason: Close reason string
        """

    async def wait_closed(self):
        """Wait for connection to be closed."""

    @property
    def closed(self) -> bool:
        """Whether the connection is closed."""

    @property
    def open(self) -> bool:
        """Whether the connection is open."""

    @property
    def client_state(self):
        """Client connection state."""

    @property
    def server_state(self):
        """Server connection state."""

    @property
    def subprotocol(self) -> str:
        """Negotiated subprotocol."""


# The signature and structure for WebSocket handler functions.
async def websocket_handler(request, ws):
    """
    WebSocket handler function signature.

    Parameters:
    - request: HTTP request object (for initial handshake)
    - ws: WebSocket connection object

    The handler should:
    - Handle the WebSocket connection lifecycle
    - Process incoming messages
    - Send outgoing messages
    - Handle connection errors and cleanup
    - Manage connection state
    """


# Configuration options for WebSocket behavior and limits.
from typing import Optional

# WebSocket timeout settings
WEBSOCKET_TIMEOUT: int = 10  # Connection timeout
WEBSOCKET_PING_INTERVAL: int = 20  # Ping interval in seconds
WEBSOCKET_PING_TIMEOUT: int = 20  # Ping timeout in seconds

# WebSocket message limits
WEBSOCKET_MAX_SIZE: int = 1048576  # Maximum message size (1MB, i.e. 2**20)
WEBSOCKET_MAX_QUEUE: int = 32  # Maximum queued messages

# WebSocket compression
WEBSOCKET_COMPRESSION: Optional[str] = None  # Compression algorithm (None by default)


# Support for WebSocket subprotocols for application-specific protocols.
@app.websocket('/ws', subprotocols=['chat', 'notification'])
async def websocket_with_subprotocols(request, ws):
    """
    WebSocket handler with subprotocol negotiation.

    The client can request specific subprotocols during handshake.
    The server selects the first supported subprotocol.

    Dispatches on ws.subprotocol: 'chat' and 'notification' are handed to
    their protocol handlers; anything else closes the connection with
    code 1002.
    """
    selected_protocol = ws.subprotocol

    if selected_protocol == 'chat':
        await handle_chat_protocol(ws)
    elif selected_protocol == 'notification':
        await handle_notification_protocol(ws)
    else:
        await ws.close(code=1002, reason="Unsupported subprotocol")


# Authenticate WebSocket connections using request data and custom logic.
@app.websocket('/ws/authenticated')
async def authenticated_websocket(request, ws):
    """
    WebSocket handler with authentication.

    Authentication is performed during the initial HTTP handshake
    using headers, query parameters, or other request data.

    A missing Authorization header, or one rejected by
    validate_websocket_token(), closes the connection with code 1008.
    On success the user is stored on request.ctx.user.
    """
    # Authenticate using request headers
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        await ws.close(code=1008, reason="Authentication required")
        return

    try:
        user = await validate_websocket_token(auth_header)
        request.ctx.user = user
    except AuthenticationError:
        await ws.close(code=1008, reason="Invalid authentication")
        return

    # Continue with authenticated WebSocket handling
    await handle_authenticated_websocket(request, ws)


from sanic import Sanic
from sanic.response import html
# Sanic application instance that the echo handler and test page below register on.
app = Sanic("WebSocketApp")
@app.websocket('/ws')
async def websocket_echo(request, ws):
    """Greet the client, then echo back every message it sends."""
    greeting = "Welcome to WebSocket echo server!"
    await ws.send(greeting)

    # Iterating the connection yields messages one at a time.
    async for incoming in ws:
        print(f"Received: {incoming}")
        reply = f"Echo: {incoming}"
        await ws.send(reply)
@app.route('/')
async def index(request):
    """Serve WebSocket test page.

    The page opens a WebSocket to the /ws endpoint on the host that served
    it, appends each received message to the page, and sends the text box
    contents when the button is clicked.
    """
    # The socket URL is derived from the page's own location so the page also
    # works when reached by a hostname other than localhost, or over HTTPS.
    return html('''
<!DOCTYPE html>
<html>
<head><title>WebSocket Test</title></head>
<body>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="Type a message...">
<button onclick="sendMessage()">Send</button>
<script>
const scheme = location.protocol === 'https:' ? 'wss://' : 'ws://';
const ws = new WebSocket(scheme + location.host + '/ws');
const messages = document.getElementById('messages');
ws.onmessage = function(event) {
const div = document.createElement('div');
div.textContent = event.data;
messages.appendChild(div);
};
function sendMessage() {
const input = document.getElementById('messageInput');
// send() throws unless the socket is open
if (ws.readyState !== WebSocket.OPEN) {
return;
}
ws.send(input.value);
input.value = '';
}
</script>
</body>
</html>
''')
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)


import asyncio
from sanic import Sanic
from sanic.response import json
import json as json_lib
# Sanic application instance for the multi-room chat example.
app = Sanic("ChatApp")
# Store active connections
# connected_clients: every WebSocket currently inside chat_room(), across all rooms.
connected_clients = set()
# chat_rooms: maps room_id -> set of WebSocket connections joined to that room.
chat_rooms = {}
@app.websocket('/ws/chat/<room_id>')
async def chat_room(request, ws, room_id):
    """Multi-room chat WebSocket handler.

    Registers the connection in chat_rooms[room_id] and connected_clients,
    greets it, announces the join to the rest of the room, then relays every
    {"type": "chat", ...} message to the whole room. On exit the connection
    is unregistered, the departure is announced, and an empty room is removed.

    Parameters:
    - request: Handshake request (unused)
    - ws: WebSocket connection object
    - room_id: Room identifier taken from the URL
    """
    # Add client to room, creating the room on first join
    chat_rooms.setdefault(room_id, set()).add(ws)
    connected_clients.add(ws)

    try:
        # Send welcome message
        await ws.send(json_lib.dumps({
            "type": "system",
            "message": f"Welcome to chat room {room_id}",
            "room": room_id
        }))

        # Broadcast user joined
        await broadcast_to_room(room_id, {
            "type": "user_joined",
            "message": "A user joined the room",
            "room": room_id
        }, exclude=ws)

        # Handle messages
        async for message in ws:
            # Only parsing is guarded. ValueError covers JSONDecodeError and
            # the UnicodeDecodeError raised for undecodable bytes.
            try:
                data = json_lib.loads(message)
            except ValueError:
                data = None

            # Valid JSON that is not an object (e.g. `[1]` or `"x"`) has no
            # .get(); report it instead of letting AttributeError end the session.
            if not isinstance(data, dict):
                await ws.send(json_lib.dumps({
                    "type": "error",
                    "message": "Invalid JSON format"
                }))
                continue

            if data.get("type") == "chat":
                # Broadcast chat message to room
                await broadcast_to_room(room_id, {
                    "type": "chat",
                    "message": data.get("message", ""),
                    "user": data.get("user", "Anonymous"),
                    "room": room_id
                })
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        # Clean up when client disconnects
        connected_clients.discard(ws)

        room = chat_rooms.get(room_id)
        if room is not None and ws in room:
            room.remove(ws)

            # Broadcast user left
            await broadcast_to_room(room_id, {
                "type": "user_left",
                "message": "A user left the room",
                "room": room_id
            })

            # Remove empty rooms. Look the room up again: other handlers ran
            # while the broadcast was awaited and may already have removed it.
            if not chat_rooms.get(room_id):
                chat_rooms.pop(room_id, None)
async def broadcast_to_room(room_id, message, exclude=None):
    """Broadcast message to all clients in a room.

    Parameters:
    - room_id: Key into chat_rooms; unknown rooms are ignored
    - message: JSON-serializable payload, encoded once and sent to each client
    - exclude: Optional connection to skip (typically the sender)

    Clients whose send() raises are removed from the room.
    """
    room = chat_rooms.get(room_id)
    if room is None:
        return

    message_str = json_lib.dumps(message)
    disconnected = set()

    # Iterate over a snapshot: each await lets other handlers run, and they
    # may add or remove members. Mutating a set while iterating it raises
    # RuntimeError.
    for client in tuple(room):
        if client is exclude:
            continue
        try:
            await client.send(message_str)
        except Exception:
            # Mark disconnected clients for removal
            disconnected.add(client)

    # Remove disconnected clients. Look the room up again because it may have
    # been deleted while the sends were awaited.
    if disconnected:
        current = chat_rooms.get(room_id)
        if current is not None:
            current.difference_update(disconnected)
@app.route('/api/rooms')
async def get_active_rooms(request):
    """Get list of active chat rooms.

    Returns a JSON response with the room ids currently present in
    chat_rooms and the number of connections in connected_clients.
    """
    return json({
        "rooms": list(chat_rooms),
        "total_clients": len(connected_clients)
    })


import asyncio
import random
from datetime import datetime
# Sanic application instance for the real-time data streaming example.
app = Sanic("DataStreamApp")
@app.websocket('/ws/data-stream')
async def data_stream(request, ws):
    """Stream real-time data to WebSocket clients.

    Sends a "connected" confirmation, then runs stream_data() and
    handle_client_messages() concurrently until either one finishes. The
    other task is then cancelled and the connection is closed.

    Parameters:
    - request: Handshake request (unused)
    - ws: WebSocket connection object
    """
    # Send initial connection confirmation
    await ws.send(json_lib.dumps({
        "type": "connected",
        "message": "Data stream connected",
        "timestamp": datetime.utcnow().isoformat()
    }))

    # Create background tasks for streaming out and receiving in
    tasks = [
        asyncio.create_task(stream_data(ws)),
        asyncio.create_task(handle_client_messages(ws)),
    ]

    try:
        # Wait for either task to complete
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except Exception as e:
        print(f"WebSocket streaming error: {e}")
    finally:
        # Cancel in `finally` so the tasks are also stopped when this handler
        # is itself cancelled. cancel() is a no-op on a finished task.
        for task in tasks:
            task.cancel()

        # Await the tasks so cancellation is actually processed and any
        # exception from the finished task is retrieved rather than dropped.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            # CancelledError is not an Exception subclass, so expected
            # cancellations are not reported here.
            if isinstance(outcome, Exception):
                print(f"WebSocket streaming error: {outcome}")

        if not ws.closed:
            await ws.close()
async def stream_data(ws, interval=1, sensor_id="sensor_001"):
    """Stream data to WebSocket client.

    Sends one randomly generated sample per iteration until ws.closed is
    true or an error occurs.

    Parameters:
    - ws: WebSocket connection object
    - interval: Seconds to sleep between samples (default 1)
    - sensor_id: Value reported in each sample's "sensor_id" field
    """
    while not ws.closed:
        try:
            # Generate sample data
            data = {
                "type": "data",
                "timestamp": datetime.utcnow().isoformat(),
                "value": random.randint(1, 100),
                "sensor_id": sensor_id,
                "temperature": round(random.uniform(20.0, 30.0), 2),
                "humidity": round(random.uniform(40.0, 80.0), 2)
            }
            await ws.send(json_lib.dumps(data))
            await asyncio.sleep(interval)  # Pace the stream
        except Exception as e:
            # Any failure (typically a failed send) ends the stream
            print(f"Data streaming error: {e}")
            break
async def handle_client_messages(ws):
    """Handle messages from WebSocket client.

    Each message is expected to be a JSON object:
    - {"type": "ping"} is answered with a "pong" carrying a timestamp
    - {"type": "subscribe", "sensor_id": ...} is acknowledged with "subscribed"
    - any other type is ignored
    Anything that is not a JSON object is answered with an "error" message.
    """
    async for message in ws:
        # Only parsing is guarded. ValueError covers JSONDecodeError and the
        # UnicodeDecodeError raised for undecodable bytes.
        try:
            data = json_lib.loads(message)
        except ValueError:
            data = None

        # Valid JSON that is not an object (e.g. `[1]` or `42`) has no .get();
        # report it instead of letting AttributeError end the receive loop.
        if not isinstance(data, dict):
            await ws.send(json_lib.dumps({
                "type": "error",
                "message": "Invalid message format"
            }))
            continue

        message_type = data.get("type")
        if message_type == "ping":
            await ws.send(json_lib.dumps({
                "type": "pong",
                "timestamp": datetime.utcnow().isoformat()
            }))
        elif message_type == "subscribe":
            # Handle subscription requests
            sensor_id = data.get("sensor_id")
            await ws.send(json_lib.dumps({
                "type": "subscribed",
                "sensor_id": sensor_id,
                "message": f"Subscribed to {sensor_id}"
            }))


import time
from collections import defaultdict
# Sanic application instance for the authenticated, rate-limited example.
app = Sanic("SecureWebSocketApp")
# Rate limiting storage
# Maps client id -> list of timestamps of that client's accepted messages.
client_requests = defaultdict(list)
RATE_LIMIT = 10 # 10 messages per minute (max accepted messages per RATE_WINDOW)
RATE_WINDOW = 60 # 1 minute window (length of the window in seconds)
@app.websocket('/ws/secure')
async def secure_websocket(request, ws):
    """WebSocket with authentication and rate limiting.

    The token is read from the `token` query argument, falling back to the
    Authorization header. A missing or rejected token closes the connection
    with code 1008. After authentication each JSON object received is echoed
    back with the user's name and a timestamp, subject to check_rate_limit().

    Parameters:
    - request: Handshake request carrying the token
    - ws: WebSocket connection object
    """
    # Authenticate connection
    token = request.args.get('token') or request.headers.get('Authorization')
    if not token:
        await ws.close(code=1008, reason="Authentication token required")
        return

    try:
        user = await validate_token(token)
        client_id = user['id']
    except Exception:
        await ws.close(code=1008, reason="Invalid authentication token")
        return

    await ws.send(json_lib.dumps({
        "type": "authenticated",
        "user": user['username'],
        "message": "Authentication successful"
    }))

    try:
        async for message in ws:
            # Rate limiting check
            if not check_rate_limit(client_id):
                await ws.send(json_lib.dumps({
                    "type": "error",
                    "message": "Rate limit exceeded. Please slow down."
                }))
                continue

            # Only parsing is guarded. ValueError covers JSONDecodeError and
            # the UnicodeDecodeError raised for undecodable bytes.
            try:
                data = json_lib.loads(message)
            except ValueError:
                data = None

            # Valid JSON that is not an object has no .get(); report it
            # instead of letting AttributeError terminate the session.
            if not isinstance(data, dict):
                await ws.send(json_lib.dumps({
                    "type": "error",
                    "message": "Invalid JSON format"
                }))
                continue

            # Echo message back with user info
            response = {
                "type": "message",
                "user": user['username'],
                "message": data.get("message", ""),
                "timestamp": datetime.utcnow().isoformat()
            }
            await ws.send(json_lib.dumps(response))
    except Exception as e:
        print(f"Secure WebSocket error: {e}")
    finally:
        # Clean up rate limiting data
        # NOTE(review): the history is keyed by user id, so this also resets
        # the limit for any other open connection of the same user, and a
        # reconnect starts with a fresh allowance. Confirm that is intended.
        client_requests.pop(client_id, None)
def check_rate_limit(client_id):
    """Check if client is within rate limits.

    Keeps, per client, the timestamps of messages accepted during the last
    RATE_WINDOW seconds. A message is accepted, and recorded, only while
    fewer than RATE_LIMIT timestamps remain in that window.

    Parameters:
    - client_id: Key identifying the client in client_requests

    Returns:
    - True if the message is allowed, False if the limit is reached
    """
    # Monotonic clock: elapsed-time arithmetic must not be affected by
    # wall-clock adjustments, which could otherwise empty the window early
    # or keep old entries alive.
    now = time.monotonic()

    # Clean old requests
    recent = [
        req_time for req_time in client_requests[client_id]
        if now - req_time < RATE_WINDOW
    ]

    # Check rate limit, recording this request only when it is accepted
    allowed = len(recent) < RATE_LIMIT
    if allowed:
        recent.append(now)

    client_requests[client_id] = recent
    return allowed
async def validate_token(token):
    """Validate authentication token.

    Placeholder implementation that accepts a single hard-coded token.

    Parameters:
    - token: Token string supplied by the client

    Returns:
    - dict with the user's "id" and "username"

    Raises:
    - ValueError: If the token is not accepted
    """
    import hmac

    # Implement your token validation logic
    # compare_digest takes time independent of where the inputs differ, so
    # response timing does not leak how much of a guess was correct. Comparing
    # as bytes also accepts non-ASCII input, which the str form rejects.
    if isinstance(token, str) and hmac.compare_digest(
        token.encode("utf-8"), b"valid_token"
    ):
        return {"id": "user123", "username": "testuser"}
    raise ValueError("Invalid token")


# Install with Tessl CLI
npx tessl i tessl/pypi-sanic