0
# STOMP Messaging
1
2
STOMP (Simple Text Oriented Messaging Protocol) support for higher-level messaging patterns over WebSocket connections.
3
4
## Capabilities
5
6
### Sub-Protocol Handler Interface
7
8
Contract for handling messages according to sub-protocol conventions.
9
10
```java { .api }
11
/**
12
* Contract for handling messages according to sub-protocol conventions.
13
* Implementations process messages for specific protocols like STOMP.
14
*/
15
interface SubProtocolHandler {
16
/**
17
* Get the list of supported sub-protocols.
18
* @return list of protocol names
19
*/
20
List<String> getSupportedProtocols();
21
22
/**
23
* Handle message from WebSocket client.
24
* @param session WebSocket session
25
* @param message incoming WebSocket message
26
* @param outputChannel channel for sending processed messages
27
* @throws Exception if message handling fails
28
*/
29
void handleMessageFromClient(
30
WebSocketSession session,
31
WebSocketMessage<?> message,
32
MessageChannel outputChannel
33
) throws Exception;
34
35
/**
36
* Handle message being sent to WebSocket client.
37
* @param session WebSocket session
38
* @param message outgoing message
39
* @throws Exception if message handling fails
40
*/
41
void handleMessageToClient(
42
WebSocketSession session,
43
Message<?> message
44
) throws Exception;
45
46
/**
47
* Resolve session ID from message headers.
48
* @param message the message
49
* @return session ID or null
50
*/
51
String resolveSessionId(Message<?> message);
52
53
/**
54
* Called after WebSocket session is started.
55
* @param session WebSocket session
56
* @param outputChannel message output channel
57
* @throws Exception if initialization fails
58
*/
59
void afterSessionStarted(
60
WebSocketSession session,
61
MessageChannel outputChannel
62
) throws Exception;
63
64
/**
65
* Called after WebSocket session is ended.
66
* @param session WebSocket session
67
* @param closeStatus close status
68
* @param outputChannel message output channel
69
* @throws Exception if cleanup fails
70
*/
71
void afterSessionEnded(
72
WebSocketSession session,
73
CloseStatus closeStatus,
74
MessageChannel outputChannel
75
) throws Exception;
76
}
77
```
78
79
### Sub-Protocol WebSocket Handler
80
81
WebSocket handler that delegates each session's messages to the sub-protocol handler matching the sub-protocol negotiated during the WebSocket handshake.
82
83
```java { .api }
84
/**
85
* WebSocket handler that delegates to sub-protocol handlers.
86
* Routes messages based on the negotiated sub-protocol.
87
*/
88
class SubProtocolWebSocketHandler implements WebSocketHandler, SubProtocolCapable {
89
/**
90
* Set the list of sub-protocol handlers.
91
* @param protocolHandlers list of protocol handlers
92
*/
93
public void setSubProtocolHandlers(List<SubProtocolHandler> protocolHandlers);
94
95
/**
96
* Get the list of sub-protocol handlers.
97
* @return list of protocol handlers
98
*/
99
public List<SubProtocolHandler> getSubProtocolHandlers();
100
101
/**
102
* Set the default protocol handler for unsupported protocols.
103
* @param defaultProtocolHandler default handler
104
*/
105
public void setDefaultProtocolHandler(SubProtocolHandler defaultProtocolHandler);
106
107
/**
108
* Get the default protocol handler.
109
* @return default protocol handler
110
*/
111
public SubProtocolHandler getDefaultProtocolHandler();
112
113
/**
114
* Get supported sub-protocols from all handlers.
115
* @return list of supported protocol names
116
*/
117
public List<String> getSubProtocols();
118
}
119
```
120
121
**Usage Example:**
122
123
```java
124
@Configuration
125
public class SubProtocolConfig {
126
127
@Bean
128
public SubProtocolWebSocketHandler subProtocolWebSocketHandler(
129
MessageChannel clientInboundChannel,
130
MessageChannel clientOutboundChannel) {
131
132
SubProtocolWebSocketHandler handler = new SubProtocolWebSocketHandler(
133
clientInboundChannel,
134
clientOutboundChannel
135
);
136
137
// Configure STOMP sub-protocol handler
138
StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();
139
stompHandler.setMessageTypes(
140
StompCommand.CONNECT,
141
StompCommand.CONNECTED,
142
StompCommand.SUBSCRIBE,
143
StompCommand.UNSUBSCRIBE,
144
StompCommand.SEND,
145
StompCommand.MESSAGE,
146
StompCommand.DISCONNECT,
147
StompCommand.ERROR
148
);
149
150
// Configure custom sub-protocol handler
151
CustomProtocolHandler customHandler = new CustomProtocolHandler();
152
153
handler.setSubProtocolHandlers(Arrays.asList(stompHandler, customHandler));
154
handler.setDefaultProtocolHandler(stompHandler);
155
156
return handler;
157
}
158
}
159
160
// Custom sub-protocol implementation
161
public class CustomProtocolHandler implements SubProtocolHandler {
162
163
@Override
164
public List<String> getSupportedProtocols() {
165
return Arrays.asList("custom-v1", "custom-v2");
166
}
167
168
@Override
169
public void handleMessageFromClient(
170
WebSocketSession session,
171
WebSocketMessage<?> message,
172
MessageChannel outputChannel) throws Exception {
173
174
if (message instanceof TextMessage textMsg) {
175
String payload = textMsg.getPayload();
176
CustomMessage customMsg = parseCustomMessage(payload);
177
178
// Convert to Spring message and send to message channel
179
Message<CustomMessage> springMessage = MessageBuilder
180
.withPayload(customMsg)
181
.setHeader("sessionId", session.getId())
182
.setHeader("protocol", "custom-v1")
183
.build();
184
185
outputChannel.send(springMessage);
186
}
187
}
188
189
@Override
190
public void handleMessageToClient(WebSocketSession session, Message<?> message) throws Exception {
191
CustomMessage customMsg = (CustomMessage) message.getPayload();
192
String serialized = serializeCustomMessage(customMsg);
193
session.sendMessage(new TextMessage(serialized));
194
}
195
}
196
```
197
198
### STOMP Sub-Protocol Handler
199
200
Built-in sub-protocol handler for STOMP messaging.
201
202
```java { .api }
203
/**
204
* Sub-protocol handler for STOMP (Simple Text Oriented Messaging Protocol).
205
* Handles STOMP frame parsing, validation, and routing.
206
*/
207
class StompSubProtocolHandler implements SubProtocolHandler {
208
/**
209
* Set supported STOMP message types.
210
* @param messageTypes array of STOMP commands
211
*/
212
public void setMessageTypes(StompCommand... messageTypes);
213
214
/**
215
* Get supported STOMP message types.
216
* @return collection of STOMP commands
217
*/
218
public Collection<StompCommand> getMessageTypes();
219
220
/**
221
* Set the heartbeat scheduler for STOMP heartbeats.
222
* @param heartbeatScheduler task scheduler for heartbeats
223
*/
224
public void setHeartbeatScheduler(TaskScheduler heartbeatScheduler);
225
226
/**
227
* Set statistics for monitoring STOMP connections.
228
* @param statsRegistry statistics registry
229
*/
230
public void setStatsRegistry(Object statsRegistry);
231
}
232
233
/**
 * STOMP command enumeration.
 * Constants are grouped by the side that sends the frame.
 */
enum StompCommand {
    // Frames sent by the client. STOMP is the STOMP 1.1+ alternative to CONNECT.
    STOMP, CONNECT, DISCONNECT, SUBSCRIBE, UNSUBSCRIBE, SEND,
    ACK, NACK, BEGIN, COMMIT, ABORT,

    // Frames sent by the server.
    CONNECTED, RECEIPT, MESSAGE, ERROR
}
241
```
242
243
### STOMP Client
244
245
STOMP client implementation for connecting to STOMP-enabled servers.
246
247
```java { .api }
248
/**
249
* STOMP client over WebSocket transport.
250
* Provides high-level STOMP messaging capabilities.
251
*/
252
class WebSocketStompClient implements StompSession.Receiptable {
253
/**
254
* Create STOMP client with WebSocket client.
255
* @param webSocketClient underlying WebSocket client
256
*/
257
public WebSocketStompClient(WebSocketClient webSocketClient);
258
259
/**
260
* Set message converter for payload conversion.
261
* @param messageConverter message converter instance
262
*/
263
public void setMessageConverter(MessageConverter messageConverter);
264
265
/**
266
* Get the message converter.
267
* @return message converter instance
268
*/
269
public MessageConverter getMessageConverter();
270
271
/**
272
* Set the inbound message channel.
273
* @param inboundChannel channel for inbound messages
274
*/
275
public void setInboundChannel(MessageChannel inboundChannel);
276
277
/**
278
* Set the outbound message channel.
279
* @param outboundChannel channel for outbound messages
280
*/
281
public void setOutboundChannel(MessageChannel outboundChannel);
282
283
/**
284
* Connect to STOMP server.
285
* @param url server URL
286
* @param handler session handler
287
* @param headers STOMP headers
288
* @return future that completes when connected
289
*/
290
public ListenableFuture<StompSession> connect(
291
String url,
292
StompSessionHandler handler,
293
Object... headers
294
);
295
296
/**
297
* Connect to STOMP server with WebSocket headers.
298
* @param url server URL
299
* @param webSocketHeaders WebSocket handshake headers
300
* @param handler session handler
301
* @param stompHeaders STOMP connect headers
302
* @return future that completes when connected
303
*/
304
public ListenableFuture<StompSession> connect(
305
String url,
306
WebSocketHttpHeaders webSocketHeaders,
307
StompSessionHandler handler,
308
Object... stompHeaders
309
);
310
}
311
```
312
313
**Usage Example:**
314
315
```java
316
@Service
317
public class StompClientService {
318
private final WebSocketStompClient stompClient;
319
private StompSession stompSession;
320
321
public StompClientService(WebSocketClient webSocketClient) {
322
this.stompClient = new WebSocketStompClient(webSocketClient);
323
this.stompClient.setMessageConverter(new MappingJackson2MessageConverter());
324
}
325
326
@PostConstruct
327
public void connect() {
328
StompSessionHandler sessionHandler = new StompSessionHandlerAdapter() {
329
@Override
330
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
331
logger.info("Connected to STOMP server");
332
StompClientService.this.stompSession = session;
333
subscribeToTopics(session);
334
}
335
336
@Override
337
public void handleException(StompSession session, StompCommand command,
338
StompHeaders headers, byte[] payload, Throwable exception) {
339
logger.error("STOMP error: {}", exception.getMessage());
340
}
341
};
342
343
try {
344
ListenableFuture<StompSession> future = stompClient.connect(
345
"ws://localhost:8080/stomp",
346
sessionHandler
347
);
348
349
stompSession = future.get(10, TimeUnit.SECONDS);
350
} catch (Exception e) {
351
logger.error("Failed to connect to STOMP server", e);
352
}
353
}
354
355
private void subscribeToTopics(StompSession session) {
356
// Subscribe to chat messages
357
session.subscribe("/topic/chat", new StompFrameHandler() {
358
@Override
359
public Type getPayloadType(StompHeaders headers) {
360
return ChatMessage.class;
361
}
362
363
@Override
364
public void handleFrame(StompHeaders headers, Object payload) {
365
ChatMessage message = (ChatMessage) payload;
366
processChatMessage(message);
367
}
368
});
369
370
// Subscribe to user-specific notifications
371
session.subscribe("/user/queue/notifications", new StompFrameHandler() {
372
@Override
373
public Type getPayloadType(StompHeaders headers) {
374
return Notification.class;
375
}
376
377
@Override
378
public void handleFrame(StompHeaders headers, Object payload) {
379
Notification notification = (Notification) payload;
380
processNotification(notification);
381
}
382
});
383
}
384
385
public void sendChatMessage(String message) {
386
if (stompSession != null && stompSession.isConnected()) {
387
ChatMessage chatMsg = new ChatMessage("user123", message);
388
stompSession.send("/app/chat", chatMsg);
389
}
390
}
391
392
public void sendPrivateMessage(String userId, String message) {
393
if (stompSession != null && stompSession.isConnected()) {
394
PrivateMessage privateMsg = new PrivateMessage(userId, message);
395
stompSession.send("/app/private", privateMsg);
396
}
397
}
398
}
399
```
400
401
### STOMP Error Handling
402
403
Error handling interfaces and implementations for the STOMP sub-protocol.
404
405
```java { .api }
406
/**
407
* Contract for handling sub-protocol errors.
408
* @param <P> the payload type
409
*/
410
interface SubProtocolErrorHandler<P> {
411
/**
412
* Handle error during client message processing.
413
* @param clientMessage original client message
414
* @param ex exception that occurred
415
* @return error message to send to client
416
*/
417
Message<P> handleClientMessageProcessingError(
418
Message<P> clientMessage,
419
Throwable ex
420
);
421
422
/**
423
* Handle error message being sent to client.
424
* @param errorMessage error message
425
* @return processed error message
426
*/
427
Message<P> handleErrorMessageToClient(Message<P> errorMessage);
428
}
429
430
/**
431
* Default error handler for STOMP sub-protocol.
432
* Creates STOMP ERROR frames for exceptions.
433
*/
434
class StompSubProtocolErrorHandler implements SubProtocolErrorHandler<byte[]> {
435
/**
436
* Create STOMP error handler.
437
*/
438
public StompSubProtocolErrorHandler();
439
}
440
```
441
442
### STOMP Session Events
443
444
Spring application events published during STOMP session lifecycle.
445
446
```java { .api }
447
/**
448
* Base class for sub-protocol events.
449
* Contains common information about WebSocket sessions and messages.
450
*/
451
abstract class AbstractSubProtocolEvent extends ApplicationEvent {
452
/**
453
* Get the message associated with this event.
454
* @return the message
455
*/
456
public Message<byte[]> getMessage();
457
458
/**
459
* Get the user principal.
460
* @return user principal or null
461
*/
462
public Principal getUser();
463
}
464
465
/**
466
* Event published when a WebSocket client connects using STOMP.
467
* Published before CONNECTED frame is sent.
468
*/
469
class SessionConnectEvent extends AbstractSubProtocolEvent {
470
/**
471
* Create connect event.
472
* @param source event source
473
* @param message CONNECT message
474
* @param user authenticated user
475
*/
476
public SessionConnectEvent(Object source, Message<byte[]> message, Principal user);
477
}
478
479
/**
480
* Event published when a WebSocket client has successfully connected.
481
* Published after CONNECTED frame is sent.
482
*/
483
class SessionConnectedEvent extends AbstractSubProtocolEvent {
484
/**
485
* Create connected event.
486
* @param source event source
487
* @param message CONNECTED message
488
* @param user authenticated user
489
*/
490
public SessionConnectedEvent(Object source, Message<byte[]> message, Principal user);
491
}
492
493
/**
494
* Event published when a client subscribes to a destination.
495
*/
496
class SessionSubscribeEvent extends AbstractSubProtocolEvent {
497
/**
498
* Create subscribe event.
499
* @param source event source
500
* @param message SUBSCRIBE message
501
* @param user authenticated user
502
*/
503
public SessionSubscribeEvent(Object source, Message<byte[]> message, Principal user);
504
}
505
506
/**
507
* Event published when a client unsubscribes from a destination.
508
*/
509
class SessionUnsubscribeEvent extends AbstractSubProtocolEvent {
510
/**
511
* Create unsubscribe event.
512
* @param source event source
513
* @param message UNSUBSCRIBE message
514
* @param user authenticated user
515
*/
516
public SessionUnsubscribeEvent(Object source, Message<byte[]> message, Principal user);
517
}
518
519
/**
520
* Event published when a WebSocket client disconnects.
521
*/
522
class SessionDisconnectEvent extends AbstractSubProtocolEvent {
523
/**
524
* Create disconnect event.
525
* @param source event source
526
* @param message DISCONNECT message
527
* @param sessionId session identifier
528
* @param closeStatus WebSocket close status
529
* @param user authenticated user
530
*/
531
public SessionDisconnectEvent(
532
Object source,
533
Message<byte[]> message,
534
String sessionId,
535
CloseStatus closeStatus,
536
Principal user
537
);
538
539
/**
540
* Get the WebSocket close status.
541
* @return close status
542
*/
543
public CloseStatus getCloseStatus();
544
545
/**
546
* Get the session ID.
547
* @return session identifier
548
*/
549
public String getSessionId();
550
}
551
```
552
553
**Usage Example:**
554
555
```java
556
@Component
557
public class StompEventListener {
558
559
@EventListener
560
public void handleSessionConnect(SessionConnectEvent event) {
561
Principal user = event.getUser();
562
logger.info("User {} connecting via STOMP", user != null ? user.getName() : "anonymous");
563
564
// Perform connection validation
565
if (!isUserAllowed(user)) {
566
// Could throw exception to reject connection
567
throw new SecurityException("User not allowed to connect");
568
}
569
}
570
571
@EventListener
572
public void handleSessionConnected(SessionConnectedEvent event) {
573
Principal user = event.getUser();
574
logger.info("User {} connected via STOMP", user != null ? user.getName() : "anonymous");
575
576
// Send welcome message
577
if (user != null) {
578
sendWelcomeMessage(user.getName());
579
}
580
}
581
582
@EventListener
583
public void handleSessionSubscribe(SessionSubscribeEvent event) {
584
Message<byte[]> message = event.getMessage();
585
String destination = (String) message.getHeaders().get("simpDestination");
586
Principal user = event.getUser();
587
588
logger.info("User {} subscribed to {}",
589
user != null ? user.getName() : "anonymous",
590
destination);
591
592
// Track subscriptions for analytics
593
subscriptionTracker.recordSubscription(user, destination);
594
}
595
596
@EventListener
597
public void handleSessionDisconnect(SessionDisconnectEvent event) {
598
String sessionId = event.getSessionId();
599
Principal user = event.getUser();
600
CloseStatus closeStatus = event.getCloseStatus();
601
602
logger.info("User {} disconnected (session: {}, status: {})",
603
user != null ? user.getName() : "anonymous",
604
sessionId,
605
closeStatus);
606
607
// Cleanup user resources
608
cleanupUserSession(sessionId, user);
609
}
610
}
611
```