Spring Integration JDBC Support - Provides channel adapters and gateways for database integration using JDBC
Inbound polling capabilities enable applications to poll database tables and create Spring Integration messages from query results. This is useful for event-driven architectures based on database changes, batch processing of pending records, and database-to-message integration patterns.
Required Dependencies:
spring-integration-jdbc (this package)spring-integration-core is requiredDataSource or JdbcOperations bean must be configuredDefault Behaviors:
maxRows=0 (unlimited rows)updatePerRow=true (per-row updates when update SQL configured)ColumnMapRowMapper (returns Map<String, Object>)null when no results found (not empty list)List<?> when multiple rows foundThreading Model:
JdbcOperations (connection pooling)Lifecycle:
auto-startup attribute (default: true)Exceptions:
DataAccessException - Database access failures (contains SQL and parameters)MessagingException - Message creation/handling failuresIllegalArgumentException - Invalid configuration (e.g., null DataSource)Edge Cases:
null (not empty list)List<?> (even if list contains one element)maxRows limits SELECT results (0 = unlimited)updatePerRow=false executes single batch UPDATE (more efficient for large batches)updatePerRow=true executes UPDATE per row (allows per-row parameter extraction)setSelectSqlParameterSource()package org.springframework.integration.jdbc.inbound;
public class JdbcPollingChannelAdapter extends AbstractMessageSource<Object> {
public JdbcPollingChannelAdapter(DataSource dataSource, String selectQuery);
public JdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery);
public void setRowMapper(RowMapper<?> rowMapper);
public void setSelectQuery(String selectQuery);
public void setUpdateSql(String updateSql);
public void setUpdatePerRow(boolean updatePerRow);
public void setUpdateSqlParameterSourceFactory(SqlParameterSourceFactory factory);
public void setSelectSqlParameterSource(SqlParameterSource sqlQueryParameterSource);
public void setMaxRows(int maxRows);
public List<?> doPoll(@Nullable SqlParameterSource sqlQueryParameterSource);
public String getComponentType();
}package org.springframework.integration.jdbc.inbound;
public class StoredProcPollingChannelAdapter extends AbstractMessageSource<Object> {
public StoredProcPollingChannelAdapter(StoredProcExecutor storedProcExecutor);
public void setExpectSingleResult(boolean expectSingleResult);
public String getComponentType();
}import org.springframework.integration.jdbc.inbound.JdbcPollingChannelAdapter;
import javax.sql.DataSource;
// Basic polling without updates
JdbcPollingChannelAdapter simpleAdapter = new JdbcPollingChannelAdapter(
dataSource,
"SELECT id, order_number, amount FROM orders WHERE status = 'PENDING'"
);
simpleAdapter.setMaxRows(100);
// Poll messages
List<?> messages = simpleAdapter.receive();// Polling with per-row update marking
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
dataSource,
"SELECT id, order_number, amount FROM orders WHERE processed = false"
);
adapter.setUpdateSql("UPDATE orders SET processed = true WHERE id = :id");
adapter.setUpdatePerRow(true);
adapter.setMaxRows(50);
// Poll messages - each row updated individually
List<?> messages = adapter.receive();import org.springframework.jdbc.core.BeanPropertyRowMapper;
// Custom row mapping to domain objects
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
dataSource,
"SELECT * FROM orders WHERE processed = false"
);
adapter.setRowMapper(new BeanPropertyRowMapper<>(Order.class));
adapter.setUpdateSql("UPDATE orders SET processed = true WHERE id = :id");
adapter.setUpdatePerRow(true);
// Poll returns List<Order>
List<?> orders = adapter.receive();// Polling with batch update
JdbcPollingChannelAdapter batchAdapter = new JdbcPollingChannelAdapter(
dataSource,
"SELECT * FROM events WHERE status = 'NEW' ORDER BY created_at LIMIT 100"
);
batchAdapter.setUpdateSql("UPDATE events SET status = 'PROCESSED' WHERE id IN (:id)");
batchAdapter.setUpdatePerRow(false); // Batch update all polled recordsimport org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
// Using SqlParameterSource for SELECT query
SqlParameterSource params = new MapSqlParameterSource()
.addValue("region", "US")
.addValue("minAmount", 1000);
JdbcPollingChannelAdapter paramAdapter = new JdbcPollingChannelAdapter(
dataSource,
"SELECT * FROM orders WHERE region = :region AND amount > :minAmount"
);
paramAdapter.setSelectSqlParameterSource(params);import org.springframework.integration.jdbc.StoredProcExecutor;
import org.springframework.integration.jdbc.inbound.StoredProcPollingChannelAdapter;
import org.springframework.integration.jdbc.storedproc.ProcedureParameter;
// Configure stored procedure executor
StoredProcExecutor executor = new StoredProcExecutor(dataSource);
executor.setStoredProcedureName("GET_PENDING_ORDERS");
executor.setIgnoreColumnMetaData(false);
// Define procedure parameters
List<ProcedureParameter> params = List.of(
new ProcedureParameter("region", "US", null),
new ProcedureParameter("limit", 100, null)
);
executor.setProcedureParameters(params);
// Create polling adapter
StoredProcPollingChannelAdapter adapter = new StoredProcPollingChannelAdapter(executor);
// Poll returns Map with result sets, OUT params, and return value
Message<?> message = adapter.receive();
// Payload is Map<String, Object> with keys: result set names, OUT param names, etc.// Polling stored function
StoredProcExecutor functionExecutor = new StoredProcExecutor(dataSource);
functionExecutor.setStoredProcedureName("COUNT_PENDING_ORDERS");
functionExecutor.setIsFunction(true);
functionExecutor.setReturnValueRequired(true);
StoredProcPollingChannelAdapter funcAdapter = new StoredProcPollingChannelAdapter(functionExecutor);
funcAdapter.setExpectSingleResult(true);
Message<?> countMessage = funcAdapter.receive();
Integer pendingCount = (Integer) countMessage.getPayload();Both adapters extend AbstractMessageSource<Object> and integrate with Spring Integration's polling infrastructure:
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import static org.springframework.integration.jdbc.dsl.Jdbc.*;
@Bean
public IntegrationFlow pollingFlow(DataSource dataSource) {
return IntegrationFlow
.from(inboundAdapter(dataSource, "SELECT * FROM orders WHERE processed = false")
.updateSql("UPDATE orders SET processed = true WHERE id = :id")
.updatePerRow(true)
.maxRows(100),
e -> e.poller(Pollers.fixedDelay(5000))) // Poll every 5 seconds
.handle(message -> {
// Process message
System.out.println("Received: " + message.getPayload());
})
.get();
}Or using XML configuration:
<int-jdbc:inbound-channel-adapter
channel="inputChannel"
data-source="dataSource"
query="SELECT * FROM orders WHERE processed = false"
update="UPDATE orders SET processed = true WHERE id = :id"
update-per-row="true"
max-rows="100">
<int:poller fixed-delay="5000"/>
</int-jdbc:inbound-channel-adapter>Important Migration Notice:
The following classes in the org.springframework.integration.jdbc package are deprecated since version 7.0 and marked for removal:
org.springframework.integration.jdbc.JdbcPollingChannelAdapterorg.springframework.integration.jdbc.StoredProcPollingChannelAdapterThese classes have been moved to the org.springframework.integration.jdbc.inbound package. Users should update their imports:
Old (Deprecated):
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.integration.jdbc.StoredProcPollingChannelAdapter;New (Current):
import org.springframework.integration.jdbc.inbound.JdbcPollingChannelAdapter;
import org.springframework.integration.jdbc.inbound.StoredProcPollingChannelAdapter;The deprecated classes are simple wrappers that extend the new classes, so migration only requires updating import statements. All functionality remains the same.
:paramName) in SQL queriesnull when no rows found (not empty list)Install with Tessl CLI
npx tessl i tessl/maven-spring-integration-jdbc