Client-side SockJS implementation with automatic transport fallback. Provides WebSocket client functionality with automatic fallback to HTTP-based transports when WebSocket is unavailable.
WebSocket client implementation with SockJS protocol and transport fallback.
/**
* WebSocket client that implements the SockJS protocol with automatic
* transport fallback. Tries transports in order until one succeeds,
* providing maximum compatibility across network environments.
*/
public class SockJsClient implements WebSocketClient {
/**
* Create a SockJS client with transport handlers.
*
* @param transports list of transports to try in order
*/
public SockJsClient(List<Transport> transports);
/**
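* Set the names of HTTP headers to copy from the handshake headers to
* the other HTTP requests made for the SockJS connection (info request,
* XHR send and receive). By default, when this property is not set, all
* handshake headers are used for those requests; set it to forward only
* a subset (for example authentication headers).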
*
* @param httpHeaderNames the header names to include
*/
public void setHttpHeaderNames(String... httpHeaderNames);
/**
* Get the configured HTTP header names.
*
* @return the header names
*/
public String[] getHttpHeaderNames();
/**
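* Set the InfoReceiver for executing SockJS info requests.
* By default the first configured transport that implements InfoReceiver
* (i.e. an XhrTransport) is used; if there is none, a
* RestTemplateXhrTransport is created for this purpose.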
*
* @param infoReceiver the info receiver
*/
public void setInfoReceiver(InfoReceiver infoReceiver);
/**
* Get the configured info receiver.
*
* @return the info receiver
*/
public InfoReceiver getInfoReceiver();
/**
* Set the message codec for encoding/decoding SockJS messages.
* By default Jackson2SockJsMessageCodec is used if Jackson 2 is on the classpath.
*
* @param messageCodec the message codec
*/
public void setMessageCodec(SockJsMessageCodec messageCodec);
/**
* Get the message codec.
*
* @return the message codec
*/
public SockJsMessageCodec getMessageCodec();
/**
* Set the task scheduler for connect timeout management.
*
* @param connectTimeoutScheduler the task scheduler
*/
public void setConnectTimeoutScheduler(TaskScheduler connectTimeoutScheduler);
/**
* Clear the cache of SockJS server info responses.
* Info responses are cached to avoid repeated requests.
*/
public void clearServerInfoCache();
@Override
public CompletableFuture<WebSocketSession> execute(
WebSocketHandler webSocketHandler,
String uriTemplate,
Object... uriVariables);
@Override
public CompletableFuture<WebSocketSession> execute(
WebSocketHandler webSocketHandler,
WebSocketHttpHeaders headers,
URI uri);
}

Interface for client-side SockJS transports.
/**
* Client-side SockJS transport. Each transport implements a different
* mechanism for establishing bidirectional communication with the server.
*/
public interface Transport {
/**
* Get the transport types supported by this transport implementation.
*
* @return list of supported transport types
*/
List<TransportType> getTransportTypes();
/**
* Establish a connection using this transport.
*
* @param transportRequest the transport request with connection details
* @param webSocketHandler the handler for the connection
* @return a CompletableFuture that completes when connection is established
*/
CompletableFuture<Void> connect(
TransportRequest transportRequest,
WebSocketHandler webSocketHandler);
}

WebSocket transport implementation for SockJS client.
/**
* SockJS client transport that uses native WebSocket connections.
* This is the preferred transport when WebSocket is available.
*/
public class WebSocketTransport implements Transport {
/**
* Create a WebSocket transport using a WebSocketClient.
*
* @param webSocketClient the WebSocket client
*/
public WebSocketTransport(WebSocketClient webSocketClient);
@Override
public List<TransportType> getTransportTypes();
@Override
public CompletableFuture<Void> connect(
TransportRequest transportRequest,
WebSocketHandler webSocketHandler);
}

Interface for XHR-based transports.
/**
* XHR transport interface for SockJS client. Extends Transport
* and InfoReceiver to support both messaging and info requests.
*/
public interface XhrTransport extends Transport, InfoReceiver {
// Combines transport and info request capabilities
}

XHR transport using Spring's RestTemplate.
/**
* XHR transport implementation using RestTemplate for HTTP requests.
* Supports both streaming and polling XHR transports.
*/
public class RestTemplateXhrTransport extends AbstractXhrTransport {
/**
* Create a transport with default RestTemplate.
*/
public RestTemplateXhrTransport();
/**
* Create a transport with a custom RestTemplate.
*
* @param restTemplate the RestTemplate to use
*/
public RestTemplateXhrTransport(RestTemplate restTemplate);
/**
* Create a transport with a custom HTTP client.
*
* @param httpClient the HTTP client
*/
public RestTemplateXhrTransport(ClientHttpRequestFactory httpClient);
@Override
public List<TransportType> getTransportTypes();
@Override
public CompletableFuture<Void> connect(
TransportRequest transportRequest,
WebSocketHandler webSocketHandler);
}

XHR transport using Jetty's HTTP client. Provides better performance characteristics compared to RestTemplate-based transport, especially for high-throughput scenarios.
/**
* XHR transport implementation using Jetty's HttpClient for HTTP requests.
* Supports both streaming and polling XHR transports with async I/O.
* Requires Jetty HTTP client on the classpath.
*
* @since 4.1
*/
public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle {
/**
* Create a transport with default Jetty HttpClient.
*/
public JettyXhrTransport();
/**
* Create a transport with a custom Jetty HttpClient.
*
* @param httpClient the Jetty HttpClient to use
*/
public JettyXhrTransport(HttpClient httpClient);
/**
* Start the underlying Jetty HttpClient.
*/
@Override
public void start();
/**
* Stop the underlying Jetty HttpClient.
*/
@Override
public void stop();
/**
* Return whether the HttpClient is running.
*/
@Override
public boolean isRunning();
@Override
public List<TransportType> getTransportTypes();
@Override
public CompletableFuture<Void> connect(
TransportRequest transportRequest,
WebSocketHandler webSocketHandler);
}

Base class for XHR transport implementations.
/**
* Abstract base class for XHR-based transports. Provides common
* functionality for executing HTTP requests and handling responses.
*/
public abstract class AbstractXhrTransport implements XhrTransport {
/**
* Set whether to use streaming XHR transport. If false, uses polling.
* Default is true for streaming.
*
* @param xhrStreaming whether to use streaming
*/
public void setXhrStreaming(boolean xhrStreaming);
/**
* Whether XHR streaming is enabled.
*
* @return true if streaming is enabled
*/
public boolean isXhrStreaming();
@Override
public List<TransportType> getTransportTypes();
@Override
public String executeInfoRequest(URI infoUrl, HttpHeaders headers);
@Override
public CompletableFuture<Void> connect(
TransportRequest transportRequest,
WebSocketHandler webSocketHandler);
}

Interface for executing SockJS info requests.
/**
* Strategy for executing SockJS info requests to discover server
* capabilities and configuration. The info response indicates whether
* WebSocket is available, whether cookies are needed, etc.
*/
public interface InfoReceiver {
/**
* Execute a SockJS info request.
*
* @param infoUrl the info endpoint URL
* @param headers HTTP headers for the request
* @return the info response as JSON string
*/
String executeInfoRequest(URI infoUrl, HttpHeaders headers);
}

Represents a SockJS transport connection request.
/**
* Encapsulates information about a SockJS transport connection request.
*/
public interface TransportRequest {
/**
* Get the SockJS URL information.
*
* @return the SockJS URL info
*/
SockJsUrlInfo getSockJsUrlInfo();
/**
* Get headers for the WebSocket handshake.
*
* @return the handshake headers
*/
HttpHeaders getHandshakeHeaders();
/**
* Get headers for HTTP transport requests.
*
* @return the HTTP request headers
*/
HttpHeaders getHttpRequestHeaders();
/**
* Get the transport-specific URL.
*
* @return the transport URL
*/
URI getTransportUrl();
/**
* Get the user principal for the connection.
*
* @return the user principal, or null if none
*/
Principal getUser();
/**
* Establish a connection using the next available transport.
*
* @param handler the WebSocket handler
* @return a CompletableFuture for the WebSocket session
*/
CompletableFuture<WebSocketSession> connect(WebSocketHandler handler);
}

Encapsulates SockJS URL structure and generation.
/**
* Encapsulates a SockJS base URL and provides methods to generate
* transport-specific URLs according to the SockJS protocol.
*/
public class SockJsUrlInfo {
/**
* Create SockJS URL info from a base URL.
*
* @param baseUrl the base SockJS URL (e.g., http://localhost:8080/sockjs)
*/
public SockJsUrlInfo(URI baseUrl);
/**
* Get the info endpoint URL.
* Format: {baseUrl}/info
*
* @return the info URL
*/
public URI getInfoUrl();
/**
* Get the server and session URL.
* Format: {baseUrl}/{server}/{session}
*
* @return the SockJS URL with server and session IDs
*/
public URI getSockJsUrl();
/**
* Get the transport-specific URL.
* Format: {baseUrl}/{server}/{session}/{transport}
*
* @param transportType the transport type
* @return the transport URL
*/
public URI getTransportUrl(TransportType transportType);
}

/**
* Abstract base class for client-side SockJS sessions.
*/
public abstract class AbstractClientSockJsSession implements WebSocketSession {
/**
* Get the session ID.
*
* @return the session ID
*/
@Override
public String getId();
/**
* Get the connection URI.
*
* @return the URI
*/
@Override
public URI getUri();
/**
* Whether the session is currently open.
*
* @return true if open
*/
@Override
public boolean isOpen();
/**
* Send a WebSocket message.
*
* @param message the message to send
* @throws IOException if sending fails
*/
@Override
public void sendMessage(WebSocketMessage<?> message) throws IOException;
/**
* Close the session.
*
* @param status the close status
* @throws IOException if closing fails
*/
@Override
public void close(CloseStatus status) throws IOException;
}
/**
* WebSocket transport session for SockJS client.
*/
public class WebSocketClientSockJsSession extends AbstractClientSockJsSession {
// WebSocket-based session implementation
}
/**
* XHR transport session for SockJS client.
*/
public class XhrClientSockJsSession extends AbstractClientSockJsSession {
// XHR-based session implementation (streaming or polling)
}

import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.sockjs.client.*;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.handler.TextWebSocketHandler;
public class SockJsClientExample {
/**
 * Connects to a SockJS endpoint, sends a greeting once the connection is
 * established, keeps the session open for ten seconds and then closes it.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the connection cannot be established, the wait is
 *         interrupted, or closing the session fails
 */
public static void main(String[] args) throws Exception {
    // Create transports in order of preference
    List<Transport> transports = new ArrayList<>();
    transports.add(new WebSocketTransport(new StandardWebSocketClient()));
    transports.add(new RestTemplateXhrTransport());

    // Create SockJS client
    SockJsClient sockJsClient = new SockJsClient(transports);

    // Create handler
    WebSocketHandler handler = new TextWebSocketHandler() {
        @Override
        public void afterConnectionEstablished(WebSocketSession session)
                throws Exception {
            System.out.println("Connected via SockJS");
            session.sendMessage(new TextMessage("Hello from SockJS client!"));
        }

        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message)
                throws Exception {
            System.out.println("Received: " + message.getPayload());
        }
    };

    // Connect
    CompletableFuture<WebSocketSession> future = sockJsClient.execute(
            handler,
            "http://localhost:8080/ws"
    );

    // Wait for connection
    WebSocketSession session = future.get();
    try {
        // Keep connection open
        Thread.sleep(10000);
    } finally {
        // Close even when the sleep is interrupted so the session is not leaked
        session.close();
    }
}
}

import org.springframework.web.socket.sockjs.client.*;
import org.springframework.web.client.RestTemplate;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.apache.http.impl.client.HttpClients;
public class CustomSockJsClient {
/**
 * Builds a SockJS client whose XHR transport runs on a pooled Apache
 * HttpClient (at most 100 connections overall, 20 per route).
 *
 * @return the configured SockJS client
 */
public SockJsClient createClient() {
    // Pooled HTTP client wrapped in a request factory for the RestTemplate
    HttpComponentsClientHttpRequestFactory pooledFactory =
            new HttpComponentsClientHttpRequestFactory(
                    HttpClients.custom().setMaxConnTotal(100).setMaxConnPerRoute(20).build());

    // XHR transport on top of that RestTemplate, with streaming switched on
    RestTemplateXhrTransport xhr = new RestTemplateXhrTransport(new RestTemplate(pooledFactory));
    xhr.setXhrStreaming(true); // Enable streaming

    // WebSocket first, XHR second
    List<Transport> fallbackChain = Arrays.asList(
            new WebSocketTransport(new StandardWebSocketClient()),
            xhr);

    SockJsClient sockJsClient = new SockJsClient(fallbackChain);
    // Handshake headers that are also sent on the HTTP transport requests
    sockJsClient.setHttpHeaderNames("Authorization", "X-Client-Version");
    sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
    return sockJsClient;
}
}

import org.springframework.web.socket.WebSocketHttpHeaders;
public class AuthenticatedSockJsClient {
/**
 * Opens a SockJS connection that carries a bearer token and a client id in
 * the handshake headers. The outcome is reported asynchronously on
 * standard output / standard error.
 *
 * @param authToken the bearer token to send; must not be null or empty
 * @throws IllegalArgumentException if {@code authToken} is null or empty
 */
public void connectWithAuth(String authToken) {
    // Fail fast: concatenating a null token would send the literal "Bearer null"
    if (authToken == null || authToken.isEmpty()) {
        throw new IllegalArgumentException("authToken must not be null or empty");
    }

    List<Transport> transports = Arrays.asList(
            new WebSocketTransport(new StandardWebSocketClient()),
            new RestTemplateXhrTransport()
    );
    SockJsClient client = new SockJsClient(transports);

    // Add authentication header
    WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
    headers.add("Authorization", "Bearer " + authToken);
    headers.add("X-Client-ID", "client-123");

    WebSocketHandler handler = new TextWebSocketHandler() {
        @Override
        public void afterConnectionEstablished(WebSocketSession session)
                throws Exception {
            System.out.println("Authenticated connection established");
        }
    };

    // Connect with headers
    URI uri = URI.create("http://localhost:8080/ws");
    client.execute(handler, headers, uri)
            .thenAccept(session -> {
                System.out.println("Connected: " + session.getId());
            })
            .exceptionally(ex -> {
                System.err.println("Connection failed: " + ex.getMessage());
                return null;
            });
}
}

import org.springframework.web.socket.client.WebSocketConnectionManager;
@Configuration
public class SockJsClientConfig {
/**
 * Declares the SockJS client bean: WebSocket is tried first, then the
 * RestTemplate-based XHR transport.
 *
 * @return the SockJS client
 */
@Bean
public SockJsClient sockJsClient() {
    Transport webSocket = new WebSocketTransport(new StandardWebSocketClient());
    Transport xhr = new RestTemplateXhrTransport();

    List<Transport> fallbackChain = new ArrayList<>();
    fallbackChain.add(webSocket);
    fallbackChain.add(xhr);
    return new SockJsClient(fallbackChain);
}
/**
 * Declares a connection manager that connects {@link #myHandler()} to the
 * endpoint through the given client, with auto-startup enabled.
 *
 * @param client the SockJS client to connect with
 * @return the connection manager
 */
@Bean
public WebSocketConnectionManager connectionManager(SockJsClient client) {
    WebSocketConnectionManager autoConnecting =
            new WebSocketConnectionManager(client, myHandler(), "http://localhost:8080/ws");
    // Connect on startup
    autoConnecting.setAutoStartup(true);
    return autoConnecting;
}
/**
 * Declares the handler bean used for the client connection.
 *
 * @return a new {@code MyClientHandler}
 */
@Bean
public WebSocketHandler myHandler() {
return new MyClientHandler();
}
}

import org.springframework.web.socket.sockjs.client.*;
public class FallbackAwareClient {
/**
 * Connects with a WebSocket, streaming-XHR and polling-XHR fallback chain
 * and prints which kind of session was established. A failed connection
 * attempt is reported on standard error instead of being dropped.
 */
public void connectWithFallback() {
    // Create transports in fallback order
    List<Transport> transports = Arrays.asList(
            new WebSocketTransport(new StandardWebSocketClient()),
            createStreamingXhrTransport(),
            createPollingXhrTransport()
    );
    SockJsClient client = new SockJsClient(transports);

    WebSocketHandler handler = new TextWebSocketHandler() {
        @Override
        public void afterConnectionEstablished(WebSocketSession session)
                throws Exception {
            // Determine which transport was used
            String transportType = determineTransport(session);
            System.out.println("Connected using: " + transportType);
        }

        // Maps the concrete session class to a human-readable transport name
        private String determineTransport(WebSocketSession session) {
            if (session instanceof WebSocketClientSockJsSession) {
                return "WebSocket";
            } else if (session instanceof XhrClientSockJsSession) {
                return "XHR (Streaming or Polling)";
            }
            return "Unknown";
        }
    };

    // Without a failure callback a failed connection attempt would go unnoticed,
    // because nothing else observes the returned future.
    client.execute(handler, "http://localhost:8080/ws")
            .exceptionally(ex -> {
                System.err.println("Connection failed: " + ex.getMessage());
                return null;
            });
}
/**
 * Creates an XHR transport with streaming switched on.
 *
 * @return the streaming XHR transport
 */
private RestTemplateXhrTransport createStreamingXhrTransport() {
    RestTemplateXhrTransport streaming = new RestTemplateXhrTransport();
    boolean useStreaming = true;
    streaming.setXhrStreaming(useStreaming);
    return streaming;
}
/**
 * Creates an XHR transport with streaming switched off, i.e. polling.
 *
 * @return the polling XHR transport
 */
private RestTemplateXhrTransport createPollingXhrTransport() {
    RestTemplateXhrTransport polling = new RestTemplateXhrTransport();
    boolean useStreaming = false;
    polling.setXhrStreaming(useStreaming);
    return polling;
}
}

import org.springframework.web.socket.sockjs.client.InfoReceiver;
/**
 * {@link InfoReceiver} that fetches SockJS info responses with a
 * {@link RestTemplate} and caches them per info URL for five minutes.
 *
 * <p>All evictions run on one shared daemon timer thread, so caching many
 * URLs does not create a thread per entry and does not keep the JVM alive.
 */
public class CachingInfoReceiver implements InfoReceiver {

    /** Time a cached info response is kept, in milliseconds (5 minutes). */
    private static final long CACHE_TTL_MS = 5 * 60 * 1000L;

    private final Map<URI, String> infoCache = new ConcurrentHashMap<>();

    private final RestTemplate restTemplate = new RestTemplate();

    // One daemon timer for every eviction; a new Timer per entry would start
    // a non-daemon thread each time and block JVM shutdown.
    private final Timer evictionTimer = new Timer("sockjs-info-cache-eviction", true);

    /**
     * Returns the info response for the given URL, from the cache when
     * present, otherwise by issuing a GET request and caching the body.
     *
     * @param infoUrl the info endpoint URL
     * @param headers HTTP headers for the request
     * @return the response body, or an empty string if the response had no body
     */
    @Override
    public String executeInfoRequest(URI infoUrl, HttpHeaders headers) {
        // Check cache first
        String cached = infoCache.get(infoUrl);
        if (cached != null) {
            System.out.println("Using cached info for: " + infoUrl);
            return cached;
        }

        // Execute info request
        HttpEntity<Void> entity = new HttpEntity<>(headers);
        ResponseEntity<String> response = restTemplate.exchange(
                infoUrl,
                HttpMethod.GET,
                entity,
                String.class
        );
        String info = response.getBody();
        if (info == null) {
            // ConcurrentHashMap rejects null values, so an empty response is
            // returned as "" and deliberately not cached.
            return "";
        }

        // Cache for 5 minutes
        infoCache.put(infoUrl, info);
        scheduleEviction(infoUrl, CACHE_TTL_MS);
        return info;
    }

    /** Removes the cache entry for {@code infoUrl} after {@code delayMs} milliseconds. */
    private void scheduleEviction(URI infoUrl, long delayMs) {
        evictionTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                infoCache.remove(infoUrl);
            }
        }, delayMs);
    }
}
// Use custom InfoReceiver
SockJsClient client = new SockJsClient(transports);
client.setInfoReceiver(new CachingInfoReceiver());

public class ReconnectingSockJsClient {
private final SockJsClient client;
private final String url;
private final WebSocketHandler handler;
private WebSocketSession session;
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
/**
 * Creates a client that reconnects after the connection closes.
 *
 * @param client the SockJS client used to connect
 * @param url the endpoint URL
 * @param handler the application handler; it is wrapped so that a closed
 *        connection schedules a reconnect
 */
public ReconnectingSockJsClient(
        SockJsClient client,
        String url,
        WebSocketHandler handler) {
    WebSocketHandler reconnecting = createReconnectingHandler(handler);
    this.client = client;
    this.url = url;
    this.handler = reconnecting;
}
/**
 * Starts an asynchronous connection attempt. On success the session is
 * stored; on failure a reconnect is scheduled.
 */
public void connect() {
    client.execute(handler, url)
            .thenAccept(established -> {
                // Remember the session so disconnect() can close it
                this.session = established;
                System.out.println("Connected successfully");
            })
            .exceptionally(failure -> {
                System.err.println("Connection failed: " + failure.getMessage());
                scheduleReconnect();
                return null;
            });
}
/**
 * Wraps the delegate so that a closed connection is logged and followed
 * by a scheduled reconnect; transport errors are only logged.
 *
 * @param delegate the application handler to decorate
 * @return the decorating handler
 */
private WebSocketHandler createReconnectingHandler(WebSocketHandler delegate) {
    return new WebSocketHandlerDecorator(delegate) {

        @Override
        public void handleTransportError(WebSocketSession failedSession,
                Throwable cause) throws Exception {
            // Delegate first, then log
            super.handleTransportError(failedSession, cause);
            System.err.println("Transport error: " + cause.getMessage());
        }

        @Override
        public void afterConnectionClosed(WebSocketSession closedSession,
                CloseStatus closeStatus) throws Exception {
            // Delegate first, then log and arrange the reconnect
            super.afterConnectionClosed(closedSession, closeStatus);
            System.out.println("Connection closed: " + closeStatus);
            scheduleReconnect();
        }
    };
}
/**
 * Schedules a reconnect attempt in five seconds. Does nothing once the
 * scheduler has been shut down, so closing the session from
 * {@code disconnect()} does not fail inside the close callback.
 */
private void scheduleReconnect() {
    if (scheduler.isShutdown()) {
        // disconnect() was called: the close callback must not try to reconnect
        return;
    }
    try {
        scheduler.schedule(() -> {
            System.out.println("Attempting to reconnect...");
            connect();
        }, 5, TimeUnit.SECONDS);
    } catch (java.util.concurrent.RejectedExecutionException ignored) {
        // The scheduler was shut down between the check and the submission;
        // the client is disconnecting, so dropping the reconnect is intended.
    }
}
/**
 * Stops reconnecting and closes the current session if it is open.
 * A failure to close the session is reported on standard error.
 */
public void disconnect() {
    // shutdownNow() also cancels a reconnect that is already scheduled; with
    // shutdown() a pending delayed task would still run and reconnect.
    scheduler.shutdownNow();

    // Read the field once: it is assigned from the connect callback
    WebSocketSession current = session;
    if (current != null && current.isOpen()) {
        try {
            current.close();
        } catch (IOException e) {
            System.err.println("Failed to close session: " + e.getMessage());
        }
    }
}
}

import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.web.socket.messaging.WebSocketStompClient;
public class SockJsStompClient {
/**
 * Connects a STOMP client over SockJS and, once connected, subscribes to
 * {@code /topic/messages}, printing every received payload.
 */
public void connectToStompOverSockJs() {
    // SockJS client: WebSocket first, XHR as fallback
    SockJsClient sockJsClient = new SockJsClient(Arrays.asList(
            new WebSocketTransport(new StandardWebSocketClient()),
            new RestTemplateXhrTransport()));

    // STOMP client layered on the SockJS client
    WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
    stompClient.setMessageConverter(new MappingJackson2MessageConverter());

    StompSessionHandler sessionHandler = new StompSessionHandlerAdapter() {
        @Override
        public void afterConnected(StompSession session,
                StompHeaders connectedHeaders) {
            System.out.println("Connected to STOMP over SockJS");

            // Frame handler for the subscription: payloads are read as String
            StompFrameHandler printingHandler = new StompFrameHandler() {
                @Override
                public Type getPayloadType(StompHeaders headers) {
                    return String.class;
                }

                @Override
                public void handleFrame(StompHeaders headers, Object payload) {
                    System.out.println("Received: " + payload);
                }
            };
            session.subscribe("/topic/messages", printingHandler);
        }
    };

    // Connect
    stompClient.connectAsync("http://localhost:8080/ws", sessionHandler);
}
}

public class TransportMonitoringClient {
/**
 * Connects through transports that are wrapped to log every connection
 * attempt and its outcome.
 */
public void connectWithMonitoring() {
    Transport monitoredWebSocket =
            wrapTransport(new WebSocketTransport(new StandardWebSocketClient()), "WebSocket");
    Transport monitoredXhr = wrapTransport(new RestTemplateXhrTransport(), "XHR");

    List<Transport> monitored = Arrays.asList(monitoredWebSocket, monitoredXhr);
    SockJsClient client = new SockJsClient(monitored);
    client.execute(myHandler(), "http://localhost:8080/ws");
}
/**
 * Decorates a transport so that each connection attempt and its result
 * are printed together with the given display name.
 *
 * @param transport the transport to delegate to
 * @param name the name used in the log output
 * @return the logging decorator
 */
private Transport wrapTransport(Transport transport, String name) {
    return new Transport() {

        @Override
        public CompletableFuture<Void> connect(
                TransportRequest request,
                WebSocketHandler handler) {
            System.out.println("Attempting connection with: " + name);
            CompletableFuture<Void> attempt = transport.connect(request, handler);
            return attempt.whenComplete((ignoredResult, failure) -> {
                if (failure != null) {
                    System.err.println("Failed to connect with " + name +
                            ": " + failure.getMessage());
                    return;
                }
                System.out.println("Successfully connected using: " + name);
            });
        }

        @Override
        public List<TransportType> getTransportTypes() {
            // Report exactly what the wrapped transport supports
            return transport.getTransportTypes();
        }
    };
}
}

public class SockJsSessionManager {
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final SockJsClient client;
/**
 * Creates a session manager that opens its sessions with the given client.
 *
 * @param client the SockJS client used to open sessions
 */
public SockJsSessionManager(SockJsClient client) {
this.client = client;
}
/**
 * Returns the open session registered under {@code id}, or connects to
 * {@code url} and registers the new session under that id.
 *
 * @param id the key the session is stored under
 * @param url the endpoint to connect to when no open session exists
 * @return a future completed with the existing or the newly opened session
 */
public CompletableFuture<WebSocketSession> getOrCreateSession(
        String id, String url) {
    WebSocketSession known = sessions.get(id);
    boolean reusable = known != null && known.isOpen();
    if (!reusable) {
        // Nothing usable registered: connect and store the session on completion
        return client.execute(createHandler(id), url)
                .thenApply(opened -> {
                    sessions.put(id, opened);
                    return opened;
                });
    }
    return CompletableFuture.completedFuture(known);
}
/**
 * Creates the handler for the session stored under {@code id}. When the
 * connection closes, the session is removed from the registry, but only
 * if it is still the one registered under that id.
 *
 * @param id the key the session is stored under
 * @return the handler
 */
private WebSocketHandler createHandler(String id) {
    return new TextWebSocketHandler() {
        @Override
        public void afterConnectionClosed(WebSocketSession session,
                CloseStatus status) throws Exception {
            // Conditional remove: a late close of an old session must not
            // evict a newer session that was registered under the same id.
            sessions.remove(id, session);
            System.out.println("Session " + id + " closed");
        }
    };
}
/**
 * Closes every open session and empties the registry. A failure to close
 * one session is printed and does not stop the remaining ones from being
 * closed.
 */
public void closeAll() {
    for (WebSocketSession candidate : sessions.values()) {
        try {
            if (!candidate.isOpen()) {
                continue;
            }
            candidate.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    sessions.clear();
}
}