or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

adapters.mdcli.mdconnections.mdindex.mdlisteners.mdprotocol.mdtypes.mdutilities.mdwebsocket.md

protocol.mddocs/

0

# Protocol Operations

1

2

Core STOMP protocol operations including message sending, queue subscription, transaction management, and acknowledgment handling across all supported protocol versions (1.0, 1.1, 1.2).

3

4

## Capabilities

5

6

### Message Operations

7

8

Core messaging functionality for sending and receiving messages through STOMP destinations.

9

10

```python { .api }

11

def send(self, destination, body='', content_type=None, headers=None, **keyword_headers):

12

"""

13

Send message to destination.

14

15

Parameters:

16

- destination: str, destination queue/topic (required)

17

- body: str or bytes, message body content

18

- content_type: str, MIME content type of message

19

- headers: dict, additional message headers

20

- **keyword_headers: additional headers as keyword arguments

21

22

Common headers:

23

- persistent: str, 'true' for persistent messages

24

- priority: str, message priority (0-9)

25

- expires: str, message expiration time

26

- correlation-id: str, correlation identifier

27

- reply-to: str, reply destination

28

- custom headers: any string key-value pairs

29

"""

30

31

def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):

32

"""

33

Subscribe to destination for message delivery.

34

35

Parameters:

36

- destination: str, destination queue/topic to subscribe to

37

- id: str, unique subscription identifier (auto-generated if None)

38

- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')

39

- headers: dict, subscription headers

40

- **keyword_headers: additional headers as keyword arguments

41

42

Acknowledgment modes:

43

- 'auto': automatic acknowledgment (default)

44

- 'client': manual acknowledgment per subscription

45

- 'client-individual': manual acknowledgment per message

46

"""

47

48

def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):

49

"""

50

Unsubscribe from destination.

51

52

Parameters:

53

- destination: str, destination to unsubscribe from (if no id)

54

- id: str, subscription ID to unsubscribe (preferred method)

55

- headers: dict, unsubscribe headers

56

- **keyword_headers: additional headers as keyword arguments

57

58

Note: Either destination or id must be provided

59

"""

60

```

61

62

### Message Acknowledgment

63

64

Manual message acknowledgment for reliable message processing.

65

66

```python { .api }

67

def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):

68

"""

69

Acknowledge message processing (STOMP 1.0+).

70

71

Parameters:

72

- id: str, message ID to acknowledge (required)

73

- subscription: str, subscription ID (STOMP 1.1+ only)

74

- transaction: str, transaction ID if within transaction

75

- headers: dict, acknowledgment headers

76

- **keyword_headers: additional headers as keyword arguments

77

78

Used with ack modes 'client' and 'client-individual'

79

"""

80

81

def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):

82

"""

83

Negative acknowledge message (STOMP 1.1+ only).

84

85

Parameters:

86

- id: str, message ID to nack (required)

87

- subscription: str, subscription ID (STOMP 1.1+ only)

88

- transaction: str, transaction ID if within transaction

89

- headers: dict, nack headers

90

- **keyword_headers: additional headers as keyword arguments

91

92

Signals message processing failure, may trigger redelivery

93

"""

94

```

95

96

### Transaction Management

97

98

Atomic transaction support for grouping multiple operations.

99

100

```python { .api }

101

def begin(self, transaction=None, headers=None, **keyword_headers):

102

"""

103

Begin transaction.

104

105

Parameters:

106

- transaction: str, transaction identifier (auto-generated if None)

107

- headers: dict, begin headers

108

- **keyword_headers: additional headers as keyword arguments

109

110

Returns transaction ID for use in subsequent operations

111

"""

112

113

def commit(self, transaction=None, headers=None, **keyword_headers):

114

"""

115

Commit transaction.

116

117

Parameters:

118

- transaction: str, transaction ID to commit (required)

119

- headers: dict, commit headers

120

- **keyword_headers: additional headers as keyword arguments

121

122

All operations within transaction are atomically applied

123

"""

124

125

def abort(self, transaction=None, headers=None, **keyword_headers):

126

"""

127

Abort transaction.

128

129

Parameters:

130

- transaction: str, transaction ID to abort (required)

131

- headers: dict, abort headers

132

- **keyword_headers: additional headers as keyword arguments

133

134

All operations within transaction are rolled back

135

"""

136

```

137

138

### Receipt Management

139

140

Receipt confirmations for reliable operation acknowledgment.

141

142

```python { .api }

143

def set_receipt(self, receipt_id, value):

144

"""

145

Set receipt handler for confirmation tracking.

146

147

Parameters:

148

- receipt_id: str, receipt identifier

149

- value: any, value associated with receipt

150

"""

151

```

152

153

### Frame Processing

154

155

Low-level STOMP frame operations for advanced usage.

156

157

```python { .api }

158

def send_frame(self, frame):

159

"""

160

Send raw STOMP frame.

161

162

Parameters:

163

- frame: Frame, raw STOMP frame to send

164

"""

165

166

class Frame:

167

"""

168

STOMP frame representation.

169

170

Attributes:

171

- cmd: str, STOMP command (CONNECT, SEND, etc.)

172

- headers: dict, frame headers

173

- body: str or bytes, frame body content

174

"""

175

def __init__(self, cmd=None, headers=None, body=None):

176

self.cmd = cmd

177

self.headers = headers or {}

178

self.body = body

179

```

180

181

## Usage Examples

182

183

### Basic Messaging

184

185

```python

186

import stomp

187

188

conn = stomp.Connection([('localhost', 61613)])

189

conn.connect('user', 'password', wait=True)

190

191

# Send simple message

192

conn.send(

193

body='Hello World',

194

destination='/queue/test'

195

)

196

197

# Send message with headers

198

conn.send(

199

body='Priority message',

200

destination='/queue/important',

201

headers={

202

'priority': '9',

203

'persistent': 'true',

204

'correlation-id': 'msg-001'

205

}

206

)

207

208

# Send JSON message

209

import json

210

data = {'user': 'john', 'action': 'login', 'timestamp': 1234567890}

211

conn.send(

212

body=json.dumps(data),

213

destination='/topic/events',

214

content_type='application/json'

215

)

216

217

conn.disconnect()

218

```

219

220

### Subscription Patterns

221

222

```python

223

import stomp

224

import time

225

226

class MessageProcessor(stomp.ConnectionListener):

227

def on_message(self, frame):

228

print(f"Received from {frame.headers.get('destination', 'unknown')}: {frame.body}")

229

230

conn = stomp.Connection([('localhost', 61613)])

231

processor = MessageProcessor()

232

conn.set_listener('processor', processor)

233

conn.connect('user', 'password', wait=True)

234

235

# Subscribe to queue with auto-acknowledgment

236

conn.subscribe('/queue/orders', id='orders-sub', ack='auto')

237

238

# Subscribe to topic with manual acknowledgment

239

conn.subscribe('/topic/notifications', id='notify-sub', ack='client')

240

241

# Subscribe with custom headers

242

conn.subscribe(

243

destination='/queue/priority',

244

id='priority-sub',

245

ack='client-individual',

246

headers={'selector': "priority > 5"}

247

)

248

249

# Keep connection alive

250

time.sleep(30)

251

252

# Unsubscribe

253

conn.unsubscribe(id='orders-sub')

254

conn.unsubscribe(id='notify-sub')

255

conn.unsubscribe(id='priority-sub')

256

257

conn.disconnect()

258

```

259

260

### Manual Acknowledgment

261

262

```python

263

import stomp

264

import time

265

266

class ManualAckProcessor(stomp.ConnectionListener):

267

def __init__(self, connection):

268

self.connection = connection

269

270

def on_message(self, frame):

271

message_id = frame.headers.get('message-id')

272

subscription_id = frame.headers.get('subscription')

273

274

try:

275

# Process message

276

self.process_message(frame.body)

277

278

# Acknowledge successful processing

279

self.connection.ack(message_id, subscription_id)

280

print(f"Acknowledged message {message_id}")

281

282

except Exception as e:

283

print(f"Processing failed: {e}")

284

285

# Negative acknowledge (STOMP 1.1+)

286

if hasattr(self.connection, 'nack'):

287

self.connection.nack(message_id, subscription_id)

288

print(f"Nacked message {message_id}")

289

290

def process_message(self, body):

291

# Simulate message processing

292

if 'error' in body.lower():

293

raise ValueError("Simulated processing error")

294

295

print(f"Successfully processed: {body}")

296

297

conn = stomp.Connection11([('localhost', 61613)]) # STOMP 1.1 for NACK support

298

processor = ManualAckProcessor(conn)

299

conn.set_listener('processor', processor)

300

conn.connect('user', 'password', wait=True)

301

302

# Subscribe with client-individual acknowledgment

303

conn.subscribe('/queue/work', id='work-sub', ack='client-individual')

304

305

time.sleep(60) # Process messages for 1 minute

306

conn.disconnect()

307

```

308

309

### Transaction Example

310

311

```python

312

import stomp

313

import uuid

314

315

conn = stomp.Connection([('localhost', 61613)])

316

conn.connect('user', 'password', wait=True)

317

318

# Begin transaction

319

tx_id = str(uuid.uuid4())

320

conn.begin(tx_id)

321

322

try:

323

# Send multiple messages in transaction

324

conn.send(

325

body='Order created',

326

destination='/queue/orders',

327

transaction=tx_id

328

)

329

330

conn.send(

331

body='Inventory updated',

332

destination='/queue/inventory',

333

transaction=tx_id

334

)

335

336

conn.send(

337

body='Email notification',

338

destination='/queue/notifications',

339

transaction=tx_id

340

)

341

342

# Simulate business logic

343

if all_operations_successful():

344

conn.commit(tx_id)

345

print("Transaction committed successfully")

346

else:

347

conn.abort(tx_id)

348

print("Transaction aborted")

349

350

except Exception as e:

351

# Abort transaction on error

352

conn.abort(tx_id)

353

print(f"Transaction aborted due to error: {e}")

354

355

conn.disconnect()

356

357

def all_operations_successful():

358

# Simulate business validation

359

return True

360

```

361

362

### Receipt Confirmation

363

364

```python

365

import stomp

366

import uuid

367

import time

368

369

class ReceiptHandler(stomp.ConnectionListener):

370

def __init__(self):

371

self.pending_receipts = {}

372

373

def on_receipt(self, frame):

374

receipt_id = frame.headers.get('receipt-id')

375

if receipt_id in self.pending_receipts:

376

print(f"Receipt confirmed: {receipt_id}")

377

self.pending_receipts[receipt_id] = True

378

379

def wait_for_receipt(self, receipt_id, timeout=10):

380

"""Wait for specific receipt confirmation."""

381

end_time = time.time() + timeout

382

383

while time.time() < end_time:

384

if self.pending_receipts.get(receipt_id):

385

return True

386

time.sleep(0.1)

387

388

return False

389

390

conn = stomp.Connection([('localhost', 61613)])

391

receipt_handler = ReceiptHandler()

392

conn.set_listener('receipt_handler', receipt_handler)

393

conn.connect('user', 'password', wait=True)

394

395

# Send message with receipt

396

receipt_id = str(uuid.uuid4())

397

receipt_handler.pending_receipts[receipt_id] = False

398

399

conn.send(

400

body='Important message',

401

destination='/queue/critical',

402

receipt=receipt_id

403

)

404

405

# Wait for delivery confirmation

406

if receipt_handler.wait_for_receipt(receipt_id, timeout=30):

407

print("Message delivery confirmed")

408

else:

409

print("Message delivery confirmation timeout")

410

411

conn.disconnect()

412

```

413

414

### Protocol Version Specific Features

415

416

```python

417

import stomp

418

419

# STOMP 1.0 - Basic functionality

420

conn10 = stomp.Connection10([('localhost', 61613)])

421

conn10.connect('user', 'password', wait=True)

422

conn10.send(body='STOMP 1.0 message', destination='/queue/test')

423

# No NACK support in 1.0

424

conn10.disconnect()

425

426

# STOMP 1.1 - Heartbeats and NACK

427

conn11 = stomp.Connection11(

428

[('localhost', 61613)],

429

heartbeats=(10000, 10000) # 10 second heartbeats

430

)

431

conn11.connect('user', 'password', wait=True)

432

433

# NACK support in 1.1+

434

class NackCapableProcessor(stomp.ConnectionListener):

435

def __init__(self, connection):

436

self.connection = connection

437

438

def on_message(self, frame):

439

message_id = frame.headers.get('message-id')

440

subscription_id = frame.headers.get('subscription')

441

442

try:

443

# Process message

444

pass

445

except Exception:

446

# NACK available in STOMP 1.1+

447

self.connection.nack(message_id, subscription_id)

448

449

processor = NackCapableProcessor(conn11)

450

conn11.set_listener('processor', processor)

451

conn11.subscribe('/queue/test', id='test-sub', ack='client-individual')

452

453

time.sleep(10)

454

conn11.disconnect()

455

456

# STOMP 1.2 - Enhanced header escaping

457

conn12 = stomp.Connection12([('localhost', 61613)])

458

conn12.connect('user', 'password', wait=True)

459

460

# STOMP 1.2 handles special characters in headers properly

461

conn12.send(

462

body='Message with special header',

463

destination='/queue/test',

464

headers={

465

'custom-header': 'value with\nspecial\tcharacters', # Automatically escaped

466

'correlation-id': 'msg:with:colons'

467

}

468

)

469

470

conn12.disconnect()

471

```