or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

api-routing.mdbackground-tasks.mdcore-application.mddependency-injection.mdexception-handling.mdfile-handling.mdindex.mdparameter-declaration.mdrequest-response.mdwebsocket-support.md

websocket-support.mddocs/

0

# WebSocket Support

1

2

FastAPI provides comprehensive WebSocket support for real-time bidirectional communication between clients and servers. WebSocket connections enable live updates, chat applications, streaming data, and interactive features with full integration into FastAPI's dependency injection and validation systems.

3

4

## Capabilities

5

6

### WebSocket Connection Object

7

8

Object representing an active WebSocket connection with methods for sending and receiving messages.

9

10

```python { .api }

11

class WebSocket:

12

"""

13

WebSocket connection object providing bidirectional communication.

14

15

Attributes (read-only):

16

- url: WebSocket URL with protocol, host, path, and query parameters

17

- headers: HTTP headers from the WebSocket handshake

18

- query_params: Query parameters from the WebSocket URL

19

- path_params: Path parameters extracted from URL pattern

20

- cookies: HTTP cookies from the handshake request

21

- client: Client connection information (host, port)

22

- state: Application state for storing data across the connection

23

"""

24

25

url: URL

26

headers: Headers

27

query_params: QueryParams

28

path_params: Dict[str, Any]

29

cookies: Dict[str, str]

30

client: Optional[Address]

31

state: State

32

33

async def accept(

34

self,

35

subprotocol: Optional[str] = None,

36

headers: Optional[Dict[str, str]] = None

37

) -> None:

38

"""

39

Accept the WebSocket connection.

40

41

Parameters:

42

- subprotocol: WebSocket subprotocol to use

43

- headers: Additional headers to send in handshake response

44

45

Note: Must be called before sending or receiving messages

46

"""

47

48

async def close(

49

self,

50

code: int = 1000,

51

reason: Optional[str] = None

52

) -> None:

53

"""

54

Close the WebSocket connection.

55

56

Parameters:

57

- code: WebSocket close code (1000 = normal closure)

58

- reason: Optional reason string for the close

59

"""

60

61

async def send_text(self, data: str) -> None:

62

"""

63

Send text message to client.

64

65

Parameters:

66

- data: Text string to send

67

"""

68

69

async def send_bytes(self, data: bytes) -> None:

70

"""

71

Send binary message to client.

72

73

Parameters:

74

- data: Binary data to send

75

"""

76

77

async def send_json(self, data: Any, mode: str = "text") -> None:

78

"""

79

Send JSON message to client.

80

81

Parameters:

82

- data: Python object to serialize as JSON

83

- mode: Send as "text" or "binary" message

84

"""

85

86

async def receive_text(self) -> str:

87

"""

88

Receive text message from client.

89

90

Returns:

91

Text string from client

92

93

Raises:

94

WebSocketDisconnect: If connection is closed

95

"""

96

97

async def receive_bytes(self) -> bytes:

98

"""

99

Receive binary message from client.

100

101

Returns:

102

Binary data from client

103

104

Raises:

105

WebSocketDisconnect: If connection is closed

106

"""

107

108

async def receive_json(self, mode: str = "text") -> Any:

109

"""

110

Receive and parse JSON message from client.

111

112

Parameters:

113

- mode: Expect "text" or "binary" message

114

115

Returns:

116

Parsed JSON data (dict, list, or primitive)

117

118

Raises:

119

WebSocketDisconnect: If connection is closed

120

JSONDecodeError: If message is not valid JSON

121

"""

122

123

async def iter_text(self) -> AsyncIterator[str]:

124

"""

125

Async iterator for receiving text messages.

126

127

Yields:

128

Text messages from client until connection closes

129

"""

130

131

async def iter_bytes(self) -> AsyncIterator[bytes]:

132

"""

133

Async iterator for receiving binary messages.

134

135

Yields:

136

Binary messages from client until connection closes

137

"""

138

139

async def iter_json(self) -> AsyncIterator[Any]:

140

"""

141

Async iterator for receiving JSON messages.

142

143

Yields:

144

Parsed JSON messages from client until connection closes

145

"""

146

```

147

148

### WebSocket Disconnect Exception

149

150

Exception raised when WebSocket connection is closed by client or due to network issues.

151

152

```python { .api }

153

class WebSocketDisconnect(Exception):

154

def __init__(self, code: int = 1000, reason: Optional[str] = None) -> None:

155

"""

156

WebSocket disconnection exception.

157

158

Parameters:

159

- code: WebSocket close code from client

160

- reason: Optional reason string from client

161

162

Standard close codes:

163

- 1000: Normal closure

164

- 1001: Going away (page refresh, navigation)

165

- 1002: Protocol error

166

- 1003: Unsupported data type

167

- 1006: Abnormal closure (no close frame)

168

- 1011: Server error

169

"""

170

self.code = code

171

self.reason = reason

172

```

173

174

### WebSocket State Enum

175

176

Enumeration of WebSocket connection states.

177

178

```python { .api }

179

class WebSocketState(Enum):

180

"""

181

WebSocket connection state enumeration.

182

183

Values:

184

- CONNECTING: Connection handshake in progress

185

- CONNECTED: Connection established and ready

186

- DISCONNECTED: Connection closed

187

"""

188

CONNECTING = 0

189

CONNECTED = 1

190

DISCONNECTED = 2

191

```

192

193

## WebSocket Routing

194

195

### WebSocket Route Decorator

196

197

Decorator for defining WebSocket endpoints with dependency injection support.

198

199

```python { .api }

200

@app.websocket("/ws/{path}")

201

async def websocket_endpoint(

202

websocket: WebSocket,

203

path_param: str,

204

query_param: str = Query(...),

205

dependency_result = Depends(some_dependency)

206

):

207

"""

208

WebSocket endpoint function.

209

210

Parameters:

211

- websocket: WebSocket connection object (automatically injected)

212

- path_param: Path parameters work the same as HTTP routes

213

- query_param: Query parameters with validation support

214

- dependency_result: Dependencies work the same as HTTP routes

215

216

Note: Must accept WebSocket as first parameter

217

"""

218

```

219

220

## Usage Examples

221

222

### Basic WebSocket Echo Server

223

224

```python

225

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

226

227

app = FastAPI()

228

229

@app.websocket("/ws")

230

async def websocket_endpoint(websocket: WebSocket):

231

await websocket.accept()

232

try:

233

while True:

234

data = await websocket.receive_text()

235

await websocket.send_text(f"Message text was: {data}")

236

except WebSocketDisconnect:

237

print("Client disconnected")

238

```

239

240

### WebSocket Chat Room

241

242

```python

243

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

244

from typing import List

245

import json

246

247

app = FastAPI()

248

249

class ConnectionManager:

250

def __init__(self):

251

self.active_connections: List[WebSocket] = []

252

253

async def connect(self, websocket: WebSocket):

254

await websocket.accept()

255

self.active_connections.append(websocket)

256

257

def disconnect(self, websocket: WebSocket):

258

self.active_connections.remove(websocket)

259

260

async def send_personal_message(self, message: str, websocket: WebSocket):

261

await websocket.send_text(message)

262

263

async def broadcast(self, message: str):

264

for connection in self.active_connections:

265

try:

266

await connection.send_text(message)

267

except:

268

# Remove broken connections

269

self.active_connections.remove(connection)

270

271

manager = ConnectionManager()

272

273

@app.websocket("/ws/{client_id}")

274

async def websocket_endpoint(websocket: WebSocket, client_id: str):

275

await manager.connect(websocket)

276

277

# Notify others about new connection

278

await manager.broadcast(f"Client #{client_id} joined the chat")

279

280

try:

281

while True:

282

data = await websocket.receive_text()

283

message = f"Client #{client_id}: {data}"

284

await manager.broadcast(message)

285

286

except WebSocketDisconnect:

287

manager.disconnect(websocket)

288

await manager.broadcast(f"Client #{client_id} left the chat")

289

```

290

291

### WebSocket with Authentication and Validation

292

293

```python

294

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Query, HTTPException

295

from fastapi.security import HTTPBearer

296

import json

297

import jwt

298

299

app = FastAPI()

300

301

# Security scheme for WebSocket authentication

302

security = HTTPBearer()

303

304

def verify_websocket_token(token: str = Query(...)):

305

"""Verify WebSocket authentication token from query parameter."""

306

try:

307

payload = jwt.decode(token, "secret", algorithms=["HS256"])

308

return payload

309

except jwt.InvalidTokenError:

310

raise HTTPException(status_code=401, detail="Invalid token")

311

312

@app.websocket("/ws")

313

async def websocket_endpoint(

314

websocket: WebSocket,

315

user: dict = Depends(verify_websocket_token)

316

):

317

await websocket.accept()

318

319

# Send welcome message with user info

320

await websocket.send_json({

321

"type": "welcome",

322

"message": f"Welcome {user['username']}!",

323

"user_id": user["user_id"]

324

})

325

326

try:

327

while True:

328

# Receive JSON messages

329

data = await websocket.receive_json()

330

331

# Validate message structure

332

if "type" not in data:

333

await websocket.send_json({

334

"type": "error",

335

"message": "Message must include 'type' field"

336

})

337

continue

338

339

# Handle different message types

340

if data["type"] == "ping":

341

await websocket.send_json({"type": "pong"})

342

343

elif data["type"] == "message":

344

if "content" not in data:

345

await websocket.send_json({

346

"type": "error",

347

"message": "Message content is required"

348

})

349

continue

350

351

# Echo message with user info

352

await websocket.send_json({

353

"type": "message_received",

354

"content": data["content"],

355

"from_user": user["username"],

356

"timestamp": "2023-01-01T00:00:00Z"

357

})

358

359

else:

360

await websocket.send_json({

361

"type": "error",

362

"message": f"Unknown message type: {data['type']}"

363

})

364

365

except WebSocketDisconnect:

366

print(f"User {user['username']} disconnected")

367

368

except Exception as e:

369

print(f"Error in WebSocket connection: {e}")

370

await websocket.close(code=1011, reason="Internal server error")

371

```

372

373

### Real-time Data Streaming

374

375

```python

376

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

377

import asyncio

378

import json

379

import random

380

from datetime import datetime

381

382

app = FastAPI()

383

384

async def generate_stock_data():

385

"""Generate mock stock price data."""

386

stocks = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]

387

388

while True:

389

for stock in stocks:

390

price = round(random.uniform(100, 1000), 2)

391

change = round(random.uniform(-5, 5), 2)

392

yield {

393

"symbol": stock,

394

"price": price,

395

"change": change,

396

"timestamp": datetime.now().isoformat()

397

}

398

await asyncio.sleep(1)

399

400

@app.websocket("/ws/stocks")

401

async def stock_stream(websocket: WebSocket):

402

await websocket.accept()

403

404

try:

405

# Send initial message

406

await websocket.send_json({

407

"type": "connected",

408

"message": "Stock price stream connected"

409

})

410

411

# Stream stock data

412

async for stock_data in generate_stock_data():

413

await websocket.send_json({

414

"type": "stock_update",

415

"data": stock_data

416

})

417

418

# Check if client is still connected by trying to receive

419

# (with a very short timeout)

420

try:

421

await asyncio.wait_for(websocket.receive_text(), timeout=0.001)

422

except asyncio.TimeoutError:

423

# No message received, continue streaming

424

pass

425

except WebSocketDisconnect:

426

break

427

428

except WebSocketDisconnect:

429

print("Stock stream client disconnected")

430

```

431

432

### WebSocket with Room Management

433

434

```python

435

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

436

from typing import Dict, List, Set

437

import json

438

439

app = FastAPI()

440

441

class RoomManager:

442

def __init__(self):

443

# rooms[room_id] = set of websocket connections

444

self.rooms: Dict[str, Set[WebSocket]] = {}

445

# websocket_to_room[websocket] = room_id

446

self.websocket_to_room: Dict[WebSocket, str] = {}

447

448

async def join_room(self, websocket: WebSocket, room_id: str):

449

await websocket.accept()

450

451

if room_id not in self.rooms:

452

self.rooms[room_id] = set()

453

454

self.rooms[room_id].add(websocket)

455

self.websocket_to_room[websocket] = room_id

456

457

# Notify room about new member

458

await self.broadcast_to_room(

459

room_id,

460

{"type": "user_joined", "room_id": room_id, "member_count": len(self.rooms[room_id])},

461

exclude=websocket

462

)

463

464

def leave_room(self, websocket: WebSocket):

465

room_id = self.websocket_to_room.get(websocket)

466

if room_id and room_id in self.rooms:

467

self.rooms[room_id].discard(websocket)

468

469

# Remove room if empty

470

if not self.rooms[room_id]:

471

del self.rooms[room_id]

472

else:

473

# Notify remaining members

474

asyncio.create_task(

475

self.broadcast_to_room(

476

room_id,

477

{"type": "user_left", "room_id": room_id, "member_count": len(self.rooms[room_id])}

478

)

479

)

480

481

self.websocket_to_room.pop(websocket, None)

482

483

async def broadcast_to_room(self, room_id: str, message: dict, exclude: WebSocket = None):

484

if room_id not in self.rooms:

485

return

486

487

disconnected = set()

488

for websocket in self.rooms[room_id]:

489

if websocket == exclude:

490

continue

491

492

try:

493

await websocket.send_json(message)

494

except:

495

disconnected.add(websocket)

496

497

# Clean up disconnected websockets

498

for websocket in disconnected:

499

self.rooms[room_id].discard(websocket)

500

self.websocket_to_room.pop(websocket, None)

501

502

room_manager = RoomManager()

503

504

@app.websocket("/ws/room/{room_id}")

505

async def room_websocket(websocket: WebSocket, room_id: str):

506

await room_manager.join_room(websocket, room_id)

507

508

try:

509

# Send welcome message

510

await websocket.send_json({

511

"type": "welcome",

512

"room_id": room_id,

513

"message": f"Welcome to room {room_id}"

514

})

515

516

while True:

517

data = await websocket.receive_json()

518

519

# Handle different message types

520

if data.get("type") == "message":

521

# Broadcast message to all room members

522

await room_manager.broadcast_to_room(room_id, {

523

"type": "room_message",

524

"room_id": room_id,

525

"message": data.get("content", ""),

526

"timestamp": "2023-01-01T00:00:00Z"

527

})

528

529

elif data.get("type") == "ping":

530

await websocket.send_json({"type": "pong"})

531

532

except WebSocketDisconnect:

533

room_manager.leave_room(websocket)

534

print(f"Client left room {room_id}")

535

```

536

537

### WebSocket Error Handling and Reconnection

538

539

```python

540

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, WebSocketException

541

import asyncio

542

import json

543

import logging

544

545

app = FastAPI()

546

logger = logging.getLogger(__name__)

547

548

class WebSocketHandler:

549

def __init__(self, websocket: WebSocket):

550

self.websocket = websocket

551

self.is_connected = False

552

553

async def connect(self):

554

await self.websocket.accept()

555

self.is_connected = True

556

logger.info("WebSocket connected")

557

558

async def disconnect(self, code: int = 1000, reason: str = None):

559

if self.is_connected:

560

await self.websocket.close(code=code, reason=reason)

561

self.is_connected = False

562

logger.info(f"WebSocket disconnected: {code} - {reason}")

563

564

async def send_safe(self, message: dict):

565

"""Send message with error handling."""

566

if not self.is_connected:

567

return False

568

569

try:

570

await self.websocket.send_json(message)

571

return True

572

except Exception as e:

573

logger.error(f"Failed to send message: {e}")

574

self.is_connected = False

575

return False

576

577

async def receive_safe(self):

578

"""Receive message with error handling."""

579

try:

580

return await self.websocket.receive_json()

581

except WebSocketDisconnect as e:

582

logger.info(f"WebSocket disconnected: {e.code} - {e.reason}")

583

self.is_connected = False

584

raise

585

except Exception as e:

586

logger.error(f"Failed to receive message: {e}")

587

await self.disconnect(code=1002, reason="Protocol error")

588

raise WebSocketException(code=1002, reason="Protocol error")

589

590

@app.websocket("/ws/robust")

591

async def robust_websocket(websocket: WebSocket):

592

handler = WebSocketHandler(websocket)

593

594

try:

595

await handler.connect()

596

597

# Send connection info

598

await handler.send_safe({

599

"type": "connection_info",

600

"message": "Connected successfully",

601

"heartbeat_interval": 30

602

})

603

604

# Set up heartbeat task

605

async def heartbeat():

606

while handler.is_connected:

607

await asyncio.sleep(30)

608

if not await handler.send_safe({"type": "heartbeat"}):

609

break

610

611

heartbeat_task = asyncio.create_task(heartbeat())

612

613

# Main message loop

614

while handler.is_connected:

615

try:

616

data = await handler.receive_safe()

617

618

# Handle different message types

619

if data.get("type") == "heartbeat_response":

620

logger.debug("Received heartbeat response")

621

continue

622

623

elif data.get("type") == "echo":

624

await handler.send_safe({

625

"type": "echo_response",

626

"original_message": data.get("message", "")

627

})

628

629

elif data.get("type") == "error_test":

630

# Simulate different error conditions

631

error_type = data.get("error_type", "generic")

632

633

if error_type == "protocol_error":

634

raise WebSocketException(code=1002, reason="Test protocol error")

635

elif error_type == "invalid_data":

636

raise WebSocketException(code=1003, reason="Test invalid data")

637

else:

638

raise Exception("Test generic error")

639

640

else:

641

await handler.send_safe({

642

"type": "error",

643

"message": f"Unknown message type: {data.get('type')}"

644

})

645

646

except (WebSocketDisconnect, WebSocketException):

647

break

648

except Exception as e:

649

logger.error(f"Unexpected error in WebSocket handler: {e}")

650

await handler.send_safe({

651

"type": "error",

652

"message": "An unexpected error occurred"

653

})

654

655

# Clean up heartbeat task

656

heartbeat_task.cancel()

657

try:

658

await heartbeat_task

659

except asyncio.CancelledError:

660

pass

661

662

except Exception as e:

663

logger.error(f"Fatal error in WebSocket connection: {e}")

664

665

finally:

666

if handler.is_connected:

667

await handler.disconnect()

668

```

669

670

### WebSocket Testing Support

671

672

```python

673

from fastapi import FastAPI, WebSocket

674

from fastapi.testclient import TestClient

675

import pytest

676

677

app = FastAPI()

678

679

@app.websocket("/ws/test")

680

async def test_websocket(websocket: WebSocket):

681

await websocket.accept()

682

683

# Echo messages with uppercase transformation

684

try:

685

while True:

686

data = await websocket.receive_text()

687

await websocket.send_text(data.upper())

688

except WebSocketDisconnect:

689

pass

690

691

# Test client usage

692

def test_websocket_echo():

693

client = TestClient(app)

694

695

with client.websocket_connect("/ws/test") as websocket:

696

websocket.send_text("hello")

697

data = websocket.receive_text()

698

assert data == "HELLO"

699

700

websocket.send_text("world")

701

data = websocket.receive_text()

702

assert data == "WORLD"

703

704

@pytest.mark.asyncio

705

async def test_websocket_json():

706

client = TestClient(app)

707

708

with client.websocket_connect("/ws/test") as websocket:

709

test_data = {"message": "test"}

710

websocket.send_json(test_data)

711

712

# Since our endpoint expects text, this would need

713

# a JSON-aware endpoint in practice

714

response = websocket.receive_text()

715

assert response is not None

716

```