Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol
Core types, exception hierarchy, constants, and utility functions for comprehensive error handling, logging, and STOMP protocol compliance across all library components.
Comprehensive exception classes for different error conditions in STOMP operations.
class StompException(Exception):
"""
Base exception for all stomp.py related errors.
All stomp.py exceptions inherit from this base class.
"""
class ConnectionClosedException(StompException):
"""
Raised when connection is closed by the server.
Typically occurs when:
- Server shuts down gracefully
- Connection idle timeout
- Server-side connection limits reached
"""
class NotConnectedException(StompException):
"""
Raised when operation attempted while not connected.
Occurs when trying to:
- Send messages without connection
- Subscribe without connection
- Perform operations on disconnected client
"""
class ConnectFailedException(StompException):
"""
Raised when connection attempts exceed maximum retries.
Indicates:
- Network connectivity issues
- Authentication failures
- Server unavailability after max retry attempts
"""
class InterruptedException(StompException):
"""
Raised when data read operation is interrupted.
Occurs during:
- Thread interruption during I/O
- Socket read interruption
- Graceful shutdown scenarios
"""

Essential data structures for STOMP frame handling and message processing.
class Frame:
"""
STOMP frame representation containing command, headers, and body.
Attributes:
- cmd: str, STOMP command (CONNECT, SEND, MESSAGE, etc.)
- headers: dict, frame headers as key-value pairs
- body: str or bytes, frame body content
"""
def __init__(self, cmd=None, headers=None, body=None):
"""
Initialize STOMP frame.
Parameters:
- cmd: str, STOMP command
- headers: dict, frame headers
- body: str or bytes, frame body
"""
self.cmd = cmd
self.headers = headers or {}
self.body = body
def __str__(self) -> str:
"""
String representation of frame.
Returns:
str: formatted frame representation
"""

STOMP protocol constants for commands, headers, and response codes.
# STOMP Commands
CONNECT = "CONNECT"
STOMP = "STOMP" # STOMP 1.2 connection command
CONNECTED = "CONNECTED"
DISCONNECT = "DISCONNECT"
SEND = "SEND"
SUBSCRIBE = "SUBSCRIBE"
UNSUBSCRIBE = "UNSUBSCRIBE"
ACK = "ACK"
NACK = "NACK" # STOMP 1.1+
BEGIN = "BEGIN"
COMMIT = "COMMIT"
ABORT = "ABORT"
MESSAGE = "MESSAGE"
RECEIPT = "RECEIPT"
ERROR = "ERROR"
# Standard Headers
CONTENT_LENGTH = "content-length"
CONTENT_TYPE = "content-type"
DESTINATION = "destination"
MESSAGE_ID = "message-id"
SUBSCRIPTION = "subscription"
TRANSACTION = "transaction"
RECEIPT_ID = "receipt-id"
ACK_MODE = "ack"
HOST = "host"
VERSION = "version"
HEARTBEAT = "heart-beat"
SESSION = "session"
SERVER = "server"
# Acknowledgment Modes
ACK_AUTO = "auto"
ACK_CLIENT = "client"
ACK_CLIENT_INDIVIDUAL = "client-individual"
# Protocol Versions
PROTOCOL_10 = "1.0"
PROTOCOL_11 = "1.1"
PROTOCOL_12 = "1.2"
# STOMP Commands
CMD_CONNECT = "CONNECT"
CMD_STOMP = "STOMP"
CMD_DISCONNECT = "DISCONNECT"
CMD_SEND = "SEND"
CMD_SUBSCRIBE = "SUBSCRIBE"
CMD_UNSUBSCRIBE = "UNSUBSCRIBE"
CMD_ACK = "ACK"
CMD_NACK = "NACK"
CMD_BEGIN = "BEGIN"
CMD_COMMIT = "COMMIT"
CMD_ABORT = "ABORT"
# STOMP Headers
HDR_ACCEPT_VERSION = "accept-version"
HDR_HOST = "host"
HDR_LOGIN = "login"
HDR_PASSCODE = "passcode"
HDR_HEARTBEAT = "heart-beat"
HDR_DESTINATION = "destination"
HDR_CONTENT_TYPE = "content-type"
HDR_CONTENT_LENGTH = "content-length"
HDR_MESSAGE_ID = "message-id"
HDR_SUBSCRIPTION = "subscription"
HDR_ACK = "ack"
HDR_ID = "id"
HDR_TRANSACTION = "transaction"
HDR_RECEIPT = "receipt"

Core utility functions for frame processing, encoding, and protocol operations.
def parse_frame(data):
"""
Parse byte data into STOMP frame.
Parameters:
- data: bytes, raw frame data
Returns:
Frame: parsed STOMP frame
"""
def convert_frame(frame):
"""
Convert Frame object to byte representation.
Parameters:
- frame: Frame, frame to convert
Returns:
bytes: frame as byte sequence
"""
def parse_headers(lines):
"""
Parse header lines into dictionary.
Parameters:
- lines: list, header lines
Returns:
dict: parsed headers
"""
def encode(char):
"""
Encode string to bytes using UTF-8.
Parameters:
- char: str, string to encode
Returns:
bytes: encoded bytes
"""
def decode(byte_data):
"""
Decode bytes to string using UTF-8.
Parameters:
- byte_data: bytes, data to decode
Returns:
str: decoded string
"""
def merge_headers(headers1, headers2, *additional_headers):
"""
Merge multiple header dictionaries.
Parameters:
- headers1: dict, first header set
- headers2: dict, second header set
- *additional_headers: additional header dictionaries
Returns:
dict: merged headers
"""
def clean_headers(headers):
"""
Clean headers for logging (remove sensitive data).
Parameters:
- headers: dict, headers to clean
Returns:
dict: headers with sensitive values removed
"""
def get_uuid():
"""
Generate unique identifier.
Returns:
str: UUID string
"""
def calculate_heartbeats(heartbeat_tuple, server_heartbeats):
"""
Calculate negotiated heartbeat intervals.
Parameters:
- heartbeat_tuple: tuple, client (send_ms, receive_ms)
- server_heartbeats: tuple, server (send_ms, receive_ms)
Returns:
tuple: negotiated (send_ms, receive_ms)
"""
def is_localhost(host):
"""
Check if host is localhost.
Parameters:
- host: str, hostname to check
Returns:
bool: True if localhost, False otherwise
"""
def length(obj):
"""
Null-safe length function.
Parameters:
- obj: any, object to measure
Returns:
int: length or 0 if None
"""

Configurable logging system for debugging and monitoring.
def debug(msg, *args):
"""
Log debug message.
Parameters:
- msg: str, debug message
- *args: additional formatting arguments
"""
def info(msg, *args):
"""
Log info message.
Parameters:
- msg: str, info message
- *args: additional formatting arguments
"""
def warning(msg, *args):
"""
Log warning message.
Parameters:
- msg: str, warning message
- *args: additional formatting arguments
"""
def error(msg, *args):
"""
Log error message.
Parameters:
- msg: str, error message
- *args: additional formatting arguments
"""
def setLevel(level):
"""
Set logging level.
Parameters:
- level: int, logging level (DEBUG, INFO, WARNING, ERROR)
"""
def isEnabledFor(level):
"""
Check if logging level is enabled.
Parameters:
- level: int, logging level to check
Returns:
bool: True if level is enabled
"""
def log_to_stdout():
"""
Configure logging to output to stdout.
"""
# Logging levels
DEBUG = 10
INFO = 20
WARNING = 30
ERROR = 40

ANSI color codes for terminal output formatting.
GREEN = '\033[92m' # Green text
RED = '\033[91m' # Red text
BOLD = '\033[1m' # Bold text
NO_COLOUR = '\033[0m' # Reset formatting

Common constants and patterns used throughout the library.
NULL = b'\x00' # Null byte terminator
LOCALHOST_NAMES = ['localhost', '127.0.0.1', '::1'] # Local host identifiers
# Regular expressions for frame parsing
HEADER_LINE_RE = re.compile(b'^([^:]+):(.*)$') # Header line pattern
PREAMBLE_END_RE = re.compile(b'\r?\n\r?\n') # Preamble end pattern
LINE_END_RE = re.compile(b'\r?\n') # Line end pattern
# Special frames
HEARTBEAT_FRAME = Frame(cmd=None, headers={}, body=None) # Heartbeat frame

import stomp
from stomp.exception import *
# Attempt a connection and report failures. The handlers are ordered from the
# most specific exception to the most general: the three named exceptions all
# derive from StompException, so they must be listed before it, and the final
# Exception handler catches anything that is not a STOMP error.
try:
    conn = stomp.Connection([('localhost', 61613)])
    conn.connect('user', 'wrong_password', wait=True)
except ConnectFailedException as e:
    print(f"Failed to connect after retries: {e}")
    # Handle connection failure
except NotConnectedException as e:
    print(f"Operation attempted while disconnected: {e}")
    # Handle disconnected state
except ConnectionClosedException as e:
    print(f"Connection closed by server: {e}")
    # Handle server-side disconnection
except StompException as e:
    print(f"General STOMP error: {e}")
    # Handle any other STOMP-related error
except Exception as e:
    print(f"Unexpected error: {e}")
# Handle non-STOMP errors

import stomp
from stomp.utils import Frame, parse_frame, convert_frame
# Create custom frame: a SEND command with three headers and a JSON body
custom_frame = Frame(
    cmd='SEND',
    headers={
        'destination': '/queue/custom',
        'content-type': 'application/json',
        'custom-header': 'custom-value'
    },
    body='{"message": "custom data"}'
)
# Convert to bytes for transmission
frame_bytes = convert_frame(custom_frame)
print(f"Frame bytes: {frame_bytes}")
# Parse frame from bytes (round trip of the frame built above)
parsed_frame = parse_frame(frame_bytes)
print(f"Parsed command: {parsed_frame.cmd}")
print(f"Parsed headers: {parsed_frame.headers}")
print(f"Parsed body: {parsed_frame.body}")
# Use with connection
conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'password', wait=True)
# NOTE(review): send_frame is not documented on this page. Confirm that it
# accepts a Frame instance rather than separate (cmd, headers, body) arguments
# before relying on this call.
conn.send_frame(custom_frame)
conn.disconnect()

import stomp
import stomp.logging as stomp_logging
# Configure logging level
stomp_logging.setLevel(stomp_logging.DEBUG)
# Enable stdout logging
stomp_logging.log_to_stdout()
# Check logging levels: only emit the debug message if DEBUG is enabled
if stomp_logging.isEnabledFor(stomp_logging.DEBUG):
    stomp_logging.debug("Debug logging enabled")
# Use in application
class LoggingListener(stomp.ConnectionListener):
    """Connection listener that writes one log line per connection event."""

    def on_connecting(self, host_and_port):
        # Logged at INFO with the target passed in by the connection.
        stomp_logging.info(f"Connecting to {host_and_port}")

    def on_connected(self, frame):
        stomp_logging.info("Successfully connected")

    def on_message(self, frame):
        # Message bodies are logged at DEBUG only.
        stomp_logging.debug(f"Received message: {frame.body}")

    def on_error(self, frame):
        stomp_logging.error(f"Error occurred: {frame.body}")

    def on_disconnected(self):
        stomp_logging.warning("Connection lost")


# Register the listener under the name 'logger' on a new connection.
conn = stomp.Connection([('localhost', 61613)])
logger = LoggingListener()
conn.set_listener('logger', logger)
conn.connect('user', 'password', wait=True)

import stomp
from stomp.utils import *
# Generate unique identifiers
message_id = get_uuid()
transaction_id = get_uuid()
# Header manipulation: combine two header dictionaries into one
headers1 = {'content-type': 'text/plain', 'priority': '5'}
headers2 = {'correlation-id': message_id, 'reply-to': '/queue/replies'}
merged = merge_headers(headers1, headers2)
# Clean sensitive headers for logging
sensitive_headers = {'login': 'user', 'passcode': 'secret', 'destination': '/queue/test'}
clean = clean_headers(sensitive_headers)
print(f"Safe to log: {clean}") # Passwords removed
# Host checking
if is_localhost('127.0.0.1'):
    print("Using localhost connection")
# Length checking with null safety
safe_length = length(None) # Returns 0
string_length = length("hello") # Returns 5
# Heartbeat calculation: both tuples are (send_ms, receive_ms)
client_heartbeats = (10000, 10000) # 10 second intervals
server_heartbeats = (5000, 15000) # Server prefers different intervals
negotiated = calculate_heartbeats(client_heartbeats, server_heartbeats)
print(f"Negotiated heartbeats: {negotiated}")

import stomp
from stomp.exception import *
import time
class RobustConnection:
    """STOMP connection wrapper with retrying connect, send, and disconnect.

    The credentials given to :meth:`connect_with_retry` are remembered so that
    :meth:`send_with_retry` can re-establish a dropped connection as the same
    user instead of relying on hard-coded values.
    """

    def __init__(self, hosts, max_retries=3):
        """
        Initialize the wrapper without connecting.

        Parameters:
        - hosts: list of (host, port) tuples passed to stomp.Connection
        - max_retries: int, maximum number of connection attempts
        """
        self.hosts = hosts
        self.max_retries = max_retries
        self.connection = None
        self.connected = False
        # Credentials from the most recent connect_with_retry call, kept so a
        # reconnect triggered by send_with_retry uses the caller's login.
        self._credentials = None

    def connect_with_retry(self, username, password):
        """Connect with automatic retry logic.

        Parameters:
        - username: str, login passed to the broker
        - password: str, passcode passed to the broker

        Returns:
        bool: True once connected; False if every attempt failed or a
        non-retryable error occurred.
        """
        self._credentials = (username, password)
        attempts_made = 0
        for attempt in range(1, self.max_retries + 1):
            attempts_made = attempt
            try:
                self.connection = stomp.Connection(self.hosts)
                self.connection.connect(username, password, wait=True)
                self.connected = True
                print("Connection established")
                return True
            except ConnectFailedException as e:
                print(f"Connection attempt {attempt} failed: {e}")
                if attempt < self.max_retries:
                    time.sleep(2 ** attempt)  # Exponential backoff
            except NotConnectedException as e:
                # Not retryable: give up immediately.
                print(f"Connection not established: {e}")
                break
            except Exception as e:
                # Unknown failure: do not keep retrying blindly.
                print(f"Unexpected error: {e}")
                break
        # Report the attempts actually made; an early break may have stopped
        # the loop before max_retries was reached.
        print(f"Failed to connect after {attempts_made} attempt(s)")
        return False

    def _reconnect(self):
        """Reconnect with the remembered credentials; return True on success."""
        self.connected = False
        if self._credentials is None:
            print("Cannot reconnect: connect_with_retry has not been called")
            return False
        return self.connect_with_retry(*self._credentials)

    def send_with_retry(self, body, destination, max_retries=3):
        """Send message with retry on failure.

        Parameters:
        - body: message body to send
        - destination: str, destination to send to
        - max_retries: int, maximum number of send attempts

        Returns:
        bool: True if the message was sent; False otherwise.
        """
        for attempt in range(max_retries):
            try:
                if not self.connected:
                    raise NotConnectedException("Not connected to broker")
                self.connection.send(body=body, destination=destination)
                return True
            except ConnectionClosedException:
                print("Connection closed, attempting reconnect...")
                if not self._reconnect():
                    break
                # Reconnected: the next loop iteration retries the send.
            except NotConnectedException:
                print("Not connected, attempting reconnect...")
                if not self._reconnect():
                    break
            except Exception as e:
                print(f"Send attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    time.sleep(1)
        return False

    def disconnect(self):
        """Safe disconnect with error handling.

        Errors raised while disconnecting are printed rather than propagated;
        the wrapper is always marked as disconnected afterwards.
        """
        try:
            if self.connection and self.connected:
                self.connection.disconnect()
                print("Disconnected successfully")
        except Exception as e:
            print(f"Error during disconnect: {e}")
        finally:
            self.connected = False
# Usage: two brokers are listed; messages are sent only if the connection
# attempt succeeds.
robust_conn = RobustConnection([('localhost', 61613), ('backup', 61613)])
if robust_conn.connect_with_retry('user', 'password'):
    # Send messages with automatic retry
    robust_conn.send_with_retry('Hello World', '/queue/test')
# Clean disconnect
robust_conn.disconnect()

Install with Tessl CLI
npx tessl i tessl/pypi-stomp-py