Inbound Polling

Inbound polling capabilities enable applications to poll database tables and create Spring Integration messages from query results. This is useful for event-driven architectures based on database changes, batch processing of pending records, and database-to-message integration patterns.

Key Information for Agents

Required Dependencies:

  • spring-integration-jdbc (this package)
  • spring-integration-core is required
  • DataSource or JdbcOperations bean must be configured
  • Poller must be configured (fixed-delay, fixed-rate, or cron)

Default Behaviors:

  • maxRows=0 (unlimited rows)
  • updatePerRow=false (one UPDATE for the whole polled result when update SQL is configured; set to true for per-row updates)
  • Default row mapper: ColumnMapRowMapper (returns Map<String, Object>)
  • receive() returns null when no rows are found (no message is produced, and no empty list)
  • Otherwise the message payload is a List<?> of mapped rows, even when only one row is found
  • UPDATE SQL executes in the same transaction as the SELECT when the poller is transactional
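
A handler that consumes polled payloads can normalize them before iterating. The sketch below is a hypothetical consumer-side helper (PolledPayloads and toRows are illustrative names, not part of spring-integration-jdbc) that treats null as no rows, a List as the rows themselves, and any other object as a single row:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical consumer-side helper; not part of spring-integration-jdbc. */
final class PolledPayloads {

    private PolledPayloads() {
    }

    /**
     * Normalizes a polled payload into a list of rows:
     * null -> empty list, List -> copy of its elements, anything else -> one-element list.
     */
    static List<Object> toRows(Object payload) {
        if (payload == null) {
            return new ArrayList<>();
        }
        if (payload instanceof List) {
            return new ArrayList<>((List<?>) payload);
        }
        List<Object> single = new ArrayList<>();
        single.add(payload);
        return single;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 1, "order_number", "A-100");
        System.out.println(toRows(null).size());         // no rows polled
        System.out.println(toRows(List.of(row)).size()); // list payload
        System.out.println(toRows(row).size());          // bare object payload
    }
}
```

Handling all three shapes in one place keeps downstream code free of null checks and casts, at the cost of one defensive copy per poll.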

Threading Model:

  • Executes on poller thread (configurable via poller configuration)
  • Poller can be transactional (operations within transaction boundary)
  • For async processing, use async channels (ExecutorChannel, QueueChannel)
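  • JdbcOperations implementations such as JdbcTemplate are thread-safe; each poll obtains its connection from the configured DataSource (typically a connection pool)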

Lifecycle:

  • Starts polling when Spring context is active
  • Stops polling when context is destroyed
  • Can be controlled via auto-startup attribute (default: true)
  • Poller lifecycle managed by Spring Integration poller infrastructure

Exceptions:

  • DataAccessException - Database access failures (contains SQL and parameters)
  • MessagingException - Message creation/handling failures
  • IllegalArgumentException - Invalid configuration (e.g., null DataSource)

Edge Cases:

  • Empty result set: receive() returns null (not a message with an empty list)
  • One or more rows: the payload is a List<?> (a single row is still wrapped in a one-element list)
  • maxRows limits SELECT results (0 = unlimited)
  • UPDATE SQL parameter names must match SELECT result column names or use parameter factory
  • updatePerRow=false executes single batch UPDATE (more efficient for large batches)
  • updatePerRow=true executes UPDATE per row (allows per-row parameter extraction)
  • Parameter factory extracts parameters from each polled row for UPDATE SQL
  • SELECT query can use static parameters via setSelectSqlParameterSource()
  • Custom row mapper can return domain objects instead of Maps
  • Transaction management: UPDATE executes in the same transaction as SELECT only when the poller is transactional
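
The difference between the two updatePerRow modes comes down to what the :id placeholder is bound to. The following is a simplified plain-Java model of that behavior, not the framework's code: the real adapter delegates to a SqlParameterSourceFactory, and UpdateParameterModel, perRow, and batch are hypothetical names used only for illustration. It assumes rows are mapped to Map<String, Object>, as with the default row mapper.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; the real adapter uses a SqlParameterSourceFactory. */
final class UpdateParameterModel {

    private UpdateParameterModel() {
    }

    /** Per-row mode: one parameter map per row, so ":id" binds a single value per UPDATE. */
    static List<Map<String, Object>> perRow(List<Map<String, Object>> rows, String column) {
        List<Map<String, Object>> parameterSets = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Map<String, Object> parameters = new LinkedHashMap<>();
            parameters.put(column, row.get(column));
            parameterSets.add(parameters);
        }
        return parameterSets;
    }

    /** Batch mode: one parameter map per poll, so ":id" binds a collection (hence "IN (:id)"). */
    static Map<String, Object> batch(List<Map<String, Object>> rows, String column) {
        List<Object> values = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            values.add(row.get(column));
        }
        Map<String, Object> parameters = new LinkedHashMap<>();
        parameters.put(column, values);
        return parameters;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(Map.of("id", 1), Map.of("id", 2));
        System.out.println(perRow(rows, "id")); // prints [{id=1}, {id=2}]
        System.out.println(batch(rows, "id"));  // prints {id=[1, 2]}
    }
}
```

This is why a per-row statement is written as WHERE id = :id while a batch statement is written as WHERE id IN (:id).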

Core Classes

JdbcPollingChannelAdapter

package org.springframework.integration.jdbc.inbound;

public class JdbcPollingChannelAdapter extends AbstractMessageSource<Object> {
    public JdbcPollingChannelAdapter(DataSource dataSource, String selectQuery);
    public JdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery);

    public void setRowMapper(RowMapper<?> rowMapper);
    public void setSelectQuery(String selectQuery);
    public void setUpdateSql(String updateSql);
    public void setUpdatePerRow(boolean updatePerRow);
    public void setUpdateSqlParameterSourceFactory(SqlParameterSourceFactory factory);
    public void setSelectSqlParameterSource(SqlParameterSource sqlQueryParameterSource);
    public void setMaxRows(int maxRows);
    public List<?> doPoll(@Nullable SqlParameterSource sqlQueryParameterSource);
    public String getComponentType();
}

StoredProcPollingChannelAdapter

package org.springframework.integration.jdbc.inbound;

public class StoredProcPollingChannelAdapter extends AbstractMessageSource<Object> {
    public StoredProcPollingChannelAdapter(StoredProcExecutor storedProcExecutor);

    public void setExpectSingleResult(boolean expectSingleResult);
    public String getComponentType();
}

Usage Examples

Basic SQL Polling

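import java.util.List;
import javax.sql.DataSource;
import org.springframework.integration.jdbc.inbound.JdbcPollingChannelAdapter;
import org.springframework.messaging.Message;

// Basic polling without updates
JdbcPollingChannelAdapter simpleAdapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT id, order_number, amount FROM orders WHERE status = 'PENDING'"
);
simpleAdapter.setMaxRows(100);

// Poll once; receive() returns a Message, or null when no rows match
Message<?> message = simpleAdapter.receive();
// The payload is the List of polled rows (Map<String, Object> per row by default)
List<?> rows = (message != null) ? (List<?>) message.getPayload() : List.of();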

Polling with Per-Row Update

// Polling with per-row update marking
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT id, order_number, amount FROM orders WHERE processed = false"
);
adapter.setUpdateSql("UPDATE orders SET processed = true WHERE id = :id");
adapter.setUpdatePerRow(true);
adapter.setMaxRows(50);

// Poll once - each returned row is then updated individually
// receive() returns a Message (or null); its payload is the List of rows
Message<?> message = adapter.receive();

Custom Row Mapping

import org.springframework.jdbc.core.BeanPropertyRowMapper;

// Custom row mapping to domain objects
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT * FROM orders WHERE processed = false"
);
adapter.setRowMapper(new BeanPropertyRowMapper<>(Order.class));
adapter.setUpdateSql("UPDATE orders SET processed = true WHERE id = :id");
adapter.setUpdatePerRow(true);

// receive() returns a Message (or null); its payload is a List<Order>
Message<?> ordersMessage = adapter.receive();

Batch Update

// Polling with batch update
JdbcPollingChannelAdapter batchAdapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT * FROM events WHERE status = 'NEW' ORDER BY created_at LIMIT 100"
);
batchAdapter.setUpdateSql("UPDATE events SET status = 'PROCESSED' WHERE id IN (:id)");
batchAdapter.setUpdatePerRow(false); // Batch update all polled records

Parameterized SELECT Query

import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

// Using SqlParameterSource for SELECT query
SqlParameterSource params = new MapSqlParameterSource()
    .addValue("region", "US")
    .addValue("minAmount", 1000);

JdbcPollingChannelAdapter paramAdapter = new JdbcPollingChannelAdapter(
    dataSource,
    "SELECT * FROM orders WHERE region = :region AND amount > :minAmount"
);
paramAdapter.setSelectSqlParameterSource(params);

Stored Procedure Polling

import java.util.List;
import org.springframework.integration.jdbc.StoredProcExecutor;
import org.springframework.integration.jdbc.inbound.StoredProcPollingChannelAdapter;
import org.springframework.integration.jdbc.storedproc.ProcedureParameter;
import org.springframework.messaging.Message;

// Configure stored procedure executor
StoredProcExecutor executor = new StoredProcExecutor(dataSource);
executor.setStoredProcedureName("GET_PENDING_ORDERS");
executor.setIgnoreColumnMetaData(false);

// Define procedure parameters
List<ProcedureParameter> params = List.of(
    new ProcedureParameter("region", "US", null),
    new ProcedureParameter("limit", 100, null)
);
executor.setProcedureParameters(params);

// Create polling adapter
StoredProcPollingChannelAdapter adapter = new StoredProcPollingChannelAdapter(executor);

// Poll returns Map with result sets, OUT params, and return value
Message<?> message = adapter.receive();
// Payload is Map<String, Object> with keys: result set names, OUT param names, etc.

Stored Function Polling

// Polling stored function
StoredProcExecutor functionExecutor = new StoredProcExecutor(dataSource);
functionExecutor.setStoredProcedureName("COUNT_PENDING_ORDERS");
functionExecutor.setIsFunction(true);
functionExecutor.setReturnValueRequired(true);

StoredProcPollingChannelAdapter funcAdapter = new StoredProcPollingChannelAdapter(functionExecutor);
funcAdapter.setExpectSingleResult(true);

// receive() may return null if the call produced no result
Message<?> countMessage = funcAdapter.receive();
Integer pendingCount = (countMessage != null) ? (Integer) countMessage.getPayload() : null;
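
The Java type of a function's return value depends on the JDBC driver and the function's declared SQL type, so a count may arrive as Integer, Long, or BigDecimal, and a direct (Integer) cast can fail with ClassCastException. A minimal sketch of a more tolerant conversion follows; CountPayloads and toInt are hypothetical names, not part of spring-integration-jdbc.

```java
import java.math.BigDecimal;

/** Hypothetical helper for numeric payloads; not part of spring-integration-jdbc. */
final class CountPayloads {

    private CountPayloads() {
    }

    /**
     * Converts any Number payload to int.
     * Throws ArithmeticException if the value does not fit in an int,
     * and IllegalArgumentException if the payload is not numeric.
     */
    static int toInt(Object payload) {
        if (payload instanceof Number) {
            return Math.toIntExact(((Number) payload).longValue());
        }
        throw new IllegalArgumentException("Expected a numeric payload but got: " + payload);
    }

    public static void main(String[] args) {
        System.out.println(toInt(7));                   // Integer payload
        System.out.println(toInt(7L));                  // Long payload
        System.out.println(toInt(new BigDecimal("7"))); // BigDecimal payload
    }
}
```

With such a helper the last line of the example could read int pendingCount = CountPayloads.toInt(countMessage.getPayload()); after a null check on the message.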

Integration with Spring Integration

Both adapters extend AbstractMessageSource<Object> and integrate with Spring Integration's polling infrastructure:

import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import static org.springframework.integration.jdbc.dsl.Jdbc.*;

@Bean
public IntegrationFlow pollingFlow(DataSource dataSource) {
    return IntegrationFlow
        .from(inboundAdapter(dataSource, "SELECT * FROM orders WHERE processed = false")
                .updateSql("UPDATE orders SET processed = true WHERE id = :id")
                .updatePerRow(true)
                .maxRows(100),
            e -> e.poller(Pollers.fixedDelay(5000))) // Poll every 5 seconds
        .handle(message -> {
            // Process message
            System.out.println("Received: " + message.getPayload());
        })
        .get();
}

Or using XML configuration:

<int-jdbc:inbound-channel-adapter
    channel="inputChannel"
    data-source="dataSource"
    query="SELECT * FROM orders WHERE processed = false"
    update="UPDATE orders SET processed = true WHERE id = :id"
    update-per-row="true"
    max-rows="100">
    <int:poller fixed-delay="5000"/>
</int-jdbc:inbound-channel-adapter>

Deprecated Classes

Important Migration Notice:

The following classes in the org.springframework.integration.jdbc package are deprecated since version 7.0 and marked for removal:

  • org.springframework.integration.jdbc.JdbcPollingChannelAdapter
  • org.springframework.integration.jdbc.StoredProcPollingChannelAdapter

These classes have been moved to the org.springframework.integration.jdbc.inbound package. Users should update their imports:

Old (Deprecated):

import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.integration.jdbc.StoredProcPollingChannelAdapter;

New (Current):

import org.springframework.integration.jdbc.inbound.JdbcPollingChannelAdapter;
import org.springframework.integration.jdbc.inbound.StoredProcPollingChannelAdapter;

The deprecated classes are simple wrappers that extend the new classes, so migration only requires updating import statements. All functionality remains the same.

Key Considerations

  • Transaction Management: Update SQL executes in the same transaction as the SELECT when the poller is transactional, which keeps the read and the update atomic
  • Row Mappers: Default row mapper creates Map per row; custom mappers can create domain objects
  • Max Rows: Set appropriate limit to avoid memory issues with large result sets
  • Update Per Row: Per-row updates allow fine-grained control; batch updates are more efficient
  • Result Types: Each poll that finds rows produces one message whose payload is the List of mapped rows; add a splitter downstream to get one message per row
  • NULL Handling: Ensure queries handle NULL values appropriately
  • Performance: Use database indexes on polled columns for optimal performance
  • Parameter Binding: Use named parameters (:paramName) in SQL queries
  • Empty Results: Returns null when no rows found (not empty list)
  • Poller Configuration: Must configure poller or adapter won't poll
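
On NULL handling: when rows are mapped to Map<String, Object>, a SQL NULL column is present in the map with a null value, so Map.getOrDefault does not substitute the default. The sketch below shows the pitfall and one way around it. It uses a plain HashMap to stand in for a polled row, and RowValues and valueOrDefault are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for reading nullable columns from a polled row map. */
final class RowValues {

    private RowValues() {
    }

    /** Returns the column value cast to the given type, or the fallback when the value is null or absent. */
    static <T> T valueOrDefault(Map<String, Object> row, String column, Class<T> type, T fallback) {
        Object value = row.get(column);
        return (value == null) ? fallback : type.cast(value);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 42);
        row.put("amount", null); // stands in for a SQL NULL column

        // The key exists, so getOrDefault returns the stored null, not 0
        System.out.println(row.getOrDefault("amount", 0));                   // prints null
        System.out.println(valueOrDefault(row, "amount", Integer.class, 0)); // prints 0
        System.out.println(valueOrDefault(row, "id", Integer.class, 0));     // prints 42
    }
}
```

Alternatively, the query itself can replace NULLs (for example with COALESCE) so that consumers never see them.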