The JpaExecutor is the central execution engine that performs JPA operations for all Spring Integration JPA components. It encapsulates query execution, parameter binding, entity persistence, and result processing. All inbound adapters, outbound adapters, and gateways delegate their JPA operations to a JpaExecutor instance.
Required Dependencies:
spring-integration-jpa (this package)spring-integration-core is requiredEntityManagerFactory, EntityManager, or JpaOperations must be providedBeanFactory required when using SpEL expressionsDefault Behaviors:
persistMode=MERGE (for persistence operations)flush=false (no immediate flush)flushSize=0 (no batch flushing)clearOnFlush=false (persistence context not cleared)expectSingleResult=false (returns List<?>)maxNumberOfResults=unlimited (no limit)deleteAfterPoll=false (entities not deleted)deleteInBatch=false (individual deletes)entityClass, jpaQuery, nativeQuery, or namedQueryThreading Model:
EntityManagerFactory (creates EntityManager per operation)EntityManager (requires thread confinement)poll(), executeOutboundJpaOperation()) can be called concurrently with EntityManagerFactoryLifecycle:
afterPropertiesSet() after configuration (validates configuration)BeanFactory must be set when using SpEL expressions (for bean references)Exceptions:
IllegalArgumentException - Invalid configuration (e.g., multiple query types set, missing required config)JpaOperationFailedException - JPA operation failures (contains offending query)PersistenceException - General JPA persistence errorsNoResultException - Query returned no results (when expectSingleResult=true)NonUniqueResultException - Query returned multiple results (when expectSingleResult=true)Edge Cases:
entityClass, jpaQuery, nativeQuery, or namedQueryentityClass for result type mapping@NamedQuery annotationidExpression uses EntityManager.find() for optimal performance (bypasses query)firstResultExpression and maxResultsExpression evaluated against message contextparameterSource (static) vs parameterSourceFactory (dynamic per message)usePayloadAsParameterSource=true extracts parameters from payload bean propertiespoll() (no message) vs poll(Message<?>) (with message context for parameters)executeOutboundJpaOperation() returns entity (PERSIST/MERGE/DELETE) or Integer (update queries)flushSize > 0, flush happens after every N operations, not per messageclearOnFlush=true prevents memory issues with large entity setsclass JpaExecutor implements InitializingBean, BeanFactoryAware {
// Constructors
JpaExecutor(EntityManagerFactory entityManagerFactory)
JpaExecutor(EntityManager entityManager)
JpaExecutor(JpaOperations jpaOperations)
// Lifecycle
void setBeanFactory(BeanFactory beanFactory)
void afterPropertiesSet()
// Query configuration
void setEntityClass(Class<?> entityClass)
void setJpaQuery(String jpaQuery)
void setNativeQuery(String nativeQuery)
void setNamedQuery(String namedQuery)
// Persistence configuration
void setPersistMode(PersistMode persistMode)
void setFlush(boolean flush)
void setFlushSize(int flushSize)
void setClearOnFlush(boolean clearOnFlush)
// Retrieval configuration
void setExpectSingleResult(boolean expectSingleResult)
void setFirstResultExpression(Expression firstResultExpression)
void setMaxResultsExpression(Expression maxResultsExpression)
void setMaxNumberOfResults(int maxNumberOfResults)
void setIdExpression(Expression idExpression)
// Delete configuration
void setDeleteAfterPoll(boolean deleteAfterPoll)
void setDeleteInBatch(boolean deleteInBatch)
// Parameter configuration
void setJpaParameters(List<JpaParameter> jpaParameters)
void setParameterSourceFactory(ParameterSourceFactory parameterSourceFactory)
void setParameterSource(ParameterSource parameterSource)
void setUsePayloadAsParameterSource(boolean usePayloadAsParameterSource)
// Execution methods
Object executeOutboundJpaOperation(Message<?> requestMessage)
Object poll()
Object poll(Message<?> requestMessage)
}@Bean
public JpaExecutor jpaExecutor(EntityManagerFactory entityManagerFactory) {
JpaExecutor executor = new JpaExecutor(entityManagerFactory);
executor.setEntityClass(Student.class);
executor.setMaxNumberOfResults(100);
executor.setDeleteAfterPoll(true);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaExecutor jpaExecutor(EntityManager entityManager) {
JpaExecutor executor = new JpaExecutor(entityManager);
executor.setJpaQuery("SELECT s FROM Student s WHERE s.active = true");
executor.setExpectSingleResult(false);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaExecutor jpaExecutor(JpaOperations jpaOperations) {
JpaExecutor executor = new JpaExecutor(jpaOperations);
executor.setEntityClass(Student.class);
executor.setPersistMode(PersistMode.MERGE);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaExecutor configuredExecutor(
EntityManagerFactory entityManagerFactory,
BeanFactory beanFactory) {
JpaExecutor executor = new JpaExecutor(entityManagerFactory);
// Query configuration
executor.setJpaQuery("SELECT s FROM Student s WHERE s.department = :dept");
// Parameter configuration
List<JpaParameter> parameters = new ArrayList<>();
parameters.add(new JpaParameter("dept", null, "payload"));
executor.setJpaParameters(parameters);
// Result configuration
executor.setMaxNumberOfResults(50);
executor.setExpectSingleResult(false);
// Lifecycle
executor.setBeanFactory(beanFactory);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaPollingChannelAdapter inboundAdapter(JpaExecutor jpaExecutor) {
return new JpaPollingChannelAdapter(jpaExecutor);
}
@Bean
public IntegrationFlow inboundFlow(JpaPollingChannelAdapter adapter) {
return IntegrationFlow
.from(adapter, e -> e.poller(Pollers.fixedDelay(10000)))
.channel("outputChannel")
.get();
}@Bean
public JpaOutboundGateway outboundGateway(JpaExecutor jpaExecutor) {
JpaOutboundGateway gateway = new JpaOutboundGateway(jpaExecutor);
gateway.setGatewayType(OutboundGatewayType.RETRIEVING);
return gateway;
}
@Bean
public IntegrationFlow gatewayFlow(JpaOutboundGateway gateway) {
return IntegrationFlow
.from("requestChannel")
.handle(gateway)
.channel("replyChannel")
.get();
}Exactly one query type must be configured:
executor.setEntityClass(Student.class);Retrieval: Returns all entities of the specified class Persistence: Payload is persisted/merged/deleted as this entity type
executor.setJpaQuery("SELECT s FROM Student s WHERE s.grade = :grade");For SELECT queries (retrieval operations) or UPDATE/DELETE queries (update operations)
executor.setNativeQuery("SELECT * FROM students WHERE grade = :grade");
executor.setEntityClass(Student.class); // Required for result mappingNative SQL queries require entityClass for result type mapping
executor.setNamedQuery("Student.findByDepartment");References a @NamedQuery defined on an entity class
executor.setPersistMode(PersistMode.PERSIST); // For new entities
executor.setPersistMode(PersistMode.MERGE); // For updates
executor.setPersistMode(PersistMode.DELETE); // For deletionsexecutor.setFlush(true);Forces immediate synchronization with database after each operation
executor.setFlushSize(100); // Flush every 100 operations
executor.setClearOnFlush(true); // Clear persistence context on flushexecutor.setExpectSingleResult(true); // Returns single entity
executor.setExpectSingleResult(false); // Returns List<?>executor.setMaxNumberOfResults(100); // Static limitOr use expression for dynamic limit:
Expression maxExpr = new SpelExpressionParser().parseExpression("payload.pageSize");
executor.setMaxResultsExpression(maxExpr);Expression firstResultExpr = new SpelExpressionParser()
.parseExpression("payload.page * payload.pageSize");
executor.setFirstResultExpression(firstResultExpr);
Expression maxResultsExpr = new SpelExpressionParser()
.parseExpression("payload.pageSize");
executor.setMaxResultsExpression(maxResultsExpr);Expression idExpr = new SpelExpressionParser().parseExpression("payload");
executor.setIdExpression(idExpr);Uses EntityManager.find() for optimal performance
executor.setDeleteAfterPoll(true); // Delete retrieved entities
executor.setDeleteInBatch(true); // Use batch deleteList<JpaParameter> parameters = new ArrayList<>();
parameters.add(new JpaParameter("grade", "A", null));
parameters.add(new JpaParameter("year", 2024, null));
executor.setJpaParameters(parameters);List<JpaParameter> parameters = new ArrayList<>();
parameters.add(new JpaParameter("grade", null, "payload.grade"));
parameters.add(new JpaParameter("year", null, "headers['academicYear']"));
executor.setJpaParameters(parameters);BeanPropertyParameterSourceFactory factory = new BeanPropertyParameterSourceFactory();
factory.setStaticParameters(Map.of("status", "ACTIVE"));
executor.setParameterSourceFactory(factory);
executor.setUsePayloadAsParameterSource(true);ParameterSource paramSource = new BeanPropertyParameterSource(criteriaObject);
executor.setParameterSource(paramSource);Object result = executor.poll();Executes a retrieval query without message context. Used internally by inbound adapters.
Returns:
expectSingleResult is trueList<?> of entities if expectSingleResult is falsenull if no results foundMessage<?> message = MessageBuilder.withPayload(searchCriteria).build();
Object result = executor.poll(message);Executes a retrieval query with message context for parameter evaluation. Used by retrieving gateways.
Message<?> message = MessageBuilder.withPayload(entity).build();
Object result = executor.executeOutboundJpaOperation(message);Executes a persistence operation (persist/merge/delete) or update query.
For entity operations:
For update queries:
Integer count of affected rows@Bean
public JpaExecutor retrievalExecutor(EntityManagerFactory emf, BeanFactory beanFactory) {
JpaExecutor executor = new JpaExecutor(emf);
// Query configuration
executor.setJpaQuery("SELECT s FROM Student s " +
"WHERE s.department = :dept " +
"AND s.enrollmentYear = :year");
// Parameters
List<JpaParameter> params = Arrays.asList(
new JpaParameter("dept", null, "payload.department"),
new JpaParameter("year", null, "payload.year")
);
executor.setJpaParameters(params);
// Result configuration
executor.setExpectSingleResult(false);
executor.setMaxNumberOfResults(100);
// Lifecycle
executor.setBeanFactory(beanFactory);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaExecutor persistenceExecutor(EntityManagerFactory emf) {
JpaExecutor executor = new JpaExecutor(emf);
// Entity configuration
executor.setEntityClass(Student.class);
executor.setPersistMode(PersistMode.MERGE);
// Flush configuration
executor.setFlush(true);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaExecutor updateExecutor(EntityManagerFactory emf, BeanFactory beanFactory) {
JpaExecutor executor = new JpaExecutor(emf);
// Update query
executor.setJpaQuery("UPDATE Student s " +
"SET s.status = :status " +
"WHERE s.year = :year");
// Parameters from payload
BeanPropertyParameterSourceFactory factory =
new BeanPropertyParameterSourceFactory();
executor.setParameterSourceFactory(factory);
executor.setUsePayloadAsParameterSource(true);
executor.setBeanFactory(beanFactory);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaExecutor pollingDeleteExecutor(EntityManagerFactory emf) {
JpaExecutor executor = new JpaExecutor(emf);
// Query
executor.setJpaQuery("SELECT o FROM Order o WHERE o.processed = true");
// Delete configuration
executor.setDeleteAfterPoll(true);
executor.setDeleteInBatch(true);
// Result limiting
executor.setMaxNumberOfResults(500);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaExecutor paginatedExecutor(EntityManagerFactory emf, BeanFactory beanFactory) {
JpaExecutor executor = new JpaExecutor(emf);
// Query
executor.setEntityClass(Student.class);
// Pagination expressions
SpelExpressionParser parser = new SpelExpressionParser();
executor.setFirstResultExpression(
parser.parseExpression("payload.offset")
);
executor.setMaxResultsExpression(
parser.parseExpression("payload.limit")
);
executor.setBeanFactory(beanFactory);
executor.afterPropertiesSet();
return executor;
}@Bean
public JpaExecutor idRetrievalExecutor(EntityManagerFactory emf, BeanFactory beanFactory) {
JpaExecutor executor = new JpaExecutor(emf);
// Entity class
executor.setEntityClass(Student.class);
// ID expression
SpelExpressionParser parser = new SpelExpressionParser();
executor.setIdExpression(parser.parseExpression("payload"));
// Single result
executor.setExpectSingleResult(true);
executor.setBeanFactory(beanFactory);
executor.afterPropertiesSet();
return executor;
}@Configuration
public class InboundConfig {
@Bean
public JpaExecutor inboundExecutor(EntityManagerFactory emf) {
JpaExecutor executor = new JpaExecutor(emf);
executor.setEntityClass(Task.class);
executor.setMaxNumberOfResults(50);
executor.setDeleteAfterPoll(true);
executor.afterPropertiesSet();
return executor;
}
@Bean
public JpaPollingChannelAdapter adapter(JpaExecutor inboundExecutor) {
return new JpaPollingChannelAdapter(inboundExecutor);
}
@Bean
public IntegrationFlow flow(JpaPollingChannelAdapter adapter) {
return IntegrationFlow
.from(adapter, e -> e.poller(Pollers.fixedDelay(5000)))
.channel("taskChannel")
.get();
}
}@Configuration
public class OutboundAdapterConfig {
@Bean
public JpaExecutor outboundExecutor(EntityManagerFactory emf) {
JpaExecutor executor = new JpaExecutor(emf);
executor.setEntityClass(LogEntry.class);
executor.setPersistMode(PersistMode.PERSIST);
executor.setFlushSize(100);
executor.afterPropertiesSet();
return executor;
}
@Bean
public JpaOutboundGateway adapter(JpaExecutor outboundExecutor) {
JpaOutboundGateway gateway = new JpaOutboundGateway(outboundExecutor);
gateway.setProducesReply(false); // Adapter mode
return gateway;
}
@Bean
public IntegrationFlow flow(JpaOutboundGateway adapter) {
return IntegrationFlow
.from("logChannel")
.handle(adapter)
.get();
}
}@Configuration
public class RetrievingGatewayConfig {
@Bean
public JpaExecutor retrievingExecutor(EntityManagerFactory emf, BeanFactory beanFactory) {
JpaExecutor executor = new JpaExecutor(emf);
executor.setJpaQuery("SELECT s FROM Student s WHERE s.id = :id");
List<JpaParameter> params = Collections.singletonList(
new JpaParameter("id", null, "payload")
);
executor.setJpaParameters(params);
executor.setExpectSingleResult(true);
executor.setBeanFactory(beanFactory);
executor.afterPropertiesSet();
return executor;
}
@Bean
public JpaOutboundGateway retrievingGateway(JpaExecutor retrievingExecutor) {
JpaOutboundGateway gateway = new JpaOutboundGateway(retrievingExecutor);
gateway.setGatewayType(OutboundGatewayType.RETRIEVING);
return gateway;
}
@Bean
public IntegrationFlow flow(JpaOutboundGateway retrievingGateway) {
return IntegrationFlow
.from("findChannel")
.handle(retrievingGateway)
.channel("resultChannel")
.get();
}
}@Configuration
public class UpdatingGatewayConfig {
@Bean
public JpaExecutor updatingExecutor(EntityManagerFactory emf) {
JpaExecutor executor = new JpaExecutor(emf);
executor.setEntityClass(Student.class);
executor.setPersistMode(PersistMode.MERGE);
executor.setFlush(true);
executor.afterPropertiesSet();
return executor;
}
@Bean
public JpaOutboundGateway updatingGateway(JpaExecutor updatingExecutor) {
JpaOutboundGateway gateway = new JpaOutboundGateway(updatingExecutor);
gateway.setGatewayType(OutboundGatewayType.UPDATING);
return gateway;
}
@Bean
public IntegrationFlow flow(JpaOutboundGateway updatingGateway) {
return IntegrationFlow
.from("updateChannel")
.handle(updatingGateway)
.channel("updatedChannel")
.get();
}
}The JpaExecutor requires access to the BeanFactory when using SpEL expressions:
executor.setBeanFactory(beanFactory);This allows expressions to reference Spring beans (e.g., @beanName.property).
Call afterPropertiesSet() after configuration:
executor.afterPropertiesSet();This validates the configuration and initializes internal state.
For advanced use cases, you can provide a custom JpaOperations implementation:
@Bean
public JpaOperations customJpaOperations(EntityManagerFactory emf) {
DefaultJpaOperations operations = new DefaultJpaOperations();
operations.setEntityManagerFactory(emf);
// Custom configuration
operations.afterPropertiesSet();
return operations;
}
@Bean
public JpaExecutor executor(JpaOperations customJpaOperations) {
return new JpaExecutor(customJpaOperations);
}JpaExecutor throws JpaOperationFailedException when operations fail:
try {
Object result = executor.executeOutboundJpaOperation(message);
} catch (JpaOperationFailedException e) {
String query = e.getOffendingJPAQl();
logger.error("JPA operation failed for query: {}", query, e);
}JpaExecutor is thread-safe after initialization. Multiple threads can safely call execution methods concurrently when using an EntityManagerFactory (recommended). When using a single EntityManager, ensure proper thread confinement.
Prefer EntityManagerFactory:
Use EntityManager only when:
For high-volume operations, configure batch flushing:
executor.setFlushSize(100);
executor.setClearOnFlush(true);Use ID-based retrieval when possible:
// Faster
executor.setEntityClass(Student.class);
executor.setIdExpression(parser.parseExpression("payload"));
// Slower
executor.setJpaQuery("SELECT s FROM Student s WHERE s.id = :id");