# Event Handling

Listener-based event system for handling connection events, message delivery, error conditions, and protocol-specific events, with built-in listeners for common use cases like statistics tracking, debugging, and synchronous operations.

## Capabilities

### Base Connection Listener

Abstract base class defining the event handling interface for all connection events.

```python { .api }
class ConnectionListener:
    """Base listener interface; every callback is a no-op hook to override."""

    def on_connecting(self, host_and_port):
        """
        Called when TCP connection is established.

        Parameters:
        - host_and_port: tuple, (hostname, port) of connected broker
        """

    def on_connected(self, frame):
        """
        Called when STOMP connection is established.

        Parameters:
        - frame: Frame, CONNECTED frame from broker
        """

    def on_disconnecting(self):
        """
        Called before DISCONNECT frame is sent.
        """

    def on_disconnected(self):
        """
        Called when connection is lost or closed.
        """

    def on_heartbeat_timeout(self):
        """
        Called when heartbeat timeout occurs.
        """

    def on_before_message(self, frame):
        """
        Called before message processing.

        Parameters:
        - frame: Frame, message frame

        Returns:
        tuple: (headers, body) for processing, or None to skip
        """

    def on_message(self, frame):
        """
        Called when message is received.

        Parameters:
        - frame: Frame, message frame with headers and body
        """

    def on_receipt(self, frame):
        """
        Called when receipt confirmation is received.

        Parameters:
        - frame: Frame, receipt frame
        """

    def on_error(self, frame):
        """
        Called when error frame is received.

        Parameters:
        - frame: Frame, error frame with error details
        """

    def on_send(self, frame):
        """
        Called when frame is sent to broker.

        Parameters:
        - frame: Frame, outgoing frame
        """

    def on_heartbeat(self):
        """
        Called when heartbeat is received.
        """

    def on_receiver_loop_completed(self, frame):
        """
        Called when receiver loop completes.

        Parameters:
        - frame: Frame, final frame when receiver loop completes
        """
```

### Statistics Listener

Tracks connection statistics and metrics for monitoring and debugging.

```python { .api }
class StatsListener(ConnectionListener):
    """Listener that keeps simple counters of connection events."""

    def __init__(self):
        """Initialize statistics tracking listener."""
        self.errors = 0
        self.connections = 0
        self.disconnects = 0
        self.messages = 0
        self.messages_sent = 0
        self.heartbeat_timeouts = 0
        self.heartbeat_count = 0

    def __str__(self) -> str:
        """
        Get formatted statistics summary.

        Returns:
        str: formatted statistics
        """

    def on_connecting(self, host_and_port):
        """Increment connection counter."""

    def on_disconnected(self):
        """Increment disconnect counter."""

    def on_message(self, frame):
        """Increment message received counter."""

    def on_send(self, frame):
        """Increment message sent counter."""

    def on_error(self, frame):
        """Increment error counter."""

    def on_heartbeat_timeout(self):
        """Increment heartbeat timeout counter."""

    def on_heartbeat(self):
        """Increment heartbeat received counter."""
```

### Waiting Listener

Synchronous listener that waits for specific events like receipts or disconnection.

```python { .api }
class WaitingListener(ConnectionListener):
    """Listener that lets callers block until a receipt or disconnect arrives."""

    def __init__(self, receipt):
        """
        Initialize waiting listener for specific receipt.

        Parameters:
        - receipt: str, receipt ID to wait for
        """

    def wait_on_receipt(self, timeout=10):
        """
        Wait for receipt confirmation.

        Parameters:
        - timeout: float, timeout in seconds

        Returns:
        bool: True if receipt received, False if timeout
        """

    def wait_on_disconnected(self, timeout=10):
        """
        Wait for disconnection event.

        Parameters:
        - timeout: float, timeout in seconds

        Returns:
        bool: True if disconnected, False if timeout
        """

    def on_receipt(self, frame):
        """Handle receipt and signal waiting threads."""

    def on_disconnected(self):
        """Handle disconnection and signal waiting threads."""
```

### Printing Listener

Debug listener that prints all connection events to console or log.

```python { .api }
class PrintingListener(ConnectionListener):
    """Debug listener that prints each connection event it receives."""

    def __init__(self, print_to_log=False):
        """
        Initialize printing listener.

        Parameters:
        - print_to_log: bool, print to log instead of stdout
        """

    def on_connecting(self, host_and_port):
        """Print connecting event."""

    def on_connected(self, frame):
        """Print connected event with frame details."""

    def on_disconnecting(self):
        """Print disconnecting event."""

    def on_disconnected(self):
        """Print disconnected event."""

    def on_message(self, frame):
        """Print received message details."""

    def on_error(self, frame):
        """Print error details."""

    def on_send(self, frame):
        """Print sent frame details."""

    def on_receipt(self, frame):
        """Print receipt confirmation."""

    def on_heartbeat(self):
        """Print heartbeat event."""

    def on_heartbeat_timeout(self):
        """Print heartbeat timeout."""
```

### Test Listener

Combined listener for testing that includes statistics, waiting, and printing functionality.

```python { .api }
class TestListener(StatsListener, WaitingListener, PrintingListener):
    """Testing listener combining statistics, waiting, and printing behaviour."""

    def __init__(self, receipt=None, print_to_log=True):
        """
        Initialize test listener with combined functionality.

        Parameters:
        - receipt: str, receipt ID to wait for
        - print_to_log: bool, print events to log
        """
        self.message_list = []
        self.timestamp = None

    def wait_for_message(self, timeout=10):
        """
        Wait for next message.

        Parameters:
        - timeout: float, timeout in seconds

        Returns:
        Frame: received message frame or None if timeout
        """

    def get_latest_message(self):
        """
        Get most recently received message.

        Returns:
        Frame: latest message frame or None if no messages
        """

    def wait_for_heartbeat(self, timeout=10):
        """
        Wait for heartbeat.

        Parameters:
        - timeout: float, timeout in seconds

        Returns:
        bool: True if heartbeat received, False if timeout
        """

    def on_message(self, frame):
        """Store message in list and update timestamp."""
```

### Heartbeat Listener

Internal listener for managing STOMP heartbeat functionality.

```python { .api }
class HeartbeatListener(ConnectionListener):
    """Internal listener that manages STOMP heartbeat timers."""

    def __init__(self, transport, heartbeats, heart_beat_receive_scale=1.5):
        """
        Initialize heartbeat management.

        Parameters:
        - transport: Transport, connection transport
        - heartbeats: tuple, (send_ms, receive_ms) heartbeat intervals
        - heart_beat_receive_scale: float, receive timeout scale factor
        """

    def on_connected(self, frame):
        """Start heartbeat timers after connection."""

    def on_disconnected(self):
        """Stop heartbeat timers on disconnection."""

    def on_heartbeat(self):
        """Reset receive heartbeat timer."""

    def on_send(self, frame):
        """Update send heartbeat timer."""
```

## Usage Examples

### Custom Message Handler

```python
import stomp

class MessageHandler(stomp.ConnectionListener):
    """Dispatch incoming messages by their 'type' header and count them."""

    def __init__(self):
        self.processed_count = 0

    def on_message(self, frame):
        print(f"Processing message: {frame.body}")

        # Process message based on headers
        if frame.headers.get('type') == 'order':
            self.process_order(frame.body)
        elif frame.headers.get('type') == 'notification':
            self.send_notification(frame.body)

        # Counted for every message, including types with no handler above
        self.processed_count += 1

    def on_error(self, frame):
        print(f"Error occurred: {frame.body}")
        # Handle error conditions

    def process_order(self, order_data):
        # Order processing logic
        pass

    def send_notification(self, notification_data):
        # Notification logic
        pass

# Use custom handler
conn = stomp.Connection([('localhost', 61613)])
handler = MessageHandler()
conn.set_listener('message_handler', handler)
conn.connect('user', 'pass', wait=True)
conn.subscribe('/queue/orders', id=1)
```

### Multiple Listeners

```python
import stomp

conn = stomp.Connection([('localhost', 61613)])

# Add statistics tracking
stats = stomp.StatsListener()
conn.set_listener('stats', stats)

# Add debug printing
debug = stomp.PrintingListener(print_to_log=True)
conn.set_listener('debug', debug)

# Add custom business logic
class BusinessLogic(stomp.ConnectionListener):
    """Placeholder listener for application-specific message handling."""

    def on_message(self, frame):
        # Business processing
        pass

business = BusinessLogic()
conn.set_listener('business', business)

conn.connect('user', 'pass', wait=True)

# Check statistics
print(f"Messages received: {stats.messages}")
print(f"Errors: {stats.errors}")
```

### Synchronous Operations

```python
import stomp
import uuid

conn = stomp.Connection([('localhost', 61613)])

# Create waiting listener for receipt
receipt_id = str(uuid.uuid4())
waiter = stomp.WaitingListener(receipt_id)
conn.set_listener('waiter', waiter)

conn.connect('user', 'pass', wait=True)

# Send message with receipt and wait for confirmation
conn.send(
    body='Important message',
    destination='/queue/critical',
    receipt=receipt_id
)

# Wait for receipt confirmation
if waiter.wait_on_receipt(timeout=30):
    print("Message confirmed delivered")
else:
    print("Delivery confirmation timeout")

conn.disconnect()
416
```
417
418
### Error Handling
419
420
```python
421
import stomp

class ErrorHandler(stomp.ConnectionListener):
    """Route ERROR frames to a handler chosen by the frame's 'code' header."""

    def on_error(self, frame):
        error_msg = frame.body
        # Falls back to 'unknown' when the frame carries no 'code' header
        error_code = frame.headers.get('code', 'unknown')

        print(f"STOMP Error {error_code}: {error_msg}")

        # Handle specific error types
        if error_code == 'AUTHORIZATION_FAILED':
            self.handle_auth_error()
        elif error_code == 'DESTINATION_NOT_FOUND':
            self.handle_destination_error()
        else:
            self.handle_generic_error(error_code, error_msg)

    def on_heartbeat_timeout(self):
        print("Heartbeat timeout - connection may be lost")
        # Trigger reconnection logic

    def handle_auth_error(self):
        # Authentication error handling
        pass

    def handle_destination_error(self):
        # Destination error handling
        pass

    def handle_generic_error(self, code, message):
        # Generic error handling
        pass

conn = stomp.Connection([('localhost', 61613)])
error_handler = ErrorHandler()
conn.set_listener('error_handler', error_handler)
457
```