or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.mdcore-application.mddata-structures.mdexceptions-status.mdindex.mdmiddleware.mdrequests-responses.mdrouting.mdstatic-files.mdtesting.mdwebsockets.md

websockets.mddocs/

0

# WebSocket Support

1

2

Starlette provides first-class WebSocket support for real-time, bidirectional communication between clients and servers, enabling interactive applications like chat systems, live updates, and collaborative tools.

3

4

## WebSocket Class

5

6

```python { .api }

7

from starlette.websockets import WebSocket, WebSocketState, WebSocketDisconnect

8

from starlette.datastructures import URL, Headers, QueryParams

9

from starlette.types import Scope, Receive, Send

10

from typing import Any, AsyncIterator

11

12

class WebSocket:

13

"""

14

WebSocket connection handler providing bidirectional communication.

15

16

The WebSocket class manages:

17

- Connection lifecycle (accept, close)

18

- Message sending and receiving (text, binary, JSON)

19

- Connection state management

20

- Subprotocol negotiation

21

- Error handling and disconnection

22

"""

23

24

def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:

25

"""

26

Initialize WebSocket from ASGI scope.

27

28

Args:

29

scope: ASGI WebSocket scope

30

receive: ASGI receive callable

31

send: ASGI send callable

32

"""

33

34

# Inherited from HTTPConnection

35

@property

36

def url(self) -> URL:

37

"""WebSocket connection URL."""

38

39

@property

40

def base_url(self) -> URL:

41

"""Base URL for generating absolute URLs."""

42

43

@property

44

def headers(self) -> Headers:

45

"""WebSocket handshake headers."""

46

47

@property

48

def query_params(self) -> QueryParams:

49

"""URL query parameters."""

50

51

@property

52

def path_params(self) -> dict[str, Any]:

53

"""Path parameters from URL pattern."""

54

55

@property

56

def cookies(self) -> dict[str, str]:

57

"""Cookies from handshake request."""

58

59

@property

60

def client(self) -> Address | None:

61

"""Client address information."""

62

63

@property

64

def session(self) -> dict[str, Any]:

65

"""Session data (requires SessionMiddleware)."""

66

67

@property

68

def auth(self) -> Any:

69

"""Authentication data (requires AuthenticationMiddleware)."""

70

71

@property

72

def user(self) -> Any:

73

"""User object (requires AuthenticationMiddleware)."""

74

75

@property

76

def state(self) -> State:

77

"""WebSocket-scoped state storage."""

78

79

# WebSocket-specific properties

80

@property

81

def client_state(self) -> WebSocketState:

82

"""Client connection state."""

83

84

@property

85

def application_state(self) -> WebSocketState:

86

"""Application connection state."""

87

88

# Connection management

89

async def accept(

90

self,

91

subprotocol: str | None = None,

92

headers: Sequence[tuple[bytes, bytes]] | None = None,

93

) -> None:

94

"""

95

Accept WebSocket connection.

96

97

Args:

98

subprotocol: Selected subprotocol from client's list

99

headers: Additional response headers

100

101

Raises:

102

RuntimeError: If connection already accepted/closed

103

"""

104

105

async def close(

106

self,

107

code: int = 1000,

108

reason: str | None = None

109

) -> None:

110

"""

111

Close WebSocket connection.

112

113

Args:

114

code: WebSocket close code (default: 1000 normal closure)

115

reason: Optional close reason string

116

"""

117

118

async def send_denial_response(self, response: Response) -> None:

119

"""

120

Send HTTP response instead of accepting WebSocket.

121

122

Used for authentication/authorization failures during handshake.

123

124

Args:

125

response: HTTP response to send

126

"""

127

128

# Message receiving

129

async def receive(self) -> dict[str, Any]:

130

"""

131

Receive raw WebSocket message.

132

133

Returns:

134

dict: ASGI WebSocket message

135

136

Raises:

137

WebSocketDisconnect: When connection closes

138

"""

139

140

async def receive_text(self) -> str:

141

"""

142

Receive text message.

143

144

Returns:

145

str: Text message content

146

147

Raises:

148

WebSocketDisconnect: When connection closes

149

RuntimeError: If message is not text

150

"""

151

152

async def receive_bytes(self) -> bytes:

153

"""

154

Receive binary message.

155

156

Returns:

157

bytes: Binary message content

158

159

Raises:

160

WebSocketDisconnect: When connection closes

161

RuntimeError: If message is not binary

162

"""

163

164

async def receive_json(self, mode: str = "text") -> Any:

165

"""

166

Receive JSON message.

167

168

Args:

169

mode: "text" or "binary" message mode

170

171

Returns:

172

Any: Parsed JSON data

173

174

Raises:

175

WebSocketDisconnect: When connection closes

176

ValueError: If message is not valid JSON

177

"""

178

179

# Message iteration

180

def iter_text(self) -> AsyncIterator[str]:

181

"""

182

Async iterator for text messages.

183

184

Yields:

185

str: Text messages until connection closes

186

"""

187

188

def iter_bytes(self) -> AsyncIterator[bytes]:

189

"""

190

Async iterator for binary messages.

191

192

Yields:

193

bytes: Binary messages until connection closes

194

"""

195

196

def iter_json(self, mode: str = "text") -> AsyncIterator[Any]:

197

"""

198

Async iterator for JSON messages.

199

200

Args:

201

mode: "text" or "binary" message mode

202

203

Yields:

204

Any: Parsed JSON data until connection closes

205

"""

206

207

# Message sending

208

async def send(self, message: dict[str, Any]) -> None:

209

"""

210

Send raw ASGI WebSocket message.

211

212

Args:

213

message: ASGI WebSocket message dict

214

"""

215

216

async def send_text(self, data: str) -> None:

217

"""

218

Send text message.

219

220

Args:

221

data: Text content to send

222

"""

223

224

async def send_bytes(self, data: bytes) -> None:

225

"""

226

Send binary message.

227

228

Args:

229

data: Binary content to send

230

"""

231

232

async def send_json(

233

self,

234

data: Any,

235

mode: str = "text"

236

) -> None:

237

"""

238

Send JSON message.

239

240

Args:

241

data: JSON-serializable data

242

mode: "text" or "binary" message mode

243

"""

244

245

def url_for(self, name: str, /, **path_params: Any) -> URL:

246

"""

247

Generate absolute URL for named route.

248

249

Args:

250

name: Route name

251

**path_params: Path parameter values

252

253

Returns:

254

URL: Absolute URL

255

"""

256

```

257

258

## WebSocket State and Exceptions

259

260

```python { .api }

261

from enum import Enum

262

263

class WebSocketState(Enum):

264

"""WebSocket connection states."""

265

266

CONNECTING = 0 # Initial state before accept/deny

267

CONNECTED = 1 # Connection accepted and active

268

DISCONNECTED = 2 # Connection closed

269

RESPONSE = 3 # HTTP response sent instead of WebSocket

270

271

class WebSocketDisconnect(Exception):

272

"""

273

Exception raised when WebSocket connection closes.

274

275

Contains close code and optional reason.

276

"""

277

278

def __init__(self, code: int = 1000, reason: str | None = None) -> None:

279

"""

280

Initialize disconnect exception.

281

282

Args:

283

code: WebSocket close code

284

reason: Optional close reason

285

"""

286

self.code = code

287

self.reason = reason

288

289

class WebSocketClose:

290

"""

291

ASGI application for closing WebSocket connections.

292

293

Can be used as a route endpoint to immediately close connections.

294

"""

295

296

def __init__(self, code: int = 1000, reason: str | None = None) -> None:

297

"""

298

Initialize WebSocket close handler.

299

300

Args:

301

code: Close code to send

302

reason: Close reason to send

303

"""

304

```

305

306

## Basic WebSocket Usage

307

308

### Simple Echo Server

309

310

```python { .api }

311

from starlette.applications import Starlette

312

from starlette.routing import WebSocketRoute

313

from starlette.websockets import WebSocket, WebSocketDisconnect

314

315

async def websocket_endpoint(websocket: WebSocket):

316

# Accept the connection

317

await websocket.accept()

318

319

try:

320

while True:

321

# Receive message from client

322

message = await websocket.receive_text()

323

324

# Echo message back

325

await websocket.send_text(f"Echo: {message}")

326

327

except WebSocketDisconnect:

328

print("Client disconnected")

329

330

app = Starlette(routes=[

331

WebSocketRoute("/ws", websocket_endpoint)

332

])

333

```

334

335

### JSON Message Handling

336

337

```python { .api }

338

async def json_websocket(websocket: WebSocket):

339

await websocket.accept()

340

341

try:

342

while True:

343

# Receive JSON data

344

data = await websocket.receive_json()

345

346

# Process message based on type

347

if data.get("type") == "ping":

348

await websocket.send_json({

349

"type": "pong",

350

"timestamp": time.time()

351

})

352

elif data.get("type") == "message":

353

await websocket.send_json({

354

"type": "response",

355

"original": data.get("content"),

356

"processed": data.get("content", "").upper()

357

})

358

else:

359

await websocket.send_json({

360

"type": "error",

361

"message": "Unknown message type"

362

})

363

364

except WebSocketDisconnect:

365

print("Client disconnected")

366

except ValueError as e:

367

# Invalid JSON

368

await websocket.send_json({

369

"type": "error",

370

"message": "Invalid JSON"

371

})

372

await websocket.close(code=1003) # Unsupported data

373

```

374

375

## WebSocket Routing

376

377

### Path Parameters

378

379

```python { .api }

380

from starlette.routing import WebSocketRoute

381

382

async def user_websocket(websocket: WebSocket):

383

# Extract path parameters

384

user_id = websocket.path_params["user_id"]

385

room_id = websocket.path_params.get("room_id")

386

387

await websocket.accept()

388

389

try:

390

# Send welcome message

391

await websocket.send_json({

392

"type": "welcome",

393

"user_id": int(user_id),

394

"room_id": room_id and int(room_id)

395

})

396

397

async for message in websocket.iter_json():

398

# Process user messages in room context

399

response = {

400

"type": "message",

401

"user_id": int(user_id),

402

"room_id": room_id and int(room_id),

403

"content": message.get("content")

404

}

405

await websocket.send_json(response)

406

407

except WebSocketDisconnect:

408

print(f"User {user_id} disconnected from room {room_id}")

409

410

routes = [

411

WebSocketRoute("/ws/user/{user_id:int}", user_websocket),

412

WebSocketRoute("/ws/room/{room_id:int}/user/{user_id:int}", user_websocket),

413

]

414

```

415

416

### Query Parameters and Headers

417

418

```python { .api }

419

async def websocket_with_auth(websocket: WebSocket):

420

# Check authentication in query params or headers

421

token = websocket.query_params.get("token")

422

auth_header = websocket.headers.get("authorization")

423

424

# Validate authentication

425

if not token and not auth_header:

426

await websocket.close(code=1008, reason="Authentication required")

427

return

428

429

# Extract user info from token

430

user = authenticate_token(token or auth_header.split(" ")[-1])

431

if not user:

432

await websocket.close(code=1008, reason="Invalid token")

433

return

434

435

# Store user in WebSocket state

436

websocket.state.user = user

437

438

await websocket.accept()

439

440

try:

441

await websocket.send_json({

442

"type": "authenticated",

443

"user": user.username

444

})

445

446

async for message in websocket.iter_json():

447

# Process authenticated user messages

448

await process_user_message(websocket.state.user, message)

449

450

except WebSocketDisconnect:

451

print(f"User {user.username} disconnected")

452

```

453

454

## Advanced WebSocket Patterns

455

456

### Connection Manager

457

458

```python { .api }

459

import json

460

from typing import Dict, Set

461

462

class ConnectionManager:

463

"""Manage multiple WebSocket connections."""

464

465

def __init__(self):

466

self.active_connections: Dict[str, Set[WebSocket]] = {}

467

self.user_connections: Dict[str, WebSocket] = {}

468

469

async def connect(self, websocket: WebSocket, room_id: str, user_id: str):

470

"""Add connection to room and user mapping."""

471

await websocket.accept()

472

473

# Add to room

474

if room_id not in self.active_connections:

475

self.active_connections[room_id] = set()

476

self.active_connections[room_id].add(websocket)

477

478

# Add to user mapping

479

self.user_connections[user_id] = websocket

480

481

# Notify room of new connection

482

await self.broadcast_to_room(room_id, {

483

"type": "user_joined",

484

"user_id": user_id,

485

"count": len(self.active_connections[room_id])

486

}, exclude=websocket)

487

488

def disconnect(self, websocket: WebSocket, room_id: str, user_id: str):

489

"""Remove connection from room and user mapping."""

490

if room_id in self.active_connections:

491

self.active_connections[room_id].discard(websocket)

492

if not self.active_connections[room_id]:

493

del self.active_connections[room_id]

494

495

if user_id in self.user_connections:

496

del self.user_connections[user_id]

497

498

async def send_personal_message(self, user_id: str, message: dict):

499

"""Send message to specific user."""

500

if user_id in self.user_connections:

501

websocket = self.user_connections[user_id]

502

await websocket.send_json(message)

503

504

async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None):

505

"""Broadcast message to all connections in room."""

506

if room_id in self.active_connections:

507

for connection in self.active_connections[room_id]:

508

if connection != exclude:

509

try:

510

await connection.send_json(message)

511

except:

512

# Connection is broken, remove it

513

self.active_connections[room_id].discard(connection)

514

515

# Global connection manager

516

manager = ConnectionManager()

517

518

async def chat_websocket(websocket: WebSocket):

519

room_id = websocket.path_params["room_id"]

520

user_id = websocket.query_params.get("user_id", "anonymous")

521

522

await manager.connect(websocket, room_id, user_id)

523

524

try:

525

async for data in websocket.iter_json():

526

message = {

527

"type": "message",

528

"user_id": user_id,

529

"room_id": room_id,

530

"content": data.get("content"),

531

"timestamp": time.time()

532

}

533

534

# Broadcast to room

535

await manager.broadcast_to_room(room_id, message)

536

537

except WebSocketDisconnect:

538

manager.disconnect(websocket, room_id, user_id)

539

540

# Notify room of disconnection

541

await manager.broadcast_to_room(room_id, {

542

"type": "user_left",

543

"user_id": user_id,

544

"count": len(manager.active_connections.get(room_id, set()))

545

})

546

```

547

548

### Subprotocol Handling

549

550

```python { .api }

551

async def subprotocol_websocket(websocket: WebSocket):

552

# Get requested subprotocols from client

553

subprotocols = websocket.headers.get("sec-websocket-protocol", "").split(",")

554

subprotocols = [p.strip() for p in subprotocols if p.strip()]

555

556

# Select supported subprotocol

557

supported = ["chat", "notification", "api.v1"]

558

selected = None

559

560

for protocol in subprotocols:

561

if protocol in supported:

562

selected = protocol

563

break

564

565

# Accept with selected subprotocol

566

await websocket.accept(subprotocol=selected)

567

568

try:

569

# Handle different protocols

570

if selected == "chat":

571

await handle_chat_protocol(websocket)

572

elif selected == "notification":

573

await handle_notification_protocol(websocket)

574

elif selected == "api.v1":

575

await handle_api_protocol(websocket)

576

else:

577

await handle_default_protocol(websocket)

578

579

except WebSocketDisconnect:

580

print(f"Client disconnected (protocol: {selected})")

581

582

async def handle_chat_protocol(websocket: WebSocket):

583

"""Handle chat-specific protocol."""

584

async for message in websocket.iter_text():

585

# Chat protocol: plain text messages

586

response = f"[CHAT] {message}"

587

await websocket.send_text(response)

588

589

async def handle_api_protocol(websocket: WebSocket):

590

"""Handle API-specific protocol."""

591

async for data in websocket.iter_json():

592

# API protocol: structured JSON commands

593

if data.get("command") == "list_users":

594

await websocket.send_json({

595

"type": "user_list",

596

"users": get_active_users()

597

})

598

elif data.get("command") == "send_message":

599

await websocket.send_json({

600

"type": "message_sent",

601

"id": data.get("message_id")

602

})

603

```

604

605

### Error Handling and Graceful Shutdown

606

607

```python { .api }

608

import asyncio

609

import signal

610

from typing import Set

611

612

class WebSocketServer:

613

def __init__(self):

614

self.connections: Set[WebSocket] = set()

615

self.shutdown_event = asyncio.Event()

616

617

async def websocket_endpoint(self, websocket: WebSocket):

618

await websocket.accept()

619

self.connections.add(websocket)

620

621

try:

622

# Send initial connection message

623

await websocket.send_json({

624

"type": "connected",

625

"server_time": time.time()

626

})

627

628

# Handle messages with timeout

629

while not self.shutdown_event.is_set():

630

try:

631

# Use timeout to allow checking shutdown event

632

message = await asyncio.wait_for(

633

websocket.receive_json(),

634

timeout=1.0

635

)

636

637

# Process message

638

await self.process_message(websocket, message)

639

640

except asyncio.TimeoutError:

641

# Timeout is normal, continue loop

642

continue

643

except ValueError:

644

# Invalid JSON

645

await websocket.send_json({

646

"type": "error",

647

"message": "Invalid JSON format"

648

})

649

except Exception as e:

650

# Unexpected error

651

await websocket.send_json({

652

"type": "error",

653

"message": "Server error occurred"

654

})

655

break

656

657

except WebSocketDisconnect:

658

pass

659

finally:

660

# Cleanup connection

661

self.connections.discard(websocket)

662

print(f"Connection closed. Active: {len(self.connections)}")

663

664

async def process_message(self, websocket: WebSocket, message: dict):

665

"""Process incoming message with error handling."""

666

try:

667

msg_type = message.get("type")

668

669

if msg_type == "ping":

670

await websocket.send_json({"type": "pong"})

671

elif msg_type == "echo":

672

await websocket.send_json({

673

"type": "echo_response",

674

"data": message.get("data")

675

})

676

else:

677

await websocket.send_json({

678

"type": "error",

679

"message": f"Unknown message type: {msg_type}"

680

})

681

682

except Exception as e:

683

print(f"Error processing message: {e}")

684

await websocket.send_json({

685

"type": "error",

686

"message": "Failed to process message"

687

})

688

689

async def broadcast_shutdown(self):

690

"""Notify all connections of shutdown."""

691

if self.connections:

692

message = {

693

"type": "server_shutdown",

694

"message": "Server is shutting down"

695

}

696

697

# Send to all connections

698

await asyncio.gather(

699

*[conn.send_json(message) for conn in self.connections],

700

return_exceptions=True

701

)

702

703

# Close all connections

704

await asyncio.gather(

705

*[conn.close(code=1001, reason="Server shutdown") for conn in self.connections],

706

return_exceptions=True

707

)

708

709

def setup_shutdown_handlers(self):

710

"""Setup graceful shutdown handlers."""

711

def signal_handler(sig, frame):

712

print(f"Received signal {sig}, starting graceful shutdown...")

713

self.shutdown_event.set()

714

715

signal.signal(signal.SIGTERM, signal_handler)

716

signal.signal(signal.SIGINT, signal_handler)

717

718

# Usage

719

server = WebSocketServer()

720

server.setup_shutdown_handlers()

721

722

async def websocket_endpoint(websocket: WebSocket):

723

await server.websocket_endpoint(websocket)

724

```

725

726

## Testing WebSocket Connections

727

728

```python { .api }

729

from starlette.testclient import TestClient

730

731

def test_websocket():

732

with TestClient(app) as client:

733

with client.websocket_connect("/ws") as websocket:

734

# Send message

735

websocket.send_text("Hello")

736

737

# Receive response

738

data = websocket.receive_text()

739

assert data == "Echo: Hello"

740

741

def test_websocket_json():

742

with TestClient(app) as client:

743

with client.websocket_connect("/ws/json") as websocket:

744

# Send JSON message

745

websocket.send_json({"type": "ping"})

746

747

# Receive JSON response

748

data = websocket.receive_json()

749

assert data["type"] == "pong"

750

751

def test_websocket_with_params():

752

with TestClient(app) as client:

753

with client.websocket_connect("/ws/user/123?token=abc") as websocket:

754

# Test authenticated connection

755

welcome = websocket.receive_json()

756

assert welcome["user_id"] == 123

757

758

def test_websocket_disconnect():

759

with TestClient(app) as client:

760

with client.websocket_connect("/ws") as websocket:

761

websocket.send_text("Hello")

762

websocket.close()

763

764

# Verify connection closed gracefully

765

assert websocket.client_state == WebSocketState.DISCONNECTED

766

```

767

768

## Real-time Applications

769

770

### Live Updates

771

772

```python { .api }

773

import asyncio

774

from datetime import datetime

775

776

async def live_updates_websocket(websocket: WebSocket):

777

await websocket.accept()

778

779

try:

780

# Send periodic updates

781

while True:

782

# Get latest data

783

data = {

784

"type": "update",

785

"timestamp": datetime.now().isoformat(),

786

"data": get_latest_data(),

787

"metrics": get_system_metrics()

788

}

789

790

await websocket.send_json(data)

791

792

# Wait before next update

793

await asyncio.sleep(5)

794

795

except WebSocketDisconnect:

796

print("Client disconnected from live updates")

797

798

def get_latest_data():

799

# Fetch real-time data

800

return {"value": random.randint(1, 100)}

801

802

def get_system_metrics():

803

# Get system metrics

804

return {

805

"cpu": psutil.cpu_percent(),

806

"memory": psutil.virtual_memory().percent

807

}

808

```

809

810

### Collaborative Editing

811

812

```python { .api }

813

class DocumentManager:

814

def __init__(self):

815

self.documents = {}

816

self.connections = {}

817

818

async def join_document(self, websocket: WebSocket, doc_id: str, user_id: str):

819

if doc_id not in self.documents:

820

self.documents[doc_id] = {"content": "", "version": 0}

821

822

if doc_id not in self.connections:

823

self.connections[doc_id] = {}

824

825

self.connections[doc_id][user_id] = websocket

826

827

# Send current document state

828

await websocket.send_json({

829

"type": "document_state",

830

"content": self.documents[doc_id]["content"],

831

"version": self.documents[doc_id]["version"]

832

})

833

834

# Notify other users

835

await self.broadcast_to_document(doc_id, {

836

"type": "user_joined",

837

"user_id": user_id

838

}, exclude=user_id)

839

840

async def handle_edit(self, doc_id: str, user_id: str, operation: dict):

841

# Apply operation to document

842

self.documents[doc_id]["content"] = apply_operation(

843

self.documents[doc_id]["content"],

844

operation

845

)

846

self.documents[doc_id]["version"] += 1

847

848

# Broadcast change to other users

849

await self.broadcast_to_document(doc_id, {

850

"type": "document_change",

851

"operation": operation,

852

"user_id": user_id,

853

"version": self.documents[doc_id]["version"]

854

}, exclude=user_id)

855

856

async def broadcast_to_document(self, doc_id: str, message: dict, exclude: str = None):

857

if doc_id in self.connections:

858

for user_id, websocket in self.connections[doc_id].items():

859

if user_id != exclude:

860

try:

861

await websocket.send_json(message)

862

except:

863

# Remove broken connection

864

del self.connections[doc_id][user_id]

865

```

866

867

Starlette's WebSocket support enables building real-time, interactive applications with robust connection management, error handling, and testing capabilities.