or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

adapters.mdcli.mdconnections.mdindex.mdlisteners.mdprotocol.mdtypes.mdutilities.mdwebsocket.md

adapters.mddocs/

0

# Transport Adapters

1

2

Specialized transport adapters extending stomp.py beyond traditional TCP connections to support multicast, broadcast messaging, and alternative transport mechanisms for specific deployment scenarios.

3

4

## Capabilities

5

6

### Multicast Adapter

7

8

STOMP messaging over UDP multicast transport enabling broker-less messaging patterns and distributed system coordination without central message broker infrastructure.

9

10

```python { .api }

11

class MulticastConnection:

12

def __init__(self,

13

host_and_ports=None,

14

prefer_localhost=True,

15

try_loopback_connect=True,

16

reconnect_sleep_initial=0.1,

17

reconnect_sleep_increase=0.5,

18

reconnect_sleep_jitter=0.1,

19

reconnect_sleep_max=60.0,

20

reconnect_attempts_max=3,

21

timeout=None,

22

heartbeats=(0, 0),

23

keepalive=None,

24

vhost=None,

25

auto_decode=True,

26

encoding="utf-8",

27

auto_content_length=True,

28

heart_beat_receive_scale=1.5,

29

bind_host_port=None,

30

multicast_group="224.1.2.3",

31

multicast_port=61616):

32

"""

33

Create multicast STOMP connection.

34

35

Parameters:

36

- host_and_ports: list, multicast endpoints (optional for multicast)

37

- prefer_localhost: bool, prefer localhost connections

38

- try_loopback_connect: bool, try loopback if localhost fails

39

- reconnect_sleep_initial: float, initial reconnect delay

40

- reconnect_sleep_increase: float, delay increase factor

41

- reconnect_sleep_jitter: float, random delay variation

42

- reconnect_sleep_max: float, maximum reconnect delay

43

- reconnect_attempts_max: int, maximum reconnect attempts

44

- timeout: float, socket timeout in seconds

45

- heartbeats: tuple, (send_heartbeat_ms, receive_heartbeat_ms)

46

- keepalive: bool, enable keepalive (N/A for multicast)

47

- vhost: str, virtual host name

48

- auto_decode: bool, automatically decode message bodies

49

- encoding: str, text encoding for messages

50

- auto_content_length: bool, automatically set content-length header

51

- heart_beat_receive_scale: float, heartbeat timeout scale factor

52

- bind_host_port: tuple, local bind address

53

- multicast_group: str, multicast group IP address

54

- multicast_port: int, multicast port number

55

"""

56

57

def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):

58

"""

59

Join multicast group for STOMP messaging.

60

61

Parameters:

62

- username: str, authentication username (optional for multicast)

63

- passcode: str, authentication password (optional for multicast)

64

- wait: bool, wait for connection confirmation

65

- headers: dict, additional connection headers

66

- **keyword_headers: additional headers as keyword arguments

67

68

Note: Authentication may not apply in multicast scenarios

69

"""

70

71

def disconnect(self, receipt=None, headers=None, **keyword_headers):

72

"""

73

Leave multicast group.

74

75

Parameters:

76

- receipt: str, receipt ID for disconnect confirmation

77

- headers: dict, additional disconnect headers

78

- **keyword_headers: additional headers as keyword arguments

79

"""

80

81

def send(self, body='', destination=None, content_type=None, headers=None, **keyword_headers):

82

"""

83

Send multicast message to all group members.

84

85

Parameters:

86

- body: str, message body

87

- destination: str, logical destination (for routing/filtering)

88

- content_type: str, message content type

89

- headers: dict, message headers

90

- **keyword_headers: additional headers as keyword arguments

91

92

Message is broadcast to all multicast group members.

93

"""

94

95

def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers):

96

"""

97

Subscribe to multicast destination pattern.

98

99

Parameters:

100

- destination: str, destination pattern for filtering

101

- id: str, subscription ID

102

- ack: str, acknowledgment mode (limited in multicast)

103

- headers: dict, subscription headers

104

- **keyword_headers: additional headers as keyword arguments

105

106

Note: Acknowledgments have limited meaning in multicast scenarios

107

"""

108

```

109

110

### Multicast Transport

111

112

Low-level multicast transport implementation handling UDP multicast socket operations.

113

114

```python { .api }

115

class MulticastTransport:

116

def __init__(self,

117

multicast_group="224.1.2.3",

118

multicast_port=61616,

119

timeout=None,

120

bind_host_port=None):

121

"""

122

Initialize multicast transport.

123

124

Parameters:

125

- multicast_group: str, multicast IP address

126

- multicast_port: int, multicast port number

127

- timeout: float, socket timeout in seconds

128

- bind_host_port: tuple, local bind address

129

"""

130

131

def start(self):

132

"""Start multicast transport and join group."""

133

134

def stop(self):

135

"""Stop transport and leave multicast group."""

136

137

def send(self, encoded_frame):

138

"""

139

Send encoded STOMP frame via multicast.

140

141

Parameters:

142

- encoded_frame: bytes, encoded STOMP frame data

143

"""

144

145

def receive(self):

146

"""

147

Receive multicast STOMP frame.

148

149

Returns:

150

bytes: received frame data or None if timeout

151

"""

152

153

def is_connected(self):

154

"""

155

Check if multicast socket is active.

156

157

Returns:

158

bool: True if connected to multicast group

159

"""

160

```

161

162

### Advanced Transport Configuration

163

164

Enhanced transport configuration options for specialized deployment scenarios.

165

166

```python { .api }

167

def override_threading(self, create_thread_fc):

168

"""

169

Override thread creation for custom threading libraries.

170

171

Parameters:

172

- create_thread_fc: callable, custom thread creation function

173

174

Enables integration with:

175

- gevent greenlet threads

176

- asyncio event loops

177

- Custom thread pools

178

- Testing frameworks with thread mocking

179

180

Example:

181

def custom_thread_creator(callback):

182

return gevent.spawn(callback)

183

184

conn.override_threading(custom_thread_creator)

185

"""

186

187

def wait_for_connection(self, timeout=None):

188

"""

189

Wait for connection establishment with timeout.

190

191

Parameters:

192

- timeout: float, maximum wait time in seconds

193

194

Returns:

195

bool: True if connected within timeout, False otherwise

196

197

Useful for synchronous connection patterns and testing.

198

"""

199

200

def set_keepalive_options(self, keepalive_options):

201

"""

202

Configure advanced TCP keepalive parameters.

203

204

Parameters:

205

- keepalive_options: tuple, platform-specific keepalive config

206

207

Linux format: ("linux", idle_sec, interval_sec, probe_count)

208

macOS format: ("mac", interval_sec)

209

Windows format: ("windows", idle_ms, interval_ms)

210

211

Examples:

212

# Linux: 2 hour idle, 75 sec intervals, 9 probes

213

conn.set_keepalive_options(("linux", 7200, 75, 9))

214

215

# macOS: 75 second intervals

216

conn.set_keepalive_options(("mac", 75))

217

"""

218

```

219

220

## Usage Examples

221

222

### Basic Multicast Messaging

223

224

```python

225

import stomp

226

from stomp.adapter.multicast import MulticastConnection

227

228

# Create multicast connection

229

conn = MulticastConnection(

230

multicast_group="224.10.20.30",

231

multicast_port=61620

232

)

233

234

# Set up message handler

235

class MulticastListener(stomp.ConnectionListener):

236

def on_message(self, frame):

237

print(f"Multicast message: {frame.body}")

238

print(f"From destination: {frame.headers.get('destination')}")

239

240

conn.set_listener('multicast', MulticastListener())

241

242

# Join multicast group

243

conn.connect(wait=True)

244

245

# Send message to all group members

246

conn.send(

247

body='{"event": "system_alert", "level": "warning"}',

248

destination='/topic/system-alerts'

249

)

250

251

# Subscribe to specific message types

252

conn.subscribe('/topic/system-alerts', id='alerts')

253

254

# Keep listening for multicast messages

255

import time

256

time.sleep(60)

257

258

conn.disconnect()

259

```

260

261

### Broker-less Service Discovery

262

263

```python

264

import stomp

265

import json

266

import time

267

from stomp.adapter.multicast import MulticastConnection

268

269

class ServiceDiscovery:

270

def __init__(self, service_name, service_port):

271

self.service_name = service_name

272

self.service_port = service_port

273

self.discovered_services = {}

274

275

# Setup multicast for service announcements

276

self.conn = MulticastConnection(

277

multicast_group="224.0.1.100",

278

multicast_port=61700

279

)

280

self.conn.set_listener('discovery', self)

281

self.conn.connect(wait=True)

282

self.conn.subscribe('/topic/service-discovery', id='discovery')

283

284

def announce_service(self):

285

"""Announce this service to the network"""

286

announcement = {

287

"service": self.service_name,

288

"port": self.service_port,

289

"timestamp": time.time(),

290

"type": "announcement"

291

}

292

293

self.conn.send(

294

body=json.dumps(announcement),

295

destination='/topic/service-discovery'

296

)

297

298

def on_message(self, frame):

299

"""Handle service discovery messages"""

300

try:

301

message = json.loads(frame.body)

302

if message.get('type') == 'announcement':

303

service_name = message['service']

304

self.discovered_services[service_name] = {

305

'port': message['port'],

306

'last_seen': message['timestamp']

307

}

308

print(f"Discovered service: {service_name}:{message['port']}")

309

except Exception as e:

310

print(f"Error processing discovery message: {e}")

311

312

def get_service_endpoint(self, service_name):

313

"""Get endpoint for discovered service"""

314

return self.discovered_services.get(service_name)

315

316

# Usage

317

discovery = ServiceDiscovery("user-service", 8080)

318

319

# Announce this service every 30 seconds

320

while True:

321

discovery.announce_service()

322

time.sleep(30)

323

```

324

325

### Custom Threading Integration

326

327

```python

328

import stomp

329

import gevent

330

from gevent import monkey

331

monkey.patch_all()

332

333

# Custom thread creation for gevent

334

def gevent_thread_creator(callback):

335

return gevent.spawn(callback)

336

337

conn = stomp.Connection([('broker.com', 61613)])

338

339

# Use gevent threads instead of standard threads

340

conn.override_threading(gevent_thread_creator)

341

342

# Now all stomp.py background operations use gevent

343

conn.connect('user', 'pass', wait=True)

344

conn.subscribe('/queue/events', id='events')

345

346

# Gevent-compatible message handling

347

class GeventListener(stomp.ConnectionListener):

348

def on_message(self, frame):

349

# This runs in a gevent greenlet

350

gevent.sleep(0) # Yield to other greenlets

351

print(f"Processing: {frame.body}")

352

353

conn.set_listener('main', GeventListener())

354

```

355

356

### Advanced Keepalive Configuration

357

358

```python

359

import stomp

360

361

conn = stomp.Connection([('broker.com', 61613)])

362

363

# Configure aggressive keepalive for unstable networks

364

# Linux: 30 sec idle, 10 sec intervals, 6 probes = 90 sec total

365

conn.set_keepalive_options(("linux", 30, 10, 6))

366

367

# Alternative for macOS

368

# conn.set_keepalive_options(("mac", 30))

369

370

conn.connect('user', 'pass', wait=True)

371

372

# Connection will detect network failures faster

373

conn.send(body='Test message', destination='/queue/test')

374

```

375

376

### Synchronous Connection Patterns

377

378

```python

379

import stomp

380

381

conn = stomp.Connection([('broker1.com', 61613), ('broker2.com', 61613)])

382

383

# Wait for connection with timeout

384

if conn.wait_for_connection(timeout=10.0):

385

print("Connected successfully")

386

387

# Perform operations

388

conn.send(body='Connected!', destination='/queue/status')

389

390

else:

391

print("Connection timeout - trying alternative approach")

392

# Fallback logic

393

```

394

395

### Testing with Custom Transport

396

397

```python

398

import stomp

399

from unittest.mock import Mock

400

401

# Mock transport for testing

402

def mock_thread_creator(callback):

403

# Don't create real threads in tests

404

return Mock()

405

406

conn = stomp.Connection([('localhost', 61613)])

407

conn.override_threading(mock_thread_creator)

408

409

# Now stomp.py won't create background threads

410

# Useful for deterministic testing

411

conn.connect('test', 'test', wait=True)

412

```