tessl/maven-com-embabel-agent--embabel-agent-rag-core

RAG (Retrieval-Augmented Generation) framework for the Embabel Agent platform providing content ingestion, chunking, hierarchical navigation, and semantic search capabilities

Advanced Content Refresh Policies

Control when content should be re-ingested with support for time-based, conditional, and custom refresh strategies. This guide covers advanced patterns for managing content lifecycle in production RAG systems.

Overview

Content refresh policies determine when documents should be re-ingested, balancing data freshness with computational cost. The framework provides built-in policies for common scenarios and extensibility for custom refresh logic.

Core Architecture

ContentRefreshPolicy Interface

The base interface defines the contract for all refresh policy implementations.

interface ContentRefreshPolicy {
    /**
     * Check if content at URI should be re-read
     * @param repository Repository to check
     * @param rootUri URI of content root
     * @return true if content should be refreshed
     */
    fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean

    /**
     * Check if document should be refreshed
     * @param repository Repository to check
     * @param root Document to potentially refresh
     * @return true if document should be refreshed
     */
    fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean

    /**
     * Ingest URI if refresh policy determines it's needed
     * @param repository Target repository
     * @param hierarchicalContentReader Reader for parsing content
     * @param rootUri URI to potentially ingest
     * @return Parsed document if ingested, null if skipped
     */
    fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}

The interface separates three concerns:

  • URI-based refresh decisions (shouldReread)
  • Document-based refresh decisions (shouldRefreshDocument)
  • Integrated refresh execution (ingestUriIfNeeded)

Built-in Policy Implementations

AlwaysRefreshContentRefreshPolicy

Always refreshes content on every request. Useful for development, testing, or volatile data sources.

object AlwaysRefreshContentRefreshPolicy : ContentRefreshPolicy {
    /**
     * Always returns true
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean

    /**
     * Always returns true
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean

    /**
     * Always ingests the URI
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}

Use Cases:

  • Development and debugging
  • Real-time data streams
  • Critical data that changes frequently
  • Testing refresh pipelines

Performance Considerations:

  • Highest computational cost
  • Bypasses all caching
  • May cause rate limiting issues with external sources
  • Not recommended for production without rate limiting

NeverRefreshExistingDocumentContentPolicy

Never refreshes content once it exists. Optimal for immutable or archival content.

object NeverRefreshExistingDocumentContentPolicy : ContentRefreshPolicy {
    /**
     * Returns false if document exists
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean

    /**
     * Always returns false for existing documents
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean

    /**
     * Only ingests if document doesn't exist
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}

Use Cases:

  • Historical archives
  • Static documentation
  • Immutable content (versioned releases)
  • Minimizing ingestion costs

Performance Considerations:

  • Lowest computational cost
  • Single repository lookup per check
  • Ideal for large-scale systems with stable content

TtlContentRefreshPolicy

Time-To-Live based refresh policy. Refreshes content after a specified duration.

class TtlContentRefreshPolicy(
    /**
     * Time-to-live duration for content
     */
    val ttl: Duration
) : ContentRefreshPolicy {

    /**
     * Returns true if content is older than TTL
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean

    /**
     * Returns true if document timestamp exceeds TTL
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean

    /**
     * Ingests if content is stale based on TTL
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}

Use Cases:

  • Periodically updated content (news, blogs)
  • Documentation with known update schedules
  • Balancing freshness and cost
  • Configurable refresh windows

Performance Considerations:

  • Moderate computational cost
  • Requires timestamp comparison per check
  • TTL selection impacts freshness vs. cost tradeoff
  • Consider content update frequency when setting TTL

TTL Guidelines:

  • Minutes (5-30): Rapidly changing content
  • Hours (1-24): Daily updates, news feeds
  • Days (1-7): Weekly updates, blogs
  • Weeks (1-4): Monthly updates, stable documentation
  • Months (1-12): Rarely updated archives
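
The TTL comparison itself is small enough to sketch in isolation. The helper below is hypothetical (isStale is not part of the library) and only illustrates the "older than TTL" check described above, with the current time passed in so it can be exercised against a fixed clock.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper, not a library API: true when content ingested at
// `ingestedAt` is strictly older than `ttl` as seen from `now`.
fun isStale(ingestedAt: Instant, ttl: Duration, now: Instant = Instant.now()): Boolean =
    Duration.between(ingestedAt, now) > ttl
```

Content whose age equals the TTL exactly is treated as still fresh here; whether the built-in policy uses a strict or inclusive comparison is not stated in this guide.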

UrlSpecificContentRefreshPolicy

Apply different policies based on URL patterns. Enables fine-grained control over refresh behavior.

class UrlSpecificContentRefreshPolicy(
    /**
     * Function that selects the policy to apply for a given URI
     */
    val policySelector: (String) -> ContentRefreshPolicy
) : ContentRefreshPolicy {

    /**
     * Apply policy based on URL pattern
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean

    /**
     * Apply policy based on document URI pattern
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean

    /**
     * Ingest based on URL-specific policy
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}

Use Cases:

  • Mixed content sources with different refresh needs
  • Domain-specific refresh strategies
  • Combining multiple TTL values
  • Priority-based refresh scheduling

Performance Considerations:

  • Policy selection overhead is minimal
  • Overall performance depends on selected policies
  • Cache the selector's result per URI if pattern matching is expensive
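
One way to build a selector and memoize its result is sketched below. cachedSelector is a hypothetical helper written generically so it runs on its own; in practice the type parameter would be ContentRefreshPolicy and the returned function would be passed as policySelector. The string labels and URL patterns are placeholders.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: returns the value of the first rule whose regex matches
// the URI, or `default` when no rule matches. Results are memoized per URI.
fun <P : Any> cachedSelector(rules: List<Pair<Regex, P>>, default: P): (String) -> P {
    val cache = ConcurrentHashMap<String, P>()
    return { uri ->
        cache.computeIfAbsent(uri) { key ->
            rules.firstOrNull { (pattern, _) -> pattern.containsMatchIn(key) }?.second ?: default
        }
    }
}

// Placeholder labels stand in for policy instances.
val selectLabel = cachedSelector(
    rules = listOf(
        Regex("^https://news\\.") to "ttl-1h",
        Regex("/archive/") to "never"
    ),
    default = "ttl-1d"
)
```

The cache in this sketch is unbounded, so it suits a fixed set of URIs; a bounded cache would be safer if URIs are open-ended.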

Advanced Patterns

Business Hours Refresh Policy

Only refresh content during specific time windows.

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.time.Duration
import java.time.Instant
import java.time.LocalTime

class BusinessHoursRefreshPolicy(
    private val ttl: Duration = Duration.ofHours(8),
    private val startHour: Int = 9,
    private val endHour: Int = 17
) : ContentRefreshPolicy {

    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean {
        val root = repository.findContentRootByUri(rootUri) ?: return true

        // Only refresh during business hours
        val now = Instant.now()
        val hour = LocalTime.now().hour

        if (hour !in startHour..endHour) {
            return false
        }

        // During business hours, use TTL
        val age = Duration.between(root.ingestionTimestamp, now)
        return age > ttl
    }

    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean {
        return shouldReread(repository, root.uri)
    }

    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }

        repository.deleteRootAndDescendants(rootUri)
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}

// Note: the window is evaluated in the JVM's default time zone (LocalTime.now())
val policy = BusinessHoursRefreshPolicy(
    ttl = Duration.ofHours(4),
    startHour = 9,
    endHour = 17
)

Use Cases:

  • Confine refresh load on source systems to an agreed window
  • Schedule refresh during low-traffic periods
  • Comply with SLA requirements for source systems
  • Coordinate with external API rate limits
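
BusinessHoursRefreshPolicy above reads the hour from the JVM's default time zone. If the window should follow a specific zone, the check could be factored out roughly as follows. inRefreshWindow is a hypothetical helper, and the half-open window (endHour excluded) is a choice made for this sketch, whereas the class above uses the inclusive range startHour..endHour.

```kotlin
import java.time.Instant
import java.time.ZoneId

// Hypothetical helper: is `now` inside [startHour, endHour) in the given zone?
fun inRefreshWindow(
    now: Instant,
    zone: ZoneId,
    startHour: Int = 9,
    endHour: Int = 17
): Boolean {
    val hour = now.atZone(zone).hour
    return hour in startHour until endHour
}
```

Windows that wrap past midnight (startHour greater than endHour) are not handled by this sketch.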

Conditional Refresh Policy

Refresh based on multiple conditions that must all be satisfied.

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.time.Duration
import java.time.Instant
import java.time.LocalTime

class ConditionalRefreshPolicy(
    private val conditions: List<(ChunkingContentElementRepository, String) -> Boolean>
) : ContentRefreshPolicy {

    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean {
        // All conditions must be true
        return conditions.all { condition ->
            condition(repository, rootUri)
        }
    }

    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean {
        return shouldReread(repository, root.uri)
    }

    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }

        repository.deleteRootAndDescendants(rootUri)
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}

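// Use with multiple conditions
// Because every condition must hold, Condition 1 means URIs that have not
// been ingested yet are never ingested by this policy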
val policy = ConditionalRefreshPolicy(
    conditions = listOf(
        // Condition 1: Content exists
        { repo, uri -> repo.existsRootWithUri(uri) },

        // Condition 2: Content is old enough
        { repo, uri ->
            val root = repo.findContentRootByUri(uri)
            root?.let {
                Duration.between(it.ingestionTimestamp, Instant.now()) > Duration.ofDays(1)
            } ?: false
        },

        // Condition 3: During allowed time window
        { _, _ ->
            val hour = LocalTime.now().hour
            hour in 9..17
        },

        // Condition 4: Not flagged as immutable
        { repo, uri ->
            val root = repo.findContentRootByUri(uri)
            root?.metadata?.get("immutable") != true
        }
    )
)

Use Cases:

  • Complex refresh logic with multiple constraints
  • Compliance requirements (time windows, approval flags)
  • Cost optimization (only refresh when necessary)
  • Staged rollout of refresh policies

Metadata-Based Refresh Policy

Use document metadata to control refresh behavior.

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.time.Duration
import java.time.Instant

class MetadataRefreshPolicy : ContentRefreshPolicy {

    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean {
        val root = repository.findContentRootByUri(rootUri) ?: return true

        // Check for force refresh flag
        if (root.metadata["force_refresh"] == true) {
            return true
        }

        // Check for do not refresh flag
        if (root.metadata["no_refresh"] == true) {
            return false
        }

        // Check priority-based TTL
        val priority = root.metadata["priority"] as? String
        val ttl = when (priority) {
            "critical" -> Duration.ofHours(1)
            "high" -> Duration.ofHours(6)
            "normal" -> Duration.ofDays(1)
            "low" -> Duration.ofDays(7)
            else -> Duration.ofDays(1)
        }

        // Check custom TTL in metadata (accept any numeric type, since a store
        // may return the value as Int or Long)
        val customTtl = root.metadata["refresh_ttl_hours"] as? Number
        val effectiveTtl = customTtl?.let { Duration.ofHours(it.toLong()) } ?: ttl

        val age = Duration.between(root.ingestionTimestamp, Instant.now())
        return age > effectiveTtl
    }

    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean {
        return shouldReread(repository, root.uri)
    }

    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }

        repository.deleteRootAndDescendants(rootUri)
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}

Metadata Flags:

  • force_refresh: Boolean, forces immediate refresh
  • no_refresh: Boolean, prevents any refresh
  • priority: String (critical, high, normal, low), determines TTL
  • refresh_ttl_hours: Int, custom TTL in hours
  • immutable: Boolean, marks content as never changing
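
The flags are evaluated in a fixed order in MetadataRefreshPolicy: force_refresh wins over no_refresh, and refresh_ttl_hours overrides the priority-derived TTL. The standalone function below restates that precedence so it can be checked without a repository. shouldRefresh and the plain Map parameter are assumptions for illustration; the actual metadata type is not shown in this guide.

```kotlin
import java.time.Duration

// Hypothetical restatement of the precedence: force_refresh, then no_refresh,
// then custom TTL, then priority-based TTL.
fun shouldRefresh(metadata: Map<String, Any?>, age: Duration): Boolean {
    if (metadata["force_refresh"] == true) return true
    if (metadata["no_refresh"] == true) return false

    val priorityTtl = when (metadata["priority"] as? String) {
        "critical" -> Duration.ofHours(1)
        "high" -> Duration.ofHours(6)
        "low" -> Duration.ofDays(7)
        else -> Duration.ofDays(1) // "normal" and unknown values
    }
    // Accept any numeric type for the custom TTL.
    val customTtl = (metadata["refresh_ttl_hours"] as? Number)
        ?.let { Duration.ofHours(it.toLong()) }

    return age > (customTtl ?: priorityTtl)
}
```

Note that force_refresh stays in effect for as long as the flag remains in the stored metadata, so something outside the policy has to reset it.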

External Trigger Policy

Refresh based on external events or webhooks.

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.util.concurrent.ConcurrentHashMap

class ExternalTriggerRefreshPolicy : ContentRefreshPolicy {
    private val refreshFlags = ConcurrentHashMap<String, Boolean>()
    private val refreshReasons = ConcurrentHashMap<String, String>()

    /**
     * Trigger refresh for specific URI
     */
    fun triggerRefresh(uri: String, reason: String = "external trigger") {
        refreshFlags[uri] = true
        refreshReasons[uri] = reason
    }

    /**
     * Clear refresh flag
     */
    fun clearRefresh(uri: String) {
        refreshFlags.remove(uri)
        refreshReasons.remove(uri)
    }

    /**
     * Get refresh reason
     */
    fun getRefreshReason(uri: String): String? {
        return refreshReasons[uri]
    }

    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean {
        return refreshFlags[rootUri] == true
    }

    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean {
        return shouldReread(repository, root.uri)
    }

    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }

        val reason = getRefreshReason(rootUri)
        println("Refreshing $rootUri: $reason")

        // Clear flag after refreshing
        clearRefresh(rootUri)

        repository.deleteRootAndDescendants(rootUri)
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}

// Use with webhook integration
val policy = ExternalTriggerRefreshPolicy()

// Webhook handler
fun onContentUpdatedWebhook(uri: String, updateType: String) {
    policy.triggerRefresh(uri, "Webhook update: $updateType")
}

// Git hook integration
fun onGitCommit(affectedFiles: List<String>) {
    affectedFiles.forEach { file ->
        val uri = "file:///${file}"
        policy.triggerRefresh(uri, "Git commit")
    }
}

// Refresh will happen on next check
policy.ingestUriIfNeeded(repository, reader, uri)

Use Cases:

  • Webhook-based refresh (CMS updates, Git commits)
  • Manual refresh triggers via admin interface
  • Event-driven architectures
  • Coordinated refresh across distributed systems
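
ExternalTriggerRefreshPolicy keeps the flag and the reason in two maps and checks and clears them in separate steps, so two threads could both see the same trigger. A possible alternative, sketched here with a hypothetical RefreshTriggers class, stores the reason as the flag and consumes it with a single atomic remove.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry: the presence of a reason is the refresh flag.
class RefreshTriggers {
    private val pending = ConcurrentHashMap<String, String>()

    fun trigger(uri: String, reason: String = "external trigger") {
        pending[uri] = reason
    }

    // Atomically removes and returns the reason, or null if nothing is pending.
    // Only one caller can win the trigger for a given URI.
    fun consume(uri: String): String? = pending.remove(uri)

    fun isPending(uri: String): Boolean = pending.containsKey(uri)
}
```

With this shape, a caller whose ingestion fails after consume would need to call trigger again to keep the request alive.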

Composite Refresh Policy

Combine multiple policies with configurable logic.

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.time.Duration

class CompositeRefreshPolicy(
    private val policies: List<ContentRefreshPolicy>,
    private val mode: Mode = Mode.ANY
) : ContentRefreshPolicy {

    enum class Mode {
        ANY,  // Refresh if any policy says yes
        ALL   // Refresh only if all policies say yes
    }

    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean {
        return when (mode) {
            Mode.ANY -> policies.any { it.shouldReread(repository, rootUri) }
            Mode.ALL -> policies.all { it.shouldReread(repository, rootUri) }
        }
    }

    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean {
        return shouldReread(repository, root.uri)
    }

    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }

        repository.deleteRootAndDescendants(rootUri)
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}

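// Combine policies with OR logic
// Keep a reference to the trigger policy so refreshes can be requested later.
// The composite performs ingestion itself, so the caller must also call
// triggers.clearRefresh(uri) after a refresh to reset the flag.
val triggers = ExternalTriggerRefreshPolicy()
val anyPolicy = CompositeRefreshPolicy(
    policies = listOf(
        TtlContentRefreshPolicy(Duration.ofDays(1)),
        triggers
    ),
    mode = CompositeRefreshPolicy.Mode.ANY
)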

// Combine policies with AND logic
val allPolicy = CompositeRefreshPolicy(
    policies = listOf(
        TtlContentRefreshPolicy(Duration.ofDays(1)),
        BusinessHoursRefreshPolicy()
    ),
    mode = CompositeRefreshPolicy.Mode.ALL
)

Production Patterns

Scheduled Refresh Service

Background service for periodic content refresh.

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.ingestion.policy.*
import java.util.concurrent.*
import java.time.Duration
import java.time.Instant

class ScheduledRefreshService(
    private val repository: ChunkingContentElementRepository,
    private val reader: HierarchicalContentReader,
    private val policy: ContentRefreshPolicy,
    private val uris: List<String>,
    private val threadPoolSize: Int = 4
) {
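    // One thread drives the schedule; a separate pool runs the per-URI work.
    // Running both on the same pool would deadlock with threadPoolSize = 1,
    // because refreshAll() blocks in join() while its tasks wait for a thread.
    private val scheduler = Executors.newSingleThreadScheduledExecutor()
    private val workers = Executors.newFixedThreadPool(threadPoolSize)
    private val metrics = RefreshMetrics()

    fun start(periodMinutes: Long) {
        scheduler.scheduleAtFixedRate(
            { refreshAll() },
            0,
            periodMinutes,
            TimeUnit.MINUTES
        )
    }

    fun stop() {
        scheduler.shutdown()
        workers.shutdown()
        scheduler.awaitTermination(30, TimeUnit.SECONDS)
        workers.awaitTermination(30, TimeUnit.SECONDS)
    }

    private fun refreshAll() {
        val startTime = System.currentTimeMillis()
        println("Starting scheduled refresh at ${Instant.now()}")

        val results = uris.map { uri ->
            CompletableFuture.supplyAsync({
                refreshUri(uri)
            }, workers)
        }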

        CompletableFuture.allOf(*results.toTypedArray()).join()

        val duration = System.currentTimeMillis() - startTime
        metrics.recordRefreshCycle(duration, results.size)
        println("Scheduled refresh complete in ${duration}ms")
    }

    private fun refreshUri(uri: String): RefreshResult {
        return try {
            val startTime = System.currentTimeMillis()
            val document = policy.ingestUriIfNeeded(repository, reader, uri)
            val duration = System.currentTimeMillis() - startTime

            if (document != null) {
                metrics.recordSuccess(uri, duration)
                RefreshResult.Success(uri, duration)
            } else {
                metrics.recordSkipped(uri)
                RefreshResult.Skipped(uri)
            }
        } catch (e: Exception) {
            metrics.recordFailure(uri, e)
            println("Error refreshing $uri: ${e.message}")
            RefreshResult.Failure(uri, e)
        }
    }

    data class RefreshMetrics(
        var totalCycles: Int = 0,
        var totalDuration: Long = 0,
        var successCount: Int = 0,
        var skippedCount: Int = 0,
        var failureCount: Int = 0
    ) {
        // Counters are updated from several threads, so guard them with the instance lock
        @Synchronized
        fun recordRefreshCycle(duration: Long, uriCount: Int) {
            totalCycles++
            totalDuration += duration
        }

        @Synchronized
        fun recordSuccess(uri: String, duration: Long) {
            successCount++
        }

        @Synchronized
        fun recordSkipped(uri: String) {
            skippedCount++
        }

        @Synchronized
        fun recordFailure(uri: String, error: Exception) {
            failureCount++
        }

        @Synchronized
        fun getStats(): Map<String, Any> {
            return mapOf(
                "total_cycles" to totalCycles,
                "avg_cycle_duration_ms" to if (totalCycles > 0) totalDuration / totalCycles else 0,
                "success_count" to successCount,
                "skipped_count" to skippedCount,
                "failure_count" to failureCount,
                "success_rate" to if (successCount + failureCount > 0) {
                    successCount.toDouble() / (successCount + failureCount)
                } else 0.0
            )
        }
    }

    sealed class RefreshResult {
        data class Success(val uri: String, val duration: Long) : RefreshResult()
        data class Skipped(val uri: String) : RefreshResult()
        data class Failure(val uri: String, val error: Exception) : RefreshResult()
    }
}

// Use scheduled refresh
val service = ScheduledRefreshService(
    repository = repository,
    reader = reader,
    policy = TtlContentRefreshPolicy(Duration.ofHours(1)),
    uris = listOf(
        "https://example.com/docs/guide1.html",
        "https://example.com/docs/guide2.html"
    ),
    threadPoolSize = 8
)

// Check every 30 minutes
service.start(periodMinutes = 30)

// Stop the service when the JVM shuts down
Runtime.getRuntime().addShutdownHook(Thread {
    service.stop()
})

Batch Refresh with Error Handling

Batch refresh with per-URI retries and a delay that grows linearly between attempts.

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.ingestion.policy.*
import com.embabel.agent.rag.store.*
import java.time.Duration

class BatchRefreshExecutor(
    private val repository: ChunkingContentElementRepository,
    private val reader: HierarchicalContentReader,
    private val policy: ContentRefreshPolicy,
    private val maxRetries: Int = 3,
    private val retryDelayMs: Long = 1000
) {
    fun refreshAll(uris: List<String>): BatchRefreshResult {
        val results = mutableMapOf<String, RefreshStatus>()
        val errors = mutableMapOf<String, List<Exception>>()

        uris.forEach { uri ->
            val (status, errorList) = refreshWithRetry(uri)
            results[uri] = status
            if (errorList.isNotEmpty()) {
                errors[uri] = errorList
            }
        }

        return BatchRefreshResult(
            total = uris.size,
            refreshed = results.count { it.value == RefreshStatus.REFRESHED },
            skipped = results.count { it.value == RefreshStatus.SKIPPED },
            failed = results.count { it.value == RefreshStatus.FAILED },
            results = results,
            errors = errors
        )
    }

    private fun refreshWithRetry(uri: String): Pair<RefreshStatus, List<Exception>> {
        val errors = mutableListOf<Exception>()
        var attempts = 0

        while (attempts < maxRetries) {
            try {
                val document = policy.ingestUriIfNeeded(repository, reader, uri)
                return if (document != null) {
                    println("Refreshed: $uri (attempt ${attempts + 1})")
                    RefreshStatus.REFRESHED to errors
                } else {
                    RefreshStatus.SKIPPED to errors
                }
            } catch (e: Exception) {
                attempts++
                errors.add(e)
                println("Error refreshing $uri (attempt $attempts): ${e.message}")

                if (attempts < maxRetries) {
                    val delay = retryDelayMs * attempts
                    Thread.sleep(delay)
                }
            }
        }

        return RefreshStatus.FAILED to errors
    }

    enum class RefreshStatus {
        REFRESHED, SKIPPED, FAILED
    }

    data class BatchRefreshResult(
        val total: Int,
        val refreshed: Int,
        val skipped: Int,
        val failed: Int,
        val results: Map<String, RefreshStatus>,
        val errors: Map<String, List<Exception>>
    ) {
        fun printSummary() {
            println("=== Batch Refresh Summary ===")
            println("Total: $total")
            println("Refreshed: $refreshed")
            println("Skipped: $skipped")
            println("Failed: $failed")
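            // Guard against division by zero when the batch is empty
            val successRate = if (total > 0) refreshed.toDouble() / total * 100 else 0.0
            println("Success rate: ${"%.2f".format(successRate)}%")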

            if (errors.isNotEmpty()) {
                println("\n=== Errors ===")
                errors.forEach { (uri, errorList) ->
                    println("$uri:")
                    errorList.forEach { error ->
                        println("  - ${error.message}")
                    }
                }
            }
        }
    }
}

// Use batch refresh
val executor = BatchRefreshExecutor(
    repository = repository,
    reader = reader,
    policy = TtlContentRefreshPolicy(Duration.ofDays(1)),
    maxRetries = 3,
    retryDelayMs = 1000
)

val uris = listOf(
    "https://example.com/docs/guide1.html",
    "https://example.com/docs/guide2.html",
    "https://example.com/docs/guide3.html"
)

val result = executor.refreshAll(uris)
result.printSummary()

Edge Cases and Considerations

Concurrent Access

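When multiple threads in one JVM may refresh the same content simultaneously (the in-memory locks below do not coordinate separate processes):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock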

class LockingRefreshPolicy(
    private val delegate: ContentRefreshPolicy
) : ContentRefreshPolicy by delegate {
    private val locks = ConcurrentHashMap<String, ReentrantLock>()

    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        val lock = locks.computeIfAbsent(rootUri) { ReentrantLock() }

        return if (lock.tryLock()) {
            try {
                delegate.ingestUriIfNeeded(repository, hierarchicalContentReader, rootUri)
            } finally {
                lock.unlock()
            }
        } else {
            null // Another thread is already refreshing this URI
        }
    }
}

Network Failures

Handle transient network issues gracefully:

class ResilientRefreshPolicy(
    private val delegate: ContentRefreshPolicy,
    private val maxRetries: Int = 3,
    private val exponentialBackoff: Boolean = true
) : ContentRefreshPolicy by delegate {

    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        var lastException: Exception? = null

        repeat(maxRetries) { attempt ->
            try {
                return delegate.ingestUriIfNeeded(repository, hierarchicalContentReader, rootUri)
            } catch (e: Exception) {
                lastException = e
                if (attempt < maxRetries - 1) {
                    val delay = if (exponentialBackoff) {
                        (1000L * (1 shl attempt))
                    } else {
                        1000L
                    }
                    Thread.sleep(delay)
                }
            }
        }

        throw lastException ?: RuntimeException("Refresh failed")
    }
}
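
The delay calculation in ResilientRefreshPolicy doubles without an upper bound. A common refinement is to cap the delay and subtract a little random jitter so that many clients do not retry in lockstep. The function below is a sketch of that idea only; backoffDelayMs and its default values are assumptions, not library settings.

```kotlin
import kotlin.random.Random

// Hypothetical helper: delay before retry number `attempt` (0-based).
// Computes baseMs * 2^attempt, caps it at maxMs, then subtracts a random
// amount of up to `jitterFraction` of the capped value.
fun backoffDelayMs(
    attempt: Int,
    baseMs: Long = 1_000,
    maxMs: Long = 30_000,
    jitterFraction: Double = 0.2,
    random: Random = Random.Default
): Long {
    require(attempt >= 0) { "attempt must be non-negative" }
    // Clamp the shift so the multiplication cannot overflow for large attempts.
    val exponential = baseMs * (1L shl attempt.coerceAtMost(20))
    val capped = exponential.coerceAtMost(maxMs)
    val jitter = (capped * jitterFraction * random.nextDouble()).toLong()
    return capped - jitter
}
```

Passing jitterFraction = 0.0 makes the result deterministic, which is convenient in tests.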

Rate Limiting

Respect API rate limits when refreshing external content:

import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit

class RateLimitedRefreshPolicy(
    private val delegate: ContentRefreshPolicy,
    private val requestsPerSecond: Int = 10
) : ContentRefreshPolicy by delegate {
    private val semaphore = Semaphore(requestsPerSecond)
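    // Daemon thread so the reset task does not keep the JVM alive
    private val resetScheduler = Executors.newScheduledThreadPool(1) { runnable ->
        Thread(runnable, "rate-limit-reset").apply { isDaemon = true }
    }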

    init {
        resetScheduler.scheduleAtFixedRate({
            semaphore.release(requestsPerSecond - semaphore.availablePermits())
        }, 1, 1, TimeUnit.SECONDS)
    }

    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        semaphore.acquire()
        return delegate.ingestUriIfNeeded(repository, hierarchicalContentReader, rootUri)
    }
}
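
An alternative to resetting a semaphore from a background thread is a token bucket that refills lazily on each call. The sketch below is one possible shape, not part of the library: TokenBucket and its injectable nanoTime parameter are hypothetical, and unlike the blocking acquire() above, tryAcquire returns false so the caller can skip or defer the refresh.

```kotlin
// Hypothetical token bucket; `nanoTime` is injectable so refill behaviour
// can be exercised with a fake clock.
class TokenBucket(
    private val permitsPerSecond: Int,
    private val nanoTime: () -> Long = System::nanoTime
) {
    private var tokens = permitsPerSecond.toDouble()
    private var lastRefill = nanoTime()

    // Takes one token if available; never blocks.
    @Synchronized
    fun tryAcquire(): Boolean {
        val now = nanoTime()
        val elapsedSeconds = (now - lastRefill) / 1_000_000_000.0
        // Refill in proportion to elapsed time, up to the bucket capacity.
        tokens = minOf(permitsPerSecond.toDouble(), tokens + elapsedSeconds * permitsPerSecond)
        lastRefill = now
        if (tokens < 1.0) return false
        tokens -= 1.0
        return true
    }
}
```

A delegating policy could call tryAcquire() before ingestUriIfNeeded and return null when no token is available, which keeps the rate limit without a dedicated scheduler thread.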

See Also

  • Basic RAG Pipeline - Complete ingestion patterns
  • Repository Operations - Storage APIs
  • Chunk Transformation - Enriching chunks during refresh