0
# Types and Exceptions
1
2
Core types, the exception hierarchy, protocol constants, and the utility and logging functions shared by all library components.
3
4
## Capabilities
5
6
### Exception Hierarchy
7
8
Comprehensive exception classes for different error conditions in STOMP operations.
9
10
```python { .api }
11
class StompException(Exception):
    """
    Base exception for all stomp.py related errors.

    All stomp.py exceptions inherit from this base class, so catching
    StompException handles every library-specific error.
    """
17
18
class ConnectionClosedException(StompException):
    """
    Raised when connection is closed by the server.

    Typically occurs when:
    - Server shuts down gracefully
    - Connection idle timeout
    - Server-side connection limits reached
    """
27
28
class NotConnectedException(StompException):
    """
    Raised when an operation is attempted while not connected.

    Occurs when trying to:
    - Send messages without connection
    - Subscribe without connection
    - Perform operations on disconnected client
    """
37
38
class ConnectFailedException(StompException):
    """
    Raised when connection attempts exceed maximum retries.

    Indicates:
    - Network connectivity issues
    - Authentication failures
    - Server unavailability after max retry attempts
    """
47
48
class InterruptedException(StompException):
    """
    Raised when a data read operation is interrupted.

    Occurs during:
    - Thread interruption during I/O
    - Socket read interruption
    - Graceful shutdown scenarios
    """
57
```
58
59
### Core Data Types
60
61
Essential data structures for STOMP frame handling and message processing.
62
63
```python { .api }
64
class Frame:
    """
    STOMP frame representation containing command, headers, and body.

    Attributes:
    - cmd: str, STOMP command (CONNECT, SEND, MESSAGE, etc.)
    - headers: dict, frame headers as key-value pairs
    - body: str or bytes, frame body content
    """

    def __init__(self, cmd=None, headers=None, body=None):
        """
        Initialize STOMP frame.

        Parameters:
        - cmd: str, STOMP command
        - headers: dict, frame headers (a new empty dict is used when omitted)
        - body: str or bytes, frame body
        """
        self.cmd = cmd
        # Test against None rather than truthiness so that a caller-supplied
        # dict is stored as-is even when it is (still) empty.
        self.headers = headers if headers is not None else {}
        self.body = body

    def __str__(self) -> str:
        """
        String representation of frame.

        Returns:
        str: formatted frame representation
        """
93
```
94
95
### Protocol Constants
96
97
STOMP protocol constants for commands, headers, acknowledgment modes, and protocol versions.
98
99
```python { .api }
100
# STOMP Commands
CONNECT = "CONNECT"
STOMP = "STOMP"  # STOMP 1.2 connection command
CONNECTED = "CONNECTED"
DISCONNECT = "DISCONNECT"
SEND = "SEND"
SUBSCRIBE = "SUBSCRIBE"
UNSUBSCRIBE = "UNSUBSCRIBE"
ACK = "ACK"
NACK = "NACK"  # STOMP 1.1+
BEGIN = "BEGIN"
COMMIT = "COMMIT"
ABORT = "ABORT"
MESSAGE = "MESSAGE"
RECEIPT = "RECEIPT"
ERROR = "ERROR"

# Standard Headers
CONTENT_LENGTH = "content-length"
CONTENT_TYPE = "content-type"
DESTINATION = "destination"
MESSAGE_ID = "message-id"
SUBSCRIPTION = "subscription"
TRANSACTION = "transaction"
RECEIPT_ID = "receipt-id"
ACK_MODE = "ack"
HOST = "host"
VERSION = "version"
HEARTBEAT = "heart-beat"
SESSION = "session"
SERVER = "server"

# Acknowledgment Modes
ACK_AUTO = "auto"
ACK_CLIENT = "client"
ACK_CLIENT_INDIVIDUAL = "client-individual"

# Protocol Versions
PROTOCOL_10 = "1.0"
PROTOCOL_11 = "1.1"
PROTOCOL_12 = "1.2"

# STOMP Commands (CMD_-prefixed names for the client-side commands above)
CMD_CONNECT = "CONNECT"
CMD_STOMP = "STOMP"
CMD_DISCONNECT = "DISCONNECT"
CMD_SEND = "SEND"
CMD_SUBSCRIBE = "SUBSCRIBE"
CMD_UNSUBSCRIBE = "UNSUBSCRIBE"
CMD_ACK = "ACK"
CMD_NACK = "NACK"
CMD_BEGIN = "BEGIN"
CMD_COMMIT = "COMMIT"
CMD_ABORT = "ABORT"

# STOMP Headers (HDR_-prefixed names)
HDR_ACCEPT_VERSION = "accept-version"
HDR_HOST = "host"
HDR_LOGIN = "login"
HDR_PASSCODE = "passcode"
HDR_HEARTBEAT = "heart-beat"
HDR_DESTINATION = "destination"
HDR_CONTENT_TYPE = "content-type"
HDR_CONTENT_LENGTH = "content-length"
HDR_MESSAGE_ID = "message-id"
HDR_SUBSCRIPTION = "subscription"
HDR_ACK = "ack"
HDR_ID = "id"
HDR_TRANSACTION = "transaction"
HDR_RECEIPT = "receipt"
169
```
170
171
### Utility Functions
172
173
Core utility functions for frame processing, encoding, and protocol operations.
174
175
```python { .api }
176
def parse_frame(data):
    """
    Parse byte data into STOMP frame.

    Parameters:
    - data: bytes, raw frame data

    Returns:
    Frame: parsed STOMP frame
    """
186
187
def convert_frame(frame):
    """
    Convert Frame object to its encoded byte pieces.

    Parameters:
    - frame: Frame, frame to convert

    Returns:
    list: encoded pieces of the frame (bytes), in transmission order;
    use b"".join(...) on the result to obtain the full byte sequence
    """
197
198
def parse_headers(lines):
    """
    Parse header lines into dictionary.

    Parameters:
    - lines: list, header lines

    Returns:
    dict: parsed headers
    """
208
209
def encode(char):
    """
    Encode string to bytes using UTF-8.

    Parameters:
    - char: str, string to encode

    Returns:
    bytes: encoded bytes
    """
219
220
def decode(byte_data):
    """
    Decode bytes to string using UTF-8.

    Parameters:
    - byte_data: bytes, data to decode

    Returns:
    str: decoded string
    """
230
231
def merge_headers(headers1, headers2, *additional_headers):
    """
    Merge multiple header dictionaries.

    Parameters:
    - headers1: dict, first header set
    - headers2: dict, second header set
    - *additional_headers: additional header dictionaries

    Returns:
    dict: merged headers
    """
243
244
def clean_headers(headers):
    """
    Clean headers for logging (remove sensitive data).

    Parameters:
    - headers: dict, headers to clean

    Returns:
    dict: headers with sensitive values removed
    """
254
255
def get_uuid():
    """
    Generate unique identifier.

    Returns:
    str: UUID string
    """
262
263
def calculate_heartbeats(heartbeat_tuple, server_heartbeats):
    """
    Calculate negotiated heartbeat intervals.

    Parameters:
    - heartbeat_tuple: tuple, client (send_ms, receive_ms)
    - server_heartbeats: tuple, server (send_ms, receive_ms)

    Returns:
    tuple: negotiated (send_ms, receive_ms)
    """
274
275
def is_localhost(host):
    """
    Check if host is localhost.

    Parameters:
    - host: str, hostname to check

    Returns:
    bool: True if localhost, False otherwise
    """
285
286
def length(obj):
    """
    Null-safe length function.

    Parameters:
    - obj: any, object to measure

    Returns:
    int: length or 0 if None
    """
296
```
297
298
### Logging Functions
299
300
Configurable logging system for debugging and monitoring.
301
302
```python { .api }
303
def debug(msg, *args):
    """
    Log debug message.

    Parameters:
    - msg: str, debug message
    - *args: additional formatting arguments
    """
311
312
def info(msg, *args):
    """
    Log info message.

    Parameters:
    - msg: str, info message
    - *args: additional formatting arguments
    """
320
321
def warning(msg, *args):
    """
    Log warning message.

    Parameters:
    - msg: str, warning message
    - *args: additional formatting arguments
    """
329
330
def error(msg, *args):
    """
    Log error message.

    Parameters:
    - msg: str, error message
    - *args: additional formatting arguments
    """
338
339
def setLevel(level):
    """
    Set logging level.

    Parameters:
    - level: int, logging level (DEBUG, INFO, WARNING, ERROR)
    """
346
347
def isEnabledFor(level):
    """
    Check if logging level is enabled.

    Parameters:
    - level: int, logging level to check

    Returns:
    bool: True if level is enabled
    """
357
358
def log_to_stdout():
    """
    Configure logging to output to stdout.
    """
362
363
# Logging levels (same numeric values as the standard library `logging` module)
DEBUG = 10
INFO = 20
WARNING = 30
ERROR = 40
368
```
369
370
### Color Constants
371
372
ANSI color codes for terminal output formatting.
373
374
```python { .api }
375
GREEN = '\033[92m'      # Green text
RED = '\033[91m'        # Red text
BOLD = '\033[1m'        # Bold text
NO_COLOUR = '\033[0m'   # Reset formatting
379
```
380
381
### Utility Constants
382
383
Common constants and patterns used throughout the library.
384
385
```python { .api }
386
import re

NULL = b'\x00'  # Null byte terminator
LOCALHOST_NAMES = ['localhost', '127.0.0.1', '::1']  # Local host identifiers

# Regular expressions for frame parsing (raw byte patterns, so the regex
# engine rather than the Python literal interprets the \r and \n escapes)
HEADER_LINE_RE = re.compile(rb'^([^:]+):(.*)$')  # Header line pattern
PREAMBLE_END_RE = re.compile(rb'\r?\n\r?\n')     # Preamble end pattern
LINE_END_RE = re.compile(rb'\r?\n')              # Line end pattern

# Special frames
HEARTBEAT_FRAME = Frame(cmd=None, headers={}, body=None)  # Heartbeat frame
396
```
397
398
## Usage Examples
399
400
### Exception Handling
401
402
```python
403
import stomp
from stomp.exception import (
    ConnectFailedException,
    ConnectionClosedException,
    NotConnectedException,
    StompException,
)

conn = stomp.Connection([('localhost', 61613)])

# Handlers are ordered from most specific to most general: the subclasses of
# StompException must come before StompException itself.
try:
    conn.connect('user', 'wrong_password', wait=True)

except ConnectFailedException as e:
    print(f"Failed to connect after retries: {e}")
    # Handle connection failure

except NotConnectedException as e:
    print(f"Operation attempted while disconnected: {e}")
    # Handle disconnected state

except ConnectionClosedException as e:
    print(f"Connection closed by server: {e}")
    # Handle server-side disconnection

except StompException as e:
    print(f"General STOMP error: {e}")
    # Handle any other STOMP-related error

except Exception as e:
    print(f"Unexpected error: {e}")
    # Handle non-STOMP errors

else:
    # Connected successfully: release the connection when done
    conn.disconnect()
429
```
430
431
### Custom Frame Processing
432
433
```python
434
import stomp
from stomp.utils import Frame, parse_frame, convert_frame

# Create custom frame
custom_frame = Frame(
    cmd='SEND',
    headers={
        'destination': '/queue/custom',
        'content-type': 'application/json',
        'custom-header': 'custom-value',
    },
    body='{"message": "custom data"}',
)

# Convert to bytes for transmission.
# convert_frame() returns the encoded pieces of the frame; join them to get
# the single byte string that goes on the wire.
frame_bytes = b"".join(convert_frame(custom_frame))
print(f"Frame bytes: {frame_bytes}")

# Parse frame from bytes
parsed_frame = parse_frame(frame_bytes)
print(f"Parsed command: {parsed_frame.cmd}")
print(f"Parsed headers: {parsed_frame.headers}")
print(f"Parsed body: {parsed_frame.body}")

# Use with connection.
# send_frame() takes the command, headers and body separately (it builds the
# Frame itself), so unpack the custom frame rather than passing it whole.
conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'password', wait=True)
conn.send_frame(custom_frame.cmd, custom_frame.headers, custom_frame.body)
conn.disconnect()
463
```
464
465
### Logging Configuration
466
467
```python
468
import stomp
469
import stomp.logging as stomp_logging
470
471
# Configure logging level
stomp_logging.setLevel(stomp_logging.DEBUG)

# Enable stdout logging
stomp_logging.log_to_stdout()

# Check logging levels
if stomp_logging.isEnabledFor(stomp_logging.DEBUG):
    stomp_logging.debug("Debug logging enabled")
480
481
# Use in application
482
class LoggingListener(stomp.ConnectionListener):
    """Connection listener that reports connection events via stomp.logging.

    Values are passed as formatting arguments (the logging functions accept
    ``msg, *args``) instead of being interpolated up front with f-strings.
    """

    def on_connecting(self, host_and_port):
        stomp_logging.info("Connecting to %s", host_and_port)

    def on_connected(self, frame):
        stomp_logging.info("Successfully connected")

    def on_message(self, frame):
        stomp_logging.debug("Received message: %s", frame.body)

    def on_error(self, frame):
        stomp_logging.error("Error occurred: %s", frame.body)

    def on_disconnected(self):
        stomp_logging.warning("Connection lost")
497
498
conn = stomp.Connection([('localhost', 61613)])
listener = LoggingListener()  # a listener, not a logger object
conn.set_listener('logger', listener)
conn.connect('user', 'password', wait=True)
502
```
503
504
### Utility Functions Usage
505
506
```python
507
import stomp
from stomp.utils import (
    calculate_heartbeats,
    clean_headers,
    get_uuid,
    is_localhost,
    length,
    merge_headers,
)

# Generate unique identifiers
message_id = get_uuid()
transaction_id = get_uuid()

# Header manipulation
headers1 = {'content-type': 'text/plain', 'priority': '5'}
headers2 = {'correlation-id': message_id, 'reply-to': '/queue/replies'}
merged = merge_headers(headers1, headers2)

# Clean sensitive headers for logging
sensitive_headers = {'login': 'user', 'passcode': 'secret', 'destination': '/queue/test'}
clean = clean_headers(sensitive_headers)
print(f"Safe to log: {clean}")  # Sensitive values (e.g. the passcode) are not exposed

# Host checking
if is_localhost('127.0.0.1'):
    print("Using localhost connection")

# Length checking with null safety
safe_length = length(None)       # Returns 0
string_length = length("hello")  # Returns 5

# Heartbeat calculation
client_heartbeats = (10000, 10000)  # 10 second intervals
server_heartbeats = (5000, 15000)   # Server prefers different intervals
negotiated = calculate_heartbeats(client_heartbeats, server_heartbeats)
print(f"Negotiated heartbeats: {negotiated}")
537
```
538
539
### Error Recovery Patterns
540
541
```python
542
import stomp
543
from stomp.exception import *
544
import time
545
546
class RobustConnection:
    """Wrap a stomp.Connection with retrying connect, send and disconnect helpers.

    The credentials most recently passed to connect_with_retry() are remembered
    so that send_with_retry() can re-establish the session with the same login.
    """

    def __init__(self, hosts, max_retries=3):
        """
        Parameters:
        - hosts: list of (host, port) tuples handed to stomp.Connection
        - max_retries: int, maximum number of connection attempts
        """
        self.hosts = hosts
        self.max_retries = max_retries
        self.connection = None
        self.connected = False
        # (username, password) from the last connect_with_retry() call
        self._credentials = None

    def connect_with_retry(self, username, password):
        """Connect with automatic retry logic.

        Only ConnectFailedException is retried (with exponential backoff);
        any other error aborts immediately.

        Returns:
        bool: True once connected, False otherwise
        """
        self._credentials = (username, password)
        self.connected = False

        for attempt in range(1, self.max_retries + 1):
            try:
                self.connection = stomp.Connection(self.hosts)
                self.connection.connect(username, password, wait=True)

            except ConnectFailedException as e:
                print(f"Connection attempt {attempt} failed: {e}")
                if attempt < self.max_retries:
                    time.sleep(2 ** attempt)  # Exponential backoff

            except NotConnectedException as e:
                # Not a retryable condition: give up without claiming that
                # every attempt was used.
                print(f"Connection not established: {e}")
                return False

            except Exception as e:
                print(f"Unexpected error: {e}")
                return False

            else:
                self.connected = True
                print("Connection established")
                return True

        print(f"Failed to connect after {self.max_retries} attempts")
        return False

    def _reconnect(self):
        """Reconnect with the stored credentials; return True on success."""
        if self._credentials is None:
            print("No stored credentials, call connect_with_retry() first")
            return False
        return self.connect_with_retry(*self._credentials)

    def send_with_retry(self, body, destination, max_retries=3):
        """Send message with retry on failure.

        A lost connection is re-established at the start of the next attempt,
        so reconnecting never uses up a send attempt on its own.

        Parameters:
        - body: message body to send
        - destination: str, destination to send to
        - max_retries: int, maximum number of send attempts

        Returns:
        bool: True if the message was handed to the connection, False otherwise
        """
        for attempt in range(max_retries):
            if not self.connected:
                print("Not connected, attempting reconnect...")
                if not self._reconnect():
                    return False

            try:
                self.connection.send(body=body, destination=destination)
                return True

            except ConnectionClosedException:
                print("Connection closed, attempting reconnect...")
                self.connected = False

            except NotConnectedException:
                # The library reports the session as gone; make the local
                # flag agree so that the next attempt reconnects first.
                self.connected = False

            except Exception as e:
                print(f"Send attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    time.sleep(1)

        return False

    def disconnect(self):
        """Safe disconnect with error handling."""
        try:
            if self.connection and self.connected:
                self.connection.disconnect()
                print("Disconnected successfully")
        except Exception as e:
            print(f"Error during disconnect: {e}")
        finally:
            self.connected = False
625
626
# Usage
robust_conn = RobustConnection([('localhost', 61613), ('backup', 61613)])

if robust_conn.connect_with_retry('user', 'password'):
    try:
        # Send messages with automatic retry
        robust_conn.send_with_retry('Hello World', '/queue/test')
    finally:
        # Clean disconnect, even if sending raised
        robust_conn.disconnect()
635
```