An implementation of the WebSocket Protocol (RFC 6455 & 7692)
Threading-based synchronous WebSocket client functionality for connecting to WebSocket servers using blocking operations, suitable for traditional Python applications that don't use asyncio.
from websockets.sync.client import connect, unix_connect, ClientConnectionCreate synchronous WebSocket client connections with blocking operations for environments where asyncio is not suitable or preferred.
def connect(
uri: str,
*,
# TCP/TLS
sock: socket.socket | None = None,
ssl: ssl.SSLContext | None = None,
server_hostname: str | None = None,
# WebSocket
origin: Origin | None = None,
extensions: Sequence[ClientExtensionFactory] | None = None,
subprotocols: Sequence[Subprotocol] | None = None,
compression: str | None = "deflate",
# HTTP
additional_headers: HeadersLike | None = None,
user_agent_header: str | None = USER_AGENT,
proxy: str | Literal[True] | None = True,
proxy_ssl: ssl.SSLContext | None = None,
proxy_server_hostname: str | None = None,
# Timeouts
open_timeout: float | None = 10,
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | None | tuple[int | None, int | None] = 16,
# Logging
logger: LoggerLike | None = None,
# Escape hatch for advanced customization
create_connection: type[ClientConnection] | None = None,
**kwargs: Any
) -> ClientConnection:
"""
Connect to a WebSocket server synchronously.
Parameters:
- uri: WebSocket URI (ws:// or wss://)
- sock: Preexisting TCP socket (overrides host/port from URI)
- ssl: SSL context for TLS configuration
- server_hostname: Host name for TLS handshake (overrides URI hostname)
- origin: Value of Origin header for servers that require it
- extensions: List of supported extensions in negotiation order
- subprotocols: List of supported subprotocols in preference order
- compression: Compression mode ("deflate" or None)
- additional_headers: Extra HTTP headers for handshake
- user_agent_header: Custom User-Agent header
- proxy: Proxy configuration (True for auto-detect, str for URL, None to disable)
- proxy_ssl: SSL context for proxy connection
- proxy_server_hostname: Server hostname for proxy TLS handshake
- open_timeout: Timeout for connection establishment (seconds)
- ping_interval: Interval between ping frames (seconds)
- ping_timeout: Timeout for ping/pong exchange (seconds)
- close_timeout: Timeout for connection closure (seconds)
- max_size: Maximum message size (bytes)
- max_queue: Maximum number of queued messages (int or tuple)
- logger: Logger instance for connection logging
- create_connection: Custom connection class factory
Returns:
ClientConnection: Synchronous WebSocket connection
Raises:
- InvalidURI: If URI format is invalid
- InvalidHandshake: If handshake fails
- ConnectionClosedError: If connection fails
"""
def unix_connect(
path: str | None = None,
uri: str | None = None,
**kwargs: Any
) -> ClientConnection:
"""
Connect to a WebSocket server over Unix domain socket synchronously.
Parameters:
- path: Unix socket path for connection (mutually exclusive with uri)
- uri: WebSocket URI with unix:// scheme (mutually exclusive with path)
- **kwargs: Same keyword arguments as connect()
Returns:
ClientConnection: Synchronous WebSocket connection
Note:
Only available on Unix-like systems (Linux, macOS).
Accepts the same keyword arguments as connect().
"""The synchronous ClientConnection class provides blocking operations for WebSocket communication.
class ClientConnection:
"""Synchronous WebSocket client connection."""
@property
def closed(self) -> bool:
"""Check if connection is closed."""
@property
def local_address(self) -> Tuple[str, int]:
"""Get local socket address."""
@property
def remote_address(self) -> Tuple[str, int]:
"""Get remote socket address."""
@property
def subprotocol(self) -> Subprotocol | None:
"""Get negotiated subprotocol."""
def send(self, message: Data, timeout: float = None) -> None:
"""
Send a message to the WebSocket server.
Parameters:
- message: Text (str) or binary (bytes) message to send
- timeout: Optional timeout for send operation (seconds)
Raises:
- ConnectionClosed: If connection is closed
- TimeoutError: If timeout is exceeded
"""
def recv(self, timeout: float = None) -> Data:
"""
Receive a message from the WebSocket server.
Parameters:
- timeout: Optional timeout for receive operation (seconds)
Returns:
str | bytes: Received message (text or binary)
Raises:
- ConnectionClosed: If connection is closed
- TimeoutError: If timeout is exceeded
"""
def ping(self, data: bytes = b"", timeout: float = None) -> float:
"""
Send a ping frame and wait for pong response.
Parameters:
- data: Optional payload for ping frame
- timeout: Optional timeout for ping/pong exchange (seconds)
Returns:
float: Round-trip time in seconds
Raises:
- ConnectionClosed: If connection is closed
- TimeoutError: If timeout is exceeded
"""
def pong(self, data: bytes = b"") -> None:
"""
Send a pong frame.
Parameters:
- data: Payload for pong frame
Raises:
- ConnectionClosed: If connection is closed
"""
def close(self, code: int = 1000, reason: str = "") -> None:
"""
Close the WebSocket connection.
Parameters:
- code: Close code (default 1000 for normal closure)
- reason: Human-readable close reason
Raises:
- ProtocolError: If code is invalid
"""
# Context manager support
def __enter__(self) -> ClientConnection:
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Exit context manager and close connection."""
self.close()
# Iterator support for receiving messages
def __iter__(self) -> Iterator[Data]:
"""Return iterator for receiving messages."""
return self
def __next__(self) -> Data:
"""Get next message from iterator."""
try:
return self.recv()
except ConnectionClosed:
raise StopIterationfrom websockets.sync import connect
def basic_client():
"""Simple synchronous WebSocket client."""
with connect("ws://localhost:8765") as websocket:
# Send a message
websocket.send("Hello, Server!")
# Receive response
response = websocket.recv()
print(f"Server response: {response}")
# Run the client
basic_client()from websockets.sync import connect
from websockets import ConnectionClosed, TimeoutError
def timeout_client():
"""Client with timeout handling."""
try:
with connect(
"ws://localhost:8765",
open_timeout=5,
close_timeout=3
) as websocket:
# Send with timeout
websocket.send("Hello with timeout!", timeout=2.0)
# Receive with timeout
try:
response = websocket.recv(timeout=5.0)
print(f"Response: {response}")
except TimeoutError:
print("No response received within timeout")
# Test connection with ping
try:
latency = websocket.ping(timeout=3.0)
print(f"Connection latency: {latency:.3f}s")
except TimeoutError:
print("Ping timeout - connection may be stale")
except ConnectionClosed as e:
print(f"Connection closed: {e}")
except Exception as e:
print(f"Connection error: {e}")
timeout_client()from websockets.sync import connect
from websockets import ConnectionClosed
def iterator_client():
"""Client using message iteration."""
try:
with connect("ws://localhost:8765") as websocket:
# Send initial message
websocket.send("Start streaming")
# Iterate over received messages
for message in websocket:
print(f"Received: {message}")
# Send acknowledgment
websocket.send("ACK")
# Break on specific message
if message.lower() == "stop":
break
except ConnectionClosed:
print("Connection closed by server")
iterator_client()import threading
import time
from websockets.sync import connect
from websockets import ConnectionClosed
def worker_thread(thread_id: int, uri: str):
"""Worker thread with independent WebSocket connection."""
try:
with connect(uri) as websocket:
print(f"Thread {thread_id}: Connected")
# Send periodic messages
for i in range(5):
message = f"Thread {thread_id}, Message {i}"
websocket.send(message)
try:
response = websocket.recv(timeout=2.0)
print(f"Thread {thread_id}: {response}")
except Exception as e:
print(f"Thread {thread_id}: Error - {e}")
time.sleep(1)
except ConnectionClosed as e:
print(f"Thread {thread_id}: Connection closed - {e}")
def multi_threaded_client():
"""Example using multiple threads with sync connections."""
threads = []
uri = "ws://localhost:8765"
# Create multiple worker threads
for i in range(3):
thread = threading.Thread(
target=worker_thread,
args=(i, uri)
)
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
print("All threads completed")
multi_threaded_client()from websockets.sync import connect
import base64
def authenticated_client():
"""Client with custom headers and authentication."""
# Prepare authentication header
credentials = base64.b64encode(b"username:password").decode()
headers = {
"Authorization": f"Basic {credentials}",
"X-Client-Version": "1.0",
"X-Client-Type": "Python-Sync"
}
try:
with connect(
"ws://localhost:8765",
additional_headers=headers,
subprotocols=["chat", "notifications"],
compression="deflate"
) as websocket:
print(f"Connected with subprotocol: {websocket.subprotocol}")
# Send authentication verification
websocket.send("VERIFY_AUTH")
auth_response = websocket.recv()
if auth_response == "AUTH_OK":
print("Authentication successful")
# Normal operation
websocket.send("GET_STATUS")
status = websocket.recv()
print(f"Status: {status}")
else:
print("Authentication failed")
except Exception as e:
print(f"Authentication error: {e}")
authenticated_client()from websockets.sync import unix_connect
def unix_client():
"""Client connecting via Unix domain socket."""
try:
with unix_connect("/tmp/websocket.sock") as websocket:
print("Connected via Unix socket")
# Send message
websocket.send("Hello via Unix socket!")
# Receive response
response = websocket.recv()
print(f"Response: {response}")
except FileNotFoundError:
print("Unix socket not found - is the server running?")
except Exception as e:
print(f"Unix socket error: {e}")
unix_client()import time
from websockets.sync import connect
from websockets import ConnectionClosed, InvalidURI
def robust_client_with_reconnect():
"""Client with automatic reconnection logic."""
uri = "ws://localhost:8765"
max_retries = 5
retry_delay = 2
for attempt in range(max_retries):
try:
print(f"Connection attempt {attempt + 1}")
with connect(uri, open_timeout=5) as websocket:
print("Connected successfully")
# Normal operation
try:
while True:
websocket.send("ping")
response = websocket.recv(timeout=10)
if response == "pong":
print("Connection healthy")
time.sleep(5)
else:
print(f"Unexpected response: {response}")
except ConnectionClosed:
print("Connection lost")
break
except KeyboardInterrupt:
print("Interrupted by user")
return
except (ConnectionClosed, InvalidURI) as e:
print(f"Connection failed: {e}")
if attempt < max_retries - 1:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
print("Max retries exceeded")
break
except Exception as e:
print(f"Unexpected error: {e}")
break
robust_client_with_reconnect()Install with Tessl CLI
npx tessl i tessl/pypi-websockets