Spring Integration JDBC Support - Provides channel adapters and gateways for database integration using JDBC
npx @tessl/cli install tessl/maven-spring-integration-jdbc@7.0.0

Spring Integration JDBC provides comprehensive database integration capabilities for the Spring Integration messaging framework. It enables applications to send and receive messages through JDBC operations, supporting plain SQL queries, stored procedures, functions, and various message persistence patterns for enterprise integration scenarios.
Installation:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>7.0.0</version>
</dependency>

For Gradle:
implementation 'org.springframework.integration:spring-integration-jdbc:7.0.0'

import org.springframework.integration.jdbc.inbound.JdbcPollingChannelAdapter;
import org.springframework.integration.jdbc.outbound.JdbcMessageHandler;
import org.springframework.integration.jdbc.outbound.JdbcOutboundGateway;
import org.springframework.integration.jdbc.store.JdbcMessageStore;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;

For Java DSL:
import org.springframework.integration.jdbc.dsl.Jdbc;

Required Dependencies:
- spring-integration-jdbc (this package)
- spring-integration-core is required
- spring-jdbc is required (provided transitively)
- DataSource bean must be configured

Default Behaviors:
- "INT_" for all stores
- "DEFAULT" for stores
- ColumnMapRowMapper (returns Map<String, Object>)
- Iterable payloads:GENERATED_KEY parameter in SELECT queries

Threading Model:
Lifecycle:
- SmartLifecycle (auto-start by default)

Exceptions:
- DataAccessException - Database access failures
- MessagingException - Message handling failures
- IllegalArgumentException - Invalid configuration
- SQLException - SQL execution errors (wrapped in Spring exceptions)

Edge Cases:
- keysGenerated=true
- null or empty list depending on component
- maxRows limits SELECT results (default: unlimited)

Spring Integration JDBC is built around several key architectural components:
The module integrates seamlessly with Spring's JDBC abstractions (DataSource, JdbcTemplate) and supports multiple database vendors including PostgreSQL, MySQL, Oracle, SQL Server, H2, Derby, HSQLDB, and DB2.
Poll database tables using SQL SELECT queries, optionally updating records after retrieval. Supports custom row mapping and parameterized queries.
package org.springframework.integration.jdbc.inbound;
public class JdbcPollingChannelAdapter extends AbstractMessageSource<Object> {
public JdbcPollingChannelAdapter(DataSource dataSource, String selectQuery);
public JdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery);
public void setRowMapper(RowMapper<?> rowMapper);
public void setSelectQuery(String selectQuery);
public void setUpdateSql(String updateSql);
public void setUpdatePerRow(boolean updatePerRow);
public void setUpdateSqlParameterSourceFactory(SqlParameterSourceFactory factory);
public void setSelectSqlParameterSource(SqlParameterSource sqlQueryParameterSource);
public void setMaxRows(int maxRows);
public List<?> doPoll(SqlParameterSource sqlQueryParameterSource);
}

Execute SQL INSERT, UPDATE, or DELETE statements from incoming messages. Supports batch operations, auto-generated keys, and parameterized queries using message payload and headers.
package org.springframework.integration.jdbc.outbound;
public class JdbcMessageHandler extends AbstractMessageHandler {
public JdbcMessageHandler(DataSource dataSource, String updateSql);
public JdbcMessageHandler(JdbcOperations jdbcOperations, String updateSql);
public void setKeysGenerated(boolean keysGenerated);
public void setSqlParameterSourceFactory(SqlParameterSourceFactory factory);
public void setUsePayloadAsParameterSource(boolean usePayloadAsParameterSource);
public void setPreparedStatementSetter(MessagePreparedStatementSetter setter);
}

Execute database operations and return query results or generated keys in reply messages. Supports combined update+select operations for complex database interactions.
package org.springframework.integration.jdbc.outbound;
public class JdbcOutboundGateway extends AbstractReplyProducingMessageHandler {
public JdbcOutboundGateway(DataSource dataSource, String updateQuery);
public JdbcOutboundGateway(DataSource dataSource, String updateQuery, String selectQuery);
public JdbcOutboundGateway(JdbcOperations jdbcOperations, String updateQuery);
public JdbcOutboundGateway(JdbcOperations jdbcOperations, String updateQuery, String selectQuery);
public void setMaxRows(Integer maxRows);
public void setKeysGenerated(boolean keysGenerated);
public void setRequestSqlParameterSourceFactory(SqlParameterSourceFactory factory);
public void setRequestPreparedStatementSetter(MessagePreparedStatementSetter setter);
public void setReplySqlParameterSourceFactory(SqlParameterSourceFactory factory);
public void setRowMapper(RowMapper<?> rowMapper);
}

Execute database stored procedures and functions with full parameter mapping, result set handling, and support for multiple OUT parameters and return values.
package org.springframework.integration.jdbc;
public class StoredProcExecutor implements BeanFactoryAware, InitializingBean {
public StoredProcExecutor(DataSource dataSource);
public void setStoredProcedureName(String storedProcedureName);
public void setStoredProcedureNameExpression(Expression expression);
public void setIsFunction(boolean isFunction);
public void setProcedureParameters(List<ProcedureParameter> procedureParameters);
public void setSqlParameters(List<SqlParameter> sqlParameters);
public void setSqlParameterSourceFactory(SqlParameterSourceFactory factory);
public void setReturnValueRequired(boolean returnValueRequired);
public void setReturningResultSetRowMappers(Map<String, RowMapper<?>> mappers);
public void setIgnoreColumnMetaData(boolean ignoreColumnMetaData);
public void setJdbcCallOperationsCacheSize(int size);
public Map<String, Object> executeStoredProcedure();
public Map<String, Object> executeStoredProcedure(Message<?> message);
}

Persist Spring Integration messages and message groups in relational databases. Used by stateful patterns like aggregators, resequencers, and claim checks. Supports message expiration, custom serialization, and multi-region deployments.
package org.springframework.integration.jdbc.store;
public class JdbcMessageStore extends AbstractMessageGroupStore
implements MessageStore, BeanClassLoaderAware, SmartLifecycle {
public JdbcMessageStore(DataSource dataSource);
public JdbcMessageStore(JdbcOperations jdbcOperations);
public void setTablePrefix(String tablePrefix);
public void setRegion(String region);
public void setSerializer(Serializer<? super Message<?>> serializer);
public void setDeserializer(Deserializer<? extends Message<?>> deserializer);
public void addAllowedPatterns(String... patterns);
public Message<?> addMessage(Message<?> message);
public Message<?> getMessage(UUID id);
public Message<?> removeMessage(UUID id);
public MessageGroup getMessageGroup(Object groupId);
public long getMessageCount();
public int getMessageGroupCount();
}

Specialized message store optimized for QueueChannel and PollableChannel implementations. Provides priority-based message polling, efficient queue operations, and optional ID caching for high-concurrency scenarios.
package org.springframework.integration.jdbc.store;
public class JdbcChannelMessageStore
implements PriorityCapableChannelMessageStore, InitializingBean, SmartLifecycle {
public JdbcChannelMessageStore();
public JdbcChannelMessageStore(DataSource dataSource);
public void setTablePrefix(String tablePrefix);
public void setRegion(String region);
public void setPriorityEnabled(boolean priorityEnabled);
public void setUsingIdCache(boolean usingIdCache);
public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider provider);
public MessageGroup addMessageToGroup(Object groupId, Message<?> message);
public Message<?> pollMessageFromGroup(Object groupId);
public MessageGroup getMessageGroup(Object groupId);
public void removeMessageGroup(Object groupId);
public int messageGroupSize(Object groupId);
}

JDBC-based distributed locking for coordination across multiple application instances. Supports lock expiration, renewal, and automatic cleanup of stale locks.
package org.springframework.integration.jdbc.lock;
public class JdbcLockRegistry
implements ExpirableLockRegistry<DistributedLock>, RenewableLockRegistry<DistributedLock> {
public JdbcLockRegistry(LockRepository client);
public JdbcLockRegistry(LockRepository client, Duration expireAfter);
public void setIdleBetweenTries(Duration idleBetweenTries);
public void setCacheCapacity(int cacheCapacity);
public DistributedLock obtain(Object lockKey);
public void expireUnusedOlderThan(long age);
public void renewLock(Object lockKey);
public void renewLock(Object lockKey, Duration customTtl);
}

package org.springframework.integration.jdbc.lock;
public class DefaultLockRepository implements LockRepository, InitializingBean,
ApplicationContextAware, SmartInitializingSingleton, SmartLifecycle {
public DefaultLockRepository(DataSource dataSource);
public DefaultLockRepository(DataSource dataSource, String id);
public void setRegion(String region);
public void setPrefix(String prefix);
public void setTransactionManager(PlatformTransactionManager transactionManager);
public boolean acquire(String lock, Duration ttl);
public boolean renew(String lock, Duration ttl);
public boolean isAcquired(String lock);
public boolean delete(String lock);
public void deleteExpired();
}

Transactional key-value store for managing integration metadata such as last processed IDs, timestamps, and state information. Provides atomic operations for concurrent access patterns.
package org.springframework.integration.jdbc.metadata;
public class JdbcMetadataStore
implements ConcurrentMetadataStore, InitializingBean, SmartLifecycle {
public JdbcMetadataStore(DataSource dataSource);
public JdbcMetadataStore(JdbcOperations jdbcOperations);
public void setTablePrefix(String tablePrefix);
public void setRegion(String region);
public void setLockHint(String lockHint);
public void put(String key, String value);
public String get(String key);
public String putIfAbsent(String key, String value);
public boolean replace(String key, String oldValue, String newValue);
public String remove(String key);
}

Fluent Java API for defining JDBC-based integration flows programmatically. Provides type-safe builders for all JDBC components with method chaining for configuration.
package org.springframework.integration.jdbc.dsl;
public final class Jdbc {
// Inbound adapters
public static JdbcInboundChannelAdapterSpec inboundAdapter(DataSource dataSource, String selectQuery);
public static JdbcInboundChannelAdapterSpec inboundAdapter(JdbcOperations jdbcOperations, String selectQuery);
// Outbound adapters
public static JdbcOutboundChannelAdapterSpec outboundAdapter(DataSource dataSource, String updateQuery);
public static JdbcOutboundChannelAdapterSpec outboundAdapter(JdbcOperations jdbcOperations, String updateQuery);
// Outbound gateways
public static JdbcOutboundGatewaySpec outboundGateway(DataSource dataSource, String updateQuery);
public static JdbcOutboundGatewaySpec outboundGateway(DataSource dataSource, String updateQuery, String selectQuery);
// Stored procedure adapters and gateways
public static JdbcStoredProcInboundChannelAdapterSpec storedProcInboundAdapter(DataSource dataSource);
public static JdbcStoredProcOutboundChannelAdapterSpec storedProcOutboundAdapter(DataSource dataSource);
public static JdbcStoredProcOutboundGatewaySpec storedProcOutboundGateway(DataSource dataSource);
// Stored procedure executor builder
public static StoredProcExecutorSpec storedProcExecutorSpec(DataSource dataSource);
}

Native PostgreSQL LISTEN/NOTIFY support for subscribable channels. Enables push-based message delivery from database triggers, eliminating polling overhead for real-time database event processing.
package org.springframework.integration.jdbc.channel;
public final class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier);
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix);
public void setTaskExecutor(AsyncTaskExecutor taskExecutor);
public void setNotificationTimeout(Duration notificationTimeout);
public boolean subscribe(Subscription subscription);
public boolean unsubscribe(Subscription subscription);
}

package org.springframework.integration.jdbc.channel;
public class PostgresSubscribableChannel extends AbstractSubscribableChannel
implements PostgresChannelMessageTableSubscriber.Subscription {
public PostgresSubscribableChannel(
JdbcChannelMessageStore store,
Object groupId,
PostgresChannelMessageTableSubscriber subscriber
);
public void setDispatcherExecutor(Executor executor);
public void setTransactionManager(PlatformTransactionManager transactionManager);
public void setRetryTemplate(RetryTemplate retryTemplate);
public void setErrorHandler(ErrorHandler errorHandler);
}

Flexible parameter binding for SQL statements using message payload, headers, and SpEL expressions. Supports static parameters, bean property extraction, and dynamic expression evaluation.
package org.springframework.integration.jdbc;
@FunctionalInterface
public interface SqlParameterSourceFactory {
SqlParameterSource createParameterSource(Object input);
}

package org.springframework.integration.jdbc;
public class BeanPropertySqlParameterSourceFactory implements SqlParameterSourceFactory {
public void setStaticParameters(Map<String, Object> staticParameters);
public SqlParameterSource createParameterSource(Object input);
}

package org.springframework.integration.jdbc;
public class ExpressionEvaluatingSqlParameterSourceFactory
extends AbstractExpressionEvaluator implements SqlParameterSourceFactory {
public void setStaticParameters(Map<String, Object> staticParameters);
public void setParameterExpressions(Map<String, String> parameterExpressions);
public void setSqlParameterTypes(Map<String, Integer> sqlParametersTypes);
public SqlParameterSource createParameterSource(Object input);
public SqlParameterSource createParameterSourceNoCache(Object input);
}

Spring Integration JDBC provides SQL schema scripts and optimized query providers for multiple database vendors:
Database-specific schema scripts are located in the JAR at org/springframework/integration/jdbc/schema-{vendor}.sql.
package org.springframework.integration.jdbc.storedproc;
public class ProcedureParameter {
public ProcedureParameter();
public ProcedureParameter(String name, Object value, String expression);
public String getName();
public void setName(String name);
public Object getValue();
public void setValue(Object value);
public String getExpression();
public void setExpression(String expression);
}

package org.springframework.integration.jdbc;
@FunctionalInterface
public interface MessagePreparedStatementSetter {
void setValues(PreparedStatement ps, Message<?> requestMessage) throws SQLException;
}

package org.springframework.integration.jdbc.lock;
public interface LockRepository extends Closeable {
boolean acquire(String lock, Duration ttl);
boolean renew(String lock, Duration ttl);
boolean isAcquired(String lock);
boolean delete(String lock);
void deleteExpired();
void close();
}

package org.springframework.integration.jdbc.store.channel;
public interface ChannelMessageStoreQueryProvider {
String getCountAllMessagesInGroupQuery();
String getMessageQuery();
String getMessageCountForRegionQuery();
String getDeleteMessageQuery();
String getCreateMessageQuery();
String getDeleteMessageGroupQuery();
String getPollFromGroupExcludeIdsQuery();
String getPollFromGroupQuery();
String getPriorityPollFromGroupExcludeIdsQuery();
String getPriorityPollFromGroupQuery();
boolean isSingleStatementForPoll();
}