A complete STOMP (Simple Text Oriented Messaging Protocol) implementation for WebSocket communication, including session management, broker relay, subscription handling, and client/server support for real-time bidirectional messaging.
Client-side STOMP session for sending messages and managing subscriptions.
/**
 * Client-side handle on a STOMP session. Offers operations for sending
 * messages, subscribing to destinations, acknowledging messages, and
 * ending the session.
 */
public interface StompSession {

    /**
     * @return the id of this STOMP session
     */
    String getSessionId();

    /**
     * @return whether this session is currently connected
     */
    boolean isConnected();

    /**
     * Toggle automatic addition of a receipt header on SEND, SUBSCRIBE,
     * and UNSUBSCRIBE frames. Off ({@code false}) by default.
     *
     * @param enabled {@code true} to turn auto-receipts on
     */
    void setAutoReceipt(boolean enabled);

    /**
     * Send a payload to a destination.
     *
     * @param destination the destination to send to
     * @param payload the message payload
     * @return a handle for tracking the receipt of the frame
     */
    Receiptable send(String destination, Object payload);

    /**
     * Send a payload using explicitly supplied headers.
     *
     * @param headers the headers for the frame, including the destination
     * @param payload the message payload
     * @return a handle for tracking the receipt of the frame
     */
    Receiptable send(StompHeaders headers, Object payload);

    /**
     * Subscribe to a destination.
     *
     * @param destination the destination to subscribe to
     * @param handler the handler for frames received on the subscription
     * @return a handle for the subscription
     */
    Subscription subscribe(String destination, StompFrameHandler handler);

    /**
     * Subscribe using explicitly supplied headers.
     *
     * @param headers the headers for the frame, including the destination
     * @param handler the handler for frames received on the subscription
     * @return a handle for the subscription
     */
    Subscription subscribe(StompHeaders headers, StompFrameHandler handler);

    /**
     * Acknowledge the message with the given id.
     *
     * @param messageId the id of the message being acknowledged
     * @param consumed whether the message was consumed
     * @return a handle for tracking the receipt of the frame
     */
    Receiptable acknowledge(String messageId, boolean consumed);

    /**
     * Acknowledge consumption of a message using explicitly supplied headers.
     *
     * @param headers the headers for the acknowledgement frame
     * @param consumed whether the message was consumed
     * @return a handle for tracking the receipt of the frame
     */
    Receiptable acknowledge(StompHeaders headers, boolean consumed);

    /**
     * Disconnect the session.
     */
    void disconnect();

    /**
     * Disconnect the session, passing custom headers.
     *
     * @param headers the headers for the disconnect
     */
    void disconnect(StompHeaders headers);


    /**
     * Handle for tracking the receipt of a frame.
     */
    interface Receiptable {

        /**
         * @return the receipt id, or {@code null} when the STOMP frame
         * carried no receipt header
         */
        String getReceiptId();

        /**
         * Register a task to run once the receipt arrives.
         *
         * @param task the task to invoke
         */
        void addReceiptTask(Runnable task);

        /**
         * Like {@link #addReceiptTask(Runnable)}, but the callback is handed
         * the headers of the RECEIPT frame.
         *
         * @param task the consumer to invoke
         * @since 5.3.23
         */
        void addReceiptTask(Consumer<StompHeaders> task);

        /**
         * Register a task to run when no receipt arrives within the
         * configured time.
         *
         * @param task the task to invoke
         */
        void addReceiptLostTask(Runnable task);
    }


    /**
     * Handle for an individual subscription.
     */
    interface Subscription {

        /** @return the id of this subscription */
        String getSubscriptionId();

        /** @return the headers associated with this subscription */
        StompHeaders getSubscriptionHeaders();

        /** Cancel this subscription. */
        void unsubscribe();

        /**
         * Cancel this subscription, passing custom headers.
         *
         * @param headers the headers to use when unsubscribing
         */
        void unsubscribe(StompHeaders headers);
    }
}
Usage Examples:
import org.springframework.messaging.simp.stomp.*;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

// Payload for the receipt example; declared before the lambda that captures it
Object taskData = "task-data";

// Receipt tracking requires a TaskScheduler to be configured on the client
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();

// Create STOMP client
ReactorNettyTcpStompClient client = new ReactorNettyTcpStompClient("localhost", 61613);
client.setMessageConverter(new MappingJackson2MessageConverter());
client.setTaskScheduler(scheduler);

// Connect and get session
CompletableFuture<StompSession> sessionFuture = client.connectAsync(new StompSessionHandlerAdapter() {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        System.out.println("Connected! Session: " + session.getSessionId());
        // Send message
        session.send("/topic/news", "Breaking news!");
        // Subscribe to topic
        session.subscribe("/topic/updates", new StompFrameHandler() {
            @Override
            public Type getPayloadType(StompHeaders headers) {
                return String.class;
            }

            @Override
            public void handleFrame(StompHeaders headers, Object payload) {
                System.out.println("Received: " + payload);
            }
        });
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        System.err.println("Transport error: " + exception.getMessage());
    }
});

// Use session when connected
sessionFuture.thenAccept(session -> {
    // Send with receipt: the frame must carry a receipt header (set explicitly here,
    // or enable session.setAutoReceipt(true)); without one there is no receipt id to track
    StompHeaders sendHeaders = new StompHeaders();
    sendHeaders.setDestination("/queue/tasks");
    sendHeaders.setReceipt("task-receipt-1");
    StompSession.Receiptable receipt = session.send(sendHeaders, taskData);
    receipt.addReceiptTask(() -> System.out.println("Message received by broker"));
    receipt.addReceiptLostTask(() -> System.err.println("No receipt received from broker"));

    // Subscribe with custom headers
    StompHeaders subHeaders = new StompHeaders();
    subHeaders.setDestination("/user/queue/notifications");
    subHeaders.setId("sub-1");
    StompSession.Subscription subscription = session.subscribe(subHeaders, new StompFrameHandler() {
        @Override
        public Type getPayloadType(StompHeaders headers) {
            return Notification.class;
        }

        @Override
        public void handleFrame(StompHeaders headers, Object payload) {
            Notification notification = (Notification) payload;
            System.out.println("Notification: " + notification.getMessage());
        }
    });
    // Unsubscribe later
    // subscription.unsubscribe();
}).exceptionally(ex -> {
    System.err.println("Connection failed: " + ex.getMessage());
    return null;
});

// Supporting payload type used by the notification subscription above
class Notification {
    private String message;

    public String getMessage() { return message; }

    public void setMessage(String message) { this.message = message; }
}
Represents STOMP frame headers with typed accessors for standard STOMP headers.
/**
 * Holder for the headers of a STOMP frame.
 * Generic header access comes from {@code MultiValueMap}; the typed
 * getters and setters below cover the standard header names.
 */
public class StompHeaders implements MultiValueMap<String, String>, Serializable {

    // ---- Standard header names ----

    public static final String CONTENT_TYPE = "content-type";
    public static final String CONTENT_LENGTH = "content-length";
    public static final String RECEIPT = "receipt";
    public static final String HOST = "host";
    public static final String ACCEPT_VERSION = "accept-version";
    public static final String LOGIN = "login";
    public static final String PASSCODE = "passcode";
    public static final String HEARTBEAT = "heart-beat";
    public static final String SESSION = "session";
    public static final String SERVER = "server";
    public static final String DESTINATION = "destination";
    public static final String ID = "id";
    public static final String ACK = "ack";
    public static final String SUBSCRIPTION = "subscription";
    public static final String MESSAGE_ID = "message-id";
    public static final String MESSAGE = "message";
    public static final String RECEIPT_ID = "receipt-id";

    /** Create an instance. */
    public StompHeaders();

    // ---- Typed accessors, one getter/setter pair per standard header ----

    public void setContentType(MimeType mimeType);
    public MimeType getContentType();

    public void setContentLength(long contentLength);
    public long getContentLength();

    public void setReceipt(String receipt);
    public String getReceipt();

    public void setHost(String host);
    public String getHost();

    public void setAcceptVersion(String... acceptVersions);
    public String[] getAcceptVersion();

    public void setLogin(String login);
    public String getLogin();

    public void setPasscode(String passcode);
    public String getPasscode();

    public void setHeartbeat(long[] heartbeat);
    public long[] getHeartbeat();

    public void setSession(String session);
    public String getSession();

    public void setServer(String server);
    public String getServer();

    public void setDestination(String destination);
    public String getDestination();

    public void setId(String id);
    public String getId();

    public void setAck(String ack);
    public String getAck();

    public void setSubscription(String subscription);
    public String getSubscription();

    public void setMessageId(String messageId);
    public String getMessageId();

    public void setMessage(String message);
    public String getMessage();

    public void setReceiptId(String receiptId);
    public String getReceiptId();
}
Usage Examples:
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

// Create headers for CONNECT frame
StompHeaders connectHeaders = new StompHeaders();
connectHeaders.setAcceptVersion("1.1", "1.2");
connectHeaders.setHeartbeat(new long[]{10000, 10000}); // 10s send/receive
connectHeaders.setLogin("user");
connectHeaders.setPasscode("pass");
connectHeaders.setHost("example.com");

// Create headers for SEND frame
StompHeaders sendHeaders = new StompHeaders();
sendHeaders.setDestination("/topic/news");
sendHeaders.setContentType(MimeTypeUtils.APPLICATION_JSON);
sendHeaders.setReceipt("msg-001");

// Create headers for SUBSCRIBE frame
StompHeaders subscribeHeaders = new StompHeaders();
subscribeHeaders.setDestination("/user/queue/notifications");
subscribeHeaders.setId("sub-1");
subscribeHeaders.setAck("client");

// Access headers through the typed getters
String destination = subscribeHeaders.getDestination();
String subscriptionId = subscribeHeaders.getId();
MimeType contentType = sendHeaders.getContentType();
Represents STOMP protocol commands.
/**
 * Enumeration of the STOMP commands.
 */
public enum StompCommand {

    STOMP,
    CONNECT,
    CONNECTED,
    SEND,
    SUBSCRIBE,
    UNSUBSCRIBE,
    ACK,
    NACK,
    BEGIN,
    COMMIT,
    ABORT,
    DISCONNECT,
    MESSAGE,
    RECEIPT,
    ERROR;

    /**
     * @return the {@code SimpMessageType} corresponding to this command
     */
    public SimpMessageType getMessageType();

    /**
     * @return whether a destination header is required for this command
     */
    public boolean requiresDestination();

    /**
     * @return whether a subscription id header is required for this command
     */
    public boolean requiresSubscriptionId();

    /**
     * @return whether a content-length header is required for this command
     */
    public boolean requiresContentLength();

    /**
     * @return whether frames with this command may carry a body
     */
    public boolean isBodyAllowed();
}
Maps subscription requests to handler methods.
/**
 * Annotation for mapping subscription messages onto specific handler methods
 * based on the destination of a subscription.
 *
 * <p>This is a method-level annotation. It can be combined with a type-level
 * {@code @MessageMapping} that expresses a mapping shared by all handler
 * methods of the controller. Do not place {@code @MessageMapping} on the same
 * method: the two mappings are not merged.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SubscribeMapping {

    /**
     * Destination patterns to match.
     */
    String[] value() default {};
}
Usage Examples:
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

@Controller
public class SubscriptionController {

    // Return initial data on subscription
    @SubscribeMapping("/topic/initial")
    public InitialData handleSubscription() {
        return new InitialData("Welcome!", System.currentTimeMillis());
    }

    // User-specific subscription
    @SubscribeMapping("/user/queue/position")
    public Position getCurrentPosition() {
        return new Position(0, 0);
    }
}

// Combined with MessageMapping: the @MessageMapping goes on the TYPE and acts as a
// shared prefix, so the method below handles subscriptions to "/data/state".
// Putting @MessageMapping and @SubscribeMapping on the same method does not work:
// the @MessageMapping takes precedence and the subscription mapping is never registered.
@Controller
@MessageMapping("/data")
class DataSubscriptionController {

    @SubscribeMapping("/state")
    public String handleDataSubscription() {
        return "Current data state";
    }
}

// Payload returned to a client when it subscribes to "/topic/initial"
class InitialData {
    private String message;
    private long timestamp;

    public InitialData(String message, long timestamp) {
        this.message = message;
        this.timestamp = timestamp;
    }

    public String getMessage() { return message; }

    public long getTimestamp() { return timestamp; }
}

// Payload returned to a client when it subscribes to "/user/queue/position"
class Position {
    private int x, y;

    public Position(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public int getX() { return x; }

    public int getY() { return y; }
}
Sends return value to user-specific destinations.
/**
 * Marks a message-handling method whose return value is to be sent, as a
 * Message, to one or more user-specific destinations.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SendToUser {

    /**
     * The user destination(s) to send to; one or more may be given.
     */
    String[] value() default {};

    /**
     * Alias for {@link #value()}.
     */
    String[] destinations() default {};

    /**
     * Controls the audience of the message: {@code true} (the default)
     * broadcasts to every session of the user, {@code false} targets only
     * the session that sent the message being handled.
     */
    boolean broadcast() default true;
}
Usage Examples:
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

@Controller
public class UserMessagingController {

    // Send reply to user who sent the message
    @MessageMapping("/private")
    @SendToUser("/queue/reply")
    public String handlePrivateMessage(String message) {
        return "Reply: " + message;
    }

    // Send to specific user destinations
    @MessageMapping("/notification")
    @SendToUser({"/queue/notifications", "/queue/alerts"})
    public Notification handleNotification(String data) {
        return new Notification(data, "high");
    }

    // Broadcast to all user sessions
    @MessageMapping("/sync")
    @SendToUser(value = "/queue/sync", broadcast = true)
    public SyncData handleSync(String request) {
        return new SyncData(request, System.currentTimeMillis());
    }

    // Send only to requesting session
    @MessageMapping("/session")
    @SendToUser(value = "/queue/session", broadcast = false)
    public String handleSessionMessage(String message) {
        return "Session response: " + message;
    }
}

// Payload returned by handleNotification: a message plus a priority label
class Notification {
    private final String message;
    private final String priority;

    public Notification(String message, String priority) {
        this.message = message;
        this.priority = priority;
    }

    public String getMessage() { return message; }

    public String getPriority() { return priority; }
}

// Payload returned by handleSync: the original request plus a timestamp
class SyncData {
    private String request;
    private long timestamp;

    public SyncData(String request, long timestamp) {
        this.request = request;
        this.timestamp = timestamp;
    }

    public String getRequest() { return request; }

    public long getTimestamp() { return timestamp; }
}
Accessor for SIMP message headers.
/**
 * Base class for reading and writing message headers in simple messaging
 * protocols that support basic messaging patterns.
 */
public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {

    // ---- Header names ----

    public static final String DESTINATION_HEADER = "simpDestination";
    public static final String MESSAGE_TYPE_HEADER = "simpMessageType";
    public static final String SESSION_ID_HEADER = "simpSessionId";
    public static final String SESSION_ATTRIBUTES = "simpSessionAttributes";
    public static final String SUBSCRIPTION_ID_HEADER = "simpSubscriptionId";
    public static final String USER_HEADER = "simpUser";
    public static final String CONNECT_MESSAGE_HEADER = "simpConnectMessage";
    public static final String DISCONNECT_MESSAGE_HEADER = "simpDisconnectMessage";
    public static final String HEART_BEAT_HEADER = "simpHeartbeat";
    public static final String ORIGINAL_DESTINATION = "simpOriginalDestination";

    // ---- Construction ----

    protected SimpMessageHeaderAccessor(SimpMessageType messageType, Map<String, List<String>> externalSourceHeaders);

    public static SimpMessageHeaderAccessor create(SimpMessageType messageType);

    public static SimpMessageHeaderAccessor wrap(Message<?> message);

    // ---- Message type ----

    public void setMessageTypeIfNotSet(SimpMessageType messageType);

    public SimpMessageType getMessageType();

    // ---- Destination and subscription ----

    public void setDestination(String destination);

    public String getDestination();

    public void setSubscriptionId(String subscriptionId);

    public String getSubscriptionId();

    // ---- Session and user ----

    public void setSessionId(String sessionId);

    public String getSessionId();

    public void setSessionAttributes(Map<String, Object> attributes);

    public Map<String, Object> getSessionAttributes();

    public void setUser(Principal principal);

    public Principal getUser();

    // ---- Heartbeat ----

    public long[] getHeartbeat();

    public void setHeartbeat(long cx, long cy);
}
Types of simple messaging protocol messages.
/**
 * Protocol-neutral classification of the kinds of messages that occur in
 * simple messaging protocols such as STOMP.
 */
public enum SimpMessageType {

    CONNECT,

    CONNECT_ACK,

    MESSAGE,

    SUBSCRIBE,

    UNSUBSCRIBE,

    HEARTBEAT,

    DISCONNECT,

    DISCONNECT_ACK,

    OTHER
}
Contract for handling STOMP frames received from subscriptions.
/**
 * Callback contract for processing a STOMP frame.
 */
public interface StompFrameHandler {

    /**
     * Called ahead of {@link #handleFrame(StompHeaders, Object)} to find out
     * which type of Object the payload should be converted to.
     *
     * @param headers the headers of a message
     * @return the target type for the payload
     */
    Type getPayloadType(StompHeaders headers);

    /**
     * Process a STOMP frame whose payload has already been converted to the
     * target type.
     *
     * @param headers the headers of the frame
     * @param payload the payload, or {@code null} if there was no payload
     */
    void handleFrame(StompHeaders headers, Object payload);
}
Contract for session-level STOMP events.
/**
 * Callback contract for the lifecycle events of a STOMP session.
 * {@code getPayloadType} and {@code handleFrame} are inherited from
 * {@link StompFrameHandler}.
 */
public interface StompSessionHandler extends StompFrameHandler {

    /**
     * Called once the STOMP session has been established.
     *
     * @param session the client STOMP session
     * @param connectedHeaders the CONNECTED frame headers
     */
    void afterConnected(StompSession session, StompHeaders connectedHeaders);

    /**
     * Called for any exception raised during STOMP message handling.
     *
     * @param session the client STOMP session
     * @param command the STOMP command, may be {@code null}
     * @param headers the headers
     * @param payload the raw payload
     * @param exception the exception
     */
    void handleException(StompSession session, StompCommand command,
            StompHeaders headers, byte[] payload, Throwable exception);

    /**
     * Called for a low-level transport error.
     *
     * @param session the client STOMP session
     * @param exception the exception that occurred
     */
    void handleTransportError(StompSession session, Throwable exception);
}
Abstract adapter class for StompSessionHandler with default empty implementations.
/**
 * Convenience base class for {@link StompSessionHandler} implementations.
 * Every method is implemented as a no-op, except {@code getPayloadType},
 * which returns String.
 */
public abstract class StompSessionHandlerAdapter implements StompSessionHandler {

    /**
     * Returns String, the payload type expected for STOMP ERROR frames.
     */
    @Override
    public Type getPayloadType(StompHeaders headers);

    /**
     * No-op implementation.
     */
    @Override
    public void handleFrame(StompHeaders headers, Object payload);

    /**
     * No-op implementation.
     */
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders);

    /**
     * No-op implementation.
     */
    @Override
    public void handleException(StompSession session, StompCommand command,
            StompHeaders headers, byte[] payload, Throwable exception);

    /**
     * No-op implementation.
     */
    @Override
    public void handleTransportError(StompSession session, Throwable exception);
}
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// Register STOMP endpoint
registry.addEndpoint("/ws")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// Enable simple in-memory broker
registry.enableSimpleBroker("/topic", "/queue");
// Set application destination prefix
registry.setApplicationDestinationPrefixes("/app");
// Set user destination prefix
registry.setUserDestinationPrefix("/user");
}
}
@Controller
public class StompController {
@MessageMapping("/chat")
@SendTo("/topic/messages")
public ChatMessage handleChatMessage(ChatMessage message) {
message.setTimestamp(System.currentTimeMillis());
return message;
}
@MessageMapping("/private")
@SendToUser("/queue/reply")
public String handlePrivateMessage(String message, Principal principal) {
return "Hello " + principal.getName() + ": " + message;
}
@SubscribeMapping("/topic/greetings")
public String handleSubscription() {
return "Welcome to the chat!";
}
}
class ChatMessage {
private String content;
private String sender;
private long timestamp;
public String getContent() { return content; }
public void setContent(String content) { this.content = content; }
public String getSender() { return sender; }
public void setSender(String sender) { this.sender = sender; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}