or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

containers.mdinbound.mdindex.mdoutbound.mdprotocols.mdsockjs.md
tile.json

inbound.mddocs/

Inbound Message Handling

The inbound message adapter receives WebSocket messages from connected sessions and sends them into Spring Integration message channels. It implements WebSocketListener to handle WebSocket events and converts WebSocket messages to Spring Integration messages with automatic payload conversion.

Capabilities

WebSocketInboundChannelAdapter

Message producer that receives WebSocket messages and publishes them to Integration channels.

/**
 * MessageProducer for inbound WebSocket messages.
 * Implements WebSocketListener to receive WebSocket events and messages.
 *
 * @since 4.1
 */
public class WebSocketInboundChannelAdapter extends MessageProducerSupport
        implements WebSocketListener, ApplicationEventPublisherAware {

    /**
     * Create inbound adapter with default pass-through protocol handler.
     *
     * @param webSocketContainer the container managing WebSocket sessions
     * @throws IllegalArgumentException if container is null
     */
    public WebSocketInboundChannelAdapter(IntegrationWebSocketContainer webSocketContainer);

    /**
     * Create inbound adapter with custom protocol handler registry.
     * Allows for STOMP or custom sub-protocol support.
     *
     * @param webSocketContainer the container managing WebSocket sessions
     * @param protocolHandlerRegistry the sub-protocol handler registry
     * @throws IllegalArgumentException if container or registry is null
     */
    public WebSocketInboundChannelAdapter(
        IntegrationWebSocketContainer webSocketContainer,
        SubProtocolHandlerRegistry protocolHandlerRegistry);

    /**
     * Set message converters for payload type conversion.
     * Converts WebSocket message payloads to the configured payload type.
     * If not set, default converters are used (String, byte[], JSON).
     *
     * @param messageConverters list of MessageConverter instances
     * @throws IllegalArgumentException if converters list is null
     */
    public void setMessageConverters(List<MessageConverter> messageConverters);

    /**
     * Configure whether custom converters should merge with default converters.
     * If true, custom converters are tried first, then defaults.
     * If false, only custom converters are used.
     * Default: true
     *
     * @param mergeWithDefaultConverters true to merge, false to replace
     */
    public void setMergeWithDefaultConverters(boolean mergeWithDefaultConverters);

    /**
     * Set the target payload type for message conversion.
     * WebSocket message bodies will be converted to this type.
     * Default: String.class
     *
     * @param payloadType the target payload class
     * @throws IllegalArgumentException if payloadType is null
     */
    public void setPayloadType(Class<?> payloadType);

    /**
     * Enable broker message routing for server-side STOMP support.
     * When enabled, uses existing AbstractBrokerMessageHandler bean
     * for non-MESSAGE type messages and broker destination routing.
     * Only applicable on server side; ignored on client side.
     * Default: false
     *
     * @param useBroker true to enable broker routing
     */
    public void setUseBroker(boolean useBroker);

    /**
     * Set the ApplicationEventPublisher for publishing WebSocket events.
     * Used to publish SessionConnectedEvent and ReceiptEvent.
     * If not set, events are not published.
     *
     * @param applicationEventPublisher the event publisher
     */
    @Override
    public void setApplicationEventPublisher(
        ApplicationEventPublisher applicationEventPublisher);

    /**
     * Get the component type identifier.
     *
     * @return "websocket:inbound-channel-adapter"
     */
    @Override
    public String getComponentType();

    /**
     * Check if the adapter is active and accepting messages.
     *
     * @return true if active and running
     */
    public boolean isActive();

    // WebSocketListener implementation
    /**
     * Get supported sub-protocols from the protocol handler registry.
     *
     * @return list of supported sub-protocol names (never null, may be empty)
     */
    @Override
    public List<String> getSubProtocols();

    /**
     * Handle incoming WebSocket message.
     * Delegates to protocol handler for conversion and processing.
     * Thread-safe: can be called concurrently from multiple sessions.
     *
     * @param session the WebSocket session (must not be null)
     * @param message the WebSocket message (must not be null)
     * @throws IllegalStateException if adapter not initialized or not active
     */
    @Override
    public void onMessage(WebSocketSession session, WebSocketMessage<?> message);

    /**
     * Invoked when WebSocket session starts.
     * Notifies protocol handler and sends CONNECT frame for client-side STOMP.
     * Thread-safe: can be called concurrently.
     *
     * @param session the WebSocket session (must not be null)
     */
    @Override
    public void afterSessionStarted(WebSocketSession session);

    /**
     * Invoked when WebSocket session ends.
     * Notifies protocol handler for cleanup.
     * Thread-safe: can be called concurrently.
     *
     * @param session the WebSocket session (must not be null)
     * @param closeStatus the close status (must not be null)
     */
    @Override
    public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus);
}

Usage Example - Basic:

import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.handler.LoggingHandler;

// Create container
ClientWebSocketContainer container =
    new ClientWebSocketContainer(client, "ws://localhost:8080/websocket");

// Create message channel for received messages
DirectChannel inputChannel = new DirectChannel();

// Create inbound adapter
WebSocketInboundChannelAdapter inboundAdapter =
    new WebSocketInboundChannelAdapter(container);
inboundAdapter.setOutputChannel(inputChannel);

// Configure payload type
inboundAdapter.setPayloadType(String.class);

// Add handler to process messages
inputChannel.subscribe(new LoggingHandler(LoggingHandler.Level.INFO));

// Initialize and start (required)
inboundAdapter.afterPropertiesSet();
inboundAdapter.start();

// Start container to establish connection
container.start();

Usage Example - Custom Message Converters:

import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.Arrays;

// Create custom converters
List<MessageConverter> converters = Arrays.asList(
    new MappingJackson2MessageConverter(),  // JSON to object
    new StringMessageConverter()             // String handling
);

// Create inbound adapter with custom converters
WebSocketInboundChannelAdapter adapter =
    new WebSocketInboundChannelAdapter(container);
adapter.setMessageConverters(converters);
adapter.setMergeWithDefaultConverters(true); // Keep default converters as fallback
adapter.setPayloadType(MyDataClass.class);
adapter.setOutputChannel(outputChannel);

// Initialize
adapter.afterPropertiesSet();
adapter.start();

Usage Example - STOMP Protocol:

import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;

// Create STOMP protocol handler
StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();

// Create protocol handler registry
SubProtocolHandlerRegistry registry =
    new SubProtocolHandlerRegistry(stompHandler);

// Create inbound adapter with STOMP support
WebSocketInboundChannelAdapter adapter =
    new WebSocketInboundChannelAdapter(serverContainer, registry);
adapter.setOutputChannel(inputChannel);

// Enable broker routing for server-side
adapter.setUseBroker(true);

adapter.afterPropertiesSet();
adapter.start();

Usage Example - Server-Side with Event Publishing:

import org.springframework.context.event.EventListener;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.integration.websocket.event.ReceiptEvent;

// Create server container
ServerWebSocketContainer serverContainer =
    new ServerWebSocketContainer("/websocket");
serverContainer.setHandshakeHandler(handshakeHandler);

// Create inbound adapter
WebSocketInboundChannelAdapter adapter =
    new WebSocketInboundChannelAdapter(serverContainer);
adapter.setOutputChannel(messageChannel);
adapter.setApplicationEventPublisher(applicationContext);

// Handle connection events
@EventListener
public void handleSessionConnected(SessionConnectedEvent event) {
    Message<byte[]> message = event.getMessage();
    String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
    System.out.println("Client connected: " + sessionId);
}

// Handle STOMP receipt events (client-side)
@EventListener
public void handleReceipt(ReceiptEvent event) {
    System.out.println("Receipt received: " + event.getMessage());
}

adapter.afterPropertiesSet();
adapter.start();

Default Message Converters

The adapter includes three default message converters:

  1. StringMessageConverter: Converts between String and Message
  2. ByteArrayMessageConverter: Converts between byte[] and Message
  3. JacksonJsonMessageConverter: Converts between JSON and objects (if Jackson 3 is present)

When custom converters are configured:

  • With mergeWithDefaultConverters=true: Custom converters are tried first, then defaults
  • With mergeWithDefaultConverters=false: Only custom converters are used

Converter Selection Order:

  1. Custom converters (if configured and mergeWithDefaultConverters=true)
  2. Default converters (String, byte[], JSON)
  3. If no converter matches, conversion fails with exception

Message Flow

  1. WebSocket Message Received: Container calls onMessage() on the adapter
  2. Protocol Handler Processing: Protocol handler converts WebSocketMessage to Spring Message
  3. Payload Conversion: Message converters transform payload to target type
  4. Channel Delivery: Converted message is sent to the configured output channel
  5. Event Publishing: Special message types trigger events (CONNECT_ACK, RECEIPT)

Error Handling in Flow:

  • Protocol handler exceptions are caught and logged
  • Conversion failures are logged but don't crash the adapter
  • Channel send failures are handled by channel's error handler (if configured)

Session Lifecycle

Session Started

  • Protocol handler is notified via afterSessionStarted()
  • For client-side STOMP: CONNECT frame is automatically sent
  • Session is registered in protocol handler cache
  • SessionConnectedEvent is published if ApplicationEventPublisher is set

Thread Safety: afterSessionStarted() is thread-safe and can be called concurrently.

Message Received

  • Protocol handler converts WebSocketMessage to Integration Message
  • Payload is converted to configured type
  • Message headers include session ID and attributes:
    • simpSessionId: WebSocket session ID
    • simpSessionAttributes: Session attributes map
    • simpUser: Authenticated principal (if available)
    • simpMessageType: Message type (MESSAGE, CONNECT, etc.)

Null Handling:

  • If session is null, IllegalArgumentException is thrown
  • If message is null, IllegalArgumentException is thrown
  • If payload conversion fails, exception is logged and message is not sent to channel

Session Ended

  • Protocol handler is notified via afterSessionEnded()
  • Session cleanup is performed
  • Close status is propagated
  • Session is removed from container's session map

Thread Safety: afterSessionEnded() is thread-safe and can be called concurrently.

Error Handling

Initialization Errors

Missing Output Channel:

WebSocketInboundChannelAdapter adapter = new WebSocketInboundChannelAdapter(container);
// Must set output channel before initialization
adapter.setOutputChannel(outputChannel);
adapter.afterPropertiesSet(); // Throws IllegalStateException if channel not set

Null Container:

// Throws IllegalArgumentException
WebSocketInboundChannelAdapter adapter = new WebSocketInboundChannelAdapter(null);

Runtime Errors

Protocol Handler Exceptions:

  • Caught and logged at ERROR level
  • Session remains open
  • Message is not delivered to output channel
  • Adapter continues processing other messages

Message Conversion Failures:

  • Caught and logged at WARN level
  • Original message payload is logged
  • Message is not delivered to output channel
  • Adapter continues processing other messages

Channel Send Failures:

  • Handled by channel's error handler (if configured)
  • If no error handler, exception propagates and may stop adapter
  • Configure error handler for resilience:
DirectChannel channel = new DirectChannel();
channel.setErrorHandler(error -> {
    logger.error("Failed to send message", error);
    // Custom error handling
});
adapter.setOutputChannel(channel);

Session State Errors

Session Already Closed:

// Protocol handler checks session state
// If closed, message is not processed
// No exception thrown, just logged

Session Not Found:

  • Should not occur in normal operation
  • If occurs, indicates container state inconsistency
  • Logged at ERROR level

Thread Safety

Adapter Thread Safety

  • WebSocketInboundChannelAdapter: Thread-safe for concurrent message reception
  • onMessage(): Can be called concurrently from multiple sessions
  • afterSessionStarted(): Thread-safe
  • afterSessionEnded(): Thread-safe

Protocol Handler Thread Safety

  • PassThruSubProtocolHandler: Thread-safe (stateless)
  • StompSubProtocolHandler: Thread-safe (uses thread-local state)
  • Custom handlers: Must be thread-safe if shared across sessions

Best Practice: If creating custom protocol handlers, ensure they are thread-safe or use thread-local state.

Performance Considerations

Message Processing Overhead

  • Protocol conversion: ~0.1-0.5ms per message
  • Payload conversion: Varies by type (String: ~0.1ms, JSON: 1-5ms)
  • Channel send: Depends on channel type (DirectChannel: ~0.01ms, QueueChannel: varies)

Concurrent Message Handling

  • Multiple sessions can send messages concurrently
  • Each message is processed independently
  • No blocking between sessions

Optimization Tips:

  • Use DirectChannel for low-latency requirements
  • Use QueueChannel with executor for high-throughput scenarios
  • Minimize payload conversion overhead by using appropriate types
  • Configure appropriate channel capacity for burst traffic

Memory Usage

  • Each adapter instance: ~1-2 KB
  • Protocol handler state: Varies (PassThru: minimal, STOMP: ~100 bytes per session)
  • Message conversion cache: Minimal (converter-specific)

Troubleshooting

Issue: Messages not received in output channel

Symptoms: WebSocket messages arrive but don't appear in output channel.

Causes:

  1. Adapter not started
  2. Output channel not subscribed
  3. Protocol handler not converting messages
  4. Message conversion failure

Solutions:

// Ensure adapter is started
if (!adapter.isActive()) {
    adapter.start();
}

// Verify channel has subscribers
DirectChannel channel = (DirectChannel) adapter.getOutputChannel();
if (channel.getSubscriberCount() == 0) {
    channel.subscribe(message -> {
        // Process message
    });
}

// Check protocol handler
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(handler);
// Verify handler supports received protocol

// Enable debug logging
logger.debug("Adapter active: {}", adapter.isActive());
logger.debug("Channel subscribers: {}", channel.getSubscriberCount());

Issue: Payload type conversion fails

Symptoms: Messages received but payload type mismatch errors in logs.

Causes:

  1. Payload type not supported by converters
  2. JSON structure doesn't match target type
  3. Custom converters not configured correctly

Solutions:

// Set appropriate payload type
adapter.setPayloadType(String.class); // For text messages

// Configure custom converters
List<MessageConverter> converters = Arrays.asList(
    new MappingJackson2MessageConverter()
);
adapter.setMessageConverters(converters);
adapter.setPayloadType(MyDataClass.class);

// Verify JSON structure matches target class
// Enable converter debug logging

Issue: STOMP broker routing not working

Symptoms: STOMP messages not routed through broker.

Causes:

  1. useBroker not enabled
  2. Broker bean not found in context
  3. Server-side only feature used on client

Solutions:

// Enable broker routing (server-side only)
adapter.setUseBroker(true);

// Ensure broker bean exists
@Bean
public AbstractBrokerMessageHandler brokerMessageHandler() {
    // Configure broker
}

// Verify server-side usage
if (container instanceof ServerWebSocketContainer) {
    adapter.setUseBroker(true);
}

Issue: Events not published

Symptoms: SessionConnectedEvent and ReceiptEvent not received.

Causes:

  1. ApplicationEventPublisher not set
  2. Event listeners not configured
  3. Events only published for specific message types

Solutions:

// Set event publisher
adapter.setApplicationEventPublisher(applicationContext);

// Configure event listeners
@EventListener
public void handleSessionConnected(SessionConnectedEvent event) {
    // Handle event
}

// Verify event types
// SessionConnectedEvent: Published on server-side CONNECT
// ReceiptEvent: Published on client-side RECEIPT frame

Types

WebSocket Message

// From Spring Framework
interface WebSocketMessage<T> {
    T getPayload();
    int getPayloadLength();
    boolean isLast();
}

class TextMessage implements WebSocketMessage<String> {
    public TextMessage(String payload);
    public TextMessage(CharBuffer payload);
}

class BinaryMessage implements WebSocketMessage<ByteBuffer> {
    public BinaryMessage(ByteBuffer payload);
    public BinaryMessage(byte[] payload);
}

class PingMessage implements WebSocketMessage<ByteBuffer> { }
class PongMessage implements WebSocketMessage<ByteBuffer> { }

Message Headers

// Common headers in converted messages
class SimpMessageHeaderAccessor {
    public static final String SESSION_ID_HEADER = "simpSessionId";
    public static final String SESSION_ATTRIBUTES_HEADER = "simpSessionAttributes";
    public static final String USER_HEADER = "simpUser";
    public static final String MESSAGE_TYPE_HEADER = "simpMessageType";
    public static final String DESTINATION_HEADER = "simpDestination";

    public static String getSessionId(MessageHeaders headers);
    public static Map<String, Object> getSessionAttributes(MessageHeaders headers);
    public static Principal getUser(MessageHeaders headers);
    public static SimpMessageType getMessageType(MessageHeaders headers);
    public static String getDestination(MessageHeaders headers);
}

Message Types

// From Spring Framework
enum SimpMessageType {
    CONNECT,        // Client connection request
    CONNECT_ACK,    // Server connection acknowledgment
    MESSAGE,        // Regular message
    SUBSCRIBE,      // STOMP subscribe
    UNSUBSCRIBE,    // STOMP unsubscribe
    HEARTBEAT,      // Heartbeat ping
    DISCONNECT,     // Disconnect request
    DISCONNECT_ACK, // Disconnect acknowledgment
    OTHER           // Other message types
}

Events

/**
 * Event published when WebSocket session is established (server-side)
 * Published only if ApplicationEventPublisher is set on adapter.
 */
class SessionConnectedEvent extends AbstractSubProtocolEvent {
    public Message<byte[]> getMessage();
    public String getSessionId();
}

/**
 * Event published when STOMP RECEIPT frame is received (client-side)
 * Published only if ApplicationEventPublisher is set on adapter.
 * @since 4.1.3
 */
class ReceiptEvent extends AbstractSubProtocolEvent {
    public Message<byte[]> getMessage();
    public String getSessionId();
}

Integration Examples

Integration with Spring Integration Flow

@Configuration
public class WebSocketIntegrationConfig {

    @Bean
    public ServerWebSocketContainer webSocketContainer() {
        ServerWebSocketContainer container = new ServerWebSocketContainer("/ws");
        container.setHandshakeHandler(new DefaultHandshakeHandler());
        return container;
    }

    @Bean
    public WebSocketInboundChannelAdapter inboundAdapter(
            ServerWebSocketContainer container) {
        WebSocketInboundChannelAdapter adapter = 
            new WebSocketInboundChannelAdapter(container);
        adapter.setOutputChannel(inputChannel());
        adapter.setPayloadType(String.class);
        return adapter;
    }

    @Bean
    public MessageChannel inputChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow processingFlow() {
        return IntegrationFlows.from(inputChannel())
            .transform(String::toUpperCase)
            .handle(System.out::println)
            .get();
    }
}

Error Handling with Retry

DirectChannel inputChannel = new DirectChannel();
inputChannel.setErrorHandler(error -> {
    // Retry logic
    Message<?> failedMessage = ((MessagingException) error).getFailedMessage();
    // Implement retry with exponential backoff
});

WebSocketInboundChannelAdapter adapter = 
    new WebSocketInboundChannelAdapter(container);
adapter.setOutputChannel(inputChannel);