or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

adapters.md cli.md connections.md index.md listeners.md protocol.md types.md utilities.md websocket.md

docs/connections.md

0

# Connection Management

1

2

Comprehensive connection classes supporting all STOMP protocol versions (1.0, 1.1, 1.2) with automatic reconnection, SSL/TLS support, heartbeat handling, and connection pooling for robust message broker connectivity.

3

4

## Capabilities

5

6

### STOMP 1.1 Connection (Default)

7

8

The default connection class supporting STOMP 1.1 protocol with heartbeat negotiation and enhanced error handling.

9

10

```python { .api }

11

class Connection:

12

def __init__(self,

13

host_and_ports=None,

14

prefer_localhost=True,

15

try_loopback_connect=True,

16

reconnect_sleep_initial=0.1,

17

reconnect_sleep_increase=0.5,

18

reconnect_sleep_jitter=0.1,

19

reconnect_sleep_max=60.0,

20

reconnect_attempts_max=3,

21

timeout=None,

22

heartbeats=(0, 0),

23

keepalive=None,

24

vhost=None,

25

auto_decode=True,

26

encoding="utf-8",

27

auto_content_length=True,

28

heart_beat_receive_scale=1.5,

29

bind_host_port=None):

30

"""

31

Create STOMP 1.1 connection.

32

33

Parameters:

34

- host_and_ports: list of (host, port) tuples, broker addresses to try, e.g. [('localhost', 61613)]

35

- prefer_localhost: bool, prefer localhost connections

36

- try_loopback_connect: bool, try loopback if localhost fails

37

- reconnect_sleep_initial: float, initial reconnect delay

38

- reconnect_sleep_increase: float, delay increase factor

39

- reconnect_sleep_jitter: float, random delay variation

40

- reconnect_sleep_max: float, maximum reconnect delay

41

- reconnect_attempts_max: int, maximum reconnect attempts

42

- timeout: float, socket timeout in seconds

43

- heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)

44

- keepalive: bool, enable TCP keepalive

45

- vhost: str, virtual host name

46

- auto_decode: bool, automatically decode message bodies

47

- encoding: str, text encoding for messages

48

- auto_content_length: bool, automatically set content-length header

49

- heart_beat_receive_scale: float, heartbeat timeout scale factor

50

- bind_host_port: tuple, local bind address

51

"""

52

53

def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):

54

"""

55

Connect to STOMP broker.

56

57

Parameters:

58

- username: str, authentication username

59

- passcode: str, authentication password

60

- wait: bool, wait for connection confirmation

61

- headers: dict, additional connection headers

62

- **keyword_headers: additional headers as keyword arguments

63

"""

64

65

def disconnect(self, receipt=None, headers=None, **keyword_headers):

66

"""

67

Disconnect from STOMP broker.

68

69

Parameters:

70

- receipt: str, receipt ID for disconnect confirmation

71

- headers: dict, additional disconnect headers

72

- **keyword_headers: additional headers as keyword arguments

73

"""

74

75

def is_connected(self) -> bool:

76

"""

77

Check if connected to broker.

78

79

Returns:

80

bool: True if connected, False otherwise

81

"""

82

83

def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):

84

"""

85

Send message to destination.

86

87

Parameters:

88

- body: str, message body

89

- destination: str, destination queue/topic

90

- content_type: str, message content type

91

- headers: dict, message headers

92

- **keyword_headers: additional headers as keyword arguments

93

"""

94

95

def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):

96

"""

97

Subscribe to destination.

98

99

Parameters:

100

- destination: str, destination queue/topic

101

- id: str, subscription ID

102

- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')

103

- headers: dict, subscription headers

104

- **keyword_headers: additional headers as keyword arguments

105

"""

106

107

def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):

108

"""

109

Unsubscribe from destination.

110

111

Parameters:

112

- destination: str, destination to unsubscribe from

113

- id: str, subscription ID to unsubscribe

114

- headers: dict, unsubscribe headers

115

- **keyword_headers: additional headers as keyword arguments

116

"""

117

118

def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):

119

"""

120

Acknowledge message.

121

122

Parameters:

123

- id: str, message ID to acknowledge

124

- subscription: str, subscription ID

125

- transaction: str, transaction ID

126

- headers: dict, ack headers

127

- **keyword_headers: additional headers as keyword arguments

128

"""

129

130

def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):

131

"""

132

Negative acknowledge message (STOMP 1.1+).

133

134

Parameters:

135

- id: str, message ID to nack

136

- subscription: str, subscription ID

137

- transaction: str, transaction ID

138

- headers: dict, nack headers

139

- **keyword_headers: additional headers as keyword arguments

140

"""

141

```

142

143

### STOMP 1.0 Connection

144

145

Legacy STOMP 1.0 protocol support for compatibility with older message brokers.

146

147

```python { .api }

148

class Connection10:

149

def __init__(self,

150

host_and_ports=None,

151

prefer_localhost=True,

152

try_loopback_connect=True,

153

reconnect_sleep_initial=0.1,

154

reconnect_sleep_increase=0.5,

155

reconnect_sleep_jitter=0.1,

156

reconnect_sleep_max=60.0,

157

reconnect_attempts_max=3,

158

timeout=None,

159

keepalive=None,

160

auto_decode=True,

161

encoding="utf-8",

162

auto_content_length=True,

163

bind_host_port=None):

164

"""

165

Create STOMP 1.0 connection.

166

167

Parameters:

168

- host_and_ports: list of (host, port) tuples, broker addresses to try, e.g. [('localhost', 61613)]

169

- prefer_localhost: bool, prefer localhost connections

170

- try_loopback_connect: bool, try loopback if localhost fails

171

- reconnect_sleep_initial: float, initial reconnect delay

172

- reconnect_sleep_increase: float, delay increase factor

173

- reconnect_sleep_jitter: float, random delay variation

174

- reconnect_sleep_max: float, maximum reconnect delay

175

- reconnect_attempts_max: int, maximum reconnect attempts

176

- timeout: float, socket timeout in seconds

177

- keepalive: bool, enable TCP keepalive

178

- auto_decode: bool, automatically decode message bodies

179

- encoding: str, text encoding for messages

180

- auto_content_length: bool, automatically set content-length header

181

- bind_host_port: tuple, local bind address

182

"""

183

184

def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):

185

"""Connect to STOMP 1.0 broker."""

186

187

def disconnect(self, receipt=None, headers=None, **keyword_headers):

188

"""Disconnect from STOMP 1.0 broker."""

189

190

def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):

191

"""Send message via STOMP 1.0."""

192

193

def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):

194

"""Subscribe via STOMP 1.0."""

195

196

def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):

197

"""Unsubscribe via STOMP 1.0."""

198

199

def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):

200

"""Acknowledge message via STOMP 1.0."""

201

```

202

203

### STOMP 1.2 Connection

204

205

Connection class for STOMP 1.2, the latest protocol version, with enhanced header escaping and improved error handling.

206

207

```python { .api }

208

class Connection12:

209

def __init__(self,

210

host_and_ports=None,

211

prefer_localhost=True,

212

try_loopback_connect=True,

213

reconnect_sleep_initial=0.1,

214

reconnect_sleep_increase=0.5,

215

reconnect_sleep_jitter=0.1,

216

reconnect_sleep_max=60.0,

217

reconnect_attempts_max=3,

218

timeout=None,

219

heartbeats=(0, 0),

220

keepalive=None,

221

vhost=None,

222

auto_decode=True,

223

encoding="utf-8",

224

auto_content_length=True,

225

heart_beat_receive_scale=1.5,

226

bind_host_port=None):

227

"""

228

Create STOMP 1.2 connection.

229

230

Parameters: identical to those of Connection (STOMP 1.1); the constructor takes no additional STOMP 1.2-specific arguments.

231

"""

232

233

def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):

234

"""Connect to STOMP 1.2 broker with enhanced error handling."""

235

236

def disconnect(self, receipt=None, headers=None, **keyword_headers):

237

"""Disconnect from STOMP 1.2 broker."""

238

239

def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):

240

"""STOMP 1.2 negative acknowledge with enhanced header escaping."""

241

242

@staticmethod

243

def is_eol(line):

244

"""

245

Check if line is end-of-line marker.

246

247

Parameters:

248

- line: bytes, line to check

249

250

Returns:

251

bool: True if end-of-line

252

"""

253

```

254

255

### Connection Management

256

257

Base connection functionality shared across all protocol versions.

258

259

```python { .api }

260

def set_listener(self, name, listener):

261

"""

262

Set named connection listener.

263

264

Parameters:

265

- name: str, listener name

266

- listener: ConnectionListener, listener instance

267

"""

268

269

def remove_listener(self, name):

270

"""

271

Remove named listener.

272

273

Parameters:

274

- name: str, listener name to remove

275

"""

276

277

def get_listener(self, name):

278

"""

279

Get named listener.

280

281

Parameters:

282

- name: str, listener name

283

284

Returns:

285

ConnectionListener: listener instance or None

286

"""

287

288

def set_ssl(self, for_hosts=(), key_file=None, cert_file=None, ca_certs=None,

289

cert_validator=None, ssl_version=None, password=None, **kwargs):

290

"""

291

Configure SSL/TLS connection.

292

293

Parameters:

294

- for_hosts: sequence of (host, port) tuples, brokers that require SSL, e.g. [('broker.example.com', 61614)]

295

- key_file: str, private key file path

296

- cert_file: str, certificate file path

297

- ca_certs: str, CA certificates file path

298

- cert_validator: callable, certificate validation function

299

- ssl_version: int, SSL protocol version

300

- password: str, private key password

301

- **kwargs: additional SSL parameters

302

"""

303

304

def get_ssl(self, host_and_port=None):

305

"""

306

Get SSL configuration for host.

307

308

Parameters:

309

- host_and_port: tuple, host and port

310

311

Returns:

312

dict: SSL configuration

313

"""

314

```

315

316

### Transaction Support

317

318

Transaction management for atomic message operations.

319

320

```python { .api }

321

def begin(self, transaction=None, headers=None, **keyword_headers):

322

"""

323

Begin transaction.

324

325

Parameters:

326

- transaction: str, transaction ID

327

- headers: dict, begin headers

328

- **keyword_headers: additional headers as keyword arguments

329

"""

330

331

def commit(self, transaction=None, headers=None, **keyword_headers):

332

"""

333

Commit transaction.

334

335

Parameters:

336

- transaction: str, transaction ID to commit

337

- headers: dict, commit headers

338

- **keyword_headers: additional headers as keyword arguments

339

"""

340

341

def abort(self, transaction=None, headers=None, **keyword_headers):

342

"""

343

Abort transaction.

344

345

Parameters:

346

- transaction: str, transaction ID to abort

347

- headers: dict, abort headers

348

- **keyword_headers: additional headers as keyword arguments

349

"""

350

```

351

352

## Usage Examples

353

354

### Basic Connection

355

356

```python

357

import stomp

358

359

# Create connection with reconnection settings

360

conn = stomp.Connection(

361

[('localhost', 61613)],

362

reconnect_sleep_initial=1.0,

363

reconnect_sleep_max=30.0,

364

reconnect_attempts_max=10

365

)

366

367

# Connect with authentication

368

conn.connect('username', 'password', wait=True)

369

370

# Use connection

371

conn.send(body='Hello World', destination='/queue/test')

372

373

# Disconnect

374

conn.disconnect()

375

```

376

377

### SSL Connection

378

379

```python

380

import stomp

381

382

conn = stomp.Connection([('broker.example.com', 61614)])

383

384

# Configure SSL

385

conn.set_ssl(

386

for_hosts=[('broker.example.com', 61614)],

387

key_file='/path/to/client.key',

388

cert_file='/path/to/client.crt',

389

ca_certs='/path/to/ca.crt'

390

)

391

392

conn.connect('username', 'password', wait=True)

393

```

394

395

### Transaction Example

396

397

```python

398

import stomp

399

400

conn = stomp.Connection([('localhost', 61613)])

401

conn.connect('user', 'pass', wait=True)

402

403

# Begin transaction

404

conn.begin('tx-001')

405

406

# Send messages in transaction

407

conn.send(body='Message 1', destination='/queue/test', transaction='tx-001')

408

conn.send(body='Message 2', destination='/queue/test', transaction='tx-001')

409

410

# Commit transaction

411

conn.commit('tx-001')

412

413

conn.disconnect()

414

```