0
# WebSocket Support
1
2
Native async WebSocket connection handling with JSON message support, connection lifecycle management, and comprehensive WebSocket protocol features.
3
4
## Capabilities
5
6
### WebSocket Connection Management
7
8
Represents WebSocket connections with async support for bidirectional communication and connection lifecycle management.
9
10
```python { .api }
11
class Websocket:
12
# Connection attributes
13
path: str # WebSocket URL path
14
query_string: bytes # Raw query string
15
args: ImmutableMultiDict # Parsed query string arguments
16
headers: Headers # Connection headers
17
requested_subprotocols: list[str] # Client requested subprotocols
18
19
async def accept(
20
self,
21
headers: dict | Headers | None = None,
22
subprotocol: str | None = None
23
):
24
"""
25
Accept WebSocket connection.
26
27
Args:
28
headers: Additional headers to send with acceptance
29
subprotocol: Selected subprotocol from client's requested list
30
"""
31
32
async def receive(self) -> str | bytes:
33
"""
34
Receive data from client.
35
36
Returns:
37
Message data as string or bytes
38
39
Raises:
40
ConnectionClosed: If connection is closed
41
"""
42
43
async def send(self, data: str | bytes):
44
"""
45
Send data to client.
46
47
Args:
48
data: Message data to send (string or bytes)
49
50
Raises:
51
ConnectionClosed: If connection is closed
52
"""
53
54
async def receive_json(self):
55
"""
56
Receive JSON data from client.
57
58
Returns:
59
Parsed JSON data
60
61
Raises:
62
ConnectionClosed: If connection is closed
63
BadRequest: If data is not valid JSON
64
"""
65
66
async def send_json(self, *args, **kwargs):
67
"""
68
Send JSON data to client.
69
70
Args:
71
*args: Data to serialize as JSON (single positional arg or multiple args)
72
**kwargs: Additional arguments for json.dumps() or dict data if no args
73
74
Raises:
75
ConnectionClosed: If connection is closed
76
"""
77
78
async def close(self, code: int, reason: str = ""):
79
"""
80
Close WebSocket connection.
81
82
Args:
83
code: WebSocket close code (required, e.g. 1000 = normal closure)
84
reason: Close reason message
85
"""
86
```
87
88
### Usage Examples
89
90
#### Basic WebSocket Handler
91
92
```python
93
from quart import Quart, websocket
94
95
app = Quart(__name__)
96
97
@app.websocket('/ws')
98
async def ws():
99
# Accept the connection
100
await websocket.accept()
101
102
try:
103
while True:
104
# Receive message from client
105
message = await websocket.receive()
106
107
# Echo message back to client
108
await websocket.send(f"Echo: {message}")
109
110
except ConnectionClosed:
111
print("WebSocket connection closed")
112
```
113
114
#### JSON Message Handling
115
116
```python
117
from quart import Quart, websocket
118
119
app = Quart(__name__)
120
121
@app.websocket('/api/ws')
122
async def api_websocket():
123
await websocket.accept()
124
125
try:
126
while True:
127
# Receive JSON data
128
data = await websocket.receive_json()
129
130
# Process the message
131
response = {
132
'type': 'response',
133
'original': data,
134
'timestamp': time.time()
135
}
136
137
# Send JSON response
138
await websocket.send_json(response)
139
140
except ConnectionClosed:
141
print("Client disconnected")
142
except BadRequest as e:
143
# Handle invalid JSON
144
await websocket.send_json({
145
'type': 'error',
146
'message': 'Invalid JSON format'
147
})
148
await websocket.close(code=1003, reason="Invalid data format")
149
```
150
151
#### WebSocket with Authentication
152
153
```python
154
from quart import Quart, websocket, abort
155
156
app = Quart(__name__)
157
158
@app.websocket('/secure/ws')
159
async def secure_websocket():
160
# Check authentication before accepting
161
auth_token = websocket.args.get('token')
162
if not auth_token or not validate_token(auth_token):
163
await websocket.close(code=1008, reason="Authentication required")
164
return
165
166
# Accept authenticated connection
167
await websocket.accept()
168
169
try:
170
while True:
171
message = await websocket.receive_json()
172
173
# Handle authenticated message
174
response = await process_authenticated_message(message, auth_token)
175
await websocket.send_json(response)
176
177
except ConnectionClosed:
178
print(f"Authenticated client disconnected: {auth_token}")
179
180
def validate_token(token):
181
# Implement token validation logic
182
return token == "valid_token"
183
184
async def process_authenticated_message(message, token):
185
# Process message with authentication context
186
return {
187
'status': 'processed',
188
'data': message,
189
'user': get_user_from_token(token)
190
}
191
```
192
193
#### WebSocket Room/Broadcasting System
194
195
```python
196
from quart import Quart, websocket
197
import asyncio
198
199
app = Quart(__name__)
200
201
# Store active connections
202
connections = set()
203
204
@app.websocket('/chat')
205
async def chat_websocket():
206
await websocket.accept()
207
connections.add(websocket)
208
209
try:
210
# Send welcome message
211
await websocket.send_json({
212
'type': 'system',
213
'message': f'Connected! {len(connections)} users online.'
214
})
215
216
# Broadcast new user joined
217
await broadcast_message({
218
'type': 'user_joined',
219
'count': len(connections)
220
}, exclude=websocket)
221
222
while True:
223
# Receive message from this client
224
data = await websocket.receive_json()
225
226
# Broadcast to all other clients
227
broadcast_data = {
228
'type': 'message',
229
'user': data.get('user', 'Anonymous'),
230
'message': data.get('message', ''),
231
'timestamp': time.time()
232
}
233
234
await broadcast_message(broadcast_data, exclude=websocket)
235
236
except ConnectionClosed:
237
pass
238
finally:
239
# Remove connection and notify others
240
connections.discard(websocket)
241
await broadcast_message({
242
'type': 'user_left',
243
'count': len(connections)
244
})
245
246
async def broadcast_message(data, exclude=None):
247
"""Broadcast message to all connected clients."""
248
if not connections:
249
return
250
251
# Create list of send tasks
252
tasks = []
253
for conn in connections.copy(): # Copy to avoid modification during iteration
254
if conn != exclude:
255
tasks.append(send_safe(conn, data))
256
257
# Send to all connections concurrently
258
if tasks:
259
await asyncio.gather(*tasks, return_exceptions=True)
260
261
async def send_safe(ws, data):
262
"""Send data to websocket, handling connection errors."""
263
try:
264
await ws.send_json(data)
265
except ConnectionClosed:
266
# Remove broken connection
267
connections.discard(ws)
268
```
269
270
#### WebSocket with Heartbeat/Ping-Pong
271
272
```python
273
from quart import Quart, websocket
274
import asyncio
275
276
app = Quart(__name__)
277
278
@app.websocket('/ws/heartbeat')
279
async def heartbeat_websocket():
280
await websocket.accept()
281
282
# Start heartbeat task
283
heartbeat_task = asyncio.create_task(heartbeat_handler())
284
285
try:
286
while True:
287
# Receive messages with timeout
288
try:
289
message = await asyncio.wait_for(
290
websocket.receive(),
291
timeout=30.0 # 30 second timeout
292
)
293
294
# Handle regular messages
295
if message == 'ping':
296
await websocket.send('pong')
297
else:
298
# Process other messages
299
await websocket.send(f"Received: {message}")
300
301
except asyncio.TimeoutError:
302
# Send ping to check if client is still alive
303
await websocket.send('ping')
304
305
except ConnectionClosed:
306
print("Client disconnected")
307
finally:
308
# Cancel heartbeat task
309
heartbeat_task.cancel()
310
try:
311
await heartbeat_task
312
except asyncio.CancelledError:
313
pass
314
315
async def heartbeat_handler():
316
"""Send periodic heartbeat messages."""
317
try:
318
while True:
319
await asyncio.sleep(20) # Send heartbeat every 20 seconds
320
await websocket.send_json({
321
'type': 'heartbeat',
322
'timestamp': time.time()
323
})
324
except (ConnectionClosed, asyncio.CancelledError):
325
pass
326
```
327
328
#### WebSocket Error Handling
329
330
```python
331
from quart import Quart, websocket
332
from quart.exceptions import BadRequest, ConnectionClosed
333
334
app = Quart(__name__)
335
336
@app.websocket('/ws/robust')
337
async def robust_websocket():
338
try:
339
await websocket.accept()
340
341
while True:
342
try:
343
# Receive and validate message
344
data = await websocket.receive_json()
345
346
# Validate message structure
347
if not isinstance(data, dict) or 'type' not in data:
348
await websocket.send_json({
349
'type': 'error',
350
'code': 'INVALID_FORMAT',
351
'message': 'Message must be JSON object with "type" field'
352
})
353
continue
354
355
# Process different message types
356
if data['type'] == 'echo':
357
await websocket.send_json({
358
'type': 'echo_response',
359
'original': data.get('message', '')
360
})
361
elif data['type'] == 'time':
362
await websocket.send_json({
363
'type': 'time_response',
364
'timestamp': time.time()
365
})
366
else:
367
await websocket.send_json({
368
'type': 'error',
369
'code': 'UNKNOWN_TYPE',
370
'message': f'Unknown message type: {data["type"]}'
371
})
372
373
except BadRequest:
374
# Invalid JSON received
375
await websocket.send_json({
376
'type': 'error',
377
'code': 'INVALID_JSON',
378
'message': 'Invalid JSON format'
379
})
380
381
except Exception as e:
382
# Handle unexpected errors
383
await websocket.send_json({
384
'type': 'error',
385
'code': 'INTERNAL_ERROR',
386
'message': 'An internal error occurred'
387
})
388
print(f"WebSocket error: {e}")
389
390
except ConnectionClosed:
391
print("WebSocket connection closed normally")
392
except Exception as e:
393
print(f"WebSocket handler error: {e}")
394
try:
395
await websocket.close(code=1011, reason="Internal server error")
396
except:
397
pass # Connection might already be closed
398
```