WebSocket client & server library, WAMP real-time framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete RFC 6455 WebSocket implementation providing both client and server functionality with extensive protocol options, message compression, frame-level API access, and streaming capabilities.
Server-side WebSocket protocol implementation handling connection handshakes, message processing, and connection lifecycle management.
class WebSocketServerProtocol:
def onConnect(self, request: ConnectionRequest) -> None:
"""
Called when WebSocket connection request is received.
Parameters:
- request: ConnectionRequest with peer, headers, host, path, params, version, origin, protocols, extensions
Returns:
None, ConnectionAccept, or ConnectionDeny
"""
def onOpen(self) -> None:
"""Called when WebSocket connection is established."""
def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
"""
Send WebSocket message.
Parameters:
- payload: Message data
- isBinary: True for binary message, False for text
"""
def onMessage(self, payload: bytes, isBinary: bool) -> None:
"""
Called when WebSocket message is received.
Parameters:
- payload: Message data
- isBinary: True for binary message, False for text
"""
def sendClose(self, code: int = None, reason: str = None) -> None:
"""
Initiate WebSocket close handshake.
Parameters:
- code: Close status code
- reason: Close reason string
"""
def onClose(self, wasClean: bool, code: int, reason: str) -> None:
"""
Called when WebSocket connection is closed.
Parameters:
- wasClean: True if closed cleanly
- code: Close status code
- reason: Close reason string
"""
def sendPing(self, payload: bytes = None) -> None:
"""Send WebSocket ping frame."""
def onPing(self, payload: bytes) -> None:
"""Called when ping frame received."""
def sendPong(self, payload: bytes = None) -> None:
"""Send WebSocket pong frame."""
def onPong(self, payload: bytes) -> None:
"""Called when pong frame received."""Client-side WebSocket protocol implementation for connecting to WebSocket servers.
class WebSocketClientProtocol:
def onConnect(self, response: ConnectionResponse) -> None:
"""
Called when WebSocket handshake response received.
Parameters:
- response: ConnectionResponse with peer, headers, version, protocol, extensions
"""
def onOpen(self) -> None:
"""Called when WebSocket connection is established."""
def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
"""
Send WebSocket message.
Parameters:
- payload: Message data
- isBinary: True for binary message, False for text
"""
def onMessage(self, payload: bytes, isBinary: bool) -> None:
"""
Called when WebSocket message is received.
Parameters:
- payload: Message data
- isBinary: True for binary message, False for text
"""
def sendClose(self, code: int = None, reason: str = None) -> None:
"""Initiate WebSocket close handshake."""
def onClose(self, wasClean: bool, code: int, reason: str) -> None:
"""Called when WebSocket connection is closed."""
def sendPing(self, payload: bytes = None) -> None:
"""Send WebSocket ping frame."""
def onPing(self, payload: bytes) -> None:
"""Called when ping frame received."""
def sendPong(self, payload: bytes = None) -> None:
"""Send WebSocket pong frame."""
def onPong(self, payload: bytes) -> None:
"""Called when pong frame received."""Factory classes for creating WebSocket protocol instances with configuration and protocol options.
class WebSocketServerFactory:
def __init__(
self,
url: str = None,
protocols: list = None,
server: str = None,
headers: dict = None,
externalPort: int = None
):
"""
WebSocket server factory.
Parameters:
- url: WebSocket server URL
- protocols: List of supported subprotocols
- server: Server identifier string
- headers: Additional HTTP headers
- externalPort: External port number
"""
def setProtocolOptions(
self,
allowHixie76: bool = None,
webStatus: bool = None,
utf8validateIncoming: bool = None,
maskServerFrames: bool = None,
requireMaskedClientFrames: bool = None,
applyMask: bool = None,
maxFramePayloadSize: int = None,
maxMessagePayloadSize: int = None,
autoFragmentSize: int = None,
failByDrop: bool = None,
echoCloseCodeReason: bool = None,
openHandshakeTimeout: float = None,
closeHandshakeTimeout: float = None,
tcpNoDelay: bool = None,
perMessageCompressionAccept: callable = None,
perMessageCompressionOffers: list = None,
perMessageCompressionRaw: bool = None,
compression: str = None
) -> None:
"""Set WebSocket protocol options."""
class WebSocketClientFactory:
def __init__(
self,
url: str = None,
origin: str = None,
protocols: list = None,
useragent: str = None,
headers: dict = None,
proxy: dict = None
):
"""
WebSocket client factory.
Parameters:
- url: WebSocket server URL to connect to
- origin: Origin header value
- protocols: List of requested subprotocols
- useragent: User-Agent header value
- headers: Additional HTTP headers
- proxy: Proxy configuration
"""
def setProtocolOptions(
self,
version: int = None,
utf8validateIncoming: bool = None,
acceptMaskedServerFrames: bool = None,
maskClientFrames: bool = None,
applyMask: bool = None,
maxFramePayloadSize: int = None,
maxMessagePayloadSize: int = None,
autoFragmentSize: int = None,
failByDrop: bool = None,
echoCloseCodeReason: bool = None,
serverConnectionDropTimeout: float = None,
openHandshakeTimeout: float = None,
closeHandshakeTimeout: float = None,
tcpNoDelay: bool = None,
perMessageCompressionOffers: list = None,
perMessageCompressionAccept: callable = None,
compression: str = None
) -> None:
"""Set WebSocket protocol options."""Frame-level API for applications requiring fine-grained control over WebSocket message framing.
class IWebSocketChannelFrameApi:
def onMessageBegin(self, isBinary: bool) -> None:
"""Called when WebSocket message begins."""
def onMessageFrame(self, payload: bytes) -> None:
"""Called when complete message frame received."""
def onMessageEnd(self) -> None:
"""Called when WebSocket message ends."""
def beginMessage(self, isBinary: bool = False, doNotCompress: bool = False) -> None:
"""Begin sending fragmented message."""
def sendMessageFrame(self, payload: bytes, sync: bool = False) -> None:
"""Send message frame."""
def endMessage(self) -> None:
"""End fragmented message."""Streaming API for processing WebSocket messages as data streams rather than complete messages.
class IWebSocketChannelStreamingApi:
def onMessageFrameBegin(self, length: int) -> None:
"""Called when message frame begins."""
def onMessageFrameData(self, payload: bytes) -> None:
"""Called when frame data received."""
def onMessageFrameEnd(self) -> None:
"""Called when message frame ends."""
def beginMessageFrame(self, length: int) -> None:
"""Begin sending message frame."""
def sendMessageFrameData(self, payload: bytes, sync: bool = False) -> None:
"""Send frame data."""class ConnectionRequest:
peer: str # Remote peer address
headers: dict # HTTP headers
host: str # Host header value
path: str # Request path
params: dict # Query parameters
version: int # WebSocket version
origin: str # Origin header
protocols: list # Requested subprotocols
extensions: list # Requested extensions
class ConnectionResponse:
peer: str # Remote peer address
headers: dict # HTTP response headers
version: int # WebSocket version
protocol: str # Selected subprotocol
extensions: list # Negotiated extensions
class ConnectionAccept:
def __init__(self, subprotocol: str = None, headers: list = None): ...
class ConnectionDeny(Exception):
def __init__(self, code: int, reason: str = None): ...class Message:
payload: bytes # Message payload
is_binary: bool # Binary message flag
class IncomingMessage(Message):
payload: bytes # Incoming message data
is_binary: bool # True for binary, False for text
class OutgoingMessage(Message):
payload: bytes # Outgoing message data
is_binary: bool # Message type flag
skip_compress: bool # Skip compression flag
class Ping:
def __init__(self, payload: bytes = None): ...import asyncio
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
class EchoServerProtocol(WebSocketServerProtocol):
def onMessage(self, payload, isBinary):
# Echo back received message
self.sendMessage(payload, isBinary)
factory = WebSocketServerFactory("ws://localhost:9000")
factory.protocol = EchoServerProtocol
loop = asyncio.get_event_loop()
coro = loop.create_server(factory, '0.0.0.0', 9000)
server = loop.run_until_complete(coro)
loop.run_forever()import asyncio
from autobahn.asyncio.websocket import WebSocketClientProtocol, WebSocketClientFactory
class MyClientProtocol(WebSocketClientProtocol):
def onOpen(self):
self.sendMessage(b"Hello Server!", isBinary=False)
def onMessage(self, payload, isBinary):
print(f"Received: {payload.decode('utf8')}")
factory = WebSocketClientFactory("ws://localhost:9000", protocols=["chat"])
factory.protocol = MyClientProtocol
loop = asyncio.get_event_loop()
coro = loop.create_connection(factory, '127.0.0.1', 9000)
loop.run_until_complete(coro)
loop.run_forever()class SelectiveServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
# Accept only connections from localhost
if request.peer != '127.0.0.1':
raise ConnectionDeny(403, "Forbidden")
# Select subprotocol
if 'chat' in request.protocols:
return ConnectionAccept(subprotocol='chat')
return ConnectionAccept()Install with Tessl CLI
npx tessl i tessl/pypi-autobahn