or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asyncio-client.md asyncio-server.md data-structures.md exceptions.md extensions.md index.md protocol.md routing.md sync-client.md sync-server.md

routing.md — docs/

0

# Routing

1

2

WebSocket server routing built on Werkzeug: URL pattern matching, parameter extraction, and dispatch of each connection to a handler chosen by its request path. Both asyncio and synchronous implementations are supported.

3

4

## Capabilities

5

6

### Asyncio Routing Functions

7

8

Create routing WebSocket servers that dispatch connections to different handlers based on URL patterns.

9

10

```python { .api }

11

async def route(

12

router: Router,

13

host: str,

14

port: int,

15

*,

16

logger: LoggerLike = None,

17

compression: str = "deflate",

18

subprotocols: List[Subprotocol] = None,

19

extra_headers: HeadersLike = None,

20

process_request: Callable = None,

21

select_subprotocol: Callable = None,

22

ping_interval: float = 20,

23

ping_timeout: float = 20,

24

close_timeout: float = 10,

25

max_size: int = 2**20,

26

max_queue: int = 2**5,

27

read_limit: int = 2**16,

28

write_limit: int = 2**16,

29

extensions: List[ServerExtensionFactory] = None,

30

**kwargs

31

) -> Server:

32

"""

33

Start an asyncio WebSocket server with routing.

34

35

Parameters:

36

- router: Router instance with registered routes

37

- host: Server host address

38

- port: Server port number

39

- Other parameters same as websockets.serve()

40

41

Returns:

42

Server: WebSocket server with routing capabilities

43

44

Raises:

45

- OSError: If server cannot bind to address/port

46

"""

47

48

async def unix_route(

49

router: Router,

50

path: str,

51

*,

52

logger: LoggerLike = None,

53

compression: str = "deflate",

54

subprotocols: List[Subprotocol] = None,

55

extra_headers: HeadersLike = None,

56

process_request: Callable = None,

57

select_subprotocol: Callable = None,

58

ping_interval: float = 20,

59

ping_timeout: float = 20,

60

close_timeout: float = 10,

61

max_size: int = 2**20,

62

max_queue: int = 2**5,

63

read_limit: int = 2**16,

64

write_limit: int = 2**16,

65

extensions: List[ServerExtensionFactory] = None,

66

**kwargs

67

) -> Server:

68

"""

69

Start an asyncio WebSocket server with routing on Unix domain socket.

70

71

Parameters:

72

- router: Router instance with registered routes

73

- path: Unix domain socket path

74

- Other parameters same as route()

75

76

Returns:

77

Server: WebSocket server with routing on Unix socket

78

"""

79

```

80

81

### Synchronous Routing Functions

82

83

Create synchronous routing WebSocket servers for traditional Python applications.

84

85

```python { .api }

86

def route(

87

router: Router,

88

host: str,

89

port: int,

90

*,

91

logger: LoggerLike = None,

92

compression: str = "deflate",

93

subprotocols: List[Subprotocol] = None,

94

extra_headers: HeadersLike = None,

95

process_request: Callable = None,

96

select_subprotocol: Callable = None,

97

ping_interval: float = 20,

98

ping_timeout: float = 20,

99

close_timeout: float = 10,

100

max_size: int = 2**20,

101

max_queue: int = 2**5,

102

read_limit: int = 2**16,

103

write_limit: int = 2**16,

104

extensions: List[ServerExtensionFactory] = None,

105

**kwargs

106

) -> Server:

107

"""

108

Start a synchronous WebSocket server with routing.

109

110

Parameters:

111

- router: Router instance with registered routes

112

- host: Server host address

113

- port: Server port number

114

- Other parameters same as websockets.sync.serve()

115

116

Returns:

117

Server: Synchronous WebSocket server with routing capabilities

118

"""

119

120

def unix_route(

121

router: Router,

122

path: str,

123

*,

124

logger: LoggerLike = None,

125

compression: str = "deflate",

126

subprotocols: List[Subprotocol] = None,

127

extra_headers: HeadersLike = None,

128

process_request: Callable = None,

129

select_subprotocol: Callable = None,

130

ping_interval: float = 20,

131

ping_timeout: float = 20,

132

close_timeout: float = 10,

133

max_size: int = 2**20,

134

max_queue: int = 2**5,

135

read_limit: int = 2**16,

136

write_limit: int = 2**16,

137

extensions: List[ServerExtensionFactory] = None,

138

**kwargs

139

) -> Server:

140

"""

141

Start a synchronous WebSocket server with routing on Unix domain socket.

142

143

Parameters:

144

- router: Router instance with registered routes

145

- path: Unix domain socket path

146

- Other parameters same as sync route()

147

148

Returns:

149

Server: Synchronous WebSocket server with routing on Unix socket

150

"""

151

```

152

153

### Router Class

154

155

The Router class manages URL patterns and dispatches WebSocket connections to appropriate handlers.

156

157

```python { .api }

158

class Router:

159

"""

160

WebSocket request router with werkzeug integration.

161

162

Supports URL pattern matching, parameter extraction, and

163

handler dispatching based on request paths.

164

"""

165

166

def __init__(self):

167

"""Initialize empty router."""

168

169

def route(

170

self,

171

path: str,

172

*,

173

methods: List[str] = None,

174

host: str = None,

175

subdomain: str = None,

176

strict_slashes: bool = None,

177

merge_slashes: bool = None,

178

websocket: bool = True

179

) -> Callable:

180

"""

181

Decorator to register a route handler.

182

183

Parameters:

184

- path: URL pattern (supports werkzeug URL rules)

185

- methods: HTTP methods (for initial handshake)

186

- host: Host matching pattern

187

- subdomain: Subdomain matching pattern

188

- strict_slashes: Strict slash handling

189

- merge_slashes: Merge consecutive slashes

190

- websocket: Enable WebSocket upgrade (default True)

191

192

Returns:

193

Callable: Decorator function

194

195

Usage:

196

@router.route("/chat/<room_id>")

197

async def chat_handler(websocket, room_id):

198

# Handler implementation

199

pass

200

"""

201

202

def add_route(

203

self,

204

handler: Callable,

205

path: str,

206

*,

207

methods: List[str] = None,

208

host: str = None,

209

subdomain: str = None,

210

strict_slashes: bool = None,

211

merge_slashes: bool = None,

212

websocket: bool = True

213

) -> None:

214

"""

215

Register a route handler programmatically.

216

217

Parameters:

218

- handler: WebSocket handler function

219

- path: URL pattern

220

- Other parameters same as route() decorator

221

222

Raises:

223

- ValueError: If handler or path is invalid

224

"""

225

226

def match(self, path: str, method: str = "GET") -> Tuple[Callable, Dict[str, Any]]:

227

"""

228

Match a request path to a handler.

229

230

Parameters:

231

- path: Request path to match

232

- method: HTTP method

233

234

Returns:

235

Tuple[Callable, Dict]: Handler function and extracted parameters

236

237

Raises:

238

- NotFound: If no route matches the path

239

"""

240

241

@property

242

def routes(self) -> List[str]:

243

"""Get list of registered route patterns."""

244

```

245

246

## Usage Examples

247

248

### Basic Asyncio Routing

249

250

```python

251

import asyncio

252

from websockets.asyncio import route, Router

253

254

# Create router

255

router = Router()

256

257

@router.route("/")

258

async def root_handler(websocket):

259

"""Handle root path connections."""

260

await websocket.send("Welcome to WebSocket server!")

261

262

async for message in websocket:

263

await websocket.send(f"Root echo: {message}")

264

265

@router.route("/chat")

266

async def chat_handler(websocket):

267

"""Handle chat connections."""

268

await websocket.send("Joined general chat")

269

270

async for message in websocket:

271

# Simplified: echoes back to the sender only; a real chat would broadcast to every connected client

272

await websocket.send(f"Chat: {message}")

273

274

@router.route("/echo")

275

async def echo_handler(websocket):

276

"""Handle echo connections."""

277

async for message in websocket:

278

await websocket.send(f"Echo: {message}")

279

280

async def main():

281

"""Start routing server."""

282

async with route(router, "localhost", 8765):

283

print("Routing WebSocket server started on ws://localhost:8765")

284

print("Routes: /", "/chat", "/echo")

285

await asyncio.Future() # Run forever

286

287

asyncio.run(main())

288

```

289

290

### Routing with Parameters

291

292

```python

293

import asyncio

294

from websockets.asyncio import route, Router

295

import json

296

297

router = Router()

298

299

# Global room storage (simplified)

300

rooms = {}

301

302

@router.route("/room/<room_id>")

303

async def room_handler(websocket, room_id):

304

"""Handle room-specific connections."""

305

# Initialize room if it doesn't exist

306

if room_id not in rooms:

307

rooms[room_id] = set()

308

309

# Add client to room

310

rooms[room_id].add(websocket)

311

312

try:

313

await websocket.send(json.dumps({

314

"type": "joined",

315

"room": room_id,

316

"message": f"Joined room {room_id}"

317

}))

318

319

async for message in websocket:

320

try:

321

data = json.loads(message)

322

323

# Broadcast to all clients in room

324

broadcast_message = json.dumps({

325

"type": "message",

326

"room": room_id,

327

"user": data.get("user", "Anonymous"),

328

"message": data.get("message", "")

329

})

330

331

# Send to all clients in room

332

for client in rooms[room_id].copy():

333

try:

334

await client.send(broadcast_message)

335

except:

336

rooms[room_id].discard(client)

337

338

except json.JSONDecodeError:

339

await websocket.send(json.dumps({

340

"type": "error",

341

"message": "Invalid JSON"

342

}))

343

344

except Exception as e:

345

print(f"Room handler error: {e}")

346

finally:

347

# Remove client from room

348

if room_id in rooms:

349

rooms[room_id].discard(websocket)

350

if not rooms[room_id]: # Remove empty room

351

del rooms[room_id]

352

353

@router.route("/user/<user_id>/notifications")

354

async def notification_handler(websocket, user_id):

355

"""Handle user-specific notification connections."""

356

await websocket.send(json.dumps({

357

"type": "connected",

358

"user_id": user_id,

359

"message": f"Notification channel for user {user_id}"

360

}))

361

362

# In real application, you'd subscribe to user-specific events

363

# For demo, just echo messages with user context

364

async for message in websocket:

365

await websocket.send(json.dumps({

366

"type": "notification",

367

"user_id": user_id,

368

"data": message

369

}))

370

371

@router.route("/api/v<int:version>/ws")

372

async def versioned_api_handler(websocket, version):

373

"""Handle versioned API connections."""

374

await websocket.send(json.dumps({

375

"type": "api_connected",

376

"version": version,

377

"message": f"Connected to API v{version}"

378

}))

379

380

async for message in websocket:

381

try:

382

data = json.loads(message)

383

command = data.get("command")

384

385

if command == "status":

386

response = {

387

"type": "status",

388

"version": version,

389

"status": "healthy"

390

}

391

elif command == "info":

392

response = {

393

"type": "info",

394

"version": version,

395

"features": ["chat", "notifications", "api"]

396

}

397

else:

398

response = {

399

"type": "error",

400

"message": f"Unknown command: {command}"

401

}

402

403

await websocket.send(json.dumps(response))

404

405

except json.JSONDecodeError:

406

await websocket.send(json.dumps({

407

"type": "error",

408

"message": "Invalid JSON"

409

}))

410

411

async def main():

412

async with route(router, "localhost", 8765):

413

print("Advanced routing server started on ws://localhost:8765")

414

print("Routes:")

415

print(" /room/<room_id>")

416

print(" /user/<user_id>/notifications")

417

print(" /api/v<version>/ws")

418

await asyncio.Future()

419

420

asyncio.run(main())

421

```

422

423

### Synchronous Routing

424

425

```python

426

from websockets.sync import route, Router

427

import json

428

import threading

429

430

router = Router()

431

432

# Thread-safe storage

433

rooms = {}

434

rooms_lock = threading.Lock()

435

436

@router.route("/")

437

def root_handler(websocket):

438

"""Synchronous root handler."""

439

websocket.send("Welcome to sync WebSocket server!")

440

441

for message in websocket:

442

websocket.send(f"Sync echo: {message}")

443

444

@router.route("/room/<room_id>")

445

def room_handler(websocket, room_id):

446

"""Synchronous room handler."""

447

# Thread-safe room management

448

with rooms_lock:

449

if room_id not in rooms:

450

rooms[room_id] = set()

451

rooms[room_id].add(websocket)

452

453

try:

454

websocket.send(json.dumps({

455

"type": "joined",

456

"room": room_id

457

}))

458

459

for message in websocket:

460

try:

461

data = json.loads(message)

462

463

# Broadcast to room (thread-safe)

464

with rooms_lock:

465

room_clients = rooms[room_id].copy()

466

467

broadcast_message = json.dumps({

468

"type": "message",

469

"room": room_id,

470

"user": data.get("user", "Anonymous"),

471

"message": data.get("message", "")

472

})

473

474

for client in room_clients:

475

try:

476

client.send(broadcast_message)

477

except:

478

with rooms_lock:

479

rooms[room_id].discard(client)

480

481

except json.JSONDecodeError:

482

websocket.send(json.dumps({

483

"type": "error",

484

"message": "Invalid JSON"

485

}))

486

finally:

487

# Clean up

488

with rooms_lock:

489

if room_id in rooms:

490

rooms[room_id].discard(websocket)

491

if not rooms[room_id]:

492

del rooms[room_id]

493

494

def main():

495

"""Start synchronous routing server."""

496

with route(router, "localhost", 8765) as server:

497

print("Sync routing server started on ws://localhost:8765")

498

print("Routes: /, /room/<room_id>")

499

server.serve_forever()

500

501

if __name__ == "__main__":

502

main()

503

```

504

505

### Advanced Routing with Middleware

506

507

```python

508

import asyncio

509

from websockets.asyncio import route, Router

510

from websockets import Request, Response

511

import time

512

import logging

513

514

# Set up logging

515

logging.basicConfig(level=logging.INFO)

516

logger = logging.getLogger(__name__)

517

518

router = Router()

519

520

def rate_limit_middleware(max_requests=10, window_seconds=60):

521

"""Rate limiting middleware."""

522

clients = {}

523

524

def middleware(connection, request: Request):

525

client_ip = connection.remote_address[0]

526

current_time = time.time()

527

528

# Clean old entries

529

if client_ip in clients:

530

clients[client_ip] = [

531

req_time for req_time in clients[client_ip]

532

if current_time - req_time < window_seconds

533

]

534

else:

535

clients[client_ip] = []

536

537

# Check rate limit

538

if len(clients[client_ip]) >= max_requests:

539

logger.warning(f"Rate limit exceeded for {client_ip}")

540

return Response(429, "Too Many Requests", b"Rate limit exceeded")

541

542

# Add current request

543

clients[client_ip].append(current_time)

544

return None # Allow request

545

546

return middleware

547

548

def auth_middleware(api_keys):

549

"""API key authentication middleware."""

550

def middleware(connection, request: Request):

551

api_key = request.headers.get("X-API-Key")

552

if not api_key or api_key not in api_keys:

553

logger.warning(f"Invalid API key from {connection.remote_address}")

554

return Response(401, "Unauthorized", b"Invalid API key")

555

return None

556

return middleware

557

558

# Apply middleware

559

rate_limiter = rate_limit_middleware(max_requests=5, window_seconds=30)

560

auth_checker = auth_middleware({"valid-key-1", "valid-key-2"})

561

562

@router.route("/public")

563

async def public_handler(websocket):

564

"""Public endpoint (no auth required)."""

565

await websocket.send("Public endpoint - no auth required")

566

567

async for message in websocket:

568

await websocket.send(f"Public: {message}")

569

570

@router.route("/protected")

571

async def protected_handler(websocket):

572

"""Protected endpoint (auth required)."""

573

await websocket.send("Protected endpoint - authenticated")

574

575

async for message in websocket:

576

await websocket.send(f"Protected: {message}")

577

578

async def process_request_with_middleware(connection, request: Request):

579

"""Process request through middleware chain."""

580

# Apply rate limiting to all routes

581

response = rate_limiter(connection, request)

582

if response:

583

return response

584

585

# Apply authentication to protected routes

586

if request.path.startswith("/protected"):

587

response = auth_checker(connection, request)

588

if response:

589

return response

590

591

# Log successful requests

592

logger.info(f"Request: {request.method} {request.path} from {connection.remote_address}")

593

return None

594

595

async def main():

596

"""Start server with middleware."""

597

async with route(

598

router,

599

"localhost",

600

8765,

601

process_request=process_request_with_middleware,

602

extra_headers={"Server": "WebSocket-Router/1.0"}

603

):

604

print("Middleware routing server started on ws://localhost:8765")

605

print("Routes:")

606

print(" /public (no auth)")

607

print(" /protected (requires X-API-Key header)")

608

print("Rate limit: 5 requests per 30 seconds")

609

await asyncio.Future()

610

611

asyncio.run(main())

612

```

613

614

### Route Testing

615

616

```python

617

from websockets.asyncio import Router

618

from websockets.exceptions import NotFound

619

620

def test_router():

621

"""Test router functionality."""

622

router = Router()

623

624

# Register test routes

625

@router.route("/")

626

def root():

627

return "root"

628

629

@router.route("/user/<user_id>")

630

def user_profile(user_id):

631

return f"user_{user_id}"

632

633

@router.route("/api/v<int:version>/data")

634

def api_data(version):

635

return f"api_v{version}"

636

637

# Test route matching

638

test_cases = [

639

("/", "root"),

640

("/user/123", "user_123"),

641

("/api/v1/data", "api_v1"),

642

("/api/v2/data", "api_v2"),

643

]

644

645

for path, expected in test_cases:

646

try:

647

handler, params = router.match(path)

648

result = handler(**params) if params else handler()

649

print(f"✓ {path} -> {result} (expected {expected})")

650

assert result == expected

651

except Exception as e:

652

print(f"✗ {path} -> Error: {e}")

653

654

# Test non-matching routes

655

try:

656

router.match("/nonexistent")

657

print("✗ Should have raised NotFound")

658

except NotFound:

659

print("✓ Non-existent route correctly raises NotFound")

660

661

print(f"Registered routes: {router.routes}")

662

663

# Uncomment to run tests

664

# test_router()

665

```