CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-stomp-py

Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol

Overview
Eval results
Files

docs/websocket.md

WebSocket Support

WebSocket transport adapter enabling STOMP messaging over WebSocket connections for browser-based applications, web-friendly messaging patterns, and environments where traditional TCP sockets are not available.

Capabilities

WebSocket Connection

STOMP over WebSocket connection supporting all STOMP 1.2 features with WebSocket-specific transport handling.

class WSConnection:
    def __init__(
        self,
        host_and_ports=None,
        prefer_localhost=True,
        try_loopback_connect=True,
        reconnect_sleep_initial=0.1,
        reconnect_sleep_increase=0.5,
        reconnect_sleep_jitter=0.1,
        reconnect_sleep_max=60.0,
        reconnect_attempts_max=3,
        timeout=None,
        heartbeats=(0, 0),
        keepalive=None,
        vhost=None,
        auto_decode=True,
        encoding="utf-8",
        auto_content_length=True,
        heart_beat_receive_scale=1.5,
        bind_host_port=None,
        ws=None,
        ws_path=None,
        header=None,
        binary_mode=False,
    ):
        """Set up a STOMP connection that is carried over a WebSocket.

        Args:
            host_and_ports: List of (host, port) tuples naming the WebSocket
                endpoint(s), e.g. ``[('localhost', 8080)]``.
            prefer_localhost: Bool; favour localhost connections.
            try_loopback_connect: Bool; fall back to loopback when localhost
                fails.
            reconnect_sleep_initial: Float; delay before the first reconnect.
            reconnect_sleep_increase: Float; factor by which the delay grows.
            reconnect_sleep_jitter: Float; random variation applied to the
                delay.
            reconnect_sleep_max: Float; upper bound on the reconnect delay.
            reconnect_attempts_max: Int; upper bound on reconnect attempts.
            timeout: Float; socket timeout, in seconds.
            heartbeats: Tuple of ``(send_heartbeat_ms, receive_heartbeat_ms)``.
            keepalive: Bool; turn keepalive on.
            vhost: Str; name of the virtual host.
            auto_decode: Bool; decode message bodies automatically.
            encoding: Str; text encoding used for messages.
            auto_content_length: Bool; set the content-length header
                automatically.
            heart_beat_receive_scale: Float; scale factor for the heartbeat
                timeout.
            bind_host_port: Tuple; local address to bind to.
            ws: WebSocket connection object.
            ws_path: Str; path component of the WebSocket endpoint.
            header: Dict; headers for the WebSocket.
            binary_mode: Bool; send binary WebSocket frames.
        """

    def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
        """Open the STOMP session with the broker over the WebSocket.

        Args:
            username: Str; user name used to authenticate.
            passcode: Str; password used to authenticate.
            wait: Bool; block until the connection is confirmed.
            headers: Dict; extra headers for the connection.
            **keyword_headers: Further headers given as keyword arguments.
        """

    def disconnect(self, receipt=None, headers=None, **keyword_headers):
        """Close the STOMP session with the broker.

        Args:
            receipt: Str; receipt ID used to confirm the disconnect.
            headers: Dict; extra headers for the disconnect.
            **keyword_headers: Further headers given as keyword arguments.
        """

    def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
        """Publish a message through the WebSocket transport.

        Args:
            body: Str; the message body.
            destination: Str; target queue or topic.
            content_type: Str; content type of the message.
            headers: Dict; headers for the message.
            **keyword_headers: Further headers given as keyword arguments.
        """

    def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
        """Start receiving messages from a destination over the WebSocket.

        Args:
            destination: Str; queue or topic to subscribe to.
            id: Str; identifier of the subscription.
            ack: Str; acknowledgment mode, one of ``'auto'``, ``'client'`` or
                ``'client-individual'``.
            headers: Dict; headers for the subscription.
            **keyword_headers: Further headers given as keyword arguments.
        """

    def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
        """Stop receiving messages from a destination.

        Args:
            destination: Str; destination to drop the subscription for.
            id: Str; identifier of the subscription to drop.
            headers: Dict; headers for the unsubscribe.
            **keyword_headers: Further headers given as keyword arguments.
        """

    def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
        """Acknowledge a message.

        Args:
            id: Str; ID of the message being acknowledged.
            subscription: Str; identifier of the subscription.
            transaction: Str; identifier of the transaction.
            headers: Dict; headers for the ack.
            **keyword_headers: Further headers given as keyword arguments.
        """

    def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
        """Negatively acknowledge a message.

        Args:
            id: Str; ID of the message being nack'ed.
            subscription: Str; identifier of the subscription.
            transaction: Str; identifier of the transaction.
            headers: Dict; headers for the nack.
            **keyword_headers: Further headers given as keyword arguments.
        """

WebSocket Transport

Low-level WebSocket transport implementation for STOMP protocol.

class WSTransport:
    def __init__(
        self,
        url,
        auto_decode=True,
        encoding="utf-8",
        is_eol_fc=None,
        **kwargs,
    ):
        """Set up the WebSocket transport.

        Args:
            url: Str; the WebSocket URL.
            auto_decode: Bool; decode message bodies automatically.
            encoding: Str; text encoding used for messages.
            is_eol_fc: Callable; function used to detect end-of-line.
            **kwargs: Further WebSocket parameters.
        """

    def start(self):
        """Bring the WebSocket connection up."""

    def stop(self, timeout=None):
        """Shut the WebSocket connection down.

        Args:
            timeout: Float; how long to wait for the stop, in seconds.
        """

    def send(self, frame):
        """Write a STOMP frame to the WebSocket.

        Args:
            frame: Frame; the STOMP frame to transmit.
        """

    def receive(self):
        """Read data from the WebSocket.

        Returns:
            bytes: the data that was received.
        """

    def is_connected(self) -> bool:
        """Report whether the WebSocket is connected.

        Returns:
            bool: True when connected, False otherwise.
        """

Usage Examples

Basic WebSocket Connection

import time

import stomp

# Create WebSocket connection.  The WSConnection constructor takes the broker
# address as a list of (host, port) tuples plus a separate ws_path; a single
# URL string is not one of its parameters.
# NOTE(review): confirm whether ws_path is expected with its leading slash.
ws_conn = stomp.WSConnection([('localhost', 61614)], ws_path='/stomp')


# Set up message handler
class WSMessageHandler(stomp.ConnectionListener):
    """Listener that prints every message and error frame it is handed."""

    def on_message(self, frame):
        """Print the body of a received message frame."""
        print(f"WebSocket message: {frame.body}")

    def on_error(self, frame):
        """Print the body of a received error frame."""
        print(f"WebSocket error: {frame.body}")


handler = WSMessageHandler()
ws_conn.set_listener('handler', handler)

# Connect and use
ws_conn.connect('user', 'password', wait=True)
ws_conn.subscribe('/topic/updates', id=1)
ws_conn.send(body='Hello via WebSocket', destination='/topic/chat')

# Keep the script alive for a while so the listener has time to be called
time.sleep(10)

ws_conn.disconnect()

Secure WebSocket (WSS)

import stomp

# Create secure WebSocket connection.  The WSConnection constructor takes the
# broker address as a list of (host, port) tuples plus a separate ws_path, so
# a 'wss://...' URL cannot be passed as the first argument.  443 is the
# default port for the wss scheme.
# NOTE(review): the constructor parameters listed for WSConnection include no
# TLS option; confirm how the library enables TLS for this endpoint before
# relying on this snippet for a secure connection.
ws_conn = stomp.WSConnection(
    [('secure-broker.example.com', 443)],
    ws_path='/stomp',
)

# WebSocket-specific options
ws_conn = stomp.WSConnection(
    [('broker.example.com', 443)],
    ws_path='/stomp',
    heartbeats=(10000, 10000),  # 10 second heartbeats
    timeout=30,                 # 30 second timeout
    reconnect_attempts_max=5,   # Max 5 reconnection attempts
)

ws_conn.connect('username', 'password', wait=True)

Browser-Compatible Usage Pattern

import json
import time  # required: time.time() is used for message timestamps below

import stomp


class BrowserCompatibleHandler(stomp.ConnectionListener):
    """Listener that queues incoming messages for later polling.

    JSON bodies are parsed and dispatched by their 'type' field; bodies that
    are not valid JSON are queued as plain text.
    """

    def __init__(self):
        # Messages received so far and not yet handed out by get_messages()
        self.message_queue = []

    def on_message(self, frame):
        """Route a received frame to the JSON or plain-text handler."""
        # Handle JSON messages typical in web applications
        try:
            data = json.loads(frame.body)
        except json.JSONDecodeError:
            # Handle plain text messages
            self.process_text_message(frame.body, frame.headers)
        else:
            # Kept outside the try so a JSONDecodeError raised while
            # processing the message is not mistaken for a plain-text body
            self.process_web_message(data, frame.headers)

    def process_web_message(self, data, headers):
        """Dispatch a parsed JSON message by type, then queue it.

        Parameters:
        - data: dict, the decoded JSON body
        - headers: the headers of the frame the body came from
        """
        # Process structured web messages
        message_type = data.get('type', 'unknown')

        if message_type == 'notification':
            self.handle_notification(data)
        elif message_type == 'update':
            self.handle_update(data)

        # Store for polling-based retrieval
        self.message_queue.append({
            'data': data,
            'headers': headers,
            'timestamp': time.time()
        })

    def process_text_message(self, body, headers):
        """Queue a message whose body is not JSON.

        Parameters:
        - body: the raw frame body
        - headers: the headers of the frame the body came from
        """
        self.message_queue.append({
            'body': body,
            'headers': headers,
            'timestamp': time.time()
        })

    def get_messages(self):
        """Get queued messages for web application.

        Returns:
        list: a copy of the queued entries; the queue is emptied afterwards
        """
        messages = self.message_queue[:]
        self.message_queue.clear()
        return messages

    def handle_notification(self, data):
        """Hook for notification-type messages (no-op placeholder)."""
        pass

    def handle_update(self, data):
        """Hook for update-type messages (no-op placeholder)."""
        pass


# Setup WebSocket connection for web app.  The WSConnection constructor takes
# the broker address as a list of (host, port) tuples plus a separate ws_path;
# a single URL string is not one of its parameters.
# NOTE(review): confirm whether ws_path is expected with its leading slash.
ws_conn = stomp.WSConnection([('localhost', 61614)], ws_path='/stomp')
handler = BrowserCompatibleHandler()
ws_conn.set_listener('web_handler', handler)

# Connect and subscribe to web-friendly topics
ws_conn.connect('webapp_user', 'webapp_pass', wait=True)
ws_conn.subscribe('/topic/notifications', id=1)
ws_conn.subscribe('/topic/updates', id=2)

# Send JSON message
message_data = {
    'type': 'user_action',
    'action': 'login',
    'user_id': '12345',
    'timestamp': time.time()
}

ws_conn.send(
    body=json.dumps(message_data),
    destination='/topic/user_actions',
    content_type='application/json'
)


# Periodic message retrieval pattern
def get_new_messages():
    """Return and clear the messages queued by the handler so far."""
    return handler.get_messages()

WebSocket with Heartbeats

import threading
import time

import stomp


class HeartbeatMonitor(stomp.ConnectionListener):
    """Listener that tracks connection health from heartbeat callbacks."""

    def __init__(self):
        # Time of the most recent heartbeat; starts at construction time
        self.last_heartbeat = time.time()
        self.connection_healthy = True

    def on_heartbeat(self):
        """Record the heartbeat time and mark the connection healthy."""
        self.last_heartbeat = time.time()
        self.connection_healthy = True
        print("WebSocket heartbeat received")

    def on_heartbeat_timeout(self):
        """Mark the connection unhealthy after a heartbeat timeout."""
        self.connection_healthy = False
        print("WebSocket heartbeat timeout - connection may be unhealthy")

    def monitor_connection(self):
        """Monitor connection health based on heartbeats.

        Checks every 5 seconds and returns as soon as the connection is
        flagged unhealthy, either by a heartbeat timeout or because no
        heartbeat has been recorded for more than 30 seconds.
        """
        while True:
            time.sleep(5)  # Check every 5 seconds

            if time.time() - self.last_heartbeat > 30:  # 30 second threshold
                print("Connection appears stale")
                self.connection_healthy = False

            if not self.connection_healthy:
                print("Connection health check failed")
                # Trigger reconnection logic
                break


# Create WebSocket connection with heartbeats.  The WSConnection constructor
# takes the broker address as a list of (host, port) tuples plus a separate
# ws_path; a single URL string is not one of its parameters.
# NOTE(review): confirm whether ws_path is expected with its leading slash.
ws_conn = stomp.WSConnection(
    [('localhost', 61614)],
    ws_path='/stomp',
    heartbeats=(10000, 10000),  # 10 second heartbeats
)

monitor = HeartbeatMonitor()
ws_conn.set_listener('monitor', monitor)

# Start monitoring in background; daemon so it never blocks interpreter exit
monitor_thread = threading.Thread(target=monitor.monitor_connection, daemon=True)
monitor_thread.start()

ws_conn.connect('user', 'password', wait=True)

WebSocket Error Handling

import time

import stomp


class WebSocketErrorHandler(stomp.ConnectionListener):
    """Listener that reports STOMP errors and reconnects after a disconnect."""

    def __init__(self, connection, username='user', passcode='password'):
        """
        Parameters:
        - connection: the connection to reconnect when it drops
        - username: str, user name used when reconnecting
        - passcode: str, password used when reconnecting
        """
        self.connection = connection
        self.username = username
        self.passcode = passcode
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5

    def on_error(self, frame):
        """Print the error and dispatch on keywords found in its text."""
        # An error frame may carry no body; fall back to '' so .lower() is safe
        error_msg = frame.body or ''
        print(f"WebSocket STOMP error: {error_msg}")

        # Handle specific WebSocket errors
        lowered = error_msg.lower()
        if 'connection refused' in lowered:
            self.handle_connection_refused()
        elif 'unauthorized' in lowered:
            self.handle_unauthorized()
        else:
            self.handle_generic_error(error_msg)

    def on_disconnected(self):
        """Try to reconnect unless the attempt budget is used up."""
        print("WebSocket disconnected")

        if self.reconnect_attempts < self.max_reconnect_attempts:
            self.attempt_reconnect()
        else:
            print("Max reconnection attempts reached")

    def handle_connection_refused(self):
        """Report a refused connection and pause before any retry."""
        print("WebSocket connection refused - broker may be down")
        time.sleep(5)  # Wait before retry

    def handle_unauthorized(self):
        """Report an authentication failure and disable reconnection."""
        print("WebSocket authentication failed")
        # Don't auto-reconnect on auth failures
        self.max_reconnect_attempts = 0

    def handle_generic_error(self, error_msg):
        """Report an error that matched none of the specific cases."""
        print(f"Generic WebSocket error: {error_msg}")

    def attempt_reconnect(self):
        """Wait with exponential backoff, then try to connect again."""
        self.reconnect_attempts += 1
        # Exponential backoff (2s, 4s, 8s, 16s, ...), capped at 30s
        delay = min(2 ** self.reconnect_attempts, 30)

        print(f"Attempting reconnection {self.reconnect_attempts}/{self.max_reconnect_attempts} in {delay}s")
        time.sleep(delay)

        try:
            self.connection.connect(self.username, self.passcode, wait=True)
        except Exception as e:
            # Broad on purpose: this example only reports the failure
            print(f"Reconnection failed: {e}")
        else:
            self.reconnect_attempts = 0  # Reset on successful connection
            print("WebSocket reconnection successful")


# Setup WebSocket with error handling.  The WSConnection constructor takes the
# broker address as a list of (host, port) tuples plus a separate ws_path; a
# single URL string is not one of its parameters.
# NOTE(review): confirm whether ws_path is expected with its leading slash.
ws_conn = stomp.WSConnection([('localhost', 61614)], ws_path='/stomp')
error_handler = WebSocketErrorHandler(ws_conn)
ws_conn.set_listener('error_handler', error_handler)

ws_conn.connect('user', 'password', wait=True)

Install with Tessl CLI

npx tessl i tessl/pypi-stomp-py

docs

adapters.md

cli.md

connections.md

index.md

listeners.md

protocol.md

types.md

utilities.md

websocket.md

tile.json