or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

adapters.mdcli.mdconnections.mdindex.mdlisteners.mdprotocol.mdtypes.mdutilities.mdwebsocket.md

utilities.mddocs/

0

# Utilities and Helpers

1

2

Core utility functions, constants, logging configuration, and helper methods that support stomp.py's internal operations and provide convenient functionality for advanced use cases.

3

4

## Capabilities

5

6

### Frame Processing Utilities

7

8

Low-level frame manipulation and conversion functions for custom protocol handling.

9

10

```python { .api }

11

def convert_frame(frame):

12

"""

13

Convert Frame object to transmission format.

14

15

Parameters:

16

- frame: Frame, STOMP frame object to convert

17

18

Returns:

19

bytes: encoded frame ready for network transmission

20

21

Handles:

22

- Header encoding and escaping

23

- Body encoding with proper content-length

24

- Protocol version specific formatting

25

"""

26

27

def parse_headers(lines, offset=0):

28

"""

29

Parse STOMP headers from frame lines.

30

31

Parameters:

32

- lines: list, frame lines to parse

33

- offset: int, starting line offset for headers

34

35

Returns:

36

dict: parsed headers as key-value pairs

37

38

Handles:

39

- Header unescaping per protocol version

40

- Duplicate header handling

41

- Invalid header format detection

42

"""

43

44

def pack(pieces):

45

"""

46

Join byte sequences efficiently.

47

48

Parameters:

49

- pieces: iterable, sequence of bytes objects

50

51

Returns:

52

bytes: concatenated byte sequence

53

54

Optimized for frame assembly and network I/O.

55

"""

56

57

def join(chars):

58

"""

59

Join character sequences efficiently.

60

61

Parameters:

62

- chars: iterable, sequence of character strings

63

64

Returns:

65

str: concatenated string

66

67

Optimized for header and command processing.

68

"""

69

```

70

71

### Threading and Execution Utilities

72

73

Thread management and callback execution helpers for customizing stomp.py's concurrency model.

74

75

```python { .api }

76

def default_create_thread(callback):

77

"""

78

Default thread creation function for receiver loops.

79

80

Parameters:

81

- callback: callable, function to execute in new thread

82

83

Returns:

84

Thread: started daemon thread

85

86

Creates daemon threads for background operations:

87

- Message receiver loops

88

- Heartbeat timers

89

- Connection monitoring

90

"""

91

92

def is_eol_default(line):

93

"""

94

Default end-of-line detection for frame parsing.

95

96

Parameters:

97

- line: bytes, line to check for end-of-line marker

98

99

Returns:

100

bool: True if line represents end-of-line

101

102

Protocol-agnostic EOL detection for frame boundaries.

103

"""

104

105

def calculate_heartbeats(send_heartbeat, client_heartbeat):

106

"""

107

Calculate negotiated heartbeat intervals between client and server.

108

109

Parameters:

110

- send_heartbeat: int, client's desired send interval (ms)

111

- client_heartbeat: int, client's desired receive interval (ms)

112

113

Returns:

114

tuple: (negotiated_send_ms, negotiated_receive_ms)

115

116

Implements STOMP heartbeat negotiation algorithm:

117

- 0 means no heartbeat desired

118

- Non-zero values are negotiated to maximum of client/server desires

119

"""

120

121

def get_errno(exception):

122

"""

123

Extract errno from socket or OS exception.

124

125

Parameters:

126

- exception: Exception, OS or socket exception

127

128

Returns:

129

int: errno value or None if not available

130

131

Cross-platform errno extraction for error handling.

132

"""

133

```

134

135

### Protocol Constants

136

137

STOMP protocol constants for commands, headers, and response codes.

138

139

```python { .api }

140

# STOMP Commands

141

CONNECT = "CONNECT"

142

STOMP = "STOMP" # STOMP 1.2 connection command

143

CONNECTED = "CONNECTED"

144

DISCONNECT = "DISCONNECT"

145

SEND = "SEND"

146

SUBSCRIBE = "SUBSCRIBE"

147

UNSUBSCRIBE = "UNSUBSCRIBE"

148

ACK = "ACK"

149

NACK = "NACK" # STOMP 1.1+

150

BEGIN = "BEGIN"

151

COMMIT = "COMMIT"

152

ABORT = "ABORT"

153

MESSAGE = "MESSAGE"

154

RECEIPT = "RECEIPT"

155

ERROR = "ERROR"

156

157

# Standard Headers

158

CONTENT_LENGTH = "content-length"

159

CONTENT_TYPE = "content-type"

160

DESTINATION = "destination"

161

MESSAGE_ID = "message-id"

162

SUBSCRIPTION = "subscription"

163

TRANSACTION = "transaction"

164

RECEIPT_ID = "receipt-id"

165

ACK_MODE = "ack"

166

HOST = "host"

167

VERSION = "version"

168

HEARTBEAT = "heart-beat"

169

SESSION = "session"

170

SERVER = "server"

171

172

# Acknowledgment Modes

173

ACK_AUTO = "auto"

174

ACK_CLIENT = "client"

175

ACK_CLIENT_INDIVIDUAL = "client-individual"

176

177

# Protocol Versions

178

PROTOCOL_10 = "1.0"

179

PROTOCOL_11 = "1.1"

180

PROTOCOL_12 = "1.2"

181

```

182

183

### Logging System

184

185

Configurable logging setup for debugging and monitoring stomp.py operations.

186

187

```python { .api }

188

def log_to_stdout(verbose_logging=True):

189

"""

190

Configure logging output to stdout with optional verbosity.

191

192

Parameters:

193

- verbose_logging: bool, enable detailed debug logging

194

195

When verbose_logging=True:

196

- Shows all frame transmissions

197

- Logs connection state changes

198

- Reports heartbeat activity

199

- Displays protocol negotiation details

200

201

When verbose_logging=False:

202

- Shows only errors and warnings

203

- Logs connection events

204

- Reports critical failures

205

"""

206

207

def log_to_file(filename, verbose_logging=True):

208

"""

209

Configure logging output to file.

210

211

Parameters:

212

- filename: str, path to log file

213

- verbose_logging: bool, enable detailed debug logging

214

215

Creates rotating log file with:

216

- Timestamped entries

217

- Thread identification

218

- Structured frame logging

219

- Error stack traces

220

"""

221

222

def get_logger(name):

223

"""

224

Get named logger for stomp.py components.

225

226

Parameters:

227

- name: str, logger name (e.g., 'stomp.connection', 'stomp.transport')

228

229

Returns:

230

Logger: configured logger instance

231

232

Available loggers:

233

- stomp.connection: connection lifecycle events

234

- stomp.transport: low-level transport operations

235

- stomp.protocol: STOMP protocol processing

236

- stomp.heartbeat: heartbeat management

237

- stomp.listener: listener callback execution

238

"""

239

```

240

241

### Color Constants

242

243

Terminal color constants for CLI output formatting.

244

245

```python { .api }

246

# ANSI Color Codes

247

GREEN = "\33[32m"

248

RED = "\33[31m"

249

YELLOW = "\33[33m"

250

BLUE = "\33[34m"

251

MAGENTA = "\33[35m"

252

CYAN = "\33[36m"

253

WHITE = "\33[37m"

254

255

# Text Formatting

256

BOLD = "\33[1m"

257

UNDERLINE = "\33[4m"

258

ITALIC = "\33[3m"

259

260

# Reset

261

NO_COLOUR = "\33[0m"

262

RESET = NO_COLOUR

263

264

def colorize(text, color):

265

"""

266

Apply color formatting to text.

267

268

Parameters:

269

- text: str, text to colorize

270

- color: str, color constant (GREEN, RED, etc.)

271

272

Returns:

273

str: colorized text with reset code

274

275

Automatically handles color reset to prevent bleeding.

276

"""

277

```

278

279

### Testing Utilities

280

281

Helper functions and classes for testing STOMP applications.

282

283

```python { .api }

284

class TestListener(StatsListener, WaitingListener, PrintingListener):

285

"""

286

Combined listener for testing with message queuing and statistics.

287

288

Inherits from:

289

- StatsListener: connection statistics tracking

290

- WaitingListener: synchronous wait operations

291

- PrintingListener: debug output functionality

292

"""

293

294

def __init__(self, receipt=None, print_to_log=True):

295

"""

296

Initialize test listener with combined functionality.

297

298

Parameters:

299

- receipt: str, receipt ID to wait for (optional)

300

- print_to_log: bool, print events to log instead of stdout

301

"""

302

self.message_list = []

303

self.timestamp = None

304

305

def wait_for_message(self, timeout=10):

306

"""

307

Wait for next message with timeout.

308

309

Parameters:

310

- timeout: float, maximum wait time in seconds

311

312

Returns:

313

Frame: received message frame or None if timeout

314

315

Blocks until message arrives or timeout expires.

316

"""

317

318

def get_latest_message(self):

319

"""

320

Get most recently received message.

321

322

Returns:

323

Frame: latest message frame or None if no messages

324

325

Non-blocking access to latest message for assertions.

326

"""

327

328

def wait_for_heartbeat(self, timeout=10):

329

"""

330

Wait for heartbeat with timeout.

331

332

Parameters:

333

- timeout: float, maximum wait time in seconds

334

335

Returns:

336

bool: True if heartbeat received, False if timeout

337

338

Useful for testing heartbeat negotiation and timing.

339

"""

340

341

def clear_messages(self):

342

"""

343

Clear stored message list.

344

345

Resets message_list to empty for fresh test runs.

346

"""

347

348

def create_test_connection(host='localhost', port=61613, **kwargs):

349

"""

350

Create connection configured for testing.

351

352

Parameters:

353

- host: str, test broker hostname

354

- port: int, test broker port

355

- **kwargs: additional connection parameters

356

357

Returns:

358

Connection: configured test connection

359

360

Pre-configured with:

361

- Short timeouts for fast test execution

362

- Reduced retry attempts

363

- Test-friendly heartbeat settings

364

"""

365

```

366

367

## Usage Examples

368

369

### Custom Frame Processing

370

371

```python

372

import stomp

373

from stomp.utils import convert_frame, parse_headers

374

375

# Custom frame manipulation

376

class CustomProtocolHandler(stomp.ConnectionListener):

377

def on_send(self, frame):

378

# Intercept outgoing frames for custom processing

379

raw_frame = convert_frame(frame)

380

print(f"Sending raw frame: {raw_frame}")

381

382

# Could modify frame here before transmission

383

return frame

384

385

def on_message(self, frame):

386

# Parse custom headers

387

custom_headers = parse_headers([

388

"custom-property:value1",

389

"app-specific:value2"

390

])

391

392

print(f"Custom headers: {custom_headers}")

393

```

394

395

### Advanced Logging Configuration

396

397

```python

398

import stomp

399

import stomp.logging as logging

400

401

# Enable detailed logging

402

logging.log_to_stdout(verbose_logging=True)

403

404

# Create connection with full logging

405

conn = stomp.Connection([('broker.com', 61613)])

406

407

# Get specific logger for custom output

408

transport_logger = logging.get_logger('stomp.transport')

409

transport_logger.info("Custom transport message")

410

411

conn.connect('user', 'pass', wait=True)

412

# Will log all frame exchanges, heartbeats, etc.

413

```

414

415

### Testing Framework Integration

416

417

```python

418

import stomp

419

from stomp.utils import TestListener, create_test_connection

420

import unittest

421

422

class STOMPIntegrationTest(unittest.TestCase):

423

def setUp(self):

424

# Create test connection with fast timeouts

425

self.conn = create_test_connection(

426

timeout=5.0,

427

reconnect_attempts_max=1

428

)

429

430

# Setup test listener

431

self.test_listener = TestListener(print_to_log=True)

432

self.conn.set_listener('test', self.test_listener)

433

434

self.conn.connect('testuser', 'testpass', wait=True)

435

436

def test_message_exchange(self):

437

# Subscribe and wait for subscription

438

self.conn.subscribe('/queue/test', id='test-sub')

439

440

# Send test message

441

self.conn.send(

442

body='test message',

443

destination='/queue/test'

444

)

445

446

# Wait for message with timeout

447

message = self.test_listener.wait_for_message(timeout=5.0)

448

self.assertIsNotNone(message)

449

self.assertEqual(message.body, 'test message')

450

451

# Check statistics

452

stats = str(self.test_listener)

453

self.assertIn('messages: 1', stats.lower())

454

455

def tearDown(self):

456

self.conn.disconnect()

457

```

458

459

### Custom Threading Integration

460

461

```python

462

import stomp

463

from stomp.utils import default_create_thread

464

import threading

465

import queue

466

467

# Custom thread pool for stomp.py operations

468

class ThreadPoolCreator:

469

def __init__(self, pool_size=5):

470

self.pool = []

471

self.task_queue = queue.Queue()

472

473

# Create worker threads

474

for _ in range(pool_size):

475

worker = threading.Thread(target=self._worker, daemon=True)

476

worker.start()

477

self.pool.append(worker)

478

479

def _worker(self):

480

while True:

481

task = self.task_queue.get()

482

if task is None:

483

break

484

try:

485

task()

486

except Exception as e:

487

print(f"Task error: {e}")

488

finally:

489

self.task_queue.task_done()

490

491

def create_thread(self, callback):

492

# Instead of creating new thread, queue the task

493

self.task_queue.put(callback)

494

return threading.current_thread() # Return dummy thread

495

496

# Use custom thread creator

497

thread_creator = ThreadPoolCreator(pool_size=3)

498

conn = stomp.Connection([('broker.com', 61613)])

499

conn.override_threading(thread_creator.create_thread)

500

501

# Now all stomp.py background tasks use the thread pool

502

conn.connect('user', 'pass', wait=True)

503

```

504

505

### Heartbeat Calculation

506

507

```python

508

from stomp.utils import calculate_heartbeats

509

510

# Client wants to send heartbeats every 10 seconds

511

# Client wants to receive heartbeats every 15 seconds

512

client_send = 10000 # ms

513

client_receive = 15000 # ms

514

515

# Server advertises it can send every 5 seconds

516

# Server wants to receive every 8 seconds

517

server_send = 5000 # ms

518

server_receive = 8000 # ms

519

520

# Calculate negotiated heartbeats

521

negotiated = calculate_heartbeats(

522

max(client_send, server_receive), # Send interval

523

max(client_receive, server_send) # Receive interval

524

)

525

526

print(f"Negotiated heartbeats: {negotiated}")

527

# Result: (10000, 15000) - most restrictive wins

528

```

529

530

### Protocol Version Handling

531

532

```python

533

import stomp

534

from stomp.utils import PROTOCOL_10, PROTOCOL_11, PROTOCOL_12

535

536

def create_version_specific_connection(version):

537

"""Create connection for specific STOMP version"""

538

539

if version == PROTOCOL_10:

540

return stomp.Connection10([('broker.com', 61613)])

541

elif version == PROTOCOL_11:

542

return stomp.Connection11([('broker.com', 61613)])

543

elif version == PROTOCOL_12:

544

return stomp.Connection12([('broker.com', 61613)])

545

else:

546

raise ValueError(f"Unsupported version: {version}")

547

548

# Use version-specific features

549

conn_12 = create_version_specific_connection(PROTOCOL_12)

550

conn_12.connect('user', 'pass', wait=True)

551

552

# STOMP 1.2 supports enhanced header escaping

553

conn_12.send(

554

body='Message with\nspecial\rcharacters',

555

destination='/queue/test',

556

headers={'custom-header': 'value\nwith\nlines'}

557

)

558

```