Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol
Comprehensive connection classes supporting all STOMP protocol versions (1.0, 1.1, 1.2) with automatic reconnection, SSL/TLS support, heartbeat handling, and connection pooling for robust message broker connectivity.
The default connection class supporting STOMP 1.1 protocol with heartbeat negotiation and enhanced error handling.
class Connection:
def __init__(self,
host_and_ports=None,
prefer_localhost=True,
try_loopback_connect=True,
reconnect_sleep_initial=0.1,
reconnect_sleep_increase=0.5,
reconnect_sleep_jitter=0.1,
reconnect_sleep_max=60.0,
reconnect_attempts_max=3,
timeout=None,
heartbeats=(0, 0),
keepalive=None,
vhost=None,
auto_decode=True,
encoding="utf-8",
auto_content_length=True,
heart_beat_receive_scale=1.5,
bind_host_port=None):
"""
Create STOMP 1.1 connection.
Parameters:
- host_and_ports: list of tuples, broker addresses [('localhost', 61613)]
- prefer_localhost: bool, prefer localhost connections
- try_loopback_connect: bool, try loopback if localhost fails
- reconnect_sleep_initial: float, initial reconnect delay
- reconnect_sleep_increase: float, delay increase factor
- reconnect_sleep_jitter: float, random delay variation
- reconnect_sleep_max: float, maximum reconnect delay
- reconnect_attempts_max: int, maximum reconnect attempts
- timeout: float, socket timeout in seconds
- heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)
- keepalive: bool, enable TCP keepalive
- vhost: str, virtual host name
- auto_decode: bool, automatically decode message bodies
- encoding: str, text encoding for messages
- auto_content_length: bool, automatically set content-length header
- heart_beat_receive_scale: float, heartbeat timeout scale factor
- bind_host_port: tuple, local bind address
"""
def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
"""
Connect to STOMP broker.
Parameters:
- username: str, authentication username
- passcode: str, authentication password
- wait: bool, wait for connection confirmation
- headers: dict, additional connection headers
- **keyword_headers: additional headers as keyword arguments
"""
def disconnect(self, receipt=None, headers=None, **keyword_headers):
"""
Disconnect from STOMP broker.
Parameters:
- receipt: str, receipt ID for disconnect confirmation
- headers: dict, additional disconnect headers
- **keyword_headers: additional headers as keyword arguments
"""
def is_connected(self) -> bool:
"""
Check if connected to broker.
Returns:
bool: True if connected, False otherwise
"""
def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
"""
Send message to destination.
Parameters:
- body: str, message body
- destination: str, destination queue/topic
- content_type: str, message content type
- headers: dict, message headers
- **keyword_headers: additional headers as keyword arguments
"""
def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
"""
Subscribe to destination.
Parameters:
- destination: str, destination queue/topic
- id: str, subscription ID
- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')
- headers: dict, subscription headers
- **keyword_headers: additional headers as keyword arguments
"""
def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
"""
Unsubscribe from destination.
Parameters:
- destination: str, destination to unsubscribe from
- id: str, subscription ID to unsubscribe
- headers: dict, unsubscribe headers
- **keyword_headers: additional headers as keyword arguments
"""
def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
"""
Acknowledge message.
Parameters:
- id: str, message ID to acknowledge
- subscription: str, subscription ID
- transaction: str, transaction ID
- headers: dict, ack headers
- **keyword_headers: additional headers as keyword arguments
"""
def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
"""
Negative acknowledge message (STOMP 1.1+).
Parameters:
- id: str, message ID to nack
- subscription: str, subscription ID
- transaction: str, transaction ID
- headers: dict, nack headers
- **keyword_headers: additional headers as keyword arguments
"""Legacy STOMP 1.0 protocol support for compatibility with older message brokers.
class Connection10:
def __init__(self,
host_and_ports=None,
prefer_localhost=True,
try_loopback_connect=True,
reconnect_sleep_initial=0.1,
reconnect_sleep_increase=0.5,
reconnect_sleep_jitter=0.1,
reconnect_sleep_max=60.0,
reconnect_attempts_max=3,
timeout=None,
keepalive=None,
auto_decode=True,
encoding="utf-8",
auto_content_length=True,
bind_host_port=None):
"""
Create STOMP 1.0 connection.
Parameters:
- host_and_ports: list of tuples, broker addresses
- prefer_localhost: bool, prefer localhost connections
- try_loopback_connect: bool, try loopback if localhost fails
- reconnect_sleep_initial: float, initial reconnect delay
- reconnect_sleep_increase: float, delay increase factor
- reconnect_sleep_jitter: float, random delay variation
- reconnect_sleep_max: float, maximum reconnect delay
- reconnect_attempts_max: int, maximum reconnect attempts
- timeout: float, socket timeout in seconds
- keepalive: bool, enable TCP keepalive
- auto_decode: bool, automatically decode message bodies
- encoding: str, text encoding for messages
- auto_content_length: bool, automatically set content-length header
- bind_host_port: tuple, local bind address
"""
def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
"""Connect to STOMP 1.0 broker."""
def disconnect(self, receipt=None, headers=None, **keyword_headers):
"""Disconnect from STOMP 1.0 broker."""
def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
"""Send message via STOMP 1.0."""
def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
"""Subscribe via STOMP 1.0."""
def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
"""Unsubscribe via STOMP 1.0."""
def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
"""Acknowledge message via STOMP 1.0."""Latest STOMP 1.2 protocol with enhanced header escaping and improved error handling.
class Connection12:
def __init__(self,
host_and_ports=None,
prefer_localhost=True,
try_loopback_connect=True,
reconnect_sleep_initial=0.1,
reconnect_sleep_increase=0.5,
reconnect_sleep_jitter=0.1,
reconnect_sleep_max=60.0,
reconnect_attempts_max=3,
timeout=None,
heartbeats=(0, 0),
keepalive=None,
vhost=None,
auto_decode=True,
encoding="utf-8",
auto_content_length=True,
heart_beat_receive_scale=1.5,
bind_host_port=None):
"""
Create STOMP 1.2 connection.
Parameters: Same as Connection (STOMP 1.1) plus STOMP 1.2 enhancements
"""
def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
"""Connect to STOMP 1.2 broker with enhanced error handling."""
def disconnect(self, receipt=None, headers=None, **keyword_headers):
"""Disconnect from STOMP 1.2 broker."""
def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
"""STOMP 1.2 negative acknowledge with enhanced header escaping."""
@staticmethod
def is_eol(line):
"""
Check if line is end-of-line marker.
Parameters:
- line: bytes, line to check
Returns:
bool: True if end-of-line
"""Base connection functionality shared across all protocol versions.
def set_listener(self, name, listener):
"""
Set named connection listener.
Parameters:
- name: str, listener name
- listener: ConnectionListener, listener instance
"""
def remove_listener(self, name):
"""
Remove named listener.
Parameters:
- name: str, listener name to remove
"""
def get_listener(self, name):
"""
Get named listener.
Parameters:
- name: str, listener name
Returns:
ConnectionListener: listener instance or None
"""
def set_ssl(self, for_hosts=(), key_file=None, cert_file=None, ca_certs=None,
cert_validator=None, ssl_version=None, password=None, **kwargs):
"""
Configure SSL/TLS connection.
Parameters:
- for_hosts: tuple, hosts requiring SSL
- key_file: str, private key file path
- cert_file: str, certificate file path
- ca_certs: str, CA certificates file path
- cert_validator: callable, certificate validation function
- ssl_version: int, SSL protocol version
- password: str, private key password
- **kwargs: additional SSL parameters
"""
def get_ssl(self, host_and_port=None):
"""
Get SSL configuration for host.
Parameters:
- host_and_port: tuple, host and port
Returns:
dict: SSL configuration
"""Transaction management for atomic message operations.
def begin(self, transaction=None, headers=None, **keyword_headers):
"""
Begin transaction.
Parameters:
- transaction: str, transaction ID
- headers: dict, begin headers
- **keyword_headers: additional headers as keyword arguments
"""
def commit(self, transaction=None, headers=None, **keyword_headers):
"""
Commit transaction.
Parameters:
- transaction: str, transaction ID to commit
- headers: dict, commit headers
- **keyword_headers: additional headers as keyword arguments
"""
def abort(self, transaction=None, headers=None, **keyword_headers):
"""
Abort transaction.
Parameters:
- transaction: str, transaction ID to abort
- headers: dict, abort headers
- **keyword_headers: additional headers as keyword arguments
"""import stomp
# Create connection with reconnection settings
conn = stomp.Connection(
[('localhost', 61613)],
reconnect_sleep_initial=1.0,
reconnect_sleep_max=30.0,
reconnect_attempts_max=10
)
# Connect with authentication
conn.connect('username', 'password', wait=True)
# Use connection
conn.send(body='Hello World', destination='/queue/test')
# Disconnect
conn.disconnect()import stomp
conn = stomp.Connection([('broker.example.com', 61614)])
# Configure SSL
conn.set_ssl(
for_hosts=[('broker.example.com', 61614)],
key_file='/path/to/client.key',
cert_file='/path/to/client.crt',
ca_certs='/path/to/ca.crt'
)
conn.connect('username', 'password', wait=True)import stomp
conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'pass', wait=True)
# Begin transaction
conn.begin('tx-001')
# Send messages in transaction
conn.send(body='Message 1', destination='/queue/test', transaction='tx-001')
conn.send(body='Message 2', destination='/queue/test', transaction='tx-001')
# Commit transaction
conn.commit('tx-001')
conn.disconnect()Install with Tessl CLI
npx tessl i tessl/pypi-stomp-py