or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-routing.mdconfiguration.mddto.mdexceptions.mdhttp-handlers.mdindex.mdmiddleware.mdopenapi.mdplugins.mdrequest-response.mdsecurity.mdtesting.mdwebsocket.md

websocket.mddocs/

0

# WebSocket Support

1

2

WebSocket connection handling with decorators for WebSocket routes, listeners, and streaming. Litestar provides comprehensive WebSocket support for real-time bidirectional communication between client and server.

3

4

## Capabilities

5

6

### WebSocket Route Decorators

7

8

Decorators for creating WebSocket route handlers with different patterns of communication.

9

10

```python { .api }

11

def websocket(

12

path: str | Sequence[str] | None = None,

13

*,

14

dependencies: Dependencies | None = None,

15

exception_handlers: ExceptionHandlersMap | None = None,

16

guards: Sequence[Guard] | None = None,

17

middleware: Sequence[Middleware] | None = None,

18

name: str | None = None,

19

opt: dict[str, Any] | None = None,

20

signature_namespace: dict[str, Any] | None = None,

21

websocket_class: type[WebSocket] | None = None,

22

connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,

23

) -> Callable[[AnyCallable], WebsocketRouteHandler]:

24

"""

25

Create a WebSocket route handler.

26

27

Parameters:

28

- path: WebSocket route path(s)

29

- dependencies: Route-specific dependency providers

30

- exception_handlers: Route-specific exception handlers

31

- guards: Authorization guards

32

- middleware: Route-specific middleware

33

- name: Route name for URL generation

34

- opt: Arbitrary options dictionary

35

- signature_namespace: Additional namespace for signature inspection

36

- websocket_class: Custom WebSocket class

37

- connection_lifespan: Lifespan managers for the connection

38

39

Returns:

40

Decorator function that creates a WebsocketRouteHandler

41

"""

42

43

def websocket_listener(

44

path: str | Sequence[str] | None = None,

45

**kwargs: Any

46

) -> Callable[[AnyCallable], WebsocketListenerRouteHandler]:

47

"""

48

Create a WebSocket listener route handler.

49

50

Listener handlers automatically accept connections and continuously

51

listen for messages, calling the handler function for each message.

52

"""

53

54

def websocket_stream(

55

path: str | Sequence[str] | None = None,

56

**kwargs: Any

57

) -> Callable[[AnyCallable], WebsocketRouteHandler]:

58

"""

59

Create a WebSocket streaming route handler.

60

61

Stream handlers send data to the client using an async generator function.

62

"""

63

```

64

65

### WebSocket Connection Object

66

67

The WebSocket connection object provides methods for bidirectional communication.

68

69

```python { .api }

70

class WebSocket(ASGIConnection):

71

def __init__(self, scope: Scope, receive: Receive, send: Send):

72

"""

73

Initialize a WebSocket connection.

74

75

Parameters:

76

- scope: ASGI scope dictionary

77

- receive: ASGI receive callable

78

- send: ASGI send callable

79

"""

80

81

# Connection management

82

async def accept(

83

self,

84

subprotocols: str | Sequence[str] | None = None,

85

headers: Sequence[tuple[str, str]] | None = None,

86

) -> None:

87

"""

88

Accept the WebSocket connection.

89

90

Parameters:

91

- subprotocols: Supported subprotocols

92

- headers: Additional headers to send

93

"""

94

95

async def close(self, code: int = 1000, reason: str | None = None) -> None:

96

"""

97

Close the WebSocket connection.

98

99

Parameters:

100

- code: Close code (default 1000 for normal closure)

101

- reason: Reason for closing

102

"""

103

104

# Sending data

105

async def send_text(self, data: str) -> None:

106

"""Send text data to the client."""

107

108

async def send_bytes(self, data: bytes) -> None:

109

"""Send binary data to the client."""

110

111

async def send_json(self, data: Any, mode: Literal["text", "binary"] = "text") -> None:

112

"""

113

Send JSON data to the client.

114

115

Parameters:

116

- data: Data to serialize as JSON

117

- mode: Send as text or binary message

118

"""

119

120

# Receiving data

121

async def receive_text(self) -> str:

122

"""Receive text data from the client."""

123

124

async def receive_bytes(self) -> bytes:

125

"""Receive binary data from the client."""

126

127

async def receive_json(self, mode: Literal["text", "binary"] = "text") -> Any:

128

"""

129

Receive JSON data from the client.

130

131

Parameters:

132

- mode: Expect text or binary JSON message

133

134

Returns:

135

Deserialized JSON data

136

"""

137

138

async def receive(self) -> Message:

139

"""Receive raw ASGI message from the client."""

140

141

async def send(self, message: Message) -> None:

142

"""Send raw ASGI message to the client."""

143

144

# Connection state

145

@property

146

def connection_state(self) -> WebSocketState:

147

"""Get current connection state."""

148

149

@property

150

def client_state(self) -> WebSocketState:

151

"""Get client connection state."""

152

153

# Iterator support

154

def iter_text(self) -> AsyncIterator[str]:

155

"""Iterate over incoming text messages."""

156

157

def iter_bytes(self) -> AsyncIterator[bytes]:

158

"""Iterate over incoming binary messages."""

159

160

def iter_json(self, mode: Literal["text", "binary"] = "text") -> AsyncIterator[Any]:

161

"""Iterate over incoming JSON messages."""

162

```

163

164

### WebSocket Route Handlers

165

166

Route handler classes for different WebSocket patterns.

167

168

```python { .api }

169

class WebsocketRouteHandler(BaseRouteHandler):

170

def __init__(

171

self,

172

fn: AnyCallable,

173

*,

174

path: str | Sequence[str] | None = None,

175

connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,

176

**kwargs: Any,

177

):

178

"""Create a WebSocket route handler."""

179

180

class WebsocketListenerRouteHandler(WebsocketRouteHandler):

181

"""Route handler that automatically listens for messages."""

182

183

class WebsocketListener:

184

def __init__(

185

self,

186

path: str,

187

*,

188

connection_lifespan: Sequence[Callable[..., AsyncContextManager[None]]] | None = None,

189

**kwargs: Any,

190

):

191

"""Create a WebSocket listener."""

192

```

193

194

### WebSocket Utilities

195

196

Utility functions for WebSocket streaming and message handling.

197

198

```python { .api }

199

async def send_websocket_stream(

200

websocket: WebSocket,

201

stream: AsyncIterator[str | bytes | dict],

202

*,

203

mode: Literal["text", "binary"] = "text",

204

) -> None:

205

"""

206

Send a stream of data over WebSocket.

207

208

Parameters:

209

- websocket: WebSocket connection

210

- stream: Async iterator yielding data to send

211

- mode: Send as text or binary messages

212

"""

213

```

214

215

## Usage Examples

216

217

### Basic WebSocket Handler

218

219

```python

220

from litestar import websocket, WebSocket

221

import asyncio

222

223

@websocket("/ws")

224

async def websocket_handler(websocket: WebSocket) -> None:

225

await websocket.accept()

226

227

try:

228

while True:

229

message = await websocket.receive_text()

230

# Echo the message back

231

await websocket.send_text(f"Echo: {message}")

232

except Exception:

233

await websocket.close()

234

```

235

236

### WebSocket Listener

237

238

```python

239

from litestar import websocket_listener, WebSocket

240

241

@websocket_listener("/chat")

242

async def chat_handler(websocket: WebSocket, data: str) -> None:

243

"""Handler is called for each message received."""

244

# Process the message

245

response = f"Received: {data}"

246

await websocket.send_text(response)

247

```

248

249

### WebSocket Streaming

250

251

```python

252

from litestar import websocket_stream, WebSocket

253

import asyncio

254

import json

255

256

@websocket_stream("/stream")

257

async def stream_handler(websocket: WebSocket) -> AsyncIterator[str]:

258

"""Stream data to the client using an async generator."""

259

counter = 0

260

while True:

261

data = {"counter": counter, "timestamp": time.time()}

262

yield json.dumps(data)

263

counter += 1

264

await asyncio.sleep(1)

265

```

266

267

### JSON Message Handling

268

269

```python

270

from litestar import websocket, WebSocket

271

from typing import Dict, Any

272

273

@websocket("/api/ws")

274

async def api_websocket(websocket: WebSocket) -> None:

275

await websocket.accept()

276

277

try:

278

while True:

279

# Receive JSON message

280

message: Dict[str, Any] = await websocket.receive_json()

281

282

# Process based on message type

283

if message.get("type") == "ping":

284

await websocket.send_json({"type": "pong"})

285

elif message.get("type") == "echo":

286

await websocket.send_json({

287

"type": "echo_response",

288

"data": message.get("data")

289

})

290

else:

291

await websocket.send_json({

292

"type": "error",

293

"message": "Unknown message type"

294

})

295

296

except Exception as e:

297

await websocket.close(code=1011, reason=str(e))

298

```

299

300

### Connection Management with Authentication

301

302

```python

303

from litestar import websocket, WebSocket, Dependency

304

from litestar.exceptions import WebSocketException

305

306

async def authenticate_websocket(websocket: WebSocket) -> dict:

307

"""Authenticate WebSocket connection."""

308

token = websocket.query_params.get("token")

309

if not token:

310

raise WebSocketException("Authentication required", code=4001)

311

312

# Validate token (simplified)

313

if token != "valid_token":

314

raise WebSocketException("Invalid token", code=4003)

315

316

return {"user_id": 123, "username": "alice"}

317

318

@websocket("/secure-ws", dependencies={"user": Dependency(authenticate_websocket)})

319

async def secure_websocket(websocket: WebSocket, user: dict) -> None:

320

await websocket.accept()

321

322

# Send welcome message

323

await websocket.send_json({

324

"type": "welcome",

325

"user": user["username"]

326

})

327

328

try:

329

while True:

330

message = await websocket.receive_json()

331

# Process authenticated user messages

332

await websocket.send_json({

333

"type": "response",

334

"user_id": user["user_id"],

335

"echo": message

336

})

337

except Exception:

338

await websocket.close()

339

```

340

341

### Broadcasting to Multiple Connections

342

343

```python

344

from litestar import Litestar, websocket, WebSocket

345

import asyncio

346

from typing import Set

347

348

# Store active connections

349

active_connections: Set[WebSocket] = set()

350

351

async def add_connection(websocket: WebSocket) -> None:

352

active_connections.add(websocket)

353

354

async def remove_connection(websocket: WebSocket) -> None:

355

active_connections.discard(websocket)

356

357

async def broadcast_message(message: str) -> None:

358

"""Broadcast message to all active connections."""

359

if active_connections:

360

await asyncio.gather(

361

*[ws.send_text(message) for ws in active_connections.copy()],

362

return_exceptions=True

363

)

364

365

@websocket("/broadcast")

366

async def broadcast_websocket(websocket: WebSocket) -> None:

367

await websocket.accept()

368

await add_connection(websocket)

369

370

try:

371

while True:

372

message = await websocket.receive_text()

373

# Broadcast to all connections

374

await broadcast_message(f"User says: {message}")

375

except Exception:

376

pass

377

finally:

378

await remove_connection(websocket)

379

await websocket.close()

380

```

381

382

### WebSocket with Background Tasks

383

384

```python

385

from litestar import websocket, WebSocket

386

from litestar.concurrency import sync_to_thread

387

import asyncio

388

import queue

389

import threading

390

391

# Message queue for background processing

392

message_queue = queue.Queue()

393

394

def background_processor():

395

"""Background thread that processes messages."""

396

while True:

397

try:

398

message = message_queue.get(timeout=1)

399

# Process message (simulate work)

400

processed = f"Processed: {message}"

401

# In real app, you'd send this back to specific connections

402

print(processed)

403

except queue.Empty:

404

continue

405

406

# Start background thread

407

threading.Thread(target=background_processor, daemon=True).start()

408

409

@websocket("/background")

410

async def background_websocket(websocket: WebSocket) -> None:

411

await websocket.accept()

412

413

try:

414

while True:

415

message = await websocket.receive_text()

416

417

# Add to background processing queue

418

await sync_to_thread(message_queue.put, message)

419

420

# Acknowledge receipt

421

await websocket.send_text("Message queued for processing")

422

423

except Exception:

424

await websocket.close()

425

```

426

427

### WebSocket with Path Parameters

428

429

```python

430

from litestar import websocket, WebSocket

431

432

@websocket("/rooms/{room_id:int}/ws")

433

async def room_websocket(websocket: WebSocket, room_id: int) -> None:

434

await websocket.accept()

435

436

# Send room info

437

await websocket.send_json({

438

"type": "room_joined",

439

"room_id": room_id,

440

"message": f"Welcome to room {room_id}"

441

})

442

443

try:

444

while True:

445

message = await websocket.receive_json()

446

# Handle room-specific messages

447

await websocket.send_json({

448

"type": "room_message",

449

"room_id": room_id,

450

"data": message

451

})

452

except Exception:

453

await websocket.close()

454

```

455

456

## Types

457

458

```python { .api }

459

# WebSocket states

460

class WebSocketState(Enum):

461

CONNECTING = "CONNECTING"

462

CONNECTED = "CONNECTED"

463

DISCONNECTED = "DISCONNECTED"

464

465

# WebSocket message types

466

WebSocketMessage = dict[str, Any]

467

468

# Close codes (RFC 6455)

469

WS_1000_NORMAL_CLOSURE = 1000

470

WS_1001_GOING_AWAY = 1001

471

WS_1002_PROTOCOL_ERROR = 1002

472

WS_1003_UNSUPPORTED_DATA = 1003

473

WS_1007_INVALID_FRAME_PAYLOAD_DATA = 1007

474

WS_1008_POLICY_VIOLATION = 1008

475

WS_1009_MESSAGE_TOO_BIG = 1009

476

WS_1010_MANDATORY_EXTENSION = 1010

477

WS_1011_INTERNAL_ERROR = 1011

478

WS_1012_SERVICE_RESTART = 1012

479

WS_1013_TRY_AGAIN_LATER = 1013

480

481

# Custom close codes for application use (4000-4999)

482

WS_4001_UNAUTHORIZED = 4001

483

WS_4003_FORBIDDEN = 4003

484

WS_4004_NOT_FOUND = 4004

485

486

# Iterator types

487

AsyncTextIterator = AsyncIterator[str]

488

AsyncBytesIterator = AsyncIterator[bytes]

489

AsyncJSONIterator = AsyncIterator[Any]

490

```