CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-stomp-py

Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol

Overview
Eval results
Files

docs/connections.md

Connection Management

Comprehensive connection classes supporting all STOMP protocol versions (1.0, 1.1, 1.2) with automatic reconnection, SSL/TLS support, heartbeat handling, and connection pooling for robust message broker connectivity.

Capabilities

STOMP 1.1 Connection (Default)

The default connection class supporting STOMP 1.1 protocol with heartbeat negotiation and enhanced error handling.

class Connection:
    def __init__(self, 
                 host_and_ports=None,
                 prefer_localhost=True,
                 try_loopback_connect=True,
                 reconnect_sleep_initial=0.1,
                 reconnect_sleep_increase=0.5,
                 reconnect_sleep_jitter=0.1,
                 reconnect_sleep_max=60.0,
                 reconnect_attempts_max=3,
                 timeout=None,
                 heartbeats=(0, 0),
                 keepalive=None,
                 vhost=None,
                 auto_decode=True,
                 encoding="utf-8",
                 auto_content_length=True,
                 heart_beat_receive_scale=1.5,
                 bind_host_port=None):
        """
        Create STOMP 1.1 connection.
        
        Parameters:
        - host_and_ports: list of tuples, broker addresses [('localhost', 61613)]
        - prefer_localhost: bool, prefer localhost connections
        - try_loopback_connect: bool, try loopback if localhost fails
        - reconnect_sleep_initial: float, initial reconnect delay
        - reconnect_sleep_increase: float, delay increase factor
        - reconnect_sleep_jitter: float, random delay variation
        - reconnect_sleep_max: float, maximum reconnect delay
        - reconnect_attempts_max: int, maximum reconnect attempts
        - timeout: float, socket timeout in seconds
        - heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)
        - keepalive: bool, enable TCP keepalive
        - vhost: str, virtual host name
        - auto_decode: bool, automatically decode message bodies
        - encoding: str, text encoding for messages
        - auto_content_length: bool, automatically set content-length header
        - heart_beat_receive_scale: float, heartbeat timeout scale factor
        - bind_host_port: tuple, local bind address
        """
    
    def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
        """
        Connect to STOMP broker.
        
        Parameters:
        - username: str, authentication username
        - passcode: str, authentication password
        - wait: bool, wait for connection confirmation
        - headers: dict, additional connection headers
        - **keyword_headers: additional headers as keyword arguments
        """
    
    def disconnect(self, receipt=None, headers=None, **keyword_headers):
        """
        Disconnect from STOMP broker.
        
        Parameters:
        - receipt: str, receipt ID for disconnect confirmation
        - headers: dict, additional disconnect headers
        - **keyword_headers: additional headers as keyword arguments
        """
    
    def is_connected(self) -> bool:
        """
        Check if connected to broker.
        
        Returns:
        bool: True if connected, False otherwise
        """
    
    def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
        """
        Send message to destination.
        
        Parameters:
        - body: str, message body
        - destination: str, destination queue/topic
        - content_type: str, message content type
        - headers: dict, message headers
        - **keyword_headers: additional headers as keyword arguments
        """
    
    def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
        """
        Subscribe to destination.
        
        Parameters:
        - destination: str, destination queue/topic
        - id: str, subscription ID
        - ack: str, acknowledgment mode ('auto', 'client', 'client-individual')
        - headers: dict, subscription headers
        - **keyword_headers: additional headers as keyword arguments
        """
    
    def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
        """
        Unsubscribe from destination.
        
        Parameters:
        - destination: str, destination to unsubscribe from
        - id: str, subscription ID to unsubscribe
        - headers: dict, unsubscribe headers
        - **keyword_headers: additional headers as keyword arguments
        """
    
    def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
        """
        Acknowledge message.
        
        Parameters:
        - id: str, message ID to acknowledge
        - subscription: str, subscription ID
        - transaction: str, transaction ID
        - headers: dict, ack headers
        - **keyword_headers: additional headers as keyword arguments
        """
    
    def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
        """
        Negatively acknowledge a message (STOMP 1.1+).
        
        Parameters:
        - id: str, message ID to nack
        - subscription: str, subscription ID
        - transaction: str, transaction ID
        - headers: dict, nack headers
        - **keyword_headers: additional headers as keyword arguments
        """

STOMP 1.0 Connection

Legacy STOMP 1.0 protocol support for compatibility with older message brokers.

class Connection10:
    def __init__(self, 
                 host_and_ports=None,
                 prefer_localhost=True,
                 try_loopback_connect=True,
                 reconnect_sleep_initial=0.1,
                 reconnect_sleep_increase=0.5,
                 reconnect_sleep_jitter=0.1,
                 reconnect_sleep_max=60.0,
                 reconnect_attempts_max=3,
                 timeout=None,
                 keepalive=None,
                 auto_decode=True,
                 encoding="utf-8",
                 auto_content_length=True,
                 bind_host_port=None):
        """
        Create STOMP 1.0 connection.
        
        Parameters:
        - host_and_ports: list of tuples, broker addresses
        - prefer_localhost: bool, prefer localhost connections
        - try_loopback_connect: bool, try loopback if localhost fails
        - reconnect_sleep_initial: float, initial reconnect delay
        - reconnect_sleep_increase: float, delay increase factor
        - reconnect_sleep_jitter: float, random delay variation
        - reconnect_sleep_max: float, maximum reconnect delay
        - reconnect_attempts_max: int, maximum reconnect attempts
        - timeout: float, socket timeout in seconds
        - keepalive: bool, enable TCP keepalive
        - auto_decode: bool, automatically decode message bodies
        - encoding: str, text encoding for messages
        - auto_content_length: bool, automatically set content-length header
        - bind_host_port: tuple, local bind address
        """
    
    def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
        """Connect to STOMP 1.0 broker."""
    
    def disconnect(self, receipt=None, headers=None, **keyword_headers):
        """Disconnect from STOMP 1.0 broker."""
    
    def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
        """Send message via STOMP 1.0."""
    
    def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
        """Subscribe via STOMP 1.0."""
    
    def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
        """Unsubscribe via STOMP 1.0."""
    
    def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
        """Acknowledge message via STOMP 1.0."""

STOMP 1.2 Connection

Latest STOMP 1.2 protocol with enhanced header escaping and improved error handling.

class Connection12:
    def __init__(self, 
                 host_and_ports=None,
                 prefer_localhost=True,
                 try_loopback_connect=True,
                 reconnect_sleep_initial=0.1,
                 reconnect_sleep_increase=0.5,
                 reconnect_sleep_jitter=0.1,
                 reconnect_sleep_max=60.0,
                 reconnect_attempts_max=3,
                 timeout=None,
                 heartbeats=(0, 0),
                 keepalive=None,
                 vhost=None,
                 auto_decode=True,
                 encoding="utf-8",
                 auto_content_length=True,
                 heart_beat_receive_scale=1.5,
                 bind_host_port=None):
        """
        Create STOMP 1.2 connection.
        
        Parameters: identical to Connection (STOMP 1.1); the STOMP 1.2 enhancements are protocol-level and add no constructor arguments.
        """
    
    def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
        """Connect to STOMP 1.2 broker with enhanced error handling."""
    
    def disconnect(self, receipt=None, headers=None, **keyword_headers):
        """Disconnect from STOMP 1.2 broker."""
    
    def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
        """STOMP 1.2 negative acknowledge with enhanced header escaping."""
    
    @staticmethod
    def is_eol(line):
        """
        Check if line is end-of-line marker.
        
        Parameters:
        - line: bytes, line to check
        
        Returns:
        bool: True if end-of-line
        """

Connection Management

Base connection functionality shared across all protocol versions.

def set_listener(self, name, listener):
    """
    Set named connection listener.
    
    Parameters:
    - name: str, listener name
    - listener: ConnectionListener, listener instance
    """

def remove_listener(self, name):
    """
    Remove named listener.
    
    Parameters:
    - name: str, listener name to remove
    """

def get_listener(self, name):
    """
    Get named listener.
    
    Parameters:
    - name: str, listener name
    
    Returns:
    ConnectionListener: listener instance or None
    """

def set_ssl(self, for_hosts=(), key_file=None, cert_file=None, ca_certs=None, 
            cert_validator=None, ssl_version=None, password=None, **kwargs):
    """
    Configure SSL/TLS connection.
    
    Parameters:
    - for_hosts: list or tuple of (host, port) tuples, hosts requiring SSL
    - key_file: str, private key file path
    - cert_file: str, certificate file path  
    - ca_certs: str, CA certificates file path
    - cert_validator: callable, certificate validation function
    - ssl_version: int, SSL protocol version
    - password: str, private key password
    - **kwargs: additional SSL parameters
    """

def get_ssl(self, host_and_port=None):
    """
    Get SSL configuration for host.
    
    Parameters:
    - host_and_port: tuple, host and port
    
    Returns:
    dict: SSL configuration
    """

Transaction Support

Transaction management for atomic message operations.

def begin(self, transaction=None, headers=None, **keyword_headers):
    """
    Begin transaction.
    
    Parameters:
    - transaction: str, transaction ID
    - headers: dict, begin headers
    - **keyword_headers: additional headers as keyword arguments
    """

def commit(self, transaction=None, headers=None, **keyword_headers):
    """
    Commit transaction.
    
    Parameters:
    - transaction: str, transaction ID to commit
    - headers: dict, commit headers
    - **keyword_headers: additional headers as keyword arguments
    """

def abort(self, transaction=None, headers=None, **keyword_headers):
    """
    Abort transaction.
    
    Parameters:
    - transaction: str, transaction ID to abort
    - headers: dict, abort headers
    - **keyword_headers: additional headers as keyword arguments
    """

Usage Examples

Basic Connection

import stomp

# Create connection with reconnection settings
conn = stomp.Connection(
    [('localhost', 61613)],
    reconnect_sleep_initial=1.0,
    reconnect_sleep_max=30.0,
    reconnect_attempts_max=10
)

# Connect with authentication
conn.connect('username', 'password', wait=True)

# Use connection
conn.send(body='Hello World', destination='/queue/test')

# Disconnect
conn.disconnect()

SSL Connection

import stomp

conn = stomp.Connection([('broker.example.com', 61614)])

# Configure SSL
conn.set_ssl(
    for_hosts=[('broker.example.com', 61614)],
    key_file='/path/to/client.key',
    cert_file='/path/to/client.crt',
    ca_certs='/path/to/ca.crt'
)

conn.connect('username', 'password', wait=True)

Transaction Example

import stomp

conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'pass', wait=True)

# Begin transaction
conn.begin('tx-001')

# Send messages in transaction
conn.send(body='Message 1', destination='/queue/test', transaction='tx-001')
conn.send(body='Message 2', destination='/queue/test', transaction='tx-001')

# Commit transaction
conn.commit('tx-001')

conn.disconnect()

Install with Tessl CLI

npx tessl i tessl/pypi-stomp-py

docs

adapters.md

cli.md

connections.md

index.md

listeners.md

protocol.md

types.md

utilities.md

websocket.md

tile.json