Spring Integration Core is the foundational module of the Spring Integration framework that provides enterprise integration patterns and messaging capabilities for Spring-based applications.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Spring Integration Core is the foundational module of the Spring Integration framework that provides enterprise integration patterns (EIP) and messaging capabilities for Spring-based applications. It enables message-driven architecture through channels, endpoints, handlers, and declarative configuration, supporting both imperative and reactive programming models.
DirectChannel (default, round-robin)ExecutorChannel (non-blocking send)QueueChannel (bounded/unbounded, pollable)PublishSubscribeChannel (all subscribers receive)PriorityChannel (custom ordering)RendezvousChannel (zero-capacity blocking)FluxMessageChannel (reactive streams)See Message Channels for detailed selection criteria.
See Message Endpoints for detailed patterns.
@Configuration
@EnableIntegration // REQUIRED for annotation-based configuration
public class IntegrationConfig {
// Integration infrastructure enabled
}ServiceActivator, Transformer, Router, Filter, Splitter, Aggregator)@EnableIntegration is mandatory for annotation-based configuration (must be on @Configuration class)MessageChannels factory for DSL)@IntegrationComponentScan enables component scanning for integration componentsDirectChannel: Round-robin load balancing, failover enabledPublishSubscribeChannel: Synchronous by default (use executor for async)QueueChannel: capacity=0 means unbounded (not zero capacity)PriorityChannel: Uses PRIORITY header for orderingerrorChannel bean name (if configured, errors sent automatically)nullChannel bean name (discards all messages silently)IntegrationContextUtils.DEFAULT_TIMEOUT (1000ms)autoStartup=true by default (must be started to process)maxMessagesPerPoll=-1 (unlimited), receiveTimeout=1000msDirectChannel: Synchronous in sender's thread (blocks until handler completes)ExecutorChannel: Asynchronous via executor (non-blocking send)QueueChannel: Thread-safe blocking operations (send blocks if full, receive blocks if empty)PublishSubscribeChannel: Can be synchronous (default) or asynchronous (with executor)TaskExecutor)SmartLifecycle (start/stop control)autoStartup=true by default (endpoints start automatically)phase controls initialization order (lower phases start first)role enables group-based lifecycle control (start/stop all endpoints with same role)Pausable)autoStartup=false)MessagingException - Base exception (always check getCause())MessageDeliveryException - Delivery failures (channel full, no subscribers)MessageRejectedException - Message rejected by filter/routerMessageTimeoutException - Timeout waiting for replyMessageHandlingException - Handler threw exception (wrapped with original message)getFailedMessage()@Configuration
public class ErrorHandlingConfig {
@Bean
public MessageChannel errorChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
Throwable exception = errorMessage.getPayload();
Message<?> failedMessage = errorMessage.getOriginalMessage();
// Handle error
}
}copyHeadersIfAbsent() to avoid overwriting existing headerscapacity=0 means unbounded; capacity>0 means boundedrequireSubscribers=true throws exception if no subscribersnull when empty (not an error)setExpirationDate() for TTL messages (expired messages filtered automatically)PublishSubscribeChannel with requireSubscribers=true throws exception@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "orderChannel", replyChannel = "replyChannel")
OrderResult processOrder(Order order);
}
@ServiceActivator(inputChannel = "orderChannel", outputChannel = "replyChannel")
public OrderResult handleOrder(Order order) {
return processOrder(order);
}@MessagingGateway
public interface NotificationGateway {
@Gateway(requestChannel = "notificationChannel")
void sendNotification(String message);
}
@ServiceActivator(inputChannel = "notificationChannel")
public void handleNotification(String message) {
// Process notification (no reply)
}@ServiceActivator(
inputChannel = "orderChannel",
outputChannel = "confirmationChannel",
errorChannel = "errorChannel"
)
public String handleOrderWithErrorHandling(String order) {
if (order == null || order.isEmpty()) {
throw new IllegalArgumentException("Order cannot be empty");
}
return "Order confirmed: " + order;
}@Bean
public IntegrationFlow enrichmentFlow() {
return IntegrationFlow
.from("inputChannel")
.enrich(e -> e
.requestChannel("lookupChannel")
.requestPayload(Message::getPayload)
.propertyFunction("customer", Message::getPayload))
.channel("outputChannel")
.get();
}@Router(inputChannel = "inputChannel")
public String route(Order order) {
if (order.getPriority() == Priority.HIGH) {
return "highPriorityChannel";
} else {
return "normalPriorityChannel";
}
}@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow
.from("inputChannel")
.split()
.channel("processingChannel")
.aggregate(a -> a
.correlationStrategy(Message::getHeaders)
.releaseStrategy(group -> group.size() == 3))
.channel("outputChannel")
.get();
}Maven:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>7.0.0</version>
</dependency>Gradle:
implementation 'org.springframework.integration:spring-integration-core:7.0.0'spring-integration-core (this package)spring-integration-java-dsl for Java DSL support// Configuration
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegrationManagement;
import org.springframework.integration.config.EnableMessageHistory;
import org.springframework.integration.annotation.IntegrationComponentScan;
// Annotations for endpoints
import org.springframework.integration.annotation.*;
// @ServiceActivator, @Transformer, @Router, @Filter, @Splitter, @Aggregator
// @MessagingGateway, @Gateway, @InboundChannelAdapter
// DSL for fluent API (requires spring-integration-java-dsl)
import org.springframework.integration.dsl.*;
// IntegrationFlow, IntegrationFlowBuilder, MessageChannels, Pollers
// Core interfaces
import org.springframework.integration.core.*;
// MessageSource, MessageProducer, GenericHandler, GenericTransformer, GenericSelector
// MessagingTemplate, AsyncMessagingTemplate
// Message channels
import org.springframework.integration.channel.*;
// DirectChannel, PublishSubscribeChannel, QueueChannel, ExecutorChannel
// PriorityChannel, RendezvousChannel, FluxMessageChannel, NullChannel
// Message handlers
import org.springframework.integration.handler.*;
// ServiceActivatingHandler, BridgeHandler, LoggingHandler, DelayHandler
// Message support
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.support.context.NamedComponent;
// Endpoints
import org.springframework.integration.endpoint.*;
// EventDrivenConsumer, PollingConsumer, ReactiveStreamsConsumer
// SourcePollingChannelAdapter@Configuration
@EnableIntegration
@IntegrationComponentScan
public class IntegrationConfig {
@Bean
public MessageChannel orderChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel confirmationChannel() {
return new QueueChannel(100);
}
}
@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "orderChannel", replyChannel = "confirmationChannel")
String processOrder(String order);
}
@Component
public class OrderService {
@ServiceActivator(inputChannel = "orderChannel", outputChannel = "confirmationChannel")
public String handleOrder(String order) {
return "Order confirmed: " + order;
}
}@Configuration
@EnableIntegration
public class FlowConfig {
@Bean
public IntegrationFlow orderProcessingFlow() {
return IntegrationFlow
.from("orderChannel")
.filter((String order) -> order != null && order.length() > 0)
.transform(String::toUpperCase)
.handle(msg -> System.out.println("Processing: " + msg.getPayload()))
.channel("outputChannel")
.get();
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow
.from(() -> generateMessage(),
e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(10)))
.channel("outputChannel")
.get();
}
}@Configuration
public class ProgrammaticConfig {
@Bean
public DirectChannel inputChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel outputChannel() {
return new QueueChannel(100);
}
@Bean
public EventDrivenConsumer eventConsumer() {
ServiceActivatingHandler handler = new ServiceActivatingHandler(
message -> System.out.println("Processing: " + message.getPayload()));
EventDrivenConsumer consumer = new EventDrivenConsumer(inputChannel(), handler);
consumer.setAutoStartup(true);
return consumer;
}
}Message<T>)MessageChannel)AbstractEndpoint)The framework implements Martin Fowler's Enterprise Integration Patterns:
Messages not being processed:
@EnableIntegration is present on a @Configuration classautoStartup=true or call start())Timeout exceptions:
MessagingTemplate or gatewayNo subscribers error:
PublishSubscribeChannel, set requireSubscribers=false if subscribers optionalHandler not receiving messages:
Transaction issues:
@Transactional on handler methods if neededNull message handling:
MessageSource.receive() returning null is normal (no message available)Channel capacity issues:
QueueChannel.getRemainingCapacity() for bounded queuesExecutorChannel for async processing to avoid blocking