Spring Integration STOMP Support provides client-side support for the STOMP (Simple Text Oriented Messaging Protocol) within the Spring Integration framework. It enables Spring applications to connect to STOMP message brokers, subscribe to destinations, and send/receive messages asynchronously through Spring Integration channels.
Package Name: spring-integration-stomp
Package Type: Maven
Maven Coordinates: org.springframework.integration:spring-integration-stomp:7.0.0
Language: Java
Installation:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>7.0.0</version>
</dependency>

Gradle:
implementation 'org.springframework.integration:spring-integration-stomp:7.0.0'

import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.stomp.WebSocketStompSessionManager;
import org.springframework.integration.stomp.ReactorNettyTcpStompSessionManager;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.integration.stomp.outbound.StompMessageHandler;
import org.springframework.integration.stomp.support.StompHeaderMapper;
import org.springframework.integration.stomp.event.*;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.stomp.WebSocketStompSessionManager;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.integration.stomp.outbound.StompMessageHandler;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.integration.channel.DirectChannel;
@Configuration
public class StompConfiguration {
// WebSocket STOMP client: runs over the standard WebSocket transport and
// converts message payloads with a StringMessageConverter.
@Bean
public WebSocketStompClient stompClient() {
    StandardWebSocketClient transport = new StandardWebSocketClient();
    WebSocketStompClient webSocketStompClient = new WebSocketStompClient(transport);
    webSocketStompClient.setMessageConverter(new StringMessageConverter());
    return webSocketStompClient;
}
// Session manager that connects the injected WebSocket STOMP client to the broker.
// NOTE(review): the URL below is a sample value - point it at the host, port and
// path where your broker actually exposes its WebSocket STOMP endpoint.
@Bean
public StompSessionManager stompSessionManager(WebSocketStompClient stompClient) {
    String brokerUrl = "ws://localhost:61613/stomp";
    return new WebSocketStompSessionManager(stompClient, brokerUrl);
}
// Inbound side: subscribes to the "/topic/messages" destination through the given
// session manager and hands received messages to inboundChannel().
@Bean
public StompInboundChannelAdapter stompInbound(StompSessionManager sessionManager) {
    StompInboundChannelAdapter inbound = new StompInboundChannelAdapter(sessionManager, "/topic/messages");
    inbound.setOutputChannel(inboundChannel());
    return inbound;
}
// Outbound side: message handler that sends to the fixed "/topic/messages" destination
// using the given session manager.
// NOTE(review): nothing in this configuration subscribes the handler to a channel;
// wire it to an input channel before expecting messages to be sent - confirm intended setup.
@Bean
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler outbound = new StompMessageHandler(sessionManager);
    outbound.setDestination("/topic/messages");
    return outbound;
}
// Channel that the inbound adapter above uses as its output channel.
@Bean
public DirectChannel inboundChannel() {
    DirectChannel channel = new DirectChannel();
    return channel;
}
}

Spring Integration STOMP is built around several key components:
- StompSessionManager abstraction manages STOMP connections, automatic reconnection, and lifecycle. Two implementations: WebSocketStompSessionManager for WebSocket-based connections and ReactorNettyTcpStompSessionManager for TCP-based connections
- StompInboundChannelAdapter subscribes to STOMP destinations and produces Spring Integration messages to output channels with runtime subscription management
- StompMessageHandler consumes Spring Integration messages and sends them to STOMP destinations with support for static or dynamic destination routing
- StompHeaderMapper provides bidirectional conversion between Spring Integration message headers and STOMP frame headers
- Application events (StompSessionConnectedEvent, StompConnectionFailedEvent, StompReceiptEvent, StompExceptionEvent) for monitoring connection lifecycle and message delivery

The module provides two primary patterns: message reception (inbound) and message sending (outbound), both integrated with Spring Integration's channel-based messaging architecture.
Core STOMP session lifecycle management including connection, disconnection, automatic reconnection, and support for WebSocket and TCP transports.
// Primary session manager interface
public interface StompSessionManager {
void connect(StompSessionHandler handler);
void disconnect(StompSessionHandler handler);
boolean isConnected();
boolean isAutoReceiptEnabled();
}
// WebSocket-based session manager
public class WebSocketStompSessionManager extends AbstractStompSessionManager {
public WebSocketStompSessionManager(
WebSocketStompClient webSocketStompClient,
String url,
Object... uriVariables
);
public void setHandshakeHeaders(WebSocketHttpHeaders handshakeHeaders);
}
// TCP-based session manager using Reactor Netty
public class ReactorNettyTcpStompSessionManager extends AbstractStompSessionManager {
public ReactorNettyTcpStompSessionManager(
ReactorNettyTcpStompClient reactorNettyTcpStompClient
);
}
// Base abstract implementation with lifecycle and reconnection
public abstract class AbstractStompSessionManager
implements StompSessionManager, SmartLifecycle {
public AbstractStompSessionManager(StompClientSupport stompClient);
public void setConnectHeaders(StompHeaders connectHeaders);
public void setAutoReceipt(boolean autoReceipt);
public void setRecoveryInterval(int recoveryInterval);
public void setAutoStartup(boolean autoStartup);
public void setPhase(int phase);
public long getRecoveryInterval();
public void start();
public void stop();
public boolean isRunning();
public boolean isAutoStartup();
public int getPhase();
}

Subscribe to STOMP destinations and receive messages as Spring Integration messages with runtime subscription management and payload type conversion.
public class StompInboundChannelAdapter
extends MessageProducerSupport
implements ApplicationEventPublisherAware {
public StompInboundChannelAdapter(
StompSessionManager stompSessionManager,
String... destinations
);
public void setPayloadType(Class<?> payloadType);
public void setHeaderMapper(HeaderMapper<StompHeaders> headerMapper);
// Runtime subscription management
public String[] getDestinations();
public void addDestination(String... destination);
public void removeDestination(String... destination);
}

Send Spring Integration messages to STOMP destinations with support for static and dynamic destination routing.
public class StompMessageHandler
extends AbstractMessageHandler
implements ManageableLifecycle {
public StompMessageHandler(StompSessionManager stompSessionManager);
public void setDestination(String destination);
public void setDestinationExpression(Expression destinationExpression);
public void setHeaderMapper(HeaderMapper<StompHeaders> headerMapper);
public void setConnectTimeout(long connectTimeout);
public void start();
public void stop();
public boolean isRunning();
}

Bidirectional mapping between Spring Integration message headers and STOMP frame headers with configurable patterns and wildcards.
public class StompHeaderMapper implements HeaderMapper<StompHeaders> {
public static final String STOMP_INBOUND_HEADER_NAME_PATTERN = "STOMP_INBOUND_HEADERS";
public static final String STOMP_OUTBOUND_HEADER_NAME_PATTERN = "STOMP_OUTBOUND_HEADERS";
public void setInboundHeaderNames(String[] inboundHeaderNames);
public void setOutboundHeaderNames(String[] outboundHeaderNames);
public void fromHeaders(MessageHeaders headers, StompHeaders target);
public Map<String, Object> toHeaders(StompHeaders source);
}
// Header name constants with "stomp_" prefix
public abstract class IntegrationStompHeaders {
public static final String PREFIX = "stomp_";
public static final String RECEIPT = "stomp_receipt";
public static final String DESTINATION = "stomp_destination";
public static final String MESSAGE_ID = "stomp_message_id";
public static final String SUBSCRIPTION = "stomp_subscription";
// ... additional constants
}

Application events published for STOMP connection lifecycle, message receipts, and error conditions, enabling monitoring and custom error handling.
// Base event class
public abstract class StompIntegrationEvent extends IntegrationEvent {
public StompIntegrationEvent(Object source);
public StompIntegrationEvent(Object source, @Nullable Throwable cause);
}
// Connection success event
public class StompSessionConnectedEvent extends StompIntegrationEvent {
public StompSessionConnectedEvent(Object source);
}
// Connection failure event
public class StompConnectionFailedEvent extends StompIntegrationEvent {
public StompConnectionFailedEvent(Object source, @Nullable Throwable cause);
}
// Receipt confirmation or timeout event
public class StompReceiptEvent extends StompIntegrationEvent {
public StompReceiptEvent(
Object source,
@Nullable String destination,
@Nullable String receiptId,
StompCommand stompCommand,
boolean lost
);
@Nullable
public String getDestination();
@Nullable
public String getReceiptId();
public StompCommand getStompCommand();
public boolean isLost();
@Nullable
public Message<?> getMessage();
}
// Exception event
public class StompExceptionEvent extends StompIntegrationEvent {
public StompExceptionEvent(Object source, @Nullable Throwable cause);
}

XML namespace support for declarative STOMP adapter configuration with schema validation.
<!-- XML namespace declaration -->
<beans xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
<!-- Inbound channel adapter -->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="sessionManager"
channel="inboundChannel"
destinations="/topic/messages,/queue/updates"
error-channel="errorChannel"
payload-type="java.lang.String"
header-mapper="headerMapper"
send-timeout="5000"/>
<!-- Outbound channel adapter -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="sessionManager"
channel="outboundChannel"
destination="/topic/messages"
header-mapper="headerMapper"
order="1"/>
</beans>