or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

events.mdheader-mapping.mdinbound-adapter.mdindex.mdoutbound-handler.mdsession-management.mdxml-configuration.md
tile.json

outbound-handler.mddocs/

Outbound Message Sending

The StompMessageHandler consumes Spring Integration messages from an input channel and sends them to STOMP destinations. It supports both static and dynamic destination routing, header mapping, and receipt confirmation.

Capabilities

StompMessageHandler

Message handler that sends Spring Integration messages to STOMP destinations.

/**
 * Message handler endpoint that sends Spring Integration messages
 * to STOMP destinations. Supports static and dynamic destination routing.
 *
 * Thread Safety:
 * - Thread-safe for concurrent message handling
 * - Multiple threads can send messages through the same handler safely
 * - Destination expression evaluation is thread-safe (SpEL expressions are stateless)
 *
 * @since 4.2
 */
public class StompMessageHandler
    extends AbstractMessageHandler
    implements ApplicationEventPublisherAware, ManageableLifecycle {

    /**
     * Create an outbound message handler.
     *
     * @param stompSessionManager the StompSessionManager for connection management (must not be null)
     * @throws IllegalArgumentException if stompSessionManager is null
     */
    public StompMessageHandler(StompSessionManager stompSessionManager);

    /**
     * Set a static STOMP destination for all messages.
     * Mutually exclusive with destinationExpression.
     * Must be set before handler is started.
     *
     * @param destination the STOMP destination (e.g., "/topic/messages", must not be null or empty)
     * @throws IllegalArgumentException if destination is null or empty
     * @throws IllegalStateException if destinationExpression is already set
     */
    public void setDestination(String destination);

    /**
     * Set a SpEL expression for dynamic destination routing.
     * The expression is evaluated against each message.
     * Mutually exclusive with destination.
     * Must be set before handler is started.
     *
     * @param destinationExpression the SpEL Expression for destination (must not be null)
     * @throws IllegalArgumentException if destinationExpression is null
     * @throws IllegalStateException if destination is already set
     */
    public void setDestinationExpression(Expression destinationExpression);

    /**
     * Set a custom header mapper for converting Spring Integration
     * message headers to STOMP headers.
     * Must be set before handler is started.
     *
     * @param headerMapper the HeaderMapper implementation (can be null to use default)
     *                    (default: StompHeaderMapper)
     */
    public void setHeaderMapper(HeaderMapper<StompHeaders> headerMapper);

    /**
     * Set the timeout (in milliseconds) to wait for STOMP connection.
     * If connection is not available within timeout, an exception is thrown.
     * Can be changed at runtime.
     *
     * @param connectTimeout timeout in milliseconds (default: 3000, must be positive)
     * @throws IllegalArgumentException if connectTimeout <= 0
     */
    public void setConnectTimeout(long connectTimeout);

    /**
     * Set the SpEL evaluation context for expression evaluation.
     * Must be set before handler is started.
     *
     * @param evaluationContext the EvaluationContext (can be null to use default)
     */
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext);

    /**
     * Set the application event publisher for publishing receipt events.
     * Automatically set by Spring if ApplicationEventPublisherAware is supported.
     *
     * @param applicationEventPublisher the ApplicationEventPublisher (can be null)
     */
    public void setApplicationEventPublisher(
        ApplicationEventPublisher applicationEventPublisher
    );

    /**
     * Start the handler and connect to STOMP broker.
     * Part of ManageableLifecycle interface.
     * Thread-safe: Can be called concurrently, but only one start will proceed.
     *
     * @throws IllegalStateException if already started
     * @throws MessagingException if connection fails
     */
    public void start();

    /**
     * Stop the handler and disconnect from STOMP broker.
     * Part of ManageableLifecycle interface.
     * Thread-safe: Can be called concurrently.
     * Gracefully stops message processing and disconnects.
     */
    public void stop();

    /**
     * Check if the handler is running.
     * Part of ManageableLifecycle interface.
     * Thread-safe.
     *
     * @return true if handler is running
     */
    public boolean isRunning();
}

Usage Examples

Basic Outbound Configuration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.stomp.outbound.StompMessageHandler;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageChannel;

@Configuration
public class OutboundConfig {

    @Bean
    public MessageChannel stompOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
        StompMessageHandler handler = new StompMessageHandler(sessionManager);

        // Set static destination
        handler.setDestination("/topic/messages");

        return handler;
    }
}

Dynamic Destination Routing

import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);

    // Route based on message header
    SpelExpressionParser parser = new SpelExpressionParser();
    handler.setDestinationExpression(
        parser.parseExpression("headers['destination']")
    );

    return handler;
}

// Send message with destination header
Message<String> message = MessageBuilder
    .withPayload("Hello")
    .setHeader("destination", "/topic/alerts")
    .build();

stompOutputChannel.send(message);

Dynamic Routing with stomp_destination Header

import org.springframework.integration.stomp.support.IntegrationStompHeaders;
import org.springframework.messaging.support.MessageBuilder;

// The default StompHeaderMapper recognizes "stomp_destination" header
Message<String> message = MessageBuilder
    .withPayload("Order processed")
    .setHeader(IntegrationStompHeaders.DESTINATION, "/queue/orders")
    .build();

// Message will be routed to /queue/orders
stompOutputChannel.send(message);

Payload-Based Routing

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);

    // Route based on payload type or content
    SpelExpressionParser parser = new SpelExpressionParser();
    handler.setDestinationExpression(
        parser.parseExpression(
            "payload instanceof T(com.example.Order) ? '/queue/orders' : '/topic/general'"
        )
    );

    return handler;
}

Custom Header Mapping

import org.springframework.integration.stomp.support.StompHeaderMapper;

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);
    handler.setDestination("/topic/messages");

    // Create custom header mapper
    StompHeaderMapper headerMapper = new StompHeaderMapper();

    // Configure which headers to map to STOMP frames
    headerMapper.setOutboundHeaderNames(
        "content-type",
        "priority",
        "correlation-id",
        "custom-*"  // Wildcard for custom headers
    );

    handler.setHeaderMapper(headerMapper);

    return handler;
}

Connection Timeout Configuration

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);
    handler.setDestination("/topic/messages");

    // Wait up to 10 seconds for connection before throwing exception
    handler.setConnectTimeout(10000);

    return handler;
}

Receipt Acknowledgment

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.StompReceiptEvent;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.stereotype.Component;

@Component
public class OutboundReceiptListener {

    @EventListener
    public void handleReceiptEvent(StompReceiptEvent event) {
        if (event.getStompCommand() == StompCommand.SEND) {
            if (event.isLost()) {
                System.err.println("Send receipt lost for message to: " +
                    event.getDestination());
                // Implement retry logic or alerting
            } else {
                System.out.println("Send confirmed for: " +
                    event.getDestination());
                // Message successfully delivered to broker
            }
        }
    }
}

Lifecycle Management

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class OutboundLifecycleManager {

    @Autowired
    private StompMessageHandler stompOutbound;

    public void enableOutbound() {
        if (!stompOutbound.isRunning()) {
            stompOutbound.start();
            System.out.println("STOMP outbound handler started");
        }
    }

    public void disableOutbound() {
        if (stompOutbound.isRunning()) {
            stompOutbound.stop();
            System.out.println("STOMP outbound handler stopped");
        }
    }
}

Sending Different Payload Types

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.web.socket.messaging.WebSocketStompClient;

// Configure STOMP client with JSON converter
@Bean
public WebSocketStompClient stompClient() {
    WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());

    // Support JSON serialization
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setObjectMapper(new ObjectMapper());
    client.setMessageConverter(converter);

    return client;
}

// Send custom objects
public class Order {
    private String id;
    private double amount;
    // getters/setters
}

// Message will be automatically serialized to JSON
Message<Order> message = MessageBuilder
    .withPayload(new Order("123", 99.99))
    .setHeader("content-type", "application/json")
    .build();

stompOutputChannel.send(message);

Integration Flow with Transformer

import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.transformer.ObjectToJsonTransformer;

@Bean
public IntegrationFlow stompOutboundFlow(StompSessionManager sessionManager) {
    return IntegrationFlow
        .from("inputChannel")
        .transform(new ObjectToJsonTransformer())  // Convert to JSON
        .handle(stompOutbound(sessionManager))
        .get();
}

@Bean
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);
    handler.setDestination("/topic/json-messages");
    return handler;
}

DSL Configuration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.stomp.outbound.StompMessageHandler;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.dsl.MessageChannels;

@Configuration
public class DslOutboundConfig {

    @Bean
    public IntegrationFlow stompOutboundFlow(StompSessionManager sessionManager) {
        StompMessageHandler handler = new StompMessageHandler(sessionManager);
        handler.setDestination("/topic/messages");

        return IntegrationFlow
            .from(MessageChannels.direct("input"))
            .transform(String.class, String::toUpperCase)
            .handle(handler)
            .get();
    }
}

XML Configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/stomp
           https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">

    <!-- Session Manager Bean -->
    <bean id="stompSessionManager"
          class="org.springframework.integration.stomp.WebSocketStompSessionManager">
        <constructor-arg ref="stompClient"/>
        <constructor-arg value="ws://localhost:61613/stomp"/>
    </bean>

    <!-- Outbound Channel -->
    <int:channel id="stompOutputChannel"/>

    <!-- Outbound Adapter with Static Destination -->
    <int-stomp:outbound-channel-adapter
        id="stompOutbound"
        stomp-session-manager="stompSessionManager"
        channel="stompOutputChannel"
        destination="/topic/messages"
        order="1"/>

    <!-- Outbound Adapter with Dynamic Destination -->
    <int-stomp:outbound-channel-adapter
        id="stompOutboundDynamic"
        stomp-session-manager="stompSessionManager"
        channel="dynamicOutputChannel"
        destination-expression="headers['destination']"/>

</beans>

Message Structure

Outbound Message Headers

Messages sent to StompMessageHandler can include:

Routing Headers:

  • stomp_destination - Override destination (if no static destination configured)

STOMP Protocol Headers:

  • content-type - Message content type (e.g., "application/json")
  • stomp_receipt - Request receipt with this ID
  • Custom headers as configured by header mapper

Example:

import org.springframework.integration.stomp.support.IntegrationStompHeaders;

Message<String> message = MessageBuilder
    .withPayload("Hello World")
    .setHeader(IntegrationStompHeaders.DESTINATION, "/topic/greetings")
    .setHeader(IntegrationStompHeaders.RECEIPT, "msg-001")
    .setHeader("content-type", "text/plain")
    .setHeader("priority", "high")
    .build();

stompOutputChannel.send(message);

Common Patterns

Topic Broadcasting

// Send messages to topic for multiple subscribers
StompMessageHandler handler = new StompMessageHandler(sessionManager);
handler.setDestination("/topic/broadcasts");

Queue Point-to-Point

// Send messages to queue for single consumer
StompMessageHandler handler = new StompMessageHandler(sessionManager);
handler.setDestination("/queue/tasks");

User-Specific Messages

// Send to specific user (Spring WebSocket convention)
SpelExpressionParser parser = new SpelExpressionParser();
handler.setDestinationExpression(
    parser.parseExpression("'/user/' + headers['userId'] + '/notifications'")
);

// Usage
Message<String> message = MessageBuilder
    .withPayload("You have a new notification")
    .setHeader("userId", "john.doe")
    .build();

Request-Reply Pattern

import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.support.RetryTemplate;

@Bean
public IntegrationFlow requestReplyFlow(StompSessionManager sessionManager) {
    // Configure outbound with retry
    StompMessageHandler handler = new StompMessageHandler(sessionManager);
    handler.setDestination("/queue/requests");

    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    retryAdvice.setRetryTemplate(retryTemplate);

    return IntegrationFlow
        .from("requestChannel")
        .handle(handler, e -> e.advice(retryAdvice))
        .get();
}

Priority-Based Routing

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);

    SpelExpressionParser parser = new SpelExpressionParser();
    handler.setDestinationExpression(
        parser.parseExpression(
            "headers['priority'] == 'high' ? '/queue/high-priority' : '/queue/normal'"
        )
    );

    return handler;
}

Content-Type Routing

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);

    SpelExpressionParser parser = new SpelExpressionParser();
    handler.setDestinationExpression(
        parser.parseExpression(
            "headers['content-type']?.contains('json') ? '/topic/json' : '/topic/text'"
        )
    );

    return handler;
}

Error Handling

Common Exceptions

Configuration Errors:

  • IllegalArgumentException: Null session manager, null/empty destination, null expression, invalid timeout
  • IllegalStateException: Setting destination when expression is set (or vice versa), calling methods before handler is started

Runtime Errors:

  • org.springframework.messaging.MessageHandlingException: Failed to send message to STOMP broker
  • org.springframework.messaging.MessagingException: General messaging errors
  • org.springframework.expression.EvaluationException: SpEL expression evaluation failure
  • java.util.concurrent.TimeoutException: Connection timeout (wrapped in MessageHandlingException)
  • StompExceptionEvent: Published for handler processing exceptions

Connection Errors:

  • Handled by StompSessionManager - publishes StompConnectionFailedEvent
  • Messages may be lost if sent while disconnected (depends on channel type)

Connection Failures

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.StompConnectionFailedEvent;

@Component
public class ConnectionErrorHandler {

    @EventListener
    public void handleConnectionFailure(StompConnectionFailedEvent event) {
        Throwable cause = event.getCause();
        System.err.println("STOMP connection failed: " + cause.getMessage());
        
        // Check exception type
        if (cause instanceof java.net.ConnectException) {
            // Connection refused
        } else if (cause instanceof java.net.SocketTimeoutException) {
            // Connection timeout
        }
        
        // Implement alerting, logging, or fallback logic
    }
}

Send Timeouts

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public StompMessageHandler stompOutbound(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);
    handler.setDestination("/topic/messages");

    // Set connection timeout (must be positive)
    handler.setConnectTimeout(5000);

    return handler;
}

// If connection is not available within 5 seconds, MessageHandlingException is thrown
// with TimeoutException as cause

Timeout Edge Cases:

// Very short timeout (may cause frequent timeouts)
handler.setConnectTimeout(100); // 100ms

// Very long timeout (may delay error detection)
handler.setConnectTimeout(60000); // 60 seconds

// Zero or negative timeout (throws exception)
try {
    handler.setConnectTimeout(0);
} catch (IllegalArgumentException e) {
    // Timeout must be positive
}

Expression Evaluation Errors

// Expression that may fail
handler.setDestinationExpression(
    parser.parseExpression("headers['destination']")
);

// If header is missing, expression may return null
// Handler should handle null destination gracefully or throw exception

Expression Error Handling:

// Safe expression with default
handler.setDestinationExpression(
    parser.parseExpression("headers['destination'] ?: '/topic/default'")
);

// Expression that throws on error
try {
    handler.handleMessage(message);
} catch (EvaluationException e) {
    // Expression evaluation failed
}

Exception Events

import org.springframework.integration.stomp.event.StompExceptionEvent;

@Component
public class ExceptionEventHandler {

    @EventListener
    public void handleStompException(StompExceptionEvent event) {
        Throwable cause = event.getCause();
        System.err.println("STOMP exception: " + cause.getMessage());
        
        // Check exception type
        if (cause instanceof MessageHandlingException) {
            // Message handling failed
        } else if (cause instanceof EvaluationException) {
            // Expression evaluation failed
        }
        
        // Implement error handling logic
    }
}

Edge Cases

Null Destination from Expression:

// Expression returns null
handler.setDestinationExpression(
    parser.parseExpression("headers['destination']")
);

// If expression evaluates to null, handler may throw exception
// Use default value in expression:
handler.setDestinationExpression(
    parser.parseExpression("headers['destination'] ?: '/topic/default'")
);

Empty Destination String:

try {
    handler.setDestination("");
} catch (IllegalArgumentException e) {
    // Destination cannot be empty
}

Both Destination and Expression Set:

handler.setDestination("/topic/messages");
try {
    handler.setDestinationExpression(parser.parseExpression("headers['dest']"));
} catch (IllegalStateException e) {
    // Cannot set both destination and expression
}

Sending While Disconnected:

// Behavior depends on channel type and handler configuration
// DirectChannel: Throws exception immediately
// QueueChannel: Message queued, may be sent when connection restored
// ExecutorChannel: May throw exception or queue depending on timing

Performance Considerations

Asynchronous Sending

For high-throughput scenarios, use asynchronous channels:

import org.springframework.integration.channel.ExecutorChannel;
import java.util.concurrent.Executors;

@Bean
public MessageChannel stompOutputChannel() {
    // Asynchronous channel with thread pool
    return new ExecutorChannel(Executors.newFixedThreadPool(10));
}

Batch Sending

import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.config.AggregatorFactoryBean;

@Bean
public IntegrationFlow batchOutboundFlow(StompSessionManager sessionManager) {
    return IntegrationFlow
        .from("inputChannel")
        .aggregate(a -> a
            .correlationExpression("1")  // Single group
            .releaseExpression("size() == 10")  // Batch size
            .sendPartialResultOnExpiry(true))
        .split()  // Split batch back to individual messages
        .handle(stompOutbound(sessionManager))
        .get();
}

Connection Pooling

For high-concurrency scenarios with multiple handlers:

// Share a single StompSessionManager across multiple handlers
// The session manager maintains a single connection and coordinates access

@Bean
public StompSessionManager sharedSessionManager(WebSocketStompClient client) {
    return new WebSocketStompSessionManager(client, "ws://localhost:61613/stomp");
}

@Bean
@ServiceActivator(inputChannel = "channel1")
public StompMessageHandler handler1(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);
    handler.setDestination("/topic/channel1");
    return handler;
}

@Bean
@ServiceActivator(inputChannel = "channel2")
public StompMessageHandler handler2(StompSessionManager sessionManager) {
    StompMessageHandler handler = new StompMessageHandler(sessionManager);
    handler.setDestination("/topic/channel2");
    return handler;
}

Types

AbstractMessageHandler

/**
 * Base class for message handlers (from Spring Integration).
 * Provides message handling infrastructure.
 */
public abstract class AbstractMessageHandler implements MessageHandler {
    public void setOrder(int order);
    public int getOrder();
}

ManageableLifecycle

/**
 * Interface for components with manageable lifecycle (from Spring Framework).
 */
public interface ManageableLifecycle extends Lifecycle {
    void start();
    void stop();
    boolean isRunning();
}

Expression

/**
 * SpEL expression interface (from Spring Framework).
 * Represents a compiled SpEL expression.
 */
public interface Expression {
    Object getValue(EvaluationContext context);
    <T> T getValue(EvaluationContext context, Class<T> desiredResultType);
}