or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

annotation-handling.mdcore-messaging.mdindex.mdmessage-conversion.mdmessaging-templates.mdrsocket-integration.mdstomp-websocket.md

stomp-websocket.mddocs/

0

# STOMP and WebSocket Support

1

2

Complete STOMP (Simple Text Oriented Messaging Protocol) implementation for WebSocket-based messaging with session management, subscription handling, and broker relay capabilities.

3

4

## Capabilities

5

6

### STOMP Session Interface

7

8

Core interface for STOMP client session operations.

9

10

```java { .api }

11

/**

12

* A STOMP session that provides methods for sending messages and managing subscriptions.

13

*/

14

public interface StompSession {

15

/**

16

* Get the session id.

17

*/

18

String getSessionId();

19

20

/**

21

* Whether the session is connected.

22

*/

23

boolean isConnected();

24

25

/**

26

* Set the interval between heartbeat messages.

27

*/

28

void setAutoReceipt(boolean enabled);

29

30

/**

31

* Send a message to the specified destination.

32

*/

33

void send(String destination, Object payload);

34

35

/**

36

* Send a message to the specified destination with headers.

37

*/

38

void send(StompHeaders headers, Object payload);

39

40

/**

41

* Subscribe to the given destination.

42

*/

43

Subscription subscribe(String destination, StompFrameHandler handler);

44

45

/**

46

* Subscribe to the given destination with headers.

47

*/

48

Subscription subscribe(StompHeaders headers, StompFrameHandler handler);

49

50

/**

51

* Send a receipt.

52

*/

53

Receiptable receipt(String receiptId, ReceiptHandler receiptHandler);

54

55

/**

56

* Disconnect the session.

57

*/

58

void disconnect();

59

60

/**

61

* Represents a subscription in a STOMP session.

62

*/

63

interface Subscription {

64

/**

65

* Return the subscription id.

66

*/

67

String getSubscriptionId();

68

69

/**

70

* Remove the subscription.

71

*/

72

void unsubscribe();

73

74

/**

75

* Add a handler for received receipt.

76

*/

77

void addReceiptTask(Runnable runnable);

78

79

/**

80

* Add a handler for lost receipts.

81

*/

82

void addReceiptLostTask(Runnable runnable);

83

}

84

85

/**

86

* Represents a receiptable STOMP frame.

87

*/

88

interface Receiptable {

89

/**

90

* Return the receipt id.

91

*/

92

String getReceiptId();

93

94

/**

95

* Add a receipt received task.

96

*/

97

void addReceiptTask(Runnable runnable);

98

99

/**

100

* Add a receipt lost task.

101

*/

102

void addReceiptLostTask(Runnable runnable);

103

}

104

}

105

```

106

107

### STOMP Frame Handling

108

109

Interfaces for handling STOMP frames and session events.

110

111

```java { .api }

112

/**

113

* Contract to handle a STOMP frame.

114

*/

115

public interface StompFrameHandler {

116

/**

117

* Invoked before the STOMP CONNECTED frame has been processed.

118

*/

119

void afterConnected(StompSession session, StompHeaders connectedHeaders);

120

121

/**

122

* Handle an exception.

123

*/

124

void handleException(StompSession session, @Nullable StompCommand command,

125

StompHeaders headers, byte[] payload, Throwable exception);

126

127

/**

128

* Return the payload type this handler expects to receive.

129

*/

130

Type getPayloadType(StompHeaders headers);

131

132

/**

133

* Handle the payload of a STOMP message.

134

*/

135

void handleFrame(StompHeaders headers, @Nullable Object payload);

136

}

137

138

/**

139

* Contract to handle STOMP session lifecycle events.

140

*/

141

public interface StompSessionHandler extends StompFrameHandler {

142

/**

143

* Handle any exception arising while processing a STOMP frame.

144

*/

145

void handleException(StompSession session, @Nullable StompCommand command,

146

StompHeaders headers, byte[] payload, Throwable exception);

147

148

/**

149

* Handle a STOMP transport error.

150

*/

151

void handleTransportError(StompSession session, Throwable exception);

152

}

153

```

154

155

### STOMP Headers

156

157

STOMP-specific headers implementation and accessor.

158

159

```java { .api }

160

/**

161

* Represents STOMP frame headers.

162

*/

163

public class StompHeaders implements MultiValueMap<String, String>, Serializable {

164

165

public StompHeaders();

166

167

/**

168

* Set the destination header.

169

*/

170

public void setDestination(@Nullable String destination);

171

172

/**

173

* Return the destination header value.

174

*/

175

@Nullable

176

public String getDestination();

177

178

/**

179

* Set the content-type header.

180

*/

181

public void setContentType(@Nullable MimeType contentType);

182

183

/**

184

* Return the content-type header value.

185

*/

186

@Nullable

187

public MimeType getContentType();

188

189

/**

190

* Set the content-length header.

191

*/

192

public void setContentLength(long contentLength);

193

194

/**

195

* Return the content-length header value.

196

*/

197

public long getContentLength();

198

199

/**

200

* Set the receipt header.

201

*/

202

public void setReceipt(@Nullable String receipt);

203

204

/**

205

* Return the receipt header value.

206

*/

207

@Nullable

208

public String getReceipt();

209

210

/**

211

* Set the heartbeat header.

212

*/

213

public void setHeartbeat(@Nullable long[] heartbeat);

214

215

/**

216

* Return the heartbeat header value.

217

*/

218

@Nullable

219

public long[] getHeartbeat();

220

}

221

222

/**

223

* A MessageHeaderAccessor for STOMP messaging.

224

*/

225

public class StompHeaderAccessor extends SimpMessageHeaderAccessor {

226

227

public static StompHeaderAccessor create(StompCommand command);

228

229

public static StompHeaderAccessor createForHeartbeat();

230

231

/**

232

* Return the STOMP command, or null if not yet set.

233

*/

234

@Nullable

235

public StompCommand getCommand();

236

237

/**

238

* Set the STOMP command.

239

*/

240

public void setCommand(StompCommand command);

241

242

/**

243

* Return the value of the "host" header.

244

*/

245

@Nullable

246

public String getHost();

247

248

/**

249

* Set the "host" header.

250

*/

251

public void setHost(@Nullable String host);

252

253

/**

254

* Return the value of the "login" header.

255

*/

256

@Nullable

257

public String getLogin();

258

259

/**

260

* Set the "login" header.

261

*/

262

public void setLogin(@Nullable String login);

263

264

/**

265

* Return the value of the "passcode" header.

266

*/

267

@Nullable

268

public String getPasscode();

269

270

/**

271

* Set the "passcode" header.

272

*/

273

public void setPasscode(@Nullable String passcode);

274

}

275

```

276

277

### STOMP Commands

278

279

Enumeration of STOMP command types.

280

281

```java { .api }

282

/**

283

* Represents a STOMP command frame as defined in the STOMP specification.

284

*/

285

public enum StompCommand {

286

287

// Client commands

288

CONNECT,

289

STOMP,

290

SEND,

291

SUBSCRIBE,

292

UNSUBSCRIBE,

293

ACK,

294

NACK,

295

BEGIN,

296

COMMIT,

297

ABORT,

298

DISCONNECT,

299

300

// Server commands

301

CONNECTED,

302

MESSAGE,

303

RECEIPT,

304

ERROR;

305

306

/**

307

* Whether this command can be sent by a client.

308

*/

309

public boolean isClientCommand() {

310

return this.ordinal() < CONNECTED.ordinal();

311

}

312

313

/**

314

* Whether this command is sent by a server.

315

*/

316

public boolean isServerCommand() {

317

return this.ordinal() >= CONNECTED.ordinal();

318

}

319

}

320

```

321

322

### STOMP Codec

323

324

Encoder and decoder for STOMP frames.

325

326

```java { .api }

327

/**

328

* Decodes STOMP frames from ByteBuffer chunks.

329

*/

330

public class StompDecoder {

331

332

public StompDecoder();

333

334

/**

335

* Decode one or more STOMP frames from the given ByteBuffer.

336

*/

337

public List<Message<byte[]>> decode(ByteBuffer byteBuffer);

338

339

/**

340

* Return the configured header length limit.

341

*/

342

public int getHeaderLengthLimit();

343

344

/**

345

* Configure the maximum length allowed for STOMP headers.

346

*/

347

public void setHeaderLengthLimit(int headerLengthLimit);

348

}

349

350

/**

351

* Encodes STOMP frames to ByteBuffer.

352

*/

353

public class StompEncoder {

354

355

public StompEncoder();

356

357

/**

358

* Encode the given STOMP message to a byte array.

359

*/

360

public byte[] encode(Message<byte[]> message);

361

362

/**

363

* Encode the given STOMP message to a ByteBuffer.

364

*/

365

public ByteBuffer encode(Map<String, Object> headers, byte[] payload);

366

}

367

```

368

369

### STOMP Client Implementation

370

371

Reactor Netty-based STOMP client for TCP connections.

372

373

```java { .api }

374

/**

375

* A STOMP over TCP client that uses ReactorNettyTcpClient.

376

*/

377

public class ReactorNettyTcpStompClient implements StompClient {

378

379

public ReactorNettyTcpStompClient();

380

381

public ReactorNettyTcpStompClient(String host, int port);

382

383

/**

384

* Set the STOMP message codec to use for encoding and decoding STOMP messages.

385

*/

386

public void setMessageConverter(MessageConverter messageConverter);

387

388

/**

389

* Set the interval between heartbeat messages.

390

*/

391

public void setDefaultHeartbeat(@Nullable long[] heartbeat);

392

393

/**

394

* Configure a timeout for the receipt of the STOMP CONNECTED frame.

395

*/

396

public void setConnectTimeout(Duration connectTimeout);

397

398

/**

399

* Connect to the STOMP server.

400

*/

401

public StompSession connect(String url, StompSessionHandler handler, Object... uriVariables);

402

403

/**

404

* Connect to the STOMP server with headers.

405

*/

406

public StompSession connect(String url, @Nullable StompHeaders connectHeaders,

407

StompSessionHandler handler, Object... uriVariables);

408

}

409

```

410

411

### STOMP Broker Relay

412

413

Message handler that relays messages to an external STOMP broker.

414

415

```java { .api }

416

/**

417

* A MessageHandler that handles messages by forwarding them to a STOMP broker.

418

*/

419

public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {

420

421

public StompBrokerRelayMessageHandler(SubscribableChannel clientInboundChannel,

422

MessageChannel clientOutboundChannel,

423

SubscribableChannel brokerChannel,

424

Collection<String> destinationPrefixes);

425

426

/**

427

* Set the STOMP broker host.

428

*/

429

public void setRelayHost(String relayHost);

430

431

/**

432

* Return the STOMP broker host.

433

*/

434

public String getRelayHost();

435

436

/**

437

* Set the STOMP broker port.

438

*/

439

public void setRelayPort(int relayPort);

440

441

/**

442

* Return the STOMP broker port.

443

*/

444

public int getRelayPort();

445

446

/**

447

* Set the login to use when creating connections to the STOMP broker.

448

*/

449

public void setSystemLogin(@Nullable String systemLogin);

450

451

/**

452

* Set the passcode to use when creating connections to the STOMP broker.

453

*/

454

public void setSystemPasscode(@Nullable String systemPasscode);

455

456

/**

457

* Set the heartbeat settings for connections to the STOMP broker.

458

*/

459

public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval);

460

461

/**

462

* Set the heartbeat settings for connections to the STOMP broker.

463

*/

464

public void setSystemHeartbeatReceiveInterval(long systemHeartbeatReceiveInterval);

465

}

466

```

467

468

### STOMP Exceptions

469

470

Exceptions specific to STOMP protocol handling.

471

472

```java { .api }

473

/**

474

* Raised when an error occurs during STOMP message conversion.

475

*/

476

public class StompConversionException extends MessagingException {

477

478

public StompConversionException(String description, Throwable cause);

479

480

public StompConversionException(String description);

481

}

482

483

/**

484

* Raised when a connection is lost unexpectedly.

485

*/

486

public class ConnectionLostException extends MessagingException {

487

488

public ConnectionLostException(String description);

489

}

490

```

491

492

**Usage Examples:**

493

494

```java

495

import org.springframework.messaging.simp.stomp.*;

496

import org.springframework.messaging.converter.MappingJackson2MessageConverter;

497

498

// STOMP client setup

499

ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("localhost", 61613);

500

stompClient.setMessageConverter(new MappingJackson2MessageConverter());

501

502

// Session handler

503

StompSessionHandler sessionHandler = new StompSessionHandlerAdapter() {

504

@Override

505

public void afterConnected(StompSession session, StompHeaders connectedHeaders) {

506

System.out.println("Connected to STOMP broker");

507

}

508

509

@Override

510

public void handleException(StompSession session, StompCommand command,

511

StompHeaders headers, byte[] payload, Throwable exception) {

512

System.err.println("STOMP error: " + exception.getMessage());

513

}

514

};

515

516

// Connect and use session

517

StompSession session = stompClient.connect("stomp://localhost:61613/", sessionHandler);

518

519

// Send message

520

session.send("/topic/messages", "Hello STOMP!");

521

522

// Subscribe to destination

523

StompSession.Subscription subscription = session.subscribe("/topic/messages",

524

new StompFrameHandler() {

525

@Override

526

public Type getPayloadType(StompHeaders headers) {

527

return String.class;

528

}

529

530

@Override

531

public void handleFrame(StompHeaders headers, Object payload) {

532

System.out.println("Received: " + payload);

533

}

534

});

535

536

// Unsubscribe

537

subscription.unsubscribe();

538

539

// Disconnect

540

session.disconnect();

541

```