Simple WebSocket server and client for Python
—
Asynchronous WebSocket implementation providing asyncio-based WebSocket server and client functionality. These classes handle WebSocket connections using async/await patterns with full asyncio integration for modern Python applications.
Async WebSocket server supporting multiple async frameworks including aiohttp, ASGI, and custom socket integration. Provides seamless integration with asyncio-based web applications.
class AioServer:
def __init__(self, request, subprotocols=None, receive_bytes=4096,
ping_interval=None, max_message_size=None):
"""
Initialize async WebSocket server.
Parameters:
- request: Request object containing connection details
- subprotocols: List of supported subprotocols or None
- receive_bytes: Buffer size for receiving data (default: 4096)
- ping_interval: Ping interval in seconds or None to disable
- max_message_size: Maximum message size in bytes or None for no limit
"""@classmethod
async def accept(cls, aiohttp=None, asgi=None, sock=None, headers=None,
subprotocols=None, receive_bytes=4096, ping_interval=None,
max_message_size=None):
"""
Accept async WebSocket connection from client.
Parameters:
- aiohttp: aiohttp request object (mutually exclusive with other options)
- asgi: ASGI (scope, receive, send) tuple (mutually exclusive)
- sock: Connected socket with headers dict (mutually exclusive)
- headers: Request headers dict when using sock parameter
- subprotocols: List of supported subprotocols or None
- receive_bytes: Buffer size for receiving data (default: 4096)
- ping_interval: Send ping packets at this interval in seconds or None
- max_message_size: Maximum allowed message size in bytes or None
Returns:
AioServer instance connected to the client
"""async def send(self, data):
"""
Send data over the WebSocket connection.
Parameters:
- data: Data to send (str for text message, bytes for binary message)
"""
async def receive(self, timeout=None):
"""
Receive data over the WebSocket connection.
Parameters:
- timeout: Timeout in seconds (None for indefinite, 0 for non-blocking)
Returns:
Received data as str or bytes depending on message type
"""
async def close(self, reason=None, message=None):
"""
Close the WebSocket connection.
Parameters:
- reason: Numeric status code for closure (default: 1000 normal closure)
- message: Text message to send with closure
"""
def choose_subprotocol(self, request):
"""
Choose subprotocol for the WebSocket connection.
Parameters:
- request: Request object containing client's requested subprotocols
Returns:
Selected subprotocol name or None if no subprotocol chosen
"""Async WebSocket client that connects to WebSocket servers using asyncio. Supports secure connections, custom headers, subprotocol negotiation, and full async context management.
class AioClient:
def __init__(self, url, subprotocols=None, headers=None, receive_bytes=4096,
ping_interval=None, max_message_size=None, ssl_context=None):
"""
Initialize async WebSocket client.
Parameters:
- url: WebSocket URL to connect to (ws:// or wss://)
- subprotocols: Subprotocol name or list of names to request
- headers: Additional HTTP headers as dict or list of tuples
- receive_bytes: Buffer size for receiving data (default: 4096)
- ping_interval: Ping interval in seconds or None to disable
- max_message_size: Maximum message size in bytes or None for no limit
- ssl_context: SSL context for secure connections or None for default
"""@classmethod
async def connect(cls, url, subprotocols=None, headers=None, receive_bytes=4096,
ping_interval=None, max_message_size=None, ssl_context=None):
"""
Create and connect async WebSocket client.
Parameters:
- url: WebSocket URL (ws:// or wss://)
- subprotocols: Subprotocol name or list of preference-ordered names
- headers: Additional HTTP headers as dict or list of tuples
- receive_bytes: Buffer size for receiving data (default: 4096)
- ping_interval: Send ping packets at this interval in seconds or None
- max_message_size: Maximum allowed message size in bytes or None
- ssl_context: Custom SSL context or None for default secure context
Returns:
Connected AioClient instance
"""async def send(self, data):
"""
Send data over the WebSocket connection.
Parameters:
- data: Data to send (str for text message, bytes for binary message)
"""
async def receive(self, timeout=None):
"""
Receive data over the WebSocket connection.
Parameters:
- timeout: Timeout in seconds (None for indefinite, 0 for non-blocking)
Returns:
Received data as str or bytes depending on message type
"""
async def close(self, reason=None, message=None):
"""
Close the WebSocket connection and underlying socket.
Parameters:
- reason: Numeric status code for closure (default: 1000 normal closure)
- message: Text message to send with closure
"""import simple_websocket
from aiohttp import web, WSMsgType
async def websocket_handler(request):
# Accept WebSocket connection from aiohttp
ws = await simple_websocket.AioServer.accept(aiohttp=request)
try:
async for message in ws:
# Process incoming message
if message.type == WSMsgType.TEXT:
data = message.data
# Echo message back
await ws.send(f"Echo: {data}")
elif message.type == WSMsgType.ERROR:
print(f"WebSocket error: {ws.exception()}")
break
except simple_websocket.ConnectionClosed:
print("Client disconnected")
finally:
await ws.close()
app = web.Application()
app.router.add_get('/ws', websocket_handler)import asyncio
import simple_websocket
async def client_example():
try:
# Connect to WebSocket server
ws = await simple_websocket.AioClient.connect("ws://localhost:8000/ws")
# Send message
await ws.send("Hello, Async Server!")
# Receive response with timeout
response = await ws.receive(timeout=10)
print(f"Server response: {response}")
# Send binary data
await ws.send(b"Binary data")
binary_response = await ws.receive()
print(f"Binary response: {binary_response}")
except simple_websocket.ConnectionError as e:
print(f"Failed to connect: {e}")
except simple_websocket.ConnectionClosed:
print("Server closed connection")
except asyncio.TimeoutError:
print("Timeout waiting for response")
finally:
await ws.close()
# Run the client
asyncio.run(client_example())import asyncio
import socket
import simple_websocket
async def custom_socket_server():
# Create and bind socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(('localhost', 8000))
server_sock.listen(1)
while True:
client_sock, addr = server_sock.accept()
# Prepare headers dict (simplified example)
headers = {
'Host': 'localhost:8000',
'Connection': 'Upgrade',
'Upgrade': 'websocket',
'Sec-WebSocket-Key': 'example-key',
'Sec-WebSocket-Version': '13'
}
# Accept WebSocket with custom socket
ws = await simple_websocket.AioServer.accept(
sock=client_sock,
headers=headers
)
try:
# Handle connection
while True:
message = await ws.receive(timeout=30)
if message is None:
break
await ws.send(f"Received: {message}")
except simple_websocket.ConnectionClosed:
print(f"Client {addr} disconnected")
finally:
await ws.close()
# Run the server
asyncio.run(custom_socket_server())import asyncio
import simple_websocket
class AsyncWebSocketClient:
def __init__(self, url, **kwargs):
self.url = url
self.kwargs = kwargs
self.ws = None
async def __aenter__(self):
self.ws = await simple_websocket.AioClient.connect(self.url, **self.kwargs)
return self.ws
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.ws:
await self.ws.close()
# Usage with async context manager
async def context_manager_example():
async with AsyncWebSocketClient("ws://localhost:8000/ws") as ws:
await ws.send("Hello with context manager!")
response = await ws.receive()
print(f"Response: {response}")
# WebSocket automatically closed when exiting context
asyncio.run(context_manager_example())class AioBase:
subprotocol: str | None # Negotiated subprotocol name
connected: bool # Connection status
receive_bytes: int # Buffer size for receiving data
ping_interval: float | None # Ping interval in seconds
max_message_size: int | None # Maximum message size limit
rsock: asyncio.StreamReader | None # Read stream
wsock: asyncio.StreamWriter | None # Write stream
event: asyncio.Event # Async event for synchronization
task: asyncio.Task | None # Background task handleInstall with Tessl CLI
npx tessl i tessl/pypi-simple-websocket