or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

adapters.mdcli.mdconnections.mdindex.mdlisteners.mdprotocol.mdtypes.mdutilities.mdwebsocket.md

listeners.mddocs/

0

# Event Handling

1

2

Listener-based event system for handling connection events, message delivery, error conditions, and protocol-specific events with built-in listeners for common use cases like statistics tracking, debugging, and synchronous operations.

3

4

## Capabilities

5

6

### Base Connection Listener

7

8

Abstract base class defining the event handling interface for all connection events.

9

10

```python { .api }

11

class ConnectionListener:

12

def on_connecting(self, host_and_port):

13

"""

14

Called when TCP connection is established.

15

16

Parameters:

17

- host_and_port: tuple, (hostname, port) of connected broker

18

"""

19

20

def on_connected(self, frame):

21

"""

22

Called when STOMP connection is established.

23

24

Parameters:

25

- frame: Frame, CONNECTED frame from broker

26

"""

27

28

def on_disconnecting(self):

29

"""

30

Called before DISCONNECT frame is sent.

31

"""

32

33

def on_disconnected(self):

34

"""

35

Called when connection is lost or closed.

36

"""

37

38

def on_heartbeat_timeout(self):

39

"""

40

Called when heartbeat timeout occurs.

41

"""

42

43

def on_before_message(self, frame):

44

"""

45

Called before message processing.

46

47

Parameters:

48

- frame: Frame, message frame

49

50

Returns:

51

tuple: (headers, body) for processing, or None to skip

52

"""

53

54

def on_message(self, frame):

55

"""

56

Called when message is received.

57

58

Parameters:

59

- frame: Frame, message frame with headers and body

60

"""

61

62

def on_receipt(self, frame):

63

"""

64

Called when receipt confirmation is received.

65

66

Parameters:

67

- frame: Frame, receipt frame

68

"""

69

70

def on_error(self, frame):

71

"""

72

Called when error frame is received.

73

74

Parameters:

75

- frame: Frame, error frame with error details

76

"""

77

78

def on_send(self, frame):

79

"""

80

Called when frame is sent to broker.

81

82

Parameters:

83

- frame: Frame, outgoing frame

84

"""

85

86

def on_heartbeat(self):

87

"""

88

Called when heartbeat is received.

89

"""

90

91

def on_receiver_loop_completed(self, frame):

92

"""

93

Called when receiver loop completes.

94

95

Parameters:

96

- frame: Frame, final frame when receiver loop completes

97

"""

98

```

99

100

### Statistics Listener

101

102

Tracks connection statistics and metrics for monitoring and debugging.

103

104

```python { .api }

105

class StatsListener(ConnectionListener):

106

def __init__(self):

107

"""Initialize statistics tracking listener."""

108

self.errors = 0

109

self.connections = 0

110

self.disconnects = 0

111

self.messages = 0

112

self.messages_sent = 0

113

self.heartbeat_timeouts = 0

114

self.heartbeat_count = 0

115

116

def __str__(self) -> str:

117

"""

118

Get formatted statistics summary.

119

120

Returns:

121

str: formatted statistics

122

"""

123

124

def on_connecting(self, host_and_port):

125

"""Increment connection counter."""

126

127

def on_disconnected(self):

128

"""Increment disconnect counter."""

129

130

def on_message(self, frame):

131

"""Increment message received counter."""

132

133

def on_send(self, frame):

134

"""Increment message sent counter."""

135

136

def on_error(self, frame):

137

"""Increment error counter."""

138

139

def on_heartbeat_timeout(self):

140

"""Increment heartbeat timeout counter."""

141

142

def on_heartbeat(self):

143

"""Increment heartbeat received counter."""

144

```

145

146

### Waiting Listener

147

148

Synchronous listener that waits for specific events like receipts or disconnection.

149

150

```python { .api }

151

class WaitingListener(ConnectionListener):

152

def __init__(self, receipt):

153

"""

154

Initialize waiting listener for specific receipt.

155

156

Parameters:

157

- receipt: str, receipt ID to wait for

158

"""

159

160

def wait_on_receipt(self, timeout=10):

161

"""

162

Wait for receipt confirmation.

163

164

Parameters:

165

- timeout: float, timeout in seconds

166

167

Returns:

168

bool: True if receipt received, False if timeout

169

"""

170

171

def wait_on_disconnected(self, timeout=10):

172

"""

173

Wait for disconnection event.

174

175

Parameters:

176

- timeout: float, timeout in seconds

177

178

Returns:

179

bool: True if disconnected, False if timeout

180

"""

181

182

def on_receipt(self, frame):

183

"""Handle receipt and signal waiting threads."""

184

185

def on_disconnected(self):

186

"""Handle disconnection and signal waiting threads."""

187

```

188

189

### Printing Listener

190

191

Debug listener that prints all connection events to console or log.

192

193

```python { .api }

194

class PrintingListener(ConnectionListener):

195

def __init__(self, print_to_log=False):

196

"""

197

Initialize printing listener.

198

199

Parameters:

200

- print_to_log: bool, print to log instead of stdout

201

"""

202

203

def on_connecting(self, host_and_port):

204

"""Print connecting event."""

205

206

def on_connected(self, frame):

207

"""Print connected event with frame details."""

208

209

def on_disconnecting(self):

210

"""Print disconnecting event."""

211

212

def on_disconnected(self):

213

"""Print disconnected event."""

214

215

def on_message(self, frame):

216

"""Print received message details."""

217

218

def on_error(self, frame):

219

"""Print error details."""

220

221

def on_send(self, frame):

222

"""Print sent frame details."""

223

224

def on_receipt(self, frame):

225

"""Print receipt confirmation."""

226

227

def on_heartbeat(self):

228

"""Print heartbeat event."""

229

230

def on_heartbeat_timeout(self):

231

"""Print heartbeat timeout."""

232

```

233

234

### Test Listener

235

236

Combined listener for testing that includes statistics, waiting, and printing functionality.

237

238

```python { .api }

239

class TestListener(StatsListener, WaitingListener, PrintingListener):

240

def __init__(self, receipt=None, print_to_log=True):

241

"""

242

Initialize test listener with combined functionality.

243

244

Parameters:

245

- receipt: str, receipt ID to wait for

246

- print_to_log: bool, print events to log

247

"""

248

self.message_list = []

249

self.timestamp = None

250

251

def wait_for_message(self, timeout=10):

252

"""

253

Wait for next message.

254

255

Parameters:

256

- timeout: float, timeout in seconds

257

258

Returns:

259

Frame: received message frame or None if timeout

260

"""

261

262

def get_latest_message(self):

263

"""

264

Get most recently received message.

265

266

Returns:

267

Frame: latest message frame or None if no messages

268

"""

269

270

def wait_for_heartbeat(self, timeout=10):

271

"""

272

Wait for heartbeat.

273

274

Parameters:

275

- timeout: float, timeout in seconds

276

277

Returns:

278

bool: True if heartbeat received, False if timeout

279

"""

280

281

def on_message(self, frame):

282

"""Store message in list and update timestamp."""

283

```

284

285

### Heartbeat Listener

286

287

Internal listener for managing STOMP heartbeat functionality.

288

289

```python { .api }

290

class HeartbeatListener(ConnectionListener):

291

def __init__(self, transport, heartbeats, heart_beat_receive_scale=1.5):

292

"""

293

Initialize heartbeat management.

294

295

Parameters:

296

- transport: Transport, connection transport

297

- heartbeats: tuple, (send_ms, receive_ms) heartbeat intervals

298

- heart_beat_receive_scale: float, receive timeout scale factor

299

"""

300

301

def on_connected(self, frame):

302

"""Start heartbeat timers after connection."""

303

304

def on_disconnected(self):

305

"""Stop heartbeat timers on disconnection."""

306

307

def on_heartbeat(self):

308

"""Reset receive heartbeat timer."""

309

310

def on_send(self, frame):

311

"""Update send heartbeat timer."""

312

```

313

314

## Usage Examples

315

316

### Custom Message Handler

317

318

```python

319

import stomp

320

321

class MessageHandler(stomp.ConnectionListener):

322

def __init__(self):

323

self.processed_count = 0

324

325

def on_message(self, frame):

326

print(f"Processing message: {frame.body}")

327

328

# Process message based on headers

329

if frame.headers.get('type') == 'order':

330

self.process_order(frame.body)

331

elif frame.headers.get('type') == 'notification':

332

self.send_notification(frame.body)

333

334

self.processed_count += 1

335

336

def on_error(self, frame):

337

print(f"Error occurred: {frame.body}")

338

# Handle error conditions

339

340

def process_order(self, order_data):

341

# Order processing logic

342

pass

343

344

def send_notification(self, notification_data):

345

# Notification logic

346

pass

347

348

# Use custom handler

349

conn = stomp.Connection([('localhost', 61613)])

350

handler = MessageHandler()

351

conn.set_listener('message_handler', handler)

352

conn.connect('user', 'pass', wait=True)

353

conn.subscribe('/queue/orders', id=1)

354

```

355

356

### Multiple Listeners

357

358

```python

359

import stomp

360

361

conn = stomp.Connection([('localhost', 61613)])

362

363

# Add statistics tracking

364

stats = stomp.StatsListener()

365

conn.set_listener('stats', stats)

366

367

# Add debug printing

368

debug = stomp.PrintingListener(print_to_log=True)

369

conn.set_listener('debug', debug)

370

371

# Add custom business logic

372

class BusinessLogic(stomp.ConnectionListener):

373

def on_message(self, frame):

374

# Business processing

375

pass

376

377

business = BusinessLogic()

378

conn.set_listener('business', business)

379

380

conn.connect('user', 'pass', wait=True)

381

382

# Check statistics

383

print(f"Messages received: {stats.messages}")

384

print(f"Errors: {stats.errors}")

385

```

386

387

### Synchronous Operations

388

389

```python

390

import stomp

391

import uuid

392

393

conn = stomp.Connection([('localhost', 61613)])

394

395

# Create waiting listener for receipt

396

receipt_id = str(uuid.uuid4())

397

waiter = stomp.WaitingListener(receipt_id)

398

conn.set_listener('waiter', waiter)

399

400

conn.connect('user', 'pass', wait=True)

401

402

# Send message with receipt and wait for confirmation

403

conn.send(

404

body='Important message',

405

destination='/queue/critical',

406

receipt=receipt_id

407

)

408

409

# Wait for receipt confirmation

410

if waiter.wait_on_receipt(timeout=30):

411

print("Message confirmed delivered")

412

else:

413

print("Delivery confirmation timeout")

414

415

conn.disconnect()

416

```

417

418

### Error Handling

419

420

```python

421

import stomp

422

423

class ErrorHandler(stomp.ConnectionListener):

424

def on_error(self, frame):

425

error_msg = frame.body

426

error_code = frame.headers.get('code', 'unknown')

427

428

print(f"STOMP Error {error_code}: {error_msg}")

429

430

# Handle specific error types

431

if error_code == 'AUTHORIZATION_FAILED':

432

self.handle_auth_error()

433

elif error_code == 'DESTINATION_NOT_FOUND':

434

self.handle_destination_error()

435

else:

436

self.handle_generic_error(error_code, error_msg)

437

438

def on_heartbeat_timeout(self):

439

print("Heartbeat timeout - connection may be lost")

440

# Trigger reconnection logic

441

442

def handle_auth_error(self):

443

# Authentication error handling

444

pass

445

446

def handle_destination_error(self):

447

# Destination error handling

448

pass

449

450

def handle_generic_error(self, code, message):

451

# Generic error handling

452

pass

453

454

conn = stomp.Connection([('localhost', 61613)])

455

error_handler = ErrorHandler()

456

conn.set_listener('error_handler', error_handler)

457

```