Message channels provide the conduit for message transport between endpoints in Spring Integration. Channels implement point-to-point or publish-subscribe semantics with support for synchronous, asynchronous, queued, reactive, and priority-based message delivery.
Point-to-Point (Single Consumer):
Publish-Subscribe (Multiple Consumers):
Special Purpose:
DirectChannel for sync, ExecutorChannel for asyncQueueChannel for buffering, DirectChannel for immediate processingDirectChannel/QueueChannel for single, PublishSubscribeChannel for multiplePriorityChannel when message order mattersFluxMessageChannel for reactive streams integrationDirectChannel: Synchronous delivery in sender's thread (blocks until handler completes)QueueChannel: capacity=0 means unbounded (not zero capacity); capacity>0 means boundedPublishSubscribeChannel: requireSubscribers=true throws exception if no subscribersQueueChannel.receive() returns null when empty (not an error)null messages (throw exception)Synchronous point-to-point channel with round-robin load balancing.
/**
* Point-to-point channel with round-robin load balancing.
* Messages delivered to single subscriber in sender's thread.
*/
public class DirectChannel extends AbstractSubscribableChannel {
/**
* Create direct channel with default round-robin dispatcher.
*/
public DirectChannel();
/**
* Create direct channel with custom load balancing strategy.
*
* @param loadBalancingStrategy the load balancing strategy
*/
public DirectChannel(LoadBalancingStrategy loadBalancingStrategy);
/**
* Set maximum subscribers allowed.
*
* @param maxSubscribers the maximum subscribers
*/
public void setMaxSubscribers(int maxSubscribers);
/**
* Set failover strategy when subscriber fails.
*
* @param failover true to try next subscriber on failure
*/
public void setFailover(boolean failover);
/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown. Overrides the {@link #setFailover(boolean)} option.
*
* @param failoverStrategy the failover strategy predicate
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy);
}Usage:
@Bean
public MessageChannel directChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(true);
channel.setMaxSubscribers(3);
return channel;
}
// Edge case: DirectChannel with failover strategy
@Bean
public MessageChannel directChannelWithFailoverStrategy() {
DirectChannel channel = new DirectChannel();
channel.setFailoverStrategy(exception -> {
// Only failover for specific exceptions
return exception instanceof RetryableException;
});
return channel;
}Publish-subscribe channel broadcasting to all subscribers.
/**
* Publish-subscribe channel broadcasting to all subscribers.
* Can execute subscribers asynchronously with task executor.
*/
public class PublishSubscribeChannel extends AbstractSubscribableChannel
implements BroadcastCapableChannel {
/**
* Create publish-subscribe channel with synchronous execution.
*/
public PublishSubscribeChannel();
/**
* Create publish-subscribe channel with requireSubscribers flag.
*
* @param requireSubscribers if set to true, the sent message is considered as non-dispatched
* and rejected to the caller with the "Dispatcher has no subscribers"
*/
public PublishSubscribeChannel(boolean requireSubscribers);
/**
* Create publish-subscribe channel with async execution.
*
* @param executor the task executor for async delivery
*/
public PublishSubscribeChannel(Executor executor);
/**
* Create publish-subscribe channel with async execution and requireSubscribers flag.
*
* @param executor the task executor for async delivery
* @param requireSubscribers if set to true, the sent message is considered as non-dispatched
* and rejected to the caller with the "Dispatcher has no subscribers"
*/
public PublishSubscribeChannel(Executor executor, boolean requireSubscribers);
/**
* Set error handler for async execution.
*
* @param errorHandler the error handler
*/
public void setErrorHandler(ErrorHandler errorHandler);
/**
* Whether to apply sequence information to messages.
*
* @param applySequence true to add sequence headers
*/
public void setApplySequence(boolean applySequence);
/**
* Set minimum subscribers required.
*
* @param minSubscribers the minimum subscribers
*/
public void setMinSubscribers(int minSubscribers);
/**
* Whether to ignore send failures to subscribers.
*
* @param ignoreFailures true to ignore failures
*/
public void setIgnoreFailures(boolean ignoreFailures);
}Usage:
@Bean
public MessageChannel pubSubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel(taskExecutor());
channel.setApplySequence(true);
channel.setMinSubscribers(1);
return channel;
}
// Edge case: Pub-sub channel with subscriber requirements
@Bean
public MessageChannel pubSubWithSubscriberRequirement() {
PublishSubscribeChannel channel = new PublishSubscribeChannel(
taskExecutor(), true); // requireSubscribers = true
channel.setMinSubscribers(1); // At least one subscriber required
channel.setErrorHandler(errorHandler());
return channel;
}Pollable channel backed by queue with optional capacity limit.
/**
* Pollable channel backed by queue with optional capacity limit.
* Supports blocking send and receive operations.
*/
public class QueueChannel extends AbstractPollableChannel
implements QueueChannelOperations {
/**
* Create queue channel with unbounded capacity.
*/
public QueueChannel();
/**
* Create queue channel with specific capacity.
*
* @param capacity the queue capacity (0 for unbounded)
*/
public QueueChannel(int capacity);
/**
* Create queue channel with existing queue.
*
* @param queue the backing queue
*/
public QueueChannel(Queue<Message<?>> queue);
/**
* Get queue size.
*
* @return number of messages in queue
*/
public int getQueueSize();
/**
* Get remaining capacity.
*
* @return remaining capacity, or Integer.MAX_VALUE if unbounded
*/
public int getRemainingCapacity();
/**
* Clear all messages from queue.
*
* @return list of cleared messages
*/
public List<Message<?>> clear();
/**
* Purge messages matching selector.
*
* @param selector the message selector
* @return list of purged messages
*/
public List<Message<?>> purge(MessageSelector selector);
}Critical: capacity=0 means unbounded, not zero capacity.
Usage:
@Bean
public MessageChannel queueChannel() {
return new QueueChannel(100); // Bounded queue with capacity 100
}
@Bean
public MessageChannel unboundedQueueChannel() {
return new QueueChannel(); // Or new QueueChannel(0) - unbounded
}
// Edge case: QueueChannel with capacity monitoring
@Bean
public MessageChannel monitoredQueueChannel() {
QueueChannel channel = new QueueChannel(100);
// Monitor capacity in production
if (channel.getRemainingCapacity() < 10) {
log.warn("Queue channel nearly full: " + channel.getQueueSize());
}
return channel;
}
// Edge case: QueueChannel receive with timeout handling
public Message<?> receiveWithTimeoutHandling(QueueChannel queue, long timeout) {
Message<?> message = queue.receive(timeout);
if (message == null) {
// Timeout occurred - not an error, just no message available
log.debug("No message received within " + timeout + "ms");
}
return message;
}Point-to-point channel with async dispatch via executor.
/**
* Point-to-point channel with async dispatch via executor.
* Each message handled in separate thread.
*/
public class ExecutorChannel extends AbstractExecutorChannel
implements ExecutorChannelInterceptorAware {
/**
* Create executor channel with executor.
*
* @param executor the task executor
*/
public ExecutorChannel(Executor executor);
/**
* Create executor channel with executor and load balancing strategy.
*
* @param executor the task executor
* @param loadBalancingStrategy the load balancing strategy
*/
public ExecutorChannel(Executor executor, LoadBalancingStrategy loadBalancingStrategy);
/**
* Set failover strategy when subscriber fails.
*
* @param failover true to try next subscriber on failure
*/
public void setFailover(boolean failover);
/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
*
* @param failoverStrategy the failover strategy predicate
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy);
}Usage:
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(taskExecutor());
}
// Edge case: Executor channel with custom error handling
@Bean
public MessageChannel executorChannelWithErrorHandling() {
ExecutorChannel channel = new ExecutorChannel(taskExecutor());
channel.setFailover(true);
return channel;
}Pollable channel with message prioritization.
/**
* Pollable channel with message prioritization.
* Messages delivered in priority order based on comparator.
*/
public class PriorityChannel extends QueueChannel {
/**
* Create priority channel with default comparator.
* Uses PRIORITY header for ordering.
*/
public PriorityChannel();
/**
* Create priority channel with capacity and default comparator.
*
* @param capacity the queue capacity
*/
public PriorityChannel(int capacity);
/**
* Create priority channel with custom comparator.
*
* @param comparator the message comparator
*/
public PriorityChannel(Comparator<Message<?>> comparator);
/**
* Create priority channel with capacity and comparator.
*
* @param capacity the queue capacity
* @param comparator the message comparator
*/
public PriorityChannel(int capacity, Comparator<Message<?>> comparator);
}Usage:
@Bean
public MessageChannel priorityChannel() {
return new PriorityChannel((m1, m2) -> {
Integer p1 = (Integer) m1.getHeaders().get("priority");
Integer p2 = (Integer) m2.getHeaders().get("priority");
return p2.compareTo(p1); // Higher priority first
});
}
// Edge case: Priority channel with null handling
@Bean
public MessageChannel priorityChannelWithNullHandling() {
return new PriorityChannel((m1, m2) -> {
Integer p1 = (Integer) m1.getHeaders().get("priority");
Integer p2 = (Integer) m2.getHeaders().get("priority");
// Handle null priorities
if (p1 == null && p2 == null) return 0;
if (p1 == null) return 1; // Nulls go to end
if (p2 == null) return -1;
return p2.compareTo(p1); // Higher priority first
});
}Zero-capacity channel for direct handoff.
/**
* Zero-capacity channel for direct handoff.
* Sender blocks until receiver accepts message.
*/
public class RendezvousChannel extends QueueChannel {
/**
* Create rendezvous channel.
*/
public RendezvousChannel();
}Usage:
@Bean
public MessageChannel rendezvousChannel() {
// Blocks sender until receiver accepts (synchronization point)
return new RendezvousChannel();
}Reactive channel based on Project Reactor Flux.
/**
* Reactive channel based on Project Reactor Flux.
* Implements Publisher for reactive streams.
*/
public class FluxMessageChannel extends AbstractMessageChannel
implements ReactiveStreamsSubscribableChannel, Publisher<Message<?>> {
/**
* Create flux message channel.
*/
public FluxMessageChannel();
/**
* Subscribe to this channel as reactive Publisher.
*
* @param subscriber the reactive subscriber
*/
@Override
public void subscribe(Subscriber<? super Message<?>> subscriber);
/**
* Subscribe with publisher integration.
*
* @param publisher the message publisher
*/
public void subscribeTo(Publisher<? extends Message<?>> publisher);
}Usage:
@Bean
public MessageChannel reactiveChannel() {
FluxMessageChannel channel = new FluxMessageChannel();
// Use with reactive streams
return channel;
}Load-balancing channel with multiple partitions.
/**
* Load-balancing channel with multiple partitions.
* Distributes messages across partitions based on partition key.
*/
public class PartitionedChannel extends AbstractExecutorChannel {
/**
* Create partitioned channel with partition count.
* Uses CORRELATION_ID header for partition key by default.
*
* @param partitionCount the number of partitions
*/
public PartitionedChannel(int partitionCount);
/**
* Create partitioned channel with partition count and partition key function.
*
* @param partitionCount the number of partitions
* @param partitionKeyFunction the function to resolve partition key against the message
*/
public PartitionedChannel(int partitionCount, Function<Message<?>, Object> partitionKeyFunction);
/**
* Set a ThreadFactory for executors per partitions.
*
* @param threadFactory the thread factory to use
*/
public void setThreadFactory(ThreadFactory threadFactory);
/**
* Set failover strategy when subscriber fails.
*
* @param failover true to try next subscriber on failure
*/
public void setFailover(boolean failover);
/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
*
* @param failoverStrategy the failover strategy predicate
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy);
/**
* Provide a LoadBalancingStrategy for the PartitionedDispatcher.
*
* @param loadBalancingStrategy the load balancing strategy implementation
*/
public void setLoadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy);
}Usage:
@Bean
public MessageChannel partitionedChannel() {
PartitionedChannel channel = new PartitionedChannel(4,
m -> m.getHeaders().get("customerId"));
return channel;
}
// Edge case: PartitionedChannel with null partition key handling
@Bean
public MessageChannel partitionedChannelWithNullHandling() {
PartitionedChannel channel = new PartitionedChannel(4,
message -> {
Object key = message.getHeaders().get("customerId");
// Use default partition for null keys
return key != null ? key : "default";
});
return channel;
}Channel that discards all messages.
/**
* Channel that discards all messages.
* Send operations always succeed but do nothing.
*/
public class NullChannel implements PollableChannel, BeanNameAware, IntegrationManagement, IntegrationPattern {
/**
* Create null channel.
*/
public NullChannel();
}Usage:
@Bean
public MessageChannel nullChannel() {
return new NullChannel(); // Discards all messages (testing/debugging)
}Wire tap interceptor that publishes copy of message to secondary channel.
/**
* Wire tap interceptor that publishes copy of message to secondary channel.
* Useful for monitoring and debugging message flows.
*/
public class WireTap implements ChannelInterceptor, VetoCapableInterceptor {
/**
* Create wire tap with target channel.
*
* @param channel the channel to send copies to
*/
public WireTap(MessageChannel channel);
/**
* Create wire tap with target channel and selector.
*
* @param channel the channel to send copies to
* @param selector the selector for filtering messages
*/
public WireTap(MessageChannel channel, @Nullable MessageSelector selector);
/**
* Create wire tap with target channel name.
*
* @param channelName the name of channel to send copies to
*/
public WireTap(String channelName);
/**
* Set timeout for sending to wire tap channel.
*
* @param timeout the timeout in milliseconds
*/
public void setTimeout(long timeout);
}Usage:
@Bean
public DirectChannel monitoredChannel() {
DirectChannel channel = new DirectChannel();
// Wire tap for monitoring
MessageChannel logChannel = new QueueChannel();
WireTap wireTap = new WireTap(logChannel);
wireTap.setTimeout(1000);
channel.addInterceptor(wireTap);
return channel;
}
// Edge case: Wire tap with selector for conditional monitoring
@Bean
public DirectChannel conditionalWireTapChannel() {
DirectChannel channel = new DirectChannel();
MessageSelector selector = message -> {
String priority = (String) message.getHeaders().get("priority");
return "HIGH".equals(priority);
};
WireTap wireTap = new WireTap("highPriorityAuditChannel", selector);
wireTap.setTimeout(5000);
channel.addInterceptor(wireTap);
return channel;
}ManageableLifecycle (start/stop)PausableMessageDeliveryException - Message delivery failureIllegalStateException - Channel not started, no subscribers (if required)MessageTimeoutException - Send/receive timeoutInterruptedException - Thread interrupted during blocking operationPublishSubscribeChannel with requireSubscribers=true throws exceptionQueueChannel blocks on send if bounded (or throws if non-blocking)QueueChannel.receive() returns null (not an error)PriorityChannel comparator must handle null prioritiesPartitionedChannel may route to default partition or rejectgetRemainingCapacity() for bounded queuesDirectChannel with maxSubscribers rejects additional subscriptions