RAG (Retrieval-Augmented Generation) framework for the Embabel Agent platform providing content ingestion, chunking, hierarchical navigation, and semantic search capabilities
Control when content should be re-ingested with support for time-based, conditional, and custom refresh strategies. This guide covers advanced patterns for managing content lifecycle in production RAG systems.
Content refresh policies determine when documents should be re-ingested, balancing data freshness with computational cost. The framework provides built-in policies for common scenarios and extensibility for custom refresh logic.
The base interface defines the contract for all refresh policy implementations.
/**
 * Contract shared by all refresh policies: two predicates that decide whether
 * stored content is due for re-ingestion, plus one operation that performs the
 * ingestion when it is needed.
 */
interface ContentRefreshPolicy {
    /**
     * Decide, by URI alone, whether the content root should be read again.
     *
     * @param repository Repository to check
     * @param rootUri URI of content root
     * @return true if content should be refreshed
     */
    fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean
    /**
     * Decide whether an already-loaded document should be refreshed.
     *
     * @param repository Repository to check
     * @param root Document to potentially refresh
     * @return true if document should be refreshed
     */
    fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean
    /**
     * Ingest the URI if the refresh policy determines it is needed.
     *
     * @param repository Target repository
     * @param hierarchicalContentReader Reader for parsing content
     * @param rootUri URI to potentially ingest
     * @return Parsed document if ingested, null if skipped
     */
    fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}The interface separates three concerns:
- URI-level check (`shouldReread`)
- Document-level check (`shouldRefreshDocument`)
- Conditional ingestion (`ingestUriIfNeeded`)

Always refreshes content on every request. Useful for development, testing, or volatile data sources.
/**
 * Policy that refreshes content on every request.
 *
 * Signature listing only: the method bodies are not shown in this document.
 */
object AlwaysRefreshContentRefreshPolicy : ContentRefreshPolicy {
    /**
     * Always returns true
     *
     * @param repository Repository to check
     * @param rootUri URI of content root
     * @return true for every URI
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean
    /**
     * Always returns true
     *
     * @param repository Repository to check
     * @param root Document to potentially refresh
     * @return true for every document
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean
    /**
     * Always ingests the URI
     *
     * @param repository Target repository
     * @param hierarchicalContentReader Reader for parsing content
     * @param rootUri URI to ingest
     * @return the parsed document (nullable only because the interface declares it so)
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}Use Cases:
Performance Considerations:
Never refreshes content once it exists. Optimal for immutable or archival content.
/**
 * Policy that never refreshes content once it exists in the repository.
 *
 * Signature listing only: the method bodies are not shown in this document.
 */
object NeverRefreshExistingDocumentContentPolicy : ContentRefreshPolicy {
    /**
     * Returns false if document exists
     *
     * @param repository Repository to check
     * @param rootUri URI of content root
     * @return false when a document already exists for the URI
     *         (presumably true otherwise; the body is not shown)
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean
    /**
     * Always returns false for existing documents
     *
     * @param repository Repository to check
     * @param root Document to potentially refresh
     * @return false for existing documents
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean
    /**
     * Only ingests if document doesn't exist
     *
     * @param repository Target repository
     * @param hierarchicalContentReader Reader for parsing content
     * @param rootUri URI to potentially ingest
     * @return Parsed document if ingested, null if skipped
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}Use Cases:
Performance Considerations:
Time-To-Live based refresh policy. Refreshes content after a specified duration.
/**
 * Time-to-live policy: content is refreshed once it is older than [ttl].
 *
 * Signature listing only: the method bodies are not shown in this document.
 */
class TtlContentRefreshPolicy(
    /**
     * Time-to-live duration for content
     */
    val ttl: Duration
) : ContentRefreshPolicy {
    /**
     * Returns true if content is older than TTL
     *
     * @param repository Repository to check
     * @param rootUri URI of content root
     * @return true if the stored content is older than [ttl]
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean
    /**
     * Returns true if document timestamp exceeds TTL
     *
     * @param repository Repository to check
     * @param root Document to potentially refresh
     * @return true if the document's timestamp exceeds [ttl]
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean
    /**
     * Ingests if content is stale based on TTL
     *
     * @param repository Target repository
     * @param hierarchicalContentReader Reader for parsing content
     * @param rootUri URI to potentially ingest
     * @return Parsed document if ingested, null if skipped
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}Use Cases:
Performance Considerations:
TTL Guidelines:
Apply different policies based on URL patterns. Enables fine-grained control over refresh behavior.
/**
 * Policy that picks another policy per URI through [policySelector].
 *
 * Signature listing only: the method bodies are not shown in this document.
 */
class UrlSpecificContentRefreshPolicy(
    /**
     * Selector function that maps a URI to the policy that should handle it
     * (it returns a policy, not a Boolean)
     */
    val policySelector: (String) -> ContentRefreshPolicy
) : ContentRefreshPolicy {
    /**
     * Apply policy based on URL pattern
     *
     * @param repository Repository to check
     * @param rootUri URI of content root, also used to select the policy
     * @return true if content should be refreshed
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean
    /**
     * Apply policy based on document URI pattern
     *
     * @param repository Repository to check
     * @param root Document to potentially refresh
     * @return true if document should be refreshed
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean
    /**
     * Ingest based on URL-specific policy
     *
     * @param repository Target repository
     * @param hierarchicalContentReader Reader for parsing content
     * @param rootUri URI to potentially ingest
     * @return Parsed document if ingested, null if skipped
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument?
}Use Cases:
Performance Considerations:
Only refresh content during specific time windows.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.time.Duration
import java.time.Instant
import java.time.LocalTime
/**
 * Refresh policy that re-ingests stale content only inside a daily hour window.
 *
 * Content with no stored root is always eligible, whatever the hour. Existing
 * content is eligible only when the current hour lies in
 * `startHour..endHour` (both ends inclusive, so the default window runs from
 * 09:00 through 17:59) and the content is older than [ttl].
 *
 * @param ttl minimum age before existing content is refreshed
 * @param startHour first hour of the window, inclusive
 * @param endHour last hour of the window, inclusive
 * @param zoneId time zone in which the window is evaluated; defaults to the
 *        JVM default zone, which is what `LocalTime.now()` used previously
 */
class BusinessHoursRefreshPolicy(
    private val ttl: Duration = Duration.ofHours(8),
    private val startHour: Int = 9,
    private val endHour: Int = 17,
    private val zoneId: java.time.ZoneId = java.time.ZoneId.systemDefault()
) : ContentRefreshPolicy {

    /**
     * @return true when no root exists for [rootUri], or when the current hour
     *         is inside the window and the stored root is older than [ttl]
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean {
        val root = repository.findContentRootByUri(rootUri) ?: return true

        // Read the clock once so the window check and the age check agree.
        val now = Instant.now()
        val hour = now.atZone(zoneId).hour
        if (hour !in startHour..endHour) {
            return false
        }

        return Duration.between(root.ingestionTimestamp, now) > ttl
    }

    /** Delegates to [shouldReread] using the document's URI. */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean = shouldReread(repository, root.uri)

    /**
     * Re-ingests [rootUri] when [shouldReread] says so.
     *
     * @return the freshly parsed document, or null when the refresh was skipped
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }
        // Parse before deleting: if the source cannot be read, the previously
        // stored content stays in place instead of being lost.
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.deleteRootAndDescendants(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}
// Example: 4-hour TTL, refreshes limited to hours 9..17.
// NOTE(review): no time zone is passed here, so the window is evaluated in
// the JVM's default zone.
val policy = BusinessHoursRefreshPolicy(
    ttl = Duration.ofHours(4),
    startHour = 9,
    endHour = 17
)Use Cases:
Refresh based on multiple conditions that must all be satisfied.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
/**
 * Refresh policy that refreshes only when every supplied condition holds.
 *
 * Conditions are evaluated in list order and evaluation stops at the first
 * one that returns false. An empty list means "always refresh", because
 * `all` over an empty collection is true.
 *
 * @param conditions predicates receiving the repository and the root URI
 */
class ConditionalRefreshPolicy(
    private val conditions: List<(ChunkingContentElementRepository, String) -> Boolean>
) : ContentRefreshPolicy {

    /** @return true when all [conditions] return true for [rootUri] */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean = conditions.all { condition -> condition(repository, rootUri) }

    /** Delegates to [shouldReread] using the document's URI. */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean = shouldReread(repository, root.uri)

    /**
     * Re-ingests [rootUri] when [shouldReread] says so.
     *
     * @return the freshly parsed document, or null when the refresh was skipped
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }
        // Parse before deleting: if the source cannot be read, the previously
        // stored content stays in place instead of being lost.
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.deleteRootAndDescendants(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}
// Example: four conditions that must all hold before a refresh happens
val policy = ConditionalRefreshPolicy(
    conditions = listOf(
        // 1. A root already exists for the URI
        { repo, uri -> repo.existsRootWithUri(uri) },
        // 2. The stored root is more than one day old
        { repo, uri ->
            val existing = repo.findContentRootByUri(uri)
            existing != null &&
                Duration.between(existing.ingestionTimestamp, Instant.now()) > Duration.ofDays(1)
        },
        // 3. Current hour is within 9..17
        { _, _ -> LocalTime.now().hour in 9..17 },
        // 4. The root is not marked immutable in its metadata
        { repo, uri -> repo.findContentRootByUri(uri)?.metadata?.get("immutable") != true }
    )
)Use Cases:
Use document metadata to control refresh behavior.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.time.Duration
import java.time.Instant
/**
 * Refresh policy driven by flags stored in the content root's metadata.
 *
 * Decision order for an existing root:
 * 1. `force_refresh == true` → refresh
 * 2. `no_refresh == true` → do not refresh
 * 3. otherwise refresh when the root is older than its effective TTL, which
 *    is `refresh_ttl_hours` when present, else a TTL derived from `priority`
 *
 * A URI with no stored root is always refreshed.
 */
class MetadataRefreshPolicy : ContentRefreshPolicy {

    /** @return true when [rootUri] should be (re-)ingested per the rules above */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean {
        val root = repository.findContentRootByUri(rootUri) ?: return true
        val metadata = root.metadata

        // Explicit flags win over any TTL; force takes precedence over block.
        if (metadata["force_refresh"] == true) {
            return true
        }
        if (metadata["no_refresh"] == true) {
            return false
        }

        val priorityTtl = when (metadata["priority"] as? String) {
            "critical" -> Duration.ofHours(1)
            "high" -> Duration.ofHours(6)
            "normal" -> Duration.ofDays(1)
            "low" -> Duration.ofDays(7)
            else -> Duration.ofDays(1)
        }

        // Accept any numeric type: a value that comes back as Long or Double
        // would fail an `as? Int` cast and be silently ignored.
        val customTtlHours = (metadata["refresh_ttl_hours"] as? Number)?.toLong()
        val effectiveTtl = customTtlHours?.let { Duration.ofHours(it) } ?: priorityTtl

        val age = Duration.between(root.ingestionTimestamp, Instant.now())
        return age > effectiveTtl
    }

    /** Delegates to [shouldReread] using the document's URI. */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean = shouldReread(repository, root.uri)

    /**
     * Re-ingests [rootUri] when [shouldReread] says so.
     *
     * @return the freshly parsed document, or null when the refresh was skipped
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }
        // Parse before deleting: if the source cannot be read, the previously
        // stored content stays in place instead of being lost.
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.deleteRootAndDescendants(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}Metadata Flags:
- `force_refresh`: Boolean, forces immediate refresh
- `no_refresh`: Boolean, prevents any refresh
- `priority`: String (critical, high, normal, low), determines TTL
- `refresh_ttl_hours`: Int, custom TTL in hours
- `immutable`: Boolean, marks content as never changing

Refresh based on external events or webhooks.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.util.concurrent.ConcurrentHashMap
/**
 * Refresh policy driven purely by external triggers (webhooks, hooks, ...).
 *
 * A URI is refreshed only after [triggerRefresh] has been called for it. The
 * pending trigger is consumed by a successful [ingestUriIfNeeded]; if
 * ingestion fails, the trigger is restored so the refresh is retried on the
 * next call.
 */
class ExternalTriggerRefreshPolicy : ContentRefreshPolicy {
    // A single map holds both "is pending" (key present) and the reason, so
    // flag and reason can never disagree under concurrent updates.
    private val pendingReasons = ConcurrentHashMap<String, String>()

    /**
     * Trigger refresh for specific URI
     *
     * @param uri URI whose next ingestion check should refresh
     * @param reason free-text explanation, logged when the refresh runs
     */
    fun triggerRefresh(uri: String, reason: String = "external trigger") {
        pendingReasons[uri] = reason
    }

    /**
     * Clear refresh flag
     */
    fun clearRefresh(uri: String) {
        pendingReasons.remove(uri)
    }

    /**
     * Get refresh reason
     *
     * @return the reason of the pending trigger, or null when none is pending
     */
    fun getRefreshReason(uri: String): String? = pendingReasons[uri]

    /** @return true when a trigger is pending for [rootUri] */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean = pendingReasons.containsKey(rootUri)

    /** Delegates to [shouldReread] using the document's URI. */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean = shouldReread(repository, root.uri)

    /**
     * Re-ingests [rootUri] when a trigger is pending for it.
     *
     * @return the freshly parsed document, or null when no trigger was pending
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        // Atomically claim the trigger: only one caller handles it, and a
        // trigger that arrives while we are ingesting is kept for the next run.
        val reason = pendingReasons.remove(rootUri) ?: return null
        println("Refreshing $rootUri: $reason")
        try {
            // Parse before deleting so a read failure keeps the stored content.
            val document = hierarchicalContentReader.parseUrl(rootUri)
            repository.deleteRootAndDescendants(rootUri)
            repository.writeAndChunkDocument(document)
            return document
        } catch (e: Exception) {
            // Restore the trigger unless a newer one has already been set.
            pendingReasons.putIfAbsent(rootUri, reason)
            throw e
        }
    }
}
// Use with webhook integration
// One shared policy instance: the handlers below set triggers on it and the
// ingestion call at the end consumes them.
val policy = ExternalTriggerRefreshPolicy()
// Webhook handler: marks the updated URI for refresh and records the update type as the reason
fun onContentUpdatedWebhook(uri: String, updateType: String) {
    policy.triggerRefresh(uri, "Webhook update: $updateType")
}
// Git hook integration: marks every affected file for refresh
fun onGitCommit(affectedFiles: List<String>) {
    affectedFiles.forEach { file ->
        // NOTE(review): assumes `file` has no leading slash; an absolute path
        // would produce four slashes. Confirm against how root URIs are stored.
        val uri = "file:///${file}"
        policy.triggerRefresh(uri, "Git commit")
    }
}
// Refresh will happen on next check
// (`repository`, `reader` and `uri` are assumed to come from the surrounding application)
policy.ingestUriIfNeeded(repository, reader, uri)Use Cases:
Combine multiple policies with configurable logic.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
/**
 * Combines several policies into one using OR ([Mode.ANY]) or AND ([Mode.ALL]).
 *
 * With an empty policy list, [Mode.ANY] never refreshes and [Mode.ALL] always
 * refreshes (the usual semantics of `any`/`all` on an empty collection).
 *
 * Ingestion is performed by this class itself; the member policies'
 * `ingestUriIfNeeded` is not invoked, so a member that keeps internal state
 * is not notified that a refresh took place.
 *
 * @param policies member policies, consulted in list order
 * @param mode how the members' answers are combined
 */
class CompositeRefreshPolicy(
    private val policies: List<ContentRefreshPolicy>,
    private val mode: Mode = Mode.ANY
) : ContentRefreshPolicy {
    enum class Mode {
        ANY, // Refresh if any policy says yes
        ALL  // Refresh only if all policies say yes
    }

    /** Combines the per-policy answers produced by [check] according to [mode]. */
    private fun combine(check: (ContentRefreshPolicy) -> Boolean): Boolean = when (mode) {
        Mode.ANY -> policies.any(check)
        Mode.ALL -> policies.all(check)
    }

    /** @return the members' `shouldReread` answers combined according to [mode] */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean = combine { it.shouldReread(repository, rootUri) }

    /**
     * Asks each member its document-level question, so a member whose
     * document check differs from its URI check is honoured.
     *
     * @return the members' `shouldRefreshDocument` answers combined according to [mode]
     */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean = combine { it.shouldRefreshDocument(repository, root) }

    /**
     * Re-ingests [rootUri] when [shouldReread] says so.
     *
     * @return the freshly parsed document, or null when the refresh was skipped
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) {
            return null
        }
        // Parse before deleting: if the source cannot be read, the previously
        // stored content stays in place instead of being lost.
        val document = hierarchicalContentReader.parseUrl(rootUri)
        repository.deleteRootAndDescendants(rootUri)
        repository.writeAndChunkDocument(document)
        return document
    }
}
// Combine policies with OR logic.
// Keep a reference to the trigger policy: triggerRefresh() has to be called on
// the same instance that is inside the composite, otherwise it can never fire.
// The composite performs ingestion itself, so call
// externalTrigger.clearRefresh(uri) once a triggered refresh has succeeded.
val externalTrigger = ExternalTriggerRefreshPolicy()
val anyPolicy = CompositeRefreshPolicy(
    policies = listOf(
        TtlContentRefreshPolicy(Duration.ofDays(1)),
        externalTrigger
    ),
    mode = CompositeRefreshPolicy.Mode.ANY
)
// Combine policies with AND logic
val allPolicy = CompositeRefreshPolicy(
    policies = listOf(
        TtlContentRefreshPolicy(Duration.ofDays(1)),
        BusinessHoursRefreshPolicy()
    ),
    mode = CompositeRefreshPolicy.Mode.ALL
)Background service for periodic content refresh.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.ingestion.policy.*
import java.util.concurrent.*
import java.time.Duration
/**
 * Background service that periodically runs a refresh policy over a fixed
 * list of URIs.
 *
 * A single timer thread starts each cycle; the per-URI work runs on a
 * separate worker pool. Keeping the two apart matters: a cycle waits for all
 * of its URI tasks, so running both on one pool would tie up a pool thread for
 * the whole cycle and deadlock outright when the pool has a single thread.
 *
 * @param repository repository passed to the policy
 * @param reader reader passed to the policy
 * @param policy policy that decides and performs each refresh
 * @param uris URIs checked on every cycle
 * @param threadPoolSize number of URIs refreshed in parallel (values below 1 are treated as 1)
 */
class ScheduledRefreshService(
    private val repository: ChunkingContentElementRepository,
    private val reader: HierarchicalContentReader,
    private val policy: ContentRefreshPolicy,
    private val uris: List<String>,
    private val threadPoolSize: Int = 4
) {
    private val scheduler = Executors.newSingleThreadScheduledExecutor()
    private val workers = Executors.newFixedThreadPool(threadPoolSize.coerceAtLeast(1))
    private val metrics = RefreshMetrics()

    /**
     * Starts refresh cycles: the first immediately, then every [periodMinutes].
     */
    fun start(periodMinutes: Long) {
        scheduler.scheduleAtFixedRate(
            {
                // An exception escaping a fixed-rate task cancels every later
                // run, so contain it here and keep the schedule alive.
                try {
                    refreshAll()
                } catch (e: Exception) {
                    println("Scheduled refresh cycle failed: ${e.message}")
                }
            },
            0,
            periodMinutes,
            TimeUnit.MINUTES
        )
    }

    /**
     * Stops scheduling and waits up to 30 seconds for a running cycle to end;
     * anything still running after that is interrupted.
     */
    fun stop() {
        scheduler.shutdown()
        try {
            if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
                scheduler.shutdownNow()
            }
        } catch (e: InterruptedException) {
            scheduler.shutdownNow()
            Thread.currentThread().interrupt()
        } finally {
            // No cycle is left to feed the workers; interrupt any stragglers.
            workers.shutdownNow()
        }
    }

    /** @return a snapshot of the counters collected so far */
    fun stats(): Map<String, Any> = metrics.getStats()

    /** Runs one cycle: refreshes every URI in parallel and records timing. */
    private fun refreshAll() {
        val startTime = System.currentTimeMillis()
        println("Starting scheduled refresh at ${java.time.Instant.now()}")
        val results = uris.map { uri ->
            CompletableFuture.supplyAsync({
                refreshUri(uri)
            }, workers)
        }
        CompletableFuture.allOf(*results.toTypedArray()).join()
        val duration = System.currentTimeMillis() - startTime
        metrics.recordRefreshCycle(duration, results.size)
        println("Scheduled refresh complete in ${duration}ms")
    }

    /** Refreshes one URI and converts the outcome (or exception) into a [RefreshResult]. */
    private fun refreshUri(uri: String): RefreshResult {
        return try {
            val startTime = System.currentTimeMillis()
            val document = policy.ingestUriIfNeeded(repository, reader, uri)
            val duration = System.currentTimeMillis() - startTime
            if (document != null) {
                metrics.recordSuccess(uri, duration)
                RefreshResult.Success(uri, duration)
            } else {
                metrics.recordSkipped(uri)
                RefreshResult.Skipped(uri)
            }
        } catch (e: Exception) {
            metrics.recordFailure(uri, e)
            println("Error refreshing $uri: ${e.message}")
            RefreshResult.Failure(uri, e)
        }
    }

    /**
     * Counters for refresh activity. The record/read methods are synchronized
     * because they are called concurrently from the worker threads.
     */
    data class RefreshMetrics(
        var totalCycles: Int = 0,
        var totalDuration: Long = 0,
        var successCount: Int = 0,
        var skippedCount: Int = 0,
        var failureCount: Int = 0
    ) {
        @Synchronized
        fun recordRefreshCycle(duration: Long, uriCount: Int) {
            totalCycles++
            totalDuration += duration
        }

        @Synchronized
        fun recordSuccess(uri: String, duration: Long) {
            successCount++
        }

        @Synchronized
        fun recordSkipped(uri: String) {
            skippedCount++
        }

        @Synchronized
        fun recordFailure(uri: String, error: Exception) {
            failureCount++
        }

        /**
         * @return counters plus derived values; `success_rate` is
         *         successes / (successes + failures), skipped URIs excluded
         */
        @Synchronized
        fun getStats(): Map<String, Any> {
            return mapOf(
                "total_cycles" to totalCycles,
                "avg_cycle_duration_ms" to if (totalCycles > 0) totalDuration / totalCycles else 0L,
                "success_count" to successCount,
                "skipped_count" to skippedCount,
                "failure_count" to failureCount,
                "success_rate" to if (successCount + failureCount > 0) {
                    successCount.toDouble() / (successCount + failureCount)
                } else 0.0
            )
        }
    }

    /** Outcome of refreshing a single URI. */
    sealed class RefreshResult {
        data class Success(val uri: String, val duration: Long) : RefreshResult()
        data class Skipped(val uri: String) : RefreshResult()
        data class Failure(val uri: String, val error: Exception) : RefreshResult()
    }
}
// Use scheduled refresh: hourly TTL policy over two URIs, pool size 8
val service = ScheduledRefreshService(
    repository = repository,
    reader = reader,
    policy = TtlContentRefreshPolicy(Duration.ofHours(1)),
    uris = listOf(
        "https://example.com/docs/guide1.html",
        "https://example.com/docs/guide2.html"
    ),
    threadPoolSize = 8
)
// Check every 30 minutes
service.start(periodMinutes = 30)
// Register a JVM shutdown hook so the service is stopped when the process exits
Runtime.getRuntime().addShutdownHook(Thread {
    service.stop()
})Robust batch refresh implementation with retry logic.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.ingestion.policy.*
import com.embabel.agent.rag.store.*
import kotlin.math.min
/**
 * Refreshes a batch of URIs sequentially, retrying each failed URI with a
 * linearly growing delay.
 *
 * @param repository repository passed to the policy
 * @param reader reader passed to the policy
 * @param policy policy that decides and performs each refresh
 * @param maxRetries maximum number of attempts per URI; values below 1 are
 *        treated as 1 so every URI is attempted at least once
 * @param retryDelayMs base delay; attempt `k` is followed by `retryDelayMs * k` ms
 */
class BatchRefreshExecutor(
    private val repository: ChunkingContentElementRepository,
    private val reader: HierarchicalContentReader,
    private val policy: ContentRefreshPolicy,
    private val maxRetries: Int = 3,
    private val retryDelayMs: Long = 1000
) {
    /**
     * Refreshes every URI in [uris] in order.
     *
     * @return per-URI statuses, collected errors and summary counts
     */
    fun refreshAll(uris: List<String>): BatchRefreshResult {
        val results = mutableMapOf<String, RefreshStatus>()
        val errors = mutableMapOf<String, List<Exception>>()
        uris.forEach { uri ->
            val (status, errorList) = refreshWithRetry(uri)
            results[uri] = status
            if (errorList.isNotEmpty()) {
                errors[uri] = errorList
            }
        }
        return BatchRefreshResult(
            total = uris.size,
            refreshed = results.count { it.value == RefreshStatus.REFRESHED },
            skipped = results.count { it.value == RefreshStatus.SKIPPED },
            failed = results.count { it.value == RefreshStatus.FAILED },
            results = results,
            errors = errors
        )
    }

    /**
     * Attempts to refresh [uri] until it succeeds, is skipped, or the attempt
     * budget is used up.
     *
     * @return the final status paired with every exception seen along the way
     */
    private fun refreshWithRetry(uri: String): Pair<RefreshStatus, List<Exception>> {
        val errors = mutableListOf<Exception>()
        // Guarantee one attempt: with maxRetries <= 0 the URI would otherwise
        // be reported as FAILED without ever being tried.
        val maxAttempts = maxRetries.coerceAtLeast(1)
        for (attempt in 1..maxAttempts) {
            try {
                val document = policy.ingestUriIfNeeded(repository, reader, uri)
                return if (document != null) {
                    println("Refreshed: $uri (attempt $attempt)")
                    RefreshStatus.REFRESHED to errors
                } else {
                    RefreshStatus.SKIPPED to errors
                }
            } catch (e: InterruptedException) {
                // Interruption is a request to stop, not a transient failure.
                Thread.currentThread().interrupt()
                errors.add(e)
                return RefreshStatus.FAILED to errors
            } catch (e: Exception) {
                errors.add(e)
                println("Error refreshing $uri (attempt $attempt): ${e.message}")
                if (attempt < maxAttempts && !pauseBeforeRetry(attempt)) {
                    return RefreshStatus.FAILED to errors
                }
            }
        }
        return RefreshStatus.FAILED to errors
    }

    /**
     * Sleeps for the linear back-off that follows attempt number [attempt].
     *
     * @return false if the thread was interrupted while waiting
     */
    private fun pauseBeforeRetry(attempt: Int): Boolean {
        return try {
            Thread.sleep((retryDelayMs * attempt).coerceAtLeast(0))
            true
        } catch (e: InterruptedException) {
            Thread.currentThread().interrupt()
            false
        }
    }

    enum class RefreshStatus {
        REFRESHED, SKIPPED, FAILED
    }

    /**
     * Outcome of one [refreshAll] call.
     *
     * @property total number of URIs submitted
     * @property results final status per URI
     * @property errors exceptions per URI, only for URIs that saw at least one
     */
    data class BatchRefreshResult(
        val total: Int,
        val refreshed: Int,
        val skipped: Int,
        val failed: Int,
        val results: Map<String, RefreshStatus>,
        val errors: Map<String, List<Exception>>
    ) {
        /** Prints counts, the refreshed/total percentage and any errors to stdout. */
        fun printSummary() {
            // An empty batch would otherwise print "NaN%".
            val successRate = if (total > 0) refreshed.toDouble() / total * 100 else 0.0
            println("=== Batch Refresh Summary ===")
            println("Total: $total")
            println("Refreshed: $refreshed")
            println("Skipped: $skipped")
            println("Failed: $failed")
            println("Success rate: ${"%.2f".format(successRate)}%")
            if (errors.isNotEmpty()) {
                println("\n=== Errors ===")
                errors.forEach { (uri, errorList) ->
                    println("$uri:")
                    errorList.forEach { error ->
                        println(" - ${error.message}")
                    }
                }
            }
        }
    }
}
// Use batch refresh: daily TTL policy, up to 3 attempts per URI, 1000 ms base delay
val executor = BatchRefreshExecutor(
    repository = repository,
    reader = reader,
    policy = TtlContentRefreshPolicy(Duration.ofDays(1)),
    maxRetries = 3,
    retryDelayMs = 1000
)
val uris = listOf(
    "https://example.com/docs/guide1.html",
    "https://example.com/docs/guide2.html",
    "https://example.com/docs/guide3.html"
)
// Run the batch, then print counts and per-URI errors to stdout
val result = executor.refreshAll(uris)
result.printSummary()When multiple processes may refresh content simultaneously:
import java.util.concurrent.locks.ReentrantLock
/**
 * Decorator that lets only one caller at a time ingest a given URI.
 *
 * The guard lives in this object's memory, so it coordinates threads sharing
 * this instance; it does not coordinate separate processes. A caller that
 * finds the URI already being ingested gets null immediately instead of
 * waiting. This includes a nested call for the same URI from the thread that
 * is already ingesting it.
 *
 * @param delegate policy that makes the decision and performs the ingestion
 */
class LockingRefreshPolicy(
    private val delegate: ContentRefreshPolicy
) : ContentRefreshPolicy by delegate {
    // URIs currently being ingested. Entries are removed when ingestion ends,
    // so the set stays bounded by the number of concurrent ingestions rather
    // than growing with every URI ever seen.
    private val inProgress = ConcurrentHashMap.newKeySet<String>()

    /**
     * @return the delegate's result, or null when another caller is already
     *         ingesting [rootUri]
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!inProgress.add(rootUri)) {
            return null // Another caller is already refreshing this URI
        }
        return try {
            delegate.ingestUriIfNeeded(repository, hierarchicalContentReader, rootUri)
        } finally {
            inProgress.remove(rootUri)
        }
    }
}Handle transient network issues gracefully:
/**
 * Decorator that retries a failing ingestion with a pause between attempts.
 *
 * @param delegate policy that makes the decision and performs the ingestion
 * @param maxRetries maximum number of attempts; values below 1 are treated as
 *        1 so the delegate is always tried at least once
 * @param exponentialBackoff when true the pause doubles after each failure
 *        (1s, 2s, 4s, ...); when false it is always 1s
 */
class ResilientRefreshPolicy(
    private val delegate: ContentRefreshPolicy,
    private val maxRetries: Int = 3,
    private val exponentialBackoff: Boolean = true
) : ContentRefreshPolicy by delegate {

    /**
     * @return the delegate's result from the first attempt that does not throw
     * @throws Exception the last exception raised by the delegate once all
     *         attempts have failed
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        val attempts = maxRetries.coerceAtLeast(1)
        var lastException: Exception? = null
        for (attempt in 0 until attempts) {
            try {
                return delegate.ingestUriIfNeeded(repository, hierarchicalContentReader, rootUri)
            } catch (e: InterruptedException) {
                // Interruption is a request to stop, not a transient failure.
                Thread.currentThread().interrupt()
                throw e
            } catch (e: Exception) {
                lastException = e
                if (attempt < attempts - 1) {
                    Thread.sleep(backoffDelayMs(attempt))
                }
            }
        }
        throw lastException ?: RuntimeException("Refresh failed")
    }

    /**
     * Pause after the zero-based [attempt]. The shift is done on a Long and
     * capped so a large attempt index cannot overflow into a negative delay.
     */
    private fun backoffDelayMs(attempt: Int): Long =
        if (exponentialBackoff) 1000L shl attempt.coerceAtMost(30) else 1000L
}Respect API rate limits when refreshing external content:
import java.time.Instant
import java.util.concurrent.Semaphore
/**
 * Decorator that limits how many ingestion calls may start per second.
 *
 * Permits are topped back up to [requestsPerSecond] once a second by a
 * background daemon thread; callers block in [ingestUriIfNeeded] until a
 * permit is available. Call [close] when the policy is no longer needed to
 * stop that thread.
 *
 * @param delegate policy that makes the decision and performs the ingestion
 * @param requestsPerSecond permits available per one-second window; must be positive
 * @throws IllegalArgumentException if [requestsPerSecond] is not positive
 */
class RateLimitedRefreshPolicy(
    private val delegate: ContentRefreshPolicy,
    private val requestsPerSecond: Int = 10
) : ContentRefreshPolicy by delegate, AutoCloseable {
    init {
        // With zero permits every call would block forever.
        require(requestsPerSecond > 0) { "requestsPerSecond must be positive, was $requestsPerSecond" }
    }

    private val semaphore = Semaphore(requestsPerSecond)

    // Daemon thread, so a forgotten close() cannot keep the JVM alive.
    private val resetScheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor { task ->
        Thread(task, "rate-limit-reset").apply { isDaemon = true }
    }

    init {
        resetScheduler.scheduleAtFixedRate({
            // Top the semaphore back up to the configured limit.
            val missing = requestsPerSecond - semaphore.availablePermits()
            if (missing > 0) {
                semaphore.release(missing)
            }
        }, 1, 1, java.util.concurrent.TimeUnit.SECONDS)
    }

    /**
     * Waits for a permit, then delegates. The permit is taken before the
     * delegate decides whether a refresh is needed, so skipped URIs count
     * against the limit as well.
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        semaphore.acquire()
        return delegate.ingestUriIfNeeded(repository, hierarchicalContentReader, rootUri)
    }

    /** Stops the background permit-reset thread. */
    override fun close() {
        resetScheduler.shutdownNow()
    }
}