Message Store

The JDBC Message Store persists Spring Integration messages in a relational database. It backs correlation components (aggregators, resequencers), delayers, and claim checks, and it keeps stored messages durable across application restarts.

Key Information for Agents

Required Dependencies:

  • spring-integration-jdbc (this package)
  • spring-integration-core (required; pulled in transitively by spring-integration-jdbc)
  • DataSource or JdbcOperations bean must be configured
  • Database schema must exist (tables created from schema scripts)

Default Behaviors:

  • Default table prefix: "INT_" (tables: INT_MESSAGE, INT_MESSAGE_GROUP, INT_GROUP_TO_MESSAGE)
  • Default region: "DEFAULT"
  • Default serializer: Java serialization
  • Default deserializer: Java deserialization with allowed patterns
  • checkDatabaseOnStart=true (validates schema on startup)
  • Messages stored as BLOB (serialized bytes)
  • Message groups stored with metadata (timestamps, completion status)

Threading Model:

  • Thread-safe: every operation goes through JdbcOperations, and each call borrows its own connection from the DataSource (typically a pool)
  • Operations are transactional when a transaction manager is configured
  • Concurrent access to the same message group is safe (database-level locking)

Lifecycle:

  • Implements SmartLifecycle (auto-starts by default)
  • Checks database schema on startup (can be disabled)
  • Stops gracefully on context shutdown

Exceptions:

  • DataAccessException - Database access failures
  • IllegalArgumentException - Invalid configuration
  • DeserializationException - Deserialization failures (class not allowed, etc.)

Edge Cases:

  • NOT optimized for channel message storage (use JdbcChannelMessageStore for channels)
  • Designed for correlation components (aggregators, resequencers, delayers)
  • Large message groups should use streamMessagesForGroup() instead of getMessagesForGroup()
  • Custom serialization can be used (JSON, etc.) but must handle all message types
  • Allowed patterns guard against deserialization attacks (allow-list approach)
  • Region partitioning allows multiple logical stores to share the same physical tables
  • Message expiry must be implemented manually (no automatic cleanup)
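
The last edge case, manual expiry, usually comes down to a periodic sweep that removes groups older than some timeout. Spring Integration offers MessageGroupStoreReaper and expireMessageGroups(long timeout) on the group store for this purpose. The sketch below does not use them. It only models the sweep with plain JDK types, so `GroupExpirySketch`, `register`, and `expireOlderThan` are hypothetical names and the in-memory map is a stand-in for the group table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model only: an in-memory stand-in for group rows, not the JdbcMessageStore implementation.
final class GroupExpirySketch {

    // groupId -> group timestamp in epoch milliseconds
    private final Map<String, Long> timestamps = new ConcurrentHashMap<>();

    // Record a group together with its timestamp.
    void register(String groupId, long timestampMillis) {
        timestamps.put(groupId, timestampMillis);
    }

    // Remove every group older than timeoutMillis (relative to nowMillis) and return the removed ids.
    List<String> expireOlderThan(long timeoutMillis, long nowMillis) {
        List<String> expired = new ArrayList<>();
        timestamps.forEach((groupId, timestamp) -> {
            if (nowMillis - timestamp > timeoutMillis && timestamps.remove(groupId, timestamp)) {
                expired.add(groupId);
            }
        });
        return expired;
    }

    int size() {
        return timestamps.size();
    }

    public static void main(String[] args) {
        GroupExpirySketch store = new GroupExpirySketch();
        store.register("ORDER-1", 0L);
        store.register("ORDER-2", 50_000L);
        // At t = 60 s with a 30 s timeout, only ORDER-1 is stale.
        System.out.println(store.expireOlderThan(30_000L, 60_000L)); // prints [ORDER-1]
    }
}
```

In a real application the sweep would run from a scheduled task, and the timeout should exceed the longest time a group can legitimately stay incomplete.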

Core Class

package org.springframework.integration.jdbc.store;

public class JdbcMessageStore extends AbstractMessageGroupStore
        implements MessageStore, BeanClassLoaderAware, SmartLifecycle {

    public static final String DEFAULT_TABLE_PREFIX = "INT_";

    public JdbcMessageStore(DataSource dataSource);
    public JdbcMessageStore(JdbcOperations jdbcOperations);

    public void setTablePrefix(String tablePrefix);
    public void setRegion(String region);
    public void setSerializer(Serializer<? super Message<?>> serializer);
    public void setDeserializer(Deserializer<? extends Message<?>> deserializer);
    public void addAllowedPatterns(String... patterns);
    public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart);
    public void setBeanClassLoader(ClassLoader classLoader);

    public <T> Message<T> addMessage(Message<T> message);
    public Message<?> getMessage(UUID id);
    public MessageMetadata getMessageMetadata(UUID id);
    public Message<?> removeMessage(UUID id);
    public long getMessageCount();
    public int getMessageGroupCount();
    public int getMessageCountForAllMessageGroups();
    public int messageGroupSize(Object groupId);
    public MessageGroup getMessageGroup(Object groupId);
    public MessageGroupMetadata getGroupMetadata(Object groupId);
    public Message<?> getMessageFromGroup(Object groupId, UUID messageId);
    public Message<?> getOneMessageFromGroup(Object groupId);
    public Collection<Message<?>> getMessagesForGroup(Object groupId);
    public Stream<Message<?>> streamMessagesForGroup(Object groupId);
    public Iterator<MessageGroup> iterator();
}

Usage Examples

Basic Setup

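import java.math.BigDecimal;
import java.util.UUID;
import javax.sql.DataSource;

import org.springframework.integration.jdbc.store.JdbcMessageStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

// Basic message store setup
JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);

// Store a message. Order is an application class; with the default
// serializer it must implement java.io.Serializable.
Order order = new Order("ORD-123", new BigDecimal("99.99"));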
Message<Order> message = MessageBuilder.withPayload(order).build();
Message<Order> storedMessage = messageStore.addMessage(message);

UUID messageId = storedMessage.getHeaders().getId();

// Retrieve message
Message<?> retrieved = messageStore.getMessage(messageId);

// Remove message
Message<?> removed = messageStore.removeMessage(messageId);

Region and Table Prefix

// Configure with custom table prefix and region
JdbcMessageStore regionalStore = new JdbcMessageStore(dataSource);
regionalStore.setTablePrefix("APP_");
regionalStore.setRegion("US_EAST");

// Messages stored in APP_MESSAGE, APP_MESSAGE_GROUP, APP_GROUP_TO_MESSAGE
// with REGION column = 'US_EAST'
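
Region partitioning works because REGION is part of each table's primary key, so two stores configured with different regions never see each other's rows. The following is a minimal sketch of that idea with plain JDK types. `RegionSketch` and its methods are hypothetical, and a map keyed by (region, messageId) stands in for the shared message table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative model only: one shared "table" keyed by (region, messageId),
// mirroring the composite primary key. Not the JdbcMessageStore implementation.
final class RegionSketch {

    // Shared by every logical store, like a single physical table.
    private static final Map<Map.Entry<String, UUID>, String> TABLE = new HashMap<>();

    private final String region;

    RegionSketch(String region) {
        this.region = region;
    }

    void add(UUID id, String payload) {
        TABLE.put(Map.entry(region, id), payload);
    }

    // Returns null when the id was stored under a different region.
    String get(UUID id) {
        return TABLE.get(Map.entry(region, id));
    }

    // Counts only the rows that belong to this store's region.
    long count() {
        return TABLE.keySet().stream().filter(key -> key.getKey().equals(region)).count();
    }

    public static void main(String[] args) {
        RegionSketch usEast = new RegionSketch("US_EAST");
        RegionSketch euWest = new RegionSketch("EU_WEST");
        UUID id = UUID.randomUUID();
        usEast.add(id, "order payload");
        System.out.println(usEast.get(id)); // prints order payload
        System.out.println(euWest.get(id)); // prints null
    }
}
```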

Message Groups

// Working with message groups (for aggregators, resequencers)
JdbcMessageStore groupStore = new JdbcMessageStore(dataSource);

Object correlationId = "ORDER-123";

// Check group size
int groupSize = groupStore.messageGroupSize(correlationId);

// Get all messages in group
MessageGroup group = groupStore.getMessageGroup(correlationId);
Collection<Message<?>> messages = group.getMessages();

// Get group metadata
MessageGroupMetadata metadata = groupStore.getGroupMetadata(correlationId);
long timestamp = metadata.getTimestamp();
boolean complete = metadata.isComplete();

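// Stream messages (memory efficient); close the stream to release the underlying JDBC resources
try (Stream<Message<?>> stream = groupStore.streamMessagesForGroup(correlationId)) {
    stream.forEach(msg -> System.out.println(msg.getPayload()));
}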
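
streamMessagesForGroup() returns a lazily evaluated Stream, and a stream backed by a database cursor generally holds resources until it is closed. The sketch below shows the try-with-resources pattern with plain JDK types only. `CursorStreamSketch` and the flag standing in for an open cursor are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Illustrative model only: a flag stands in for the JDBC cursor a real store would hold open.
final class CursorStreamSketch {

    static final AtomicBoolean CURSOR_OPEN = new AtomicBoolean(false);

    // Opens the "cursor" and registers a close hook that releases it.
    static Stream<String> streamPayloads(List<String> rows) {
        CURSOR_OPEN.set(true);
        return rows.stream().onClose(() -> CURSOR_OPEN.set(false));
    }

    // Consumes the stream inside try-with-resources so the close hook always runs.
    static long countNonEmpty(List<String> rows) {
        try (Stream<String> payloads = streamPayloads(rows)) {
            return payloads.filter(payload -> !payload.isEmpty()).count();
        }
    }

    public static void main(String[] args) {
        System.out.println(countNonEmpty(List.of("a", "", "b"))); // prints 2
        System.out.println(CURSOR_OPEN.get());                    // prints false
    }
}
```

A terminal operation such as count() does not close a stream by itself, which is why the try block is needed.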

Custom Serialization

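import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.messaging.Message;

// Custom JSON serialization
JdbcMessageStore jsonStore = new JdbcMessageStore(dataSource);

// A plain ObjectMapper cannot instantiate the Message interface on read.
// The messaging-aware mapper registers handlers for Spring message types
// and adds the given packages to its trusted list for payload classes.
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper("com.example.domain");
jsonStore.setSerializer((message, outputStream) -> {
    byte[] json = mapper.writeValueAsBytes(message);
    outputStream.write(json);
});

jsonStore.setDeserializer((inputStream) -> {
    return mapper.readValue(inputStream, Message.class);
});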

Security Configuration

// Configure allowed deserialization patterns
JdbcMessageStore secureStore = new JdbcMessageStore(dataSource);
secureStore.addAllowedPatterns(
    "com.example.domain.*",
    "com.example.events.*",
    "java.util.*",
    "java.time.*"
);
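
An allow-list accepts only classes that match the configured patterns and rejects everything else before an object is instantiated. The sketch below demonstrates that principle with the JDK's own ObjectInputFilter (Java 9 and later). It is not how JdbcMessageStore applies addAllowedPatterns(), and the JDK filter syntax (";" separators, "!*" to reject the rest) differs from the patterns shown above. `AllowListSketch` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Date;

// Illustrative only: allow-list deserialization with the JDK serial filter.
final class AllowListSketch {

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Installs the filter before reading, so rejected classes are never instantiated.
    static Object deserialize(byte[] data, String filterPattern) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.setObjectInputFilter(ObjectInputFilter.Config.createFilter(filterPattern));
            return in.readObject();
        }
    }

    // True when the value survives a serialize/deserialize round trip under the filter.
    static boolean isAllowed(Object value, String filterPattern) {
        try {
            deserialize(serialize(value), filterPattern);
            return true;
        } catch (InvalidClassException rejected) {
            return false; // the filter rejected a class in the stream
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Date value = new Date(0L);
        System.out.println(isAllowed(value, "java.util.*;!*")); // prints true
        System.out.println(isAllowed(value, "java.time.*;!*")); // prints false
    }
}
```

Keeping the pattern list as narrow as the payload types allow limits what a tampered row in the message table could instantiate.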

Integration with Spring Integration

JdbcMessageStore integrates with correlation and stateful components:

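import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jdbc.store.JdbcMessageStore;

// OrderCorrelationStrategy and OrderReleaseStrategy are application-specific classes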

@Bean
public JdbcMessageStore messageStore(DataSource dataSource) {
    JdbcMessageStore store = new JdbcMessageStore(dataSource);
    store.setRegion("ORDERS");
    return store;
}

@Bean
public IntegrationFlow aggregatorFlow(JdbcMessageStore messageStore) {
    return IntegrationFlow
        .from("inputChannel")
        .aggregate(aggregator -> aggregator
            .messageStore(messageStore)
            .correlationStrategy(new OrderCorrelationStrategy())
            .releaseStrategy(new OrderReleaseStrategy())
            .expireGroupsUponCompletion(true)
            .sendPartialResultOnExpiry(true)
        )
        .channel("outputChannel")
        .get();
}

Database Schema

Required tables (with default prefix "INT_"):

CREATE TABLE INT_MESSAGE (
    MESSAGE_ID CHAR(36) NOT NULL,
    REGION VARCHAR(100) NOT NULL,
    CREATED_DATE TIMESTAMP NOT NULL,
    MESSAGE_BYTES BLOB,
    CONSTRAINT INT_MESSAGE_PK PRIMARY KEY (MESSAGE_ID, REGION)
);

CREATE TABLE INT_MESSAGE_GROUP (
    GROUP_KEY CHAR(36) NOT NULL,
    REGION VARCHAR(100) NOT NULL,
    MARKED BIGINT,
    COMPLETE BIGINT,
    LAST_RELEASED_SEQUENCE BIGINT,
    CREATED_DATE TIMESTAMP NOT NULL,
    UPDATED_DATE TIMESTAMP DEFAULT NULL,
    CONSTRAINT INT_MESSAGE_GROUP_PK PRIMARY KEY (GROUP_KEY, REGION)
);

CREATE TABLE INT_GROUP_TO_MESSAGE (
    GROUP_KEY CHAR(36) NOT NULL,
    MESSAGE_ID CHAR(36) NOT NULL,
    REGION VARCHAR(100) NOT NULL,
    CONSTRAINT INT_GROUP_TO_MESSAGE_PK PRIMARY KEY (GROUP_KEY, MESSAGE_ID, REGION)
);

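Vendor-specific DDL scripts ship inside the spring-integration-jdbc jar under org/springframework/integration/jdbc/schema-*.sql (for example schema-postgresql.sql and schema-mysql.sql). Prefer them to the generic DDL above, because column types such as BLOB vary by database.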
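
GROUP_KEY is CHAR(36) even though correlation keys such as "ORDER-123" are arbitrary objects, which suggests that each key is mapped to a UUID string before it is written. The sketch below assumes a name-based UUID derived from the key's UTF-8 bytes. That derivation is an assumption for illustration, not a statement of the library's exact algorithm, and `GroupKeySketch.toGroupKey` is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Illustrative only: maps an arbitrary correlation key to a fixed-width, 36-character column value.
final class GroupKeySketch {

    // Assumption: a deterministic name-based (version 3) UUID computed from the key's UTF-8 bytes.
    static String toGroupKey(String correlationKey) {
        return UUID.nameUUIDFromBytes(correlationKey.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        String key = toGroupKey("ORDER-123");
        System.out.println(key.length());                        // prints 36
        System.out.println(key.equals(toGroupKey("ORDER-123"))); // prints true
    }
}
```

The practical consequence is that GROUP_KEY values in the table are not human-readable correlation ids, so ad hoc SQL against the group table needs the derived key rather than the original one.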

Key Considerations

  • Not for Channels: JdbcMessageStore is designed for correlation components, not channels. Use JdbcChannelMessageStore to back a QueueChannel
  • Message Groups: Primary use case is storing correlated message groups for aggregators and resequencers
  • Region Partitioning: Multiple logical stores can share the same physical tables by using different regions
  • Serialization: Default is Java serialization; consider JSON serialization for cross-language compatibility
  • Security: Configure allowed deserialization patterns to prevent security vulnerabilities
  • Performance: Not optimized for high-throughput channel operations; appropriate for stateful components
  • Transaction Management: Operations are transactional when a transaction manager is configured
  • Cleanup: Implement a message expiry strategy to prevent unbounded table growth
  • Schema Management: Tables must be created before use; checkDatabaseOnStart validates their presence at startup
  • Memory: Large message groups should use streamMessagesForGroup() instead of getMessagesForGroup()
  • Monitoring: Use count methods for operational monitoring and capacity planning