or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-interfaces.mdindex.mdmessage-channels.mdmessage-endpoints.md
tile.json

tessl/maven-spring-integration-core

Spring Integration Core is the foundational module of the Spring Integration framework that provides enterprise integration patterns and messaging capabilities for Spring-based applications.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.springframework.integration/spring-integration-core@7.0.x

To install, run

npx @tessl/cli install tessl/maven-spring-integration-core@7.0.0

index.mddocs/

Spring Integration Core

Spring Integration Core is the foundational module of the Spring Integration framework that provides enterprise integration patterns (EIP) and messaging capabilities for Spring-based applications. It enables message-driven architecture through channels, endpoints, handlers, and declarative configuration, supporting both imperative and reactive programming models.

Quick Decision Guide for Agents

Configuration Approach Selection

  • Use annotations when: Simple flows, Spring Boot auto-configuration, rapid development
  • Use Java DSL when: Complex flows, programmatic flow building, type-safe configuration
  • Use programmatic when: Dynamic configuration, runtime flow creation, fine-grained control

Channel Selection

  • Synchronous point-to-point: DirectChannel (default, round-robin)
  • Asynchronous point-to-point: ExecutorChannel (non-blocking send)
  • Queue-based: QueueChannel (bounded/unbounded, pollable)
  • Broadcast: PublishSubscribeChannel (all subscribers receive)
  • Priority queue: PriorityChannel (custom ordering)
  • Synchronization: RendezvousChannel (zero-capacity blocking)
  • Reactive: FluxMessageChannel (reactive streams)

See Message Channels for detailed selection criteria.

Endpoint Selection

  • EventDrivenConsumer: Subscribable channels, immediate processing, caller thread
  • PollingConsumer: Pollable channels, scheduled processing, executor threads
  • ReactiveStreamsConsumer: Reactive channels, backpressure support
  • SourcePollingChannelAdapter: Poll message sources, scheduled polling

See Message Endpoints for detailed patterns.

Essential Setup

Minimal Required Configuration

@Configuration
@EnableIntegration  // REQUIRED for annotation-based configuration
public class IntegrationConfig {
    // Integration infrastructure enabled
}

Core Components Overview

  • Channels: Transport messages - See Message Channels
  • Endpoints: Connect handlers to channels - See Message Endpoints
  • Handlers: Process messages (ServiceActivator, Transformer, Router, Filter, Splitter, Aggregator)
  • Interfaces: Functional contracts - See Core Interfaces

Critical Configuration Rules

Required Configuration

  • @EnableIntegration is mandatory for annotation-based configuration (must be on @Configuration class)
  • Channels must be defined as Spring beans (or use MessageChannels factory for DSL)
  • Endpoints are automatically registered when using annotations or DSL
  • @IntegrationComponentScan enables component scanning for integration components

Default Behaviors (Critical for Agents)

  • DirectChannel: Round-robin load balancing, failover enabled
  • PublishSubscribeChannel: Synchronous by default (use executor for async)
  • QueueChannel: capacity=0 means unbounded (not zero capacity)
  • PriorityChannel: Uses PRIORITY header for ordering
  • Error channel: errorChannel bean name (if configured, errors sent automatically)
  • Null channel: nullChannel bean name (discards all messages silently)
  • Default timeout: IntegrationContextUtils.DEFAULT_TIMEOUT (1000ms)
  • Endpoints: autoStartup=true by default (must be started to process)
  • Polling endpoints: maxMessagesPerPoll=-1 (unlimited), receiveTimeout=1000ms

Threading Model (Critical for Concurrency)

  • DirectChannel: Synchronous in sender's thread (blocks until handler completes)
  • ExecutorChannel: Asynchronous via executor (non-blocking send)
  • QueueChannel: Thread-safe blocking operations (send blocks if full, receive blocks if empty)
  • PublishSubscribeChannel: Can be synchronous (default) or asynchronous (with executor)
  • Rule: Message handlers should be stateless for thread-safety (or properly synchronized)
  • Polling consumers use executor threads (configurable via TaskExecutor)

Lifecycle Management

  • All endpoints implement SmartLifecycle (start/stop control)
  • autoStartup=true by default (endpoints start automatically)
  • phase controls initialization order (lower phases start first)
  • role enables group-based lifecycle control (start/stop all endpoints with same role)
  • Channels are passive (no lifecycle, but can be paused if Pausable)
  • Critical: Endpoints must be started to process messages (auto-started unless autoStartup=false)

Exception Handling

Exception Hierarchy

  • MessagingException - Base exception (always check getCause())
  • MessageDeliveryException - Delivery failures (channel full, no subscribers)
  • MessageRejectedException - Message rejected by filter/router
  • MessageTimeoutException - Timeout waiting for reply
  • MessageHandlingException - Handler threw exception (wrapped with original message)
  • All exceptions include the failed message in getFailedMessage()

Error Channel Pattern

@Configuration
public class ErrorHandlingConfig {
    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }
    
    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        Throwable exception = errorMessage.getPayload();
        Message<?> failedMessage = errorMessage.getOriginalMessage();
        // Handle error
    }
}

Critical Edge Cases

Null Handling

  • Null payloads: Handled gracefully by most components (check documentation for specific behavior)
  • Null headers: Use copyHeadersIfAbsent() to avoid overwriting existing headers
  • Null return from handler: No output message sent (valid for one-way flows)
  • Null return from transformer: Throws exception (transformers must return non-null)
  • Null return from MessageSource: Not an error - indicates no message available (polling continues)

Channel Behavior

  • QueueChannel capacity: capacity=0 means unbounded; capacity>0 means bounded
  • PublishSubscribeChannel: requireSubscribers=true throws exception if no subscribers
  • QueueChannel.receive(): Returns null when empty (not an error)
  • Channels reject null messages: Throw exception if null message sent

Message Processing

  • Empty collections: Splitters handle empty collections (no output messages)
  • Timeout scenarios: Configure timeouts for send/receive operations (default 1000ms)
  • Message expiration: Use setExpirationDate() for TTL messages (expired messages filtered automatically)
  • No subscribers: PublishSubscribeChannel with requireSubscribers=true throws exception
  • Handler returns null: No output message sent (valid for one-way flows)

Common Integration Patterns

Request-Reply Pattern

@MessagingGateway
public interface OrderGateway {
    @Gateway(requestChannel = "orderChannel", replyChannel = "replyChannel")
    OrderResult processOrder(Order order);
}

@ServiceActivator(inputChannel = "orderChannel", outputChannel = "replyChannel")
public OrderResult handleOrder(Order order) {
    return processOrder(order);
}

Fire-and-Forget Pattern

@MessagingGateway
public interface NotificationGateway {
    @Gateway(requestChannel = "notificationChannel")
    void sendNotification(String message);
}

@ServiceActivator(inputChannel = "notificationChannel")
public void handleNotification(String message) {
    // Process notification (no reply)
}

Error Handling Pattern

@ServiceActivator(
    inputChannel = "orderChannel",
    outputChannel = "confirmationChannel",
    errorChannel = "errorChannel"
)
public String handleOrderWithErrorHandling(String order) {
    if (order == null || order.isEmpty()) {
        throw new IllegalArgumentException("Order cannot be empty");
    }
    return "Order confirmed: " + order;
}

Message Enrichment Pattern

@Bean
public IntegrationFlow enrichmentFlow() {
    return IntegrationFlow
        .from("inputChannel")
        .enrich(e -> e
            .requestChannel("lookupChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("customer", Message::getPayload))
        .channel("outputChannel")
        .get();
}

Content-Based Routing Pattern

@Router(inputChannel = "inputChannel")
public String route(Order order) {
    if (order.getPriority() == Priority.HIGH) {
        return "highPriorityChannel";
    } else {
        return "normalPriorityChannel";
    }
}

Split-Aggregate Pattern

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow
        .from("inputChannel")
        .split()
        .channel("processingChannel")
        .aggregate(a -> a
            .correlationStrategy(Message::getHeaders)
            .releaseStrategy(group -> group.size() == 3))
        .channel("outputChannel")
        .get();
}

Package Information

  • Package Name: spring-integration-core
  • Group ID: org.springframework.integration
  • Package Type: maven
  • Language: Java
  • Version: 7.0.0

Installation

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>7.0.0</version>
</dependency>

Gradle:

implementation 'org.springframework.integration:spring-integration-core:7.0.0'

Required Dependencies

  • spring-integration-core (this package)
  • Spring Framework 6.x (for Spring Integration 7.0)
  • Optional: Spring Boot for auto-configuration
  • Optional: spring-integration-java-dsl for Java DSL support

Core Imports

// Configuration
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegrationManagement;
import org.springframework.integration.config.EnableMessageHistory;
import org.springframework.integration.annotation.IntegrationComponentScan;

// Annotations for endpoints
import org.springframework.integration.annotation.*;
// @ServiceActivator, @Transformer, @Router, @Filter, @Splitter, @Aggregator
// @MessagingGateway, @Gateway, @InboundChannelAdapter

// DSL for fluent API (requires spring-integration-java-dsl)
import org.springframework.integration.dsl.*;
// IntegrationFlow, IntegrationFlowBuilder, MessageChannels, Pollers

// Core interfaces
import org.springframework.integration.core.*;
// MessageSource, MessageProducer, GenericHandler, GenericTransformer, GenericSelector
// MessagingTemplate, AsyncMessagingTemplate

// Message channels
import org.springframework.integration.channel.*;
// DirectChannel, PublishSubscribeChannel, QueueChannel, ExecutorChannel
// PriorityChannel, RendezvousChannel, FluxMessageChannel, NullChannel

// Message handlers
import org.springframework.integration.handler.*;
// ServiceActivatingHandler, BridgeHandler, LoggingHandler, DelayHandler

// Message support
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.support.context.NamedComponent;

// Endpoints
import org.springframework.integration.endpoint.*;
// EventDrivenConsumer, PollingConsumer, ReactiveStreamsConsumer
// SourcePollingChannelAdapter

Configuration Approaches

Annotation-Based Configuration

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class IntegrationConfig {
    
    @Bean
    public MessageChannel orderChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public MessageChannel confirmationChannel() {
        return new QueueChannel(100);
    }
}

@MessagingGateway
public interface OrderGateway {
    @Gateway(requestChannel = "orderChannel", replyChannel = "confirmationChannel")
    String processOrder(String order);
}

@Component
public class OrderService {
    @ServiceActivator(inputChannel = "orderChannel", outputChannel = "confirmationChannel")
    public String handleOrder(String order) {
        return "Order confirmed: " + order;
    }
}

DSL-Based Configuration

@Configuration
@EnableIntegration
public class FlowConfig {

    @Bean
    public IntegrationFlow orderProcessingFlow() {
        return IntegrationFlow
            .from("orderChannel")
            .filter((String order) -> order != null && order.length() > 0)
            .transform(String::toUpperCase)
            .handle(msg -> System.out.println("Processing: " + msg.getPayload()))
            .channel("outputChannel")
            .get();
    }
    
    @Bean
    public IntegrationFlow pollingFlow() {
        return IntegrationFlow
            .from(() -> generateMessage(), 
                  e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(10)))
            .channel("outputChannel")
            .get();
    }
}

Programmatic Configuration

@Configuration
public class ProgrammaticConfig {
    
    @Bean
    public DirectChannel inputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public QueueChannel outputChannel() {
        return new QueueChannel(100);
    }
    
    @Bean
    public EventDrivenConsumer eventConsumer() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(
            message -> System.out.println("Processing: " + message.getPayload()));
        EventDrivenConsumer consumer = new EventDrivenConsumer(inputChannel(), handler);
        consumer.setAutoStartup(true);
        return consumer;
    }
}

Architecture Overview

Message-Driven Architecture

  • Messages: Immutable data structures containing headers and payload (Message<T>)
  • Channels: Conduits for message transport between components (MessageChannel)
  • Endpoints: Message consumers and producers that connect to channels (AbstractEndpoint)
  • Handlers: Components that process messages (transform, route, filter, aggregate, split)

Enterprise Integration Patterns

The framework implements Martin Fowler's Enterprise Integration Patterns:

  • Message Channels: Point-to-point and publish-subscribe communication
  • Message Endpoints: Service activators, gateways, channel adapters
  • Message Routing: Content-based routing, recipient lists, splitters, aggregators
  • Message Transformation: Payload transformation, enrichment, header manipulation
  • Message Filtering: Selective message processing
  • Message Aggregation: Combining multiple messages into one
  • Message Splitting: Breaking one message into multiple

Troubleshooting

Common Issues and Solutions

Messages not being processed:

  • Check that @EnableIntegration is present on a @Configuration class
  • Verify endpoints are started (autoStartup=true or call start())
  • Check channel names match between producer and consumer
  • Verify handler is subscribed to channel (for event-driven consumers)

Timeout exceptions:

  • Increase timeout on MessagingTemplate or gateway
  • Check if reply channel is configured correctly
  • Verify handler is processing and sending reply

No subscribers error:

  • Check channel name matches between producer and consumer
  • Verify endpoint is started
  • For PublishSubscribeChannel, set requireSubscribers=false if subscribers optional

Handler not receiving messages:

  • Verify channel type matches consumer type (subscribable vs pollable)
  • Check endpoint lifecycle (must be started)
  • Verify channel bean names match

Transaction issues:

  • Configure transaction manager on polling endpoints
  • Use @Transactional on handler methods if needed
  • Check transaction synchronization factory configuration

Null message handling:

  • MessageSource.receive() returning null is normal (no message available)
  • Handlers receiving null payload should check for null before processing
  • Transformers must not return null (throws exception)

Channel capacity issues:

  • Monitor QueueChannel.getRemainingCapacity() for bounded queues
  • Configure appropriate capacity based on message volume
  • Consider using ExecutorChannel for async processing to avoid blocking

Related Documentation