WebSocket client for Python with low level API options
Direct WebSocket protocol control with frame-by-frame communication, custom connection options, and advanced features like manual ping/pong handling and continuation frames. The WebSocket class provides fine-grained control over the WebSocket protocol for advanced use cases.
Low-level WebSocket interface offering direct protocol control and comprehensive configuration options.
class WebSocket:
def __init__(
self,
get_mask_key=None,
sockopt=None,
sslopt=None,
fire_cont_frame: bool = False,
enable_multithread: bool = True,
skip_utf8_validation: bool = False,
**_,
):
"""
Initialize WebSocket object.
Parameters:
- get_mask_key: Function to generate mask keys for frame masking
- sockopt: Socket options list for socket.setsockopt calls
- sslopt: SSL options dictionary for SSL socket configuration
- fire_cont_frame: Whether to fire events for continuation frames
- enable_multithread: Enable thread-safe operations with locks
- skip_utf8_validation: Skip UTF-8 validation for performance
"""Establish and manage WebSocket connections with comprehensive configuration options.
def connect(self, url, **options):
"""
Connect to WebSocket URL with extensive configuration options.
Parameters:
- url: WebSocket URL (ws:// or wss://)
- header: Custom headers (list or dict)
- cookie: Cookie string for authentication
- origin: Custom origin URL
- connection: Custom connection header (default: "Upgrade")
- suppress_origin: Don't send Origin header
- host: Custom host header
- timeout: Socket timeout in seconds
- http_proxy_host: HTTP proxy hostname
- http_proxy_port: HTTP proxy port
- http_no_proxy: Hosts that bypass proxy
- http_proxy_auth: Proxy auth tuple (username, password)
- http_proxy_timeout: Proxy connection timeout
- redirect_limit: Maximum redirects to follow
- subprotocols: List of supported subprotocols
- socket: Pre-initialized socket object
"""
def close(self, status: int = STATUS_NORMAL, reason: bytes = b"", timeout: int = 3):
"""
Close WebSocket connection gracefully.
Parameters:
- status: Close status code (see STATUS_* constants)
- reason: UTF-8 encoded close reason
- timeout: Timeout to wait for close frame response
Raises:
ValueError: If status code is invalid range
"""
def abort(self):
"""
Immediately abort connection without close handshake.
Wakes up threads waiting in recv operations.
"""
def shutdown(self):
"""
Close socket immediately without WebSocket close handshake.
"""Send various types of messages and frames with precise control over WebSocket opcodes.
def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
"""
Send data with specified opcode.
Parameters:
- payload: Data to send (string for text, bytes for binary)
- opcode: WebSocket opcode (OPCODE_TEXT, OPCODE_BINARY, etc.)
Returns:
int: Number of bytes sent
"""
def send_text(self, text_data: str) -> int:
"""
Send UTF-8 text message.
Parameters:
- text_data: Text string to send
Returns:
int: Number of bytes sent
"""
def send_bytes(self, data: Union[bytes, bytearray]) -> int:
"""
Send binary message.
Parameters:
- data: Binary data to send
Returns:
int: Number of bytes sent
"""
def send_binary(self, payload: bytes) -> int:
"""
Send binary message (alias for send with OPCODE_BINARY).
Parameters:
- payload: Binary data to send
Returns:
int: Number of bytes sent
"""
def send_frame(self, frame) -> int:
"""
Send raw ABNF frame.
Parameters:
- frame: ABNF frame object
Returns:
int: Number of bytes sent
"""Receive messages and frames with different levels of detail and control.
def recv(self) -> Union[str, bytes]:
"""
Receive message data.
Returns:
Union[str, bytes]: Text messages as str, binary messages as bytes
"""
def recv_data(self, control_frame: bool = False) -> tuple:
"""
Receive data with opcode information.
Parameters:
- control_frame: Whether to return control frame data
Returns:
tuple: (opcode, data) - opcode and message data
"""
def recv_data_frame(self, control_frame: bool = False) -> tuple:
"""
Receive complete frame with opcode.
Parameters:
- control_frame: Whether to return control frame data
Returns:
tuple: (opcode, frame) - opcode and ABNF frame object
"""
def recv_frame(self):
"""
Receive raw ABNF frame from server.
Returns:
ABNF: Raw frame object with all protocol details
"""Send and manage WebSocket control frames for connection health monitoring.
def ping(self, payload: Union[str, bytes] = ""):
"""
Send ping frame to server.
Parameters:
- payload: Optional payload data for ping
Note: Payload is automatically UTF-8 encoded if string
"""
def pong(self, payload: Union[str, bytes] = ""):
"""
Send pong frame to server.
Parameters:
- payload: Optional payload data for pong
Note: Payload is automatically UTF-8 encoded if string
"""
def send_close(self, status: int = STATUS_NORMAL, reason: bytes = b""):
"""
Send close frame without waiting for response.
Parameters:
- status: Close status code
- reason: UTF-8 encoded close reason
Raises:
ValueError: If status code is invalid range
"""Access connection properties and configure WebSocket behavior.
def set_mask_key(self, func):
"""
Set custom mask key generator function.
Parameters:
- func: Callable that takes length integer and returns bytes
"""
def gettimeout(self) -> Union[float, int, None]:
"""
Get current socket timeout.
Returns:
Union[float, int, None]: Timeout in seconds or None for blocking
"""
def settimeout(self, timeout: Union[float, int, None]):
"""
Set socket timeout.
Parameters:
- timeout: Timeout in seconds or None for blocking
"""
def getsubprotocol(self):
"""
Get negotiated subprotocol.
Returns:
str: Negotiated subprotocol name or None
"""
def getstatus(self):
"""
Get handshake HTTP status code.
Returns:
int: HTTP status code from handshake response
"""
def getheaders(self):
"""
Get handshake response headers.
Returns:
dict: HTTP headers from handshake response
"""
def is_ssl(self):
"""
Check if connection uses SSL/TLS.
Returns:
bool: True if SSL connection, False otherwise
"""
def fileno(self):
"""
Get socket file descriptor.
Returns:
int: Socket file descriptor for select/poll operations
"""
# Properties for convenient access
timeout = property(gettimeout, settimeout)
subprotocol = property(getsubprotocol)
status = property(getstatus)
headers = property(getheaders)Utility function to create and connect WebSocket instances with extensive configuration.
def create_connection(url: str, timeout=None, class_=WebSocket, **options) -> WebSocket:
"""
Create and connect WebSocket instance.
Parameters:
- url: WebSocket URL
- timeout: Connection timeout in seconds
- class_: WebSocket class to instantiate (for customization)
- **options: All WebSocket.connect() options plus:
- sockopt: Socket options list
- sslopt: SSL options dict
- fire_cont_frame: Enable continuation frame events
- enable_multithread: Enable thread-safe operations
- skip_utf8_validation: Skip UTF-8 validation
Returns:
WebSocket: Connected WebSocket instance
Raises:
Various WebSocket exceptions on connection failure
"""from websocket import create_connection
# Connect and exchange single message
ws = create_connection("ws://echo.websocket.events/")
print("Connected:", ws.getstatus())
print("Server says:", ws.recv())
ws.send("Hello, World!")
response = ws.recv()
print("Echo:", response)
ws.close()from websocket import WebSocket
import struct
ws = WebSocket()
ws.connect("ws://example.com/binary-api")
# Send binary data
data = struct.pack("!I", 12345)
ws.send_bytes(data)
# Receive binary response
response = ws.recv()
if isinstance(response, bytes):
value = struct.unpack("!I", response)[0]
print(f"Received number: {value}")
ws.close()from websocket import create_connection
# Connect with custom headers
headers = [
"Authorization: Bearer your-jwt-token",
"X-Client-Version: 1.0",
"User-Agent: MyApp/1.0"
]
ws = create_connection(
"wss://api.example.com/websocket",
header=headers,
origin="https://myapp.com"
)
print("Connected with headers")
print("Response headers:", ws.headers)
ws.close()from websocket import create_connection
import ssl
# Custom SSL configuration
sslopt = {
"cert_reqs": ssl.CERT_REQUIRED,
"ca_certs": "/etc/ssl/certs/ca-certificates.crt",
"certfile": "/path/to/client.crt",
"keyfile": "/path/to/client.key",
"ssl_version": ssl.PROTOCOL_TLS,
"ciphers": "HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!SRP:!CAMELLIA"
}
ws = create_connection("wss://secure.example.com/websocket", sslopt=sslopt)
print("Secure connection established")
print("Using SSL:", ws.is_ssl())
ws.close()from websocket import create_connection
# HTTP proxy
ws = create_connection(
"ws://echo.websocket.events/",
http_proxy_host="proxy.company.com",
http_proxy_port=8080,
http_proxy_auth=("username", "password")
)
# SOCKS proxy
ws = create_connection(
"ws://echo.websocket.events/",
http_proxy_host="socks-proxy.company.com",
http_proxy_port=1080,
proxy_type="socks5"
)from websocket import WebSocket, ABNF
ws = WebSocket()
ws.connect("ws://echo.websocket.events/")
# Send custom frame
frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
ws.send_frame(frame)
# Receive and examine frame
opcode, received_frame = ws.recv_data_frame()
print(f"Received opcode: {opcode}")
print(f"Frame data: {received_frame.data}")
print(f"Frame info: fin={received_frame.fin}, opcode={received_frame.opcode}")
ws.close()from websocket import WebSocket, ABNF
import time
ws = WebSocket()
ws.connect("ws://echo.websocket.events/")
# Send ping
ws.ping("ping-payload")
# Manually handle frames
while True:
opcode, frame = ws.recv_data_frame(control_frame=True)
if opcode == ABNF.OPCODE_PONG:
print(f"Received pong: {frame.data}")
break
elif opcode == ABNF.OPCODE_PING:
print("Received ping, sending pong")
ws.pong(frame.data)
elif opcode == ABNF.OPCODE_CLOSE:
print("Server closed connection")
break
ws.close()from websocket import create_connection
ws = create_connection("ws://echo.websocket.events/")
# Send some test messages
for i in range(3):
ws.send(f"Message {i}")
# Use iterator interface to receive
try:
for message in ws:
print(f"Received: {message}")
if "Message 2" in message:
break
except KeyboardInterrupt:
pass
ws.close()Install with Tessl CLI
npx tessl i tessl/pypi-websocket-client