CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-stomp-py

Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol

Overview
Eval results
Files

protocol.mddocs/

Protocol Operations

Core STOMP protocol operations including message sending, queue subscription, transaction management, and acknowledgment handling across all supported protocol versions (1.0, 1.1, 1.2).

Capabilities

Message Operations

Core messaging functionality for sending and receiving messages through STOMP destinations.

def send(self, destination, body='', content_type=None, headers=None, **keyword_headers):
    """
    Send message to destination.
    
    Parameters:
    - destination: str, destination queue/topic (required)
    - body: str or bytes, message body content  
    - content_type: str, MIME content type of message
    - headers: dict, additional message headers
    - **keyword_headers: additional headers as keyword arguments
    
    Common headers:
    - persistent: str, 'true' for persistent messages
    - priority: str, message priority (0-9)
    - expires: str, message expiration time
    - correlation-id: str, correlation identifier
    - reply-to: str, reply destination
    - custom headers: any string key-value pairs
    """

def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
    """
    Subscribe to destination for message delivery.
    
    Parameters:
    - destination: str, destination queue/topic to subscribe to
    - id: str, unique subscription identifier (auto-generated if None)
    - ack: str, acknowledgment mode ('auto', 'client', 'client-individual')
    - headers: dict, subscription headers
    - **keyword_headers: additional headers as keyword arguments
    
    Acknowledgment modes:
    - 'auto': automatic acknowledgment (default)
    - 'client': manual acknowledgment per subscription
    - 'client-individual': manual acknowledgment per message
    """

def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
    """
    Unsubscribe from destination.
    
    Parameters:
    - destination: str, destination to unsubscribe from (if no id)
    - id: str, subscription ID to unsubscribe (preferred method)
    - headers: dict, unsubscribe headers
    - **keyword_headers: additional headers as keyword arguments
    
    Note: Either destination or id must be provided
    """

Message Acknowledgment

Manual message acknowledgment for reliable message processing.

def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
    """
    Acknowledge message processing (STOMP 1.0+).
    
    Parameters:
    - id: str, message ID to acknowledge (required)
    - subscription: str, subscription ID (STOMP 1.1+ only)
    - transaction: str, transaction ID if within transaction
    - headers: dict, acknowledgment headers
    - **keyword_headers: additional headers as keyword arguments
    
    Used with ack modes 'client' and 'client-individual'
    """

def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
    """
    Negative acknowledge message (STOMP 1.1+ only).
    
    Parameters:
    - id: str, message ID to nack (required)
    - subscription: str, subscription ID (STOMP 1.1+ only)
    - transaction: str, transaction ID if within transaction
    - headers: dict, nack headers
    - **keyword_headers: additional headers as keyword arguments
    
    Signals message processing failure, may trigger redelivery
    """

Transaction Management

Atomic transaction support for grouping multiple operations.

def begin(self, transaction=None, headers=None, **keyword_headers):
    """
    Begin transaction.
    
    Parameters:
    - transaction: str, transaction identifier (auto-generated if None)
    - headers: dict, begin headers
    - **keyword_headers: additional headers as keyword arguments
    
    Returns transaction ID for use in subsequent operations
    """

def commit(self, transaction=None, headers=None, **keyword_headers):
    """
    Commit transaction.
    
    Parameters:
    - transaction: str, transaction ID to commit (required)
    - headers: dict, commit headers
    - **keyword_headers: additional headers as keyword arguments
    
    All operations within transaction are atomically applied
    """

def abort(self, transaction=None, headers=None, **keyword_headers):
    """
    Abort transaction.
    
    Parameters:
    - transaction: str, transaction ID to abort (required)
    - headers: dict, abort headers
    - **keyword_headers: additional headers as keyword arguments
    
    All operations within transaction are rolled back
    """

Receipt Management

Receipt confirmations for reliable operation acknowledgment.

def set_receipt(self, receipt_id, value):
    """
    Set receipt handler for confirmation tracking.
    
    Parameters:
    - receipt_id: str, receipt identifier
    - value: any, value associated with receipt
    """

Frame Processing

Low-level STOMP frame operations for advanced usage.

def send_frame(self, frame):
    """
    Send raw STOMP frame.
    
    Parameters:
    - frame: Frame, raw STOMP frame to send
    """

class Frame:
    """
    STOMP frame representation.
    
    Attributes:
    - cmd: str, STOMP command (CONNECT, SEND, etc.)
    - headers: dict, frame headers
    - body: str or bytes, frame body content
    """
    def __init__(self, cmd=None, headers=None, body=None):
        self.cmd = cmd
        self.headers = headers or {}
        self.body = body

Usage Examples

Basic Messaging

import stomp

conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'password', wait=True)

# Send simple message
conn.send(
    body='Hello World',
    destination='/queue/test'
)

# Send message with headers
conn.send(
    body='Priority message',
    destination='/queue/important',
    headers={
        'priority': '9',
        'persistent': 'true',
        'correlation-id': 'msg-001'
    }
)

# Send JSON message
import json
data = {'user': 'john', 'action': 'login', 'timestamp': 1234567890}
conn.send(
    body=json.dumps(data),
    destination='/topic/events',
    content_type='application/json'
)

conn.disconnect()

Subscription Patterns

import stomp
import time

class MessageProcessor(stomp.ConnectionListener):
    def on_message(self, frame):
        print(f"Received from {frame.headers.get('destination', 'unknown')}: {frame.body}")

conn = stomp.Connection([('localhost', 61613)])
processor = MessageProcessor()
conn.set_listener('processor', processor)
conn.connect('user', 'password', wait=True)

# Subscribe to queue with auto-acknowledgment
conn.subscribe('/queue/orders', id='orders-sub', ack='auto')

# Subscribe to topic with manual acknowledgment
conn.subscribe('/topic/notifications', id='notify-sub', ack='client')

# Subscribe with custom headers
conn.subscribe(
    destination='/queue/priority',
    id='priority-sub',
    ack='client-individual',
    headers={'selector': "priority > 5"}
)

# Keep connection alive
time.sleep(30)

# Unsubscribe
conn.unsubscribe(id='orders-sub')
conn.unsubscribe(id='notify-sub')
conn.unsubscribe(id='priority-sub')

conn.disconnect()

Manual Acknowledgment

import stomp
import time

class ManualAckProcessor(stomp.ConnectionListener):
    def __init__(self, connection):
        self.connection = connection
    
    def on_message(self, frame):
        message_id = frame.headers.get('message-id')
        subscription_id = frame.headers.get('subscription')
        
        try:
            # Process message
            self.process_message(frame.body)
            
            # Acknowledge successful processing
            self.connection.ack(message_id, subscription_id)
            print(f"Acknowledged message {message_id}")
            
        except Exception as e:
            print(f"Processing failed: {e}")
            
            # Negative acknowledge (STOMP 1.1+)
            if hasattr(self.connection, 'nack'):
                self.connection.nack(message_id, subscription_id)
                print(f"Nacked message {message_id}")
    
    def process_message(self, body):
        # Simulate message processing
        if 'error' in body.lower():
            raise ValueError("Simulated processing error")
        
        print(f"Successfully processed: {body}")

conn = stomp.Connection11([('localhost', 61613)])  # STOMP 1.1 for NACK support
processor = ManualAckProcessor(conn)
conn.set_listener('processor', processor)
conn.connect('user', 'password', wait=True)

# Subscribe with client-individual acknowledgment
conn.subscribe('/queue/work', id='work-sub', ack='client-individual')

time.sleep(60)  # Process messages for 1 minute
conn.disconnect()

Transaction Example

import stomp
import uuid

conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'password', wait=True)

# Begin transaction
tx_id = str(uuid.uuid4())
conn.begin(tx_id)

try:
    # Send multiple messages in transaction
    conn.send(
        body='Order created',
        destination='/queue/orders',
        transaction=tx_id
    )
    
    conn.send(
        body='Inventory updated',
        destination='/queue/inventory',
        transaction=tx_id
    )
    
    conn.send(
        body='Email notification',
        destination='/queue/notifications',
        transaction=tx_id
    )
    
    # Simulate business logic
    if all_operations_successful():
        conn.commit(tx_id)
        print("Transaction committed successfully")
    else:
        conn.abort(tx_id)
        print("Transaction aborted")
        
except Exception as e:
    # Abort transaction on error
    conn.abort(tx_id)
    print(f"Transaction aborted due to error: {e}")

conn.disconnect()

def all_operations_successful():
    # Simulate business validation
    return True

Receipt Confirmation

import stomp
import uuid
import time

class ReceiptHandler(stomp.ConnectionListener):
    def __init__(self):
        self.pending_receipts = {}
    
    def on_receipt(self, frame):
        receipt_id = frame.headers.get('receipt-id')
        if receipt_id in self.pending_receipts:
            print(f"Receipt confirmed: {receipt_id}")
            self.pending_receipts[receipt_id] = True
    
    def wait_for_receipt(self, receipt_id, timeout=10):
        """Wait for specific receipt confirmation."""
        end_time = time.time() + timeout
        
        while time.time() < end_time:
            if self.pending_receipts.get(receipt_id):
                return True
            time.sleep(0.1)
        
        return False

conn = stomp.Connection([('localhost', 61613)])
receipt_handler = ReceiptHandler()
conn.set_listener('receipt_handler', receipt_handler)
conn.connect('user', 'password', wait=True)

# Send message with receipt
receipt_id = str(uuid.uuid4())
receipt_handler.pending_receipts[receipt_id] = False

conn.send(
    body='Important message',
    destination='/queue/critical',
    receipt=receipt_id
)

# Wait for delivery confirmation
if receipt_handler.wait_for_receipt(receipt_id, timeout=30):
    print("Message delivery confirmed")
else:
    print("Message delivery confirmation timeout")

conn.disconnect()

Protocol Version Specific Features

import stomp

# STOMP 1.0 - Basic functionality
conn10 = stomp.Connection10([('localhost', 61613)])
conn10.connect('user', 'password', wait=True)
conn10.send(body='STOMP 1.0 message', destination='/queue/test')
# No NACK support in 1.0
conn10.disconnect()

# STOMP 1.1 - Heartbeats and NACK
conn11 = stomp.Connection11(
    [('localhost', 61613)],
    heartbeats=(10000, 10000)  # 10 second heartbeats
)
conn11.connect('user', 'password', wait=True)

# NACK support in 1.1+
class NackCapableProcessor(stomp.ConnectionListener):
    def __init__(self, connection):
        self.connection = connection
    
    def on_message(self, frame):
        message_id = frame.headers.get('message-id')
        subscription_id = frame.headers.get('subscription')
        
        try:
            # Process message
            pass
        except Exception:
            # NACK available in STOMP 1.1+
            self.connection.nack(message_id, subscription_id)

processor = NackCapableProcessor(conn11)
conn11.set_listener('processor', processor)
conn11.subscribe('/queue/test', id='test-sub', ack='client-individual')

time.sleep(10)
conn11.disconnect()

# STOMP 1.2 - Enhanced header escaping
conn12 = stomp.Connection12([('localhost', 61613)])
conn12.connect('user', 'password', wait=True)

# STOMP 1.2 handles special characters in headers properly
conn12.send(
    body='Message with special header',
    destination='/queue/test',
    headers={
        'custom-header': 'value with\nspecial\tcharacters',  # Automatically escaped
        'correlation-id': 'msg:with:colons'
    }
)

conn12.disconnect()

Install with Tessl CLI

npx tessl i tessl/pypi-stomp-py

docs

adapters.md

cli.md

connections.md

index.md

listeners.md

protocol.md

types.md

utilities.md

websocket.md

tile.json