tessl/maven-spring-integration-core

Spring Integration Core

Spring Integration Core is the foundational module of the Spring Integration framework that provides enterprise integration patterns (EIP) and messaging capabilities for Spring-based applications. It enables message-driven architecture through channels, endpoints, handlers, and declarative configuration, supporting both imperative and reactive programming models.

Quick Decision Guide for Agents

Configuration Approach Selection

  • Use annotations when: Simple flows, Spring Boot auto-configuration, rapid development
  • Use Java DSL when: Complex flows, programmatic flow building, type-safe configuration
  • Use programmatic when: Dynamic configuration, runtime flow creation, fine-grained control

Channel Selection

  • Synchronous point-to-point: DirectChannel (default, round-robin)
  • Asynchronous point-to-point: ExecutorChannel (non-blocking send)
  • Queue-based: QueueChannel (bounded/unbounded, pollable)
  • Broadcast: PublishSubscribeChannel (all subscribers receive)
  • Priority queue: PriorityChannel (custom ordering)
  • Synchronization: RendezvousChannel (zero-capacity blocking)
  • Reactive: FluxMessageChannel (reactive streams)

See Message Channels for detailed selection criteria.
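The selection list above can be expressed as bean definitions. The following is a minimal configuration sketch, not a recommended layout: the bean names (syncOrders, asyncOrders, bufferedOrders, orderEvents, channelExecutor), the pool size, and the queue capacity are all hypothetical choices made for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableIntegration
public class ChannelSelectionConfig {

    // Hypothetical executor used only by the asynchronous channel below
    @Bean
    public ThreadPoolTaskExecutor channelExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("channel-exec-");
        return executor;
    }

    // Synchronous point-to-point: the handler runs on the sender's thread
    @Bean
    public MessageChannel syncOrders() {
        return new DirectChannel();
    }

    // Asynchronous point-to-point: the handler runs on an executor thread
    @Bean
    public MessageChannel asyncOrders(ThreadPoolTaskExecutor channelExecutor) {
        return new ExecutorChannel(channelExecutor);
    }

    // Queue-based and pollable: bounded to 100 buffered messages
    @Bean
    public PollableChannel bufferedOrders() {
        return new QueueChannel(100);
    }

    // Broadcast: every subscriber receives each message
    @Bean
    public MessageChannel orderEvents() {
        return new PublishSubscribeChannel();
    }
}
```

Declaring the pollable channel as PollableChannel keeps receive() available to callers that inject it by type.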

Endpoint Selection

  • EventDrivenConsumer: Subscribable channels, immediate processing, caller thread
  • PollingConsumer: Pollable channels, scheduled processing, scheduler or executor threads
  • ReactiveStreamsConsumer: Reactive channels, backpressure support
  • SourcePollingChannelAdapter: Poll message sources, scheduled polling

See Message Endpoints for detailed patterns.

Essential Setup

Minimal Required Configuration

@Configuration
@EnableIntegration  // REQUIRED for annotation-based configuration
public class IntegrationConfig {
    // Integration infrastructure enabled
}

Core Components Overview

  • Channels: Transport messages - See Message Channels
  • Endpoints: Connect handlers to channels - See Message Endpoints
  • Handlers: Process messages (ServiceActivator, Transformer, Router, Filter, Splitter, Aggregator)
  • Interfaces: Functional contracts - See Core Interfaces

Critical Configuration Rules

Required Configuration

  • @EnableIntegration is mandatory for annotation-based configuration (must be on @Configuration class)
  • Channels must be defined as Spring beans (or use MessageChannels factory for DSL)
  • Endpoints are automatically registered when using annotations or DSL
  • @IntegrationComponentScan scans for @MessagingGateway interfaces (standard @ComponentScan does not register interfaces)

Default Behaviors (Critical for Agents)

  • DirectChannel: Round-robin load balancing, failover enabled
  • PublishSubscribeChannel: Synchronous by default (use executor for async)
  • QueueChannel: new QueueChannel() is unbounded (Integer.MAX_VALUE); QueueChannel(int) requires capacity > 0 (use RendezvousChannel for zero capacity)
  • PriorityChannel: Uses PRIORITY header for ordering
  • Error channel: errorChannel bean name (if configured, errors sent automatically)
  • Null channel: nullChannel bean name (discards all messages silently)
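  • Default timeout: IntegrationContextUtils.DEFAULT_TIMEOUT (30000ms since 6.1) for gateway request/reply and send operations
  • Endpoints: autoStartup=true by default (must be started to process)
  • Polling endpoints: receiveTimeout=1000ms; maxMessagesPerPoll=-1 (unlimited) for PollingConsumer, 1 for SourcePollingChannelAdapter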
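To make the DirectChannel default (round-robin load balancing with failover) concrete, here is a plain-JDK model of that dispatch behavior. It is a sketch for intuition only, not the framework's dispatcher: RoundRobinDispatcher and its methods are hypothetical names, and handlers are modeled as simple functions.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Plain-JDK model of round-robin dispatch with failover (hypothetical class). */
final class RoundRobinDispatcher {
    private final List<Function<String, String>> handlers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinDispatcher(List<Function<String, String>> handlers) {
        this.handlers = List.copyOf(handlers);
    }

    /** Starts at the next handler in rotation; on failure, tries the remaining ones in order. */
    String dispatch(String payload) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        int start = Math.floorMod(next.getAndIncrement(), handlers.size());
        RuntimeException lastFailure = null;
        for (int i = 0; i < handlers.size(); i++) {
            Function<String, String> handler = handlers.get((start + i) % handlers.size());
            try {
                return handler.apply(payload);
            } catch (RuntimeException e) {
                lastFailure = e; // failover: remember the error and try the next handler
            }
        }
        throw new IllegalStateException("all handlers failed", lastFailure);
    }
}
```

With failover disabled, the loop would stop at the first exception instead of trying the next handler.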

Threading Model (Critical for Concurrency)

  • DirectChannel: Synchronous in sender's thread (blocks until handler completes)
  • ExecutorChannel: Asynchronous via executor (non-blocking send)
  • QueueChannel: Thread-safe blocking operations (send blocks if full, receive blocks if empty)
  • PublishSubscribeChannel: Can be synchronous (default) or asynchronous (with executor)
  • Rule: Message handlers should be stateless for thread-safety (or properly synchronized)
  • Polling consumers run on the TaskScheduler's threads by default (configure a TaskExecutor on the poller to hand work off to a separate pool)
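The difference between a direct hand-off and an executor hand-off can be seen in a small plain-JDK sketch. This models the threading behavior described above and does not use Spring Integration classes; ThreadingModelSketch and the thread name handler-pool-1 are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-JDK model of sender-thread versus executor-thread handling (hypothetical class). */
final class ThreadingModelSketch {

    /** Direct hand-off: the handler runs on the sender's thread, as with a DirectChannel. */
    static String directSend() {
        String[] handlerThread = new String[1];
        Runnable handler = () -> handlerThread[0] = Thread.currentThread().getName();
        handler.run(); // returns only after the handler has finished
        return handlerThread[0];
    }

    /** Executor hand-off: the handler runs on a pool thread, as with an ExecutorChannel. */
    static String executorSend() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-pool-1"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join(); // joined here only so the sketch can report the thread name
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct handler thread:   " + directSend());
        System.out.println("executor handler thread: " + executorSend());
    }
}
```

One consequence of the direct model is that thread-bound context, such as a transaction, carries from sender to handler, while the executor model breaks that link.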

Lifecycle Management

  • All endpoints implement SmartLifecycle (start/stop control)
  • autoStartup=true by default (endpoints start automatically)
  • phase controls initialization order (lower phases start first)
  • role enables group-based lifecycle control (start/stop all endpoints with same role)
  • Channels are passive (no lifecycle, but can be paused if Pausable)
  • Critical: Endpoints must be started to process messages (auto-started unless autoStartup=false)

Exception Handling

Exception Hierarchy

  • MessagingException - Base exception (always check getCause())
  • MessageDeliveryException - Delivery failures (channel full, no subscribers)
  • MessageRejectedException - Message rejected by a filter (when throwExceptionOnRejection=true)
  • MessageTimeoutException - Timeout waiting for reply
  • MessageHandlingException - Handler threw exception (wrapped with original message)
  • The failed message is available via getFailedMessage() (may be null if the failure occurred before a message existed)

Error Channel Pattern

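@Configuration
@EnableIntegration
public class ErrorHandlingConfig {
    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }
    
    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        Throwable exception = errorMessage.getPayload();
        // The failed message usually travels inside the MessagingException payload;
        // getOriginalMessage() may be null depending on where the error was raised
        Message<?> failedMessage = exception instanceof MessagingException me
                ? me.getFailedMessage()
                : errorMessage.getOriginalMessage();
        // Handle error
    }
}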

Critical Edge Cases

Null Handling

  • Null payloads: Handled gracefully by most components (check documentation for specific behavior)
  • Null headers: Use copyHeadersIfAbsent() to avoid overwriting existing headers
  • Null return from handler: No output message sent (valid for one-way flows)
  • Null return from transformer: Throws exception (transformers must return non-null)
  • Null return from MessageSource: Not an error - indicates no message available (polling continues)

Channel Behavior

  • QueueChannel capacity: new QueueChannel() is unbounded; new QueueChannel(n) is bounded and requires n > 0
  • PublishSubscribeChannel: requireSubscribers=true throws exception if no subscribers
  • QueueChannel.receive(): Returns null when empty (not an error)
  • Channels reject null messages: Throw exception if null message sent
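By default a QueueChannel is backed by a java.util.concurrent.LinkedBlockingQueue, so the bounded, unbounded, and empty-receive behavior can be explored with the JDK class alone. The sketch below uses only the JDK queue, not QueueChannel itself, and the class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Plain-JDK view of the queue semantics underlying a queue-based channel (hypothetical class). */
final class QueueSemanticsSketch {

    /** Fills a bounded queue, then tries one more non-blocking offer. */
    static boolean offerBeyondCapacity(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("msg-" + i);
        }
        return queue.offer("overflow"); // rejected because the queue is full
    }

    /** A non-blocking poll on an empty queue yields null rather than an error. */
    static String pollEmpty() {
        return new LinkedBlockingQueue<String>().poll();
    }

    /** The no-arg queue is "unbounded": its capacity is Integer.MAX_VALUE. */
    static int unboundedCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }
}
```

The timed variants of offer and poll correspond to a send or receive with a timeout: they wait up to the limit before giving up.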

Message Processing

  • Empty collections: Splitters handle empty collections (no output messages)
  • Timeout scenarios: Configure send/receive timeouts explicitly rather than relying on defaults (30000ms for gateways and reply-producing endpoints since 6.1; 1000ms receiveTimeout for polling consumers)
  • Message expiration: Use setExpirationDate() to set a TTL; expired messages are dropped only where a component checks the header (for example, a filter using UnexpiredMessageSelector)

Common Integration Patterns

Request-Reply Pattern

@MessagingGateway
public interface OrderGateway {
    @Gateway(requestChannel = "orderChannel", replyChannel = "replyChannel")
    OrderResult processOrder(Order order);
}

@ServiceActivator(inputChannel = "orderChannel", outputChannel = "replyChannel")
public OrderResult handleOrder(Order order) {
    return processOrder(order);
}

Fire-and-Forget Pattern

@MessagingGateway
public interface NotificationGateway {
    @Gateway(requestChannel = "notificationChannel")
    void sendNotification(String message);
}

@ServiceActivator(inputChannel = "notificationChannel")
public void handleNotification(String message) {
    // Process notification (no reply)
}

Error Handling Pattern

// @ServiceActivator has no errorChannel attribute. Exceptions propagate to the caller,
// or to the errorChannel configured on the upstream gateway or poller.
@ServiceActivator(
    inputChannel = "orderChannel",
    outputChannel = "confirmationChannel"
)
public String handleOrderWithErrorHandling(String order) {
    if (order == null || order.isEmpty()) {
        throw new IllegalArgumentException("Order cannot be empty");
    }
    return "Order confirmed: " + order;
}
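Error routing for a request-reply flow is typically configured at the entry point rather than on the handler. The following sketch assumes a gateway as the entry point; SafeOrderGateway, OrderErrorHandler, and orderErrorChannel are hypothetical names, and the fallback text is illustrative.

```java
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

// Exceptions thrown downstream of this gateway are sent to orderErrorChannel
@MessagingGateway(errorChannel = "orderErrorChannel")
public interface SafeOrderGateway {

    @Gateway(requestChannel = "orderChannel")
    String processOrder(String order);
}

@Component
class OrderErrorHandler {

    // The ErrorMessage payload is the MessagingException wrapping the handler's exception;
    // the value returned here becomes the gateway's reply to the caller
    @ServiceActivator(inputChannel = "orderErrorChannel")
    public String onError(MessagingException failure) {
        return "Order rejected: " + failure.getMostSpecificCause().getMessage();
    }
}
```

If the error handler returns nothing or rethrows, the caller sees no fallback reply, so decide which of the two behaviors the flow needs.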

Message Enrichment Pattern

@Bean
public IntegrationFlow enrichmentFlow() {
    return IntegrationFlow
        .from("inputChannel")
        .enrich(e -> e
            .requestChannel("lookupChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("customer", Message::getPayload))
        .channel("outputChannel")
        .get();
}

Content-Based Routing Pattern

@Router(inputChannel = "inputChannel")
public String route(Order order) {
    if (order.getPriority() == Priority.HIGH) {
        return "highPriorityChannel";
    } else {
        return "normalPriorityChannel";
    }
}

Split-Aggregate Pattern

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow
        .from("inputChannel")
        .split()
        .channel("processingChannel")
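        .aggregate(a -> a
            // Correlate on the correlationId header set by the splitter; the full
            // MessageHeaders map is unique per message and would never form a group
            .correlationStrategy(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID))
            .releaseStrategy(group -> group.size() == 3))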
        .channel("outputChannel")
        .get();
}
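The interplay of correlation and release can be modeled without the framework. This plain-JDK sketch groups payloads by a shared key and releases a group once it reaches a fixed size; AggregatorSketch is a hypothetical class, and a real aggregator also handles timeouts, expiry, and persistent message stores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-JDK model of correlation plus a size-based release strategy (hypothetical class). */
final class AggregatorSketch {
    private final int releaseSize;
    private final Map<String, List<String>> groups = new HashMap<>();

    AggregatorSketch(int releaseSize) {
        this.releaseSize = releaseSize;
    }

    /** Adds a payload to its group; returns the completed group once it reaches releaseSize. */
    Optional<List<String>> add(String correlationKey, String payload) {
        List<String> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() < releaseSize) {
            return Optional.empty(); // keep waiting for more messages with this key
        }
        groups.remove(correlationKey); // released groups are discarded
        return Optional.of(List.copyOf(group));
    }
}
```

The model shows why the correlation key must be shared by all parts of one split: a key that differs per message leaves every group at size one, so nothing is ever released.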

Package Information

  • Package Name: spring-integration-core
  • Group ID: org.springframework.integration
  • Package Type: maven
  • Language: Java
  • Version: 7.0.0

Installation

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>7.0.0</version>
</dependency>

Gradle:

implementation 'org.springframework.integration:spring-integration-core:7.0.0'

Required Dependencies

  • spring-integration-core (this package)
  • Spring Framework 7.x (for Spring Integration 7.0)
  • Optional: Spring Boot for auto-configuration
  • Java DSL: included in spring-integration-core since 5.0 (no separate artifact required)

Core Imports

// Configuration
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegrationManagement;
import org.springframework.integration.config.EnableMessageHistory;
import org.springframework.integration.annotation.IntegrationComponentScan;

// Annotations for endpoints
import org.springframework.integration.annotation.*;
// @ServiceActivator, @Transformer, @Router, @Filter, @Splitter, @Aggregator
// @MessagingGateway, @Gateway, @InboundChannelAdapter

// DSL for fluent API (part of spring-integration-core)
import org.springframework.integration.dsl.*;
// IntegrationFlow, IntegrationFlowBuilder, MessageChannels, Pollers

// Core interfaces
import org.springframework.integration.core.*;
// MessageSource, MessageProducer, GenericHandler, GenericTransformer, GenericSelector
// MessagingTemplate, AsyncMessagingTemplate

// Message channels
import org.springframework.integration.channel.*;
// DirectChannel, PublishSubscribeChannel, QueueChannel, ExecutorChannel
// PriorityChannel, RendezvousChannel, FluxMessageChannel, NullChannel

// Message handlers
import org.springframework.integration.handler.*;
// ServiceActivatingHandler, BridgeHandler, LoggingHandler, DelayHandler

// Message support
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.support.context.NamedComponent;

// Endpoints
import org.springframework.integration.endpoint.*;
// EventDrivenConsumer, PollingConsumer, ReactiveStreamsConsumer
// SourcePollingChannelAdapter

Configuration Approaches

Annotation-Based Configuration

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class IntegrationConfig {
    
    @Bean
    public MessageChannel orderChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public MessageChannel confirmationChannel() {
        return new QueueChannel(100);
    }
}

@MessagingGateway
public interface OrderGateway {
    @Gateway(requestChannel = "orderChannel", replyChannel = "confirmationChannel")
    String processOrder(String order);
}

@Component
public class OrderService {
    @ServiceActivator(inputChannel = "orderChannel", outputChannel = "confirmationChannel")
    public String handleOrder(String order) {
        return "Order confirmed: " + order;
    }
}

DSL-Based Configuration

@Configuration
@EnableIntegration
public class FlowConfig {

    @Bean
    public IntegrationFlow orderProcessingFlow() {
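        return IntegrationFlow
            .from("orderChannel")
            .filter((String order) -> order != null && !order.isEmpty())
            .<String, String>transform(String::toUpperCase)
            // Return the payload so the flow continues; a one-way handler would end the flow here
            .<String>handle((payload, headers) -> {
                System.out.println("Processing: " + payload);
                return payload;
            })
            .channel("outputChannel")
            .get();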
    }
    
    @Bean
    public IntegrationFlow pollingFlow() {
        return IntegrationFlow
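            // generateMessage() is an application-defined helper that returns a Message<?> (or null when idle)
            .from(() -> generateMessage(), 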
                  e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(10)))
            .channel("outputChannel")
            .get();
    }
}

Programmatic Configuration

@Configuration
public class ProgrammaticConfig {
    
    @Bean
    public DirectChannel inputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public QueueChannel outputChannel() {
        return new QueueChannel(100);
    }
    
    @Bean
    public EventDrivenConsumer eventConsumer() {
        // EventDrivenConsumer accepts any MessageHandler (org.springframework.messaging);
        // a lambda is enough for a one-way handler
        MessageHandler handler =
            message -> System.out.println("Processing: " + message.getPayload());
        EventDrivenConsumer consumer = new EventDrivenConsumer(inputChannel(), handler);
        consumer.setAutoStartup(true);
        return consumer;
    }
}

Architecture Overview

Message-Driven Architecture

  • Messages: Immutable data structures containing headers and payload (Message<T>)
  • Channels: Conduits for message transport between components (MessageChannel)
  • Endpoints: Message consumers and producers that connect to channels (AbstractEndpoint)
  • Handlers: Components that process messages (transform, route, filter, aggregate, split)

Enterprise Integration Patterns

The framework implements patterns from Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf:

  • Message Channels: Point-to-point and publish-subscribe communication
  • Message Endpoints: Service activators, gateways, channel adapters
  • Message Routing: Content-based routing, recipient lists, splitters, aggregators
  • Message Transformation: Payload transformation, enrichment, header manipulation
  • Message Filtering: Selective message processing
  • Message Aggregation: Combining multiple messages into one
  • Message Splitting: Breaking one message into multiple

Troubleshooting

Common Issues and Solutions

Messages not being processed:

  • Check that @EnableIntegration is present on a @Configuration class
  • Verify endpoints are started (autoStartup=true or call start())
  • Check channel names match between producer and consumer
  • Verify handler is subscribed to channel (for event-driven consumers)

Timeout exceptions:

  • Increase timeout on MessagingTemplate or gateway
  • Check if reply channel is configured correctly
  • Verify handler is processing and sending reply
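Gateway timeouts can be set per method. The following is a small sketch, assuming an annotation-based gateway; TimedOrderGateway and the millisecond values are hypothetical and should be tuned to the flow.

```java
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface TimedOrderGateway {

    // requestTimeout bounds the send to orderChannel;
    // replyTimeout bounds the wait for the reply (both in milliseconds)
    @Gateway(requestChannel = "orderChannel", requestTimeout = 2000, replyTimeout = 10000)
    String processOrder(String order);
}
```

By default, an elapsed reply timeout surfaces to the caller as a null return value, so callers of such a method should be prepared to handle null.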

No subscribers error:

  • Check channel name matches between producer and consumer
  • Verify endpoint is started
  • For PublishSubscribeChannel, set requireSubscribers=false if subscribers optional

Handler not receiving messages:

  • Verify channel type matches consumer type (subscribable vs pollable)
  • Check endpoint lifecycle (must be started)
  • Verify channel bean names match

Transaction issues:

  • Configure transaction manager on polling endpoints
  • Use @Transactional on handler methods if needed
  • Check transaction synchronization factory configuration

Null message handling:

  • MessageSource.receive() returning null is normal (no message available)
  • Handlers receiving null payload should check for null before processing
  • Transformers must not return null (throws exception)

Channel capacity issues:

  • Monitor QueueChannel.getRemainingCapacity() for bounded queues
  • Configure appropriate capacity based on message volume
  • Consider using ExecutorChannel for async processing to avoid blocking

Related Documentation

Install with Tessl CLI

npx tessl i tessl/maven-spring-integration-core
  • Workspace: tessl
  • Visibility: Public
  • Describes: mavenpkg:maven/org.springframework.integration/spring-integration-core@7.0.x