Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol
Core STOMP protocol operations including message sending, queue subscription, transaction management, and acknowledgment handling across all supported protocol versions (1.0, 1.1, 1.2).
Core messaging functionality for sending and receiving messages through STOMP destinations.
def send(self, destination, body='', content_type=None, headers=None, **keyword_headers):
"""
Send message to destination.
Parameters:
- destination: str, destination queue/topic (required)
- body: str or bytes, message body content
- content_type: str, MIME content type of message
- headers: dict, additional message headers
- **keyword_headers: additional headers as keyword arguments
Common headers:
- persistent: str, 'true' for persistent messages
- priority: str, message priority (0-9)
- expires: str, message expiration time
- correlation-id: str, correlation identifier
- reply-to: str, reply destination
- custom headers: any string key-value pairs
"""
def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
"""
Subscribe to destination for message delivery.
Parameters:
- destination: str, destination queue/topic to subscribe to
- id: str, unique subscription identifier (auto-generated if None)
- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')
- headers: dict, subscription headers
- **keyword_headers: additional headers as keyword arguments
Acknowledgment modes:
- 'auto': automatic acknowledgment (default)
- 'client': manual, cumulative acknowledgment (acknowledging a message also acknowledges all earlier messages on the subscription)
- 'client-individual': manual acknowledgment per message
"""
def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
"""
Unsubscribe from destination.
Parameters:
- destination: str, destination to unsubscribe from (if no id)
- id: str, subscription ID to unsubscribe (preferred method)
- headers: dict, unsubscribe headers
- **keyword_headers: additional headers as keyword arguments
Note: Either destination or id must be provided
"""
Manual message acknowledgment for reliable message processing.
def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
"""
Acknowledge message processing (STOMP 1.0+).
Parameters:
- id: str, message ID to acknowledge (required)
- subscription: str, subscription ID (STOMP 1.1+ only)
- transaction: str, transaction ID if within transaction
- headers: dict, acknowledgment headers
- **keyword_headers: additional headers as keyword arguments
Used with ack modes 'client' and 'client-individual'
"""
def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
"""
Negative acknowledge message (STOMP 1.1+ only).
Parameters:
- id: str, message ID to nack (required)
- subscription: str, subscription ID (STOMP 1.1+ only)
- transaction: str, transaction ID if within transaction
- headers: dict, nack headers
- **keyword_headers: additional headers as keyword arguments
Signals message processing failure, may trigger redelivery
"""
Atomic transaction support for grouping multiple operations.
def begin(self, transaction=None, headers=None, **keyword_headers):
"""
Begin transaction.
Parameters:
- transaction: str, transaction identifier (auto-generated if None)
- headers: dict, begin headers
- **keyword_headers: additional headers as keyword arguments
Returns transaction ID for use in subsequent operations
"""
def commit(self, transaction=None, headers=None, **keyword_headers):
"""
Commit transaction.
Parameters:
- transaction: str, transaction ID to commit (required; must not be left as None despite the signature default)
- headers: dict, commit headers
- **keyword_headers: additional headers as keyword arguments
All operations within transaction are atomically applied
"""
def abort(self, transaction=None, headers=None, **keyword_headers):
"""
Abort transaction.
Parameters:
- transaction: str, transaction ID to abort (required; must not be left as None despite the signature default)
- headers: dict, abort headers
- **keyword_headers: additional headers as keyword arguments
All operations within transaction are rolled back
"""
Receipt confirmations for reliable operation acknowledgment.
def set_receipt(self, receipt_id, value):
"""
Set receipt handler for confirmation tracking.
Parameters:
- receipt_id: str, receipt identifier
- value: any, value associated with receipt
"""
Low-level STOMP frame operations for advanced usage.
def send_frame(self, frame):
"""
Send raw STOMP frame.
Parameters:
- frame: Frame, raw STOMP frame to send
"""
class Frame:
    """
    STOMP frame representation.

    Attributes:
    - cmd: str, STOMP command (CONNECT, SEND, etc.)
    - headers: dict, frame headers
    - body: str or bytes, frame body content
    """

    def __init__(self, cmd=None, headers=None, body=None):
        """
        Store the command, headers and body of a frame.

        Parameters:
        - cmd: str, STOMP command; defaults to None
        - headers: dict, frame headers; a fresh empty dict is created only
          when None is passed, so a caller-supplied dict (even an empty one)
          is kept as-is and stays shared with the caller
        - body: str or bytes, frame body content; defaults to None
        """
        self.cmd = cmd
        # `headers or {}` would silently swap a caller's empty dict for a new
        # one; test for None explicitly so the supplied object is preserved.
        self.headers = headers if headers is not None else {}
        self.body = body


import stomp
conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'password', wait=True)
# Send simple message
conn.send(
body='Hello World',
destination='/queue/test'
)
# Send message with headers
conn.send(
body='Priority message',
destination='/queue/important',
headers={
'priority': '9',
'persistent': 'true',
'correlation-id': 'msg-001'
}
)
# Send JSON message
import json
data = {'user': 'john', 'action': 'login', 'timestamp': 1234567890}
conn.send(
body=json.dumps(data),
destination='/topic/events',
content_type='application/json'
)
conn.disconnect()
import stomp
import time
class MessageProcessor(stomp.ConnectionListener):
    """Listener that prints every received message together with its destination."""

    def on_message(self, frame):
        """Print the frame body, labelled with the frame's 'destination' header.

        The label falls back to 'unknown' when the header is absent.
        """
        print(f"Received from {frame.headers.get('destination', 'unknown')}: {frame.body}")
conn = stomp.Connection([('localhost', 61613)])
processor = MessageProcessor()
conn.set_listener('processor', processor)
conn.connect('user', 'password', wait=True)
# Subscribe to queue with auto-acknowledgment
conn.subscribe('/queue/orders', id='orders-sub', ack='auto')
# Subscribe to topic with manual acknowledgment
conn.subscribe('/topic/notifications', id='notify-sub', ack='client')
# Subscribe with custom headers
conn.subscribe(
destination='/queue/priority',
id='priority-sub',
ack='client-individual',
headers={'selector': "priority > 5"}
)
# Keep connection alive
time.sleep(30)
# Unsubscribe
conn.unsubscribe(id='orders-sub')
conn.unsubscribe(id='notify-sub')
conn.unsubscribe(id='priority-sub')
conn.disconnect()
import stomp
import time
class ManualAckProcessor(stomp.ConnectionListener):
    """Listener that acks messages it processes and nacks those that fail."""

    def __init__(self, connection):
        # Connection used to send the ACK/NACK frames.
        self.connection = connection

    def on_message(self, frame):
        """Process the frame body, then ack on success or nack on failure.

        The nack is skipped when the connection object has no `nack` attribute.
        """
        headers = frame.headers
        message_id = headers.get('message-id')
        sub_id = headers.get('subscription')
        try:
            # Process message
            self.process_message(frame.body)
            # Acknowledge successful processing
            self.connection.ack(message_id, sub_id)
            print(f"Acknowledged message {message_id}")
        except Exception as e:
            print(f"Processing failed: {e}")
            # Negative acknowledge (STOMP 1.1+)
            if not hasattr(self.connection, 'nack'):
                return
            self.connection.nack(message_id, sub_id)
            print(f"Nacked message {message_id}")

    def process_message(self, body):
        """Simulate processing; raise ValueError if `body` contains 'error' (any case)."""
        lowered = body.lower()
        if 'error' in lowered:
            raise ValueError("Simulated processing error")
        print(f"Successfully processed: {body}")
conn = stomp.Connection11([('localhost', 61613)]) # STOMP 1.1 for NACK support
processor = ManualAckProcessor(conn)
conn.set_listener('processor', processor)
conn.connect('user', 'password', wait=True)
# Subscribe with client-individual acknowledgment
conn.subscribe('/queue/work', id='work-sub', ack='client-individual')
time.sleep(60) # Process messages for 1 minute
conn.disconnect()
import stomp
import uuid
conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'password', wait=True)
# Defined before the transaction runs: when this helper was declared after the
# try block, the call below raised NameError, which the broad `except` caught,
# so the example always aborted the transaction.
def all_operations_successful():
    """Stand-in for real business validation; always reports success."""
    # Simulate business validation
    return True


# Begin transaction
tx_id = str(uuid.uuid4())
conn.begin(tx_id)
try:
    # Send multiple messages in transaction
    conn.send(
        body='Order created',
        destination='/queue/orders',
        transaction=tx_id
    )
    conn.send(
        body='Inventory updated',
        destination='/queue/inventory',
        transaction=tx_id
    )
    conn.send(
        body='Email notification',
        destination='/queue/notifications',
        transaction=tx_id
    )
    # Simulate business logic
    if all_operations_successful():
        conn.commit(tx_id)
        print("Transaction committed successfully")
    else:
        conn.abort(tx_id)
        print("Transaction aborted")
except Exception as e:
    # Abort transaction on error
    conn.abort(tx_id)
    print(f"Transaction aborted due to error: {e}")
conn.disconnect()

import stomp
import uuid
import time
class ReceiptHandler(stomp.ConnectionListener):
    """Listener that records which tracked receipts have been confirmed.

    `pending_receipts` maps receipt id -> bool: False while the receipt is
    awaited, True once `on_receipt` has seen it. Callers register an id
    (set it to False) before sending the frame that requests the receipt.
    """

    def __init__(self):
        self.pending_receipts = {}

    def on_receipt(self, frame):
        """Mark the receipt named by the frame's 'receipt-id' header as confirmed.

        Receipt ids that were never registered in `pending_receipts` are ignored.
        """
        receipt_id = frame.headers.get('receipt-id')
        if receipt_id in self.pending_receipts:
            print(f"Receipt confirmed: {receipt_id}")
            self.pending_receipts[receipt_id] = True

    def wait_for_receipt(self, receipt_id, timeout=10):
        """Wait for specific receipt confirmation.

        Parameters:
        - receipt_id: str, receipt identifier to wait for
        - timeout: number, maximum seconds to wait

        Returns True if the receipt was confirmed within the timeout,
        False otherwise.
        """
        # Monotonic clock: the deadline must not move if the system wall
        # clock is adjusted while we are waiting.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self.pending_receipts.get(receipt_id):
                return True
            time.sleep(0.1)
        # Final check so a receipt that arrived during the last sleep (or a
        # call with timeout <= 0) is not misreported as a timeout.
        return bool(self.pending_receipts.get(receipt_id))
conn = stomp.Connection([('localhost', 61613)])
receipt_handler = ReceiptHandler()
conn.set_listener('receipt_handler', receipt_handler)
conn.connect('user', 'password', wait=True)
# Send message with receipt
receipt_id = str(uuid.uuid4())
receipt_handler.pending_receipts[receipt_id] = False
conn.send(
body='Important message',
destination='/queue/critical',
receipt=receipt_id
)
# Wait for delivery confirmation
if receipt_handler.wait_for_receipt(receipt_id, timeout=30):
print("Message delivery confirmed")
else:
print("Message delivery confirmation timeout")
conn.disconnect()
import stomp
# STOMP 1.0 - Basic functionality
conn10 = stomp.Connection10([('localhost', 61613)])
conn10.connect('user', 'password', wait=True)
conn10.send(body='STOMP 1.0 message', destination='/queue/test')
# No NACK support in 1.0
conn10.disconnect()
# STOMP 1.1 - Heartbeats and NACK
conn11 = stomp.Connection11(
[('localhost', 61613)],
heartbeats=(10000, 10000) # 10 second heartbeats
)
conn11.connect('user', 'password', wait=True)
# NACK support in 1.1+
class NackCapableProcessor(stomp.ConnectionListener):
    """Listener skeleton that nacks a message when its processing raises."""

    def __init__(self, connection):
        # Connection used to send the NACK frame.
        self.connection = connection

    def on_message(self, frame):
        """Run the (placeholder) processing step; nack the message on any Exception."""
        hdrs = frame.headers
        msg_id = hdrs.get('message-id')
        sub_id = hdrs.get('subscription')
        try:
            # Process message
            pass
        except Exception:
            # NACK available in STOMP 1.1+
            self.connection.nack(msg_id, sub_id)
import time  # this example never imported time; required by the sleep below

processor = NackCapableProcessor(conn11)
conn11.set_listener('processor', processor)
conn11.subscribe('/queue/test', id='test-sub', ack='client-individual')
time.sleep(10)  # keep the connection open for 10 seconds before disconnecting
conn11.disconnect()
# STOMP 1.2 - Enhanced header escaping
conn12 = stomp.Connection12([('localhost', 61613)])
conn12.connect('user', 'password', wait=True)
# STOMP 1.2 handles special characters in headers properly
conn12.send(
body='Message with special header',
destination='/queue/test',
headers={
'custom-header': 'value with\nspecial\tcharacters', # Automatically escaped
'correlation-id': 'msg:with:colons'
}
)
conn12.disconnect()
Install with Tessl CLI
npx tessl i tessl/pypi-stomp-py