0
# WebSocket Support
1
2
WebSocket support for both client and server applications, enabling real-time bidirectional communication. Covers sending and receiving messages, the WebSocket protocol types, and management of the full connection lifecycle.
3
4
## Capabilities
5
6
### Client WebSocket Connections
7
8
WebSocket client implementation for connecting to WebSocket servers and handling real-time communication.
9
10
```python { .api }
11
class ClientWebSocketResponse:
    """Client-side WebSocket connection (API summary; bodies omitted)."""

    @property
    def closed(self):
        """Check if WebSocket connection is closed."""

    @property
    def close_code(self):
        """WebSocket close code."""

    @property
    def protocol(self):
        """WebSocket protocol."""

    def exception(self):
        """
        Return the exception recorded for this connection, if any.

        The usage examples call this when a message of type
        ``WSMsgType.ERROR`` is received.
        """

    async def send_str(self, data, compress=None):
        """
        Send text message.

        Parameters:
        - data (str): Text message to send
        - compress (int): Compression level
        """

    async def send_bytes(self, data, compress=None):
        """
        Send binary message.

        Parameters:
        - data (bytes): Binary message to send
        - compress (int): Compression level
        """

    async def send_json(self, data, *, dumps=None, compress=None):
        """
        Send JSON message.

        Parameters:
        - data: Object to serialize as JSON
        - dumps: JSON serialization function
        - compress (int): Compression level
        """

    async def ping(self, message=b''):
        """
        Send ping frame.

        Parameters:
        - message (bytes): Ping payload
        """

    async def pong(self, message=b''):
        """
        Send pong frame.

        Parameters:
        - message (bytes): Pong payload
        """

    async def close(self, code=None, message=b''):
        """
        Close WebSocket connection.

        Parameters:
        - code (int): Close code
        - message (bytes): Close message
        """

    async def receive(self):
        """
        Receive next message.

        Returns:
        WSMessage: WebSocket message
        """

    async def receive_str(self):
        """
        Receive text message.

        Returns:
        str: Text message content
        """

    async def receive_bytes(self):
        """
        Receive binary message.

        Returns:
        bytes: Binary message content
        """

    async def receive_json(self, *, loads=None):
        """
        Receive JSON message.

        Parameters:
        - loads: JSON deserialization function

        Returns:
        Object parsed from JSON
        """

    def __aiter__(self):
        """Async iterator over messages."""

    async def __anext__(self):
        """Get next message in async iteration."""
117
```
118
119
### Server WebSocket Responses
120
121
WebSocket server implementation for handling WebSocket connections in web applications.
122
123
```python { .api }
124
class WebSocketResponse:
    """Server-side WebSocket response (API summary; bodies omitted)."""

    def __init__(
        self,
        *,
        timeout=10.0,
        receive_timeout=None,
        autoclose=True,
        autoping=True,
        heartbeat=None,
        protocols=None,
        compress=True,
        max_msg_size=4*1024*1024,
        **kwargs
    ):
        """
        Create WebSocket response.

        Parameters:
        - timeout (float): WebSocket timeout
        - receive_timeout (float): Message receive timeout
        - autoclose (bool): Close the connection automatically when the peer
          sends a CLOSE message
        - autoping (bool): Automatically answer incoming PING frames with PONG
        - heartbeat (float): Heartbeat interval
        - protocols: Supported WebSocket protocols
        - compress (bool): Enable compression
        - max_msg_size (int): Maximum message size
        """

    def can_prepare(self, request):
        """
        Check whether a WebSocket can be started for the request.

        Parameters:
        - request: HTTP request for WebSocket upgrade

        Returns:
        WebSocketReady: Preparation check result
        """

    async def prepare(self, request):
        """
        Prepare WebSocket response by performing the upgrade handshake.

        After this call the send/receive methods may be used.

        Parameters:
        - request: HTTP request for WebSocket upgrade
        """

    @property
    def closed(self):
        """Check if WebSocket is closed."""

    @property
    def close_code(self):
        """WebSocket close code."""

    @property
    def protocol(self):
        """Selected WebSocket protocol."""

    def exception(self):
        """
        Return the exception recorded for this connection, if any.

        The server examples call this when a message of type
        ``WSMsgType.ERROR`` is received.
        """

    async def send_str(self, data, compress=None):
        """
        Send text message.

        Parameters:
        - data (str): Text message
        - compress (int): Compression level
        """

    async def send_bytes(self, data, compress=None):
        """
        Send binary message.

        Parameters:
        - data (bytes): Binary message
        - compress (int): Compression level
        """

    async def send_json(self, data, *, dumps=None, compress=None):
        """
        Send JSON message.

        Parameters:
        - data: Object to serialize as JSON
        - dumps: JSON serialization function
        - compress (int): Compression level
        """

    async def ping(self, message=b''):
        """Send ping frame."""

    async def pong(self, message=b''):
        """Send pong frame."""

    async def close(self, code=None, message=b''):
        """Close WebSocket connection."""

    async def receive(self):
        """Receive next message."""

    async def receive_str(self):
        """Receive text message."""

    async def receive_bytes(self):
        """Receive binary message."""

    async def receive_json(self, *, loads=None):
        """Receive JSON message."""

    def __aiter__(self):
        """Async iterator over messages."""

    async def __anext__(self):
        """Get next message in async iteration."""
226
227
class WebSocketReady:
    """Outcome of WebSocket preparation, expressed as named constants."""

    # NOTE(review): the aiohttp reference describes WebSocketReady as the
    # result of can_prepare() with `ok` and `protocol` attributes; confirm
    # that these OK/ERROR constants match the real class.
    OK = 'OK'
    ERROR = 'ERROR'
232
```
233
234
### WebSocket Protocol Types
235
236
Core WebSocket protocol types and message handling.
237
238
```python { .api }
239
class WSMessage:
    """A single WebSocket message (API summary; bodies omitted)."""

    def __init__(self, type, data, extra):
        """
        WebSocket message.

        Parameters:
        - type: Message type
        - data: Message data
        - extra: Extra message information
        """

    @property
    def type(self):
        """Type of the message."""

    @property
    def data(self):
        """Payload carried by the message."""

    @property
    def extra(self):
        """Extra information attached to the message."""

    def json(self, *, loads=None):
        """Parse the message data as JSON."""
264
265
class WSMsgType:
    """WebSocket message types."""

    # Frame opcodes defined by the WebSocket protocol.
    CONTINUATION = 0x0
    TEXT = 0x1
    BINARY = 0x2
    CLOSE = 0x8
    PING = 0x9
    PONG = 0xa

    # aiohttp-specific pseudo types (not sent on the wire).
    CLOSING = 0x100
    CLOSED = 0x101
    ERROR = 0x102
275
276
class WSCloseCode:
    """WebSocket close codes."""

    OK = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    ABNORMAL_CLOSURE = 1006
    INVALID_TEXT = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    MANDATORY_EXTENSION = 1010
    INTERNAL_ERROR = 1011
    SERVICE_RESTART = 1012
    TRY_AGAIN_LATER = 1013
    BAD_GATEWAY = 1014
291
292
class WebSocketError(Exception):
    """Error raised for WebSocket protocol violations."""
294
```
295
296
## Usage Examples
297
298
### WebSocket Client
299
300
```python
301
import aiohttp
302
import asyncio
303
304
async def websocket_client():
    """Connect to the example server, send two messages, and print replies.

    Reads messages until the connection ends or an error message arrives.
    """
    # The context manager closes the session on every exit path, replacing
    # the manual try/finally around session.close().
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('ws://localhost:8080/ws') as ws:
            # Send messages
            await ws.send_str('Hello, Server!')
            await ws.send_json({'type': 'greeting', 'message': 'Hello'})

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received text: {msg.data}")
                elif msg.type == aiohttp.WSMsgType.BINARY:
                    print(f"Received binary: {msg.data}")
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error: {ws.exception()}")
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSE:
                    print("WebSocket closed")
                    break
328
329
asyncio.run(websocket_client())
330
```
331
332
### WebSocket Server
333
334
```python
335
from aiohttp import web, WSMsgType
336
import weakref
337
338
# Store active WebSocket connections
339
websockets = weakref.WeakSet()
340
341
async def websocket_handler(request):
    """Handle one WebSocket connection: echo or broadcast JSON messages.

    Parameters:
    - request: HTTP request to upgrade to a WebSocket

    Returns:
    The WebSocketResponse used for the connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Add to active connections
    websockets.add(ws)

    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                data = msg.json()

                if data['type'] == 'echo':
                    # Echo message back
                    await ws.send_json({
                        'type': 'echo_response',
                        'message': data['message']
                    })

                elif data['type'] == 'broadcast':
                    # Broadcast to all connections
                    message = data['message']
                    # Iterate over a snapshot: every await below yields to
                    # other handlers, which may add or discard connections
                    # and would otherwise invalidate the live set iterator
                    # ("Set changed size during iteration").
                    for websocket in list(websockets):
                        if not websocket.closed:
                            await websocket.send_json({
                                'type': 'broadcast',
                                'message': message
                            })

            elif msg.type == WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break

    except Exception as e:
        print(f"WebSocket handler error: {e}")

    finally:
        websockets.discard(ws)

    return ws
381
382
# Create application with WebSocket route
383
app = web.Application()
384
app.router.add_get('/ws', websocket_handler)
385
386
if __name__ == '__main__':
387
web.run_app(app, host='localhost', port=8080)
388
```
389
390
### Real-time Chat Server
391
392
```python
393
from aiohttp import web, WSMsgType
394
import json
395
import weakref
396
import logging
397
398
logging.basicConfig(level=logging.INFO)
399
logger = logging.getLogger(__name__)
400
401
class ChatRoom:
    """Registry of connected chat clients with broadcast support."""

    def __init__(self):
        # Weakly referenced set of connected client WebSockets.
        self.clients = weakref.WeakSet()
        self.users = {}  # websocket -> username mapping

    def add_client(self, ws, username):
        """Register ``ws`` under ``username`` and log the join."""
        self.clients.add(ws)
        self.users[ws] = username
        logger.info(f"User {username} joined the chat")

    def remove_client(self, ws):
        """Forget ``ws``; log the departure if it had a registered username."""
        if ws in self.users:
            username = self.users.pop(ws)
            logger.info(f"User {username} left the chat")
        self.clients.discard(ws)

    async def broadcast(self, message, exclude=None):
        """Broadcast message to all connected clients.

        Parameters:
        - message: Object passed to each client's ``send_json``
        - exclude: Client to skip, if any

        Clients whose send raises are removed from the room afterwards.
        """
        disconnected = []

        # Iterate over a snapshot: each await yields to other handlers that
        # may add or remove clients, which would otherwise raise
        # "Set changed size during iteration" on the live WeakSet.
        for client in list(self.clients):
            if client != exclude and not client.closed:
                try:
                    await client.send_json(message)
                except Exception as e:
                    logger.error(f"Error sending to client: {e}")
                    disconnected.append(client)

        # Clean up disconnected clients
        for client in disconnected:
            self.remove_client(client)
432
433
# Global chat room
434
chat_room = ChatRoom()
435
436
async def websocket_handler(request):
    """Handle one chat connection: join, relay messages, announce departure.

    Parameters:
    - request: HTTP request to upgrade to a WebSocket

    Returns:
    The WebSocketResponse used for the connection.
    """
    ws = web.WebSocketResponse(heartbeat=30)
    await ws.prepare(request)

    # Set once the client has sent a 'join' message.
    username = None

    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                    msg_type = data.get('type')

                    if msg_type == 'join':
                        username = data.get('username', 'Anonymous')
                        chat_room.add_client(ws, username)

                        # Send welcome message
                        await ws.send_json({
                            'type': 'system',
                            'message': f'Welcome {username}!'
                        })

                        # Notify others
                        await chat_room.broadcast({
                            'type': 'user_joined',
                            'username': username,
                            'message': f'{username} joined the chat'
                        }, exclude=ws)

                    elif msg_type == 'message':
                        # Messages from clients that have not joined are ignored.
                        if username:
                            message = {
                                'type': 'message',
                                'username': username,
                                'message': data.get('message', '')
                            }
                            await chat_room.broadcast(message)

                except json.JSONDecodeError:
                    await ws.send_json({
                        'type': 'error',
                        'message': 'Invalid JSON message'
                    })

            elif msg.type == WSMsgType.ERROR:
                logger.error(f"WebSocket error: {ws.exception()}")
                break

    except Exception as e:
        logger.error(f"WebSocket handler error: {e}")

    finally:
        # Deregister first so the room is cleaned up even if the farewell
        # broadcast below fails or is cancelled.
        chat_room.remove_client(ws)

        if username:
            await chat_room.broadcast({
                'type': 'user_left',
                'username': username,
                'message': f'{username} left the chat'
            }, exclude=ws)

    return ws
499
500
# Serve the chat UI as an inline HTML page
501
async def index_handler(request):
    """Return the chat UI as a single inline HTML page.

    Parameters:
    - request: Incoming HTTP request (unused)

    Returns:
    web.Response with the page as ``text/html``.
    """
    # The page derives the WebSocket URL from its own location so it keeps
    # working when served from another host or port, or over HTTPS (wss).
    return web.Response(text="""
<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Chat</title>
</head>
<body>
    <div id="messages"></div>
    <input type="text" id="messageInput" placeholder="Type a message...">
    <button onclick="sendMessage()">Send</button>

    <script>
        const scheme = location.protocol === 'https:' ? 'wss' : 'ws';
        const ws = new WebSocket(`${scheme}://${location.host}/ws`);
        const messages = document.getElementById('messages');

        ws.onopen = function() {
            ws.send(JSON.stringify({
                type: 'join',
                username: prompt('Enter your username:') || 'Anonymous'
            }));
        };

        ws.onmessage = function(event) {
            const data = JSON.parse(event.data);
            const div = document.createElement('div');

            if (data.type === 'message') {
                div.textContent = `${data.username}: ${data.message}`;
            } else {
                div.textContent = data.message;
                div.style.fontStyle = 'italic';
            }

            messages.appendChild(div);
            messages.scrollTop = messages.scrollHeight;
        };

        function sendMessage() {
            const input = document.getElementById('messageInput');
            if (input.value.trim()) {
                ws.send(JSON.stringify({
                    type: 'message',
                    message: input.value
                }));
                input.value = '';
            }
        }

        document.getElementById('messageInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>
""", content_type='text/html')
559
560
# Create application
561
app = web.Application()
562
app.router.add_get('/', index_handler)
563
app.router.add_get('/ws', websocket_handler)
564
565
if __name__ == '__main__':
566
web.run_app(app, host='localhost', port=8080)
567
```
568
569
### WebSocket with Authentication
570
571
```python
572
from aiohttp import web, WSMsgType
573
import jwt
574
import datetime
575
576
SECRET_KEY = 'your-secret-key'
577
578
def create_token(username):
    """Create JWT token for user.

    Parameters:
    - username: Value stored in the token's ``username`` claim

    Returns:
    The HS256-signed token produced by ``jwt.encode``; it expires one hour
    after creation.
    """
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated and returns
    # a naive datetime that is easy to misinterpret as local time.
    now = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        'username': username,
        'exp': now + datetime.timedelta(hours=1)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
585
586
def verify_token(token):
    """Verify JWT token.

    Returns:
    The ``username`` claim of a valid token, or None when the token is
    expired or otherwise invalid.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Also covers jwt.ExpiredSignatureError, which is a subclass.
        return None
    return claims.get('username')
595
596
async def websocket_handler(request):
    """Handle a WebSocket connection that must authenticate before use.

    The first accepted message must be ``{'type': 'auth', 'token': ...}``.
    A rejected token ends the connection; after a successful login, 'ping'
    and 'message' requests are answered.

    Parameters:
    - request: HTTP request to upgrade to a WebSocket

    Returns:
    The WebSocketResponse used for the connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    authenticated = False
    username = None

    try:
        async for msg in ws:
            if msg.type == WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break

            if msg.type != WSMsgType.TEXT:
                continue

            data = msg.json()
            kind = data.get('type')

            if authenticated:
                # Handle authenticated messages
                if kind == 'ping':
                    await ws.send_json({'type': 'pong'})
                elif kind == 'message':
                    # Process authenticated message
                    await ws.send_json({
                        'type': 'echo',
                        'username': username,
                        'message': data.get('message')
                    })
                continue

            if kind != 'auth':
                await ws.send_json({
                    'type': 'error',
                    'message': 'Authentication required'
                })
                continue

            username = verify_token(data.get('token'))

            if not username:
                await ws.send_json({
                    'type': 'auth_error',
                    'message': 'Invalid or expired token'
                })
                break

            authenticated = True
            await ws.send_json({
                'type': 'auth_success',
                'message': f'Welcome {username}!'
            })

    except Exception as e:
        print(f"WebSocket error: {e}")

    return ws
652
653
app = web.Application()
654
app.router.add_get('/ws', websocket_handler)
655
656
if __name__ == '__main__':
657
web.run_app(app, host='localhost', port=8080)
658
```