STOMP (Simple Text Oriented Messaging Protocol) sub-protocol support for WebSocket messaging. Enables publish-subscribe messaging patterns, message routing, and integration with Spring's messaging infrastructure.
Handler for WebSocket sub-protocols like STOMP.
/**
* Contract for handling a WebSocket sub-protocol. Sub-protocols provide
* higher-level messaging semantics on top of raw WebSocket messages. STOMP is
* the primary sub-protocol supported by Spring.
*
* <p>The methods fall into three groups: protocol negotiation
* ({@link #getSupportedProtocols()}), message translation in both directions
* ({@link #handleMessageFromClient} and {@link #handleMessageToClient}), and
* session lifecycle callbacks ({@link #afterSessionStarted} and
* {@link #afterSessionEnded}).
*/
public interface SubProtocolHandler {
/**
* Return the list of sub-protocols supported by this handler.
* The value is used during WebSocket handshake negotiation.
*
* @return list of supported sub-protocol names
*/
List<String> getSupportedProtocols();
/**
* Handle a message received from a WebSocket client.
*
* @param session the WebSocket session the message arrived on
* @param message the WebSocket message received from the client
* @param outputChannel channel for sending messages to the application
*/
void handleMessageFromClient(
WebSocketSession session,
WebSocketMessage<?> message,
MessageChannel outputChannel);
/**
* Handle a message to be sent to a WebSocket client.
*
* @param session the WebSocket session to send on
* @param message the message to send
*/
void handleMessageToClient(WebSocketSession session, Message<?> message);
/**
* Resolve the session ID from a message.
*
* @param message the message to inspect
* @return the session ID, or null if not found
*/
String resolveSessionId(Message<?> message);
/**
* Invoked after a WebSocket session is started.
*
* @param session the WebSocket session that was started
* @param outputChannel channel for sending messages
*/
void afterSessionStarted(
WebSocketSession session,
MessageChannel outputChannel);
/**
* Invoked after a WebSocket session ends.
*
* @param session the WebSocket session that ended
* @param closeStatus the close status reported for the session
* @param outputChannel channel for sending messages
*/
void afterSessionEnded(
WebSocketSession session,
CloseStatus closeStatus,
MessageChannel outputChannel);
}Handler for sub-protocol errors.
/**
* Strategy for handling errors that occur while processing sub-protocol
* messages. Allows customizing error message generation and handling.
*
* <p>Two hooks are provided: one for failures raised while processing a
* message that came from the client, and one for error messages that are
* about to be sent to the client.
*
* @param <P> the message payload type
*/
public interface SubProtocolErrorHandler<P> {
/**
* Handle an error that occurred while processing a client message.
*
* @param clientMessage the message from the client that was being processed
* @param ex the exception that occurred
* @return an error message to send to the client, or null
*/
Message<P> handleClientMessageProcessingError(
Message<P> clientMessage,
Throwable ex);
/**
* Handle an error message before sending it to the client.
* Allows customizing the error message.
*
* @param errorMessage the error message about to be sent
* @return the possibly modified error message
*/
Message<P> handleErrorMessageToClient(Message<P> errorMessage);
}WebSocket handler that delegates to SubProtocolHandler implementations.
/**
 * WebSocket handler that routes messages to SubProtocolHandler instances
 * based on the negotiated sub-protocol. Integrates WebSocket with Spring's
 * messaging infrastructure.
 */
public class SubProtocolWebSocketHandler
        implements WebSocketHandler, SubProtocolCapable {

    /**
     * Create a handler with message channels.
     *
     * @param clientInboundChannel channel for messages from clients
     * @param clientOutboundChannel channel for messages to clients
     */
    public SubProtocolWebSocketHandler(
            MessageChannel clientInboundChannel,
            SubscribableChannel clientOutboundChannel);

    /**
     * Set the SubProtocolHandler instances to delegate to.
     *
     * @param protocolHandlers the protocol handlers
     */
    public void setProtocolHandlers(List<SubProtocolHandler> protocolHandlers);

    /**
     * Get the configured protocol handlers.
     *
     * @return the protocol handlers
     */
    public List<SubProtocolHandler> getProtocolHandlers();

    /**
     * Add a protocol handler.
     *
     * @param handler the handler to add
     * @return this handler for chaining
     */
    public SubProtocolWebSocketHandler addProtocolHandler(SubProtocolHandler handler);

    /**
     * Set a default protocol handler for when no sub-protocol is negotiated.
     *
     * @param defaultProtocolHandler the default handler
     */
    public void setDefaultProtocolHandler(SubProtocolHandler defaultProtocolHandler);

    /**
     * Set the send time limit in milliseconds.
     *
     * @param sendTimeLimit the time limit
     */
    public void setSendTimeLimit(int sendTimeLimit);

    /**
     * Set the send buffer size limit in bytes.
     *
     * @param sendBufferSizeLimit the buffer size limit
     */
    public void setSendBufferSizeLimit(int sendBufferSizeLimit);

    /**
     * Set the time in milliseconds to wait for the first message.
     *
     * @param timeToFirstMessage the time limit
     */
    public void setTimeToFirstMessage(int timeToFirstMessage);

    /**
     * Set the lifecycle phase.
     *
     * @param phase the phase value
     */
    public void setPhase(int phase);

    /**
     * Get runtime statistics.
     *
     * @return the statistics
     */
    public Stats getStats();

    /**
     * Statistics for sub-protocol WebSocket sessions.
     */
    public interface Stats {

        /** @return the total session count */
        int getTotalSessions();

        /** @return the count of sessions using the WebSocket transport */
        int getWebSocketSessions();

        /** @return the count of sessions using an HTTP streaming transport */
        int getHttpStreamingSessions();

        /** @return the count of sessions using an HTTP polling transport */
        int getHttpPollingSessions();
    }

    // --- WebSocketHandler callbacks ---

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception;

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message)
            throws Exception;

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception)
            throws Exception;

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus)
            throws Exception;

    @Override
    public boolean supportsPartialMessages();

    // --- SubProtocolCapable ---

    @Override
    public List<String> getSubProtocols();
}
STOMP protocol handler implementation.
/**
* SubProtocolHandler for the STOMP messaging protocol. Handles STOMP
* frame encoding/decoding and message routing.
*
* <p>Configurable aspects exposed here are the incoming message size limit
* and the error handler used for STOMP protocol errors.
*/
public class StompSubProtocolHandler implements SubProtocolHandler {
/**
* Create a STOMP handler with default configuration.
*/
public StompSubProtocolHandler();
/**
* Set the message size limit for incoming STOMP messages.
* Default is 64KB.
*
* @param messageSizeLimit the message size limit in bytes
*/
public void setMessageSizeLimit(int messageSizeLimit);
/**
* Get the message size limit.
*
* @return the message size limit in bytes
*/
public int getMessageSizeLimit();
/**
* Set the error handler for STOMP protocol errors.
*
* @param errorHandler the error handler
*/
public void setErrorHandler(StompSubProtocolErrorHandler errorHandler);
/**
* Get the configured error handler.
*
* @return the error handler
*/
public StompSubProtocolErrorHandler getErrorHandler();
// The remaining methods implement the SubProtocolHandler contract for STOMP.
@Override
public List<String> getSupportedProtocols();
@Override
public void handleMessageFromClient(
WebSocketSession session,
WebSocketMessage<?> message,
MessageChannel outputChannel);
@Override
public void handleMessageToClient(WebSocketSession session, Message<?> message);
@Override
public String resolveSessionId(Message<?> message);
@Override
public void afterSessionStarted(
WebSocketSession session,
MessageChannel outputChannel);
@Override
public void afterSessionEnded(
WebSocketSession session,
CloseStatus closeStatus,
MessageChannel outputChannel);
}Default error handler for STOMP protocol.
/**
* Default error handler for STOMP protocol errors. Generates ERROR frames
* according to the STOMP specification.
*
* <p>Implements {@code SubProtocolErrorHandler} with a {@code byte[]} payload
* type. Subclasses may override either hook to customize the error sent to
* the client.
*/
public class StompSubProtocolErrorHandler
implements SubProtocolErrorHandler<byte[]> {
/**
* Create an error handler with default configuration.
*/
public StompSubProtocolErrorHandler();
/**
* Handle an error that occurred while processing a client message.
*
* @param clientMessage the message from the client that was being processed
* @param ex the exception that occurred
* @return an error message to send to the client, or null
*/
@Override
public Message<byte[]> handleClientMessageProcessingError(
Message<byte[]> clientMessage,
Throwable ex);
/**
* Handle an error message before sending it to the client.
*
* @param errorMessage the error message about to be sent
* @return the possibly modified error message
*/
@Override
public Message<byte[]> handleErrorMessageToClient(Message<byte[]> errorMessage);
}STOMP client for connecting to STOMP over WebSocket servers.
/**
 * STOMP over WebSocket client. Provides a STOMP messaging client that
 * communicates over WebSocket connections.
 */
public class WebSocketStompClient extends StompClientSupport {

    /**
     * Create a STOMP client using a WebSocketClient.
     *
     * @param webSocketClient the WebSocket client to use
     */
    public WebSocketStompClient(WebSocketClient webSocketClient);

    /**
     * Set the task scheduler for heartbeat management.
     *
     * @param taskScheduler the task scheduler
     */
    public void setTaskScheduler(TaskScheduler taskScheduler);

    /**
     * Set the maximum size for inbound STOMP messages.
     *
     * @param messageSizeLimit the size limit in bytes
     */
    public void setInboundMessageSizeLimit(int messageSizeLimit);

    /**
     * Get the inbound message size limit.
     *
     * @return the size limit in bytes
     */
    public int getInboundMessageSizeLimit();

    /**
     * Set the maximum size for outbound STOMP messages.
     *
     * @param messageSizeLimit the size limit in bytes
     */
    public void setOutboundMessageSizeLimit(int messageSizeLimit);

    /**
     * Get the outbound message size limit.
     *
     * @return the size limit in bytes
     */
    public int getOutboundMessageSizeLimit();

    /**
     * Set whether to automatically start on initialization.
     *
     * @param autoStartup whether to auto-start
     */
    public void setAutoStartup(boolean autoStartup);

    /**
     * Set the lifecycle phase.
     *
     * @param phase the phase value
     */
    public void setPhase(int phase);

    /**
     * Connect to a STOMP server asynchronously without extra handshake or
     * CONNECT headers.
     *
     * @param url the WebSocket URL
     * @param handler the session handler
     * @param uriVars URI variables to expand into the URL
     * @return a CompletableFuture for the STOMP session
     */
    public CompletableFuture<StompSession> connectAsync(
            String url,
            StompSessionHandler handler,
            Object... uriVars);

    /**
     * Connect to a STOMP server asynchronously.
     *
     * @param url the WebSocket URL
     * @param handshakeHeaders headers for WebSocket handshake
     * @param handler the session handler
     * @param uriVariables URI variables to expand into the URL
     * @return a CompletableFuture for the STOMP session
     */
    public CompletableFuture<StompSession> connectAsync(
            String url,
            WebSocketHttpHeaders handshakeHeaders,
            StompSessionHandler handler,
            Object... uriVariables);

    /**
     * Connect with STOMP headers.
     *
     * @param url the WebSocket URL
     * @param handshakeHeaders headers for WebSocket handshake
     * @param connectHeaders headers for STOMP CONNECT frame
     * @param handler the session handler
     * @param uriVariables URI variables to expand into the URL
     * @return a CompletableFuture for the STOMP session
     */
    public CompletableFuture<StompSession> connectAsync(
            String url,
            WebSocketHttpHeaders handshakeHeaders,
            StompHeaders connectHeaders,
            StompSessionHandler handler,
            Object... uriVariables);
}
Processes @MessageMapping annotated methods for WebSocket messages.
/**
 * Processes messages by invoking @MessageMapping, @SubscribeMapping, and
 * other messaging annotations. Integrates STOMP message handling with
 * Spring's annotation-based programming model.
 */
public class WebSocketAnnotationMethodMessageHandler
        extends SimpAnnotationMethodMessageHandler {

    /**
     * Create a handler with message channels and a broker template.
     *
     * @param clientInboundChannel channel for inbound messages
     * @param clientOutboundChannel channel for outbound messages
     * @param brokerTemplate messaging template for sending messages to the broker
     */
    public WebSocketAnnotationMethodMessageHandler(
            SubscribableChannel clientInboundChannel,
            MessageChannel clientOutboundChannel,
            SimpMessageSendingOperations brokerTemplate);
}
Default implementation of SimpUserRegistry that tracks connected users and their subscriptions. Automatically maintains user session state by listening to WebSocket session events.
/**
* Default SimpUserRegistry implementation that tracks users, sessions,
* and subscriptions by listening to AbstractSubProtocolEvent application
* context events (SessionConnectedEvent, SessionDisconnectEvent, etc.).
*
* Provides lookup capabilities for finding users, sessions, and subscriptions
* for user-targeted messaging in STOMP applications.
*
* @since 4.2
*/
public class DefaultSimpUserRegistry implements SimpUserRegistry, SmartApplicationListener {
/**
* Specify the order value for this registry as an ApplicationListener.
* Default is LOWEST_PRECEDENCE.
*
* @param order the order value
* @since 5.0.8
*/
public void setOrder(int order);
/**
* Get the configured order value.
*
* @return the order value
*/
@Override
public int getOrder();
/**
* Get the user for the given name. Returns null if not found.
*
* @param userName the name of the user to look up
* @return the matching user, or null if not found
*/
@Override
public SimpUser getUser(String userName);
/**
* Get all currently connected users.
*
* @return the connected users
*/
@Override
public Set<SimpUser> getUsers();
/**
* Get count of currently connected users.
*
* @return the number of connected users
*/
@Override
public int getUserCount();
/**
* Find subscriptions accepted by the given matcher.
*
* @param matcher the matcher to apply to subscriptions
* @return the set of matching subscriptions
*/
@Override
public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher);
}Application events published during STOMP session lifecycle.
/**
* Base class for sub-protocol application events. Extends Spring's ApplicationEvent
* and provides access to the message and user associated with the event.
*
* <p>Both constructors are protected; concrete events are created through
* subclasses.
*/
public abstract class AbstractSubProtocolEvent extends ApplicationEvent {
/**
* Create a new AbstractSubProtocolEvent without a user principal.
*
* @param source the component that published the event (never null)
* @param message the incoming message (never null)
*/
protected AbstractSubProtocolEvent(Object source, Message<byte[]> message);
/**
* Create a new AbstractSubProtocolEvent with a user principal.
*
* @param source the component that published the event (never null)
* @param message the incoming message (never null)
* @param user the user associated with the session, or null
*/
protected AbstractSubProtocolEvent(Object source, Message<byte[]> message, Principal user);
/**
* Get the message associated with the event.
*
* @return the message
*/
public Message<byte[]> getMessage();
/**
* Get the user associated with the event.
*
* @return the user principal, or null if none
*/
public Principal getUser();
}
/**
* Event published when a client requests a new STOMP session.
* Published after CONNECT frame is received but before CONNECTED is sent,
* so the session is not yet fully established when listeners see this event.
*/
public class SessionConnectEvent extends AbstractSubProtocolEvent {
/**
* Create a new SessionConnectEvent.
*
* @param source the component that published the event
* @param message the message associated with the connect request
*/
public SessionConnectEvent(Object source, Message<byte[]> message);
}
/**
* Event published after a STOMP session is fully established.
* Published after CONNECTED frame is sent to client.
*/
public class SessionConnectedEvent extends AbstractSubProtocolEvent {
/**
* Create a new SessionConnectedEvent.
*
* @param source the component that published the event
* @param message the message associated with the event
* @param user the user associated with the session
*/
public SessionConnectedEvent(Object source, Message<byte[]> message, Principal user);
}
/**
* Event published when a client subscribes to a destination.
*/
public class SessionSubscribeEvent extends AbstractSubProtocolEvent {
/**
* Create a new SessionSubscribeEvent.
*
* @param source the component that published the event
* @param message the message associated with the subscription
* @param user the user associated with the session
*/
public SessionSubscribeEvent(Object source, Message<byte[]> message, Principal user);
}
/**
* Event published when a client unsubscribes from a destination.
*/
public class SessionUnsubscribeEvent extends AbstractSubProtocolEvent {
/**
* Create a new SessionUnsubscribeEvent.
*
* @param source the component that published the event
* @param message the message associated with the unsubscribe request
* @param user the user associated with the session
*/
public SessionUnsubscribeEvent(Object source, Message<byte[]> message, Principal user);
}
/**
 * Event published when a STOMP session is disconnected.
 */
public class SessionDisconnectEvent extends AbstractSubProtocolEvent {

    /**
     * Create a new SessionDisconnectEvent.
     *
     * @param source the component that published the event
     * @param message the message associated with the event
     * @param sessionId the ID of the session that was disconnected
     * @param closeStatus the WebSocket close status
     */
    public SessionDisconnectEvent(
            Object source,
            Message<byte[]> message,
            String sessionId,
            CloseStatus closeStatus);

    /**
     * Get the ID of the session that was disconnected.
     *
     * @return the session ID
     */
    public String getSessionId();

    /**
     * Get the WebSocket close status.
     *
     * @return the close status
     */
    public CloseStatus getCloseStatus();
}
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
 * Minimal STOMP-over-WebSocket configuration: one endpoint with SockJS
 * fallback, an in-memory broker, and an application destination prefix.
 */
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Routes destinations starting with "/app" to application handlers and
     * enables the simple in-memory broker for "/topic" and "/queue".
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Destinations with this prefix are handled by the application
        registry.setApplicationDestinationPrefixes("/app");

        // Simple in-memory broker for these destination prefixes
        registry.enableSimpleBroker("/topic", "/queue");
    }

    /**
     * Exposes the STOMP endpoint at "/ws" and turns on the SockJS fallback.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry
                .addEndpoint("/ws")
                .withSockJS();
    }
}
import org.springframework.messaging.handler.annotation.*;
import java.security.Principal;
import java.util.List;

import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
/**
 * Example controller showing the main STOMP message-handling annotations:
 * broadcast replies, subscription replies, destination variables, and
 * user-targeted messages.
 */
@Controller
public class ChatController {

    // NOTE(review): RoomService is an application-defined collaborator; it is
    // assumed to expose getAllRooms() as used by getRooms() below.
    private final RoomService roomService;

    /** Template used to push messages to user-specific destinations. */
    private final SimpMessagingTemplate messagingTemplate;

    /**
     * Create the controller with its collaborators.
     *
     * @param roomService source of the rooms returned to subscribers
     * @param messagingTemplate template for sending messages to users
     */
    public ChatController(RoomService roomService, SimpMessagingTemplate messagingTemplate) {
        this.roomService = roomService;
        this.messagingTemplate = messagingTemplate;
    }

    /**
     * Handle messages sent to /app/chat
     * Broadcast to /topic/messages
     *
     * @param message the incoming chat message
     * @return the same message, stamped with the current time
     */
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage sendMessage(ChatMessage message) {
        message.setTimestamp(System.currentTimeMillis());
        return message;
    }

    /**
     * Handle subscription to /app/rooms
     * Return immediate response to subscriber
     *
     * @return all rooms known to the room service
     */
    @SubscribeMapping("/rooms")
    public List<Room> getRooms() {
        return roomService.getAllRooms();
    }

    /**
     * Handle messages with path variables
     *
     * @param roomId the room ID taken from the destination
     * @param message the incoming chat message
     * @param sessionId the value of the "simpSessionId" header
     * @return the message, tagged with the room and session IDs
     */
    @MessageMapping("/room/{roomId}/message")
    @SendTo("/topic/room/{roomId}")
    public ChatMessage sendToRoom(
            @DestinationVariable String roomId,
            @Payload ChatMessage message,
            @Header("simpSessionId") String sessionId) {
        message.setRoomId(roomId);
        message.setSessionId(sessionId);
        return message;
    }

    /**
     * Send message to specific user
     *
     * @param message the private message; its recipient selects the target user
     * @param principal the principal resolved for the message (not used by this example)
     */
    @MessageMapping("/private")
    public void sendPrivateMessage(
            @Payload PrivateMessage message,
            Principal principal) {
        // Send to /user/{username}/queue/private
        messagingTemplate.convertAndSendToUser(
                message.getRecipient(),
                "/queue/private",
                message
        );
    }
}
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
/**
 * Example service that pushes notifications to STOMP destinations from
 * application code through a {@code SimpMessagingTemplate}.
 */
@Service
public class NotificationService {

    private final SimpMessagingTemplate messagingTemplate;

    public NotificationService(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    /**
     * Broadcast a notification on the "/topic/notifications" destination.
     *
     * @param notification the payload to broadcast
     */
    public void broadcastNotification(Notification notification) {
        messagingTemplate.convertAndSend("/topic/notifications", notification);
    }

    /**
     * Deliver a notification to a single user's "/queue/notifications" destination.
     *
     * @param username the name of the target user
     * @param notification the payload to deliver
     */
    public void sendToUser(String username, Notification notification) {
        messagingTemplate.convertAndSendToUser(username, "/queue/notifications", notification);
    }

    /**
     * Send a payload together with a "priority" and a "timestamp" header.
     *
     * @param destination the destination to send to
     * @param payload the payload to send
     */
    public void sendWithHeaders(String destination, Object payload) {
        messagingTemplate.convertAndSend(destination, payload, highPriorityHeaders());
    }

    /** Build the header map attached by {@link #sendWithHeaders}. */
    private static Map<String, Object> highPriorityHeaders() {
        Map<String, Object> customHeaders = new HashMap<>();
        customHeaders.put("priority", "high");
        customHeaders.put("timestamp", System.currentTimeMillis());
        return customHeaders;
    }
}
import org.springframework.messaging.simp.stomp.*;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
/**
 * Example STOMP client: connects over WebSocket, subscribes to
 * "/topic/messages", and sends one chat message once connected.
 */
public class StompClientExample {

    /**
     * Build a STOMP client, then connect asynchronously and log the outcome.
     */
    public void connectAndSubscribe() {
        WebSocketStompClient stompClient =
                new WebSocketStompClient(new StandardWebSocketClient());
        // Payloads are converted with Jackson
        stompClient.setMessageConverter(new MappingJackson2MessageConverter());

        stompClient.connectAsync("ws://localhost:8080/ws", newSessionHandler())
                .thenAccept(session ->
                        System.out.println("Session established: " + session.getSessionId()))
                .exceptionally(ex -> {
                    System.err.println("Connection failed: " + ex.getMessage());
                    return null;
                });
    }

    /** Session handler that subscribes and sends on connect, and logs failures. */
    private StompSessionHandler newSessionHandler() {
        return new StompSessionHandlerAdapter() {

            @Override
            public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
                System.out.println("Connected to STOMP server");
                // Subscribe first, then publish a greeting
                session.subscribe("/topic/messages", newChatMessageHandler());
                session.send("/app/chat", new ChatMessage("Hello STOMP!"));
            }

            @Override
            public void handleException(
                    StompSession session,
                    StompCommand command,
                    StompHeaders headers,
                    byte[] payload,
                    Throwable exception) {
                System.err.println("Error: " + exception.getMessage());
            }

            @Override
            public void handleTransportError(StompSession session, Throwable exception) {
                System.err.println("Transport error: " + exception.getMessage());
            }
        };
    }

    /** Frame handler that reads each frame as a ChatMessage and prints it. */
    private StompFrameHandler newChatMessageHandler() {
        return new StompFrameHandler() {

            @Override
            public Type getPayloadType(StompHeaders headers) {
                return ChatMessage.class;
            }

            @Override
            public void handleFrame(StompHeaders headers, Object payload) {
                ChatMessage received = (ChatMessage) payload;
                System.out.println("Received: " + received);
            }
        };
    }
}
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.*;
/**
 * Example listener that logs STOMP session lifecycle events.
 */
@Component
public class WebSocketEventListener {

    /** Log the session ID carried by a connected event. */
    @EventListener
    public void handleSessionConnected(SessionConnectedEvent event) {
        System.out.println("Session connected: " + stringHeader(event, "simpSessionId"));
    }

    /** Log a disconnect with its close status, then release session resources. */
    @EventListener
    public void handleSessionDisconnect(SessionDisconnectEvent event) {
        String sessionId = event.getSessionId();
        CloseStatus status = event.getCloseStatus();
        System.out.println("Session disconnected: " + sessionId +
                " Status: " + status);
        cleanupUserSession(sessionId);
    }

    /** Log the destination a client subscribed to. */
    @EventListener
    public void handleSubscribe(SessionSubscribeEvent event) {
        System.out.println("Client subscribed to: " + stringHeader(event, "simpDestination"));
    }

    /** Log the subscription ID a client dropped. */
    @EventListener
    public void handleUnsubscribe(SessionUnsubscribeEvent event) {
        System.out.println("Client unsubscribed: " + stringHeader(event, "simpSubscriptionId"));
    }

    /** Read a String-typed header from the message attached to an event. */
    private static String stringHeader(AbstractSubProtocolEvent event, String headerName) {
        return event.getMessage().getHeaders().get(headerName, String.class);
    }

    private void cleanupUserSession(String sessionId) {
        // Cleanup logic
    }
}
import org.springframework.messaging.Message;
import org.springframework.web.socket.messaging.StompSubProtocolErrorHandler;
/**
 * Example error handler that logs processing failures before delegating to
 * the default STOMP error handling.
 */
public class CustomStompErrorHandler extends StompSubProtocolErrorHandler {

    /**
     * Log the failure and return the error frame produced by the superclass.
     *
     * @param clientMessage the client message that was being processed
     * @param ex the exception that occurred
     * @return the error message produced by the default handler
     */
    @Override
    public Message<byte[]> handleClientMessageProcessingError(
            Message<byte[]> clientMessage,
            Throwable ex) {
        System.err.println("Error processing message: " + ex.getMessage());

        Message<byte[]> defaultErrorFrame =
                super.handleClientMessageProcessingError(clientMessage, ex);
        if (ex instanceof AccessDeniedException) {
            // Extension point: build a custom ERROR frame for access-denied
            // failures here. This example returns the default frame unchanged.
            return defaultErrorFrame;
        }
        return defaultErrorFrame;
    }
}
/**
 * Example configuration that registers a custom STOMP error handler
 * alongside the "/ws" endpoint.
 */
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Expose "/ws" with SockJS fallback and install the custom error handler.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        CustomStompErrorHandler errorHandler = new CustomStompErrorHandler();

        registry.addEndpoint("/ws").withSockJS();
        // Errors raised while processing client messages go through this handler
        registry.setErrorHandler(errorHandler);
    }
}
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
/**
 * Example channel interceptor that authenticates STOMP CONNECT frames using
 * a bearer token taken from the native "Authorization" header.
 */
public class AuthChannelInterceptor implements ChannelInterceptor {

    /** Scheme prefix expected at the start of the "Authorization" header value. */
    private static final String BEARER_PREFIX = "Bearer ";

    /**
     * Validate the token on CONNECT frames and attach the resulting user to
     * the message headers. Other messages pass through untouched.
     *
     * @param message the message about to be sent on the channel
     * @param channel the channel the message is being sent to
     * @return the same message
     * @throws IllegalArgumentException if a CONNECT frame carries no valid token
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
            // Validate authentication
            String authToken = accessor.getFirstNativeHeader("Authorization");
            if (!isValidToken(authToken)) {
                throw new IllegalArgumentException("Invalid authentication token");
            }
            // Set user principal
            accessor.setUser(getUserFromToken(authToken));
        }
        return message;
    }

    /**
     * A token is valid when it starts with the bearer prefix and has
     * non-blank content after it, so a bare "Bearer " header is rejected
     * instead of producing a user with an empty name.
     */
    private boolean isValidToken(String token) {
        return token != null
                && token.startsWith(BEARER_PREFIX)
                && !token.substring(BEARER_PREFIX.length()).trim().isEmpty();
    }

    /** Extract the user from a token already accepted by {@link #isValidToken}. */
    private Principal getUserFromToken(String token) {
        // Everything after the prefix is treated as the user name in this example
        String username = token.substring(BEARER_PREFIX.length());
        return new SimplePrincipal(username);
    }
}
/**
 * Example configuration that installs the authentication interceptor on the
 * client inbound channel.
 */
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {

    /** Register the interceptor for messages arriving from clients. */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        AuthChannelInterceptor authInterceptor = new AuthChannelInterceptor();
        registration.interceptors(authInterceptor);
    }
}

/**
 * Example configuration that relays "/topic" and "/queue" destinations to an
 * external STOMP broker instead of the in-memory one.
 */
@Configuration
@EnableWebSocketMessageBroker
public class StompBrokerConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Set the application destination prefix and configure the broker relay
     * connection, credentials, heartbeats, and virtual host.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");

        // Configure RabbitMQ relay
        registry.enableStompBrokerRelay("/topic", "/queue")
                // broker address
                .setRelayHost("localhost")
                .setRelayPort(61613)
                // credentials for client and system connections
                .setClientLogin("guest")
                .setClientPasscode("guest")
                .setSystemLogin("guest")
                .setSystemPasscode("guest")
                // system heartbeat intervals
                .setSystemHeartbeatSendInterval(5000)
                .setSystemHeartbeatReceiveInterval(5000)
                .setVirtualHost("/");
    }
}