CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-spring-integration-jpa

Spring Integration JPA Support - provides components for performing database operations using JPA

Overview
Eval results
Files

updating-gateway.mddocs/

Updating Outbound Gateway

The JPA Updating Outbound Gateway persists, merges, or deletes entities and returns the result to a reply channel. Unlike the outbound adapter (fire-and-forget), the updating gateway provides request-reply semantics, returning the persisted/merged entity or the count of affected rows for update queries.

Key Information for Agents

Required Dependencies:

  • spring-integration-jpa (this package)
  • spring-integration-core is required
  • EntityManagerFactory or EntityManager bean must be configured
  • Reply channel required (or uses temporary channel)

Default Behaviors:

  • persistMode=MERGE (updates existing or creates new)
  • flush=false (no immediate flush after each operation)
  • flushSize=0 (no batch flushing)
  • clearOnFlush=false (persistence context not cleared on flush)
  • Request-reply pattern (expects reply, blocks until result)
  • Returns persisted/merged entity (entity mode) or Integer count (query mode)
  • Gateway type is UPDATING by default

Threading Model:

  • Executes on message processing thread
  • Blocking I/O (database operations are synchronous)
  • For async processing, use async channels (ExecutorChannel, QueueChannel)
  • Thread-safe when using EntityManagerFactory (creates EntityManager per operation)
  • Batch operations (flushSize > 0) process entities sequentially
  • Reply timeout configurable via reply-timeout attribute

Lifecycle:

  • Initialized when Spring context starts
  • Processes messages as they arrive on request channel
  • Stops processing when context is destroyed
  • Can be controlled via auto-startup attribute (default: true)

Exceptions:

  • JpaOperationFailedException - JPA operation failures (contains offending query)
  • PersistenceException - General JPA persistence errors
  • EntityExistsException - Entity already exists (when using PERSIST mode)
  • OptimisticLockException - Version conflict (when using optimistic locking)
  • MessageTimeoutException - Reply not received within timeout
  • IllegalArgumentException - Invalid configuration

Edge Cases:

  • Exactly one configuration must be set: entityClass OR query (jpaQuery, nativeQuery, namedQuery)
  • With entityClass: payload is entity to persist/merge/delete, returns entity
  • With query: payload provides parameters for UPDATE/DELETE query, returns Integer count
  • PERSIST mode throws exception if entity already exists (use MERGE for upsert)
  • MERGE mode creates new entity if it doesn't exist (handles both insert and update)
  • flushSize > 0 enables batch flushing (flushes every N operations)
  • clearOnFlush=true prevents memory issues with large entity sets
  • When flushSize > 0, flush happens after every N operations, not per message
  • MERGE returns managed entity (may be different instance than input)
  • PERSIST returns persisted entity with generated ID populated
  • DELETE returns deleted entity (in detached state)
  • Update queries return Integer count of affected rows
  • Parameter sources can mix static values, expressions, and bean properties

Core Class

class JpaOutboundGateway extends AbstractReplyProducingMessageHandler {
    JpaOutboundGateway(JpaExecutor jpaExecutor)
    void setGatewayType(OutboundGatewayType gatewayType)  // Set to UPDATING (default)
}

Java DSL Configuration

Factory Methods

static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(EntityManagerFactory entityManagerFactory)
static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(EntityManager entityManager)
static JpaUpdatingOutboundEndpointSpec Jpa.updatingGateway(JpaOperations jpaOperations)

Configuration Methods

class JpaUpdatingOutboundEndpointSpec extends JpaBaseOutboundEndpointSpec {
    // Persistence mode
    JpaUpdatingOutboundEndpointSpec persistMode(PersistMode persistMode)

    // Flush configuration
    JpaUpdatingOutboundEndpointSpec flush(boolean flush)
    JpaUpdatingOutboundEndpointSpec flushSize(int flushSize)
    JpaUpdatingOutboundEndpointSpec clearOnFlush(boolean clearOnFlush)

    // Query configuration (inherited)
    JpaUpdatingOutboundEndpointSpec entityClass(Class<?> entityClass)
    JpaUpdatingOutboundEndpointSpec jpaQuery(String jpaQuery)
    JpaUpdatingOutboundEndpointSpec nativeQuery(String nativeQuery)
    JpaUpdatingOutboundEndpointSpec namedQuery(String namedQuery)

    // Parameter configuration (inherited)
    JpaUpdatingOutboundEndpointSpec parameterSourceFactory(ParameterSourceFactory parameterSourceFactory)
    JpaUpdatingOutboundEndpointSpec parameter(Object value)
    JpaUpdatingOutboundEndpointSpec parameter(String name, Object value)
    JpaUpdatingOutboundEndpointSpec parameter(JpaParameter jpaParameter)
    JpaUpdatingOutboundEndpointSpec parameterExpression(String expression)
    JpaUpdatingOutboundEndpointSpec parameterExpression(String name, String expression)
    JpaUpdatingOutboundEndpointSpec usePayloadAsParameterSource(boolean usePayloadAsParameterSource)
}

Usage Examples

Persist Entity and Return Result

@Bean
public IntegrationFlow persistAndReturnFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("createStudentChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Student.class)
                .persistMode(PersistMode.PERSIST))
        .handle((payload, headers) -> {
            Student persisted = (Student) payload;
            System.out.println("Created student with ID: " + persisted.getId());
            return payload;
        })
        .channel("persistedStudentChannel")
        .get();
}

Merge Entity and Return Result

@Bean
public IntegrationFlow mergeAndReturnFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("updateStudentChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Student.class)
                .persistMode(PersistMode.MERGE))
        .channel("mergedStudentChannel")
        .get();
}

// Usage: The merged (updated) entity is returned
Student updated = messagingTemplate.convertSendAndReceive(
    "updateStudentChannel",
    studentToUpdate,
    Student.class
);

Execute Update Query and Return Affected Count

@Bean
public IntegrationFlow updateQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("bulkUpdateChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .jpaQuery("UPDATE Student s SET s.status = :status WHERE s.year = :year")
                .parameterExpression("status", "payload.newStatus")
                .parameterExpression("year", "payload.year"))
        .handle((payload, headers) -> {
            Integer count = (Integer) payload;
            System.out.println("Updated " + count + " students");
            return payload;
        })
        .channel("updateResultChannel")
        .get();
}

Delete Entity and Return Result

@Bean
public IntegrationFlow deleteAndReturnFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("deleteStudentChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Student.class)
                .persistMode(PersistMode.DELETE))
        .handle((payload, headers) -> {
            // Payload is the deleted entity
            Student deleted = (Student) payload;
            System.out.println("Deleted student: " + deleted.getId());
            return "Deletion confirmed";
        })
        .channel("deleteConfirmationChannel")
        .get();
}

Conditional Update Based on Result

@Bean
public IntegrationFlow conditionalUpdateFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("conditionalChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .jpaQuery("UPDATE Student s SET s.graduated = true " +
                         "WHERE s.credits >= :requiredCredits")
                .parameter("requiredCredits", "payload"))
        .<Integer, String>route(count -> {
            if (count > 0) {
                return "success";
            } else {
                return "noStudentsGraduated";
            }
        })
        .get();
}

Persist Multiple Entities

@Bean
public IntegrationFlow persistMultipleFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("batchCreateChannel")
        .split()  // Split collection into individual entities
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Student.class)
                .persistMode(PersistMode.PERSIST)
                .flush(true))
        .aggregate()  // Aggregate results back
        .channel("persistedListChannel")
        .get();
}

// Usage:
List<Student> students = Arrays.asList(new Student("John"), new Student("Jane"));
List<Student> persisted = messagingTemplate.convertSendAndReceive(
    "batchCreateChannel",
    students,
    List.class
);

Merge with Flush After Each

@Bean
public IntegrationFlow mergeWithFlushFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("criticalUpdateChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(CriticalData.class)
                .persistMode(PersistMode.MERGE)
                .flush(true))  // Ensure immediate database sync
        .channel("updatedDataChannel")
        .get();
}

Batch Processing with Periodic Flush

@Bean
public IntegrationFlow batchMergeFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("batchMergeChannel")
        .split()
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(LogEntry.class)
                .persistMode(PersistMode.MERGE)
                .flushSize(50)  // Flush every 50 entities
                .clearOnFlush(true))
        .aggregate()
        .channel("mergedBatchChannel")
        .get();
}

Update with Validation

@Bean
public IntegrationFlow updateWithValidationFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("validatedUpdateChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Student.class)
                .persistMode(PersistMode.MERGE))
        .handle((payload, headers) -> {
            Student updated = (Student) payload;
            // Validate the merge was successful
            if (updated.getId() != null && updated.getVersion() != null) {
                return updated;
            } else {
                throw new IllegalStateException("Merge failed");
            }
        })
        .channel("validatedStudentChannel")
        .get();
}

Execute Named Update Query

// Assuming @NamedQuery on Student entity:
// @NamedQuery(name = "Student.updateStatus",
//             query = "UPDATE Student s SET s.status = :status WHERE s.id IN :ids")

@Bean
public IntegrationFlow namedUpdateFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("namedUpdateChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .namedQuery("Student.updateStatus")
                .parameterExpression("status", "payload.status")
                .parameterExpression("ids", "payload.studentIds"))
        .handle((payload, headers) -> {
            Integer count = (Integer) payload;
            return Map.of("updated", count, "status", "completed");
        })
        .channel("updateStatusChannel")
        .get();
}

Native Update Query

@Bean
public IntegrationFlow nativeUpdateFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("nativeUpdateChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .nativeQuery("UPDATE students SET last_modified = CURRENT_TIMESTAMP " +
                           "WHERE department = :dept")
                .parameter("dept", "payload"))
        .channel("nativeUpdateResultChannel")
        .get();
}

Upsert Pattern (Merge with Detection)

@Bean
public IntegrationFlow upsertFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("upsertChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Student.class)
                .persistMode(PersistMode.MERGE))  // MERGE handles both insert and update
        .handle((payload, headers) -> {
            Student entity = (Student) payload;
            boolean isNew = headers.get("originalId") == null && entity.getId() != null;
            return Map.of(
                "entity", entity,
                "operation", isNew ? "INSERT" : "UPDATE"
            );
        })
        .channel("upsertResultChannel")
        .get();
}

Cascade Operations

@Bean
public IntegrationFlow cascadeFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("studentWithEnrollmentsChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Student.class)
                .persistMode(PersistMode.MERGE))  // Cascades to related entities
        .handle((payload, headers) -> {
            Student student = (Student) payload;
            // Both student and enrollments are persisted due to cascade
            return student;
        })
        .channel("cascadedStudentChannel")
        .get();
}

Error Handling with Gateway Reply

@Bean
public IntegrationFlow errorHandlingUpdateFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from("safeUpdateChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Student.class)
                .persistMode(PersistMode.MERGE),
            e -> e.advice(errorAdvice()))
        .channel("updateReplyChannel")
        .get();
}

@Bean
public ExpressionEvaluatingRequestHandlerAdvice errorAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice =
        new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpressionString("payload + ' updated successfully'");
    advice.setOnFailureExpressionString("'Update failed: ' + #exception.message");
    advice.setReturnFailureExpressionResult(true);
    return advice;
}

Transaction Coordination

@Bean
public IntegrationFlow transactionalUpdateFlow(
        EntityManagerFactory entityManagerFactory,
        PlatformTransactionManager transactionManager) {
    return IntegrationFlow
        .from("transactionalUpdateChannel")
        .handle(Jpa.updatingGateway(entityManagerFactory)
                .entityClass(Order.class)
                .persistMode(PersistMode.MERGE),
            e -> e.transactional(transactionManager))
        .handle((payload, headers) -> {
            Order order = (Order) payload;
            // Additional transactional work
            return order;
        })
        .channel("committedOrderChannel")
        .get();
}

Programmatic Configuration

@Bean
public IntegrationFlow programmaticUpdatingFlow(EntityManagerFactory entityManagerFactory) {
    JpaExecutor executor = new JpaExecutor(entityManagerFactory);
    executor.setEntityClass(Student.class);
    executor.setPersistMode(PersistMode.MERGE);
    executor.setFlush(true);

    JpaOutboundGateway gateway = new JpaOutboundGateway(executor);
    gateway.setGatewayType(OutboundGatewayType.UPDATING);

    return IntegrationFlow
        .from("requestChannel")
        .handle(gateway)
        .channel("replyChannel")
        .get();
}

XML Configuration

Persist Entity

<int-jpa:updating-outbound-gateway
    request-channel="createChannel"
    reply-channel="createdChannel"
    entity-manager-factory="entityManagerFactory"
    entity-class="com.example.Student"
    persist-mode="PERSIST"/>

Merge Entity

<int-jpa:updating-outbound-gateway
    request-channel="updateChannel"
    reply-channel="updatedChannel"
    entity-manager-factory="entityManagerFactory"
    entity-class="com.example.Student"
    persist-mode="MERGE"
    flush="true"/>

Execute Update Query

<int-jpa:updating-outbound-gateway
    request-channel="bulkUpdateChannel"
    reply-channel="updateResultChannel"
    entity-manager-factory="entityManagerFactory"
    jpa-query="UPDATE Student s SET s.status = :status WHERE s.year = :year">
    <int-jpa:parameter name="status" expression="payload.status"/>
    <int-jpa:parameter name="year" expression="payload.year"/>
</int-jpa:updating-outbound-gateway>

Batch Processing

<int-jpa:updating-outbound-gateway
    request-channel="batchChannel"
    reply-channel="batchResultChannel"
    entity-manager-factory="entityManagerFactory"
    entity-class="com.example.LogEntry"
    persist-mode="PERSIST"
    flush-size="100"
    clear-on-flush="true"/>

Configuration Options

Persist Mode

enum PersistMode {
    PERSIST,  // EntityManager.persist() - for new entities
    MERGE,    // EntityManager.merge() - for updates or new entities
    DELETE    // EntityManager.remove() - for deletions
}
  • PERSIST - For new entities only. Returns persisted entity with generated ID
  • MERGE - For updates or inserts. Returns managed entity (may be different instance)
  • DELETE - Removes entity. Returns the deleted entity

Default: MERGE

Flush Configuration

  • flush - If true, flush EntityManager after each operation. Default: false
  • flushSize - Flush every N operations. Default: 0 (disabled)
  • clearOnFlush - Clear persistence context on flush. Default: false

Query Types

Choose one:

  • entityClass - Message payload is the entity
  • jpaQuery - JPQL UPDATE/DELETE query
  • nativeQuery - Native SQL UPDATE/DELETE query
  • namedQuery - Named update query

Parameter Configuration

  • parameterSourceFactory - Factory for creating parameter sources
  • parameter(name, value) - Static parameter
  • parameter(name, expression) - SpEL expression parameter
  • parameterExpression(map) - Multiple expression parameters
  • usePayloadAsParameterSource - Extract from payload properties

Return Values

Entity Mode (with entityClass)

PERSIST Mode

  • Returns the persisted entity with generated ID populated
  • Entity is in managed state

MERGE Mode

  • Returns the managed entity (may be different instance than input)
  • Changes are reflected in returned entity

DELETE Mode

  • Returns the deleted entity
  • Entity is in detached state

Query Mode (with jpaQuery/nativeQuery/namedQuery)

  • Returns Integer representing number of affected rows
  • For UPDATE queries: count of updated rows
  • For DELETE queries: count of deleted rows

Transaction Management

The updating gateway participates in transactions. Configure transaction boundaries:

@Bean
public IntegrationFlow transactionalFlow(
        EntityManagerFactory emf,
        PlatformTransactionManager txManager) {
    return IntegrationFlow
        .from("txChannel")
        .handle(Jpa.updatingGateway(emf)
                .entityClass(Order.class)
                .persistMode(PersistMode.PERSIST),
            e -> e.transactional(txManager))
        .channel("replyChannel")
        .get();
}

For complex transactions spanning multiple operations:

@Bean
@Transactional
public IntegrationFlow multiOperationFlow(EntityManagerFactory emf) {
    return IntegrationFlow
        .from("complexTxChannel")
        .handle(Jpa.updatingGateway(emf)
                .entityClass(Order.class)
                .persistMode(PersistMode.PERSIST))
        .handle(Jpa.updatingGateway(emf)
                .jpaQuery("UPDATE Inventory i SET i.quantity = i.quantity - :qty " +
                         "WHERE i.productId = :pid")
                .parameterExpression("qty", "payload.quantity")
                .parameterExpression("pid", "payload.productId"))
        .channel("replyChannel")
        .get();
}

Error Handling

Basic Error Handling

@Bean
public IntegrationFlow errorHandlingFlow(EntityManagerFactory emf) {
    return IntegrationFlow
        .from("inputChannel")
        .handle(Jpa.updatingGateway(emf)
                .entityClass(Student.class)
                .persistMode(PersistMode.PERSIST),
            e -> e.advice(errorHandlingAdvice()))
        .channel("replyChannel")
        .get();
}

@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage error) {
    if (error.getPayload().getCause() instanceof JpaOperationFailedException) {
        // Handle JPA specific errors
    }
}

Retry on Failure

@Bean
public RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());

    advice.setRetryTemplate(retryTemplate);
    return advice;
}

@Bean
public IntegrationFlow retryableFlow(EntityManagerFactory emf) {
    return IntegrationFlow
        .from("retryChannel")
        .handle(Jpa.updatingGateway(emf)
                .entityClass(Student.class)
                .persistMode(PersistMode.MERGE),
            e -> e.advice(retryAdvice()))
        .channel("replyChannel")
        .get();
}

Performance Considerations

Use PERSIST vs MERGE Appropriately

// For new entities (faster)
.handle(Jpa.updatingGateway(emf)
        .persistMode(PersistMode.PERSIST))

// For updates or unknown state (more flexible but slower)
.handle(Jpa.updatingGateway(emf)
        .persistMode(PersistMode.MERGE))

Batch Processing

For bulk operations, use flushSize:

.handle(Jpa.updatingGateway(emf)
        .persistMode(PersistMode.PERSIST)
        .flushSize(100)
        .clearOnFlush(true))

Bulk Updates vs Individual Updates

Prefer JPQL UPDATE for mass updates:

// More efficient
.handle(Jpa.updatingGateway(emf)
        .jpaQuery("UPDATE Student s SET s.graduated = true WHERE s.credits >= 120"))

// Less efficient (requires loading each entity)
// Splitting and updating each entity individually

Comparison: Adapter vs Gateway

Use Outbound Channel Adapter when:

  • Don't need the result
  • Fire-and-forget is acceptable
  • Maximizing throughput

Use Updating Outbound Gateway when:

  • Need persisted/merged entity returned
  • Need affected row count
  • Want to continue processing with result
  • Implementing request-reply pattern
  • Need confirmation of operation

Install with Tessl CLI

npx tessl i tessl/maven-spring-integration-jpa

docs

inbound-adapter.md

index.md

jpa-executor.md

jpa-operations.md

outbound-adapter.md

parameter-sources.md

retrieving-gateway.md

updating-gateway.md

xml-configuration.md

tile.json