MessageChannel implementations, interceptors, MessageBuilder utilities, and advanced channel features for message routing, asynchronous processing, and building message processing pipelines.
Subscribable channel that dispatches messages to handlers using an Executor for async processing.
/**
* A SubscribableChannel that sends messages to each of its subscribers
* using an Executor for asynchronous message dispatching.
*/
public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
/**
* Create an ExecutorSubscribableChannel with a default single-thread executor.
*/
public ExecutorSubscribableChannel();
/**
* Create an ExecutorSubscribableChannel with the given Executor.
*/
public ExecutorSubscribableChannel(Executor executor);
/**
* Return the configured Executor.
*/
public Executor getExecutor();
/**
* Set the list of channel interceptors to apply.
*/
public void setInterceptors(List<ChannelInterceptor> interceptors);
/**
* Add a channel interceptor.
*/
public void addInterceptor(ChannelInterceptor interceptor);
/**
* Add a channel interceptor at a specific index.
*/
public void addInterceptor(int index, ChannelInterceptor interceptor);
}Usage Examples:
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.concurrent.Executors;
// Create with default executor
ExecutorSubscribableChannel channel1 = new ExecutorSubscribableChannel();
// Create with custom executor
ExecutorSubscribableChannel channel2 = new ExecutorSubscribableChannel(
Executors.newFixedThreadPool(4)
);
// Subscribe handlers
channel2.subscribe(message ->
System.out.println("Handler 1: " + message.getPayload())
);
channel2.subscribe(message ->
System.out.println("Handler 2: " + message.getPayload())
);
// Send message (handlers invoked asynchronously)
Message<String> message = MessageBuilder
.withPayload("Hello")
.build();
channel2.send(message);Broadcasts messages to all subscribers, optionally with sequence numbers.
/**
* A SubscribableChannel that broadcasts messages to all subscribers.
* Optionally maintains order with sequence numbers.
*/
public class PublishSubscribeChannel extends AbstractSubscribableChannel
implements BeanFactoryAware {
/**
* Create a PublishSubscribeChannel with synchronous dispatching.
*/
public PublishSubscribeChannel();
/**
* Create a PublishSubscribeChannel with the given Executor for async dispatching.
*/
public PublishSubscribeChannel(Executor executor);
/**
* Set the TaskExecutor for message dispatching.
*/
public void setTaskExecutor(Executor executor);
/**
* Whether to apply sequence numbers to messages.
* Default is false.
*/
public void setApplySequence(boolean applySequence);
/**
* Set the minimum number of subscribers required before messages can be sent.
* Default is 0.
*/
public void setMinSubscribers(int minSubscribers);
}Usage Examples:
import org.springframework.messaging.support.PublishSubscribeChannel;
// Create publish-subscribe channel
PublishSubscribeChannel channel = new PublishSubscribeChannel();
// Enable sequence numbers
channel.setApplySequence(true);
// Require at least 2 subscribers
channel.setMinSubscribers(2);
// Subscribe multiple handlers
channel.subscribe(msg -> System.out.println("Sub 1: " + msg.getPayload()));
channel.subscribe(msg -> System.out.println("Sub 2: " + msg.getPayload()));
channel.subscribe(msg -> System.out.println("Sub 3: " + msg.getPayload()));
// All subscribers receive the message
channel.send(MessageBuilder.withPayload("Broadcast").build());Intercepts channel operations for cross-cutting concerns like logging, security, or metrics.
/**
* Interface for interceptors that can view and/or modify Messages being
* sent through or received from a MessageChannel.
*/
public interface ChannelInterceptor {
/**
* Invoked before the Message is actually sent to the channel.
* Return the Message to send (can be modified), or null to prevent sending.
*/
default Message<?> preSend(Message<?> message, MessageChannel channel) {
return message;
}
/**
* Invoked immediately after send invocation, whether or not it succeeded.
*/
default void postSend(Message<?> message, MessageChannel channel, boolean sent) {
}
/**
* Invoked after send completion (success or failure).
*/
default void afterSendCompletion(Message<?> message, MessageChannel channel,
boolean sent, Exception ex) {
}
/**
* Invoked before a Message is retrieved from a PollableChannel.
* Return false to prevent the receive operation.
*/
default boolean preReceive(MessageChannel channel) {
return true;
}
/**
* Invoked immediately after a Message is retrieved from a PollableChannel.
* Return modified message or null to prevent it from being received.
*/
default Message<?> postReceive(Message<?> message, MessageChannel channel) {
return message;
}
/**
* Invoked after receive completion (success or failure).
*/
default void afterReceiveCompletion(Message<?> message, MessageChannel channel,
Exception ex) {
}
}Usage Examples:
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
// Logging interceptor
ChannelInterceptor loggingInterceptor = new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
System.out.println("Sending: " + message.getPayload());
return message;
}
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
System.out.println("Sent: " + sent);
}
};
// Header enrichment interceptor
ChannelInterceptor enrichmentInterceptor = new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return MessageBuilder.fromMessage(message)
.setHeader("timestamp", System.currentTimeMillis())
.setHeader("enriched", true)
.build();
}
};
// Security interceptor
ChannelInterceptor securityInterceptor = new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// Check authorization
if (!isAuthorized(message)) {
throw new SecurityException("Not authorized");
}
return message;
}
private boolean isAuthorized(Message<?> message) {
// Authorization logic
return true;
}
};
// Add interceptors to channel
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
channel.addInterceptor(loggingInterceptor);
channel.addInterceptor(enrichmentInterceptor);
channel.addInterceptor(securityInterceptor);Fluent API for constructing messages with headers.
/**
* A builder for creating GenericMessage instances with headers.
*/
public final class MessageBuilder<T> {
/**
* Create a builder for a new Message with the given payload.
*/
public static <T> MessageBuilder<T> withPayload(T payload);
/**
* Create a builder from an existing message.
*/
public static <T> MessageBuilder<T> fromMessage(Message<T> message);
/**
* Create a message directly with the given payload and headers.
* This is a convenience factory method that bypasses the builder pattern.
*/
public static <T> Message<T> createMessage(T payload, MessageHeaders messageHeaders);
/**
* Set the value for the given header name.
*/
public MessageBuilder<T> setHeader(String headerName, Object headerValue);
/**
* Set all headers from the given MessageHeaderAccessor.
* Replaces existing headers with values from the accessor.
*/
public MessageBuilder<T> setHeaders(MessageHeaderAccessor accessor);
/**
* Set the value for the given header name only if not already set.
*/
public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);
/**
* Remove the value for the given header name.
*/
public MessageBuilder<T> removeHeader(String headerName);
/**
* Remove headers matching the given pattern(s).
*/
public MessageBuilder<T> removeHeaders(String... headerPatterns);
/**
* Copy all headers from the given Map.
*/
public MessageBuilder<T> copyHeaders(Map<String, ?> headers);
/**
* Copy headers from the given Map only if not already set.
*/
public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headers);
/**
* Set the reply channel.
*/
public MessageBuilder<T> setReplyChannel(MessageChannel replyChannel);
/**
* Set the reply channel name.
*/
public MessageBuilder<T> setReplyChannelName(String replyChannelName);
/**
* Set the error channel.
*/
public MessageBuilder<T> setErrorChannel(MessageChannel errorChannel);
/**
* Set the error channel name.
*/
public MessageBuilder<T> setErrorChannelName(String errorChannelName);
/**
* Build the message.
*/
public Message<T> build();
}Usage Examples:
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.Message;
import java.util.HashMap;
import java.util.Map;
// Simple message
Message<String> msg1 = MessageBuilder
.withPayload("Hello World")
.build();
// Message with headers
Message<String> msg2 = MessageBuilder
.withPayload("Data")
.setHeader("priority", "high")
.setHeader("sender", "user123")
.setHeader("timestamp", System.currentTimeMillis())
.build();
// Copy headers from map
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
headers.put("encoding", "UTF-8");
Message<String> msg3 = MessageBuilder
.withPayload("{}")
.copyHeaders(headers)
.build();
// Modify existing message
Message<String> original = MessageBuilder
.withPayload("Original")
.setHeader("version", 1)
.build();
Message<String> modified = MessageBuilder
.fromMessage(original)
.setHeader("version", 2)
.setHeader("modified", true)
.build();
// Set reply channel
Message<String> msg4 = MessageBuilder
.withPayload("Request")
.setReplyChannelName("replyChannel")
.build();
// Remove headers
Message<String> msg5 = MessageBuilder
.fromMessage(msg2)
.removeHeader("sender")
.removeHeaders("timestamp", "priority")
.build();Mutable accessor for message headers with convenient methods.
/**
* Wrapper around MessageHeaders that provides extra features such as
* mutable access to headers and support for typed header values.
*/
public class MessageHeaderAccessor {
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
/**
* Create a MessageHeaderAccessor for an existing message.
*/
public static MessageHeaderAccessor getMutableAccessor(Message<?> message);
/**
* Create a new MessageHeaderAccessor.
*/
public MessageHeaderAccessor();
/**
* Create a MessageHeaderAccessor from an existing Message.
*/
public MessageHeaderAccessor(Message<?> message);
/**
* Set whether to leave the accessor in mutable state.
*/
public void setLeaveMutable(boolean leaveMutable);
/**
* Make headers immutable.
*/
public void setImmutable();
/**
* Whether the headers are still mutable.
*/
public boolean isMutable();
/**
* Whether headers have been modified.
*/
public boolean isModified();
/**
* Return the underlying MessageHeaders (immutable).
*/
public MessageHeaders getMessageHeaders();
/**
* Get all headers as a Map.
*/
public Map<String, Object> toMap();
/**
* Get a header value.
*/
public Object getHeader(String headerName);
/**
* Set a header value.
*/
public void setHeader(String name, Object value);
/**
* Set a header value only if not already present.
*/
public void setHeaderIfAbsent(String name, Object value);
/**
* Remove a header.
*/
public void removeHeader(String headerName);
/**
* Remove multiple headers.
*/
public void removeHeaders(String... headerNames);
/**
* Copy all headers.
*/
public void copyHeaders(Map<String, ?> headers);
/**
* Copy headers if not already present.
*/
public void copyHeadersIfAbsent(Map<String, ?> headers);
/**
* Set the reply channel.
*/
public void setReplyChannel(MessageChannel replyChannel);
/**
* Set the error channel.
*/
public void setErrorChannel(MessageChannel errorChannel);
/**
* Set the content type.
*/
public void setContentType(MimeType contentType);
/**
* Get the content type.
*/
public MimeType getContentType();
/**
* Create a Message with the current headers and given payload.
*/
public <T> Message<T> createMessage(T payload);
}Usage Examples:
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.Message;
import org.springframework.util.MimeTypeUtils;
// Create accessor
MessageHeaderAccessor accessor = new MessageHeaderAccessor();
accessor.setHeader("key1", "value1");
accessor.setHeader("key2", 123);
accessor.setContentType(MimeTypeUtils.APPLICATION_JSON);
// Create message from accessor
Message<String> message = accessor.createMessage("payload");
// Get mutable accessor from existing message
MessageHeaderAccessor mutable = MessageHeaderAccessor.getMutableAccessor(message);
mutable.setHeader("key3", "value3");
Message<String> updated = mutable.createMessage(message.getPayload());
// Modify headers conditionally
MessageHeaderAccessor conditional = new MessageHeaderAccessor();
conditional.setHeader("existing", "value");
conditional.setHeaderIfAbsent("existing", "new-value"); // Won't override
conditional.setHeaderIfAbsent("new", "value"); // Will setOne-way message channel that sends through an Executor without subscribers.
/**
* A simple point-to-point MessageChannel implementation that sends messages
* through an Executor.
*/
public class ExecutorChannel extends AbstractMessageChannel {
/**
* Create an ExecutorChannel with the given Executor.
*/
public ExecutorChannel(Executor executor);
/**
* Set the list of channel interceptors.
*/
public void setInterceptors(List<ChannelInterceptor> interceptors);
}Callback interface for initializing message headers.
/**
* Callback interface for initializing a MessageHeaderAccessor.
*/
@FunctionalInterface
public interface MessageHeaderInitializer {
/**
* Initialize the given MessageHeaderAccessor.
* @param headerAccessor the MessageHeaderAccessor to initialize
*/
void initHeaders(MessageHeaderAccessor headerAccessor);
}Usage Examples:
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.messaging.support.MessageHeaderAccessor;
// Create custom header initializer
MessageHeaderInitializer initializer = accessor -> {
accessor.setHeader("appName", "MyApp");
accessor.setHeader("appVersion", "1.0.0");
accessor.setHeader("timestamp", System.currentTimeMillis());
};
// Use with SimpMessagingTemplate
SimpMessagingTemplate template = new SimpMessagingTemplate(channel);
template.setHeaderInitializer(initializer);
// Now all messages sent through the template will have these headers
template.convertAndSend("/topic/events", "Event data");Specialized message for transporting exceptions.
/**
* A GenericMessage with a Throwable payload.
*/
public class ErrorMessage extends GenericMessage<Throwable> {
/**
* Create an ErrorMessage with the given Throwable.
*/
public ErrorMessage(Throwable payload);
/**
* Create an ErrorMessage with a Throwable and headers.
*/
public ErrorMessage(Throwable payload, Map<String, Object> headers);
/**
* Create an ErrorMessage with a Throwable and the original message.
*/
public ErrorMessage(Throwable payload, Message<?> originalMessage);
/**
* Return the original message if available.
*/
public Message<?> getOriginalMessage();
}Usage Examples:
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
// Send error to error channel
try {
processMessage(message);
} catch (Exception ex) {
ErrorMessage errorMsg = new ErrorMessage(ex, message);
errorChannel.send(errorMsg);
}
// Handle errors
MessageChannel errorChannel = new ExecutorSubscribableChannel();
errorChannel.subscribe(msg -> {
if (msg instanceof ErrorMessage) {
ErrorMessage error = (ErrorMessage) msg;
Throwable exception = error.getPayload();
Message<?> original = error.getOriginalMessage();
System.err.println("Error processing message: " + exception.getMessage());
}
});
void processMessage(Message<?> message) {
// Processing logic
}
MessageChannel channel = new ExecutorSubscribableChannel();import org.springframework.messaging.support.*;
import org.springframework.messaging.*;
import java.util.concurrent.Executors;
// Create channels
ExecutorSubscribableChannel inputChannel = new ExecutorSubscribableChannel(
Executors.newFixedThreadPool(4)
);
ExecutorSubscribableChannel outputChannel = new ExecutorSubscribableChannel();
ExecutorSubscribableChannel errorChannel = new ExecutorSubscribableChannel();
// Add interceptors
inputChannel.addInterceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
System.out.println("Processing: " + message.getPayload());
return MessageBuilder.fromMessage(message)
.setHeader("timestamp", System.currentTimeMillis())
.build();
}
});
// Subscribe handlers
inputChannel.subscribe(message -> {
try {
// Process message
String result = processData((String) message.getPayload());
// Send to output
Message<String> outputMsg = MessageBuilder
.withPayload(result)
.copyHeaders(message.getHeaders())
.build();
outputChannel.send(outputMsg);
} catch (Exception ex) {
// Send to error channel
ErrorMessage errorMsg = new ErrorMessage(ex, message);
errorChannel.send(errorMsg);
}
});
outputChannel.subscribe(message ->
System.out.println("Success: " + message.getPayload())
);
errorChannel.subscribe(message -> {
if (message instanceof ErrorMessage) {
ErrorMessage error = (ErrorMessage) message;
System.err.println("Error: " + error.getPayload().getMessage());
}
});
// Send messages
inputChannel.send(MessageBuilder.withPayload("data1").build());
inputChannel.send(MessageBuilder.withPayload("data2").build());
String processData(String data) {
return "Processed: " + data;
}