Spring Integration WebSocket provides an extensible sub-protocol system that supports raw WebSocket communication, STOMP messaging, and custom protocols. The protocol handling layer manages message format conversion, session management, and protocol-specific behaviors.
Registry for managing and resolving sub-protocol handlers based on WebSocket session protocol negotiation.
/**
* Registry for sub-protocol handlers with protocol resolution algorithms.
* Maps protocol names to handlers using case-insensitive matching.
* Thread-safe for concurrent protocol resolution.
*
* @since 4.1
*/
public class SubProtocolHandlerRegistry {
/**
* Create registry from list of protocol handlers.
* If only one handler provided, it becomes the default.
*
* @param protocolHandlers list of SubProtocolHandler instances (must not be null)
* @throws IllegalArgumentException if protocolHandlers is null or empty
*/
public SubProtocolHandlerRegistry(List<SubProtocolHandler> protocolHandlers);
/**
* Create registry with single default protocol handler.
*
* @param defaultProtocolHandler the default handler (must not be null)
* @throws IllegalArgumentException if defaultProtocolHandler is null
*/
public SubProtocolHandlerRegistry(SubProtocolHandler defaultProtocolHandler);
/**
* Create registry with handlers and explicit default.
*
* @param protocolHandlers list of protocol handlers (must not be null)
* @param defaultProtocolHandler fallback handler when no protocol matches (must not be null)
* @throws IllegalArgumentException if protocolHandlers or defaultProtocolHandler is null
*/
public SubProtocolHandlerRegistry(
List<SubProtocolHandler> protocolHandlers,
SubProtocolHandler defaultProtocolHandler);
/**
* Resolve protocol handler for a WebSocket session.
* Uses session's accepted protocol from handshake negotiation.
* Falls back to default handler if no protocol specified.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @return the resolved SubProtocolHandler (never null)
* @throws IllegalStateException if no handler can be resolved
* @throws IllegalArgumentException if session is null
*/
public SubProtocolHandler findProtocolHandler(WebSocketSession session);
/**
* Resolve session ID from Integration message headers.
* Iterates through all handlers attempting resolution.
* Thread-safe: can be called concurrently.
*
* @param message the Integration message (must not be null)
* @return the session ID or null if not resolvable
* @throws IllegalArgumentException if message is null
*/
public String resolveSessionId(Message<?> message);
/**
* Get list of all registered sub-protocol names.
*
* @return list of protocol names (case-insensitive, never null, may be empty)
*/
public List<String> getSubProtocols();
}Usage Example - Single Protocol:
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.integration.websocket.support.PassThruSubProtocolHandler;
// Create simple pass-through handler
PassThruSubProtocolHandler handler = new PassThruSubProtocolHandler();
handler.setSupportedProtocols("v1.protocol");
// Create registry with single handler (becomes default)
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(handler);
// Use with adapters
WebSocketInboundChannelAdapter inbound =
new WebSocketInboundChannelAdapter(container, registry);
WebSocketOutboundMessageHandler outbound =
new WebSocketOutboundMessageHandler(container, registry);Usage Example - Multiple Protocols:
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import java.util.Arrays;
// Create multiple protocol handlers
PassThruSubProtocolHandler rawHandler = new PassThruSubProtocolHandler();
rawHandler.setSupportedProtocols("raw");
StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();
// Create registry with multiple handlers
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(
Arrays.asList(rawHandler, stompHandler),
rawHandler // Default when no protocol specified
);
// Registry automatically routes based on negotiated protocol
WebSocketInboundChannelAdapter adapter =
new WebSocketInboundChannelAdapter(container, registry);Simple pass-through protocol handler for raw WebSocket communication without sub-protocol specifics.
/**
* Simple pass-through SubProtocolHandler implementation.
* Converts between WebSocket messages and Integration messages
* without protocol-specific processing.
* Thread-safe: stateless implementation.
*
* @since 4.1
*/
public class PassThruSubProtocolHandler implements SubProtocolHandler {
/**
* Set the supported protocol names.
* Replaces any previously configured protocols.
*
* @param supportedProtocols protocol name array (may be empty)
*/
public void setSupportedProtocols(String... supportedProtocols);
/**
* Get list of supported protocol names.
*
* @return list of protocol names (never null, may be empty)
*/
@Override
public List<String> getSupportedProtocols();
/**
* Handle message received from WebSocket client.
* Converts WebSocketMessage to Integration Message.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param webSocketMessage the received WebSocket message (must not be null)
* @param outputChannel channel to send converted message to (must not be null)
* @throws IllegalArgumentException if session, message, or channel is null
*/
@Override
public void handleMessageFromClient(
WebSocketSession session,
WebSocketMessage<?> webSocketMessage,
MessageChannel outputChannel);
/**
* Handle message to be sent to WebSocket client.
* Converts Integration Message to WebSocketMessage.
* Supports String, byte[], and ByteBuffer payloads.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param message the Integration message (must not be null)
* @throws Exception if conversion or sending fails
* @throws IllegalArgumentException if payload type unsupported or session/message is null
*/
@Override
public void handleMessageToClient(WebSocketSession session, Message<?> message)
throws Exception;
/**
* Resolve session ID from message headers.
* Extracts from SIMP message headers.
*
* @param message the Integration message (must not be null)
* @return session ID or null if not found
* @throws IllegalArgumentException if message is null
*/
@Override
public String resolveSessionId(Message<?> message);
/**
* Invoked when WebSocket session starts.
* Default implementation does nothing; subclasses may override.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param outputChannel the output message channel (must not be null)
*/
@Override
public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel);
/**
* Invoked when WebSocket session ends.
* Default implementation does nothing; subclasses may override.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param closeStatus the close status (must not be null)
* @param outputChannel the output message channel (must not be null)
*/
@Override
public void afterSessionEnded(
WebSocketSession session,
CloseStatus closeStatus,
MessageChannel outputChannel);
}Supported Payload Types:
Unsupported Types: Other types throw IllegalArgumentException
Usage Example:
import org.springframework.integration.websocket.support.PassThruSubProtocolHandler;
// Create pass-through handler
PassThruSubProtocolHandler handler = new PassThruSubProtocolHandler();
// Configure supported protocols
handler.setSupportedProtocols("binary", "text", "json");
// Create registry
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(handler);
// Use with container and adapters
WebSocketInboundChannelAdapter inbound =
new WebSocketInboundChannelAdapter(container, registry);
WebSocketOutboundMessageHandler outbound =
new WebSocketOutboundMessageHandler(container, registry);
// Messages automatically converted
// String -> TextMessage
// byte[] -> TextMessage (binary)
// ByteBuffer -> TextMessageInterface for handling WebSocket session events and messages. Typically implemented by WebSocketInboundChannelAdapter.
/**
* Contract for handling incoming WebSocket messages and session events.
* Part of higher-level sub-protocol processing.
* Implementations must be thread-safe.
*
* @since 4.1
*/
public interface WebSocketListener extends SubProtocolCapable {
/**
* Handle received WebSocket message.
* Thread-safe: can be called concurrently from multiple sessions.
*
* @param session the WebSocket session (must not be null)
* @param message the received WebSocket message (must not be null)
* @throws IllegalArgumentException if session or message is null
*/
void onMessage(WebSocketSession session, WebSocketMessage<?> message);
/**
* Invoked when WebSocket session starts.
* Called after session is added to container.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
*/
void afterSessionStarted(WebSocketSession session);
/**
* Invoked when WebSocket session ends.
* Called after session is removed from container.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param closeStatus the reason for session closure (must not be null)
*/
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus);
/**
* Get supported sub-protocol names.
* Inherited from SubProtocolCapable.
*
* @return list of supported protocol names (never null, may be empty)
*/
@Override
List<String> getSubProtocols();
}Specialized STOMP encoder for client-side message encoding with MESSAGE to SEND frame conversion.
/**
* STOMP encoder extension for client-side WebSocket messaging.
* Converts MESSAGE frames to SEND frames for STOMP client compliance.
* Thread-safe: stateless implementation.
*
* @since 4.3.13
*/
public class ClientStompEncoder extends StompEncoder {
/**
* Encode STOMP message with automatic MESSAGE to SEND conversion.
* Overrides parent to convert MESSAGE command to SEND for client-side.
*
* @param headers STOMP headers including command and destination (must not be null)
* @param payload message payload bytes (must not be null)
* @return encoded STOMP frame as byte array (never null)
* @throws IllegalArgumentException if headers or payload is null
*/
@Override
public byte[] encode(Map<String, Object> headers, byte[] payload);
}For STOMP protocol support, use Spring Framework's StompSubProtocolHandler:
Example - Server-Side STOMP:
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.support.ChannelInterceptor;
// Create STOMP handler
StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();
// Create message broker (optional)
SimpleBrokerMessageHandler broker = new SimpleBrokerMessageHandler(
applicationContext.getBean("clientInboundChannel", MessageChannel.class)
);
broker.setDestinationPrefixes("/topic", "/queue");
// Create registry with STOMP support
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(stompHandler);
// Create server container
ServerWebSocketContainer container =
new ServerWebSocketContainer("/stomp");
container.setHandshakeHandler(handshakeHandler);
// Create adapters with STOMP support
WebSocketInboundChannelAdapter inbound =
new WebSocketInboundChannelAdapter(container, registry);
inbound.setUseBroker(true); // Enable broker routing
WebSocketOutboundMessageHandler outbound =
new WebSocketOutboundMessageHandler(container, registry);
// STOMP messages are automatically handled
// Supports CONNECT, SUBSCRIBE, SEND, UNSUBSCRIBE, DISCONNECTExample - Client-Side STOMP:
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
// Create STOMP handler for client
StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();
// Create registry
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(stompHandler);
// Create client container
ClientWebSocketContainer container =
new ClientWebSocketContainer(client, "ws://localhost:8080/stomp");
// Create adapters
WebSocketInboundChannelAdapter inbound =
new WebSocketInboundChannelAdapter(container, registry);
inbound.setOutputChannel(messageChannel);
WebSocketOutboundMessageHandler outbound =
new WebSocketOutboundMessageHandler(container, registry);
// Start connection (CONNECT frame sent automatically)
container.start();
// Subscribe to topic
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
accessor.setDestination("/topic/notifications");
accessor.setSubscriptionId("sub-1");
Message<byte[]> subscribeMsg = MessageBuilder.createMessage(
new byte[0],
accessor.getMessageHeaders()
);
outbound.handleMessage(subscribeMsg);
// Send message to destination
accessor = StompHeaderAccessor.create(StompCommand.SEND);
accessor.setDestination("/app/message");
Message<String> sendMsg = MessageBuilder.createMessage(
"Message content",
accessor.getMessageHeaders()
);
outbound.handleMessage(sendMsg);Create custom protocol handlers by implementing SubProtocolHandler:
import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.messaging.MessageChannel;
public class CustomProtocolHandler implements SubProtocolHandler {
// Thread-local or synchronized state if needed
private final Map<String, SessionState> sessionStates = new ConcurrentHashMap<>();
@Override
public List<String> getSupportedProtocols() {
return Arrays.asList("custom-v1", "custom-v2");
}
@Override
public void handleMessageFromClient(
WebSocketSession session,
WebSocketMessage<?> webSocketMessage,
MessageChannel outputChannel) {
try {
// Convert WebSocket message to Integration message
// with custom protocol logic
String payload = extractPayload(webSocketMessage);
Map<String, Object> headers = extractHeaders(webSocketMessage);
SimpMessageHeaderAccessor accessor =
SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setSessionId(session.getId());
accessor.copyHeaders(headers);
Message<?> message = MessageBuilder
.withPayload(payload)
.setHeaders(accessor)
.build();
outputChannel.send(message);
} catch (Exception e) {
logger.error("Failed to handle message from client", e);
// Don't throw - let adapter handle error
}
}
@Override
public void handleMessageToClient(
WebSocketSession session,
Message<?> message) throws Exception {
try {
// Convert Integration message to WebSocket message
// with custom protocol logic
String payload = serializePayload(message.getPayload());
session.sendMessage(new TextMessage(payload));
} catch (Exception e) {
logger.error("Failed to handle message to client", e);
throw e; // Propagate to outbound handler
}
}
@Override
public String resolveSessionId(Message<?> message) {
return SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
}
@Override
public void afterSessionStarted(
WebSocketSession session,
MessageChannel outputChannel) {
// Initialize protocol-specific session state
sessionStates.put(session.getId(), new SessionState());
}
@Override
public void afterSessionEnded(
WebSocketSession session,
CloseStatus closeStatus,
MessageChannel outputChannel) {
// Cleanup protocol-specific session state
sessionStates.remove(session.getId());
}
private String extractPayload(WebSocketMessage<?> msg) {
// Custom extraction logic
return msg.getPayload().toString();
}
private Map<String, Object> extractHeaders(WebSocketMessage<?> msg) {
// Custom header extraction
return new HashMap<>();
}
private String serializePayload(Object payload) {
// Custom serialization logic
return payload.toString();
}
private static class SessionState {
// Protocol-specific session state
}
}
// Usage
CustomProtocolHandler customHandler = new CustomProtocolHandler();
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(customHandler);Thread Safety Requirements:
ConcurrentHashMap for session stateWebSocket sub-protocol negotiation occurs during the handshake:
Sec-WebSocket-Protocol header with supported protocolsSec-WebSocket-Protocol headerExample - Configure Client Protocol:
ClientWebSocketContainer container =
new ClientWebSocketContainer(client, "ws://localhost:8080/ws");
// Set supported protocols for negotiation
container.setSupportedProtocols("stomp", "custom-v1");
// Or via headers
WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
headers.setSecWebSocketProtocol(Arrays.asList("stomp", "custom-v1"));
container.setHeaders(headers);Example - Server Protocol Selection:
ServerWebSocketContainer container = new ServerWebSocketContainer("/ws");
// Protocols come from registry
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(
Arrays.asList(stompHandler, customHandler)
);
// Container exposes protocols during handshake
WebSocketInboundChannelAdapter adapter =
new WebSocketInboundChannelAdapter(container, registry);
// Server selects from intersection of:
// 1. Client requested protocols
// 2. Registry supported protocolsNo Handler Found:
try {
SubProtocolHandler handler = registry.findProtocolHandler(session);
} catch (IllegalStateException e) {
// No handler can be resolved
logger.error("No protocol handler found for session", e);
// Session may be closed by container
}Handler Resolution:
IllegalStateException is thrownUnsupported Payload Type:
// PassThruSubProtocolHandler throws IllegalArgumentException
try {
handler.handleMessageToClient(session, message);
} catch (IllegalArgumentException e) {
// Payload type not supported
logger.error("Unsupported payload type", e);
}Channel Send Errors:
// Handle channel send failures in custom handlers
try {
outputChannel.send(message);
} catch (Exception e) {
logger.error("Failed to send to channel", e);
// Don't throw - let adapter handle
}Session Not Found:
// In custom handlers, always check session state
if (sessionStates.containsKey(session.getId())) {
SessionState state = sessionStates.get(session.getId());
// Use state
} else {
logger.warn("Session state not found: {}", session.getId());
}Best Practices:
ConcurrentHashMap for session stateSymptoms: IllegalStateException when resolving protocol handler.
Causes:
Solutions:
// Always provide default handler
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(
Arrays.asList(stompHandler, customHandler),
defaultHandler // Default handler
);
// Verify protocol negotiation
String acceptedProtocol = session.getAcceptedProtocol();
logger.debug("Accepted protocol: {}", acceptedProtocol);
// Verify handler supports protocol
List<String> supportedProtocols = handler.getSupportedProtocols();
logger.debug("Supported protocols: {}", supportedProtocols);Symptoms: IllegalArgumentException with "Cannot convert payload" message.
Causes:
Solutions:
// Use supported payload types
// PassThruSubProtocolHandler supports: String, byte[], ByteBuffer
// For custom handlers, implement conversion logic
@Override
public void handleMessageToClient(WebSocketSession session, Message<?> message)
throws Exception {
Object payload = message.getPayload();
if (payload instanceof String) {
session.sendMessage(new TextMessage((String) payload));
} else if (payload instanceof byte[]) {
session.sendMessage(new TextMessage(new String((byte[]) payload)));
} else {
throw new IllegalArgumentException("Unsupported payload type: " +
payload.getClass());
}
}Symptoms: Session state lost between messages.
Causes:
Solutions:
// Use thread-safe data structures
private final Map<String, SessionState> sessionStates = new ConcurrentHashMap<>();
// Properly initialize and cleanup
@Override
public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) {
sessionStates.put(session.getId(), new SessionState());
}
@Override
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus,
MessageChannel outputChannel) {
sessionStates.remove(session.getId());
}// From Spring Framework
interface SubProtocolHandler {
/**
* Get list of supported sub-protocol names.
*
* @return list of protocol names (never null, may be empty)
*/
List<String> getSupportedProtocols();
/**
* Handle message received from WebSocket client.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param webSocketMessage the received WebSocket message (must not be null)
* @param outputChannel channel to send converted message to (must not be null)
*/
void handleMessageFromClient(
WebSocketSession session,
WebSocketMessage<?> message,
MessageChannel outputChannel);
/**
* Handle message to be sent to WebSocket client.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param message the Integration message (must not be null)
* @throws Exception if conversion or sending fails
*/
void handleMessageToClient(
WebSocketSession session,
Message<?> message) throws Exception;
/**
* Resolve session ID from message headers.
*
* @param message the Integration message (must not be null)
* @return session ID or null if not resolvable
*/
String resolveSessionId(Message<?> message);
/**
* Invoked when WebSocket session starts.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param outputChannel the output message channel (must not be null)
*/
void afterSessionStarted(
WebSocketSession session,
MessageChannel outputChannel);
/**
* Invoked when WebSocket session ends.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param closeStatus the close status (must not be null)
* @param outputChannel the output message channel (must not be null)
*/
void afterSessionEnded(
WebSocketSession session,
CloseStatus closeStatus,
MessageChannel outputChannel);
}// From Spring Framework
interface SubProtocolCapable {
/**
* Get list of supported sub-protocol names.
*
* @return list of protocol names (never null, may be empty)
*/
List<String> getSubProtocols();
}