# Transport Adapters

Specialized transport adapters extending stomp.py beyond traditional TCP connections to support multicast, broadcast messaging, and alternative transport mechanisms for specific deployment scenarios.

## Capabilities

### Multicast Adapter

STOMP messaging over UDP multicast transport enabling broker-less messaging patterns and distributed system coordination without central message broker infrastructure.

```python { .api }
class MulticastConnection:
    """STOMP connection that uses UDP multicast instead of a TCP broker link."""

    def __init__(self,
                 host_and_ports=None,
                 prefer_localhost=True,
                 try_loopback_connect=True,
                 reconnect_sleep_initial=0.1,
                 reconnect_sleep_increase=0.5,
                 reconnect_sleep_jitter=0.1,
                 reconnect_sleep_max=60.0,
                 reconnect_attempts_max=3,
                 timeout=None,
                 heartbeats=(0, 0),
                 keepalive=None,
                 vhost=None,
                 auto_decode=True,
                 encoding="utf-8",
                 auto_content_length=True,
                 heart_beat_receive_scale=1.5,
                 bind_host_port=None,
                 multicast_group="224.1.2.3",
                 multicast_port=61616):
        """
        Create multicast STOMP connection.

        Parameters:
        - host_and_ports: list, multicast endpoints (optional for multicast)
        - prefer_localhost: bool, prefer localhost connections
        - try_loopback_connect: bool, try loopback if localhost fails
        - reconnect_sleep_initial: float, initial reconnect delay
        - reconnect_sleep_increase: float, delay increase factor
        - reconnect_sleep_jitter: float, random delay variation
        - reconnect_sleep_max: float, maximum reconnect delay
        - reconnect_attempts_max: int, maximum reconnect attempts
        - timeout: float, socket timeout in seconds
        - heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)
        - keepalive: bool, enable keepalive (N/A for multicast)
        - vhost: str, virtual host name
        - auto_decode: bool, automatically decode message bodies
        - encoding: str, text encoding for messages
        - auto_content_length: bool, automatically set content-length header
        - heart_beat_receive_scale: float, heartbeat timeout scale factor
        - bind_host_port: tuple, local bind address
        - multicast_group: str, multicast group IP address
        - multicast_port: int, multicast port number
        """

    def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
        """
        Join multicast group for STOMP messaging.

        Parameters:
        - username: str, authentication username (optional for multicast)
        - passcode: str, authentication password (optional for multicast)
        - wait: bool, wait for connection confirmation
        - headers: dict, additional connection headers
        - **keyword_headers: additional headers as keyword arguments

        Note: Authentication may not apply in multicast scenarios
        """

    def disconnect(self, receipt=None, headers=None, **keyword_headers):
        """
        Leave multicast group.

        Parameters:
        - receipt: str, receipt ID for disconnect confirmation
        - headers: dict, additional disconnect headers
        - **keyword_headers: additional headers as keyword arguments
        """

    def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
        """
        Send multicast message to all group members.

        Parameters:
        - body: str, message body
        - destination: str, logical destination (for routing/filtering)
        - content_type: str, message content type
        - headers: dict, message headers
        - **keyword_headers: additional headers as keyword arguments

        Message is broadcast to all multicast group members.
        """

    def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
        """
        Subscribe to multicast destination pattern.

        Parameters:
        - destination: str, destination pattern for filtering
        - id: str, subscription ID
        - ack: str, acknowledgment mode (limited in multicast)
        - headers: dict, subscription headers
        - **keyword_headers: additional headers as keyword arguments

        Note: Acknowledgments have limited meaning in multicast scenarios
        """
```

### Multicast Transport

Low-level multicast transport implementation handling UDP multicast socket operations.

```python { .api }
class MulticastTransport:
    """Low-level transport handling UDP multicast socket operations."""

    def __init__(self,
                 multicast_group="224.1.2.3",
                 multicast_port=61616,
                 timeout=None,
                 bind_host_port=None):
        """
        Initialize multicast transport.

        Parameters:
        - multicast_group: str, multicast IP address
        - multicast_port: int, multicast port number
        - timeout: float, socket timeout in seconds
        - bind_host_port: tuple, local bind address
        """

    def start(self):
        """Start multicast transport and join group."""

    def stop(self):
        """Stop transport and leave multicast group."""

    def send(self, encoded_frame):
        """
        Send encoded STOMP frame via multicast.

        Parameters:
        - encoded_frame: bytes, encoded STOMP frame data
        """

    def receive(self):
        """
        Receive multicast STOMP frame.

        Returns:
        bytes: received frame data or None if timeout
        """

    def is_connected(self):
        """
        Check if multicast socket is active.

        Returns:
        bool: True if connected to multicast group
        """
```

### Advanced Transport Configuration

Enhanced transport configuration options for specialized deployment scenarios.

```python { .api }
def override_threading(self, create_thread_fc):
    """
    Override thread creation for custom threading libraries.

    Parameters:
    - create_thread_fc: callable, custom thread creation function

    Enables integration with:
    - gevent greenlet threads
    - asyncio event loops
    - Custom thread pools
    - Testing frameworks with thread mocking

    Example:
        def custom_thread_creator(callback):
            return gevent.spawn(callback)

        conn.override_threading(custom_thread_creator)
    """


def wait_for_connection(self, timeout=None):
    """
    Wait for connection establishment with timeout.

    Parameters:
    - timeout: float, maximum wait time in seconds

    Returns:
    bool: True if connected within timeout, False otherwise

    Useful for synchronous connection patterns and testing.
    """


def set_keepalive_options(self, keepalive_options):
    """
    Configure advanced TCP keepalive parameters.

    Parameters:
    - keepalive_options: tuple, platform-specific keepalive config

    Linux format: ("linux", idle_sec, interval_sec, probe_count)
    macOS format: ("mac", interval_sec)
    Windows format: ("windows", idle_ms, interval_ms)

    Examples:
        # Linux: 2 hour idle, 75 sec intervals, 9 probes
        conn.set_keepalive_options(("linux", 7200, 75, 9))

        # macOS: 75 second intervals
        conn.set_keepalive_options(("mac", 75))
    """
```

## Usage Examples

### Basic Multicast Messaging

```python
import time

import stomp
from stomp.adapter.multicast import MulticastConnection

# Create multicast connection
conn = MulticastConnection(
    multicast_group="224.10.20.30",
    multicast_port=61620
)

# Set up message handler
class MulticastListener(stomp.ConnectionListener):
    """Print the body and destination of every multicast message received."""

    def on_message(self, frame):
        print(f"Multicast message: {frame.body}")
        print(f"From destination: {frame.headers.get('destination')}")

conn.set_listener('multicast', MulticastListener())

# Join multicast group
conn.connect(wait=True)

# Send message to all group members
conn.send(
    body='{"event": "system_alert", "level": "warning"}',
    destination='/topic/system-alerts'
)

# Subscribe to specific message types
conn.subscribe('/topic/system-alerts', id='alerts')

# Keep listening for multicast messages
time.sleep(60)

conn.disconnect()
```

### Broker-less Service Discovery

```python
import stomp
import json
import time
from stomp.adapter.multicast import MulticastConnection

class ServiceDiscovery:
    """Broker-less service discovery over a STOMP multicast group.

    Each instance announces its own service on '/topic/service-discovery'
    and records the announcements it receives on that destination.
    """

    def __init__(self, service_name, service_port):
        """
        Join the discovery multicast group and start listening.

        Parameters:
        - service_name: str, name announced for this service
        - service_port: int, port announced for this service
        """
        self.service_name = service_name
        self.service_port = service_port
        # Maps service name -> {'port': ..., 'last_seen': ...}
        self.discovered_services = {}

        # Setup multicast for service announcements
        self.conn = MulticastConnection(
            multicast_group="224.0.1.100",
            multicast_port=61700
        )
        # This object is its own listener: on_message below receives frames
        self.conn.set_listener('discovery', self)
        self.conn.connect(wait=True)
        self.conn.subscribe('/topic/service-discovery', id='discovery')

    def announce_service(self):
        """Announce this service to the network"""
        announcement = {
            "service": self.service_name,
            "port": self.service_port,
            "timestamp": time.time(),
            "type": "announcement"
        }

        self.conn.send(
            body=json.dumps(announcement),
            destination='/topic/service-discovery'
        )

    def on_message(self, frame):
        """Handle service discovery messages"""
        # Deliberately broad: a malformed announcement from any peer is
        # reported and skipped rather than raised out of the callback.
        try:
            message = json.loads(frame.body)
            if message.get('type') == 'announcement':
                service_name = message['service']
                self.discovered_services[service_name] = {
                    'port': message['port'],
                    'last_seen': message['timestamp']
                }
                print(f"Discovered service: {service_name}:{message['port']}")
        except Exception as e:
            print(f"Error processing discovery message: {e}")

    def get_service_endpoint(self, service_name):
        """Get endpoint for discovered service (None if not seen yet)"""
        return self.discovered_services.get(service_name)


# Usage
if __name__ == "__main__":
    discovery = ServiceDiscovery("user-service", 8080)

    # Announce this service every 30 seconds
    try:
        while True:
            discovery.announce_service()
            time.sleep(30)
    finally:
        # Leave the multicast group when the loop is interrupted
        discovery.conn.disconnect()
```

### Custom Threading Integration

```python
# Patch the standard library first, before any other module imports
# socket/threading, so stomp.py picks up the cooperative versions
from gevent import monkey
monkey.patch_all()

import gevent
import stomp

# Custom thread creation for gevent
def gevent_thread_creator(callback):
    """Run a stomp.py background callback in a greenlet instead of a thread."""
    return gevent.spawn(callback)

# Gevent-compatible message handling
class GeventListener(stomp.ConnectionListener):
    def on_message(self, frame):
        # This runs in a gevent greenlet
        gevent.sleep(0)  # Yield to other greenlets
        print(f"Processing: {frame.body}")

conn = stomp.Connection([('broker.com', 61613)])

# Use gevent threads instead of standard threads
conn.override_threading(gevent_thread_creator)

# Register the listener before connecting so frames that arrive right
# after subscribing are not missed
conn.set_listener('main', GeventListener())

# Now all stomp.py background operations use gevent
conn.connect('user', 'pass', wait=True)
conn.subscribe('/queue/events', id='events')
```

### Advanced Keepalive Configuration

```python
import stomp

conn = stomp.Connection([('broker.com', 61613)])

# Configure aggressive keepalive for unstable networks
# Linux: 30 sec idle, 10 sec intervals, 6 probes = 90 sec total
conn.set_keepalive_options(("linux", 30, 10, 6))

# Alternative for macOS
# conn.set_keepalive_options(("mac", 30))

conn.connect('user', 'pass', wait=True)

# Connection will detect network failures faster
conn.send(body='Test message', destination='/queue/test')
```

### Synchronous Connection Patterns

```python
import stomp

conn = stomp.Connection([('broker1.com', 61613), ('broker2.com', 61613)])

# Start the connection attempt without blocking; constructing the
# Connection alone does not connect, so there would be nothing to wait for
conn.connect('user', 'pass', wait=False)

# Wait for connection with timeout
if conn.wait_for_connection(timeout=10.0):
    print("Connected successfully")

    # Perform operations
    conn.send(body='Connected!', destination='/queue/status')

else:
    print("Connection timeout - trying alternative approach")
    # Fallback logic
```

### Testing with Custom Transport

```python
from unittest.mock import Mock

import stomp

# Mock thread creation for testing
def mock_thread_creator(callback):
    """Return a Mock instead of starting a thread; callback is never run."""
    # Don't create real threads in tests
    return Mock()

conn = stomp.Connection([('localhost', 61613)])
conn.override_threading(mock_thread_creator)

# Now stomp.py won't create background threads
# Useful for deterministic testing
# wait=False: with no receiver thread running, nothing reads the broker's
# CONNECTED reply, so waiting for it here would block
conn.connect('test', 'test', wait=False)
```