WebSockets Support module for Spring Integration
npx @tessl/cli install tessl/maven-spring-integration-websocket@7.0.0
Spring Integration WebSocket provides comprehensive WebSocket support for building bidirectional messaging applications within the Spring Integration framework. It bridges WebSocket connections with Spring Integration's message channels, enabling real-time bidirectional communication patterns such as chat applications, live dashboards, collaborative tools, and streaming data feeds.
Package Name: spring-integration-websocket
Package Type: Maven
Group ID: org.springframework.integration
Artifact ID: spring-integration-websocket
Version: 7.0.0
Language: Java
Installation:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>7.0.0</version>
</dependency>
Gradle:
implementation 'org.springframework.integration:spring-integration-websocket:7.0.0'
Spring Integration WebSocket requires:
Maven Dependencies:
<dependencies>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>7.0.0</version>
</dependency>
<!-- Spring Integration Core (required) -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<!-- Spring WebSocket (required) -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
</dependency>
<!-- For JSON message conversion (optional) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
// Core container classes
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.WebSocketListener;
// Inbound/Outbound adapters
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
// Protocol support
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.integration.websocket.support.PassThruSubProtocolHandler;
// Message handling
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
// WebSocket types
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.integration.channel.DirectChannel;
// Create WebSocket client container
WebSocketClient client = new StandardWebSocketClient();
ClientWebSocketContainer container =
new ClientWebSocketContainer(client, "ws://localhost:8080/websocket");
// Configure inbound adapter to receive messages
DirectChannel inputChannel = new DirectChannel();
WebSocketInboundChannelAdapter inboundAdapter =
new WebSocketInboundChannelAdapter(container);
inboundAdapter.setOutputChannel(inputChannel);
// Configure outbound handler to send messages
WebSocketOutboundMessageHandler outboundHandler =
new WebSocketOutboundMessageHandler(container);
// Initialize adapters
inboundAdapter.afterPropertiesSet();
outboundHandler.afterPropertiesSet();
// Start the container (establishes connection)
container.start();
// Send a message
outboundHandler.handleMessage(new GenericMessage<>("Hello WebSocket!"));
// Process received messages
inputChannel.subscribe(message -> {
System.out.println("Received: " + message.getPayload());
});
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
// Create WebSocket server container for specific paths
ServerWebSocketContainer serverContainer =
new ServerWebSocketContainer("/websocket/messages");
// Set handshake handler (required)
DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler(
new TomcatRequestUpgradeStrategy()
);
serverContainer.setHandshakeHandler(handshakeHandler);
// Configure allowed origins for CORS
serverContainer.setAllowedOrigins("http://localhost:3000");
// Create inbound adapter to receive client messages
DirectChannel inputChannel = new DirectChannel();
WebSocketInboundChannelAdapter inboundAdapter =
new WebSocketInboundChannelAdapter(serverContainer);
inboundAdapter.setOutputChannel(inputChannel);
// Create outbound handler to send messages to clients
WebSocketOutboundMessageHandler outboundHandler =
new WebSocketOutboundMessageHandler(serverContainer);
// Initialize and start
inboundAdapter.afterPropertiesSet();
outboundHandler.afterPropertiesSet();
serverContainer.start();
Spring Integration WebSocket is built around several key architectural components:
Container Layer: IntegrationWebSocketContainer and its implementations (ClientWebSocketContainer, ServerWebSocketContainer) provide high-level connection management and session lifecycle handling, abstracting the complexities of WebSocket connections.
Integration Layer: WebSocketInboundChannelAdapter and WebSocketOutboundMessageHandler bridge WebSocket sessions with Spring Integration message channels, enabling seamless integration with existing Spring Integration flows.
Protocol Layer: SubProtocolHandlerRegistry and protocol handlers (like PassThruSubProtocolHandler and StompSubProtocolHandler) enable support for various sub-protocols including raw WebSocket, STOMP, and custom protocols.
Session Management: Thread-safe session tracking using ConcurrentWebSocketSessionDecorator with configurable send buffer limits and overflow strategies.
Lifecycle Management: Full SmartLifecycle support for proper startup, shutdown, and integration with Spring's application context lifecycle.
Core container classes for managing WebSocket connections on both client and server sides. Containers handle connection establishment, session management, and protocol negotiation.
// Abstract base container
public abstract class IntegrationWebSocketContainer implements DisposableBean {
public static final int DEFAULT_SEND_TIME_LIMIT = 10000; // 10 seconds
public static final int DEFAULT_SEND_BUFFER_SIZE = 524288; // 512 KB
public void setMessageListener(WebSocketListener messageListener);
public void setSupportedProtocols(String... protocols);
public Map<String, WebSocketSession> getSessions();
public WebSocketSession getSession(String sessionId);
public void closeSession(WebSocketSession session, CloseStatus closeStatus);
public void setSendTimeLimit(int sendTimeLimit);
public void setSendBufferSizeLimit(int sendBufferSizeLimit);
public void setSendBufferOverflowStrategy(
ConcurrentWebSocketSessionDecorator.OverflowStrategy overflowStrategy);
public void destroy();
}
// Client-side container
public class ClientWebSocketContainer extends IntegrationWebSocketContainer
implements SmartLifecycle {
public ClientWebSocketContainer(WebSocketClient client, String uriTemplate, Object... uriVariables);
public ClientWebSocketContainer(WebSocketClient client, URI uri);
public void setHeaders(HttpHeaders headers);
public void setConnectionTimeout(int connectionTimeout);
public boolean isConnected();
public void start();
public void stop();
}
// Server-side container
public class ServerWebSocketContainer extends IntegrationWebSocketContainer
implements WebSocketConfigurer, SmartLifecycle {
public ServerWebSocketContainer(String... paths);
public ServerWebSocketContainer setHandshakeHandler(HandshakeHandler handshakeHandler);
public ServerWebSocketContainer setAllowedOrigins(String... origins);
public ServerWebSocketContainer withSockJs(SockJsServiceOptions... sockJsServiceOptions);
public void start();
public void stop();
}
Receive WebSocket messages and send them into Spring Integration message channels. The inbound adapter implements WebSocketListener and converts WebSocket messages to Spring Integration messages.
public class WebSocketInboundChannelAdapter extends MessageProducerSupport
implements WebSocketListener {
public WebSocketInboundChannelAdapter(IntegrationWebSocketContainer webSocketContainer);
public WebSocketInboundChannelAdapter(
IntegrationWebSocketContainer webSocketContainer,
SubProtocolHandlerRegistry protocolHandlerRegistry);
public void setMessageConverters(List<MessageConverter> messageConverters);
public void setPayloadType(Class<?> payloadType);
public void setUseBroker(boolean useBroker);
public void setMergeWithDefaultConverters(boolean mergeWithDefaultConverters);
public boolean isActive();
public void onMessage(WebSocketSession session, WebSocketMessage<?> message);
public void afterSessionStarted(WebSocketSession session);
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus);
}
Send Spring Integration messages to WebSocket sessions. The outbound handler converts Integration messages to WebSocket messages and delivers them to connected clients.
public class WebSocketOutboundMessageHandler extends AbstractMessageHandler {
public WebSocketOutboundMessageHandler(IntegrationWebSocketContainer webSocketContainer);
public WebSocketOutboundMessageHandler(
IntegrationWebSocketContainer webSocketContainer,
SubProtocolHandlerRegistry protocolHandlerRegistry);
public void setMessageConverters(List<MessageConverter> messageConverters);
public void setMergeWithDefaultConverters(boolean mergeWithDefaultConverters);
public void handleMessage(Message<?> message);
}
Extensible sub-protocol handling system supporting raw WebSocket, STOMP, and custom protocols. Protocol handlers manage message format conversion and session-specific protocol logic.
// Protocol handler registry
public class SubProtocolHandlerRegistry {
public SubProtocolHandlerRegistry(List<SubProtocolHandler> protocolHandlers);
public SubProtocolHandlerRegistry(SubProtocolHandler defaultProtocolHandler);
public SubProtocolHandler findProtocolHandler(WebSocketSession session);
public String resolveSessionId(Message<?> message);
}
// Pass-through handler for raw WebSocket
public class PassThruSubProtocolHandler implements SubProtocolHandler {
public void setSupportedProtocols(String... supportedProtocols);
}
// WebSocket listener interface
public interface WebSocketListener extends SubProtocolCapable {
void onMessage(WebSocketSession session, WebSocketMessage<?> message);
void afterSessionStarted(WebSocketSession session);
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus);
}
SockJS fallback support for browsers without native WebSocket capabilities. Provides HTTP-based transports with automatic fallback and heartbeat management.
// SockJS configuration builder
public static class SockJsServiceOptions {
public SockJsServiceOptions setTaskScheduler(TaskScheduler taskScheduler);
public SockJsServiceOptions setClientLibraryUrl(String clientLibraryUrl);
public SockJsServiceOptions setHeartbeatTime(long heartbeatTime);
public SockJsServiceOptions setDisconnectDelay(long disconnectDelay);
public SockJsServiceOptions setWebSocketEnabled(boolean webSocketEnabled);
public SockJsServiceOptions setSessionCookieNeeded(boolean sessionCookieNeeded);
}
// From Spring Framework - included for reference
interface WebSocketSession {
String getId();
URI getUri();
Map<String, String> getHandshakeHeaders();
Map<String, Object> getAttributes();
Principal getPrincipal();
String getAcceptedProtocol();
void setTextMessageSizeLimit(int messageSizeLimit);
void setBinaryMessageSizeLimit(int messageSizeLimit);
List<WebSocketExtension> getExtensions();
boolean isOpen();
void sendMessage(WebSocketMessage<?> message) throws IOException;
void close() throws IOException;
void close(CloseStatus status) throws IOException;
}
// From Spring Framework - included for reference
class CloseStatus {
public static final CloseStatus NORMAL = new CloseStatus(1000);
public static final CloseStatus GOING_AWAY = new CloseStatus(1001);
public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
public static final CloseStatus SESSION_NOT_RELIABLE = new CloseStatus(4500);
public CloseStatus(int code);
public CloseStatus(int code, String reason);
public int getCode();
public String getReason();
}
// From Spring Framework - included for reference
interface MessageConverter {
boolean canConvertFrom(Message<?> message, Class<?> targetClass);
boolean canConvertTo(Object payload, Class<?> targetClass);
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, MessageHeaders headers);
}
ConcurrentHashMap internally for thread-safe session storage
Important: Custom protocol handlers and message converters must be thread-safe when used in multi-session scenarios.
ConcurrentWebSocketSessionDecorator for thread-safe buffering
Optimization Tips:
Client Connection Failures:
try {
container.start();
} catch (Exception e) {
// Handle connection failure
// Container will not be in running state
// Retry logic should be implemented at application level
}
Server Handshake Failures:
HandshakeHandler
SessionLimitExceededException:
try {
outboundHandler.handleMessage(message);
} catch (SessionLimitExceededException e) {
// Buffer limit exceeded
// Session may be closed depending on overflow strategy
CloseStatus status = e.getStatus();
// Handle accordingly
}
IllegalArgumentException:
Symptoms: container.start() throws exception or isConnected() returns false.
Causes:
Solutions:
// Verify URI and network
URI uri = URI.create("ws://localhost:8080/websocket");
container = new ClientWebSocketContainer(client, uri);
// Check connection status
if (!container.isConnected()) {
// Implement retry logic
container.start();
}
// Verify protocols match
container.setSupportedProtocols("v1.protocol");
Symptoms: Clients cannot establish WebSocket connections.
Causes:
Solutions:
// Ensure handshake handler is set (required)
serverContainer.setHandshakeHandler(new DefaultHandshakeHandler());
// Configure CORS if needed
serverContainer.setAllowedOrigins("*"); // Or specific origins
// Verify paths are correct
ServerWebSocketContainer container = new ServerWebSocketContainer("/websocket");
Symptoms: Messages sent but not appearing in output channel.
Causes:
Solutions:
// Ensure adapter is initialized and started
inboundAdapter.afterPropertiesSet();
inboundAdapter.start();
// Verify channel subscription
DirectChannel channel = (DirectChannel) inboundAdapter.getOutputChannel();
// Channel must have subscribers
// Check protocol handler configuration
SubProtocolHandlerRegistry registry = new SubProtocolHandlerRegistry(handler);
WebSocketInboundChannelAdapter adapter =
new WebSocketInboundChannelAdapter(container, registry);
Symptoms: IllegalArgumentException when sending messages in server mode.
Causes:
Solutions:
// Always include session ID in headers
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId);
Message<String> message = MessageBuilder
.withPayload("Hello")
.copyHeaders(headers)
.build();
// Verify session exists before sending
WebSocketSession session = container.getSession(sessionId);
if (session != null && session.isOpen()) {
outboundHandler.handleMessage(message);
}
Symptoms: Sessions closing unexpectedly with SessionLimitExceededException.
Causes:
Solutions:
// Increase buffer size
container.setSendBufferSizeLimit(1024 * 1024); // 1 MB
// Use DROP_OLDEST for non-critical messages
container.setSendBufferOverflowStrategy(
ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP_OLDEST
);
// Increase send timeout
container.setSendTimeLimit(30000); // 30 seconds
// Outbound handler for sending requests
WebSocketOutboundMessageHandler outbound =
new WebSocketOutboundMessageHandler(container);
// Inbound adapter for receiving replies
WebSocketInboundChannelAdapter inbound =
new WebSocketInboundChannelAdapter(container);
DirectChannel replyChannel = new DirectChannel();
inbound.setOutputChannel(replyChannel);
// Initialize and start
inbound.afterPropertiesSet();
outbound.afterPropertiesSet();
container.start();
// Send request with correlation ID
String correlationId = UUID.randomUUID().toString();
Message<String> request = MessageBuilder
.withPayload("request-data")
.setHeader("correlationId", correlationId)
.build();
outbound.handleMessage(request);
// Reply handler matches correlation ID
replyChannel.subscribe(message -> {
String replyCorrelationId = (String) message.getHeaders().get("correlationId");
if (correlationId.equals(replyCorrelationId)) {
// Process matching reply
}
});
// Single outbound handler for broadcasting
WebSocketOutboundMessageHandler broadcaster =
new WebSocketOutboundMessageHandler(serverContainer);
// Broadcast to all connected clients
void broadcast(String message) {
for (WebSocketSession session : serverContainer.getSessions().values()) {
if (session.isOpen()) {
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId());
Message<String> msg = MessageBuilder
.withPayload(message)
.copyHeaders(headers)
.build();
try {
broadcaster.handleMessage(msg);
} catch (Exception e) {
// Handle per-session errors
logger.error("Failed to send to session " + session.getId(), e);
}
}
}
}
// Route messages based on session attributes
void routeMessage(String payload, String userRole) {
for (WebSocketSession session : serverContainer.getSessions().values()) {
Map<String, Object> attributes = session.getAttributes();
String sessionRole = (String) attributes.get("userRole");
if (userRole.equals(sessionRole) && session.isOpen()) {
Map<String, Object> headers = new HashMap<>();
headers.put(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId());
Message<String> msg = MessageBuilder
.withPayload(payload)
.copyHeaders(headers)
.build();
outboundHandler.handleMessage(msg);
}
}
}
Client Container:
ClientWebSocketContainer container = new ClientWebSocketContainer(client, uri);
// Configure lifecycle
container.setAutoStartup(true); // Auto-start on application context startup
container.setPhase(100); // Startup phase
// Manual control
container.start(); // Establish connection
container.stop(); // Close connection and cleanup
Server Container:
ServerWebSocketContainer container = new ServerWebSocketContainer("/ws");
// Configure lifecycle
container.setAutoStartup(true);
container.setPhase(100);
// Manual control
container.start(); // Register handlers and start accepting connections
container.stop(); // Close all sessions and cleanup
Inbound Adapter:
WebSocketInboundChannelAdapter adapter = new WebSocketInboundChannelAdapter(container);
// Required initialization
adapter.afterPropertiesSet(); // Validate configuration
adapter.start(); // Begin receiving messages
// Cleanup
adapter.stop(); // Stop receiving messages
Outbound Handler:
WebSocketOutboundMessageHandler handler = new WebSocketOutboundMessageHandler(container);
// Required initialization
handler.afterPropertiesSet(); // Validate configuration
// No explicit start/stop needed - ready after initialization
Important: Always call afterPropertiesSet() before using adapters. For inbound adapters, also call start() to begin message reception.