Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol
Listener-based event system for handling connection events, message delivery, error conditions, and protocol-specific events with built-in listeners for common use cases like statistics tracking, debugging, and synchronous operations.
Abstract base class defining the event handling interface for all connection events.
class ConnectionListener:
    """
    Base class defining the event handling interface for all connection events.

    Every method below is a documentation stub: it has a signature and a
    docstring but no other body.
    """

    def on_connecting(self, host_and_port):
        """
        Called when TCP connection is established.

        Parameters:
        - host_and_port: tuple, (hostname, port) of connected broker
        """

    def on_connected(self, frame):
        """
        Called when STOMP connection is established.

        Parameters:
        - frame: Frame, CONNECTED frame from broker
        """

    def on_disconnecting(self):
        """
        Called before DISCONNECT frame is sent.
        """

    def on_disconnected(self):
        """
        Called when connection is lost or closed.
        """

    def on_heartbeat_timeout(self):
        """
        Called when heartbeat timeout occurs.
        """

    def on_before_message(self, frame):
        """
        Called before message processing.

        Parameters:
        - frame: Frame, message frame

        Returns:
        tuple: (headers, body) for processing, or None to skip
        """

    def on_message(self, frame):
        """
        Called when message is received.

        Parameters:
        - frame: Frame, message frame with headers and body
        """

    def on_receipt(self, frame):
        """
        Called when receipt confirmation is received.

        Parameters:
        - frame: Frame, receipt frame
        """

    def on_error(self, frame):
        """
        Called when error frame is received.

        Parameters:
        - frame: Frame, error frame with error details
        """

    def on_send(self, frame):
        """
        Called when frame is sent to broker.

        Parameters:
        - frame: Frame, outgoing frame
        """

    def on_heartbeat(self):
        """
        Called when heartbeat is received.
        """

    def on_receiver_loop_completed(self, frame):
        """
        Called when receiver loop completes.

        Parameters:
        - frame: Frame, final frame when receiver loop completes
        """

Tracks connection statistics and metrics for monitoring and debugging.
class StatsListener(ConnectionListener):
    """
    Listener that tracks connection statistics and metrics.

    Only ``__init__`` has a body here; the remaining methods are documentation
    stubs whose docstrings describe the counter each event increments.
    """

    def __init__(self):
        """Initialize statistics tracking listener."""
        # Every counter starts at zero.
        self.errors = 0
        self.connections = 0
        self.disconnects = 0
        self.messages = 0
        self.messages_sent = 0
        self.heartbeat_timeouts = 0
        self.heartbeat_count = 0

    def __str__(self) -> str:
        """
        Get formatted statistics summary.

        Returns:
        str: formatted statistics
        """

    def on_connecting(self, host_and_port):
        """Increment connection counter."""

    def on_disconnected(self):
        """Increment disconnect counter."""

    def on_message(self, frame):
        """Increment message received counter."""

    def on_send(self, frame):
        """Increment message sent counter."""

    def on_error(self, frame):
        """Increment error counter."""

    def on_heartbeat_timeout(self):
        """Increment heartbeat timeout counter."""

    def on_heartbeat(self):
        """Increment heartbeat received counter."""

Synchronous listener that waits for specific events like receipts or disconnection.
class WaitingListener(ConnectionListener):
    """
    Synchronous listener that waits for specific events like receipts or
    disconnection.

    All methods are documentation stubs (signature and docstring only).
    """

    def __init__(self, receipt):
        """
        Initialize waiting listener for specific receipt.

        Parameters:
        - receipt: str, receipt ID to wait for
        """

    def wait_on_receipt(self, timeout=10):
        """
        Wait for receipt confirmation.

        Parameters:
        - timeout: float, timeout in seconds

        Returns:
        bool: True if receipt received, False if timeout
        """

    def wait_on_disconnected(self, timeout=10):
        """
        Wait for disconnection event.

        Parameters:
        - timeout: float, timeout in seconds

        Returns:
        bool: True if disconnected, False if timeout
        """

    def on_receipt(self, frame):
        """Handle receipt and signal waiting threads."""

    def on_disconnected(self):
        """Handle disconnection and signal waiting threads."""

Debug listener that prints all connection events to console or log.
class PrintingListener(ConnectionListener):
    """
    Debug listener that prints all connection events to console or log.

    All methods are documentation stubs (signature and docstring only).
    """

    def __init__(self, print_to_log=False):
        """
        Initialize printing listener.

        Parameters:
        - print_to_log: bool, print to log instead of stdout
        """

    def on_connecting(self, host_and_port):
        """Print connecting event."""

    def on_connected(self, frame):
        """Print connected event with frame details."""

    def on_disconnecting(self):
        """Print disconnecting event."""

    def on_disconnected(self):
        """Print disconnected event."""

    def on_message(self, frame):
        """Print received message details."""

    def on_error(self, frame):
        """Print error details."""

    def on_send(self, frame):
        """Print sent frame details."""

    def on_receipt(self, frame):
        """Print receipt confirmation."""

    def on_heartbeat(self):
        """Print heartbeat event."""

    def on_heartbeat_timeout(self):
        """Print heartbeat timeout."""

Combined listener for testing that includes statistics, waiting, and printing functionality.
class TestListener(StatsListener, WaitingListener, PrintingListener):
    """
    Combined listener for testing that includes statistics, waiting, and
    printing functionality.

    Apart from the two attribute assignments in ``__init__``, the methods are
    documentation stubs (signature and docstring only).
    """

    def __init__(self, receipt=None, print_to_log=True):
        """
        Initialize test listener with combined functionality.

        Parameters:
        - receipt: str, receipt ID to wait for
        - print_to_log: bool, print events to log
        """
        # Starts empty; on_message is documented as storing messages here.
        self.message_list = []
        # No timestamp yet; on_message is documented as updating it.
        self.timestamp = None

    def wait_for_message(self, timeout=10):
        """
        Wait for next message.

        Parameters:
        - timeout: float, timeout in seconds

        Returns:
        Frame: received message frame or None if timeout
        """

    def get_latest_message(self):
        """
        Get most recently received message.

        Returns:
        Frame: latest message frame or None if no messages
        """

    def wait_for_heartbeat(self, timeout=10):
        """
        Wait for heartbeat.

        Parameters:
        - timeout: float, timeout in seconds

        Returns:
        bool: True if heartbeat received, False if timeout
        """

    def on_message(self, frame):
        """Store message in list and update timestamp."""

Internal listener for managing STOMP heartbeat functionality.
class HeartbeatListener(ConnectionListener):
    """
    Internal listener for managing STOMP heartbeat functionality.

    All methods are documentation stubs (signature and docstring only).
    """

    def __init__(self, transport, heartbeats, heart_beat_receive_scale=1.5):
        """
        Initialize heartbeat management.

        Parameters:
        - transport: Transport, connection transport
        - heartbeats: tuple, (send_ms, receive_ms) heartbeat intervals
        - heart_beat_receive_scale: float, receive timeout scale factor
        """

    def on_connected(self, frame):
        """Start heartbeat timers after connection."""

    def on_disconnected(self):
        """Stop heartbeat timers on disconnection."""

    def on_heartbeat(self):
        """Reset receive heartbeat timer."""

    def on_send(self, frame):
        """Update send heartbeat timer."""

import stomp
class MessageHandler(stomp.ConnectionListener):
    """Example listener that routes each message by its 'type' header and counts it."""

    def __init__(self):
        # Running total of messages that have passed through on_message.
        self.processed_count = 0

    def on_message(self, frame):
        """Print the message body, hand it to the matching processor, then count it.

        Parameters:
        - frame: message frame; its ``headers`` and ``body`` are read here
        """
        print(f"Processing message: {frame.body}")

        # Process message based on headers
        message_type = frame.headers.get('type')
        if message_type == 'order':
            self.process_order(frame.body)
        elif message_type == 'notification':
            self.send_notification(frame.body)

        # Counted whether or not a processor matched the type.
        self.processed_count = self.processed_count + 1

    def on_error(self, frame):
        """Print the body of an error frame."""
        # Handle error conditions
        print(f"Error occurred: {frame.body}")

    def process_order(self, order_data):
        """Placeholder for order processing; does nothing in this example."""
        # Order processing logic
        return None

    def send_notification(self, notification_data):
        """Placeholder for notification delivery; does nothing in this example."""
        # Notification logic
        return None
# Use custom handler
# The broker address is passed as a list of (host, port) tuples.
conn = stomp.Connection([('localhost', 61613)])
handler = MessageHandler()
# Register the handler under the name 'message_handler' before connecting.
conn.set_listener('message_handler', handler)
# NOTE(review): wait=True presumably blocks until the broker confirms the
# connection - confirm against the stomp.py documentation.
conn.connect('user', 'pass', wait=True)
# Subscribe to the orders queue using subscription id 1.
conn.subscribe('/queue/orders', id=1)

import stomp
# One connection can carry several listeners, each registered under its own name.
conn = stomp.Connection([('localhost', 61613)])
# Add statistics tracking
stats = stomp.StatsListener()
conn.set_listener('stats', stats)
# Add debug printing
# print_to_log=True: print to log instead of stdout.
debug = stomp.PrintingListener(print_to_log=True)
conn.set_listener('debug', debug)
# Add custom business logic
class BusinessLogic(stomp.ConnectionListener):
    """Placeholder listener marking where application-specific handling goes."""

    def on_message(self, frame):
        """Receive a message frame; intentionally a no-op in this example."""
        # Business processing
        return None
# Third listener on the same connection, alongside 'stats' and 'debug'.
business = BusinessLogic()
conn.set_listener('business', business)
conn.connect('user', 'pass', wait=True)
# Check statistics
# The counters are read directly off the StatsListener instance.
print(f"Messages received: {stats.messages}")
print(f"Errors: {stats.errors}")

import stomp
import uuid
conn = stomp.Connection([('localhost', 61613)])
# Create waiting listener for receipt
# A fresh UUID is used as the receipt id; the same id is passed to send() below.
receipt_id = str(uuid.uuid4())
waiter = stomp.WaitingListener(receipt_id)
# The listener is registered before connecting and sending.
conn.set_listener('waiter', waiter)
conn.connect('user', 'pass', wait=True)
# From here on the connection is open: use try/finally so it is always closed,
# even if send() or the wait raises.
try:
    # Send message with receipt and wait for confirmation
    conn.send(
        body='Important message',
        destination='/queue/critical',
        receipt=receipt_id,
    )
    # Wait for receipt confirmation
    # wait_on_receipt returns True if the receipt arrived, False on timeout.
    if waiter.wait_on_receipt(timeout=30):
        print("Message confirmed delivered")
    else:
        print("Delivery confirmation timeout")
finally:
    conn.disconnect()

import stomp
class ErrorHandler(stomp.ConnectionListener):
    """Example listener that reports error frames and routes them by 'code' header."""

    def on_error(self, frame):
        """Print the error, then call the handler matching its code.

        Parameters:
        - frame: error frame; the 'code' header (default 'unknown') and the
          body are read here
        """
        error_code = frame.headers.get('code', 'unknown')
        error_msg = frame.body
        print(f"STOMP Error {error_code}: {error_msg}")

        # Handle specific error types
        if error_code == 'AUTHORIZATION_FAILED':
            self.handle_auth_error()
            return
        if error_code == 'DESTINATION_NOT_FOUND':
            self.handle_destination_error()
            return
        # Anything else goes to the generic handler.
        self.handle_generic_error(error_code, error_msg)

    def on_heartbeat_timeout(self):
        """Report a heartbeat timeout."""
        # Trigger reconnection logic
        print("Heartbeat timeout - connection may be lost")

    def handle_auth_error(self):
        """Placeholder for authentication error handling; does nothing here."""
        # Authentication error handling
        return None

    def handle_destination_error(self):
        """Placeholder for destination error handling; does nothing here."""
        # Destination error handling
        return None

    def handle_generic_error(self, code, message):
        """Placeholder for all other errors; does nothing here."""
        # Generic error handling
        return None
# Wire the error handler into a connection. This snippet only registers the
# listener; it does not call connect().
conn = stomp.Connection([('localhost', 61613)])
error_handler = ErrorHandler()
conn.set_listener('error_handler', error_handler)

Install with Tessl CLI
npx tessl i tessl/pypi-stomp-py