The Spring Integration JDBC Java DSL provides a fluent API for configuring JDBC components in integration flows. It enables type-safe, concise configuration with IDE auto-completion support, making JDBC-based integration flows easier to build and maintain.
Required Dependencies:
- spring-integration-jdbc (this package)
- spring-integration-core is required
- spring-integration-java-dsl is required
- DataSource or JdbcOperations bean must be configured

Default Behaviors:
Threading Model:
Lifecycle:
Exceptions:
Edge Cases:
configurerStoredProcExecutor)

package org.springframework.integration.jdbc.dsl;
/**
 * API outline of the {@code Jdbc} factory class: static entry points that return the
 * DSL spec types for JDBC channel adapters, gateways, and the stored procedure executor.
 * This is a signature listing only; method bodies are omitted.
 */
public final class Jdbc {

    /* Inbound adapters: one overload per JDBC access type, each taking the select query. */

    public static JdbcInboundChannelAdapterSpec inboundAdapter(DataSource dataSource, String selectQuery);

    public static JdbcInboundChannelAdapterSpec inboundAdapter(JdbcOperations jdbcOperations, String selectQuery);

    /* Outbound adapters: one overload per JDBC access type, each taking the update query. */

    public static JdbcOutboundChannelAdapterSpec outboundAdapter(DataSource dataSource, String updateQuery);

    public static JdbcOutboundChannelAdapterSpec outboundAdapter(JdbcOperations jdbcOperations, String updateQuery);

    /* Outbound gateways: update query only, or update query plus a select query. */

    public static JdbcOutboundGatewaySpec outboundGateway(DataSource dataSource, String updateQuery);

    public static JdbcOutboundGatewaySpec outboundGateway(DataSource dataSource, String updateQuery, String selectQuery);

    /* Stored procedure adapters and gateways. */

    public static JdbcStoredProcInboundChannelAdapterSpec storedProcInboundAdapter(DataSource dataSource);

    public static JdbcStoredProcOutboundChannelAdapterSpec storedProcOutboundAdapter(DataSource dataSource);

    public static JdbcStoredProcOutboundGatewaySpec storedProcOutboundGateway(DataSource dataSource);

    /* Stored procedure executor builder. */

    public static StoredProcExecutorSpec storedProcExecutorSpec(DataSource dataSource);

}

import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import static org.springframework.integration.jdbc.dsl.Jdbc.*;
@Bean
public IntegrationFlow jdbcInboundFlow(DataSource dataSource) {
    // Example: a polled JDBC inbound adapter. The select query feeds the flow and the
    // update statement (applied per row, see updatePerRow) flags rows via the :id parameter.
    final String selectSql = "SELECT * FROM orders WHERE processed = false";
    final String markProcessedSql = "UPDATE orders SET processed = true WHERE id = :id";

    return IntegrationFlow
            .from(
                    inboundAdapter(dataSource, selectSql)
                            .updateSql(markProcessedSql)
                            .updatePerRow(true)
                            .maxRows(100),
                    // Fixed-delay poller; 5000 is presumably milliseconds -- confirm against Pollers.
                    endpoint -> endpoint.poller(Pollers.fixedDelay(5000)))
            .handle(message -> System.out.println("Received: " + message.getPayload()))
            .get();
}

@Bean
public IntegrationFlow jdbcOutboundFlow(DataSource dataSource) {
    // Example: messages from "inputChannel" are written to audit_log by a JDBC outbound adapter.
    // The statement references both :payload and :headers[timestamp].
    final String insertSql =
            "INSERT INTO audit_log (message, timestamp) VALUES (:payload, :headers[timestamp])";

    return IntegrationFlow
            .from("inputChannel")
            .handle(
                    outboundAdapter(dataSource, insertSql)
                            // Presumably makes parameters resolve against the whole message
                            // rather than the payload alone -- TODO confirm.
                            .usePayloadAsParameterSource(false))
            .get();
}

@Bean
public IntegrationFlow jdbcGatewayFlow(DataSource dataSource) {
    // Example: request/reply via a JDBC outbound gateway configured with an update
    // statement and a follow-up select; the result is routed to "replyChannel".
    final String insertSql =
            "INSERT INTO orders (order_number, amount) VALUES (:orderNumber, :amount)";
    // NOTE(review): the name under which a generated key is exposed is presumably
    // driver-specific -- confirm :GENERATED_KEY against the target database.
    final String selectSql = "SELECT * FROM orders WHERE id = :GENERATED_KEY";

    return IntegrationFlow
            .from("requestChannel")
            .handle(
                    outboundGateway(dataSource, insertSql, selectSql)
                            .keysGenerated(true))
            .channel("replyChannel")
            .get();
}

@Bean
public IntegrationFlow storedProcInboundFlow(DataSource dataSource) {
    // Example: a polled stored procedure inbound adapter. The nested lambda configures the
    // executor: procedure name, a "region" parameter with the static value "US", and a
    // row mapper registered under the name "orders".
    return IntegrationFlow
            .from(
                    storedProcInboundAdapter(dataSource)
                            .configurerStoredProcExecutor(executor -> executor
                                    .storedProcedureName("GET_PENDING_ORDERS")
                                    .procedureParameter(new ProcedureParameter("region", "US", null))
                                    .returningResultSetRowMapper("orders", new OrderRowMapper())
                            )
                            .expectSingleResult(false),
                    // Fixed-delay poller; 10000 is presumably milliseconds -- confirm against Pollers.
                    endpoint -> endpoint.poller(Pollers.fixedDelay(10000)))
            .handle(message -> processOrders(message))
            .get();
}

@Bean
public IntegrationFlow storedProcGatewayFlow(DataSource dataSource) {
    // Example: messages from "orderChannel" invoke the PROCESS_ORDER stored procedure through
    // an outbound gateway. "order_id" is supplied by the expression "payload.id", and
    // "result_code" is declared as an INTEGER OUT parameter.
    return IntegrationFlow
            .from("orderChannel")
            .handle(storedProcOutboundGateway(dataSource)
                    .configurerStoredProcExecutor(spec -> spec
                            .storedProcedureName("PROCESS_ORDER")
                            .procedureParameter(new ProcedureParameter("order_id", null, "payload.id"))
                            .sqlParameter(new SqlOutParameter("result_code", Types.INTEGER))
                    ))
            .handle(msg -> {
                // Wildcard cast instead of (Map<String, Object>): the payload is only known
                // as Object here, so a parameterized cast would be unchecked and raise a
                // compiler warning. Only a keyed lookup is needed, which Map<?, ?> supports.
                Map<?, ?> result = (Map<?, ?>) msg.getPayload();
                Integer resultCode = (Integer) result.get("result_code");
                System.out.println("Result: " + resultCode);
            })
            .get();
}

package org.springframework.integration.jdbc.dsl;
/**
 * API outline of the inbound channel adapter spec, a {@code MessageSourceSpec} for
 * {@code JdbcPollingChannelAdapter}. Every option returns the spec itself, so calls can be
 * chained. This is a signature listing only; method bodies are omitted.
 */
public class JdbcInboundChannelAdapterSpec
        extends MessageSourceSpec<JdbcInboundChannelAdapterSpec, JdbcPollingChannelAdapter> {

    /* Select side: row mapping, query parameters, and row limit. */

    public JdbcInboundChannelAdapterSpec rowMapper(RowMapper<?> rowMapper);

    public JdbcInboundChannelAdapterSpec selectSqlParameterSource(SqlParameterSource sqlQueryParameterSource);

    public JdbcInboundChannelAdapterSpec maxRows(int maxRows);

    /* Update side: the update statement, per-row switch, and its parameter source factory. */

    public JdbcInboundChannelAdapterSpec updateSql(String updateSql);

    public JdbcInboundChannelAdapterSpec updatePerRow(boolean updatePerRow);

    public JdbcInboundChannelAdapterSpec updateSqlParameterSourceFactory(SqlParameterSourceFactory factory);

}

package org.springframework.integration.jdbc.dsl;
/**
 * API outline of the outbound channel adapter spec, a {@code MessageHandlerSpec} for
 * {@code JdbcMessageHandler}. Every option returns the spec itself, so calls can be
 * chained. This is a signature listing only; method bodies are omitted.
 */
public class JdbcOutboundChannelAdapterSpec
        extends MessageHandlerSpec<JdbcOutboundChannelAdapterSpec, JdbcMessageHandler> {

    public JdbcOutboundChannelAdapterSpec keysGenerated(boolean keysGenerated);

    public JdbcOutboundChannelAdapterSpec sqlParameterSourceFactory(SqlParameterSourceFactory factory);

    public JdbcOutboundChannelAdapterSpec usePayloadAsParameterSource(boolean usePayloadAsParameterSource);

    public JdbcOutboundChannelAdapterSpec preparedStatementSetter(MessagePreparedStatementSetter setter);

}

package org.springframework.integration.jdbc.dsl;
/**
 * API outline of the outbound gateway spec, a {@code MessageHandlerSpec} for
 * {@code JdbcOutboundGateway}. Every option returns the spec itself, so calls can be
 * chained. This is a signature listing only; method bodies are omitted.
 */
public class JdbcOutboundGatewaySpec
        extends MessageHandlerSpec<JdbcOutboundGatewaySpec, JdbcOutboundGateway> {

    /* Request side. */

    public JdbcOutboundGatewaySpec keysGenerated(boolean keysGenerated);

    public JdbcOutboundGatewaySpec requestSqlParameterSourceFactory(SqlParameterSourceFactory factory);

    public JdbcOutboundGatewaySpec requestPreparedStatementSetter(MessagePreparedStatementSetter setter);

    /* Reply side. */

    public JdbcOutboundGatewaySpec replySqlParameterSourceFactory(SqlParameterSourceFactory factory);

    public JdbcOutboundGatewaySpec rowMapper(RowMapper<?> rowMapper);

    public JdbcOutboundGatewaySpec maxRows(Integer maxRows);

}

package org.springframework.integration.jdbc.dsl;
/**
 * API outline of the stored procedure executor spec, an {@code IntegrationComponentSpec}
 * for {@code StoredProcExecutor}. Every option returns the spec itself, so calls can be
 * chained. This is a signature listing only; method bodies are omitted.
 */
public class StoredProcExecutorSpec
        extends IntegrationComponentSpec<StoredProcExecutorSpec, StoredProcExecutor> {

    /* Procedure identity. */

    public StoredProcExecutorSpec storedProcedureName(String name);

    public StoredProcExecutorSpec storedProcedureNameExpression(Expression expression);

    public StoredProcExecutorSpec isFunction(boolean isFunction);

    /* Parameters. */

    public StoredProcExecutorSpec procedureParameter(ProcedureParameter parameter);

    public StoredProcExecutorSpec procedureParameters(List<ProcedureParameter> parameters);

    public StoredProcExecutorSpec sqlParameter(SqlParameter parameter);

    public StoredProcExecutorSpec sqlParameters(List<SqlParameter> parameters);

    public StoredProcExecutorSpec sqlParameterSourceFactory(SqlParameterSourceFactory factory);

    public StoredProcExecutorSpec usePayloadAsParameterSource(boolean usePayloadAsParameterSource);

    /* Results. */

    public StoredProcExecutorSpec returnValueRequired(boolean returnValueRequired);

    public StoredProcExecutorSpec skipUndeclaredResults(boolean skipUndeclaredResults);

    public StoredProcExecutorSpec returningResultSetRowMapper(String name, RowMapper<?> mapper);

    public StoredProcExecutorSpec returningResultSetRowMappers(Map<String, RowMapper<?>> mappers);

    /* Metadata and caching. */

    public StoredProcExecutorSpec ignoreColumnMetaData(boolean ignoreColumnMetaData);

    public StoredProcExecutorSpec jdbcCallOperationsCacheSize(int size);

}

configurerStoredProcExecutor uses Consumer for nested configuration