The JDBC Metadata Store provides transactional key-value storage for managing application metadata in a database. This is essential for tracking message processing state, implementing idempotency, storing sequence numbers, and coordinating distributed operations.
Required Dependencies:
spring-integration-jdbc (this package)spring-integration-core is requiredDataSource or JdbcOperations bean must be configuredDefault Behaviors:
"INT_" (table: INT_METADATA_STORE)"DEFAULT""FOR UPDATE" (pessimistic locking)checkDatabaseOnStart=true (validates schema on startup)Threading Model:
JdbcOperations (connection pooling)Lifecycle:
SmartLifecycle (auto-starts by default)Exceptions:
DataAccessException - Database access failuresIllegalArgumentException - Invalid configurationTransactionRequiredException - Operations require transactional contextEdge Cases:
putIfAbsent() and replace() provide atomic guarantees within transactionpackage org.springframework.integration.jdbc.metadata;
public class JdbcMetadataStore
implements ConcurrentMetadataStore, InitializingBean, SmartLifecycle {
public static final String DEFAULT_TABLE_PREFIX = "INT_";
public JdbcMetadataStore(DataSource dataSource);
public JdbcMetadataStore(JdbcOperations jdbcOperations);
public void setTablePrefix(String tablePrefix);
public void setRegion(String region);
public void setLockHint(String lockHint);
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart);
public void put(String key, String value);
public String get(String key);
public String putIfAbsent(String key, String value);
public boolean replace(String key, String oldValue, String newValue);
public String remove(String key);
}import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
// Basic metadata store setup
JdbcMetadataStore metadataStore = new JdbcMetadataStore(dataSource);
// Store metadata
metadataStore.put("last-processed-id", "12345");
// Retrieve metadata
String lastId = metadataStore.get("last-processed-id");
// Remove metadata
String removed = metadataStore.remove("last-processed-id");// Idempotent message processing using putIfAbsent
JdbcMetadataStore idempotentStore = new JdbcMetadataStore(dataSource);
public void processMessage(Message<?> message) {
String messageId = message.getHeaders().getId().toString();
// Check if already processed
String previous = idempotentStore.putIfAbsent(
"processed:" + messageId,
String.valueOf(System.currentTimeMillis())
);
if (previous == null) {
// First time seeing this message - process it
performProcessing(message);
} else {
// Already processed - skip
System.out.println("Duplicate message detected: " + messageId);
}
}// Atomic counter using replace operation
JdbcMetadataStore counterStore = new JdbcMetadataStore(dataSource);
public long incrementCounter(String counterName) {
while (true) {
String currentValue = counterStore.get(counterName);
if (currentValue == null) {
// Initialize counter
String previous = counterStore.putIfAbsent(counterName, "1");
if (previous == null) {
return 1L; // Successfully initialized
}
// Someone else initialized, retry
continue;
}
long current = Long.parseLong(currentValue);
long next = current + 1;
// Atomic compare-and-swap
if (counterStore.replace(counterName, currentValue, String.valueOf(next))) {
return next; // Successfully incremented
}
// Value changed by another thread/instance, retry
}
}// Sequence number tracking for inbound adapters
JdbcMetadataStore sequenceStore = new JdbcMetadataStore(dataSource);
public void pollDatabase() {
String lastSequence = sequenceStore.get("order:last-sequence");
long startSequence = lastSequence != null ? Long.parseLong(lastSequence) + 1 : 0;
List<Order> newOrders = fetchOrdersFrom(startSequence);
for (Order order : newOrders) {
processOrder(order);
// Update last processed sequence
sequenceStore.put("order:last-sequence", String.valueOf(order.getSequenceNumber()));
}
}JdbcMetadataStore integrates with inbound adapters and various integration patterns:
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
@Bean
public JdbcMetadataStore metadataStore(DataSource dataSource) {
JdbcMetadataStore store = new JdbcMetadataStore(dataSource);
store.setRegion("PRODUCTION");
return store;
}
@Bean
public IntegrationFlow filePollingFlow(JdbcMetadataStore metadataStore) {
return IntegrationFlow
.from(Files.inboundAdapter(new File("/data/input"))
.patternFilter("*.csv")
.metadataStore(metadataStore), // Prevents duplicate file processing
e -> e.poller(Pollers.fixedDelay(5000)))
.handle(file -> processFile(file))
.get();
}Required table (with default prefix "INT_"):
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
CONSTRAINT INT_METADATA_STORE_PK PRIMARY KEY (METADATA_KEY, REGION)
);Note: VARCHAR sizes may vary by database. SQL scripts are provided in org/springframework/integration/jdbc/schema-*.sql.
putIfAbsent() and replace() provide atomic guarantees within transaction