Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol
Core utility functions, constants, logging configuration, and helper methods that support stomp.py's internal operations and provide convenient functionality for advanced use cases.
Low-level frame manipulation and conversion functions for custom protocol handling.
def convert_frame(frame):
"""
Convert Frame object to transmission format.
Parameters:
- frame: Frame, STOMP frame object to convert
Returns:
bytes: encoded frame ready for network transmission
Handles:
- Header encoding and escaping
- Body encoding with proper content-length
- Protocol version specific formatting
"""
def parse_headers(lines, offset=0):
"""
Parse STOMP headers from frame lines.
Parameters:
- lines: list, frame lines to parse
- offset: int, starting line offset for headers
Returns:
dict: parsed headers as key-value pairs
Handles:
- Header unescaping per protocol version
- Duplicate header handling
- Invalid header format detection
"""
def pack(pieces):
"""
Join byte sequences efficiently.
Parameters:
- pieces: iterable, sequence of bytes objects
Returns:
bytes: concatenated byte sequence
Optimized for frame assembly and network I/O.
"""
def join(chars):
"""
Join character sequences efficiently.
Parameters:
- chars: iterable, sequence of character strings
Returns:
str: concatenated string
Optimized for header and command processing.
"""Thread management and callback execution helpers for customizing stomp.py's concurrency model.
def default_create_thread(callback):
"""
Default thread creation function for receiver loops.
Parameters:
- callback: callable, function to execute in new thread
Returns:
Thread: started daemon thread
Creates daemon threads for background operations:
- Message receiver loops
- Heartbeat timers
- Connection monitoring
"""
def is_eol_default(line):
"""
Default end-of-line detection for frame parsing.
Parameters:
- line: bytes, line to check for end-of-line marker
Returns:
bool: True if line represents end-of-line
Protocol-agnostic EOL detection for frame boundaries.
"""
def calculate_heartbeats(send_heartbeat, client_heartbeat):
"""
Calculate negotiated heartbeat intervals between client and server.
Parameters:
- send_heartbeat: int, client's desired send interval (ms)
- client_heartbeat: int, client's desired receive interval (ms)
Returns:
tuple: (negotiated_send_ms, negotiated_receive_ms)
Implements STOMP heartbeat negotiation algorithm:
- 0 means no heartbeat desired
- Non-zero values are negotiated to maximum of client/server desires
"""
def get_errno(exception):
"""
Extract errno from socket or OS exception.
Parameters:
- exception: Exception, OS or socket exception
Returns:
int: errno value or None if not available
Cross-platform errno extraction for error handling.
"""STOMP protocol constants for commands, headers, and response codes.
# STOMP Commands
CONNECT = "CONNECT"
STOMP = "STOMP" # STOMP 1.2 connection command
CONNECTED = "CONNECTED"
DISCONNECT = "DISCONNECT"
SEND = "SEND"
SUBSCRIBE = "SUBSCRIBE"
UNSUBSCRIBE = "UNSUBSCRIBE"
ACK = "ACK"
NACK = "NACK" # STOMP 1.1+
BEGIN = "BEGIN"
COMMIT = "COMMIT"
ABORT = "ABORT"
MESSAGE = "MESSAGE"
RECEIPT = "RECEIPT"
ERROR = "ERROR"
# Standard Headers
CONTENT_LENGTH = "content-length"
CONTENT_TYPE = "content-type"
DESTINATION = "destination"
MESSAGE_ID = "message-id"
SUBSCRIPTION = "subscription"
TRANSACTION = "transaction"
RECEIPT_ID = "receipt-id"
ACK_MODE = "ack"
HOST = "host"
VERSION = "version"
HEARTBEAT = "heart-beat"
SESSION = "session"
SERVER = "server"
# Acknowledgment Modes
ACK_AUTO = "auto"
ACK_CLIENT = "client"
ACK_CLIENT_INDIVIDUAL = "client-individual"
# Protocol Versions
PROTOCOL_10 = "1.0"
PROTOCOL_11 = "1.1"
PROTOCOL_12 = "1.2"Configurable logging setup for debugging and monitoring stomp.py operations.
def log_to_stdout(verbose_logging=True):
"""
Configure logging output to stdout with optional verbosity.
Parameters:
- verbose_logging: bool, enable detailed debug logging
When verbose_logging=True:
- Shows all frame transmissions
- Logs connection state changes
- Reports heartbeat activity
- Displays protocol negotiation details
When verbose_logging=False:
- Shows only errors and warnings
- Logs connection events
- Reports critical failures
"""
def log_to_file(filename, verbose_logging=True):
"""
Configure logging output to file.
Parameters:
- filename: str, path to log file
- verbose_logging: bool, enable detailed debug logging
Creates rotating log file with:
- Timestamped entries
- Thread identification
- Structured frame logging
- Error stack traces
"""
def get_logger(name):
"""
Get named logger for stomp.py components.
Parameters:
- name: str, logger name (e.g., 'stomp.connection', 'stomp.transport')
Returns:
Logger: configured logger instance
Available loggers:
- stomp.connection: connection lifecycle events
- stomp.transport: low-level transport operations
- stomp.protocol: STOMP protocol processing
- stomp.heartbeat: heartbeat management
- stomp.listener: listener callback execution
"""Terminal color constants for CLI output formatting.
# ANSI Color Codes
GREEN = "\33[32m"
RED = "\33[31m"
YELLOW = "\33[33m"
BLUE = "\33[34m"
MAGENTA = "\33[35m"
CYAN = "\33[36m"
WHITE = "\33[37m"
# Text Formatting
BOLD = "\33[1m"
UNDERLINE = "\33[4m"
ITALIC = "\33[3m"
# Reset
NO_COLOUR = "\33[0m"
RESET = NO_COLOUR
def colorize(text, color):
"""
Apply color formatting to text.
Parameters:
- text: str, text to colorize
- color: str, color constant (GREEN, RED, etc.)
Returns:
str: colorized text with reset code
Automatically handles color reset to prevent bleeding.
"""Helper functions and classes for testing STOMP applications.
class TestListener(StatsListener, WaitingListener, PrintingListener):
"""
Combined listener for testing with message queuing and statistics.
Inherits from:
- StatsListener: connection statistics tracking
- WaitingListener: synchronous wait operations
- PrintingListener: debug output functionality
"""
def __init__(self, receipt=None, print_to_log=True):
"""
Initialize test listener with combined functionality.
Parameters:
- receipt: str, receipt ID to wait for (optional)
- print_to_log: bool, print events to log instead of stdout
"""
self.message_list = []
self.timestamp = None
def wait_for_message(self, timeout=10):
"""
Wait for next message with timeout.
Parameters:
- timeout: float, maximum wait time in seconds
Returns:
Frame: received message frame or None if timeout
Blocks until message arrives or timeout expires.
"""
def get_latest_message(self):
"""
Get most recently received message.
Returns:
Frame: latest message frame or None if no messages
Non-blocking access to latest message for assertions.
"""
def wait_for_heartbeat(self, timeout=10):
"""
Wait for heartbeat with timeout.
Parameters:
- timeout: float, maximum wait time in seconds
Returns:
bool: True if heartbeat received, False if timeout
Useful for testing heartbeat negotiation and timing.
"""
def clear_messages(self):
"""
Clear stored message list.
Resets message_list to empty for fresh test runs.
"""
def create_test_connection(host='localhost', port=61613, **kwargs):
"""
Create connection configured for testing.
Parameters:
- host: str, test broker hostname
- port: int, test broker port
- **kwargs: additional connection parameters
Returns:
Connection: configured test connection
Pre-configured with:
- Short timeouts for fast test execution
- Reduced retry attempts
- Test-friendly heartbeat settings
"""import stomp
from stomp.utils import convert_frame, parse_headers
# Custom frame manipulation
class CustomProtocolHandler(stomp.ConnectionListener):
def on_send(self, frame):
# Intercept outgoing frames for custom processing
raw_frame = convert_frame(frame)
print(f"Sending raw frame: {raw_frame}")
# Could modify frame here before transmission
return frame
def on_message(self, frame):
# Parse custom headers
custom_headers = parse_headers([
"custom-property:value1",
"app-specific:value2"
])
print(f"Custom headers: {custom_headers}")import stomp
import stomp.logging as logging
# Enable detailed logging
logging.log_to_stdout(verbose_logging=True)
# Create connection with full logging
conn = stomp.Connection([('broker.com', 61613)])
# Get specific logger for custom output
transport_logger = logging.get_logger('stomp.transport')
transport_logger.info("Custom transport message")
conn.connect('user', 'pass', wait=True)
# Will log all frame exchanges, heartbeats, etc.import stomp
from stomp.utils import TestListener, create_test_connection
import unittest
class STOMPIntegrationTest(unittest.TestCase):
def setUp(self):
# Create test connection with fast timeouts
self.conn = create_test_connection(
timeout=5.0,
reconnect_attempts_max=1
)
# Setup test listener
self.test_listener = TestListener(print_to_log=True)
self.conn.set_listener('test', self.test_listener)
self.conn.connect('testuser', 'testpass', wait=True)
def test_message_exchange(self):
# Subscribe and wait for subscription
self.conn.subscribe('/queue/test', id='test-sub')
# Send test message
self.conn.send(
body='test message',
destination='/queue/test'
)
# Wait for message with timeout
message = self.test_listener.wait_for_message(timeout=5.0)
self.assertIsNotNone(message)
self.assertEqual(message.body, 'test message')
# Check statistics
stats = str(self.test_listener)
self.assertIn('messages: 1', stats.lower())
def tearDown(self):
self.conn.disconnect()import stomp
from stomp.utils import default_create_thread
import threading
import queue
# Custom thread pool for stomp.py operations
class ThreadPoolCreator:
def __init__(self, pool_size=5):
self.pool = []
self.task_queue = queue.Queue()
# Create worker threads
for _ in range(pool_size):
worker = threading.Thread(target=self._worker, daemon=True)
worker.start()
self.pool.append(worker)
def _worker(self):
while True:
task = self.task_queue.get()
if task is None:
break
try:
task()
except Exception as e:
print(f"Task error: {e}")
finally:
self.task_queue.task_done()
def create_thread(self, callback):
# Instead of creating new thread, queue the task
self.task_queue.put(callback)
return threading.current_thread() # Return dummy thread
# Use custom thread creator
thread_creator = ThreadPoolCreator(pool_size=3)
conn = stomp.Connection([('broker.com', 61613)])
conn.override_threading(thread_creator.create_thread)
# Now all stomp.py background tasks use the thread pool
conn.connect('user', 'pass', wait=True)from stomp.utils import calculate_heartbeats
# Client wants to send heartbeats every 10 seconds
# Client wants to receive heartbeats every 15 seconds
client_send = 10000 # ms
client_receive = 15000 # ms
# Server advertises it can send every 5 seconds
# Server wants to receive every 8 seconds
server_send = 5000 # ms
server_receive = 8000 # ms
# Calculate negotiated heartbeats
negotiated = calculate_heartbeats(
max(client_send, server_receive), # Send interval
max(client_receive, server_send) # Receive interval
)
print(f"Negotiated heartbeats: {negotiated}")
# Result: (10000, 15000) - most restrictive winsimport stomp
from stomp.utils import PROTOCOL_10, PROTOCOL_11, PROTOCOL_12
def create_version_specific_connection(version):
"""Create connection for specific STOMP version"""
if version == PROTOCOL_10:
return stomp.Connection10([('broker.com', 61613)])
elif version == PROTOCOL_11:
return stomp.Connection11([('broker.com', 61613)])
elif version == PROTOCOL_12:
return stomp.Connection12([('broker.com', 61613)])
else:
raise ValueError(f"Unsupported version: {version}")
# Use version-specific features
conn_12 = create_version_specific_connection(PROTOCOL_12)
conn_12.connect('user', 'pass', wait=True)
# STOMP 1.2 supports enhanced header escaping
conn_12.send(
body='Message with\nspecial\rcharacters',
destination='/queue/test',
headers={'custom-header': 'value\nwith\nlines'}
)Install with Tessl CLI
npx tessl i tessl/pypi-stomp-py