Spring Integration JPA Support - provides components for performing database operations using JPA
The JPA Updating Outbound Gateway persists, merges, or deletes entities and returns the result to a reply channel. Unlike the outbound adapter (fire-and-forget), the updating gateway provides request-reply semantics, returning the persisted/merged entity or the count of affected rows for update queries.
Required Dependencies:
spring-integration-jpa (this package)spring-integration-core is requiredEntityManagerFactory or EntityManager bean must be configuredDefault Behaviors:
persistMode=MERGE (updates existing or creates new)flush=false (no immediate flush after each operation)flushSize=0 (no batch flushing)clearOnFlush=false (persistence context not cleared on flush)Threading Model:
reply-timeout attributeLifecycle:
auto-startup attribute (default: true)Exceptions:
JpaOperationFailedException - JPA operation failures (contains offending query)PersistenceException - General JPA persistence errorsEntityExistsException - Entity already exists (when using PERSIST mode)OptimisticLockException - Version conflict (when using optimistic locking)MessageTimeoutException - Reply not received within timeoutIllegalArgumentException - Invalid configurationEdge Cases:
entityClass OR query (jpaQuery, nativeQuery, namedQuery)entityClass: payload is entity to persist/merge/delete, returns entityPERSIST mode throws exception if entity already exists (use MERGE for upsert)MERGE mode creates new entity if it doesn't exist (handles both insert and update)flushSize > 0 enables batch flushing (flushes every N operations)clearOnFlush=true prevents memory issues with large entity setsflushSize > 0, flush happens after every N operations, not per messageclass JpaOutboundGateway extends AbstractReplyProducingMessageHandler {
JpaOutboundGateway(JpaExecutor jpaExecutor)
void setGatewayType(OutboundGatewayType gatewayType) // Set to UPDATING (default)
}static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(EntityManagerFactory entityManagerFactory)
static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(EntityManager entityManager)
static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(JpaOperations jpaOperations)class JpaUpdatingOutboundEndpointSpec extends JpaBaseOutboundEndpointSpec {
// Persistence mode
JpaUpdatingOutboundEndpointSpec persistMode(PersistMode persistMode)
// Flush configuration
JpaUpdatingOutboundEndpointSpec flush(boolean flush)
JpaUpdatingOutboundEndpointSpec flushSize(int flushSize)
JpaUpdatingOutboundEndpointSpec clearOnFlush(boolean clearOnFlush)
// Query configuration (inherited)
JpaUpdatingOutboundEndpointSpec entityClass(Class<?> entityClass)
JpaUpdatingOutboundEndpointSpec jpaQuery(String jpaQuery)
JpaUpdatingOutboundEndpointSpec nativeQuery(String nativeQuery)
JpaUpdatingOutboundEndpointSpec namedQuery(String namedQuery)
// Parameter configuration (inherited)
JpaUpdatingOutboundEndpointSpec parameterSourceFactory(ParameterSourceFactory parameterSourceFactory)
JpaUpdatingOutboundEndpointSpec parameter(Object value)
JpaUpdatingOutboundEndpointSpec parameter(String name, Object value)
JpaUpdatingOutboundEndpointSpec parameter(JpaParameter jpaParameter)
JpaUpdatingOutboundEndpointSpec parameterExpression(String expression)
JpaUpdatingOutboundEndpointSpec parameterExpression(String name, String expression)
JpaUpdatingOutboundEndpointSpec usePayloadAsParameterSource(boolean usePayloadAsParameterSource)
}@Bean
public IntegrationFlow persistAndReturnFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("createStudentChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Student.class)
.persistMode(PersistMode.PERSIST))
.handle((payload, headers) -> {
Student persisted = (Student) payload;
System.out.println("Created student with ID: " + persisted.getId());
return payload;
})
.channel("persistedStudentChannel")
.get();
}@Bean
public IntegrationFlow mergeAndReturnFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("updateStudentChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Student.class)
.persistMode(PersistMode.MERGE))
.channel("mergedStudentChannel")
.get();
}
// Usage: The merged (updated) entity is returned
Student updated = messagingTemplate.convertSendAndReceive(
"updateStudentChannel",
studentToUpdate,
Student.class
);@Bean
public IntegrationFlow updateQueryFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("bulkUpdateChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.jpaQuery("UPDATE Student s SET s.status = :status WHERE s.year = :year")
.parameterExpression("status", "payload.newStatus")
.parameterExpression("year", "payload.year"))
.handle((payload, headers) -> {
Integer count = (Integer) payload;
System.out.println("Updated " + count + " students");
return payload;
})
.channel("updateResultChannel")
.get();
}@Bean
public IntegrationFlow deleteAndReturnFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("deleteStudentChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Student.class)
.persistMode(PersistMode.DELETE))
.handle((payload, headers) -> {
// Payload is the deleted entity
Student deleted = (Student) payload;
System.out.println("Deleted student: " + deleted.getId());
return "Deletion confirmed";
})
.channel("deleteConfirmationChannel")
.get();
}@Bean
public IntegrationFlow conditionalUpdateFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("conditionalChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.jpaQuery("UPDATE Student s SET s.graduated = true " +
"WHERE s.credits >= :requiredCredits")
.parameter("requiredCredits", "payload"))
.<Integer, String>route(count -> {
if (count > 0) {
return "success";
} else {
return "noStudentsGraduated";
}
})
.get();
}@Bean
public IntegrationFlow persistMultipleFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("batchCreateChannel")
.split() // Split collection into individual entities
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Student.class)
.persistMode(PersistMode.PERSIST)
.flush(true))
.aggregate() // Aggregate results back
.channel("persistedListChannel")
.get();
}
// Usage:
List<Student> students = Arrays.asList(new Student("John"), new Student("Jane"));
List<Student> persisted = messagingTemplate.convertSendAndReceive(
"batchCreateChannel",
students,
List.class
);@Bean
public IntegrationFlow mergeWithFlushFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("criticalUpdateChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(CriticalData.class)
.persistMode(PersistMode.MERGE)
.flush(true)) // Ensure immediate database sync
.channel("updatedDataChannel")
.get();
}@Bean
public IntegrationFlow batchMergeFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("batchMergeChannel")
.split()
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(LogEntry.class)
.persistMode(PersistMode.MERGE)
.flushSize(50) // Flush every 50 entities
.clearOnFlush(true))
.aggregate()
.channel("mergedBatchChannel")
.get();
}@Bean
public IntegrationFlow updateWithValidationFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("validatedUpdateChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Student.class)
.persistMode(PersistMode.MERGE))
.handle((payload, headers) -> {
Student updated = (Student) payload;
// Validate the merge was successful
if (updated.getId() != null && updated.getVersion() != null) {
return updated;
} else {
throw new IllegalStateException("Merge failed");
}
})
.channel("validatedStudentChannel")
.get();
}// Assuming @NamedQuery on Student entity:
// @NamedQuery(name = "Student.updateStatus",
// query = "UPDATE Student s SET s.status = :status WHERE s.id IN :ids")
@Bean
public IntegrationFlow namedUpdateFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("namedUpdateChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.namedQuery("Student.updateStatus")
.parameterExpression("status", "payload.status")
.parameterExpression("ids", "payload.studentIds"))
.handle((payload, headers) -> {
Integer count = (Integer) payload;
return Map.of("updated", count, "status", "completed");
})
.channel("updateStatusChannel")
.get();
}@Bean
public IntegrationFlow nativeUpdateFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("nativeUpdateChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.nativeQuery("UPDATE students SET last_modified = CURRENT_TIMESTAMP " +
"WHERE department = :dept")
.parameter("dept", "payload"))
.channel("nativeUpdateResultChannel")
.get();
}@Bean
public IntegrationFlow upsertFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("upsertChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Student.class)
.persistMode(PersistMode.MERGE)) // MERGE handles both insert and update
.handle((payload, headers) -> {
Student entity = (Student) payload;
boolean isNew = headers.get("originalId") == null && entity.getId() != null;
return Map.of(
"entity", entity,
"operation", isNew ? "INSERT" : "UPDATE"
);
})
.channel("upsertResultChannel")
.get();
}@Bean
public IntegrationFlow cascadeFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("studentWithEnrollmentsChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Student.class)
.persistMode(PersistMode.MERGE)) // Cascades to related entities
.handle((payload, headers) -> {
Student student = (Student) payload;
// Both student and enrollments are persisted due to cascade
return student;
})
.channel("cascadedStudentChannel")
.get();
}@Bean
public IntegrationFlow errorHandlingUpdateFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from("safeUpdateChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Student.class)
.persistMode(PersistMode.MERGE),
e -> e.advice(errorAdvice()))
.channel("updateReplyChannel")
.get();
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice errorAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice =
new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpressionString("payload + ' updated successfully'");
advice.setOnFailureExpressionString("'Update failed: ' + #exception.message");
advice.setReturnFailureExpressionResult(true);
return advice;
}@Bean
public IntegrationFlow transactionalUpdateFlow(
EntityManagerFactory entityManagerFactory,
PlatformTransactionManager transactionManager) {
return IntegrationFlow
.from("transactionalUpdateChannel")
.handle(Jpa.updatingGateway(entityManagerFactory)
.entityClass(Order.class)
.persistMode(PersistMode.MERGE),
e -> e.transactional(transactionManager))
.handle((payload, headers) -> {
Order order = (Order) payload;
// Additional transactional work
return order;
})
.channel("committedOrderChannel")
.get();
}@Bean
public IntegrationFlow programmaticUpdatingFlow(EntityManagerFactory entityManagerFactory) {
JpaExecutor executor = new JpaExecutor(entityManagerFactory);
executor.setEntityClass(Student.class);
executor.setPersistMode(PersistMode.MERGE);
executor.setFlush(true);
JpaOutboundGateway gateway = new JpaOutboundGateway(executor);
gateway.setGatewayType(OutboundGatewayType.UPDATING);
return IntegrationFlow
.from("requestChannel")
.handle(gateway)
.channel("replyChannel")
.get();
}<int-jpa:updating-outbound-gateway
request-channel="createChannel"
reply-channel="createdChannel"
entity-manager-factory="entityManagerFactory"
entity-class="com.example.Student"
persist-mode="PERSIST"/><int-jpa:updating-outbound-gateway
request-channel="updateChannel"
reply-channel="updatedChannel"
entity-manager-factory="entityManagerFactory"
entity-class="com.example.Student"
persist-mode="MERGE"
flush="true"/><int-jpa:updating-outbound-gateway
request-channel="bulkUpdateChannel"
reply-channel="updateResultChannel"
entity-manager-factory="entityManagerFactory"
jpa-query="UPDATE Student s SET s.status = :status WHERE s.year = :year">
<int-jpa:parameter name="status" expression="payload.status"/>
<int-jpa:parameter name="year" expression="payload.year"/>
</int-jpa:updating-outbound-gateway><int-jpa:updating-outbound-gateway
request-channel="batchChannel"
reply-channel="batchResultChannel"
entity-manager-factory="entityManagerFactory"
entity-class="com.example.LogEntry"
persist-mode="PERSIST"
flush-size="100"
clear-on-flush="true"/>enum PersistMode {
PERSIST, // EntityManager.persist() - for new entities
MERGE, // EntityManager.merge() - for updates or new entities
DELETE // EntityManager.remove() - for deletions
}Default: MERGE
Choose one:
Integer representing number of affected rowsThe updating gateway participates in transactions. Configure transaction boundaries:
@Bean
public IntegrationFlow transactionalFlow(
EntityManagerFactory emf,
PlatformTransactionManager txManager) {
return IntegrationFlow
.from("txChannel")
.handle(Jpa.updatingGateway(emf)
.entityClass(Order.class)
.persistMode(PersistMode.PERSIST),
e -> e.transactional(txManager))
.channel("replyChannel")
.get();
}For complex transactions spanning multiple operations:
@Bean
@Transactional
public IntegrationFlow multiOperationFlow(EntityManagerFactory emf) {
return IntegrationFlow
.from("complexTxChannel")
.handle(Jpa.updatingGateway(emf)
.entityClass(Order.class)
.persistMode(PersistMode.PERSIST))
.handle(Jpa.updatingGateway(emf)
.jpaQuery("UPDATE Inventory i SET i.quantity = i.quantity - :qty " +
"WHERE i.productId = :pid")
.parameterExpression("qty", "payload.quantity")
.parameterExpression("pid", "payload.productId"))
.channel("replyChannel")
.get();
}@Bean
public IntegrationFlow errorHandlingFlow(EntityManagerFactory emf) {
return IntegrationFlow
.from("inputChannel")
.handle(Jpa.updatingGateway(emf)
.entityClass(Student.class)
.persistMode(PersistMode.PERSIST),
e -> e.advice(errorHandlingAdvice()))
.channel("replyChannel")
.get();
}
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage error) {
if (error.getPayload().getCause() instanceof JpaOperationFailedException) {
// Handle JPA specific errors
}
}@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
advice.setRetryTemplate(retryTemplate);
return advice;
}
@Bean
public IntegrationFlow retryableFlow(EntityManagerFactory emf) {
return IntegrationFlow
.from("retryChannel")
.handle(Jpa.updatingGateway(emf)
.entityClass(Student.class)
.persistMode(PersistMode.MERGE),
e -> e.advice(retryAdvice()))
.channel("replyChannel")
.get();
}// For new entities (faster)
.handle(Jpa.updatingGateway(emf)
.persistMode(PersistMode.PERSIST))
// For updates or unknown state (more flexible but slower)
.handle(Jpa.updatingGateway(emf)
.persistMode(PersistMode.MERGE))For bulk operations, use flushSize:
.handle(Jpa.updatingGateway(emf)
.persistMode(PersistMode.PERSIST)
.flushSize(100)
.clearOnFlush(true))Prefer JPQL UPDATE for mass updates:
// More efficient
.handle(Jpa.updatingGateway(emf)
.jpaQuery("UPDATE Student s SET s.graduated = true WHERE s.credits >= 120"))
// Less efficient (requires loading each entity)
// Splitting and updating each entity individuallyUse Outbound Channel Adapter when:
Use Updating Outbound Gateway when:
Install with Tessl CLI
npx tessl i tessl/maven-spring-integration-jpa