The StompInboundChannelAdapter subscribes to STOMP destinations and converts received STOMP frames into Spring Integration messages sent to an output channel. It supports runtime subscription management, payload type conversion, and header mapping.
Message producer that subscribes to one or more STOMP destinations and produces Spring Integration messages.
/**
* Message producer endpoint that subscribes to STOMP destinations
* and produces messages to a Spring Integration output channel.
* Supports runtime subscription management via JMX.
*
* Thread Safety:
* - Thread-safe for concurrent addDestination/removeDestination calls
* - Message production to output channel is thread-safe
* - Subscription management is synchronized
*
* @since 4.2
*/
public class StompInboundChannelAdapter
extends MessageProducerSupport
implements ApplicationEventPublisherAware {
/**
* Create an inbound channel adapter with initial destinations.
*
* @param stompSessionManager the StompSessionManager for connection management (must not be null)
* @param destinations initial STOMP destinations to subscribe (can be empty array)
* (can be modified at runtime)
* @throws IllegalArgumentException if stompSessionManager is null
*/
public StompInboundChannelAdapter(
StompSessionManager stompSessionManager,
String... destinations
);
/**
* Set the expected payload type for message deserialization.
* The STOMP message payload will be converted to this type.
* Must be set before adapter is started.
*
* @param payloadType the target payload type (default: String.class, must not be null)
* @throws IllegalArgumentException if payloadType is null
*/
public void setPayloadType(Class<?> payloadType);
/**
* Set a custom header mapper for converting STOMP headers to
* Spring Integration message headers.
* Must be set before adapter is started.
*
* @param headerMapper the HeaderMapper implementation (can be null to use default)
* (default: StompHeaderMapper)
*/
public void setHeaderMapper(HeaderMapper<StompHeaders> headerMapper);
/**
* Set the application event publisher for publishing receipt events.
* Automatically set by Spring if ApplicationEventPublisherAware is supported.
*
* @param applicationEventPublisher the ApplicationEventPublisher (can be null)
*/
public void setApplicationEventPublisher(
ApplicationEventPublisher applicationEventPublisher
);
/**
* Get the currently subscribed destinations.
* JMX ManagedAttribute.
* Thread-safe.
*
* @return array of destination names (never null, may be empty)
*/
public String[] getDestinations();
/**
* Add and subscribe to additional destinations at runtime.
* JMX ManagedOperation.
* Thread-safe: Can be called concurrently.
* Requires active STOMP connection.
*
* @param destination one or more destination names to subscribe (must not be null or empty)
* @throws IllegalArgumentException if destination is null or empty
* @throws IllegalStateException if adapter is not started or STOMP session is not connected
*/
public void addDestination(String... destination);
/**
* Remove and unsubscribe from destinations at runtime.
* JMX ManagedOperation.
* Thread-safe: Can be called concurrently.
* Requires active STOMP connection.
*
* @param destination one or more destination names to unsubscribe (must not be null or empty)
* @throws IllegalArgumentException if destination is null or empty
* @throws IllegalStateException if adapter is not started or STOMP session is not connected
*/
public void removeDestination(String... destination);
/**
* Get the component type identifier.
* Thread-safe.
*
* @return "stomp:inbound-channel-adapter"
*/
public String getComponentType();
}import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
@Configuration
public class InboundConfig {
@Bean
public MessageChannel stompInputChannel() {
return new DirectChannel();
}
@Bean
public StompInboundChannelAdapter stompInbound(
StompSessionManager sessionManager) {
// Subscribe to a single destination
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(sessionManager, "/topic/messages");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
}// Subscribe to multiple destinations simultaneously
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
sessionManager,
"/topic/messages",
"/queue/orders",
"/topic/notifications"
);
adapter.setOutputChannel(outputChannel);import com.example.Order;
// Receive messages as custom domain objects
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(sessionManager, "/queue/orders");
// Configure payload deserialization to Order class
adapter.setPayloadType(Order.class);
adapter.setOutputChannel(orderChannel);
// Requires appropriate message converter on STOMP client
// e.g., MappingJackson2MessageConverter for JSONimport org.springframework.integration.stomp.support.StompHeaderMapper;
// Create custom header mapper
StompHeaderMapper headerMapper = new StompHeaderMapper();
// Configure which headers to map from STOMP frames
headerMapper.setInboundHeaderNames(
"content-type",
"message-id",
"timestamp",
"custom-*" // Wildcard pattern for custom headers
);
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(sessionManager, "/topic/messages");
adapter.setHeaderMapper(headerMapper);
adapter.setOutputChannel(outputChannel);import org.springframework.integration.channel.PublishSubscribeChannel;
@Bean
public MessageChannel errorChannel() {
return new PublishSubscribeChannel();
}
@Bean
public StompInboundChannelAdapter stompInbound(
StompSessionManager sessionManager) {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(sessionManager, "/topic/messages");
adapter.setOutputChannel(outputChannel);
// Configure error channel for handling exceptions during message processing
adapter.setErrorChannelName("errorChannel");
return adapter;
}
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
Throwable throwable = errorMessage.getPayload();
System.err.println("Error processing STOMP message: " + throwable.getMessage());
// Implement custom error handling logic
}import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SubscriptionManager {
@Autowired
private StompInboundChannelAdapter stompInbound;
public void addUserSubscription(String userId) {
// Dynamically subscribe to user-specific destination
String destination = "/user/" + userId + "/notifications";
stompInbound.addDestination(destination);
System.out.println("Subscribed to: " + destination);
}
public void removeUserSubscription(String userId) {
// Dynamically unsubscribe from user-specific destination
String destination = "/user/" + userId + "/notifications";
stompInbound.removeDestination(destination);
System.out.println("Unsubscribed from: " + destination);
}
public void listSubscriptions() {
// Get current subscriptions
String[] destinations = stompInbound.getDestinations();
System.out.println("Current subscriptions: " + Arrays.toString(destinations));
}
}import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.StompReceiptEvent;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.stereotype.Component;
@Component
public class ReceiptEventListener {
@EventListener
public void handleReceiptEvent(StompReceiptEvent event) {
if (event.getStompCommand() == StompCommand.SUBSCRIBE) {
if (event.isLost()) {
System.err.println("Subscription receipt lost for: " +
event.getDestination());
} else {
System.out.println("Subscription confirmed for: " +
event.getDestination());
}
}
}
}import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
public class MessageProcessor {
// Process messages received from STOMP inbound adapter
@ServiceActivator(inputChannel = "stompInputChannel")
public void processMessage(Message<?> message) {
// Access message payload
Object payload = message.getPayload();
// Access STOMP headers (mapped with "stomp_" prefix)
String messageId = (String) message.getHeaders().get("stomp_message_id");
String destination = (String) message.getHeaders().get("stomp_destination");
System.out.println("Received from " + destination + ": " + payload);
// Process message content
// ...
}
}import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.integration.stomp.StompSessionManager;
@Configuration
public class DslInboundConfig {
@Bean
public IntegrationFlow stompInboundFlow(StompSessionManager sessionManager) {
return IntegrationFlow
.from(new StompInboundChannelAdapter(sessionManager, "/topic/messages"))
.transform(String.class, String::toUpperCase)
.handle(msg -> System.out.println("Received: " + msg.getPayload()))
.get();
}
}<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
<!-- Session Manager Bean -->
<bean id="stompSessionManager"
class="org.springframework.integration.stomp.WebSocketStompSessionManager">
<constructor-arg ref="stompClient"/>
<constructor-arg value="ws://localhost:61613/stomp"/>
<property name="autoReceipt" value="true"/>
</bean>
<!-- Inbound Channel -->
<int:channel id="stompInputChannel"/>
<!-- Inbound Adapter -->
<int-stomp:inbound-channel-adapter
id="stompInbound"
stomp-session-manager="stompSessionManager"
channel="stompInputChannel"
destinations="/topic/messages,/queue/orders"
error-channel="errorChannel"
payload-type="java.lang.String"
send-timeout="5000"/>
<!-- Message Handler -->
<int:service-activator
input-channel="stompInputChannel"
ref="messageProcessor"
method="process"/>
</beans>Messages produced by the inbound adapter contain the following headers:
Standard Integration Headers:
id - Unique message identifier (UUID)timestamp - Message creation timestamp (milliseconds)STOMP Headers (with "stomp_" prefix):
stomp_destination - Original STOMP destinationstomp_message_id - STOMP message identifierstomp_subscription - Subscription identifierstomp_receipt_id - Receipt identifier (if applicable)content-type - Message content typecontent-length - Message content lengthCustom Headers:
@ServiceActivator(inputChannel = "stompInputChannel")
public void inspectMessage(Message<?> message) {
// Get payload
String payload = (String) message.getPayload();
// Get headers
MessageHeaders headers = message.getHeaders();
System.out.println("Message ID: " + headers.getId());
System.out.println("Timestamp: " + headers.getTimestamp());
System.out.println("STOMP Destination: " + headers.get("stomp_destination"));
System.out.println("STOMP Message ID: " + headers.get("stomp_message_id"));
System.out.println("Content Type: " + headers.get("content-type"));
System.out.println("Payload: " + payload);
}// Subscribe to broadcast topics for pub/sub messaging
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
sessionManager,
"/topic/market-data",
"/topic/news-feed",
"/topic/alerts"
);// Subscribe to point-to-point queues for work distribution
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
sessionManager,
"/queue/orders",
"/queue/payments",
"/queue/notifications"
);// Some STOMP brokers support wildcard subscriptions
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
sessionManager,
"/topic/users.*", // All user topics
"/topic/orders.>" // All order topics and sub-topics
);
// Note: Wildcard syntax depends on broker implementation// Subscribe to user-specific destinations (Spring WebSocket convention)
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
sessionManager,
"/user/queue/notifications",
"/user/queue/replies"
);Configuration Errors:
IllegalArgumentException: Null session manager, null/empty destinations, null payload typeIllegalStateException: Calling methods before adapter is startedRuntime Errors:
org.springframework.messaging.MessageDeliveryException: Failed to send message to output channelorg.springframework.messaging.converter.MessageConversionException: Payload conversion failureorg.springframework.messaging.MessagingException: General messaging errorsStompExceptionEvent: Published for adapter processing exceptionsConnection Errors:
StompSessionManager - publishes StompConnectionFailedEvent// Handle errors via error channel
@Bean
public MessageChannel errorChannel() {
return new PublishSubscribeChannel();
}
@Bean
public StompInboundChannelAdapter stompInbound(StompSessionManager sessionManager) {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(sessionManager, "/topic/messages");
adapter.setOutputChannel(outputChannel());
adapter.setErrorChannelName("errorChannel");
return adapter;
}
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
Throwable throwable = errorMessage.getPayload();
System.err.println("Error processing STOMP message: " + throwable.getMessage());
// Check exception type
if (throwable instanceof MessageConversionException) {
// Payload conversion failed
} else if (throwable instanceof MessageDeliveryException) {
// Failed to send to output channel
}
}Empty Destinations:
// Adapter can be created with no destinations
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(sessionManager);
// Destinations can be added later
adapter.addDestination("/topic/messages");Duplicate Destinations:
// Adding duplicate destination is safe (no-op or handled internally)
adapter.addDestination("/topic/messages");
adapter.addDestination("/topic/messages"); // Safe, may be ignoredRemoving Non-existent Destination:
// Removing destination that was never subscribed is safe (no-op)
adapter.removeDestination("/topic/nonexistent"); // Safe, no exceptionNull/Empty Destination Strings:
try {
adapter.addDestination((String) null);
} catch (IllegalArgumentException e) {
// Destination cannot be null
}
try {
adapter.addDestination("");
} catch (IllegalArgumentException e) {
// Destination cannot be empty
}Subscription While Disconnected:
// Adding destination while disconnected
// May throw IllegalStateException or queue for later subscription
// Behavior depends on StompSessionManager implementationThe adapter relies on StompSessionManager for connection management:
Setting payloadType affects performance:
String.class (default): Minimal overhead, converts bytes to StringOrder.class): Requires JSON/XML deserialization, higher overheadOutput channel type affects throughput:
DirectChannel: Synchronous, caller thread processes messageQueueChannel: Asynchronous, buffered processing with configurable capacityPublishSubscribeChannel: Fan-out to multiple subscribersExecutorChannel: Asynchronous with thread poolLimiting mapped headers improves performance:
// Map only essential headers
StompHeaderMapper headerMapper = new StompHeaderMapper();
headerMapper.setInboundHeaderNames("content-type", "message-id");
adapter.setHeaderMapper(headerMapper);/**
* Base class for message-producing endpoints (from Spring Integration).
* Provides output channel configuration and message sending capabilities.
*/
public abstract class MessageProducerSupport {
public void setOutputChannel(MessageChannel outputChannel);
public void setOutputChannelName(String outputChannelName);
public void setErrorChannelName(String errorChannelName);
public void setSendTimeout(long sendTimeout);
}/**
* Strategy interface for mapping headers between protocols (from Spring Integration).
*
* @param <T> the target header type
*/
public interface HeaderMapper<T> {
void fromHeaders(MessageHeaders headers, T target);
Map<String, Object> toHeaders(T source);
}