or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

containers.md inbound.md index.md outbound.md protocols.md sockjs.md
tile.json

tessl/maven-spring-integration-websocket

WebSockets Support module for Spring Integration

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.springframework.integration/spring-integration-websocket@7.0.x

To install, run

npx @tessl/cli install tessl/maven-spring-integration-websocket@7.0.0

index.mddocs/

Spring Integration WebSocket

Spring Integration WebSocket provides comprehensive WebSocket support for building bidirectional messaging applications within the Spring Integration framework. It bridges WebSocket connections with Spring Integration's message channels, enabling real-time bidirectional communication patterns such as chat applications, live dashboards, collaborative tools, and streaming data feeds.

Package Information

  • Package Name: spring-integration-websocket

  • Package Type: Maven

  • Group ID: org.springframework.integration

  • Artifact ID: spring-integration-websocket

  • Version: 7.0.0

  • Language: Java

  • Installation:

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-websocket</artifactId>
        <version>7.0.0</version>
    </dependency>

    Gradle:

    implementation 'org.springframework.integration:spring-integration-websocket:7.0.0'

Dependencies

Spring Integration WebSocket requires:

  • Spring Framework 7.0+
  • Spring Integration Core 7.0+
  • Spring WebSocket (included in Spring Framework)
  • For STOMP support: Spring Messaging (included in Spring Framework)
  • For SockJS support: Spring WebSocket SockJS (included in Spring Framework)

Maven Dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-websocket</artifactId>
        <version>7.0.0</version>
    </dependency>
    <!-- Spring Integration Core (required) -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>
    <!-- Spring WebSocket (required) -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-websocket</artifactId>
    </dependency>
    <!-- For JSON message conversion (optional) -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

Core Imports

// Core container classes
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.WebSocketListener;

// Inbound/Outbound adapters
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;

// Protocol support
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.integration.websocket.support.PassThruSubProtocolHandler;

// Message handling
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;

// WebSocket types
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

Basic Usage

Client-Side Example

import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

// Create WebSocket client container
WebSocketClient client = new StandardWebSocketClient();
ClientWebSocketContainer container =
    new ClientWebSocketContainer(client, "ws://localhost:8080/websocket");

// Configure inbound adapter to receive messages
DirectChannel inputChannel = new DirectChannel();
WebSocketInboundChannelAdapter inboundAdapter =
    new WebSocketInboundChannelAdapter(container);
inboundAdapter.setOutputChannel(inputChannel);

// Process received messages.
// Subscribe BEFORE connecting: a DirectChannel hands each message straight to
// its subscribers, so a message that arrives while nobody is subscribed fails
// to dispatch instead of being queued.
inputChannel.subscribe(message -> {
    System.out.println("Received: " + message.getPayload());
});

// Configure outbound handler to send messages
WebSocketOutboundMessageHandler outboundHandler =
    new WebSocketOutboundMessageHandler(container);

// Initialize adapters
inboundAdapter.afterPropertiesSet();
outboundHandler.afterPropertiesSet();

// Start the inbound adapter so it begins receiving messages
inboundAdapter.start();

// Start the container (establishes connection)
container.start();

// Send a message
outboundHandler.handleMessage(new GenericMessage<>("Hello WebSocket!"));

Server-Side Example

import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

// Create WebSocket server container for specific paths
ServerWebSocketContainer serverContainer =
    new ServerWebSocketContainer("/websocket/messages");

// Set handshake handler (required)
DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler(
    new TomcatRequestUpgradeStrategy()
);
serverContainer.setHandshakeHandler(handshakeHandler);

// Configure allowed origins for CORS
serverContainer.setAllowedOrigins("http://localhost:3000");

// Create inbound adapter to receive client messages
DirectChannel inputChannel = new DirectChannel();
WebSocketInboundChannelAdapter inboundAdapter =
    new WebSocketInboundChannelAdapter(serverContainer);
inboundAdapter.setOutputChannel(inputChannel);

// Register a subscriber before accepting connections: a DirectChannel with
// no subscriber cannot dispatch the messages the adapter produces
inputChannel.subscribe(message -> {
    System.out.println("Received: " + message.getPayload());
});

// Create outbound handler to send messages to clients
WebSocketOutboundMessageHandler outboundHandler =
    new WebSocketOutboundMessageHandler(serverContainer);

// Initialize and start
inboundAdapter.afterPropertiesSet();
outboundHandler.afterPropertiesSet();
inboundAdapter.start();   // Begin receiving messages
serverContainer.start();

Architecture

Spring Integration WebSocket is built around several key architectural components:

  • Container Layer: IntegrationWebSocketContainer and its implementations (ClientWebSocketContainer, ServerWebSocketContainer) provide high-level connection management and session lifecycle handling, abstracting the complexities of WebSocket connections.

  • Integration Layer: WebSocketInboundChannelAdapter and WebSocketOutboundMessageHandler bridge WebSocket sessions with Spring Integration message channels, enabling seamless integration with existing Spring Integration flows.

  • Protocol Layer: SubProtocolHandlerRegistry and protocol handlers (like PassThruSubProtocolHandler and StompSubProtocolHandler) enable support for various sub-protocols including raw WebSocket, STOMP, and custom protocols.

  • Session Management: Thread-safe session tracking using ConcurrentWebSocketSessionDecorator with configurable send buffer limits and overflow strategies.

  • Lifecycle Management: Full SmartLifecycle support for proper startup, shutdown, and integration with Spring's application context lifecycle.

Capabilities

WebSocket Containers

Core container classes for managing WebSocket connections on both client and server sides. Containers handle connection establishment, session management, and protocol negotiation.

// Abstract base container
public abstract class IntegrationWebSocketContainer implements DisposableBean {
    public static final int DEFAULT_SEND_TIME_LIMIT = 10000; // 10 seconds
    public static final int DEFAULT_SEND_BUFFER_SIZE = 524288; // 512 KB

    public void setMessageListener(WebSocketListener messageListener);
    public void setSupportedProtocols(String... protocols);
    public Map<String, WebSocketSession> getSessions();
    public WebSocketSession getSession(String sessionId);
    public void closeSession(WebSocketSession session, CloseStatus closeStatus);
    public void setSendTimeLimit(int sendTimeLimit);
    public void setSendBufferSizeLimit(int sendBufferSizeLimit);
    public void setSendBufferOverflowStrategy(
        ConcurrentWebSocketSessionDecorator.OverflowStrategy overflowStrategy);
    public void destroy();
}

// Client-side container
public class ClientWebSocketContainer extends IntegrationWebSocketContainer
        implements SmartLifecycle {
    public ClientWebSocketContainer(WebSocketClient client, String uriTemplate, Object... uriVariables);
    public ClientWebSocketContainer(WebSocketClient client, URI uri);
    public void setHeaders(HttpHeaders headers);
    public void setConnectionTimeout(int connectionTimeout);
    public boolean isConnected();
    public void start();
    public void stop();
}

// Server-side container
public class ServerWebSocketContainer extends IntegrationWebSocketContainer
        implements WebSocketConfigurer, SmartLifecycle {
    public ServerWebSocketContainer(String... paths);
    public ServerWebSocketContainer setHandshakeHandler(HandshakeHandler handshakeHandler);
    public ServerWebSocketContainer setAllowedOrigins(String... origins);
    public ServerWebSocketContainer withSockJs(SockJsServiceOptions... sockJsServiceOptions);
    public void start();
    public void stop();
}

WebSocket Containers

Inbound Message Handling

Receive WebSocket messages and send them into Spring Integration message channels. The inbound adapter implements WebSocketListener and converts WebSocket messages to Spring Integration messages.

public class WebSocketInboundChannelAdapter extends MessageProducerSupport
        implements WebSocketListener {
    public WebSocketInboundChannelAdapter(IntegrationWebSocketContainer webSocketContainer);
    public WebSocketInboundChannelAdapter(
        IntegrationWebSocketContainer webSocketContainer,
        SubProtocolHandlerRegistry protocolHandlerRegistry);
    public void setMessageConverters(List<MessageConverter> messageConverters);
    public void setPayloadType(Class<?> payloadType);
    public void setUseBroker(boolean useBroker);
    public void setMergeWithDefaultConverters(boolean mergeWithDefaultConverters);
    public boolean isActive();
    public void onMessage(WebSocketSession session, WebSocketMessage<?> message);
    public void afterSessionStarted(WebSocketSession session);
    public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus);
}

Inbound Message Handling

Outbound Message Handling

Send Spring Integration messages to WebSocket sessions. The outbound handler converts Integration messages to WebSocket messages and delivers them to connected clients.

public class WebSocketOutboundMessageHandler extends AbstractMessageHandler {
    public WebSocketOutboundMessageHandler(IntegrationWebSocketContainer webSocketContainer);
    public WebSocketOutboundMessageHandler(
        IntegrationWebSocketContainer webSocketContainer,
        SubProtocolHandlerRegistry protocolHandlerRegistry);
    public void setMessageConverters(List<MessageConverter> messageConverters);
    public void setMergeWithDefaultConverters(boolean mergeWithDefaultConverters);
    public void handleMessage(Message<?> message);
}

Outbound Message Handling

Sub-Protocol Support

Extensible sub-protocol handling system supporting raw WebSocket, STOMP, and custom protocols. Protocol handlers manage message format conversion and session-specific protocol logic.

// Protocol handler registry
public class SubProtocolHandlerRegistry {
    public SubProtocolHandlerRegistry(List<SubProtocolHandler> protocolHandlers);
    public SubProtocolHandlerRegistry(SubProtocolHandler defaultProtocolHandler);
    public SubProtocolHandler findProtocolHandler(WebSocketSession session);
    public String resolveSessionId(Message<?> message);
}

// Pass-through handler for raw WebSocket
public class PassThruSubProtocolHandler implements SubProtocolHandler {
    public void setSupportedProtocols(String... supportedProtocols);
}

// WebSocket listener interface
public interface WebSocketListener extends SubProtocolCapable {
    void onMessage(WebSocketSession session, WebSocketMessage<?> message);
    void afterSessionStarted(WebSocketSession session);
    void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus);
}

Sub-Protocol Support

SockJS Fallback Support

SockJS fallback support for browsers without native WebSocket capabilities. Provides HTTP-based transports with automatic fallback and heartbeat management.

// SockJS configuration builder
public static class SockJsServiceOptions {
    public SockJsServiceOptions setTaskScheduler(TaskScheduler taskScheduler);
    public SockJsServiceOptions setClientLibraryUrl(String clientLibraryUrl);
    public SockJsServiceOptions setHeartbeatTime(long heartbeatTime);
    public SockJsServiceOptions setDisconnectDelay(long disconnectDelay);
    public SockJsServiceOptions setWebSocketEnabled(boolean webSocketEnabled);
    public SockJsServiceOptions setSessionCookieNeeded(boolean sessionCookieNeeded);
}

SockJS Fallback

Types

WebSocket Session

// From Spring Framework (org.springframework.web.socket) - included for reference
interface WebSocketSession {
    // Unique identifier of this session
    String getId();
    // URI used to open the connection
    URI getUri();
    // Headers of the handshake request; the type is
    // org.springframework.http.HttpHeaders, not a plain Map<String, String>
    HttpHeaders getHandshakeHeaders();
    // Attributes associated with the session
    Map<String, Object> getAttributes();
    // Authenticated user, if any
    Principal getPrincipal();
    // Sub-protocol negotiated during the handshake, if any
    String getAcceptedProtocol();
    void setTextMessageSizeLimit(int messageSizeLimit);
    void setBinaryMessageSizeLimit(int messageSizeLimit);
    // Extensions negotiated for this session
    List<WebSocketExtension> getExtensions();
    boolean isOpen();
    void sendMessage(WebSocketMessage<?> message) throws IOException;
    void close() throws IOException;
    void close(CloseStatus status) throws IOException;
}

Close Status

// From Spring Framework - included for reference
class CloseStatus {
    public static final CloseStatus NORMAL = new CloseStatus(1000);
    public static final CloseStatus GOING_AWAY = new CloseStatus(1001);
    public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
    public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
    public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
    public static final CloseStatus SESSION_NOT_RELIABLE = new CloseStatus(4500);

    public CloseStatus(int code);
    public CloseStatus(int code, String reason);
    public int getCode();
    public String getReason();
}

Message Converter

// From Spring Framework (org.springframework.messaging.converter) - included for reference
interface MessageConverter {
    // Converts the payload of the message to targetClass;
    // returns null when this converter cannot perform the conversion
    Object fromMessage(Message<?> message, Class<?> targetClass);
    // Wraps the payload in a Message, applying the given headers (may be null);
    // returns null when this converter cannot handle the payload
    Message<?> toMessage(Object payload, MessageHeaders headers);
}
// Note: canConvertFrom(...) / canConvertTo(...) are NOT part of this interface.
// They are protected helper methods of AbstractMessageConverter.

Thread Safety

Container Thread Safety

  • IntegrationWebSocketContainer: Thread-safe for concurrent session access
  • ClientWebSocketContainer: Thread-safe for concurrent operations
  • ServerWebSocketContainer: Thread-safe for concurrent client connections
  • Session Map: Uses ConcurrentHashMap internally for thread-safe session storage

Adapter Thread Safety

  • WebSocketInboundChannelAdapter: Thread-safe for concurrent message reception
  • WebSocketOutboundMessageHandler: Thread-safe for concurrent message sending
  • Protocol Handlers: Must be thread-safe if shared across sessions

Important: Custom protocol handlers and message converters must be thread-safe when used in multi-session scenarios.

Performance Considerations

Send Buffer Management

  • Default buffer size: 512 KB per session
  • Default send timeout: 10 seconds
  • Overflow strategies: TERMINATE (close the session when a limit is exceeded), DROP (discard the oldest buffered messages)
  • Concurrent sends: Uses ConcurrentWebSocketSessionDecorator for thread-safe buffering

Message Conversion

  • String messages: Minimal overhead (~0.1ms)
  • JSON conversion: Varies with payload size (typically 1-5ms)
  • Binary messages: Direct pass-through, minimal overhead

Session Management

  • Session lookup: O(1) via ConcurrentHashMap
  • Session cleanup: Automatic on disconnect
  • Memory usage: ~1-2 KB per active session

Optimization Tips:

  • Configure appropriate buffer sizes based on message frequency
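  • Use the DROP strategy for high-frequency, non-critical messages (the oldest buffered messages are discarded)
  • Use the TERMINATE strategy when messages must not be lost silently: the session is closed on overflow, so the failure is visible to the application (this is not a delivery guarantee)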
  • Monitor session count and implement session limits if needed

Error Handling

Connection Errors

Client Connection Failures:

try {
    container.start();
} catch (Exception e) {
    // Handle connection failure
    // Container will not be in running state
    // Retry logic should be implemented at application level
}

Server Handshake Failures:

  • Handled by HandshakeHandler
  • Failed handshakes don't create sessions
  • Errors logged but don't affect existing sessions

Message Send Errors

SessionLimitExceededException:

try {
    outboundHandler.handleMessage(message);
} catch (SessionLimitExceededException e) {
    // Buffer limit exceeded
    // Session may be closed depending on overflow strategy
    CloseStatus status = e.getStatus();
    // Handle accordingly
}

IllegalArgumentException:

  • Thrown when session ID missing in server mode
  • Thrown when payload type unsupported
  • Always check session existence before sending

Protocol Handler Errors

  • Protocol handler exceptions are caught and logged
  • Failed message conversions don't crash the adapter
  • Session remains open after handler errors

Troubleshooting

Issue: Client container fails to connect

Symptoms: container.start() throws exception or isConnected() returns false.

Causes:

  1. Server not running or wrong URI
  2. Network connectivity issues
  3. Handshake failure (protocol mismatch, CORS, etc.)

Solutions:

// Verify URI and network
URI uri = URI.create("ws://localhost:8080/websocket");
container = new ClientWebSocketContainer(client, uri);

// Check connection status
if (!container.isConnected()) {
    // Implement retry logic
    container.start();
}

// Verify protocols match
container.setSupportedProtocols("v1.protocol");

Issue: Server container not accepting connections

Symptoms: Clients cannot establish WebSocket connections.

Causes:

  1. Handshake handler not configured
  2. CORS restrictions
  3. Path not registered

Solutions:

// Ensure handshake handler is set (required)
serverContainer.setHandshakeHandler(new DefaultHandshakeHandler());

// Configure CORS if needed
serverContainer.setAllowedOrigins("*"); // Or specific origins

// Verify paths are correct
ServerWebSocketContainer container = new ServerWebSocketContainer("/websocket");

Issue: Messages not received

Symptoms: Messages sent but not appearing in output channel.

Causes:

  1. Adapter not started
  2. Output channel not subscribed
  3. Protocol handler not converting messages correctly

Solutions:

// Ensure adapter is initialized and started
inboundAdapter.afterPropertiesSet();
inboundAdapter.start();

// Verify channel subscription
DirectChannel channel = (DirectChannel) inboundAdapter.getOutputChannel();
// Channel must have subscribers

// Check protocol handler configuration
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(handler);
WebSocketInboundChannelAdapter adapter = 
    new WebSocketInboundChannelAdapter(container, registry);

Issue: Session ID not found when sending

Symptoms: IllegalArgumentException when sending messages in server mode.

Causes:

  1. Session ID not in message headers
  2. Session already closed
  3. Wrong session ID

Solutions:

// Always include session ID in headers
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId);

Message<String> message = MessageBuilder
    .withPayload("Hello")
    .copyHeaders(headers)
    .build();

// Verify session exists before sending
WebSocketSession session = container.getSession(sessionId);
if (session != null && session.isOpen()) {
    outboundHandler.handleMessage(message);
}

Issue: Buffer overflow causing session termination

Symptoms: Sessions closing unexpectedly with SessionLimitExceededException.

Causes:

  1. Send buffer too small for message frequency
  2. Slow network causing buffer accumulation
  3. TERMINATE strategy too aggressive

Solutions:

// Increase buffer size
container.setSendBufferSizeLimit(1024 * 1024); // 1 MB

// Use DROP for non-critical messages: the oldest buffered messages are
// discarded instead of terminating the session
// (OverflowStrategy only defines TERMINATE and DROP)
container.setSendBufferOverflowStrategy(
    ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP
);

// Increase send timeout
container.setSendTimeLimit(30000); // 30 seconds

Integration Patterns

Request-Reply Pattern

// Outbound handler for sending requests
WebSocketOutboundMessageHandler outbound =
    new WebSocketOutboundMessageHandler(container);

// Inbound adapter for receiving replies
WebSocketInboundChannelAdapter inbound =
    new WebSocketInboundChannelAdapter(container);
DirectChannel replyChannel = new DirectChannel();
inbound.setOutputChannel(replyChannel);

// Correlation ID identifying the request whose reply is awaited
String correlationId = UUID.randomUUID().toString();

// Reply handler matches correlation ID.
// Register it BEFORE connecting and sending, so a fast reply is never
// dispatched to a channel that has no subscriber yet.
replyChannel.subscribe(message -> {
    // NOTE(review): assumes the reply carries a "correlationId" header -
    // confirm that the sub-protocol handler in use propagates it
    String replyCorrelationId = (String) message.getHeaders().get("correlationId");
    if (correlationId.equals(replyCorrelationId)) {
        // Process matching reply
    }
});

// Initialize and start
inbound.afterPropertiesSet();
outbound.afterPropertiesSet();
inbound.start();     // Begin receiving messages
container.start();

// Send request with correlation ID
Message<String> request = MessageBuilder
    .withPayload("request-data")
    .setHeader("correlationId", correlationId)
    .build();
outbound.handleMessage(request);

Publish-Subscribe Pattern

// Single outbound handler for broadcasting
WebSocketOutboundMessageHandler broadcaster =
    new WebSocketOutboundMessageHandler(serverContainer);

// Send the same payload to every client session that is currently open
void broadcast(String message) {
    for (WebSocketSession session : serverContainer.getSessions().values()) {
        // Nothing to do for sessions that have already been closed
        if (!session.isOpen()) {
            continue;
        }

        // The session-id header selects the target session for the handler
        Message<String> msg = MessageBuilder
            .withPayload(message)
            .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId())
            .build();

        try {
            broadcaster.handleMessage(msg);
        } catch (Exception e) {
            // A failure for one session must not stop the broadcast to the others
            logger.error("Failed to send to session " + session.getId(), e);
        }
    }
}

Selective Routing Pattern

// Route messages based on session attributes:
// delivers payload to every open session whose "userRole" attribute equals userRole
void routeMessage(String payload, String userRole) {
    for (WebSocketSession session : serverContainer.getSessions().values()) {
        // Skip closed sessions before inspecting their attributes
        if (!session.isOpen()) {
            continue;
        }

        // Compare without casting, so a missing or non-String attribute
        // is treated as "no match" instead of raising ClassCastException
        Object sessionRole = session.getAttributes().get("userRole");
        if (!userRole.equals(sessionRole)) {
            continue;
        }

        Map<String, Object> headers = new HashMap<>();
        headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId());

        Message<String> msg = MessageBuilder
            .withPayload(payload)
            .copyHeaders(headers)
            .build();

        try {
            outboundHandler.handleMessage(msg);
        } catch (Exception e) {
            // Handle per-session errors here (log, close the session, ...):
            // one failed send must not prevent delivery to the remaining sessions
        }
    }
}

Lifecycle Management

Container Lifecycle

Client Container:

ClientWebSocketContainer container = new ClientWebSocketContainer(client, uri);

// Configure lifecycle
container.setAutoStartup(true);  // Auto-start on application context startup
container.setPhase(100);         // Startup phase

// Manual control
container.start();   // Establish connection
container.stop();    // Close connection and cleanup

Server Container:

ServerWebSocketContainer container = new ServerWebSocketContainer("/ws");

// Configure lifecycle
container.setAutoStartup(true);
container.setPhase(100);

// Manual control
container.start();   // Register handlers and start accepting connections
container.stop();    // Close all sessions and cleanup

Adapter Lifecycle

Inbound Adapter:

WebSocketInboundChannelAdapter adapter = new WebSocketInboundChannelAdapter(container);

// Required initialization
adapter.afterPropertiesSet();  // Validate configuration
adapter.start();               // Begin receiving messages

// Cleanup
adapter.stop();                // Stop receiving messages

Outbound Handler:

WebSocketOutboundMessageHandler handler = new WebSocketOutboundMessageHandler(container);

// Required initialization
handler.afterPropertiesSet();  // Validate configuration

// No explicit start/stop needed - ready after initialization

Important: Always call afterPropertiesSet() before using adapters. For inbound adapters, also call start() to begin message reception.