0
# WebSocket Support
1
2
BlackSheep provides comprehensive WebSocket support for real-time, full-duplex communication between clients and servers. The framework handles connection management, message routing, and error handling with a clean, async-first API.
3
4
## WebSocket Overview
5
6
WebSockets enable persistent connections for real-time applications like chat systems, live updates, gaming, and collaborative tools.
7
8
### Basic WebSocket Setup
9
10
```python { .api }
11
from blacksheep import Application, WebSocket, WebSocketState
12
from blacksheep.server.websocket import WebSocketError, WebSocketDisconnectError
13
14
app = Application()
15
16
# Simple WebSocket endpoint
17
@app.ws("/ws")
18
async def websocket_handler(websocket: WebSocket):
19
await websocket.accept()
20
21
try:
22
while True:
23
message = await websocket.receive_text()
24
await websocket.send_text(f"Echo: {message}")
25
except WebSocketDisconnectError:
26
print("Client disconnected")
27
```
28
29
## WebSocket Class
30
31
The `WebSocket` class manages connection lifecycle, message sending/receiving, and state tracking.
32
33
### Connection Management
34
35
```python { .api }
36
from blacksheep.server.websocket import WebSocketState, MessageMode
37
38
@app.ws("/chat")
39
async def chat_handler(websocket: WebSocket):
40
# Accept the WebSocket connection
41
await websocket.accept()
42
print(f"Connection state: {websocket.state}") # WebSocketState.CONNECTED
43
44
# Connection information
45
client_ip = websocket.scope.get("client", ["unknown"])[0]
46
print(f"Client connected from: {client_ip}")
47
48
try:
49
# Connection loop
50
while websocket.state == WebSocketState.CONNECTED:
51
message = await websocket.receive_text()
52
await websocket.send_text(f"Received: {message}")
53
54
except WebSocketDisconnectError as e:
55
print(f"Client disconnected with code: {e.code}")
56
57
finally:
58
# Cleanup when connection ends
59
print("Connection closed")
60
```
61
62
### Message Types
63
64
```python { .api }
65
@app.ws("/messages")
66
async def message_handler(websocket: WebSocket):
67
await websocket.accept()
68
69
try:
70
while True:
71
# Receive different message types
72
message_type = await websocket.receive()
73
74
if message_type["type"] == "websocket.receive":
75
if "text" in message_type:
76
# Text message
77
text = message_type["text"]
78
await websocket.send_text(f"Got text: {text}")
79
80
elif "bytes" in message_type:
81
# Binary message
82
data = message_type["bytes"]
83
await websocket.send_bytes(data)
84
85
except WebSocketDisconnectError:
86
pass
87
```
88
89
### Sending Messages
90
91
```python { .api }
92
import json
93
from typing import Dict, Any
94
95
@app.ws("/updates")
96
async def update_handler(websocket: WebSocket):
97
await websocket.accept()
98
99
try:
100
# Send text messages
101
await websocket.send_text("Welcome to updates!")
102
103
# Send JSON messages
104
data = {"type": "notification", "message": "Server started"}
105
await websocket.send_json(data)
106
107
# Send binary data
108
binary_data = b"Binary data here"
109
await websocket.send_bytes(binary_data)
110
111
# Custom JSON serializer
112
complex_data = {"timestamp": datetime.now(), "users": [1, 2, 3]}
113
await websocket.send_json(complex_data, dumps=custom_json_dumps)
114
115
# Keep connection alive
116
while True:
117
await websocket.receive_text()
118
119
except WebSocketDisconnectError:
120
pass
121
```
122
123
### Receiving Messages
124
125
```python { .api }
126
@app.ws("/receiver")
127
async def receive_handler(websocket: WebSocket):
128
await websocket.accept()
129
130
try:
131
while True:
132
# Receive text
133
text_message = await websocket.receive_text()
134
print(f"Received text: {text_message}")
135
136
# Receive JSON with automatic parsing
137
json_data = await websocket.receive_json()
138
print(f"Received JSON: {json_data}")
139
140
# Receive binary data
141
binary_data = await websocket.receive_bytes()
142
print(f"Received {len(binary_data)} bytes")
143
144
# Custom JSON deserializer
145
custom_data = await websocket.receive_json(loads=custom_json_loads)
146
147
except WebSocketDisconnectError:
148
pass
149
```
150
151
## WebSocket Routing
152
153
WebSocket endpoints support the same routing features as HTTP endpoints, including parameter extraction.
154
155
### Route Parameters
156
157
```python { .api }
158
# WebSocket with route parameters
159
@app.ws("/chat/{room_id}")
160
async def chat_room(websocket: WebSocket, room_id: str):
161
await websocket.accept()
162
163
# Access route parameters
164
room = websocket.route_values.get("room_id", "default")
165
print(f"Joined room: {room}")
166
167
try:
168
await websocket.send_text(f"Welcome to room {room}!")
169
170
while True:
171
message = await websocket.receive_text()
172
# Broadcast to room (implementation depends on your architecture)
173
await broadcast_to_room(room, f"{message}")
174
175
except WebSocketDisconnectError:
176
print(f"User left room {room}")
177
178
# Typed route parameters
179
@app.ws("/user/{user_id:int}/notifications")
180
async def user_notifications(websocket: WebSocket, user_id: int):
181
await websocket.accept()
182
183
# user_id is automatically converted to int
184
print(f"Notifications for user {user_id}")
185
186
try:
187
# Send user-specific notifications
188
notifications = await get_user_notifications(user_id)
189
for notification in notifications:
190
await websocket.send_json(notification)
191
192
# Listen for real-time updates
193
while True:
194
await websocket.receive_text() # Keep alive
195
196
except WebSocketDisconnectError:
197
pass
198
```
199
200
## WebSocket State Management
201
202
Track connection states and handle state transitions properly.
203
204
### Connection States
205
206
```python { .api }
207
from blacksheep.server.websocket import WebSocketState
208
209
@app.ws("/stateful")
210
async def stateful_handler(websocket: WebSocket):
211
print(f"Initial state: {websocket.state}") # WebSocketState.CONNECTING
212
213
await websocket.accept()
214
print(f"After accept: {websocket.state}") # WebSocketState.CONNECTED
215
216
try:
217
while websocket.state == WebSocketState.CONNECTED:
218
message = await websocket.receive_text()
219
220
# Check state before sending
221
if websocket.state == WebSocketState.CONNECTED:
222
await websocket.send_text(f"Echo: {message}")
223
else:
224
break
225
226
except WebSocketDisconnectError:
227
pass
228
229
print(f"Final state: {websocket.state}") # WebSocketState.DISCONNECTED
230
```
231
232
### State Validation
233
234
```python { .api }
235
from blacksheep.server.websocket import InvalidWebSocketStateError
236
237
@app.ws("/validated")
238
async def validated_handler(websocket: WebSocket):
239
try:
240
await websocket.accept()
241
242
while True:
243
message = await websocket.receive_text()
244
245
# State validation before operations
246
if websocket.state != WebSocketState.CONNECTED:
247
raise InvalidWebSocketStateError(
248
party="server",
249
current_state=websocket.state,
250
expected_state=WebSocketState.CONNECTED
251
)
252
253
await websocket.send_text(f"Processed: {message}")
254
255
except InvalidWebSocketStateError as e:
256
print(f"Invalid state: {e.current_state}, expected: {e.expected_state}")
257
258
except WebSocketDisconnectError:
259
pass
260
```
261
262
## Error Handling
263
264
Comprehensive error handling for WebSocket connections and operations.
265
266
### WebSocket Exceptions
267
268
```python { .api }
269
from blacksheep.server.websocket import (
270
WebSocketError,
271
WebSocketDisconnectError,
272
InvalidWebSocketStateError
273
)
274
275
@app.ws("/error-handling")
276
async def error_handler(websocket: WebSocket):
277
try:
278
await websocket.accept()
279
280
while True:
281
try:
282
message = await websocket.receive_text()
283
284
# Process message (might raise application errors)
285
result = await process_message(message)
286
await websocket.send_json({"result": result})
287
288
except ValueError as e:
289
# Application error - send error message but keep connection
290
await websocket.send_json({
291
"error": "validation_error",
292
"message": str(e)
293
})
294
295
except Exception as e:
296
# Unexpected error - send error and close connection
297
await websocket.send_json({
298
"error": "internal_error",
299
"message": "An unexpected error occurred"
300
})
301
await websocket.close(code=1011) # Internal server error
302
break
303
304
except WebSocketDisconnectError as e:
305
print(f"Client disconnected: code={e.code}")
306
307
except InvalidWebSocketStateError as e:
308
print(f"Invalid WebSocket state: {e}")
309
310
except WebSocketError as e:
311
print(f"WebSocket error: {e}")
312
313
except Exception as e:
314
print(f"Unexpected error: {e}")
315
# Ensure connection is closed
316
if websocket.state == WebSocketState.CONNECTED:
317
await websocket.close(code=1011)
318
```
319
320
### Connection Close Codes
321
322
```python { .api }
323
@app.ws("/close-codes")
324
async def close_codes_handler(websocket: WebSocket):
325
await websocket.accept()
326
327
try:
328
message = await websocket.receive_text()
329
330
if message == "invalid":
331
# Close with specific code
332
await websocket.close(code=1003) # Unsupported data type
333
334
elif message == "policy":
335
await websocket.close(code=1008) # Policy violation
336
337
elif message == "size":
338
await websocket.close(code=1009) # Message too large
339
340
else:
341
await websocket.send_text("Message accepted")
342
343
except WebSocketDisconnectError as e:
344
# Standard close codes:
345
# 1000 = Normal closure
346
# 1001 = Going away
347
# 1002 = Protocol error
348
# 1003 = Unsupported data
349
# 1007 = Invalid data
350
# 1008 = Policy violation
351
# 1009 = Message too large
352
# 1011 = Internal server error
353
print(f"Connection closed with code: {e.code}")
354
```
355
356
## Real-time Applications
357
358
Examples of common real-time application patterns.
359
360
### Chat Application
361
362
```python { .api }
363
import asyncio
364
from typing import Dict, Set
365
from dataclasses import dataclass
366
367
@dataclass
368
class ChatRoom:
369
room_id: str
370
connections: Set[WebSocket]
371
message_history: list
372
373
# Global room registry
374
chat_rooms: Dict[str, ChatRoom] = {}
375
376
async def get_or_create_room(room_id: str) -> ChatRoom:
377
if room_id not in chat_rooms:
378
chat_rooms[room_id] = ChatRoom(room_id, set(), [])
379
return chat_rooms[room_id]
380
381
async def broadcast_to_room(room: ChatRoom, message: dict, sender: WebSocket = None):
382
"""Broadcast message to all connections in room except sender"""
383
disconnected = set()
384
385
for connection in room.connections:
386
if connection != sender:
387
try:
388
await connection.send_json(message)
389
except WebSocketDisconnectError:
390
disconnected.add(connection)
391
392
# Remove disconnected clients
393
room.connections -= disconnected
394
395
@app.ws("/chat/{room_id}")
396
async def chat_room(websocket: WebSocket, room_id: str):
397
room = await get_or_create_room(room_id)
398
room.connections.add(websocket)
399
400
await websocket.accept()
401
402
# Send chat history
403
for message in room.message_history[-50:]: # Last 50 messages
404
await websocket.send_json(message)
405
406
# Announce user joined
407
join_message = {
408
"type": "user_joined",
409
"room": room_id,
410
"timestamp": time.time()
411
}
412
await broadcast_to_room(room, join_message, websocket)
413
414
try:
415
while True:
416
data = await websocket.receive_json()
417
418
# Create message
419
message = {
420
"type": "chat_message",
421
"room": room_id,
422
"user": data.get("user", "Anonymous"),
423
"message": data.get("message", ""),
424
"timestamp": time.time()
425
}
426
427
# Store in history
428
room.message_history.append(message)
429
if len(room.message_history) > 1000:
430
room.message_history = room.message_history[-1000:]
431
432
# Broadcast to room
433
await broadcast_to_room(room, message, websocket)
434
435
except WebSocketDisconnectError:
436
pass
437
finally:
438
# Remove from room
439
room.connections.discard(websocket)
440
441
# Announce user left
442
leave_message = {
443
"type": "user_left",
444
"room": room_id,
445
"timestamp": time.time()
446
}
447
await broadcast_to_room(room, leave_message)
448
```
449
450
### Live Updates
451
452
```python { .api }
453
import asyncio
454
from typing import Dict, List
455
456
# Global subscribers registry
457
update_subscribers: Dict[str, List[WebSocket]] = {}
458
459
async def add_subscriber(topic: str, websocket: WebSocket):
460
if topic not in update_subscribers:
461
update_subscribers[topic] = []
462
update_subscribers[topic].append(websocket)
463
464
async def remove_subscriber(topic: str, websocket: WebSocket):
465
if topic in update_subscribers:
466
try:
467
update_subscribers[topic].remove(websocket)
468
except ValueError:
469
pass
470
471
async def broadcast_update(topic: str, data: dict):
472
"""Broadcast update to all subscribers of a topic"""
473
if topic not in update_subscribers:
474
return
475
476
disconnected = []
477
478
for websocket in update_subscribers[topic]:
479
try:
480
await websocket.send_json({
481
"topic": topic,
482
"data": data,
483
"timestamp": time.time()
484
})
485
except WebSocketDisconnectError:
486
disconnected.append(websocket)
487
488
# Remove disconnected subscribers
489
for ws in disconnected:
490
await remove_subscriber(topic, ws)
491
492
@app.ws("/updates/{topic}")
493
async def live_updates(websocket: WebSocket, topic: str):
494
await websocket.accept()
495
await add_subscriber(topic, websocket)
496
497
# Send initial data
498
initial_data = await get_topic_data(topic)
499
await websocket.send_json({
500
"type": "initial_data",
501
"topic": topic,
502
"data": initial_data
503
})
504
505
try:
506
# Keep connection alive and handle client messages
507
while True:
508
message = await websocket.receive_json()
509
510
if message.get("type") == "subscribe_additional":
511
additional_topic = message.get("topic")
512
if additional_topic:
513
await add_subscriber(additional_topic, websocket)
514
515
elif message.get("type") == "unsubscribe":
516
unsub_topic = message.get("topic")
517
if unsub_topic:
518
await remove_subscriber(unsub_topic, websocket)
519
520
except WebSocketDisconnectError:
521
pass
522
finally:
523
# Remove from all subscriptions
524
for subscribers in update_subscribers.values():
525
if websocket in subscribers:
526
subscribers.remove(websocket)
527
528
# Trigger updates from other parts of application
529
async def notify_data_change(topic: str, data: dict):
530
"""Call this function when data changes to notify subscribers"""
531
await broadcast_update(topic, data)
532
533
# Example: Update triggered by HTTP endpoint
534
@app.post("/api/data/{topic}")
535
async def update_data(topic: str, data: FromJSON[dict]):
536
# Update data in database
537
await save_data(topic, data.value)
538
539
# Notify WebSocket subscribers
540
await notify_data_change(topic, data.value)
541
542
return json({"updated": True})
543
```
544
545
## Authentication with WebSockets
546
547
Secure WebSocket connections with authentication.
548
549
### WebSocket Authentication
550
551
```python { .api }
552
from blacksheep import auth
553
from guardpost import Identity
554
555
# Authenticated WebSocket endpoint
556
@app.ws("/secure-chat")
557
@auth() # Require authentication
558
async def secure_chat(websocket: WebSocket, request: Request):
559
# Authentication happens before WebSocket upgrade
560
identity: Identity = request.identity
561
user_id = identity.id
562
563
await websocket.accept()
564
565
# Send welcome message with user info
566
await websocket.send_json({
567
"type": "welcome",
568
"user_id": user_id,
569
"username": identity.claims.get("name", "Unknown")
570
})
571
572
try:
573
while True:
574
message = await websocket.receive_json()
575
576
# Add user context to messages
577
message["user_id"] = user_id
578
message["timestamp"] = time.time()
579
580
# Process authenticated message
581
await process_user_message(message)
582
583
except WebSocketDisconnectError:
584
print(f"User {user_id} disconnected")
585
```
586
587
### Token-Based WebSocket Auth
588
589
```python { .api }
590
@app.ws("/token-auth")
591
async def token_auth_websocket(websocket: WebSocket):
592
# Get token from query parameters or headers during handshake
593
query_params = websocket.scope.get("query_string", b"").decode()
594
token = None
595
596
# Parse token from query string
597
if "token=" in query_params:
598
for param in query_params.split("&"):
599
if param.startswith("token="):
600
token = param.split("=", 1)[1]
601
break
602
603
if not token:
604
# Reject connection
605
await websocket.close(code=1008) # Policy violation
606
return
607
608
# Validate token
609
try:
610
user_data = await validate_token(token)
611
except Exception:
612
await websocket.close(code=1008)
613
return
614
615
await websocket.accept()
616
617
# Continue with authenticated connection
618
await websocket.send_json({
619
"type": "authenticated",
620
"user": user_data
621
})
622
623
try:
624
while True:
625
message = await websocket.receive_json()
626
# Process message with user context
627
await process_authenticated_message(message, user_data)
628
629
except WebSocketDisconnectError:
630
pass
631
```
632
633
## Performance and Scaling
634
635
Optimize WebSocket performance for high-concurrency scenarios.
636
637
### Connection Management
638
639
```python { .api }
640
import weakref
641
from typing import WeakSet
642
643
# Use weak references to avoid memory leaks
644
active_connections: WeakSet[WebSocket] = weakref.WeakSet()
645
646
@app.ws("/optimized")
647
async def optimized_handler(websocket: WebSocket):
648
await websocket.accept()
649
active_connections.add(websocket)
650
651
try:
652
# Efficient message processing
653
while True:
654
message = await websocket.receive_text()
655
656
# Process in background to avoid blocking
657
asyncio.create_task(process_message_async(message, websocket))
658
659
except WebSocketDisconnectError:
660
pass
661
# WeakSet automatically removes disconnected websockets
662
663
async def process_message_async(message: str, websocket: WebSocket):
664
"""Process message without blocking the receive loop"""
665
try:
666
result = await heavy_processing(message)
667
668
if websocket.state == WebSocketState.CONNECTED:
669
await websocket.send_json({"result": result})
670
671
except Exception as e:
672
if websocket.state == WebSocketState.CONNECTED:
673
await websocket.send_json({"error": str(e)})
674
```
675
676
### Broadcast Optimization
677
678
```python { .api }
679
import asyncio
680
from typing import List
681
682
async def efficient_broadcast(connections: List[WebSocket], message: dict):
683
"""Efficiently broadcast to multiple connections"""
684
685
# Serialize message once
686
serialized = json.dumps(message).encode()
687
688
async def send_to_connection(websocket: WebSocket):
689
try:
690
if websocket.state == WebSocketState.CONNECTED:
691
await websocket.send_bytes(serialized)
692
except WebSocketDisconnectError:
693
pass # Connection already closed
694
695
# Send to all connections concurrently
696
tasks = [send_to_connection(ws) for ws in connections]
697
await asyncio.gather(*tasks, return_exceptions=True)
698
```
699
700
BlackSheep's WebSocket support provides a robust foundation for building real-time applications with proper error handling, authentication, and performance optimization capabilities.