Inbound polling capabilities enable applications to poll database tables and create Spring Integration messages from query results. This is useful for event-driven architectures based on database changes, batch processing of pending records, and database-to-message integration patterns.
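As a rough mental model of one polling cycle, the sketch below selects pending rows from an in-memory "table", caps the result at a row limit, marks the selected rows as processed, and returns null when nothing is pending. It is a plain-Java toy under stated assumptions, not the adapter's implementation: `PollingModel`, `insert`, and `poll` are hypothetical names, and no database or Spring class is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a polling cycle; NOT the Spring Integration implementation. */
final class PollingModel {

    /** Hypothetical stand-in for a table: each row maps column names to values. */
    private final List<Map<String, Object>> table = new ArrayList<>();

    void insert(int id, String status) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", id);
        row.put("status", status);
        table.add(row);
    }

    /**
     * Selects up to maxRows PENDING rows (0 means no limit), marks each one
     * PROCESSED, and returns snapshots of the selected rows.
     * Returns null, not an empty list, when nothing is pending.
     */
    List<Map<String, Object>> poll(int maxRows) {
        List<Map<String, Object>> selected = new ArrayList<>();
        for (Map<String, Object> row : table) {
            if (maxRows > 0 && selected.size() >= maxRows) {
                break; // row limit reached
            }
            if ("PENDING".equals(row.get("status"))) {
                selected.add(new LinkedHashMap<>(row)); // snapshot taken before the update
                row.put("status", "PROCESSED");         // the "update" step
            }
        }
        return selected.isEmpty() ? null : selected;
    }

    public static void main(String[] args) {
        PollingModel model = new PollingModel();
        model.insert(1, "PENDING");
        model.insert(2, "PENDING");
        model.insert(3, "PENDING");
        System.out.println(model.poll(2)); // rows 1 and 2
        System.out.println(model.poll(2)); // row 3
        System.out.println(model.poll(2)); // null
    }
}
```

Marking rows in the same cycle that selects them is what keeps a later poll from returning the same records again.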
Required Dependencies:
- spring-integration-jdbc (this package)
- spring-integration-core is required
- DataSource or JdbcOperations bean must be configured

Default Behaviors:
- maxRows=0 (unlimited rows)
- updatePerRow=true (per-row updates when update SQL configured)
- ColumnMapRowMapper (returns Map<String, Object>)
- null when no results found (not empty list)
- List<?> when multiple rows found

Threading Model:
- JdbcOperations (connection pooling)

Lifecycle:
- auto-startup attribute (default: true)

Exceptions:
- DataAccessException - Database access failures (contains SQL and parameters)
- MessagingException - Message creation/handling failures
- IllegalArgumentException - Invalid configuration (e.g., null DataSource)

Edge Cases:
- null (not empty list)
- List<?> (even if list contains one element)
- maxRows limits SELECT results (0 = unlimited)
- updatePerRow=false executes single batch UPDATE (more efficient for large batches)
- updatePerRow=true executes UPDATE per row (allows per-row parameter extraction)
- setSelectSqlParameterSource()

package org.springframework.integration.jdbc.inbound;
public class JdbcPollingChannelAdapter extends AbstractMessageSource<Object> {
    public JdbcPollingChannelAdapter(DataSource dataSource, String selectQuery);
    public JdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery);
    public void setRowMapper(RowMapper<?> rowMapper);
    public void setSelectQuery(String selectQuery);
    public void setUpdateSql(String updateSql);
    public void setUpdatePerRow(boolean updatePerRow);
    public void setUpdateSqlParameterSourceFactory(SqlParameterSourceFactory factory);
    public void setSelectSqlParameterSource(SqlParameterSource sqlQueryParameterSource);
    public void setMaxRows(int maxRows);
    public List<?> doPoll(@Nullable SqlParameterSource sqlQueryParameterSource);
    public String getComponentType();
}

package org.springframework.integration.jdbc.inbound;
public class StoredProcPollingChannelAdapter extends AbstractMessageSource<Object> {
    public StoredProcPollingChannelAdapter(StoredProcExecutor storedProcExecutor);
    public void setExpectSingleResult(boolean expectSingleResult);
    public String getComponentType();
}

import org.springframework.integration.jdbc.inbound.JdbcPollingChannelAdapter;
import javax.sql.DataSource;
import org.springframework.messaging.Message;

// Basic polling without updates
JdbcPollingChannelAdapter simpleAdapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT id, order_number, amount FROM orders WHERE status = 'PENDING'"
);
simpleAdapter.setMaxRows(100);

// Poll once: receive() returns a Message whose payload is the List of rows, or null when no rows match
Message<?> message = simpleAdapter.receive();

// Polling with per-row update marking
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT id, order_number, amount FROM orders WHERE processed = false"
);
adapter.setUpdateSql("UPDATE orders SET processed = true WHERE id = :id");
adapter.setUpdatePerRow(true);
adapter.setMaxRows(50);

// Poll once; each selected row is updated individually
Message<?> message = adapter.receive();

import org.springframework.jdbc.core.BeanPropertyRowMapper;
// Custom row mapping to domain objects
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT * FROM orders WHERE processed = false"
);
adapter.setRowMapper(new BeanPropertyRowMapper<>(Order.class));
adapter.setUpdateSql("UPDATE orders SET processed = true WHERE id = :id");
adapter.setUpdatePerRow(true);

// Poll once; the message payload is a List<Order>
Message<?> orders = adapter.receive();

// Polling with batch update
JdbcPollingChannelAdapter batchAdapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT * FROM events WHERE status = 'NEW' ORDER BY created_at LIMIT 100"
);
batchAdapter.setUpdateSql("UPDATE events SET status = 'PROCESSED' WHERE id IN (:id)");
batchAdapter.setUpdatePerRow(false); // Batch update all polled records

import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

// Using SqlParameterSource for SELECT query
SqlParameterSource params = new MapSqlParameterSource()
    .addValue("region", "US")
    .addValue("minAmount", 1000);
JdbcPollingChannelAdapter paramAdapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT * FROM orders WHERE region = :region AND amount > :minAmount"
);
paramAdapter.setSelectSqlParameterSource(params);

import org.springframework.integration.jdbc.StoredProcExecutor;
import org.springframework.integration.jdbc.inbound.StoredProcPollingChannelAdapter;
import org.springframework.integration.jdbc.storedproc.ProcedureParameter;

// Configure stored procedure executor
StoredProcExecutor executor = new StoredProcExecutor(dataSource);
executor.setStoredProcedureName("GET_PENDING_ORDERS");
executor.setIgnoreColumnMetaData(false);

// Define procedure parameters
List<ProcedureParameter> params = List.of(
    new ProcedureParameter("region", "US", null),
    new ProcedureParameter("limit", 100, null)
);
executor.setProcedureParameters(params);

// Create polling adapter
StoredProcPollingChannelAdapter adapter = new StoredProcPollingChannelAdapter(executor);

// Poll returns Map with result sets, OUT params, and return value
Message<?> message = adapter.receive();
// Payload is Map<String, Object> with keys: result set names, OUT param names, etc.

// Polling stored function
StoredProcExecutor functionExecutor = new StoredProcExecutor(dataSource);
functionExecutor.setStoredProcedureName("COUNT_PENDING_ORDERS");
functionExecutor.setIsFunction(true);
functionExecutor.setReturnValueRequired(true);

StoredProcPollingChannelAdapter funcAdapter = new StoredProcPollingChannelAdapter(functionExecutor);
funcAdapter.setExpectSingleResult(true);

Message<?> countMessage = funcAdapter.receive();
Integer pendingCount = (Integer) countMessage.getPayload();

Both adapters extend AbstractMessageSource<Object> and integrate with Spring Integration's polling infrastructure:
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import static org.springframework.integration.jdbc.dsl.Jdbc.*;

@Bean
public IntegrationFlow pollingFlow(DataSource dataSource) {
    return IntegrationFlow
        .from(inboundAdapter(dataSource, "SELECT * FROM orders WHERE processed = false")
                .updateSql("UPDATE orders SET processed = true WHERE id = :id")
                .updatePerRow(true)
                .maxRows(100),
            e -> e.poller(Pollers.fixedDelay(5000))) // Poll every 5 seconds
        .handle(message -> {
            // Process message
            System.out.println("Received: " + message.getPayload());
        })
        .get();
}

Or using XML configuration:
<int-jdbc:inbound-channel-adapter
    channel="inputChannel"
    data-source="dataSource"
    query="SELECT * FROM orders WHERE processed = false"
    update="UPDATE orders SET processed = true WHERE id = :id"
    update-per-row="true"
    max-rows="100">
    <int:poller fixed-delay="5000"/>
</int-jdbc:inbound-channel-adapter>

Important Migration Notice:
The following classes in the org.springframework.integration.jdbc package are deprecated since version 7.0 and marked for removal:
- org.springframework.integration.jdbc.JdbcPollingChannelAdapter
- org.springframework.integration.jdbc.StoredProcPollingChannelAdapter

These classes have been moved to the org.springframework.integration.jdbc.inbound package. Users should update their imports:

Old (Deprecated):
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.integration.jdbc.StoredProcPollingChannelAdapter;

New (Current):
import org.springframework.integration.jdbc.inbound.JdbcPollingChannelAdapter;
import org.springframework.integration.jdbc.inbound.StoredProcPollingChannelAdapter;

The deprecated classes are simple wrappers that extend the new classes, so migration only requires updating import statements. All functionality remains the same.
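For a codebase with many affected files, the import change can be scripted. The following is a minimal sketch of such a rewrite over the text of one source file; `ImportMigrator` and `migrate` are hypothetical names rather than part of any Spring tooling, and the sketch only handles the two explicit single-class imports named above (wildcard imports and fully qualified references in code are left untouched).

```java
/** Hypothetical helper that rewrites the two deprecated imports in Java source text. */
final class ImportMigrator {

    private static final String OLD_PREFIX = "import org.springframework.integration.jdbc.";
    private static final String NEW_PREFIX = "import org.springframework.integration.jdbc.inbound.";

    /** Returns the source with both deprecated imports pointed at the inbound package. */
    static String migrate(String source) {
        return source
            .replace(OLD_PREFIX + "JdbcPollingChannelAdapter;",
                     NEW_PREFIX + "JdbcPollingChannelAdapter;")
            .replace(OLD_PREFIX + "StoredProcPollingChannelAdapter;",
                     NEW_PREFIX + "StoredProcPollingChannelAdapter;");
    }
}
```

Because an already migrated import no longer contains the old import string, running the rewrite a second time leaves the file unchanged.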
- Named parameters (:paramName) in SQL queries
- null when no rows found (not empty list)
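Since a poll that finds no rows yields null rather than an empty list, code that consumes the result directly needs a null check before iterating. One way to centralize that check is a small normalizing helper, sketched here in plain Java; `PayloadRows` and `rowsOf` are hypothetical names and nothing in this block is part of the Spring Integration API.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper that treats a "no rows" poll result as an empty list. */
final class PayloadRows {

    /** Returns the polled rows, or an empty list when the poll produced null. */
    static List<?> rowsOf(List<?> polled) {
        return polled == null ? Collections.emptyList() : polled;
    }

    public static void main(String[] args) {
        // A loop over the normalized result is safe even when nothing was polled.
        for (Object row : rowsOf(null)) {
            System.out.println(row); // never reached for a null poll result
        }
        System.out.println(rowsOf(List.of("row-1", "row-2")).size()); // 2
    }
}
```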