0
# Asyncio Server Operations
1
2
Complete asyncio-based WebSocket server functionality for creating WebSocket servers, managing connections, handling client requests, and broadcasting messages to multiple clients.
3
4
## Capabilities
5
6
### Server Creation Functions
7
8
Start WebSocket servers with customizable handlers, authentication, compression, and connection management.
9
10
```python { .api }
11
async def serve(
12
handler: Callable[[ServerConnection], Awaitable[None]],
13
host: str,
14
port: int,
15
*,
16
logger: LoggerLike = None,
17
compression: str = "deflate",
18
subprotocols: List[Subprotocol] = None,
19
extra_headers: HeadersLike = None,
20
process_request: Callable = None,
21
select_subprotocol: Callable = None,
22
ping_interval: float = 20,
23
ping_timeout: float = 20,
24
close_timeout: float = 10,
25
max_size: int = 2**20,
26
max_queue: int = 2**5,
27
read_limit: int = 2**16,
28
write_limit: int = 2**16,
29
extensions: List[ServerExtensionFactory] = None,
30
**kwargs
31
) -> Server:
32
"""
33
Start a WebSocket server.
34
35
Parameters:
36
- handler: Coroutine to handle each WebSocket connection
37
- host: Server host address
38
- port: Server port number
39
- logger: Logger instance for server logging
40
- compression: Compression mode ("deflate" or None)
41
- subprotocols: List of supported subprotocols
42
- extra_headers: Additional response headers
43
- process_request: Function to process HTTP request before upgrade
44
- select_subprotocol: Function to select subprotocol from client list
45
- ping_interval: Interval between ping frames (seconds)
46
- ping_timeout: Timeout for ping/pong exchange (seconds)
47
- close_timeout: Timeout for connection closure (seconds)
48
- max_size: Maximum message size (bytes)
49
- max_queue: Maximum number of queued messages
50
- read_limit: Buffer size for reading (bytes)
51
- write_limit: Buffer size for writing (bytes)
52
- extensions: List of WebSocket extensions
53
54
Returns:
55
Server: Async context manager for server lifecycle management
56
"""
57
58
async def unix_serve(
59
handler: Callable[[ServerConnection], Awaitable[None]],
60
path: str,
61
*,
62
logger: LoggerLike = None,
63
compression: str = "deflate",
64
subprotocols: List[Subprotocol] = None,
65
extra_headers: HeadersLike = None,
66
process_request: Callable = None,
67
select_subprotocol: Callable = None,
68
ping_interval: float = 20,
69
ping_timeout: float = 20,
70
close_timeout: float = 10,
71
max_size: int = 2**20,
72
max_queue: int = 2**5,
73
read_limit: int = 2**16,
74
write_limit: int = 2**16,
75
extensions: List[ServerExtensionFactory] = None,
76
**kwargs
77
) -> Server:
78
"""
79
Start a WebSocket server on Unix domain socket.
80
81
Parameters:
82
- handler: Coroutine to handle each WebSocket connection
83
- path: Unix domain socket path
84
- Other parameters same as serve()
85
86
Returns:
87
Server: WebSocket server bound to Unix socket
88
"""
89
```
90
91
### Server Connection Management
92
93
The ServerConnection class represents individual client connections on the server side.
94
95
```python { .api }
96
class ServerConnection:
97
"""WebSocket server connection representing a client."""
98
99
@property
100
def closed(self) -> bool:
101
"""Check if connection is closed."""
102
103
@property
104
def local_address(self) -> Tuple[str, int]:
105
"""Get server socket address."""
106
107
@property
108
def remote_address(self) -> Tuple[str, int]:
109
"""Get client socket address."""
110
111
@property
112
def subprotocol(self) -> Subprotocol | None:
113
"""Get negotiated subprotocol."""
114
115
@property
116
def request_headers(self) -> Headers:
117
"""Get HTTP request headers from handshake."""
118
119
@property
120
def response_headers(self) -> Headers:
121
"""Get HTTP response headers from handshake."""
122
123
async def send(self, message: Data) -> None:
124
"""
125
Send a message to the WebSocket client.
126
127
Parameters:
128
- message: Text (str) or binary (bytes) message to send
129
130
Raises:
131
- ConnectionClosed: If connection is closed
132
"""
133
134
async def recv(self) -> Data:
135
"""
136
Receive a message from the WebSocket client.
137
138
Returns:
139
str | bytes: Received message (text or binary)
140
141
Raises:
142
- ConnectionClosed: If connection is closed
143
"""
144
145
async def ping(self, data: bytes = b"") -> Awaitable[float]:
146
"""
147
Send a ping frame and wait for pong response.
148
149
Parameters:
150
- data: Optional payload for ping frame
151
152
Returns:
153
Awaitable[float]: Coroutine that resolves to round-trip time
154
155
Raises:
156
- ConnectionClosed: If connection is closed
157
"""
158
159
async def pong(self, data: bytes = b"") -> None:
160
"""
161
Send a pong frame.
162
163
Parameters:
164
- data: Payload for pong frame
165
166
Raises:
167
- ConnectionClosed: If connection is closed
168
"""
169
170
async def close(self, code: int = 1000, reason: str = "") -> None:
171
"""
172
Close the WebSocket connection.
173
174
Parameters:
175
- code: Close code (default 1000 for normal closure)
176
- reason: Human-readable close reason
177
178
Raises:
179
- ProtocolError: If code is invalid
180
"""
181
182
async def wait_closed(self) -> None:
183
"""Wait until the connection is closed."""
184
185
# Async iterator support for receiving messages
186
def __aiter__(self) -> AsyncIterator[Data]:
187
"""Return async iterator for receiving messages."""
188
return self
189
190
async def __anext__(self) -> Data:
191
"""Get next message from async iterator."""
192
```
193
194
### Server Management
195
196
The Server class manages the WebSocket server lifecycle and provides access to connected clients.
197
198
```python { .api }
199
class Server:
200
"""WebSocket server management."""
201
202
@property
203
def sockets(self) -> Set[socket.socket]:
204
"""Get server sockets."""
205
206
@property
207
def websockets(self) -> Set[ServerConnection]:
208
"""Get active WebSocket connections."""
209
210
def close(self) -> None:
211
"""
212
Stop accepting new connections and close existing ones.
213
214
This initiates server shutdown but doesn't wait for completion.
215
Use wait_closed() to wait for full shutdown.
216
"""
217
218
async def wait_closed(self) -> None:
219
"""Wait until server and all connections are closed."""
220
221
# Context manager support
222
async def __aenter__(self) -> Server:
223
"""Enter async context manager."""
224
return self
225
226
async def __aexit__(self, exc_type, exc_value, traceback) -> None:
227
"""Exit async context manager and close server."""
228
```
229
230
### Message Broadcasting
231
232
Utility functions for sending messages to multiple WebSocket connections concurrently.
233
234
```python { .api }
235
def broadcast(
236
websockets: Iterable[ServerConnection],
237
message: Data,
238
raise_exceptions: bool = False
239
) -> None:
240
"""
241
Broadcast a message to multiple WebSocket connections.
242
243
Parameters:
244
- websockets: Iterable of ServerConnection objects
245
- message: Text (str) or binary (bytes) message to broadcast
246
- raise_exceptions: Whether to raise exceptions from failed sends
247
248
Note:
249
This function sends messages concurrently and handles connection failures gracefully.
250
Failed connections are silently ignored unless raise_exceptions=True.
251
"""
252
```
253
254
### Authentication Support
255
256
Built-in HTTP basic authentication decorator for WebSocket handlers.
257
258
```python { .api }
259
def basic_auth(
260
username: str,
261
password: str,
262
realm: str = "WebSocket"
263
) -> Callable:
264
"""
265
Create HTTP basic authentication decorator for WebSocket handlers.
266
267
Parameters:
268
- username: Expected username
269
- password: Expected password
270
- realm: Authentication realm name
271
272
Returns:
273
Callable: Decorator that adds basic authentication to handler
274
275
Usage:
276
@basic_auth("admin", "secret")
277
async def protected_handler(websocket):
278
# Handler code here
279
pass
280
"""
281
```
282
283
## Usage Examples
284
285
### Basic Echo Server
286
287
```python
288
import asyncio
289
import websockets
290
291
async def echo_handler(websocket):
292
"""Simple echo handler that returns received messages."""
293
async for message in websocket:
294
await websocket.send(f"Echo: {message}")
295
296
async def main():
297
# Start server on localhost:8765
298
async with websockets.serve(echo_handler, "localhost", 8765):
299
print("WebSocket server started on ws://localhost:8765")
300
await asyncio.Future() # Run forever
301
302
asyncio.run(main())
303
```
304
305
### Chat Server with Broadcasting
306
307
```python
308
import asyncio
309
import websockets
310
import json
311
from typing import Set
312
313
# Keep track of connected clients
314
clients: Set[websockets.ServerConnection] = set()
315
316
async def chat_handler(websocket):
317
"""Chat server handler with broadcasting."""
318
# Register client
319
clients.add(websocket)
320
print(f"Client connected: {websocket.remote_address}")
321
322
try:
323
async for message in websocket:
324
try:
325
# Parse JSON message
326
data = json.loads(message)
327
chat_message = {
328
"user": data.get("user", "Anonymous"),
329
"message": data.get("message", ""),
330
"timestamp": asyncio.get_event_loop().time()
331
}
332
333
# Broadcast to all connected clients
334
broadcast_message = json.dumps(chat_message)
335
websockets.broadcast(clients, broadcast_message)
336
337
except json.JSONDecodeError:
338
# Send error response
339
error = {"error": "Invalid JSON format"}
340
await websocket.send(json.dumps(error))
341
342
except websockets.ConnectionClosedError:
343
pass
344
finally:
345
# Unregister client
346
clients.discard(websocket)
347
print(f"Client disconnected: {websocket.remote_address}")
348
349
async def main():
350
async with websockets.serve(
351
chat_handler,
352
"localhost",
353
8765,
354
ping_interval=30,
355
ping_timeout=10,
356
max_size=1024*10 # 10KB max message
357
):
358
print("Chat server started on ws://localhost:8765")
359
await asyncio.Future()
360
361
asyncio.run(main())
362
```
363
364
### Authenticated Server
365
366
```python
367
import asyncio
368
import websockets
369
from websockets import basic_auth
370
371
@basic_auth("admin", "secret123")
372
async def protected_handler(websocket):
373
"""Handler that requires authentication."""
374
await websocket.send("Welcome! You are authenticated.")
375
376
async for message in websocket:
377
if message.lower() == "status":
378
await websocket.send("Server is running normally")
379
else:
380
await websocket.send(f"Received: {message}")
381
382
async def main():
383
async with websockets.serve(
384
protected_handler,
385
"localhost",
386
8765,
387
extra_headers={"Server": "MyWebSocketServer/1.0"}
388
):
389
print("Protected server started on ws://localhost:8765")
390
print("Use Authorization: Basic YWRtaW46c2VjcmV0MTIz")
391
await asyncio.Future()
392
393
asyncio.run(main())
394
```
395
396
### Server with Custom Request Processing
397
398
```python
399
import asyncio
400
import websockets
401
from websockets import Request, Response
402
403
async def process_request(connection, request: Request):
404
"""Custom request processing before WebSocket upgrade."""
405
# Check custom header
406
api_key = request.headers.get("X-API-Key")
407
if not api_key or api_key != "valid-api-key":
408
return Response(403, "Forbidden", b"Invalid API key")
409
410
# Allow upgrade to proceed
411
return None
412
413
async def api_handler(websocket):
414
"""API handler for authenticated clients."""
415
await websocket.send("API connection established")
416
417
async for message in websocket:
418
# Process API commands
419
if message.startswith("GET /"):
420
await websocket.send('{"status": "success", "data": "..."}')
421
else:
422
await websocket.send('{"error": "Unknown command"}')
423
424
async def main():
425
async with websockets.serve(
426
api_handler,
427
"localhost",
428
8765,
429
process_request=process_request,
430
compression="deflate"
431
):
432
print("API server started on ws://localhost:8765")
433
await asyncio.Future()
434
435
asyncio.run(main())
436
```
437
438
### Unix Domain Socket Server
439
440
```python
441
import asyncio
442
import websockets
443
import os
444
445
async def unix_handler(websocket):
446
"""Handler for Unix socket connections."""
447
await websocket.send("Connected via Unix socket")
448
449
async for message in websocket:
450
await websocket.send(f"Unix echo: {message}")
451
452
async def main():
453
socket_path = "/tmp/websocket.sock"
454
455
# Remove existing socket file
456
if os.path.exists(socket_path):
457
os.unlink(socket_path)
458
459
async with websockets.unix_serve(unix_handler, socket_path):
460
print(f"Unix WebSocket server started on {socket_path}")
461
await asyncio.Future()
462
463
asyncio.run(main())
464
```