Tornado is a Python web framework and asynchronous networking library designed for applications requiring long-lived connections to many users.
Tornado provides a full WebSocket protocol implementation for real-time, bidirectional communication between client and server, supporting both server-side handlers and client connections.
Server-side WebSocket handler for processing incoming connections and messages and for managing the connection lifecycle.
class WebSocketHandler(tornado.web.RequestHandler):
    """WebSocket request handler for server-side connections.

    This is an interface summary: each method is a signature stub whose body
    consists only of its docstring.
    """

    def open(self, *args, **kwargs):
        """Called when WebSocket connection is opened.

        Args:
            *args: URL path arguments
            **kwargs: URL keyword arguments
        """

    def on_message(self, message):
        """Called when message is received from client.

        Args:
            message: Message content (str or bytes)
        """

    def on_close(self):
        """Called when WebSocket connection is closed."""

    def write_message(self, message, binary: bool = False):
        """Send message to client.

        Args:
            message: Message to send (str or bytes)
            binary: Whether message is binary

        Raises:
            WebSocketClosedError: If connection is closed
        """

    def close(self, code: int = None, reason: str = None):
        """Close WebSocket connection.

        Args:
            code: Close status code
            reason: Close reason string
        """

    def check_origin(self, origin: str) -> bool:
        """Check if request origin is allowed.

        Args:
            origin: Request origin header

        Returns:
            True if origin is allowed, False otherwise
        """

    def get_compression_options(self):
        """Get WebSocket compression options."""

    def set_nodelay(self, value: bool):
        """Enable/disable Nagle's algorithm.

        Args:
            value: Flag controlling the setting. Presumably True selects
                no-delay mode (Nagle's algorithm off) -- confirm against the
                Tornado documentation.
        """

    def ping(self, data: bytes = b"") -> Future:
        """Send ping frame to client.

        Args:
            data: Ping payload data

        Returns:
            Future that resolves when pong is received
        """

    def on_pong(self, data: bytes):
        """Called when pong frame is received.

        Args:
            data: Pong payload data
        """


# Client-side WebSocket connection for connecting to WebSocket servers and
# managing client connections.
def websocket_connect(
    url: str,
    callback=None,
    connect_timeout: float = None,
    on_message_callback=None,
    compression_options=None,
    ping_interval: float = None,
    ping_timeout: float = None,
    max_message_size: int = None,
    subprotocols=None,
) -> Future:
    """Connect to WebSocket server.

    This is a signature stub: the body consists only of this docstring.

    Args:
        url: WebSocket URL (ws:// or wss://)
        callback: Callback function (if not using async/await)
        connect_timeout: Connection timeout in seconds
        on_message_callback: Callback for incoming messages
        compression_options: Compression configuration
        ping_interval: Ping interval in seconds
        ping_timeout: Ping timeout in seconds
        max_message_size: Maximum message size in bytes
        subprotocols: List of supported subprotocols

    Returns:
        Future resolving to WebSocketClientConnection

    Usage:
        conn = await websocket_connect("ws://example.com/websocket")
        await conn.write_message("Hello")
        msg = await conn.read_message()
        conn.close()
    """
class WebSocketClientConnection:
    """Client-side WebSocket connection.

    This is an interface summary: each method is a signature stub whose body
    consists only of its docstring.
    """

    def write_message(self, message, binary: bool = False) -> Future:
        """Send message to server.

        Args:
            message: Message to send (str or bytes)
            binary: Whether message is binary

        Returns:
            Future that resolves when message is sent
        """

    def read_message(self, callback=None) -> Future:
        """Read next message from server.

        Args:
            callback: Callback function (if not using async/await)

        Returns:
            Future resolving to message string/bytes or None if closed
        """

    def close(self, code: int = None, reason: str = None):
        """Close WebSocket connection.

        Args:
            code: Close status code
            reason: Close reason string
        """

    def ping(self, data: bytes = b"") -> Future:
        """Send ping frame to server.

        Args:
            data: Ping payload data

        Returns:
            Future that resolves when pong is received
        """

    @property
    def protocol(self) -> WebSocketProtocol:
        """Get WebSocket protocol handler."""


# Low-level WebSocket protocol implementation handling frame parsing, masking,
# and protocol compliance.
class WebSocketProtocol:
    """Abstract WebSocket protocol handler.

    This is an interface summary: each method is a signature stub whose body
    consists only of its docstring.
    """

    def __init__(self, handler, mask_outgoing: bool = False, compression_options=None):
        """Initialize WebSocket protocol.

        Args:
            handler: WebSocket handler object
            mask_outgoing: Whether to mask outgoing frames
            compression_options: Compression configuration
        """

    def accept_connection(self, handler):
        """Accept WebSocket connection.

        Args:
            handler: WebSocket handler object for the connection
        """

    def write_message(self, message, binary: bool = False, locked: bool = True):
        """Write WebSocket message.

        Args:
            message: Message to write
            binary: Whether message is binary
            locked: Not described by this summary; defaults to True
        """

    def read_message(self, callback):
        """Read WebSocket message.

        Args:
            callback: Callback function
        """

    def close(self, code: int = None, reason: str = None):
        """Close WebSocket connection.

        Args:
            code: Close status code
            reason: Close reason string
        """

    def ping(self, data: bytes) -> Future:
        """Send ping frame.

        Args:
            data: Ping payload data
        """

    def pong(self, data: bytes):
        """Send pong frame.

        Args:
            data: Pong payload data
        """
class WebSocketProtocol13(WebSocketProtocol):
    """WebSocket protocol version 13 implementation.

    This is an interface summary: each method is a signature stub whose body
    consists only of its docstring.
    """

    def __init__(self, handler, mask_outgoing: bool = False, compression_options=None):
        """Initialize WebSocket protocol v13.

        Args:
            handler: WebSocket handler object
            mask_outgoing: Whether to mask outgoing frames
            compression_options: Compression configuration
        """

    def compute_accept_value(self, key: str) -> str:
        """Compute WebSocket accept value from key.

        Args:
            key: Key to derive the accept value from
        """

    def challenge_response(self, challenge: str) -> str:
        """Generate challenge response for client.

        Args:
            challenge: Challenge string to respond to
        """


# Utility functions for WebSocket protocol handling, frame processing, and
# connection management.
def websocket_connect(url: str, **kwargs):
    """Create WebSocket client connection.

    This is a signature stub: the body consists only of this docstring.

    NOTE(review): this shares its name with the fully parameterised
    ``websocket_connect`` signature documented earlier in this file. If both
    were executed in one module, this later definition would replace the
    earlier one.

    Args:
        url: WebSocket URL
        **kwargs: Additional connection options; presumably the keyword
            parameters of the detailed signature -- confirm.
    """
def encode_username_password(username: str, password: str) -> str:
    """Encode credentials for WebSocket authentication.

    This is a signature stub: the body consists only of this docstring, so
    the encoding scheme is not shown here.

    Args:
        username: User name to encode
        password: Password to encode
    """
class WebSocketMask:
    """WebSocket frame masking utilities."""

    @staticmethod
    def mask(mask: bytes, data: bytes) -> bytes:
        """Apply WebSocket mask to data.

        This is a signature stub: the body consists only of this docstring.

        Args:
            mask: Masking key bytes
            data: Payload bytes to mask
        """


import tornado.ioloop
import tornado.web
import tornado.websocket
class EchoWebSocket(tornado.websocket.WebSocketHandler):
    """Example handler that sends every received message back to its sender."""

    def open(self):
        """Log that a client has connected."""
        print("WebSocket opened")

    def on_message(self, message):
        """Log the incoming message and reply with it prefixed by "Echo: ".

        Args:
            message: Message content received from the client
        """
        print(f"Received: {message}")
        self.write_message(f"Echo: {message}")

    def on_close(self):
        """Log that the connection has closed."""
        print("WebSocket closed")

    def check_origin(self, origin):
        """Accept the connection regardless of the request origin.

        Args:
            origin: Request origin header (ignored)

        Returns:
            Always True
        """
        # Allow connections from any origin
        # NOTE(review): this turns origin checking off entirely; restrict the
        # accepted origins before exposing the handler beyond local testing.
        return True
# Route /websocket to the echo handler.
app = tornado.web.Application([
    (r"/websocket", EchoWebSocket),
])

if __name__ == "__main__":
    # Listen on port 8888, then hand control to the IOLoop.
    app.listen(8888)
    print("WebSocket server started on ws://localhost:8888/websocket")
    tornado.ioloop.IOLoop.current().start()
import asyncio
import tornado.websocket
async def websocket_client():
    """Connect to the local echo endpoint, send one message and print the reply.

    The connection is closed in a ``finally`` clause so it is released even if
    sending or reading fails.
    """
    # Connect to WebSocket server
    conn = await tornado.websocket.websocket_connect("ws://localhost:8888/websocket")
    try:
        # Send message
        await conn.write_message("Hello WebSocket!")
        # Read response
        msg = await conn.read_message()
        if msg is None:
            # read_message resolves to None once the connection has closed,
            # so there is no reply to show.
            print("Connection closed before a reply was received")
        else:
            print(f"Received: {msg}")
    finally:
        # Close connection
        conn.close()
if __name__ == "__main__":
    # Run the client coroutine to completion on a fresh event loop.
    asyncio.run(websocket_client())
import tornado.ioloop
import tornado.web
import tornado.websocket
import json
class ChatWebSocket(tornado.websocket.WebSocketHandler):
    """Example chat handler that relays each JSON message to the other clients."""

    # Every currently open connection; stored on the class so that all handler
    # instances share the same set.
    clients = set()

    def open(self):
        """Register the new connection so it receives broadcasts."""
        print("Client connected")
        ChatWebSocket.clients.add(self)

    def on_message(self, message):
        """Relay a JSON chat message to every other connected client.

        The message must decode to a JSON object. Its ``user`` and ``message``
        fields (defaulting to ``'Anonymous'`` and ``''``) are forwarded to all
        clients except the sender. Anything else is answered with an error
        payload sent back to the sender only.

        Args:
            message: Raw message received from the client
        """
        try:
            data = json.loads(message)
        except (json.JSONDecodeError, UnicodeDecodeError):
            data = None

        # Valid JSON that is not an object (a list, number, string, ...) has
        # no .get(); treat it the same as undecodable input.
        if not isinstance(data, dict):
            self.write_message(json.dumps({
                'error': 'Invalid JSON format',
                'type': 'error'
            }))
            return

        # The payload is identical for every recipient, so serialise it once.
        payload = json.dumps({
            'user': data.get('user', 'Anonymous'),
            'message': data.get('message', ''),
            'type': 'message'
        })

        # Broadcast message to all connected clients. Iterate over a snapshot
        # so changes to the shared set cannot disturb the loop.
        for client in tuple(ChatWebSocket.clients):
            if client is self:  # Don't send back to sender
                continue
            try:
                client.write_message(payload)
            except tornado.websocket.WebSocketClosedError:
                # One peer that has already gone away must not stop delivery
                # to the remaining clients.
                continue

    def on_close(self):
        """Remove the connection from the broadcast set."""
        print("Client disconnected")
        ChatWebSocket.clients.discard(self)

    def check_origin(self, origin):
        """Accept the connection regardless of the request origin.

        Args:
            origin: Request origin header (ignored)

        Returns:
            Always True
        """
        # NOTE(review): this turns origin checking off entirely; restrict the
        # accepted origins before exposing the handler beyond local testing.
        return True
# Route /chat to the chat handler.
app = tornado.web.Application([
    (r"/chat", ChatWebSocket),
])
# WebSocket message type
# NOTE(review): Union, Dict, Any, List and Type are presumably imported from
# typing; no such import is visible in this file -- confirm.
# A message is text (str) or binary (bytes); None is also allowed, presumably
# the value read_message yields once the connection is closed.
WebSocketMessage = Union[str, bytes, None]
# Close code type: a plain integer status code
WebSocketCloseCode = int
# Compression options type: a mapping from option name to value
CompressionOptions = Dict[str, Any]
# Subprotocols type: a list of subprotocol names
SubProtocols = List[str]
# WebSocket handler type: the handler class itself, not an instance
WebSocketHandlerType = Type[WebSocketHandler]
# WebSocket close codes
WS_CLOSE_NORMAL = 1000             # normal closure
WS_CLOSE_GOING_AWAY = 1001         # endpoint is going away
WS_CLOSE_PROTOCOL_ERROR = 1002     # protocol error
WS_CLOSE_UNSUPPORTED_DATA = 1003   # unsupported data type
WS_CLOSE_NO_STATUS = 1005          # no status code present
WS_CLOSE_ABNORMAL = 1006           # abnormal closure
WS_CLOSE_INVALID_DATA = 1007       # invalid payload data
WS_CLOSE_POLICY_VIOLATION = 1008   # policy violation
WS_CLOSE_MESSAGE_TOO_BIG = 1009    # message too big
WS_CLOSE_MISSING_EXTENSION = 1010  # required extension missing
WS_CLOSE_INTERNAL_ERROR = 1011     # internal error

# Frame opcodes
WS_OPCODE_CONTINUATION = 0x00
WS_OPCODE_TEXT = 0x01
WS_OPCODE_BINARY = 0x02
WS_OPCODE_CLOSE = 0x08
WS_OPCODE_PING = 0x09
WS_OPCODE_PONG = 0xa


class WebSocketError(Exception):
    """Base exception for WebSocket errors."""


class WebSocketClosedError(WebSocketError):
    """Exception raised when connection is closed."""

    def __init__(self, real_error=None):
        """Initialize WebSocket closed error.

        This is a signature stub: the body consists only of this docstring.

        Args:
            real_error: Underlying error that caused closure
        """


# Install with Tessl CLI:
#     npx tessl i tessl/pypi-tornado