STOMP session management provides connection lifecycle control, automatic reconnection on failures, and support for multiple transport protocols (WebSocket and TCP).
Core interface for managing STOMP session lifecycle and connection state.
/**
* Contract to manage STOMP session and connection/disconnection lifecycle.
* Implementations handle establishing connections, reconnection on failure,
* and maintaining session state.
*
* @since 4.2
*/
public interface StompSessionManager {
/**
* Connect to the STOMP broker and register a session handler.
* If already connected, the handler is added to the composite handler.
*
* @param handler the StompSessionHandler to register
*/
void connect(StompSessionHandler handler);
/**
* Disconnect from the STOMP broker and unregister a session handler.
* If this is the last handler, the connection is closed.
*
* @param handler the StompSessionHandler to unregister
*/
void disconnect(StompSessionHandler handler);
/**
* Check if currently connected to the STOMP broker.
*
* @return true if connected, false otherwise
*/
boolean isConnected();
/**
* Check if auto-receipt is enabled for STOMP frames.
* When enabled, a {@code receipt} header is automatically added to outgoing
* frames so that the broker acknowledges them with a RECEIPT frame.
*
* @return true if auto-receipt is enabled
*/
boolean isAutoReceiptEnabled();
}

Usage Example:
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
// stompSessionManager is an existing StompSessionManager instance
// (for example an injected bean); it is not created in this snippet.
// Create a custom session handler
StompSessionHandler handler = new StompSessionHandlerAdapter() {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        System.out.println("Connected to STOMP broker");
    }
};
// Connect using session manager
stompSessionManager.connect(handler);
// Check connection status
if (stompSessionManager.isConnected()) {
    System.out.println("STOMP connection is active");
}
// Disconnect when done
stompSessionManager.disconnect(handler);

Base implementation providing connection management, automatic reconnection, lifecycle support, and event publishing. All concrete session manager implementations extend this class.
/**
* Abstract base implementation of StompSessionManager with lifecycle support,
* automatic reconnection on failures, and event publishing capabilities.
* Subclasses must implement the doConnect() template method.
*
* @since 4.2
*/
public abstract class AbstractStompSessionManager
implements StompSessionManager,
ApplicationEventPublisherAware,
SmartLifecycle,
DisposableBean,
BeanNameAware {
/**
* Create a session manager with the specified STOMP client.
*
* @param stompClient the StompClientSupport implementation (WebSocketStompClient
* or ReactorNettyTcpStompClient)
*/
public AbstractStompSessionManager(StompClientSupport stompClient);
/**
* Set custom headers to include in the STOMP CONNECT frame.
* Useful for authentication credentials, virtual host specification, etc.
*
* @param connectHeaders the StompHeaders to include in CONNECT frame
*/
public void setConnectHeaders(StompHeaders connectHeaders);
/**
* Enable or disable automatic {@code receipt} header generation for STOMP frames.
* When enabled, the client will request receipts for frames and wait for
* broker confirmation (a RECEIPT frame).
*
* @param autoReceipt true to enable auto-receipt (default: false)
*/
public void setAutoReceipt(boolean autoReceipt);
/**
* Set the interval (in milliseconds) to wait before attempting reconnection
* after connection failure or disconnection.
* Note that this setter takes an {@code int} while
* {@link #getRecoveryInterval()} returns a {@code long}.
*
* @param recoveryInterval reconnection interval in milliseconds (default: 10000)
*/
public void setRecoveryInterval(int recoveryInterval);
/**
* Configure whether this session manager should automatically start
* when the ApplicationContext is refreshed.
* NOTE(review): confirm the stated default against the implementation;
* SmartLifecycle's own default is true, but this class may initialise the
* flag differently.
*
* @param autoStartup true for automatic startup (default: true)
*/
public void setAutoStartup(boolean autoStartup);
/**
* Set the lifecycle phase for ordered startup/shutdown.
* Lower values start earlier and stop later.
*
* @param phase the lifecycle phase value
*/
public void setPhase(int phase);
/**
* Get the configured recovery interval for reconnection attempts.
* The value is exposed as a {@code long} even though
* {@link #setRecoveryInterval(int)} accepts an {@code int}.
*
* @return recovery interval in milliseconds
*/
public long getRecoveryInterval();
/**
* Check if auto-startup is enabled.
*
* @return true if auto-startup is enabled
*/
public boolean isAutoStartup();
/**
* Check if the session manager is currently running.
*
* @return true if running (started and not stopped)
*/
public boolean isRunning();
/**
* Get the configured lifecycle phase.
*
* @return the phase value
*/
public int getPhase();
/**
* Start the session manager and initiate connection to STOMP broker.
* Part of SmartLifecycle interface.
*/
public void start();
/**
* Stop the session manager and disconnect from STOMP broker.
* Part of SmartLifecycle interface.
*/
public void stop();
/**
* Clean up resources when bean is destroyed.
* Part of DisposableBean interface.
*/
public void destroy();
/**
* Connect to STOMP broker with the specified handler.
* Implements StompSessionManager interface.
*
* @param handler the StompSessionHandler to register
*/
public void connect(StompSessionHandler handler);
/**
* Disconnect from STOMP broker and remove the specified handler.
* Implements StompSessionManager interface.
*
* @param handler the StompSessionHandler to unregister
*/
public void disconnect(StompSessionHandler handler);
/**
* Check if currently connected to STOMP broker.
* Implements StompSessionManager interface.
*
* @return true if connected
*/
public boolean isConnected();
/**
* Check if auto-receipt is enabled.
* Implements StompSessionManager interface.
*
* @return true if auto-receipt is enabled
*/
public boolean isAutoReceiptEnabled();
/**
* Get the configured CONNECT headers.
*
* @return StompHeaders for CONNECT frame
*/
protected StompHeaders getConnectHeaders();
/**
* Template method for subclasses to implement connection logic.
* Called when connection or reconnection is needed.
* The result is asynchronous: the session becomes available only when the
* returned future completes.
*
* @param handler the composite StompSessionHandler
* @return CompletableFuture that completes with the StompSession
*/
protected abstract CompletableFuture<StompSession> doConnect(StompSessionHandler handler);
}

Usage Example:
import org.springframework.integration.stomp.WebSocketStompSessionManager;
import org.springframework.messaging.simp.stomp.StompHeaders;
// Create session manager
// (stompClient is a previously configured WebSocketStompClient; it is not created in this snippet)
AbstractStompSessionManager sessionManager =
new WebSocketStompSessionManager(stompClient, "ws://localhost:61613/stomp");
// Configure CONNECT headers (e.g., for authentication)
StompHeaders connectHeaders = new StompHeaders();
connectHeaders.setLogin("username");
connectHeaders.setPasscode("password");
connectHeaders.setHost("example.com");
sessionManager.setConnectHeaders(connectHeaders);
// Enable auto-receipt for reliable delivery confirmation
sessionManager.setAutoReceipt(true);
// Configure reconnection interval (15 seconds, expressed in milliseconds)
sessionManager.setRecoveryInterval(15000);
// Configure lifecycle
sessionManager.setAutoStartup(true);
sessionManager.setPhase(100);
// Start the session manager (or wait for auto-startup)
sessionManager.start();
// Check status
// NOTE: doConnect() returns a CompletableFuture, so the connection may still
// be in progress immediately after start() - confirm before relying on this value.
System.out.println("Connected: " + sessionManager.isConnected());
System.out.println("Recovery interval: " + sessionManager.getRecoveryInterval() + "ms");

WebSocket-based STOMP session manager for connecting to message brokers over WebSocket protocol. Suitable for web applications and scenarios where WebSocket support is available.
/**
* WebSocket-based STOMP session manager implementation.
* Uses Spring's WebSocketStompClient for connection management.
*
* @since 4.2
*/
public class WebSocketStompSessionManager extends AbstractStompSessionManager {
/**
* Create a WebSocket-based STOMP session manager.
*
* @param webSocketStompClient the configured WebSocketStompClient
* @param url the WebSocket URL (e.g., "ws://localhost:61613/stomp"); may be a
* URI template containing {@code {placeholder}} variables
* @param uriVariables optional URI template variables for URL expansion
*/
public WebSocketStompSessionManager(
WebSocketStompClient webSocketStompClient,
String url,
Object... uriVariables
);
/**
* Set custom headers to include in the WebSocket handshake request.
* Useful for authentication tokens, custom headers, etc.
* These are distinct from the STOMP CONNECT frame headers configured via
* {@code setConnectHeaders(StompHeaders)} on the base class.
*
* @param handshakeHeaders the WebSocketHttpHeaders to include in handshake
*/
public void setHandshakeHeaders(WebSocketHttpHeaders handshakeHeaders);
}

Usage Example:
import org.springframework.integration.stomp.WebSocketStompSessionManager;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.messaging.converter.StringMessageConverter;
// NOTE: HttpHeaders is not referenced below; WebSocketHttpHeaders is the type actually used.
import org.springframework.http.HttpHeaders;
import org.springframework.web.socket.WebSocketHttpHeaders;
// Create WebSocket STOMP client
WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
// Payloads are converted to and from String
stompClient.setMessageConverter(new StringMessageConverter());
// Create session manager with WebSocket URL
WebSocketStompSessionManager sessionManager =
new WebSocketStompSessionManager(stompClient, "ws://localhost:61613/stomp");
// Add custom handshake headers (e.g., authentication token)
// These go on the WebSocket handshake request, not on the STOMP CONNECT frame
WebSocketHttpHeaders handshakeHeaders = new WebSocketHttpHeaders();
handshakeHeaders.add("Authorization", "Bearer token123");
sessionManager.setHandshakeHeaders(handshakeHeaders);
// Configure session
sessionManager.setAutoReceipt(true);
// Reconnection interval in milliseconds
sessionManager.setRecoveryInterval(10000);
// Start connection
sessionManager.start();

WebSocket URL with URI Variables:
// URL with placeholders
String urlTemplate = "ws://{host}:{port}/stomp";
// Create with URI variables
// (presumably expanded in order: {host} -> "localhost", {port} -> 61613 - confirm;
// stompClient is a previously configured WebSocketStompClient)
WebSocketStompSessionManager sessionManager =
new WebSocketStompSessionManager(stompClient, urlTemplate, "localhost", 61613);

TCP-based STOMP session manager using Reactor Netty for non-blocking I/O. Provides better performance for server-to-server communication and scenarios where WebSocket is not required.
/**
* TCP-based STOMP session manager implementation using Reactor Netty.
* Provides reactive, non-blocking connection management over raw TCP.
* Configuration setters and lifecycle methods are inherited from
* AbstractStompSessionManager; this listing declares only the constructor.
*
* @since 5.0
*/
public class ReactorNettyTcpStompSessionManager extends AbstractStompSessionManager {
/**
* Create a TCP-based STOMP session manager using Reactor Netty.
*
* @param reactorNettyTcpStompClient the configured ReactorNettyTcpStompClient
*/
public ReactorNettyTcpStompSessionManager(
ReactorNettyTcpStompClient reactorNettyTcpStompClient
);
}

Usage Example:
import org.springframework.integration.stomp.ReactorNettyTcpStompSessionManager;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.ReactorNettyTcpStompClient;
// Create the STOMP client for the broker's host and port.
// ReactorNettyTcpStompClient lives in org.springframework.messaging.simp.stomp and
// does not accept a raw reactor.netty.tcp.TcpClient; its constructors take either
// host/port or a TcpOperations<byte[]> (see the advanced example below).
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("localhost", 61613);
stompClient.setMessageConverter(new StringMessageConverter());
// Create TCP-based session manager
ReactorNettyTcpStompSessionManager sessionManager =
    new ReactorNettyTcpStompSessionManager(stompClient);
// Configure session
sessionManager.setAutoReceipt(true);
sessionManager.setRecoveryInterval(10000);
// Start connection
sessionManager.start();

Advanced TCP Configuration:
import io.netty.channel.ChannelOption;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
// Configure TCP client with advanced options.
// The underlying reactor.netty.tcp.TcpClient is customised through the configurer
// function; the resulting ReactorNettyTcpClient is the TcpOperations<byte[]> that
// ReactorNettyTcpStompClient accepts.
ReactorNettyTcpClient<byte[]> tcpClient = new ReactorNettyTcpClient<>(
    client -> client
        .host("broker.example.com")
        .port(61613)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .secure(), // Enable TLS
    new StompReactorNettyCodec());
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient(tcpClient);
ReactorNettyTcpStompSessionManager sessionManager =
    new ReactorNettyTcpStompSessionManager(stompClient);

import org.springframework.messaging.simp.stomp.StompHeaders;
// Configure authentication headers
// (sent in the STOMP CONNECT frame; the host header can name a virtual host)
StompHeaders connectHeaders = new StompHeaders();
connectHeaders.setLogin("user");
connectHeaders.setPasscode("password");
connectHeaders.setHost("virtual-host.example.com");
sessionManager.setConnectHeaders(connectHeaders);

// Configure automatic reconnection with 5-second interval
// The interval is expressed in milliseconds
sessionManager.setRecoveryInterval(5000);
// The session manager will automatically attempt reconnection
// on connection failures, publishing StompConnectionFailedEvent
// for each failed attempt

import org.springframework.context.SmartLifecycle;
// Session managers implement SmartLifecycle for container-managed lifecycle
sessionManager.setAutoStartup(true); // Start automatically with context
// Lower phase values start earlier and stop later
sessionManager.setPhase(0); // Start early in lifecycle
// Manual lifecycle control
sessionManager.start(); // Start connection
sessionManager.stop(); // Stop connection gracefully
// Check running state
if (sessionManager.isRunning()) {
// Session manager is active
}

// Enable auto-receipt for delivery confirmation
sessionManager.setAutoReceipt(true);
// When enabled, the session manager adds RECEIPT headers to frames
// and publishes StompReceiptEvent when receipts are received
// or StompReceiptEvent with lost=true when timeouts occur/**
* STOMP frame headers (from Spring Framework).
* Represents headers in STOMP protocol frames.
* Only the setters used for the CONNECT frame are listed here.
*/
public class StompHeaders {
// Login name presented to the broker in the CONNECT frame
public void setLogin(String login);
// Passcode (password) presented to the broker in the CONNECT frame
public void setPasscode(String passcode);
// Host header; can be used to select a broker virtual host
public void setHost(String host);
// NOTE(review): presumably a two-element array of heartbeat intervals in
// milliseconds (outgoing, incoming) - confirm against the Spring Javadoc
public void setHeartbeat(long[] heartbeat);
// Additional methods for other STOMP headers
}

/**
* Represents an active STOMP session (from Spring Framework).
* Provides methods for subscribing, sending, and session management.
*/
public interface StompSession {
// Returns StompSession.Subscription, a Receiptable that can also be used to
// unsubscribe from the destination.
Subscription subscribe(String destination, StompFrameHandler handler);
// Returns a Receiptable for tracking the broker's receipt, when one is requested.
Receiptable send(String destination, Object payload);
void disconnect();
String getSessionId();
boolean isConnected();
}

/**
* Handler for STOMP session lifecycle events (from Spring Framework).
* Implement this interface or extend StompSessionHandlerAdapter.
*/
public interface StompSessionHandler {
// Called once the session has been established with the broker
void afterConnected(StompSession session, StompHeaders connectedHeaders);
// NOTE(review): presumably invoked when processing a frame raises an exception;
// command/headers/payload describe that frame - confirm against the Spring Javadoc
void handleException(StompSession session, StompCommand command,
StompHeaders headers, byte[] payload, Throwable exception);
// NOTE(review): presumably invoked for low-level transport failures such as a
// lost connection - confirm against the Spring Javadoc
void handleTransportError(StompSession session, Throwable exception);
// NOTE(review): in Spring Framework the two methods below appear to be inherited
// from StompFrameHandler rather than declared here - confirm
Type getPayloadType(StompHeaders headers);
void handleFrame(StompHeaders headers, Object payload);
}