Spring Integration JDBC Support - Provides channel adapters and gateways for database integration using JDBC
The JDBC Channel Message Store provides optimized persistent storage for Spring Integration QueueChannels. Unlike JdbcMessageStore, this implementation is specifically designed for high-throughput channel operations with support for priority messaging, transactional polling, and efficient queue-like semantics.
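To make the queue-like semantics concrete, here is a minimal in-memory sketch; it is not the JDBC implementation. Messages are appended to a group (one group per channel), polled oldest first, and removed when polled. The class name `InMemoryChannelStoreSketch` and the use of plain `String` payloads are assumptions made for this example; only the method names mirror the API listing further down.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory analogue of a channel message store (illustration only). */
final class InMemoryChannelStoreSketch {

    // One FIFO queue per group id; with a persistent channel, one group backs one channel.
    private final Map<Object, Deque<String>> groups = new HashMap<>();

    /** Appends a message to the end of the group's queue. */
    public synchronized void addMessageToGroup(Object groupId, String message) {
        groups.computeIfAbsent(groupId, id -> new ArrayDeque<>()).addLast(message);
    }

    /** Removes and returns the oldest message of the group, or null if the group is empty. */
    public synchronized String pollMessageFromGroup(Object groupId) {
        Deque<String> queue = groups.get(groupId);
        return (queue == null) ? null : queue.pollFirst();
    }

    /** Returns the number of messages currently waiting in the group. */
    public synchronized int messageGroupSize(Object groupId) {
        Deque<String> queue = groups.get(groupId);
        return (queue == null) ? 0 : queue.size();
    }
}
```

The JDBC store applies the same add/poll contract, but each operation is a SQL statement against the channel message table, so polls can take part in a surrounding transaction.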
Required Dependencies:
- spring-integration-jdbc (this package)
- spring-integration-core is required
- DataSource bean must be configured

Default Behaviors:
- Table prefix: "INT_" (table: INT_CHANNEL_MESSAGE)
- Region: "DEFAULT"
- priorityEnabled=false (priority support disabled)
- usingIdCache=false (ID caching disabled)
- checkDatabaseOnStart=true (validates schema on startup)

Threading Model:
- JdbcOperations (connection pooling)

Lifecycle:
- Implements SmartLifecycle (auto-starts by default)

Exceptions:
- DataAccessException - Database access failures
- IllegalArgumentException - Invalid configuration
- DeserializationException - Deserialization failures

Edge Cases:
- Priority messaging requires the MESSAGE_PRIORITY column in the database

package org.springframework.integration.jdbc.store;
public class JdbcChannelMessageStore
        implements PriorityCapableChannelMessageStore, InitializingBean, SmartLifecycle {

    public static final String DEFAULT_REGION = "DEFAULT";
    public static final String DEFAULT_TABLE_PREFIX = "INT_";

    public JdbcChannelMessageStore();
    public JdbcChannelMessageStore(DataSource dataSource);

    public void setDataSource(DataSource dataSource);
    public void setJdbcTemplate(JdbcTemplate jdbcTemplate);
    public void setDeserializer(Deserializer<? extends Message<?>> deserializer);
    public void addAllowedPatterns(String... patterns);
    public void setSerializer(Serializer<? super Message<?>> serializer);
    public void setTablePrefix(String tablePrefix);
    public void setRegion(String region);
    public String getRegion();
    public void setMessageRowMapper(RowMapper<Message<?>> messageRowMapper);
    public void setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter setter);
    public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider provider);
    public void setUsingIdCache(boolean usingIdCache);
    public void setPriorityEnabled(boolean priorityEnabled);
    public boolean isPriorityEnabled();
    public void setMessageGroupFactory(MessageGroupFactory factory);
    public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart);

    public MessageGroup addMessageToGroup(Object groupId, Message<?> message);
    public Message<?> pollMessageFromGroup(Object groupId);
    public MessageGroup getMessageGroup(Object groupId);
    public int getMessageGroupCount();
    public int messageGroupSize(Object groupId);
    public void removeMessageGroup(Object groupId);
    public void removeFromIdCache(String messageId);
    public int getSizeOfIdCache();
}

import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
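import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.store.MessageGroupQueue;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import java.math.BigDecimal;

// Basic channel message store setup
JdbcChannelMessageStore channelStore = new JdbcChannelMessageStore(dataSource);

// Create persistent queue channel.
// QueueChannel accepts a Queue, so the store is wrapped in a MessageGroupQueue
// whose group id ("orderQueue") identifies this channel's messages.
QueueChannel persistentQueue =
        new QueueChannel(new MessageGroupQueue(channelStore, "orderQueue"));

// Send messages (Order is an application class; it must be serializable to be stored)
Order order = new Order("ORD-123", new BigDecimal("99.99"));
Message<Order> message = MessageBuilder.withPayload(order).build();
persistentQueue.send(message);

// Receive messages
Message<?> received = persistentQueue.receive(5000); // 5 second timeout

// Enable priority messaging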
JdbcChannelMessageStore priorityStore = new JdbcChannelMessageStore(dataSource);
priorityStore.setPriorityEnabled(true);

// MessageGroupQueue is in org.springframework.integration.store
QueueChannel priorityChannel =
        new QueueChannel(new MessageGroupQueue(priorityStore, "priorityQueue"));

// Send with priority (higher number = higher priority)
Message<String> highPriority = MessageBuilder
        .withPayload("URGENT")
        .setPriority(10)
        .build();
Message<String> lowPriority = MessageBuilder
        .withPayload("NORMAL")
        .setPriority(1)
        .build();
priorityChannel.send(lowPriority);
priorityChannel.send(highPriority);

// High priority message received first
Message<?> first = priorityChannel.receive(); // "URGENT"

// Enable ID caching for concurrent polling
JdbcChannelMessageStore concurrentStore = new JdbcChannelMessageStore(dataSource);
concurrentStore.setUsingIdCache(true);

// MessageGroupQueue is in org.springframework.integration.store
QueueChannel concurrentChannel =
        new QueueChannel(new MessageGroupQueue(concurrentStore, "workQueue"));

// Multiple consumers can safely poll without duplicates
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
    executor.submit(() -> {
        // Loop until the worker is interrupted (for example by executor.shutdownNow())
        while (!Thread.currentThread().isInterrupted()) {
            Message<?> msg = concurrentChannel.receive(1000);
            if (msg != null) {
                // Process message
                System.out.println("Processed: " + msg.getPayload());
            }
        }
    });
}

JdbcChannelMessageStore integrates with QueueChannels for persistent messaging:
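import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.store.MessageGroupQueue;

@Bean
public JdbcChannelMessageStore channelMessageStore(DataSource dataSource) {
    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setRegion("PRODUCTION");
    store.setUsingIdCache(true);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public QueueChannel persistentQueue(JdbcChannelMessageStore channelMessageStore) {
    // Wrap the store in a MessageGroupQueue; "orders" is the group id for this channel
    return new QueueChannel(new MessageGroupQueue(channelMessageStore, "orders"));
}

@Bean
public IntegrationFlow persistentFlow(QueueChannel persistentQueue) {
    // Consuming from a QueueChannel requires a poller: define a default poller bean
    // or configure one on the handler endpoint.
    return IntegrationFlow
            .from("inputChannel")
            .channel(persistentQueue)
            .handle(msg -> {
                // Process message
                System.out.println("Processing: " + msg.getPayload());
            })
            .get();
}

Required table (with default prefix "INT_"):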
CREATE TABLE INT_CHANNEL_MESSAGE (
    MESSAGE_ID CHAR(36) NOT NULL,
    GROUP_KEY CHAR(36) NOT NULL,
    REGION VARCHAR(100) NOT NULL,
    CREATED_DATE BIGINT NOT NULL,
    MESSAGE_PRIORITY BIGINT,
    MESSAGE_BYTES BLOB NOT NULL,
    CONSTRAINT INT_CHANNEL_MESSAGE_PK PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_ID)
);
CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE);
CREATE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE (MESSAGE_PRIORITY DESC);

Note: Some databases require different primary key strategies. SQL scripts are provided in org/springframework/integration/jdbc/schema-*.sql.
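As a rough illustration of how the MESSAGE_PRIORITY and CREATED_DATE columns work together, the sketch below sorts in-memory rows in the order a priority-enabled poll is expected to pick them: highest priority first, rows without a priority last, and oldest first within the same priority. This ordering is an assumption about the default query, and `PriorityPollOrderSketch`, `Row`, and `pollOrder` are hypothetical names used only here; the query provider for your database defines the actual SQL.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class PriorityPollOrderSketch {

    /** Hypothetical in-memory stand-in for one INT_CHANNEL_MESSAGE row. */
    static final class Row {
        final String messageId;
        final long createdDate;      // CREATED_DATE (epoch milliseconds)
        final Long messagePriority;  // MESSAGE_PRIORITY, nullable like the column

        Row(String messageId, long createdDate, Long messagePriority) {
            this.messageId = messageId;
            this.createdDate = createdDate;
            this.messagePriority = messagePriority;
        }
    }

    // Assumed poll order: priority descending with nulls last, then oldest first.
    static final Comparator<Row> POLL_ORDER =
            Comparator.comparing((Row r) -> r.messagePriority,
                            Comparator.nullsLast(Comparator.<Long>reverseOrder()))
                    .thenComparingLong(r -> r.createdDate);

    /** Returns the message ids in the order successive polls would return them. */
    static List<String> pollOrder(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(POLL_ORDER);
        List<String> ids = new ArrayList<>();
        for (Row row : sorted) {
            ids.add(row.messageId);
        }
        return ids;
    }
}
```

With priority disabled, only the CREATED_DATE part of this ordering would apply, which is why the date index exists independently of the priority index.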
- Priority messaging needs setPriorityEnabled(true) and the MESSAGE_PRIORITY column
- Use setUsingIdCache(true) for multiple consumers

Install with Tessl CLI
npx tessl i tessl/maven-spring-integration-jdbc