or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

adapters.md cli.md connections.md index.md listeners.md protocol.md types.md utilities.md websocket.md

docs/websocket.md

0

# WebSocket Support

1

2

The WebSocket transport adapter enables STOMP messaging over WebSocket connections. It is intended for browser-based applications, web-friendly messaging patterns, and environments where traditional TCP sockets are not available.

3

4

## Capabilities

5

6

### WebSocket Connection

7

8

STOMP over WebSocket connection supporting all STOMP 1.2 features with WebSocket-specific transport handling.

9

10

```python { .api }

11

class WSConnection:

12

def __init__(self,

13

host_and_ports=None,

14

prefer_localhost=True,

15

try_loopback_connect=True,

16

reconnect_sleep_initial=0.1,

17

reconnect_sleep_increase=0.5,

18

reconnect_sleep_jitter=0.1,

19

reconnect_sleep_max=60.0,

20

reconnect_attempts_max=3,

21

timeout=None,

22

heartbeats=(0, 0),

23

keepalive=None,

24

vhost=None,

25

auto_decode=True,

26

encoding="utf-8",

27

auto_content_length=True,

28

heart_beat_receive_scale=1.5,

29

bind_host_port=None,

30

ws=None,

31

ws_path=None,

32

header=None,

33

binary_mode=False):

34

"""

35

Create WebSocket STOMP connection.

36

37

Parameters:

38

- host_and_ports: list of tuples, WebSocket host/port pairs [('localhost', 8080)]

39

- prefer_localhost: bool, prefer localhost connections

40

- try_loopback_connect: bool, try loopback if localhost fails

41

- reconnect_sleep_initial: float, initial reconnect delay

42

- reconnect_sleep_increase: float, delay increase factor

43

- reconnect_sleep_jitter: float, random delay variation

44

- reconnect_sleep_max: float, maximum reconnect delay

45

- reconnect_attempts_max: int, maximum reconnect attempts

46

- timeout: float, socket timeout in seconds

47

- heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)

48

- keepalive: bool, enable keepalive

49

- vhost: str, virtual host name

50

- auto_decode: bool, automatically decode message bodies

51

- encoding: str, text encoding for messages

52

- auto_content_length: bool, automatically set content-length header

53

- heart_beat_receive_scale: float, heartbeat timeout scale factor

54

- bind_host_port: tuple, local bind address

55

- ws: WebSocket connection object

56

- ws_path: str, WebSocket path

57

- header: dict, WebSocket headers

58

- binary_mode: bool, use binary WebSocket frames

59

"""

60

61

def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):

62

"""

63

Connect to STOMP broker via WebSocket.

64

65

Parameters:

66

- username: str, authentication username

67

- passcode: str, authentication password

68

- wait: bool, wait for connection confirmation

69

- headers: dict, additional connection headers

70

- **keyword_headers: additional headers as keyword arguments

71

"""

72

73

def disconnect(self, receipt=None, headers=None, **keyword_headers):

74

"""

75

Disconnect from STOMP broker.

76

77

Parameters:

78

- receipt: str, receipt ID for disconnect confirmation

79

- headers: dict, additional disconnect headers

80

- **keyword_headers: additional headers as keyword arguments

81

"""

82

83

def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):

84

"""

85

Send message via WebSocket transport.

86

87

Parameters:

88

- body: str, message body

89

- destination: str, destination queue/topic

90

- content_type: str, message content type

91

- headers: dict, message headers

92

- **keyword_headers: additional headers as keyword arguments

93

"""

94

95

def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):

96

"""

97

Subscribe to destination via WebSocket.

98

99

Parameters:

100

- destination: str, destination queue/topic

101

- id: str, subscription ID

102

- ack: str, acknowledgment mode ('auto', 'client', 'client-individual')

103

- headers: dict, subscription headers

104

- **keyword_headers: additional headers as keyword arguments

105

"""

106

107

def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):

108

"""

109

Unsubscribe from destination.

110

111

Parameters:

112

- destination: str, destination to unsubscribe from

113

- id: str, subscription ID to unsubscribe

114

- headers: dict, unsubscribe headers

115

- **keyword_headers: additional headers as keyword arguments

116

"""

117

118

def ack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):

119

"""

120

Acknowledge message.

121

122

Parameters:

123

- id: str, message ID to acknowledge

124

- subscription: str, subscription ID

125

- transaction: str, transaction ID

126

- headers: dict, ack headers

127

- **keyword_headers: additional headers as keyword arguments

128

"""

129

130

def nack(self, id, subscription=None, transaction=None, headers=None, **keyword_headers):

131

"""

132

Negative acknowledge message.

133

134

Parameters:

135

- id: str, message ID to nack

136

- subscription: str, subscription ID

137

- transaction: str, transaction ID

138

- headers: dict, nack headers

139

- **keyword_headers: additional headers as keyword arguments

140

"""

141

```

142

143

### WebSocket Transport

144

145

Low-level WebSocket transport implementation for STOMP protocol.

146

147

```python { .api }

148

class WSTransport:

149

def __init__(self,

150

url,

151

auto_decode=True,

152

encoding="utf-8",

153

is_eol_fc=None,

154

**kwargs):

155

"""

156

Initialize WebSocket transport.

157

158

Parameters:

159

- url: str, WebSocket URL

160

- auto_decode: bool, automatically decode message bodies

161

- encoding: str, text encoding for messages

162

- is_eol_fc: callable, end-of-line detection function

163

- **kwargs: additional WebSocket parameters

164

"""

165

166

def start(self):

167

"""Start WebSocket connection."""

168

169

def stop(self, timeout=None):

170

"""

171

Stop WebSocket connection.

172

173

Parameters:

174

- timeout: float, stop timeout in seconds

175

"""

176

177

def send(self, frame):

178

"""

179

Send STOMP frame via WebSocket.

180

181

Parameters:

182

- frame: Frame, STOMP frame to send

183

"""

184

185

def receive(self):

186

"""

187

Receive data from WebSocket.

188

189

Returns:

190

bytes: received data

191

"""

192

193

def is_connected(self) -> bool:

194

"""

195

Check if WebSocket is connected.

196

197

Returns:

198

bool: True if connected, False otherwise

199

"""

200

```

201

202

## Usage Examples

203

204

### Basic WebSocket Connection

205

206

```python

207

import stomp

208

209

# Create WebSocket connection

210

ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')

211

212

# Set up message handler

213

class WSMessageHandler(stomp.ConnectionListener):

214

def on_message(self, frame):

215

print(f"WebSocket message: {frame.body}")

216

217

def on_error(self, frame):

218

print(f"WebSocket error: {frame.body}")

219

220

handler = WSMessageHandler()

221

ws_conn.set_listener('handler', handler)

222

223

# Connect and use

224

ws_conn.connect('user', 'password', wait=True)

225

ws_conn.subscribe('/topic/updates', id=1)

226

ws_conn.send(body='Hello via WebSocket', destination='/topic/chat')

227

228

# Keep connection alive

229

import time

230

time.sleep(10)

231

232

ws_conn.disconnect()

233

```

234

235

### Secure WebSocket (WSS)

236

237

```python

238

import stomp

239

240

# Create secure WebSocket connection

241

ws_conn = stomp.WSConnection('wss://secure-broker.example.com/stomp')

242

243

# WebSocket-specific options

244

ws_conn = stomp.WSConnection(

245

'wss://broker.example.com/stomp',

246

heartbeats=(10000, 10000), # 10 second heartbeats

247

timeout=30, # 30 second timeout

248

reconnect_attempts_max=5 # Max 5 reconnection attempts

249

)

250

251

ws_conn.connect('username', 'password', wait=True)

252

```

253

254

### Browser-Compatible Usage Pattern

255

256

```python

257

import stomp

258

import json
import time

259

260

class BrowserCompatibleHandler(stomp.ConnectionListener):

261

def __init__(self):

262

self.message_queue = []

263

264

def on_message(self, frame):

265

# Handle JSON messages typical in web applications

266

try:

267

data = json.loads(frame.body)

268

self.process_web_message(data, frame.headers)

269

except json.JSONDecodeError:

270

# Handle plain text messages

271

self.process_text_message(frame.body, frame.headers)

272

273

def process_web_message(self, data, headers):

274

# Process structured web messages

275

message_type = data.get('type', 'unknown')

276

277

if message_type == 'notification':

278

self.handle_notification(data)

279

elif message_type == 'update':

280

self.handle_update(data)

281

282

# Store for polling-based retrieval

283

self.message_queue.append({

284

'data': data,

285

'headers': headers,

286

'timestamp': time.time()

287

})

288

289

def process_text_message(self, body, headers):

290

# Handle plain text messages

291

self.message_queue.append({

292

'body': body,

293

'headers': headers,

294

'timestamp': time.time()

295

})

296

297

def get_messages(self):

298

"""Get queued messages for web application."""

299

messages = self.message_queue[:]

300

self.message_queue.clear()

301

return messages

302

303

def handle_notification(self, data):

304

# Handle notification-type messages

305

pass

306

307

def handle_update(self, data):

308

# Handle update-type messages

309

pass

310

311

# Setup WebSocket connection for web app

312

ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')

313

handler = BrowserCompatibleHandler()

314

ws_conn.set_listener('web_handler', handler)

315

316

# Connect and subscribe to web-friendly topics

317

ws_conn.connect('webapp_user', 'webapp_pass', wait=True)

318

ws_conn.subscribe('/topic/notifications', id=1)

319

ws_conn.subscribe('/topic/updates', id=2)

320

321

# Send JSON message

322

message_data = {

323

'type': 'user_action',

324

'action': 'login',

325

'user_id': '12345',

326

'timestamp': time.time()

327

}

328

329

ws_conn.send(

330

body=json.dumps(message_data),

331

destination='/topic/user_actions',

332

content_type='application/json'

333

)

334

335

# Periodic message retrieval pattern

336

def get_new_messages():

337

return handler.get_messages()

338

```

339

340

### WebSocket with Heartbeats

341

342

```python

343

import stomp

344

import threading

345

import time

346

347

class HeartbeatMonitor(stomp.ConnectionListener):

348

def __init__(self):

349

self.last_heartbeat = time.time()

350

self.connection_healthy = True

351

352

def on_heartbeat(self):

353

self.last_heartbeat = time.time()

354

self.connection_healthy = True

355

print("WebSocket heartbeat received")

356

357

def on_heartbeat_timeout(self):

358

self.connection_healthy = False

359

print("WebSocket heartbeat timeout - connection may be unhealthy")

360

361

def monitor_connection(self):

362

"""Monitor connection health based on heartbeats."""

363

while True:

364

time.sleep(5) # Check every 5 seconds

365

366

if time.time() - self.last_heartbeat > 30: # 30 second threshold

367

print("Connection appears stale")

368

self.connection_healthy = False

369

370

if not self.connection_healthy:

371

print("Connection health check failed")

372

# Trigger reconnection logic

373

break

374

375

# Create WebSocket connection with heartbeats

376

ws_conn = stomp.WSConnection(

377

'ws://localhost:61614/stomp',

378

heartbeats=(10000, 10000) # 10 second heartbeats

379

)

380

381

monitor = HeartbeatMonitor()

382

ws_conn.set_listener('monitor', monitor)

383

384

# Start monitoring in background

385

monitor_thread = threading.Thread(target=monitor.monitor_connection)

386

monitor_thread.daemon = True

387

monitor_thread.start()

388

389

ws_conn.connect('user', 'password', wait=True)

390

```

391

392

### WebSocket Error Handling

393

394

```python

395

import stomp

396

import time

397

398

class WebSocketErrorHandler(stomp.ConnectionListener):

399

def __init__(self, connection):

400

self.connection = connection

401

self.reconnect_attempts = 0

402

self.max_reconnect_attempts = 5

403

404

def on_error(self, frame):

405

error_msg = frame.body

406

print(f"WebSocket STOMP error: {error_msg}")

407

408

# Handle specific WebSocket errors

409

if 'connection refused' in error_msg.lower():

410

self.handle_connection_refused()

411

elif 'unauthorized' in error_msg.lower():

412

self.handle_unauthorized()

413

else:

414

self.handle_generic_error(error_msg)

415

416

def on_disconnected(self):

417

print("WebSocket disconnected")

418

419

if self.reconnect_attempts < self.max_reconnect_attempts:

420

self.attempt_reconnect()

421

else:

422

print("Max reconnection attempts reached")

423

424

def handle_connection_refused(self):

425

print("WebSocket connection refused - broker may be down")

426

time.sleep(5) # Wait before retry

427

428

def handle_unauthorized(self):

429

print("WebSocket authentication failed")

430

# Don't auto-reconnect on auth failures

431

self.max_reconnect_attempts = 0

432

433

def handle_generic_error(self, error_msg):

434

print(f"Generic WebSocket error: {error_msg}")

435

436

def attempt_reconnect(self):

437

self.reconnect_attempts += 1

438

delay = min(self.reconnect_attempts * 2, 30) # Linear backoff: 2s per attempt, capped at 30s

439

440

print(f"Attempting reconnection {self.reconnect_attempts}/{self.max_reconnect_attempts} in {delay}s")

441

time.sleep(delay)

442

443

try:

444

self.connection.connect('user', 'password', wait=True)

445

self.reconnect_attempts = 0 # Reset on successful connection

446

print("WebSocket reconnection successful")

447

except Exception as e:

448

print(f"Reconnection failed: {e}")

449

450

# Setup WebSocket with error handling

451

ws_conn = stomp.WSConnection('ws://localhost:61614/stomp')

452

error_handler = WebSocketErrorHandler(ws_conn)

453

ws_conn.set_listener('error_handler', error_handler)

454

455

ws_conn.connect('user', 'password', wait=True)

456

```