0
# Protocol Operations
1
2
Core STOMP protocol operations including message sending, queue subscription, transaction management, and acknowledgment handling across all supported protocol versions (1.0, 1.1, 1.2).
3
4
## Capabilities
5
6
### Message Operations
7
8
Core messaging functionality for sending and receiving messages through STOMP destinations.
9
10
```python { .api }
11
def send(self, destination, body='', content_type=None, headers=None, **keyword_headers):
12
"""
13
Send message to destination.
14
15
Parameters:
16
- destination: str, destination queue/topic (required)
17
- body: str or bytes, message body content
18
- content_type: str, MIME content type of message
19
- headers: dict, additional message headers
20
- **keyword_headers: additional headers as keyword arguments
21
22
Common headers:
23
- persistent: str, 'true' for persistent messages
24
- priority: str, message priority (0-9)
25
- expires: str, message expiration time
26
- correlation-id: str, correlation identifier
27
- reply-to: str, reply destination
28
- custom headers: any string key-value pairs
29
"""
30
31
def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
32
"""
33
Subscribe to destination for message delivery.
34
35
Parameters:
36
- destination: str, destination queue/topic to subscribe to
37
- id: str, unique subscription identifier (auto-generated if None)
38
- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')
39
- headers: dict, subscription headers
40
- **keyword_headers: additional headers as keyword arguments
41
42
Acknowledgment modes:
43
- 'auto': automatic acknowledgment (default)
44
- 'client': manual acknowledgment per subscription
45
- 'client-individual': manual acknowledgment per message
46
"""
47
48
def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
49
"""
50
Unsubscribe from destination.
51
52
Parameters:
53
- destination: str, destination to unsubscribe from (if no id)
54
- id: str, subscription ID to unsubscribe (preferred method)
55
- headers: dict, unsubscribe headers
56
- **keyword_headers: additional headers as keyword arguments
57
58
Note: Either destination or id must be provided
59
"""
60
```
61
62
### Message Acknowledgment
63
64
Manual message acknowledgment for reliable message processing.
65
66
```python { .api }
67
def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
68
"""
69
Acknowledge message processing (STOMP 1.0+).
70
71
Parameters:
72
- id: str, message ID to acknowledge (required)
73
- subscription: str, subscription ID (STOMP 1.1+ only)
74
- transaction: str, transaction ID if within transaction
75
- headers: dict, acknowledgment headers
76
- **keyword_headers: additional headers as keyword arguments
77
78
Used with ack modes 'client' and 'client-individual'
79
"""
80
81
def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
82
"""
83
Negative acknowledge message (STOMP 1.1+ only).
84
85
Parameters:
86
- id: str, message ID to nack (required)
87
- subscription: str, subscription ID (STOMP 1.1+ only)
88
- transaction: str, transaction ID if within transaction
89
- headers: dict, nack headers
90
- **keyword_headers: additional headers as keyword arguments
91
92
Signals message processing failure, may trigger redelivery
93
"""
94
```
95
96
### Transaction Management
97
98
Atomic transaction support for grouping multiple operations.
99
100
```python { .api }
101
def begin(self, transaction=None, headers=None, **keyword_headers):
102
"""
103
Begin transaction.
104
105
Parameters:
106
- transaction: str, transaction identifier (auto-generated if None)
107
- headers: dict, begin headers
108
- **keyword_headers: additional headers as keyword arguments
109
110
Returns transaction ID for use in subsequent operations
111
"""
112
113
def commit(self, transaction=None, headers=None, **keyword_headers):
114
"""
115
Commit transaction.
116
117
Parameters:
118
- transaction: str, transaction ID to commit (required)
119
- headers: dict, commit headers
120
- **keyword_headers: additional headers as keyword arguments
121
122
All operations within transaction are atomically applied
123
"""
124
125
def abort(self, transaction=None, headers=None, **keyword_headers):
126
"""
127
Abort transaction.
128
129
Parameters:
130
- transaction: str, transaction ID to abort (required)
131
- headers: dict, abort headers
132
- **keyword_headers: additional headers as keyword arguments
133
134
All operations within transaction are rolled back
135
"""
136
```
137
138
### Receipt Management
139
140
Receipt confirmations for reliable operation acknowledgment.
141
142
```python { .api }
143
def set_receipt(self, receipt_id, value):
144
"""
145
Set receipt handler for confirmation tracking.
146
147
Parameters:
148
- receipt_id: str, receipt identifier
149
- value: any, value associated with receipt
150
"""
151
```
152
153
### Frame Processing
154
155
Low-level STOMP frame operations for advanced usage.
156
157
```python { .api }
158
def send_frame(self, frame):
159
"""
160
Send raw STOMP frame.
161
162
Parameters:
163
- frame: Frame, raw STOMP frame to send
164
"""
165
166
class Frame:
167
"""
168
STOMP frame representation.
169
170
Attributes:
171
- cmd: str, STOMP command (CONNECT, SEND, etc.)
172
- headers: dict, frame headers
173
- body: str or bytes, frame body content
174
"""
175
def __init__(self, cmd=None, headers=None, body=None):
176
self.cmd = cmd
177
self.headers = headers or {}
178
self.body = body
179
```
180
181
## Usage Examples
182
183
### Basic Messaging
184
185
```python
186
import stomp
187
188
conn = stomp.Connection([('localhost', 61613)])
189
conn.connect('user', 'password', wait=True)
190
191
# Send simple message
192
conn.send(
193
body='Hello World',
194
destination='/queue/test'
195
)
196
197
# Send message with headers
198
conn.send(
199
body='Priority message',
200
destination='/queue/important',
201
headers={
202
'priority': '9',
203
'persistent': 'true',
204
'correlation-id': 'msg-001'
205
}
206
)
207
208
# Send JSON message
209
import json
210
data = {'user': 'john', 'action': 'login', 'timestamp': 1234567890}
211
conn.send(
212
body=json.dumps(data),
213
destination='/topic/events',
214
content_type='application/json'
215
)
216
217
conn.disconnect()
218
```
219
220
### Subscription Patterns
221
222
```python
223
import stomp
224
import time
225
226
class MessageProcessor(stomp.ConnectionListener):
227
def on_message(self, frame):
228
print(f"Received from {frame.headers.get('destination', 'unknown')}: {frame.body}")
229
230
conn = stomp.Connection([('localhost', 61613)])
231
processor = MessageProcessor()
232
conn.set_listener('processor', processor)
233
conn.connect('user', 'password', wait=True)
234
235
# Subscribe to queue with auto-acknowledgment
236
conn.subscribe('/queue/orders', id='orders-sub', ack='auto')
237
238
# Subscribe to topic with manual acknowledgment
239
conn.subscribe('/topic/notifications', id='notify-sub', ack='client')
240
241
# Subscribe with custom headers
242
conn.subscribe(
243
destination='/queue/priority',
244
id='priority-sub',
245
ack='client-individual',
246
headers={'selector': "priority > 5"}
247
)
248
249
# Keep connection alive
250
time.sleep(30)
251
252
# Unsubscribe
253
conn.unsubscribe(id='orders-sub')
254
conn.unsubscribe(id='notify-sub')
255
conn.unsubscribe(id='priority-sub')
256
257
conn.disconnect()
258
```
259
260
### Manual Acknowledgment
261
262
```python
263
import stomp
264
import time
265
266
class ManualAckProcessor(stomp.ConnectionListener):
267
def __init__(self, connection):
268
self.connection = connection
269
270
def on_message(self, frame):
271
message_id = frame.headers.get('message-id')
272
subscription_id = frame.headers.get('subscription')
273
274
try:
275
# Process message
276
self.process_message(frame.body)
277
278
# Acknowledge successful processing
279
self.connection.ack(message_id, subscription_id)
280
print(f"Acknowledged message {message_id}")
281
282
except Exception as e:
283
print(f"Processing failed: {e}")
284
285
# Negative acknowledge (STOMP 1.1+)
286
if hasattr(self.connection, 'nack'):
287
self.connection.nack(message_id, subscription_id)
288
print(f"Nacked message {message_id}")
289
290
def process_message(self, body):
291
# Simulate message processing
292
if 'error' in body.lower():
293
raise ValueError("Simulated processing error")
294
295
print(f"Successfully processed: {body}")
296
297
conn = stomp.Connection11([('localhost', 61613)]) # STOMP 1.1 for NACK support
298
processor = ManualAckProcessor(conn)
299
conn.set_listener('processor', processor)
300
conn.connect('user', 'password', wait=True)
301
302
# Subscribe with client-individual acknowledgment
303
conn.subscribe('/queue/work', id='work-sub', ack='client-individual')
304
305
time.sleep(60) # Process messages for 1 minute
306
conn.disconnect()
307
```
308
309
### Transaction Example
310
311
```python
312
import stomp
313
import uuid
314
315
conn = stomp.Connection([('localhost', 61613)])
316
conn.connect('user', 'password', wait=True)
317
318
# Begin transaction
319
tx_id = str(uuid.uuid4())
320
conn.begin(tx_id)
321
322
try:
323
# Send multiple messages in transaction
324
conn.send(
325
body='Order created',
326
destination='/queue/orders',
327
transaction=tx_id
328
)
329
330
conn.send(
331
body='Inventory updated',
332
destination='/queue/inventory',
333
transaction=tx_id
334
)
335
336
conn.send(
337
body='Email notification',
338
destination='/queue/notifications',
339
transaction=tx_id
340
)
341
342
# Simulate business logic
343
if all_operations_successful():
344
conn.commit(tx_id)
345
print("Transaction committed successfully")
346
else:
347
conn.abort(tx_id)
348
print("Transaction aborted")
349
350
except Exception as e:
351
# Abort transaction on error
352
conn.abort(tx_id)
353
print(f"Transaction aborted due to error: {e}")
354
355
conn.disconnect()
356
357
def all_operations_successful():
358
# Simulate business validation
359
return True
360
```
361
362
### Receipt Confirmation
363
364
```python
365
import stomp
366
import uuid
367
import time
368
369
class ReceiptHandler(stomp.ConnectionListener):
370
def __init__(self):
371
self.pending_receipts = {}
372
373
def on_receipt(self, frame):
374
receipt_id = frame.headers.get('receipt-id')
375
if receipt_id in self.pending_receipts:
376
print(f"Receipt confirmed: {receipt_id}")
377
self.pending_receipts[receipt_id] = True
378
379
def wait_for_receipt(self, receipt_id, timeout=10):
380
"""Wait for specific receipt confirmation."""
381
end_time = time.time() + timeout
382
383
while time.time() < end_time:
384
if self.pending_receipts.get(receipt_id):
385
return True
386
time.sleep(0.1)
387
388
return False
389
390
conn = stomp.Connection([('localhost', 61613)])
391
receipt_handler = ReceiptHandler()
392
conn.set_listener('receipt_handler', receipt_handler)
393
conn.connect('user', 'password', wait=True)
394
395
# Send message with receipt
396
receipt_id = str(uuid.uuid4())
397
receipt_handler.pending_receipts[receipt_id] = False
398
399
conn.send(
400
body='Important message',
401
destination='/queue/critical',
402
receipt=receipt_id
403
)
404
405
# Wait for delivery confirmation
406
if receipt_handler.wait_for_receipt(receipt_id, timeout=30):
407
print("Message delivery confirmed")
408
else:
409
print("Message delivery confirmation timeout")
410
411
conn.disconnect()
412
```
413
414
### Protocol Version Specific Features
415
416
```python
417
import stomp
418
419
# STOMP 1.0 - Basic functionality
420
conn10 = stomp.Connection10([('localhost', 61613)])
421
conn10.connect('user', 'password', wait=True)
422
conn10.send(body='STOMP 1.0 message', destination='/queue/test')
423
# No NACK support in 1.0
424
conn10.disconnect()
425
426
# STOMP 1.1 - Heartbeats and NACK
427
conn11 = stomp.Connection11(
428
[('localhost', 61613)],
429
heartbeats=(10000, 10000) # 10 second heartbeats
430
)
431
conn11.connect('user', 'password', wait=True)
432
433
# NACK support in 1.1+
434
class NackCapableProcessor(stomp.ConnectionListener):
435
def __init__(self, connection):
436
self.connection = connection
437
438
def on_message(self, frame):
439
message_id = frame.headers.get('message-id')
440
subscription_id = frame.headers.get('subscription')
441
442
try:
443
# Process message
444
pass
445
except Exception:
446
# NACK available in STOMP 1.1+
447
self.connection.nack(message_id, subscription_id)
448
449
processor = NackCapableProcessor(conn11)
450
conn11.set_listener('processor', processor)
451
conn11.subscribe('/queue/test', id='test-sub', ack='client-individual')
452
453
time.sleep(10)
454
conn11.disconnect()
455
456
# STOMP 1.2 - Enhanced header escaping
457
conn12 = stomp.Connection12([('localhost', 61613)])
458
conn12.connect('user', 'password', wait=True)
459
460
# STOMP 1.2 handles special characters in headers properly
461
conn12.send(
462
body='Message with special header',
463
destination='/queue/test',
464
headers={
465
'custom-header': 'value with\nspecial\tcharacters', # Automatically escaped
466
'correlation-id': 'msg:with:colons'
467
}
468
)
469
470
conn12.disconnect()
471
```