or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

events.mdheader-mapping.mdinbound-adapter.mdindex.mdoutbound-handler.mdsession-management.mdxml-configuration.md
tile.json

events.mddocs/

Event System

Spring Integration STOMP publishes application events for connection lifecycle, message receipts, and error conditions. Applications can listen for these events to implement monitoring, alerting, and custom error handling.

Capabilities

StompIntegrationEvent

Base class for all STOMP-related events.

/**
 * Base class for all STOMP-related Spring Integration events.
 * Extends Spring Integration's IntegrationEvent.
 *
 * Thread Safety: Events are immutable and thread-safe.
 * Events are published on Spring's application event publisher thread.
 *
 * @since 4.2
 */
public abstract class StompIntegrationEvent extends IntegrationEvent {

    /**
     * Create a STOMP event with the specified source.
     *
     * @param source the component that published the event (must not be null)
     * @throws IllegalArgumentException if source is null
     */
    public StompIntegrationEvent(Object source);

    /**
     * Create a STOMP event with source and exception cause.
     *
     * @param source the component that published the event (must not be null)
     * @param cause  the exception that triggered this event (nullable)
     * @throws IllegalArgumentException if source is null
     */
    public StompIntegrationEvent(Object source, @Nullable Throwable cause);

    /**
     * Get the exception that caused this event, if any.
     * Thread-safe.
     *
     * @return Throwable or null if no exception
     */
    @Nullable
    public Throwable getCause();
}

StompSessionConnectedEvent

Event published when STOMP session is successfully established.

/**
 * Event published when a STOMP session successfully connects to the broker.
 * Applications can listen for this event to perform post-connection initialization.
 *
 * @since 4.2.2
 */
public class StompSessionConnectedEvent extends StompIntegrationEvent {

    /**
     * Create a connection success event.
     *
     * @param source the StompSessionManager that connected
     */
    public StompSessionConnectedEvent(Object source);
}

StompConnectionFailedEvent

Event published when STOMP connection attempt fails.

/**
 * Event published when connection to the STOMP broker fails.
 * Contains the exception that caused the failure.
 * Applications can listen for this event to implement custom error handling or alerting.
 *
 * @since 4.2.2
 */
public class StompConnectionFailedEvent extends StompIntegrationEvent {

    /**
     * Create a connection failure event.
     *
     * @param source the StompSessionManager that failed to connect
     * @param cause  the exception that caused the connection failure (nullable)
     */
    public StompConnectionFailedEvent(Object source, @Nullable Throwable cause);
}

StompReceiptEvent

Event published for STOMP receipt confirmation or timeout.

/**
 * Event published for STOMP RECEIPT frames indicating message delivery confirmation
 * or lost receipts (timeout). Supports both SUBSCRIBE and SEND commands.
 *
 * Thread Safety: Events are immutable except for setMessage() which should be called
 * before event is published. After publication, events are read-only.
 *
 * @since 4.2
 */
public class StompReceiptEvent extends StompIntegrationEvent {

    /**
     * Create a receipt event.
     *
     * @param source        the component that published the event (must not be null)
     * @param destination   the STOMP destination (nullable, may be null for some commands)
     * @param receiptId     the receipt identifier (nullable, may be null if not provided)
     * @param stompCommand  the STOMP command (SUBSCRIBE or SEND, must not be null)
     * @param lost          true if receipt was lost (timeout), false if received
     * @throws IllegalArgumentException if source is null
     * @throws IllegalArgumentException if stompCommand is null
     */
    public StompReceiptEvent(
        Object source,
        @Nullable String destination,
        @Nullable String receiptId,
        StompCommand stompCommand,
        boolean lost
    );

    /**
     * Get the STOMP destination for this receipt.
     * Thread-safe.
     *
     * @return destination string (nullable, null for some command types)
     */
    @Nullable
    public String getDestination();

    /**
     * Get the receipt identifier.
     * Thread-safe.
     *
     * @return receipt ID string (nullable, null if not provided by broker)
     */
    @Nullable
    public String getReceiptId();

    /**
     * Get the STOMP command that triggered this receipt.
     * Thread-safe.
     *
     * @return StompCommand (SUBSCRIBE or SEND, never null)
     */
    public StompCommand getStompCommand();

    /**
     * Check if the receipt was lost (timeout occurred).
     * Thread-safe.
     *
     * @return true if receipt was not received within timeout period, false if received
     */
    public boolean isLost();

    /**
     * Get the associated Spring Integration message (for SEND commands).
     * Thread-safe.
     *
     * @return Message or null (null for SUBSCRIBE commands or if not set)
     */
    @Nullable
    public Message<?> getMessage();

    /**
     * Set the associated Spring Integration message.
     * Should be called before event is published.
     * Not thread-safe if called after publication.
     *
     * @param message the Message (can be null)
     */
    public void setMessage(Message<?> message);
}

StompExceptionEvent

Event published when exceptions occur during STOMP processing.

/**
 * Event published when exceptions occur during STOMP adapter processing.
 * Applications can listen for this event to implement centralized error handling and logging.
 *
 * @since 4.2
 */
public class StompExceptionEvent extends StompIntegrationEvent {

    /**
     * Create an exception event.
     *
     * @param source the component where the exception occurred
     * @param cause  the exception (nullable)
     */
    public StompExceptionEvent(Object source, @Nullable Throwable cause);
}

Usage Examples

Listening for Connection Success

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.StompSessionConnectedEvent;
import org.springframework.stereotype.Component;

@Component
public class ConnectionMonitor {

    @EventListener
    public void handleConnectionSuccess(StompSessionConnectedEvent event) {
        System.out.println("STOMP connection established: " + event.getSource());

        // Perform post-connection initialization
        // - Log connection event
        // - Update monitoring dashboard
        // - Enable dependent services
    }
}

Listening for Connection Failures

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.StompConnectionFailedEvent;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
public class ConnectionErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(ConnectionErrorHandler.class);

    @EventListener
    public void handleConnectionFailure(StompConnectionFailedEvent event) {
        Throwable cause = event.getCause();

        logger.error("STOMP connection failed from {}: {}",
            event.getSource(),
            cause.getMessage(),
            cause);

        // Implement custom error handling
        // - Send alerts
        // - Update health check status
        // - Trigger fallback mechanisms
        // - Record metrics
    }
}

Listening for Receipt Confirmation

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.StompReceiptEvent;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.stereotype.Component;

@Component
public class ReceiptHandler {

    @EventListener
    public void handleReceipt(StompReceiptEvent event) {
        String destination = event.getDestination();
        String receiptId = event.getReceiptId();
        StompCommand command = event.getStompCommand();
        boolean lost = event.isLost();

        if (command == StompCommand.SUBSCRIBE) {
            handleSubscribeReceipt(destination, receiptId, lost);
        } else if (command == StompCommand.SEND) {
            handleSendReceipt(destination, receiptId, lost, event.getMessage());
        }
    }

    private void handleSubscribeReceipt(String destination, String receiptId, boolean lost) {
        if (lost) {
            System.err.println("Subscription receipt lost for: " + destination);
            // Implement retry logic or alerting
        } else {
            System.out.println("Subscription confirmed: " + destination);
            // Subscription is active and ready
        }
    }

    private void handleSendReceipt(String destination, String receiptId,
                                   boolean lost, Message<?> message) {
        if (lost) {
            System.err.println("Send receipt lost for message to: " + destination);
            // Implement retry logic
            // Consider message as potentially not delivered
        } else {
            System.out.println("Message delivery confirmed: " + destination);
            // Message successfully delivered to broker
            // Update delivery metrics
        }
    }
}

Listening for Processing Exceptions

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.StompExceptionEvent;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
public class ExceptionHandler {

    private static final Logger logger = LoggerFactory.getLogger(ExceptionHandler.class);

    @EventListener
    public void handleStompException(StompExceptionEvent event) {
        Throwable cause = event.getCause();

        logger.error("STOMP processing exception from {}: {}",
            event.getSource(),
            cause.getMessage(),
            cause);

        // Implement error handling logic
        // - Log to error tracking system
        // - Send alerts for critical errors
        // - Update error metrics
    }
}

Comprehensive Event Monitor

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.*;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class StompEventMonitor {

    private final AtomicInteger connectionCount = new AtomicInteger(0);
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicInteger receiptCount = new AtomicInteger(0);
    private final AtomicInteger lostReceiptCount = new AtomicInteger(0);
    private Instant lastConnectionTime;

    @EventListener
    public void handleConnected(StompSessionConnectedEvent event) {
        connectionCount.incrementAndGet();
        lastConnectionTime = Instant.now();
        System.out.println("[" + lastConnectionTime + "] STOMP Connected");
        printStats();
    }

    @EventListener
    public void handleConnectionFailed(StompConnectionFailedEvent event) {
        failureCount.incrementAndGet();
        System.err.println("[" + Instant.now() + "] STOMP Connection Failed: " +
            event.getCause().getMessage());
        printStats();
    }

    @EventListener
    public void handleReceipt(StompReceiptEvent event) {
        if (event.isLost()) {
            lostReceiptCount.incrementAndGet();
            System.err.println("[" + Instant.now() + "] Receipt Lost: " +
                event.getReceiptId() + " for " + event.getDestination());
        } else {
            receiptCount.incrementAndGet();
            System.out.println("[" + Instant.now() + "] Receipt Confirmed: " +
                event.getReceiptId() + " for " + event.getDestination());
        }
    }

    @EventListener
    public void handleException(StompExceptionEvent event) {
        System.err.println("[" + Instant.now() + "] STOMP Exception: " +
            event.getCause().getMessage());
    }

    private void printStats() {
        System.out.println("STOMP Stats - Connections: " + connectionCount.get() +
            ", Failures: " + failureCount.get() +
            ", Receipts: " + receiptCount.get() +
            ", Lost: " + lostReceiptCount.get());
    }

    public void resetStats() {
        connectionCount.set(0);
        failureCount.set(0);
        receiptCount.set(0);
        lostReceiptCount.set(0);
        lastConnectionTime = null;
    }
}

Event-Based Alerting

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class AlertingService {

    @Autowired
    private NotificationService notificationService;

    @EventListener
    public void handleConnectionFailure(StompConnectionFailedEvent event) {
        // Send alert for connection failures
        notificationService.sendAlert(
            "STOMP Connection Failed",
            "Unable to connect to STOMP broker: " + event.getCause().getMessage(),
            AlertLevel.CRITICAL
        );
    }

    @EventListener
    public void handleLostReceipt(StompReceiptEvent event) {
        if (event.isLost()) {
            // Send alert for lost receipts
            notificationService.sendAlert(
                "STOMP Receipt Lost",
                "Receipt timeout for " + event.getStompCommand() +
                " to " + event.getDestination(),
                AlertLevel.WARNING
            );
        }
    }

    @EventListener
    public void handleException(StompExceptionEvent event) {
        // Send alert for processing exceptions
        notificationService.sendAlert(
            "STOMP Processing Exception",
            event.getCause().getMessage(),
            AlertLevel.ERROR
        );
    }
}

Metrics Collection

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.*;
import org.springframework.stereotype.Component;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.time.Instant;

@Component
public class StompMetricsCollector {

    private final Counter connectionsCounter;
    private final Counter failuresCounter;
    private final Counter receiptsCounter;
    private final Counter lostReceiptsCounter;
    private final Counter exceptionsCounter;
    private Instant lastDisconnectTime;

    public StompMetricsCollector(MeterRegistry meterRegistry) {
        this.connectionsCounter = meterRegistry.counter("stomp.connections");
        this.failuresCounter = meterRegistry.counter("stomp.failures");
        this.receiptsCounter = meterRegistry.counter("stomp.receipts.confirmed");
        this.lostReceiptsCounter = meterRegistry.counter("stomp.receipts.lost");
        this.exceptionsCounter = meterRegistry.counter("stomp.exceptions");
    }

    @EventListener
    public void handleConnected(StompSessionConnectedEvent event) {
        connectionsCounter.increment();
    }

    @EventListener
    public void handleConnectionFailed(StompConnectionFailedEvent event) {
        failuresCounter.increment();
        lastDisconnectTime = Instant.now();
    }

    @EventListener
    public void handleReceipt(StompReceiptEvent event) {
        if (event.isLost()) {
            lostReceiptsCounter.increment();
        } else {
            receiptsCounter.increment();
        }
    }

    @EventListener
    public void handleException(StompExceptionEvent event) {
        exceptionsCounter.increment();
    }
}

Event-Based State Management

import org.springframework.context.event.EventListener;
import org.springframework.integration.stomp.event.*;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicBoolean;
import java.time.Instant;

@Component
public class StompStateManager {

    private final AtomicBoolean connected = new AtomicBoolean(false);
    private Instant lastConnectionTime;
    private Instant lastDisconnectionTime;
    private int consecutiveFailures = 0;

    @EventListener
    public void handleConnected(StompSessionConnectedEvent event) {
        connected.set(true);
        lastConnectionTime = Instant.now();
        consecutiveFailures = 0;
        System.out.println("STOMP state: CONNECTED at " + lastConnectionTime);
    }

    @EventListener
    public void handleConnectionFailed(StompConnectionFailedEvent event) {
        connected.set(false);
        lastDisconnectionTime = Instant.now();
        consecutiveFailures++;
        System.err.println("STOMP state: DISCONNECTED at " + lastDisconnectionTime +
            " (consecutive failures: " + consecutiveFailures + ")");

        if (consecutiveFailures >= 5) {
            System.err.println("WARNING: " + consecutiveFailures +
                " consecutive connection failures detected");
            // Trigger escalation or fallback
        }
    }

    public boolean isConnected() {
        return connected.get();
    }

    public Instant getLastConnectionTime() {
        return lastConnectionTime;
    }

    public Instant getLastDisconnectionTime() {
        return lastDisconnectionTime;
    }

    public int getConsecutiveFailures() {
        return consecutiveFailures;
    }
}

Event Publishing Configuration

Enable Auto-Receipt for Receipt Events

import org.springframework.integration.stomp.AbstractStompSessionManager;

@Bean
public StompSessionManager sessionManager(WebSocketStompClient client) {
    WebSocketStompSessionManager sessionManager =
        new WebSocketStompSessionManager(client, "ws://localhost:61613/stomp");

    // Enable auto-receipt to trigger StompReceiptEvent
    sessionManager.setAutoReceipt(true);

    return sessionManager;
}

Configure Event Publisher for Adapters

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;

@Configuration
public class StompConfig {

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Bean
    public StompInboundChannelAdapter inbound(StompSessionManager sessionManager) {
        StompInboundChannelAdapter adapter =
            new StompInboundChannelAdapter(sessionManager, "/topic/messages");
        adapter.setApplicationEventPublisher(eventPublisher);
        adapter.setOutputChannel(inputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "outputChannel")
    public StompMessageHandler outbound(StompSessionManager sessionManager) {
        StompMessageHandler handler = new StompMessageHandler(sessionManager);
        handler.setDestination("/topic/messages");
        handler.setApplicationEventPublisher(eventPublisher);
        return handler;
    }
}

Common Patterns

Connection Health Check

import org.springframework.scheduling.annotation.Scheduled;

@Component
public class ConnectionHealthCheck {

    @Autowired
    private StompStateManager stateManager;

    @Autowired
    private NotificationService notificationService;

    @Scheduled(fixedRate = 60000)  // Every minute
    public void checkHealth() {
        if (!stateManager.isConnected()) {
            int failures = stateManager.getConsecutiveFailures();
            if (failures > 0) {
                notificationService.sendAlert(
                    "STOMP Health Check Failed",
                    "Not connected. Consecutive failures: " + failures,
                    AlertLevel.WARNING
                );
            }
        }
    }
}

Automatic Retry on Failure

@Component
public class RetryController {

    @Autowired
    private StompSessionManager sessionManager;

    @EventListener
    public void handleConnectionFailed(StompConnectionFailedEvent event) {
        // Note: AbstractStompSessionManager already implements automatic retry
        // This is just for custom logic or manual intervention

        Throwable cause = event.getCause();

        // Check if it's a transient error
        if (isTransientError(cause)) {
            // Let automatic recovery handle it
            System.out.println("Transient error, waiting for automatic recovery");
        } else {
            // Permanent error, may need manual intervention
            System.err.println("Permanent connection error detected");
            // Send alert for manual intervention
        }
    }

    private boolean isTransientError(Throwable cause) {
        // Implement logic to determine if error is transient
        return cause instanceof java.net.ConnectException ||
               cause instanceof java.net.SocketTimeoutException;
    }
}

Receipt Timeout Handling

@Component
public class ReceiptTimeoutHandler {

    @EventListener
    public void handleLostReceipt(StompReceiptEvent event) {
        if (event.isLost()) {
            String destination = event.getDestination();
            StompCommand command = event.getStompCommand();

            if (command == StompCommand.SEND) {
                // Message may not have been delivered
                Message<?> message = event.getMessage();
                System.err.println("Message delivery uncertain to " + destination);

                // Implement retry or compensation logic
                retryMessage(message, destination);
            } else if (command == StompCommand.SUBSCRIBE) {
                // Subscription status uncertain
                System.err.println("Subscription status uncertain for " + destination);

                // May need to verify subscription or resubscribe
            }
        }
    }

    private void retryMessage(Message<?> message, String destination) {
        // Implement message retry logic
        // - Requeue message
        // - Send to dead letter queue
        // - Alert operations team
    }
}

Event Ordering and Edge Cases

Event Publication Order

Events are published synchronously on the thread that triggers them:

  1. Connection events: Published on STOMP client thread
  2. Receipt events: Published on STOMP client thread
  3. Exception events: Published on adapter processing thread

Important: Event listeners should not block or perform long-running operations to avoid blocking STOMP processing.

Event Listener Ordering

Multiple listeners for the same event type are called in registration order (unless using @Order annotation).

@Component
@Order(1)
public class FirstListener {
    @EventListener
    public void handle(StompSessionConnectedEvent event) {
        // Called first
    }
}

@Component
@Order(2)
public class SecondListener {
    @EventListener
    public void handle(StompSessionConnectedEvent event) {
        // Called second
    }
}

Edge Cases

Null Cause in Events:

@EventListener
public void handle(StompConnectionFailedEvent event) {
    Throwable cause = event.getCause();
    if (cause == null) {
        // Connection failed but no exception details available
        // May occur in rare edge cases
    }
}

Null Destination in Receipt Events:

@EventListener
public void handle(StompReceiptEvent event) {
    String destination = event.getDestination();
    if (destination == null) {
        // Destination may be null for some receipt types
        // Check command type to determine expected behavior
    }
}

Lost Receipts:

@EventListener
public void handle(StompReceiptEvent event) {
    if (event.isLost()) {
        // Receipt timeout occurred
        // Message/subscription status is uncertain
        // May need to retry or verify status
    }
}

Rapid Connection/Disconnection:

// Multiple connection events may be published rapidly
// During reconnection attempts, you may receive:
// - StompConnectionFailedEvent
// - StompSessionConnectedEvent (if reconnection succeeds)
// - StompConnectionFailedEvent (if reconnection fails again)
// Listeners should handle rapid state changes gracefully

Event Source Identification:

@EventListener
public void handle(StompSessionConnectedEvent event) {
    Object source = event.getSource();
    if (source instanceof StompSessionManager) {
        // Event from session manager
    } else if (source instanceof StompInboundChannelAdapter) {
        // Event from inbound adapter
    }
}

Error Handling in Event Listeners

@Component
public class SafeEventListener {

    @EventListener
    public void handle(StompSessionConnectedEvent event) {
        try {
            // Event handling logic
        } catch (Exception e) {
            // Exceptions in listeners do not affect STOMP processing
            // But should be logged or handled appropriately
            logger.error("Error handling STOMP event", e);
        }
    }
}

Types

IntegrationEvent

/**
 * Base class for Spring Integration events (from Spring Integration).
 */
public abstract class IntegrationEvent extends ApplicationEvent {
    public IntegrationEvent(Object source);
    public IntegrationEvent(Object source, Throwable cause);
}

StompCommand

/**
 * STOMP protocol commands (from Spring Framework).
 */
public enum StompCommand {
    CONNECT,
    CONNECTED,
    SEND,
    SUBSCRIBE,
    UNSUBSCRIBE,
    MESSAGE,
    RECEIPT,
    DISCONNECT,
    ERROR
}

ApplicationEvent

/**
 * Base class for application events (from Spring Framework).
 */
public abstract class ApplicationEvent {
    public Object getSource();
    public long getTimestamp();
}