Metadata Store

The JDBC Metadata Store (JdbcMetadataStore) provides transactional key-value storage for application metadata in a relational database. Typical uses are tracking message-processing state, implementing idempotency, storing sequence numbers, and coordinating distributed operations.

Key Information for Agents

Required Dependencies:

  • spring-integration-jdbc (this package)
  • spring-integration-core
  • DataSource or JdbcOperations bean must be configured
  • Database schema must exist (single table)

Default Behaviors:

  • Default table prefix: "INT_" (table: INT_METADATA_STORE)
  • Default region: "DEFAULT"
  • Default lock hint: "FOR UPDATE" (pessimistic locking)
  • checkDatabaseOnStart=true (validates schema on startup)
  • All operations are transactional (require transaction manager)
  • Values stored as VARCHAR (default: 4000 characters)

Threading Model:

  • Thread-safe: a single instance can be shared across threads; back it with a pooled DataSource so concurrent calls do not contend for one connection
  • Atomic operations use pessimistic locking (SELECT FOR UPDATE)
  • Concurrent access is safe (database-level locking)

Lifecycle:

  • Implements SmartLifecycle (auto-starts by default)
  • Checks database schema on startup (can be disabled)
  • Stops gracefully on context shutdown

Exceptions:

  • DataAccessException - Database access failures
  • IllegalArgumentException - Invalid configuration
  • TransactionRequiredException - Operations require transactional context

Edge Cases:

  • All operations require transactional context (configure transaction manager)
  • putIfAbsent() and replace() provide atomic guarantees within transaction
  • Pessimistic locking uses SELECT FOR UPDATE (tune lock hint for database)
  • Region partitioning allows multiple logical stores to share same physical table
  • Key namespaces recommended (e.g., "processed:", "config:", "sequence:")
  • Value size limited by VARCHAR length (default: 4000; adjust if needed)
  • Database round-trip for each operation (cache frequently-read values if appropriate)
  • Lock timeout should be configured to prevent indefinite blocking
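
The namespace and size recommendations above can be checked before a value ever reaches the database. The following is a minimal sketch under stated assumptions: MetadataKeys and its methods are hypothetical helpers, not part of Spring Integration, and the limits of 255 and 4000 characters are taken from the default schema, so they would need adjusting if the columns are resized or if the database counts VARCHAR length in bytes.

```java
/** Hypothetical helper (not part of Spring Integration) for namespaced metadata keys. */
final class MetadataKeys {

    // Assumed from the default schema: METADATA_KEY VARCHAR(255), METADATA_VALUE VARCHAR(4000).
    static final int MAX_KEY_LENGTH = 255;
    static final int MAX_VALUE_LENGTH = 4000;

    private MetadataKeys() {
    }

    /** Builds a key such as "processed:42" and rejects keys that would not fit the key column. */
    static String key(String namespace, String id) {
        if (namespace == null || namespace.isEmpty() || id == null || id.isEmpty()) {
            throw new IllegalArgumentException("namespace and id must be non-empty");
        }
        String key = namespace + ":" + id;
        if (key.length() > MAX_KEY_LENGTH) {
            throw new IllegalArgumentException("key exceeds " + MAX_KEY_LENGTH + " characters: " + key.length());
        }
        return key;
    }

    /** Returns the value unchanged if it fits the value column, otherwise throws. */
    static String checkedValue(String value) {
        if (value != null && value.length() > MAX_VALUE_LENGTH) {
            throw new IllegalArgumentException("value exceeds " + MAX_VALUE_LENGTH + " characters: " + value.length());
        }
        return value;
    }
}
```

A call site might then read metadataStore.put(MetadataKeys.key("processed", messageId), MetadataKeys.checkedValue(timestamp)), which fails fast in the application instead of surfacing as a DataAccessException from the database.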

Core Class

package org.springframework.integration.jdbc.metadata;

public class JdbcMetadataStore
    implements ConcurrentMetadataStore, InitializingBean, SmartLifecycle {

    public static final String DEFAULT_TABLE_PREFIX = "INT_";

    public JdbcMetadataStore(DataSource dataSource);
    public JdbcMetadataStore(JdbcOperations jdbcOperations);

    public void setTablePrefix(String tablePrefix);
    public void setRegion(String region);
    public void setLockHint(String lockHint);
    public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart);

    public void put(String key, String value);
    public String get(String key);
    public String putIfAbsent(String key, String value);
    public boolean replace(String key, String oldValue, String newValue);
    public String remove(String key);
}
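
The setters listed above could be wired into a Spring configuration along the following lines. This is a minimal sketch rather than a definitive setup: the class name MetadataStoreConfig, the "APP_" prefix, and the "ORDERS" region are illustrative assumptions, and it assumes a DataSource bean is defined elsewhere and that a table named APP_METADATA_STORE already exists.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@EnableTransactionManagement
class MetadataStoreConfig {

    // Transaction manager bound to the same DataSource the store uses
    @Bean
    PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean
    JdbcMetadataStore metadataStore(DataSource dataSource) {
        JdbcMetadataStore store = new JdbcMetadataStore(dataSource);
        store.setTablePrefix("APP_");        // assumed prefix; the table becomes APP_METADATA_STORE
        store.setRegion("ORDERS");           // assumed region name; isolates these keys from other regions
        store.setLockHint("FOR UPDATE");     // the documented default, shown here for visibility
        store.setCheckDatabaseOnStart(true); // the documented default; validates the table at startup
        return store;
    }
}
```

Whether "FOR UPDATE" is the right lock hint depends on the database dialect, so that value is worth checking against the target database before relying on it.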

Usage Examples

Basic Operations

import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;

// Basic metadata store setup
JdbcMetadataStore metadataStore = new JdbcMetadataStore(dataSource);

// Store metadata
metadataStore.put("last-processed-id", "12345");

// Retrieve metadata
String lastId = metadataStore.get("last-processed-id");

// Remove metadata
String removed = metadataStore.remove("last-processed-id");

Idempotent Message Processing

// Idempotent message processing using putIfAbsent
JdbcMetadataStore idempotentStore = new JdbcMetadataStore(dataSource);

public void processMessage(Message<?> message) {
    String messageId = message.getHeaders().getId().toString();

    // Atomically record the ID; a non-null result means the key was already present
    String previous = idempotentStore.putIfAbsent(
        "processed:" + messageId,
        String.valueOf(System.currentTimeMillis())
    );

    if (previous == null) {
        // First time seeing this message - process it
        performProcessing(message);
    } else {
        // Already processed - skip
        System.out.println("Duplicate message detected: " + messageId);
    }
}
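
One gap in the example above is failure handling: if performProcessing throws after the putIfAbsent call has committed, the marker remains and a redelivered message would be skipped. One option is to run the whole method in a transaction that also covers the marker insert; another is to remove the marker on failure. The sketch below illustrates the second option. So that it runs without a database, it uses a hypothetical KeyValueStore interface with an in-memory InMemoryStore; its two methods mirror the putIfAbsent and remove signatures listed under Core Class, and IdempotentProcessor is likewise an illustrative name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Hypothetical stand-in mirroring two JdbcMetadataStore method signatures. */
interface KeyValueStore {
    String putIfAbsent(String key, String value);
    String remove(String key);
}

/** In-memory implementation used only to make the sketch runnable. */
final class InMemoryStore implements KeyValueStore {
    private final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();

    @Override
    public String putIfAbsent(String key, String value) {
        return map.putIfAbsent(key, value);
    }

    @Override
    public String remove(String key) {
        return map.remove(key);
    }
}

/** Runs a handler at most once per message ID, releasing the marker if the handler fails. */
final class IdempotentProcessor {
    private final KeyValueStore store;

    IdempotentProcessor(KeyValueStore store) {
        this.store = store;
    }

    /** Returns true if the handler ran, false if the ID was already marked as processed. */
    boolean process(String messageId, Consumer<String> handler) {
        String key = "processed:" + messageId;
        String previous = store.putIfAbsent(key, String.valueOf(System.currentTimeMillis()));
        if (previous != null) {
            return false; // duplicate: an earlier call already claimed this ID
        }
        try {
            handler.accept(messageId);
            return true;
        } catch (RuntimeException e) {
            store.remove(key); // compensate so that a redelivery can be processed
            throw e;
        }
    }
}
```

The compensation step is best-effort: if the process dies between the failure and the remove call, the marker stays in place, which is one reason a surrounding transaction can be the more robust choice.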

Atomic Counter

// Atomic counter using replace operation
JdbcMetadataStore counterStore = new JdbcMetadataStore(dataSource);

public long incrementCounter(String counterName) {
    while (true) {
        String currentValue = counterStore.get(counterName);

        if (currentValue == null) {
            // Initialize counter
            String previous = counterStore.putIfAbsent(counterName, "1");
            if (previous == null) {
                return 1L; // Successfully initialized
            }
            // Someone else initialized, retry
            continue;
        }

        long current = Long.parseLong(currentValue);
        long next = current + 1;

        // Atomic compare-and-swap
        if (counterStore.replace(counterName, currentValue, String.valueOf(next))) {
            return next; // Successfully incremented
        }
        // Value changed by another thread/instance, retry
    }
}

Sequence Number Tracking

// Sequence number tracking for inbound adapters
JdbcMetadataStore sequenceStore = new JdbcMetadataStore(dataSource);

public void pollDatabase() {
    String lastSequence = sequenceStore.get("order:last-sequence");
    long startSequence = lastSequence != null ? Long.parseLong(lastSequence) + 1 : 0;

    List<Order> newOrders = fetchOrdersFrom(startSequence);

    for (Order order : newOrders) {
        processOrder(order);
        // Update last processed sequence
        sequenceStore.put("order:last-sequence", String.valueOf(order.getSequenceNumber()));
    }
}

Integration with Spring Integration

JdbcMetadataStore integrates with inbound adapters and various integration patterns:

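import java.io.File;

import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;

@Bean
public JdbcMetadataStore metadataStore(DataSource dataSource) {
    JdbcMetadataStore store = new JdbcMetadataStore(dataSource);
    store.setRegion("PRODUCTION");
    return store;
}

@Bean
public IntegrationFlow filePollingFlow(JdbcMetadataStore metadataStore) {
    // Chain the pattern filter with a persistent accept-once filter backed by the metadata store,
    // so files already recorded in the store are not picked up again, including after a restart.
    // The "files:" key prefix is an illustrative choice.
    ChainFileListFilter<File> filter = new ChainFileListFilter<>();
    filter.addFilter(new SimplePatternFileListFilter("*.csv"));
    filter.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "files:"));

    return IntegrationFlow
        .from(Files.inboundAdapter(new File("/data/input"))
                .filter(filter),
            e -> e.poller(Pollers.fixedDelay(5000)))
        // A single-argument lambda is a MessageHandler, so unwrap the File payload explicitly
        .handle(message -> processFile((File) message.getPayload()))
        .get();
}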

Database Schema

Required table (with default prefix "INT_"):

CREATE TABLE INT_METADATA_STORE (
    METADATA_KEY VARCHAR(255) NOT NULL,
    METADATA_VALUE VARCHAR(4000),
    REGION VARCHAR(100) NOT NULL,
    CONSTRAINT INT_METADATA_STORE_PK PRIMARY KEY (METADATA_KEY, REGION)
);

Note: VARCHAR sizes may vary by database. SQL scripts are provided in org/springframework/integration/jdbc/schema-*.sql.

Key Considerations

  • Transaction Required: All operations require transactional context; configure transaction manager
  • Atomic Operations: putIfAbsent() and replace() provide atomic guarantees within transaction
  • Pessimistic Locking: Uses SELECT FOR UPDATE for atomic operations; tune lock hint for database
  • Region Partitioning: Multiple logical stores can share same physical table via regions
  • Key Namespace: Use prefixes for different types of metadata (e.g., "processed:", "config:", "sequence:")
  • Value Size: Default VARCHAR(4000); adjust for larger values if needed
  • Performance: Database round-trip for each operation; cache frequently-read values if appropriate
  • Idempotency: Primary use case for preventing duplicate message processing
  • State Tracking: Ideal for tracking last-processed IDs, sequence numbers, and positions
  • Cleanup: Implement retention policies to prevent unbounded growth
  • Lock Timeout: Configure database lock timeout to prevent indefinite blocking
  • Isolation Level: Ensure appropriate transaction isolation for consistency
  • Monitoring: Track metadata operations for performance and capacity planning
  • High Availability: Database must be available for metadata operations
  • Schema Management: Table must exist before use; checkDatabaseOnStart (enabled by default) validates it at startup
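
The performance note above suggests caching frequently-read values. One possible shape for that is a small read-through cache with a time-to-live, sketched below under stated assumptions: CachedMetadataReader is a hypothetical class, not part of Spring Integration, and the loader function would typically be metadataStore::get. The clock is injected only to keep the sketch easy to test.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical read-through cache; the loader would typically be metadataStore::get. */
final class CachedMetadataReader {

    /** A loaded value together with the time it was read from the store. */
    private static final class Entry {
        final String value;
        final long loadedAtMillis;

        Entry(String value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();
    private final Function<String, String> loader;
    private final long ttlMillis;
    private final LongSupplier clock;

    CachedMetadataReader(Function<String, String> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value while it is fresh, otherwise reloads it through the loader. */
    String get(String key) {
        long now = clock.getAsLong();
        Entry entry = cache.get(key);
        if (entry == null || now - entry.loadedAtMillis >= ttlMillis) {
            entry = new Entry(loader.apply(key), now);
            cache.put(key, entry);
        }
        return entry.value;
    }

    /** Drops a cached entry, for example right after the application writes a new value. */
    void invalidate(String key) {
        cache.remove(key);
    }
}
```

A reader could be created with new CachedMetadataReader(metadataStore::get, 30_000L, System::currentTimeMillis). Cached reads can be stale for up to the time-to-live, so this approach suits slowly changing keys such as "config:" entries and is a poor fit for keys used with putIfAbsent() or replace(), where the current database value matters.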