Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol
WebSocket transport adapter enabling STOMP messaging over WebSocket connections for browser-based applications, web-friendly messaging patterns, and environments where traditional TCP sockets are not available.
STOMP over WebSocket connection supporting all STOMP 1.2 features with WebSocket-specific transport handling.
class WSConnection:
def __init__(self,
host_and_ports=None,
prefer_localhost=True,
try_loopback_connect=True,
reconnect_sleep_initial=0.1,
reconnect_sleep_increase=0.5,
reconnect_sleep_jitter=0.1,
reconnect_sleep_max=60.0,
reconnect_attempts_max=3,
timeout=None,
heartbeats=(0, 0),
keepalive=None,
vhost=None,
auto_decode=True,
encoding="utf-8",
auto_content_length=True,
heart_beat_receive_scale=1.5,
bind_host_port=None,
ws=None,
ws_path=None,
header=None,
binary_mode=False):
"""
Create WebSocket STOMP connection.
Parameters:
- host_and_ports: list of tuples, WebSocket host/port pairs [('localhost', 8080)]
- prefer_localhost: bool, prefer localhost connections
- try_loopback_connect: bool, try loopback if localhost fails
- reconnect_sleep_initial: float, initial reconnect delay
- reconnect_sleep_increase: float, delay increase factor
- reconnect_sleep_jitter: float, random delay variation
- reconnect_sleep_max: float, maximum reconnect delay
- reconnect_attempts_max: int, maximum reconnect attempts
- timeout: float, socket timeout in seconds
- heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)
- keepalive: bool, enable keepalive
- vhost: str, virtual host name
- auto_decode: bool, automatically decode message bodies
- encoding: str, text encoding for messages
- auto_content_length: bool, automatically set content-length header
- heart_beat_receive_scale: float, heartbeat timeout scale factor
- bind_host_port: tuple, local bind address
- ws: WebSocket connection object
- ws_path: str, WebSocket path
- header: dict, WebSocket headers
- binary_mode: bool, use binary WebSocket frames
"""
def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
"""
Connect to STOMP broker via WebSocket.
Parameters:
- username: str, authentication username
- passcode: str, authentication password
- wait: bool, wait for connection confirmation
- headers: dict, additional connection headers
- **keyword_headers: additional headers as keyword arguments
"""
def disconnect(self, receipt=None, headers=None, **keyword_headers):
"""
Disconnect from STOMP broker.
Parameters:
- receipt: str, receipt ID for disconnect confirmation
- headers: dict, additional disconnect headers
- **keyword_headers: additional headers as keyword arguments
"""
def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
"""
Send message via WebSocket transport.
Parameters:
- body: str, message body
- destination: str, destination queue/topic
- content_type: str, message content type
- headers: dict, message headers
- **keyword_headers: additional headers as keyword arguments
"""
def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
"""
Subscribe to destination via WebSocket.
Parameters:
- destination: str, destination queue/topic
- id: str, subscription ID
- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')
- headers: dict, subscription headers
- **keyword_headers: additional headers as keyword arguments
"""
def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
"""
Unsubscribe from destination.
Parameters:
- destination: str, destination to unsubscribe from
- id: str, subscription ID to unsubscribe
- headers: dict, unsubscribe headers
- **keyword_headers: additional headers as keyword arguments
"""
def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
"""
Acknowledge message.
Parameters:
- id: str, message ID to acknowledge
- subscription: str, subscription ID
- transaction: str, transaction ID
- headers: dict, ack headers
- **keyword_headers: additional headers as keyword arguments
"""
def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
"""
Negative acknowledge message.
Parameters:
- id: str, message ID to nack
- subscription: str, subscription ID
- transaction: str, transaction ID
- headers: dict, nack headers
- **keyword_headers: additional headers as keyword arguments
"""Low-level WebSocket transport implementation for STOMP protocol.
class WSTransport:
    """Low-level transport that carries STOMP frames over a WebSocket."""

    def __init__(self,
                 url,
                 auto_decode=True,
                 encoding="utf-8",
                 is_eol_fc=None,
                 **kwargs):
        """
        Set up the WebSocket transport.

        Parameters:
        - url: str, URL of the WebSocket
        - auto_decode: bool, decode message bodies automatically
        - encoding: str, text encoding used for messages
        - is_eol_fc: callable, function that detects end-of-line
        - **kwargs: further WebSocket parameters
        """

    def start(self):
        """Open the WebSocket connection."""

    def stop(self, timeout=None):
        """
        Shut the WebSocket connection down.

        Parameters:
        - timeout: float, time allowed for stopping, in seconds
        """

    def send(self, frame):
        """
        Transmit a STOMP frame over the WebSocket.

        Parameters:
        - frame: Frame, the STOMP frame to transmit
        """

    def receive(self):
        """
        Read data from the WebSocket.

        Returns:
        bytes: the data that was received
        """

    def is_connected(self) -> bool:
        """
        Report whether the WebSocket is connected.

        Returns:
        bool: True when connected, False otherwise
        """


import stomp
import time

# Create WebSocket connection
# NOTE(review): the WSConnection signature documented in this file takes
# host_and_ports (a list of (host, port) tuples) plus ws_path rather than a
# URL string -- confirm this call against the installed stomp.py version.
ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')


# Set up message handler
class WSMessageHandler(stomp.ConnectionListener):
    """Listener that prints the body of frames passed to on_message / on_error."""

    def on_message(self, frame):
        print(f"WebSocket message: {frame.body}")

    def on_error(self, frame):
        print(f"WebSocket error: {frame.body}")


handler = WSMessageHandler()
ws_conn.set_listener('handler', handler)

# Connect and use
ws_conn.connect('user', 'password', wait=True)
try:
    ws_conn.subscribe('/topic/updates', id=1)
    ws_conn.send(body='Hello via WebSocket', destination='/topic/chat')

    # Keep connection alive
    time.sleep(10)
finally:
    # Disconnect even when subscribe/send raised or the sleep was interrupted,
    # so the example never leaves the connection open.
    ws_conn.disconnect()


import stomp
# Simplest form: a secure (wss://) connection with no extra options
ws_conn = stomp.WSConnection('wss://secure-broker.example.com/stomp')

# Variant with WebSocket-specific options; this rebinds ws_conn, so the
# connection object created above is discarded.
connection_options = {
    'heartbeats': (10000, 10000),   # 10 second heartbeats
    'timeout': 30,                  # 30 second timeout
    'reconnect_attempts_max': 5,    # Max 5 reconnection attempts
}
ws_conn = stomp.WSConnection('wss://broker.example.com/stomp', **connection_options)
ws_conn.connect('username', 'password', wait=True)


import stomp
import json
import time
class BrowserCompatibleHandler(stomp.ConnectionListener):
    """Listener that queues incoming messages so a web application can poll them.

    JSON objects are dispatched on their 'type' field and queued under the
    'data' key; every other body is queued as-is under the 'body' key.
    """

    def __init__(self):
        # Entries are dicts carrying 'headers', 'timestamp' and either
        # 'data' (JSON object) or 'body' (anything else).
        self.message_queue = []

    def on_message(self, frame):
        """Route a frame to the JSON path or the plain-text path.

        Parameters:
        - frame: object exposing .body and .headers
        """
        # Keep the try body to the one call that can fail to parse.
        try:
            data = json.loads(frame.body)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError (not JSON);
            # TypeError covers a body that is not str/bytes (e.g. None).
            self.process_text_message(frame.body, frame.headers)
            return

        if isinstance(data, dict):
            self.process_web_message(data, frame.headers)
        else:
            # Valid JSON that is not an object (e.g. "42" or "[1, 2]") has no
            # .get(); treat it like plain text instead of crashing.
            self.process_text_message(frame.body, frame.headers)

    def process_web_message(self, data, headers):
        """Dispatch a JSON object on its 'type' field, then queue it.

        Parameters:
        - data: dict, decoded JSON object
        - headers: headers of the frame the object arrived in
        """
        message_type = data.get('type', 'unknown')
        if message_type == 'notification':
            self.handle_notification(data)
        elif message_type == 'update':
            self.handle_update(data)

        # Store for polling-based retrieval
        self.message_queue.append({
            'data': data,
            'headers': headers,
            'timestamp': time.time(),
        })

    def process_text_message(self, body, headers):
        """Queue a non-JSON-object message body unchanged.

        Parameters:
        - body: the raw frame body
        - headers: headers of the frame the body arrived in
        """
        self.message_queue.append({
            'body': body,
            'headers': headers,
            'timestamp': time.time(),
        })

    def get_messages(self):
        """Return all queued messages and leave the queue empty.

        Returns:
        list: the entries queued since the previous call
        """
        # Swap in a fresh list instead of copy-then-clear: an entry appended
        # between the copy and the clear() would otherwise be silently lost.
        # NOTE(review): this assumes callbacks may run on another thread than
        # the poller -- confirm against how the listener is driven.
        messages, self.message_queue = self.message_queue, []
        return messages

    def handle_notification(self, data):
        """Hook for notification-type messages; does nothing by default."""

    def handle_update(self, data):
        """Hook for update-type messages; does nothing by default."""
# Setup WebSocket connection for web app
ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')
handler = BrowserCompatibleHandler()
ws_conn.set_listener('web_handler', handler)

# Connect, then subscribe to the web-friendly topics with ids 1 and 2
ws_conn.connect('webapp_user', 'webapp_pass', wait=True)
for subscription_id, topic in enumerate(('/topic/notifications', '/topic/updates'), start=1):
    ws_conn.subscribe(topic, id=subscription_id)

# Send JSON message
message_data = dict(
    type='user_action',
    action='login',
    user_id='12345',
    timestamp=time.time(),
)
ws_conn.send(
    destination='/topic/user_actions',
    body=json.dumps(message_data),
    content_type='application/json',
)


# Periodic message retrieval pattern
def get_new_messages():
    """Return whatever the module-level handler has queued so far."""
    return handler.get_messages()


import stomp
import threading
import time
class HeartbeatMonitor(stomp.ConnectionListener):
    """Listener that tracks heartbeats and flags a connection that has gone quiet.

    Parameters:
    - stale_after: seconds without a heartbeat before the connection is
      considered stale (default 30)
    - check_interval: seconds between health checks in monitor_connection
      (default 5)
    """

    def __init__(self, stale_after=30, check_interval=5):
        self.stale_after = stale_after
        self.check_interval = check_interval
        self.last_heartbeat = time.time()
        self.connection_healthy = True
        # Set by stop() so monitor_connection can end without waiting for a failure.
        self._stop_requested = threading.Event()

    def on_heartbeat(self):
        """Record the heartbeat time and mark the connection healthy."""
        self.last_heartbeat = time.time()
        self.connection_healthy = True
        print("WebSocket heartbeat received")

    def on_heartbeat_timeout(self):
        """Mark the connection unhealthy."""
        self.connection_healthy = False
        print("WebSocket heartbeat timeout - connection may be unhealthy")

    def stop(self):
        """Ask a running monitor_connection loop to exit."""
        self._stop_requested.set()

    def monitor_connection(self):
        """Monitor connection health based on heartbeats.

        Checks every check_interval seconds and returns once the connection
        is found unhealthy or stop() has been called.
        """
        # Event.wait() returns False on timeout (time for the next check) and
        # True as soon as stop() is called, which ends the loop promptly.
        while not self._stop_requested.wait(self.check_interval):
            if time.time() - self.last_heartbeat > self.stale_after:
                print("Connection appears stale")
                self.connection_healthy = False
            if not self.connection_healthy:
                print("Connection health check failed")
                # Trigger reconnection logic
                break
# Create WebSocket connection with 10 second heartbeats in both directions
ws_conn = stomp.WSConnection('ws://localhost:61614/stomp', heartbeats=(10000, 10000))
monitor = HeartbeatMonitor()
ws_conn.set_listener('monitor', monitor)

# Start monitoring in background; a daemon thread does not block interpreter exit
monitor_thread = threading.Thread(target=monitor.monitor_connection, daemon=True)
monitor_thread.start()

ws_conn.connect('user', 'password', wait=True)


import stomp
import time
class WebSocketErrorHandler(stomp.ConnectionListener):
    """Listener that reports errors and reconnects after a disconnect.

    Parameters:
    - connection: connection object to call connect() on when reconnecting
    - username: user name used when reconnecting (default 'user')
    - passcode: password used when reconnecting (default 'password')
    """

    def __init__(self, connection, username='user', passcode='password'):
        self.connection = connection
        self.username = username
        self.passcode = passcode
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5

    def on_error(self, frame):
        """Print the error body and dispatch on recognised error text."""
        error_msg = frame.body
        print(f"WebSocket STOMP error: {error_msg}")

        # Normalise once; "or ''" keeps a frame with no body from raising on .lower().
        normalized = (error_msg or '').lower()

        # Handle specific WebSocket errors
        if 'connection refused' in normalized:
            self.handle_connection_refused()
        elif 'unauthorized' in normalized:
            self.handle_unauthorized()
        else:
            self.handle_generic_error(error_msg)

    def on_disconnected(self):
        """Try to reconnect unless the attempt budget is used up."""
        print("WebSocket disconnected")
        if self.reconnect_attempts < self.max_reconnect_attempts:
            self.attempt_reconnect()
        else:
            print("Max reconnection attempts reached")

    def handle_connection_refused(self):
        """Report a refused connection and pause before any retry."""
        print("WebSocket connection refused - broker may be down")
        time.sleep(5)  # Wait before retry

    def handle_unauthorized(self):
        """Report an authentication failure and disable further reconnects."""
        print("WebSocket authentication failed")
        # Don't auto-reconnect on auth failures
        self.max_reconnect_attempts = 0

    def handle_generic_error(self, error_msg):
        """Report any error that has no dedicated handler."""
        print(f"Generic WebSocket error: {error_msg}")

    def attempt_reconnect(self):
        """Wait with exponential backoff, then try to connect once."""
        self.reconnect_attempts += 1
        # Exponential backoff (2, 4, 8, 16, ...) capped at 30s.
        delay = min(2 ** self.reconnect_attempts, 30)
        print(f"Attempting reconnection {self.reconnect_attempts}/{self.max_reconnect_attempts} in {delay}s")
        time.sleep(delay)
        try:
            self.connection.connect(self.username, self.passcode, wait=True)
        except Exception as e:
            # Example-level boundary: report the failure rather than crash the listener.
            print(f"Reconnection failed: {e}")
        else:
            self.reconnect_attempts = 0  # Reset on successful connection
            print("WebSocket reconnection successful")
# Attach an error-handling listener to a new WebSocket connection;
# the handler keeps a reference to the connection so it can reconnect.
ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')
error_handler = WebSocketErrorHandler(connection=ws_conn)
ws_conn.set_listener('error_handler', error_handler)
ws_conn.connect('user', 'password', wait=True)

Install with Tessl CLI
npx tessl i tessl/pypi-stomp-py