The inbound message adapter receives WebSocket messages from connected sessions and sends them into Spring Integration message channels. It implements WebSocketListener to handle WebSocket events and converts WebSocket messages to Spring Integration messages with automatic payload conversion.
Message producer that receives WebSocket messages and publishes them to Integration channels.
/**
* MessageProducer for inbound WebSocket messages.
* Implements WebSocketListener to receive WebSocket events and messages.
*
* @since 4.1
*/
public class WebSocketInboundChannelAdapter extends MessageProducerSupport
implements WebSocketListener, ApplicationEventPublisherAware {
/**
* Create inbound adapter with default pass-through protocol handler.
*
* @param webSocketContainer the container managing WebSocket sessions
* @throws IllegalArgumentException if container is null
*/
public WebSocketInboundChannelAdapter(IntegrationWebSocketContainer webSocketContainer);
/**
* Create inbound adapter with custom protocol handler registry.
* Allows for STOMP or custom sub-protocol support.
*
* @param webSocketContainer the container managing WebSocket sessions
* @param protocolHandlerRegistry the sub-protocol handler registry
* @throws IllegalArgumentException if container or registry is null
*/
public WebSocketInboundChannelAdapter(
IntegrationWebSocketContainer webSocketContainer,
SubProtocolHandlerRegistry protocolHandlerRegistry);
/**
* Set message converters for payload type conversion.
* Converts WebSocket message payloads to the configured payload type.
* If not set, default converters are used (String, byte[], JSON).
*
* @param messageConverters list of MessageConverter instances
* @throws IllegalArgumentException if converters list is null
*/
public void setMessageConverters(List<MessageConverter> messageConverters);
/**
* Configure whether custom converters should merge with default converters.
* If true, custom converters are tried first, then defaults.
* If false, only custom converters are used.
* Default: true
*
* @param mergeWithDefaultConverters true to merge, false to replace
*/
public void setMergeWithDefaultConverters(boolean mergeWithDefaultConverters);
/**
* Set the target payload type for message conversion.
* WebSocket message bodies will be converted to this type.
* Default: String.class
*
* @param payloadType the target payload class
* @throws IllegalArgumentException if payloadType is null
*/
public void setPayloadType(Class<?> payloadType);
/**
* Enable broker message routing for server-side STOMP support.
* When enabled, uses existing AbstractBrokerMessageHandler bean
* for non-MESSAGE type messages and broker destination routing.
* Only applicable on server side; ignored on client side.
* Default: false
*
* @param useBroker true to enable broker routing
*/
public void setUseBroker(boolean useBroker);
/**
* Set the ApplicationEventPublisher for publishing WebSocket events.
* Used to publish SessionConnectedEvent and ReceiptEvent.
* If not set, events are not published.
*
* @param applicationEventPublisher the event publisher
*/
@Override
public void setApplicationEventPublisher(
ApplicationEventPublisher applicationEventPublisher);
/**
* Get the component type identifier.
*
* @return "websocket:inbound-channel-adapter"
*/
@Override
public String getComponentType();
/**
* Check if the adapter is active and accepting messages.
*
* @return true if active and running
*/
public boolean isActive();
// WebSocketListener implementation
/**
* Get supported sub-protocols from the protocol handler registry.
*
* @return list of supported sub-protocol names (never null, may be empty)
*/
@Override
public List<String> getSubProtocols();
/**
* Handle incoming WebSocket message.
* Delegates to protocol handler for conversion and processing.
* Thread-safe: can be called concurrently from multiple sessions.
*
* @param session the WebSocket session (must not be null)
* @param message the WebSocket message (must not be null)
* @throws IllegalStateException if adapter not initialized or not active
*/
@Override
public void onMessage(WebSocketSession session, WebSocketMessage<?> message);
/**
* Invoked when WebSocket session starts.
* Notifies protocol handler and sends CONNECT frame for client-side STOMP.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
*/
@Override
public void afterSessionStarted(WebSocketSession session);
/**
* Invoked when WebSocket session ends.
* Notifies protocol handler for cleanup.
* Thread-safe: can be called concurrently.
*
* @param session the WebSocket session (must not be null)
* @param closeStatus the close status (must not be null)
*/
@Override
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus);
}Usage Example - Basic:
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.handler.LoggingHandler;
// Create container
ClientWebSocketContainer container =
new ClientWebSocketContainer(client, "ws://localhost:8080/websocket");
// Create message channel for received messages
DirectChannel inputChannel = new DirectChannel();
// Create inbound adapter
WebSocketInboundChannelAdapter inboundAdapter =
new WebSocketInboundChannelAdapter(container);
inboundAdapter.setOutputChannel(inputChannel);
// Configure payload type
inboundAdapter.setPayloadType(String.class);
// Add handler to process messages
inputChannel.subscribe(new LoggingHandler(LoggingHandler.Level.INFO));
// Initialize and start (required)
inboundAdapter.afterPropertiesSet();
inboundAdapter.start();
// Start container to establish connection
container.start();Usage Example - Custom Message Converters:
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.Arrays;
// Create custom converters
List<MessageConverter> converters = Arrays.asList(
new MappingJackson2MessageConverter(), // JSON to object
new StringMessageConverter() // String handling
);
// Create inbound adapter with custom converters
WebSocketInboundChannelAdapter adapter =
new WebSocketInboundChannelAdapter(container);
adapter.setMessageConverters(converters);
adapter.setMergeWithDefaultConverters(true); // Keep default converters as fallback
adapter.setPayloadType(MyDataClass.class);
adapter.setOutputChannel(outputChannel);
// Initialize
adapter.afterPropertiesSet();
adapter.start();Usage Example - STOMP Protocol:
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
// Create STOMP protocol handler
StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();
// Create protocol handler registry
SubProtocolHandlerRegistry registry =
new SubProtocolHandlerRegistry(stompHandler);
// Create inbound adapter with STOMP support
WebSocketInboundChannelAdapter adapter =
new WebSocketInboundChannelAdapter(serverContainer, registry);
adapter.setOutputChannel(inputChannel);
// Enable broker routing for server-side
adapter.setUseBroker(true);
adapter.afterPropertiesSet();
adapter.start();Usage Example - Server-Side with Event Publishing:
import org.springframework.context.event.EventListener;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.integration.websocket.event.ReceiptEvent;
// Create server container
ServerWebSocketContainer serverContainer =
new ServerWebSocketContainer("/websocket");
serverContainer.setHandshakeHandler(handshakeHandler);
// Create inbound adapter
WebSocketInboundChannelAdapter adapter =
new WebSocketInboundChannelAdapter(serverContainer);
adapter.setOutputChannel(messageChannel);
adapter.setApplicationEventPublisher(applicationContext);
// Handle connection events
@EventListener
public void handleSessionConnected(SessionConnectedEvent event) {
Message<byte[]> message = event.getMessage();
String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
System.out.println("Client connected: " + sessionId);
}
// Handle STOMP receipt events (client-side)
@EventListener
public void handleReceipt(ReceiptEvent event) {
System.out.println("Receipt received: " + event.getMessage());
}
adapter.afterPropertiesSet();
adapter.start();The adapter includes three default message converters:
When custom converters are configured:
mergeWithDefaultConverters=true: Custom converters are tried first, then defaultsmergeWithDefaultConverters=false: Only custom converters are usedConverter Selection Order:
onMessage() on the adapterError Handling in Flow:
afterSessionStarted()SessionConnectedEvent is published if ApplicationEventPublisher is setThread Safety: afterSessionStarted() is thread-safe and can be called concurrently.
simpSessionId: WebSocket session IDsimpSessionAttributes: Session attributes mapsimpUser: Authenticated principal (if available)simpMessageType: Message type (MESSAGE, CONNECT, etc.)Null Handling:
IllegalArgumentException is thrownIllegalArgumentException is thrownafterSessionEnded()Thread Safety: afterSessionEnded() is thread-safe and can be called concurrently.
Missing Output Channel:
WebSocketInboundChannelAdapter adapter = new WebSocketInboundChannelAdapter(container);
// Must set output channel before initialization
adapter.setOutputChannel(outputChannel);
adapter.afterPropertiesSet(); // Throws IllegalStateException if channel not setNull Container:
// Throws IllegalArgumentException
WebSocketInboundChannelAdapter adapter = new WebSocketInboundChannelAdapter(null);Protocol Handler Exceptions:
Message Conversion Failures:
Channel Send Failures:
DirectChannel channel = new DirectChannel();
channel.setErrorHandler(error -> {
logger.error("Failed to send message", error);
// Custom error handling
});
adapter.setOutputChannel(channel);Session Already Closed:
// Protocol handler checks session state
// If closed, message is not processed
// No exception thrown, just loggedSession Not Found:
Best Practice: If creating custom protocol handlers, ensure they are thread-safe or use thread-local state.
Optimization Tips:
Symptoms: WebSocket messages arrive but don't appear in output channel.
Causes:
Solutions:
// Ensure adapter is started
if (!adapter.isActive()) {
adapter.start();
}
// Verify channel has subscribers
DirectChannel channel = (DirectChannel) adapter.getOutputChannel();
if (channel.getSubscriberCount() == 0) {
channel.subscribe(message -> {
// Process message
});
}
// Check protocol handler
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(handler);
// Verify handler supports received protocol
// Enable debug logging
logger.debug("Adapter active: {}", adapter.isActive());
logger.debug("Channel subscribers: {}", channel.getSubscriberCount());Symptoms: Messages received but payload type mismatch errors in logs.
Causes:
Solutions:
// Set appropriate payload type
adapter.setPayloadType(String.class); // For text messages
// Configure custom converters
List<MessageConverter> converters = Arrays.asList(
new MappingJackson2MessageConverter()
);
adapter.setMessageConverters(converters);
adapter.setPayloadType(MyDataClass.class);
// Verify JSON structure matches target class
// Enable converter debug loggingSymptoms: STOMP messages not routed through broker.
Causes:
useBroker not enabledSolutions:
// Enable broker routing (server-side only)
adapter.setUseBroker(true);
// Ensure broker bean exists
@Bean
public AbstractBrokerMessageHandler brokerMessageHandler() {
// Configure broker
}
// Verify server-side usage
if (container instanceof ServerWebSocketContainer) {
adapter.setUseBroker(true);
}Symptoms: SessionConnectedEvent and ReceiptEvent not received.
Causes:
Solutions:
// Set event publisher
adapter.setApplicationEventPublisher(applicationContext);
// Configure event listeners
@EventListener
public void handleSessionConnected(SessionConnectedEvent event) {
// Handle event
}
// Verify event types
// SessionConnectedEvent: Published on server-side CONNECT
// ReceiptEvent: Published on client-side RECEIPT frame// From Spring Framework
interface WebSocketMessage<T> {
T getPayload();
int getPayloadLength();
boolean isLast();
}
class TextMessage implements WebSocketMessage<String> {
public TextMessage(String payload);
public TextMessage(CharBuffer payload);
}
class BinaryMessage implements WebSocketMessage<ByteBuffer> {
public BinaryMessage(ByteBuffer payload);
public BinaryMessage(byte[] payload);
}
class PingMessage implements WebSocketMessage<ByteBuffer> { }
class PongMessage implements WebSocketMessage<ByteBuffer> { }// Common headers in converted messages
class SimpMessageHeaderAccessor {
public static final String SESSION_ID_HEADER = "simpSessionId";
public static final String SESSION_ATTRIBUTES_HEADER = "simpSessionAttributes";
public static final String USER_HEADER = "simpUser";
public static final String MESSAGE_TYPE_HEADER = "simpMessageType";
public static final String DESTINATION_HEADER = "simpDestination";
public static String getSessionId(MessageHeaders headers);
public static Map<String, Object> getSessionAttributes(MessageHeaders headers);
public static Principal getUser(MessageHeaders headers);
public static SimpMessageType getMessageType(MessageHeaders headers);
public static String getDestination(MessageHeaders headers);
}// From Spring Framework
enum SimpMessageType {
CONNECT, // Client connection request
CONNECT_ACK, // Server connection acknowledgment
MESSAGE, // Regular message
SUBSCRIBE, // STOMP subscribe
UNSUBSCRIBE, // STOMP unsubscribe
HEARTBEAT, // Heartbeat ping
DISCONNECT, // Disconnect request
DISCONNECT_ACK, // Disconnect acknowledgment
OTHER // Other message types
}/**
* Event published when WebSocket session is established (server-side)
* Published only if ApplicationEventPublisher is set on adapter.
*/
class SessionConnectedEvent extends AbstractSubProtocolEvent {
public Message<byte[]> getMessage();
public String getSessionId();
}
/**
* Event published when STOMP RECEIPT frame is received (client-side)
* Published only if ApplicationEventPublisher is set on adapter.
* @since 4.1.3
*/
class ReceiptEvent extends AbstractSubProtocolEvent {
public Message<byte[]> getMessage();
public String getSessionId();
}@Configuration
public class WebSocketIntegrationConfig {
@Bean
public ServerWebSocketContainer webSocketContainer() {
ServerWebSocketContainer container = new ServerWebSocketContainer("/ws");
container.setHandshakeHandler(new DefaultHandshakeHandler());
return container;
}
@Bean
public WebSocketInboundChannelAdapter inboundAdapter(
ServerWebSocketContainer container) {
WebSocketInboundChannelAdapter adapter =
new WebSocketInboundChannelAdapter(container);
adapter.setOutputChannel(inputChannel());
adapter.setPayloadType(String.class);
return adapter;
}
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow processingFlow() {
return IntegrationFlows.from(inputChannel())
.transform(String::toUpperCase)
.handle(System.out::println)
.get();
}
}DirectChannel inputChannel = new DirectChannel();
inputChannel.setErrorHandler(error -> {
// Retry logic
Message<?> failedMessage = ((MessagingException) error).getFailedMessage();
// Implement retry with exponential backoff
});
WebSocketInboundChannelAdapter adapter =
new WebSocketInboundChannelAdapter(container);
adapter.setOutputChannel(inputChannel);