CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-stomp-py

Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol

Overview
Eval results
Files

docs/listeners.md

Event Handling

Listener-based event system for handling connection events, message delivery, error conditions, and protocol-specific events. Built-in listeners cover common use cases such as statistics tracking, debugging, and synchronous operations.

Capabilities

Base Connection Listener

Base class defining the event handling interface for all connection events. Every callback is a no-op by default, so subclasses override only the events they need.

class ConnectionListener:
    """
    Base interface for receiving connection events.

    Every callback below is a stub whose body is only a docstring, so each
    one does nothing and returns None unless a subclass overrides it.
    """

    def on_connecting(self, host_and_port):
        """
        Called when the TCP connection to the broker is established.

        This is the TCP-level event only; the STOMP-level CONNECTED frame is
        reported separately through on_connected.

        Parameters:
        - host_and_port: tuple, (hostname, port) of connected broker
        """

    def on_connected(self, frame):
        """
        Called when the STOMP connection is established.

        Parameters:
        - frame: Frame, CONNECTED frame from broker
        """

    def on_disconnecting(self):
        """
        Called before the DISCONNECT frame is sent.
        """

    def on_disconnected(self):
        """
        Called when the connection is lost or closed.
        """

    def on_heartbeat_timeout(self):
        """
        Called when a heartbeat timeout occurs.
        """

    def on_before_message(self, frame):
        """
        Called before message processing.

        Parameters:
        - frame: Frame, message frame

        Returns:
        tuple: (headers, body) for processing, or None to skip

        NOTE(review): this stub has no return statement, so the default
        implementation always returns None.
        """

    def on_message(self, frame):
        """
        Called when a message is received.

        Parameters:
        - frame: Frame, message frame with headers and body
        """

    def on_receipt(self, frame):
        """
        Called when a receipt confirmation is received.

        Parameters:
        - frame: Frame, receipt frame
        """

    def on_error(self, frame):
        """
        Called when an error frame is received.

        Parameters:
        - frame: Frame, error frame with error details
        """

    def on_send(self, frame):
        """
        Called when a frame is sent to the broker.

        Parameters:
        - frame: Frame, outgoing frame
        """

    def on_heartbeat(self):
        """
        Called when a heartbeat is received.
        """

    def on_receiver_loop_completed(self, frame):
        """
        Called when the receiver loop completes.

        Parameters:
        - frame: Frame, final frame when receiver loop completes
        """

Statistics Listener

Tracks connection statistics and metrics for monitoring and debugging.

class StatsListener(ConnectionListener):
    """
    Listener that tracks connection statistics for monitoring and debugging.

    The statistics are plain integer attributes, all initialised to 0 in
    __init__. The callbacks below are documented as incrementing them; their
    bodies are stubs in this listing.
    """

    def __init__(self):
        """Initialize statistics tracking listener with every counter at 0."""
        # One counter per tracked event; the callback documented as updating
        # it is named alongside.
        self.errors = 0  # error frames (on_error)
        self.connections = 0  # connections (on_connecting)
        self.disconnects = 0  # disconnects (on_disconnected)
        self.messages = 0  # messages received (on_message)
        self.messages_sent = 0  # frames sent (on_send)
        self.heartbeat_timeouts = 0  # heartbeat timeouts (on_heartbeat_timeout)
        self.heartbeat_count = 0  # heartbeats received (on_heartbeat)

    def __str__(self) -> str:
        """
        Get formatted statistics summary.

        Returns:
        str: formatted statistics

        NOTE(review): the exact layout of the summary is not shown in this
        listing.
        """

    def on_connecting(self, host_and_port):
        """Increment connection counter."""

    def on_disconnected(self):
        """Increment disconnect counter."""

    def on_message(self, frame):
        """Increment message received counter."""

    def on_send(self, frame):
        """Increment message sent counter."""

    def on_error(self, frame):
        """Increment error counter."""

    def on_heartbeat_timeout(self):
        """Increment heartbeat timeout counter."""

    def on_heartbeat(self):
        """Increment heartbeat received counter."""

Waiting Listener

Synchronous listener that waits for specific events like receipts or disconnection.

class WaitingListener(ConnectionListener):
    """
    Synchronous listener that lets a caller block until a specific receipt
    arrives or the connection is disconnected.
    """

    def __init__(self, receipt):
        """
        Initialize waiting listener for specific receipt.

        Parameters:
        - receipt: str, receipt ID to wait for
        """

    def wait_on_receipt(self, timeout=10):
        """
        Wait for receipt confirmation.

        Parameters:
        - timeout: float, timeout in seconds (default 10)

        Returns:
        bool: True if receipt received, False if timeout
        """

    def wait_on_disconnected(self, timeout=10):
        """
        Wait for disconnection event.

        Parameters:
        - timeout: float, timeout in seconds (default 10)

        Returns:
        bool: True if disconnected, False if timeout
        """

    def on_receipt(self, frame):
        """Handle receipt and signal waiting threads."""

    def on_disconnected(self):
        """Handle disconnection and signal waiting threads."""

Printing Listener

Debug listener that prints all connection events to console or log.

class PrintingListener(ConnectionListener):
    """
    Debug listener that prints connection events to stdout or, when
    print_to_log is true, to the log.
    """

    def __init__(self, print_to_log=False):
        """
        Initialize printing listener.

        Parameters:
        - print_to_log: bool, print to log instead of stdout (default False)
        """

    def on_connecting(self, host_and_port):
        """Print connecting event."""

    def on_connected(self, frame):
        """Print connected event with frame details."""

    def on_disconnecting(self):
        """Print disconnecting event."""

    def on_disconnected(self):
        """Print disconnected event."""

    def on_message(self, frame):
        """Print received message details."""

    def on_error(self, frame):
        """Print error details."""

    def on_send(self, frame):
        """Print sent frame details."""

    def on_receipt(self, frame):
        """Print receipt confirmation."""

    def on_heartbeat(self):
        """Print heartbeat event."""

    def on_heartbeat_timeout(self):
        """Print heartbeat timeout."""

Test Listener

Combined listener for testing that includes statistics, waiting, and printing functionality.

class TestListener(StatsListener, WaitingListener, PrintingListener):
    """
    Combined listener for testing: statistics, waiting and printing in one.

    In addition to what the three base listeners provide, received messages
    are kept in ``message_list``.
    """

    def __init__(self, receipt=None, print_to_log=True):
        """
        Initialize test listener with combined functionality.

        Parameters:
        - receipt: str, receipt ID to wait for
        - print_to_log: bool, print events to log
        """
        # The bases take different constructor arguments and do not chain
        # through super(), so each one is initialised explicitly. Without
        # these calls the StatsListener counters would never be created on a
        # TestListener and receipt / print_to_log would be silently ignored.
        StatsListener.__init__(self)
        WaitingListener.__init__(self, receipt)
        PrintingListener.__init__(self, print_to_log)
        self.message_list = []  # message frames stored by on_message
        self.timestamp = None  # updated by on_message; None until then

    def wait_for_message(self, timeout=10):
        """
        Wait for next message.

        Parameters:
        - timeout: float, timeout in seconds (default 10)

        Returns:
        Frame: received message frame or None if timeout
        """

    def get_latest_message(self):
        """
        Get most recently received message.

        Returns:
        Frame: latest message frame or None if no messages
        """

    def wait_for_heartbeat(self, timeout=10):
        """
        Wait for heartbeat.

        Parameters:
        - timeout: float, timeout in seconds (default 10)

        Returns:
        bool: True if heartbeat received, False if timeout
        """

    def on_message(self, frame):
        """Store message in list and update timestamp."""

Heartbeat Listener

Internal listener for managing STOMP heartbeat functionality.

class HeartbeatListener(ConnectionListener):
    """
    Internal listener for managing STOMP heartbeat functionality.
    """

    def __init__(self, transport, heartbeats, heart_beat_receive_scale=1.5):
        """
        Initialize heartbeat management.

        Parameters:
        - transport: Transport, connection transport
        - heartbeats: tuple, (send_ms, receive_ms) heartbeat intervals
        - heart_beat_receive_scale: float, receive timeout scale factor
          (default 1.5)
        """

    def on_connected(self, frame):
        """Start heartbeat timers after connection."""

    def on_disconnected(self):
        """Stop heartbeat timers on disconnection."""

    def on_heartbeat(self):
        """Reset receive heartbeat timer."""

    def on_send(self, frame):
        """Update send heartbeat timer."""

Usage Examples

Custom Message Handler

import stomp

class MessageHandler(stomp.ConnectionListener):
    """Example listener that routes incoming messages by their 'type' header."""

    def __init__(self):
        # Number of messages passed to on_message so far.
        self.processed_count = 0

    def on_message(self, frame):
        """Print the message body, route it by 'type' header, and count it."""
        print(f"Processing message: {frame.body}")

        # Look the header up once and branch on the result.
        message_type = frame.headers.get('type')
        if message_type == 'order':
            self.process_order(frame.body)
        elif message_type == 'notification':
            self.send_notification(frame.body)

        # Counted whether or not one of the branches above matched.
        self.processed_count += 1

    def on_error(self, frame):
        """Print the body of an error frame; real handling would go here."""
        print(f"Error occurred: {frame.body}")

    def process_order(self, order_data):
        """Placeholder for order processing logic."""

    def send_notification(self, notification_data):
        """Placeholder for notification logic."""

# Use custom handler
conn = stomp.Connection([('localhost', 61613)])
handler = MessageHandler()
# The listener is registered under the name 'message_handler' before
# connect() is called.
conn.set_listener('message_handler', handler)
conn.connect('user', 'pass', wait=True)
# Subscribe to the orders queue with subscription id 1.
conn.subscribe('/queue/orders', id=1)

Multiple Listeners

import stomp

conn = stomp.Connection([('localhost', 61613)])

# Each listener below is registered on the same connection under its own
# name ('stats', 'debug', 'business'), all before connect() is called.

# Add statistics tracking
stats = stomp.StatsListener()
conn.set_listener('stats', stats)

# Add debug printing (print_to_log=True: print to log instead of stdout)
debug = stomp.PrintingListener(print_to_log=True)
conn.set_listener('debug', debug)

# Add custom business logic
class BusinessLogic(stomp.ConnectionListener):
    # Only on_message is overridden here.
    def on_message(self, frame):
        # Business processing
        pass

business = BusinessLogic()
conn.set_listener('business', business)

conn.connect('user', 'pass', wait=True)

# Check statistics: read the counters kept on the StatsListener instance
print(f"Messages received: {stats.messages}")
print(f"Errors: {stats.errors}")

Synchronous Operations

import stomp
import uuid

conn = stomp.Connection([('localhost', 61613)])

# Create waiting listener for receipt
# A fresh UUID string is used as the receipt id; the same value is given to
# the listener here and to send() below so the two can be matched.
receipt_id = str(uuid.uuid4())
waiter = stomp.WaitingListener(receipt_id)
conn.set_listener('waiter', waiter)

conn.connect('user', 'pass', wait=True)

# Send message with receipt and wait for confirmation
conn.send(
    body='Important message',
    destination='/queue/critical',
    receipt=receipt_id
)

# Wait for receipt confirmation (up to 30 seconds); wait_on_receipt is
# documented as returning True if the receipt was received, False on timeout
if waiter.wait_on_receipt(timeout=30):
    print("Message confirmed delivered")
else:
    print("Delivery confirmation timeout")

conn.disconnect()

Error Handling

import stomp

class ErrorHandler(stomp.ConnectionListener):
    """Example listener that routes error frames by their 'code' header."""

    def on_error(self, frame):
        """
        Print the error and hand it to the handler matching its code.

        The code is read from the 'code' header and defaults to 'unknown'
        when that header is absent.

        NOTE(review): STOMP 1.2 describes a 'message' header on ERROR frames;
        the 'code' header read here is presumably broker-specific - confirm
        against the broker in use.
        """
        error_msg = frame.body
        error_code = frame.headers.get('code', 'unknown')

        print(f"STOMP Error {error_code}: {error_msg}")

        # Known codes are handled first and return early; anything else
        # falls through to the generic handler.
        if error_code == 'AUTHORIZATION_FAILED':
            self.handle_auth_error()
            return
        if error_code == 'DESTINATION_NOT_FOUND':
            self.handle_destination_error()
            return
        self.handle_generic_error(error_code, error_msg)

    def on_heartbeat_timeout(self):
        """Report a heartbeat timeout; reconnection logic would go here."""
        print("Heartbeat timeout - connection may be lost")

    def handle_auth_error(self):
        """Placeholder for authentication error handling."""

    def handle_destination_error(self):
        """Placeholder for destination error handling."""

    def handle_generic_error(self, code, message):
        """Placeholder for generic error handling."""

# Register the error handler on a new connection under the name
# 'error_handler'. This snippet stops after registration; connect() is not
# called here.
conn = stomp.Connection([('localhost', 61613)])
error_handler = ErrorHandler()
conn.set_listener('error_handler', error_handler)

Install with Tessl CLI

npx tessl i tessl/pypi-stomp-py

docs

adapters.md

cli.md

connections.md

index.md

listeners.md

protocol.md

types.md

utilities.md

websocket.md

tile.json