or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

additional.mdauth.mdclient.mdcore-server.mdindex.mdrequest-response.mdtesting.mdwebsockets.md

websockets.mddocs/

0

# WebSocket Support

1

2

BlackSheep provides comprehensive WebSocket support for real-time, full-duplex communication between clients and servers. The framework handles connection management, message routing, and error handling with a clean, async-first API.

3

4

## WebSocket Overview

5

6

WebSockets enable persistent connections for real-time applications like chat systems, live updates, gaming, and collaborative tools.

7

8

### Basic WebSocket Setup

9

10

```python { .api }

11

from blacksheep import Application, WebSocket, WebSocketState

12

from blacksheep.server.websocket import WebSocketError, WebSocketDisconnectError

13

14

app = Application()

15

16

# Simple WebSocket endpoint

17

@app.ws("/ws")

18

async def websocket_handler(websocket: WebSocket):

19

await websocket.accept()

20

21

try:

22

while True:

23

message = await websocket.receive_text()

24

await websocket.send_text(f"Echo: {message}")

25

except WebSocketDisconnectError:

26

print("Client disconnected")

27

```

28

29

## WebSocket Class

30

31

The `WebSocket` class manages connection lifecycle, message sending/receiving, and state tracking.

32

33

### Connection Management

34

35

```python { .api }

36

from blacksheep.server.websocket import WebSocketState, MessageMode

37

38

@app.ws("/chat")

39

async def chat_handler(websocket: WebSocket):

40

# Accept the WebSocket connection

41

await websocket.accept()

42

print(f"Connection state: {websocket.state}") # WebSocketState.CONNECTED

43

44

# Connection information

45

client_ip = websocket.scope.get("client", ["unknown"])[0]

46

print(f"Client connected from: {client_ip}")

47

48

try:

49

# Connection loop

50

while websocket.state == WebSocketState.CONNECTED:

51

message = await websocket.receive_text()

52

await websocket.send_text(f"Received: {message}")

53

54

except WebSocketDisconnectError as e:

55

print(f"Client disconnected with code: {e.code}")

56

57

finally:

58

# Cleanup when connection ends

59

print("Connection closed")

60

```

61

62

### Message Types

63

64

```python { .api }

65

@app.ws("/messages")

66

async def message_handler(websocket: WebSocket):

67

await websocket.accept()

68

69

try:

70

while True:

71

# Receive different message types

72

message_type = await websocket.receive()

73

74

if message_type["type"] == "websocket.receive":

75

if "text" in message_type:

76

# Text message

77

text = message_type["text"]

78

await websocket.send_text(f"Got text: {text}")

79

80

elif "bytes" in message_type:

81

# Binary message

82

data = message_type["bytes"]

83

await websocket.send_bytes(data)

84

85

except WebSocketDisconnectError:

86

pass

87

```

88

89

### Sending Messages

90

91

```python { .api }

92

import json

93

from typing import Dict, Any

94

95

@app.ws("/updates")

96

async def update_handler(websocket: WebSocket):

97

await websocket.accept()

98

99

try:

100

# Send text messages

101

await websocket.send_text("Welcome to updates!")

102

103

# Send JSON messages

104

data = {"type": "notification", "message": "Server started"}

105

await websocket.send_json(data)

106

107

# Send binary data

108

binary_data = b"Binary data here"

109

await websocket.send_bytes(binary_data)

110

111

# Custom JSON serializer

112

complex_data = {"timestamp": datetime.now(), "users": [1, 2, 3]}

113

await websocket.send_json(complex_data, dumps=custom_json_dumps)

114

115

# Keep connection alive

116

while True:

117

await websocket.receive_text()

118

119

except WebSocketDisconnectError:

120

pass

121

```

122

123

### Receiving Messages

124

125

```python { .api }

126

@app.ws("/receiver")

127

async def receive_handler(websocket: WebSocket):

128

await websocket.accept()

129

130

try:

131

while True:

132

# Receive text

133

text_message = await websocket.receive_text()

134

print(f"Received text: {text_message}")

135

136

# Receive JSON with automatic parsing

137

json_data = await websocket.receive_json()

138

print(f"Received JSON: {json_data}")

139

140

# Receive binary data

141

binary_data = await websocket.receive_bytes()

142

print(f"Received {len(binary_data)} bytes")

143

144

# Custom JSON deserializer

145

custom_data = await websocket.receive_json(loads=custom_json_loads)

146

147

except WebSocketDisconnectError:

148

pass

149

```

150

151

## WebSocket Routing

152

153

WebSocket endpoints support the same routing features as HTTP endpoints, including parameter extraction.

154

155

### Route Parameters

156

157

```python { .api }

158

# WebSocket with route parameters

159

@app.ws("/chat/{room_id}")

160

async def chat_room(websocket: WebSocket, room_id: str):

161

await websocket.accept()

162

163

# Access route parameters

164

room = websocket.route_values.get("room_id", "default")

165

print(f"Joined room: {room}")

166

167

try:

168

await websocket.send_text(f"Welcome to room {room}!")

169

170

while True:

171

message = await websocket.receive_text()

172

# Broadcast to room (implementation depends on your architecture)

173

await broadcast_to_room(room, f"{message}")

174

175

except WebSocketDisconnectError:

176

print(f"User left room {room}")

177

178

# Typed route parameters

179

@app.ws("/user/{user_id:int}/notifications")

180

async def user_notifications(websocket: WebSocket, user_id: int):

181

await websocket.accept()

182

183

# user_id is automatically converted to int

184

print(f"Notifications for user {user_id}")

185

186

try:

187

# Send user-specific notifications

188

notifications = await get_user_notifications(user_id)

189

for notification in notifications:

190

await websocket.send_json(notification)

191

192

# Listen for real-time updates

193

while True:

194

await websocket.receive_text() # Keep alive

195

196

except WebSocketDisconnectError:

197

pass

198

```

199

200

## WebSocket State Management

201

202

Track connection states and handle state transitions properly.

203

204

### Connection States

205

206

```python { .api }

207

from blacksheep.server.websocket import WebSocketState

208

209

@app.ws("/stateful")

210

async def stateful_handler(websocket: WebSocket):

211

print(f"Initial state: {websocket.state}") # WebSocketState.CONNECTING

212

213

await websocket.accept()

214

print(f"After accept: {websocket.state}") # WebSocketState.CONNECTED

215

216

try:

217

while websocket.state == WebSocketState.CONNECTED:

218

message = await websocket.receive_text()

219

220

# Check state before sending

221

if websocket.state == WebSocketState.CONNECTED:

222

await websocket.send_text(f"Echo: {message}")

223

else:

224

break

225

226

except WebSocketDisconnectError:

227

pass

228

229

print(f"Final state: {websocket.state}") # WebSocketState.DISCONNECTED

230

```

231

232

### State Validation

233

234

```python { .api }

235

from blacksheep.server.websocket import InvalidWebSocketStateError

236

237

@app.ws("/validated")

238

async def validated_handler(websocket: WebSocket):

239

try:

240

await websocket.accept()

241

242

while True:

243

message = await websocket.receive_text()

244

245

# State validation before operations

246

if websocket.state != WebSocketState.CONNECTED:

247

raise InvalidWebSocketStateError(

248

party="server",

249

current_state=websocket.state,

250

expected_state=WebSocketState.CONNECTED

251

)

252

253

await websocket.send_text(f"Processed: {message}")

254

255

except InvalidWebSocketStateError as e:

256

print(f"Invalid state: {e.current_state}, expected: {e.expected_state}")

257

258

except WebSocketDisconnectError:

259

pass

260

```

261

262

## Error Handling

263

264

Comprehensive error handling for WebSocket connections and operations.

265

266

### WebSocket Exceptions

267

268

```python { .api }

269

from blacksheep.server.websocket import (

270

WebSocketError,

271

WebSocketDisconnectError,

272

InvalidWebSocketStateError

273

)

274

275

@app.ws("/error-handling")

276

async def error_handler(websocket: WebSocket):

277

try:

278

await websocket.accept()

279

280

while True:

281

try:

282

message = await websocket.receive_text()

283

284

# Process message (might raise application errors)

285

result = await process_message(message)

286

await websocket.send_json({"result": result})

287

288

except ValueError as e:

289

# Application error - send error message but keep connection

290

await websocket.send_json({

291

"error": "validation_error",

292

"message": str(e)

293

})

294

295

except Exception as e:

296

# Unexpected error - send error and close connection

297

await websocket.send_json({

298

"error": "internal_error",

299

"message": "An unexpected error occurred"

300

})

301

await websocket.close(code=1011) # Internal server error

302

break

303

304

except WebSocketDisconnectError as e:

305

print(f"Client disconnected: code={e.code}")

306

307

except InvalidWebSocketStateError as e:

308

print(f"Invalid WebSocket state: {e}")

309

310

except WebSocketError as e:

311

print(f"WebSocket error: {e}")

312

313

except Exception as e:

314

print(f"Unexpected error: {e}")

315

# Ensure connection is closed

316

if websocket.state == WebSocketState.CONNECTED:

317

await websocket.close(code=1011)

318

```

319

320

### Connection Close Codes

321

322

```python { .api }

323

@app.ws("/close-codes")

324

async def close_codes_handler(websocket: WebSocket):

325

await websocket.accept()

326

327

try:

328

message = await websocket.receive_text()

329

330

if message == "invalid":

331

# Close with specific code

332

await websocket.close(code=1003) # Unsupported data type

333

334

elif message == "policy":

335

await websocket.close(code=1008) # Policy violation

336

337

elif message == "size":

338

await websocket.close(code=1009) # Message too large

339

340

else:

341

await websocket.send_text("Message accepted")

342

343

except WebSocketDisconnectError as e:

344

# Standard close codes:

345

# 1000 = Normal closure

346

# 1001 = Going away

347

# 1002 = Protocol error

348

# 1003 = Unsupported data

349

# 1007 = Invalid data

350

# 1008 = Policy violation

351

# 1009 = Message too large

352

# 1011 = Internal server error

353

print(f"Connection closed with code: {e.code}")

354

```

355

356

## Real-time Applications

357

358

Examples of common real-time application patterns.

359

360

### Chat Application

361

362

```python { .api }

363

import asyncio

364

from typing import Dict, Set

365

from dataclasses import dataclass

366

367

@dataclass

368

class ChatRoom:

369

room_id: str

370

connections: Set[WebSocket]

371

message_history: list

372

373

# Global room registry

374

chat_rooms: Dict[str, ChatRoom] = {}

375

376

async def get_or_create_room(room_id: str) -> ChatRoom:

377

if room_id not in chat_rooms:

378

chat_rooms[room_id] = ChatRoom(room_id, set(), [])

379

return chat_rooms[room_id]

380

381

async def broadcast_to_room(room: ChatRoom, message: dict, sender: WebSocket = None):

382

"""Broadcast message to all connections in room except sender"""

383

disconnected = set()

384

385

for connection in room.connections:

386

if connection != sender:

387

try:

388

await connection.send_json(message)

389

except WebSocketDisconnectError:

390

disconnected.add(connection)

391

392

# Remove disconnected clients

393

room.connections -= disconnected

394

395

@app.ws("/chat/{room_id}")

396

async def chat_room(websocket: WebSocket, room_id: str):

397

room = await get_or_create_room(room_id)

398

room.connections.add(websocket)

399

400

await websocket.accept()

401

402

# Send chat history

403

for message in room.message_history[-50:]: # Last 50 messages

404

await websocket.send_json(message)

405

406

# Announce user joined

407

join_message = {

408

"type": "user_joined",

409

"room": room_id,

410

"timestamp": time.time()

411

}

412

await broadcast_to_room(room, join_message, websocket)

413

414

try:

415

while True:

416

data = await websocket.receive_json()

417

418

# Create message

419

message = {

420

"type": "chat_message",

421

"room": room_id,

422

"user": data.get("user", "Anonymous"),

423

"message": data.get("message", ""),

424

"timestamp": time.time()

425

}

426

427

# Store in history

428

room.message_history.append(message)

429

if len(room.message_history) > 1000:

430

room.message_history = room.message_history[-1000:]

431

432

# Broadcast to room

433

await broadcast_to_room(room, message, websocket)

434

435

except WebSocketDisconnectError:

436

pass

437

finally:

438

# Remove from room

439

room.connections.discard(websocket)

440

441

# Announce user left

442

leave_message = {

443

"type": "user_left",

444

"room": room_id,

445

"timestamp": time.time()

446

}

447

await broadcast_to_room(room, leave_message)

448

```

449

450

### Live Updates

451

452

```python { .api }

453

import asyncio

454

from typing import Dict, List

455

456

# Global subscribers registry

457

update_subscribers: Dict[str, List[WebSocket]] = {}

458

459

async def add_subscriber(topic: str, websocket: WebSocket):

460

if topic not in update_subscribers:

461

update_subscribers[topic] = []

462

update_subscribers[topic].append(websocket)

463

464

async def remove_subscriber(topic: str, websocket: WebSocket):

465

if topic in update_subscribers:

466

try:

467

update_subscribers[topic].remove(websocket)

468

except ValueError:

469

pass

470

471

async def broadcast_update(topic: str, data: dict):

472

"""Broadcast update to all subscribers of a topic"""

473

if topic not in update_subscribers:

474

return

475

476

disconnected = []

477

478

for websocket in update_subscribers[topic]:

479

try:

480

await websocket.send_json({

481

"topic": topic,

482

"data": data,

483

"timestamp": time.time()

484

})

485

except WebSocketDisconnectError:

486

disconnected.append(websocket)

487

488

# Remove disconnected subscribers

489

for ws in disconnected:

490

await remove_subscriber(topic, ws)

491

492

@app.ws("/updates/{topic}")

493

async def live_updates(websocket: WebSocket, topic: str):

494

await websocket.accept()

495

await add_subscriber(topic, websocket)

496

497

# Send initial data

498

initial_data = await get_topic_data(topic)

499

await websocket.send_json({

500

"type": "initial_data",

501

"topic": topic,

502

"data": initial_data

503

})

504

505

try:

506

# Keep connection alive and handle client messages

507

while True:

508

message = await websocket.receive_json()

509

510

if message.get("type") == "subscribe_additional":

511

additional_topic = message.get("topic")

512

if additional_topic:

513

await add_subscriber(additional_topic, websocket)

514

515

elif message.get("type") == "unsubscribe":

516

unsub_topic = message.get("topic")

517

if unsub_topic:

518

await remove_subscriber(unsub_topic, websocket)

519

520

except WebSocketDisconnectError:

521

pass

522

finally:

523

# Remove from all subscriptions

524

for subscribers in update_subscribers.values():

525

if websocket in subscribers:

526

subscribers.remove(websocket)

527

528

# Trigger updates from other parts of application

529

async def notify_data_change(topic: str, data: dict):

530

"""Call this function when data changes to notify subscribers"""

531

await broadcast_update(topic, data)

532

533

# Example: Update triggered by HTTP endpoint

534

@app.post("/api/data/{topic}")

535

async def update_data(topic: str, data: FromJSON[dict]):

536

# Update data in database

537

await save_data(topic, data.value)

538

539

# Notify WebSocket subscribers

540

await notify_data_change(topic, data.value)

541

542

return json({"updated": True})

543

```

544

545

## Authentication with WebSockets

546

547

Secure WebSocket connections with authentication.

548

549

### WebSocket Authentication

550

551

```python { .api }

552

from blacksheep import auth

553

from guardpost import Identity

554

555

# Authenticated WebSocket endpoint

556

@app.ws("/secure-chat")

557

@auth() # Require authentication

558

async def secure_chat(websocket: WebSocket, request: Request):

559

# Authentication happens before WebSocket upgrade

560

identity: Identity = request.identity

561

user_id = identity.id

562

563

await websocket.accept()

564

565

# Send welcome message with user info

566

await websocket.send_json({

567

"type": "welcome",

568

"user_id": user_id,

569

"username": identity.claims.get("name", "Unknown")

570

})

571

572

try:

573

while True:

574

message = await websocket.receive_json()

575

576

# Add user context to messages

577

message["user_id"] = user_id

578

message["timestamp"] = time.time()

579

580

# Process authenticated message

581

await process_user_message(message)

582

583

except WebSocketDisconnectError:

584

print(f"User {user_id} disconnected")

585

```

586

587

### Token-Based WebSocket Auth

588

589

```python { .api }

590

@app.ws("/token-auth")

591

async def token_auth_websocket(websocket: WebSocket):

592

# Get token from query parameters or headers during handshake

593

query_params = websocket.scope.get("query_string", b"").decode()

594

token = None

595

596

# Parse token from query string

597

if "token=" in query_params:

598

for param in query_params.split("&"):

599

if param.startswith("token="):

600

token = param.split("=", 1)[1]

601

break

602

603

if not token:

604

# Reject connection

605

await websocket.close(code=1008) # Policy violation

606

return

607

608

# Validate token

609

try:

610

user_data = await validate_token(token)

611

except Exception:

612

await websocket.close(code=1008)

613

return

614

615

await websocket.accept()

616

617

# Continue with authenticated connection

618

await websocket.send_json({

619

"type": "authenticated",

620

"user": user_data

621

})

622

623

try:

624

while True:

625

message = await websocket.receive_json()

626

# Process message with user context

627

await process_authenticated_message(message, user_data)

628

629

except WebSocketDisconnectError:

630

pass

631

```

632

633

## Performance and Scaling

634

635

Optimize WebSocket performance for high-concurrency scenarios.

636

637

### Connection Management

638

639

```python { .api }

640

import weakref

641

from typing import WeakSet

642

643

# Use weak references to avoid memory leaks

644

active_connections: WeakSet[WebSocket] = weakref.WeakSet()

645

646

@app.ws("/optimized")

647

async def optimized_handler(websocket: WebSocket):

648

await websocket.accept()

649

active_connections.add(websocket)

650

651

try:

652

# Efficient message processing

653

while True:

654

message = await websocket.receive_text()

655

656

# Process in background to avoid blocking

657

asyncio.create_task(process_message_async(message, websocket))

658

659

except WebSocketDisconnectError:

660

pass

661

# WeakSet automatically removes disconnected websockets

662

663

async def process_message_async(message: str, websocket: WebSocket):

664

"""Process message without blocking the receive loop"""

665

try:

666

result = await heavy_processing(message)

667

668

if websocket.state == WebSocketState.CONNECTED:

669

await websocket.send_json({"result": result})

670

671

except Exception as e:

672

if websocket.state == WebSocketState.CONNECTED:

673

await websocket.send_json({"error": str(e)})

674

```

675

676

### Broadcast Optimization

677

678

```python { .api }

679

import asyncio

680

from typing import List

681

682

async def efficient_broadcast(connections: List[WebSocket], message: dict):

683

"""Efficiently broadcast to multiple connections"""

684

685

# Serialize message once

686

serialized = json.dumps(message).encode()

687

688

async def send_to_connection(websocket: WebSocket):

689

try:

690

if websocket.state == WebSocketState.CONNECTED:

691

await websocket.send_bytes(serialized)

692

except WebSocketDisconnectError:

693

pass # Connection already closed

694

695

# Send to all connections concurrently

696

tasks = [send_to_connection(ws) for ws in connections]

697

await asyncio.gather(*tasks, return_exceptions=True)

698

```

699

700

BlackSheep's WebSocket support provides a robust foundation for building real-time applications with proper error handling, authentication, and performance optimization capabilities.