or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

channel-support.mdcore-messaging.mdhandler-annotations.mdindex.mdmessage-converters.mdmessaging-templates.mdrsocket.mdsimp-configuration.mdstomp-websocket.md
tile.json

index.mddocs/

Spring Messaging

Spring Messaging (org.springframework.messaging) is Spring Framework's comprehensive messaging abstraction layer that provides a unified programming model for message-based applications. It supports multiple messaging protocols including STOMP over WebSocket, RSocket, and provides building blocks for custom messaging solutions. The library offers core messaging abstractions, annotation-driven handler methods, flexible message conversion, and both synchronous and reactive programming models.

Package Information

  • Package Name: org.springframework/spring-messaging
  • Package Type: maven
  • Language: Java
  • Version: 7.0.1
  • Installation:

Maven:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
    <version>7.0.1</version>
</dependency>

Gradle:

implementation 'org.springframework:spring-messaging:7.0.1'

Core Imports

// Core messaging abstractions
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;

// Message building and support
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.GenericMessage;

// Messaging templates
import org.springframework.messaging.core.GenericMessagingTemplate;
import org.springframework.messaging.simp.SimpMessagingTemplate;

// Handler annotations
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.DestinationVariable;

// Message converters
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;

// STOMP support
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompCommand;

// RSocket support
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;

Basic Usage

Simple Message Creation and Sending

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.core.GenericMessagingTemplate;
import org.springframework.messaging.support.ExecutorSubscribableChannel;

// Create a message channel
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();

// Create a messaging template
GenericMessagingTemplate template = new GenericMessagingTemplate(channel);

// Build and send a message
Message<String> message = MessageBuilder
    .withPayload("Hello, World!")
    .setHeader("priority", "high")
    .setHeader("timestamp", System.currentTimeMillis())
    .build();

template.send(message);

// Or use convertAndSend for automatic conversion
template.convertAndSend("myDestination", "Hello, World!");

Annotation-Based Message Handling

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class ChatController {

    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public String handleChatMessage(
            @Payload String message,
            @Header("sender") String sender) {
        return sender + ": " + message;
    }

    @MessageMapping("/greet/{name}")
    public String handleGreeting(
            @DestinationVariable String name,
            @Payload String message) {
        return "Hello " + name + ", " + message;
    }
}

Message Conversion

import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.Arrays;

// Configure message converters
MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
StringMessageConverter stringConverter = new StringMessageConverter();

CompositeMessageConverter converter = new CompositeMessageConverter(
    Arrays.asList(jsonConverter, stringConverter)
);

// Use with template
GenericMessagingTemplate template = new GenericMessagingTemplate(channel);
template.setMessageConverter(converter);

// Send objects that will be automatically converted to JSON
MyData data = new MyData("value1", 42);
template.convertAndSend("/topic/data", data);

Architecture

Spring Messaging consists of four key architectural layers:

  1. Core Abstractions: Message (payload + headers), MessageChannel (send messages), MessageHandler (process messages), MessageHeaders (immutable metadata)

  2. Handler Framework: Annotation-driven message handling (@MessageMapping, @Payload, @Header), argument resolution, return value handling, reactive support

  3. Protocol Support: STOMP over WebSocket, RSocket (4 interaction models), TCP with Reactor Netty

  4. Conversion System: MessageConverter interface with built-in support for JSON (Jackson), XML, Protobuf, String, ByteArray; content-type-based automatic selection

Capabilities

Core Messaging

Fundamental messaging abstractions including Message interface, MessageChannel implementations, MessageHandler, and MessageHeaders for building message-based applications.

// Message - encapsulates payload and headers
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

// MessageChannel - abstraction for sending messages
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1;
    default boolean send(Message<?> message);
    boolean send(Message<?> message, long timeout);
}

// SubscribableChannel - for handler subscription
public interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}

// MessageHandler - callback for processing messages
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}

// MessageHeaders - immutable message metadata
public class MessageHeaders implements Map<String, Object>, Serializable {
    public static final String ID = "id";
    public static final String TIMESTAMP = "timestamp";
    public static final String CONTENT_TYPE = "contentType";
    public static final String REPLY_CHANNEL = "replyChannel";
    public static final String ERROR_CHANNEL = "errorChannel";

    UUID getId();
    Long getTimestamp();
    Object getReplyChannel();
    Object getErrorChannel();
    <T> T get(Object key, Class<T> type);
}

Core Messaging

Message Converters

Comprehensive message payload conversion supporting JSON (Jackson, Gson, JSON-B, Kotlin Serialization), XML, Protobuf, String, ByteArray, and custom formats with content-type-based automatic selection.

// MessageConverter - base conversion interface
public interface MessageConverter {
    Object fromMessage(Message<?> message, Class<?> targetClass);
    Message<?> toMessage(Object payload, MessageHeaders headers);
}

// SmartMessageConverter - with conversion hints
public interface SmartMessageConverter extends MessageConverter {
    Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint);
    Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint);
}

// Jackson JSON converter (most commonly used)
public class MappingJackson2MessageConverter extends AbstractMessageConverter {
    public void setObjectMapper(ObjectMapper objectMapper);
    public ObjectMapper getObjectMapper();
    public void setPrettyPrint(boolean prettyPrint);
}

// Composite converter for multiple formats
public class CompositeMessageConverter implements SmartMessageConverter {
    public CompositeMessageConverter(Collection<MessageConverter> converters);
    public List<MessageConverter> getConverters();
}

// Protobuf support
public class ProtobufMessageConverter extends AbstractMessageConverter {
    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
    public static final MimeType PROTOBUF = new MimeType("application", "x-protobuf", DEFAULT_CHARSET);
}

Message Converters

Messaging Templates

High-level templates for simplified message sending and receiving with destination resolution, message conversion, and request-reply patterns.

// GenericMessagingTemplate - for any channel type
public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel> {
    public GenericMessagingTemplate(MessageChannel defaultDestination);
    public void setDefaultDestination(MessageChannel defaultDestination);
    public void setMessageConverter(MessageConverter messageConverter);
    public void setSendTimeout(long sendTimeout);
    public void setReceiveTimeout(long receiveTimeout);
}

// SimpMessagingTemplate - for user-specific destinations
public class SimpMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<String> {
    public SimpMessagingTemplate(MessageChannel messageChannel);
    public void setUserDestinationPrefix(String prefix);
    public String getUserDestinationPrefix();

    // Send to specific user
    public void convertAndSendToUser(String user, String destination, Object payload);
    public void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers);
}

// Core operations
public interface MessageSendingOperations<D> {
    void send(D destination, Message<?> message);
    <T> void convertAndSend(D destination, T payload);
    <T> void convertAndSend(D destination, T payload, Map<String, Object> headers);
}

// Request-reply operations
public interface MessageRequestReplyOperations<D> {
    Message<?> sendAndReceive(D destination, Message<?> requestMessage);
    <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass);
}

Messaging Templates

Handler Annotations

Annotation-driven message handling with @MessageMapping, @Payload, @Header, @Headers, @DestinationVariable, @SendTo for declarative message processing.

// @MessageMapping - maps messages to handler methods
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface MessageMapping {
    String[] value() default {};
}

// @Payload - extracts message payload with optional validation
@Target(ElementType.PARAMETER)
public @interface Payload {
    String expression() default "";
    boolean required() default true;
}

// @Header - extracts single header value
@Target(ElementType.PARAMETER)
public @interface Header {
    String value() default "";
    String name() default "";
    boolean required() default true;
    String defaultValue() default ValueConstants.DEFAULT_NONE;
}

// @Headers - binds all message headers to Map
@Target(ElementType.PARAMETER)
public @interface Headers {
}

// @DestinationVariable - extracts URI template variables
@Target(ElementType.PARAMETER)
public @interface DestinationVariable {
    String value() default "";
}

// @SendTo - specifies reply destination
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface SendTo {
    String[] value() default {};
}

// @MessageExceptionHandler - handles exceptions from message handlers
@Target(ElementType.METHOD)
public @interface MessageExceptionHandler {
    Class<? extends Throwable>[] value() default {};
}

Handler Annotations

STOMP over WebSocket

Complete STOMP protocol implementation for WebSocket communication including session management, broker relay, subscription handling, and client/server support.

// StompSession - client-side session
public interface StompSession {
    String getSessionId();
    boolean isConnected();
    Receiptable send(String destination, Object payload);
    Receiptable send(StompHeaders headers, Object payload);
    Subscription subscribe(String destination, StompFrameHandler handler);
    Subscription subscribe(StompHeaders headers, StompFrameHandler handler);
    void disconnect();
}

// StompHeaders - STOMP-specific headers
public class StompHeaders implements MultiValueMap<String, String>, Serializable {
    public void setDestination(String destination);
    public String getDestination();
    public void setContentType(MimeType contentType);
    public MimeType getContentType();
    public void setSubscriptionId(String subscriptionId);
    public void setReceipt(String receipt);
    public void setHeartbeat(long[] heartbeat);
}

// StompCommand - STOMP frame commands
public enum StompCommand {
    CONNECT, STOMP, CONNECTED, SEND, SUBSCRIBE, UNSUBSCRIBE,
    ACK, NACK, BEGIN, COMMIT, ABORT, DISCONNECT, MESSAGE,
    RECEIPT, ERROR
}

// Annotation support
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface SubscribeMapping {
    String[] value() default {};
}

@Target({ElementType.TYPE, ElementType.METHOD})
public @interface SendToUser {
    String[] value() default {};
    String[] destinations() default {};
    boolean broadcast() default true;
}

STOMP over WebSocket

RSocket Support

RSocket protocol integration with reactive streams, supporting request-response, fire-and-forget, request-stream, and bidirectional channel interaction models.

// RSocketRequester - fluent API for RSocket requests
public interface RSocketRequester extends Disposable {
    RequestSpec route(String route);
    RequestSpec route(String route, Object... routeVars);

    interface RequestSpec {
        RequestSpec metadata(Object metadata, MimeType mimeType);
        ResponseSpec data(Object data);
        ResponseSpec data(Publisher<?> data);
    }

    interface ResponseSpec {
        <T> Mono<T> retrieveMono(Class<T> dataType);
        <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataType);
        <T> Flux<T> retrieveFlux(Class<T> dataType);
        <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataType);
    }

    interface Builder {
        Builder dataMimeType(MimeType mimeType);
        Builder metadataMimeType(MimeType mimeType);
        Builder rsocketStrategies(RSocketStrategies strategies);
        RSocketRequester tcp(String host, int port);
        RSocketRequester websocket(URI uri);
        Mono<RSocketRequester> connectTcp(String host, int port);
        Mono<RSocketRequester> connectWebSocket(URI uri);
    }
}

// RSocketStrategies - configuration for encoders/decoders
public interface RSocketStrategies {
    List<Encoder<?>> encoders();
    List<Decoder<?>> decoders();

    interface Builder {
        Builder encoder(Encoder<?>... encoders);
        Builder decoder(Decoder<?>... decoders);
        Builder routeMatcher(RouteMatcher routeMatcher);
        RSocketStrategies build();
    }

    static Builder builder();
}

// @ConnectMapping - handle RSocket connection setup
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ConnectMapping {
    String[] value() default {};
}

RSocket Support

Channel Support

MessageChannel implementations, interceptors, MessageBuilder utilities, and advanced channel features for message routing and processing pipelines.

// ExecutorSubscribableChannel - async message handling
public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
    public ExecutorSubscribableChannel();
    public ExecutorSubscribableChannel(Executor executor);
    public void setExecutor(Executor executor);
    public void setInterceptors(List<ChannelInterceptor> interceptors);
    public void addInterceptor(ChannelInterceptor interceptor);
}

// PublishSubscribeChannel - broadcasts to all subscribers
public class PublishSubscribeChannel extends AbstractSubscribableChannel {
    public PublishSubscribeChannel();
    public PublishSubscribeChannel(Executor executor);
    public void setTaskExecutor(Executor executor);
    public void setApplySequence(boolean applySequence);
    public void setMinSubscribers(int minSubscribers);
}

// ChannelInterceptor - intercept channel operations
public interface ChannelInterceptor {
    Message<?> preSend(Message<?> message, MessageChannel channel);
    void postSend(Message<?> message, MessageChannel channel, boolean sent);
    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
    boolean preReceive(MessageChannel channel);
    Message<?> postReceive(Message<?> message, MessageChannel channel);
    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}

// MessageBuilder - fluent message construction
public final class MessageBuilder<T> {
    static <T> MessageBuilder<T> withPayload(T payload);
    static <T> MessageBuilder<T> fromMessage(Message<T> message);

    MessageBuilder<T> setHeader(String headerName, Object headerValue);
    MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);
    MessageBuilder<T> removeHeader(String headerName);
    MessageBuilder<T> copyHeaders(Map<String, ?> headers);
    MessageBuilder<T> setReplyChannel(MessageChannel replyChannel);
    MessageBuilder<T> setErrorChannel(MessageChannel errorChannel);

    Message<T> build();
}

Channel Support

SIMP Broker Configuration

Configuration DSL for Simple Messaging Protocol including message broker registration, STOMP endpoints, destination prefixes, and broker relay setup.

// MessageBrokerRegistry - configure message brokers
public class MessageBrokerRegistry {
    public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes);
    public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes);
    public MessageBrokerRegistry setApplicationDestinationPrefixes(String... prefixes);
    public MessageBrokerRegistry setUserDestinationPrefix(String prefix);
    public ChannelRegistration configureBrokerChannel();
}

// SimpleBrokerRegistration - in-memory broker
public class SimpleBrokerRegistration {
    public SimpleBrokerRegistration setHeartbeatValue(long[] heartbeat);
    public SimpleBrokerRegistration setTaskScheduler(TaskScheduler taskScheduler);
    public SimpleBrokerRegistration setSelectorHeaderName(String selectorHeaderName);
}

// StompBrokerRelayRegistration - external STOMP broker
public class StompBrokerRelayRegistration {
    public StompBrokerRelayRegistration setRelayHost(String relayHost);
    public StompBrokerRelayRegistration setRelayPort(int relayPort);
    public StompBrokerRelayRegistration setClientLogin(String login);
    public StompBrokerRelayRegistration setClientPasscode(String passcode);
    public StompBrokerRelayRegistration setSystemLogin(String login);
    public StompBrokerRelayRegistration setSystemPasscode(String passcode);
    public StompBrokerRelayRegistration setVirtualHost(String virtualHost);
}

SIMP Broker Configuration

Exception Handling

// Base messaging exception
public class MessagingException extends RuntimeException {
    public MessagingException(Message<?> message);
    public MessagingException(String description);
    public MessagingException(Message<?> message, String description, Throwable cause);
    public Message<?> getFailedMessage();
}

// Specific exception types
public class MessageDeliveryException extends MessagingException {
    // Thrown when message cannot be delivered to channel
}

public class MessageHandlingException extends MessagingException {
    // Thrown when handler fails to process message
}

public class MessageConversionException extends MessagingException {
    // Thrown when message conversion fails
}

Types

Core Types

// Reactive message handler
public interface ReactiveMessageHandler {
    Mono<Void> handleMessage(Message<?> message);
}

// Pollable channel for active message retrieval
public interface PollableChannel extends MessageChannel {
    Message<?> receive();
    Message<?> receive(long timeout);
}

// Message post-processor for modification before sending
@FunctionalInterface
public interface MessagePostProcessor {
    Message<?> postProcessMessage(Message<?> message);
}

// Destination resolver
@FunctionalInterface
public interface DestinationResolver<D> {
    D resolveDestination(String name) throws DestinationResolutionException;
}

// Generic message implementation
public class GenericMessage<T> implements Message<T>, Serializable {
    public GenericMessage(T payload);
    public GenericMessage(T payload, Map<String, Object> headers);
}