or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-support.md configuration.md core-websocket-api.md handler-framework.md index.md message-types.md server-integration.md sockjs-support.md stomp-messaging.md

stomp-messaging.mddocs/

0

# STOMP Messaging

1

2

STOMP (Simple Text Oriented Messaging Protocol) support for higher-level messaging patterns over WebSocket connections.

3

4

## Capabilities

5

6

### Sub-Protocol Handler Interface

7

8

Contract for handling messages according to sub-protocol conventions.

9

10

```java { .api }

11

/**

12

* Contract for handling messages according to sub-protocol conventions.

13

* Implementations process messages for specific protocols like STOMP.

14

*/

15

interface SubProtocolHandler {

16

/**

17

* Get the list of supported sub-protocols.

18

* @return list of protocol names

19

*/

20

List<String> getSupportedProtocols();

21

22

/**

23

* Handle message from WebSocket client.

24

* @param session WebSocket session

25

* @param message incoming WebSocket message

26

* @param outputChannel channel for sending processed messages

27

* @throws Exception if message handling fails

28

*/

29

void handleMessageFromClient(

30

WebSocketSession session,

31

WebSocketMessage<?> message,

32

MessageChannel outputChannel

33

) throws Exception;

34

35

/**

36

* Handle message being sent to WebSocket client.

37

* @param session WebSocket session

38

* @param message outgoing message

39

* @throws Exception if message handling fails

40

*/

41

void handleMessageToClient(

42

WebSocketSession session,

43

Message<?> message

44

) throws Exception;

45

46

/**

47

* Resolve session ID from message headers.

48

* @param message the message

49

* @return session ID or null

50

*/

51

String resolveSessionId(Message<?> message);

52

53

/**

54

* Called after WebSocket session is started.

55

* @param session WebSocket session

56

* @param outputChannel message output channel

57

* @throws Exception if initialization fails

58

*/

59

void afterSessionStarted(

60

WebSocketSession session,

61

MessageChannel outputChannel

62

) throws Exception;

63

64

/**

65

* Called after WebSocket session is ended.

66

* @param session WebSocket session

67

* @param closeStatus close status

68

* @param outputChannel message output channel

69

* @throws Exception if cleanup fails

70

*/

71

void afterSessionEnded(

72

WebSocketSession session,

73

CloseStatus closeStatus,

74

MessageChannel outputChannel

75

) throws Exception;

76

}

77

```

78

79

### Sub-Protocol WebSocket Handler

80

81

WebSocket handler that delegates to sub-protocol handlers based on negotiated protocol.

82

83

```java { .api }

84

/**

85

* WebSocket handler that delegates to sub-protocol handlers.

86

* Routes messages based on the negotiated sub-protocol.

87

*/

88

class SubProtocolWebSocketHandler implements WebSocketHandler, SubProtocolCapable {

89

/**

90

* Set the list of sub-protocol handlers.

91

* @param protocolHandlers list of protocol handlers

92

*/

93

public void setProtocolHandlers(List<SubProtocolHandler> protocolHandlers);

94

95

/**

96

* Get the list of sub-protocol handlers.

97

* @return list of protocol handlers

98

*/

99

public List<SubProtocolHandler> getProtocolHandlers();

100

101

/**

102

* Set the handler to use when the client did not request any sub-protocol.

103

* @param defaultProtocolHandler default handler

104

*/

105

public void setDefaultProtocolHandler(SubProtocolHandler defaultProtocolHandler);

106

107

/**

108

* Get the default protocol handler.

109

* @return default protocol handler

110

*/

111

public SubProtocolHandler getDefaultProtocolHandler();

112

113

/**

114

* Get supported sub-protocols from all handlers.

115

* @return list of supported protocol names

116

*/

117

public List<String> getSubProtocols();

118

}

119

```

120

121

**Usage Example:**

122

123

```java

124

@Configuration

125

public class SubProtocolConfig {

126

127

@Bean

128

public SubProtocolWebSocketHandler subProtocolWebSocketHandler(

129

MessageChannel clientInboundChannel,

130

MessageChannel clientOutboundChannel) {

131

132

SubProtocolWebSocketHandler handler = new SubProtocolWebSocketHandler(

133

clientInboundChannel,

134

clientOutboundChannel

135

);

136

137

// Configure STOMP sub-protocol handler

138

StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();

139

stompHandler.setMessageTypes(

140

StompCommand.CONNECT,

141

StompCommand.CONNECTED,

142

StompCommand.SUBSCRIBE,

143

StompCommand.UNSUBSCRIBE,

144

StompCommand.SEND,

145

StompCommand.MESSAGE,

146

StompCommand.DISCONNECT,

147

StompCommand.ERROR

148

);

149

150

// Configure custom sub-protocol handler

151

CustomProtocolHandler customHandler = new CustomProtocolHandler();

152

153

handler.setSubProtocolHandlers(Arrays.asList(stompHandler, customHandler));

154

handler.setDefaultProtocolHandler(stompHandler);

155

156

return handler;

157

}

158

}

159

160

// Custom sub-protocol implementation

161

public class CustomProtocolHandler implements SubProtocolHandler {

162

163

@Override

164

public List<String> getSupportedProtocols() {

165

return Arrays.asList("custom-v1", "custom-v2");

166

}

167

168

@Override

169

public void handleMessageFromClient(

170

WebSocketSession session,

171

WebSocketMessage<?> message,

172

MessageChannel outputChannel) throws Exception {

173

174

if (message instanceof TextMessage textMsg) {

175

String payload = textMsg.getPayload();

176

CustomMessage customMsg = parseCustomMessage(payload);

177

178

// Convert to Spring message and send to message channel

179

Message<CustomMessage> springMessage = MessageBuilder

180

.withPayload(customMsg)

181

.setHeader("sessionId", session.getId())

182

.setHeader("protocol", "custom-v1")

183

.build();

184

185

outputChannel.send(springMessage);

186

}

187

}

188

189

@Override

190

public void handleMessageToClient(WebSocketSession session, Message<?> message) throws Exception {

191

CustomMessage customMsg = (CustomMessage) message.getPayload();

192

String serialized = serializeCustomMessage(customMsg);

193

session.sendMessage(new TextMessage(serialized));

194

}

195

}

196

```

197

198

### STOMP Sub-Protocol Handler

199

200

Built-in sub-protocol handler for STOMP messaging.

201

202

```java { .api }

203

/**

204

* Sub-protocol handler for STOMP (Simple Text Oriented Messaging Protocol).

205

* Handles STOMP frame parsing, validation, and routing.

206

*/

207

class StompSubProtocolHandler implements SubProtocolHandler {

208

/**

209

* Set supported STOMP message types.

210

* @param messageTypes array of STOMP commands

211

*/

212

public void setMessageTypes(StompCommand... messageTypes);

213

214

/**

215

* Get supported STOMP message types.

216

* @return collection of STOMP commands

217

*/

218

public Collection<StompCommand> getMessageTypes();

219

220

/**

221

* Set the heartbeat scheduler for STOMP heartbeats.

222

* @param heartbeatScheduler task scheduler for heartbeats

223

*/

224

public void setHeartbeatScheduler(TaskScheduler heartbeatScheduler);

225

226

/**

227

* Set statistics for monitoring STOMP connections.

228

* @param statsRegistry statistics registry

229

*/

230

public void setStatsRegistry(Object statsRegistry);

231

}

232

233

/**

234

* STOMP command enumeration.

235

*/

236

enum StompCommand {

237

CONNECT, CONNECTED, SEND, SUBSCRIBE, UNSUBSCRIBE,

238

BEGIN, COMMIT, ABORT, ACK, NACK, DISCONNECT,

239

MESSAGE, RECEIPT, ERROR

240

}

241

```

242

243

### STOMP Client

244

245

STOMP client implementation for connecting to STOMP-enabled servers.

246

247

```java { .api }

248

/**

249

* STOMP client over WebSocket transport.

250

* Provides high-level STOMP messaging capabilities.

251

*/

252

class WebSocketStompClient extends StompClientSupport implements SmartLifecycle {

253

/**

254

* Create STOMP client with WebSocket client.

255

* @param webSocketClient underlying WebSocket client

256

*/

257

public WebSocketStompClient(WebSocketClient webSocketClient);

258

259

/**

260

* Set message converter for payload conversion.

261

* @param messageConverter message converter instance

262

*/

263

public void setMessageConverter(MessageConverter messageConverter);

264

265

/**

266

* Get the message converter.

267

* @return message converter instance

268

*/

269

public MessageConverter getMessageConverter();

270

271

/**

272

* Set the inbound message channel.

273

* @param inboundChannel channel for inbound messages

274

*/

275

public void setInboundChannel(MessageChannel inboundChannel);

276

277

/**

278

* Set the outbound message channel.

279

* @param outboundChannel channel for outbound messages

280

*/

281

public void setOutboundChannel(MessageChannel outboundChannel);

282

283

/**

284

* Connect to STOMP server.

285

* @param url server URL

286

* @param handler session handler

287

* @param headers values used to expand URI template variables in the url (not STOMP headers)

288

* @return future that completes when connected

289

*/

290

public ListenableFuture<StompSession> connect(

291

String url,

292

StompSessionHandler handler,

293

Object... headers

294

);

295

296

/**

297

* Connect to STOMP server with WebSocket headers.

298

* @param url server URL

299

* @param webSocketHeaders WebSocket handshake headers

300

* @param handler session handler

301

* @param stompHeaders values used to expand URI template variables in the url (not STOMP connect headers)

302

* @return future that completes when connected

303

*/

304

public ListenableFuture<StompSession> connect(

305

String url,

306

WebSocketHttpHeaders webSocketHeaders,

307

StompSessionHandler handler,

308

Object... stompHeaders

309

);

310

}

311

```

312

313

**Usage Example:**

314

315

```java

316

@Service

317

public class StompClientService {

318

private final WebSocketStompClient stompClient;

319

private StompSession stompSession;

320

321

public StompClientService(WebSocketClient webSocketClient) {

322

this.stompClient = new WebSocketStompClient(webSocketClient);

323

this.stompClient.setMessageConverter(new MappingJackson2MessageConverter());

324

}

325

326

@PostConstruct

327

public void connect() {

328

StompSessionHandler sessionHandler = new StompSessionHandlerAdapter() {

329

@Override

330

public void afterConnected(StompSession session, StompHeaders connectedHeaders) {

331

logger.info("Connected to STOMP server");

332

StompClientService.this.stompSession = session;

333

subscribeToTopics(session);

334

}

335

336

@Override

337

public void handleException(StompSession session, StompCommand command,

338

StompHeaders headers, byte[] payload, Throwable exception) {

339

logger.error("STOMP error: {}", exception.getMessage());

340

}

341

};

342

343

try {

344

ListenableFuture<StompSession> future = stompClient.connect(

345

"ws://localhost:8080/stomp",

346

sessionHandler

347

);

348

349

stompSession = future.get(10, TimeUnit.SECONDS);

350

} catch (Exception e) {

351

logger.error("Failed to connect to STOMP server", e);

352

}

353

}

354

355

private void subscribeToTopics(StompSession session) {

356

// Subscribe to chat messages

357

session.subscribe("/topic/chat", new StompFrameHandler() {

358

@Override

359

public Type getPayloadType(StompHeaders headers) {

360

return ChatMessage.class;

361

}

362

363

@Override

364

public void handleFrame(StompHeaders headers, Object payload) {

365

ChatMessage message = (ChatMessage) payload;

366

processChatMessage(message);

367

}

368

});

369

370

// Subscribe to user-specific notifications

371

session.subscribe("/user/queue/notifications", new StompFrameHandler() {

372

@Override

373

public Type getPayloadType(StompHeaders headers) {

374

return Notification.class;

375

}

376

377

@Override

378

public void handleFrame(StompHeaders headers, Object payload) {

379

Notification notification = (Notification) payload;

380

processNotification(notification);

381

}

382

});

383

}

384

385

public void sendChatMessage(String message) {

386

if (stompSession != null && stompSession.isConnected()) {

387

ChatMessage chatMsg = new ChatMessage("user123", message);

388

stompSession.send("/app/chat", chatMsg);

389

}

390

}

391

392

public void sendPrivateMessage(String userId, String message) {

393

if (stompSession != null && stompSession.isConnected()) {

394

PrivateMessage privateMsg = new PrivateMessage(userId, message);

395

stompSession.send("/app/private", privateMsg);

396

}

397

}

398

}

399

```

400

401

### STOMP Error Handling

402

403

Error handling interfaces and implementations for STOMP sub-protocol.

404

405

```java { .api }

406

/**

407

* Contract for handling sub-protocol errors.

408

* @param <P> the payload type

409

*/

410

interface SubProtocolErrorHandler<P> {

411

/**

412

* Handle error during client message processing.

413

* @param clientMessage original client message

414

* @param ex exception that occurred

415

* @return error message to send to client

416

*/

417

Message<P> handleClientMessageProcessingError(

418

Message<P> clientMessage,

419

Throwable ex

420

);

421

422

/**

423

* Handle error message being sent to client.

424

* @param errorMessage error message

425

* @return processed error message

426

*/

427

Message<P> handleErrorMessageToClient(Message<P> errorMessage);

428

}

429

430

/**

431

* Default error handler for STOMP sub-protocol.

432

* Creates STOMP ERROR frames for exceptions.

433

*/

434

class StompSubProtocolErrorHandler implements SubProtocolErrorHandler<byte[]> {

435

/**

436

* Create STOMP error handler.

437

*/

438

public StompSubProtocolErrorHandler();

439

}

440

```

441

442

### STOMP Session Events

443

444

Spring application events published during STOMP session lifecycle.

445

446

```java { .api }

447

/**

448

* Base class for sub-protocol events.

449

* Contains common information about WebSocket sessions and messages.

450

*/

451

abstract class AbstractSubProtocolEvent extends ApplicationEvent {

452

/**

453

* Get the message associated with this event.

454

* @return the message

455

*/

456

public Message<byte[]> getMessage();

457

458

/**

459

* Get the user principal.

460

* @return user principal or null

461

*/

462

public Principal getUser();

463

}

464

465

/**

466

* Event published when a WebSocket client connects using STOMP.

467

* Published before CONNECTED frame is sent.

468

*/

469

class SessionConnectEvent extends AbstractSubProtocolEvent {

470

/**

471

* Create connect event.

472

* @param source event source

473

* @param message CONNECT message

474

* @param user authenticated user

475

*/

476

public SessionConnectEvent(Object source, Message<byte[]> message, Principal user);

477

}

478

479

/**

480

* Event published when a WebSocket client has successfully connected.

481

* Published after CONNECTED frame is sent.

482

*/

483

class SessionConnectedEvent extends AbstractSubProtocolEvent {

484

/**

485

* Create connected event.

486

* @param source event source

487

* @param message CONNECTED message

488

* @param user authenticated user

489

*/

490

public SessionConnectedEvent(Object source, Message<byte[]> message, Principal user);

491

}

492

493

/**

494

* Event published when a client subscribes to a destination.

495

*/

496

class SessionSubscribeEvent extends AbstractSubProtocolEvent {

497

/**

498

* Create subscribe event.

499

* @param source event source

500

* @param message SUBSCRIBE message

501

* @param user authenticated user

502

*/

503

public SessionSubscribeEvent(Object source, Message<byte[]> message, Principal user);

504

}

505

506

/**

507

* Event published when a client unsubscribes from a destination.

508

*/

509

class SessionUnsubscribeEvent extends AbstractSubProtocolEvent {

510

/**

511

* Create unsubscribe event.

512

* @param source event source

513

* @param message UNSUBSCRIBE message

514

* @param user authenticated user

515

*/

516

public SessionUnsubscribeEvent(Object source, Message<byte[]> message, Principal user);

517

}

518

519

/**

520

* Event published when a WebSocket client disconnects.

521

*/

522

class SessionDisconnectEvent extends AbstractSubProtocolEvent {

523

/**

524

* Create disconnect event.

525

* @param source event source

526

* @param message DISCONNECT message

527

* @param sessionId session identifier

528

* @param closeStatus WebSocket close status

529

* @param user authenticated user

530

*/

531

public SessionDisconnectEvent(

532

Object source,

533

Message<byte[]> message,

534

String sessionId,

535

CloseStatus closeStatus,

536

Principal user

537

);

538

539

/**

540

* Get the WebSocket close status.

541

* @return close status

542

*/

543

public CloseStatus getCloseStatus();

544

545

/**

546

* Get the session ID.

547

* @return session identifier

548

*/

549

public String getSessionId();

550

}

551

```

552

553

**Usage Example:**

554

555

```java

556

@Component

557

public class StompEventListener {

558

559

@EventListener

560

public void handleSessionConnect(SessionConnectEvent event) {

561

Principal user = event.getUser();

562

logger.info("User {} connecting via STOMP", user != null ? user.getName() : "anonymous");

563

564

// Perform connection validation

565

if (!isUserAllowed(user)) {

566

// Could throw exception to reject connection

567

throw new SecurityException("User not allowed to connect");

568

}

569

}

570

571

@EventListener

572

public void handleSessionConnected(SessionConnectedEvent event) {

573

Principal user = event.getUser();

574

logger.info("User {} connected via STOMP", user != null ? user.getName() : "anonymous");

575

576

// Send welcome message

577

if (user != null) {

578

sendWelcomeMessage(user.getName());

579

}

580

}

581

582

@EventListener

583

public void handleSessionSubscribe(SessionSubscribeEvent event) {

584

Message<byte[]> message = event.getMessage();

585

String destination = (String) message.getHeaders().get("simpDestination");

586

Principal user = event.getUser();

587

588

logger.info("User {} subscribed to {}",

589

user != null ? user.getName() : "anonymous",

590

destination);

591

592

// Track subscriptions for analytics

593

subscriptionTracker.recordSubscription(user, destination);

594

}

595

596

@EventListener

597

public void handleSessionDisconnect(SessionDisconnectEvent event) {

598

String sessionId = event.getSessionId();

599

Principal user = event.getUser();

600

CloseStatus closeStatus = event.getCloseStatus();

601

602

logger.info("User {} disconnected (session: {}, status: {})",

603

user != null ? user.getName() : "anonymous",

604

sessionId,

605

closeStatus);

606

607

// Cleanup user resources

608

cleanupUserSession(sessionId, user);

609

}

610

}

611

```