Core interfaces and abstractions provide the foundational contracts for Spring Integration components. These include message sources, producers, functional handlers, transformers, selectors, pausable components, and messaging templates for operations.
MessageSource.receive() returns null when no message available (not an error - polling continues)GenericHandler returning null produces no output message (valid for one-way flows)GenericTransformer must return non-null (null throws exception)GenericSelector returning false rejects message (sent to discard channel if configured)MessagingTemplate uses default channel if set, otherwise requires explicit channelMessageTimeoutException on timeout (if errorOnTimeout=true)Strategy interface for receiving messages from source. Implementations poll or generate messages on demand.
/**
* Strategy interface for receiving messages from source.
* Implementations poll or generate messages on demand.
* Extends IntegrationPattern to identify as inbound channel adapter.
*
* @param <T> the payload type
*/
@FunctionalInterface
public interface MessageSource<T> extends IntegrationPattern {
/**
* Receive message from source.
* Returns null if no message available.
*
* @return the message, or null if none available
*/
@Nullable
Message<T> receive();
}Critical Behavior: Returning null is not an error - it indicates no message is available. Polling adapters will continue polling.
Usage Example:
@Bean
public IntegrationFlow messageSourceFlow() {
return IntegrationFlow
.from((MessageSource<String>) () -> {
String data = fetchData();
// Return null if no data available (not an error)
return data != null ? MessageBuilder.withPayload(data).build() : null;
}, e -> e.poller(Pollers.fixedDelay(1000)))
.channel("outputChannel")
.get();
}
// Edge case: Source with exception handling
@Bean
public MessageSource<String> safeMessageSource() {
return () -> {
try {
String data = fetchData();
if (data == null || data.isEmpty()) {
return null; // No message available
}
return MessageBuilder.withPayload(data)
.setHeader("timestamp", System.currentTimeMillis())
.build();
} catch (Exception e) {
// Log but return null to avoid error
log.error("Failed to fetch data", e);
return null; // Polling continues
}
};
}Strategy interface for components that produce messages. Provides output channel management.
/**
* Strategy interface for components that produce messages.
* Provides output channel management.
*/
public interface MessageProducer {
/**
* Set output channel for produced messages.
*
* @param outputChannel the output channel
*/
void setOutputChannel(MessageChannel outputChannel);
/**
* Set output channel by bean name.
*
* @param outputChannelName the output channel bean name
*/
default void setOutputChannelName(String outputChannelName);
/**
* Get output channel.
*
* @return the output channel
*/
@Nullable
MessageChannel getOutputChannel();
}Generic handler for messages with payload type. Receives payload and headers, returns result.
/**
* Generic handler for messages with payload type.
* Receives payload and headers, returns result.
*
* @param <P> the payload type
*/
@FunctionalInterface
public interface GenericHandler<P> {
/**
* Handle payload with access to headers.
*
* @param payload the message payload
* @param headers the message headers
* @return the result, or null for no output
*/
Object handle(P payload, MessageHeaders headers);
}Critical Behavior: Returning null produces no output message (valid for one-way flows).
Usage Example:
@Bean
public IntegrationFlow handlerFlow() {
return IntegrationFlow
.from("inputChannel")
.handle((GenericHandler<Order>) (payload, headers) -> {
String customerId = (String) headers.get("customerId");
return processOrder(payload, customerId);
})
.get();
}
// Edge case: Handler with null return (no output message)
@Bean
public IntegrationFlow handlerWithNullReturn() {
return IntegrationFlow
.from("inputChannel")
.handle((GenericHandler<String>) (payload, headers) -> {
logMessage(payload);
return null; // No output message sent (valid behavior)
})
.get();
}
// Edge case: Handler with null payload handling
@Bean
public IntegrationFlow handlerWithNullPayload() {
return IntegrationFlow
.from("inputChannel")
.handle((GenericHandler<String>) (payload, headers) -> {
if (payload == null) {
// Handle null payload gracefully
return "NULL_PAYLOAD";
}
return processPayload(payload);
})
.get();
}
// Edge case: Handler with exception handling
@Bean
public IntegrationFlow handlerWithErrorHandling() {
return IntegrationFlow
.from("inputChannel")
.handle((GenericHandler<Order>) (payload, headers) -> {
try {
return processOrder(payload);
} catch (ValidationException e) {
// Return error indicator instead of throwing
return new ErrorResult("VALIDATION_FAILED", e.getMessage());
} catch (Exception e) {
// Re-throw for error channel handling
throw new MessagingException(
MessageBuilder.withPayload(payload).build(),
"Processing failed", e);
}
}, e -> e.errorChannel("errorChannel"))
.get();
}Generic transformer for payload transformation.
/**
* Generic transformer for payload transformation.
*
* @param <S> source payload type
* @param <T> target payload type
*/
@FunctionalInterface
public interface GenericTransformer<S, T> {
/**
* Transform source to target.
*
* @param source the source payload
* @return the transformed payload
*/
T transform(S source);
}Critical Behavior: Must return non-null (null throws exception).
Usage Example:
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlow
.from("orderChannel")
.transform((GenericTransformer<Order, OrderDto>) order ->
new OrderDto(order.getId(), order.getAmount()))
.get();
}
// Edge case: Type-safe transformer with null handling
@Bean
public IntegrationFlow typeSafeTransformer() {
return IntegrationFlow
.from("orderChannel")
.transform(Order.class, (GenericTransformer<Order, OrderDto>) order -> {
if (order == null) {
throw new IllegalArgumentException("Order cannot be null");
}
return new OrderDto(order.getId(), order.getAmount());
})
.get();
}
// Edge case: Transformer that must not return null
@Bean
public IntegrationFlow nonNullTransformer() {
return IntegrationFlow
.from("inputChannel")
.transform((GenericTransformer<String, String>) source -> {
if (source == null) {
return ""; // Return empty string, not null
}
return source.toUpperCase();
})
.get();
}Generic selector for filtering with payload type.
/**
* Generic selector for filtering with payload type.
*
* @param <T> the payload type
*/
@FunctionalInterface
public interface GenericSelector<T> {
/**
* Select/filter source.
*
* @param source the source to evaluate
* @return true if accepted, false if rejected
*/
boolean accept(T source);
}Critical Behavior: Returning false rejects message (sent to discard channel if configured).
Usage Example:
@Bean
public IntegrationFlow selectorFlow() {
return IntegrationFlow
.from("inputChannel")
.filter((GenericSelector<Order>) order ->
order.getAmount() > 1000 && order.isValid())
.get();
}
// Edge case: Selector with null handling
@Bean
public IntegrationFlow selectorWithNullHandling() {
return IntegrationFlow
.from("inputChannel")
.filter((GenericSelector<Order>) order -> {
if (order == null) {
return false; // Reject null orders
}
return order.getAmount() != null
&& order.getAmount() > 0
&& order.isValid();
})
.get();
}Component for publishing error messages to error channels.
/**
* Component for publishing error messages to error channels.
* Can be called or extended in error handling or retry scenarios.
* Supports customization via ErrorMessageStrategy.
*/
public class ErrorMessagePublisher {
/**
* Set the error message strategy for customizing error messages.
*
* @param errorMessageStrategy the error message strategy
*/
public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy);
/**
* Set the error channel for publishing error messages.
*
* @param channel the error channel
*/
public void setChannel(MessageChannel channel);
/**
* Set the error channel by name.
*
* @param channelName the error channel name
*/
public void setChannelName(String channelName);
/**
* Set the send timeout for error message publishing.
*
* @param sendTimeout timeout in milliseconds
*/
public void setSendTimeout(long sendTimeout);
/**
* Publish error message for MessagingException.
*
* @param exception the messaging exception
*/
public void publish(MessagingException exception);
/**
* Publish error message for failed message and throwable.
*
* @param failedMessage the failed message
* @param throwable the throwable
*/
public void publish(Message<?> failedMessage, Throwable throwable);
}Usage Example:
@Bean
public ErrorMessagePublisher errorPublisher() {
ErrorMessagePublisher publisher = new ErrorMessagePublisher();
publisher.setChannelName("customErrorChannel");
publisher.setSendTimeout(5000);
return publisher;
}
// Edge case: Error publisher with custom strategy
@Bean
public ErrorMessagePublisher customErrorPublisher() {
ErrorMessagePublisher publisher = new ErrorMessagePublisher();
publisher.setChannelName("customErrorChannel");
publisher.setSendTimeout(5000);
publisher.setErrorMessageStrategy((throwable, attributes) -> {
ErrorMessage errorMessage = new ErrorMessage(throwable);
errorMessage.getHeaders().put("errorTimestamp", System.currentTimeMillis());
errorMessage.getHeaders().put("errorSource", "integration-core");
return errorMessage;
});
return publisher;
}Strategy interface for error recovery with fallback behavior.
/**
* Strategy interface for error recovery with fallback behavior.
* Provides recovery mechanism based on context and cause.
*
* @param <T> the type returned from recovery
*/
@FunctionalInterface
public interface RecoveryCallback<T> {
/**
* Recover from failure with fallback result.
*
* @param context the context for failure
* @param cause the cause of the failure
* @return result to replace the failed callback result
*/
T recover(AttributeAccessor context, Throwable cause);
}Usage Example:
@Bean
public RecoveryCallback<String> recoveryCallback() {
return (context, cause) -> {
log.error("Processing failed: " + cause.getMessage());
return "FALLBACK_RESULT";
};
}
// Edge case: Conditional recovery based on exception type
@Bean
public RecoveryCallback<OrderResult> conditionalRecoveryCallback() {
return (context, cause) -> {
if (cause instanceof ValidationException) {
return new OrderResult("INVALID", "Validation failed: " + cause.getMessage());
} else if (cause instanceof TimeoutException) {
return new OrderResult("TIMEOUT", "Operation timed out, retry later");
} else {
log.error("Unexpected error", cause);
return new OrderResult("ERROR", "Processing failed");
}
};
}Interface for components that can be paused and resumed.
/**
* Interface for components that can be paused and resumed.
* Extends ManageableLifecycle to include start/stop capabilities.
* Provides finer-grained control than start/stop.
*/
public interface Pausable extends ManageableLifecycle {
/**
* Pause component operations.
* Component remains started but stops processing.
*/
void pause();
/**
* Resume component operations after pause.
*/
void resume();
/**
* Check if component is paused.
*
* @return true if paused
*/
boolean isPaused();
}Usage Example:
@Service
public class EndpointController {
@Autowired
private SourcePollingChannelAdapter adapter;
public void pausePolling() {
if (adapter instanceof Pausable) {
((Pausable) adapter).pause();
}
}
public void resumePolling() {
if (adapter instanceof Pausable) {
((Pausable) adapter).resume();
}
}
// Edge case: Safe pause with state check
public void safePause() {
if (adapter instanceof Pausable) {
Pausable pausable = (Pausable) adapter;
if (!pausable.isPaused()) {
pausable.pause();
}
}
}
}Template for synchronous messaging operations.
/**
* Template for synchronous messaging operations.
* Provides send, receive, and send-and-receive methods.
* Extends GenericMessagingTemplate from Spring Framework.
*/
public class MessagingTemplate extends GenericMessagingTemplate {
/**
* Create messaging template.
*/
public MessagingTemplate();
/**
* Create messaging template with default channel.
*
* @param defaultChannel the default channel
*/
public MessagingTemplate(MessageChannel defaultChannel);
/**
* Set default channel.
*
* @param defaultChannel the default channel
*/
public void setDefaultChannel(MessageChannel defaultChannel);
/**
* Set default channel by name.
*
* @param defaultChannelName the channel name
*/
public void setDefaultDestinationName(String defaultChannelName);
/**
* Set send timeout.
*
* @param sendTimeout timeout in milliseconds
*/
public void setSendTimeout(long sendTimeout);
/**
* Set receive timeout.
*
* @param receiveTimeout timeout in milliseconds
*/
public void setReceiveTimeout(long receiveTimeout);
/**
* Set whether to throw exception on late reply.
*
* @param throwExceptionOnLateReply true to throw exception
*/
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply);
/**
* Send message to default channel.
*
* @param message the message
*/
@Override
public void send(Message<?> message);
/**
* Send message to channel.
*
* @param channel the channel
* @param message the message
*/
@Override
public void send(MessageChannel channel, Message<?> message);
/**
* Convert and send payload to channel.
*
* @param channel the channel
* @param payload the payload
*/
@Override
public void convertAndSend(MessageChannel channel, Object payload);
/**
* Send and receive from channel.
*
* @param channel the channel
* @param requestMessage the request message
* @return the reply message, or null if timeout
*/
@Override
@Nullable
public Message<?> sendAndReceive(MessageChannel channel, Message<?> requestMessage);
/**
* Convert, send, and receive from channel.
*
* @param channel the channel
* @param request the request payload
* @return the reply payload, or null if timeout
*/
@Override
@Nullable
public Object convertSendAndReceive(MessageChannel channel, Object request);
/**
* Receive message from channel.
*
* @param channel the pollable channel
* @return the message, or null if none available
*/
@Override
@Nullable
public Message<?> receive(PollableChannel channel);
/**
* Receive message from channel with timeout.
*
* @param destination the message channel
* @param timeout the timeout in milliseconds
* @return the message, or null if timeout
*/
@Nullable
public Message<?> receive(MessageChannel destination, long timeout);
}Usage Example:
@Service
public class MessagingService {
private final MessagingTemplate template;
public MessagingService(MessageChannel defaultChannel) {
this.template = new MessagingTemplate(defaultChannel);
this.template.setSendTimeout(5000);
this.template.setReceiveTimeout(3000);
}
public void sendOrder(Order order) {
template.convertAndSend("orderChannel", order);
}
public String processOrder(Order order) {
Message<?> request = MessageBuilder.withPayload(order)
.setHeader("priority", "HIGH")
.build();
Message<?> reply = template.sendAndReceive("orderChannel", request);
return reply != null ? (String) reply.getPayload() : null;
}
// Edge case: Handling null replies
public String processOrderSafely(Order order) {
Message<?> reply = template.sendAndReceive("orderChannel",
MessageBuilder.withPayload(order).build());
if (reply == null) {
return "NO_REPLY"; // Handle timeout or no reply
}
return (String) reply.getPayload();
}
// Edge case: Template with error on timeout
public String processOrderWithTimeoutError(Order order) {
MessagingTemplate templateWithError = new MessagingTemplate();
templateWithError.setDefaultChannelName("orderChannel");
templateWithError.setThrowExceptionOnLateReply(true);
templateWithError.setReceiveTimeout(5000);
try {
return (String) templateWithError.convertSendAndReceive(order);
} catch (MessageTimeoutException e) {
log.warn("Order processing timed out", e);
return "TIMEOUT";
}
}
}Template for asynchronous messaging operations.
/**
* Template for asynchronous messaging operations.
* Returns ListenableFuture for async operations.
*/
public class AsyncMessagingTemplate extends MessagingTemplate
implements AsyncMessagingOperations {
/**
* Set executor for async operations.
*
* @param executor the executor
*/
public void setExecutor(Executor executor);
/**
* Async send to channel.
*
* @param channel the channel
* @param message the message
* @return future completing when sent
*/
public Future<?> asyncSend(MessageChannel channel, Message<?> message);
/**
* Async convert and send to channel.
*
* @param channel the channel
* @param payload the payload
* @return future completing when sent
*/
public Future<?> asyncConvertAndSend(MessageChannel channel, Object payload);
/**
* Async send and receive from channel.
*
* @param channel the channel
* @param requestMessage the request
* @return future with reply
*/
public Future<Message<?>> asyncSendAndReceive(MessageChannel channel,
Message<?> requestMessage);
/**
* Async convert, send, and receive from channel.
*
* @param channel the channel
* @param request the request payload
* @return future with reply payload
*/
public <R> Future<R> asyncConvertSendAndReceive(MessageChannel channel,
Object request);
}Usage Example:
@Service
public class AsyncMessagingService {
private final AsyncMessagingTemplate asyncTemplate;
public AsyncMessagingService(MessageChannel defaultChannel) {
this.asyncTemplate = new AsyncMessagingTemplate(defaultChannel);
this.asyncTemplate.setExecutor(Executors.newFixedThreadPool(10));
}
public CompletableFuture<String> processOrderAsync(Order order) {
ListenableFuture<Object> future =
asyncTemplate.asyncConvertSendAndReceive("orderChannel", order);
CompletableFuture<String> result = new CompletableFuture<>();
future.addCallback(
success -> result.complete((String) success),
failure -> result.completeExceptionally(failure)
);
return result;
}
// Edge case: Async operations with error handling
public CompletableFuture<String> processOrderAsyncWithErrorHandling(Order order) {
ListenableFuture<Object> future = asyncTemplate.asyncConvertSendAndReceive(
"orderChannel", order);
CompletableFuture<String> result = new CompletableFuture<>();
future.addCallback(
success -> {
if (success != null) {
result.complete((String) success);
} else {
result.completeExceptionally(new IllegalStateException("Null reply"));
}
},
failure -> result.completeExceptionally(failure)
);
return result;
}
}ManageableLifecycle (start/stop/pause/resume)MessagingException - Base messaging exception (always check getCause())MessageTimeoutException - Timeout waiting for replyMessageDeliveryException - Message delivery failureIllegalArgumentException - Invalid parameters (null channel, etc.)GenericHandler receives null payload (check for null)