0
# WebSocket Support
1
2
WebSocket transport adapter enabling STOMP messaging over WebSocket connections for browser-based applications, web-friendly messaging patterns, and environments where traditional TCP sockets are not available.
3
4
## Capabilities
5
6
### WebSocket Connection
7
8
STOMP over WebSocket connection supporting all STOMP 1.2 features with WebSocket-specific transport handling.
9
10
```python { .api }
11
class WSConnection:
12
def __init__(self,
13
host_and_ports=None,
14
prefer_localhost=True,
15
try_loopback_connect=True,
16
reconnect_sleep_initial=0.1,
17
reconnect_sleep_increase=0.5,
18
reconnect_sleep_jitter=0.1,
19
reconnect_sleep_max=60.0,
20
reconnect_attempts_max=3,
21
timeout=None,
22
heartbeats=(0, 0),
23
keepalive=None,
24
vhost=None,
25
auto_decode=True,
26
encoding="utf-8",
27
auto_content_length=True,
28
heart_beat_receive_scale=1.5,
29
bind_host_port=None,
30
ws=None,
31
ws_path=None,
32
header=None,
33
binary_mode=False):
34
"""
35
Create WebSocket STOMP connection.
36
37
Parameters:
38
- host_and_ports: list of tuples, WebSocket host/port pairs [('localhost', 8080)]
39
- prefer_localhost: bool, prefer localhost connections
40
- try_loopback_connect: bool, try loopback if localhost fails
41
- reconnect_sleep_initial: float, initial reconnect delay
42
- reconnect_sleep_increase: float, delay increase factor
43
- reconnect_sleep_jitter: float, random delay variation
44
- reconnect_sleep_max: float, maximum reconnect delay
45
- reconnect_attempts_max: int, maximum reconnect attempts
46
- timeout: float, socket timeout in seconds
47
- heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)
48
- keepalive: bool, enable keepalive
49
- vhost: str, virtual host name
50
- auto_decode: bool, automatically decode message bodies
51
- encoding: str, text encoding for messages
52
- auto_content_length: bool, automatically set content-length header
53
- heart_beat_receive_scale: float, heartbeat timeout scale factor
54
- bind_host_port: tuple, local bind address
55
- ws: WebSocket connection object
56
- ws_path: str, WebSocket path
57
- header: dict, WebSocket headers
58
- binary_mode: bool, use binary WebSocket frames
59
"""
60
61
def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
62
"""
63
Connect to STOMP broker via WebSocket.
64
65
Parameters:
66
- username: str, authentication username
67
- passcode: str, authentication password
68
- wait: bool, wait for connection confirmation
69
- headers: dict, additional connection headers
70
- **keyword_headers: additional headers as keyword arguments
71
"""
72
73
def disconnect(self, receipt=None, headers=None, **keyword_headers):
74
"""
75
Disconnect from STOMP broker.
76
77
Parameters:
78
- receipt: str, receipt ID for disconnect confirmation
79
- headers: dict, additional disconnect headers
80
- **keyword_headers: additional headers as keyword arguments
81
"""
82
83
def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
84
"""
85
Send message via WebSocket transport.
86
87
Parameters:
88
- body: str, message body
89
- destination: str, destination queue/topic
90
- content_type: str, message content type
91
- headers: dict, message headers
92
- **keyword_headers: additional headers as keyword arguments
93
"""
94
95
def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
96
"""
97
Subscribe to destination via WebSocket.
98
99
Parameters:
100
- destination: str, destination queue/topic
101
- id: str, subscription ID
102
- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')
103
- headers: dict, subscription headers
104
- **keyword_headers: additional headers as keyword arguments
105
"""
106
107
def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
108
"""
109
Unsubscribe from destination.
110
111
Parameters:
112
- destination: str, destination to unsubscribe from
113
- id: str, subscription ID to unsubscribe
114
- headers: dict, unsubscribe headers
115
- **keyword_headers: additional headers as keyword arguments
116
"""
117
118
def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
119
"""
120
Acknowledge message.
121
122
Parameters:
123
- id: str, message ID to acknowledge
124
- subscription: str, subscription ID
125
- transaction: str, transaction ID
126
- headers: dict, ack headers
127
- **keyword_headers: additional headers as keyword arguments
128
"""
129
130
def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
131
"""
132
Negative acknowledge message.
133
134
Parameters:
135
- id: str, message ID to nack
136
- subscription: str, subscription ID
137
- transaction: str, transaction ID
138
- headers: dict, nack headers
139
- **keyword_headers: additional headers as keyword arguments
140
"""
141
```
142
143
### WebSocket Transport
144
145
Low-level WebSocket transport implementation for STOMP protocol.
146
147
```python { .api }
148
class WSTransport:
149
def __init__(self,
150
url,
151
auto_decode=True,
152
encoding="utf-8",
153
is_eol_fc=None,
154
**kwargs):
155
"""
156
Initialize WebSocket transport.
157
158
Parameters:
159
- url: str, WebSocket URL
160
- auto_decode: bool, automatically decode message bodies
161
- encoding: str, text encoding for messages
162
- is_eol_fc: callable, end-of-line detection function
163
- **kwargs: additional WebSocket parameters
164
"""
165
166
def start(self):
167
"""Start WebSocket connection."""
168
169
def stop(self, timeout=None):
170
"""
171
Stop WebSocket connection.
172
173
Parameters:
174
- timeout: float, stop timeout in seconds
175
"""
176
177
def send(self, frame):
178
"""
179
Send STOMP frame via WebSocket.
180
181
Parameters:
182
- frame: Frame, STOMP frame to send
183
"""
184
185
def receive(self):
186
"""
187
Receive data from WebSocket.
188
189
Returns:
190
bytes: received data
191
"""
192
193
def is_connected(self) -> bool:
194
"""
195
Check if WebSocket is connected.
196
197
Returns:
198
bool: True if connected, False otherwise
199
"""
200
```
201
202
## Usage Examples
203
204
### Basic WebSocket Connection
205
206
```python
207
import stomp
208
209
# Create WebSocket connection
210
ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')
211
212
# Set up message handler
213
class WSMessageHandler(stomp.ConnectionListener):
214
def on_message(self, frame):
215
print(f"WebSocket message: {frame.body}")
216
217
def on_error(self, frame):
218
print(f"WebSocket error: {frame.body}")
219
220
handler = WSMessageHandler()
221
ws_conn.set_listener('handler', handler)
222
223
# Connect and use
224
ws_conn.connect('user', 'password', wait=True)
225
ws_conn.subscribe('/topic/updates', id=1)
226
ws_conn.send(body='Hello via WebSocket', destination='/topic/chat')
227
228
# Keep connection alive
229
import time
230
time.sleep(10)
231
232
ws_conn.disconnect()
233
```
234
235
### Secure WebSocket (WSS)
236
237
```python
238
import stomp
239
240
# Create secure WebSocket connection
241
ws_conn = stomp.WSConnection('wss://secure-broker.example.com/stomp')
242
243
# WebSocket-specific options
244
ws_conn = stomp.WSConnection(
245
'wss://broker.example.com/stomp',
246
heartbeats=(10000, 10000), # 10 second heartbeats
247
timeout=30, # 30 second timeout
248
reconnect_attempts_max=5 # Max 5 reconnection attempts
249
)
250
251
ws_conn.connect('username', 'password', wait=True)
252
```
253
254
### Browser-Compatible Usage Pattern
255
256
```python
257
import stomp
258
import json
259
260
class BrowserCompatibleHandler(stomp.ConnectionListener):
261
def __init__(self):
262
self.message_queue = []
263
264
def on_message(self, frame):
265
# Handle JSON messages typical in web applications
266
try:
267
data = json.loads(frame.body)
268
self.process_web_message(data, frame.headers)
269
except json.JSONDecodeError:
270
# Handle plain text messages
271
self.process_text_message(frame.body, frame.headers)
272
273
def process_web_message(self, data, headers):
274
# Process structured web messages
275
message_type = data.get('type', 'unknown')
276
277
if message_type == 'notification':
278
self.handle_notification(data)
279
elif message_type == 'update':
280
self.handle_update(data)
281
282
# Store for polling-based retrieval
283
self.message_queue.append({
284
'data': data,
285
'headers': headers,
286
'timestamp': time.time()
287
})
288
289
def process_text_message(self, body, headers):
290
# Handle plain text messages
291
self.message_queue.append({
292
'body': body,
293
'headers': headers,
294
'timestamp': time.time()
295
})
296
297
def get_messages(self):
298
"""Get queued messages for web application."""
299
messages = self.message_queue[:]
300
self.message_queue.clear()
301
return messages
302
303
def handle_notification(self, data):
304
# Handle notification-type messages
305
pass
306
307
def handle_update(self, data):
308
# Handle update-type messages
309
pass
310
311
# Setup WebSocket connection for web app
312
ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')
313
handler = BrowserCompatibleHandler()
314
ws_conn.set_listener('web_handler', handler)
315
316
# Connect and subscribe to web-friendly topics
317
ws_conn.connect('webapp_user', 'webapp_pass', wait=True)
318
ws_conn.subscribe('/topic/notifications', id=1)
319
ws_conn.subscribe('/topic/updates', id=2)
320
321
# Send JSON message
322
message_data = {
323
'type': 'user_action',
324
'action': 'login',
325
'user_id': '12345',
326
'timestamp': time.time()
327
}
328
329
ws_conn.send(
330
body=json.dumps(message_data),
331
destination='/topic/user_actions',
332
content_type='application/json'
333
)
334
335
# Periodic message retrieval pattern
336
def get_new_messages():
337
return handler.get_messages()
338
```
339
340
### WebSocket with Heartbeats
341
342
```python
343
import stomp
344
import threading
345
import time
346
347
class HeartbeatMonitor(stomp.ConnectionListener):
348
def __init__(self):
349
self.last_heartbeat = time.time()
350
self.connection_healthy = True
351
352
def on_heartbeat(self):
353
self.last_heartbeat = time.time()
354
self.connection_healthy = True
355
print("WebSocket heartbeat received")
356
357
def on_heartbeat_timeout(self):
358
self.connection_healthy = False
359
print("WebSocket heartbeat timeout - connection may be unhealthy")
360
361
def monitor_connection(self):
362
"""Monitor connection health based on heartbeats."""
363
while True:
364
time.sleep(5) # Check every 5 seconds
365
366
if time.time() - self.last_heartbeat > 30: # 30 second threshold
367
print("Connection appears stale")
368
self.connection_healthy = False
369
370
if not self.connection_healthy:
371
print("Connection health check failed")
372
# Trigger reconnection logic
373
break
374
375
# Create WebSocket connection with heartbeats
376
ws_conn = stomp.WSConnection(
377
'ws://localhost:61614/stomp',
378
heartbeats=(10000, 10000) # 10 second heartbeats
379
)
380
381
monitor = HeartbeatMonitor()
382
ws_conn.set_listener('monitor', monitor)
383
384
# Start monitoring in background
385
monitor_thread = threading.Thread(target=monitor.monitor_connection)
386
monitor_thread.daemon = True
387
monitor_thread.start()
388
389
ws_conn.connect('user', 'password', wait=True)
390
```
391
392
### WebSocket Error Handling
393
394
```python
395
import stomp
396
import time
397
398
class WebSocketErrorHandler(stomp.ConnectionListener):
399
def __init__(self, connection):
400
self.connection = connection
401
self.reconnect_attempts = 0
402
self.max_reconnect_attempts = 5
403
404
def on_error(self, frame):
405
error_msg = frame.body
406
print(f"WebSocket STOMP error: {error_msg}")
407
408
# Handle specific WebSocket errors
409
if 'connection refused' in error_msg.lower():
410
self.handle_connection_refused()
411
elif 'unauthorized' in error_msg.lower():
412
self.handle_unauthorized()
413
else:
414
self.handle_generic_error(error_msg)
415
416
def on_disconnected(self):
417
print("WebSocket disconnected")
418
419
if self.reconnect_attempts < self.max_reconnect_attempts:
420
self.attempt_reconnect()
421
else:
422
print("Max reconnection attempts reached")
423
424
def handle_connection_refused(self):
425
print("WebSocket connection refused - broker may be down")
426
time.sleep(5) # Wait before retry
427
428
def handle_unauthorized(self):
429
print("WebSocket authentication failed")
430
# Don't auto-reconnect on auth failures
431
self.max_reconnect_attempts = 0
432
433
def handle_generic_error(self, error_msg):
434
print(f"Generic WebSocket error: {error_msg}")
435
436
def attempt_reconnect(self):
437
self.reconnect_attempts += 1
438
delay = min(self.reconnect_attempts * 2, 30) # Exponential backoff, max 30s
439
440
print(f"Attempting reconnection {self.reconnect_attempts}/{self.max_reconnect_attempts} in {delay}s")
441
time.sleep(delay)
442
443
try:
444
self.connection.connect('user', 'password', wait=True)
445
self.reconnect_attempts = 0 # Reset on successful connection
446
print("WebSocket reconnection successful")
447
except Exception as e:
448
print(f"Reconnection failed: {e}")
449
450
# Setup WebSocket with error handling
451
ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')
452
error_handler = WebSocketErrorHandler(ws_conn)
453
ws_conn.set_listener('error_handler', error_handler)
454
455
ws_conn.connect('user', 'password', wait=True)
456
```