or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-interfaces.mdindex.mdmessage-channels.mdmessage-endpoints.md
tile.json

message-channels.mddocs/

Message Channels

Message channels provide the conduit for message transport between endpoints in Spring Integration. Channels implement point-to-point or publish-subscribe semantics with support for synchronous, asynchronous, queued, reactive, and priority-based message delivery.

Channel Selection Decision Tree

When to Use Each Channel Type

Point-to-Point (Single Consumer):

  • DirectChannel: Synchronous, immediate delivery, round-robin load balancing
  • ExecutorChannel: Asynchronous, non-blocking send, executor-based dispatch
  • QueueChannel: Bounded/unbounded queue, pollable, thread-safe blocking
  • PriorityChannel: Queue with priority ordering, custom comparator
  • RendezvousChannel: Zero-capacity, blocking handoff (synchronization point)

Publish-Subscribe (Multiple Consumers):

  • PublishSubscribeChannel: Broadcast to all subscribers, sync or async

Special Purpose:

  • FluxMessageChannel: Reactive streams integration
  • PartitionedChannel: Load balancing across partitions
  • FixedSubscriberChannel: Single fixed subscriber (no dynamic subscription)
  • NullChannel: Discard all messages (testing/debugging)

Selection Criteria

  1. Synchronous vs Asynchronous: Use DirectChannel for sync, ExecutorChannel for async
  2. Queue vs Direct: Use QueueChannel for buffering, DirectChannel for immediate processing
  3. Single vs Multiple Consumers: Use DirectChannel/QueueChannel for single, PublishSubscribeChannel for multiple
  4. Priority Ordering: Use PriorityChannel when message order matters
  5. Reactive: Use FluxMessageChannel for reactive streams integration

Critical Behaviors (Must Know)

  • DirectChannel: Synchronous delivery in sender's thread (blocks until handler completes)
  • QueueChannel: capacity=0 means unbounded (not zero capacity); capacity>0 means bounded
  • PublishSubscribeChannel: requireSubscribers=true throws exception if no subscribers
  • QueueChannel.receive() returns null when empty (not an error)
  • Channels reject null messages (throw exception)
  • Wire tap failures don't block main flow (fire-and-forget)

Channel Implementations

DirectChannel

Synchronous point-to-point channel with round-robin load balancing.

/**
 * Point-to-point channel with round-robin load balancing.
 * Messages delivered to single subscriber in sender's thread.
 */
public class DirectChannel extends AbstractSubscribableChannel {
    /**
     * Create direct channel with default round-robin dispatcher.
     */
    public DirectChannel();

    /**
     * Create direct channel with custom load balancing strategy.
     *
     * @param loadBalancingStrategy the load balancing strategy
     */
    public DirectChannel(LoadBalancingStrategy loadBalancingStrategy);

    /**
     * Set maximum subscribers allowed.
     *
     * @param maxSubscribers the maximum subscribers
     */
    public void setMaxSubscribers(int maxSubscribers);

    /**
     * Set failover strategy when subscriber fails.
     *
     * @param failover true to try next subscriber on failure
     */
    public void setFailover(boolean failover);

    /**
     * Configure a strategy whether the channel's dispatcher should have failover enabled
     * for the exception thrown. Overrides the {@link #setFailover(boolean)} option.
     *
     * @param failoverStrategy the failover strategy predicate
     */
    public void setFailoverStrategy(Predicate<Exception> failoverStrategy);
}

Usage:

@Bean
public MessageChannel directChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(true);
    channel.setMaxSubscribers(3);
    return channel;
}

// Edge case: DirectChannel with failover strategy
@Bean
public MessageChannel directChannelWithFailoverStrategy() {
    DirectChannel channel = new DirectChannel();
    channel.setFailoverStrategy(exception -> {
        // Only failover for specific exceptions
        return exception instanceof RetryableException;
    });
    return channel;
}

PublishSubscribeChannel

Publish-subscribe channel broadcasting to all subscribers.

/**
 * Publish-subscribe channel broadcasting to all subscribers.
 * Can execute subscribers asynchronously with task executor.
 */
public class PublishSubscribeChannel extends AbstractSubscribableChannel
    implements BroadcastCapableChannel {
    /**
     * Create publish-subscribe channel with synchronous execution.
     */
    public PublishSubscribeChannel();

    /**
     * Create publish-subscribe channel with requireSubscribers flag.
     *
     * @param requireSubscribers if set to true, the sent message is considered as non-dispatched
     * and rejected to the caller with the "Dispatcher has no subscribers"
     */
    public PublishSubscribeChannel(boolean requireSubscribers);

    /**
     * Create publish-subscribe channel with async execution.
     *
     * @param executor the task executor for async delivery
     */
    public PublishSubscribeChannel(Executor executor);

    /**
     * Create publish-subscribe channel with async execution and requireSubscribers flag.
     *
     * @param executor the task executor for async delivery
     * @param requireSubscribers if set to true, the sent message is considered as non-dispatched
     * and rejected to the caller with the "Dispatcher has no subscribers"
     */
    public PublishSubscribeChannel(Executor executor, boolean requireSubscribers);

    /**
     * Set error handler for async execution.
     *
     * @param errorHandler the error handler
     */
    public void setErrorHandler(ErrorHandler errorHandler);

    /**
     * Whether to apply sequence information to messages.
     *
     * @param applySequence true to add sequence headers
     */
    public void setApplySequence(boolean applySequence);

    /**
     * Set minimum subscribers required.
     *
     * @param minSubscribers the minimum subscribers
     */
    public void setMinSubscribers(int minSubscribers);

    /**
     * Whether to ignore send failures to subscribers.
     *
     * @param ignoreFailures true to ignore failures
     */
    public void setIgnoreFailures(boolean ignoreFailures);
}

Usage:

@Bean
public MessageChannel pubSubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel(taskExecutor());
    channel.setApplySequence(true);
    channel.setMinSubscribers(1);
    return channel;
}

// Edge case: Pub-sub channel with subscriber requirements
@Bean
public MessageChannel pubSubWithSubscriberRequirement() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel(
        taskExecutor(), true); // requireSubscribers = true
    channel.setMinSubscribers(1); // At least one subscriber required
    channel.setErrorHandler(errorHandler());
    return channel;
}

QueueChannel

Pollable channel backed by queue with optional capacity limit.

/**
 * Pollable channel backed by queue with optional capacity limit.
 * Supports blocking send and receive operations.
 */
public class QueueChannel extends AbstractPollableChannel
    implements QueueChannelOperations {
    /**
     * Create queue channel with unbounded capacity.
     */
    public QueueChannel();

    /**
     * Create queue channel with specific capacity.
     *
     * @param capacity the queue capacity (0 for unbounded)
     */
    public QueueChannel(int capacity);

    /**
     * Create queue channel with existing queue.
     *
     * @param queue the backing queue
     */
    public QueueChannel(Queue<Message<?>> queue);

    /**
     * Get queue size.
     *
     * @return number of messages in queue
     */
    public int getQueueSize();

    /**
     * Get remaining capacity.
     *
     * @return remaining capacity, or Integer.MAX_VALUE if unbounded
     */
    public int getRemainingCapacity();

    /**
     * Clear all messages from queue.
     *
     * @return list of cleared messages
     */
    public List<Message<?>> clear();

    /**
     * Purge messages matching selector.
     *
     * @param selector the message selector
     * @return list of purged messages
     */
    public List<Message<?>> purge(MessageSelector selector);
}

Critical: capacity=0 means unbounded, not zero capacity.

Usage:

@Bean
public MessageChannel queueChannel() {
    return new QueueChannel(100); // Bounded queue with capacity 100
}

@Bean
public MessageChannel unboundedQueueChannel() {
    return new QueueChannel(); // Or new QueueChannel(0) - unbounded
}

// Edge case: QueueChannel with capacity monitoring
@Bean
public MessageChannel monitoredQueueChannel() {
    QueueChannel channel = new QueueChannel(100);
    // Monitor capacity in production
    if (channel.getRemainingCapacity() < 10) {
        log.warn("Queue channel nearly full: " + channel.getQueueSize());
    }
    return channel;
}

// Edge case: QueueChannel receive with timeout handling
public Message<?> receiveWithTimeoutHandling(QueueChannel queue, long timeout) {
    Message<?> message = queue.receive(timeout);
    if (message == null) {
        // Timeout occurred - not an error, just no message available
        log.debug("No message received within " + timeout + "ms");
    }
    return message;
}

ExecutorChannel

Point-to-point channel with async dispatch via executor.

/**
 * Point-to-point channel with async dispatch via executor.
 * Each message handled in separate thread.
 */
public class ExecutorChannel extends AbstractExecutorChannel
    implements ExecutorChannelInterceptorAware {
    /**
     * Create executor channel with executor.
     *
     * @param executor the task executor
     */
    public ExecutorChannel(Executor executor);

    /**
     * Create executor channel with executor and load balancing strategy.
     *
     * @param executor the task executor
     * @param loadBalancingStrategy the load balancing strategy
     */
    public ExecutorChannel(Executor executor, LoadBalancingStrategy loadBalancingStrategy);

    /**
     * Set failover strategy when subscriber fails.
     *
     * @param failover true to try next subscriber on failure
     */
    public void setFailover(boolean failover);

    /**
     * Configure a strategy whether the channel's dispatcher should have failover enabled
     * for the exception thrown.
     *
     * @param failoverStrategy the failover strategy predicate
     */
    public void setFailoverStrategy(Predicate<Exception> failoverStrategy);
}

Usage:

@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(taskExecutor());
}

// Edge case: Executor channel with custom error handling
@Bean
public MessageChannel executorChannelWithErrorHandling() {
    ExecutorChannel channel = new ExecutorChannel(taskExecutor());
    channel.setFailover(true);
    return channel;
}

PriorityChannel

Pollable channel with message prioritization.

/**
 * Pollable channel with message prioritization.
 * Messages delivered in priority order based on comparator.
 */
public class PriorityChannel extends QueueChannel {
    /**
     * Create priority channel with default comparator.
     * Uses PRIORITY header for ordering.
     */
    public PriorityChannel();

    /**
     * Create priority channel with capacity and default comparator.
     *
     * @param capacity the queue capacity
     */
    public PriorityChannel(int capacity);

    /**
     * Create priority channel with custom comparator.
     *
     * @param comparator the message comparator
     */
    public PriorityChannel(Comparator<Message<?>> comparator);

    /**
     * Create priority channel with capacity and comparator.
     *
     * @param capacity the queue capacity
     * @param comparator the message comparator
     */
    public PriorityChannel(int capacity, Comparator<Message<?>> comparator);
}

Usage:

@Bean
public MessageChannel priorityChannel() {
    return new PriorityChannel((m1, m2) -> {
        Integer p1 = (Integer) m1.getHeaders().get("priority");
        Integer p2 = (Integer) m2.getHeaders().get("priority");
        return p2.compareTo(p1); // Higher priority first
    });
}

// Edge case: Priority channel with null handling
@Bean
public MessageChannel priorityChannelWithNullHandling() {
    return new PriorityChannel((m1, m2) -> {
        Integer p1 = (Integer) m1.getHeaders().get("priority");
        Integer p2 = (Integer) m2.getHeaders().get("priority");
        // Handle null priorities
        if (p1 == null && p2 == null) return 0;
        if (p1 == null) return 1; // Nulls go to end
        if (p2 == null) return -1;
        return p2.compareTo(p1); // Higher priority first
    });
}

RendezvousChannel

Zero-capacity channel for direct handoff.

/**
 * Zero-capacity channel for direct handoff.
 * Sender blocks until receiver accepts message.
 */
public class RendezvousChannel extends QueueChannel {
    /**
     * Create rendezvous channel.
     */
    public RendezvousChannel();
}

Usage:

@Bean
public MessageChannel rendezvousChannel() {
    // Blocks sender until receiver accepts (synchronization point)
    return new RendezvousChannel();
}

FluxMessageChannel

Reactive channel based on Project Reactor Flux.

/**
 * Reactive channel based on Project Reactor Flux.
 * Implements Publisher for reactive streams.
 */
public class FluxMessageChannel extends AbstractMessageChannel
    implements ReactiveStreamsSubscribableChannel, Publisher<Message<?>> {
    /**
     * Create flux message channel.
     */
    public FluxMessageChannel();

    /**
     * Subscribe to this channel as reactive Publisher.
     *
     * @param subscriber the reactive subscriber
     */
    @Override
    public void subscribe(Subscriber<? super Message<?>> subscriber);

    /**
     * Subscribe with publisher integration.
     *
     * @param publisher the message publisher
     */
    public void subscribeTo(Publisher<? extends Message<?>> publisher);
}

Usage:

@Bean
public MessageChannel reactiveChannel() {
    FluxMessageChannel channel = new FluxMessageChannel();
    // Use with reactive streams
    return channel;
}

PartitionedChannel

Load-balancing channel with multiple partitions.

/**
 * Load-balancing channel with multiple partitions.
 * Distributes messages across partitions based on partition key.
 */
public class PartitionedChannel extends AbstractExecutorChannel {
    /**
     * Create partitioned channel with partition count.
     * Uses CORRELATION_ID header for partition key by default.
     *
     * @param partitionCount the number of partitions
     */
    public PartitionedChannel(int partitionCount);

    /**
     * Create partitioned channel with partition count and partition key function.
     *
     * @param partitionCount the number of partitions
     * @param partitionKeyFunction the function to resolve partition key against the message
     */
    public PartitionedChannel(int partitionCount, Function<Message<?>, Object> partitionKeyFunction);

    /**
     * Set a ThreadFactory for executors per partitions.
     *
     * @param threadFactory the thread factory to use
     */
    public void setThreadFactory(ThreadFactory threadFactory);

    /**
     * Set failover strategy when subscriber fails.
     *
     * @param failover true to try next subscriber on failure
     */
    public void setFailover(boolean failover);

    /**
     * Configure a strategy whether the channel's dispatcher should have failover enabled
     * for the exception thrown.
     *
     * @param failoverStrategy the failover strategy predicate
     */
    public void setFailoverStrategy(Predicate<Exception> failoverStrategy);

    /**
     * Provide a LoadBalancingStrategy for the PartitionedDispatcher.
     *
     * @param loadBalancingStrategy the load balancing strategy implementation
     */
    public void setLoadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy);
}

Usage:

@Bean
public MessageChannel partitionedChannel() {
    PartitionedChannel channel = new PartitionedChannel(4,
        m -> m.getHeaders().get("customerId"));
    return channel;
}

// Edge case: PartitionedChannel with null partition key handling
@Bean
public MessageChannel partitionedChannelWithNullHandling() {
    PartitionedChannel channel = new PartitionedChannel(4,
        message -> {
            Object key = message.getHeaders().get("customerId");
            // Use default partition for null keys
            return key != null ? key : "default";
        });
    return channel;
}

NullChannel

Channel that discards all messages.

/**
 * Channel that discards all messages.
 * Send operations always succeed but do nothing.
 */
public class NullChannel implements PollableChannel, BeanNameAware, IntegrationManagement, IntegrationPattern {
    /**
     * Create null channel.
     */
    public NullChannel();
}

Usage:

@Bean
public MessageChannel nullChannel() {
    return new NullChannel(); // Discards all messages (testing/debugging)
}

Channel Interceptors

WireTap

Wire tap interceptor that publishes copy of message to secondary channel.

/**
 * Wire tap interceptor that publishes copy of message to secondary channel.
 * Useful for monitoring and debugging message flows.
 */
public class WireTap implements ChannelInterceptor, VetoCapableInterceptor {
    /**
     * Create wire tap with target channel.
     *
     * @param channel the channel to send copies to
     */
    public WireTap(MessageChannel channel);

    /**
     * Create wire tap with target channel and selector.
     *
     * @param channel the channel to send copies to
     * @param selector the selector for filtering messages
     */
    public WireTap(MessageChannel channel, @Nullable MessageSelector selector);

    /**
     * Create wire tap with target channel name.
     *
     * @param channelName the name of channel to send copies to
     */
    public WireTap(String channelName);

    /**
     * Set timeout for sending to wire tap channel.
     *
     * @param timeout the timeout in milliseconds
     */
    public void setTimeout(long timeout);
}

Usage:

@Bean
public DirectChannel monitoredChannel() {
    DirectChannel channel = new DirectChannel();
    
    // Wire tap for monitoring
    MessageChannel logChannel = new QueueChannel();
    WireTap wireTap = new WireTap(logChannel);
    wireTap.setTimeout(1000);
    channel.addInterceptor(wireTap);
    
    return channel;
}

// Edge case: Wire tap with selector for conditional monitoring
@Bean
public DirectChannel conditionalWireTapChannel() {
    DirectChannel channel = new DirectChannel();
    
    MessageSelector selector = message -> {
        String priority = (String) message.getHeaders().get("priority");
        return "HIGH".equals(priority);
    };
    
    WireTap wireTap = new WireTap("highPriorityAuditChannel", selector);
    wireTap.setTimeout(5000);
    channel.addInterceptor(wireTap);
    
    return channel;
}

Threading Model

Thread Safety Rules

  • DirectChannel: Synchronous in sender's thread (blocks until handler completes)
  • ExecutorChannel: Asynchronous via executor (thread-safe)
  • QueueChannel: Thread-safe blocking operations
  • PublishSubscribeChannel: Can be sync or async (depends on executor)
  • PriorityChannel: Thread-safe priority queue
  • Channel interceptors: Execute in message flow thread
  • Wire tap: Sends asynchronously (non-blocking)

Lifecycle

  • Channels implement ManageableLifecycle (start/stop)
  • Channels can be paused if they implement Pausable
  • Interceptors are added/removed at runtime (thread-safe)
  • Channel metrics available if management enabled

Exception Handling

Exception Types

  • MessageDeliveryException - Message delivery failure
  • IllegalStateException - Channel not started, no subscribers (if required)
  • MessageTimeoutException - Send/receive timeout
  • InterruptedException - Thread interrupted during blocking operation

Edge Cases

  • No subscribers: PublishSubscribeChannel with requireSubscribers=true throws exception
  • Queue full: QueueChannel blocks on send if bounded (or throws if non-blocking)
  • Null message: Channels reject null messages (throw exception)
  • Empty queue: QueueChannel.receive() returns null (not an error)
  • Priority null: PriorityChannel comparator must handle null priorities
  • Partition key null: PartitionedChannel may route to default partition or reject
  • Interceptor failure: Can block message delivery (depends on interceptor)
  • Wire tap failure: Does not block main flow (fires and forgets)
  • Channel capacity: Monitor getRemainingCapacity() for bounded queues
  • Subscriber limit: DirectChannel with maxSubscribers rejects additional subscriptions