or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

channel-message-store.mdinbound-polling.mdindex.mdjava-dsl.mdlock-registry.mdmessage-store.mdmetadata-store.mdoutbound-gateway.mdoutbound-updates.mdparameter-factories.mdpostgres-channels.mdstored-procedures.md
tile.json

channel-message-store.mddocs/

Channel Message Store

The JDBC Channel Message Store provides optimized persistent storage for Spring Integration QueueChannels. Unlike JdbcMessageStore, this implementation is specifically designed for high-throughput channel operations with support for priority messaging, transactional polling, and efficient queue-like semantics.

Key Information for Agents

Required Dependencies:

  • spring-integration-jdbc (this package)
  • spring-integration-core is required
  • DataSource bean must be configured
  • Database schema must exist (single table design)

Default Behaviors:

  • Default table prefix: "INT_" (table: INT_CHANNEL_MESSAGE)
  • Default region: "DEFAULT"
  • priorityEnabled=false (priority support disabled)
  • usingIdCache=false (ID caching disabled)
  • checkDatabaseOnStart=true (validates schema on startup)
  • Default serializer: Java serialization
  • Messages stored as BLOB with priority and timestamp
  • FIFO ordering by default (CREATED_DATE)

Threading Model:

  • Thread-safe when using JdbcOperations (connection pooling)
  • Polling is transactional (messages restored on rollback when ID cache enabled)
  • Concurrent polling supported with ID cache enabled
  • Priority polling uses database-level ordering

Lifecycle:

  • Implements SmartLifecycle (auto-starts by default)
  • Checks database schema on startup (can be disabled)
  • Stops gracefully on context shutdown

Exceptions:

  • DataAccessException - Database access failures
  • IllegalArgumentException - Invalid configuration
  • DeserializationException - Deserialization failures

Edge Cases:

  • Single table design (more efficient than JdbcMessageStore for channels)
  • Priority requires MESSAGE_PRIORITY column in database
  • ID cache prevents duplicate polling across multiple consumers
  • Transaction rollback restores messages when ID cache enabled
  • Database-specific query providers optimize polling performance
  • Custom serialization can be used (JSON via custom mappers)
  • Region partitioning allows multiple logical channels to share same physical table
  • FIFO ordering unless priority enabled (then priority order)

Core Class

package org.springframework.integration.jdbc.store;

public class JdbcChannelMessageStore
    implements PriorityCapableChannelMessageStore, InitializingBean, SmartLifecycle {

    public static final String DEFAULT_REGION = "DEFAULT";
    public static final String DEFAULT_TABLE_PREFIX = "INT_";

    public JdbcChannelMessageStore();
    public JdbcChannelMessageStore(DataSource dataSource);

    public void setDataSource(DataSource dataSource);
    public void setJdbcTemplate(JdbcTemplate jdbcTemplate);
    public void setDeserializer(Deserializer<? extends Message<?>> deserializer);
    public void addAllowedPatterns(String... patterns);
    public void setSerializer(Serializer<? super Message<?>> serializer);
    public void setTablePrefix(String tablePrefix);
    public void setRegion(String region);
    public String getRegion();
    public void setMessageRowMapper(RowMapper<Message<?>> messageRowMapper);
    public void setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter setter);
    public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider provider);
    public void setUsingIdCache(boolean usingIdCache);
    public void setPriorityEnabled(boolean priorityEnabled);
    public boolean isPriorityEnabled();
    public void setMessageGroupFactory(MessageGroupFactory factory);
    public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart);

    public MessageGroup addMessageToGroup(Object groupId, Message<?> message);
    public Message<?> pollMessageFromGroup(Object groupId);
    public MessageGroup getMessageGroup(Object groupId);
    public int getMessageGroupCount();
    public int messageGroupSize(Object groupId);
    public void removeMessageGroup(Object groupId);
    public void removeFromIdCache(String messageId);
    public int getSizeOfIdCache();
}

Usage Examples

Basic Setup

import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.channel.QueueChannel;

// Basic channel message store setup
JdbcChannelMessageStore channelStore = new JdbcChannelMessageStore(dataSource);

// Create persistent queue channel
QueueChannel persistentQueue = new QueueChannel(channelStore, "orderQueue");

// Send messages
Order order = new Order("ORD-123", new BigDecimal("99.99"));
Message<Order> message = MessageBuilder.withPayload(order).build();
persistentQueue.send(message);

// Receive messages
Message<?> received = persistentQueue.receive(5000); // 5 second timeout

Priority Messaging

// Enable priority messaging
JdbcChannelMessageStore priorityStore = new JdbcChannelMessageStore(dataSource);
priorityStore.setPriorityEnabled(true);

QueueChannel priorityChannel = new QueueChannel(priorityStore, "priorityQueue");

// Send with priority (higher number = higher priority)
Message<String> highPriority = MessageBuilder
    .withPayload("URGENT")
    .setPriority(10)
    .build();

Message<String> lowPriority = MessageBuilder
    .withPayload("NORMAL")
    .setPriority(1)
    .build();

priorityChannel.send(lowPriority);
priorityChannel.send(highPriority);

// High priority message received first
Message<?> first = priorityChannel.receive(); // "URGENT"

Concurrent Polling with ID Cache

// Enable ID caching for concurrent polling
JdbcChannelMessageStore concurrentStore = new JdbcChannelMessageStore(dataSource);
concurrentStore.setUsingIdCache(true);

QueueChannel concurrentChannel = new QueueChannel(concurrentStore, "workQueue");

// Multiple consumers can safely poll without duplicates
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
    executor.submit(() -> {
        while (true) {
            Message<?> msg = concurrentChannel.receive(1000);
            if (msg != null) {
                // Process message
                System.out.println("Processed: " + msg.getPayload());
            }
        }
    });
}

Integration with Spring Integration

JdbcChannelMessageStore integrates with QueueChannels for persistent messaging:

import org.springframework.integration.dsl.IntegrationFlow;

@Bean
public JdbcChannelMessageStore channelMessageStore(DataSource dataSource) {
    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setRegion("PRODUCTION");
    store.setUsingIdCache(true);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public QueueChannel persistentQueue(JdbcChannelMessageStore channelMessageStore) {
    return new QueueChannel(channelMessageStore, "orders");
}

@Bean
public IntegrationFlow persistentFlow(QueueChannel persistentQueue) {
    return IntegrationFlow
        .from("inputChannel")
        .channel(persistentQueue)
        .handle(msg -> {
            // Process message
            System.out.println("Processing: " + msg.getPayload());
        })
        .get();
}

Database Schema

Required table (with default prefix "INT_"):

CREATE TABLE INT_CHANNEL_MESSAGE (
    MESSAGE_ID CHAR(36) NOT NULL,
    GROUP_KEY CHAR(36) NOT NULL,
    REGION VARCHAR(100) NOT NULL,
    CREATED_DATE BIGINT NOT NULL,
    MESSAGE_PRIORITY BIGINT,
    MESSAGE_BYTES BLOB NOT NULL,
    CONSTRAINT INT_CHANNEL_MESSAGE_PK PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_ID)
);

CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE);
CREATE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE (MESSAGE_PRIORITY DESC);

Note: Some databases require different primary key strategies. SQL scripts are provided in org/springframework/integration/jdbc/schema-*.sql.

Key Considerations

  • Channel-Specific: Optimized for QueueChannel backing, not general message groups
  • Single Table: Uses single table design for better performance than JdbcMessageStore
  • Priority Support: Enable with setPriorityEnabled(true) and MESSAGE_PRIORITY column
  • Concurrent Access: Enable ID caching with setUsingIdCache(true) for multiple consumers
  • Transaction Safety: Polling is transactional; messages restored on rollback when ID cache enabled
  • Region Partitioning: Multiple logical channels can share same physical table
  • Query Optimization: Use database-specific query providers for best performance
  • Serialization: Default Java serialization; JSON serialization available via custom mappers
  • Security: Configure allowed deserialization patterns to prevent vulnerabilities
  • Monitoring: Use count methods and cache size for operational visibility
  • Cleanup: Implement retention policies to prevent unbounded table growth
  • Schema Management: Table must exist before use; enable checkDatabaseOnStart for validation
  • Performance: Far more efficient than JdbcMessageStore for channel operations
  • FIFO Ordering: Messages polled in insertion order (CREATED_DATE) unless priority enabled