Message endpoints connect handlers to channels, managing message consumption and production. Endpoints support event-driven consumers, polling consumers, reactive consumers, and source adapters with lifecycle control, polling configuration, and transaction support.
Endpoint types:
- EventDrivenConsumer: for subscribable channels (DirectChannel, PublishSubscribeChannel, ExecutorChannel)
- PollingConsumer: for pollable channels (QueueChannel, PriorityChannel, RendezvousChannel)
- ReactiveStreamsConsumer: for reactive channels (FluxMessageChannel)
- SourcePollingChannelAdapter: polls a message source (MessageSource) and publishes to a channel

Key behaviors:
- Endpoints implement SmartLifecycle (must be started to process messages)
- autoStartup=true by default (endpoints start automatically)
- phase controls startup order (lower phases start first)
- MessageSource.receive() returning null is normal (no message available, polling continues)
- Errors are wrapped in ErrorMessage and sent to error channel if configured

Base class for all endpoints with lifecycle management.
/**
 * Common superclass of every message endpoint.
 * Supplies the lifecycle operations (start, stop, running state) together with
 * the auto-startup, phase and role settings shared by all endpoints.
 */
public abstract class AbstractEndpoint implements SmartLifecycle, TrackableComponent {

    /**
     * Starts this endpoint; an endpoint must be started to process messages.
     */
    @Override
    public void start();

    /**
     * Stops this endpoint.
     */
    @Override
    public void stop();

    /**
     * Reports the current lifecycle state.
     *
     * @return {@code true} while the endpoint is running
     */
    @Override
    public boolean isRunning();

    /**
     * Chooses whether the endpoint is started automatically on context refresh.
     *
     * @param autoStartup {@code true} to auto-start
     */
    public void setAutoStartup(boolean autoStartup);

    /**
     * Assigns the startup phase of this endpoint.
     *
     * @param phase the phase value
     */
    public void setPhase(int phase);

    /**
     * Assigns the role used to control endpoints as a group.
     *
     * @param role the role name
     */
    public void setRole(String role);
}
Base class for polling endpoints with polling configuration.
/**
 * Superclass of the endpoints that poll.
 * Holds the polling configuration (trigger, executor, limits, advice, error
 * handling) and the transaction synchronization hook.
 */
public abstract class AbstractPollingEndpoint extends AbstractEndpoint {

    /**
     * Assigns the trigger that schedules the polls.
     *
     * @param trigger the trigger
     */
    public void setTrigger(Trigger trigger);

    /**
     * Assigns the task executor used for polling.
     *
     * @param taskExecutor the task executor
     */
    public void setTaskExecutor(TaskExecutor taskExecutor);

    /**
     * Limits how many messages are handled in a single poll.
     *
     * @param maxMessagesPerPoll the max messages
     */
    public void setMaxMessagesPerPoll(long maxMessagesPerPoll);

    /**
     * Assigns the receive timeout.
     *
     * @param receiveTimeout timeout in milliseconds
     */
    public void setReceiveTimeout(long receiveTimeout);

    /**
     * Assigns the factory for transaction synchronization.
     *
     * @param synchronizationFactory the factory
     */
    public void setTransactionSynchronizationFactory(
            TransactionSynchronizationFactory synchronizationFactory);

    /**
     * Assigns the advice chain applied to polling.
     *
     * @param adviceChain the advice chain
     */
    public void setAdviceChain(List<Advice> adviceChain);

    /**
     * Assigns the error handler.
     *
     * @param errorHandler the error handler
     */
    public void setErrorHandler(ErrorHandler errorHandler);
}
Event-driven consumer for subscribable channels.
/**
 * Consumer endpoint for subscribable channels.
 * The handler is subscribed to the channel when the endpoint starts.
 */
public class EventDrivenConsumer extends AbstractEndpoint implements IntegrationConsumer {

    /**
     * Builds an event-driven consumer.
     *
     * @param inputChannel the subscribable channel
     * @param handler the message handler
     */
    public EventDrivenConsumer(SubscribableChannel inputChannel, MessageHandler handler);

    /**
     * Exposes the channel this consumer reads from.
     *
     * @return the input channel
     */
    public SubscribableChannel getInputChannel();

    /**
     * Exposes the handler invoked for each message.
     *
     * @return the message handler
     */
    public MessageHandler getHandler();
}
Usage:
@Bean
public EventDrivenConsumer eventConsumer() {
    // Subscribe a handler that prints each payload to a fresh DirectChannel.
    EventDrivenConsumer endpoint = new EventDrivenConsumer(
            new DirectChannel(),
            message -> System.out.println("Processing: " + message.getPayload()));
    endpoint.setPhase(100);
    endpoint.setAutoStartup(true);
    return endpoint;
}
// Edge case: Multiple event-driven consumers on same channel
@Bean
public EventDrivenConsumer consumer1() {
    return new EventDrivenConsumer(sharedChannel, handler1);
}

@Bean
public EventDrivenConsumer consumer2() {
    // Delivery depends on the type of sharedChannel: a PublishSubscribeChannel hands
    // every message to both handlers, whereas a DirectChannel dispatches each message
    // to only one of its subscribers (round-robin by default).
    return new EventDrivenConsumer(sharedChannel, handler2);
}
Polling consumer for pollable channels.
/**
 * Consumer endpoint for pollable channels.
 * Each time the trigger fires, the channel is polled and the handler invoked.
 */
public class PollingConsumer extends AbstractPollingEndpoint implements IntegrationConsumer {

    /**
     * Builds a polling consumer.
     *
     * @param inputChannel the pollable channel
     * @param handler the message handler
     */
    public PollingConsumer(PollableChannel inputChannel, MessageHandler handler);

    /**
     * Exposes the channel this consumer polls.
     *
     * @return the input channel
     */
    public PollableChannel getInputChannel();

    /**
     * Exposes the handler invoked for each polled message.
     *
     * @return the message handler
     */
    public MessageHandler getHandler();

    /**
     * Marks the handler as transactional or not.
     *
     * @param transactional {@code true} if transactional
     */
    public void setTransactional(boolean transactional);
}
Usage:
@Bean
public PollingConsumer pollingConsumer() {
    // The channel is local to this method; expose it as its own bean if producers
    // elsewhere need to send to it.
    QueueChannel channel = new QueueChannel(100);
    MessageHandler handler = message -> {
        System.out.println("Polling: " + message.getPayload());
    };
    PollingConsumer consumer = new PollingConsumer(channel, handler);
    // Duration-based trigger (one second), matching the style of the complete example.
    consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(1)));
    consumer.setMaxMessagesPerPoll(10);
    consumer.setReceiveTimeout(500); // milliseconds
    consumer.setAutoStartup(true);
    return consumer;
}
// Edge case: Polling consumer with transaction rollback
@Bean
public PollingConsumer transactionalConsumer() {
    PollingConsumer consumer = new PollingConsumer(queueChannel, handler);
    consumer.setTransactional(true);
    // Build the interceptor with TransactionInterceptorBuilder: the TransactionInterceptor
    // constructor expects a TransactionAttributeSource, not a bare TransactionAttribute.
    consumer.setAdviceChain(Collections.singletonList(
            new TransactionInterceptorBuilder()
                    .transactionManager(transactionManager)
                    .build()));
    // If handler throws, the transaction rolls back. The message only remains in the
    // queue when the channel is backed by a transactional message store; a plain
    // in-memory QueueChannel does not put it back.
    return consumer;
}
Reactive consumer for reactive channels.
/**
 * Reactive consumer for reactive channels.
 * Subscribes handler to reactive publisher.
 */
public class ReactiveStreamsConsumer extends AbstractEndpoint implements IntegrationConsumer {

    /**
     * Create reactive consumer that delivers messages to a handler.
     *
     * @param inputChannel the reactive channel
     * @param handler the message handler
     */
    public ReactiveStreamsConsumer(ReactiveStreamsSubscribableChannel inputChannel,
            MessageHandler handler);

    /**
     * Create reactive consumer that delivers messages to a subscriber.
     * (The executor is configured separately via {@link #setSubscriberExecutor}.)
     *
     * @param inputChannel the reactive channel
     * @param subscriber the subscriber receiving the messages
     */
    public ReactiveStreamsConsumer(ReactiveStreamsSubscribableChannel inputChannel,
            Subscriber<Message<?>> subscriber);

    /**
     * Get input channel.
     *
     * @return the input channel
     */
    public ReactiveStreamsSubscribableChannel getInputChannel();

    /**
     * Set subscriber executor.
     *
     * @param subscriberExecutor the executor
     */
    public void setSubscriberExecutor(Executor subscriberExecutor);
}
Usage:
@Bean
public ReactiveStreamsConsumer reactiveConsumer() {
    // Attach a payload-printing handler to a new FluxMessageChannel.
    ReactiveStreamsConsumer endpoint = new ReactiveStreamsConsumer(
            new FluxMessageChannel(),
            (MessageHandler) message -> System.out.println("Reactive: " + message.getPayload()));
    endpoint.setAutoStartup(true);
    return endpoint;
}
// Edge case: Reactive consumer with backpressure
@Bean
public ReactiveStreamsConsumer backpressureConsumer() {
    ReactiveStreamsConsumer endpoint = new ReactiveStreamsConsumer(fluxChannel, handler);
    // Reactive backpressure is respected automatically; the subscriber runs on
    // a fixed pool of ten threads.
    endpoint.setSubscriberExecutor(Executors.newFixedThreadPool(10));
    return endpoint;
}
Adapter polling message sources and publishing to channel.
/**
 * Adapter polling message sources and publishing to channel.
 * Polls source on trigger and sends messages to output channel.
 */
public class SourcePollingChannelAdapter extends AbstractPollingEndpoint
        implements MessageProducer {

    /**
     * Create source polling adapter.
     */
    public SourcePollingChannelAdapter();

    /**
     * Set message source.
     *
     * @param source the message source
     */
    public void setSource(MessageSource<?> source);

    /**
     * Get message source.
     *
     * @return the message source
     */
    public MessageSource<?> getMessageSource();

    /**
     * Set output channel.
     *
     * @param outputChannel the output channel
     */
    @Override
    public void setOutputChannel(MessageChannel outputChannel);

    /**
     * Set output channel by name.
     *
     * @param outputChannelName the channel name
     */
    public void setOutputChannelName(String outputChannelName);

    /**
     * Set send timeout.
     *
     * @param sendTimeout timeout in milliseconds
     */
    public void setSendTimeout(long sendTimeout);

    /**
     * Set error channel.
     *
     * @param errorChannel the error channel
     */
    public void setErrorChannel(MessageChannel errorChannel);

    /**
     * Set whether this component should be tracked.
     * NOTE(review): presumably this is the message-history tracking implied by
     * TrackableComponent, not a transaction setting - confirm.
     *
     * @param shouldTrack true to track
     */
    public void setShouldTrack(boolean shouldTrack);
}
Usage:
@Bean
public SourcePollingChannelAdapter fileAdapter() {
    SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    adapter.setSource(fileMessageSource());
    adapter.setOutputChannelName("fileChannel");
    // Duration-based trigger (five seconds), matching the style of the complete example.
    adapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(5)));
    adapter.setMaxMessagesPerPoll(10);
    // NOTE(review): only setErrorChannel(MessageChannel) appears in the adapter's
    // documented setters; confirm that a name-based variant exists.
    adapter.setErrorChannelName("errorChannel");
    adapter.setAutoStartup(true);
    return adapter;
}
// Edge case: Source returning null (no message available)
@Bean
public SourcePollingChannelAdapter nullHandlingAdapter() {
    SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    // Returns null when no data: polling continues, no message sent, no error
    adapter.setSource(() -> null);
    // Duration-based trigger (one second), matching the style of the complete example.
    adapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(1)));
    return adapter;
}
// Edge case: Source with error handling
@Bean
public SourcePollingChannelAdapter errorHandlingAdapter() {
    SourcePollingChannelAdapter poller = new SourcePollingChannelAdapter();
    poller.setSource(riskySource());
    // Option 1: route errors to a channel.
    poller.setErrorChannel(errorChannel);
    // Option 2: deal with them in a custom handler.
    poller.setErrorHandler(customErrorHandler);
    return poller;
}
// Edge case: Source with output channel full
@Bean
public SourcePollingChannelAdapter channelFullAdapter() {
    SourcePollingChannelAdapter poller = new SourcePollingChannelAdapter();
    poller.setSource(messageSource());
    // The output queue has limited capacity ...
    poller.setOutputChannel(queueChannel);
    // ... so sending throws an exception if it is still full after the timeout.
    poller.setSendTimeout(5000);
    return poller;
}
@Configuration
public class CompleteEndpointExample {

    /**
     * Queue shared by the poller (which publishes to it by the bean name
     * "dataChannel") and the consumer (which polls it).
     */
    @Bean
    public QueueChannel dataChannel() {
        return new QueueChannel(100);
    }

    @Bean
    public SourcePollingChannelAdapter dataPoller() {
        // Create message source
        MethodInvokingMessageSource source =
                new MethodInvokingMessageSource(dataService(), "fetchData");
        // Create adapter
        SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
        adapter.setSource(source);
        adapter.setOutputChannelName("dataChannel");
        // Configure polling
        adapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(10)));
        adapter.setMaxMessagesPerPoll(50);
        adapter.setReceiveTimeout(1000);
        // Configure lifecycle
        adapter.setAutoStartup(true);
        adapter.setPhase(1000);
        adapter.setRole("dataPollers");
        // Configure error handling
        adapter.setErrorChannelName("pollerErrorChannel");
        // Add transaction support
        TransactionInterceptorBuilder txBuilder = new TransactionInterceptorBuilder();
        txBuilder.transactionManager(transactionManager());
        adapter.setAdviceChain(Collections.singletonList(txBuilder.build()));
        return adapter;
    }

    @Bean
    public PollingConsumer dataConsumer() {
        MessageHandler handler = message -> {
            processData(message.getPayload());
        };
        // Poll the same channel bean the poller publishes to; a channel created
        // inline here would never receive the poller's messages.
        PollingConsumer consumer = new PollingConsumer(dataChannel(), handler);
        consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(1)));
        consumer.setMaxMessagesPerPoll(10);
        consumer.setReceiveTimeout(500);
        consumer.setAutoStartup(true);
        consumer.setPhase(2000); // Start after poller
        return consumer;
    }
}
SmartLifecycle (start/stop control)
autoStartup=true by default (endpoints start automatically)
phase controls startup order (lower phases start first)
role enables group-based lifecycle control
@Bean
public EventDrivenConsumer controlledConsumer() {
    EventDrivenConsumer endpoint = new EventDrivenConsumer(channel, handler);
    // Member of the "critical" group, so it can be controlled together with its peers.
    endpoint.setRole("critical");
    // Phase 1000: starts after phase 999.
    endpoint.setPhase(1000);
    // Auto-start disabled: must be started manually.
    endpoint.setAutoStartup(false);
    return endpoint;
}
// Manual lifecycle control
@Service
public class EndpointController {

    // Typed as AbstractEndpoint rather than SmartLifecycle: the role is an endpoint
    // property, and SmartLifecycle itself declares no getRole().
    @Autowired
    private List<AbstractEndpoint> criticalEndpoints;

    /**
     * Starts every injected endpoint whose role is "critical".
     */
    public void startCriticalEndpoints() {
        criticalEndpoints.stream()
                .filter(endpoint -> "critical".equals(endpoint.getRole()))
                .forEach(AbstractEndpoint::start);
    }
}
TaskExecutor if provided, otherwise default executor
subscriberExecutor)
IllegalStateException: If endpoint started without required configuration (channel, handler, source)
MessagingException: If message sending fails (timeout, channel full, etc.)
MessageHandlingException: If handler throws exception (wrapped in error message if error channel set)
@Bean
public PollingConsumer consumerWithErrorHandling() {
    PollingConsumer endpoint = new PollingConsumer(channel, handler);
    // Handler applied to errors raised while polling.
    endpoint.setErrorHandler(errorHandler);
    // NOTE(review): setErrorChannel is documented only on SourcePollingChannelAdapter;
    // confirm that PollingConsumer offers it as well.
    endpoint.setErrorChannel(errorChannel);
    return endpoint;
}
@Bean
public MessageChannel errorChannel() {
    // Channel that receives the error messages.
    MessageChannel errors = new DirectChannel();
    return errors;
}
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
    Throwable exception = errorMessage.getPayload();
    // The message that failed travels on the MessagingException payload;
    // getOriginalMessage() is only a fallback and may be null.
    Message<?> failedMessage = exception instanceof MessagingException
            ? ((MessagingException) exception).getFailedMessage()
            : errorMessage.getOriginalMessage();
    // Handle error
}
MessageSource.receive() returns null when no message available (polling continues)
Errors are wrapped in ErrorMessage and sent to error channel if configured
Multiple consumers competing for messages:
// consumer1 and consumer2 poll the same queue, so they compete for its messages.
@Bean
public PollingConsumer consumer1() {
    PollingConsumer first = new PollingConsumer(sharedQueueChannel, handler1);
    return first;
}

@Bean
public PollingConsumer consumer2() {
    PollingConsumer second = new PollingConsumer(sharedQueueChannel, handler2);
    return second;
}
Transaction rollback scenario:
@Bean
public PollingConsumer transactionalConsumer() {
    PollingConsumer consumer = new PollingConsumer(queueChannel, handler);
    consumer.setTransactional(true);
    consumer.setAdviceChain(Collections.singletonList(transactionInterceptor));
    // If handler throws, the transaction rolls back. The message only remains in the
    // queue when the channel is backed by a transactional message store; a plain
    // in-memory QueueChannel does not put it back.
    return consumer;
}
Endpoint lifecycle during processing:
@Service
public class EndpointLifecycleService {

    @Autowired
    private SourcePollingChannelAdapter adapter;

    /**
     * Stops the adapter: the current message completes, new messages are not processed.
     */
    public void stopSafely() {
        adapter.stop();
    }

    /**
     * Pauses the adapter when it supports pausing; otherwise does nothing.
     */
    public void pauseIfPausable() {
        if (!(adapter instanceof Pausable)) {
            return;
        }
        // A paused endpoint remains started.
        Pausable pausable = (Pausable) adapter;
        pausable.pause();
    }
}