# Utilities and Helpers

Core utility functions, constants, logging configuration, and helper methods that support stomp.py's internal operations and provide convenient functionality for advanced use cases.

## Capabilities

### Frame Processing Utilities

Low-level frame manipulation and conversion functions for custom protocol handling.

```python { .api }
def convert_frame(frame):
    """
    Convert Frame object to transmission format.

    Parameters:
    - frame: Frame, STOMP frame object to convert

    Returns:
    bytes: encoded frame ready for network transmission

    Handles:
    - Header encoding and escaping
    - Body encoding with proper content-length
    - Protocol version specific formatting
    """

def parse_headers(lines, offset=0):
    """
    Parse STOMP headers from frame lines.

    Parameters:
    - lines: list, frame lines to parse
    - offset: int, starting line offset for headers

    Returns:
    dict: parsed headers as key-value pairs

    Handles:
    - Header unescaping per protocol version
    - Duplicate header handling
    - Invalid header format detection
    """

def pack(pieces):
    """
    Join byte sequences efficiently.

    Parameters:
    - pieces: iterable, sequence of bytes objects

    Returns:
    bytes: concatenated byte sequence

    Optimized for frame assembly and network I/O.
    """

def join(chars):
    """
    Join character sequences efficiently.

    Parameters:
    - chars: iterable, sequence of character strings

    Returns:
    str: concatenated string

    Optimized for header and command processing.
    """
```

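The header escaping mentioned for `convert_frame` and `parse_headers` can be illustrated in isolation. The following is a minimal standalone sketch of the STOMP 1.2 escape rules (backslash, carriage return, line feed, colon). The names `escape_header` and `unescape_header` are hypothetical and chosen for illustration; they are not stomp.py functions, and the library's own handling may differ in detail.

```python
import re

# STOMP 1.2 header escapes: each special character maps to a two-character sequence.
_ESCAPES = {"\\": "\\\\", "\r": "\\r", "\n": "\\n", ":": "\\c"}
_UNESCAPES = {v: k for k, v in _ESCAPES.items()}


def escape_header(value):
    """Escape a header name or value for transmission (illustrative helper)."""
    return "".join(_ESCAPES.get(ch, ch) for ch in value)


def unescape_header(value):
    """Reverse escape_header; raise ValueError on an undefined escape sequence."""
    def replace(match):
        seq = match.group(0)
        if seq not in _UNESCAPES:
            raise ValueError(f"undefined escape sequence: {seq!r}")
        return _UNESCAPES[seq]

    # Consume each backslash together with the character after it, so an
    # escaped backslash followed by a literal "n" is not misread as a newline.
    return re.sub(r"\\(.|$)", replace, value, flags=re.DOTALL)


original = "a:b\nc\\d"
escaped = escape_header(original)
print(escaped)
print(unescape_header(escaped) == original)
```

Decoding pairwise, rather than with chained `str.replace` calls, avoids the ordering bugs that appear when one escape sequence overlaps another.
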
### Threading and Execution Utilities

Thread management helpers for customizing stomp.py's concurrency model, together with small transport helpers for end-of-line detection, heartbeat negotiation, and errno extraction.

```python { .api }
def default_create_thread(callback):
    """
    Default thread creation function for receiver loops.

    Parameters:
    - callback: callable, function to execute in new thread

    Returns:
    Thread: started daemon thread

    Creates daemon threads for background operations:
    - Message receiver loops
    - Heartbeat timers
    - Connection monitoring
    """

def is_eol_default(line):
    """
    Default end-of-line detection for frame parsing.

    Parameters:
    - line: bytes, line to check for end-of-line marker

    Returns:
    bool: True if line represents end-of-line

    Protocol-agnostic EOL detection for frame boundaries.
    """

def calculate_heartbeats(shb, chb):
    """
    Calculate negotiated heartbeat intervals between client and server.

    Parameters:
    - shb: tuple, server's (send, receive) heartbeat intervals (ms)
    - chb: tuple, client's (send, receive) heartbeat intervals (ms)

    Returns:
    tuple: (negotiated_send_ms, negotiated_receive_ms)

    Implements STOMP heartbeat negotiation algorithm:
    - 0 on either side of a direction means no heartbeat in that direction
    - Otherwise each direction uses the maximum of the client and server values
    """

def get_errno(exception):
    """
    Extract errno from socket or OS exception.

    Parameters:
    - exception: Exception, OS or socket exception

    Returns:
    int: errno value or None if not available

    Cross-platform errno extraction for error handling.
    """
```

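The negotiation rule described for `calculate_heartbeats` can be sketched without the library. This standalone version follows the STOMP 1.1/1.2 heart-beat rule: a direction is disabled when either side reports 0, otherwise the larger interval is used. `negotiate_heartbeats` is a hypothetical name, and passing each side as a `(send, receive)` tuple is an assumption made for this sketch.

```python
from typing import Tuple

Heartbeat = Tuple[int, int]  # (send interval ms, receive interval ms)


def negotiate_heartbeats(server: Heartbeat, client: Heartbeat) -> Heartbeat:
    """Return (client_send_ms, client_receive_ms) after negotiation.

    Hypothetical helper: a direction is disabled (0) if either side reports 0
    for it; otherwise the larger of the two intervals is used.
    """
    server_send, server_receive = server
    client_send, client_receive = client

    # Client -> server: what the client can send vs. what the server wants to receive.
    send = max(client_send, server_receive) if client_send and server_receive else 0
    # Server -> client: what the client wants to receive vs. what the server can send.
    receive = max(client_receive, server_send) if client_receive and server_send else 0
    return send, receive


print(negotiate_heartbeats(server=(5000, 8000), client=(10000, 15000)))  # (10000, 15000)
print(negotiate_heartbeats(server=(0, 8000), client=(10000, 15000)))     # (10000, 0)
```

In the second call the server cannot send heartbeats, so the client receives none, while the client-to-server direction is still negotiated.
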
### Protocol Constants

STOMP protocol constants for commands, headers, acknowledgment modes, and protocol versions.

```python { .api }
# STOMP Commands
CONNECT = "CONNECT"
STOMP = "STOMP"  # Alternative to CONNECT, STOMP 1.1+
CONNECTED = "CONNECTED"
DISCONNECT = "DISCONNECT"
SEND = "SEND"
SUBSCRIBE = "SUBSCRIBE"
UNSUBSCRIBE = "UNSUBSCRIBE"
ACK = "ACK"
NACK = "NACK"  # STOMP 1.1+
BEGIN = "BEGIN"
COMMIT = "COMMIT"
ABORT = "ABORT"
MESSAGE = "MESSAGE"
RECEIPT = "RECEIPT"
ERROR = "ERROR"

# Standard Headers
CONTENT_LENGTH = "content-length"
CONTENT_TYPE = "content-type"
DESTINATION = "destination"
MESSAGE_ID = "message-id"
SUBSCRIPTION = "subscription"
TRANSACTION = "transaction"
RECEIPT_ID = "receipt-id"
ACK_MODE = "ack"
HOST = "host"
VERSION = "version"
HEARTBEAT = "heart-beat"
SESSION = "session"
SERVER = "server"

# Acknowledgment Modes
ACK_AUTO = "auto"
ACK_CLIENT = "client"
ACK_CLIENT_INDIVIDUAL = "client-individual"

# Protocol Versions
PROTOCOL_10 = "1.0"
PROTOCOL_11 = "1.1"
PROTOCOL_12 = "1.2"
```

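To show how these constants fit together on the wire, here is a minimal sketch that renders a SUBSCRIBE frame as text. `build_frame` is a hypothetical helper written for this example, not part of stomp.py, and it deliberately skips header escaping. The constants are redefined locally so the snippet runs on its own.

```python
# Local copies of a few constants from the listing above, so the sketch runs
# without stomp.py installed.
SUBSCRIBE = "SUBSCRIBE"
DESTINATION = "destination"
ACK_MODE = "ack"
ACK_CLIENT_INDIVIDUAL = "client-individual"


def build_frame(command, headers, body=""):
    """Render a frame as text: command, header lines, blank line, body, NUL.

    Hypothetical helper for illustration; it does no header escaping.
    """
    header_lines = [f"{name}:{value}" for name, value in headers.items()]
    return "\n".join([command, *header_lines, "", body]) + "\x00"


frame = build_frame(
    SUBSCRIBE,
    {"id": "sub-0", DESTINATION: "/queue/test", ACK_MODE: ACK_CLIENT_INDIVIDUAL},
)
print(repr(frame))
```

Using the named constants instead of string literals keeps header names such as `heart-beat` and `receipt-id` from being mistyped.
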
### Logging System

Configurable logging setup for debugging and monitoring stomp.py operations.

```python { .api }
def log_to_stdout(verbose_logging=True):
    """
    Configure logging output to stdout with optional verbosity.

    Parameters:
    - verbose_logging: bool, enable detailed debug logging

    When verbose_logging=True:
    - Shows all frame transmissions
    - Logs connection state changes
    - Reports heartbeat activity
    - Displays protocol negotiation details

    When verbose_logging=False:
    - Shows only errors and warnings
    - Logs connection events
    - Reports critical failures
    """

def log_to_file(filename, verbose_logging=True):
    """
    Configure logging output to file.

    Parameters:
    - filename: str, path to log file
    - verbose_logging: bool, enable detailed debug logging

    Creates rotating log file with:
    - Timestamped entries
    - Thread identification
    - Structured frame logging
    - Error stack traces
    """

def get_logger(name):
    """
    Get named logger for stomp.py components.

    Parameters:
    - name: str, logger name (e.g., 'stomp.connection', 'stomp.transport')

    Returns:
    Logger: configured logger instance

    Available loggers:
    - stomp.connection: connection lifecycle events
    - stomp.transport: low-level transport operations
    - stomp.protocol: STOMP protocol processing
    - stomp.heartbeat: heartbeat management
    - stomp.listener: listener callback execution
    """
```

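The verbose and non-verbose modes described above correspond roughly to choosing a log level on a standard-library logger. The sketch below shows that idea using only Python's `logging` module. `configure_stdout_logging` and the logger name `example.transport` are hypothetical and do not reflect how stomp.py configures its own loggers.

```python
import logging
import sys


def configure_stdout_logging(logger_name, verbose=True):
    """Attach a stdout handler to the named logger (hypothetical helper).

    verbose=True sets DEBUG level; verbose=False sets WARNING level.
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG if verbose else logging.WARNING)

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(threadName)s %(name)s %(levelname)s %(message)s")
    )
    logger.addHandler(handler)
    return logger


log = configure_stdout_logging("example.transport", verbose=False)
log.debug("suppressed at WARNING level")
log.warning("shown on stdout")
```

Note that calling the helper twice for the same logger name adds a second handler and duplicates output, so a real setup function would normally guard against that.
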
### Color Constants

Terminal color constants for CLI output formatting.

```python { .api }
# ANSI Color Codes
GREEN = "\33[32m"
RED = "\33[31m"
YELLOW = "\33[33m"
BLUE = "\33[34m"
MAGENTA = "\33[35m"
CYAN = "\33[36m"
WHITE = "\33[37m"

# Text Formatting
BOLD = "\33[1m"
UNDERLINE = "\33[4m"
ITALIC = "\33[3m"

# Reset
NO_COLOUR = "\33[0m"
RESET = NO_COLOUR

def colorize(text, color):
    """
    Apply color formatting to text.

    Parameters:
    - text: str, text to colorize
    - color: str, color constant (GREEN, RED, etc.)

    Returns:
    str: colorized text with reset code

    Automatically handles color reset to prevent bleeding.
    """
```

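A function with the behavior described for `colorize` can be very small. The following is one plausible sketch, assuming it only prefixes the color code and appends the reset code. It is a guess at the described behavior rather than the library's actual code, and the two constants are copied locally so the snippet is self-contained.

```python
# Local copies of two constants from the listing above.
GREEN = "\33[32m"
NO_COLOUR = "\33[0m"


def colorize(text, color):
    """Wrap text in a color code and append the reset code."""
    return f"{color}{text}{NO_COLOUR}"


print(colorize("connected", GREEN))        # rendered in green on ANSI terminals
print(repr(colorize("connected", GREEN)))  # shows the raw escape sequences
```

Appending the reset code on every call is what prevents the color from bleeding into later terminal output.
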
### Testing Utilities

Helper functions and classes for testing STOMP applications.

```python { .api }
class TestListener(StatsListener, WaitingListener, PrintingListener):
    """
    Combined listener for testing with message queuing and statistics.

    Inherits from:
    - StatsListener: connection statistics tracking
    - WaitingListener: synchronous wait operations
    - PrintingListener: debug output functionality
    """

    def __init__(self, receipt=None, print_to_log=True):
        """
        Initialize test listener with combined functionality.

        Parameters:
        - receipt: str, receipt ID to wait for (optional)
        - print_to_log: bool, print events to log instead of stdout
        """
        self.message_list = []
        self.timestamp = None

    def wait_for_message(self, timeout=10):
        """
        Wait for next message with timeout.

        Parameters:
        - timeout: float, maximum wait time in seconds

        Returns:
        Frame: received message frame or None if timeout

        Blocks until message arrives or timeout expires.
        """

    def get_latest_message(self):
        """
        Get most recently received message.

        Returns:
        Frame: latest message frame or None if no messages

        Non-blocking access to latest message for assertions.
        """

    def wait_for_heartbeat(self, timeout=10):
        """
        Wait for heartbeat with timeout.

        Parameters:
        - timeout: float, maximum wait time in seconds

        Returns:
        bool: True if heartbeat received, False if timeout

        Useful for testing heartbeat negotiation and timing.
        """

    def clear_messages(self):
        """
        Clear stored message list.

        Resets message_list to empty for fresh test runs.
        """

def create_test_connection(host='localhost', port=61613, **kwargs):
    """
    Create connection configured for testing.

    Parameters:
    - host: str, test broker hostname
    - port: int, test broker port
    - **kwargs: additional connection parameters

    Returns:
    Connection: configured test connection

    Pre-configured with:
    - Short timeouts for fast test execution
    - Reduced retry attempts
    - Test-friendly heartbeat settings
    """
```

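The blocking behavior described for `wait_for_message` is typically built on a condition variable shared between the receiver thread and the test thread. The sketch below illustrates that pattern with `threading.Condition`. `MessageWaiter` is a hypothetical stand-in, not the library's `TestListener`, and unlike the description above it returns as soon as any message is stored rather than strictly the next one.

```python
import threading


class MessageWaiter:
    """Hypothetical stand-in for the waiting behavior described above."""

    def __init__(self):
        self.message_list = []
        self._condition = threading.Condition()

    def on_message(self, frame):
        # Called from the receiver thread: store the frame and wake any waiter.
        with self._condition:
            self.message_list.append(frame)
            self._condition.notify_all()

    def wait_for_message(self, timeout=10):
        # Block until at least one message is stored or the timeout expires.
        with self._condition:
            arrived = self._condition.wait_for(lambda: self.message_list, timeout)
            return self.message_list[-1] if arrived else None


waiter = MessageWaiter()
# Simulate a message arriving from another thread after 100 ms.
threading.Timer(0.1, waiter.on_message, args=("test message",)).start()
print(waiter.wait_for_message(timeout=2.0))            # test message
print(MessageWaiter().wait_for_message(timeout=0.05))  # None
```

Returning `None` on timeout, instead of raising, lets a test assert on the result directly with `assertIsNotNone`.
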
## Usage Examples

### Custom Frame Processing

```python
import stomp
from stomp.utils import convert_frame, parse_headers

# Custom frame manipulation
class CustomProtocolHandler(stomp.ConnectionListener):
    def on_send(self, frame):
        # Intercept outgoing frames for custom processing
        raw_frame = convert_frame(frame)
        print(f"Sending raw frame: {raw_frame}")

        # Could modify frame here before transmission
        return frame

    def on_message(self, frame):
        # Parse custom headers
        custom_headers = parse_headers([
            "custom-property:value1",
            "app-specific:value2"
        ])

        print(f"Custom headers: {custom_headers}")
```

### Advanced Logging Configuration

```python
import stomp
import stomp.logging as logging

# Enable detailed logging
logging.log_to_stdout(verbose_logging=True)

# Create connection with full logging
conn = stomp.Connection([('broker.com', 61613)])

# Get specific logger for custom output
transport_logger = logging.get_logger('stomp.transport')
transport_logger.info("Custom transport message")

conn.connect('user', 'pass', wait=True)
# Will log all frame exchanges, heartbeats, etc.
```

### Testing Framework Integration

```python
import unittest

import stomp
from stomp.utils import TestListener, create_test_connection

class STOMPIntegrationTest(unittest.TestCase):
    def setUp(self):
        # Create test connection with fast timeouts
        self.conn = create_test_connection(
            timeout=5.0,
            reconnect_attempts_max=1
        )

        # Set up test listener
        self.test_listener = TestListener(print_to_log=True)
        self.conn.set_listener('test', self.test_listener)

        self.conn.connect('testuser', 'testpass', wait=True)

    def test_message_exchange(self):
        # Subscribe to the test queue
        self.conn.subscribe('/queue/test', id='test-sub')

        # Send test message
        self.conn.send(
            body='test message',
            destination='/queue/test'
        )

        # Wait for message with timeout
        message = self.test_listener.wait_for_message(timeout=5.0)
        self.assertIsNotNone(message)
        self.assertEqual(message.body, 'test message')

        # Check statistics
        stats = str(self.test_listener)
        self.assertIn('messages: 1', stats.lower())

    def tearDown(self):
        self.conn.disconnect()
```

### Custom Threading Integration

```python
import stomp
from stomp.utils import default_create_thread
import threading
import queue

# Custom thread pool for stomp.py operations
class ThreadPoolCreator:
    def __init__(self, pool_size=5):
        self.pool = []
        self.task_queue = queue.Queue()

        # Create worker threads
        for _ in range(pool_size):
            worker = threading.Thread(target=self._worker, daemon=True)
            worker.start()
            self.pool.append(worker)

    def _worker(self):
        while True:
            task = self.task_queue.get()
            try:
                if task is None:
                    break  # Sentinel: stop this worker
                task()
            except Exception as e:
                print(f"Task error: {e}")
            finally:
                # Runs for the sentinel too, so task_queue.join() cannot hang
                self.task_queue.task_done()

    def create_thread(self, callback):
        # Instead of creating new thread, queue the task
        self.task_queue.put(callback)
        return threading.current_thread()  # Return dummy thread

# Use custom thread creator
thread_creator = ThreadPoolCreator(pool_size=3)
conn = stomp.Connection([('broker.com', 61613)])
conn.override_threading(thread_creator.create_thread)

# Now all stomp.py background tasks use the thread pool
conn.connect('user', 'pass', wait=True)
```

### Heartbeat Calculation

```python
from stomp.utils import calculate_heartbeats

# Client wants to send heartbeats every 10 seconds
# Client wants to receive heartbeats every 15 seconds
client_send = 10000     # ms
client_receive = 15000  # ms

# Server advertises it can send every 5 seconds
# Server wants to receive every 8 seconds
server_send = 5000      # ms
server_receive = 8000   # ms

# Calculate negotiated heartbeats: pass the server's (send, receive) pair first,
# then the client's pair; the function takes the maximum in each direction
negotiated = calculate_heartbeats(
    (server_send, server_receive),
    (client_send, client_receive)
)

print(f"Negotiated heartbeats: {negotiated}")
# Result: (10000, 15000) - the larger (slower) interval wins in each direction
```

### Protocol Version Handling

```python
import stomp
from stomp.utils import PROTOCOL_10, PROTOCOL_11, PROTOCOL_12

def create_version_specific_connection(version):
    """Create connection for specific STOMP version"""

    if version == PROTOCOL_10:
        return stomp.Connection10([('broker.com', 61613)])
    elif version == PROTOCOL_11:
        return stomp.Connection11([('broker.com', 61613)])
    elif version == PROTOCOL_12:
        return stomp.Connection12([('broker.com', 61613)])
    else:
        raise ValueError(f"Unsupported version: {version}")

# Use version-specific features
conn_12 = create_version_specific_connection(PROTOCOL_12)
conn_12.connect('user', 'pass', wait=True)

# STOMP 1.1+ escapes newlines, colons, and backslashes in headers;
# STOMP 1.2 also escapes carriage returns
conn_12.send(
    body='Message with\nspecial\rcharacters',
    destination='/queue/test',
    headers={'custom-header': 'value\nwith\nlines'}
)
```