0
# Synchronous Server Operations
1
2
Complete threading-based synchronous WebSocket server functionality for creating WebSocket servers using blocking operations, suitable for traditional Python applications that don't use asyncio.
3
4
## Core Imports
5
6
```python
7
from websockets.sync.server import serve, unix_serve, ServerConnection, Server, basic_auth
8
```
9
10
## Capabilities
11
12
### Server Creation Functions
13
14
Start synchronous WebSocket servers with customizable handlers, authentication, and connection management using threading.
15
16
```python { .api }
17
def serve(
18
handler: Callable[[ServerConnection], None],
19
host: str | None = None,
20
port: int | None = None,
21
*,
22
# TCP/TLS
23
sock: socket.socket | None = None,
24
ssl: ssl.SSLContext | None = None,
25
# WebSocket
26
origins: Sequence[Origin | re.Pattern[str] | None] | None = None,
27
extensions: Sequence[ServerExtensionFactory] | None = None,
28
subprotocols: Sequence[Subprotocol] | None = None,
29
select_subprotocol: Callable[[ServerConnection, Sequence[Subprotocol]], Subprotocol | None] | None = None,
30
compression: str | None = "deflate",
31
# HTTP
32
process_request: Callable[[ServerConnection, Request], Response | None] | None = None,
33
process_response: Callable[[ServerConnection, Request, Response], Response | None] | None = None,
34
server_header: str | None = SERVER,
35
# Timeouts
36
open_timeout: float | None = 10,
37
ping_interval: float | None = 20,
38
ping_timeout: float | None = 20,
39
close_timeout: float | None = 10,
40
# Limits
41
max_size: int | None = 2**20,
42
max_queue: int | None | tuple[int | None, int | None] = 16,
43
# Logging
44
logger: LoggerLike | None = None,
45
# Escape hatch for advanced customization
46
create_connection: type[ServerConnection] | None = None,
47
**kwargs: Any
48
) -> Server:
49
"""
50
Start a synchronous WebSocket server.
51
52
Parameters:
53
- handler: Function to handle each WebSocket connection (runs in separate thread)
54
- host: Server host address (None for all interfaces)
55
- port: Server port number (None for automatic assignment)
56
- sock: Preexisting TCP socket to use for server
57
- ssl: SSL context for TLS configuration
58
- origins: List of allowed origins (None allows all)
59
- extensions: List of supported extensions in negotiation order
60
- subprotocols: List of supported subprotocols in preference order
61
- select_subprotocol: Function to select subprotocol from client list
62
- compression: Compression mode ("deflate" or None)
63
- process_request: Function to process HTTP request before WebSocket upgrade
64
- process_response: Function to process HTTP response after WebSocket upgrade
65
- server_header: Server header value (None to omit)
66
- open_timeout: Timeout for connection establishment (seconds)
67
- ping_interval: Interval between ping frames (seconds)
68
- ping_timeout: Timeout for ping/pong exchange (seconds)
69
- close_timeout: Timeout for connection closure (seconds)
70
- max_size: Maximum message size (bytes)
71
- max_queue: Maximum number of queued messages (int or tuple)
72
- logger: Logger instance for server logging
73
- create_connection: Custom connection class factory
74
75
Returns:
76
Server: Context manager for server lifecycle management
77
78
Raises:
79
- OSError: If server cannot bind to address/port
80
"""
81
82
def unix_serve(
83
handler: Callable[[ServerConnection], None],
84
path: str,
85
*,
86
logger: LoggerLike = None,
87
compression: str = "deflate",
88
subprotocols: List[Subprotocol] = None,
89
extra_headers: HeadersLike = None,
90
process_request: Callable = None,
91
select_subprotocol: Callable = None,
92
ping_interval: float = 20,
93
ping_timeout: float = 20,
94
close_timeout: float = 10,
95
max_size: int = 2**20,
96
max_queue: int = 2**5,
97
read_limit: int = 2**16,
98
write_limit: int = 2**16,
99
extensions: List[ServerExtensionFactory] = None,
100
**kwargs
101
) -> Server:
102
"""
103
Start a synchronous WebSocket server on Unix domain socket.
104
105
Parameters:
106
- handler: Function to handle each WebSocket connection
107
- path: Unix domain socket path
108
- Other parameters same as serve()
109
110
Returns:
111
Server: WebSocket server bound to Unix socket
112
113
Raises:
114
- OSError: If server cannot bind to Unix socket
115
"""
116
```
117
118
### Server Connection Management
119
120
The synchronous ServerConnection class represents individual client connections with blocking operations.
121
122
```python { .api }
123
class ServerConnection:
124
"""Synchronous WebSocket server connection representing a client."""
125
126
@property
127
def closed(self) -> bool:
128
"""Check if connection is closed."""
129
130
@property
131
def local_address(self) -> Tuple[str, int]:
132
"""Get server socket address."""
133
134
@property
135
def remote_address(self) -> Tuple[str, int]:
136
"""Get client socket address."""
137
138
@property
139
def subprotocol(self) -> Subprotocol | None:
140
"""Get negotiated subprotocol."""
141
142
@property
143
def request_headers(self) -> Headers:
144
"""Get HTTP request headers from handshake."""
145
146
@property
147
def response_headers(self) -> Headers:
148
"""Get HTTP response headers from handshake."""
149
150
def send(self, message: Data, timeout: float = None) -> None:
151
"""
152
Send a message to the WebSocket client.
153
154
Parameters:
155
- message: Text (str) or binary (bytes) message to send
156
- timeout: Optional timeout for send operation (seconds)
157
158
Raises:
159
- ConnectionClosed: If connection is closed
160
- TimeoutError: If timeout is exceeded
161
"""
162
163
def recv(self, timeout: float = None) -> Data:
164
"""
165
Receive a message from the WebSocket client.
166
167
Parameters:
168
- timeout: Optional timeout for receive operation (seconds)
169
170
Returns:
171
str | bytes: Received message (text or binary)
172
173
Raises:
174
- ConnectionClosed: If connection is closed
175
- TimeoutError: If timeout is exceeded
176
"""
177
178
def ping(self, data: bytes = b"", timeout: float = None) -> float:
179
"""
180
Send a ping frame and wait for pong response.
181
182
Parameters:
183
- data: Optional payload for ping frame
184
- timeout: Optional timeout for ping/pong exchange (seconds)
185
186
Returns:
187
float: Round-trip time in seconds
188
189
Raises:
190
- ConnectionClosed: If connection is closed
191
- TimeoutError: If timeout is exceeded
192
"""
193
194
def pong(self, data: bytes = b"") -> None:
195
"""
196
Send a pong frame.
197
198
Parameters:
199
- data: Payload for pong frame
200
201
Raises:
202
- ConnectionClosed: If connection is closed
203
"""
204
205
def close(self, code: int = 1000, reason: str = "") -> None:
206
"""
207
Close the WebSocket connection.
208
209
Parameters:
210
- code: Close code (default 1000 for normal closure)
211
- reason: Human-readable close reason
212
213
Raises:
214
- ProtocolError: If code is invalid
215
"""
216
217
# Iterator support for receiving messages
218
def __iter__(self) -> Iterator[Data]:
219
"""Return iterator for receiving messages."""
220
return self
221
222
def __next__(self) -> Data:
223
"""Get next message from iterator."""
224
try:
225
return self.recv()
226
except ConnectionClosed:
227
raise StopIteration
228
```
229
230
### Server Management
231
232
The synchronous Server class manages the WebSocket server lifecycle with blocking operations.
233
234
```python { .api }
235
class Server:
236
"""Synchronous WebSocket server management."""
237
238
@property
239
def socket(self) -> socket.socket:
240
"""Get server socket."""
241
242
def close(self) -> None:
243
"""
244
Stop accepting new connections and close existing ones.
245
246
This initiates server shutdown and blocks until all connections are closed.
247
"""
248
249
def serve_forever(self) -> None:
250
"""
251
Run the server until interrupted.
252
253
This method blocks and handles incoming connections until
254
KeyboardInterrupt or server.close() is called.
255
"""
256
257
# Context manager support
258
def __enter__(self) -> Server:
259
"""Enter context manager."""
260
return self
261
262
def __exit__(self, exc_type, exc_value, traceback) -> None:
263
"""Exit context manager and close server."""
264
self.close()
265
```
266
267
### Authentication Support
268
269
Built-in HTTP basic authentication decorator for synchronous WebSocket handlers.
270
271
```python { .api }
272
def basic_auth(
273
username: str,
274
password: str,
275
realm: str = "WebSocket"
276
) -> Callable:
277
"""
278
Create HTTP basic authentication decorator for synchronous WebSocket handlers.
279
280
Parameters:
281
- username: Expected username
282
- password: Expected password
283
- realm: Authentication realm name
284
285
Returns:
286
Callable: Decorator that adds basic authentication to handler
287
288
Usage:
289
@basic_auth("admin", "secret")
290
def protected_handler(websocket):
291
# Handler code here
292
pass
293
"""
294
```
295
296
## Usage Examples
297
298
### Basic Echo Server
299
300
```python
301
from websockets.sync import serve
302
import logging
303
304
def echo_handler(websocket):
305
"""Simple echo handler that returns received messages."""
306
print(f"Client connected: {websocket.remote_address}")
307
308
try:
309
for message in websocket:
310
print(f"Received: {message}")
311
websocket.send(f"Echo: {message}")
312
except Exception as e:
313
print(f"Handler error: {e}")
314
finally:
315
print(f"Client disconnected: {websocket.remote_address}")
316
317
def main():
318
# Start server
319
with serve(echo_handler, "localhost", 8765) as server:
320
print("WebSocket server started on ws://localhost:8765")
321
server.serve_forever()
322
323
if __name__ == "__main__":
324
main()
325
```
326
327
### Multi-threaded Chat Server
328
329
```python
330
from websockets.sync import serve
331
import threading
332
import json
333
import time
334
from typing import Set
335
336
# Thread-safe set for connected clients
337
clients: Set = set()
338
clients_lock = threading.Lock()
339
340
def broadcast_message(message: str, sender=None):
341
"""Broadcast message to all connected clients except sender."""
342
with clients_lock:
343
for client in clients.copy(): # Copy to avoid modification during iteration
344
if client != sender:
345
try:
346
client.send(message)
347
except Exception as e:
348
print(f"Failed to send to client: {e}")
349
clients.discard(client)
350
351
def chat_handler(websocket):
352
"""Chat server handler with broadcasting."""
353
# Register client
354
with clients_lock:
355
clients.add(websocket)
356
357
print(f"Client connected: {websocket.remote_address} (Total: {len(clients)})")
358
359
try:
360
# Send welcome message
361
websocket.send(json.dumps({
362
"type": "system",
363
"message": "Welcome to the chat server!"
364
}))
365
366
# Handle messages
367
for message in websocket:
368
try:
369
data = json.loads(message)
370
chat_message = {
371
"type": "chat",
372
"user": data.get("user", "Anonymous"),
373
"message": data.get("message", ""),
374
"timestamp": time.time()
375
}
376
377
# Broadcast to all clients
378
broadcast_message(json.dumps(chat_message), websocket)
379
380
except json.JSONDecodeError:
381
error = {"type": "error", "message": "Invalid JSON format"}
382
websocket.send(json.dumps(error))
383
384
except Exception as e:
385
print(f"Chat handler error: {e}")
386
finally:
387
# Unregister client
388
with clients_lock:
389
clients.discard(websocket)
390
print(f"Client disconnected: {websocket.remote_address} (Total: {len(clients)})")
391
392
def main():
393
with serve(
394
chat_handler,
395
"localhost",
396
8765,
397
ping_interval=30,
398
ping_timeout=10
399
) as server:
400
print("Chat server started on ws://localhost:8765")
401
try:
402
server.serve_forever()
403
except KeyboardInterrupt:
404
print("\nShutting down server...")
405
406
if __name__ == "__main__":
407
main()
408
```
409
410
### Authenticated Server with Request Processing
411
412
```python
413
from websockets.sync import serve, basic_auth
414
from websockets import Request, Response
415
import base64
416
417
def process_request(connection, request: Request):
418
"""Custom request processing with API key validation."""
419
# Check for API key in headers
420
api_key = request.headers.get("X-API-Key")
421
if not api_key or api_key != "secret-api-key":
422
return Response(401, "Unauthorized", b"Missing or invalid API key")
423
424
# Check for valid client certificate (example)
425
user_agent = request.headers.get("User-Agent", "")
426
if "TrustedClient" not in user_agent:
427
return Response(403, "Forbidden", b"Untrusted client")
428
429
# Allow WebSocket upgrade
430
return None
431
432
@basic_auth("admin", "password123")
433
def protected_handler(websocket):
434
"""Handler that requires both API key and basic auth."""
435
websocket.send("Authentication successful!")
436
437
try:
438
for message in websocket:
439
if message.startswith("/"):
440
# Handle commands
441
command = message[1:].strip().lower()
442
443
if command == "status":
444
websocket.send("Server is running normally")
445
elif command == "clients":
446
websocket.send(f"Connected clients: 1") # Simplified
447
elif command == "time":
448
import datetime
449
websocket.send(f"Server time: {datetime.datetime.now()}")
450
else:
451
websocket.send(f"Unknown command: {command}")
452
else:
453
# Echo regular messages
454
websocket.send(f"Received: {message}")
455
456
except Exception as e:
457
print(f"Protected handler error: {e}")
458
459
def main():
460
with serve(
461
protected_handler,
462
"localhost",
463
8765,
464
process_request=process_request,
465
extra_headers={"Server": "SecureWebSocketServer/1.0"}
466
) as server:
467
print("Secure server started on ws://localhost:8765")
468
print("Requires: X-API-Key: secret-api-key")
469
print("And Basic Auth: admin/password123")
470
server.serve_forever()
471
472
if __name__ == "__main__":
473
main()
474
```
475
476
### File Transfer Server
477
478
```python
479
from websockets.sync import serve
480
import os
481
import json
482
import base64
483
484
def file_server_handler(websocket):
485
"""File transfer server handler."""
486
websocket.send(json.dumps({
487
"type": "welcome",
488
"message": "File server ready. Send 'list' or 'get <filename>'"
489
}))
490
491
try:
492
for message in websocket:
493
try:
494
data = json.loads(message)
495
command = data.get("command", "").lower()
496
497
if command == "list":
498
# List files in current directory
499
files = [f for f in os.listdir(".") if os.path.isfile(f)]
500
response = {
501
"type": "file_list",
502
"files": files
503
}
504
websocket.send(json.dumps(response))
505
506
elif command == "get":
507
filename = data.get("filename", "")
508
if not filename:
509
websocket.send(json.dumps({
510
"type": "error",
511
"message": "Filename required"
512
}))
513
continue
514
515
try:
516
with open(filename, "rb") as f:
517
file_data = f.read()
518
encoded_data = base64.b64encode(file_data).decode()
519
520
response = {
521
"type": "file_data",
522
"filename": filename,
523
"size": len(file_data),
524
"data": encoded_data
525
}
526
websocket.send(json.dumps(response))
527
528
except FileNotFoundError:
529
websocket.send(json.dumps({
530
"type": "error",
531
"message": f"File not found: {filename}"
532
}))
533
534
else:
535
websocket.send(json.dumps({
536
"type": "error",
537
"message": f"Unknown command: {command}"
538
}))
539
540
except json.JSONDecodeError:
541
websocket.send(json.dumps({
542
"type": "error",
543
"message": "Invalid JSON format"
544
}))
545
546
except Exception as e:
547
print(f"File server error: {e}")
548
549
def main():
550
with serve(
551
file_server_handler,
552
"localhost",
553
8765,
554
max_size=10*1024*1024 # 10MB max message size for file transfers
555
) as server:
556
print("File server started on ws://localhost:8765")
557
server.serve_forever()
558
559
if __name__ == "__main__":
560
main()
561
```
562
563
### Unix Domain Socket Server
564
565
```python
566
from websockets.sync import unix_serve
567
import os
568
import tempfile
569
570
def unix_handler(websocket):
571
"""Handler for Unix socket connections."""
572
print(f"Client connected via Unix socket")
573
574
try:
575
websocket.send("Connected to Unix WebSocket server")
576
577
for message in websocket:
578
print(f"Received: {message}")
579
websocket.send(f"Unix echo: {message}")
580
581
except Exception as e:
582
print(f"Unix handler error: {e}")
583
finally:
584
print("Unix client disconnected")
585
586
def main():
587
# Use temporary directory for socket
588
socket_path = os.path.join(tempfile.gettempdir(), "websocket.sock")
589
590
# Remove existing socket file
591
if os.path.exists(socket_path):
592
os.unlink(socket_path)
593
594
try:
595
with unix_serve(unix_handler, socket_path) as server:
596
print(f"Unix WebSocket server started on {socket_path}")
597
server.serve_forever()
598
finally:
599
# Clean up socket file
600
if os.path.exists(socket_path):
601
os.unlink(socket_path)
602
603
if __name__ == "__main__":
604
main()
605
```
606
607
### Server with Graceful Shutdown
608
609
```python
610
from websockets.sync import serve
611
import signal
612
import sys
613
import threading
614
import time
615
616
# Global server reference for signal handler
617
server_instance = None
618
619
def signal_handler(signum, frame):
620
"""Handle shutdown signals gracefully."""
621
print(f"\nReceived signal {signum}, shutting down gracefully...")
622
if server_instance:
623
server_instance.close()
624
sys.exit(0)
625
626
def heartbeat_handler(websocket):
627
"""Handler that sends periodic heartbeats."""
628
print(f"Client connected: {websocket.remote_address}")
629
630
try:
631
# Send welcome message
632
websocket.send("Connected to heartbeat server")
633
634
# Start heartbeat thread
635
stop_heartbeat = threading.Event()
636
637
def send_heartbeat():
638
while not stop_heartbeat.is_set():
639
try:
640
websocket.send(f"Heartbeat: {time.time()}")
641
time.sleep(5)
642
except:
643
break
644
645
heartbeat_thread = threading.Thread(target=send_heartbeat)
646
heartbeat_thread.daemon = True
647
heartbeat_thread.start()
648
649
# Handle incoming messages
650
for message in websocket:
651
print(f"Received: {message}")
652
if message.lower() == "stop heartbeat":
653
stop_heartbeat.set()
654
websocket.send("Heartbeat stopped")
655
elif message.lower() == "start heartbeat":
656
if stop_heartbeat.is_set():
657
stop_heartbeat.clear()
658
heartbeat_thread = threading.Thread(target=send_heartbeat)
659
heartbeat_thread.daemon = True
660
heartbeat_thread.start()
661
websocket.send("Heartbeat started")
662
else:
663
websocket.send(f"Echo: {message}")
664
665
stop_heartbeat.set()
666
667
except Exception as e:
668
print(f"Heartbeat handler error: {e}")
669
finally:
670
print(f"Client disconnected: {websocket.remote_address}")
671
672
def main():
673
global server_instance
674
675
# Set up signal handlers
676
signal.signal(signal.SIGINT, signal_handler)
677
signal.signal(signal.SIGTERM, signal_handler)
678
679
try:
680
with serve(heartbeat_handler, "localhost", 8765) as server:
681
server_instance = server
682
print("Heartbeat server started on ws://localhost:8765")
683
print("Press Ctrl+C to shutdown gracefully")
684
server.serve_forever()
685
except KeyboardInterrupt:
686
print("\nServer interrupted")
687
finally:
688
print("Server shutdown complete")
689
690
if __name__ == "__main__":
691
main()
692
```