CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-stomp-py

Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol

Overview
Eval results
Files

utilities.mddocs/

Utilities and Helpers

Core utility functions, constants, logging configuration, and helper methods that support stomp.py's internal operations and provide convenient functionality for advanced use cases.

Capabilities

Frame Processing Utilities

Low-level frame manipulation and conversion functions for custom protocol handling.

def convert_frame(frame):
    """
    Convert Frame object to transmission format.
    
    Parameters:
    - frame: Frame, STOMP frame object to convert
    
    Returns:
    bytes: encoded frame ready for network transmission
    
    Handles:
    - Header encoding and escaping
    - Body encoding with proper content-length
    - Protocol version specific formatting
    """

def parse_headers(lines, offset=0):
    """
    Parse STOMP headers from frame lines.
    
    Parameters:
    - lines: list, frame lines to parse
    - offset: int, starting line offset for headers
    
    Returns:
    dict: parsed headers as key-value pairs
    
    Handles:
    - Header unescaping per protocol version
    - Duplicate header handling
    - Invalid header format detection
    """

def pack(pieces):
    """
    Join byte sequences efficiently.
    
    Parameters:
    - pieces: iterable, sequence of bytes objects
    
    Returns:
    bytes: concatenated byte sequence
    
    Optimized for frame assembly and network I/O.
    """

def join(chars):
    """
    Join character sequences efficiently.
    
    Parameters:
    - chars: iterable, sequence of character strings
    
    Returns:
    str: concatenated string
    
    Optimized for header and command processing.
    """

Threading and Execution Utilities

Thread management and callback execution helpers for customizing stomp.py's concurrency model.

def default_create_thread(callback):
    """
    Default thread creation function for receiver loops.
    
    Parameters:
    - callback: callable, function to execute in new thread
    
    Returns:
    Thread: started daemon thread
    
    Creates daemon threads for background operations:
    - Message receiver loops
    - Heartbeat timers
    - Connection monitoring
    """

def is_eol_default(line):
    """
    Default end-of-line detection for frame parsing.
    
    Parameters:
    - line: bytes, line to check for end-of-line marker
    
    Returns:
    bool: True if line represents end-of-line
    
    Protocol-agnostic EOL detection for frame boundaries.
    """

def calculate_heartbeats(send_heartbeat, client_heartbeat):
    """
    Calculate negotiated heartbeat intervals between client and server.
    
    Parameters:
    - send_heartbeat: int, client's desired send interval (ms)
    - client_heartbeat: int, client's desired receive interval (ms)
    
    Returns:
    tuple: (negotiated_send_ms, negotiated_receive_ms)
    
    Implements STOMP heartbeat negotiation algorithm:
    - 0 means no heartbeat desired
    - Non-zero values are negotiated to maximum of client/server desires
    """

def get_errno(exception):
    """
    Extract errno from socket or OS exception.
    
    Parameters:
    - exception: Exception, OS or socket exception
    
    Returns:
    int: errno value or None if not available
    
    Cross-platform errno extraction for error handling.
    """

Protocol Constants

STOMP protocol constants for commands, headers, and response codes.

# STOMP Commands
CONNECT = "CONNECT"
STOMP = "STOMP"  # STOMP 1.2 connection command
CONNECTED = "CONNECTED"
DISCONNECT = "DISCONNECT"
SEND = "SEND"
SUBSCRIBE = "SUBSCRIBE"
UNSUBSCRIBE = "UNSUBSCRIBE"
ACK = "ACK"
NACK = "NACK"  # STOMP 1.1+
BEGIN = "BEGIN"
COMMIT = "COMMIT"
ABORT = "ABORT"
MESSAGE = "MESSAGE"
RECEIPT = "RECEIPT"
ERROR = "ERROR"

# Standard Headers
CONTENT_LENGTH = "content-length"
CONTENT_TYPE = "content-type"
DESTINATION = "destination"
MESSAGE_ID = "message-id"
SUBSCRIPTION = "subscription"
TRANSACTION = "transaction"
RECEIPT_ID = "receipt-id"
ACK_MODE = "ack"
HOST = "host"
VERSION = "version"
HEARTBEAT = "heart-beat"
SESSION = "session"
SERVER = "server"

# Acknowledgment Modes
ACK_AUTO = "auto"
ACK_CLIENT = "client"
ACK_CLIENT_INDIVIDUAL = "client-individual"

# Protocol Versions
PROTOCOL_10 = "1.0"
PROTOCOL_11 = "1.1"
PROTOCOL_12 = "1.2"

Logging System

Configurable logging setup for debugging and monitoring stomp.py operations.

def log_to_stdout(verbose_logging=True):
    """
    Configure logging output to stdout with optional verbosity.
    
    Parameters:
    - verbose_logging: bool, enable detailed debug logging
    
    When verbose_logging=True:
    - Shows all frame transmissions
    - Logs connection state changes
    - Reports heartbeat activity
    - Displays protocol negotiation details
    
    When verbose_logging=False:
    - Shows only errors and warnings
    - Logs connection events
    - Reports critical failures
    """

def log_to_file(filename, verbose_logging=True):
    """
    Configure logging output to file.
    
    Parameters:
    - filename: str, path to log file
    - verbose_logging: bool, enable detailed debug logging
    
    Creates rotating log file with:
    - Timestamped entries
    - Thread identification
    - Structured frame logging
    - Error stack traces
    """

def get_logger(name):
    """
    Get named logger for stomp.py components.
    
    Parameters:
    - name: str, logger name (e.g., 'stomp.connection', 'stomp.transport')
    
    Returns:
    Logger: configured logger instance
    
    Available loggers:
    - stomp.connection: connection lifecycle events
    - stomp.transport: low-level transport operations
    - stomp.protocol: STOMP protocol processing
    - stomp.heartbeat: heartbeat management
    - stomp.listener: listener callback execution
    """

Color Constants

Terminal color constants for CLI output formatting.

# ANSI Color Codes
GREEN = "\33[32m"
RED = "\33[31m"
YELLOW = "\33[33m"
BLUE = "\33[34m"
MAGENTA = "\33[35m"
CYAN = "\33[36m"
WHITE = "\33[37m"

# Text Formatting
BOLD = "\33[1m"
UNDERLINE = "\33[4m"
ITALIC = "\33[3m"

# Reset
NO_COLOUR = "\33[0m"
RESET = NO_COLOUR

def colorize(text, color):
    """
    Apply color formatting to text.
    
    Parameters:
    - text: str, text to colorize
    - color: str, color constant (GREEN, RED, etc.)
    
    Returns:
    str: colorized text with reset code
    
    Automatically handles color reset to prevent bleeding.
    """

Testing Utilities

Helper functions and classes for testing STOMP applications.

class TestListener(StatsListener, WaitingListener, PrintingListener):
    """
    Combined listener for testing with message queuing and statistics.
    
    Inherits from:
    - StatsListener: connection statistics tracking
    - WaitingListener: synchronous wait operations  
    - PrintingListener: debug output functionality
    """
    
    def __init__(self, receipt=None, print_to_log=True):
        """
        Initialize test listener with combined functionality.
        
        Parameters:
        - receipt: str, receipt ID to wait for (optional)
        - print_to_log: bool, print events to log instead of stdout
        """
        self.message_list = []
        self.timestamp = None
    
    def wait_for_message(self, timeout=10):
        """
        Wait for next message with timeout.
        
        Parameters:
        - timeout: float, maximum wait time in seconds
        
        Returns:
        Frame: received message frame or None if timeout
        
        Blocks until message arrives or timeout expires.
        """
    
    def get_latest_message(self):
        """
        Get most recently received message.
        
        Returns:
        Frame: latest message frame or None if no messages
        
        Non-blocking access to latest message for assertions.
        """
    
    def wait_for_heartbeat(self, timeout=10):
        """
        Wait for heartbeat with timeout.
        
        Parameters:
        - timeout: float, maximum wait time in seconds
        
        Returns:
        bool: True if heartbeat received, False if timeout
        
        Useful for testing heartbeat negotiation and timing.
        """
    
    def clear_messages(self):
        """
        Clear stored message list.
        
        Resets message_list to empty for fresh test runs.
        """

def create_test_connection(host='localhost', port=61613, **kwargs):
    """
    Create connection configured for testing.
    
    Parameters:
    - host: str, test broker hostname
    - port: int, test broker port
    - **kwargs: additional connection parameters
    
    Returns:
    Connection: configured test connection
    
    Pre-configured with:
    - Short timeouts for fast test execution
    - Reduced retry attempts
    - Test-friendly heartbeat settings
    """

Usage Examples

Custom Frame Processing

import stomp
from stomp.utils import convert_frame, parse_headers

# Custom frame manipulation
class CustomProtocolHandler(stomp.ConnectionListener):
    def on_send(self, frame):
        # Intercept outgoing frames for custom processing
        raw_frame = convert_frame(frame)
        print(f"Sending raw frame: {raw_frame}")
        
        # Could modify frame here before transmission
        return frame
    
    def on_message(self, frame):
        # Parse custom headers
        custom_headers = parse_headers([
            "custom-property:value1",
            "app-specific:value2"
        ])
        
        print(f"Custom headers: {custom_headers}")

Advanced Logging Configuration

import stomp
import stomp.logging as logging

# Enable detailed logging
logging.log_to_stdout(verbose_logging=True)

# Create connection with full logging
conn = stomp.Connection([('broker.com', 61613)])

# Get specific logger for custom output
transport_logger = logging.get_logger('stomp.transport')
transport_logger.info("Custom transport message")

conn.connect('user', 'pass', wait=True)
# Will log all frame exchanges, heartbeats, etc.

Testing Framework Integration

import stomp
from stomp.utils import TestListener, create_test_connection
import unittest

class STOMPIntegrationTest(unittest.TestCase):
    def setUp(self):
        # Create test connection with fast timeouts
        self.conn = create_test_connection(
            timeout=5.0,
            reconnect_attempts_max=1
        )
        
        # Setup test listener
        self.test_listener = TestListener(print_to_log=True)
        self.conn.set_listener('test', self.test_listener)
        
        self.conn.connect('testuser', 'testpass', wait=True)
    
    def test_message_exchange(self):
        # Subscribe and wait for subscription
        self.conn.subscribe('/queue/test', id='test-sub')
        
        # Send test message
        self.conn.send(
            body='test message',
            destination='/queue/test'
        )
        
        # Wait for message with timeout
        message = self.test_listener.wait_for_message(timeout=5.0)
        self.assertIsNotNone(message)
        self.assertEqual(message.body, 'test message')
        
        # Check statistics
        stats = str(self.test_listener)
        self.assertIn('messages: 1', stats.lower())
    
    def tearDown(self):
        self.conn.disconnect()

Custom Threading Integration

import stomp
from stomp.utils import default_create_thread
import threading
import queue

# Custom thread pool for stomp.py operations
class ThreadPoolCreator:
    def __init__(self, pool_size=5):
        self.pool = []
        self.task_queue = queue.Queue()
        
        # Create worker threads
        for _ in range(pool_size):
            worker = threading.Thread(target=self._worker, daemon=True)
            worker.start()
            self.pool.append(worker)
    
    def _worker(self):
        while True:
            task = self.task_queue.get()
            if task is None:
                break
            try:
                task()
            except Exception as e:
                print(f"Task error: {e}")
            finally:
                self.task_queue.task_done()
    
    def create_thread(self, callback):
        # Instead of creating new thread, queue the task
        self.task_queue.put(callback)
        return threading.current_thread()  # Return dummy thread

# Use custom thread creator
thread_creator = ThreadPoolCreator(pool_size=3)
conn = stomp.Connection([('broker.com', 61613)])
conn.override_threading(thread_creator.create_thread)

# Now all stomp.py background tasks use the thread pool
conn.connect('user', 'pass', wait=True)

Heartbeat Calculation

from stomp.utils import calculate_heartbeats

# Client wants to send heartbeats every 10 seconds
# Client wants to receive heartbeats every 15 seconds
client_send = 10000  # ms
client_receive = 15000  # ms

# Server advertises it can send every 5 seconds  
# Server wants to receive every 8 seconds
server_send = 5000  # ms
server_receive = 8000  # ms

# Calculate negotiated heartbeats
negotiated = calculate_heartbeats(
    max(client_send, server_receive),  # Send interval
    max(client_receive, server_send)   # Receive interval
)

print(f"Negotiated heartbeats: {negotiated}")
# Result: (10000, 15000) - most restrictive wins

Protocol Version Handling

import stomp
from stomp.utils import PROTOCOL_10, PROTOCOL_11, PROTOCOL_12

def create_version_specific_connection(version):
    """Create connection for specific STOMP version"""
    
    if version == PROTOCOL_10:
        return stomp.Connection10([('broker.com', 61613)])
    elif version == PROTOCOL_11:
        return stomp.Connection11([('broker.com', 61613)])
    elif version == PROTOCOL_12:
        return stomp.Connection12([('broker.com', 61613)])
    else:
        raise ValueError(f"Unsupported version: {version}")

# Use version-specific features
conn_12 = create_version_specific_connection(PROTOCOL_12)
conn_12.connect('user', 'pass', wait=True)

# STOMP 1.2 supports enhanced header escaping
conn_12.send(
    body='Message with\nspecial\rcharacters',
    destination='/queue/test',
    headers={'custom-header': 'value\nwith\nlines'}
)

Install with Tessl CLI

npx tessl i tessl/pypi-stomp-py

docs

adapters.md

cli.md

connections.md

index.md

listeners.md

protocol.md

types.md

utilities.md

websocket.md

tile.json