Core messaging abstractions that form the foundation of Spring Messaging, including the Message interface, MessageChannel implementations, MessageHandler, and MessageHeaders for building message-based applications.
The central abstraction representing a message with a typed payload and headers.
/**
* A generic message representation with headers and body.
*
* <p>A message pairs a typed payload with a {@link MessageHeaders} instance.
*
* @param <T> the payload type
*/
public interface Message<T> {
/**
* Return the message payload.
* @return the payload carried by this message
*/
T getPayload();
/**
* Return message headers for the message (never null).
* @return the headers associated with this message
*/
MessageHeaders getHeaders();
}Usage Examples:
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import java.util.Map;
import java.util.HashMap;

// Create message with GenericMessage
Message<String> message1 = new GenericMessage<>("Hello World");

// Create message with headers
Map<String, Object> headers = new HashMap<>();
headers.put("priority", "high");
// NOTE: MessageHeaders generates its own "id" and "timestamp" entries when it is
// constructed, so this caller-supplied "timestamp" value is replaced on creation.
headers.put("timestamp", System.currentTimeMillis());
Message<String> message2 = new GenericMessage<>("Hello World", headers);

// Use MessageBuilder for fluent construction
Message<String> message3 = MessageBuilder
        .withPayload("Hello World")
        .setHeader("sender", "user123")
        .setHeader("contentType", "text/plain")
        .build();

// Access message data
String payload = message3.getPayload();
// Named differently from the Map above: both live in the same scope, and a second
// declaration of "headers" would not compile.
MessageHeaders messageHeaders = message3.getHeaders();
Object sender = messageHeaders.get("sender");
Concrete implementation of Message with a generic payload type.
/**
* An implementation of Message with a generic payload.
* Once created, a GenericMessage is immutable.
*
* @param <T> the payload type
*/
public class GenericMessage<T> implements Message<T>, Serializable {
/**
* Create a new message with the given payload.
* @param payload the message payload (never null)
*/
public GenericMessage(T payload);
/**
* Create a new message with the given payload and headers.
* @param payload the message payload (never null)
* @param headers message headers (may be null)
*/
public GenericMessage(T payload, Map<String, Object> headers);
/**
* Create a new message with the given payload and MessageHeaders.
* @param payload the message payload (never null)
* @param headers message headers
*/
public GenericMessage(T payload, MessageHeaders headers);
/**
* Return the message payload.
*/
@Override
public T getPayload();
/**
* Return the headers of this message.
*/
@Override
public MessageHeaders getHeaders();
// Object overrides are declared below; this listing does not show which
// fields take part in equality, hashing, or the string form.
@Override
public boolean equals(Object other);
@Override
public int hashCode();
@Override
public String toString();
}Immutable map-based container for message metadata with standard header names.
/**
 * The headers for a Message.
 *
 * <p>IMPORTANT: MessageHeaders is immutable. Use MessageHeaderAccessor or
 * MessageBuilder to create messages with specific headers. Because the class
 * implements {@link Map}, the mutating operations ({@code put}, {@code putAll},
 * {@code remove}, {@code clear}) exist but throw
 * {@link UnsupportedOperationException}.
 */
public class MessageHeaders implements Map<String, Object>, Serializable {

    /**
     * A special UUID value to indicate that no ID should be generated.
     */
    public static final UUID ID_VALUE_NONE = new UUID(0, 0);

    /**
     * The key for the Message ID. This is an automatically generated UUID and
     * should be unique.
     */
    public static final String ID = "id";

    /**
     * The key for the Message timestamp.
     */
    public static final String TIMESTAMP = "timestamp";

    /**
     * The key for the content type header.
     */
    public static final String CONTENT_TYPE = "contentType";

    /**
     * The key for the reply channel header.
     */
    public static final String REPLY_CHANNEL = "replyChannel";

    /**
     * The key for the error channel header.
     */
    public static final String ERROR_CHANNEL = "errorChannel";

    /**
     * Construct a MessageHeaders with the given headers. The {@link #ID} and
     * {@link #TIMESTAMP} entries are generated automatically.
     * @param headers a map with headers to add (may be null)
     */
    public MessageHeaders(@Nullable Map<String, Object> headers);

    /**
     * Construct a MessageHeaders with the given headers and optional ID/timestamp generation.
     * <p>This constructor is {@code protected}: it is available to subclasses only,
     * not to general callers.
     * @param headers a map with headers to add (may be null)
     * @param id the {@link #ID} header value; {@code null} generates a new one,
     * {@link #ID_VALUE_NONE} omits the header
     * @param timestamp the {@link #TIMESTAMP} header value; {@code null} uses the
     * current time, a negative value omits the header
     */
    protected MessageHeaders(@Nullable Map<String, Object> headers, @Nullable UUID id, @Nullable Long timestamp);

    /**
     * Return the id header value, if present.
     */
    @Nullable
    public UUID getId();

    /**
     * Return the timestamp header value, if present.
     */
    @Nullable
    public Long getTimestamp();

    /**
     * Return the reply channel header value, if present.
     */
    @Nullable
    public Object getReplyChannel();

    /**
     * Return the error channel header value, if present.
     */
    @Nullable
    public Object getErrorChannel();

    /**
     * Return the value for the header with the given key as a specific type.
     * @param key the key
     * @param type the expected type (must not be null)
     * @return the value or null if not found
     * @throws IllegalArgumentException if the header value is present but is
     * not an instance of the expected type
     */
    @Nullable
    public <T> T get(Object key, Class<T> type);

    // Map interface methods (read-only view of the headers)

    @Override
    public boolean containsKey(Object key);

    @Override
    public boolean containsValue(Object value);

    @Override
    public Set<Map.Entry<String, Object>> entrySet();

    @Override
    @Nullable
    public Object get(Object key);

    @Override
    public boolean isEmpty();

    @Override
    public Set<String> keySet();

    @Override
    public int size();

    @Override
    public Collection<Object> values();

    @Override
    public boolean equals(Object other);

    @Override
    public int hashCode();

    @Override
    public String toString();
}
Usage Examples:
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import java.util.Map;
import java.util.UUID;

Message<String> message = new GenericMessage<>("payload");
MessageHeaders headers = message.getHeaders();

// Access standard headers
UUID id = headers.getId();
Long timestamp = headers.getTimestamp();

// Access custom headers (the typed lookup returns null when the header is absent)
String priority = headers.get("priority", String.class);
Integer timeout = headers.get("timeout", Integer.class);

// Check for header presence, using the predefined key constant ("contentType")
if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
    Object contentType = headers.get(MessageHeaders.CONTENT_TYPE);
}

// Iterate through all headers
for (Map.Entry<String, Object> entry : headers.entrySet()) {
    System.out.println(entry.getKey() + ": " + entry.getValue());
}
Base abstraction for sending messages with timeout support.
/**
* Defines methods for sending messages.
*/
public interface MessageChannel {
/**
* Constant for sending a message without a specified timeout.
* Passed as the timeout argument by {@link #send(Message)}.
*/
long INDEFINITE_TIMEOUT = -1;
/**
* Send a message, blocking indefinitely if necessary.
* <p>The default implementation delegates to {@link #send(Message, long)}
* with {@link #INDEFINITE_TIMEOUT}.
* @param message the message to send
* @return true if the message is sent successfully, false otherwise
*/
default boolean send(Message<?> message) {
// Delegate to the timed variant, passing the "no timeout" marker value.
return send(message, INDEFINITE_TIMEOUT);
}
/**
* Send a message, blocking until either the message is accepted or the
* specified timeout period elapses.
* @param message the message to send
* @param timeout the timeout in milliseconds or INDEFINITE_TIMEOUT
* @return true if the message is sent successfully, false if the
* message cannot be sent within the timeout period
*/
boolean send(Message<?> message, long timeout);
}Usage Examples:
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
MessageChannel outbound = new ExecutorSubscribableChannel();
Message<String> greeting = MessageBuilder.withPayload("Hello").build();

// No explicit timeout: the single-argument send uses INDEFINITE_TIMEOUT
boolean acceptedWithoutTimeout = outbound.send(greeting);

// Bounded wait: give up after 5000 ms (5 seconds)
boolean acceptedInTime = outbound.send(greeting, 5000);
System.out.println(acceptedInTime
        ? "Message sent successfully"
        : "Message sending timed out");
MessageChannel that maintains a registry of subscribers for message distribution.
/**
* A MessageChannel that maintains a registry of subscribers and invokes
* them to handle messages sent through this channel.
*
* <p>Inherits the send operations from {@link MessageChannel}.
*/
public interface SubscribableChannel extends MessageChannel {
/**
* Register a message handler.
* @param handler the handler to register
* @return true if the handler was registered, false if it was already registered
*/
boolean subscribe(MessageHandler handler);
/**
* Un-register a message handler.
* @param handler the handler to un-register
* @return true if the handler was un-registered, false if it was not registered
*/
boolean unsubscribe(MessageHandler handler);
}Usage Examples:
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
SubscribableChannel broadcast = new ExecutorSubscribableChannel();

// Two independent subscribers, each printing the payload with its own prefix
MessageHandler handler1 = received ->
        System.out.println("Handler 1: " + received.getPayload());
MessageHandler handler2 = received ->
        System.out.println("Handler 2: " + received.getPayload());
broadcast.subscribe(handler1);
broadcast.subscribe(handler2);

// A single send is dispatched to every subscribed handler
Message<String> announcement = MessageBuilder.withPayload("Hello Subscribers").build();
broadcast.send(announcement);

// Remove the first handler from the channel's registry
broadcast.unsubscribe(handler1);
MessageChannel from which messages may be actively received through polling.
/**
* A MessageChannel from which messages may be actively received through polling.
*
* <p>Inherits the send operations from {@link MessageChannel} and adds the
* receive operations below.
*/
public interface PollableChannel extends MessageChannel {
/**
* Receive a message from this channel, blocking indefinitely if necessary.
* @return the next available Message or null if interrupted
*/
@Nullable
Message<?> receive();
/**
* Receive a message from this channel, blocking until either a message is
* available or the specified timeout period elapses.
* @param timeout the timeout in milliseconds or INDEFINITE_TIMEOUT
* @return the next available Message or null if the specified timeout
* period elapses or interrupted
*/
@Nullable
Message<?> receive(long timeout);
}Callback interface for processing messages.
/**
* Simple contract for handling a Message.
*
* <p>Declared as a functional interface, so it can be implemented with a
* lambda expression or method reference.
*/
@FunctionalInterface
public interface MessageHandler {
/**
* Handle the given message.
* @param message the message to be handled
* @throws MessagingException if the handler failed to process the message
*/
void handleMessage(Message<?> message) throws MessagingException;
}Usage Examples:
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
// Handler written as a single-expression lambda
MessageHandler simpleHandler = incoming ->
        System.out.println("Received: " + incoming.getPayload());

// Handler written as an anonymous class; any failure is wrapped in a MessagingException
MessageHandler complexHandler = new MessageHandler() {

    private void processPayload(String payload) {
        // Processing logic
    }

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        try {
            processPayload((String) message.getPayload());
        } catch (Exception failure) {
            // Keep the failed message and the original cause on the wrapper
            throw new MessagingException(message, "Failed to process message", failure);
        }
    }
};

// Register both handlers on a channel
SubscribableChannel channel = new ExecutorSubscribableChannel();
channel.subscribe(simpleHandler);
channel.subscribe(complexHandler);
Reactive variant of MessageHandler returning a Mono for async processing.
/**
* Contract for handling a Message with reactive, non-blocking behavior.
*
* <p>Declared as a functional interface, so it can be implemented with a
* lambda expression or method reference.
*/
@FunctionalInterface
public interface ReactiveMessageHandler {
/**
* Handle the given message.
* @param message the message to handle
* @return a Mono that completes when the message is handled
*/
Mono<Void> handleMessage(Message<?> message);
}Usage Examples:
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.messaging.Message;
import reactor.core.publisher.Mono;
import java.time.Duration;
// Simple reactive handler
ReactiveMessageHandler handler1 = message -> {
    System.out.println("Processing: " + message.getPayload());
    return Mono.empty();
};

// Async processing with delay
ReactiveMessageHandler handler2 = message ->
        Mono.fromRunnable(() -> {
            // Simulate async processing
            System.out.println("Processing: " + message.getPayload());
        })
        // fromRunnable completes without emitting an element, and delayElement only
        // delays an emitted element, so it would add no delay here. Chain an explicit
        // timer instead so completion is postponed by 100 ms.
        .then(Mono.delay(Duration.ofMillis(100)))
        .then();

// Error handling in reactive handler
ReactiveMessageHandler handler3 = message -> {
    return Mono.fromCallable(() -> {
        if (message.getPayload() == null) {
            throw new IllegalArgumentException("Payload cannot be null");
        }
        return processMessage(message);
    })
    .then()
    // Log the failure and complete normally instead of propagating the error
    .onErrorResume(error -> {
        System.err.println("Error processing message: " + error.getMessage());
        return Mono.empty();
    });
};

private static Object processMessage(Message<?> message) {
    // Processing logic
    return message.getPayload();
}
Base exception for all messaging-related errors.
/**
* The base exception for any failures related to messaging.
*
* <p>Unchecked (extends RuntimeException). May carry the message whose
* processing failed; see {@link #getFailedMessage()}.
*/
public class MessagingException extends RuntimeException {
/**
* Create a new MessagingException.
* @param description the detail message
*/
public MessagingException(String description);
/**
* Create a new MessagingException.
* @param description the detail message
* @param cause the root cause
*/
public MessagingException(String description, Throwable cause);
/**
* Create a new MessagingException.
* @param failedMessage the message that failed
*/
public MessagingException(Message<?> failedMessage);
/**
* Create a new MessagingException.
* @param failedMessage the message that failed
* @param description a description of the failure
*/
public MessagingException(Message<?> failedMessage, String description);
/**
* Create a new MessagingException.
* @param failedMessage the message that failed
* @param cause the root cause
*/
public MessagingException(Message<?> failedMessage, Throwable cause);
/**
* Create a new MessagingException.
* @param failedMessage the message that failed
* @param description a description of the failure
* @param cause the root cause
*/
public MessagingException(Message<?> failedMessage, String description, Throwable cause);
/**
* Return the Message for which this exception was raised, if available.
* @return the failed message, or null if none is available
*/
@Nullable
public Message<?> getFailedMessage();
@Override
public String toString();
}Exception thrown when a message cannot be delivered to a channel.
/**
* Exception that indicates an error occurred during message delivery.
*
* <p>Specialization of {@link MessagingException}.
*/
public class MessageDeliveryException extends MessagingException {
/**
* Create a new MessageDeliveryException.
* @param description the detail message
*/
public MessageDeliveryException(String description);
/**
* Create a new MessageDeliveryException.
* NOTE(review): confirm that this (String, Throwable) overload exists in the
* Spring version being documented.
* @param description the detail message
* @param cause the root cause
*/
public MessageDeliveryException(String description, Throwable cause);
/**
* Create a new MessageDeliveryException.
* @param failedMessage the message that could not be delivered
* @param description a description of the failure
*/
public MessageDeliveryException(Message<?> failedMessage, String description);
/**
* Create a new MessageDeliveryException.
* @param failedMessage the message that could not be delivered
* @param cause the root cause
*/
public MessageDeliveryException(Message<?> failedMessage, Throwable cause);
/**
* Create a new MessageDeliveryException.
* @param failedMessage the message that could not be delivered
* @param description a description of the failure
* @param cause the root cause
*/
public MessageDeliveryException(Message<?> failedMessage, String description, Throwable cause);
}Exception thrown when a handler fails to process a message.
/**
* Exception that indicates an error occurred during message handling.
*
* <p>Specialization of {@link MessagingException}.
*/
public class MessageHandlingException extends MessagingException {
/**
* Create a new MessageHandlingException.
* NOTE(review): confirm that this String-only overload exists in the Spring
* version being documented.
* @param description the detail message
*/
public MessageHandlingException(String description);
/**
* Create a new MessageHandlingException.
* NOTE(review): confirm that this (String, Throwable) overload exists in the
* Spring version being documented.
* @param description the detail message
* @param cause the root cause
*/
public MessageHandlingException(String description, Throwable cause);
/**
* Create a new MessageHandlingException.
* @param failedMessage the message that could not be handled
* @param description a description of the failure
*/
public MessageHandlingException(Message<?> failedMessage, String description);
/**
* Create a new MessageHandlingException.
* @param failedMessage the message that could not be handled
* @param cause the root cause
*/
public MessageHandlingException(Message<?> failedMessage, Throwable cause);
/**
* Create a new MessageHandlingException.
* @param failedMessage the message that could not be handled
* @param description a description of the failure
* @param cause the root cause
*/
public MessageHandlingException(Message<?> failedMessage, String description, Throwable cause);
}Usage Examples:
import org.springframework.messaging.*;
import org.springframework.messaging.support.MessageBuilder;
try {
    // Build the message first, then try to hand it to the channel
    Message<String> outgoing = MessageBuilder.withPayload("data").build();
    MessageChannel target = getChannel();
    // A false result from the timed send is reported as a delivery failure
    if (!target.send(outgoing, 1000)) {
        throw new MessageDeliveryException(outgoing,
                "Message could not be delivered within timeout");
    }
} catch (MessageDeliveryException deliveryFailure) {
    System.err.println("Delivery failed: " + deliveryFailure.getMessage());
    Message<?> failedMessage = deliveryFailure.getFailedMessage();
    // Handle failed message
}

// Handler that converts a payload validation error into a MessageHandlingException
MessageHandler handler = received -> {
    try {
        processPayload(received.getPayload());
    } catch (IllegalArgumentException invalidPayload) {
        throw new MessageHandlingException(received,
                "Invalid payload format", invalidPayload);
    }
};

/** Placeholder for payload processing. */
private static void processPayload(Object payload) {
    // Processing logic
}

/** Supplies the channel used by the delivery example above. */
private static MessageChannel getChannel() {
    return new org.springframework.messaging.support.ExecutorSubscribableChannel();
}