or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

annotation-controllers.md configuration.md content-negotiation.md exception-handling.md functional-routing.md index.md resource-handling.md view-rendering.md webclient.md websocket.md
tile.json

docs/websocket.md

WebSocket Support

Spring WebFlux provides comprehensive WebSocket support for building real-time, bidirectional communication between clients and servers. The API includes handler abstractions, session management, message handling, and server-specific implementations for Reactor Netty, Tomcat, Jetty, and standard Java WebSocket (JSR-356).

Capabilities

Handle WebSocket Sessions

The WebSocketHandler interface defines the contract for handling WebSocket sessions.

/**
 * Contract for handling a single WebSocket session.
 *
 * <p>Only {@link #handle} is abstract, so a handler can be written as a lambda.
 */
@FunctionalInterface
public interface WebSocketHandler {
    /**
     * Handles the given WebSocket session.
     *
     * @param session the session to read from and write to
     * @return a {@code Mono<Void>} representing the handling of the session
     *         (presumably it completes when handling is finished; confirm against the Spring javadoc)
     */
    Mono<Void> handle(WebSocketSession session);

    /**
     * Returns the sub-protocols this handler supports.
     *
     * @return the supported sub-protocols; this default implementation returns an empty list
     */
    default List<String> getSubProtocols() {
        return Collections.emptyList();
    }
}

Usage:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

/**
 * Handler that answers every inbound message with the same text prefixed by {@code "Echo: "}.
 */
public class EchoWebSocketHandler implements WebSocketHandler {

    /**
     * Reads each inbound payload as text and sends it straight back to the same session.
     *
     * @param session the session whose inbound messages are echoed
     * @return the result of {@code session.send(...)} for the echoed stream
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(
            session.receive()
                .map(inbound -> inbound.getPayloadAsText())
                .map(text -> session.textMessage("Echo: " + text)));
    }
}

// Chat handler example
public class ChatWebSocketHandler implements WebSocketHandler {

    private final Sinks.Many<String> chatSink = Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Receive messages from this session and publish to all subscribers
        Mono<Void> input = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(message -> chatSink.tryEmitNext(message))
            .then();

        // Send all messages to this session
        Mono<Void> output = session.send(
            chatSink.asFlux()
                .map(session::textMessage)
        );

        return Mono.zip(input, output).then();
    }
}

WebSocket Session

The WebSocketSession interface represents a WebSocket session with methods for sending and receiving messages.

/**
 * Represents one WebSocket session: identity and handshake data, inbound and outbound
 * message streams, closing, and factory methods for creating messages.
 */
public interface WebSocketSession {
    /** Returns the session ID. */
    String getId();

    /** Returns the information captured from the handshake request. */
    HandshakeInfo getHandshakeInfo();

    /** Returns the factory used to create data buffers for this session. */
    DataBufferFactory bufferFactory();

    /**
     * Returns the session attributes.
     * The usage examples write to this map, so it is expected to be mutable.
     */
    Map<String, Object> getAttributes();

    /** Returns the stream of inbound messages. */
    Flux<WebSocketMessage> receive();

    /**
     * Sends the given messages.
     *
     * @param messages the outbound messages
     * @return a {@code Mono<Void>} representing the send operation
     */
    Mono<Void> send(Publisher<WebSocketMessage> messages);

    /** Closes the session without specifying a status. */
    Mono<Void> close();

    /**
     * Closes the session with the given status.
     *
     * @param status the close status to report
     */
    Mono<Void> close(CloseStatus status);

    /** Returns whether the session is open. */
    boolean isOpen();

    /** Creates a text message with the given payload. */
    WebSocketMessage textMessage(String payload);

    /**
     * Creates a binary message.
     *
     * @param payloadFactory builds the payload buffer from the supplied buffer factory
     */
    WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

    /**
     * Creates a ping message.
     *
     * @param payloadFactory builds the payload buffer from the supplied buffer factory
     */
    WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

    /**
     * Creates a pong message.
     *
     * @param payloadFactory builds the payload buffer from the supplied buffer factory
     */
    WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
}

Usage:

/**
 * Example handler that reads handshake data, stores a session attribute, logs text
 * messages and replies to every inbound message.
 */
public class MessageWebSocketHandler implements WebSocketHandler {

    /**
     * Records the user ID taken from the handshake headers, then answers each inbound
     * message with a "Processed: ..." text message.
     *
     * @param session the session to handle
     * @return the result of sending the processed replies
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Session identity and handshake details
        String sessionId = session.getId();
        HandshakeInfo handshake = session.getHandshakeInfo();
        URI uri = handshake.getUri();
        HttpHeaders headers = handshake.getHeaders();

        // Remember who this session belongs to
        session.getAttributes().put("userId", extractUserId(headers));

        // Log text messages, then turn every message into a reply
        Flux<WebSocketMessage> output = session.receive()
            .doOnNext(message -> logIfText(message))
            .map(message -> processMessage(session, message));

        return session.send(output);
    }

    /** Prints the payload of TEXT messages; other message types are not logged. */
    private void logIfText(WebSocketMessage message) {
        if (message.getType() != WebSocketMessage.Type.TEXT) {
            return;
        }
        String payload = message.getPayloadAsText();
        System.out.println("Received: " + payload);
    }

    /** Builds the reply for one inbound message. */
    private WebSocketMessage processMessage(WebSocketSession session, WebSocketMessage message) {
        return session.textMessage("Processed: " + message.getPayloadAsText());
    }
}

WebSocket Messages

The WebSocketMessage class represents a WebSocket message with payload and type.

/**
 * A WebSocket message: a payload together with its {@link Type}.
 */
public class WebSocketMessage {
    /** Returns the message type. */
    public Type getType() { ... }

    /** Returns the payload as a {@code DataBuffer}. */
    public DataBuffer getPayload() { ... }

    /** Returns the payload as text (intended for TEXT messages). */
    public String getPayloadAsText() { ... }
    /** Returns the payload as text decoded with the given charset. */
    public String getPayloadAsText(Charset charset) { ... }

    /**
     * Returns the payload as a byte buffer.
     * NOTE(review): this accessor could not be confirmed; verify it exists on the
     * targeted Spring version before relying on it.
     */
    public ByteBuffer getPayloadAsByteBuffer() { ... }

    /** Retains the message and returns a {@code WebSocketMessage}. */
    public WebSocketMessage retain() { ... }

    /** Releases the message. */
    public void release() { ... }

    /** The kinds of WebSocket message. */
    public enum Type {
        TEXT,
        BINARY,
        PING,
        PONG
    }
}

Usage:

/**
 * Sends a "Tick: n" text message to the session once per second.
 *
 * @param session the session to send ticks to
 * @return the result of sending the tick stream
 */
public Mono<Void> handle(WebSocketSession session) {
    // Each interval value becomes one text message
    return session.send(
        Flux.interval(Duration.ofSeconds(1))
            .map(i -> session.textMessage("Tick: " + i)));
}

// Binary message example
/**
 * Sends the given bytes to the session as a single binary message.
 *
 * @param session the session to send to
 * @param data the bytes to copy into the message payload
 * @return the result of the send operation
 */
public Mono<Void> sendBinaryData(WebSocketSession session, byte[] data) {
    return session.send(
        Mono.just(session.binaryMessage(factory -> {
            // Allocate a buffer sized for the payload, then copy the bytes into it
            DataBuffer payload = factory.allocateBuffer(data.length);
            payload.write(data);
            return payload;
        })));
}

// Ping/Pong example
/**
 * Sends a single ping message whose payload is the text "ping".
 *
 * @param session the session to ping
 * @return the result of the send operation
 */
public Mono<Void> sendPing(WebSocketSession session) {
    // Encode with an explicit charset so the bytes on the wire do not depend on the
    // platform default encoding.
    return session.send(
        Mono.just(session.pingMessage(factory ->
            factory.wrap("ping".getBytes(java.nio.charset.StandardCharsets.UTF_8))))
    );
}

Handshake Info

The HandshakeInfo class contains information from the WebSocket handshake request.

/**
 * Information taken from the WebSocket handshake request.
 */
public class HandshakeInfo {
    /** Returns the request URI. */
    public URI getUri() { ... }

    /** Returns the request headers. */
    public HttpHeaders getHeaders() { ... }

    /** Returns the request cookies, keyed by name. */
    public MultiValueMap<String, HttpCookie> getCookies() { ... }

    /**
     * Returns the selected sub-protocol.
     * NOTE(review): presumably null when none was negotiated; confirm before dereferencing.
     */
    public String getSubProtocol() { ... }

    /**
     * Returns the remote address.
     * NOTE(review): presumably may be null when unavailable; confirm before dereferencing.
     */
    public InetSocketAddress getRemoteAddress() { ... }

    /**
     * Returns the principal for the session; the usage example treats an empty
     * {@code Mono} as "not authenticated".
     */
    public Mono<Principal> getPrincipal() { ... }

    /** Returns the log prefix. */
    public String getLogPrefix() { ... }
}

Usage:

/**
 * Reads the handshake details of the session and dispatches to the authenticated or the
 * anonymous handling path, depending on whether a principal is present.
 *
 * @param session the session to handle
 * @return the {@code Mono<Void>} of whichever handling path was selected
 */
public Mono<Void> handle(WebSocketSession session) {
    HandshakeInfo info = session.getHandshakeInfo();

    // Get request URI and query parameters
    URI uri = info.getUri();
    String query = uri.getQuery();

    // Get authentication header
    HttpHeaders headers = info.getHeaders();
    String authHeader = headers.getFirst("Authorization");

    // Get cookies
    MultiValueMap<String, HttpCookie> cookies = info.getCookies();

    // Get sub-protocol
    String subProtocol = info.getSubProtocol();

    // Get remote address
    InetSocketAddress remoteAddress = info.getRemoteAddress();

    // Pick exactly one branch. A Mono<Void> never emits a value, so placing
    // switchIfEmpty(...) directly after flatMap(...) would also run the anonymous branch
    // once every authenticated session finished. Here the decision is made on the
    // (possibly absent) principal first, and the chosen Mono<Void> is flattened afterwards.
    // fromSupplier also keeps the anonymous handler from being built unless it is needed.
    return info.getPrincipal()
        .map(principal -> handleAuthenticatedSession(session, principal.getName()))
        .switchIfEmpty(Mono.fromSupplier(() -> handleAnonymousSession(session)))
        .flatMap(handling -> handling);
}

Close Status

The CloseStatus class represents WebSocket close status codes and reasons.

/**
 * A WebSocket close status: a numeric code plus a reason phrase.
 *
 * <p>{@link #withReason} returns a {@code CloseStatus}, so a predefined constant can be
 * combined with a custom reason, e.g. {@code CloseStatus.POLICY_VIOLATION.withReason(...)}.
 */
public final class CloseStatus {
    /** Returns the numeric status code. */
    public int getCode() { ... }

    /**
     * Returns the reason phrase.
     * NOTE(review): presumably null for statuses created without a reason; confirm.
     */
    public String getReason() { ... }

    // Predefined statuses, one per close code from 1000 to 1013 (1004 is not listed)
    public static final CloseStatus NORMAL = new CloseStatus(1000);
    public static final CloseStatus GOING_AWAY = new CloseStatus(1001);
    public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
    public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
    public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);
    public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);
    public static final CloseStatus BAD_DATA = new CloseStatus(1007);
    public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);
    public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);
    public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);
    public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
    public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);
    public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);

    /** Creates a status with the given code and no reason. */
    public CloseStatus(int code) { ... }
    /** Creates a status with the given code and reason. */
    public CloseStatus(int code, String reason) { ... }

    /**
     * Returns a {@code CloseStatus} carrying the given reason.
     *
     * @param reason the reason phrase to attach
     */
    public CloseStatus withReason(String reason) { ... }
}

Usage:

/**
 * Consumes inbound text messages, closes the session normally when a client sends
 * {@code "quit"}, and closes it with a server-error status if the stream fails.
 *
 * @param session the session to handle
 * @return a {@code Mono<Void>} that ends when the inbound stream ends
 */
public Mono<Void> handle(WebSocketSession session) {
    return session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        // Keep the close inside the pipeline instead of a detached subscribe(): the close
        // is then awaited, cancelled together with the handler, and any failure from it
        // reaches onErrorResume below rather than being lost.
        .concatMap(payload -> "quit".equals(payload)
            ? session.close(CloseStatus.NORMAL)
            : Mono.<Void>empty())
        .then()
        .onErrorResume(ex ->
            // Close with error status (SERVER_ERROR is code 1011)
            session.close(CloseStatus.SERVER_ERROR.withReason("Internal error: " + ex.getMessage())));
}

// Close with custom status
/**
 * Closes the session with the policy-violation status and the supplied reason.
 *
 * @param session the session to close
 * @param reason the reason phrase to report
 * @return the result of closing the session
 */
public Mono<Void> closeWithCustomStatus(WebSocketSession session, String reason) {
    CloseStatus status = CloseStatus.POLICY_VIOLATION.withReason(reason);
    return session.close(status);
}

WebSocket Client

The WebSocketClient interface provides a contract for initiating WebSocket connections.

/**
 * Contract for initiating WebSocket connections.
 */
public interface WebSocketClient {
    /**
     * Connects to the given URL and handles the session with the given handler.
     *
     * @param url the endpoint to connect to
     * @param handler the handler for the resulting session
     */
    Mono<Void> execute(URI url, WebSocketHandler handler);

    /**
     * Same as {@link #execute(URI, WebSocketHandler)}, with custom handshake headers.
     *
     * @param headers headers to send with the handshake request
     */
    Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler);

    /**
     * Variant that also takes explicit sub-protocols.
     * NOTE(review): this overload could not be confirmed; WebSocketHandler already exposes
     * getSubProtocols(), so verify this method exists on the targeted Spring version.
     */
    default Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler, String... subProtocols) { ... }
}

Runtime-specific client implementations (Reactor Netty, standard Java WebSocket, Tomcat, Jetty):

/**
 * {@code WebSocketClient} implementation for Reactor Netty.
 */
public class ReactorNettyWebSocketClient implements WebSocketClient {
    /** Creates a client with default settings. */
    public ReactorNettyWebSocketClient() { ... }
    /** Creates a client that uses the given HTTP client (presumably Reactor Netty's HttpClient; confirm). */
    public ReactorNettyWebSocketClient(HttpClient httpClient) { ... }

    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) { ... }

    @Override
    public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { ... }
}
/**
 * {@code WebSocketClient} implementation built on a standard Java WebSocket container.
 */
public class StandardWebSocketClient implements WebSocketClient {
    /** Creates a client with default settings. */
    public StandardWebSocketClient() { ... }
    /** Creates a client that uses the given WebSocket container. */
    public StandardWebSocketClient(WebSocketContainer webSocketContainer) { ... }

    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) { ... }

    @Override
    public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { ... }
}
/**
 * {@code WebSocketClient} implementation for Tomcat.
 */
public class TomcatWebSocketClient implements WebSocketClient {
    /** Creates a client with default settings. */
    public TomcatWebSocketClient() { ... }
    /**
     * Creates a client that uses the given Tomcat WebSocket container.
     * NOTE(review): confirm the exact parameter type against the Spring javadoc.
     */
    public TomcatWebSocketClient(org.apache.tomcat.websocket.WsWebSocketContainer webSocketContainer) { ... }

    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) { ... }

    @Override
    public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { ... }
}
/**
 * {@code WebSocketClient} implementation for Jetty.
 */
public class JettyWebSocketClient implements WebSocketClient {
    /** Creates a client with default settings. */
    public JettyWebSocketClient() { ... }
    /** Creates a client that wraps the given Jetty WebSocket client. */
    public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient client) { ... }

    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) { ... }

    @Override
    public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { ... }
}

Usage:

import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import reactor.core.publisher.Mono;
import java.net.URI;

/**
 * Examples of opening client-side WebSocket connections with and without custom headers.
 */
public class WebSocketClientExample {

    /**
     * Connects to the local endpoint, sends one greeting and prints every text reply.
     * The connection is started with {@code subscribe()} and not awaited.
     */
    public void connectToWebSocket() {
        URI url = URI.create("ws://localhost:8080/websocket");
        WebSocketClient client = new ReactorNettyWebSocketClient();

        WebSocketHandler handler = session -> {
            // First the greeting ...
            Mono<Void> greeting = session.send(Mono.just(session.textMessage("Hello Server")));
            // ... then every reply is printed once the greeting has been sent
            Mono<Void> replies = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(message -> System.out.println("Received: " + message))
                .then();
            return greeting.then(replies);
        };

        client.execute(url, handler).subscribe();
    }

    /**
     * Connects to the local endpoint with an Authorization header and drains inbound
     * messages without processing them.
     */
    public void connectWithHeaders() {
        HttpHeaders headers = new HttpHeaders();
        headers.add("Authorization", "Bearer token123");

        URI url = URI.create("ws://localhost:8080/websocket");
        WebSocketClient client = new ReactorNettyWebSocketClient();

        // Handle session
        WebSocketHandler handler = session -> session.receive().then();

        client.execute(url, headers, handler).subscribe();
    }
}

WebSocket Server Support

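Server-side WebSocket handling requires registering a handler mapping and a WebSocketHandlerAdapter. The adapter delegates the upgrade to a WebSocketService, and HandshakeWebSocketService in turn performs it through a server-specific RequestUpgradeStrategy.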

/**
 * Service that processes a WebSocket request on the server side.
 */
public interface WebSocketService {
    /**
     * Handles the WebSocket upgrade for the given exchange.
     *
     * @param exchange the current server exchange
     * @param handler the handler for the resulting WebSocket session
     */
    Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler);
}
/**
 * {@code WebSocketService} implementation that works with a {@code RequestUpgradeStrategy}.
 */
public class HandshakeWebSocketService implements WebSocketService {
    /** Creates the service without an explicit upgrade strategy (presumably one is detected; confirm). */
    public HandshakeWebSocketService() { ... }
    /** Creates the service with the given upgrade strategy. */
    public HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy) { ... }

    /**
     * Sets a predicate over attribute names.
     * NOTE(review): presumably it selects which web-session attributes are copied into the
     * WebSocket session; confirm against the Spring javadoc.
     */
    public void setSessionAttributePredicate(Predicate<String> predicate) { ... }

    @Override
    public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) { ... }
}
/**
 * Strategy for upgrading an HTTP request to a WebSocket session; implemented once per
 * supported server.
 */
public interface RequestUpgradeStrategy {
    /**
     * Upgrades the HTTP request to WebSocket.
     *
     * @param exchange the current server exchange
     * @param handler the handler for the resulting WebSocket session
     * @param subProtocol the selected sub-protocol (presumably null when none; confirm)
     * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
     */
    Mono<Void> upgrade(ServerWebExchange exchange,
                       WebSocketHandler handler,
                       String subProtocol,
                       Supplier<HandshakeInfo> handshakeInfoFactory);
}

Server-specific upgrade strategies:

/**
 * {@code RequestUpgradeStrategy} for Reactor Netty.
 */
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange,
                              WebSocketHandler handler,
                              String subProtocol,
                              Supplier<HandshakeInfo> handshakeInfoFactory) { ... }
}
/**
 * {@code RequestUpgradeStrategy} for the standard Java WebSocket API.
 */
public class StandardWebSocketUpgradeStrategy implements RequestUpgradeStrategy {
    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange,
                              WebSocketHandler handler,
                              String subProtocol,
                              Supplier<HandshakeInfo> handshakeInfoFactory) { ... }
}
/**
 * {@code RequestUpgradeStrategy} for Jetty.
 */
public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange,
                              WebSocketHandler handler,
                              String subProtocol,
                              Supplier<HandshakeInfo> handshakeInfoFactory) { ... }
}
/**
 * {@code RequestUpgradeStrategy} for Jetty, separate from {@code JettyRequestUpgradeStrategy}.
 * NOTE(review): presumably targets Jetty's core (non-Servlet) API; confirm which Jetty
 * setup requires this variant.
 */
public class JettyCoreRequestUpgradeStrategy implements RequestUpgradeStrategy {
    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange,
                              WebSocketHandler handler,
                              String subProtocol,
                              Supplier<HandshakeInfo> handshakeInfoFactory) { ... }
}

WebSocket Handler Adapter

The WebSocketHandlerAdapter integrates WebSocket handlers with the WebFlux dispatcher.

/**
 * {@code HandlerAdapter} that lets the WebFlux dispatcher invoke WebSocket handlers,
 * working through a {@code WebSocketService}.
 */
public class WebSocketHandlerAdapter implements HandlerAdapter {
    /** Creates the adapter without an explicit service (presumably a default one is used; confirm). */
    public WebSocketHandlerAdapter() { ... }
    /** Creates the adapter with the given WebSocket service. */
    public WebSocketHandlerAdapter(WebSocketService webSocketService) { ... }

    /** Sets the WebSocket service to use. */
    public void setWebSocketService(WebSocketService webSocketService) { ... }

    /** Returns the WebSocket service in use. */
    public WebSocketService getWebSocketService() { ... }

    /** Reports whether this adapter can invoke the given handler object. */
    @Override
    public boolean supports(Object handler) { ... }

    /** Invokes the given handler for the exchange. */
    @Override
    public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) { ... }
}

Handler Mapping Configuration

Configure WebSocket endpoints using handler mappings:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

@Configuration
/**
 * Registers the example WebSocket handlers and maps them to URL paths.
 */
public class WebSocketConfig {

    /** Provides the handler served at {@code /echo}. */
    @Bean
    public EchoWebSocketHandler echoHandler() {
        return new EchoWebSocketHandler();
    }

    /** Provides the handler served at {@code /chat}. */
    @Bean
    public ChatWebSocketHandler chatHandler() {
        return new ChatWebSocketHandler();
    }

    /** Provides the adapter that lets the dispatcher invoke WebSocket handlers. */
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

    /**
     * Maps URL paths to WebSocket handlers.
     *
     * @param echoHandler the handler for {@code /echo}
     * @param chatHandler the handler for {@code /chat}
     * @return a URL handler mapping with order 1
     */
    @Bean
    public HandlerMapping webSocketHandlerMapping(
            EchoWebSocketHandler echoHandler,
            ChatWebSocketHandler chatHandler) {

        Map<String, WebSocketHandler> handlersByPath = new HashMap<>();
        handlersByPath.put("/echo", echoHandler);
        handlersByPath.put("/chat", chatHandler);

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(handlersByPath);
        mapping.setOrder(1);
        return mapping;
    }
}

Types

Server-Specific Session Implementations

/**
 * {@code WebSocketSession} implementation for Reactor Netty; its delegate type is the
 * nested {@code WebSocketConnection}.
 * NOTE(review): NettyWebSocketSessionSupport, listed with the abstract base classes, may be
 * the actual direct superclass; confirm against the Spring javadoc.
 */
public class ReactorNettyWebSocketSession extends AbstractWebSocketSession<ReactorNettyWebSocketSession.WebSocketConnection> {
    // Reactor Netty WebSocket session implementation
}
/**
 * Listener-based {@code WebSocketSession} implementation for the standard Java WebSocket
 * API; its delegate type is {@code Session}.
 */
public class StandardWebSocketSession extends AbstractListenerWebSocketSession<Session> {
    // Standard Java WebSocket (JSR-356) session implementation
}
/**
 * Tomcat-specific specialization of {@code StandardWebSocketSession}.
 */
public class TomcatWebSocketSession extends StandardWebSocketSession {
    // Tomcat WebSocket session implementation
}
/**
 * {@code WebSocketSession} implementation for Jetty; its delegate type is Jetty's own
 * {@code Session}.
 * NOTE(review): the direct superclass may differ between Spring versions; confirm.
 */
public class JettyWebSocketSession extends AbstractWebSocketSession<org.eclipse.jetty.websocket.api.Session> {
    // Jetty WebSocket session implementation
}

Abstract Base Classes

/**
 * Base class for {@code WebSocketSession} implementations.
 *
 * @param <T> the type of the underlying (delegate) session object
 */
public abstract class AbstractWebSocketSession<T> implements WebSocketSession {
    /**
     * Base constructor for subclasses.
     *
     * @param delegate the underlying session object
     * @param id the session ID
     * @param handshakeInfo the information from the handshake request
     * @param bufferFactory the factory for creating data buffers
     */
    protected AbstractWebSocketSession(T delegate,
                                       String id,
                                       HandshakeInfo handshakeInfo,
                                       DataBufferFactory bufferFactory) { ... }

    // NOTE(review): the three *Internal hooks below could not be confirmed; the Spring base
    // class may instead leave receive(), send(..) and close(..) themselves abstract.
    // Verify against the Spring javadoc before subclassing.
    protected abstract Flux<WebSocketMessage> receiveInternal();
    protected abstract Mono<Void> sendInternal(Publisher<WebSocketMessage> messages);
    protected abstract Mono<Void> closeInternal(CloseStatus status);
}
/**
 * Base class for listener-based WebSocket sessions; {@code StandardWebSocketSession}
 * extends it.
 *
 * @param <T> the type of the underlying (delegate) session object
 */
public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T> {
    // Base class for listener-based WebSocket sessions
}
/**
 * Support base class for Netty-based WebSocket sessions.
 *
 * @param <T> the type of the underlying (delegate) session object
 */
public abstract class NettyWebSocketSessionSupport<T> extends AbstractWebSocketSession<T> {
    // Support class for Netty-based WebSocket sessions
}

Context WebSocket Handler

/**
 * {@code WebSocketHandler} wrapper that pairs a delegate handler with a Reactor
 * {@code ContextView} so that context is propagated to the delegate.
 */
public class ContextWebSocketHandler implements WebSocketHandler {
    /**
     * Wraps the given handler with the given context.
     * NOTE(review): confirm this constructor is public; Spring may expose a static factory
     * method for creating the wrapper instead.
     *
     * @param delegate the handler to wrap
     * @param contextView the context to propagate
     */
    public ContextWebSocketHandler(WebSocketHandler delegate, ContextView contextView) { ... }

    @Override
    public List<String> getSubProtocols() { ... }

    @Override
    public Mono<Void> handle(WebSocketSession session) { ... }
}

WebSocket Upgrade Predicate

/**
 * Predicate for identifying WebSocket upgrade requests.
 * NOTE(review): verify the implemented interface and the {@code test} signature against the
 * Spring javadoc; the framework class may be a two-argument predicate over the handler and
 * the exchange rather than a {@code Predicate<ServerWebExchange>}.
 */
public class WebSocketUpgradeHandlerPredicate implements Predicate<ServerWebExchange> {
    /**
     * Tests whether the given exchange is a WebSocket upgrade request.
     *
     * @param exchange the current server exchange
     */
    @Override
    public boolean test(ServerWebExchange exchange) { ... }
}