0
# Connection Management
1
2
Comprehensive connection classes supporting all STOMP protocol versions (1.0, 1.1, 1.2), with automatic reconnection, SSL/TLS support, heartbeat handling, and failover across multiple broker addresses for robust message broker connectivity.
3
4
## Capabilities
5
6
### STOMP 1.1 Connection (Default)
7
8
The default connection class supporting STOMP 1.1 protocol with heartbeat negotiation and enhanced error handling.
9
10
```python { .api }
11
class Connection:
12
def __init__(self,
13
host_and_ports=None,
14
prefer_localhost=True,
15
try_loopback_connect=True,
16
reconnect_sleep_initial=0.1,
17
reconnect_sleep_increase=0.5,
18
reconnect_sleep_jitter=0.1,
19
reconnect_sleep_max=60.0,
20
reconnect_attempts_max=3,
21
timeout=None,
22
heartbeats=(0, 0),
23
keepalive=None,
24
vhost=None,
25
auto_decode=True,
26
encoding="utf-8",
27
auto_content_length=True,
28
heart_beat_receive_scale=1.5,
29
bind_host_port=None):
30
"""
31
Create STOMP 1.1 connection.
32
33
Parameters:
34
- host_and_ports: list of (host, port) tuples, broker addresses, e.g. [('localhost', 61613)]
35
- prefer_localhost: bool, prefer localhost connections
36
- try_loopback_connect: bool, try loopback if localhost fails
37
- reconnect_sleep_initial: float, initial reconnect delay
38
- reconnect_sleep_increase: float, delay increase factor
39
- reconnect_sleep_jitter: float, random delay variation
40
- reconnect_sleep_max: float, maximum reconnect delay
41
- reconnect_attempts_max: int, maximum reconnect attempts
42
- timeout: float, socket timeout in seconds
43
- heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)
44
- keepalive: bool, enable TCP keepalive
45
- vhost: str, virtual host name
46
- auto_decode: bool, automatically decode message bodies
47
- encoding: str, text encoding for messages
48
- auto_content_length: bool, automatically set content-length header
49
- heart_beat_receive_scale: float, heartbeat timeout scale factor
50
- bind_host_port: tuple, local bind address
51
"""
52
53
def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
54
"""
55
Connect to STOMP broker.
56
57
Parameters:
58
- username: str, authentication username
59
- passcode: str, authentication password
60
- wait: bool, wait for connection confirmation
61
- headers: dict, additional connection headers
62
- **keyword_headers: additional headers as keyword arguments
63
"""
64
65
def disconnect(self, receipt=None, headers=None, **keyword_headers):
66
"""
67
Disconnect from STOMP broker.
68
69
Parameters:
70
- receipt: str, receipt ID for disconnect confirmation
71
- headers: dict, additional disconnect headers
72
- **keyword_headers: additional headers as keyword arguments
73
"""
74
75
def is_connected(self) -> bool:
76
"""
77
Check if connected to broker.
78
79
Returns:
80
bool: True if connected, False otherwise
81
"""
82
83
def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
84
"""
85
Send message to destination.
86
87
Parameters:
88
- body: str, message body
89
- destination: str, destination queue/topic
90
- content_type: str, message content type
91
- headers: dict, message headers
92
- **keyword_headers: additional headers as keyword arguments
93
"""
94
95
def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
96
"""
97
Subscribe to destination.
98
99
Parameters:
100
- destination: str, destination queue/topic
101
- id: str, subscription ID
102
- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')
103
- headers: dict, subscription headers
104
- **keyword_headers: additional headers as keyword arguments
105
"""
106
107
def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
108
"""
109
Unsubscribe from destination.
110
111
Parameters:
112
- destination: str, destination to unsubscribe from
113
- id: str, subscription ID to unsubscribe
114
- headers: dict, unsubscribe headers
115
- **keyword_headers: additional headers as keyword arguments
116
"""
117
118
def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
119
"""
120
Acknowledge message.
121
122
Parameters:
123
- id: str, message ID to acknowledge
124
- subscription: str, subscription ID
125
- transaction: str, transaction ID
126
- headers: dict, ack headers
127
- **keyword_headers: additional headers as keyword arguments
128
"""
129
130
def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
131
"""
132
Negative acknowledge message (STOMP 1.1+).
133
134
Parameters:
135
- id: str, message ID to nack
136
- subscription: str, subscription ID
137
- transaction: str, transaction ID
138
- headers: dict, nack headers
139
- **keyword_headers: additional headers as keyword arguments
140
"""
141
```
142
143
### STOMP 1.0 Connection
144
145
Legacy STOMP 1.0 protocol support for compatibility with older message brokers.
146
147
```python { .api }
148
class Connection10:
149
def __init__(self,
150
host_and_ports=None,
151
prefer_localhost=True,
152
try_loopback_connect=True,
153
reconnect_sleep_initial=0.1,
154
reconnect_sleep_increase=0.5,
155
reconnect_sleep_jitter=0.1,
156
reconnect_sleep_max=60.0,
157
reconnect_attempts_max=3,
158
timeout=None,
159
keepalive=None,
160
auto_decode=True,
161
encoding="utf-8",
162
auto_content_length=True,
163
bind_host_port=None):
164
"""
165
Create STOMP 1.0 connection.
166
167
Parameters:
168
- host_and_ports: list of (host, port) tuples, broker addresses
169
- prefer_localhost: bool, prefer localhost connections
170
- try_loopback_connect: bool, try loopback if localhost fails
171
- reconnect_sleep_initial: float, initial reconnect delay
172
- reconnect_sleep_increase: float, delay increase factor
173
- reconnect_sleep_jitter: float, random delay variation
174
- reconnect_sleep_max: float, maximum reconnect delay
175
- reconnect_attempts_max: int, maximum reconnect attempts
176
- timeout: float, socket timeout in seconds
177
- keepalive: bool, enable TCP keepalive
178
- auto_decode: bool, automatically decode message bodies
179
- encoding: str, text encoding for messages
180
- auto_content_length: bool, automatically set content-length header
181
- bind_host_port: tuple, local bind address
182
"""
183
184
def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
185
"""Connect to STOMP 1.0 broker."""
186
187
def disconnect(self, receipt=None, headers=None, **keyword_headers):
188
"""Disconnect from STOMP 1.0 broker."""
189
190
def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):
191
"""Send message via STOMP 1.0."""
192
193
def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):
194
"""Subscribe via STOMP 1.0."""
195
196
def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
197
"""Unsubscribe via STOMP 1.0."""
198
199
def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
200
"""Acknowledge message via STOMP 1.0."""
201
```
202
203
### STOMP 1.2 Connection
204
205
Latest STOMP 1.2 protocol with enhanced header escaping and improved error handling.
206
207
```python { .api }
208
class Connection12:
209
def __init__(self,
210
host_and_ports=None,
211
prefer_localhost=True,
212
try_loopback_connect=True,
213
reconnect_sleep_initial=0.1,
214
reconnect_sleep_increase=0.5,
215
reconnect_sleep_jitter=0.1,
216
reconnect_sleep_max=60.0,
217
reconnect_attempts_max=3,
218
timeout=None,
219
heartbeats=(0, 0),
220
keepalive=None,
221
vhost=None,
222
auto_decode=True,
223
encoding="utf-8",
224
auto_content_length=True,
225
heart_beat_receive_scale=1.5,
226
bind_host_port=None):
227
"""
228
Create STOMP 1.2 connection.
229
230
Parameters: identical to those of Connection (STOMP 1.1); see that class for descriptions.
231
"""
232
233
def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
234
"""Connect to STOMP 1.2 broker with enhanced error handling."""
235
236
def disconnect(self, receipt=None, headers=None, **keyword_headers):
237
"""Disconnect from STOMP 1.2 broker."""
238
239
def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):
240
"""STOMP 1.2 negative acknowledge with enhanced header escaping."""
241
242
@staticmethod
243
def is_eol(line):
244
"""
245
Check if line is an end-of-line marker.
246
247
Parameters:
248
- line: bytes, line to check
249
250
Returns:
251
bool: True if end-of-line
252
"""
253
```
254
255
### Connection Management
256
257
Base connection functionality shared across all protocol versions.
258
259
```python { .api }
260
def set_listener(self, name, listener):
261
"""
262
Set named connection listener.
263
264
Parameters:
265
- name: str, listener name
266
- listener: ConnectionListener, listener instance
267
"""
268
269
def remove_listener(self, name):
270
"""
271
Remove named listener.
272
273
Parameters:
274
- name: str, listener name to remove
275
"""
276
277
def get_listener(self, name):
278
"""
279
Get named listener.
280
281
Parameters:
282
- name: str, listener name
283
284
Returns:
285
ConnectionListener: listener instance or None
286
"""
287
288
def set_ssl(self, for_hosts=(), key_file=None, cert_file=None, ca_certs=None,
289
cert_validator=None, ssl_version=None, password=None, **kwargs):
290
"""
291
Configure SSL/TLS connection.
292
293
Parameters:
294
- for_hosts: list or tuple of (host, port) tuples, the broker addresses that require SSL
295
- key_file: str, private key file path
296
- cert_file: str, certificate file path
297
- ca_certs: str, CA certificates file path
298
- cert_validator: callable, certificate validation function
299
- ssl_version: int, SSL protocol version
300
- password: str, private key password
301
- **kwargs: additional SSL parameters
302
"""
303
304
def get_ssl(self, host_and_port=None):
305
"""
306
Get SSL configuration for host.
307
308
Parameters:
309
- host_and_port: tuple, host and port
310
311
Returns:
312
dict: SSL configuration
313
"""
314
```
315
316
### Transaction Support
317
318
Transaction management for atomic message operations.
319
320
```python { .api }
321
def begin(self, transaction=None, headers=None, **keyword_headers):
322
"""
323
Begin transaction.
324
325
Parameters:
326
- transaction: str, transaction ID
327
- headers: dict, begin headers
328
- **keyword_headers: additional headers as keyword arguments
329
"""
330
331
def commit(self, transaction=None, headers=None, **keyword_headers):
332
"""
333
Commit transaction.
334
335
Parameters:
336
- transaction: str, transaction ID to commit
337
- headers: dict, commit headers
338
- **keyword_headers: additional headers as keyword arguments
339
"""
340
341
def abort(self, transaction=None, headers=None, **keyword_headers):
342
"""
343
Abort transaction.
344
345
Parameters:
346
- transaction: str, transaction ID to abort
347
- headers: dict, abort headers
348
- **keyword_headers: additional headers as keyword arguments
349
"""
350
```
351
352
## Usage Examples
353
354
### Basic Connection
355
356
```python
import stomp

# Create connection with reconnection settings
conn = stomp.Connection(
    [('localhost', 61613)],
    reconnect_sleep_initial=1.0,
    reconnect_sleep_max=30.0,
    reconnect_attempts_max=10
)

# Connect with authentication
conn.connect('username', 'password', wait=True)

# Use connection
conn.send(body='Hello World', destination='/queue/test')

# Disconnect
conn.disconnect()
```
376
377
### SSL Connection
378
379
```python
import stomp

conn = stomp.Connection([('broker.example.com', 61614)])

# Configure SSL
conn.set_ssl(
    for_hosts=[('broker.example.com', 61614)],
    key_file='/path/to/client.key',
    cert_file='/path/to/client.crt',
    ca_certs='/path/to/ca.crt'
)

conn.connect('username', 'password', wait=True)
```
394
395
### Transaction Example
396
397
```python
import stomp

conn = stomp.Connection([('localhost', 61613)])
conn.connect('user', 'pass', wait=True)

# Begin transaction
conn.begin('tx-001')

# Send messages in transaction
conn.send(body='Message 1', destination='/queue/test', transaction='tx-001')
conn.send(body='Message 2', destination='/queue/test', transaction='tx-001')

# Commit transaction
conn.commit('tx-001')

conn.disconnect()
```