Spring Integration JDBC

Spring Integration JDBC connects the Spring Integration messaging framework to relational databases. Applications can send and receive messages through JDBC operations, including plain SQL queries, stored procedures, and functions, and can persist messages for enterprise integration patterns such as aggregators and resequencers.

Package Information

  • Package Name: spring-integration-jdbc
  • Package Type: maven
  • Group ID: org.springframework.integration
  • Artifact ID: spring-integration-jdbc
  • Version: 7.0.0
  • Language: Java

Installation:

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-jdbc</artifactId>
  <version>7.0.0</version>
</dependency>

For Gradle:

implementation 'org.springframework.integration:spring-integration-jdbc:7.0.0'

Core Imports

import org.springframework.integration.jdbc.inbound.JdbcPollingChannelAdapter;
import org.springframework.integration.jdbc.outbound.JdbcMessageHandler;
import org.springframework.integration.jdbc.outbound.JdbcOutboundGateway;
import org.springframework.integration.jdbc.store.JdbcMessageStore;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;

For Java DSL:

import org.springframework.integration.jdbc.dsl.Jdbc;

Key Information for Agents

Required Dependencies:

  • spring-integration-jdbc (this package)
  • spring-integration-core (required)
  • spring-jdbc (required; provided transitively)
  • A configured DataSource bean
  • A JDBC driver for the target database (PostgreSQL, MySQL, Oracle, SQL Server, H2, etc.)

Default Behaviors:

  • Default table prefix: "INT_" for all stores
  • Default region: "DEFAULT" for stores
  • Default row mapper: ColumnMapRowMapper (returns Map<String, Object>)
  • Batch operations supported for Iterable payloads
  • Auto-generated keys available via :GENERATED_KEY parameter in SELECT queries
  • Transaction management: operations join an existing transaction when one is active; a new transaction is started only if a transaction manager is configured

Threading Model:

  • Inbound adapters execute on poller thread (configurable)
  • Outbound handlers execute on message handling thread
  • Message stores are thread-safe (use connection pooling)
  • Lock registry uses database for distributed coordination
  • PostgreSQL channels use dedicated LISTEN connection (not pooled)

Lifecycle:

  • All stores implement SmartLifecycle (auto-start by default)
  • Stores check database schema on startup (can be disabled)
  • Polling adapters require poller configuration
  • PostgreSQL subscriber starts automatically with Spring context

Exceptions:

  • DataAccessException - Database access failures
  • MessagingException - Message handling failures
  • IllegalArgumentException - Invalid configuration
  • SQLException - SQL execution errors (wrapped in Spring exceptions)

Edge Cases:

  • Batch updates are not compatible with keysGenerated=true
  • An empty result set yields null or an empty list, depending on the component
  • maxRows limits SELECT results (default: unlimited)
  • Parameter names are case-sensitive (must match SQL exactly)
  • Stored procedures require explicit parameter type definitions for some databases
  • PostgreSQL LISTEN requires a dedicated, un-pooled connection
  • Lock TTL expiration requires clock synchronization across instances

Architecture

Spring Integration JDBC is built around several key architectural components:

  • Inbound Channel Adapters: Poll databases and generate messages from query results
  • Outbound Channel Adapters: Execute database updates/inserts from incoming messages (fire-and-forget)
  • Outbound Gateways: Execute database operations and return results (request-reply pattern)
  • Message Stores: Persist Spring Integration messages and message groups for aggregators, resequencers, and delayers
  • Channel Message Stores: Specialized stores optimized for queue-backed channels with priority support
  • Lock Registry: Distributed locking mechanism for coordination across multiple JVMs
  • Metadata Store: Transactional key-value store for managing integration metadata
  • Stored Procedure Support: Execute database stored procedures and functions with full parameter mapping
  • Java DSL: Fluent API for defining JDBC-based integration flows programmatically
  • PostgreSQL Extensions: Native support for PostgreSQL LISTEN/NOTIFY push notifications

The module builds on Spring's JDBC abstractions (DataSource, JdbcTemplate) and supports multiple database vendors, including PostgreSQL, MySQL, Oracle, SQL Server, H2, Derby, HSQLDB, DB2, and Sybase.

Capabilities

Inbound Polling (SQL Queries)

Poll database tables using SQL SELECT queries, optionally updating records after retrieval. Supports custom row mapping and parameterized queries.

package org.springframework.integration.jdbc.inbound;

public class JdbcPollingChannelAdapter extends AbstractMessageSource<Object> {
    public JdbcPollingChannelAdapter(DataSource dataSource, String selectQuery);
    public JdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery);

    public void setRowMapper(RowMapper<?> rowMapper);
    public void setSelectQuery(String selectQuery);
    public void setUpdateSql(String updateSql);
    public void setUpdatePerRow(boolean updatePerRow);
    public void setUpdateSqlParameterSourceFactory(SqlParameterSourceFactory factory);
    public void setSelectSqlParameterSource(SqlParameterSource sqlQueryParameterSource);
    public void setMaxRows(int maxRows);
    public List<?> doPoll(SqlParameterSource sqlQueryParameterSource);
}

Inbound Polling
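
The select-then-update cycle described in this section can be sketched without a database. The following is a minimal in-memory model, not the adapter's implementation: `PollCycleSketch`, `Row`, and the `NEW`/`DONE` status values are hypothetical, and both treating `maxRows <= 0` as "no limit" and returning `null` for an empty result are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of one poll cycle: select unprocessed rows, then mark them. */
final class PollCycleSketch {

    /** Hypothetical stand-in for a table row with a status column. */
    static final class Row {
        final int id;
        String status;

        Row(int id, String status) {
            this.id = id;
            this.status = status;
        }
    }

    /**
     * Models "SELECT ... WHERE status = 'NEW'" followed by
     * "UPDATE ... SET status = 'DONE'" for the selected rows.
     *
     * @param maxRows upper bound on selected rows; values <= 0 mean no limit (assumption)
     * @return ids of the selected rows, or null when nothing matched (assumption)
     */
    static List<Integer> poll(List<Row> table, int maxRows) {
        List<Row> selected = new ArrayList<>();
        for (Row row : table) {
            if (maxRows > 0 && selected.size() == maxRows) {
                break; // row limit reached
            }
            if ("NEW".equals(row.status)) {
                selected.add(row);
            }
        }
        if (selected.isEmpty()) {
            return null; // nothing to emit on this poll
        }
        List<Integer> ids = new ArrayList<>();
        for (Row row : selected) {
            row.status = "DONE"; // the update step keeps rows from being selected again
            ids.add(row.id);
        }
        return ids;
    }
}
```

Without the update step, every poll in this model would return the same rows again, which is the reason a select query is usually paired with an update statement.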

Outbound Updates

Execute SQL INSERT, UPDATE, or DELETE statements from incoming messages. Supports batch operations, auto-generated keys, and parameterized queries using message payload and headers.

package org.springframework.integration.jdbc.outbound;

public class JdbcMessageHandler extends AbstractMessageHandler {
    public JdbcMessageHandler(DataSource dataSource, String updateSql);
    public JdbcMessageHandler(JdbcOperations jdbcOperations, String updateSql);

    public void setKeysGenerated(boolean keysGenerated);
    public void setSqlParameterSourceFactory(SqlParameterSourceFactory factory);
    public void setUsePayloadAsParameterSource(boolean usePayloadAsParameterSource);
    public void setPreparedStatementSetter(MessagePreparedStatementSetter setter);
}

Outbound Updates
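
Parameterized statements use named placeholders such as `:id`. The sketch below is a simplified, hypothetical model of how named placeholders map onto positional JDBC arguments. It is not Spring's parser (it ignores string literals and comments), and `NamedParameterSketch` and its methods are illustrative names. It shows why a parameter name has to match the SQL exactly, including case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified model of named-parameter binding; not Spring's implementation. */
final class NamedParameterSketch {

    // A single colon followed by an identifier; "::" (a PostgreSQL cast) is skipped.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("(?<!:):([A-Za-z_][A-Za-z0-9_]*)");

    /** Rewrites ":name" placeholders to positional "?" markers. */
    static String toPositionalSql(String sql) {
        return PLACEHOLDER.matcher(sql).replaceAll("?");
    }

    /** Collects argument values in placeholder order; lookups are case-sensitive. */
    static List<Object> bind(String sql, Map<String, Object> values) {
        List<Object> args = new ArrayList<>();
        Matcher matcher = PLACEHOLDER.matcher(sql);
        while (matcher.find()) {
            String name = matcher.group(1);
            if (!values.containsKey(name)) {
                throw new IllegalArgumentException("No value for parameter '" + name + "'");
            }
            args.add(values.get(name));
        }
        return args;
    }
}
```

In this model, a value supplied under the key `ID` does not satisfy the placeholder `:id`, so binding fails instead of silently passing a wrong argument.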

Outbound Gateway (Request-Reply)

Execute database operations and return query results or generated keys in reply messages. Supports combined update+select operations for complex database interactions.

package org.springframework.integration.jdbc.outbound;

public class JdbcOutboundGateway extends AbstractReplyProducingMessageHandler {
    public JdbcOutboundGateway(DataSource dataSource, String updateQuery);
    public JdbcOutboundGateway(DataSource dataSource, String updateQuery, String selectQuery);
    public JdbcOutboundGateway(JdbcOperations jdbcOperations, String updateQuery);
    public JdbcOutboundGateway(JdbcOperations jdbcOperations, String updateQuery, String selectQuery);

    public void setMaxRows(Integer maxRows);
    public void setKeysGenerated(boolean keysGenerated);
    public void setRequestSqlParameterSourceFactory(SqlParameterSourceFactory factory);
    public void setRequestPreparedStatementSetter(MessagePreparedStatementSetter setter);
    public void setReplySqlParameterSourceFactory(SqlParameterSourceFactory factory);
    public void setRowMapper(RowMapper<?> rowMapper);
}

Outbound Gateway

Stored Procedures and Functions

Execute database stored procedures and functions with full parameter mapping, result set handling, and support for multiple OUT parameters and return values.

package org.springframework.integration.jdbc;

public class StoredProcExecutor implements BeanFactoryAware, InitializingBean {
    public StoredProcExecutor(DataSource dataSource);

    public void setStoredProcedureName(String storedProcedureName);
    public void setStoredProcedureNameExpression(Expression expression);
    public void setIsFunction(boolean isFunction);
    public void setProcedureParameters(List<ProcedureParameter> procedureParameters);
    public void setSqlParameters(List<SqlParameter> sqlParameters);
    public void setSqlParameterSourceFactory(SqlParameterSourceFactory factory);
    public void setReturnValueRequired(boolean returnValueRequired);
    public void setReturningResultSetRowMappers(Map<String, RowMapper<?>> mappers);
    public void setIgnoreColumnMetaData(boolean ignoreColumnMetaData);
    public void setJdbcCallOperationsCacheSize(int size);

    public Map<String, Object> executeStoredProcedure();
    public Map<String, Object> executeStoredProcedure(Message<?> message);
}

Stored Procedures

Message Store

Persist Spring Integration messages and message groups in relational databases. Used by stateful patterns like aggregators, resequencers, and claim checks. Supports message expiration, custom serialization, and multi-region deployments.

package org.springframework.integration.jdbc.store;

public class JdbcMessageStore extends AbstractMessageGroupStore
        implements MessageStore, BeanClassLoaderAware, SmartLifecycle {

    public JdbcMessageStore(DataSource dataSource);
    public JdbcMessageStore(JdbcOperations jdbcOperations);

    public void setTablePrefix(String tablePrefix);
    public void setRegion(String region);
    public void setSerializer(Serializer<? super Message<?>> serializer);
    public void setDeserializer(Deserializer<? extends Message<?>> deserializer);
    public void addAllowedPatterns(String... patterns);

    public Message<?> addMessage(Message<?> message);
    public Message<?> getMessage(UUID id);
    public Message<?> removeMessage(UUID id);
    public MessageGroup getMessageGroup(Object groupId);
    public long getMessageCount();
    public int getMessageGroupCount();
}

Message Store

Channel Message Store

Specialized message store optimized for QueueChannel and PollableChannel implementations. Provides priority-based message polling, efficient queue operations, and optional ID caching for high-concurrency scenarios.

package org.springframework.integration.jdbc.store;

public class JdbcChannelMessageStore
        implements PriorityCapableChannelMessageStore, InitializingBean, SmartLifecycle {

    public JdbcChannelMessageStore();
    public JdbcChannelMessageStore(DataSource dataSource);

    public void setTablePrefix(String tablePrefix);
    public void setRegion(String region);
    public void setPriorityEnabled(boolean priorityEnabled);
    public void setUsingIdCache(boolean usingIdCache);
    public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider provider);

    public MessageGroup addMessageToGroup(Object groupId, Message<?> message);
    public Message<?> pollMessageFromGroup(Object groupId);
    public MessageGroup getMessageGroup(Object groupId);
    public void removeMessageGroup(Object groupId);
    public int messageGroupSize(Object groupId);
}

Channel Message Store
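
Priority-based polling can be pictured as an ordering over the stored messages. The sketch below is an in-memory model under stated assumptions, not the store's SQL: higher priority values are delivered first, messages without a priority come last, and ties are broken in insertion order. `PriorityPollSketch` and `Entry` are hypothetical names.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** In-memory model of priority-ordered polling; ordering rules are assumptions. */
final class PriorityPollSketch {

    /** One queued message: payload, optional priority, and insertion sequence. */
    record Entry(String payload, Integer priority, long sequence) {}

    // Highest priority first; entries with no priority sort after all others.
    private static final Comparator<Integer> HIGHEST_FIRST_NULLS_LAST =
            Comparator.nullsLast(Comparator.reverseOrder());

    // Equal priorities fall back to insertion order (FIFO).
    private static final Comparator<Entry> POLL_ORDER =
            Comparator.comparing(Entry::priority, HIGHEST_FIRST_NULLS_LAST)
                    .thenComparingLong(Entry::sequence);

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(POLL_ORDER);
    private long nextSequence;

    void add(String payload, Integer priority) {
        queue.add(new Entry(payload, priority, nextSequence++));
    }

    /** Returns the next payload in poll order, or null when the queue is empty. */
    String poll() {
        Entry head = queue.poll();
        return head == null ? null : head.payload();
    }
}
```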

Distributed Lock Registry

JDBC-based distributed locking for coordination across multiple application instances. Supports lock expiration, renewal, and automatic cleanup of stale locks.

package org.springframework.integration.jdbc.lock;

public class JdbcLockRegistry
        implements ExpirableLockRegistry<DistributedLock>, RenewableLockRegistry<DistributedLock> {

    public JdbcLockRegistry(LockRepository client);
    public JdbcLockRegistry(LockRepository client, Duration expireAfter);

    public void setIdleBetweenTries(Duration idleBetweenTries);
    public void setCacheCapacity(int cacheCapacity);

    public DistributedLock obtain(Object lockKey);
    public void expireUnusedOlderThan(long age);
    public void renewLock(Object lockKey);
    public void renewLock(Object lockKey, Duration customTtl);
}

package org.springframework.integration.jdbc.lock;

public class DefaultLockRepository implements LockRepository, InitializingBean,
        ApplicationContextAware, SmartInitializingSingleton, SmartLifecycle {

    public DefaultLockRepository(DataSource dataSource);
    public DefaultLockRepository(DataSource dataSource, String id);

    public void setRegion(String region);
    public void setPrefix(String prefix);
    public void setTransactionManager(PlatformTransactionManager transactionManager);

    public boolean acquire(String lock, Duration ttl);
    public boolean renew(String lock, Duration ttl);
    public boolean isAcquired(String lock);
    public boolean delete(String lock);
    public void deleteExpired();
}

Lock Registry
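
Locks obtained from a registry are typically used with the standard try/finally idiom. The helper below is a sketch that assumes the obtained lock can be used through the `java.util.concurrent.locks.Lock` interface. `LockUsageSketch` and `runExclusively` are hypothetical names, and any `Lock` (for example a plain `ReentrantLock`) can stand in for the lock returned by `obtain(lockKey)` when trying it out.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/** Hypothetical helper showing bounded acquisition with guaranteed release. */
final class LockUsageSketch {

    /**
     * Runs the task while holding the lock.
     *
     * @return true if the lock was acquired and the task ran; false if the
     *         wait timed out or the thread was interrupted while waiting
     */
    static boolean runExclusively(Lock lock, Duration maxWait, Runnable task) {
        boolean acquired;
        try {
            acquired = lock.tryLock(maxWait.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
        if (!acquired) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock(); // always release, even if the task throws
        }
    }
}
```

Bounding the wait keeps an instance from blocking indefinitely when another instance holds the lock; the caller decides whether to retry or skip the work.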

Metadata Store

Transactional key-value store for managing integration metadata such as last processed IDs, timestamps, and state information. Provides atomic operations for concurrent access patterns.

package org.springframework.integration.jdbc.metadata;

public class JdbcMetadataStore
        implements ConcurrentMetadataStore, InitializingBean, SmartLifecycle {

    public JdbcMetadataStore(DataSource dataSource);
    public JdbcMetadataStore(JdbcOperations jdbcOperations);

    public void setTablePrefix(String tablePrefix);
    public void setRegion(String region);
    public void setLockHint(String lockHint);

    public void put(String key, String value);
    public String get(String key);
    public String putIfAbsent(String key, String value);
    public boolean replace(String key, String oldValue, String newValue);
    public String remove(String key);
}

Metadata Store
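
The atomic operations are what make the store usable for coordination. As a sketch, the in-memory model below mirrors the `putIfAbsent` and `replace` methods listed above to show two common patterns: claiming a key exactly once, and advancing a marker only if it is unchanged. `MetadataStoreSketch`, `KeyValueStore`, and `InMemoryStore` are hypothetical stand-ins, and the sketch assumes `putIfAbsent` returns `null` when no value was present, as `ConcurrentMap` does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory model of compare-and-set style metadata operations. */
final class MetadataStoreSketch {

    /** Hypothetical interface mirroring a subset of the methods listed above. */
    interface KeyValueStore {
        String get(String key);
        String putIfAbsent(String key, String value);
        boolean replace(String key, String oldValue, String newValue);
    }

    /** Map-backed stand-in for a database-backed store. */
    static final class InMemoryStore implements KeyValueStore {
        private final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();

        @Override
        public String get(String key) {
            return map.get(key);
        }

        @Override
        public String putIfAbsent(String key, String value) {
            return map.putIfAbsent(key, value);
        }

        @Override
        public boolean replace(String key, String oldValue, String newValue) {
            return map.replace(key, oldValue, newValue);
        }
    }

    /** Returns true only for the first caller that claims the key. */
    static boolean claimOnce(KeyValueStore store, String key, String owner) {
        return store.putIfAbsent(key, owner) == null;
    }

    /** Moves a marker forward only if it still holds the expected value. */
    static boolean advance(KeyValueStore store, String key, String expected, String next) {
        return store.replace(key, expected, next);
    }
}
```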

Java DSL

Fluent Java API for defining JDBC-based integration flows programmatically. Provides type-safe builders for all JDBC components with method chaining for configuration.

package org.springframework.integration.jdbc.dsl;

public final class Jdbc {
    // Inbound adapters
    public static JdbcInboundChannelAdapterSpec inboundAdapter(DataSource dataSource, String selectQuery);
    public static JdbcInboundChannelAdapterSpec inboundAdapter(JdbcOperations jdbcOperations, String selectQuery);

    // Outbound adapters
    public static JdbcOutboundChannelAdapterSpec outboundAdapter(DataSource dataSource, String updateQuery);
    public static JdbcOutboundChannelAdapterSpec outboundAdapter(JdbcOperations jdbcOperations, String updateQuery);

    // Outbound gateways
    public static JdbcOutboundGatewaySpec outboundGateway(DataSource dataSource, String updateQuery);
    public static JdbcOutboundGatewaySpec outboundGateway(DataSource dataSource, String updateQuery, String selectQuery);

    // Stored procedure adapters and gateways
    public static JdbcStoredProcInboundChannelAdapterSpec storedProcInboundAdapter(DataSource dataSource);
    public static JdbcStoredProcOutboundChannelAdapterSpec storedProcOutboundAdapter(DataSource dataSource);
    public static JdbcStoredProcOutboundGatewaySpec storedProcOutboundGateway(DataSource dataSource);

    // Stored procedure executor builder
    public static StoredProcExecutorSpec storedProcExecutorSpec(DataSource dataSource);
}

Java DSL

PostgreSQL Push Notifications

Native PostgreSQL LISTEN/NOTIFY support for subscribable channels. Enables push-based message delivery from database triggers, eliminating polling overhead for real-time database event processing.

package org.springframework.integration.jdbc.channel;

public final class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
    public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier);
    public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix);

    public void setTaskExecutor(AsyncTaskExecutor taskExecutor);
    public void setNotificationTimeout(Duration notificationTimeout);

    public boolean subscribe(Subscription subscription);
    public boolean unsubscribe(Subscription subscription);
}

package org.springframework.integration.jdbc.channel;

public class PostgresSubscribableChannel extends AbstractSubscribableChannel
        implements PostgresChannelMessageTableSubscriber.Subscription {

    public PostgresSubscribableChannel(
        JdbcChannelMessageStore store,
        Object groupId,
        PostgresChannelMessageTableSubscriber subscriber
    );

    public void setDispatcherExecutor(Executor executor);
    public void setTransactionManager(PlatformTransactionManager transactionManager);
    public void setRetryTemplate(RetryTemplate retryTemplate);
    public void setErrorHandler(ErrorHandler errorHandler);
}

PostgreSQL Channels

SQL Parameter Source Factories

Flexible parameter binding for SQL statements using message payload, headers, and SpEL expressions. Supports static parameters, bean property extraction, and dynamic expression evaluation.

package org.springframework.integration.jdbc;

@FunctionalInterface
public interface SqlParameterSourceFactory {
    SqlParameterSource createParameterSource(Object input);
}

package org.springframework.integration.jdbc;

public class BeanPropertySqlParameterSourceFactory implements SqlParameterSourceFactory {
    public void setStaticParameters(Map<String, Object> staticParameters);
    public SqlParameterSource createParameterSource(Object input);
}

package org.springframework.integration.jdbc;

public class ExpressionEvaluatingSqlParameterSourceFactory
        extends AbstractExpressionEvaluator implements SqlParameterSourceFactory {

    public void setStaticParameters(Map<String, Object> staticParameters);
    public void setParameterExpressions(Map<String, String> parameterExpressions);
    public void setSqlParameterTypes(Map<String, Integer> sqlParametersTypes);

    public SqlParameterSource createParameterSource(Object input);
    public SqlParameterSource createParameterSourceNoCache(Object input);
}

Parameter Factories

Database Support

Spring Integration JDBC provides SQL schema scripts and optimized query providers for multiple database vendors:

  • PostgreSQL: Full support including LISTEN/NOTIFY push notifications
  • MySQL: Optimized for MySQL-specific features
  • Oracle: Supports Oracle-specific SQL syntax
  • SQL Server: Microsoft SQL Server compatibility
  • H2: In-memory and file-based databases
  • HSQLDB: HyperSQL Database support
  • Derby: Apache Derby compatibility
  • DB2: IBM DB2 support
  • Sybase: Sybase ASE compatibility

Database-specific schema scripts are located in the JAR at org/springframework/integration/jdbc/schema-{vendor}.sql.
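
A script location can be derived from the pattern above. In this sketch the vendor token passed in (for example `postgresql`) is an assumption, since the exact file names are not listed here, and `SchemaScriptSketch` is a hypothetical helper. The lookup returns `null` when the resource is not on the classpath.

```java
import java.net.URL;

/** Hypothetical helper that resolves a schema script from the documented pattern. */
final class SchemaScriptSketch {

    private static final String PATTERN =
            "org/springframework/integration/jdbc/schema-%s.sql";

    /** Builds the classpath location for the given vendor token. */
    static String location(String vendor) {
        return String.format(PATTERN, vendor);
    }

    /** Looks the script up on the classpath; null if it is not present. */
    static URL find(String vendor) {
        return ClassLoader.getSystemResource(location(vendor));
    }
}
```

Checking the result of `find` for `null` before running a script gives an early, readable failure when the vendor token does not match a bundled file.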

Common Types

ProcedureParameter

package org.springframework.integration.jdbc.storedproc;

public class ProcedureParameter {
    public ProcedureParameter();
    public ProcedureParameter(String name, Object value, String expression);

    public String getName();
    public void setName(String name);

    public Object getValue();
    public void setValue(Object value);

    public String getExpression();
    public void setExpression(String expression);
}

MessagePreparedStatementSetter

package org.springframework.integration.jdbc;

@FunctionalInterface
public interface MessagePreparedStatementSetter {
    void setValues(PreparedStatement ps, Message<?> requestMessage) throws SQLException;
}

LockRepository

package org.springframework.integration.jdbc.lock;

public interface LockRepository extends Closeable {
    boolean acquire(String lock, Duration ttl);
    boolean renew(String lock, Duration ttl);
    boolean isAcquired(String lock);
    boolean delete(String lock);
    void deleteExpired();
    void close();
}
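
The TTL semantics behind `acquire`, `renew`, and `isAcquired` can be modeled in memory. The following is a sketch under stated assumptions, not the repository's SQL: each method takes the caller's client id and current time explicitly (both hypothetical parameters that the interface above does not have) so that the effect of expiry is visible. `LockTtlSketch` and `Holder` are illustrative names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** In-memory model of TTL-based lock rows shared by several clients. */
final class LockTtlSketch {

    /** Who holds a lock and when that hold expires. */
    private record Holder(String clientId, Instant expiresAt) {}

    private final Map<String, Holder> rows = new HashMap<>();

    /** Succeeds if the lock is free, expired, or already held by this client. */
    synchronized boolean acquire(String clientId, String lock, Duration ttl, Instant now) {
        Holder current = rows.get(lock);
        boolean available = current == null
                || !current.expiresAt().isAfter(now)
                || current.clientId().equals(clientId);
        if (available) {
            rows.put(lock, new Holder(clientId, now.plus(ttl)));
        }
        return available;
    }

    /** Extends the expiry only if this client is still the recorded holder. */
    synchronized boolean renew(String clientId, String lock, Duration ttl, Instant now) {
        Holder current = rows.get(lock);
        if (current == null || !current.clientId().equals(clientId)) {
            return false;
        }
        rows.put(lock, new Holder(clientId, now.plus(ttl)));
        return true;
    }

    /** True while this client holds the lock and the hold has not expired. */
    synchronized boolean isAcquired(String clientId, String lock, Instant now) {
        Holder current = rows.get(lock);
        return current != null
                && current.clientId().equals(clientId)
                && current.expiresAt().isAfter(now);
    }

    /** Removes every row whose hold has expired at the given time. */
    synchronized void deleteExpired(Instant now) {
        rows.values().removeIf(holder -> !holder.expiresAt().isAfter(now));
    }
}
```

Because expiry in this model is decided by comparing a stored timestamp with the caller's own clock, an instance whose clock runs ahead would treat a live lock as expired, which illustrates why clock synchronization across instances matters.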

ChannelMessageStoreQueryProvider

package org.springframework.integration.jdbc.store.channel;

public interface ChannelMessageStoreQueryProvider {
    String getCountAllMessagesInGroupQuery();
    String getMessageQuery();
    String getMessageCountForRegionQuery();
    String getDeleteMessageQuery();
    String getCreateMessageQuery();
    String getDeleteMessageGroupQuery();
    String getPollFromGroupExcludeIdsQuery();
    String getPollFromGroupQuery();
    String getPriorityPollFromGroupExcludeIdsQuery();
    String getPriorityPollFromGroupQuery();
    boolean isSingleStatementForPoll();
}