Real-time bidirectional communication with security focus on CSWSH prevention, authentication, and message validation
69
58%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Advisory
Suggest reviewing before use
Optimize this skill with Tessl
npx tessl skill review --optimize ./skills/websocket/SKILL.mdGate 0.2: PASSED (5+ vulnerabilities documented) - CVE-2024-23898, CVE-2024-26135, CVE-2023-0957
Risk Level: HIGH
Justification: WebSocket connections bypass Same-Origin Policy protections, making them vulnerable to Cross-Site WebSocket Hijacking (CSWSH). Persistent connections require careful authentication, session management, and input validation.
You are an expert in WebSocket security, understanding the unique vulnerabilities of persistent bidirectional connections.
| Situation | Approach |
|---|---|
| New connection | Validate Origin, require authentication token |
| Each message | Validate format, check authorization for action |
| Sensitive operations | Re-verify session, log action |
| Idle connection | Timeout after inactivity period |
| Error condition | Close connection, log details |
| Component | Version | Notes |
|---|---|---|
| FastAPI/Starlette | 0.115+ | WebSocket support |
| websockets | 12.0+ | Python WebSocket library |
WEBSOCKET_CONFIG = {
"max_message_size": 1024 * 1024, # 1MB
"max_connections_per_ip": 10,
"idle_timeout_seconds": 300,
"messages_per_minute": 60,
}
# NEVER use "*" for origins
ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClient
# Test security boundaries first
@pytest.mark.asyncio
async def test_origin_validation_rejects_invalid():
"""CSWSH prevention - must reject invalid origins."""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as client:
# This should fail until origin validation is implemented
with pytest.raises(Exception):
async with client.websocket_connect(
"/ws?token=valid",
headers={"Origin": "https://evil.com"}
):
pass
@pytest.mark.asyncio
async def test_authentication_required():
"""Must reject connections without valid token."""
with TestClient(app) as client:
with pytest.raises(Exception):
with client.websocket_connect("/ws") as ws:
pass
@pytest.mark.asyncio
async def test_message_authorization():
"""Each message action must be authorized."""
with TestClient(app) as client:
with client.websocket_connect(
"/ws?token=readonly_user",
headers={"Origin": "https://app.example.com"}
) as ws:
ws.send_json({"action": "delete", "id": "123"})
response = ws.receive_json()
assert response.get("error") == "Permission denied"# Implement only what's needed to pass the test
async def validate_origin(websocket: WebSocket) -> bool:
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True# Run all WebSocket tests
pytest tests/websocket/ -v --asyncio-mode=auto
# Check for security issues
bandit -r src/websocket/
# Verify no regressions
pytest tests/ -v# BAD - Create new connection for each request
ws = await create_connection(user_id) # Expensive!
# GOOD - Reuse connections from pool
class ConnectionPool:
def __init__(self, max_size: int = 100):
self.connections: dict[str, WebSocket] = {}
async def get_or_create(self, user_id: str) -> WebSocket:
if user_id not in self.connections:
self.connections[user_id] = await create_connection(user_id)
return self.connections[user_id]# BAD - Send messages one at a time
for item in items:
await websocket.send_json({"type": "item", "data": item})
# GOOD - Batch messages to reduce overhead
await websocket.send_json({"type": "batch", "data": items[:50]})# BAD - JSON for high-frequency data (~80 bytes)
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})
# GOOD - Binary format (20 bytes)
import struct
await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))# BAD - Fixed frequent heartbeats
HEARTBEAT_INTERVAL = 5 # Every 5 seconds
# GOOD - Adaptive heartbeats based on activity
interval = 60 if (time() - last_activity) < 60 else 30# BAD - Blocks on slow clients
await ws.send_json(message)
# GOOD - Timeout and bounded queue
from collections import deque
queue = deque(maxlen=100) # Drop oldest when full
try:
await asyncio.wait_for(ws.send_json(message), timeout=1.0)
except asyncio.TimeoutError:
pass # Client too slowfrom fastapi import WebSocket
async def validate_origin(websocket: WebSocket) -> bool:
"""Validate WebSocket origin against allowlist."""
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
await websocket.accept()from jose import jwt, JWTError
async def authenticate_websocket(websocket: WebSocket) -> User | None:
"""Authenticate via token (not cookies - vulnerable to CSWSH)."""
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=4001, reason="Authentication required")
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
user = await user_service.get(payload.get("sub"))
if not user:
await websocket.close(code=4001, reason="User not found")
return None
return user
except JWTError:
await websocket.close(code=4001, reason="Invalid token")
return Nonefrom pydantic import BaseModel, field_validator
class WebSocketMessage(BaseModel):
action: str
data: dict
@field_validator('action')
@classmethod
def validate_action(cls, v):
if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
raise ValueError(f'Invalid action: {v}')
return v
async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
try:
message = WebSocketMessage(**raw_data)
except ValueError:
await websocket.send_json({"error": "Invalid message format"})
return
if not user.has_permission(f"ws:{message.action}"):
await websocket.send_json({"error": "Permission denied"})
return
result = await handlers[message.action](user, message.data)
await websocket.send_json(result)from collections import defaultdict
from time import time
class SecureConnectionManager:
def __init__(self):
self.connections: dict[str, WebSocket] = {}
self.message_counts: dict[str, list[float]] = defaultdict(list)
self.connections_per_ip: dict[str, int] = defaultdict(int)
async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
await websocket.close(code=4029, reason="Too many connections")
return False
await websocket.accept()
self.connections[user_id] = websocket
self.connections_per_ip[ip] += 1
return True
def check_rate_limit(self, user_id: str) -> bool:
now = time()
self.message_counts[user_id] = [
ts for ts in self.message_counts[user_id] if ts > now - 60
]
if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
return False
self.message_counts[user_id].append(now)
return True@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
user = await authenticate_websocket(websocket)
if not user:
return
ip = websocket.client.host
if not await manager.connect(websocket, user.id, ip):
return
try:
while True:
raw = await asyncio.wait_for(
websocket.receive_json(),
timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"]
)
if not manager.check_rate_limit(user.id):
await websocket.send_json({"error": "Rate limited"})
continue
await handle_message(websocket, user, raw)
except (WebSocketDisconnect, asyncio.TimeoutError):
pass
finally:
manager.disconnect(user.id, ip)| CVE ID | Severity | Description | Mitigation |
|---|---|---|---|
| CVE-2024-23898 | HIGH | Jenkins CSWSH - command execution | Validate Origin |
| CVE-2024-26135 | HIGH | MeshCentral CSWSH - config leak | Origin + SameSite |
| CVE-2023-0957 | CRITICAL | Gitpod CSWSH - account takeover | Origin + token auth |
| Category | Mitigations |
|---|---|
| A01 Access Control | Origin validation, per-message authz |
| A02 Crypto Failures | TLS/WSS only, signed tokens |
| A03 Injection | Validate all message content |
| A07 Auth Failures | Token auth, session validation |
async def secure_websocket_handler(websocket: WebSocket):
# 1. VALIDATE ORIGIN (Critical)
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
# 2. AUTHENTICATE with token (not cookies)
user = await validate_token(websocket.query_params.get("token"))
if not user:
await websocket.close(code=4001)
return
# 3. Accept only after validation
await websocket.accept()
# 4. AUTHORIZE each message, 5. RATE LIMIT, 6. TIMEOUT idle# NEVER - vulnerable to CSWSH
@app.websocket("/ws")
async def vulnerable(websocket: WebSocket):
await websocket.accept() # Accepts any origin!
# ALWAYS - validate origin first
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return# NEVER - cookies sent automatically in CSWSH attacks
session = websocket.cookies.get("session")
# ALWAYS - require explicit token parameter
token = websocket.query_params.get("token")# NEVER - assumes connection = full access
if data["action"] == "delete":
await delete_resource(data["id"])
# ALWAYS - check permission for each action
if not user.has_permission("delete"):
return {"error": "Permission denied"}# NEVER - trust WebSocket messages
await db.execute(f"SELECT * FROM {data['table']}") # SQL injection!
# ALWAYS - validate with Pydantic
message = WebSocketMessage(**data)references/threat-model.mdpytest tests/websocket/ -vbandit -r src/websocket/pytest tests/ -vSecurity Goals:
Critical Reminders: ALWAYS validate Origin, use token auth (not cookies), authorize EACH message, use WSS in production.
1086ef2
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.