or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

events.md
header-mapping.md
inbound-adapter.md
index.md
outbound-handler.md
session-management.md
xml-configuration.md
tile.json

docs/inbound-adapter.md

Inbound Message Reception

The StompInboundChannelAdapter subscribes to STOMP destinations and converts received STOMP frames into Spring Integration messages sent to an output channel. It supports runtime subscription management, payload type conversion, and header mapping.

Capabilities

StompInboundChannelAdapter

Message producer that subscribes to one or more STOMP destinations and produces Spring Integration messages.

/**
 * Message producer endpoint that subscribes to STOMP destinations
 * and produces messages to a Spring Integration output channel.
 * Supports runtime subscription management via JMX.
 *
 * Thread Safety:
 * - Thread-safe for concurrent addDestination/removeDestination calls
 * - Message production to output channel is thread-safe
 * - Subscription management is synchronized
 *
 * NOTE: this listing is an API summary; method bodies are intentionally omitted.
 *
 * @since 4.2
 */
public class StompInboundChannelAdapter
    extends MessageProducerSupport
    implements ApplicationEventPublisherAware {

    /**
     * Create an inbound channel adapter with initial destinations.
     *
     * @param stompSessionManager the StompSessionManager for connection management (must not be null)
     * @param destinations        initial STOMP destinations to subscribe; may be an empty array,
     *                            and the set can be changed at runtime through
     *                            addDestination/removeDestination
     * @throws IllegalArgumentException if stompSessionManager is null
     */
    public StompInboundChannelAdapter(
        StompSessionManager stompSessionManager,
        String... destinations
    );

    /**
     * Set the expected payload type for message deserialization.
     * The STOMP message payload will be converted to this type.
     * Must be set before adapter is started.
     *
     * @param payloadType the target payload type (default: String.class, must not be null)
     * @throws IllegalArgumentException if payloadType is null
     */
    public void setPayloadType(Class<?> payloadType);

    /**
     * Set a custom header mapper for converting STOMP headers to
     * Spring Integration message headers.
     * Must be set before adapter is started.
     *
     * @param headerMapper the HeaderMapper implementation; pass null to keep the
     *                     default mapper (StompHeaderMapper)
     */
    public void setHeaderMapper(HeaderMapper<StompHeaders> headerMapper);

    /**
     * Set the application event publisher for publishing receipt events.
     * Invoked automatically by the Spring container, because this class
     * implements ApplicationEventPublisherAware.
     *
     * @param applicationEventPublisher the ApplicationEventPublisher (can be null)
     */
    public void setApplicationEventPublisher(
        ApplicationEventPublisher applicationEventPublisher
    );

    /**
     * Get the currently subscribed destinations.
     * JMX ManagedAttribute.
     * Thread-safe.
     *
     * @return array of destination names (never null, may be empty)
     */
    public String[] getDestinations();

    /**
     * Add and subscribe to additional destinations at runtime.
     * JMX ManagedOperation.
     * Thread-safe: Can be called concurrently.
     * Requires active STOMP connection.
     *
     * @param destination one or more destination names to subscribe (must not be null or empty)
     * @throws IllegalArgumentException if destination is null or empty
     * @throws IllegalStateException if adapter is not started or STOMP session is not connected
     */
    public void addDestination(String... destination);

    /**
     * Remove and unsubscribe from destinations at runtime.
     * JMX ManagedOperation.
     * Thread-safe: Can be called concurrently.
     * Requires active STOMP connection.
     *
     * @param destination one or more destination names to unsubscribe (must not be null or empty)
     * @throws IllegalArgumentException if destination is null or empty
     * @throws IllegalStateException if adapter is not started or STOMP session is not connected
     */
    public void removeDestination(String... destination);

    /**
     * Get the component type identifier.
     * Thread-safe.
     *
     * @return the constant string "stomp:inbound-channel-adapter"
     */
    public String getComponentType();
}

Usage Examples

Basic Inbound Configuration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

/**
 * Minimal Java configuration: one STOMP inbound adapter feeding a direct channel.
 */
@Configuration
public class InboundConfig {

    /** The single STOMP destination this example subscribes to. */
    private static final String MESSAGES_DESTINATION = "/topic/messages";

    /**
     * Channel that receives the messages produced by the inbound adapter.
     *
     * @return a new DirectChannel
     */
    @Bean
    public MessageChannel stompInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter subscribed to one destination and wired to
     * {@link #stompInputChannel()}.
     *
     * @param sessionManager the session manager used to obtain the STOMP connection
     * @return the configured adapter
     */
    @Bean
    public StompInboundChannelAdapter stompInbound(StompSessionManager sessionManager) {
        StompInboundChannelAdapter inbound =
            new StompInboundChannelAdapter(sessionManager, MESSAGES_DESTINATION);
        inbound.setOutputChannel(stompInputChannel());
        return inbound;
    }
}

Multiple Destinations

// Subscribe to multiple destinations simultaneously
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
    sessionManager,
    "/topic/messages",
    "/queue/orders",
    "/topic/notifications"
);

adapter.setOutputChannel(outputChannel);

Custom Payload Type

import com.example.Order;

// Receive messages as custom domain objects
StompInboundChannelAdapter adapter =
    new StompInboundChannelAdapter(sessionManager, "/queue/orders");

// Configure payload deserialization to Order class
adapter.setPayloadType(Order.class);
adapter.setOutputChannel(orderChannel);

// Requires appropriate message converter on STOMP client
// e.g., MappingJackson2MessageConverter for JSON

Custom Header Mapping

import org.springframework.integration.stomp.support.StompHeaderMapper;

// Create custom header mapper
StompHeaderMapper headerMapper = new StompHeaderMapper();

// Configure which headers to map from STOMP frames
headerMapper.setInboundHeaderNames(
    "content-type",
    "message-id",
    "timestamp",
    "custom-*"  // Wildcard pattern for custom headers
);

StompInboundChannelAdapter adapter =
    new StompInboundChannelAdapter(sessionManager, "/topic/messages");
adapter.setHeaderMapper(headerMapper);
adapter.setOutputChannel(outputChannel);

Error Handling

import org.springframework.integration.channel.PublishSubscribeChannel;

@Bean
public MessageChannel errorChannel() {
    return new PublishSubscribeChannel();
}

@Bean
public StompInboundChannelAdapter stompInbound(
        StompSessionManager sessionManager) {

    StompInboundChannelAdapter adapter =
        new StompInboundChannelAdapter(sessionManager, "/topic/messages");

    adapter.setOutputChannel(outputChannel);

    // Configure error channel for handling exceptions during message processing
    adapter.setErrorChannelName("errorChannel");

    return adapter;
}

@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
    Throwable throwable = errorMessage.getPayload();
    System.err.println("Error processing STOMP message: " + throwable.getMessage());
    // Implement custom error handling logic
}

Runtime Subscription Management

import java.util.Arrays;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.stereotype.Component;

/**
 * Adds, removes and lists user-specific STOMP subscriptions on the
 * inbound adapter while the application is running.
 */
@Component
public class SubscriptionManager {

    @Autowired
    private StompInboundChannelAdapter stompInbound;

    /**
     * Subscribe the adapter to the notifications destination of the given user.
     *
     * @param userId the user whose notifications destination is subscribed
     */
    public void addUserSubscription(String userId) {
        String target = notificationsDestinationFor(userId);
        stompInbound.addDestination(target);
        System.out.println("Subscribed to: " + target);
    }

    /**
     * Unsubscribe the adapter from the notifications destination of the given user.
     *
     * @param userId the user whose notifications destination is unsubscribed
     */
    public void removeUserSubscription(String userId) {
        String target = notificationsDestinationFor(userId);
        stompInbound.removeDestination(target);
        System.out.println("Unsubscribed from: " + target);
    }

    /** Print the destinations the adapter currently reports. */
    public void listSubscriptions() {
        System.out.println("Current subscriptions: " + Arrays.toString(stompInbound.getDestinations()));
    }

    /** Build the per-user notifications destination path. */
    private static String notificationsDestinationFor(String userId) {
        return "/user/" + userId + "/notifications";
    }
}

Receipt Event Handling

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.StompReceiptEvent;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.stereotype.Component;

/**
 * Logs the outcome of SUBSCRIBE receipts published as StompReceiptEvent.
 */
@Component
public class ReceiptEventListener {

    /**
     * Report whether a subscription receipt was confirmed or lost.
     * Receipt events for any command other than SUBSCRIBE are ignored.
     *
     * @param event the receipt event published by the STOMP adapter
     */
    @EventListener
    public void handleReceiptEvent(StompReceiptEvent event) {
        if (event.getStompCommand() != StompCommand.SUBSCRIBE) {
            return;
        }

        boolean receiptLost = event.isLost();
        String destination = event.getDestination();
        if (receiptLost) {
            System.err.println("Subscription receipt lost for: " + destination);
            return;
        }
        System.out.println("Subscription confirmed for: " + destination);
    }
}

Integration with Message Handler

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * Example handler for messages produced by the STOMP inbound adapter.
 */
@Component
public class MessageProcessor {

    /**
     * Process a message received from the STOMP inbound adapter.
     *
     * @param message the message delivered on "stompInputChannel"
     */
    @ServiceActivator(inputChannel = "stompInputChannel")
    public void processMessage(Message<?> message) {
        // Access message payload
        Object payload = message.getPayload();

        // Native STOMP headers are exposed as "stomp_" + <native STOMP header name>.
        // The native name keeps its hyphen, so the key is "stomp_message-id"
        // (IntegrationStompHeaders.MESSAGE_ID), not "stomp_message_id".
        String messageId = (String) message.getHeaders().get("stomp_message-id");
        String destination = (String) message.getHeaders().get("stomp_destination");

        System.out.println("Received from " + destination + ": " + payload);

        // Process message content
        // ...
    }
}

DSL Configuration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.integration.stomp.StompSessionManager;

/**
 * Java DSL variant: the inbound adapter is the source of an IntegrationFlow.
 */
@Configuration
public class DslInboundConfig {

    /**
     * Flow that receives from "/topic/messages", upper-cases the String
     * payload and prints it.
     *
     * @param sessionManager the session manager used to obtain the STOMP connection
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow stompInboundFlow(StompSessionManager sessionManager) {
        StompInboundChannelAdapter source =
            new StompInboundChannelAdapter(sessionManager, "/topic/messages");

        return IntegrationFlow.from(source)
            .transform(String.class, payload -> payload.toUpperCase())
            .handle(message -> System.out.println("Received: " + message.getPayload()))
            .get();
    }
}

XML Configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/stomp
           https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">

    <!-- Session Manager Bean -->
    <bean id="stompSessionManager"
          class="org.springframework.integration.stomp.WebSocketStompSessionManager">
        <constructor-arg ref="stompClient"/>
        <constructor-arg value="ws://localhost:61613/stomp"/>
        <property name="autoReceipt" value="true"/>
    </bean>

    <!-- Inbound Channel -->
    <int:channel id="stompInputChannel"/>

    <!-- Inbound Adapter -->
    <int-stomp:inbound-channel-adapter
        id="stompInbound"
        stomp-session-manager="stompSessionManager"
        channel="stompInputChannel"
        destinations="/topic/messages,/queue/orders"
        error-channel="errorChannel"
        payload-type="java.lang.String"
        send-timeout="5000"/>

    <!-- Message Handler: "method" must name an existing method on the referenced bean;
         the MessageProcessor example on this page exposes processMessage(Message<?>) -->
    <int:service-activator
        input-channel="stompInputChannel"
        ref="messageProcessor"
        method="processMessage"/>

</beans>

Message Structure

Inbound Message Headers

Messages produced by the inbound adapter contain the following headers:

Standard Integration Headers:

  • id - Unique message identifier (UUID)
  • timestamp - Message creation timestamp (milliseconds)

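STOMP Headers (native STOMP headers are exposed as "stomp_" + the native header name, hyphens included):

  • stomp_destination - Original STOMP destination
  • stomp_message-id - STOMP message identifier
  • stomp_subscription - Subscription identifier
  • stomp_receipt-id - Receipt identifier (if applicable)
  • contentType - Message content type (the STOMP content-type header, stored under MessageHeaders.CONTENT_TYPE rather than a "stomp_" key)
  • content-length - Message content length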

Custom Headers:

  • Any custom headers from STOMP frame (based on header mapper configuration)

Example Message Inspection

/**
 * Print the payload and the most useful headers of an inbound STOMP message.
 * Assumes the adapter uses the default String payload type.
 *
 * @param message the message delivered on "stompInputChannel"
 */
@ServiceActivator(inputChannel = "stompInputChannel")
public void inspectMessage(Message<?> message) {
    // Get payload
    String payload = (String) message.getPayload();

    // Get headers
    MessageHeaders headers = message.getHeaders();

    System.out.println("Message ID: " + headers.getId());
    System.out.println("Timestamp: " + headers.getTimestamp());
    System.out.println("STOMP Destination: " + headers.get("stomp_destination"));
    // Key is "stomp_" + the native STOMP header name "message-id" (hyphen, not underscore)
    System.out.println("STOMP Message ID: " + headers.get("stomp_message-id"));
    // The STOMP content-type header is mapped to the standard content-type message header
    System.out.println("Content Type: " + headers.get(MessageHeaders.CONTENT_TYPE));
    System.out.println("Payload: " + payload);
}

Common Patterns

Topic Subscription Pattern

// Subscribe to broadcast topics for pub/sub messaging
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
    sessionManager,
    "/topic/market-data",
    "/topic/news-feed",
    "/topic/alerts"
);

Queue Subscription Pattern

// Subscribe to point-to-point queues for work distribution
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
    sessionManager,
    "/queue/orders",
    "/queue/payments",
    "/queue/notifications"
);

Wildcard Destinations (if broker supports)

// Some STOMP brokers support wildcard subscriptions
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
    sessionManager,
    "/topic/users.*",      // All user topics
    "/topic/orders.>"      // All order topics and sub-topics
);
// Note: Wildcard syntax depends on broker implementation

User-Specific Destinations

// Subscribe to user-specific destinations (Spring WebSocket convention)
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(
    sessionManager,
    "/user/queue/notifications",
    "/user/queue/replies"
);

Error Handling

Common Exceptions

Configuration Errors:

  • IllegalArgumentException: Null session manager, null/empty destinations, null payload type
  • IllegalStateException: Calling methods before adapter is started

Runtime Errors:

  • org.springframework.messaging.MessageDeliveryException: Failed to send message to output channel
  • org.springframework.messaging.converter.MessageConversionException: Payload conversion failure
  • org.springframework.messaging.MessagingException: General messaging errors
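  • StompExceptionEvent: Not an exception but an application event, published when the adapter hits an exception while processing; handle it with an event listener rather than a catch block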

Connection Errors:

  • Handled by StompSessionManager - publishes StompConnectionFailedEvent
  • Subscriptions automatically re-established after reconnection

Error Handling Examples

// Handle errors via error channel
@Bean
public MessageChannel errorChannel() {
    return new PublishSubscribeChannel();
}

@Bean
public StompInboundChannelAdapter stompInbound(StompSessionManager sessionManager) {
    StompInboundChannelAdapter adapter =
        new StompInboundChannelAdapter(sessionManager, "/topic/messages");
    adapter.setOutputChannel(outputChannel());
    adapter.setErrorChannelName("errorChannel");
    return adapter;
}

@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
    Throwable throwable = errorMessage.getPayload();
    System.err.println("Error processing STOMP message: " + throwable.getMessage());
    
    // Check exception type
    if (throwable instanceof MessageConversionException) {
        // Payload conversion failed
    } else if (throwable instanceof MessageDeliveryException) {
        // Failed to send to output channel
    }
}

Edge Cases

Empty Destinations:

// Adapter can be created with no destinations
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(sessionManager);
// Destinations can be added later
adapter.addDestination("/topic/messages");

Duplicate Destinations:

// Adding duplicate destination is safe (no-op or handled internally)
adapter.addDestination("/topic/messages");
adapter.addDestination("/topic/messages"); // Safe, may be ignored

Removing Non-existent Destination:

// Removing destination that was never subscribed is safe (no-op)
adapter.removeDestination("/topic/nonexistent"); // Safe, no exception

Null/Empty Destination Strings:

try {
    adapter.addDestination((String) null);
} catch (IllegalArgumentException e) {
    // Destination cannot be null
}

try {
    adapter.addDestination("");
} catch (IllegalArgumentException e) {
    // Destination cannot be empty
}

Subscription While Disconnected:

// Adding destination while disconnected
// May throw IllegalStateException or queue for later subscription
// Behavior depends on StompSessionManager implementation

Lifecycle

Startup Behavior

  1. Adapter is created and configured
  2. When started (via SmartLifecycle or explicitly), adapter connects via StompSessionManager
  3. Subscriptions are created for all configured destinations
  4. Receipt events are published if auto-receipt is enabled
  5. Incoming STOMP frames are converted to messages and sent to output channel

Shutdown Behavior

  1. When stopped, adapter unsubscribes from all destinations
  2. Connection is disconnected (via StompSessionManager)
  3. Resources are cleaned up

Automatic Reconnection

The adapter relies on StompSessionManager for connection management:

  • If connection is lost, StompSessionManager automatically attempts reconnection
  • Subscriptions are automatically re-established after successful reconnection
  • No manual intervention required for recovery

Performance Considerations

Payload Conversion

Setting payloadType affects performance:

  • String.class (default): Minimal overhead, converts bytes to String
  • Custom types (e.g., Order.class): Requires JSON/XML deserialization, higher overhead
  • Choose based on downstream processing needs

Channel Selection

Output channel type affects throughput:

  • DirectChannel: Synchronous, caller thread processes message
  • QueueChannel: Asynchronous, buffered processing with configurable capacity
  • PublishSubscribeChannel: Fan-out to multiple subscribers
  • ExecutorChannel: Asynchronous with thread pool

Header Mapping

Limiting mapped headers improves performance:

// Map only essential headers
StompHeaderMapper headerMapper = new StompHeaderMapper();
headerMapper.setInboundHeaderNames("content-type", "message-id");
adapter.setHeaderMapper(headerMapper);

Types

MessageProducerSupport

/**
 * Base class for message-producing endpoints (from Spring Integration).
 * Provides output channel configuration and message sending capabilities.
 *
 * NOTE: abridged listing. Only the setters used on this page are shown and
 * method bodies are omitted.
 */
public abstract class MessageProducerSupport {
    /** Set the output channel by reference (used as adapter.setOutputChannel(...) in the examples above). */
    public void setOutputChannel(MessageChannel outputChannel);
    /** Set the output channel by name. NOTE(review): presumably a bean name resolved later; confirm. */
    public void setOutputChannelName(String outputChannelName);
    /** Set the error channel by name (the examples above pass "errorChannel"). */
    public void setErrorChannelName(String errorChannelName);
    /** Set the send timeout. NOTE(review): presumably milliseconds, matching send-timeout="5000" in the XML example; confirm. */
    public void setSendTimeout(long sendTimeout);
}

HeaderMapper

/**
 * Strategy interface for mapping headers between protocols (from Spring Integration).
 * The STOMP adapter on this page uses it as HeaderMapper&lt;StompHeaders&gt;.
 *
 * @param <T> the target header type
 */
public interface HeaderMapper<T> {
    /**
     * Map from message headers onto a protocol-specific header object.
     * NOTE(review): direction inferred from the signature (headers in, target mutated); confirm.
     *
     * @param headers the Spring message headers to read
     * @param target  the protocol-specific header object to populate
     */
    void fromHeaders(MessageHeaders headers, T target);

    /**
     * Map from a protocol-specific header object to message header entries.
     *
     * @param source the protocol-specific headers to read
     * @return the header name/value pairs derived from {@code source}
     */
    Map<String, Object> toHeaders(T source);
}