Python STOMP client library supporting versions 1.0, 1.1 and 1.2 of the protocol
Specialized transport adapters extending stomp.py beyond traditional TCP connections to support multicast, broadcast messaging, and alternative transport mechanisms for specific deployment scenarios.
STOMP messaging over UDP multicast transport enabling broker-less messaging patterns and distributed system coordination without central message broker infrastructure.
class MulticastConnection:
    """STOMP connection that exchanges messages with a UDP multicast group."""

    def __init__(self,
                 host_and_ports=None,
                 prefer_localhost=True,
                 try_loopback_connect=True,
                 reconnect_sleep_initial=0.1,
                 reconnect_sleep_increase=0.5,
                 reconnect_sleep_jitter=0.1,
                 reconnect_sleep_max=60.0,
                 reconnect_attempts_max=3,
                 timeout=None,
                 heartbeats=(0, 0),
                 keepalive=None,
                 vhost=None,
                 auto_decode=True,
                 encoding="utf-8",
                 auto_content_length=True,
                 heart_beat_receive_scale=1.5,
                 bind_host_port=None,
                 multicast_group="224.1.2.3",
                 multicast_port=61616):
        """
        Create multicast STOMP connection.

        Parameters:
        - host_and_ports: list, multicast endpoints (optional for multicast;
          default None)
        - prefer_localhost: bool, prefer localhost connections (default True)
        - try_loopback_connect: bool, try loopback if localhost fails
          (default True)
        - reconnect_sleep_initial: float, initial reconnect delay (default 0.1)
        - reconnect_sleep_increase: float, delay increase factor (default 0.5)
        - reconnect_sleep_jitter: float, random delay variation (default 0.1)
        - reconnect_sleep_max: float, maximum reconnect delay (default 60.0)
        - reconnect_attempts_max: int, maximum reconnect attempts (default 3)
        - timeout: float, socket timeout in seconds (default None)
        - heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)
          (default (0, 0))
        - keepalive: bool, enable keepalive (N/A for multicast; default None)
        - vhost: str, virtual host name (default None)
        - auto_decode: bool, automatically decode message bodies (default True)
        - encoding: str, text encoding for messages (default "utf-8")
        - auto_content_length: bool, automatically set content-length header
          (default True)
        - heart_beat_receive_scale: float, heartbeat timeout scale factor
          (default 1.5)
        - bind_host_port: tuple, local bind address (default None)
        - multicast_group: str, multicast group IP address
          (default "224.1.2.3")
        - multicast_port: int, multicast port number (default 61616)
        """

    def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
        """
        Join multicast group for STOMP messaging.

        Parameters:
        - username: str, authentication username (optional for multicast)
        - passcode: str, authentication password (optional for multicast)
        - wait: bool, wait for connection confirmation (default False)
        - headers: dict, additional connection headers
        - **keyword_headers: additional headers as keyword arguments

        Note: Authentication may not apply in multicast scenarios
        """

    def disconnect(self, receipt=None, headers=None, **keyword_headers):
        """
        Leave multicast group.

        Parameters:
        - receipt: str, receipt ID for disconnect confirmation
        - headers: dict, additional disconnect headers
        - **keyword_headers: additional headers as keyword arguments
        """

    def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
        """
        Send multicast message to all group members.

        Parameters:
        - body: str, message body (default '')
        - destination: str, logical destination (for routing/filtering)
        - content_type: str, message content type
        - headers: dict, message headers
        - **keyword_headers: additional headers as keyword arguments

        Message is broadcast to all multicast group members.
        """

    def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
        """
        Subscribe to multicast destination pattern.

        Parameters:
        - destination: str, destination pattern for filtering
        - id: str, subscription ID
        - ack: str, acknowledgment mode (limited in multicast; default 'auto')
        - headers: dict, subscription headers
        - **keyword_headers: additional headers as keyword arguments

        Note: Acknowledgments have limited meaning in multicast scenarios
        """

Low-level multicast transport implementation handling UDP multicast socket operations.
class MulticastTransport:
    """Low-level transport that moves encoded STOMP frames over multicast."""

    def __init__(self,
                 multicast_group="224.1.2.3",
                 multicast_port=61616,
                 timeout=None,
                 bind_host_port=None):
        """
        Initialize multicast transport.

        Parameters:
        - multicast_group: str, multicast IP address (default "224.1.2.3")
        - multicast_port: int, multicast port number (default 61616)
        - timeout: float, socket timeout in seconds (default None)
        - bind_host_port: tuple, local bind address (default None)
        """

    def start(self):
        """Start multicast transport and join group."""

    def stop(self):
        """Stop transport and leave multicast group."""

    def send(self, encoded_frame):
        """
        Send encoded STOMP frame via multicast.

        Parameters:
        - encoded_frame: bytes, encoded STOMP frame data
        """

    def receive(self):
        """
        Receive multicast STOMP frame.

        Returns:
        bytes: received frame data or None if timeout
        """

    def is_connected(self):
        """
        Check if multicast socket is active.

        Returns:
        bool: True if connected to multicast group
        """

Enhanced transport configuration options for specialized deployment scenarios.
def override_threading(self, create_thread_fc):
    """
    Override thread creation for custom threading libraries.

    Parameters:
    - create_thread_fc: callable, custom thread creation function; in the
      example below it receives the callback to run as its only argument

    Enables integration with:
    - gevent greenlet threads
    - asyncio event loops
    - Custom thread pools
    - Testing frameworks with thread mocking

    Example:
        def custom_thread_creator(callback):
            return gevent.spawn(callback)

        conn.override_threading(custom_thread_creator)
    """
def wait_for_connection(self, timeout=None):
    """
    Wait for connection establishment with timeout.

    Parameters:
    - timeout: float, maximum wait time in seconds (default None)

    Returns:
    bool: True if connected within timeout, False otherwise

    Useful for synchronous connection patterns and testing.
    """
def set_keepalive_options(self, keepalive_options):
    """
    Configure advanced TCP keepalive parameters.

    Parameters:
    - keepalive_options: tuple, platform-specific keepalive config whose
      first element names the platform
        Linux format: ("linux", idle_sec, interval_sec, probe_count)
        macOS format: ("mac", interval_sec)
        Windows format: ("windows", idle_ms, interval_ms)
      Per the field names, Linux and macOS values are in seconds while
      Windows values are in milliseconds.

    Examples:
        # Linux: 2 hour idle, 75 sec intervals, 9 probes
        conn.set_keepalive_options(("linux", 7200, 75, 9))

        # macOS: 75 second intervals
        conn.set_keepalive_options(("mac", 75))
    """

import stomp
from stomp.adapter.multicast import MulticastConnection
import time

# Create multicast connection
conn = MulticastConnection(
    multicast_group="224.10.20.30",
    multicast_port=61620,
)


# Set up message handler
class MulticastListener(stomp.ConnectionListener):
    """Print the body and destination header of each received message."""

    def on_message(self, frame):
        print(f"Multicast message: {frame.body}")
        print(f"From destination: {frame.headers.get('destination')}")


conn.set_listener('multicast', MulticastListener())

# Join multicast group
conn.connect(wait=True)
try:
    # Subscribe before sending: the subscription is what selects which
    # destinations are delivered to the listener, so it has to exist
    # before any alert on this destination can be seen.
    conn.subscribe('/topic/system-alerts', id='alerts')

    # Send message to all group members
    conn.send(
        body='{"event": "system_alert", "level": "warning"}',
        destination='/topic/system-alerts',
    )

    # Keep listening for multicast messages
    time.sleep(60)
finally:
    # Leave the group even if the wait above is interrupted
    conn.disconnect()

import stomp
import json
import time
from stomp.adapter.multicast import MulticastConnection
class ServiceDiscovery:
    """Announce one service over multicast STOMP and record peers' announcements.

    The instance registers itself as the connection listener, so
    ``on_message`` is invoked for frames delivered on the discovery topic.
    """

    # Single destination used both for announcing and for subscribing.
    _TOPIC = '/topic/service-discovery'

    def __init__(self, service_name, service_port):
        """
        Connect to the discovery multicast group and start listening.

        Parameters:
        - service_name: name announced for this service
        - service_port: port announced for this service
        """
        self.service_name = service_name
        self.service_port = service_port
        # Maps service name -> {'port': ..., 'last_seen': ...}
        self.discovered_services = {}

        # Setup multicast for service announcements
        self.conn = MulticastConnection(
            multicast_group="224.0.1.100",
            multicast_port=61700,
        )
        self.conn.set_listener('discovery', self)
        self.conn.connect(wait=True)
        self.conn.subscribe(self._TOPIC, id='discovery')

    def announce_service(self):
        """Announce this service to the network"""
        announcement = {
            "service": self.service_name,
            "port": self.service_port,
            "timestamp": time.time(),
            "type": "announcement",
        }
        self.conn.send(
            body=json.dumps(announcement),
            destination=self._TOPIC,
        )

    def on_message(self, frame):
        """
        Handle service discovery messages.

        Records the sender's port and timestamp for every well-formed
        announcement; messages of any other type are ignored. Malformed
        payloads are reported with ``print`` and otherwise dropped so that a
        bad peer cannot break message handling.
        """
        try:
            message = json.loads(frame.body)
            if message.get('type') != 'announcement':
                return
            service_name = message['service']
            self.discovered_services[service_name] = {
                'port': message['port'],
                'last_seen': message['timestamp'],
            }
        # Only the failures bad input can cause: invalid JSON (ValueError),
        # a non-text body or unhashable name (TypeError), a missing field
        # (KeyError), or a JSON payload that is not an object (AttributeError).
        except (ValueError, TypeError, KeyError, AttributeError) as e:
            print(f"Error processing discovery message: {e}")
            return
        print(f"Discovered service: {service_name}:{message['port']}")

    def get_service_endpoint(self, service_name):
        """Get endpoint for discovered service, or None if it has not been seen."""
        return self.discovered_services.get(service_name)
# Usage
discovery = ServiceDiscovery("user-service", 8080)

# Announce this service every 30 seconds until interrupted (Ctrl+C)
try:
    while True:
        discovery.announce_service()
        time.sleep(30)
except KeyboardInterrupt:
    pass
finally:
    # Without this the loop had no exit path and never left the group
    discovery.conn.disconnect()

import stomp
import gevent
from gevent import monkey
monkey.patch_all()
# Custom thread creation for gevent
def gevent_thread_creator(callback):
    """Run the given background callback in a new gevent greenlet."""
    return gevent.spawn(callback)


# Gevent-compatible message handling
class GeventListener(stomp.ConnectionListener):
    def on_message(self, frame):
        # This runs in a gevent greenlet
        gevent.sleep(0)  # Yield to other greenlets
        print(f"Processing: {frame.body}")


conn = stomp.Connection([('broker.com', 61613)])

# Use gevent threads instead of standard threads
conn.override_threading(gevent_thread_creator)

# Register the listener before connecting and subscribing so that no frame
# can arrive while nothing is there to handle it
conn.set_listener('main', GeventListener())

# Now all stomp.py background operations use gevent
conn.connect('user', 'pass', wait=True)
conn.subscribe('/queue/events', id='events')

import stomp
broker_endpoints = [('broker.com', 61613)]
conn = stomp.Connection(broker_endpoints)

# Aggressive keepalive for unstable networks, Linux tuple layout:
# 30 sec idle, then 6 probes at 10 sec intervals = 90 sec total
linux_keepalive = ("linux", 30, 10, 6)
conn.set_keepalive_options(linux_keepalive)

# macOS equivalent:
# conn.set_keepalive_options(("mac", 30))

conn.connect('user', 'pass', wait=True)

# With keepalive configured, network failures are detected faster
conn.send(body='Test message', destination='/queue/test')

import stomp
conn = stomp.Connection([('broker1.com', 61613), ('broker2.com', 61613)])

# Start connecting without blocking; wait_for_connection() only waits, so
# something has to initiate the connection first
conn.connect('user', 'pass', wait=False)

# Wait for connection with timeout
if conn.wait_for_connection(timeout=10.0):
    print("Connected successfully")
    # Perform operations
    conn.send(body='Connected!', destination='/queue/status')
else:
    print("Connection timeout - trying alternative approach")
    # Fallback logic

import stomp
from unittest.mock import Mock
# Mock transport for testing
def mock_thread_creator(callback):
# Don't create real threads in tests
return Mock()
conn = stomp.Connection([('localhost', 61613)])
conn.override_threading(mock_thread_creator)
# Now stomp.py won't create background threads
# Useful for deterministic testing
conn.connect('test', 'test', wait=True)Install with Tessl CLI
npx tessl i tessl/pypi-stomp-py