Spring Integration JPA Support - provides components for performing database operations using JPA
npx @tessl/cli install tessl/maven-spring-integration-jpa@7.0.0

Spring Integration JPA provides Java Persistence API (JPA) integration components that enable message-driven database operations within Spring Integration flows. It supports polling entities from databases, persisting and merging entities, executing update queries, and retrieving data using JPQL, native SQL, or named queries.

Required Dependencies:
- spring-integration-jpa (this package)
- spring-integration-core is required
- EntityManagerFactory or EntityManager bean must be configured
- spring-orm for JPA support

Default Behaviors:
- expectSingleResult=false (returns List), maxResults=unlimited, deleteAfterPoll=false
- persistMode=MERGE, flush=false, flushSize=0 (no batch flushing)
- expectReply=true (request-reply pattern)
- JpaExecutor which manages EntityManager lifecycle

Threading Model:
- flushSize for efficient bulk processing

Lifecycle:
- JpaExecutor.afterPropertiesSet() must be called after configuration

Exceptions:
- JpaOperationFailedException - JPA operation failures (contains offending query)
- PersistenceException - General JPA persistence errors
- EntityExistsException - Entity already exists (when using PERSIST)
- IllegalArgumentException - Invalid configuration or parameters
- NoResultException - Query returned no results (when expectSingleResult=true)

Edge Cases:
- entityClass for result mapping
- @NamedQuery
- deleteAfterPoll with deleteInBatch=true is more efficient for large batches
- flushSize > 0 enables batch flushing (flushes every N operations)
- clearOnFlush=true prevents memory issues with large entity sets
- idExpression uses EntityManager.find() for optimal performance
- expectSingleResult=true and multiple results found, throws exception
- expectSingleResult=false and no results, returns empty list (not null)

Maven:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jpa</artifactId>
    <version>7.0.0</version>
</dependency>

Gradle:
implementation 'org.springframework.integration:spring-integration-jpa:7.0.0'

Java DSL (recommended):
import org.springframework.integration.jpa.dsl.Jpa;
import org.springframework.integration.jpa.support.PersistMode;
import org.springframework.integration.jpa.support.OutboundGatewayType;

Programmatic configuration:
import org.springframework.integration.jpa.core.JpaExecutor;
import org.springframework.integration.jpa.inbound.JpaPollingChannelAdapter;
import org.springframework.integration.jpa.outbound.JpaOutboundGateway;

XML configuration namespace:
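xmlns:int-jpa="http://www.springframework.org/schema/integration/jpa"

import jakarta.persistence.EntityManagerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jpa.dsl.Jpa;

// Inbound channel adapter: every 5 seconds, poll up to 10 Student entities
// and delete them from the database after the poll.
// Student is an application-defined JPA entity.
@Configuration
@EnableIntegration
public class JpaInboundConfig {

    @Bean
    public IntegrationFlow jpaInboundFlow(EntityManagerFactory entityManagerFactory) {
        return IntegrationFlow
                .from(Jpa.inboundAdapter(entityManagerFactory)
                                .entityClass(Student.class)
                                .maxResults(10)
                                .deleteAfterPoll(true),
                        e -> e.poller(Pollers.fixedDelay(5000)))
                .channel("studentChannel")
                .get();
    }
}

// Outbound channel adapter: persist each Student arriving on inputChannel.
// Persisting requires an active transaction (not shown here).
@Bean
public IntegrationFlow jpaOutboundFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("inputChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .entityClass(Student.class)
                    .persistMode(PersistMode.PERSIST))
            .get();
}

// Retrieving gateway: run a JPQL query with a named parameter taken from the
// message payload and send the result list to resultChannel.
@Bean
public IntegrationFlow jpaRetrievingGatewayFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("queryChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s WHERE s.lastName = :lastName")
                    .parameterExpression("lastName", "payload['lastName']")
                    .expectSingleResult(false))
            .channel("resultChannel")
            .get();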
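}

Spring Integration JPA is organized around four primary integration components: the inbound channel adapter, the outbound channel adapter, the retrieving outbound gateway, and the updating outbound gateway.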
All components are powered by JpaExecutor, the central execution engine that handles both retrieval and persistence operations. Configuration can be done via Java DSL (recommended), programmatic configuration, or XML namespace.

The inbound channel adapter polls the database using JPA queries and produces messages containing the retrieved entities.

// Java DSL
static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(EntityManagerFactory entityManagerFactory)
static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(EntityManager entityManager)
static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(JpaOperations jpaOperations)

The outbound channel adapter persists, merges, or deletes entities from incoming messages without returning results.

// Java DSL
static JpaUpdatingOutboundEndpointSpec Jpa.outboundAdapter(EntityManagerFactory entityManagerFactory)
static JpaUpdatingOutboundEndpointSpec Jpa.outboundAdapter(EntityManager entityManager)
static JpaUpdatingOutboundEndpointSpec Jpa.outboundAdapter(JpaOperations jpaOperations)

The retrieving outbound gateway queries the database and sends the results to a reply channel for continued processing.

// Java DSL
static JpaRetrievingOutboundGatewaySpec Jpa.retrievingGateway(EntityManagerFactory entityManagerFactory)
static JpaRetrievingOutboundGatewaySpec Jpa.retrievingGateway(EntityManager entityManager)
static JpaRetrievingOutboundGatewaySpec Jpa.retrievingGateway(JpaOperations jpaOperations)

The updating outbound gateway persists or merges entities and returns the persisted or merged entities to a reply channel.

// Java DSL
static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(EntityManagerFactory entityManagerFactory)
static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(EntityManager entityManager)
static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(JpaOperations jpaOperations)
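
The flushSize and clearOnFlush settings used by the updating endpoints can be pictured with a small standalone model. This is only a sketch and not the library's code: Session, RecordingSession, and BatchWriter are hypothetical stand-ins, and flushing a trailing partial batch at the end is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a persistence context; NOT a JPA or Spring type.
interface Session {
    void persist(Object entity);
    void flush();
    void clear();
}

// Records every call so the batching pattern can be inspected.
class RecordingSession implements Session {
    final List<String> calls = new ArrayList<>();

    public void persist(Object entity) { calls.add("persist:" + entity); }
    public void flush() { calls.add("flush"); }
    public void clear() { calls.add("clear"); }
}

class BatchWriter {
    // Persists every entity and flushes after each group of flushSize entities.
    // A flushSize of 0 or less disables batch flushing entirely.
    static void persistAll(Session session, Iterable<?> entities,
                           int flushSize, boolean clearOnFlush) {
        int pending = 0;
        for (Object entity : entities) {
            session.persist(entity);
            pending++;
            if (flushSize > 0 && pending == flushSize) {
                flushAndMaybeClear(session, clearOnFlush);
                pending = 0;
            }
        }
        // Assumption of this sketch: a trailing partial batch is flushed too.
        if (flushSize > 0 && pending > 0) {
            flushAndMaybeClear(session, clearOnFlush);
        }
    }

    private static void flushAndMaybeClear(Session session, boolean clearOnFlush) {
        session.flush();
        if (clearOnFlush) {
            // Detaching flushed entities keeps the persistence context small.
            session.clear();
        }
    }
}
```

In this model, five entities with flushSize=2 lead to three flushes: after the second, the fourth, and the fifth entity. With clearOnFlush=true each flush is followed by a clear, which is why the setting helps with large entity sets.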

JpaExecutor is the central execution engine that performs JPA operations for all components.

class JpaExecutor {
    JpaExecutor(EntityManagerFactory entityManagerFactory)
    JpaExecutor(EntityManager entityManager)
    JpaExecutor(JpaOperations jpaOperations)
}

Parameter sources provide flexible parameter binding for JPA queries using static values, bean properties, or SpEL expressions.

interface ParameterSourceFactory {
    ParameterSource createParameterSource(Object input)
}
class BeanPropertyParameterSourceFactory implements ParameterSourceFactory
class ExpressionEvaluatingParameterSourceFactory implements ParameterSourceFactory
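
To make the factory idea concrete, here is a minimal standalone sketch of a factory that turns a message payload into a source of named query parameters. The two interfaces are redeclared locally so the example compiles without Spring. The getValue and hasValue methods on ParameterSource are an assumption about the interface's shape, and MapParameterSourceFactory is a hypothetical implementation that is not part of the library.

```java
import java.util.Map;

// Local stand-ins so the sketch compiles on its own; they only imitate the
// library interfaces and are not the real Spring Integration types.
interface ParameterSource {
    Object getValue(String paramName);
    boolean hasValue(String paramName);
}

interface ParameterSourceFactory {
    ParameterSource createParameterSource(Object input);
}

// Hypothetical factory: resolves named query parameters from a Map payload.
class MapParameterSourceFactory implements ParameterSourceFactory {

    @Override
    public ParameterSource createParameterSource(Object input) {
        if (!(input instanceof Map)) {
            throw new IllegalArgumentException("Expected a Map payload but got: " + input);
        }
        final Map<?, ?> map = (Map<?, ?>) input;
        return new ParameterSource() {
            @Override
            public Object getValue(String paramName) {
                // Returns null when the parameter is absent.
                return map.get(paramName);
            }

            @Override
            public boolean hasValue(String paramName) {
                return map.containsKey(paramName);
            }
        };
    }
}
```

A query such as "SELECT s FROM Student s WHERE s.lastName = :lastName" would then look up lastName in the source created from each incoming payload.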

JpaOperations is the core interface defining all JPA operations; DefaultJpaOperations is its default implementation.

interface JpaOperations {
    void persist(Object entity)
    void persist(Object entity, int flushSize, boolean clearOnFlush)
    Object merge(Object entity)
    Object merge(Object entity, int flushSize, boolean clearOnFlush)
    void delete(Object entity)
    void deleteInBatch(Iterable<?> entities)
    <T> T find(Class<T> entityClass, Object primaryKey)
    List<?> getResultListForQuery(String query, ParameterSource source, int firstResult, int maxNumberOfResults)
    // ... additional query and update methods
    void flush()
}
class DefaultJpaOperations implements JpaOperations

Spring XML namespace support is available for declaring JPA components.

<int-jpa:inbound-channel-adapter
        entity-manager-factory="entityManagerFactory"
        entity-class="com.example.Student"
        channel="outputChannel">
    <int:poller fixed-delay="5000"/>
</int-jpa:inbound-channel-adapter>

<int-jpa:outbound-channel-adapter
        entity-manager-factory="entityManagerFactory"
        entity-class="com.example.Student"
        channel="inputChannel"/>

<int-jpa:retrieving-outbound-gateway
        request-channel="requestChannel"
        reply-channel="replyChannel"
        entity-manager-factory="entityManagerFactory"
        jpa-query="SELECT s FROM Student s WHERE s.id = :id"/>

<int-jpa:updating-outbound-gateway
        request-channel="requestChannel"
        reply-channel="replyChannel"
        entity-manager-factory="entityManagerFactory"
        entity-class="com.example.Student"
        persist-mode="MERGE"/>

enum PersistMode {
    PERSIST, // Use EntityManager.persist()
    MERGE,   // Use EntityManager.merge()
    DELETE   // Use EntityManager.remove()
}

enum OutboundGatewayType {
    UPDATING,  // Persists entities to database
    RETRIEVING // Retrieves entities from database
}

class JpaParameter {
    JpaParameter()
    JpaParameter(String name, Object value, String expression)
    JpaParameter(Object value, String expression)
    String getName()
    void setName(String name)
    Object getValue()
    void setValue(Object value)
    String getExpression()
    void setExpression(String expression)
}

class JpaOperationFailedException extends RuntimeException {
    String getOffendingJPAQl()
}

final class JpaUtils {
    static final String DELETE_ALL_QUERY_STRING = "delete from %s x"
    static String detectAlias(String query)
    static <T> Query applyAndBind(String queryString, Iterable<T> entities, EntityManager entityManager)
    static String getQueryString(String template, String entityName)
    static String getEntityName(EntityManager em, Class<?> entityClass)
}

JpaUtils provides utility methods for JPA operations, including query string manipulation, alias detection, and entity name resolution.
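
The DELETE_ALL_QUERY_STRING template carries a %s placeholder for the entity name. The sketch below shows how such a template expands with plain String.format. Whether JpaUtils.getQueryString works exactly this way internally is an assumption, and QueryTemplates is a hypothetical helper rather than a library class.

```java
class QueryTemplates {

    // Same literal as the DELETE_ALL_QUERY_STRING constant listed for JpaUtils.
    static final String DELETE_ALL_QUERY_STRING = "delete from %s x";

    // Substitutes the entity name into the template's %s placeholder.
    static String expand(String template, String entityName) {
        return String.format(template, entityName);
    }

    public static void main(String[] args) {
        // Prints: delete from Student x
        System.out.println(expand(DELETE_ALL_QUERY_STRING, "Student"));
    }
}
```

The trailing x in the template is the entity alias, which is the kind of token a helper like detectAlias would be expected to find in a query string.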
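
Spring Integration JPA supports three configuration approaches:
- Java DSL (recommended), through the Jpa factory class
- Programmatic configuration
- XML configuration, through the int-jpa namespace

All approaches support the same capabilities and can be mixed in the same application.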