Spring Messaging (org.springframework.messaging) is Spring Framework's comprehensive messaging abstraction layer, providing a unified programming model for message-based applications. It supports multiple messaging protocols, including STOMP over WebSocket and RSocket, and provides building blocks for custom messaging solutions. The library offers core messaging abstractions, annotation-driven handler methods, flexible message conversion, and both synchronous and reactive programming models.
Maven:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>7.0.1</version>
</dependency>
Gradle:
implementation 'org.springframework:spring-messaging:7.0.1'
// Core messaging abstractions
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
// Message building and support
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.GenericMessage;
// Messaging templates
import org.springframework.messaging.core.GenericMessagingTemplate;
import org.springframework.messaging.simp.SimpMessagingTemplate;
// Handler annotations
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.DestinationVariable;
// Message converters
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
// STOMP support
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompCommand;
// RSocket support
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.core.GenericMessagingTemplate;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
// Create a message channel
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
// Create a messaging template and point it at the channel.
// GenericMessagingTemplate has no constructor that accepts a channel, so the
// default destination is configured through its setter.
GenericMessagingTemplate template = new GenericMessagingTemplate();
template.setDefaultDestination(channel);
// Build and send a message.
// "timestamp" (MessageHeaders.TIMESTAMP) is a framework-managed, read-only header
// and cannot be set through the builder, so the send time uses a custom header name.
Message<String> message = MessageBuilder
        .withPayload("Hello, World!")
        .setHeader("priority", "high")
        .setHeader("sentAt", System.currentTimeMillis())
        .build();
template.send(message);
// Or use convertAndSend for automatic conversion.
// The single-argument form targets the default destination; the String-destination
// overload would need a DestinationResolver, which is not configured here.
template.convertAndSend("Hello, World!");

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
/**
 * Example controller showing annotation-driven message handling.
 */
@Controller
public class ChatController {

    /**
     * Handles messages mapped to "/chat" and sends the result to "/topic/messages".
     *
     * @param message the message payload
     * @param sender  the value of the "sender" header
     * @return the chat line in the form "sender: message"
     */
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public String handleChatMessage(
            @Payload String message,
            @Header("sender") String sender) {
        return sender + ": " + message;
    }

    /**
     * Handles messages mapped to "/greet/{name}". No {@code @SendTo} is declared,
     * so this method does not name a reply destination itself.
     *
     * @param name    the value bound to the {name} destination template variable
     * @param message the message payload
     * @return the greeting in the form "Hello name, message"
     */
    @MessageMapping("/greet/{name}")
    public String handleGreeting(
            @DestinationVariable String name,
            @Payload String message) {
        return "Hello " + name + ", " + message;
    }
}

import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.Arrays;
// Configure message converters; the composite holds them in the order listed
MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
StringMessageConverter stringConverter = new StringMessageConverter();
CompositeMessageConverter converter = new CompositeMessageConverter(
        Arrays.asList(jsonConverter, stringConverter)
);
// Use with template.
// GenericMessagingTemplate has no constructor that accepts a channel, so the
// default destination is configured through its setter ("channel" is the
// message channel created earlier).
GenericMessagingTemplate template = new GenericMessagingTemplate();
template.setDefaultDestination(channel);
template.setMessageConverter(converter);
// Send objects that will be automatically converted to JSON.
// The payload goes to the default destination; a String destination name would
// need a DestinationResolver, which is not configured here.
MyData data = new MyData("value1", 42);
template.convertAndSend(data);

Spring Messaging consists of four key architectural layers:
Core Abstractions: Message (payload + headers), MessageChannel (send messages), MessageHandler (process messages), MessageHeaders (immutable metadata)
Handler Framework: Annotation-driven message handling (@MessageMapping, @Payload, @Header), argument resolution, return value handling, reactive support
Protocol Support: STOMP over WebSocket, RSocket (4 interaction models), TCP with Reactor Netty
Conversion System: MessageConverter interface with built-in support for JSON (Jackson), XML, Protobuf, String, ByteArray; content-type-based automatic selection
Fundamental messaging abstractions including Message interface, MessageChannel implementations, MessageHandler, and MessageHeaders for building message-based applications.
// Message - encapsulates payload and headers
// T is the type of the payload carried by the message.
public interface Message<T> {
// Returns the payload of this message.
T getPayload();
// Returns the headers (metadata) of this message.
MessageHeaders getHeaders();
}
// MessageChannel - abstraction for sending messages
public interface MessageChannel {

    // Timeout value meaning "no time limit" for a send.
    long INDEFINITE_TIMEOUT = -1;

    // Sends the message with no time limit by delegating to the timed variant.
    // A default method must carry a body, so the delegation is spelled out here.
    default boolean send(Message<?> message) {
        return send(message, INDEFINITE_TIMEOUT);
    }

    // Sends the message, giving up after the given timeout; the boolean result
    // reports whether the send succeeded.
    boolean send(Message<?> message, long timeout);
}
// SubscribableChannel - for handler subscription
// A MessageChannel that additionally lets MessageHandler instances register and deregister.
public interface SubscribableChannel extends MessageChannel {
// Registers the handler with this channel; the boolean reports the outcome.
boolean subscribe(MessageHandler handler);
// Removes the handler from this channel; the boolean reports the outcome.
boolean unsubscribe(MessageHandler handler);
}
// MessageHandler - callback for processing messages
public interface MessageHandler {
// Processes one message; failures are reported as MessagingException.
void handleMessage(Message<?> message) throws MessagingException;
}
// MessageHeaders - immutable message metadata
// Exposed as a Map of header name to value, with typed accessors for the
// predefined headers below.
public class MessageHeaders implements Map<String, Object>, Serializable {

    // Names of the predefined headers.
    public static final String ID = "id";
    public static final String TIMESTAMP = "timestamp";
    public static final String CONTENT_TYPE = "contentType";
    public static final String REPLY_CHANNEL = "replyChannel";
    public static final String ERROR_CHANNEL = "errorChannel";

    // Typed accessors for the "id", "timestamp", "replyChannel" and "errorChannel" headers.
    public UUID getId();
    public Long getTimestamp();
    public Object getReplyChannel();
    public Object getErrorChannel();

    // Looks up a header by key and returns it as the requested type.
    public <T> T get(Object key, Class<T> type);
}

Comprehensive message payload conversion supporting JSON (Jackson, Gson, JSON-B, Kotlin Serialization), XML, Protobuf, String, ByteArray, and custom formats with content-type-based automatic selection.
// MessageConverter - base conversion interface
// Converts in both directions between a Message and a plain object.
public interface MessageConverter {
// Converts the message's payload to an instance of targetClass.
Object fromMessage(Message<?> message, Class<?> targetClass);
// Builds a message from the given payload object and headers.
Message<?> toMessage(Object payload, MessageHeaders headers);
}
// SmartMessageConverter - with conversion hints
// Adds overloads of both conversion methods that accept an extra conversionHint object.
public interface SmartMessageConverter extends MessageConverter {
// Same as the two-argument fromMessage, with an additional hint for the conversion.
Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint);
// Same as the two-argument toMessage, with an additional hint for the conversion.
Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint);
}
// Jackson JSON converter (most commonly used)
// NOTE(review): for the 7.0.x dependency shown above, confirm whether this Jackson 2
// based converter is still the recommended choice or has a newer replacement.
public class MappingJackson2MessageConverter extends AbstractMessageConverter {
// Replaces the ObjectMapper used by this converter.
public void setObjectMapper(ObjectMapper objectMapper);
// Returns the ObjectMapper used by this converter.
public ObjectMapper getObjectMapper();
// Toggles pretty-printed output.
public void setPrettyPrint(boolean prettyPrint);
}
// Composite converter for multiple formats
// Wraps a collection of MessageConverter instances behind a single SmartMessageConverter.
public class CompositeMessageConverter implements SmartMessageConverter {
// Creates the composite from the given converters.
public CompositeMessageConverter(Collection<MessageConverter> converters);
// Returns the wrapped converters.
public List<MessageConverter> getConverters();
}
// Protobuf support
public class ProtobufMessageConverter extends AbstractMessageConverter {

    // Charset attached to the Protobuf MIME type below.
    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

    // MIME type "application/x-protobuf" with the default charset.
    public static final MimeType PROTOBUF = new MimeType("application", "x-protobuf", DEFAULT_CHARSET);
}

High-level templates for simplified message sending and receiving with destination resolution, message conversion, and request-reply patterns.
// GenericMessagingTemplate - for any channel type
// Destinations are MessageChannel instances.
public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel> {

    // Only the no-argument constructor is available; supply the channel
    // through setDefaultDestination.
    public GenericMessagingTemplate();

    // Sets the channel used by operations that do not name a destination.
    public void setDefaultDestination(MessageChannel defaultDestination);

    // Sets the converter used by the convertAndSend / convertSendAndReceive operations.
    public void setMessageConverter(MessageConverter messageConverter);

    // Sets the timeout applied to send operations.
    public void setSendTimeout(long sendTimeout);

    // Sets the timeout applied to receive operations.
    public void setReceiveTimeout(long receiveTimeout);
}
// SimpMessagingTemplate - for user-specific destinations
// Destinations are addressed by String name.
public class SimpMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<String> {
// Creates the template on top of the given message channel.
public SimpMessagingTemplate(MessageChannel messageChannel);
// Sets / returns the prefix used for user destinations.
public void setUserDestinationPrefix(String prefix);
public String getUserDestinationPrefix();
// Send to specific user
// Both overloads convert the payload and send it to the destination for the named user;
// the second one also attaches the supplied headers.
public void convertAndSendToUser(String user, String destination, Object payload);
public void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers);
}
// Core operations
// D is the destination type (for example MessageChannel or String in the templates above).
public interface MessageSendingOperations<D> {
// Sends an already-built message to the destination.
void send(D destination, Message<?> message);
// Converts the payload into a message and sends it to the destination.
<T> void convertAndSend(D destination, T payload);
// Same as above, attaching the supplied headers to the message.
<T> void convertAndSend(D destination, T payload, Map<String, Object> headers);
}
// Request-reply operations
// D is the destination type.
public interface MessageRequestReplyOperations<D> {

    // Sends the request message to the destination and returns the reply message.
    Message<?> sendAndReceive(D destination, Message<?> requestMessage);

    // Converts the request object into a message, sends it, and converts the
    // reply to targetClass.
    <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass);
}

Annotation-driven message handling with @MessageMapping, @Payload, @Header, @Headers, @DestinationVariable, @SendTo for declarative message processing.
// @MessageMapping - maps messages to handler methods
// Can be placed on a type or on a method.
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface MessageMapping {
// Destination mappings, e.g. "/chat" or "/greet/{name}"; empty by default.
String[] value() default {};
}
// @Payload - extracts message payload with optional validation
// Applies to handler method parameters.
@Target(ElementType.PARAMETER)
public @interface Payload {
// Optional expression associated with the payload; empty by default.
String expression() default "";
// Whether the payload must be present; true by default.
boolean required() default true;
}
// @Header - extracts single header value
// Applies to handler method parameters, e.g. @Header("sender") String sender.
@Target(ElementType.PARAMETER)
public @interface Header {
// Header name given as the annotation value; empty by default.
String value() default "";
// Header name given through the name attribute; empty by default.
String name() default "";
// Whether the header must be present; true by default.
boolean required() default true;
// Fallback value; ValueConstants.DEFAULT_NONE is the default setting.
String defaultValue() default ValueConstants.DEFAULT_NONE;
}
// @Headers - binds all message headers to Map
// Marker annotation for handler method parameters; it declares no attributes.
@Target(ElementType.PARAMETER)
public @interface Headers {
}
// @DestinationVariable - extracts URI template variables
// Applies to handler method parameters, e.g. the {name} segment of "/greet/{name}".
@Target(ElementType.PARAMETER)
public @interface DestinationVariable {
// Name of the template variable; empty by default.
String value() default "";
}
// @SendTo - specifies reply destination
// Can be placed on a type or on a method.
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface SendTo {
// Destinations for the handler's return value, e.g. "/topic/messages"; empty by default.
String[] value() default {};
}
// @MessageExceptionHandler - handles exceptions from message handlers
// Applies to methods.
@Target(ElementType.METHOD)
public @interface MessageExceptionHandler {

    // Exception types handled by the annotated method; empty by default.
    Class<? extends Throwable>[] value() default {};
}

Complete STOMP protocol implementation for WebSocket communication including session management, broker relay, subscription handling, and client/server support.
// StompSession - client-side session
public interface StompSession {
// Identifier of this session.
String getSessionId();
// Whether the session is currently connected.
boolean isConnected();
// Sends a payload, addressed either by destination or by a full set of STOMP headers;
// the returned Receiptable is the handle for the send.
Receiptable send(String destination, Object payload);
Receiptable send(StompHeaders headers, Object payload);
// Subscribes the handler, addressed either by destination or by a full set of STOMP headers;
// the returned Subscription is the handle for the subscription.
Subscription subscribe(String destination, StompFrameHandler handler);
Subscription subscribe(StompHeaders headers, StompFrameHandler handler);
// Disconnects the session.
void disconnect();
}
// StompHeaders - STOMP-specific headers
// A multi-valued map of String header names to String values with typed accessors.
public class StompHeaders implements MultiValueMap<String, String>, Serializable {
// Destination header.
public void setDestination(String destination);
public String getDestination();
// Content type header, as a MimeType.
public void setContentType(MimeType contentType);
public MimeType getContentType();
// NOTE(review): verify this setter's name against the StompHeaders Javadoc; the
// subscription header setter may be named differently on this class.
public void setSubscriptionId(String subscriptionId);
// Receipt header.
public void setReceipt(String receipt);
// Heartbeat header, given as an array of long values.
public void setHeartbeat(long[] heartbeat);
}
// StompCommand - STOMP frame commands
// One constant per STOMP frame command.
public enum StompCommand {
CONNECT, STOMP, CONNECTED, SEND, SUBSCRIBE, UNSUBSCRIBE,
ACK, NACK, BEGIN, COMMIT, ABORT, DISCONNECT, MESSAGE,
RECEIPT, ERROR
}
// Annotation support
// @SubscribeMapping - can be placed on a type or on a method.
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface SubscribeMapping {
// Destination mappings; empty by default.
String[] value() default {};
}
// @SendToUser - can be placed on a type or on a method.
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface SendToUser {

    // Destinations given as the annotation value; empty by default.
    String[] value() default {};

    // Destinations given through the destinations attribute; empty by default.
    String[] destinations() default {};

    // Broadcast flag; true by default.
    boolean broadcast() default true;
}

RSocket protocol integration with reactive streams, supporting request-response, fire-and-forget, request-stream, and bidirectional channel interaction models.
// RSocketRequester - fluent API for RSocket requests
// A request is built in steps: route(...) -> RequestSpec -> data(...) -> ResponseSpec -> retrieve*.
public interface RSocketRequester extends Disposable {
// Starts a request for the given route; the second form takes values for route variables.
RequestSpec route(String route);
RequestSpec route(String route, Object... routeVars);
// Request step: optional metadata, then the data to send.
interface RequestSpec {
// Adds a metadata entry with its MIME type and stays on the request step.
RequestSpec metadata(Object metadata, MimeType mimeType);
// Supplies the request data, either as a single object or as a Publisher.
ResponseSpec data(Object data);
ResponseSpec data(Publisher<?> data);
}
// Response step: choose a single-value (Mono) or multi-value (Flux) result and its type.
interface ResponseSpec {
<T> Mono<T> retrieveMono(Class<T> dataType);
<T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataType);
<T> Flux<T> retrieveFlux(Class<T> dataType);
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataType);
}
// Builder for a requester: MIME types, strategies, then a transport.
interface Builder {
Builder dataMimeType(MimeType mimeType);
Builder metadataMimeType(MimeType mimeType);
Builder rsocketStrategies(RSocketStrategies strategies);
// Return the requester directly for a TCP or WebSocket target.
RSocketRequester tcp(String host, int port);
RSocketRequester websocket(URI uri);
// Return the requester wrapped in a Mono for a TCP or WebSocket target.
Mono<RSocketRequester> connectTcp(String host, int port);
Mono<RSocketRequester> connectWebSocket(URI uri);
}
}
// RSocketStrategies - configuration for encoders/decoders
public interface RSocketStrategies {
// The configured encoders and decoders.
List<Encoder<?>> encoders();
List<Decoder<?>> decoders();
// Builder that collects encoders, decoders and a route matcher, then builds the strategies.
interface Builder {
Builder encoder(Encoder<?>... encoders);
Builder decoder(Decoder<?>... decoders);
Builder routeMatcher(RouteMatcher routeMatcher);
RSocketStrategies build();
}
// Entry point for obtaining a Builder.
static Builder builder();
}
// @ConnectMapping - handle RSocket connection setup
// Can be placed on a type or on a method.
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ConnectMapping {

    // Mappings for the connection setup; empty by default.
    String[] value() default {};
}

MessageChannel implementations, interceptors, MessageBuilder utilities, and advanced channel features for message routing and processing pipelines.
// ExecutorSubscribableChannel - async message handling
public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
// Creates a channel without an Executor.
public ExecutorSubscribableChannel();
// Creates a channel that uses the given Executor.
public ExecutorSubscribableChannel(Executor executor);
// Sets the Executor after construction.
public void setExecutor(Executor executor);
// Replaces the interceptor list, or appends a single interceptor.
public void setInterceptors(List<ChannelInterceptor> interceptors);
public void addInterceptor(ChannelInterceptor interceptor);
}
// PublishSubscribeChannel - broadcasts to all subscribers
// NOTE(review): confirm this class ships in spring-messaging; a channel with this name and
// these setters may belong to Spring Integration instead.
public class PublishSubscribeChannel extends AbstractSubscribableChannel {
// Creates a channel without an Executor, or with the given one.
public PublishSubscribeChannel();
public PublishSubscribeChannel(Executor executor);
// Sets the Executor after construction.
public void setTaskExecutor(Executor executor);
// Toggles the apply-sequence option.
public void setApplySequence(boolean applySequence);
// Sets the minimum-subscribers option.
public void setMinSubscribers(int minSubscribers);
}
// ChannelInterceptor - intercept channel operations
// Callbacks are grouped by phase: before, after, and on completion of send and receive.
public interface ChannelInterceptor {
// Send phase: preSend returns the message to continue with.
Message<?> preSend(Message<?> message, MessageChannel channel);
// Called after the send; "sent" carries the send result.
void postSend(Message<?> message, MessageChannel channel, boolean sent);
// Called on completion of the send, with the exception that occurred, if any.
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
// Receive phase: preReceive returns a boolean decision for the receive.
boolean preReceive(MessageChannel channel);
// Called after the receive; returns the message to continue with.
Message<?> postReceive(Message<?> message, MessageChannel channel);
// Called on completion of the receive, with the exception that occurred, if any.
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
// MessageBuilder - fluent message construction
// Typical use: MessageBuilder.withPayload(...).setHeader(...).build()
public final class MessageBuilder<T> {

    // Factory methods: start from a payload, or from an existing message.
    public static <T> MessageBuilder<T> withPayload(T payload);
    public static <T> MessageBuilder<T> fromMessage(Message<T> message);

    // Header manipulation; each call returns the builder for chaining.
    public MessageBuilder<T> setHeader(String headerName, Object headerValue);
    public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);
    public MessageBuilder<T> removeHeader(String headerName);
    public MessageBuilder<T> copyHeaders(Map<String, ?> headers);

    // Convenience setters for the reply and error channel headers.
    public MessageBuilder<T> setReplyChannel(MessageChannel replyChannel);
    public MessageBuilder<T> setErrorChannel(MessageChannel errorChannel);

    // Creates the message from the payload and the accumulated headers.
    public Message<T> build();
}

Configuration DSL for Simple Messaging Protocol including message broker registration, STOMP endpoints, destination prefixes, and broker relay setup.
// MessageBrokerRegistry - configure message brokers
public class MessageBrokerRegistry {
// Enables the simple (in-memory) broker for the given destination prefixes.
public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes);
// Enables the relay to an external STOMP broker for the given destination prefixes.
public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes);
// Sets the application destination prefixes; returns the registry for chaining.
public MessageBrokerRegistry setApplicationDestinationPrefixes(String... prefixes);
// Sets the user destination prefix; returns the registry for chaining.
public MessageBrokerRegistry setUserDestinationPrefix(String prefix);
// Returns the registration used to configure the broker channel.
public ChannelRegistration configureBrokerChannel();
}
// SimpleBrokerRegistration - in-memory broker
// Every setter returns the registration so calls can be chained.
public class SimpleBrokerRegistration {
// Sets the heartbeat value, given as an array of long values.
public SimpleBrokerRegistration setHeartbeatValue(long[] heartbeat);
// Sets the TaskScheduler used by the broker.
public SimpleBrokerRegistration setTaskScheduler(TaskScheduler taskScheduler);
// Sets the name of the selector header.
public SimpleBrokerRegistration setSelectorHeaderName(String selectorHeaderName);
}
// StompBrokerRelayRegistration - external STOMP broker
// Every setter returns the registration so calls can be chained.
public class StompBrokerRelayRegistration {

    // Host and port of the broker to relay to.
    public StompBrokerRelayRegistration setRelayHost(String relayHost);
    public StompBrokerRelayRegistration setRelayPort(int relayPort);

    // Client login and passcode.
    public StompBrokerRelayRegistration setClientLogin(String login);
    public StompBrokerRelayRegistration setClientPasscode(String passcode);

    // System login and passcode.
    public StompBrokerRelayRegistration setSystemLogin(String login);
    public StompBrokerRelayRegistration setSystemPasscode(String passcode);

    // Virtual host setting.
    public StompBrokerRelayRegistration setVirtualHost(String virtualHost);
}

// Base messaging exception
// Unchecked exception that can carry the message whose processing failed.
public class MessagingException extends RuntimeException {
// Creates the exception for a failed message.
public MessagingException(Message<?> message);
// Creates the exception with a description only.
public MessagingException(String description);
// Creates the exception for a failed message, with a description and the underlying cause.
public MessagingException(Message<?> message, String description, Throwable cause);
// Returns the message associated with the failure.
public Message<?> getFailedMessage();
}
// Specific exception types
// Each of these extends MessagingException.
public class MessageDeliveryException extends MessagingException {
// Thrown when message cannot be delivered to channel
}
// MessagingException subtype for failures inside a message handler.
public class MessageHandlingException extends MessagingException {
// Thrown when handler fails to process message
}
// MessagingException subtype for failures during message conversion.
public class MessageConversionException extends MessagingException {
    // Thrown when message conversion fails
}

// Reactive message handler
// Handler contract for the reactive programming model.
public interface ReactiveMessageHandler {
// Processes one message; the returned Mono<Void> represents the handling.
Mono<Void> handleMessage(Message<?> message);
}
// Pollable channel for active message retrieval
// A MessageChannel from which callers fetch messages themselves.
public interface PollableChannel extends MessageChannel {
// Receives a message from this channel.
Message<?> receive();
// Receives a message from this channel, using the given timeout.
Message<?> receive(long timeout);
}
// Message post-processor for modification before sending
// Single-method interface, so it can be supplied as a lambda.
@FunctionalInterface
public interface MessagePostProcessor {
// Receives the message and returns the message to use in its place.
Message<?> postProcessMessage(Message<?> message);
}
// Destination resolver
// Maps a destination name to a destination of type D; single-method interface.
@FunctionalInterface
public interface DestinationResolver<D> {
// Resolves the name to a destination; failures are reported as DestinationResolutionException.
D resolveDestination(String name) throws DestinationResolutionException;
}
// Generic message implementation
// Serializable implementation of Message for a payload of type T.
public class GenericMessage<T> implements Message<T>, Serializable {
// Creates a message with the given payload.
public GenericMessage(T payload);
// Creates a message with the given payload and headers.
public GenericMessage(T payload, Map<String, Object> headers);
}