The JPA Inbound Channel Adapter polls a database using JPA queries and produces messages containing the retrieved entities. It's ideal for scheduled database polling scenarios where you want to retrieve data periodically and process it in a Spring Integration flow.
Required Dependencies:
spring-integration-jpa (this package)spring-integration-core is requiredEntityManagerFactory or EntityManager bean must be configuredDefault Behaviors:
expectSingleResult=false (returns List<?>)maxResults=unlimited (retrieves all matching entities)deleteAfterPoll=false (entities not deleted after retrieval)deleteInBatch=false (individual deletes if deleteAfterPoll=true)flushAfterDelete=false (no flush after deletion)null when no results found (not empty list)expectSingleResult=true and one result foundexpectSingleResult=false (even if single result)Threading Model:
Lifecycle:
auto-startup attribute (default: true)Exceptions:
JpaOperationFailedException - JPA query execution failures (contains offending query)PersistenceException - General JPA persistence errorsNoResultException - When expectSingleResult=true and no results foundNonUniqueResultException - When expectSingleResult=true and multiple results foundIllegalArgumentException - Invalid configuration (e.g., multiple query types set)Edge Cases:
entityClass, jpaQuery, nativeQuery, or namedQueryentityClass for result type mapping@NamedQuery annotationdeleteAfterPoll=true with deleteInBatch=true is more efficient for large batchesflushAfterDelete=true ensures deletions are immediately persistedmaxResultsExpression allows dynamic limit based on message context (evaluated per poll)expectSingleResult=true and query returns multiple results, throws NonUniqueResultExceptionexpectSingleResult=true and query returns no results, returns null (not empty list)parameterSource) or dynamic (via expressions)e -> e.poller(...) in DSL)class JpaPollingChannelAdapter extends AbstractMessageSource<Object> {
JpaPollingChannelAdapter(JpaExecutor jpaExecutor)
String getComponentType()
}static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(EntityManagerFactory entityManagerFactory)
static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(EntityManager entityManager)
static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(JpaOperations jpaOperations)class JpaInboundChannelAdapterSpec {
// Query configuration
JpaInboundChannelAdapterSpec entityClass(Class<?> entityClass)
JpaInboundChannelAdapterSpec jpaQuery(String jpaQuery)
JpaInboundChannelAdapterSpec nativeQuery(String nativeQuery)
JpaInboundChannelAdapterSpec namedQuery(String namedQuery)
// Result configuration
JpaInboundChannelAdapterSpec expectSingleResult(boolean expectSingleResult)
JpaInboundChannelAdapterSpec maxResults(int maxResults)
JpaInboundChannelAdapterSpec maxResultsExpression(String maxResultsExpression)
JpaInboundChannelAdapterSpec maxResultsExpression(Expression maxResultsExpression)
// Delete after poll configuration
JpaInboundChannelAdapterSpec deleteAfterPoll(boolean deleteAfterPoll)
JpaInboundChannelAdapterSpec deleteInBatch(boolean deleteInBatch)
JpaInboundChannelAdapterSpec flushAfterDelete(boolean flush)
// Parameter configuration
JpaInboundChannelAdapterSpec parameterSource(ParameterSource parameterSource)
}@Bean
public IntegrationFlow pollStudentsFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.entityClass(Student.class)
.maxResults(100),
e -> e.poller(Pollers.fixedDelay(10000)))
.channel("studentChannel")
.get();
}@Bean
public IntegrationFlow pollActiveStudentsFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.jpaQuery("SELECT s FROM Student s WHERE s.status = 'ACTIVE'")
.maxResults(50),
e -> e.poller(Pollers.fixedRate(30000)))
.channel("activeStudentChannel")
.get();
}@Bean
public IntegrationFlow pollWithNativeQueryFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.nativeQuery("SELECT * FROM students WHERE registration_date > CURRENT_DATE - 7")
.entityClass(Student.class),
e -> e.poller(Pollers.fixedDelay(60000)))
.channel("recentStudentChannel")
.get();
}// Assuming @NamedQuery defined on Student entity:
// @NamedQuery(name = "Student.findByGrade", query = "SELECT s FROM Student s WHERE s.grade = :grade")
@Bean
public IntegrationFlow pollByGradeFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.namedQuery("Student.findByGrade")
.parameterSource(new BeanPropertyParameterSource(
Map.of("grade", "A"))),
e -> e.poller(Pollers.fixedDelay(15000)))
.channel("gradeAStudentChannel")
.get();
}@Bean
public IntegrationFlow pollAndDeleteFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.entityClass(ProcessedOrder.class)
.jpaQuery("SELECT o FROM ProcessedOrder o WHERE o.processed = true")
.deleteAfterPoll(true)
.maxResults(100),
e -> e.poller(Pollers.fixedDelay(5000)))
.handle(message -> {
// Process the order
ProcessedOrder order = (ProcessedOrder) message.getPayload();
System.out.println("Archiving order: " + order.getId());
})
.get();
}@Bean
public IntegrationFlow pollAndBatchDeleteFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.jpaQuery("SELECT e FROM Event e WHERE e.processed = true")
.deleteAfterPoll(true)
.deleteInBatch(true) // More efficient for large batches
.flushAfterDelete(true)
.maxResults(500),
e -> e.poller(Pollers.fixedDelay(10000)))
.channel("eventArchiveChannel")
.get();
}@Bean
public IntegrationFlow pollSingleConfigFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.jpaQuery("SELECT c FROM Configuration c WHERE c.active = true")
.expectSingleResult(true),
e -> e.poller(Pollers.fixedRate(60000)))
.channel("configChannel")
.get();
}@Bean
public IntegrationFlow dynamicMaxResultsFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.entityClass(Task.class)
.maxResultsExpression("@systemConfig.batchSize"), // References Spring bean
e -> e.poller(Pollers.fixedDelay(5000)))
.channel("taskChannel")
.get();
}@Bean
public JpaPollingChannelAdapter jpaInboundAdapter(EntityManagerFactory entityManagerFactory) {
JpaExecutor executor = new JpaExecutor(entityManagerFactory);
executor.setEntityClass(Student.class);
executor.setMaxNumberOfResults(100);
executor.setDeleteAfterPoll(true);
executor.setDeleteInBatch(true);
return new JpaPollingChannelAdapter(executor);
}
@Bean
public IntegrationFlow inboundFlow(JpaPollingChannelAdapter adapter) {
return IntegrationFlow
.from(adapter, e -> e.poller(Pollers.fixedDelay(10000)))
.channel("outputChannel")
.get();
}<int-jpa:inbound-channel-adapter
id="jpaInboundAdapter"
entity-manager-factory="entityManagerFactory"
channel="outputChannel"
entity-class="com.example.Student"
max-results="100"
delete-after-poll="true"
delete-in-batch="true"
flush="true">
<int:poller fixed-delay="10000"/>
</int-jpa:inbound-channel-adapter>With JPQL query:
<int-jpa:inbound-channel-adapter
entity-manager-factory="entityManagerFactory"
channel="outputChannel"
jpa-query="SELECT s FROM Student s WHERE s.status = 'ACTIVE'"
max-results="50">
<int:poller fixed-rate="30000"/>
</int-jpa:inbound-channel-adapter>With named query and parameters:
<int-jpa:inbound-channel-adapter
entity-manager-factory="entityManagerFactory"
channel="outputChannel"
named-query="Student.findByGrade">
<int-jpa:parameter name="grade" value="A"/>
<int:poller fixed-delay="15000"/>
</int-jpa:inbound-channel-adapter>List<?> containing retrieved entities (unless expectSingleResult is true)null when no results foundThe inbound adapter participates in transactions. When used with a transactional poller, the poll and optional delete operations are executed within the same transaction. Configure a transaction interceptor on the poller for transactional behavior:
@Bean
public IntegrationFlow transactionalPollFlow(
EntityManagerFactory entityManagerFactory,
PlatformTransactionManager transactionManager) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.entityClass(Order.class)
.deleteAfterPoll(true),
e -> e.poller(Pollers.fixedDelay(5000)
.transactional(transactionManager)))
.channel("orderChannel")
.get();
}If a JPA operation fails, a JpaOperationFailedException is thrown. Configure error handling on the poller or in the flow:
@Bean
public IntegrationFlow errorHandlingPollFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory)
.jpaQuery("SELECT s FROM Student s"),
e -> e.poller(Pollers.fixedDelay(10000)
.errorChannel("jpaErrorChannel")))
.channel("studentChannel")
.get();
}
@ServiceActivator(inputChannel = "jpaErrorChannel")
public void handleError(ErrorMessage errorMessage) {
Throwable cause = errorMessage.getPayload().getCause();
if (cause instanceof JpaOperationFailedException) {
JpaOperationFailedException jpaEx = (JpaOperationFailedException) cause;
logger.error("JPA query failed: {}", jpaEx.getOffendingJPAQl(), jpaEx);
}
}