Core Interfaces

Core interfaces and abstractions provide the foundational contracts for Spring Integration components. These include message sources, producers, functional handlers, transformers, selectors, error-handling helpers, pausable components, and messaging templates for synchronous and asynchronous send and receive operations.

Quick Reference for Agents

Interface Selection Guide

  • MessageSource<T>: Poll or generate messages (returns null if none available)
  • MessageProducer: Components that produce messages to channels
  • GenericHandler<P>: Process messages with payload and headers (returns Object or null)
  • GenericTransformer<S, T>: Transform payload from source to target type (must return non-null)
  • GenericSelector<T>: Filter messages based on payload (returns boolean)
  • Pausable: Components that can be paused/resumed
  • MessagingTemplate: Synchronous messaging operations
  • AsyncMessagingTemplate: Asynchronous messaging operations

Critical Behaviors (Must Know)

  • MessageSource.receive() returns null when no message available (not an error - polling continues)
  • GenericHandler returning null produces no output message (valid for one-way flows)
  • GenericTransformer must return non-null (null throws exception)
  • GenericSelector returning false rejects message (sent to discard channel if configured)
  • MessagingTemplate uses default channel if set, otherwise requires explicit channel
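  • Template send-and-receive operations return null when no reply arrives within the receive timeout; throwExceptionOnLateReply=true only makes a reply that arrives after the timeout fail on the replying thread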
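
The handler, transformer, and selector contracts above can be modeled in a few lines of plain Java. This is only a sketch: ContractSketch and its methods are hypothetical stand-ins built on java.util.function types, not Spring Integration classes, and the exception type thrown for a null transformer result is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Plain-Java model of the null/false contracts; not Spring Integration code. */
final class ContractSketch {

    /** Handler step: a null result means "no output message", which is allowed. */
    static <P> List<Object> runHandler(Function<P, Object> handler, P payload) {
        List<Object> output = new ArrayList<>();
        Object result = handler.apply(payload);
        if (result != null) {
            output.add(result);
        }
        return output;
    }

    /** Transformer step: a null result violates the contract and fails fast. */
    static <S, T> T runTransformer(Function<S, T> transformer, S payload) {
        T result = transformer.apply(payload);
        if (result == null) {
            // Assumption: the real framework uses its own exception type here
            throw new IllegalStateException("transformer returned null for: " + payload);
        }
        return result;
    }

    /** Selector step: rejected payloads go to the discard list if one is configured. */
    static <T> boolean runSelector(Predicate<T> selector, T payload, List<T> discardOrNull) {
        boolean accepted = selector.test(payload);
        if (!accepted && discardOrNull != null) {
            discardOrNull.add(payload);
        }
        return accepted;
    }
}
```

The three methods differ only in how they treat the "empty" outcome: the handler drops it silently, the transformer rejects it, and the selector routes the original payload elsewhere.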

Message Source and Producer

MessageSource<T>

Strategy interface for receiving messages from a source. Implementations poll or generate messages on demand.

/**
 * Strategy interface for receiving messages from source.
 * Implementations poll or generate messages on demand.
 * Extends IntegrationPattern to identify as inbound channel adapter.
 *
 * @param <T> the payload type
 */
@FunctionalInterface
public interface MessageSource<T> extends IntegrationPattern {
    /**
     * Receive message from source.
     * Returns null if no message available.
     *
     * @return the message, or null if none available
     */
    @Nullable
    Message<T> receive();
}

Critical Behavior: Returning null is not an error - it indicates no message is available. Polling adapters will continue polling.

Usage Example:

@Bean
public IntegrationFlow messageSourceFlow() {
    return IntegrationFlow
        .from((MessageSource<String>) () -> {
            String data = fetchData();
            // Return null if no data available (not an error)
            return data != null ? MessageBuilder.withPayload(data).build() : null;
        }, e -> e.poller(Pollers.fixedDelay(1000)))
        .channel("outputChannel")
        .get();
}

// Edge case: Source with exception handling
@Bean
public MessageSource<String> safeMessageSource() {
    return () -> {
        try {
            String data = fetchData();
            if (data == null || data.isEmpty()) {
                return null; // No message available
            }
            return MessageBuilder.withPayload(data)
                .setHeader("timestamp", System.currentTimeMillis())
                .build();
        } catch (Exception e) {
            // Log but return null to avoid error
            log.error("Failed to fetch data", e);
            return null; // Polling continues
        }
    };
}
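
To illustrate why a null return is harmless, the sketch below imitates what a poller does with its source on each cycle. It is a simplified model under stated assumptions: PollingSketch, poll, and queueSource are hypothetical names, a java.util.function.Supplier stands in for MessageSource, and the loop runs a fixed number of cycles instead of being scheduled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/** Plain-Java model of a polling loop; Supplier stands in for MessageSource. */
final class PollingSketch {

    /**
     * Calls the source once per poll cycle. A null result is skipped rather
     * than treated as a failure, and the next cycle still runs.
     */
    static <T> List<T> poll(Supplier<T> source, int cycles) {
        List<T> delivered = new ArrayList<>();
        for (int i = 0; i < cycles; i++) {
            T message = source.get();
            if (message != null) {
                delivered.add(message);
            }
        }
        return delivered;
    }

    /** Hypothetical source backed by a queue; Queue.poll() returns null when empty. */
    static Supplier<String> queueSource(Queue<String> backlog) {
        return backlog::poll;
    }
}
```

A source that has nothing to offer on one cycle and data on the next needs no special handling: the empty cycles simply contribute nothing to the delivered list.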

MessageProducer

Strategy interface for components that produce messages. Provides output channel management.

/**
 * Strategy interface for components that produce messages.
 * Provides output channel management.
 */
public interface MessageProducer {
    /**
     * Set output channel for produced messages.
     *
     * @param outputChannel the output channel
     */
    void setOutputChannel(MessageChannel outputChannel);

    /**
     * Set output channel by bean name.
     *
     * @param outputChannelName the output channel bean name
     */
    default void setOutputChannelName(String outputChannelName);

    /**
     * Get output channel.
     *
     * @return the output channel
     */
    @Nullable
    MessageChannel getOutputChannel();
}

Functional Handler Interfaces

GenericHandler<P>

Generic handler for messages with payload type. Receives payload and headers, returns result.

/**
 * Generic handler for messages with payload type.
 * Receives payload and headers, returns result.
 *
 * @param <P> the payload type
 */
@FunctionalInterface
public interface GenericHandler<P> {
    /**
     * Handle payload with access to headers.
     *
     * @param payload the message payload
     * @param headers the message headers
     * @return the result, or null for no output
     */
    Object handle(P payload, MessageHeaders headers);
}

Critical Behavior: Returning null produces no output message (valid for one-way flows).

Usage Example:

@Bean
public IntegrationFlow handlerFlow() {
    return IntegrationFlow
        .from("inputChannel")
        .handle((GenericHandler<Order>) (payload, headers) -> {
            String customerId = (String) headers.get("customerId");
            return processOrder(payload, customerId);
        })
        .get();
}

// Edge case: Handler with null return (no output message)
@Bean
public IntegrationFlow handlerWithNullReturn() {
    return IntegrationFlow
        .from("inputChannel")
        .handle((GenericHandler<String>) (payload, headers) -> {
            logMessage(payload);
            return null; // No output message sent (valid behavior)
        })
        .get();
}

// Edge case: Handler with null payload handling
@Bean
public IntegrationFlow handlerWithNullPayload() {
    return IntegrationFlow
        .from("inputChannel")
        .handle((GenericHandler<String>) (payload, headers) -> {
            if (payload == null) {
                // Handle null payload gracefully
                return "NULL_PAYLOAD";
            }
            return processPayload(payload);
        })
        .get();
}

// Edge case: Handler with exception handling
@Bean
public IntegrationFlow handlerWithErrorHandling() {
    return IntegrationFlow
        .from("inputChannel")
        .handle((GenericHandler<Order>) (payload, headers) -> {
            try {
                return processOrder(payload);
            } catch (ValidationException e) {
                // Return error indicator instead of throwing
                return new ErrorResult("VALIDATION_FAILED", e.getMessage());
            } catch (Exception e) {
                // Re-throw; the exception propagates to the caller, or to the
                // error channel of the upstream poller or gateway if one is set
                throw new MessagingException(
                    MessageBuilder.withPayload(payload).build(),
                    "Processing failed", e);
            }
        })
        .get();
}

GenericTransformer<S, T>

Generic transformer for payload transformation.

/**
 * Generic transformer for payload transformation.
 *
 * @param <S> source payload type
 * @param <T> target payload type
 */
@FunctionalInterface
public interface GenericTransformer<S, T> {
    /**
     * Transform source to target.
     *
     * @param source the source payload
     * @return the transformed payload
     */
    T transform(S source);
}

Critical Behavior: Must return non-null (null throws exception).

Usage Example:

@Bean
public IntegrationFlow transformerFlow() {
    return IntegrationFlow
        .from("orderChannel")
        .transform((GenericTransformer<Order, OrderDto>) order ->
            new OrderDto(order.getId(), order.getAmount()))
        .get();
}

// Edge case: Type-safe transformer with null handling
@Bean
public IntegrationFlow typeSafeTransformer() {
    return IntegrationFlow
        .from("orderChannel")
        .transform(Order.class, (GenericTransformer<Order, OrderDto>) order -> {
            if (order == null) {
                throw new IllegalArgumentException("Order cannot be null");
            }
            return new OrderDto(order.getId(), order.getAmount());
        })
        .get();
}

// Edge case: Transformer that must not return null
@Bean
public IntegrationFlow nonNullTransformer() {
    return IntegrationFlow
        .from("inputChannel")
        .transform((GenericTransformer<String, String>) source -> {
            if (source == null) {
                return ""; // Return empty string, not null
            }
            return source.toUpperCase();
        })
        .get();
}

GenericSelector<T>

Generic selector for filtering with payload type.

/**
 * Generic selector for filtering with payload type.
 *
 * @param <T> the payload type
 */
@FunctionalInterface
public interface GenericSelector<T> {
    /**
     * Select/filter source.
     *
     * @param source the source to evaluate
     * @return true if accepted, false if rejected
     */
    boolean accept(T source);
}

Critical Behavior: Returning false rejects message (sent to discard channel if configured).

Usage Example:

@Bean
public IntegrationFlow selectorFlow() {
    return IntegrationFlow
        .from("inputChannel")
        .filter((GenericSelector<Order>) order ->
            order.getAmount() > 1000 && order.isValid())
        .get();
}

// Edge case: Selector with null handling
@Bean
public IntegrationFlow selectorWithNullHandling() {
    return IntegrationFlow
        .from("inputChannel")
        .filter((GenericSelector<Order>) order -> {
            if (order == null) {
                return false; // Reject null orders
            }
            return order.getAmount() != null 
                && order.getAmount() > 0 
                && order.isValid();
        })
        .get();
}

Error Handling

ErrorMessagePublisher

Component for publishing error messages to error channels.

/**
 * Component for publishing error messages to error channels.
 * Can be called or extended in error handling or retry scenarios.
 * Supports customization via ErrorMessageStrategy.
 */
public class ErrorMessagePublisher {
    /**
     * Set the error message strategy for customizing error messages.
     *
     * @param errorMessageStrategy the error message strategy
     */
    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy);

    /**
     * Set the error channel for publishing error messages.
     *
     * @param channel the error channel
     */
    public void setChannel(MessageChannel channel);

    /**
     * Set the error channel by name.
     *
     * @param channelName the error channel name
     */
    public void setChannelName(String channelName);

    /**
     * Set the send timeout for error message publishing.
     *
     * @param sendTimeout timeout in milliseconds
     */
    public void setSendTimeout(long sendTimeout);

    /**
     * Publish error message for MessagingException.
     *
     * @param exception the messaging exception
     */
    public void publish(MessagingException exception);

    /**
     * Publish error message for failed message and throwable.
     *
     * @param failedMessage the failed message
     * @param throwable the throwable
     */
    public void publish(Message<?> failedMessage, Throwable throwable);
}

Usage Example:

@Bean
public ErrorMessagePublisher errorPublisher() {
    ErrorMessagePublisher publisher = new ErrorMessagePublisher();
    publisher.setChannelName("customErrorChannel");
    publisher.setSendTimeout(5000);
    return publisher;
}

// Edge case: Error publisher with custom strategy
@Bean
public ErrorMessagePublisher customErrorPublisher() {
    ErrorMessagePublisher publisher = new ErrorMessagePublisher();
    publisher.setChannelName("customErrorChannel");
    publisher.setSendTimeout(5000);
    
    publisher.setErrorMessageStrategy((throwable, attributes) -> {
        // MessageHeaders is immutable, so pass the extra headers at construction time
        Map<String, Object> headers = new HashMap<>();
        headers.put("errorTimestamp", System.currentTimeMillis());
        headers.put("errorSource", "integration-core");
        return new ErrorMessage(throwable, headers);
    });
    
    return publisher;
}

RecoveryCallback<T>

Strategy interface for error recovery with fallback behavior.

/**
 * Strategy interface for error recovery with fallback behavior.
 * Provides recovery mechanism based on context and cause.
 *
 * @param <T> the type returned from recovery
 */
@FunctionalInterface
public interface RecoveryCallback<T> {
    /**
     * Recover from failure with fallback result.
     *
     * @param context the context for failure
     * @param cause the cause of the failure
     * @return result to replace the failed callback result
     */
    T recover(AttributeAccessor context, Throwable cause);
}

Usage Example:

@Bean
public RecoveryCallback<String> recoveryCallback() {
    return (context, cause) -> {
        // Pass the throwable itself so the stack trace is logged
        log.error("Processing failed", cause);
        return "FALLBACK_RESULT";
    };
}

// Edge case: Conditional recovery based on exception type
@Bean
public RecoveryCallback<OrderResult> conditionalRecoveryCallback() {
    return (context, cause) -> {
        if (cause instanceof ValidationException) {
            return new OrderResult("INVALID", "Validation failed: " + cause.getMessage());
        } else if (cause instanceof TimeoutException) {
            return new OrderResult("TIMEOUT", "Operation timed out, retry later");
        } else {
            log.error("Unexpected error", cause);
            return new OrderResult("ERROR", "Processing failed");
        }
    };
}
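
The role of a recovery callback is easiest to see next to the retry loop that invokes it. The following is a minimal sketch, not the framework's retry implementation: RecoverySketch, Recovery, and callWithRecovery are hypothetical names, a Map stands in for the AttributeAccessor context, and the "attempt" attribute is invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/** Plain-Java model of retry followed by recovery; not Spring Integration code. */
final class RecoverySketch {

    /** Hypothetical stand-in for a recovery callback: context attributes plus the final cause. */
    @FunctionalInterface
    interface Recovery<T> {
        T recover(Map<String, Object> context, Throwable cause);
    }

    /**
     * Runs the action up to maxAttempts times. If every attempt fails, the
     * recovery callback supplies the result instead of the exception propagating.
     */
    static <T> T callWithRecovery(Callable<T> action, int maxAttempts, Recovery<T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Map<String, Object> context = new HashMap<>();
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            context.put("attempt", attempt); // illustrative attribute, not a framework key
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e;
            }
        }
        return recovery.recover(context, lastFailure);
    }
}
```

The callback only runs after the last attempt has failed, so it sees the final cause and whatever the retry loop recorded in the context.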

Lifecycle and Control Interfaces

Pausable

Interface for components that can be paused and resumed.

/**
 * Interface for components that can be paused and resumed.
 * Extends ManageableLifecycle to include start/stop capabilities.
 * Provides finer-grained control than start/stop.
 */
public interface Pausable extends ManageableLifecycle {
    /**
     * Pause component operations.
     * Component remains started but stops processing.
     */
    void pause();

    /**
     * Resume component operations after pause.
     */
    void resume();

    /**
     * Check if component is paused.
     *
     * @return true if paused
     */
    boolean isPaused();
}

Usage Example:

@Service
public class EndpointController {
    @Autowired
    private SourcePollingChannelAdapter adapter;

    public void pausePolling() {
        if (adapter instanceof Pausable) {
            ((Pausable) adapter).pause();
        }
    }

    public void resumePolling() {
        if (adapter instanceof Pausable) {
            ((Pausable) adapter).resume();
        }
    }

    // Edge case: Safe pause with state check
    public void safePause() {
        if (adapter instanceof Pausable) {
            Pausable pausable = (Pausable) adapter;
            if (!pausable.isPaused()) {
                pausable.pause();
            }
        }
    }
}
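
The difference between "stopped" and "paused" can be sketched with two independent flags. This is an illustrative model only: PausableSketch and Worker are hypothetical, and a real Pausable component decides for itself what "stops processing" means.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a component that can be paused while it stays started. */
final class PausableSketch {

    /** Hypothetical component with separate running and paused flags. */
    static final class Worker {
        private boolean running;
        private boolean paused;
        private final List<String> processed = new ArrayList<>();

        synchronized void start() { running = true; }
        synchronized void stop() { running = false; paused = false; }
        synchronized void pause() { paused = true; }
        synchronized void resume() { paused = false; }
        synchronized boolean isRunning() { return running; }
        synchronized boolean isPaused() { return paused; }

        /** Processes the item only when started and not paused; returns whether it was processed. */
        synchronized boolean offer(String item) {
            if (!running || paused) {
                return false;
            }
            processed.add(item);
            return true;
        }

        /** Returns a copy so callers cannot modify the internal list. */
        synchronized List<String> processed() { return new ArrayList<>(processed); }
    }
}
```

While paused, the worker still reports isRunning() as true, which mirrors the "remains started but stops processing" wording of the interface.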

Messaging Templates

MessagingTemplate

Template for synchronous messaging operations.

/**
 * Template for synchronous messaging operations.
 * Provides send, receive, and send-and-receive methods.
 * Extends GenericMessagingTemplate from Spring Framework.
 */
public class MessagingTemplate extends GenericMessagingTemplate {
    /**
     * Create messaging template.
     */
    public MessagingTemplate();

    /**
     * Create messaging template with default channel.
     *
     * @param defaultChannel the default channel
     */
    public MessagingTemplate(MessageChannel defaultChannel);

    /**
     * Set default channel.
     *
     * @param defaultChannel the default channel
     */
    public void setDefaultChannel(MessageChannel defaultChannel);

    /**
     * Set default channel by name.
     *
     * @param defaultChannelName the channel name
     */
    public void setDefaultDestinationName(String defaultChannelName);

    /**
     * Set send timeout.
     *
     * @param sendTimeout timeout in milliseconds
     */
    public void setSendTimeout(long sendTimeout);

    /**
     * Set receive timeout.
     *
     * @param receiveTimeout timeout in milliseconds
     */
    public void setReceiveTimeout(long receiveTimeout);

    /**
     * Set whether to throw exception on late reply.
     *
     * @param throwExceptionOnLateReply true to throw exception
     */
    public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply);

    /**
     * Send message to default channel.
     *
     * @param message the message
     */
    @Override
    public void send(Message<?> message);

    /**
     * Send message to channel.
     *
     * @param channel the channel
     * @param message the message
     */
    @Override
    public void send(MessageChannel channel, Message<?> message);

    /**
     * Convert and send payload to channel.
     *
     * @param channel the channel
     * @param payload the payload
     */
    @Override
    public void convertAndSend(MessageChannel channel, Object payload);

    /**
     * Send and receive from channel.
     *
     * @param channel the channel
     * @param requestMessage the request message
     * @return the reply message, or null if timeout
     */
    @Override
    @Nullable
    public Message<?> sendAndReceive(MessageChannel channel, Message<?> requestMessage);

    /**
     * Convert, send, and receive from channel.
     *
     * @param channel the channel
     * @param request the request payload
     * @return the reply payload, or null if timeout
     */
    @Override
    @Nullable
    public Object convertSendAndReceive(MessageChannel channel, Object request);

    /**
     * Receive message from channel.
     *
     * @param channel the pollable channel
     * @return the message, or null if none available
     */
    @Override
    @Nullable
    public Message<?> receive(PollableChannel channel);

    /**
     * Receive message from channel with timeout.
     *
     * @param destination the message channel
     * @param timeout the timeout in milliseconds
     * @return the message, or null if timeout
     */
    @Nullable
    public Message<?> receive(MessageChannel destination, long timeout);
}

Usage Example:

@Service
public class MessagingService {
    private final MessagingTemplate template;

    public MessagingService(MessageChannel defaultChannel) {
        this.template = new MessagingTemplate(defaultChannel);
        this.template.setSendTimeout(5000);
        this.template.setReceiveTimeout(3000);
    }

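    public void sendOrder(Order order) {
        // Channel names are resolved to beans, which requires the template
        // to have a BeanFactory (setBeanFactory) or a destination resolver
        template.convertAndSend("orderChannel", order);
    }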

    public String processOrder(Order order) {
        Message<?> request = MessageBuilder.withPayload(order)
            .setHeader("priority", "HIGH")
            .build();

        Message<?> reply = template.sendAndReceive("orderChannel", request);
        return reply != null ? (String) reply.getPayload() : null;
    }

    // Edge case: Handling null replies
    public String processOrderSafely(Order order) {
        Message<?> reply = template.sendAndReceive("orderChannel", 
            MessageBuilder.withPayload(order).build());
        if (reply == null) {
            return "NO_REPLY"; // Handle timeout or no reply
        }
        return (String) reply.getPayload();
    }

    // Edge case: Treating a reply timeout as an error condition
    public String processOrderWithTimeoutError(Order order) {
        // A timed-out reply is reported as null rather than as an exception.
        // setThrowExceptionOnLateReply(true) affects only the replying thread.
        String reply = template.convertSendAndReceive(order, String.class);
        if (reply == null) {
            log.warn("Order processing timed out");
            return "TIMEOUT";
        }
        return reply;
    }
}
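
The "reply, or null if timeout" behavior of send-and-receive can be pictured as a bounded wait on a temporary reply queue. The sketch below is a simplified model, not the template's implementation: RequestReplySketch and sendAndReceive are hypothetical, and a Function running on an executor stands in for the downstream flow.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Plain-Java model of request/reply with a receive timeout. */
final class RequestReplySketch {

    /**
     * Hands the request to a responder on another thread and waits on a
     * one-slot temporary reply queue. Returns null when no reply arrives in time.
     */
    static <Q, R> R sendAndReceive(ExecutorService responderPool, Function<Q, R> responder,
                                   Q request, long timeoutMillis) {
        BlockingQueue<R> replyQueue = new ArrayBlockingQueue<>(1);
        responderPool.execute(() -> {
            R reply = responder.apply(request);
            if (reply != null) {
                replyQueue.offer(reply);
            }
        });
        try {
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }
}
```

Callers cannot tell from a null result whether the responder was slow or never replied at all, which is why the examples above map null to an explicit status.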

AsyncMessagingTemplate

Template for asynchronous messaging operations.

/**
 * Template for asynchronous messaging operations.
 * Returns Future for async operations.
 */
public class AsyncMessagingTemplate extends MessagingTemplate
    implements AsyncMessagingOperations {
    /**
     * Set executor for async operations.
     *
     * @param executor the executor
     */
    public void setExecutor(Executor executor);

    /**
     * Async send to channel.
     *
     * @param channel the channel
     * @param message the message
     * @return future completing when sent
     */
    public Future<?> asyncSend(MessageChannel channel, Message<?> message);

    /**
     * Async convert and send to channel.
     *
     * @param channel the channel
     * @param payload the payload
     * @return future completing when sent
     */
    public Future<?> asyncConvertAndSend(MessageChannel channel, Object payload);

    /**
     * Async send and receive from channel.
     *
     * @param channel the channel
     * @param requestMessage the request
     * @return future with reply
     */
    public Future<Message<?>> asyncSendAndReceive(MessageChannel channel,
                                                            Message<?> requestMessage);

    /**
     * Async convert, send, and receive from channel.
     *
     * @param channel the channel
     * @param request the request payload
     * @return future with reply payload
     */
    public <R> Future<R> asyncConvertSendAndReceive(MessageChannel channel,
                                                               Object request);
}

Usage Example:

@Service
public class AsyncMessagingService {
    private final AsyncMessagingTemplate asyncTemplate;

    public AsyncMessagingService(MessageChannel defaultChannel) {
        this.asyncTemplate = new AsyncMessagingTemplate(defaultChannel);
        this.asyncTemplate.setExecutor(Executors.newFixedThreadPool(10));
    }

    public CompletableFuture<String> processOrderAsync(Order order) {
        Future<String> future =
            asyncTemplate.asyncConvertSendAndReceive("orderChannel", order);
        return toCompletableFuture(future);
    }

    // Edge case: Async operations with error handling
    public CompletableFuture<String> processOrderAsyncWithErrorHandling(Order order) {
        Future<String> future = asyncTemplate.asyncConvertSendAndReceive(
            "orderChannel", order);

        // A null reply completes the returned future exceptionally
        return toCompletableFuture(future).thenApply(reply -> {
            if (reply == null) {
                throw new IllegalStateException("Null reply");
            }
            return reply;
        });
    }

    // Future has no callback API, so wait for the result on a separate thread
    private static <T> CompletableFuture<T> toCompletableFuture(Future<T> future) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            } catch (ExecutionException e) {
                throw new CompletionException(e.getCause());
            }
        });
    }
}

Threading and Concurrency

Thread Safety Rules

  • Functional interfaces: Implementations should be stateless, since an endpoint may invoke the same instance from several threads; guard any shared state
  • MessagingTemplate: Thread-safe (can be shared across threads)
  • AsyncMessagingTemplate: Thread-safe (uses executor for async operations)
  • MessageSource: Should be thread-safe if shared across multiple polling adapters
  • MessageProducer: Should be thread-safe
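
When a handler does need state, the rules above imply that the state must be safe under concurrent calls. The sketch below shows one way to do that with an atomic counter. SharedHandlerSketch, CountingHandler, and invokeConcurrently are hypothetical names, and UnaryOperator stands in for a functional handler interface.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;

/** Plain-Java model of one handler instance shared by several threads. */
final class SharedHandlerSketch {

    /** A handler that keeps state must guard it; AtomicLong makes the increment atomic. */
    static final class CountingHandler implements UnaryOperator<String> {
        private final AtomicLong handled = new AtomicLong();

        @Override
        public String apply(String payload) {
            handled.incrementAndGet();
            return payload.toUpperCase();
        }

        long handledCount() { return handled.get(); }
    }

    /** Invokes one shared handler instance from several threads and returns the final count. */
    static long invokeConcurrently(CountingHandler handler, int threads, int callsPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int t = 0; t < threads; t++) {
                pool.execute(() -> {
                    for (int i = 0; i < callsPerThread; i++) {
                        handler.apply("payload");
                    }
                });
            }
        } finally {
            pool.shutdown(); // already submitted tasks still run to completion
        }
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler calls did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return handler.handledCount();
    }
}
```

Replacing the AtomicLong with a plain long field would make the count unreliable under contention, which is the failure mode the stateless guideline is meant to avoid.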

Lifecycle Rules

  • Pausable: Extends ManageableLifecycle (start/stop/pause/resume)
  • Templates: No lifecycle methods (configure once, then share)
  • Message sources: May have lifecycle (depends on implementation)

Exception Handling

Exception Types

  • MessagingException - Base messaging exception (always check getCause())
  • MessageTimeoutException - Timeout waiting for reply
  • MessageDeliveryException - Message delivery failure
  • IllegalArgumentException - Invalid parameters (null channel, etc.)
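
Because a wrapping exception often hides the real failure, inspecting the cause chain is a common first step in an error handler. The helper below is a generic sketch using only java.lang.Throwable. CauseSketch, rootCause, and findCause are hypothetical names, not part of any framework API.

```java
/** Plain-Java helpers for inspecting an exception's cause chain. */
final class CauseSketch {

    /** Walks the cause chain and returns the innermost throwable. */
    static Throwable rootCause(Throwable failure) {
        Throwable current = failure;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns the first throwable in the chain of the given type, or null if there is none. */
    static <E extends Throwable> E findCause(Throwable failure, Class<E> type) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
        }
        return null;
    }
}
```

An error handler can then branch on findCause(exception, SomeException.class) instead of on the outermost wrapper type.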

Edge Cases

  • Null payload: Messages cannot be built with a null payload, so a null check on the payload in a GenericHandler is purely defensive
  • Null headers: Headers map may be empty but not null
  • Null return from handler: No output message sent (valid behavior)
  • Null return from transformer: Throws exception (must return non-null)
  • Null correlation key: Aggregators may reject or use special handling
  • Empty collections: Selectors should handle gracefully
  • Timeout scenarios: Configure appropriate timeouts for operations
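
The empty-collection case deserves a concrete look, because Stream.allMatch is vacuously true for an empty stream and would accept an empty batch unless the selector checks for it. The sketch below uses java.util.function.Predicate as a stand-in for a selector. CollectionSelectorSketch and allMatch are hypothetical names, and rejecting empty batches is an assumed policy, not a framework rule.

```java
import java.util.Collection;
import java.util.Objects;
import java.util.function.Predicate;

/** Plain-Java model of a selector over collection payloads. */
final class CollectionSelectorSketch {

    /**
     * Accepts only non-null, non-empty collections whose elements are all
     * non-null and satisfy the element rule.
     */
    static <T> Predicate<Collection<T>> allMatch(Predicate<T> elementRule) {
        Objects.requireNonNull(elementRule, "elementRule");
        return items -> items != null
            && !items.isEmpty() // without this check an empty batch would pass
            && items.stream().allMatch(e -> e != null && elementRule.test(e));
    }
}
```

Whether an empty batch should be accepted or rejected depends on the flow; the point is to make that decision explicit instead of inheriting it from allMatch.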