RAG (Retrieval-Augmented Generation) framework for the Embabel Agent platform providing content ingestion, chunking, hierarchical navigation, and semantic search capabilities
Parse, chunk, and ingest documents into the RAG system with support for hierarchical content structures, directory parsing, and configurable chunking strategies.
Parse content from various sources into hierarchical document structures.
Interface for reading and parsing content into NavigableDocument structures.
interface HierarchicalContentReader {
/**
* Parse content from a URL
* @param url URL to fetch and parse
* @return Parsed navigable document
*/
fun parseUrl(url: String): NavigableDocument
/**
* Parse content from a resource path
* @param resourcePath Classpath resource path
* @return Parsed navigable document
*/
fun parseResource(resourcePath: String): NavigableDocument
/**
* Parse content from a file
* @param file File to parse
* @param url Optional URL to associate with the document
* @return Parsed navigable document
*/
fun parseFile(file: File, url: String? = null): NavigableDocument
/**
* Parse content from an input stream
* @param inputStream Stream containing content
* @param uri URI to associate with the document
* @return Parsed navigable document
*/
fun parseContent(inputStream: InputStream, uri: String): NavigableDocument
/**
* Parse multiple files from a directory
* @param fileTools File reading tools
* @param config Directory parsing configuration
* @return Parsing result with statistics and parsed documents
*/
fun parseFromDirectory(
fileTools: FileReadTools,
config: DirectoryParsingConfig
): DirectoryParsingResult
}

Methods:

parseUrl(): Fetch and parse content from HTTP(S) URL
  url - URL to fetch

parseResource(): Load and parse from classpath resource
  resourcePath - Classpath resource path

parseFile(): Parse local file
  file - File to parse
  url - Optional URL for document URI

parseContent(): Parse from stream
  inputStream - Content stream
  uri - URI to associate with document

parseFromDirectory(): Batch parse directory
  fileTools - File system utilities
  config - Parsing configuration

Parse multiple documents from directory structures.
Configuration for directory parsing operations.
data class DirectoryParsingConfig(
/**
* File extensions to include (e.g., "md", "txt")
*/
val includedExtensions: Set<String>,
/**
* Directory names to exclude from traversal
*/
val excludedDirectories: Set<String>,
/**
* Relative path from base directory
*/
val relativePath: String = ".",
/**
* Maximum file size in bytes
*/
val maxFileSize: Long = 10_485_760, // 10 MB
/**
* Whether to follow symbolic links
*/
val followSymlinks: Boolean = false,
/**
* Maximum directory depth to traverse
*/
val maxDepth: Int = Int.MAX_VALUE
) {
fun withRelativePath(newRelativePath: String): DirectoryParsingConfig
fun withMaxFileSize(newMaxFileSize: Long): DirectoryParsingConfig
fun withFollowSymlinks(newFollowSymlinks: Boolean): DirectoryParsingConfig
fun withMaxDepth(newMaxDepth: Int): DirectoryParsingConfig
}

Properties:

includedExtensions: File extensions to process (without dot)
excludedDirectories: Directory names to skip
relativePath: Starting path relative to base
maxFileSize: Maximum file size in bytes (default 10 MB)
followSymlinks: Follow symbolic links (default false)
maxDepth: Maximum traversal depth (default unlimited)

Builder Methods:

withRelativePath(): Create copy with new path
withMaxFileSize(): Create copy with new size limit
withFollowSymlinks(): Create copy with new symlink setting
withMaxDepth(): Create copy with new depth limit

Result of directory parsing operation with statistics.
data class DirectoryParsingResult(
/**
* Total files found matching criteria
*/
val totalFilesFound: Int,
/**
* Number of files successfully processed
*/
val filesProcessed: Int,
/**
* Number of files skipped
*/
val filesSkipped: Int,
/**
* Number of files that errored during processing
*/
val filesErrored: Int,
/**
* Parsed content roots
*/
val contentRoots: List<NavigableDocument>,
/**
* Time taken for processing
*/
val processingTime: Duration,
/**
* Error messages from failed files
*/
val errors: List<String>,
/**
* Whether parsing succeeded overall
*/
val success: Boolean,
/**
* Total sections extracted across all documents
*/
val totalSectionsExtracted: Int
)

Properties:

totalFilesFound: Files matching criteria
filesProcessed: Successfully parsed files
filesSkipped: Skipped files (size, permissions, etc.)
filesErrored: Files with parsing errors
contentRoots: Parsed documents
processingTime: Total processing duration
errors: Error messages for failed files
success: True if at least one file processed successfully
totalSectionsExtracted: Sum of sections across all documents

Convert hierarchical documents into indexed chunks.
Interface for chunking document sections.
interface ContentChunker {
/**
* Chunk transformer to apply to generated chunks
*/
val chunkTransformer: ChunkTransformer
/**
* Convert a container section into chunks
* @param section Container section to chunk
* @return Iterable of chunks with metadata
*/
fun chunk(section: NavigableContainerSection): Iterable<Chunk>
/**
* Configuration for chunking behavior
*/
data class Config(
/**
* Maximum size of each chunk in characters
*/
val maxChunkSize: Int = 1500,
/**
* Overlap between consecutive chunks in characters
*/
val overlapSize: Int = 200,
/**
* Batch size for embedding generation
*/
val embeddingBatchSize: Int = 100
)
companion object {
/**
* Standard metadata keys for chunks
*/
const val CHUNK_INDEX = "chunk_index"
const val TOTAL_CHUNKS = "total_chunks"
const val SEQUENCE_NUMBER = "sequence_number"
const val ROOT_DOCUMENT_ID = "root_document_id"
const val CONTAINER_SECTION_ID = "container_section_id"
const val CONTAINER_SECTION_TITLE = "container_section_title"
const val CONTAINER_SECTION_URL = "container_section_url"
const val LEAF_SECTION_ID = "leaf_section_id"
const val LEAF_SECTION_TITLE = "leaf_section_title"
const val LEAF_SECTION_URL = "leaf_section_url"
/**
* Create a content chunker with config and transformer
*/
operator fun invoke(
config: Config,
chunkTransformer: ChunkTransformer
): InMemoryContentChunker
}
}

Properties:

chunkTransformer: Transformer applied to each chunk

Methods:

chunk(): Convert section to chunks
  section - Container section to chunk

Configuration:

maxChunkSize: Maximum chunk size in characters (default 1500)
overlapSize: Overlap between chunks in characters (default 200)
embeddingBatchSize: Batch size for embedding generation (default 100)

Metadata Keys:

CHUNK_INDEX: Index within section (0-based)
TOTAL_CHUNKS: Total chunks from section
SEQUENCE_NUMBER: Global sequence across document
ROOT_DOCUMENT_ID: Document root ID
CONTAINER_SECTION_ID: Parent container section ID
CONTAINER_SECTION_TITLE: Parent container section title
CONTAINER_SECTION_URL: Parent container section URL
LEAF_SECTION_ID: Source leaf section ID
LEAF_SECTION_TITLE: Source leaf section title
LEAF_SECTION_URL: Source leaf section URL

In-memory implementation of ContentChunker.
class InMemoryContentChunker(
/**
* Chunking configuration
*/
val config: ContentChunker.Config,
/**
* Transformer to apply to chunks
*/
override val chunkTransformer: ChunkTransformer
) : ContentChunker {
/**
* Chunk a single container section
* @param section Container section to chunk
* @return List of chunks
*/
override fun chunk(section: NavigableContainerSection): List<Chunk>
/**
* Chunk multiple sections
* @param sections List of container sections
* @return List of all chunks from all sections
*/
fun splitSections(sections: List<NavigableContainerSection>): List<Chunk>
}

Constructor Parameters:

config: Chunking configuration
chunkTransformer: Transformer for chunks

Methods:

chunk(): Chunk single section
splitSections(): Batch chunk multiple sections

Behavior:

Chunks are limited to maxChunkSize characters, with overlapSize characters of overlap between consecutive chunks.

Ingest content into storage repositories.
Interface for ingesting resources into stores.
interface Ingester : HasInfoString {
/**
* Target stores for ingestion
*/
val stores: List<ChunkingContentElementRepository>
/**
* Check if ingester is active and ready
* @return true if ingester can accept requests
*/
fun active(): Boolean
/**
* Ingest a resource by path or URL
* @param resourcePath Path or URL to ingest
* @return Ingestion result with statistics
*/
fun ingest(resourcePath: String): IngestionResult
}

Properties:

stores: List of target repositories

Methods:

active(): Check if ingester is ready
ingest(): Ingest resource
  resourcePath - File path, URL, or resource path

Result of an ingestion operation.
data class IngestionResult(
/**
* Names of stores that received content
*/
val storesWrittenTo: Set<String>,
/**
* IDs of chunks created
*/
val chunkIds: List<String>,
/**
* Number of documents written
*/
val documentsWritten: Int
) {
/**
* Check if ingestion succeeded
* @return true if any content was written
*/
fun success(): Boolean
}Properties:
storesWrittenTo: Names of stores that received contentchunkIds: IDs of created chunksdocumentsWritten: Number of documents ingestedMethods:
success(): Returns true if any content was writtenControl when content should be re-ingested.
Interface for determining whether content needs refreshing.
interface ContentRefreshPolicy {
/**
* Check if content at URI should be re-read
* @param repository Repository to check
* @param rootUri URI of content root
* @return true if content should be refreshed
*/
fun shouldReread(
repository: ChunkingContentElementRepository,
rootUri: String
): Boolean
/**
* Check if document should be refreshed
* @param repository Repository to check
* @param root Document to potentially refresh
* @return true if document should be refreshed
*/
fun shouldRefreshDocument(
repository: ChunkingContentElementRepository,
root: NavigableDocument
): Boolean
/**
* Ingest URI if refresh policy determines it's needed
* @param repository Target repository
* @param hierarchicalContentReader Reader for parsing content
* @param rootUri URI to potentially ingest
* @return Parsed document if ingested, null if skipped
*/
fun ingestUriIfNeeded(
repository: ChunkingContentElementRepository,
hierarchicalContentReader: HierarchicalContentReader,
rootUri: String
): NavigableDocument?
}

Methods:

shouldReread(): Check if URI needs re-ingestion
  repository - Repository to check
  rootUri - URI of content

shouldRefreshDocument(): Check if document needs refresh
  repository - Repository to check
  root - Document to evaluate

ingestUriIfNeeded(): Conditionally ingest URI
  repository - Target repository
  hierarchicalContentReader - Parser
  rootUri - URI to potentially ingest

Enhance retrievables during storage.
Interface for enhancing retrievables with additional data.
interface RetrievableEnhancer {
/**
* Enhance a retrievable before storage
* @param retrievable Retrievable to enhance
* @return Enhanced retrievable
*/
fun <T : Retrievable> enhance(retrievable: T): T
}

Methods:

enhance(): Add data to retrievable
  retrievable - Item to enhance

import com.embabel.agent.rag.ingestion.*
import java.io.File
val reader: HierarchicalContentReader = // implementation
// Parse from URL
val doc1 = reader.parseUrl("https://example.com/docs/guide.html")
println("Parsed: ${doc1.title}")
println("Sections: ${doc1.children.count()}")
// Parse from file
val file = File("/path/to/document.md")
val doc2 = reader.parseFile(file, url = "file:///path/to/document.md")
println("Parsed: ${doc2.title}")
// Parse from classpath resource
val doc3 = reader.parseResource("docs/readme.md")
println("Parsed: ${doc3.title}")
// Parse from input stream
val inputStream = File("document.txt").inputStream()
val doc4 = reader.parseContent(inputStream, uri = "file:///document.txt")
println("Parsed: ${doc4.title}")

import com.embabel.agent.rag.ingestion.*
val reader: HierarchicalContentReader = // implementation
val fileTools: FileReadTools = // implementation
// Configure directory parsing
val config = DirectoryParsingConfig(
includedExtensions = setOf("md", "txt", "html"),
excludedDirectories = setOf("node_modules", ".git", "build", "target"),
relativePath = "docs",
maxFileSize = 5_242_880, // 5 MB
followSymlinks = false,
maxDepth = 10
)
// Parse directory
val result = reader.parseFromDirectory(fileTools, config)
// Check results
println("=== Parsing Results ===")
println("Files found: ${result.totalFilesFound}")
println("Files processed: ${result.filesProcessed}")
println("Files skipped: ${result.filesSkipped}")
println("Files errored: ${result.filesErrored}")
println("Documents created: ${result.contentRoots.size}")
println("Sections extracted: ${result.totalSectionsExtracted}")
println("Processing time: ${result.processingTime}")
println("Success: ${result.success}")
// Process errors
if (result.errors.isNotEmpty()) {
println("\n=== Errors ===")
result.errors.forEach { error ->
println(" - $error")
}
}
// Process parsed documents
result.contentRoots.forEach { doc ->
println("\nDocument: ${doc.title}")
println(" URI: ${doc.uri}")
println(" Children: ${doc.children.count()}")
println(" Leaves: ${doc.leaves().count()}")
}

import com.embabel.agent.rag.ingestion.*
// Start with base config
val baseConfig = DirectoryParsingConfig(
includedExtensions = setOf("md", "adoc"),
excludedDirectories = setOf(".git", "node_modules")
)
// Customize with builder methods
val customConfig = baseConfig
.withRelativePath("src/docs")
.withMaxFileSize(10_485_760) // 10 MB
.withFollowSymlinks(true)
.withMaxDepth(5)
// Use custom config
val result = reader.parseFromDirectory(fileTools, customConfig)

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.model.*
// Configure chunker
val config = ContentChunker.Config(
maxChunkSize = 1500,
overlapSize = 200,
embeddingBatchSize = 100
)
// Create chunker with transformer
val chunker = ContentChunker(
config = config,
chunkTransformer = ChunkTransformer.NO_OP
)
// Parse a document
val reader: HierarchicalContentReader = // implementation
val document = reader.parseUrl("https://example.com/docs")
// Chunk the document
val chunks = chunker.chunk(document).toList()
println("Created ${chunks.size} chunks")
// Examine chunks
chunks.forEach { chunk ->
val index = chunk.metadata[ContentChunker.CHUNK_INDEX]
val total = chunk.metadata[ContentChunker.TOTAL_CHUNKS]
val sequence = chunk.metadata[ContentChunker.SEQUENCE_NUMBER]
val sectionTitle = chunk.metadata[ContentChunker.CONTAINER_SECTION_TITLE]
println("Chunk $index of $total (sequence: $sequence)")
println(" From section: $sectionTitle")
println(" Text length: ${chunk.text.length}")
println(" Parent: ${chunk.parentId}")
}

import com.embabel.agent.rag.ingestion.*
val config = ContentChunker.Config(
maxChunkSize = 2000,
overlapSize = 300
)
val chunker = InMemoryContentChunker(
config = config,
chunkTransformer = ChunkTransformer.NO_OP
)
// Chunk multiple sections
val reader: HierarchicalContentReader = // implementation
val doc1 = reader.parseUrl("https://example.com/doc1")
val doc2 = reader.parseUrl("https://example.com/doc2")
val allChunks = chunker.splitSections(listOf(doc1, doc2))
println("Total chunks: ${allChunks.size}")
// Process chunks
allChunks.forEach { chunk ->
println("${chunk.id}: ${chunk.text.take(50)}...")
}

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.model.*
val chunker: ContentChunker = // implementation
val document: NavigableDocument = // parsed document
val chunks = chunker.chunk(document).toList()
chunks.forEach { chunk ->
// Standard metadata
val chunkIndex = chunk.metadata[ContentChunker.CHUNK_INDEX] as? Int
val totalChunks = chunk.metadata[ContentChunker.TOTAL_CHUNKS] as? Int
val sequenceNumber = chunk.metadata[ContentChunker.SEQUENCE_NUMBER] as? Int
val rootDocId = chunk.metadata[ContentChunker.ROOT_DOCUMENT_ID] as? String
// Section metadata
val containerSectionId = chunk.metadata[ContentChunker.CONTAINER_SECTION_ID] as? String
val containerSectionTitle = chunk.metadata[ContentChunker.CONTAINER_SECTION_TITLE] as? String
val leafSectionTitle = chunk.metadata[ContentChunker.LEAF_SECTION_TITLE] as? String
println("Chunk $chunkIndex/$totalChunks (sequence: $sequenceNumber)")
println(" Root: $rootDocId")
println(" Container: $containerSectionTitle")
println(" Leaf: $leafSectionTitle")
}

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
val ingester: Ingester = // implementation
// Check if ingester is ready
if (!ingester.active()) {
println("Ingester not active")
return
}
// Ingest a resource
val result = ingester.ingest("https://example.com/docs/guide.html")
// Check results
if (result.success()) {
println("Ingestion succeeded!")
println("Stores written to: ${result.storesWrittenTo.joinToString()}")
println("Chunks created: ${result.chunkIds.size}")
println("Documents written: ${result.documentsWritten}")
// Access chunk IDs
result.chunkIds.take(5).forEach { chunkId ->
println(" Created chunk: $chunkId")
}
} else {
println("Ingestion failed - no content written")
}

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
/**
 * Example [Ingester] that parses a resource with a [HierarchicalContentReader]
 * and writes the resulting document to every configured store.
 *
 * NOTE(review): the [chunker] constructor parameter is never used here —
 * chunking appears to be delegated to each store's writeAndChunkDocument;
 * confirm against the repository contract.
 */
class CustomIngester(
    override val stores: List<ChunkingContentElementRepository>,
    private val reader: HierarchicalContentReader,
    private val chunker: ContentChunker
) : Ingester {

    /** Ready only when every backing store reports itself as persistent. */
    override fun active(): Boolean = stores.all { it.info().isPersistent }

    /**
     * Parse [resourcePath] (HTTP(S) URL, `classpath:` resource, or file path),
     * then write and chunk the document into each configured store.
     *
     * @param resourcePath location of the content to ingest
     * @return statistics describing what was written
     */
    override fun ingest(resourcePath: String): IngestionResult {
        // Dispatch on the path scheme to pick the right parse entry point
        val document = when {
            resourcePath.startsWith("http") -> reader.parseUrl(resourcePath)
            resourcePath.startsWith("classpath:") ->
                reader.parseResource(resourcePath.removePrefix("classpath:"))
            else -> reader.parseFile(File(resourcePath))
        }
        // Fan the parsed document out to every repository, collecting results
        val writtenStores = mutableSetOf<String>()
        val createdChunkIds = mutableListOf<String>()
        for (store in stores) {
            createdChunkIds += store.writeAndChunkDocument(document)
            writtenStores += store.name
        }
        return IngestionResult(
            storesWrittenTo = writtenStores,
            chunkIds = createdChunkIds,
            documentsWritten = 1
        )
    }

    override fun infoString(verbose: Boolean?, indent: Int): String =
        "CustomIngester with ${stores.size} stores"
}
// Use custom ingester
val store1: ChunkingContentElementRepository = // implementation
val store2: ChunkingContentElementRepository = // implementation
val ingester = CustomIngester(
stores = listOf(store1, store2),
reader = reader,
chunker = chunker
)
val result = ingester.ingest("https://example.com/docs")
println("Wrote to ${result.storesWrittenTo.size} stores")

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.time.Duration
import java.time.Instant
// Simple time-based policy
/**
 * Example [ContentRefreshPolicy] that considers stored content stale once
 * its ingestion timestamp is older than [maxAge].
 */
class TimeBasedRefreshPolicy(
    private val maxAge: Duration
) : ContentRefreshPolicy {

    /**
     * Re-read when the URI is unknown to the repository, or when the stored
     * copy's age exceeds [maxAge].
     */
    override fun shouldReread(
        repository: ChunkingContentElementRepository,
        rootUri: String
    ): Boolean {
        // Unknown content is always (re)ingested
        val existing = repository.findContentRootByUri(rootUri) ?: return true
        return Duration.between(existing.ingestionTimestamp, Instant.now()) > maxAge
    }

    /** Refresh when the document's ingestion timestamp is older than [maxAge]. */
    override fun shouldRefreshDocument(
        repository: ChunkingContentElementRepository,
        root: NavigableDocument
    ): Boolean =
        Duration.between(root.ingestionTimestamp, Instant.now()) > maxAge

    /**
     * Re-ingest [rootUri] when stale: delete the stored root and all its
     * descendants, parse the URI again, and write the fresh document.
     *
     * @return the newly ingested document, or null when no refresh was needed
     */
    override fun ingestUriIfNeeded(
        repository: ChunkingContentElementRepository,
        hierarchicalContentReader: HierarchicalContentReader,
        rootUri: String
    ): NavigableDocument? {
        if (!shouldReread(repository, rootUri)) return null
        // Replace the old version wholesale before re-ingesting
        repository.deleteRootAndDescendants(rootUri)
        return hierarchicalContentReader.parseUrl(rootUri).also {
            repository.writeAndChunkDocument(it)
        }
    }
}
// Use refresh policy
val policy = TimeBasedRefreshPolicy(maxAge = Duration.ofDays(7))
val repository: ChunkingContentElementRepository = // implementation
val reader: HierarchicalContentReader = // implementation
// Check if refresh needed
val uri = "https://example.com/docs"
if (policy.shouldReread(repository, uri)) {
println("Content needs refreshing")
val document = policy.ingestUriIfNeeded(repository, reader, uri)
if (document != null) {
println("Refreshed: ${document.title}")
}
} else {
println("Content is up to date")
}

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.model.*
// Sentiment analysis enhancer
/**
 * Example [RetrievableEnhancer] that tags [Chunk]s with a naive
 * keyword-lexicon sentiment label and score.
 */
class SentimentEnhancer : RetrievableEnhancer {

    /**
     * Attach "sentiment" and "sentiment_score" metadata to chunks;
     * non-chunk retrievables pass through untouched.
     */
    override fun <T : Retrievable> enhance(retrievable: T): T {
        // Only chunks carry text we can score
        if (retrievable !is Chunk) return retrievable
        val sentiment = analyzeSentiment(retrievable.text)
        val enhanced = retrievable.withAdditionalMetadata(
            mapOf(
                "sentiment" to sentiment.name,
                "sentiment_score" to sentiment.score
            )
        )
        // withAdditionalMetadata is assumed to preserve the concrete type — TODO confirm
        @Suppress("UNCHECKED_CAST")
        return enhanced as T
    }

    /**
     * Crude lexicon count: whichever keyword set has more substring hits wins.
     * Note this is substring matching, so e.g. "goodness" counts as "good".
     */
    private fun analyzeSentiment(text: String): Sentiment {
        val lower = text.lowercase()
        val positives = listOf("good", "great", "excellent", "success").count { lower.contains(it) }
        val negatives = listOf("bad", "error", "fail", "problem").count { lower.contains(it) }
        return when {
            positives > negatives -> Sentiment("positive", 0.7)
            negatives > positives -> Sentiment("negative", -0.7)
            else -> Sentiment("neutral", 0.0)
        }
    }

    /** Simple label/score pair produced by [analyzeSentiment]. */
    data class Sentiment(val name: String, val score: Double)
}
// Language detection enhancer
/**
 * Example [RetrievableEnhancer] that tags [Chunk]s with a detected
 * language code derived from Unicode script membership.
 */
class LanguageEnhancer : RetrievableEnhancer {

    /**
     * Attach a "language" metadata entry to chunks; non-chunk retrievables
     * pass through untouched.
     */
    override fun <T : Retrievable> enhance(retrievable: T): T {
        if (retrievable !is Chunk) return retrievable
        val enhanced = retrievable.withAdditionalMetadata(
            mapOf("language" to detectLanguage(retrievable.text))
        )
        // withAdditionalMetadata is assumed to preserve the concrete type — TODO confirm
        @Suppress("UNCHECKED_CAST")
        return enhanced as T
    }

    /**
     * Script-based heuristic: Han → "zh", Kana → "ja", Hangul → "ko",
     * anything else defaults to "en". Checked in declaration order, so
     * mixed CJK text resolves to the first matching script.
     */
    private fun detectLanguage(text: String): String = when {
        text.contains(Regex("[\\p{IsHan}]")) -> "zh"
        text.contains(Regex("[\\p{IsHiragana}\\p{IsKatakana}]")) -> "ja"
        text.contains(Regex("[\\p{IsHangul}]")) -> "ko"
        else -> "en"
    }
}
// Use enhancers with repository
val enhancers = listOf(
SentimentEnhancer(),
LanguageEnhancer()
)
// Repository will apply enhancers during ingestion
val repository: ChunkingContentElementRepository = // with enhancers
val document: NavigableDocument = // parsed document
val chunkIds = repository.writeAndChunkDocument(document)
// Chunks now have sentiment and language metadata

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
import java.time.Duration
// 1. Set up components
val reader: HierarchicalContentReader = // implementation
val chunker = ContentChunker(
config = ContentChunker.Config(
maxChunkSize = 1500,
overlapSize = 200,
embeddingBatchSize = 100
),
chunkTransformer = ChunkTransformer.NO_OP
)
val repository: ChunkingContentElementRepository = // implementation
val policy = TimeBasedRefreshPolicy(maxAge = Duration.ofDays(7))
// 2. Parse document
val uri = "https://example.com/docs/guide.html"
val document = reader.parseUrl(uri)
println("Parsed: ${document.title}")
println("Sections: ${document.descendants().count()}")
// 3. Check if refresh needed
if (policy.shouldRefreshDocument(repository, document)) {
println("Refreshing content...")
// Delete existing
repository.deleteRootAndDescendants(uri)
// Store new version
val chunkIds = repository.writeAndChunkDocument(document)
println("Stored ${chunkIds.size} chunks")
// Verify storage
val info = repository.info()
println("Repository now has ${info.chunkCount} chunks")
} else {
println("Content is up to date, skipping ingestion")
}

import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.store.*
val reader: HierarchicalContentReader = // implementation
val fileTools: FileReadTools = // implementation
val repository: ChunkingContentElementRepository = // implementation
// Configure directory parsing
val config = DirectoryParsingConfig(
includedExtensions = setOf("md", "txt"),
excludedDirectories = setOf(".git", "node_modules"),
relativePath = "docs",
maxFileSize = 5_242_880 // 5 MB
)
// Parse all documents
val result = reader.parseFromDirectory(fileTools, config)
if (result.success) {
println("Parsed ${result.filesProcessed} files")
// Ingest all documents
var totalChunks = 0
result.contentRoots.forEach { document ->
val chunkIds = repository.writeAndChunkDocument(document)
totalChunks += chunkIds.size
println("Ingested: ${document.title} (${chunkIds.size} chunks)")
}
println("Total chunks created: $totalChunks")
// Check final state
val info = repository.info()
println("Repository state:")
println(" Documents: ${info.documentCount}")
println(" Chunks: ${info.chunkCount}")
} else {
println("Parsing failed")
result.errors.forEach { println("Error: $it") }
}