Spring WebFlux provides comprehensive WebSocket support for building real-time, bidirectional communication between clients and servers. The API includes handler abstractions, session management, message handling, and server-specific implementations for Reactor Netty, Tomcat, Jetty, and standard Java WebSocket (JSR-356).
The WebSocketHandler interface defines the contract for handling WebSocket sessions.
/**
 * Contract for handling a WebSocket session.
 */
@FunctionalInterface
public interface WebSocketHandler {

    /**
     * Handle the given WebSocket session.
     *
     * @param session the session to handle
     * @return a {@code Mono<Void>} for the handling of the session
     *         (NOTE(review): presumably completes when handling is done; confirm)
     */
    // Handle the WebSocket session
    Mono<Void> handle(WebSocketSession session);

    /**
     * Return the list of sub-protocols. The default implementation returns an empty list.
     */
    // Get list of sub-protocols
    default List<String> getSubProtocols() {
        return Collections.emptyList();
    }
}

Usage:
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
/**
 * Handler that replies to every received message with its text payload
 * prefixed by "Echo: ".
 */
public class EchoWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Build the reply text first, wrap it as a text message, then hand the
        // resulting stream to the session for sending.
        return session.receive()
                .map(inbound -> "Echo: " + inbound.getPayloadAsText())
                .map(session::textMessage)
                .as(session::send);
    }
}
// Chat handler example
/**
 * Chat handler: every text payload received from any session is published to a
 * shared sink, and every session is sent everything published to that sink.
 */
public class ChatWebSocketHandler implements WebSocketHandler {

    // One sink shared by all sessions. Same buffer size as the no-arg
    // onBackpressureBuffer(), but with autoCancel disabled: otherwise the sink
    // terminates when the last connected session leaves and later sessions
    // would never receive anything.
    private final Sinks.Many<String> chatSink = Sinks.many().multicast()
            .onBackpressureBuffer(reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE, false);

    /**
     * Wire the session to the shared chat sink.
     *
     * @param session the session to handle
     * @return a {@code Mono<Void>} joining the inbound and outbound streams
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Receive messages from this session and publish to all subscribers
        Mono<Void> input = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(this::publish)
                .then();

        // Send all messages to this session
        Mono<Void> output = session.send(
                chatSink.asFlux().map(session::textMessage));

        // Both sources are empty Monos, so zip completes as soon as either one
        // completes and cancels the other (e.g. stops sending once the inbound
        // stream ends).
        return Mono.zip(input, output).then();
    }

    // Sessions may publish concurrently from different threads; the sink rejects
    // overlapping emissions with FAIL_NON_SERIALIZED, so retry in that case
    // instead of silently dropping the message.
    private void publish(String message) {
        chatSink.emitNext(message,
                (signalType, emitResult) -> emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED);
    }
}

The WebSocketSession interface represents a WebSocket session with methods for sending and receiving messages.
/**
 * Represents a WebSocket session, with methods for sending and receiving
 * messages and factory methods for creating messages.
 */
public interface WebSocketSession {

    /** Return the session ID. */
    // Get session ID
    String getId();

    /** Return the handshake info for this session. */
    // Get handshake info
    HandshakeInfo getHandshakeInfo();

    /** Return the data buffer factory. */
    // Get data buffer factory
    DataBufferFactory bufferFactory();

    /** Return the session attributes. */
    // Get session attributes
    Map<String, Object> getAttributes();

    /** Return the stream of received messages. */
    // Receive messages
    Flux<WebSocketMessage> receive();

    /**
     * Send the given messages.
     *
     * @param messages the messages to send
     */
    // Send messages
    Mono<Void> send(Publisher<WebSocketMessage> messages);

    /** Close the session. */
    // Close the session
    Mono<Void> close();

    /**
     * Close the session with the given status.
     *
     * @param status the close status to use
     */
    Mono<Void> close(CloseStatus status);

    /** Return whether the session is open. */
    // Check if session is open
    boolean isOpen();

    /**
     * Create a text message.
     *
     * @param payload the text payload
     */
    // Create text message
    WebSocketMessage textMessage(String payload);

    /**
     * Create a binary message.
     *
     * @param payloadFactory produces the payload buffer from a {@code DataBufferFactory}
     */
    // Create binary message
    WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

    /**
     * Create a ping message.
     *
     * @param payloadFactory produces the payload buffer from a {@code DataBufferFactory}
     */
    // Create ping message
    WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

    /**
     * Create a pong message.
     *
     * @param payloadFactory produces the payload buffer from a {@code DataBufferFactory}
     */
    // Create pong message
    WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
}

Usage:
/**
 * Example handler that reads session and handshake details, stores a session
 * attribute, logs text messages, and replies to every received message.
 */
public class MessageWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Session ID
        String id = session.getId();

        // Handshake details
        HandshakeInfo info = session.getHandshakeInfo();
        URI requestUri = info.getUri();
        HttpHeaders requestHeaders = info.getHeaders();

        // Remember the user on the session
        session.getAttributes().put("userId", extractUserId(requestHeaders));

        // Log each inbound message, then turn it into a reply
        Flux<WebSocketMessage> replies = session.receive()
                .doOnNext(this::logIfText)
                .map(inbound -> processMessage(session, inbound));

        return session.send(replies);
    }

    // Prints the payload of TEXT messages; other message types are ignored here.
    private void logIfText(WebSocketMessage message) {
        if (message.getType() != WebSocketMessage.Type.TEXT) {
            return;
        }
        System.out.println("Received: " + message.getPayloadAsText());
    }

    // Builds the text reply for a received message.
    private WebSocketMessage processMessage(WebSocketSession session, WebSocketMessage message) {
        return session.textMessage("Processed: " + message.getPayloadAsText());
    }
}

The WebSocketMessage class represents a WebSocket message with payload and type.
/**
 * Represents a WebSocket message with a payload and a type.
 * Method bodies are elided in this listing.
 */
public class WebSocketMessage {

    /** Return the message type. */
    // Get message type
    public Type getType() { ... }

    /** Return the payload as a {@code DataBuffer}. */
    // Get payload as DataBuffer
    public DataBuffer getPayload() { ... }

    /** Return the payload as text (for TEXT messages). */
    // Get payload as text (for TEXT messages)
    public String getPayloadAsText() { ... }

    /**
     * Return the payload as text using the given charset.
     *
     * @param charset the charset to decode with
     */
    public String getPayloadAsText(Charset charset) { ... }

    /** Return the payload as a {@code ByteBuffer}. */
    // Get payload as byte buffer
    public ByteBuffer getPayloadAsByteBuffer() { ... }

    /** Retain the message. */
    // Retain the message
    public WebSocketMessage retain() { ... }

    /** Release the message. */
    // Release the message
    public void release() { ... }

    /** The message types. */
    // Message types
    public enum Type {
        TEXT,
        BINARY,
        PING,
        PONG
    }
}

Usage:
/**
 * Sends a "Tick: n" text message to the session once per second.
 */
public Mono<Void> handle(WebSocketSession session) {
    // One text message per interval tick
    Flux<WebSocketMessage> ticks = Flux.interval(Duration.ofSeconds(1))
            .map(tick -> session.textMessage("Tick: " + tick));
    return session.send(ticks);
}
// Binary message example
/**
 * Sends the given bytes to the session as a single binary message.
 *
 * @param session the session to send on
 * @param data the bytes to send
 */
public Mono<Void> sendBinaryData(WebSocketSession session, byte[] data) {
    // Allocate a buffer sized to the data and copy the bytes into it
    WebSocketMessage binary = session.binaryMessage(bufferFactory -> {
        DataBuffer payload = bufferFactory.allocateBuffer(data.length);
        payload.write(data);
        return payload;
    });
    return session.send(Mono.just(binary));
}
// Ping/Pong example
/**
 * Sends a ping message whose payload is the text "ping".
 *
 * @param session the session to send on
 */
public Mono<Void> sendPing(WebSocketSession session) {
    // Encode with an explicit charset so the bytes on the wire do not depend
    // on the platform default.
    byte[] payload = "ping".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    return session.send(
            Mono.just(session.pingMessage(factory -> factory.wrap(payload))));
}

The HandshakeInfo class contains information from the WebSocket handshake request.
/**
 * Holds information from the WebSocket handshake request.
 * Method bodies are elided in this listing.
 */
public class HandshakeInfo {

    /** Return the request URI. */
    // Get request URI
    public URI getUri() { ... }

    /** Return the request headers. */
    // Get request headers
    public HttpHeaders getHeaders() { ... }

    /** Return the cookies. */
    // Get cookies
    public MultiValueMap<String, HttpCookie> getCookies() { ... }

    /** Return the selected sub-protocol. */
    // Get selected sub-protocol
    public String getSubProtocol() { ... }

    /** Return the remote address. */
    // Get remote address
    public InetSocketAddress getRemoteAddress() { ... }

    /** Return the principal. */
    // Get principal
    public Mono<Principal> getPrincipal() { ... }

    /** Return the log prefix. */
    // Get log prefix
    public String getLogPrefix() { ... }
}

Usage:
/**
 * Reads the handshake details of the session and dispatches to the
 * authenticated or the anonymous handling depending on whether a principal
 * is available.
 *
 * @param session the session to handle
 */
public Mono<Void> handle(WebSocketSession session) {
    HandshakeInfo info = session.getHandshakeInfo();

    // Get request URI and query parameters
    URI uri = info.getUri();
    String query = uri.getQuery();

    // Get authentication header
    HttpHeaders headers = info.getHeaders();
    String authHeader = headers.getFirst("Authorization");

    // Get cookies
    MultiValueMap<String, HttpCookie> cookies = info.getCookies();

    // Get sub-protocol
    String subProtocol = info.getSubProtocol();

    // Get remote address
    InetSocketAddress remoteAddress = info.getRemoteAddress();

    // Get principal (if authenticated).
    // The choice between the two handlers is made on the Mono<Principal>
    // itself, before flattening. Applying switchIfEmpty to the flattened
    // Mono<Void> would always see an empty result and run the anonymous
    // handling after every authenticated session as well. The fallback is
    // created lazily so it is only built when there is no principal.
    return info.getPrincipal()
            .map(principal -> handleAuthenticatedSession(session, principal.getName()))
            .switchIfEmpty(Mono.fromSupplier(() -> handleAnonymousSession(session)))
            .flatMap(handling -> handling);
}

The CloseStatus class represents WebSocket close status codes and reasons.
/**
 * Represents a WebSocket close status code and reason.
 * Method and constructor bodies are elided in this listing.
 */
public final class CloseStatus {

    /** Return the status code. */
    // Get status code
    public int getCode() { ... }

    /** Return the reason phrase. */
    // Get reason phrase
    public String getReason() { ... }

    // Standard close status codes
    public static final CloseStatus NORMAL = new CloseStatus(1000);
    public static final CloseStatus GOING_AWAY = new CloseStatus(1001);
    public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
    public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
    public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);
    public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);
    public static final CloseStatus BAD_DATA = new CloseStatus(1007);
    public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);
    public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);
    public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);
    public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
    public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);
    public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);

    /**
     * Create a custom close status with the given code.
     *
     * @param code the status code
     */
    // Create custom close status
    public CloseStatus(int code) { ... }

    /**
     * Create a custom close status with the given code and reason.
     *
     * @param code the status code
     * @param reason the reason phrase
     */
    public CloseStatus(int code, String reason) { ... }

    /**
     * Create a close status with the given reason.
     *
     * @param reason the reason phrase
     */
    // Create with reason
    public CloseStatus withReason(String reason) { ... }
}

Usage:
/**
 * Reads text messages until the client sends "quit", then closes the session
 * normally; closes with a server-error status if the inbound stream fails.
 *
 * @param session the session to handle
 */
public Mono<Void> handle(WebSocketSession session) {
    return session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            // Wait for the first "quit" and stop reading after it
            .filter("quit"::equals)
            .next()
            // Close as part of this chain rather than in a detached subscribe(),
            // so close errors and cancellation are not lost.
            .flatMap(quit -> session.close(CloseStatus.NORMAL))
            .onErrorResume(ex -> {
                // Close with error status. The reason is a fixed short text:
                // close reasons are limited to 123 bytes, and the exception
                // message may be null, too long, or expose internal details.
                return session.close(new CloseStatus(1011, "Internal error"));
            });
}
// Close with custom status
/**
 * Closes the session with a policy-violation status carrying the given reason.
 *
 * @param session the session to close
 * @param reason the reason phrase to attach
 */
public Mono<Void> closeWithCustomStatus(WebSocketSession session, String reason) {
    CloseStatus status = CloseStatus.POLICY_VIOLATION.withReason(reason);
    return session.close(status);
}

The WebSocketClient interface provides a contract for initiating WebSocket connections.
/**
 * Contract for initiating WebSocket connections.
 */
public interface WebSocketClient {

    /**
     * Execute a WebSocket handshake to the given URL.
     *
     * @param url the URL to connect to
     * @param handler the handler for the session
     */
    // Execute WebSocket handshake
    Mono<Void> execute(URI url, WebSocketHandler handler);

    /**
     * Execute a WebSocket handshake to the given URL with the given headers.
     *
     * @param url the URL to connect to
     * @param headers the headers for the handshake request
     * @param handler the handler for the session
     */
    // Execute with headers
    Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler);

    /**
     * Execute with full request details, including sub-protocols.
     * Body elided in this listing.
     * NOTE(review): confirm this overload exists in the framework version being documented.
     */
    // Execute with full request details
    default Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler, String... subProtocols) { ... }
}

Server-specific client implementations:
/**
 * {@code WebSocketClient} implementation for Reactor Netty.
 * Bodies are elided in this listing.
 */
public class ReactorNettyWebSocketClient implements WebSocketClient {
    public ReactorNettyWebSocketClient() { ... }
    /** Create an instance with the given {@code HttpClient}. */
    public ReactorNettyWebSocketClient(HttpClient httpClient) { ... }
    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) { ... }
    @Override
    public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { ... }
}

/**
 * {@code WebSocketClient} implementation that takes a {@code WebSocketContainer}.
 * Bodies are elided in this listing.
 */
public class StandardWebSocketClient implements WebSocketClient {
    public StandardWebSocketClient() { ... }
    /** Create an instance with the given {@code WebSocketContainer}. */
    public StandardWebSocketClient(WebSocketContainer webSocketContainer) { ... }
    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) { ... }
    @Override
    public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { ... }
}

/**
 * {@code WebSocketClient} implementation for Tomcat.
 * Bodies are elided in this listing.
 */
public class TomcatWebSocketClient implements WebSocketClient {
    public TomcatWebSocketClient() { ... }
    /** Create an instance with the given Tomcat {@code WsWebSocketContainer}. */
    public TomcatWebSocketClient(org.apache.tomcat.websocket.WsWebSocketContainer webSocketContainer) { ... }
    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) { ... }
    @Override
    public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { ... }
}

/**
 * {@code WebSocketClient} implementation for Jetty.
 * Bodies are elided in this listing.
 */
public class JettyWebSocketClient implements WebSocketClient {
    public JettyWebSocketClient() { ... }
    /** Create an instance with the given Jetty {@code WebSocketClient}. */
    public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient client) { ... }
    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) { ... }
    @Override
    public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { ... }
}

Usage:
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import reactor.core.publisher.Mono;
import java.net.URI;
/**
 * Client-side examples: connecting to a WebSocket endpoint with and without
 * custom handshake headers.
 */
public class WebSocketClientExample {

    /**
     * Connects, sends a greeting, then prints every text message received.
     */
    public void connectToWebSocket() {
        WebSocketClient client = new ReactorNettyWebSocketClient();
        URI url = URI.create("ws://localhost:8080/websocket");

        WebSocketHandler handler = session -> {
            // Send the greeting first
            Mono<Void> greeting = session.send(
                    Mono.just(session.textMessage("Hello Server")));
            // Then print whatever comes back
            Mono<Void> printReplies = session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .doOnNext(text -> System.out.println("Received: " + text))
                    .then();
            return greeting.then(printReplies);
        };

        client.execute(url, handler).subscribe();
    }

    /**
     * Connects with an Authorization header and consumes inbound messages
     * until the stream ends.
     */
    // With custom headers
    public void connectWithHeaders() {
        WebSocketClient client = new ReactorNettyWebSocketClient();
        URI url = URI.create("ws://localhost:8080/websocket");

        HttpHeaders headers = new HttpHeaders();
        headers.add("Authorization", "Bearer token123");

        // Handle session
        WebSocketHandler handler = session -> session.receive().then();

        client.execute(url, headers, handler).subscribe();
    }
}

Server-side WebSocket handling requires registering a handler mapping and adapter.
/**
 * Service that handles WebSocket upgrade requests.
 */
public interface WebSocketService {

    /**
     * Handle the WebSocket upgrade for the given exchange.
     *
     * @param exchange the current exchange
     * @param handler the handler for the WebSocket session
     */
    // Handle WebSocket upgrade
    Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler);
}

/**
 * {@code WebSocketService} implementation that can be created with a
 * {@code RequestUpgradeStrategy}. Bodies are elided in this listing.
 */
public class HandshakeWebSocketService implements WebSocketService {
    public HandshakeWebSocketService() { ... }
    /** Create an instance with the given upgrade strategy. */
    public HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy) { ... }

    /**
     * Set the session attribute predicate.
     * NOTE(review): presumably selects, by name, which attributes are passed
     * on to the WebSocket session; confirm.
     */
    // Set session attribute predicate
    public void setSessionAttributePredicate(Predicate<String> predicate) { ... }

    @Override
    public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) { ... }
}

/**
 * Strategy for upgrading an HTTP request to WebSocket.
 */
public interface RequestUpgradeStrategy {

    /**
     * Upgrade the HTTP request to WebSocket.
     *
     * @param exchange the current exchange
     * @param handler the handler for the WebSocket session
     * @param subProtocol the sub-protocol (presumably the one selected for the session; confirm)
     * @param handshakeInfoFactory supplies the {@code HandshakeInfo}
     */
    // Upgrade HTTP request to WebSocket
    Mono<Void> upgrade(ServerWebExchange exchange,
            WebSocketHandler handler,
            String subProtocol,
            Supplier<HandshakeInfo> handshakeInfoFactory);
}

Server-specific upgrade strategies:
/**
 * {@code RequestUpgradeStrategy} for Reactor Netty. Body elided in this listing.
 */
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange,
            WebSocketHandler handler,
            String subProtocol,
            Supplier<HandshakeInfo> handshakeInfoFactory) { ... }
}

/**
 * {@code RequestUpgradeStrategy} named for the standard WebSocket API.
 * Body elided in this listing.
 */
public class StandardWebSocketUpgradeStrategy implements RequestUpgradeStrategy {
    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange,
            WebSocketHandler handler,
            String subProtocol,
            Supplier<HandshakeInfo> handshakeInfoFactory) { ... }
}

/**
 * {@code RequestUpgradeStrategy} for Jetty. Body elided in this listing.
 */
public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange,
            WebSocketHandler handler,
            String subProtocol,
            Supplier<HandshakeInfo> handshakeInfoFactory) { ... }
}

/**
 * {@code RequestUpgradeStrategy} named for Jetty Core. Body elided in this listing.
 * NOTE(review): how this differs from {@code JettyRequestUpgradeStrategy} is
 * not shown here; confirm before documenting further.
 */
public class JettyCoreRequestUpgradeStrategy implements RequestUpgradeStrategy {
    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange,
            WebSocketHandler handler,
            String subProtocol,
            Supplier<HandshakeInfo> handshakeInfoFactory) { ... }
}

The WebSocketHandlerAdapter integrates WebSocket handlers with the WebFlux dispatcher.
/**
 * {@code HandlerAdapter} that integrates WebSocket handlers with the WebFlux
 * dispatcher. Bodies are elided in this listing.
 */
public class WebSocketHandlerAdapter implements HandlerAdapter {
    public WebSocketHandlerAdapter() { ... }
    /** Create an instance with the given {@code WebSocketService}. */
    public WebSocketHandlerAdapter(WebSocketService webSocketService) { ... }

    /** Set the WebSocket service. */
    // Set WebSocket service
    public void setWebSocketService(WebSocketService webSocketService) { ... }

    /** Return the WebSocket service. */
    // Get WebSocket service
    public WebSocketService getWebSocketService() { ... }

    @Override
    public boolean supports(Object handler) { ... }

    @Override
    public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) { ... }
}

Configure WebSocket endpoints using handler mappings:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import java.util.HashMap;
import java.util.Map;
/**
 * Registers the WebSocket handlers under URL paths and exposes the adapter
 * that lets the WebFlux dispatcher invoke them.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Maps "/echo" and "/chat" to their WebSocket handlers.
     *
     * @param echoHandler the handler for "/echo"
     * @param chatHandler the handler for "/chat"
     */
    @Bean
    public HandlerMapping webSocketHandlerMapping(
            EchoWebSocketHandler echoHandler,
            ChatWebSocketHandler chatHandler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", echoHandler);
        map.put("/chat", chatHandler);

        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        // -1 puts this mapping ahead of annotated controllers, so a broad
        // controller mapping cannot capture the WebSocket paths first.
        handlerMapping.setOrder(-1);
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

    @Bean
    public EchoWebSocketHandler echoHandler() {
        return new EchoWebSocketHandler();
    }

    @Bean
    public ChatWebSocketHandler chatHandler() {
        return new ChatWebSocketHandler();
    }
}

/** Reactor Netty WebSocket session implementation. */
public class ReactorNettyWebSocketSession extends AbstractWebSocketSession<ReactorNettyWebSocketSession.WebSocketConnection> {
    // Reactor Netty WebSocket session implementation
}

/** Standard Java WebSocket (JSR-356) session implementation. */
public class StandardWebSocketSession extends AbstractListenerWebSocketSession<Session> {
    // Standard Java WebSocket (JSR-356) session implementation
}

/** Tomcat WebSocket session implementation. */
public class TomcatWebSocketSession extends StandardWebSocketSession {
    // Tomcat WebSocket session implementation
}

/** Jetty WebSocket session implementation. */
public class JettyWebSocketSession extends AbstractWebSocketSession<org.eclipse.jetty.websocket.api.Session> {
    // Jetty WebSocket session implementation
}

/**
 * Base class for WebSocket session implementations.
 *
 * @param <T> the type of the delegate passed to the constructor
 */
public abstract class AbstractWebSocketSession<T> implements WebSocketSession {
    // Base class for WebSocket session implementations
    protected AbstractWebSocketSession(T delegate,
            String id,
            HandshakeInfo handshakeInfo,
            DataBufferFactory bufferFactory) { ... }

    // Hooks for subclasses. NOTE(review): presumably these back receive(),
    // send() and close(CloseStatus) of WebSocketSession; confirm.
    protected abstract Flux<WebSocketMessage> receiveInternal();
    protected abstract Mono<Void> sendInternal(Publisher<WebSocketMessage> messages);
    protected abstract Mono<Void> closeInternal(CloseStatus status);
}

/** Base class for listener-based WebSocket sessions. */
public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T> {
    // Base class for listener-based WebSocket sessions
}

/** Support class for Netty-based WebSocket sessions. */
public abstract class NettyWebSocketSessionSupport<T> extends AbstractWebSocketSession<T> {
    // Support class for Netty-based WebSocket sessions
}

/**
 * {@code WebSocketHandler} wrapper around a delegate and a {@code ContextView},
 * described as ensuring proper context propagation. Bodies are elided in this
 * listing.
 */
public class ContextWebSocketHandler implements WebSocketHandler {
    // Wrapper that ensures proper context propagation
    public ContextWebSocketHandler(WebSocketHandler delegate, ContextView contextView) { ... }

    @Override
    public List<String> getSubProtocols() { ... }

    @Override
    public Mono<Void> handle(WebSocketSession session) { ... }
}

/** Predicate for identifying WebSocket upgrade requests. Body elided in this listing. */
public class WebSocketUpgradeHandlerPredicate implements Predicate<ServerWebExchange> {
    // Predicate for identifying WebSocket upgrade requests
    @Override
    public boolean test(ServerWebExchange exchange) { ... }
}