CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-stomp-py

Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol

Overview
Eval results
Files

adapters.mddocs/

Transport Adapters

Specialized transport adapters extending stomp.py beyond traditional TCP connections to support multicast, broadcast messaging, and alternative transport mechanisms for specific deployment scenarios.

Capabilities

Multicast Adapter

STOMP messaging over UDP multicast transport enabling broker-less messaging patterns and distributed system coordination without central message broker infrastructure.

class MulticastConnection:
    def __init__(self, 
                 host_and_ports=None,
                 prefer_localhost=True,
                 try_loopback_connect=True,
                 reconnect_sleep_initial=0.1,
                 reconnect_sleep_increase=0.5,
                 reconnect_sleep_jitter=0.1,
                 reconnect_sleep_max=60.0,
                 reconnect_attempts_max=3,
                 timeout=None,
                 heartbeats=(0, 0),
                 keepalive=None,
                 vhost=None,
                 auto_decode=True,
                 encoding="utf-8",
                 auto_content_length=True,
                 heart_beat_receive_scale=1.5,
                 bind_host_port=None,
                 multicast_group="224.1.2.3",
                 multicast_port=61616):
        """
        Create multicast STOMP connection.
        
        Parameters:
        - host_and_ports: list, multicast endpoints (optional for multicast)
        - prefer_localhost: bool, prefer localhost connections  
        - try_loopback_connect: bool, try loopback if localhost fails
        - reconnect_sleep_initial: float, initial reconnect delay
        - reconnect_sleep_increase: float, delay increase factor
        - reconnect_sleep_jitter: float, random delay variation
        - reconnect_sleep_max: float, maximum reconnect delay
        - reconnect_attempts_max: int, maximum reconnect attempts
        - timeout: float, socket timeout in seconds
        - heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)
        - keepalive: bool, enable keepalive (N/A for multicast)
        - vhost: str, virtual host name
        - auto_decode: bool, automatically decode message bodies
        - encoding: str, text encoding for messages
        - auto_content_length: bool, automatically set content-length header
        - heart_beat_receive_scale: float, heartbeat timeout scale factor
        - bind_host_port: tuple, local bind address
        - multicast_group: str, multicast group IP address
        - multicast_port: int, multicast port number
        """
    
    def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
        """
        Join multicast group for STOMP messaging.
        
        Parameters:
        - username: str, authentication username (optional for multicast)
        - passcode: str, authentication password (optional for multicast)
        - wait: bool, wait for connection confirmation
        - headers: dict, additional connection headers
        - **keyword_headers: additional headers as keyword arguments
        
        Note: Authentication may not apply in multicast scenarios
        """
    
    def disconnect(self, receipt=None, headers=None, **keyword_headers):
        """
        Leave multicast group.
        
        Parameters:
        - receipt: str, receipt ID for disconnect confirmation
        - headers: dict, additional disconnect headers
        - **keyword_headers: additional headers as keyword arguments
        """
    
    def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
        """
        Send multicast message to all group members.
        
        Parameters:
        - body: str, message body
        - destination: str, logical destination (for routing/filtering)
        - content_type: str, message content type
        - headers: dict, message headers
        - **keyword_headers: additional headers as keyword arguments
        
        Message is broadcast to all multicast group members.
        """
    
    def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
        """
        Subscribe to multicast destination pattern.
        
        Parameters:
        - destination: str, destination pattern for filtering
        - id: str, subscription ID
        - ack: str, acknowledgment mode (limited in multicast)
        - headers: dict, subscription headers
        - **keyword_headers: additional headers as keyword arguments
        
        Note: Acknowledgments have limited meaning in multicast scenarios
        """

Multicast Transport

Low-level multicast transport implementation handling UDP multicast socket operations.

class MulticastTransport:
    def __init__(self, 
                 multicast_group="224.1.2.3", 
                 multicast_port=61616,
                 timeout=None,
                 bind_host_port=None):
        """
        Initialize multicast transport.
        
        Parameters:
        - multicast_group: str, multicast IP address
        - multicast_port: int, multicast port number
        - timeout: float, socket timeout in seconds
        - bind_host_port: tuple, local bind address
        """
    
    def start(self):
        """Start multicast transport and join group."""
    
    def stop(self):
        """Stop transport and leave multicast group."""
    
    def send(self, encoded_frame):
        """
        Send encoded STOMP frame via multicast.
        
        Parameters:
        - encoded_frame: bytes, encoded STOMP frame data
        """
    
    def receive(self):
        """
        Receive multicast STOMP frame.
        
        Returns:
        bytes: received frame data or None if timeout
        """
    
    def is_connected(self):
        """
        Check if multicast socket is active.
        
        Returns:
        bool: True if connected to multicast group
        """

Advanced Transport Configuration

Enhanced transport configuration options for specialized deployment scenarios.

def override_threading(self, create_thread_fc):
    """
    Override thread creation for custom threading libraries.
    
    Parameters:
    - create_thread_fc: callable, custom thread creation function
    
    Enables integration with:
    - gevent greenlet threads
    - asyncio event loops  
    - Custom thread pools
    - Testing frameworks with thread mocking
    
    Example:
    def custom_thread_creator(callback):
        return gevent.spawn(callback)
    
    conn.override_threading(custom_thread_creator)
    """

def wait_for_connection(self, timeout=None):
    """
    Wait for connection establishment with timeout.
    
    Parameters:
    - timeout: float, maximum wait time in seconds
    
    Returns:
    bool: True if connected within timeout, False otherwise
    
    Useful for synchronous connection patterns and testing.
    """

def set_keepalive_options(self, keepalive_options):
    """
    Configure advanced TCP keepalive parameters.
    
    Parameters:
    - keepalive_options: tuple, platform-specific keepalive config
    
    Linux format: ("linux", idle_sec, interval_sec, probe_count)
    macOS format: ("mac", interval_sec)
    Windows format: ("windows", idle_ms, interval_ms)
    
    Examples:
    # Linux: 2 hour idle, 75 sec intervals, 9 probes
    conn.set_keepalive_options(("linux", 7200, 75, 9))
    
    # macOS: 75 second intervals
    conn.set_keepalive_options(("mac", 75))
    """

Usage Examples

Basic Multicast Messaging

import stomp
from stomp.adapter.multicast import MulticastConnection

# Create multicast connection
conn = MulticastConnection(
    multicast_group="224.10.20.30",
    multicast_port=61620
)

# Set up message handler
class MulticastListener(stomp.ConnectionListener):
    def on_message(self, frame):
        print(f"Multicast message: {frame.body}")
        print(f"From destination: {frame.headers.get('destination')}")

conn.set_listener('multicast', MulticastListener())

# Join multicast group
conn.connect(wait=True)

# Send message to all group members
conn.send(
    body='{"event": "system_alert", "level": "warning"}',  
    destination='/topic/system-alerts'
)

# Subscribe to specific message types
conn.subscribe('/topic/system-alerts', id='alerts')

# Keep listening for multicast messages
import time
time.sleep(60)

conn.disconnect()

Broker-less Service Discovery

import stomp
import json
import time
from stomp.adapter.multicast import MulticastConnection

class ServiceDiscovery:
    def __init__(self, service_name, service_port):
        self.service_name = service_name
        self.service_port = service_port
        self.discovered_services = {}
        
        # Setup multicast for service announcements
        self.conn = MulticastConnection(
            multicast_group="224.0.1.100",
            multicast_port=61700
        )
        self.conn.set_listener('discovery', self)
        self.conn.connect(wait=True)
        self.conn.subscribe('/topic/service-discovery', id='discovery')
    
    def announce_service(self):
        """Announce this service to the network"""
        announcement = {
            "service": self.service_name,
            "port": self.service_port,
            "timestamp": time.time(),
            "type": "announcement"
        }
        
        self.conn.send(
            body=json.dumps(announcement),
            destination='/topic/service-discovery'
        )
    
    def on_message(self, frame):
        """Handle service discovery messages"""
        try:
            message = json.loads(frame.body)
            if message.get('type') == 'announcement':
                service_name = message['service']
                self.discovered_services[service_name] = {
                    'port': message['port'],
                    'last_seen': message['timestamp']
                }
                print(f"Discovered service: {service_name}:{message['port']}")
        except Exception as e:
            print(f"Error processing discovery message: {e}")
    
    def get_service_endpoint(self, service_name):
        """Get endpoint for discovered service"""
        return self.discovered_services.get(service_name)

# Usage
discovery = ServiceDiscovery("user-service", 8080)

# Announce this service every 30 seconds
while True:
    discovery.announce_service()
    time.sleep(30)

Custom Threading Integration

import stomp
import gevent
from gevent import monkey
monkey.patch_all()

# Custom thread creation for gevent
def gevent_thread_creator(callback):
    return gevent.spawn(callback)

conn = stomp.Connection([('broker.com', 61613)])

# Use gevent threads instead of standard threads
conn.override_threading(gevent_thread_creator)

# Now all stomp.py background operations use gevent
conn.connect('user', 'pass', wait=True)
conn.subscribe('/queue/events', id='events')

# Gevent-compatible message handling
class GeventListener(stomp.ConnectionListener):
    def on_message(self, frame):
        # This runs in a gevent greenlet
        gevent.sleep(0)  # Yield to other greenlets
        print(f"Processing: {frame.body}")

conn.set_listener('main', GeventListener())

Advanced Keepalive Configuration

import stomp

conn = stomp.Connection([('broker.com', 61613)])

# Configure aggressive keepalive for unstable networks
# Linux: 30 sec idle, 10 sec intervals, 6 probes = 90 sec total
conn.set_keepalive_options(("linux", 30, 10, 6))

# Alternative for macOS
# conn.set_keepalive_options(("mac", 30))

conn.connect('user', 'pass', wait=True)

# Connection will detect network failures faster
conn.send(body='Test message', destination='/queue/test')

Synchronous Connection Patterns

import stomp

conn = stomp.Connection([('broker1.com', 61613), ('broker2.com', 61613)])

# Wait for connection with timeout
if conn.wait_for_connection(timeout=10.0):
    print("Connected successfully")
    
    # Perform operations
    conn.send(body='Connected!', destination='/queue/status')
    
else:
    print("Connection timeout - trying alternative approach")
    # Fallback logic

Testing with Custom Transport

import stomp
from unittest.mock import Mock

# Mock transport for testing
def mock_thread_creator(callback):
    # Don't create real threads in tests
    return Mock()

conn = stomp.Connection([('localhost', 61613)])
conn.override_threading(mock_thread_creator)

# Now stomp.py won't create background threads
# Useful for deterministic testing
conn.connect('test', 'test', wait=True)

Install with Tessl CLI

npx tessl i tessl/pypi-stomp-py

docs

adapters.md

cli.md

connections.md

index.md

listeners.md

protocol.md

types.md

utilities.md

websocket.md

tile.json