or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

inbound-adapter.mdindex.mdjpa-executor.mdjpa-operations.mdoutbound-adapter.mdparameter-sources.mdretrieving-gateway.mdupdating-gateway.mdxml-configuration.md
tile.json

inbound-adapter.mddocs/

Inbound Channel Adapter

The JPA Inbound Channel Adapter polls a database using JPA queries and produces messages containing the retrieved entities. It's ideal for scheduled database polling scenarios where you want to retrieve data periodically and process it in a Spring Integration flow.

Key Information for Agents

Required Dependencies:

  • spring-integration-jpa (this package)
  • spring-integration-core is required
  • EntityManagerFactory or EntityManager bean must be configured
  • Poller must be configured (fixed-delay, fixed-rate, or cron)

Default Behaviors:

  • expectSingleResult=false (returns List<?>)
  • maxResults=unlimited (retrieves all matching entities)
  • deleteAfterPoll=false (entities not deleted after retrieval)
  • deleteInBatch=false (individual deletes if deleteAfterPoll=true)
  • flushAfterDelete=false (no flush after deletion)
  • Returns null when no results found (not empty list)
  • Returns single entity when expectSingleResult=true and one result found
  • Returns List<?> when expectSingleResult=false (even if single result)

Threading Model:

  • Executes on poller thread (configurable via poller configuration)
  • Poller can be transactional (operations within transaction boundary)
  • For async processing, use async channels (ExecutorChannel, QueueChannel)
  • Thread-safe when using EntityManagerFactory (creates EntityManager per poll)

Lifecycle:

  • Starts polling when Spring context is active
  • Stops polling when context is destroyed
  • Can be controlled via auto-startup attribute (default: true)
  • Poller lifecycle managed by Spring Integration poller infrastructure

Exceptions:

  • JpaOperationFailedException - JPA query execution failures (contains offending query)
  • PersistenceException - General JPA persistence errors
  • NoResultException - When expectSingleResult=true and no results found
  • NonUniqueResultException - When expectSingleResult=true and multiple results found
  • IllegalArgumentException - Invalid configuration (e.g., multiple query types set)

Edge Cases:

  • Exactly one query type must be configured: entityClass, jpaQuery, nativeQuery, or namedQuery
  • Native queries require entityClass for result type mapping
  • Named queries must be defined on entity classes with @NamedQuery annotation
  • deleteAfterPoll=true with deleteInBatch=true is more efficient for large batches
  • flushAfterDelete=true ensures deletions are immediately persisted
  • maxResultsExpression allows dynamic limit based on message context (evaluated per poll)
  • When expectSingleResult=true and query returns multiple results, throws NonUniqueResultException
  • When expectSingleResult=true and query returns no results, returns null (not empty list)
  • Parameter sources can be static (via parameterSource) or dynamic (via expressions)
  • Poller must be configured or adapter won't poll (use e -> e.poller(...) in DSL)

Core Class

class JpaPollingChannelAdapter extends AbstractMessageSource<Object> {
    JpaPollingChannelAdapter(JpaExecutor jpaExecutor)
    String getComponentType()
}

Java DSL Configuration

Factory Methods

static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(EntityManagerFactory entityManagerFactory)
static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(EntityManager entityManager)
static JpaInboundChannelAdapterSpec Jpa.inboundAdapter(JpaOperations jpaOperations)

Configuration Methods

class JpaInboundChannelAdapterSpec {
    // Query configuration
    JpaInboundChannelAdapterSpec entityClass(Class<?> entityClass)
    JpaInboundChannelAdapterSpec jpaQuery(String jpaQuery)
    JpaInboundChannelAdapterSpec nativeQuery(String nativeQuery)
    JpaInboundChannelAdapterSpec namedQuery(String namedQuery)

    // Result configuration
    JpaInboundChannelAdapterSpec expectSingleResult(boolean expectSingleResult)
    JpaInboundChannelAdapterSpec maxResults(int maxResults)
    JpaInboundChannelAdapterSpec maxResultsExpression(String maxResultsExpression)
    JpaInboundChannelAdapterSpec maxResultsExpression(Expression maxResultsExpression)

    // Delete after poll configuration
    JpaInboundChannelAdapterSpec deleteAfterPoll(boolean deleteAfterPoll)
    JpaInboundChannelAdapterSpec deleteInBatch(boolean deleteInBatch)
    JpaInboundChannelAdapterSpec flushAfterDelete(boolean flush)

    // Parameter configuration
    JpaInboundChannelAdapterSpec parameterSource(ParameterSource parameterSource)
}

Usage Examples

Poll All Entities of a Class

@Bean
public IntegrationFlow pollStudentsFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .entityClass(Student.class)
                .maxResults(100),
            e -> e.poller(Pollers.fixedDelay(10000)))
        .channel("studentChannel")
        .get();
}

Poll with JPQL Query

@Bean
public IntegrationFlow pollActiveStudentsFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .jpaQuery("SELECT s FROM Student s WHERE s.status = 'ACTIVE'")
                .maxResults(50),
            e -> e.poller(Pollers.fixedRate(30000)))
        .channel("activeStudentChannel")
        .get();
}

Poll with Native SQL Query

@Bean
public IntegrationFlow pollWithNativeQueryFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .nativeQuery("SELECT * FROM students WHERE registration_date > CURRENT_DATE - 7")
                .entityClass(Student.class),
            e -> e.poller(Pollers.fixedDelay(60000)))
        .channel("recentStudentChannel")
        .get();
}

Poll with Named Query

// Assuming @NamedQuery defined on Student entity:
// @NamedQuery(name = "Student.findByGrade", query = "SELECT s FROM Student s WHERE s.grade = :grade")

@Bean
public IntegrationFlow pollByGradeFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .namedQuery("Student.findByGrade")
                .parameterSource(new BeanPropertyParameterSource(
                    Map.of("grade", "A"))),
            e -> e.poller(Pollers.fixedDelay(15000)))
        .channel("gradeAStudentChannel")
        .get();
}

Poll and Delete Entities

@Bean
public IntegrationFlow pollAndDeleteFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .entityClass(ProcessedOrder.class)
                .jpaQuery("SELECT o FROM ProcessedOrder o WHERE o.processed = true")
                .deleteAfterPoll(true)
                .maxResults(100),
            e -> e.poller(Pollers.fixedDelay(5000)))
        .handle(message -> {
            // Process the order
            ProcessedOrder order = (ProcessedOrder) message.getPayload();
            System.out.println("Archiving order: " + order.getId());
        })
        .get();
}

Poll and Delete in Batch

@Bean
public IntegrationFlow pollAndBatchDeleteFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .jpaQuery("SELECT e FROM Event e WHERE e.processed = true")
                .deleteAfterPoll(true)
                .deleteInBatch(true)  // More efficient for large batches
                .flushAfterDelete(true)
                .maxResults(500),
            e -> e.poller(Pollers.fixedDelay(10000)))
        .channel("eventArchiveChannel")
        .get();
}

Poll Single Entity

@Bean
public IntegrationFlow pollSingleConfigFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .jpaQuery("SELECT c FROM Configuration c WHERE c.active = true")
                .expectSingleResult(true),
            e -> e.poller(Pollers.fixedRate(60000)))
        .channel("configChannel")
        .get();
}

Dynamic Max Results Using Expression

@Bean
public IntegrationFlow dynamicMaxResultsFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .entityClass(Task.class)
                .maxResultsExpression("@systemConfig.batchSize"),  // References Spring bean
            e -> e.poller(Pollers.fixedDelay(5000)))
        .channel("taskChannel")
        .get();
}

Programmatic Configuration

@Bean
public JpaPollingChannelAdapter jpaInboundAdapter(EntityManagerFactory entityManagerFactory) {
    JpaExecutor executor = new JpaExecutor(entityManagerFactory);
    executor.setEntityClass(Student.class);
    executor.setMaxNumberOfResults(100);
    executor.setDeleteAfterPoll(true);
    executor.setDeleteInBatch(true);

    return new JpaPollingChannelAdapter(executor);
}

@Bean
public IntegrationFlow inboundFlow(JpaPollingChannelAdapter adapter) {
    return IntegrationFlow
        .from(adapter, e -> e.poller(Pollers.fixedDelay(10000)))
        .channel("outputChannel")
        .get();
}

XML Configuration

<int-jpa:inbound-channel-adapter
    id="jpaInboundAdapter"
    entity-manager-factory="entityManagerFactory"
    channel="outputChannel"
    entity-class="com.example.Student"
    max-results="100"
    delete-after-poll="true"
    delete-in-batch="true"
    flush="true">
    <int:poller fixed-delay="10000"/>
</int-jpa:inbound-channel-adapter>

With JPQL query:

<int-jpa:inbound-channel-adapter
    entity-manager-factory="entityManagerFactory"
    channel="outputChannel"
    jpa-query="SELECT s FROM Student s WHERE s.status = 'ACTIVE'"
    max-results="50">
    <int:poller fixed-rate="30000"/>
</int-jpa:inbound-channel-adapter>

With named query and parameters:

<int-jpa:inbound-channel-adapter
    entity-manager-factory="entityManagerFactory"
    channel="outputChannel"
    named-query="Student.findByGrade">
    <int-jpa:parameter name="grade" value="A"/>
    <int:poller fixed-delay="15000"/>
</int-jpa:inbound-channel-adapter>

Configuration Options

Query Type (choose one)

  • entityClass - Poll all entities of specified class
  • jpaQuery - JPQL SELECT query
  • nativeQuery - Native SQL query (requires entityClass for result mapping)
  • namedQuery - Named query defined on entity

Result Configuration

  • expectSingleResult - If true, expects and returns single entity instead of list. Throws exception if multiple results found. Default: false
  • maxResults - Maximum number of entities to retrieve per poll. Default: unlimited
  • maxResultsExpression - SpEL expression to dynamically determine max results

Delete After Poll

  • deleteAfterPoll - If true, deletes entities after successful polling. Default: false
  • deleteInBatch - If true with deleteAfterPoll, uses batch delete (more efficient). Default: false
  • flushAfterDelete - If true, flushes EntityManager after deletion. Default: false

Parameter Configuration

  • parameterSource - ParameterSource providing query parameter values

Return Value

  • Returns List<?> containing retrieved entities (unless expectSingleResult is true)
  • Returns single entity when expectSingleResult is true
  • Returns null when no results found

Transaction Management

The inbound adapter participates in transactions. When used with a transactional poller, the poll and optional delete operations are executed within the same transaction. Configure a transaction interceptor on the poller for transactional behavior:

@Bean
public IntegrationFlow transactionalPollFlow(
        EntityManagerFactory entityManagerFactory,
        PlatformTransactionManager transactionManager) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .entityClass(Order.class)
                .deleteAfterPoll(true),
            e -> e.poller(Pollers.fixedDelay(5000)
                .transactional(transactionManager)))
        .channel("orderChannel")
        .get();
}

Error Handling

If a JPA operation fails, a JpaOperationFailedException is thrown. Configure error handling on the poller or in the flow:

@Bean
public IntegrationFlow errorHandlingPollFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlow
        .from(Jpa.inboundAdapter(entityManagerFactory)
                .jpaQuery("SELECT s FROM Student s"),
            e -> e.poller(Pollers.fixedDelay(10000)
                .errorChannel("jpaErrorChannel")))
        .channel("studentChannel")
        .get();
}

@ServiceActivator(inputChannel = "jpaErrorChannel")
public void handleError(ErrorMessage errorMessage) {
    Throwable cause = errorMessage.getPayload().getCause();
    if (cause instanceof JpaOperationFailedException) {
        JpaOperationFailedException jpaEx = (JpaOperationFailedException) cause;
        logger.error("JPA query failed: {}", jpaEx.getOffendingJPAQl(), jpaEx);
    }
}