0
# STOMP and WebSocket Support
1
2
Complete STOMP (Simple Text Oriented Messaging Protocol) implementation for WebSocket-based messaging with session management, subscription handling, and broker relay capabilities.
3
4
## Capabilities
5
6
### STOMP Session Interface
7
8
Core interface for STOMP client session operations.
9
10
```java { .api }
11
/**
12
* A STOMP session that provides methods for sending messages and managing subscriptions.
13
*/
14
public interface StompSession {
15
/**
16
* Get the session id.
17
*/
18
String getSessionId();
19
20
/**
21
* Whether the session is connected.
22
*/
23
boolean isConnected();
24
25
/**
26
* Enable or disable automatically adding a receipt header to subsequent send and subscribe operations on this session.
27
*/
28
void setAutoReceipt(boolean enabled);
29
30
/**
31
* Send a message to the specified destination.
32
*/
33
void send(String destination, Object payload);
34
35
/**
36
* Send a message to the specified destination with headers.
37
*/
38
void send(StompHeaders headers, Object payload);
39
40
/**
41
* Subscribe to the given destination.
42
*/
43
Subscription subscribe(String destination, StompFrameHandler handler);
44
45
/**
46
* Subscribe to the given destination with headers.
47
*/
48
Subscription subscribe(StompHeaders headers, StompFrameHandler handler);
49
50
/**
51
* Send a receipt.
52
*/
53
Receiptable receipt(String receiptId, ReceiptHandler receiptHandler);
54
55
/**
56
* Disconnect the session.
57
*/
58
void disconnect();
59
60
/**
61
* Represents a subscription in a STOMP session.
62
*/
63
interface Subscription {
64
/**
65
* Return the subscription id.
66
*/
67
String getSubscriptionId();
68
69
/**
70
* Remove the subscription.
71
*/
72
void unsubscribe();
73
74
/**
75
* Add a task to invoke when a receipt is received.
76
*/
77
void addReceiptTask(Runnable runnable);
78
79
/**
80
* Add a task to invoke when a receipt is not received within the configured time.
81
*/
82
void addReceiptLostTask(Runnable runnable);
83
}
84
85
/**
86
* Represents a receiptable STOMP frame.
87
*/
88
interface Receiptable {
89
/**
90
* Return the receipt id.
91
*/
92
String getReceiptId();
93
94
/**
95
* Add a receipt received task.
96
*/
97
void addReceiptTask(Runnable runnable);
98
99
/**
100
* Add a receipt lost task.
101
*/
102
void addReceiptLostTask(Runnable runnable);
103
}
104
}
105
```
106
107
### STOMP Frame Handling
108
109
Interfaces for handling STOMP frames and session events.
110
111
```java { .api }
112
/**
113
* Contract to handle a STOMP frame.
114
*/
115
public interface StompFrameHandler {
116
/**
117
* Invoked when the session is ready to use, i.e. after the STOMP CONNECTED frame has been received.
118
*/
119
void afterConnected(StompSession session, StompHeaders connectedHeaders);
120
121
/**
122
* Handle an exception.
123
*/
124
void handleException(StompSession session, @Nullable StompCommand command,
125
StompHeaders headers, byte[] payload, Throwable exception);
126
127
/**
128
* Return the payload type this handler expects to receive.
129
*/
130
Type getPayloadType(StompHeaders headers);
131
132
/**
133
* Handle the payload of a STOMP message.
134
*/
135
void handleFrame(StompHeaders headers, @Nullable Object payload);
136
}
137
138
/**
139
* Contract to handle STOMP session lifecycle events.
140
*/
141
public interface StompSessionHandler extends StompFrameHandler {
142
/**
143
* Handle any exception arising while processing a STOMP frame.
144
*/
145
void handleException(StompSession session, @Nullable StompCommand command,
146
StompHeaders headers, byte[] payload, Throwable exception);
147
148
/**
149
* Handle a STOMP transport error.
150
*/
151
void handleTransportError(StompSession session, Throwable exception);
152
}
153
```
154
155
### STOMP Headers
156
157
STOMP-specific headers implementation and accessor.
158
159
```java { .api }
160
/**
161
* Represents STOMP frame headers.
162
*/
163
public class StompHeaders implements MultiValueMap<String, String>, Serializable {
164
165
public StompHeaders();
166
167
/**
168
* Set the destination header.
169
*/
170
public void setDestination(@Nullable String destination);
171
172
/**
173
* Return the destination header value.
174
*/
175
@Nullable
176
public String getDestination();
177
178
/**
179
* Set the content-type header.
180
*/
181
public void setContentType(@Nullable MimeType contentType);
182
183
/**
184
* Return the content-type header value.
185
*/
186
@Nullable
187
public MimeType getContentType();
188
189
/**
190
* Set the content-length header.
191
*/
192
public void setContentLength(long contentLength);
193
194
/**
195
* Return the content-length header value.
196
*/
197
public long getContentLength();
198
199
/**
200
* Set the receipt header.
201
*/
202
public void setReceipt(@Nullable String receipt);
203
204
/**
205
* Return the receipt header value.
206
*/
207
@Nullable
208
public String getReceipt();
209
210
/**
211
* Set the heartbeat header.
212
*/
213
public void setHeartbeat(@Nullable long[] heartbeat);
214
215
/**
216
* Return the heartbeat header value.
217
*/
218
@Nullable
219
public long[] getHeartbeat();
220
}
221
222
/**
223
* A MessageHeaderAccessor for STOMP messaging.
224
*/
225
public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
226
227
public static StompHeaderAccessor create(StompCommand command);
228
229
public static StompHeaderAccessor createForHeartbeat();
230
231
/**
232
* Return the STOMP command, or null if not yet set.
233
*/
234
@Nullable
235
public StompCommand getCommand();
236
237
/**
238
* Set the STOMP command.
239
*/
240
public void setCommand(StompCommand command);
241
242
/**
243
* Return the value of the "host" header.
244
*/
245
@Nullable
246
public String getHost();
247
248
/**
249
* Set the "host" header.
250
*/
251
public void setHost(@Nullable String host);
252
253
/**
254
* Return the value of the "login" header.
255
*/
256
@Nullable
257
public String getLogin();
258
259
/**
260
* Set the "login" header.
261
*/
262
public void setLogin(@Nullable String login);
263
264
/**
265
* Return the value of the "passcode" header.
266
*/
267
@Nullable
268
public String getPasscode();
269
270
/**
271
* Set the "passcode" header.
272
*/
273
public void setPasscode(@Nullable String passcode);
274
}
275
```
276
277
### STOMP Commands
278
279
Enumeration of STOMP command types.
280
281
```java { .api }
282
/**
283
* Represents a STOMP command frame as defined in the STOMP specification.
284
*/
285
public enum StompCommand {
286
287
// Client commands
288
CONNECT,
289
STOMP,
290
SEND,
291
SUBSCRIBE,
292
UNSUBSCRIBE,
293
ACK,
294
NACK,
295
BEGIN,
296
COMMIT,
297
ABORT,
298
DISCONNECT,
299
300
// Server commands
301
CONNECTED,
302
MESSAGE,
303
RECEIPT,
304
ERROR;
305
306
/**
307
* Whether this command can be sent by a client.
308
*/
309
public boolean isClientCommand() {
310
return this.ordinal() < CONNECTED.ordinal();
311
}
312
313
/**
314
* Whether this command is sent by a server.
315
*/
316
public boolean isServerCommand() {
317
return this.ordinal() >= CONNECTED.ordinal();
318
}
319
}
320
```
321
322
### STOMP Codec
323
324
Encoder and decoder for STOMP frames.
325
326
```java { .api }
327
/**
328
* Decodes STOMP frames from ByteBuffer chunks.
329
*/
330
public class StompDecoder {
331
332
public StompDecoder();
333
334
/**
335
* Decode one or more STOMP frames from the given ByteBuffer.
336
*/
337
public List<Message<byte[]>> decode(ByteBuffer byteBuffer);
338
339
/**
340
* Return the configured header length limit.
341
*/
342
public int getHeaderLengthLimit();
343
344
/**
345
* Configure the maximum length allowed for STOMP headers.
346
*/
347
public void setHeaderLengthLimit(int headerLengthLimit);
348
}
349
350
/**
351
* Encodes STOMP frames to ByteBuffer.
352
*/
353
public class StompEncoder {
354
355
public StompEncoder();
356
357
/**
358
* Encode the given STOMP message to a byte array.
359
*/
360
public byte[] encode(Message<byte[]> message);
361
362
/**
363
* Encode the given headers and payload to a ByteBuffer.
364
*/
365
public ByteBuffer encode(Map<String, Object> headers, byte[] payload);
366
}
367
```
368
369
### STOMP Client Implementation
370
371
Reactor Netty-based STOMP client for TCP connections.
372
373
```java { .api }
374
/**
375
* A STOMP over TCP client that uses ReactorNettyTcpClient.
376
*/
377
public class ReactorNettyTcpStompClient implements StompClient {
378
379
public ReactorNettyTcpStompClient();
380
381
public ReactorNettyTcpStompClient(String host, int port);
382
383
/**
384
* Set the MessageConverter used to convert the payload of incoming and outgoing messages to and from byte[].
385
*/
386
public void setMessageConverter(MessageConverter messageConverter);
387
388
/**
389
* Configure the default "heart-beat" header value for the STOMP CONNECT frame: {send interval, expected receive interval} in milliseconds.
390
*/
391
public void setDefaultHeartbeat(@Nullable long[] heartbeat);
392
393
/**
394
* Configure a timeout for the receipt of the STOMP CONNECTED frame.
395
*/
396
public void setConnectTimeout(Duration connectTimeout);
397
398
/**
399
* Connect to the STOMP server.
400
*/
401
public StompSession connect(String url, StompSessionHandler handler, Object... uriVariables);
402
403
/**
404
* Connect to the STOMP server with headers.
405
*/
406
public StompSession connect(String url, @Nullable StompHeaders connectHeaders,
407
StompSessionHandler handler, Object... uriVariables);
408
}
409
```
410
411
### STOMP Broker Relay
412
413
Message handler that relays messages to an external STOMP broker.
414
415
```java { .api }
416
/**
417
* A MessageHandler that handles messages by forwarding them to a STOMP broker.
418
*/
419
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
420
421
public StompBrokerRelayMessageHandler(SubscribableChannel clientInboundChannel,
422
MessageChannel clientOutboundChannel,
423
SubscribableChannel brokerChannel,
424
Collection<String> destinationPrefixes);
425
426
/**
427
* Set the STOMP broker host.
428
*/
429
public void setRelayHost(String relayHost);
430
431
/**
432
* Return the STOMP broker host.
433
*/
434
public String getRelayHost();
435
436
/**
437
* Set the STOMP broker port.
438
*/
439
public void setRelayPort(int relayPort);
440
441
/**
442
* Return the STOMP broker port.
443
*/
444
public int getRelayPort();
445
446
/**
447
* Set the login for the shared "system" connection used to send messages to the STOMP broker from within the application.
448
*/
449
public void setSystemLogin(@Nullable String systemLogin);
450
451
/**
452
* Set the passcode for the shared "system" connection used to send messages to the STOMP broker from within the application.
453
*/
454
public void setSystemPasscode(@Nullable String systemPasscode);
455
456
/**
457
* Set the interval, in milliseconds, at which the "system" connection sends a heartbeat to the STOMP broker when no other data is being sent.
458
*/
459
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval);
460
461
/**
462
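* Set the maximum interval, in milliseconds, at which the "system" connection expects to receive a heartbeat from the STOMP broker when no other data is being received.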
463
*/
464
public void setSystemHeartbeatReceiveInterval(long systemHeartbeatReceiveInterval);
465
}
466
```
467
468
### STOMP Exceptions
469
470
Exceptions specific to STOMP protocol handling.
471
472
```java { .api }
473
/**
474
* Raised when an error occurs during STOMP message conversion.
475
*/
476
public class StompConversionException extends MessagingException {
477
478
public StompConversionException(String description, Throwable cause);
479
480
public StompConversionException(String description);
481
}
482
483
/**
484
* Raised when a connection is lost unexpectedly.
485
*/
486
public class ConnectionLostException extends MessagingException {
487
488
public ConnectionLostException(String description);
489
}
490
```
491
492
**Usage Examples:**
493
494
```java
495
import org.springframework.messaging.simp.stomp.*;
496
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
497
498
// STOMP client setup
499
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("localhost", 61613);
500
stompClient.setMessageConverter(new MappingJackson2MessageConverter());
501
502
// Session handler
503
StompSessionHandler sessionHandler = new StompSessionHandlerAdapter() {
504
@Override
505
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
506
System.out.println("Connected to STOMP broker");
507
}
508
509
@Override
510
public void handleException(StompSession session, StompCommand command,
511
StompHeaders headers, byte[] payload, Throwable exception) {
512
System.err.println("STOMP error: " + exception.getMessage());
513
}
514
};
515
516
// Connect and use session
517
StompSession session = stompClient.connect("stomp://localhost:61613/", sessionHandler);
518
519
// Send message
520
session.send("/topic/messages", "Hello STOMP!");
521
522
// Subscribe to destination
523
StompSession.Subscription subscription = session.subscribe("/topic/messages",
524
new StompFrameHandler() {
525
@Override
526
public Type getPayloadType(StompHeaders headers) {
527
return String.class;
528
}
529
530
@Override
531
public void handleFrame(StompHeaders headers, Object payload) {
532
System.out.println("Received: " + payload);
533
}
534
});
535
536
// Unsubscribe
537
subscription.unsubscribe();
538
539
// Disconnect
540
session.disconnect();
541
```