The outbound message handler sends Spring Integration messages to WebSocket sessions. It converts Integration messages to WebSocket messages using configured protocol handlers and message converters, handling both client and server-side scenarios.
Message handler that sends Integration messages to WebSocket sessions.
/**
* MessageHandler for sending Integration messages via WebSocket.
* Converts Integration messages to WebSocket messages, using the configured
* sub-protocol handlers and message converters, and delivers them to a session.
* <p>
* Client mode: the container's single session is used and a session ID header is ignored.
* Server mode: the target session is selected by the session ID header
* ({@code SimpMessageHeaderAccessor.SESSION_ID_HEADER}).
* <p>
* Thread-safe for concurrent message sending.
*
* @since 4.1
*/
public class WebSocketOutboundMessageHandler extends AbstractMessageHandler {
/**
* Create outbound handler with default pass-through protocol handler.
*
* @param webSocketContainer the container managing WebSocket sessions
* @throws IllegalArgumentException if container is null
*/
public WebSocketOutboundMessageHandler(IntegrationWebSocketContainer webSocketContainer);
/**
* Create outbound handler with custom protocol handler registry.
* Allows for STOMP or custom sub-protocol support.
*
* @param webSocketContainer the container managing WebSocket sessions
* @param protocolHandlerRegistry the sub-protocol handler registry
* @throws IllegalArgumentException if container or registry is null
*/
public WebSocketOutboundMessageHandler(
IntegrationWebSocketContainer webSocketContainer,
SubProtocolHandlerRegistry protocolHandlerRegistry);
/**
* Set message converters for payload serialization.
* Converts Integration message payloads to WebSocket-compatible format.
* If not set, default converters are used (String, byte[], JSON).
* Whether these converters replace or precede the defaults is controlled by
* {@link #setMergeWithDefaultConverters(boolean)}.
*
* @param messageConverters list of MessageConverter instances
* @throws IllegalArgumentException if converters list is null
*/
public void setMessageConverters(List<MessageConverter> messageConverters);
/**
* Configure whether custom converters should merge with default converters.
* If true, custom converters are tried first, then defaults.
* If false, only custom converters are used.
* Default: true
*
* @param mergeWithDefaultConverters true to merge, false to replace
*/
public void setMergeWithDefaultConverters(boolean mergeWithDefaultConverters);
/**
* Get the component type identifier.
*
* @return "websocket:outbound-channel-adapter"
*/
@Override
public String getComponentType();
/**
* Handle message and send via WebSocket.
* Thread-safe: can be called concurrently.
* When the send buffer limit is exceeded, the handler closes the session
* before the exception reaches the caller.
*
* @param message the Integration message to send
* @throws MessageHandlingException if message handling fails
* @throws IllegalArgumentException if session ID missing (server mode) or payload type unsupported
* @throws IllegalStateException if the target session is not found or is closed
* @throws SessionLimitExceededException if send buffer limit exceeded
*/
@Override
public void handleMessage(Message<?> message);
}Usage Example - Basic Client:
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.messaging.support.GenericMessage;
// Create client container
ClientWebSocketContainer container =
new ClientWebSocketContainer(client, "ws://localhost:8080/websocket");
container.start();
// Wait for connection
if (!container.isConnected()) {
// Handle connection failure
throw new IllegalStateException("WebSocket not connected");
}
// Create outbound handler
WebSocketOutboundMessageHandler outboundHandler =
new WebSocketOutboundMessageHandler(container);
outboundHandler.afterPropertiesSet();
// Send a message
outboundHandler.handleMessage(new GenericMessage<>("Hello WebSocket!"));
// Send JSON data
MyData data = new MyData("value1", 42);
outboundHandler.handleMessage(new GenericMessage<>(data));Usage Example - Server-Side Broadcasting:
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import java.util.HashMap;
import java.util.Map;
// Create server container
ServerWebSocketContainer serverContainer =
new ServerWebSocketContainer("/websocket");
serverContainer.setHandshakeHandler(handshakeHandler);
serverContainer.start();
// Create outbound handler
WebSocketOutboundMessageHandler outboundHandler =
new WebSocketOutboundMessageHandler(serverContainer);
outboundHandler.afterPropertiesSet();
// Send to specific session
String sessionId = "session-123";
WebSocketSession session = serverContainer.getSession(sessionId);
if (session != null && session.isOpen()) {
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId);
Message<String> message = MessageBuilder
.withPayload("Message for specific client")
.copyHeaders(headers)
.build();
outboundHandler.handleMessage(message);
}
// Broadcast to all sessions
for (WebSocketSession session : serverContainer.getSessions().values()) {
if (session.isOpen()) {
Map<String, Object> sessionHeaders = new HashMap<>();
sessionHeaders.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId());
Message<String> broadcastMsg = MessageBuilder
.withPayload("Broadcast message")
.copyHeaders(sessionHeaders)
.build();
try {
outboundHandler.handleMessage(broadcastMsg);
} catch (Exception e) {
// Handle per-session errors
logger.error("Failed to send to session " + session.getId(), e);
}
}
}Usage Example - Custom Message Converters:
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.Arrays;
// Create custom converters
List<MessageConverter> converters = Arrays.asList(
new MappingJackson2MessageConverter(), // Object to JSON
new StringMessageConverter() // String handling
);
// Create outbound handler with custom converters
WebSocketOutboundMessageHandler handler =
new WebSocketOutboundMessageHandler(container);
handler.setMessageConverters(converters);
handler.setMergeWithDefaultConverters(true); // Keep defaults as fallback
handler.afterPropertiesSet();
// Send complex object - automatically converted to JSON
MyComplexObject obj = new MyComplexObject();
handler.handleMessage(new GenericMessage<>(obj));Usage Example - STOMP Protocol:
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
// Create STOMP protocol handler
StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();
// Create protocol handler registry
SubProtocolHandlerRegistry registry =
new SubProtocolHandlerRegistry(stompHandler);
// Create outbound handler with STOMP support
WebSocketOutboundMessageHandler handler =
new WebSocketOutboundMessageHandler(serverContainer, registry);
handler.afterPropertiesSet();
// Send STOMP message with destination
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(
SimpMessageType.MESSAGE);
headers.setSessionId("session-123");
headers.setDestination("/topic/notifications");
headers.setLeaveMutable(true);
Message<String> stompMessage = MessageBuilder
.createMessage("Notification message", headers.getMessageHeaders());
handler.handleMessage(stompMessage);Usage Example - Integration with Message Channel:
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
// Create output channel
MessageChannel outboundChannel = new DirectChannel();
// Create and subscribe outbound handler
WebSocketOutboundMessageHandler handler =
new WebSocketOutboundMessageHandler(container);
handler.afterPropertiesSet();
((DirectChannel) outboundChannel).subscribe(handler);
// Now messages sent to the channel are delivered via WebSocket
outboundChannel.send(new GenericMessage<>("Channeled message"));
The handler includes three default message converters: String, byte[], and JSON.
When custom converters are configured:
- mergeWithDefaultConverters=true: Custom converters are tried first, then defaults
- mergeWithDefaultConverters=false: Only custom converters are used

Converter Selection Order:
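If no converter supports the payload, IllegalArgumentException is thrown.

Error Points in Flow:
- Missing session ID (server mode): IllegalArgumentException
- Session not found or closed: IllegalStateException
- Unsupported payload type: IllegalArgumentException
- Send buffer limit exceeded: SessionLimitExceededException
- Network failure: IOException wrapped in MessageHandlingException

Client mode: the session ID is not needed; if the client container is not connected, IllegalStateException is thrown.

Example: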
// Client mode - session ID not needed
ClientWebSocketContainer container = new ClientWebSocketContainer(client, uri);
container.start();
WebSocketOutboundMessageHandler handler =
new WebSocketOutboundMessageHandler(container);
handler.afterPropertiesSet();
// Session ID in headers is ignored
Message<String> message = new GenericMessage<>("Hello");
handler.handleMessage(message); // Uses single client session
Server mode:
- Session ID is taken from the message header named by the SimpMessageHeaderAccessor.SESSION_ID_HEADER constant
- IllegalArgumentException if session ID missing
- IllegalStateException if session not found or closed

Example:
// Server mode - session ID required
ServerWebSocketContainer container = new ServerWebSocketContainer("/ws");
container.start();
WebSocketOutboundMessageHandler handler =
new WebSocketOutboundMessageHandler(container);
handler.afterPropertiesSet();
// Session ID must be in headers
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, "session-123");
Message<String> message = MessageBuilder
.withPayload("Hello")
.copyHeaders(headers)
.build();
handler.handleMessage(message);When send buffer limit is exceeded:
Handling:
try {
outboundHandler.handleMessage(message);
} catch (SessionLimitExceededException e) {
// Buffer limit exceeded
CloseStatus status = e.getStatus();
logger.warn("Session closed due to buffer overflow: {}", status);
// Session is already closed by handler
// Implement retry logic if needed
}Prevention:
// Increase buffer size
container.setSendBufferSizeLimit(1024 * 1024); // 1 MB
// Use DROP_OLDEST for non-critical messages
container.setSendBufferOverflowStrategy(
ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP_OLDEST
);For other errors (network, session closed, etc.):
Handling:
try {
outboundHandler.handleMessage(message);
} catch (MessageHandlingException e) {
Message<?> failedMessage = e.getFailedMessage();
Throwable cause = e.getCause();
if (cause instanceof IOException) {
// Network error - session may be closed
logger.error("Network error sending message", e);
} else if (cause instanceof IllegalStateException) {
// Session closed or not found
logger.warn("Session not available", e);
} else {
// Other error
logger.error("Failed to send message", e);
}
}Thrown when:
Handling:
// Always check session ID in server mode
if (message.getHeaders().containsKey(SimpMessageHeaderAccessor.SESSION_ID_HEADER)) {
outboundHandler.handleMessage(message);
} else {
logger.error("Session ID missing in message headers");
throw new IllegalArgumentException("Session ID required");
}Thrown when:
Handling:
// Client mode - check connection
if (container instanceof ClientWebSocketContainer) {
ClientWebSocketContainer clientContainer = (ClientWebSocketContainer) container;
if (!clientContainer.isConnected()) {
throw new IllegalStateException("WebSocket not connected");
}
}
// Server mode - verify session exists
String sessionId = (String) message.getHeaders()
.get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);
WebSocketSession session = container.getSession(sessionId);
if (session == null || !session.isOpen()) {
throw new IllegalStateException("Session not available: " + sessionId);
}ConcurrentWebSocketSessionDecorator)Best Practice: Multiple threads can safely send messages to the same session concurrently. The session decorator handles synchronization internally.
ConcurrentWebSocketSessionDecorator for thread-safe bufferingBuffer Behavior:
Optimization Tips:
Symptoms: IllegalArgumentException when sending messages in server mode.
Causes:
Solutions:
// Always include session ID in headers
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId);
Message<String> message = MessageBuilder
.withPayload("Hello")
.copyHeaders(headers)
.build();
// Verify session exists before sending
WebSocketSession session = container.getSession(sessionId);
if (session != null && session.isOpen()) {
outboundHandler.handleMessage(message);
} else {
logger.warn("Session not available: {}", sessionId);
}Symptoms: Sessions closing unexpectedly with SessionLimitExceededException.
Causes:
Solutions:
// Increase buffer size
container.setSendBufferSizeLimit(1024 * 1024); // 1 MB
// Use DROP_OLDEST for non-critical messages
container.setSendBufferOverflowStrategy(
ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP_OLDEST
);
// Increase send timeout
container.setSendTimeLimit(30000); // 30 seconds
// Monitor buffer usage
// Implement message rate limiting if neededSymptoms: IllegalStateException when sending messages.
Causes:
Solutions:
// Check connection status
if (!container.isConnected()) {
// Attempt to reconnect
container.start();
// Wait for connection with timeout
int timeout = 10; // seconds
long start = System.currentTimeMillis();
while (!container.isConnected() &&
(System.currentTimeMillis() - start) < timeout * 1000) {
Thread.sleep(100);
}
if (!container.isConnected()) {
throw new IllegalStateException("Failed to connect");
}
}Symptoms: IllegalArgumentException with "Cannot convert payload" message.
Causes:
Solutions:
// Configure appropriate converters
List<MessageConverter> converters = Arrays.asList(
new MappingJackson2MessageConverter(),
new StringMessageConverter()
);
handler.setMessageConverters(converters);
handler.setMergeWithDefaultConverters(true);
// Verify payload type is supported
// String, byte[], and JSON-serializable objects are supported by defaultSymptoms: No errors but messages not appearing on client.
Causes:
Solutions:
// Verify session is open
if (session.isOpen()) {
outboundHandler.handleMessage(message);
} else {
logger.warn("Session closed: {}", session.getId());
}
// Check network connectivity
// Verify protocol handler configuration
// For STOMP: ensure client is subscribed to destination// Key headers for outbound messages
class SimpMessageHeaderAccessor {
public static final String SESSION_ID_HEADER = "simpSessionId";
public static final String DESTINATION_HEADER = "simpDestination";
public static final String MESSAGE_TYPE_HEADER = "simpMessageType";
public static SimpMessageHeaderAccessor create(SimpMessageType messageType);
public void setSessionId(String sessionId);
public void setDestination(String destination);
public void setMessageTypeIfNotSet(SimpMessageType messageType);
public void setLeaveMutable(boolean leaveMutable);
}/**
* Exception thrown when send buffer limit is exceeded.
* Contains the close status for session termination.
* The handler closes the affected session, and the exception is then propagated to the caller.
*/
class SessionLimitExceededException extends RuntimeException {
/**
* Get the close status that will be used to close the session.
*
* @return the close status (never null)
*/
public CloseStatus getStatus();
}/**
* Exception thrown when message handling fails.
* Contains the original message and detailed error info.
* Propagates to caller.
*/
class MessageHandlingException extends MessagingException {
/**
* Get the message that failed to be handled.
*
* @return the failed message (never null)
*/
public Message<?> getFailedMessage();
/**
* Get the cause of the failure.
*
* @return the cause (may be null)
*/
public Throwable getCause();
}// Outbound handler for sending requests
WebSocketOutboundMessageHandler outbound =
new WebSocketOutboundMessageHandler(container);
outbound.afterPropertiesSet();
// Inbound adapter for receiving replies
WebSocketInboundChannelAdapter inbound =
new WebSocketInboundChannelAdapter(container);
DirectChannel replyChannel = new DirectChannel();
inbound.setOutputChannel(replyChannel);
inbound.afterPropertiesSet();
inbound.start();
// Generate the correlation ID first so the reply handler can capture it
String correlationId = UUID.randomUUID().toString();
// Register the reply handler BEFORE sending the request: a DirectChannel delivers
// synchronously to its current subscribers, so a reply that arrives before the
// subscription exists cannot be matched to this request.
replyChannel.subscribe(message -> {
    String replyCorrelationId = (String) message.getHeaders().get("correlationId");
    if (correlationId.equals(replyCorrelationId)) {
        // Process matching reply
        processReply(message);
    }
});
// Send request with correlation ID
Message<String> request = MessageBuilder
        .withPayload("request-data")
        .setHeader("correlationId", correlationId)
        .build();
try {
    outbound.handleMessage(request);
} catch (Exception e) {
    logger.error("Failed to send request", e);
}
// Single outbound handler for broadcasting
WebSocketOutboundMessageHandler broadcaster =
new WebSocketOutboundMessageHandler(serverContainer);
broadcaster.afterPropertiesSet();
// Broadcast to all connected clients
/**
 * Sends the given text to every open session of the server container and logs
 * how many deliveries succeeded and how many failed.
 * A failure for one session is logged and does not stop the remaining deliveries.
 *
 * @param message the payload to deliver to each open session
 */
void broadcast(String message) {
    int delivered = 0;
    int failed = 0;
    for (WebSocketSession target : serverContainer.getSessions().values()) {
        // Skip sessions that are no longer open
        if (!target.isOpen()) {
            continue;
        }
        // Address the message to this session via the session ID header
        Message<String> outgoing = MessageBuilder
                .withPayload(message)
                .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, target.getId())
                .build();
        try {
            broadcaster.handleMessage(outgoing);
            delivered++;
        } catch (Exception e) {
            failed++;
            logger.error("Failed to send to session " + target.getId(), e);
        }
    }
    logger.info("Broadcast complete: {} success, {} failures", delivered, failed);
}
// Route messages based on session attributes
/**
 * Delivers the payload to every open session whose "userRole" session attribute
 * equals the requested role. Per-session send failures are logged and skipped.
 *
 * @param payload the text to deliver
 * @param userRole the role a session must carry to receive the payload
 */
void routeMessage(String payload, String userRole) {
    for (WebSocketSession candidate : serverContainer.getSessions().values()) {
        String sessionRole = (String) candidate.getAttributes().get("userRole");
        // Role is checked first; openness is only consulted for matching sessions
        if (!userRole.equals(sessionRole) || !candidate.isOpen()) {
            continue;
        }
        Message<String> outgoing = MessageBuilder
                .withPayload(payload)
                .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, candidate.getId())
                .build();
        try {
            outboundHandler.handleMessage(outgoing);
        } catch (Exception e) {
            logger.error("Failed to route message to session " + candidate.getId(), e);
        }
    }
}
// Retry logic with exponential backoff
/**
 * Sends a message through the outbound handler, retrying with exponential backoff
 * when delivery fails with a {@code MessageHandlingException}.
 * <p>
 * {@code maxRetries} is the total number of delivery attempts. Values below 1 are
 * treated as 1, so the message is always attempted at least once instead of being
 * silently dropped. The wait between attempts starts at 100 ms and doubles each time.
 *
 * @param message the message to send
 * @param maxRetries the maximum number of delivery attempts
 * @throws MessageHandlingException if the last attempt fails
 * @throws RuntimeException if the thread is interrupted while waiting between attempts
 */
public void sendWithRetry(Message<?> message, int maxRetries) {
    // Guarantee at least one attempt even for zero or negative values
    int maxAttempts = Math.max(1, maxRetries);
    long delay = 100; // Initial delay in ms
    for (int attempt = 1; ; attempt++) {
        try {
            outboundHandler.handleMessage(message);
            return; // Success
        } catch (MessageHandlingException e) {
            if (attempt >= maxAttempts) {
                logger.error("Failed to send after {} attempts", maxAttempts, e);
                throw e;
            }
        }
        // Exponential backoff before the next attempt
        try {
            Thread.sleep(delay);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during retry", ie);
        }
        delay *= 2; // Double delay for next retry
    }
}