0
# Routing
1
2
WebSocket server routing with werkzeug integration for URL pattern matching, parameter extraction, and request routing to different handlers based on URL patterns. Supports both asyncio and synchronous implementations.
3
4
## Capabilities
5
6
### Asyncio Routing Functions
7
8
Create routing WebSocket servers that dispatch connections to different handlers based on URL patterns.
9
10
```python { .api }
11
async def route(
12
router: Router,
13
host: str,
14
port: int,
15
*,
16
logger: LoggerLike = None,
17
compression: str = "deflate",
18
subprotocols: List[Subprotocol] = None,
19
extra_headers: HeadersLike = None,
20
process_request: Callable = None,
21
select_subprotocol: Callable = None,
22
ping_interval: float = 20,
23
ping_timeout: float = 20,
24
close_timeout: float = 10,
25
max_size: int = 2**20,
26
max_queue: int = 2**5,
27
read_limit: int = 2**16,
28
write_limit: int = 2**16,
29
extensions: List[ServerExtensionFactory] = None,
30
**kwargs
31
) -> Server:
32
"""
33
Start an asyncio WebSocket server with routing.
34
35
Parameters:
36
- router: Router instance with registered routes
37
- host: Server host address
38
- port: Server port number
39
- Other parameters same as websockets.serve()
40
41
Returns:
42
Server: WebSocket server with routing capabilities
43
44
Raises:
45
- OSError: If server cannot bind to address/port
46
"""
47
48
async def unix_route(
49
router: Router,
50
path: str,
51
*,
52
logger: LoggerLike = None,
53
compression: str = "deflate",
54
subprotocols: List[Subprotocol] = None,
55
extra_headers: HeadersLike = None,
56
process_request: Callable = None,
57
select_subprotocol: Callable = None,
58
ping_interval: float = 20,
59
ping_timeout: float = 20,
60
close_timeout: float = 10,
61
max_size: int = 2**20,
62
max_queue: int = 2**5,
63
read_limit: int = 2**16,
64
write_limit: int = 2**16,
65
extensions: List[ServerExtensionFactory] = None,
66
**kwargs
67
) -> Server:
68
"""
69
Start an asyncio WebSocket server with routing on Unix domain socket.
70
71
Parameters:
72
- router: Router instance with registered routes
73
- path: Unix domain socket path
74
- Other parameters same as unix_route()
75
76
Returns:
77
Server: WebSocket server with routing on Unix socket
78
"""
79
```
80
81
### Synchronous Routing Functions
82
83
Create synchronous routing WebSocket servers for traditional Python applications.
84
85
```python { .api }
86
def route(
87
router: Router,
88
host: str,
89
port: int,
90
*,
91
logger: LoggerLike = None,
92
compression: str = "deflate",
93
subprotocols: List[Subprotocol] = None,
94
extra_headers: HeadersLike = None,
95
process_request: Callable = None,
96
select_subprotocol: Callable = None,
97
ping_interval: float = 20,
98
ping_timeout: float = 20,
99
close_timeout: float = 10,
100
max_size: int = 2**20,
101
max_queue: int = 2**5,
102
read_limit: int = 2**16,
103
write_limit: int = 2**16,
104
extensions: List[ServerExtensionFactory] = None,
105
**kwargs
106
) -> Server:
107
"""
108
Start a synchronous WebSocket server with routing.
109
110
Parameters:
111
- router: Router instance with registered routes
112
- host: Server host address
113
- port: Server port number
114
- Other parameters same as websockets.sync.serve()
115
116
Returns:
117
Server: Synchronous WebSocket server with routing capabilities
118
"""
119
120
def unix_route(
121
router: Router,
122
path: str,
123
*,
124
logger: LoggerLike = None,
125
compression: str = "deflate",
126
subprotocols: List[Subprotocol] = None,
127
extra_headers: HeadersLike = None,
128
process_request: Callable = None,
129
select_subprotocol: Callable = None,
130
ping_interval: float = 20,
131
ping_timeout: float = 20,
132
close_timeout: float = 10,
133
max_size: int = 2**20,
134
max_queue: int = 2**5,
135
read_limit: int = 2**16,
136
write_limit: int = 2**16,
137
extensions: List[ServerExtensionFactory] = None,
138
**kwargs
139
) -> Server:
140
"""
141
Start a synchronous WebSocket server with routing on Unix domain socket.
142
143
Parameters:
144
- router: Router instance with registered routes
145
- path: Unix domain socket path
146
- Other parameters same as sync route()
147
148
Returns:
149
Server: Synchronous WebSocket server with routing on Unix socket
150
"""
151
```
152
153
### Router Class
154
155
The Router class manages URL patterns and dispatches WebSocket connections to appropriate handlers.
156
157
```python { .api }
158
class Router:
159
"""
160
WebSocket request router with werkzeug integration.
161
162
Supports URL pattern matching, parameter extraction, and
163
handler dispatching based on request paths.
164
"""
165
166
def __init__(self):
167
"""Initialize empty router."""
168
169
def route(
170
self,
171
path: str,
172
*,
173
methods: List[str] = None,
174
host: str = None,
175
subdomain: str = None,
176
strict_slashes: bool = None,
177
merge_slashes: bool = None,
178
websocket: bool = True
179
) -> Callable:
180
"""
181
Decorator to register a route handler.
182
183
Parameters:
184
- path: URL pattern (supports werkzeug URL rules)
185
- methods: HTTP methods (for initial handshake)
186
- host: Host matching pattern
187
- subdomain: Subdomain matching pattern
188
- strict_slashes: Strict slash handling
189
- merge_slashes: Merge consecutive slashes
190
- websocket: Enable WebSocket upgrade (default True)
191
192
Returns:
193
Callable: Decorator function
194
195
Usage:
196
@router.route("/chat/<room_id>")
197
async def chat_handler(websocket, room_id):
198
# Handler implementation
199
pass
200
"""
201
202
def add_route(
203
self,
204
handler: Callable,
205
path: str,
206
*,
207
methods: List[str] = None,
208
host: str = None,
209
subdomain: str = None,
210
strict_slashes: bool = None,
211
merge_slashes: bool = None,
212
websocket: bool = True
213
) -> None:
214
"""
215
Register a route handler programmatically.
216
217
Parameters:
218
- handler: WebSocket handler function
219
- path: URL pattern
220
- Other parameters same as route() decorator
221
222
Raises:
223
- ValueError: If handler or path is invalid
224
"""
225
226
def match(self, path: str, method: str = "GET") -> Tuple[Callable, Dict[str, Any]]:
227
"""
228
Match a request path to a handler.
229
230
Parameters:
231
- path: Request path to match
232
- method: HTTP method
233
234
Returns:
235
Tuple[Callable, Dict]: Handler function and extracted parameters
236
237
Raises:
238
- NotFound: If no route matches the path
239
"""
240
241
@property
242
def routes(self) -> List[str]:
243
"""Get list of registered route patterns."""
244
```
245
246
## Usage Examples
247
248
### Basic Asyncio Routing
249
250
```python
251
import asyncio
252
from websockets.asyncio import route, Router
253
254
# Create router
255
router = Router()
256
257
@router.route("/")
258
async def root_handler(websocket):
259
"""Handle root path connections."""
260
await websocket.send("Welcome to WebSocket server!")
261
262
async for message in websocket:
263
await websocket.send(f"Root echo: {message}")
264
265
@router.route("/chat")
266
async def chat_handler(websocket):
267
"""Handle chat connections."""
268
await websocket.send("Joined general chat")
269
270
async for message in websocket:
271
# Broadcast to all chat clients (simplified)
272
await websocket.send(f"Chat: {message}")
273
274
@router.route("/echo")
275
async def echo_handler(websocket):
276
"""Handle echo connections."""
277
async for message in websocket:
278
await websocket.send(f"Echo: {message}")
279
280
async def main():
281
"""Start routing server."""
282
async with route(router, "localhost", 8765):
283
print("Routing WebSocket server started on ws://localhost:8765")
284
print("Routes: /", "/chat", "/echo")
285
await asyncio.Future() # Run forever
286
287
asyncio.run(main())
288
```
289
290
### Routing with Parameters
291
292
```python
293
import asyncio
294
from websockets.asyncio import route, Router
295
import json
296
297
router = Router()
298
299
# Global room storage (simplified)
300
rooms = {}
301
302
@router.route("/room/<room_id>")
303
async def room_handler(websocket, room_id):
304
"""Handle room-specific connections."""
305
# Initialize room if doesn't exist
306
if room_id not in rooms:
307
rooms[room_id] = set()
308
309
# Add client to room
310
rooms[room_id].add(websocket)
311
312
try:
313
await websocket.send(json.dumps({
314
"type": "joined",
315
"room": room_id,
316
"message": f"Joined room {room_id}"
317
}))
318
319
async for message in websocket:
320
try:
321
data = json.loads(message)
322
323
# Broadcast to all clients in room
324
broadcast_message = json.dumps({
325
"type": "message",
326
"room": room_id,
327
"user": data.get("user", "Anonymous"),
328
"message": data.get("message", "")
329
})
330
331
# Send to all clients in room
332
for client in rooms[room_id].copy():
333
try:
334
await client.send(broadcast_message)
335
except:
336
rooms[room_id].discard(client)
337
338
except json.JSONDecodeError:
339
await websocket.send(json.dumps({
340
"type": "error",
341
"message": "Invalid JSON"
342
}))
343
344
except Exception as e:
345
print(f"Room handler error: {e}")
346
finally:
347
# Remove client from room
348
if room_id in rooms:
349
rooms[room_id].discard(websocket)
350
if not rooms[room_id]: # Remove empty room
351
del rooms[room_id]
352
353
@router.route("/user/<user_id>/notifications")
354
async def notification_handler(websocket, user_id):
355
"""Handle user-specific notification connections."""
356
await websocket.send(json.dumps({
357
"type": "connected",
358
"user_id": user_id,
359
"message": f"Notification channel for user {user_id}"
360
}))
361
362
# In real application, you'd subscribe to user-specific events
363
# For demo, just echo messages with user context
364
async for message in websocket:
365
await websocket.send(json.dumps({
366
"type": "notification",
367
"user_id": user_id,
368
"data": message
369
}))
370
371
@router.route("/api/v<int:version>/ws")
372
async def versioned_api_handler(websocket, version):
373
"""Handle versioned API connections."""
374
await websocket.send(json.dumps({
375
"type": "api_connected",
376
"version": version,
377
"message": f"Connected to API v{version}"
378
}))
379
380
async for message in websocket:
381
try:
382
data = json.loads(message)
383
command = data.get("command")
384
385
if command == "status":
386
response = {
387
"type": "status",
388
"version": version,
389
"status": "healthy"
390
}
391
elif command == "info":
392
response = {
393
"type": "info",
394
"version": version,
395
"features": ["chat", "notifications", "api"]
396
}
397
else:
398
response = {
399
"type": "error",
400
"message": f"Unknown command: {command}"
401
}
402
403
await websocket.send(json.dumps(response))
404
405
except json.JSONDecodeError:
406
await websocket.send(json.dumps({
407
"type": "error",
408
"message": "Invalid JSON"
409
}))
410
411
async def main():
412
async with route(router, "localhost", 8765):
413
print("Advanced routing server started on ws://localhost:8765")
414
print("Routes:")
415
print(" /room/<room_id>")
416
print(" /user/<user_id>/notifications")
417
print(" /api/v<version>/ws")
418
await asyncio.Future()
419
420
asyncio.run(main())
421
```
422
423
### Synchronous Routing
424
425
```python
426
from websockets.sync import route, Router
427
import json
428
import threading
429
430
router = Router()
431
432
# Thread-safe storage
433
rooms = {}
434
rooms_lock = threading.Lock()
435
436
@router.route("/")
437
def root_handler(websocket):
438
"""Synchronous root handler."""
439
websocket.send("Welcome to sync WebSocket server!")
440
441
for message in websocket:
442
websocket.send(f"Sync echo: {message}")
443
444
@router.route("/room/<room_id>")
445
def room_handler(websocket, room_id):
446
"""Synchronous room handler."""
447
# Thread-safe room management
448
with rooms_lock:
449
if room_id not in rooms:
450
rooms[room_id] = set()
451
rooms[room_id].add(websocket)
452
453
try:
454
websocket.send(json.dumps({
455
"type": "joined",
456
"room": room_id
457
}))
458
459
for message in websocket:
460
try:
461
data = json.loads(message)
462
463
# Broadcast to room (thread-safe)
464
with rooms_lock:
465
room_clients = rooms[room_id].copy()
466
467
broadcast_message = json.dumps({
468
"type": "message",
469
"room": room_id,
470
"user": data.get("user", "Anonymous"),
471
"message": data.get("message", "")
472
})
473
474
for client in room_clients:
475
try:
476
client.send(broadcast_message)
477
except:
478
with rooms_lock:
479
rooms[room_id].discard(client)
480
481
except json.JSONDecodeError:
482
websocket.send(json.dumps({
483
"type": "error",
484
"message": "Invalid JSON"
485
}))
486
finally:
487
# Clean up
488
with rooms_lock:
489
if room_id in rooms:
490
rooms[room_id].discard(websocket)
491
if not rooms[room_id]:
492
del rooms[room_id]
493
494
def main():
495
"""Start synchronous routing server."""
496
with route(router, "localhost", 8765) as server:
497
print("Sync routing server started on ws://localhost:8765")
498
print("Routes: /, /room/<room_id>")
499
server.serve_forever()
500
501
if __name__ == "__main__":
502
main()
503
```
504
505
### Advanced Routing with Middleware
506
507
```python
508
import asyncio
509
from websockets.asyncio import route, Router
510
from websockets import Request, Response
511
import time
512
import logging
513
514
# Set up logging
515
logging.basicConfig(level=logging.INFO)
516
logger = logging.getLogger(__name__)
517
518
router = Router()
519
520
def rate_limit_middleware(max_requests=10, window_seconds=60):
521
"""Rate limiting middleware."""
522
clients = {}
523
524
def middleware(connection, request: Request):
525
client_ip = connection.remote_address[0]
526
current_time = time.time()
527
528
# Clean old entries
529
if client_ip in clients:
530
clients[client_ip] = [
531
req_time for req_time in clients[client_ip]
532
if current_time - req_time < window_seconds
533
]
534
else:
535
clients[client_ip] = []
536
537
# Check rate limit
538
if len(clients[client_ip]) >= max_requests:
539
logger.warning(f"Rate limit exceeded for {client_ip}")
540
return Response(429, "Too Many Requests", b"Rate limit exceeded")
541
542
# Add current request
543
clients[client_ip].append(current_time)
544
return None # Allow request
545
546
return middleware
547
548
def auth_middleware(api_keys):
549
"""API key authentication middleware."""
550
def middleware(connection, request: Request):
551
api_key = request.headers.get("X-API-Key")
552
if not api_key or api_key not in api_keys:
553
logger.warning(f"Invalid API key from {connection.remote_address}")
554
return Response(401, "Unauthorized", b"Invalid API key")
555
return None
556
return middleware
557
558
# Apply middleware
559
rate_limiter = rate_limit_middleware(max_requests=5, window_seconds=30)
560
auth_checker = auth_middleware({"valid-key-1", "valid-key-2"})
561
562
@router.route("/public")
563
async def public_handler(websocket):
564
"""Public endpoint (no auth required)."""
565
await websocket.send("Public endpoint - no auth required")
566
567
async for message in websocket:
568
await websocket.send(f"Public: {message}")
569
570
@router.route("/protected")
571
async def protected_handler(websocket):
572
"""Protected endpoint (auth required)."""
573
await websocket.send("Protected endpoint - authenticated")
574
575
async for message in websocket:
576
await websocket.send(f"Protected: {message}")
577
578
async def process_request_with_middleware(connection, request: Request):
579
"""Process request through middleware chain."""
580
# Apply rate limiting to all routes
581
response = rate_limiter(connection, request)
582
if response:
583
return response
584
585
# Apply authentication to protected routes
586
if request.path.startswith("/protected"):
587
response = auth_checker(connection, request)
588
if response:
589
return response
590
591
# Log successful requests
592
logger.info(f"Request: {request.method} {request.path} from {connection.remote_address}")
593
return None
594
595
async def main():
596
"""Start server with middleware."""
597
async with route(
598
router,
599
"localhost",
600
8765,
601
process_request=process_request_with_middleware,
602
extra_headers={"Server": "WebSocket-Router/1.0"}
603
):
604
print("Middleware routing server started on ws://localhost:8765")
605
print("Routes:")
606
print(" /public (no auth)")
607
print(" /protected (requires X-API-Key header)")
608
print("Rate limit: 5 requests per 30 seconds")
609
await asyncio.Future()
610
611
asyncio.run(main())
612
```
613
614
### Route Testing
615
616
```python
617
from websockets.asyncio import Router
618
from websockets.exceptions import NotFound
619
620
def test_router():
621
"""Test router functionality."""
622
router = Router()
623
624
# Register test routes
625
@router.route("/")
626
def root():
627
return "root"
628
629
@router.route("/user/<user_id>")
630
def user_profile(user_id):
631
return f"user_{user_id}"
632
633
@router.route("/api/v<int:version>/data")
634
def api_data(version):
635
return f"api_v{version}"
636
637
# Test route matching
638
test_cases = [
639
("/", "root"),
640
("/user/123", "user_123"),
641
("/api/v1/data", "api_v1"),
642
("/api/v2/data", "api_v2"),
643
]
644
645
for path, expected in test_cases:
646
try:
647
handler, params = router.match(path)
648
result = handler(**params) if params else handler()
649
print(f"✓ {path} -> {result} (expected {expected})")
650
assert result == expected
651
except Exception as e:
652
print(f"✗ {path} -> Error: {e}")
653
654
# Test non-matching routes
655
try:
656
router.match("/nonexistent")
657
print("✗ Should have raised NotFound")
658
except NotFound:
659
print("✓ Non-existent route correctly raises NotFound")
660
661
print(f"Registered routes: {router.routes}")
662
663
# Uncomment to run tests
664
# test_router()
665
```