Spring Integration Core is the foundational module of the Spring Integration framework that provides enterprise integration patterns and messaging capabilities for Spring-based applications.
npx @tessl/cli install tessl/maven-spring-integration-core@7.0.0Spring Integration Core is the foundational module of the Spring Integration framework that provides enterprise integration patterns (EIP) and messaging capabilities for Spring-based applications. It enables message-driven architecture through channels, endpoints, handlers, and declarative configuration, supporting both imperative and reactive programming models.
DirectChannel (default, round-robin)ExecutorChannel (non-blocking send)QueueChannel (bounded/unbounded, pollable)PublishSubscribeChannel (all subscribers receive)PriorityChannel (custom ordering)RendezvousChannel (zero-capacity blocking)FluxMessageChannel (reactive streams)See Message Channels for detailed selection criteria.
See Message Endpoints for detailed patterns.
@Configuration
@EnableIntegration // REQUIRED for annotation-based configuration
public class IntegrationConfig {
// Integration infrastructure enabled
}ServiceActivator, Transformer, Router, Filter, Splitter, Aggregator)@EnableIntegration is mandatory for annotation-based configuration (must be on @Configuration class)MessageChannels factory for DSL)@IntegrationComponentScan enables component scanning for integration componentsDirectChannel: Round-robin load balancing, failover enabledPublishSubscribeChannel: Synchronous by default (use executor for async)QueueChannel: capacity=0 means unbounded (not zero capacity)PriorityChannel: Uses PRIORITY header for orderingerrorChannel bean name (if configured, errors sent automatically)nullChannel bean name (discards all messages silently)IntegrationContextUtils.DEFAULT_TIMEOUT (1000ms)autoStartup=true by default (must be started to process)maxMessagesPerPoll=-1 (unlimited), receiveTimeout=1000msDirectChannel: Synchronous in sender's thread (blocks until handler completes)ExecutorChannel: Asynchronous via executor (non-blocking send)QueueChannel: Thread-safe blocking operations (send blocks if full, receive blocks if empty)PublishSubscribeChannel: Can be synchronous (default) or asynchronous (with executor)TaskExecutor)SmartLifecycle (start/stop control)autoStartup=true by default (endpoints start automatically)phase controls initialization order (lower phases start first)role enables group-based lifecycle control (start/stop all endpoints with same role)Pausable)autoStartup=false)MessagingException - Base exception (always check getCause())MessageDeliveryException - Delivery failures (channel full, no subscribers)MessageRejectedException - Message rejected by filter/routerMessageTimeoutException - Timeout waiting for replyMessageHandlingException - Handler threw exception (wrapped with original message)getFailedMessage()@Configuration
public class ErrorHandlingConfig {
@Bean
public MessageChannel errorChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
Throwable exception = errorMessage.getPayload();
Message<?> failedMessage = errorMessage.getOriginalMessage();
// Handle error
}
}copyHeadersIfAbsent() to avoid overwriting existing headerscapacity=0 means unbounded; capacity>0 means boundedrequireSubscribers=true throws exception if no subscribersnull when empty (not an error)setExpirationDate() for TTL messages (expired messages filtered automatically)PublishSubscribeChannel with requireSubscribers=true throws exception@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "orderChannel", replyChannel = "replyChannel")
OrderResult processOrder(Order order);
}
@ServiceActivator(inputChannel = "orderChannel", outputChannel = "replyChannel")
public OrderResult handleOrder(Order order) {
return processOrder(order);
}@MessagingGateway
public interface NotificationGateway {
@Gateway(requestChannel = "notificationChannel")
void sendNotification(String message);
}
@ServiceActivator(inputChannel = "notificationChannel")
public void handleNotification(String message) {
// Process notification (no reply)
}@ServiceActivator(
inputChannel = "orderChannel",
outputChannel = "confirmationChannel",
errorChannel = "errorChannel"
)
public String handleOrderWithErrorHandling(String order) {
if (order == null || order.isEmpty()) {
throw new IllegalArgumentException("Order cannot be empty");
}
return "Order confirmed: " + order;
}@Bean
public IntegrationFlow enrichmentFlow() {
return IntegrationFlow
.from("inputChannel")
.enrich(e -> e
.requestChannel("lookupChannel")
.requestPayload(Message::getPayload)
.propertyFunction("customer", Message::getPayload))
.channel("outputChannel")
.get();
}@Router(inputChannel = "inputChannel")
public String route(Order order) {
if (order.getPriority() == Priority.HIGH) {
return "highPriorityChannel";
} else {
return "normalPriorityChannel";
}
}@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow
.from("inputChannel")
.split()
.channel("processingChannel")
.aggregate(a -> a
.correlationStrategy(Message::getHeaders)
.releaseStrategy(group -> group.size() == 3))
.channel("outputChannel")
.get();
}Maven:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>7.0.0</version>
</dependency>Gradle:
implementation 'org.springframework.integration:spring-integration-core:7.0.0'spring-integration-core (this package)spring-integration-java-dsl for Java DSL support// Configuration
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegrationManagement;
import org.springframework.integration.config.EnableMessageHistory;
import org.springframework.integration.annotation.IntegrationComponentScan;
// Annotations for endpoints
import org.springframework.integration.annotation.*;
// @ServiceActivator, @Transformer, @Router, @Filter, @Splitter, @Aggregator
// @MessagingGateway, @Gateway, @InboundChannelAdapter
// DSL for fluent API (requires spring-integration-java-dsl)
import org.springframework.integration.dsl.*;
// IntegrationFlow, IntegrationFlowBuilder, MessageChannels, Pollers
// Core interfaces
import org.springframework.integration.core.*;
// MessageSource, MessageProducer, GenericHandler, GenericTransformer, GenericSelector
// MessagingTemplate, AsyncMessagingTemplate
// Message channels
import org.springframework.integration.channel.*;
// DirectChannel, PublishSubscribeChannel, QueueChannel, ExecutorChannel
// PriorityChannel, RendezvousChannel, FluxMessageChannel, NullChannel
// Message handlers
import org.springframework.integration.handler.*;
// ServiceActivatingHandler, BridgeHandler, LoggingHandler, DelayHandler
// Message support
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.support.context.NamedComponent;
// Endpoints
import org.springframework.integration.endpoint.*;
// EventDrivenConsumer, PollingConsumer, ReactiveStreamsConsumer
// SourcePollingChannelAdapter@Configuration
@EnableIntegration
@IntegrationComponentScan
public class IntegrationConfig {
@Bean
public MessageChannel orderChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel confirmationChannel() {
return new QueueChannel(100);
}
}
@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "orderChannel", replyChannel = "confirmationChannel")
String processOrder(String order);
}
@Component
public class OrderService {
@ServiceActivator(inputChannel = "orderChannel", outputChannel = "confirmationChannel")
public String handleOrder(String order) {
return "Order confirmed: " + order;
}
}@Configuration
@EnableIntegration
public class FlowConfig {
@Bean
public IntegrationFlow orderProcessingFlow() {
return IntegrationFlow
.from("orderChannel")
.filter((String order) -> order != null && order.length() > 0)
.transform(String::toUpperCase)
.handle(msg -> System.out.println("Processing: " + msg.getPayload()))
.channel("outputChannel")
.get();
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow
.from(() -> generateMessage(),
e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(10)))
.channel("outputChannel")
.get();
}
}@Configuration
public class ProgrammaticConfig {
@Bean
public DirectChannel inputChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel outputChannel() {
return new QueueChannel(100);
}
@Bean
public EventDrivenConsumer eventConsumer() {
ServiceActivatingHandler handler = new ServiceActivatingHandler(
message -> System.out.println("Processing: " + message.getPayload()));
EventDrivenConsumer consumer = new EventDrivenConsumer(inputChannel(), handler);
consumer.setAutoStartup(true);
return consumer;
}
}Message<T>)MessageChannel)AbstractEndpoint)The framework implements Martin Fowler's Enterprise Integration Patterns:
Messages not being processed:
@EnableIntegration is present on a @Configuration classautoStartup=true or call start())Timeout exceptions:
MessagingTemplate or gatewayNo subscribers error:
PublishSubscribeChannel, set requireSubscribers=false if subscribers optionalHandler not receiving messages:
Transaction issues:
@Transactional on handler methods if neededNull message handling:
MessageSource.receive() returning null is normal (no message available)Channel capacity issues:
QueueChannel.getRemainingCapacity() for bounded queuesExecutorChannel for async processing to avoid blocking