The JDBC Message Store provides persistent storage for Spring Integration messages using a database. This is essential for correlation components (aggregators, resequencers), delayers, claim checks, and ensuring message durability across application restarts.
Required Dependencies:
- spring-integration-jdbc (this package)
- spring-integration-core is required
- DataSource or JdbcOperations bean must be configured

Default Behaviors:
- Table prefix: "INT_" (tables: INT_MESSAGE, INT_MESSAGE_GROUP, INT_GROUP_TO_MESSAGE)
- Region: "DEFAULT"
- checkDatabaseOnStart=true (validates schema on startup)

Threading Model:
- JdbcOperations (connection pooling)

Lifecycle:
- Implements SmartLifecycle (auto-starts by default)

Exceptions:
- DataAccessException - Database access failures
- IllegalArgumentException - Invalid configuration
- DeserializationException - Deserialization failures (class not allowed, etc.)

Edge Cases:
- Use JdbcChannelMessageStore for channels
- Prefer streamMessagesForGroup() instead of getMessagesForGroup() (streaming is memory efficient)

package org.springframework.integration.jdbc.store;
/**
 * JDBC-backed store for messages and message groups.
 * This is an API summary: only the public signatures are listed, method bodies are omitted.
 */
public class JdbcMessageStore extends AbstractMessageGroupStore
        implements MessageStore, BeanClassLoaderAware, SmartLifecycle {

    /** Table-name prefix used unless {@link #setTablePrefix(String)} supplies another one. */
    public static final String DEFAULT_TABLE_PREFIX = "INT_";

    // --- Construction: backed by a DataSource or by an existing JdbcOperations ---
    public JdbcMessageStore(DataSource dataSource);
    public JdbcMessageStore(JdbcOperations jdbcOperations);

    // --- Configuration ---
    public void setTablePrefix(String tablePrefix);
    public void setRegion(String region);
    public void setSerializer(Serializer<? super Message<?>> serializer);
    public void setDeserializer(Deserializer<? extends Message<?>> deserializer);
    public void addAllowedPatterns(String... patterns);
    public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart);
    public void setBeanClassLoader(ClassLoader classLoader);

    // --- Individual messages, addressed by message id ---
    public <T> Message<T> addMessage(Message<T> message);
    public Message<?> getMessage(UUID id);
    public MessageMetadata getMessageMetadata(UUID id);
    public Message<?> removeMessage(UUID id);
    public long getMessageCount();

    // --- Message groups, addressed by group id ---
    public int getMessageGroupCount();
    public int getMessageCountForAllMessageGroups();
    public int messageGroupSize(Object groupId);
    public MessageGroup getMessageGroup(Object groupId);
    public MessageGroupMetadata getGroupMetadata(Object groupId);
    public Message<?> getMessageFromGroup(Object groupId, UUID messageId);
    public Message<?> getOneMessageFromGroup(Object groupId);
    public Collection<Message<?>> getMessagesForGroup(Object groupId);
    public Stream<Message<?>> streamMessagesForGroup(Object groupId);
    public Iterator<MessageGroup> iterator();
}
import org.springframework.integration.jdbc.store.JdbcMessageStore;
import javax.sql.DataSource;
// Create a store on top of the application's DataSource
JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);

// Persist a message and keep the id from its headers for later lookups
Message<Order> stored = messageStore.addMessage(
        MessageBuilder.withPayload(new Order("ORD-123", new BigDecimal("99.99"))).build());
UUID messageId = stored.getHeaders().getId();

// Look the message up by id
Message<?> retrieved = messageStore.getMessage(messageId);

// Remove the message by id
Message<?> removed = messageStore.removeMessage(messageId);
// Configure with custom table prefix and region
JdbcMessageStore usEastStore = new JdbcMessageStore(dataSource);
usEastStore.setTablePrefix("APP_");
usEastStore.setRegion("US_EAST");
// Resulting tables: APP_MESSAGE, APP_MESSAGE_GROUP, APP_GROUP_TO_MESSAGE;
// rows written by this store carry REGION = 'US_EAST'
// Working with message groups (for aggregators, resequencers)
JdbcMessageStore groupStore = new JdbcMessageStore(dataSource);
Object correlationId = "ORDER-123";

// Check group size
int groupSize = groupStore.messageGroupSize(correlationId);

// Get all messages in group
MessageGroup group = groupStore.getMessageGroup(correlationId);
Collection<Message<?>> messages = group.getMessages();

// Get group metadata
MessageGroupMetadata metadata = groupStore.getGroupMetadata(correlationId);
long timestamp = metadata.getTimestamp();
boolean complete = metadata.isComplete();

// Stream messages (memory efficient).
// A JDBC-backed stream keeps its database resources open until it is closed,
// so always consume it inside try-with-resources.
try (var groupMessages = groupStore.streamMessagesForGroup(correlationId)) {
    groupMessages.forEach(msg -> System.out.println(msg.getPayload()));
}
import com.fasterxml.jackson.databind.ObjectMapper;
// Store messages as JSON instead of using the default serialization
JdbcMessageStore jsonStore = new JdbcMessageStore(dataSource);
ObjectMapper mapper = new ObjectMapper();

// Write side: the message is rendered to JSON bytes and written to the stream
jsonStore.setSerializer((message, out) -> out.write(mapper.writeValueAsBytes(message)));

// Read side: the JSON is bound back to a Message.
// NOTE(review): Message is an interface; a plain ObjectMapper may not be able to
// instantiate it without extra type configuration - confirm with a round-trip test.
jsonStore.setDeserializer(in -> mapper.readValue(in, Message.class));
// Configure allowed deserialization patterns
JdbcMessageStore secureStore = new JdbcMessageStore(dataSource);
// Class-name patterns that deserialization is permitted to load
String[] allowedPatterns = {
    "com.example.domain.*",
    "com.example.events.*",
    "java.util.*",
    "java.time.*"
};
secureStore.addAllowedPatterns(allowedPatterns);
JdbcMessageStore integrates with correlation and stateful components:
import org.springframework.integration.dsl.IntegrationFlow;
/**
 * Message store bean whose data is kept under the "ORDERS" region.
 *
 * @param dataSource the database the store reads from and writes to
 * @return the configured store
 */
@Bean
public JdbcMessageStore messageStore(DataSource dataSource) {
    JdbcMessageStore ordersStore = new JdbcMessageStore(dataSource);
    ordersStore.setRegion("ORDERS");
    return ordersStore;
}
/**
 * Flow that aggregates messages arriving on "inputChannel" and sends the result to
 * "outputChannel", keeping the aggregator's group state in the supplied JDBC store.
 *
 * @param messageStore the store used by the aggregator
 * @return the assembled flow
 */
@Bean
public IntegrationFlow aggregatorFlow(JdbcMessageStore messageStore) {
    return IntegrationFlow
            .from("inputChannel")
            .aggregate(spec -> spec
                    .messageStore(messageStore)
                    .correlationStrategy(new OrderCorrelationStrategy())
                    .releaseStrategy(new OrderReleaseStrategy())
                    .expireGroupsUponCompletion(true)
                    .sendPartialResultOnExpiry(true))
            .channel("outputChannel")
            .get();
}
Required tables (with default prefix "INT_"):
-- Serialized messages, one row per (message id, region)
CREATE TABLE INT_MESSAGE (
    MESSAGE_ID    CHAR(36)     NOT NULL,
    REGION        VARCHAR(100) NOT NULL,
    CREATED_DATE  TIMESTAMP    NOT NULL,
    MESSAGE_BYTES BLOB,
    CONSTRAINT INT_MESSAGE_PK PRIMARY KEY (MESSAGE_ID, REGION)
);

-- Group bookkeeping, one row per (group key, region)
-- NOTE(review): recent Spring Integration schema scripts may also define a
-- GROUP_CONDITION column here - verify against the shipped schema-*.sql for your version.
CREATE TABLE INT_MESSAGE_GROUP (
    GROUP_KEY              CHAR(36)     NOT NULL,
    REGION                 VARCHAR(100) NOT NULL,
    MARKED                 BIGINT,
    COMPLETE               BIGINT,
    LAST_RELEASED_SEQUENCE BIGINT,
    CREATED_DATE           TIMESTAMP    NOT NULL,
    UPDATED_DATE           TIMESTAMP    DEFAULT NULL,
    CONSTRAINT INT_MESSAGE_GROUP_PK PRIMARY KEY (GROUP_KEY, REGION)
);

-- Link table associating message ids with group keys within a region
CREATE TABLE INT_GROUP_TO_MESSAGE (
    GROUP_KEY  CHAR(36)     NOT NULL,
    MESSAGE_ID CHAR(36)     NOT NULL,
    REGION     VARCHAR(100) NOT NULL,
    CONSTRAINT INT_GROUP_TO_MESSAGE_PK PRIMARY KEY (GROUP_KEY, MESSAGE_ID, REGION)
);
SQL scripts are provided in org/springframework/integration/jdbc/schema-*.sql for various databases.
Prefer streamMessagesForGroup() instead of getMessagesForGroup() when memory efficiency matters.