or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

containers.md
inbound.md
index.md
outbound.md
protocols.md
sockjs.md
tile.json

docs/outbound.md

Outbound Message Handling

The outbound message handler sends Spring Integration messages to WebSocket sessions. It converts Integration messages to WebSocket messages using configured protocol handlers and message converters, handling both client and server-side scenarios.

Capabilities

WebSocketOutboundMessageHandler

Message handler that sends Integration messages to WebSocket sessions.

/**
 * MessageHandler for sending Integration messages via WebSocket.
 * Converts Integration messages to WebSocket messages and delivers them.
 * Thread-safe for concurrent message sending.
 *
 * @since 4.1
 */
public class WebSocketOutboundMessageHandler extends AbstractMessageHandler {

    /**
     * Create outbound handler with default pass-through protocol handler.
     *
     * @param webSocketContainer the container managing WebSocket sessions
     * @throws IllegalArgumentException if container is null
     */
    public WebSocketOutboundMessageHandler(IntegrationWebSocketContainer webSocketContainer);

    /**
     * Create outbound handler with custom protocol handler registry.
     * Allows for STOMP or custom sub-protocol support.
     *
     * @param webSocketContainer the container managing WebSocket sessions
     * @param protocolHandlerRegistry the sub-protocol handler registry
     * @throws IllegalArgumentException if container or registry is null
     */
    public WebSocketOutboundMessageHandler(
        IntegrationWebSocketContainer webSocketContainer,
        SubProtocolHandlerRegistry protocolHandlerRegistry);

    /**
     * Set message converters for payload serialization.
     * Converts Integration message payloads to WebSocket-compatible format.
     * If not set, default converters are used (String, byte[], JSON).
     *
     * @param messageConverters list of MessageConverter instances
     * @throws IllegalArgumentException if converters list is null
     */
    public void setMessageConverters(List<MessageConverter> messageConverters);

    /**
     * Configure whether custom converters should merge with default converters.
     * If true, custom converters are tried first, then defaults.
     * If false, only custom converters are used.
     * Default: true
     *
     * @param mergeWithDefaultConverters true to merge, false to replace
     */
    public void setMergeWithDefaultConverters(boolean mergeWithDefaultConverters);

    /**
     * Get the component type identifier.
     *
     * @return "websocket:outbound-channel-adapter"
     */
    @Override
    public String getComponentType();

    /**
     * Handle message and send via WebSocket.
     * Thread-safe: can be called concurrently.
     * <p>
     * If the send buffer limit is exceeded, the resulting
     * SessionLimitExceededException is caught inside the handler: the error is
     * logged and the session is closed with the status carried by the
     * exception. It is not rethrown to the caller.
     *
     * @param message the Integration message to send
     * @throws MessageHandlingException if message handling fails
     * @throws IllegalArgumentException if session ID missing (server mode) or payload type unsupported
     */
    @Override
    public void handleMessage(Message<?> message);
}

Usage Example - Basic Client:

import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.messaging.support.GenericMessage;

// Create client container
ClientWebSocketContainer container =
    new ClientWebSocketContainer(client, "ws://localhost:8080/websocket");
container.start();

// Wait for connection
if (!container.isConnected()) {
    // Handle connection failure
    throw new IllegalStateException("WebSocket not connected");
}

// Create outbound handler
WebSocketOutboundMessageHandler outboundHandler =
    new WebSocketOutboundMessageHandler(container);
outboundHandler.afterPropertiesSet();

// Send a message
outboundHandler.handleMessage(new GenericMessage<>("Hello WebSocket!"));

// Send JSON data
MyData data = new MyData("value1", 42);
outboundHandler.handleMessage(new GenericMessage<>(data));

Usage Example - Server-Side Broadcasting:

import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import java.util.HashMap;
import java.util.Map;

// Create server container
ServerWebSocketContainer serverContainer =
    new ServerWebSocketContainer("/websocket");
serverContainer.setHandshakeHandler(handshakeHandler);
serverContainer.start();

// Create outbound handler
WebSocketOutboundMessageHandler outboundHandler =
    new WebSocketOutboundMessageHandler(serverContainer);
outboundHandler.afterPropertiesSet();

// Send to specific session
String sessionId = "session-123";
WebSocketSession targetSession = serverContainer.getSession(sessionId);
if (targetSession != null && targetSession.isOpen()) {
    Map<String, Object> headers = new HashMap<>();
    headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId);

    Message<String> message = MessageBuilder
        .withPayload("Message for specific client")
        .copyHeaders(headers)
        .build();

    outboundHandler.handleMessage(message);
}

// Broadcast to all sessions.
// The loop variable must not reuse the name of the single-session variable
// declared above: Java rejects redeclaring a local that is still in scope.
for (WebSocketSession session : serverContainer.getSessions().values()) {
    if (session.isOpen()) {
        Map<String, Object> sessionHeaders = new HashMap<>();
        sessionHeaders.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId());

        Message<String> broadcastMsg = MessageBuilder
            .withPayload("Broadcast message")
            .copyHeaders(sessionHeaders)
            .build();

        try {
            outboundHandler.handleMessage(broadcastMsg);
        } catch (Exception e) {
            // Handle per-session errors so one failing session does not stop the broadcast
            logger.error("Failed to send to session " + session.getId(), e);
        }
    }
}

Usage Example - Custom Message Converters:

import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.Arrays;

// Create custom converters
List<MessageConverter> converters = Arrays.asList(
    new MappingJackson2MessageConverter(),  // Object to JSON
    new StringMessageConverter()             // String handling
);

// Create outbound handler with custom converters
WebSocketOutboundMessageHandler handler =
    new WebSocketOutboundMessageHandler(container);
handler.setMessageConverters(converters);
handler.setMergeWithDefaultConverters(true); // Keep defaults as fallback

handler.afterPropertiesSet();

// Send complex object - automatically converted to JSON
MyComplexObject obj = new MyComplexObject();
handler.handleMessage(new GenericMessage<>(obj));

Usage Example - STOMP Protocol:

import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;

// Create STOMP protocol handler
StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();

// Create protocol handler registry
SubProtocolHandlerRegistry registry =
    new SubProtocolHandlerRegistry(stompHandler);

// Create outbound handler with STOMP support
WebSocketOutboundMessageHandler handler =
    new WebSocketOutboundMessageHandler(serverContainer, registry);
handler.afterPropertiesSet();

// Send STOMP message with destination
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(
    SimpMessageType.MESSAGE);
headers.setSessionId("session-123");
headers.setDestination("/topic/notifications");
headers.setLeaveMutable(true);

Message<String> stompMessage = MessageBuilder
    .createMessage("Notification message", headers.getMessageHeaders());

handler.handleMessage(stompMessage);

Usage Example - Integration with Message Channel:

import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

// Create output channel
MessageChannel outboundChannel = new DirectChannel();

// Create and subscribe outbound handler
WebSocketOutboundMessageHandler handler =
    new WebSocketOutboundMessageHandler(container);
handler.afterPropertiesSet();

((DirectChannel) outboundChannel).subscribe(handler);

// Now messages sent to the channel are delivered via WebSocket
outboundChannel.send(new GenericMessage<>("Channeled message"));

Default Message Converters

The handler includes three default message converters:

  1. StringMessageConverter: Converts String payloads to WebSocket text messages
  2. ByteArrayMessageConverter: Converts byte[] payloads to WebSocket binary messages
  3. JacksonJsonMessageConverter: Converts objects to JSON (if Jackson 3 is present)

When custom converters are configured:

  • With mergeWithDefaultConverters=true: Custom converters are tried first, then defaults
  • With mergeWithDefaultConverters=false: Only custom converters are used

Converter Selection Order:

  1. Custom converters (if configured and mergeWithDefaultConverters=true)
  2. Default converters (String, byte[], JSON)
  3. If no converter matches, conversion fails with IllegalArgumentException

Message Flow

  1. Integration Message Received: Handler receives message from upstream
  2. Session Resolution:
    • Client mode: Uses the single client session
    • Server mode: Extracts session ID from message headers (required)
  3. Message Type Setting: Sets SIMP message type if not already present
  4. Payload Conversion: Message converters transform payload to WebSocket format
  5. Protocol Handler: Protocol handler converts to WebSocketMessage
  6. Delivery: WebSocketMessage is sent to the target session
  7. Error Handling: SessionLimitExceededException triggers session closure

Error Points in Flow:

  • Session ID missing (server mode): IllegalArgumentException
  • Session not found: IllegalStateException
  • Payload conversion failure: IllegalArgumentException
  • Send buffer overflow: SessionLimitExceededException
  • Network/session errors: IOException wrapped in MessageHandlingException

Session Resolution

Client Mode

  • Always uses the single established client session
  • Session ID in message headers is ignored
  • Container must be connected before sending
  • If not connected, IllegalStateException is thrown

Example:

// Client mode - session ID not needed
ClientWebSocketContainer container = new ClientWebSocketContainer(client, uri);
container.start();

WebSocketOutboundMessageHandler handler = 
    new WebSocketOutboundMessageHandler(container);
handler.afterPropertiesSet();

// Session ID in headers is ignored
Message<String> message = new GenericMessage<>("Hello");
handler.handleMessage(message); // Uses single client session

Server Mode

  • Requires session ID in message headers
  • Uses SimpMessageHeaderAccessor.SESSION_ID_HEADER constant
  • Throws IllegalArgumentException if session ID missing
  • Session must exist in container's session map
  • Throws IllegalStateException if session not found or closed

Example:

// Server mode - session ID required
ServerWebSocketContainer container = new ServerWebSocketContainer("/ws");
container.start();

WebSocketOutboundMessageHandler handler = 
    new WebSocketOutboundMessageHandler(container);
handler.afterPropertiesSet();

// Session ID must be in headers
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, "session-123");

Message<String> message = MessageBuilder
    .withPayload("Hello")
    .copyHeaders(headers)
    .build();

handler.handleMessage(message);

Error Handling

SessionLimitExceededException

When send buffer limit is exceeded:

  1. Exception is caught by handler
  2. Error is logged at ERROR level
  3. Session is closed with appropriate status (from exception)
  4. If closure fails, secondary error is logged
  5. Exception is not propagated to caller (handled internally)

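Handling (defensive only — as listed above, the handler catches this exception itself and closes the session, so the catch block below is normally not reached when calling handleMessage()):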

try {
    outboundHandler.handleMessage(message);
} catch (SessionLimitExceededException e) {
    // Buffer limit exceeded
    CloseStatus status = e.getStatus();
    logger.warn("Session closed due to buffer overflow: {}", status);
    // Session is already closed by handler
    // Implement retry logic if needed
}

Prevention:

// Increase buffer size
container.setSendBufferSizeLimit(1024 * 1024); // 1 MB

// Use DROP for non-critical messages: on overflow the oldest buffered
// messages are discarded instead of terminating the session.
// (OverflowStrategy only defines TERMINATE and DROP.)
container.setSendBufferOverflowStrategy(
    ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP
);

MessageHandlingException

For other errors (network, session closed, etc.):

  • Original message is included in exception
  • Detailed error message provided
  • Exception propagates to caller
  • Session may remain open (depends on error type)

Handling:

try {
    outboundHandler.handleMessage(message);
} catch (MessageHandlingException e) {
    Message<?> failedMessage = e.getFailedMessage();
    Throwable cause = e.getCause();
    
    if (cause instanceof IOException) {
        // Network error - session may be closed
        logger.error("Network error sending message", e);
    } else if (cause instanceof IllegalStateException) {
        // Session closed or not found
        logger.warn("Session not available", e);
    } else {
        // Other error
        logger.error("Failed to send message", e);
    }
}

IllegalArgumentException

Thrown when:

  • Session ID missing in server mode
  • Payload type not supported by converters
  • Container or registry is null

Handling:

// Always check session ID in server mode
if (message.getHeaders().containsKey(SimpMessageHeaderAccessor.SESSION_ID_HEADER)) {
    outboundHandler.handleMessage(message);
} else {
    logger.error("Session ID missing in message headers");
    throw new IllegalArgumentException("Session ID required");
}

IllegalStateException

Thrown when:

  • Client container not connected
  • Session not found in container
  • Session already closed

Handling:

// Client mode - check connection
if (container instanceof ClientWebSocketContainer) {
    ClientWebSocketContainer clientContainer = (ClientWebSocketContainer) container;
    if (!clientContainer.isConnected()) {
        throw new IllegalStateException("WebSocket not connected");
    }
}

// Server mode - verify session exists
String sessionId = (String) message.getHeaders()
    .get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);
WebSocketSession session = container.getSession(sessionId);
if (session == null || !session.isOpen()) {
    throw new IllegalStateException("Session not available: " + sessionId);
}

Thread Safety

Handler Thread Safety

  • WebSocketOutboundMessageHandler: Thread-safe for concurrent message sending
  • handleMessage(): Can be called concurrently from multiple threads
  • Session access: Uses thread-safe session decorator (ConcurrentWebSocketSessionDecorator)

Protocol Handler Thread Safety

  • PassThruSubProtocolHandler: Thread-safe (stateless)
  • StompSubProtocolHandler: Thread-safe (uses thread-local state)
  • Custom handlers: Must be thread-safe if shared across sessions

Best Practice: Multiple threads can safely send messages to the same session concurrently. The session decorator handles synchronization internally.

Performance Considerations

Send Buffer Management

  • Default buffer size: 512 KB per session
  • Default send timeout: 10 seconds
  • Overflow strategies: TERMINATE (close the session) and DROP (discard the oldest buffered messages)
  • Concurrent sends: Uses ConcurrentWebSocketSessionDecorator for thread-safe buffering

Buffer Behavior:

  • Messages accumulate in buffer during slow sends
  • Buffer size limit prevents memory exhaustion
  • Overflow strategy determines behavior when limit reached

Message Conversion Overhead

  • String messages: Minimal overhead (~0.1ms)
  • JSON conversion: Varies with payload size (typically 1-5ms)
  • Binary messages: Direct pass-through, minimal overhead

Concurrent Sends

  • Multiple threads can send to same session safely
  • Buffer accumulates messages during slow sends
  • No blocking between different sessions

Optimization Tips:

  • Configure appropriate buffer sizes based on message frequency
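  • Use the DROP strategy (discards the oldest buffered messages) for high-frequency, non-critical messages
  • Use the TERMINATE strategy when silently losing messages is unacceptable: on overflow the session is closed rather than messages being dropped, so the client can reconnect and resynchronize (this is not a delivery guarantee)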
  • Monitor session count and implement session limits if needed
  • Batch messages when possible to reduce conversion overhead

Memory Usage

  • Each handler instance: ~1-2 KB
  • Protocol handler state: Varies (PassThru: minimal, STOMP: ~100 bytes per session)
  • Message conversion cache: Minimal (converter-specific)
  • Send buffer: 512 KB per active session (default)

Troubleshooting

Issue: Session ID not found when sending

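Symptoms: IllegalArgumentException (session ID missing from the headers) or IllegalStateException (session not found or already closed) when sending messages in server mode.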

Causes:

  1. Session ID not in message headers
  2. Session already closed
  3. Wrong session ID

Solutions:

// Always include session ID in headers
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId);

Message<String> message = MessageBuilder
    .withPayload("Hello")
    .copyHeaders(headers)
    .build();

// Verify session exists before sending
WebSocketSession session = container.getSession(sessionId);
if (session != null && session.isOpen()) {
    outboundHandler.handleMessage(message);
} else {
    logger.warn("Session not available: {}", sessionId);
}

Issue: Buffer overflow causing session termination

Symptoms: Sessions closing unexpectedly with SessionLimitExceededException.

Causes:

  1. Send buffer too small for message frequency
  2. Slow network causing buffer accumulation
  3. TERMINATE strategy too aggressive

Solutions:

// Increase buffer size
container.setSendBufferSizeLimit(1024 * 1024); // 1 MB

// Use DROP for non-critical messages: on overflow the oldest buffered
// messages are discarded instead of terminating the session.
// (OverflowStrategy only defines TERMINATE and DROP.)
container.setSendBufferOverflowStrategy(
    ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP
);

// Increase send timeout
container.setSendTimeLimit(30000); // 30 seconds

// Monitor buffer usage
// Implement message rate limiting if needed

Issue: Client container not connected

Symptoms: IllegalStateException when sending messages.

Causes:

  1. Container not started
  2. Connection failed
  3. Connection lost

Solutions:

// Check connection status
if (!container.isConnected()) {
    // Attempt to reconnect
    container.start();
    
    // Wait for connection with timeout
    int timeout = 10; // seconds
    long start = System.currentTimeMillis();
    while (!container.isConnected() && 
           (System.currentTimeMillis() - start) < timeout * 1000) {
        Thread.sleep(100);
    }
    
    if (!container.isConnected()) {
        throw new IllegalStateException("Failed to connect");
    }
}

Issue: Payload conversion fails

Symptoms: IllegalArgumentException with "Cannot convert payload" message.

Causes:

  1. Payload type not supported by converters
  2. JSON structure doesn't match target type
  3. Custom converters not configured correctly

Solutions:

// Configure appropriate converters
List<MessageConverter> converters = Arrays.asList(
    new MappingJackson2MessageConverter(),
    new StringMessageConverter()
);
handler.setMessageConverters(converters);
handler.setMergeWithDefaultConverters(true);

// Verify payload type is supported
// String, byte[], and JSON-serializable objects are supported by default

Issue: Messages sent but not received by client

Symptoms: No errors but messages not appearing on client.

Causes:

  1. Session closed after message sent
  2. Network issues
  3. Protocol handler not converting correctly
  4. Client not subscribed to destination (STOMP)

Solutions:

// Verify session is open
if (session.isOpen()) {
    outboundHandler.handleMessage(message);
} else {
    logger.warn("Session closed: {}", session.getId());
}

// Check network connectivity
// Verify protocol handler configuration
// For STOMP: ensure client is subscribed to destination

Types

Message Headers

// Key headers for outbound messages
class SimpMessageHeaderAccessor {
    public static final String SESSION_ID_HEADER = "simpSessionId";
    public static final String DESTINATION_HEADER = "simpDestination";
    public static final String MESSAGE_TYPE_HEADER = "simpMessageType";

    public static SimpMessageHeaderAccessor create(SimpMessageType messageType);
    public void setSessionId(String sessionId);
    public void setDestination(String destination);
    public void setMessageTypeIfNotSet(SimpMessageType messageType);
    public void setLeaveMutable(boolean leaveMutable);
}

Session Limit Exceeded Exception

/**
 * Exception thrown when send buffer limit is exceeded.
 * Contains the close status for session termination.
 * Caught and handled internally by handler.
 */
class SessionLimitExceededException extends RuntimeException {
    /**
     * Get the close status that will be used to close the session.
     *
     * @return the close status (never null)
     */
    public CloseStatus getStatus();
}

Message Handling Exception

/**
 * Exception thrown when message handling fails.
 * Contains the original message and detailed error info.
 * Propagates to caller.
 */
class MessageHandlingException extends MessagingException {
    /**
     * Get the message that failed to be handled.
     *
     * @return the failed message (never null)
     */
    public Message<?> getFailedMessage();
    
    /**
     * Get the cause of the failure.
     *
     * @return the cause (may be null)
     */
    public Throwable getCause();
}

Integration Patterns

Request-Reply Pattern

// Outbound handler for sending requests
WebSocketOutboundMessageHandler outbound =
    new WebSocketOutboundMessageHandler(container);
outbound.afterPropertiesSet();

// Correlation ID used to match a reply to this request
String correlationId = UUID.randomUUID().toString();

// Register the reply handler BEFORE the inbound adapter is started and
// before the request is sent. A DirectChannel dispatches on the sender's
// thread and fails when it has no subscriber, so subscribing afterwards
// leaves a window in which an early reply cannot be delivered.
DirectChannel replyChannel = new DirectChannel();
replyChannel.subscribe(message -> {
    // Compare as Object: avoids a ClassCastException if the header is not a String
    Object replyCorrelationId = message.getHeaders().get("correlationId");
    if (correlationId.equals(replyCorrelationId)) {
        // Process matching reply
        processReply(message);
    }
});

// Inbound adapter for receiving replies
WebSocketInboundChannelAdapter inbound =
    new WebSocketInboundChannelAdapter(container);
inbound.setOutputChannel(replyChannel);
inbound.afterPropertiesSet();
inbound.start();

// Send request with correlation ID
Message<String> request = MessageBuilder
    .withPayload("request-data")
    .setHeader("correlationId", correlationId)
    .build();

try {
    outbound.handleMessage(request);
} catch (Exception e) {
    logger.error("Failed to send request", e);
}

Publish-Subscribe Pattern

// Single outbound handler for broadcasting
WebSocketOutboundMessageHandler broadcaster =
    new WebSocketOutboundMessageHandler(serverContainer);
broadcaster.afterPropertiesSet();

/**
 * Sends the given text to every currently open session of the server
 * container, one message per session, and logs how many sends succeeded
 * and how many failed.
 *
 * A failure for one session is logged and counted; it does not stop the
 * remaining sessions from being served.
 *
 * @param message the payload to deliver to each open session
 */
void broadcast(String message) {
    int successCount = 0;
    int failureCount = 0;

    for (WebSocketSession session : serverContainer.getSessions().values()) {
        // Closed sessions are skipped and counted neither as success nor failure
        if (!session.isOpen()) {
            continue;
        }

        // The session ID header tells the handler which session to target
        Message<String> outgoing = MessageBuilder
            .withPayload(message)
            .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId())
            .build();

        try {
            broadcaster.handleMessage(outgoing);
            successCount++;
        } catch (Exception e) {
            failureCount++;
            logger.error("Failed to send to session " + session.getId(), e);
        }
    }

    logger.info("Broadcast complete: {} success, {} failures", successCount, failureCount);
}

Selective Routing Pattern

/**
 * Delivers the payload only to open sessions whose "userRole" session
 * attribute equals the requested role.
 *
 * Send failures are logged per session and do not interrupt routing to
 * the other sessions.
 *
 * @param payload  the text to deliver
 * @param userRole the role a session must carry to receive the payload
 */
void routeMessage(String payload, String userRole) {
    for (WebSocketSession session : serverContainer.getSessions().values()) {
        String sessionRole = (String) session.getAttributes().get("userRole");

        // Role is checked first, then openness, as a single short-circuit test
        if (!userRole.equals(sessionRole) || !session.isOpen()) {
            continue;
        }

        // The session ID header tells the handler which session to target
        Message<String> routed = MessageBuilder
            .withPayload(payload)
            .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId())
            .build();

        try {
            outboundHandler.handleMessage(routed);
        } catch (Exception e) {
            logger.error("Failed to route message to session " + session.getId(), e);
        }
    }
}

Error Recovery Pattern

/**
 * Sends a message, retrying with exponential backoff when handling fails.
 *
 * The first attempt is made immediately. After each failed attempt the
 * method sleeps (100 ms initially, doubling every time) and tries again
 * until the attempt limit is reached.
 *
 * @param message    the message to send
 * @param maxRetries the maximum number of send attempts; values below 1 are
 *                   treated as 1 so the message is always attempted once
 * @throws MessageHandlingException the failure of the last attempt, once
 *                                  the attempt limit is exhausted
 * @throws RuntimeException         if the thread is interrupted while
 *                                  waiting between attempts (the interrupt
 *                                  flag is restored first)
 */
public void sendWithRetry(Message<?> message, int maxRetries) {
    // Always try at least once: with a non-positive limit the previous loop
    // never ran, so the message was silently never sent.
    int maxAttempts = Math.max(1, maxRetries);
    long delay = 100; // Initial delay in ms

    for (int attempt = 1; ; attempt++) {
        try {
            outboundHandler.handleMessage(message);
            return; // Success
        } catch (MessageHandlingException e) {
            if (attempt >= maxAttempts) {
                logger.error("Failed to send after {} attempt(s)", maxAttempts, e);
                throw e;
            }
        }

        // Exponential backoff before the next attempt
        try {
            Thread.sleep(delay);
            delay *= 2; // Double delay for next retry
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during retry", ie);
        }
    }
}