Spring Integration STOMP provides XML namespace support for declarative configuration of STOMP adapters with schema validation and IDE autocomplete support.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
<!-- STOMP configuration here -->
</beans><!--
Configures a STOMP inbound channel adapter for receiving messages.
Attributes:
- id: Bean identifier (optional)
- stomp-session-manager: Reference to StompSessionManager bean (required)
- channel: Output message channel reference (required)
- destinations: Comma-separated STOMP destinations to subscribe (required)
- error-channel: Error message channel for exceptions (optional)
- send-timeout: Channel send timeout in milliseconds (optional)
- payload-type: Target payload type for deserialization (optional, default: java.lang.String)
- header-mapper: Reference to HeaderMapper bean (optional)
- mapped-headers: Comma-separated header patterns (optional)
-->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="sessionManager"
channel="inputChannel"
destinations="/topic/messages,/queue/orders"
error-channel="errorChannel"
send-timeout="5000"
payload-type="java.lang.String"
header-mapper="headerMapper"
mapped-headers="content-type, message-id, app-*"/><!--
Configures a STOMP outbound channel adapter for sending messages.
Attributes:
- id: Bean identifier (optional)
- stomp-session-manager: Reference to StompSessionManager bean (required)
- channel: Input message channel reference (required)
- destination: Static STOMP destination (optional, mutually exclusive with destination-expression)
- destination-expression: SpEL expression for dynamic destination (optional, mutually exclusive with destination)
- header-mapper: Reference to HeaderMapper bean (optional)
- mapped-headers: Comma-separated header patterns (optional)
- order: Handler invocation order (optional)
Nested elements:
- <int:poller>: Polling configuration (optional)
- <int:request-handler-advice-chain>: Advice chain (optional)
-->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="sessionManager"
channel="outputChannel"
destination="/topic/messages"
header-mapper="headerMapper"
mapped-headers="content-type, stomp_destination"
order="1"/><?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
<!-- WebSocket STOMP Client -->
<bean id="webSocketClient"
class="org.springframework.web.socket.client.standard.StandardWebSocketClient"/>
<bean id="stompClient"
class="org.springframework.web.socket.messaging.WebSocketStompClient">
<constructor-arg ref="webSocketClient"/>
<property name="messageConverter">
<bean class="org.springframework.messaging.converter.StringMessageConverter"/>
</property>
</bean>
<!-- STOMP Session Manager -->
<bean id="stompSessionManager"
class="org.springframework.integration.stomp.WebSocketStompSessionManager">
<constructor-arg ref="stompClient"/>
<constructor-arg value="ws://localhost:61613/stomp"/>
<property name="autoReceipt" value="true"/>
<property name="recoveryInterval" value="10000"/>
<property name="autoStartup" value="true"/>
</bean>
<!-- Channels -->
<int:channel id="stompInputChannel"/>
<int:channel id="stompOutputChannel"/>
<int:channel id="errorChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<!-- Inbound Adapter -->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="stompSessionManager"
channel="stompInputChannel"
destinations="/topic/messages,/queue/orders"
error-channel="errorChannel"
payload-type="java.lang.String"
send-timeout="5000"/>
<!-- Outbound Adapter -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="stompOutputChannel"
destination="/topic/messages"/>
<!-- Message Handler -->
<int:service-activator
input-channel="stompInputChannel"
ref="messageProcessor"
method="process"/>
<bean id="messageProcessor" class="com.example.MessageProcessor"/>
</beans><!-- WebSocket-based STOMP connection -->
<bean id="webSocketClient"
class="org.springframework.web.socket.client.standard.StandardWebSocketClient"/>
<bean id="stompClient"
class="org.springframework.web.socket.messaging.WebSocketStompClient">
<constructor-arg ref="webSocketClient"/>
<property name="messageConverter">
<bean class="org.springframework.messaging.converter.MappingJackson2MessageConverter"/>
</property>
</bean>
<bean id="stompSessionManager"
class="org.springframework.integration.stomp.WebSocketStompSessionManager">
<constructor-arg ref="stompClient"/>
<constructor-arg value="ws://broker.example.com:61613/stomp"/>
<property name="connectHeaders">
<bean class="org.springframework.messaging.simp.stomp.StompHeaders">
<property name="login" value="user"/>
<property name="passcode" value="password"/>
<property name="host" value="virtual-host"/>
</bean>
</property>
<property name="autoReceipt" value="true"/>
<property name="recoveryInterval" value="15000"/>
<property name="autoStartup" value="true"/>
<property name="phase" value="100"/>
</bean><!-- TCP-based STOMP connection using Reactor Netty -->
<bean id="tcpClient" class="reactor.netty.tcp.TcpClient" factory-method="create">
<!-- Configure host and port using builder pattern -->
</bean>
<bean id="reactorTcpStompClient"
class="org.springframework.messaging.tcp.reactor.ReactorNettyTcpStompClient">
<constructor-arg ref="tcpClient"/>
<property name="messageConverter">
<bean class="org.springframework.messaging.converter.StringMessageConverter"/>
</property>
</bean>
<bean id="stompSessionManager"
class="org.springframework.integration.stomp.ReactorNettyTcpStompSessionManager">
<constructor-arg ref="reactorTcpStompClient"/>
<property name="autoReceipt" value="true"/>
<property name="recoveryInterval" value="10000"/>
</bean><!-- Subscribe to multiple destinations -->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="stompSessionManager"
channel="inputChannel"
destinations="/topic/news,/topic/alerts,/queue/orders,/queue/notifications"
error-channel="errorChannel"
payload-type="java.lang.String"/><!-- Receive messages as custom domain objects -->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="stompSessionManager"
channel="orderChannel"
destinations="/queue/orders"
payload-type="com.example.Order"
send-timeout="10000"/>
<!-- Requires appropriate message converter on STOMP client -->
<bean id="stompClient"
class="org.springframework.web.socket.messaging.WebSocketStompClient">
<constructor-arg ref="webSocketClient"/>
<property name="messageConverter">
<bean class="org.springframework.messaging.converter.MappingJackson2MessageConverter"/>
</property>
</bean><!-- Custom header mapper bean -->
<bean id="customHeaderMapper"
class="org.springframework.integration.stomp.support.StompHeaderMapper">
<property name="inboundHeaderNames">
<array>
<value>content-type</value>
<value>message-id</value>
<value>timestamp</value>
<value>correlation-id</value>
<value>app-*</value>
</array>
</property>
</bean>
<!-- Inbound adapter using custom header mapper -->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="stompSessionManager"
channel="inputChannel"
destinations="/topic/messages"
header-mapper="customHeaderMapper"/><!-- Use mapped-headers attribute instead of separate header mapper bean -->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="stompSessionManager"
channel="inputChannel"
destinations="/topic/messages"
mapped-headers="content-type, message-id, correlation-id, app-*"/><!-- Send all messages to a static destination -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="outputChannel"
destination="/topic/broadcasts"/><!-- Route messages based on header value -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="outputChannel"
destination-expression="headers['destination']"/>
<!-- Route based on payload type -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="outputChannel"
destination-expression="payload instanceof T(com.example.Order) ? '/queue/orders' : '/topic/general'"/>
<!-- Route based on message property -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="outputChannel"
destination-expression="'/topic/' + headers['category']"/><!-- Custom header mapper for outbound messages -->
<bean id="outboundHeaderMapper"
class="org.springframework.integration.stomp.support.StompHeaderMapper">
<property name="outboundHeaderNames">
<array>
<value>content-type</value>
<value>stomp_destination</value>
<value>stomp_receipt</value>
<value>priority</value>
<value>correlation-id</value>
</array>
</property>
</bean>
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="outputChannel"
destination="/topic/messages"
header-mapper="outboundHeaderMapper"/><!-- Pollable channel -->
<int:channel id="pollableChannel">
<int:queue capacity="100"/>
</int:channel>
<!-- Outbound adapter with poller for rate limiting -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="pollableChannel"
destination="/topic/messages">
<int:poller fixed-rate="1000" max-messages-per-poll="10"/>
</int-stomp:outbound-channel-adapter><!-- Retry advice bean -->
<bean id="retryAdvice"
class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="retryTemplate">
<bean class="org.springframework.retry.support.RetryTemplate">
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3"/>
</bean>
</property>
</bean>
</property>
</bean>
<!-- Outbound adapter with advice chain -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="outputChannel"
destination="/topic/messages">
<int:request-handler-advice-chain>
<ref bean="retryAdvice"/>
</int:request-handler-advice-chain>
</int-stomp:outbound-channel-adapter><!-- Shared session manager -->
<bean id="sharedSessionManager"
class="org.springframework.integration.stomp.WebSocketStompSessionManager">
<constructor-arg ref="stompClient"/>
<constructor-arg value="ws://localhost:61613/stomp"/>
<property name="autoReceipt" value="true"/>
</bean>
<!-- Multiple inbound adapters -->
<int-stomp:inbound-channel-adapter
id="ordersInbound"
stomp-session-manager="sharedSessionManager"
channel="ordersChannel"
destinations="/queue/orders"/>
<int-stomp:inbound-channel-adapter
id="notificationsInbound"
stomp-session-manager="sharedSessionManager"
channel="notificationsChannel"
destinations="/topic/notifications"/>
<!-- Multiple outbound adapters -->
<int-stomp:outbound-channel-adapter
id="ordersOutbound"
stomp-session-manager="sharedSessionManager"
channel="ordersOutputChannel"
destination="/queue/orders"/>
<int-stomp:outbound-channel-adapter
id="broadcastsOutbound"
stomp-session-manager="sharedSessionManager"
channel="broadcastsOutputChannel"
destination="/topic/broadcasts"/><!-- Complete flow: receive, transform, process, send -->
<int:channel id="rawInputChannel"/>
<int:channel id="processedChannel"/>
<int:channel id="outputChannel"/>
<!-- Receive messages -->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="stompSessionManager"
channel="rawInputChannel"
destinations="/topic/raw-data"/>
<!-- Transform messages -->
<int:transformer
input-channel="rawInputChannel"
output-channel="processedChannel"
ref="dataTransformer"
method="transform"/>
<!-- Process messages -->
<int:service-activator
input-channel="processedChannel"
output-channel="outputChannel"
ref="dataProcessor"
method="process"/>
<!-- Send results -->
<int-stomp:outbound-channel-adapter
id="stompOutbound"
stomp-session-manager="stompSessionManager"
channel="outputChannel"
destination="/topic/results"/>
<bean id="dataTransformer" class="com.example.DataTransformer"/>
<bean id="dataProcessor" class="com.example.DataProcessor"/><!-- Error channel for inbound adapter -->
<int:channel id="stompErrorChannel">
<int:interceptors>
<int:wire-tap channel="errorLogger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter
id="errorLogger"
level="ERROR"
log-full-message="true"/>
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="stompSessionManager"
channel="inputChannel"
destinations="/topic/messages"
error-channel="stompErrorChannel"/>
<!-- Error handler -->
<int:service-activator
input-channel="stompErrorChannel"
ref="errorHandler"
method="handle"/>
<bean id="errorHandler" class="com.example.ErrorHandler"/><!-- STOMP CONNECT headers with authentication -->
<bean id="connectHeaders"
class="org.springframework.messaging.simp.stomp.StompHeaders">
<property name="login" value="${stomp.username}"/>
<property name="passcode" value="${stomp.password}"/>
<property name="host" value="${stomp.virtualHost}"/>
<property name="heartbeat">
<array>
<value>10000</value> <!-- Outgoing heartbeat: 10 seconds -->
<value>10000</value> <!-- Incoming heartbeat: 10 seconds -->
</array>
</property>
</bean>
<bean id="stompSessionManager"
class="org.springframework.integration.stomp.WebSocketStompSessionManager">
<constructor-arg ref="stompClient"/>
<constructor-arg value="${stomp.url}"/>
<property name="connectHeaders" ref="connectHeaders"/>
<property name="autoReceipt" value="true"/>
<property name="recoveryInterval" value="${stomp.recoveryInterval}"/>
</bean>
<!-- Properties file -->
<!-- stomp.url=ws://localhost:61613/stomp -->
<!-- stomp.username=admin -->
<!-- stomp.password=secret -->
<!-- stomp.virtualHost=/ -->
<!-- stomp.recoveryInterval=10000 --><int:channel id="requestChannel"/>
<int:channel id="replyChannel"/>
<!-- Send requests -->
<int-stomp:outbound-channel-adapter
id="requestSender"
stomp-session-manager="stompSessionManager"
channel="requestChannel"
destination="/queue/requests"/>
<!-- Receive replies -->
<int-stomp:inbound-channel-adapter
id="replyReceiver"
stomp-session-manager="stompSessionManager"
channel="replyChannel"
destinations="/user/queue/replies"/>
<!-- Gateway for request-reply -->
<int:gateway
id="requestReplyGateway"
service-interface="com.example.RequestReplyService"
default-request-channel="requestChannel"
default-reply-channel="replyChannel"
default-reply-timeout="30000"/><!-- Publisher -->
<int:channel id="publishChannel"/>
<int-stomp:outbound-channel-adapter
id="publisher"
stomp-session-manager="stompSessionManager"
channel="publishChannel"
destination="/topic/broadcasts"/>
<!-- Multiple Subscribers -->
<int:channel id="subscriber1Channel"/>
<int:channel id="subscriber2Channel"/>
<int-stomp:inbound-channel-adapter
id="subscriber1"
stomp-session-manager="sessionManager1"
channel="subscriber1Channel"
destinations="/topic/broadcasts"/>
<int-stomp:inbound-channel-adapter
id="subscriber2"
stomp-session-manager="sessionManager2"
channel="subscriber2Channel"
destinations="/topic/broadcasts"/><!-- Producer -->
<int:channel id="taskChannel"/>
<int-stomp:outbound-channel-adapter
id="taskProducer"
stomp-session-manager="stompSessionManager"
channel="taskChannel"
destination="/queue/tasks"/>
<!-- Multiple Consumers -->
<int:channel id="worker1Channel">
<int:queue capacity="50"/>
</int:channel>
<int:channel id="worker2Channel">
<int:queue capacity="50"/>
</int:channel>
<int-stomp:inbound-channel-adapter
id="worker1"
stomp-session-manager="sessionManager1"
channel="worker1Channel"
destinations="/queue/tasks"/>
<int-stomp:inbound-channel-adapter
id="worker2"
stomp-session-manager="sessionManager2"
channel="worker2Channel"
destinations="/queue/tasks"/>
<!-- Workers with pollers -->
<int:service-activator
input-channel="worker1Channel"
ref="taskProcessor"
method="process">
<int:poller fixed-rate="100"/>
</int:service-activator>
<int:service-activator
input-channel="worker2Channel"
ref="taskProcessor"
method="process">
<int:poller fixed-rate="100"/>
</int:service-activator>The STOMP namespace supports multiple schema versions:
Recommended schema location (version-less):
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsdVersion-specific schema location:
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp-5.0.xsd@Bean
public StompInboundChannelAdapter inbound(StompSessionManager sessionManager) {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(sessionManager, "/topic/messages");
adapter.setOutputChannel(inputChannel());
adapter.setPayloadType(String.class);
return adapter;
}<int-stomp:inbound-channel-adapter
id="inbound"
stomp-session-manager="sessionManager"
channel="inputChannel"
destinations="/topic/messages"
payload-type="java.lang.String"/>XML configuration does not support:
addDestination/removeDestination)For these scenarios, use Java configuration or Spring Integration DSL.