The JPA Retrieving Outbound Gateway queries the database and sends results to a reply channel for continued processing. Unlike the inbound adapter (which polls on a schedule), the retrieving gateway processes messages on-demand. It's ideal for request-reply scenarios where you need to retrieve data based on message content.
Required Dependencies:
- spring-integration-jpa (this package)
- spring-integration-core is required
- EntityManagerFactory or EntityManager bean must be configured

Default Behaviors:
- expectSingleResult=false (returns List<?>)
- maxResults=unlimited (no limit)
- firstResult=0 (no offset)
- deleteAfterPoll=false (entities not deleted)
- deleteInBatch=false (individual deletes)
- flushAfterDelete=false (no flush after deletion)
- Returns null when no results found and expectSingleResult=true
- Returns an empty list when no results found and expectSingleResult=false

Threading Model:
- reply-timeout attribute

Lifecycle:
- auto-startup attribute (default: true)

Exceptions:
- JpaOperationFailedException - JPA query execution failures (contains offending query)
- PersistenceException - General JPA persistence errors
- NoResultException - Query returned no results (when expectSingleResult=true)
- NonUniqueResultException - Query returned multiple results (when expectSingleResult=true)
- MessageTimeoutException - Reply not received within timeout
- IllegalArgumentException - Invalid configuration

Edge Cases:
- entityClass, jpaQuery, nativeQuery, or namedQuery
- idExpression uses EntityManager.find() for optimal performance (bypasses query)
- entityClass for result type mapping
- @NamedQuery annotation
- firstResultExpression and maxResultsExpression evaluated against message context
- expectSingleResult=true and multiple results found, throws NonUniqueResultException
- expectSingleResult=true and no results found, returns null (not empty list)
- expectSingleResult=false and no results found, returns empty list (not null)
- Object[] or List<Object[]> depending on expectSingleResult
- deleteAfterPoll=true with deleteInBatch=true is more efficient for large batches

class JpaOutboundGateway extends AbstractReplyProducingMessageHandler {
    JpaOutboundGateway(JpaExecutor jpaExecutor)
    void setGatewayType(OutboundGatewayType gatewayType) // Set to RETRIEVING
}

static JpaRetrievingOutboundGatewaySpec Jpa.retrievingGateway(EntityManagerFactory entityManagerFactory)
static JpaRetrievingOutboundGatewaySpec Jpa.retrievingGateway(EntityManager entityManager)
static JpaRetrievingOutboundGatewaySpec Jpa.retrievingGateway(JpaOperations jpaOperations)

class JpaRetrievingOutboundGatewaySpec extends JpaBaseOutboundEndpointSpec {
    // Result configuration
    JpaRetrievingOutboundGatewaySpec expectSingleResult(boolean expectSingleResult)
    JpaRetrievingOutboundGatewaySpec firstResult(int firstResult)
    JpaRetrievingOutboundGatewaySpec firstResultExpression(String firstResultExpression)
    JpaRetrievingOutboundGatewaySpec firstResultExpression(Expression firstResultExpression)
    JpaRetrievingOutboundGatewaySpec maxResults(int maxResults)
    JpaRetrievingOutboundGatewaySpec maxResultsExpression(String maxResultsExpression)
    JpaRetrievingOutboundGatewaySpec maxResultsExpression(Expression maxResultsExpression)

    // ID-based retrieval
    JpaRetrievingOutboundGatewaySpec idExpression(String idExpression)
    JpaRetrievingOutboundGatewaySpec idExpression(Expression idExpression)

    // Delete after retrieval
    JpaRetrievingOutboundGatewaySpec deleteAfterPoll(boolean deleteAfterPoll)
    JpaRetrievingOutboundGatewaySpec deleteInBatch(boolean deleteInBatch)
    JpaRetrievingOutboundGatewaySpec flushAfterDelete(boolean flush)

    // Query configuration (inherited)
    JpaRetrievingOutboundGatewaySpec entityClass(Class<?> entityClass)
    JpaRetrievingOutboundGatewaySpec jpaQuery(String jpaQuery)
    JpaRetrievingOutboundGatewaySpec nativeQuery(String nativeQuery)
    JpaRetrievingOutboundGatewaySpec namedQuery(String namedQuery)

    // Parameter configuration (inherited)
    JpaRetrievingOutboundGatewaySpec parameterSourceFactory(ParameterSourceFactory parameterSourceFactory)
    JpaRetrievingOutboundGatewaySpec parameter(Object value)
    JpaRetrievingOutboundGatewaySpec parameter(String name, Object value)
    JpaRetrievingOutboundGatewaySpec parameter(JpaParameter jpaParameter)
    JpaRetrievingOutboundGatewaySpec parameterExpression(String expression)
    JpaRetrievingOutboundGatewaySpec parameterExpression(String name, String expression)
    JpaRetrievingOutboundGatewaySpec usePayloadAsParameterSource(boolean usePayloadAsParameterSource)
}

@Bean
public IntegrationFlow findStudentByIdFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("findStudentChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .entityClass(Student.class)
                    .idExpression("payload") // Payload contains the student ID
                    .expectSingleResult(true))
            .channel("studentResultChannel")
            .get();
}

// Usage:
messagingTemplate.convertAndSend("findStudentChannel", 123L);

@Bean
public IntegrationFlow queryStudentsFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("queryChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s WHERE s.lastName = :lastName")
                    .parameterExpression("lastName", "payload"))
            .channel("resultsChannel")
            .get();
}

// Usage:
messagingTemplate.convertAndSend("queryChannel", "Smith");

@Bean
public IntegrationFlow nativeQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("nativeQueryChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .nativeQuery("SELECT * FROM students WHERE grade = :grade AND year = :year")
                    .entityClass(Student.class)
                    .parameterExpression("grade", "payload.grade")
                    .parameterExpression("year", "payload.year"))
            .channel("studentListChannel")
            .get();
}

// Assuming @NamedQuery on Student entity:
// @NamedQuery(name = "Student.findActiveByDepartment",
//         query = "SELECT s FROM Student s WHERE s.active = true AND s.department = :dept")
@Bean
public IntegrationFlow namedQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("departmentQueryChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .namedQuery("Student.findActiveByDepartment")
                    // parameterExpression evaluates "payload" as SpEL;
                    // parameter("dept", "payload") would bind the literal string "payload"
                    .parameterExpression("dept", "payload"))
            .channel("activeStudentsChannel")
            .get();
}

@Bean
public IntegrationFlow findSingleStudentFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("findByEmailChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s WHERE s.email = :email")
                    .parameterExpression("email", "payload")
                    .expectSingleResult(true)) // Returns single entity, not list
            .channel("studentChannel")
            .get();
}

@Bean
public IntegrationFlow paginatedQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("paginationChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s ORDER BY s.lastName")
                    .firstResultExpression("payload.page * payload.pageSize") // Offset
                    .maxResultsExpression("payload.pageSize")) // Limit
            .channel("pageResultsChannel")
            .get();
}
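
The offset expression in the flow above multiplies a zero-based page index by the page size. The arithmetic can be checked in isolation with a small plain-Java sketch. The class `PageWindow` and its `offset` method are hypothetical names used only for illustration; they are not part of Spring Integration, and the argument validation is an added assumption rather than something the gateway is known to do.

```java
import java.util.Map;

public class PageWindow {

    // Mirrors the SpEL expression "payload.page * payload.pageSize":
    // a zero-based page index times the page size gives the row offset.
    static int offset(int page, int pageSize) {
        if (page < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("page must be >= 0 and pageSize must be > 0");
        }
        // multiplyExact fails loudly instead of silently overflowing
        return Math.multiplyExact(page, pageSize);
    }

    public static void main(String[] args) {
        // Same shape as the page request sent to the pagination channel
        Map<String, Integer> pageRequest = Map.of("page", 2, "pageSize", 20);

        int firstResult = offset(pageRequest.get("page"), pageRequest.get("pageSize"));
        int maxResults = pageRequest.get("pageSize");

        // prints: firstResult=40, maxResults=20
        System.out.println("firstResult=" + firstResult + ", maxResults=" + maxResults);
    }
}
```

With page 2 and a page size of 20, the query skips the first 40 rows and returns at most the next 20, so page 0 is the first page under this convention.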

// Usage:
Map<String, Integer> pageRequest = Map.of("page", 2, "pageSize", 20);
messagingTemplate.convertAndSend("paginationChannel", pageRequest);

@Bean
public IntegrationFlow staticPaginationFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("staticPageChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .entityClass(Student.class)
                    .firstResult(0) // Start at first result
                    .maxResults(100)) // Limit to 100 results
            .channel("limitedResultsChannel")
            .get();
}

@Bean
public IntegrationFlow retrieveAndDeleteFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("processOrderChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .entityClass(PendingOrder.class)
                    .idExpression("payload")
                    .deleteAfterPoll(true) // Delete after retrieving
                    .expectSingleResult(true))
            .handle((payload, headers) -> {
                PendingOrder order = (PendingOrder) payload;
                // Process the order
                return order;
            })
            .channel("processedOrderChannel")
            .get();
}

@Bean
public IntegrationFlow complexQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("complexSearchChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s " +
                            "WHERE s.department = :dept " +
                            "AND s.enrollmentYear >= :yearFrom " +
                            "AND s.enrollmentYear <= :yearTo " +
                            "AND s.gpa >= :minGpa " +
                            "ORDER BY s.gpa DESC")
                    .parameterExpression("dept", "payload.department")
                    .parameterExpression("yearFrom", "payload.yearRange[0]")
                    .parameterExpression("yearTo", "payload.yearRange[1]")
                    .parameterExpression("minGpa", "payload.minGpa")
                    .maxResultsExpression("payload.limit"))
            .channel("searchResultsChannel")
            .get();
}

@Bean
public IntegrationFlow joinQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("courseStudentsChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s " +
                            "JOIN s.enrollments e " +
                            "WHERE e.course.id = :courseId")
                    // Bind the message payload (SpEL), not the literal string "payload"
                    .parameterExpression("courseId", "payload"))
            .channel("enrolledStudentsChannel")
            .get();
}

@Bean
public IntegrationFlow aggregateQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("statsChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s.department, COUNT(s), AVG(s.gpa) " +
                            "FROM Student s " +
                            "GROUP BY s.department")
                    .expectSingleResult(false))
            .channel("statisticsChannel")
            .get();
}

@Bean
public IntegrationFlow headerParametersFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("headerParamChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s WHERE s.id = :id AND s.department = :dept")
                    .parameterExpression("id", "payload")
                    .parameterExpression("dept", "headers['department']")) // Parameter from header
            .channel("resultChannel")
            .get();
}

@Bean
public IntegrationFlow beanPropertyQueryFlow(EntityManagerFactory entityManagerFactory) {
    BeanPropertyParameterSourceFactory paramSourceFactory =
            new BeanPropertyParameterSourceFactory();
    return IntegrationFlow
            .from("beanSearchChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s " +
                            "WHERE s.firstName = :firstName " +
                            "AND s.lastName = :lastName")
                    .parameterSourceFactory(paramSourceFactory)
                    .usePayloadAsParameterSource(true)) // Extract from payload properties
            .channel("foundStudentsChannel")
            .get();
}

// Usage:
StudentSearchCriteria criteria = new StudentSearchCriteria();
criteria.setFirstName("John");
criteria.setLastName("Smith");
messagingTemplate.convertAndSend("beanSearchChannel", criteria);

@Bean
public IntegrationFlow dynamicQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("dynamicChannel")
            // Route to "query.byName" or "query.byDepartment" based on the queryType header
            .route(Message.class, message -> {
                String queryType = (String) message.getHeaders().get("queryType");
                return "query." + queryType;
            })
            .get();
}

@Bean
public IntegrationFlow byNameQuery(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("query.byName")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s WHERE s.lastName = :name")
                    // Bind the message payload (SpEL), not the literal string "payload"
                    .parameterExpression("name", "payload"))
            .channel("resultsChannel")
            .get();
}

@Bean
public IntegrationFlow byDepartmentQuery(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("query.byDepartment")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                    .jpaQuery("SELECT s FROM Student s WHERE s.department = :dept")
                    .parameterExpression("dept", "payload"))
            .channel("resultsChannel")
            .get();
}

@Bean
public IntegrationFlow programmaticRetrievingFlow(EntityManagerFactory entityManagerFactory) {
    JpaExecutor executor = new JpaExecutor(entityManagerFactory);
    executor.setJpaQuery("SELECT s FROM Student s WHERE s.id = :id");
    executor.setExpectSingleResult(true);

    // JpaParameter(name, value, expression): value is null, so the expression is used
    List<JpaParameter> params = new ArrayList<>();
    params.add(new JpaParameter("id", null, "payload"));
    executor.setJpaParameters(params);

    JpaOutboundGateway gateway = new JpaOutboundGateway(executor);
    gateway.setGatewayType(OutboundGatewayType.RETRIEVING);

    return IntegrationFlow
            .from("requestChannel")
            .handle(gateway)
            .channel("replyChannel")
            .get();
}

<int-jpa:retrieving-outbound-gateway
        request-channel="findChannel"
        reply-channel="resultChannel"
        entity-manager-factory="entityManagerFactory"
        entity-class="com.example.Student"
        id-expression="payload"
        expect-single-result="true"/>

<int-jpa:retrieving-outbound-gateway
        request-channel="queryChannel"
        reply-channel="resultsChannel"
        entity-manager-factory="entityManagerFactory"
        jpa-query="SELECT s FROM Student s WHERE s.lastName = :lastName">
    <int-jpa:parameter name="lastName" expression="payload"/>
</int-jpa:retrieving-outbound-gateway>

<int-jpa:retrieving-outbound-gateway
        request-channel="pageChannel"
        reply-channel="pageResultsChannel"
        entity-manager-factory="entityManagerFactory"
        entity-class="com.example.Student"
        first-result-expression="payload.offset"
        max-results-expression="payload.limit"/>

<int-jpa:retrieving-outbound-gateway
        request-channel="namedQueryChannel"
        reply-channel="resultsChannel"
        entity-manager-factory="entityManagerFactory"
        named-query="Student.findActiveByDepartment">
    <int-jpa:parameter name="dept" expression="payload"/>
</int-jpa:retrieving-outbound-gateway>

When idExpression is set, the gateway uses EntityManager.find(), a primary-key lookup that bypasses query execution.
Multiple approaches for binding query parameters:
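
Whichever binding approach is used, each named placeholder in the query needs a matching parameter. As a rough aid, the plain-Java sketch below lists the `:name` placeholders in a JPQL string and reports which ones have no binding. `NamedParameters`, `namesIn`, and `missing` are hypothetical helpers written for this illustration and are not part of Spring Integration or JPA. The regular expression is deliberately naive and would also match a colon-prefixed word inside a string literal.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NamedParameters {

    // Naive pattern: a colon followed by a Java-style identifier
    private static final Pattern PLACEHOLDER =
            Pattern.compile(":([A-Za-z_][A-Za-z0-9_]*)");

    // Collects placeholder names in order of first appearance
    static Set<String> namesIn(String jpql) {
        Set<String> names = new LinkedHashSet<>();
        Matcher matcher = PLACEHOLDER.matcher(jpql);
        while (matcher.find()) {
            names.add(matcher.group(1));
        }
        return names;
    }

    // Returns the placeholders that have no entry in the set of bound names
    static Set<String> missing(String jpql, Set<String> boundNames) {
        Set<String> unbound = namesIn(jpql);
        unbound.removeAll(boundNames);
        return unbound;
    }

    public static void main(String[] args) {
        String jpql = "SELECT s FROM Student s WHERE s.id = :id AND s.department = :dept";

        System.out.println(namesIn(jpql));               // prints: [id, dept]
        System.out.println(missing(jpql, Set.of("id"))); // prints: [dept]
    }
}
```

A check like this could run in a unit test next to the flow definition, so that a renamed placeholder is caught before the query is executed.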
- null if no results found
- List<?> of entities
- null

The retrieving gateway participates in transactions:
@Bean
public IntegrationFlow transactionalRetrievingFlow(
        EntityManagerFactory entityManagerFactory,
        PlatformTransactionManager transactionManager) {
    return IntegrationFlow
            .from("transactionalQueryChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                            .jpaQuery("SELECT s FROM Student s WHERE s.id = :id")
                            // Bind the message payload (SpEL), not the literal string "payload"
                            .parameterExpression("id", "payload")
                            .deleteAfterPoll(true), // Delete in same transaction
                    e -> e.transactional(transactionManager))
            .channel("replyChannel")
            .get();
}

Handle query errors using standard Spring Integration error handling:
@Bean
public IntegrationFlow errorHandlingRetrievingFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
            .from("queryChannel")
            .handle(Jpa.retrievingGateway(entityManagerFactory)
                            .jpaQuery("SELECT s FROM Student s WHERE s.id = :id")
                            // Bind the message payload (SpEL), not the literal string "payload"
                            .parameterExpression("id", "payload")
                            .expectSingleResult(true),
                    e -> e.advice(retryAdvice()))
            .channel("resultChannel")
            .get();
}

@Bean
public RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    // retryTemplate() is assumed to be a RetryTemplate bean method defined elsewhere
    advice.setRetryTemplate(retryTemplate());
    return advice;
}

// Faster
.handle(Jpa.retrievingGateway(emf)
        .entityClass(Student.class)
        .idExpression("payload"))

// Slower
.handle(Jpa.retrievingGateway(emf)
        .jpaQuery("SELECT s FROM Student s WHERE s.id = :id")
        .parameterExpression("id", "payload"))

Always use maxResults for queries that could return large result sets:
.handle(Jpa.retrievingGateway(emf)
        .jpaQuery("SELECT s FROM Student s")
        .maxResults(1000)) // Protect against huge result sets

For large datasets, implement pagination:
.handle(Jpa.retrievingGateway(emf)
        .jpaQuery("SELECT s FROM Student s ORDER BY s.id")
        .firstResultExpression("payload.offset")
        .maxResultsExpression("payload.pageSize"))

For better performance with read-only queries, select only needed fields:
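.handle(Jpa.retrievingGateway(emf)
        .jpaQuery("SELECT NEW com.example.StudentDTO(s.id, s.name) FROM Student s"))

Use Inbound Channel Adapter when:
- the database should be polled on a schedule

Use Retrieving Outbound Gateway when:
- data must be retrieved on demand, based on message content, in a request-reply flow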
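
Separately, for projection queries such as the department statistics example earlier, each row of the reply is an Object[] rather than an entity, so a downstream handler usually converts the rows into something typed. The plain-Java sketch below shows one way to do that. `DepartmentStatsRows` and `DepartmentStats` are hypothetical names, the sample rows are made up for illustration, and the column order (department, count, average GPA) is assumed to match the SELECT clause of that query.

```java
import java.util.ArrayList;
import java.util.List;

public class DepartmentStatsRows {

    // Typed view of one row: department, COUNT(s), AVG(s.gpa)
    static final class DepartmentStats {
        final String department;
        final long studentCount;
        final double averageGpa;

        DepartmentStats(String department, long studentCount, double averageGpa) {
            this.department = department;
            this.studentCount = studentCount;
            this.averageGpa = averageGpa;
        }

        @Override
        public String toString() {
            return department + ": " + studentCount + " students, average GPA " + averageGpa;
        }
    }

    // Converts untyped rows into DepartmentStats, failing fast on an unexpected shape
    static List<DepartmentStats> fromRows(List<Object[]> rows) {
        List<DepartmentStats> stats = new ArrayList<>();
        for (Object[] row : rows) {
            if (row.length != 3) {
                throw new IllegalArgumentException("Expected 3 columns but got " + row.length);
            }
            // Casting through Number avoids depending on the exact numeric type returned
            stats.add(new DepartmentStats(
                    (String) row[0],
                    ((Number) row[1]).longValue(),
                    ((Number) row[2]).doubleValue()));
        }
        return stats;
    }

    public static void main(String[] args) {
        // Made-up rows standing in for the gateway's reply payload
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {"Math", 2L, 3.5});
        rows.add(new Object[] {"Physics", 1L, 3.9});

        for (DepartmentStats stats : fromRows(rows)) {
            System.out.println(stats);
        }
    }
}
```

A constructor expression in the query (SELECT NEW ...), as in the DTO example above, avoids this conversion step by having the JPA provider build the typed objects directly.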