or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

channel-support.md, core-messaging.md, handler-annotations.md, index.md, message-converters.md, messaging-templates.md, rsocket.md, simp-configuration.md, stomp-websocket.md
tile.json

stomp-websocket.mddocs/

STOMP over WebSocket

Complete STOMP (Simple Text Oriented Messaging Protocol) implementation for WebSocket communication including session management, broker relay, subscription handling, and client/server support for real-time bidirectional messaging.

Capabilities

StompSession Interface

Client-side STOMP session for sending messages and managing subscriptions.

/**
 * Represents a STOMP session with operations to send messages,
 * subscribe to destinations, and manage the session lifecycle.
 */
public interface StompSession {
    /**
     * Return the id for the STOMP session.
     * @return the session id
     */
    String getSessionId();

    /**
     * Whether the session is connected.
     * @return true if connected
     */
    boolean isConnected();

    /**
     * Set whether to automatically add receipt headers to SEND, SUBSCRIBE,
     * and UNSUBSCRIBE frames. Default is false.
     * @param enabled whether to enable auto-receipts
     */
    void setAutoReceipt(boolean enabled);

    /**
     * Send a message to the specified destination.
     * @param destination the destination to send to
     * @param payload the message payload
     * @return a handle for tracking the receipt of the SEND frame
     */
    Receiptable send(String destination, Object payload);

    /**
     * Send a message to the specified destination with headers.
     * @param headers the headers for the SEND frame, including the destination
     * @param payload the message payload
     * @return a handle for tracking the receipt of the SEND frame
     */
    Receiptable send(StompHeaders headers, Object payload);

    /**
     * Subscribe to the given destination.
     * @param destination the destination to subscribe to
     * @param handler the handler for frames received on the subscription
     * @return a handle to unsubscribe or to track a receipt
     */
    Subscription subscribe(String destination, StompFrameHandler handler);

    /**
     * Subscribe to the given destination with headers.
     * @param headers the headers for the SUBSCRIBE frame, including the destination
     * @param handler the handler for frames received on the subscription
     * @return a handle to unsubscribe or to track a receipt
     */
    Subscription subscribe(StompHeaders headers, StompFrameHandler handler);

    /**
     * Send an acknowledgement for the given message.
     * @param messageId the id of the message being acknowledged
     * @param consumed whether the message was consumed; per the Spring Javadoc
     * this results in an ACK frame when true and a NACK frame otherwise
     * @return a handle for tracking the receipt of the acknowledgement
     */
    Receiptable acknowledge(String messageId, boolean consumed);

    /**
     * Send an acknowledgement for consumption of a message.
     * @param headers the headers for the ACK or NACK frame
     * @param consumed whether the message was consumed
     * @return a handle for tracking the receipt of the acknowledgement
     */
    Receiptable acknowledge(StompHeaders headers, boolean consumed);

    /**
     * Disconnect the session.
     */
    void disconnect();

    /**
     * Disconnect the session with custom headers.
     * @param headers the headers for the DISCONNECT frame
     */
    void disconnect(StompHeaders headers);

    /**
     * A handle to use to track receipts.
     */
    interface Receiptable {
        /**
         * Return the receipt id, or null if the STOMP frame did not have a receipt header.
         * @return the receipt id, possibly null
         */
        String getReceiptId();

        /**
         * Task to invoke when a receipt is received.
         * @param task the task to invoke
         */
        void addReceiptTask(Runnable task);

        /**
         * Variant of addReceiptTask with a Consumer of the headers from the RECEIPT frame.
         * @param task the consumer to invoke
         * @since 5.3.23
         */
        void addReceiptTask(Consumer<StompHeaders> task);

        /**
         * Task to invoke when a receipt is not received in the configured time.
         * @param task the task to invoke
         */
        void addReceiptLostTask(Runnable task);
    }

    /**
     * A handle to use to unsubscribe or to track a receipt.
     * Extends Receiptable so receipt tasks can be attached to the
     * SUBSCRIBE frame as well.
     */
    interface Subscription extends Receiptable {
        /**
         * Return the id for the subscription.
         * @return the subscription id
         */
        String getSubscriptionId();

        /**
         * Return the headers used on the SUBSCRIBE frame.
         * @return the subscription headers
         */
        StompHeaders getSubscriptionHeaders();

        /**
         * Remove the subscription by sending an UNSUBSCRIBE frame.
         */
        void unsubscribe();

        /**
         * Variant of unsubscribe() with additional custom headers to send
         * on the UNSUBSCRIBE frame.
         * @param headers the custom headers
         */
        void unsubscribe(StompHeaders headers);
    }
}

Usage Examples:

import org.springframework.messaging.simp.stomp.*;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

// Create STOMP client
ReactorNettyTcpStompClient client = new ReactorNettyTcpStompClient("localhost", 61613);
client.setMessageConverter(new MappingJackson2MessageConverter());

// Connect and get session
CompletableFuture<StompSession> sessionFuture = client.connectAsync(new StompSessionHandlerAdapter() {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        System.out.println("Connected! Session: " + session.getSessionId());

        // Send message
        session.send("/topic/news", "Breaking news!");

        // Subscribe to topic
        session.subscribe("/topic/updates", new StompFrameHandler() {
            @Override
            public Type getPayloadType(StompHeaders headers) {
                return String.class;
            }

            @Override
            public void handleFrame(StompHeaders headers, Object payload) {
                System.out.println("Received: " + payload);
            }
        });
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        System.err.println("Transport error: " + exception.getMessage());
    }
});

// Payload for the task message; declared before the callback below because a
// local variable must be declared (and effectively final) before a lambda uses it.
Object taskData = "task-data";

// Use session when connected
sessionFuture.thenAccept(session -> {
    // Send with receipt
    StompSession.Receiptable receipt = session.send("/queue/tasks", taskData);
    receipt.addReceiptTask(() -> System.out.println("Message received by broker"));

    // Subscribe with custom headers
    StompHeaders subHeaders = new StompHeaders();
    subHeaders.setDestination("/user/queue/notifications");
    subHeaders.setId("sub-1");

    StompSession.Subscription subscription = session.subscribe(subHeaders, new StompFrameHandler() {
        @Override
        public Type getPayloadType(StompHeaders headers) {
            // Ask the configured message converter for a Notification instance
            return Notification.class;
        }

        @Override
        public void handleFrame(StompHeaders headers, Object payload) {
            Notification notification = (Notification) payload;
            System.out.println("Notification: " + notification.getMessage());
        }
    });

    // Unsubscribe later
    // subscription.unsubscribe();
}).exceptionally(ex -> {
    System.err.println("Connection failed: " + ex.getMessage());
    return null;
});

// Simple mutable bean used as the payload type of the notification subscription.
class Notification {
    private String message;
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}

StompHeaders Class

Represents STOMP frame headers with typed accessors for standard STOMP headers.

/**
 * Represents STOMP frame headers.
 * Implements MultiValueMap for general header access, in addition to the
 * typed setter/getter pairs declared below.
 */
public class StompHeaders implements MultiValueMap<String, String>, Serializable {
    // Header name constants (the string values are the header names on the wire).
    public static final String CONTENT_TYPE = "content-type";
    public static final String CONTENT_LENGTH = "content-length";
    public static final String RECEIPT = "receipt";
    public static final String HOST = "host";
    public static final String ACCEPT_VERSION = "accept-version";
    public static final String LOGIN = "login";
    public static final String PASSCODE = "passcode";
    public static final String HEARTBEAT = "heart-beat";
    public static final String SESSION = "session";
    public static final String SERVER = "server";
    public static final String DESTINATION = "destination";
    public static final String ID = "id";
    public static final String ACK = "ack";
    public static final String SUBSCRIPTION = "subscription";
    public static final String MESSAGE_ID = "message-id";
    // NOTE(review): verify that MESSAGE and setMessage/getMessage are part of the
    // public StompHeaders API in the targeted Spring version — TODO confirm.
    public static final String MESSAGE = "message";
    public static final String RECEIPT_ID = "receipt-id";

    // No-argument constructor.
    public StompHeaders();

    // Typed accessors, one setter/getter pair per header.

    public void setContentType(MimeType mimeType);
    public MimeType getContentType();

    public void setContentLength(long contentLength);
    public long getContentLength();

    public void setReceipt(String receipt);
    public String getReceipt();

    public void setHost(String host);
    public String getHost();

    // Accepts one or more protocol versions (varargs); the getter returns them as an array.
    public void setAcceptVersion(String... acceptVersions);
    public String[] getAcceptVersion();

    public void setLogin(String login);
    public String getLogin();

    public void setPasscode(String passcode);
    public String getPasscode();

    // Heartbeat is carried as a long[]; the usage example passes two values (send/receive).
    public void setHeartbeat(long[] heartbeat);
    public long[] getHeartbeat();

    public void setSession(String session);
    public String getSession();

    public void setServer(String server);
    public String getServer();

    public void setDestination(String destination);
    public String getDestination();

    public void setId(String id);
    public String getId();

    public void setAck(String ack);
    public String getAck();

    public void setSubscription(String subscription);
    public String getSubscription();

    public void setMessageId(String messageId);
    public String getMessageId();

    public void setMessage(String message);
    public String getMessage();

    public void setReceiptId(String receiptId);
    public String getReceiptId();
}

Usage Examples:

import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

// Create headers for CONNECT frame
StompHeaders connectHeaders = new StompHeaders();
connectHeaders.setAcceptVersion("1.1", "1.2");
connectHeaders.setHeartbeat(new long[]{10000, 10000}); // 10s send/receive
connectHeaders.setLogin("user");
connectHeaders.setPasscode("pass");
connectHeaders.setHost("example.com");

// Create headers for SEND frame
StompHeaders sendHeaders = new StompHeaders();
sendHeaders.setDestination("/topic/news");
sendHeaders.setContentType(MimeTypeUtils.APPLICATION_JSON);
sendHeaders.setReceipt("msg-001");

// Create headers for SUBSCRIBE frame
StompHeaders subscribeHeaders = new StompHeaders();
subscribeHeaders.setDestination("/user/queue/notifications");
subscribeHeaders.setId("sub-1");
subscribeHeaders.setAck("client");

// Access headers through the typed getters
String destination = subscribeHeaders.getDestination();
String subscriptionId = subscribeHeaders.getId();
MimeType contentType = sendHeaders.getContentType();

StompCommand Enum

Represents STOMP protocol commands.

/**
 * Represents a STOMP command.
 * Each constant exposes the query methods declared below.
 */
public enum StompCommand {
    STOMP,
    CONNECT,
    CONNECTED,
    SEND,
    SUBSCRIBE,
    UNSUBSCRIBE,
    ACK,
    NACK,
    BEGIN,
    COMMIT,
    ABORT,
    DISCONNECT,
    MESSAGE,
    RECEIPT,
    ERROR;

    /**
     * Return the SimpMessageType for this STOMP command.
     * @return the corresponding SimpMessageType
     */
    public SimpMessageType getMessageType();

    /**
     * Whether this command requires a destination header.
     * @return true if a destination header is required
     */
    public boolean requiresDestination();

    /**
     * Whether this command requires a subscription id header.
     * @return true if a subscription id header is required
     */
    public boolean requiresSubscriptionId();

    /**
     * Whether this command requires content-length header.
     * @return true if a content-length header is required
     */
    public boolean requiresContentLength();

    /**
     * Whether a body is allowed for this command.
     * @return true if frames with this command may carry a body
     */
    public boolean isBodyAllowed();
}

@SubscribeMapping Annotation

Maps subscription requests to handler methods.

/**
 * Annotation for mapping subscription messages onto specific handler methods
 * based on the destination of a subscription.
 * Typically used in combination with @MessageMapping.
 */
// NOTE(review): confirm that ElementType.TYPE is really part of @Target for the
// targeted Spring version; the usage examples only apply this annotation to methods.
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SubscribeMapping {
    /**
     * Destination patterns to match.
     * Defaults to an empty array.
     * @return the destination patterns
     */
    String[] value() default {};
}

Usage Examples:

import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

/**
 * Example controller showing the different ways a handler method can be
 * mapped to subscription destinations.
 */
@Controller
public class SubscriptionController {

    /**
     * Returns initial data when a client subscribes to "/topic/initial".
     * @return a welcome payload stamped with the current time in milliseconds
     */
    @SubscribeMapping("/topic/initial")
    public InitialData handleSubscription() {
        final long subscribedAt = System.currentTimeMillis();
        return new InitialData("Welcome!", subscribedAt);
    }

    /**
     * User-specific subscription mapped to "/user/queue/position".
     * @return a position at the origin
     */
    @SubscribeMapping("/user/queue/position")
    public Position getCurrentPosition() {
        final int originX = 0;
        final int originY = 0;
        return new Position(originX, originY);
    }

    /**
     * Handler mapped both as a message handler ("/data") and as a
     * subscription handler ("/topic/data").
     * @return a fixed description of the current data state
     */
    @MessageMapping("/data")
    @SubscribeMapping("/topic/data")
    public String handleDataSubscription() {
        final String currentState = "Current data state";
        return currentState;
    }
}

/**
 * Immutable payload returned to a client on subscription: a message plus
 * the timestamp supplied at construction.
 */
class InitialData {
    // Assigned once in the constructor and never modified afterwards.
    private final String message;
    private final long timestamp;

    /**
     * @param message the text to deliver to the subscriber
     * @param timestamp the timestamp to associate with the message
     */
    public InitialData(String message, long timestamp) {
        this.message = message;
        this.timestamp = timestamp;
    }

    /** @return the message passed to the constructor */
    public String getMessage() { return message; }

    /** @return the timestamp passed to the constructor */
    public long getTimestamp() { return timestamp; }
}

/**
 * Immutable pair of integer coordinates.
 */
class Position {
    // One declaration per line; both are set once in the constructor.
    private final int x;
    private final int y;

    /**
     * @param x the x coordinate
     * @param y the y coordinate
     */
    public Position(int x, int y) {
        this.x = x;
        this.y = y;
    }

    /** @return the x coordinate */
    public int getX() { return x; }

    /** @return the y coordinate */
    public int getY() { return y; }
}

@SendToUser Annotation

Sends return value to user-specific destinations.

/**
 * Annotation that indicates the return value of a message-handling method
 * should be sent as a Message to the user-specific destination(s).
 * Declared for both types and methods (see @Target).
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SendToUser {
    /**
     * One or more user destinations.
     * Defaults to an empty array.
     * @return the user destinations
     */
    String[] value() default {};

    /**
     * Alias for value().
     * @return the user destinations
     */
    String[] destinations() default {};

    /**
     * Whether to broadcast to all sessions for the user or just the session
     * that sent the message. Default is true (broadcast to all sessions).
     * @return true to send to all of the user's sessions
     */
    boolean broadcast() default true;
}

Usage Examples:

import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.handler.annotation.MessageMapping;

@Controller
public class UserMessagingController {

    // Send reply to user who sent the message
    @MessageMapping("/private")
    @SendToUser("/queue/reply")
    public String handlePrivateMessage(String message) {
        return "Reply: " + message;
    }

    // Send to specific user destinations
    @MessageMapping("/notification")
    @SendToUser({"/queue/notifications", "/queue/alerts"})
    public Notification handleNotification(String data) {
        return new Notification(data, "high");
    }

    // Broadcast to all user sessions
    @MessageMapping("/sync")
    @SendToUser(value = "/queue/sync", broadcast = true)
    public SyncData handleSync(String request) {
        return new SyncData(request, System.currentTimeMillis());
    }

    // Send only to requesting session
    @MessageMapping("/session")
    @SendToUser(value = "/queue/session", broadcast = false)
    public String handleSessionMessage(String message) {
        return "Session response: " + message;
    }
}

/**
 * Immutable payload pairing a request string with the timestamp supplied
 * at construction.
 */
class SyncData {
    // Assigned once in the constructor and never modified afterwards.
    private final String request;
    private final long timestamp;

    /**
     * @param request the request text
     * @param timestamp the timestamp to associate with the request
     */
    public SyncData(String request, long timestamp) {
        this.request = request;
        this.timestamp = timestamp;
    }

    /** @return the request passed to the constructor */
    public String getRequest() { return request; }

    /** @return the timestamp passed to the constructor */
    public long getTimestamp() { return timestamp; }
}

SimpMessageHeaderAccessor Class

Accessor for SIMP message headers.

/**
 * A base class for working with message headers in simple messaging protocols
 * that support basic messaging patterns.
 * Extends NativeMessageHeaderAccessor and adds typed setter/getter pairs for
 * the "simp"-prefixed headers named by the constants below.
 */
public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
    // Header name constants; every value carries the "simp" prefix.
    public static final String DESTINATION_HEADER = "simpDestination";
    public static final String MESSAGE_TYPE_HEADER = "simpMessageType";
    public static final String SESSION_ID_HEADER = "simpSessionId";
    public static final String SESSION_ATTRIBUTES = "simpSessionAttributes";
    public static final String SUBSCRIPTION_ID_HEADER = "simpSubscriptionId";
    public static final String USER_HEADER = "simpUser";
    public static final String CONNECT_MESSAGE_HEADER = "simpConnectMessage";
    public static final String DISCONNECT_MESSAGE_HEADER = "simpDisconnectMessage";
    public static final String HEART_BEAT_HEADER = "simpHeartbeat";
    public static final String ORIGINAL_DESTINATION = "simpOriginalDestination";

    // Protected constructor (available to subclasses); create and wrap below
    // are the public static factories.
    protected SimpMessageHeaderAccessor(SimpMessageType messageType, Map<String, List<String>> externalSourceHeaders);

    public static SimpMessageHeaderAccessor create(SimpMessageType messageType);
    public static SimpMessageHeaderAccessor wrap(Message<?> message);

    // Typed accessors.

    public void setMessageTypeIfNotSet(SimpMessageType messageType);
    public SimpMessageType getMessageType();

    public void setDestination(String destination);
    public String getDestination();

    public void setSubscriptionId(String subscriptionId);
    public String getSubscriptionId();

    public void setSessionId(String sessionId);
    public String getSessionId();

    public void setSessionAttributes(Map<String, Object> attributes);
    public Map<String, Object> getSessionAttributes();

    public void setUser(Principal principal);
    public Principal getUser();

    // The getter returns a long[] while the setter takes two separate long values.
    public long[] getHeartbeat();
    public void setHeartbeat(long cx, long cy);
}

SimpMessageType Enum

Types of simple messaging protocol messages.

/**
 * A generic representation of different kinds of messages found in
 * simple messaging protocols like STOMP.
 * This is the type returned by StompCommand.getMessageType() and accepted by
 * SimpMessageHeaderAccessor.create(SimpMessageType).
 */
public enum SimpMessageType {
    CONNECT,
    CONNECT_ACK,
    MESSAGE,
    SUBSCRIBE,
    UNSUBSCRIBE,
    HEARTBEAT,
    DISCONNECT,
    DISCONNECT_ACK,
    OTHER
}

StompFrameHandler Interface

Contract for handling STOMP frames received from subscriptions.

/**
 * Contract to handle a STOMP frame.
 * getPayloadType is consulted first to choose the conversion target, then
 * handleFrame receives the converted payload.
 */
// NOTE(review): Type is presumably java.lang.reflect.Type (the usage examples
// return String.class from getPayloadType); this listing omits the import.
public interface StompFrameHandler {
    /**
     * Invoked before handleFrame to determine the type of Object
     * the payload should be converted to.
     * @param headers the headers of a message
     * @return the target type for the payload
     */
    Type getPayloadType(StompHeaders headers);

    /**
     * Handle a STOMP frame with the payload converted to the target type.
     * @param headers the headers of the frame
     * @param payload the payload, or null if there was no payload
     */
    void handleFrame(StompHeaders headers, Object payload);
}

StompSessionHandler Interface

Contract for session-level STOMP events.

/**
 * Contract for handling STOMP session lifecycle events.
 * Extends StompFrameHandler, so implementations must also provide
 * getPayloadType and handleFrame.
 */
public interface StompSessionHandler extends StompFrameHandler {
    /**
     * Invoked when the STOMP session is established.
     * @param session the client STOMP session
     * @param connectedHeaders the CONNECTED frame headers
     */
    void afterConnected(StompSession session, StompHeaders connectedHeaders);

    /**
     * Handle any exception that arises during STOMP message handling.
     * @param session the client STOMP session
     * @param command the STOMP command, may be null
     * @param headers the headers
     * @param payload the raw payload
     * @param exception the exception
     */
    void handleException(StompSession session, StompCommand command,
                        StompHeaders headers, byte[] payload, Throwable exception);

    /**
     * Handle a low-level transport error.
     * @param session the client STOMP session
     * @param exception the exception that occurred
     */
    void handleTransportError(StompSession session, Throwable exception);

    // Inherits getPayloadType and handleFrame from StompFrameHandler
}

StompSessionHandlerAdapter Class

Abstract adapter class for StompSessionHandler with mostly empty default implementations; only getPayloadType returns a value (String).

/**
 * Abstract adapter class for StompSessionHandler with mostly empty
 * implementation methods except for getPayloadType which returns String.
 * Subclasses override only the callbacks they need. Method bodies are
 * omitted in this listing; only the signatures are shown.
 */
public abstract class StompSessionHandlerAdapter implements StompSessionHandler {
    /**
     * This implementation returns String as the expected payload type
     * for STOMP ERROR frames.
     * @param headers the headers of a message
     * @return String as the payload type
     */
    @Override
    public Type getPayloadType(StompHeaders headers);

    /**
     * This implementation is empty.
     */
    @Override
    public void handleFrame(StompHeaders headers, Object payload);

    /**
     * This implementation is empty.
     */
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders);

    /**
     * This implementation is empty.
     */
    @Override
    public void handleException(StompSession session, StompCommand command,
                               StompHeaders headers, byte[] payload, Throwable exception);

    /**
     * This implementation is empty.
     */
    @Override
    public void handleTransportError(StompSession session, Throwable exception);
}

Complete STOMP Server Example

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Register STOMP endpoint
        registry.addEndpoint("/ws")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Enable simple in-memory broker
        registry.enableSimpleBroker("/topic", "/queue");

        // Set application destination prefix
        registry.setApplicationDestinationPrefixes("/app");

        // Set user destination prefix
        registry.setUserDestinationPrefix("/user");
    }
}

/**
 * Example controller for the server configuration: a chat broadcast, a
 * per-user reply, and a greeting returned on subscription.
 */
@Controller
public class StompController {

    /**
     * Stamps the incoming chat message with the current time and returns it
     * for delivery to "/topic/messages".
     * @param message the incoming chat message (modified in place)
     * @return the same message instance with its timestamp set
     */
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage handleChatMessage(ChatMessage message) {
        final long receivedAt = System.currentTimeMillis();
        message.setTimestamp(receivedAt);
        return message;
    }

    /**
     * Builds a reply addressed to the sending user, delivered via "/queue/reply".
     * @param message the incoming text
     * @param principal the user who sent the message
     * @return the reply text containing the user's name and the message
     */
    @MessageMapping("/private")
    @SendToUser("/queue/reply")
    public String handlePrivateMessage(String message, Principal principal) {
        final String senderName = principal.getName();
        return "Hello " + senderName + ": " + message;
    }

    /**
     * Returns a greeting for subscriptions mapped to "/topic/greetings".
     * @return the welcome text
     */
    @SubscribeMapping("/topic/greetings")
    public String handleSubscription() {
        final String greeting = "Welcome to the chat!";
        return greeting;
    }
}

/**
 * Mutable bean carrying a chat message: its text, its sender, and a timestamp.
 * All properties start at their Java defaults (null / 0) until set.
 */
class ChatMessage {
    private String content;
    private String sender;
    private long timestamp;

    /** @return the message text, or null if not set */
    public String getContent() {
        return this.content;
    }

    /** @param content the message text */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return the sender, or null if not set */
    public String getSender() {
        return this.sender;
    }

    /** @param sender the sender of the message */
    public void setSender(String sender) {
        this.sender = sender;
    }

    /** @return the timestamp, or 0 if not set */
    public long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the timestamp to record */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}