An implementation of the WebSocket Protocol (RFC 6455 & 7692)
High-level asyncio-based WebSocket client functionality for connecting to WebSocket servers, managing connection lifecycle, and handling message communication using Python's async/await syntax.
Create WebSocket client connections with support for various protocols, authentication, compression, and connection options.
async def connect(
    uri: str,
    *,
    additional_headers: HeadersLike | None = None,
    user_agent_header: str | None = None,
    compression: str | None = "deflate",
    logger: LoggerLike | None = None,
    subprotocols: List[Subprotocol] | None = None,
    extra_headers: HeadersLike | None = None,
    open_timeout: float = 10,
    close_timeout: float = 10,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ClientExtensionFactory] | None = None,
    **kwargs,
) -> ClientConnection:
    """
    Connect to a WebSocket server.

    Every option after ``uri`` is keyword-only.

    Parameters:
    - uri: WebSocket URI (ws:// or wss://)
    - additional_headers: Extra HTTP headers for handshake
    - user_agent_header: Custom User-Agent header
    - compression: Compression mode ("deflate" or None)
    - logger: Logger instance for connection logging
    - subprotocols: List of supported subprotocols
    - extra_headers: Additional headers (deprecated, use additional_headers)
    - open_timeout: Timeout for connection establishment (seconds)
    - close_timeout: Timeout for connection closure (seconds)
    - ping_interval: Interval between ping frames (seconds)
    - ping_timeout: Timeout for ping/pong exchange (seconds)
    - max_size: Maximum message size (bytes)
    - max_queue: Maximum number of queued messages
    - read_limit: Buffer size for reading (bytes)
    - write_limit: Buffer size for writing (bytes)
    - extensions: List of WebSocket extensions
    - **kwargs: Additional keyword arguments; not described in this
      reference (presumably forwarded to the underlying connection
      setup -- TODO confirm)

    Returns:
    ClientConnection: Async context manager and iterator for WebSocket communication

    NOTE(review): the usage examples in this reference call
    ``async with websockets.connect(...)`` without awaiting first, so the
    real implementation must also work as an async context manager --
    confirm against the library before relying on this stub signature.
    """
async def unix_connect(
    path: str,
    *,
    additional_headers: HeadersLike | None = None,
    user_agent_header: str | None = None,
    compression: str | None = "deflate",
    logger: LoggerLike | None = None,
    subprotocols: List[Subprotocol] | None = None,
    extra_headers: HeadersLike | None = None,
    open_timeout: float = 10,
    close_timeout: float = 10,
    ping_interval: float = 20,
    ping_timeout: float = 20,
    max_size: int = 2**20,
    max_queue: int = 2**5,
    read_limit: int = 2**16,
    write_limit: int = 2**16,
    extensions: List[ClientExtensionFactory] | None = None,
    **kwargs,
) -> ClientConnection:
    """
    Connect to a WebSocket server over Unix domain socket.

    Every option after ``path`` is keyword-only.

    Parameters:
    - path: Unix domain socket path
    - Other parameters same as connect()

    Returns:
    ClientConnection: WebSocket connection over Unix socket
    """


# The ClientConnection class provides the main interface for WebSocket client
# operations with support for async context management and message iteration.
class ClientConnection:
    """WebSocket client connection with asyncio support."""

    @property
    def closed(self) -> bool:
        """Check if connection is closed."""

    @property
    def local_address(self) -> Tuple[str, int]:
        """Get local socket address."""

    @property
    def remote_address(self) -> Tuple[str, int]:
        """Get remote socket address."""

    @property
    def subprotocol(self) -> Subprotocol | None:
        """Get negotiated subprotocol (None when there is none)."""

    async def send(self, message: Data) -> None:
        """
        Send a message to the WebSocket server.

        Parameters:
        - message: Text (str) or binary (bytes) message to send

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def recv(self) -> Data:
        """
        Receive a message from the WebSocket server.

        Returns:
        str | bytes: Received message (text or binary)

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def ping(self, data: bytes = b"") -> Awaitable[float]:
        """
        Send a ping frame.

        Awaiting this method only sends the ping; it hands back an awaitable
        that the caller awaits separately to obtain the round-trip time, e.g.
        ``pong_waiter = await conn.ping(); latency = await pong_waiter``.

        Parameters:
        - data: Optional payload for ping frame

        Returns:
        Awaitable[float]: Awaitable that resolves to round-trip time

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def pong(self, data: bytes = b"") -> None:
        """
        Send a pong frame.

        Parameters:
        - data: Payload for pong frame

        Raises:
        - ConnectionClosed: If connection is closed
        """

    async def close(self, code: int = 1000, reason: str = "") -> None:
        """
        Close the WebSocket connection.

        Parameters:
        - code: Close code (default 1000 for normal closure)
        - reason: Human-readable close reason

        Raises:
        - ProtocolError: If code is invalid
        """

    async def wait_closed(self) -> None:
        """Wait until the connection is closed."""

    # Context manager support
    # The return annotation is quoted: the class name is not bound yet while
    # the class body is being executed.
    async def __aenter__(self) -> "ClientConnection":
        """Enter async context manager."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Exit async context manager and close connection."""

    # Async iterator support
    def __aiter__(self) -> AsyncIterator[Data]:
        """Return async iterator for receiving messages."""
        return self

    async def __anext__(self) -> Data:
        """Get next message from async iterator."""


# Utility function for classifying connection errors and determining
# appropriate handling strategies.
def process_exception(exception: Exception) -> Exception | None:
    """
    Classify exceptions for connection error handling.

    Parameters:
    - exception: Exception that occurred during connection

    Returns:
    Exception | None: Classified exception (may be the same or different type)

    NOTE(review): the upstream websockets implementation returns None for
    errors it treats as retryable and an exception for fatal ones; the
    return annotation reflects that -- confirm against the library docs.
    """


import asyncio
import websockets


async def basic_client():
    """Open a connection, send one greeting, and print the server's reply."""
    server_uri = "ws://localhost:8765"
    # Simple connection with default settings
    async with websockets.connect(server_uri) as websocket:
        greeting = "Hello, Server!"
        await websocket.send(greeting)
        response = await websocket.recv()
        print(f"Server response: {response}")


asyncio.run(basic_client())
import asyncio
import json
import logging

import websockets


async def configured_client():
    """
    Subscribe to a channel and print incoming JSON messages.

    Connects with custom headers, compression, and logging, sends a JSON
    subscription request, then prints every decoded message until one
    arrives whose "type" is "close".
    """
    # Client with custom headers, compression, and logging
    headers = {"Authorization": "Bearer token123"}
    logger = logging.getLogger("websocket_client")

    async with websockets.connect(
        "wss://api.example.com/ws",
        additional_headers=headers,
        compression="deflate",
        logger=logger,
        ping_interval=30,
        ping_timeout=10,
        max_size=1024 * 1024,  # 1MB max message size
    ) as websocket:
        # Send JSON data
        subscription = {"type": "subscribe", "channel": "updates"}
        await websocket.send(json.dumps(subscription))

        # Listen for messages
        async for message in websocket:
            data = json.loads(message)
            print(f"Received: {data}")

            # Break on specific message
            if data.get("type") == "close":
                break


asyncio.run(configured_client())
import asyncio
import websockets
from websockets import ConnectionClosed, ConnectionClosedError, InvalidURI


async def robust_client():
    """
    Echo ten messages against a local server with error handling.

    Measures latency with a ping first, then sends ten messages and prints
    each echo. The send loop stops as soon as the connection closes.
    """
    try:
        async with websockets.connect(
            "ws://localhost:8765",
            open_timeout=5,
            close_timeout=5,
        ) as websocket:
            # Send ping to test connectivity
            pong_waiter = await websocket.ping()
            latency = await pong_waiter
            print(f"Connection latency: {latency:.3f}s")

            # Send messages with error handling
            for i in range(10):
                try:
                    await websocket.send(f"Message {i}")
                    response = await websocket.recv()
                    print(f"Echo {i}: {response}")
                except ConnectionClosed:
                    # Catch the base class so a normal close by the server is
                    # handled here too instead of surfacing below as an
                    # "Unexpected error".
                    print("Connection closed by server")
                    break
    except InvalidURI as e:
        print(f"Invalid WebSocket URI: {e}")
    except ConnectionClosedError as e:
        print(f"Connection failed: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


asyncio.run(robust_client())
import asyncio
import websockets


async def unix_client():
    """Exchange a single message with a server over a Unix domain socket."""
    socket_path = "/tmp/websocket.sock"
    # Connect via Unix domain socket
    async with websockets.unix_connect(socket_path) as websocket:
        greeting = "Hello via Unix socket!"
        await websocket.send(greeting)
        response = await websocket.recv()
        print(f"Response: {response}")


asyncio.run(unix_client())

# Install with Tessl CLI
npx tessl i tessl/pypi-websockets