The JDBC Channel Message Store provides optimized persistent storage for Spring Integration QueueChannels. Unlike JdbcMessageStore, this implementation is specifically designed for high-throughput channel operations with support for priority messaging, transactional polling, and efficient queue-like semantics.
Required Dependencies:
spring-integration-jdbc (this package)spring-integration-core is requiredDataSource bean must be configuredDefault Behaviors:
"INT_" (table: INT_CHANNEL_MESSAGE)"DEFAULT"priorityEnabled=false (priority support disabled)usingIdCache=false (ID caching disabled)checkDatabaseOnStart=true (validates schema on startup)Threading Model:
JdbcOperations (connection pooling)Lifecycle:
SmartLifecycle (auto-starts by default)Exceptions:
DataAccessException - Database access failuresIllegalArgumentException - Invalid configurationDeserializationException - Deserialization failuresEdge Cases:
MESSAGE_PRIORITY column in databasepackage org.springframework.integration.jdbc.store;
public class JdbcChannelMessageStore
implements PriorityCapableChannelMessageStore, InitializingBean, SmartLifecycle {
public static final String DEFAULT_REGION = "DEFAULT";
public static final String DEFAULT_TABLE_PREFIX = "INT_";
public JdbcChannelMessageStore();
public JdbcChannelMessageStore(DataSource dataSource);
public void setDataSource(DataSource dataSource);
public void setJdbcTemplate(JdbcTemplate jdbcTemplate);
public void setDeserializer(Deserializer<? extends Message<?>> deserializer);
public void addAllowedPatterns(String... patterns);
public void setSerializer(Serializer<? super Message<?>> serializer);
public void setTablePrefix(String tablePrefix);
public void setRegion(String region);
public String getRegion();
public void setMessageRowMapper(RowMapper<Message<?>> messageRowMapper);
public void setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter setter);
public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider provider);
public void setUsingIdCache(boolean usingIdCache);
public void setPriorityEnabled(boolean priorityEnabled);
public boolean isPriorityEnabled();
public void setMessageGroupFactory(MessageGroupFactory factory);
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart);
public MessageGroup addMessageToGroup(Object groupId, Message<?> message);
public Message<?> pollMessageFromGroup(Object groupId);
public MessageGroup getMessageGroup(Object groupId);
public int getMessageGroupCount();
public int messageGroupSize(Object groupId);
public void removeMessageGroup(Object groupId);
public void removeFromIdCache(String messageId);
public int getSizeOfIdCache();
}import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.channel.QueueChannel;
// Basic channel message store setup
JdbcChannelMessageStore channelStore = new JdbcChannelMessageStore(dataSource);
// Create persistent queue channel
QueueChannel persistentQueue = new QueueChannel(channelStore, "orderQueue");
// Send messages
Order order = new Order("ORD-123", new BigDecimal("99.99"));
Message<Order> message = MessageBuilder.withPayload(order).build();
persistentQueue.send(message);
// Receive messages
Message<?> received = persistentQueue.receive(5000); // 5 second timeout// Enable priority messaging
JdbcChannelMessageStore priorityStore = new JdbcChannelMessageStore(dataSource);
priorityStore.setPriorityEnabled(true);
QueueChannel priorityChannel = new QueueChannel(priorityStore, "priorityQueue");
// Send with priority (higher number = higher priority)
Message<String> highPriority = MessageBuilder
.withPayload("URGENT")
.setPriority(10)
.build();
Message<String> lowPriority = MessageBuilder
.withPayload("NORMAL")
.setPriority(1)
.build();
priorityChannel.send(lowPriority);
priorityChannel.send(highPriority);
// High priority message received first
Message<?> first = priorityChannel.receive(); // "URGENT"// Enable ID caching for concurrent polling
JdbcChannelMessageStore concurrentStore = new JdbcChannelMessageStore(dataSource);
concurrentStore.setUsingIdCache(true);
QueueChannel concurrentChannel = new QueueChannel(concurrentStore, "workQueue");
// Multiple consumers can safely poll without duplicates
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
while (true) {
Message<?> msg = concurrentChannel.receive(1000);
if (msg != null) {
// Process message
System.out.println("Processed: " + msg.getPayload());
}
}
});
}JdbcChannelMessageStore integrates with QueueChannels for persistent messaging:
import org.springframework.integration.dsl.IntegrationFlow;
@Bean
public JdbcChannelMessageStore channelMessageStore(DataSource dataSource) {
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setRegion("PRODUCTION");
store.setUsingIdCache(true);
store.setPriorityEnabled(true);
return store;
}
@Bean
public QueueChannel persistentQueue(JdbcChannelMessageStore channelMessageStore) {
return new QueueChannel(channelMessageStore, "orders");
}
@Bean
public IntegrationFlow persistentFlow(QueueChannel persistentQueue) {
return IntegrationFlow
.from("inputChannel")
.channel(persistentQueue)
.handle(msg -> {
// Process message
System.out.println("Processing: " + msg.getPayload());
})
.get();
}Required table (with default prefix "INT_"):
CREATE TABLE INT_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_BYTES BLOB NOT NULL,
CONSTRAINT INT_CHANNEL_MESSAGE_PK PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_ID)
);
CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE);
CREATE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE (MESSAGE_PRIORITY DESC);Note: Some databases require different primary key strategies. SQL scripts are provided in org/springframework/integration/jdbc/schema-*.sql.
setPriorityEnabled(true) and MESSAGE_PRIORITY columnsetUsingIdCache(true) for multiple consumers