Message Endpoints

Message endpoints connect handlers to channels, managing message consumption and production. Endpoints support event-driven consumers, polling consumers, reactive consumers, and source adapters with lifecycle control, polling configuration, and transaction support.

Endpoint Selection Guide

When to Use Each Endpoint Type

EventDrivenConsumer:

  • Subscribable channels (DirectChannel, PublishSubscribeChannel, ExecutorChannel)
  • Immediate processing when message arrives
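  • Runs the handler on the sender's thread with DirectChannel (synchronous dispatch); ExecutorChannel, or PublishSubscribeChannel configured with an executor, dispatches on executor threads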
  • No polling overhead

PollingConsumer:

  • Pollable channels (QueueChannel, PriorityChannel, RendezvousChannel)
  • Scheduled/triggered processing
  • Runs on the task scheduler's thread, or on a TaskExecutor if one is set (asynchronous to the sender)
  • Supports transactions

ReactiveStreamsConsumer:

  • Reactive channels (FluxMessageChannel)
  • Reactive streams integration
  • Backpressure support
  • Reactive threading model

SourcePollingChannelAdapter:

  • Poll message sources (MessageSource)
  • Scheduled polling with triggers
  • Publishes to output channel
  • Supports transactions

Critical Behaviors (Must Know)

  • All endpoints implement SmartLifecycle (must be started to process messages)
  • autoStartup=true by default (endpoints start automatically)
  • phase controls startup order (lower phases start first)
  • MessageSource.receive() returning null is normal (no message available, polling continues)
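  • Handler exceptions are wrapped in an ErrorMessage and sent to the error channel if one is configured
  • Transaction rollback: if a transactional polling consumer throws, the transaction rolls back; the message returns to the channel only when the channel is backed by a transactional message store (a plain in-memory QueueChannel does not restore it)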
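
The null-return and max-messages behaviors above can be sketched without the framework. The following is a toy model in plain Java, not Spring Integration code: the class name PollCycleSketch and the method pollOnce are hypothetical, and a Supplier stands in for MessageSource.receive(). It assumes one poll cycle calls the source up to the configured limit and ends early at the first null.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/** Toy model of a single poll cycle; not Spring Integration code. */
public class PollCycleSketch {

    /**
     * Calls the source up to maxMessagesPerPoll times and stops early at the
     * first null, which means "nothing available right now".
     * A non-positive limit is treated as "no limit" in this sketch.
     */
    public static <T> List<T> pollOnce(Supplier<T> source, int maxMessagesPerPoll) {
        List<T> delivered = new ArrayList<>();
        while (maxMessagesPerPoll <= 0 || delivered.size() < maxMessagesPerPoll) {
            T payload = source.get();
            if (payload == null) {
                break; // not an error: the next trigger firing polls again
            }
            delivered.add(payload);
        }
        return delivered;
    }

    public static void main(String[] args) {
        Queue<String> backlog = new ArrayDeque<>(List.of("a", "b", "c"));
        Supplier<String> source = backlog::poll; // poll() returns null when empty

        System.out.println(pollOnce(source, 2)); // [a, b]
        System.out.println(pollOnce(source, 2)); // [c]
        System.out.println(pollOnce(source, 2)); // []
    }
}
```

The third cycle delivers nothing and raises no error, which is the situation described for a source that has no data.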

Endpoint Base Classes

AbstractEndpoint

Base class for all endpoints with lifecycle management.

/**
 * Base class for all endpoints.
 * Provides lifecycle management and component registration.
 */
public abstract class AbstractEndpoint implements SmartLifecycle, TrackableComponent {
    /**
     * Start endpoint.
     */
    @Override
    public void start();

    /**
     * Stop endpoint.
     */
    @Override
    public void stop();

    /**
     * Check if endpoint is running.
     *
     * @return true if running
     */
    @Override
    public boolean isRunning();

    /**
     * Set whether to auto-start on context refresh.
     *
     * @param autoStartup true to auto-start
     */
    public void setAutoStartup(boolean autoStartup);

    /**
     * Set startup phase.
     *
     * @param phase the phase value
     */
    public void setPhase(int phase);

    /**
     * Set role for group control.
     *
     * @param role the role name
     */
    public void setRole(String role);
}

AbstractPollingEndpoint

Base class for polling endpoints with polling configuration.

/**
 * Base class for polling endpoints.
 * Provides polling configuration and transaction support.
 */
public abstract class AbstractPollingEndpoint extends AbstractEndpoint {
    /**
     * Set trigger for polling.
     *
     * @param trigger the trigger
     */
    public void setTrigger(Trigger trigger);

    /**
     * Set task executor for polling.
     *
     * @param taskExecutor the task executor
     */
    public void setTaskExecutor(TaskExecutor taskExecutor);

    /**
     * Set max messages per poll.
     *
     * @param maxMessagesPerPoll the max messages
     */
    public void setMaxMessagesPerPoll(long maxMessagesPerPoll);

    /**
     * Set receive timeout.
     *
     * @param receiveTimeout timeout in milliseconds
     */
    public void setReceiveTimeout(long receiveTimeout);

    /**
     * Set transaction synchronization factory.
     *
     * @param synchronizationFactory the factory
     */
    public void setTransactionSynchronizationFactory(
        TransactionSynchronizationFactory synchronizationFactory);

    /**
     * Set advice chain for polling.
     *
     * @param adviceChain the advice chain
     */
    public void setAdviceChain(List<Advice> adviceChain);

    /**
     * Set error handler.
     *
     * @param errorHandler the error handler
     */
    public void setErrorHandler(ErrorHandler errorHandler);
}

Consumer Endpoints

EventDrivenConsumer

Event-driven consumer for subscribable channels.

/**
 * Event-driven consumer for subscribable channels.
 * Subscribes handler to channel on start.
 */
public class EventDrivenConsumer extends AbstractEndpoint implements IntegrationConsumer {
    /**
     * Create event-driven consumer.
     *
     * @param inputChannel the subscribable channel
     * @param handler the message handler
     */
    public EventDrivenConsumer(SubscribableChannel inputChannel, MessageHandler handler);

    /**
     * Get input channel.
     *
     * @return the input channel
     */
    public SubscribableChannel getInputChannel();

    /**
     * Get handler.
     *
     * @return the message handler
     */
    public MessageHandler getHandler();
}

Usage:

@Bean
public EventDrivenConsumer eventConsumer() {
    DirectChannel channel = new DirectChannel();
    MessageHandler handler = message -> {
        System.out.println("Processing: " + message.getPayload());
    };

    EventDrivenConsumer consumer = new EventDrivenConsumer(channel, handler);
    consumer.setAutoStartup(true);
    consumer.setPhase(100);
    return consumer;
}

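// Edge case: Multiple event-driven consumers on same channel
// (sharedChannel, handler1 and handler2 are assumed to be defined elsewhere)
@Bean
public EventDrivenConsumer consumer1() {
    return new EventDrivenConsumer(sharedChannel, handler1);
}

@Bean
public EventDrivenConsumer consumer2() {
    return new EventDrivenConsumer(sharedChannel, handler2);
    // PublishSubscribeChannel: both consumers receive every message
    // DirectChannel: each message goes to one consumer (round-robin by default)
}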
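
Whether several event-driven consumers all see a message depends on how the channel dispatches. The sketch below is a plain-Java toy model of the two strategies, not Spring Integration code; DispatchSketch, RoundRobinDispatcher and BroadcastDispatcher are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of two dispatch strategies; not Spring Integration code. */
public class DispatchSketch {

    /** Point-to-point: each message goes to exactly one handler, in rotation. */
    public static class RoundRobinDispatcher<T> {
        private final List<Consumer<T>> handlers = new ArrayList<>();
        private int next = 0;

        public void subscribe(Consumer<T> handler) {
            handlers.add(handler);
        }

        public void send(T message) {
            if (handlers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            handlers.get(next).accept(message);
            next = (next + 1) % handlers.size();
        }
    }

    /** Publish-subscribe: every handler receives every message. */
    public static class BroadcastDispatcher<T> {
        private final List<Consumer<T>> handlers = new ArrayList<>();

        public void subscribe(Consumer<T> handler) {
            handlers.add(handler);
        }

        public void send(T message) {
            handlers.forEach(handler -> handler.accept(message));
        }
    }

    public static void main(String[] args) {
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();

        RoundRobinDispatcher<String> direct = new RoundRobinDispatcher<>();
        direct.subscribe(first::add);
        direct.subscribe(second::add);
        for (String m : List.of("m1", "m2", "m3")) {
            direct.send(m);
        }
        System.out.println(first + " " + second); // [m1, m3] [m2]
    }
}
```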

PollingConsumer

Polling consumer for pollable channels.

/**
 * Polling consumer for pollable channels.
 * Polls channel on trigger and invokes handler.
 */
public class PollingConsumer extends AbstractPollingEndpoint implements IntegrationConsumer {
    /**
     * Create polling consumer.
     *
     * @param inputChannel the pollable channel
     * @param handler the message handler
     */
    public PollingConsumer(PollableChannel inputChannel, MessageHandler handler);

    /**
     * Get input channel.
     *
     * @return the input channel
     */
    public PollableChannel getInputChannel();

    /**
     * Get handler.
     *
     * @return the message handler
     */
    public MessageHandler getHandler();

    /**
     * Set whether handler is transactional.
     *
     * @param transactional true if transactional
     */
    public void setTransactional(boolean transactional);
}

Usage:

@Bean
public PollingConsumer pollingConsumer() {
    QueueChannel channel = new QueueChannel(100);
    MessageHandler handler = message -> {
        System.out.println("Polling: " + message.getPayload());
    };

    PollingConsumer consumer = new PollingConsumer(channel, handler);
    consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(1)));
    consumer.setMaxMessagesPerPoll(10);
    consumer.setReceiveTimeout(500);
    consumer.setAutoStartup(true);
    return consumer;
}

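// Edge case: Polling consumer with transaction rollback
// (queueChannel, handler and transactionManager are assumed to be defined elsewhere)
@Bean
public PollingConsumer transactionalConsumer() {
    PollingConsumer consumer = new PollingConsumer(queueChannel, handler);
    consumer.setTransactional(true);
    // Build the transaction advice with default transaction attributes
    consumer.setAdviceChain(Collections.singletonList(
        new TransactionInterceptorBuilder()
            .transactionManager(transactionManager)
            .build()
    ));
    // If handler throws, the transaction rolls back; the message returns to the
    // queue only if the channel is backed by a transactional message store
    return consumer;
}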

ReactiveStreamsConsumer

Reactive consumer for reactive channels.

/**
 * Reactive consumer for reactive channels.
 * Subscribes handler to reactive publisher.
 */
public class ReactiveStreamsConsumer extends AbstractEndpoint implements IntegrationConsumer {
    /**
     * Create reactive consumer.
     *
     * @param inputChannel the reactive channel
     * @param handler the message handler
     */
    public ReactiveStreamsConsumer(ReactiveStreamsSubscribableChannel inputChannel,
                                   MessageHandler handler);

    /**
     * Create reactive consumer with a custom subscriber.
     *
     * @param inputChannel the reactive channel
     * @param subscriber the subscriber
     */
    public ReactiveStreamsConsumer(ReactiveStreamsSubscribableChannel inputChannel,
                                   Subscriber<Message<?>> subscriber);

    /**
     * Get input channel.
     *
     * @return the input channel
     */
    public ReactiveStreamsSubscribableChannel getInputChannel();

    /**
     * Set subscriber executor.
     *
     * @param subscriberExecutor the executor
     */
    public void setSubscriberExecutor(Executor subscriberExecutor);
}

Usage:

@Bean
public ReactiveStreamsConsumer reactiveConsumer() {
    FluxMessageChannel channel = new FluxMessageChannel();
    MessageHandler handler = message -> {
        System.out.println("Reactive: " + message.getPayload());
    };

    ReactiveStreamsConsumer consumer = new ReactiveStreamsConsumer(channel, handler);
    consumer.setAutoStartup(true);
    return consumer;
}

// Edge case: Reactive consumer with backpressure
@Bean
public ReactiveStreamsConsumer backpressureConsumer() {
    ReactiveStreamsConsumer consumer = new ReactiveStreamsConsumer(fluxChannel, handler);
    consumer.setSubscriberExecutor(Executors.newFixedThreadPool(10));
    // Respects reactive backpressure automatically
    return consumer;
}

Source Polling Channel Adapter

SourcePollingChannelAdapter

Adapter polling message sources and publishing to channel.

/**
 * Adapter polling message sources and publishing to channel.
 * Polls source on trigger and sends messages to output channel.
 */
public class SourcePollingChannelAdapter extends AbstractPollingEndpoint
    implements MessageProducer {
    /**
     * Create source polling adapter.
     */
    public SourcePollingChannelAdapter();

    /**
     * Set message source.
     *
     * @param source the message source
     */
    public void setSource(MessageSource<?> source);

    /**
     * Get message source.
     *
     * @return the message source
     */
    public MessageSource<?> getMessageSource();

    /**
     * Set output channel.
     *
     * @param outputChannel the output channel
     */
    @Override
    public void setOutputChannel(MessageChannel outputChannel);

    /**
     * Set output channel by name.
     *
     * @param outputChannelName the channel name
     */
    public void setOutputChannelName(String outputChannelName);

    /**
     * Set send timeout.
     *
     * @param sendTimeout timeout in milliseconds
     */
    public void setSendTimeout(long sendTimeout);

    /**
     * Set error channel.
     *
     * @param errorChannel the error channel
     */
    public void setErrorChannel(MessageChannel errorChannel);

    /**
     * Set whether this adapter should be tracked in the message history.
     *
     * @param shouldTrack true to track
     */
    public void setShouldTrack(boolean shouldTrack);
}

Usage:

@Bean
public SourcePollingChannelAdapter fileAdapter() {
    SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    adapter.setSource(fileMessageSource());
    adapter.setOutputChannelName("fileChannel");
    adapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(5)));
    adapter.setMaxMessagesPerPoll(10);
    adapter.setErrorChannelName("errorChannel");
    adapter.setAutoStartup(true);
    return adapter;
}

// Edge case: Source returning null (no message available)
@Bean
public SourcePollingChannelAdapter nullHandlingAdapter() {
    SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    adapter.setSource(() -> null); // Returns null when no data
    // Polling continues, no message sent, no error
    adapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(1)));
    return adapter;
}

// Edge case: Source with error handling
@Bean
public SourcePollingChannelAdapter errorHandlingAdapter() {
    SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    adapter.setSource(riskySource());
    adapter.setErrorChannel(errorChannel); // Errors sent here
    adapter.setErrorHandler(customErrorHandler); // Or handle here
    return adapter;
}

// Edge case: Source with output channel full
@Bean
public SourcePollingChannelAdapter channelFullAdapter() {
    SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    adapter.setSource(messageSource());
    adapter.setOutputChannel(queueChannel); // Limited capacity
    adapter.setSendTimeout(5000); // Throws exception if full after timeout
    return adapter;
}

Complete Endpoint Configuration Example

@Configuration
public class CompleteEndpointExample {

    @Bean
    public SourcePollingChannelAdapter dataPoller() {
        // Create message source
        MethodInvokingMessageSource source =
            new MethodInvokingMessageSource(dataService(), "fetchData");

        // Create adapter
        SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
        adapter.setSource(source);
        adapter.setOutputChannelName("dataChannel");

        // Configure polling
        adapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(10)));
        adapter.setMaxMessagesPerPoll(50);
        adapter.setReceiveTimeout(1000);

        // Configure lifecycle
        adapter.setAutoStartup(true);
        adapter.setPhase(1000);
        adapter.setRole("dataPollers");

        // Configure error handling
        adapter.setErrorChannelName("pollerErrorChannel");

        // Add transaction support
        TransactionInterceptorBuilder txBuilder = new TransactionInterceptorBuilder();
        txBuilder.transactionManager(transactionManager());
        adapter.setAdviceChain(Collections.singletonList(txBuilder.build()));

        return adapter;
    }

    @Bean
    public PollingConsumer dataConsumer() {
        QueueChannel channel = new QueueChannel(100);
        MessageHandler handler = message -> {
            processData(message.getPayload());
        };

        PollingConsumer consumer = new PollingConsumer(channel, handler);
        consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(1)));
        consumer.setMaxMessagesPerPoll(10);
        consumer.setReceiveTimeout(500);
        consumer.setAutoStartup(true);
        consumer.setPhase(2000); // Start after poller
        return consumer;
    }
}

Lifecycle Management

Lifecycle Rules

  • All endpoints implement SmartLifecycle (start/stop control)
  • autoStartup=true by default (endpoints start automatically)
  • phase controls startup order (lower phases start first)
  • role enables group-based lifecycle control
  • Endpoints must be started to process messages
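
The phase rule can be illustrated with a small ordering sketch. This is a plain-Java toy model, not the container's lifecycle processor; PhaseOrderSketch and its methods are hypothetical names, and the endpoint names and phases are taken from the examples in this document. It also shows the usual SmartLifecycle convention that shutdown runs in the reverse order, with higher phases stopping first.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Toy model of phase-based ordering; not Spring's lifecycle processor. */
public class PhaseOrderSketch {

    /** Lower phases start first. */
    public static List<String> startOrder(Map<String, Integer> phases) {
        return phases.entrySet().stream()
            .sorted(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    /** Shutdown runs in reverse: higher phases stop first. */
    public static List<String> stopOrder(Map<String, Integer> phases) {
        return phases.entrySet().stream()
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> phases = Map.of(
            "eventConsumer", 100,
            "dataPoller", 1000,
            "dataConsumer", 2000);

        System.out.println(startOrder(phases)); // [eventConsumer, dataPoller, dataConsumer]
        System.out.println(stopOrder(phases));  // [dataConsumer, dataPoller, eventConsumer]
    }
}
```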

Lifecycle Control Example

@Bean
public EventDrivenConsumer controlledConsumer() {
    EventDrivenConsumer consumer = new EventDrivenConsumer(channel, handler);
    consumer.setAutoStartup(false); // Must start manually
    consumer.setPhase(1000); // Starts after phase 999
    consumer.setRole("critical"); // Can be controlled as group
    return consumer;
}

// Manual lifecycle control
@Service
public class EndpointController {
    // SmartLifecycle itself has no role accessor; the role controller
    // registered by Spring Integration tracks endpoints by role
    @Autowired
    private SmartLifecycleRoleController roleController;

    public void startCriticalEndpoints() {
        // Starts every endpoint whose role is "critical"
        roleController.startLifecyclesInRole("critical");
    }
}

Threading Model

Threading Rules

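  • EventDrivenConsumer: Handler runs on the thread the channel dispatches on (the sender's thread for DirectChannel, an executor thread for ExecutorChannel)
  • PollingConsumer: Uses the TaskExecutor if provided; otherwise each poll runs on the task scheduler's thread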
  • ReactiveStreamsConsumer: Uses reactive streams threading (can set subscriberExecutor)
  • SourcePollingChannelAdapter: Uses polling executor for source polling

Exception Handling

Exception Types

  • IllegalStateException: If endpoint started without required configuration (channel, handler, source)
  • MessagingException: If message sending fails (timeout, channel full, etc.)
  • MessageHandlingException: If handler throws exception (wrapped in error message if error channel set)

Error Handling Pattern

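@Bean
public PollingConsumer consumerWithErrorHandling() {
    PollingConsumer consumer = new PollingConsumer(channel, handler);
    // Publish failures to the error channel through the endpoint's error handler
    MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler();
    errorHandler.setDefaultErrorChannel(errorChannel());
    consumer.setErrorHandler(errorHandler);
    return consumer;
}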

@Bean
public MessageChannel errorChannel() {
    return new DirectChannel();
}

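@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
    Throwable exception = errorMessage.getPayload();
    // For handler failures the payload is usually a MessagingException
    // that carries the message being processed when the error occurred
    Message<?> failedMessage = exception instanceof MessagingException
        ? ((MessagingException) exception).getFailedMessage()
        : errorMessage.getOriginalMessage();
    // Handle error
}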
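
The routing decision behind this pattern can be sketched in plain Java. This is a toy model, not Spring Integration code: ErrorRoutingSketch, Failure and deliver are hypothetical names, and a Consumer stands in for the error channel. It assumes a failure is paired with the message that caused it and sent to the error sink when one is configured, and rethrown to the caller otherwise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of error routing around a handler; not Spring Integration code. */
public class ErrorRoutingSketch {

    /** Pairs a failure with the message that was being handled. */
    public static final class Failure {
        public final String failedMessage;
        public final RuntimeException cause;

        public Failure(String failedMessage, RuntimeException cause) {
            this.failedMessage = failedMessage;
            this.cause = cause;
        }
    }

    /**
     * Invokes the handler. A failure goes to errorSink when one is configured;
     * with no sink (null) the exception propagates to the caller.
     */
    public static void deliver(String message,
                               Consumer<String> handler,
                               Consumer<Failure> errorSink) {
        try {
            handler.accept(message);
        } catch (RuntimeException ex) {
            if (errorSink == null) {
                throw ex;
            }
            errorSink.accept(new Failure(message, ex));
        }
    }

    public static void main(String[] args) {
        List<Failure> failures = new ArrayList<>();
        Consumer<String> handler = payload -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
        };

        deliver("ok", handler, failures::add);
        deliver("", handler, failures::add);
        System.out.println(failures.size()); // 1
    }
}
```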

Edge Cases

Critical Edge Cases

  • Null messages from source: MessageSource.receive() returns null when no message available (polling continues)
  • Channel full: Producer blocks or throws exception depending on channel type and timeout
  • Handler throws exception: Wrapped in ErrorMessage and sent to error channel if configured
  • Concurrent start/stop: Lifecycle methods are thread-safe
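  • Multiple consumers on same channel: On a PublishSubscribeChannel every event-driven consumer receives each message; on a DirectChannel subscribers are load-balanced, so each message reaches one of them; polling consumers on one queue compete for messages
  • Reactive backpressure: ReactiveStreamsConsumer respects reactive backpressure
  • Transaction rollback: If a transactional polling consumer throws, the transaction rolls back; the message returns to the channel only when the channel is backed by a transactional message store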
  • Source polling with no messages: Polling continues but no message sent (no error)
  • Endpoint stopped while processing: Current message completes; new messages not processed

Edge Case Examples

Multiple consumers competing for messages:

@Bean
public PollingConsumer consumer1() {
    return new PollingConsumer(sharedQueueChannel, handler1);
}

@Bean
public PollingConsumer consumer2() {
    return new PollingConsumer(sharedQueueChannel, handler2);
    // Both compete for messages from same queue
}
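
Competition for messages can be demonstrated with a plain-Java toy model, not Spring Integration code. CompetingConsumersSketch and drain are hypothetical names; a LinkedBlockingQueue stands in for the shared queue channel and two threads stand in for the two pollers. How the messages split between the two threads varies from run to run, but each message is taken by exactly one of them.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of two pollers sharing one queue; not Spring Integration code. */
public class CompetingConsumersSketch {

    /** Fills a queue, drains it from two threads, and returns what each one saw. */
    public static List<Set<Integer>> drain(int messageCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }

        Set<Integer> seenByFirst = ConcurrentHashMap.newKeySet();
        Set<Integer> seenBySecond = ConcurrentHashMap.newKeySet();
        Thread first = new Thread(() -> consume(queue, seenByFirst));
        Thread second = new Thread(() -> consume(queue, seenBySecond));
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", ex);
        }
        return List.of(seenByFirst, seenBySecond);
    }

    /** Takes messages until the queue is empty; poll() hands each one to a single caller. */
    private static void consume(BlockingQueue<Integer> queue, Set<Integer> seen) {
        Integer message;
        while ((message = queue.poll()) != null) {
            seen.add(message);
        }
    }

    public static void main(String[] args) {
        List<Set<Integer>> seen = drain(100);
        // The split differs between runs; the total is always the message count
        System.out.println(seen.get(0).size() + seen.get(1).size()); // 100
    }
}
```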

Transaction rollback scenario:

@Bean
public PollingConsumer transactionalConsumer() {
    PollingConsumer consumer = new PollingConsumer(queueChannel, handler);
    consumer.setTransactional(true);
    consumer.setAdviceChain(Collections.singletonList(transactionInterceptor));
    // If handler throws, the transaction rolls back; the message returns to the
    // queue only if the channel is backed by a transactional message store
    return consumer;
}

Endpoint lifecycle during processing:

@Service
public class EndpointLifecycleService {
    @Autowired
    private SourcePollingChannelAdapter adapter;

    public void stopSafely() {
        // Stop endpoint - current message completes, new messages not processed
        adapter.stop();
    }

    public void pauseIfPausable() {
        if (adapter instanceof Pausable) {
            ((Pausable) adapter).pause();
            // Endpoint paused but remains started
        }
    }
}