The JPA Outbound Channel Adapter persists, merges, or deletes entities from incoming messages. It operates in fire-and-forget mode (no reply channel), making it ideal for one-way database operations. For operations that need to return results, use the Updating Outbound Gateway instead.

Required Dependencies:
- spring-integration-jpa (this package)
- spring-integration-core is required
- EntityManagerFactory or EntityManager bean must be configured

Default Behaviors:
- persistMode=MERGE (updates existing or creates new)
- flush=false (no immediate flush after each operation)
- flushSize=0 (no batch flushing)
- clearOnFlush=false (persistence context not cleared on flush)
- The message payload is treated as the entity when entityClass is set

Threading Model:

Lifecycle:
- auto-startup attribute (default: true)

Exceptions:
- JpaOperationFailedException - JPA operation failures (contains offending query)
- PersistenceException - General JPA persistence errors
- EntityExistsException - Entity already exists (when using PERSIST mode)
- IllegalArgumentException - Invalid configuration or entity type
- OptimisticLockException - Version conflict (when using optimistic locking)

Edge Cases:
- Specify either entityClass OR a query (jpaQuery, nativeQuery, namedQuery)
- With entityClass: the payload is the entity to persist/merge/delete
- PERSIST mode throws an exception if the entity already exists (use MERGE for upsert)
- MERGE mode creates a new entity if it doesn't exist (handles both insert and update)
- flushSize > 0 enables batch flushing (flushes every N operations)
- clearOnFlush=true prevents memory issues with large entity sets
- With flushSize > 0, flush happens after every N operations, not per message

class JpaOutboundGateway extends AbstractReplyProducingMessageHandler {
    JpaOutboundGateway(JpaExecutor jpaExecutor)
    void setGatewayType(OutboundGatewayType gatewayType)
    void setProducesReply(boolean producesReply) // Set to false for adapter mode
}

Note: The outbound adapter is created using JpaOutboundGateway with producesReply set to false, or via the Java DSL or XML configuration, which handle this automatically.

static JpaUpdatingOutboundEndpointSpec Jpa.outboundAdapter(EntityManagerFactory entityManagerFactory)
static JpaUpdatingOutboundEndpointSpec Jpa.outboundAdapter(EntityManager entityManager)
static JpaUpdatingOutboundEndpointSpec Jpa.outboundAdapter(JpaOperations jpaOperations)

class JpaUpdatingOutboundEndpointSpec extends JpaBaseOutboundEndpointSpec {
    // Persistence mode
    JpaUpdatingOutboundEndpointSpec persistMode(PersistMode persistMode)

    // Flush configuration
    JpaUpdatingOutboundEndpointSpec flush(boolean flush)
    JpaUpdatingOutboundEndpointSpec flushSize(int flushSize)
    JpaUpdatingOutboundEndpointSpec clearOnFlush(boolean clearOnFlush)

    // Query configuration (inherited)
    JpaUpdatingOutboundEndpointSpec entityClass(Class<?> entityClass)
    JpaUpdatingOutboundEndpointSpec jpaQuery(String jpaQuery)
    JpaUpdatingOutboundEndpointSpec nativeQuery(String nativeQuery)
    JpaUpdatingOutboundEndpointSpec namedQuery(String namedQuery)

    // Parameter configuration (inherited)
    JpaUpdatingOutboundEndpointSpec parameterSourceFactory(ParameterSourceFactory parameterSourceFactory)
    JpaUpdatingOutboundEndpointSpec parameter(Object value)
    JpaUpdatingOutboundEndpointSpec parameter(String name, Object value)
    JpaUpdatingOutboundEndpointSpec parameter(JpaParameter jpaParameter)
    JpaUpdatingOutboundEndpointSpec parameterExpression(String expression)
    JpaUpdatingOutboundEndpointSpec parameterExpression(String name, String expression)
    JpaUpdatingOutboundEndpointSpec usePayloadAsParameterSource(boolean usePayloadAsParameterSource)
}

@Bean
public IntegrationFlow persistStudentFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("newStudentChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .entityClass(Student.class)
                    .persistMode(PersistMode.PERSIST))
            .get();
}

@Bean
public IntegrationFlow mergeStudentFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("updateStudentChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .entityClass(Student.class)
                    .persistMode(PersistMode.MERGE))
            .get();
}

@Bean
public IntegrationFlow deleteStudentFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("deleteStudentChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .entityClass(Student.class)
                    .persistMode(PersistMode.DELETE))
            .get();
}

@Bean
public IntegrationFlow updateStatusFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("statusUpdateChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .jpaQuery("UPDATE Student s SET s.status = :status WHERE s.id = :id")
                    .parameterExpression("status", "payload.status")
                    .parameterExpression("id", "payload.studentId"))
            .get();
}

@Bean
public IntegrationFlow persistWithFlushFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("criticalDataChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .entityClass(CriticalData.class)
                    .persistMode(PersistMode.PERSIST)
                    .flush(true)) // Flush after each message
            .get();
}

@Bean
public IntegrationFlow batchPersistFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("bulkInsertChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .entityClass(LogEntry.class)
                    .persistMode(PersistMode.PERSIST)
                    .flushSize(100) // Flush every 100 entities
                    .clearOnFlush(true)) // Clear persistence context on flush
            .get();
}

The default behavior uses the message payload as the entity to persist/merge/delete:
@Bean
public IntegrationFlow simpleFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("entityChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .persistMode(PersistMode.MERGE))
            .get();
}

// Send entity in message
messagingTemplate.convertAndSend("entityChannel", studentEntity);

@Bean
public IntegrationFlow updateWithStaticParamsFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("activateChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .jpaQuery("UPDATE Student s SET s.active = true WHERE s.id = :id")
                    // parameterExpression evaluates "payload" against the message;
                    // parameter("id", "payload") would bind the literal string "payload"
                    .parameterExpression("id", "payload")) // Use message payload as id parameter
            .get();
}

@Bean
public IntegrationFlow nativeUpdateFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("nativeUpdateChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .nativeQuery("UPDATE students SET last_login = CURRENT_TIMESTAMP WHERE id = :id")
                    // Expression evaluated against the message, not a static value
                    .parameterExpression("id", "payload.userId"))
            .get();
}

@Bean
public IntegrationFlow beanPropertyFlow(EntityManagerFactory entityManagerFactory) {
    BeanPropertyParameterSourceFactory parameterSourceFactory =
            new BeanPropertyParameterSourceFactory();
    return IntegrationFlow
            .from("studentUpdateChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .jpaQuery("UPDATE Student s SET s.grade = :grade WHERE s.id = :id")
                    .parameterSourceFactory(parameterSourceFactory)
                    .usePayloadAsParameterSource(true)) // Extract from payload properties
            .get();
}

@Bean
public IntegrationFlow expressionParamFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("expressionChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .jpaQuery("UPDATE Student s SET s.enrollmentDate = :date WHERE s.id = :id")
                    .parameterExpression("date", "T(java.time.LocalDate).now()") // Current date
                    .parameterExpression("id", "payload['studentId']"))
            .get();
}

@Bean
public IntegrationFlow mixedParametersFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("mixedParamChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                    .jpaQuery("UPDATE Student s SET s.grade = :grade, s.year = :year WHERE s.id = :id")
                    .parameter("grade", "A") // Static value
                    .parameterExpression("year", "payload.academicYear") // Expression
                    .parameterExpression("id", "headers['studentId']")) // Expression reading a message header
            .get();
}

@Bean
public IntegrationFlow programmaticOutboundFlow(EntityManagerFactory entityManagerFactory) {
    JpaExecutor executor = new JpaExecutor(entityManagerFactory);
    executor.setEntityClass(Student.class);
    executor.setPersistMode(PersistMode.MERGE);
    executor.setFlush(true);

    JpaOutboundGateway gateway = new JpaOutboundGateway(executor);
    gateway.setProducesReply(false); // Adapter mode (no reply)

    return IntegrationFlow
            .from("studentChannel")
            .handle(gateway)
            .get();
}

<int-jpa:outbound-channel-adapter
        channel="inputChannel"
        entity-manager-factory="entityManagerFactory"
        entity-class="com.example.Student"
        persist-mode="PERSIST"/>

<int-jpa:outbound-channel-adapter
        channel="updateChannel"
        entity-manager-factory="entityManagerFactory"
        entity-class="com.example.Student"
        persist-mode="MERGE"
        flush="true"/>

<int-jpa:outbound-channel-adapter
        channel="queryChannel"
        entity-manager-factory="entityManagerFactory"
        jpa-query="UPDATE Student s SET s.status = :status WHERE s.id = :id">
    <int-jpa:parameter name="status" expression="payload.status"/>
    <int-jpa:parameter name="id" expression="payload.id"/>
</int-jpa:outbound-channel-adapter>

<int-jpa:outbound-channel-adapter
        channel="batchChannel"
        entity-manager-factory="entityManagerFactory"
        entity-class="com.example.LogEntry"
        persist-mode="PERSIST"
        flush-size="100"
        clear-on-flush="true"/>

enum PersistMode {
    PERSIST, // EntityManager.persist() - for new entities
    MERGE,   // EntityManager.merge() - for updates or new entities
    DELETE   // EntityManager.remove() - for deletions
}

Default: MERGE
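
To make the difference between the three modes concrete, here is a minimal in-memory sketch. It is a toy model and not the JPA or Spring Integration API: `PersistModeSketch`, its nested `Mode` enum, and the `Map`-backed store are hypothetical names introduced only for illustration, and a real JPA provider may report a duplicate at flush or commit time rather than immediately.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of the three persist modes (illustrative only, not the JPA API). */
final class PersistModeSketch {

    /** Hypothetical stand-in for PersistMode, kept local so the sketch is self-contained. */
    enum Mode { PERSIST, MERGE, DELETE }

    // Rows keyed by entity id; stands in for a database table.
    private final Map<Long, String> rows = new HashMap<>();

    /** Applies one operation to the store. */
    void apply(Mode mode, long id, String value) {
        switch (mode) {
            case PERSIST:
                // Insert-only: an id that is already present is an error.
                if (rows.containsKey(id)) {
                    throw new IllegalStateException("entity already exists: " + id);
                }
                rows.put(id, value);
                break;
            case MERGE:
                // Upsert: inserts an unknown id, overwrites a known one.
                rows.put(id, value);
                break;
            case DELETE:
                // Removes the row; the value argument is ignored.
                rows.remove(id);
                break;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    String find(long id) {
        return rows.get(id);
    }

    int size() {
        return rows.size();
    }
}
```

In this model, applying MERGE twice with the same id leaves a single row holding the latest value, while a PERSIST for an id that is already stored fails.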
Flushing forces immediate synchronization with the database. Use flushSize for batch operations to improve performance while maintaining reasonable memory usage.
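
The interaction between flushSize and clearOnFlush described above can be sketched as a simple counter. This is an illustrative model under stated assumptions, not the Spring Integration implementation: `BatchFlushSketch` and its methods are hypothetical, and the list merely stands in for the entities held by the persistence context.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of flushSize / clearOnFlush bookkeeping (illustrative only). */
final class BatchFlushSketch {

    private final int flushSize;
    private final boolean clearOnFlush;
    // Stands in for the entities currently managed by the persistence context.
    private final List<Object> managed = new ArrayList<>();
    private int pendingSinceFlush = 0;
    private int flushCount = 0;

    BatchFlushSketch(int flushSize, boolean clearOnFlush) {
        if (flushSize < 0) {
            throw new IllegalArgumentException("flushSize must not be negative");
        }
        this.flushSize = flushSize;
        this.clearOnFlush = clearOnFlush;
    }

    /** Registers one entity and flushes once flushSize operations have accumulated. */
    void persist(Object entity) {
        managed.add(entity);
        pendingSinceFlush++;
        // flushSize == 0 means batch flushing is disabled in this model.
        if (flushSize > 0 && pendingSinceFlush == flushSize) {
            flush();
        }
    }

    private void flush() {
        flushCount++;
        pendingSinceFlush = 0;
        if (clearOnFlush) {
            // Dropping managed entities keeps memory bounded by flushSize.
            managed.clear();
        }
    }

    int flushCount() {
        return flushCount;
    }

    int managedCount() {
        return managed.size();
    }
}
```

With a flushSize of 100 and 250 persisted entities, this model flushes twice. With clearOnFlush enabled it then holds only the 50 entities added since the last flush; with it disabled, all 250 remain managed.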
You must specify exactly one of entityClass or a query (jpaQuery, nativeQuery, or namedQuery).
With entityClass, the message payload is treated as the entity to persist/merge/delete:
Student student = new Student("John", "Doe");
messagingTemplate.convertAndSend("persistChannel", student);

With a query, an UPDATE or DELETE statement is executed with parameters resolved from the message:
Map<String, Object> params = Map.of("status", "ACTIVE", "studentId", 123);
messagingTemplate.convertAndSend("updateChannel", params);

The outbound adapter participates in transactions. Configure transaction management:
@Bean
public IntegrationFlow transactionalOutboundFlow(
        EntityManagerFactory entityManagerFactory,
        PlatformTransactionManager transactionManager) {
    return IntegrationFlow
            .from("transactionalChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                            .entityClass(Order.class)
                            .persistMode(PersistMode.PERSIST),
                    e -> e.transactional(transactionManager))
            .get();
}

Or use @Transactional on upstream components.
Configure error handling for failed operations:
@Bean
public IntegrationFlow errorHandlingFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("inputChannel")
            .handle(Jpa.outboundAdapter(entityManagerFactory)
                            .entityClass(Student.class)
                            .persistMode(PersistMode.PERSIST),
                    e -> e.advice(errorHandlingAdvice()))
            .get();
}

@Bean
public ExpressionEvaluatingRequestHandlerAdvice errorHandlingAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice =
            new ExpressionEvaluatingRequestHandlerAdvice();
    // errorChannel() is assumed to be a MessageChannel @Bean method defined elsewhere in this configuration
    advice.setFailureChannel(errorChannel());
    advice.setOnFailureExpressionString("payload + ' failed: ' + #exception.message");
    return advice;
}

For high-volume inserts, use flushSize to batch database operations:
.handle(Jpa.outboundAdapter(entityManagerFactory)
        .persistMode(PersistMode.PERSIST)
        .flushSize(100) // Flush every 100 entities
        .clearOnFlush(true)) // Clear persistence context to prevent memory issues

Use clearOnFlush(true) when processing large volumes to prevent memory issues from managed entities accumulating in the persistence context.
Keep transactions as short as possible. For long-running flows, consider placing transaction boundaries around the JPA operations rather than the entire flow.
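Use Outbound Channel Adapter when the operation is one-way (fire-and-forget) and no reply is needed.
Use Updating Outbound Gateway when the operation needs to return a result.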