RAG (Retrieval-Augmented Generation) framework for the Embabel Agent platform, providing content ingestion, chunking, hierarchical navigation, and semantic search capabilities.
Transform and enrich chunks during ingestion with support for text modification, metadata enrichment, and transformation chaining. This guide covers advanced patterns for building sophisticated chunk transformation pipelines.
Chunk transformers enable you to enrich, modify, and enhance chunks during ingestion. The framework provides a flexible transformation architecture supporting:
Base interface for all chunk transformers.
/**
 * Contract shared by all chunk transformers: given a chunk and the context it
 * was produced in, return a chunk.
 *
 * NOTE(review): this listing shows signatures only — no initializer for
 * [NO_OP] is visible here.
 */
interface ChunkTransformer {
/**
* Name used to identify this transformer.
*/
val name: String
/**
* Transforms a chunk using the supplied context.
* @param chunk Chunk to transform
* @param context Transformation context carrying the section and, when available, the document
* @return The transformed chunk
*/
fun transform(chunk: Chunk, context: ChunkTransformationContext): Chunk
companion object {
/**
* No-operation transformer that passes chunks through unchanged.
* Annotated with `@JvmField`, so it is exposed to Java as a field rather than a getter.
*/
@JvmField
val NO_OP: ChunkTransformer
}
}Context available during transformation, providing access to document structure.
/**
 * Immutable context handed to a transformer alongside each chunk.
 *
 * @property section Section the chunk belongs to; always present
 * @property document Root of the containing document, or `null` when it is not available
 */
data class ChunkTransformationContext(
/**
* Section containing the chunk.
*/
val section: Section,
/**
* Document root; nullable because it may not be available.
*/
val document: ContentRoot?
)The context provides:
Simplified base class with separate methods for metadata and text transformation.
/**
 * Base class that splits a transformation into two overridable hooks,
 * [additionalMetadata] and [newText], while keeping [transform] itself final.
 *
 * Both hooks have defaults (no extra metadata, unchanged text), so a subclass
 * only overrides the one it needs.
 *
 * NOTE(review): the body of [transform] is not shown in this listing; how the
 * two hook results are merged into the returned chunk should be confirmed
 * against the implementation.
 */
abstract class AbstractChunkTransformer : ChunkTransformer {
/**
* Produces extra metadata for the chunk.
* The default returns an empty map; override to contribute custom entries.
* @param chunk Chunk being transformed
* @param context Transformation context
* @return Map of metadata to add
*/
open fun additionalMetadata(
chunk: Chunk,
context: ChunkTransformationContext
): Map<String, Any> = emptyMap()
/**
* Produces the text for the transformed chunk.
* The default returns `chunk.text` unchanged; override to rewrite the text.
* @param chunk Chunk being transformed
* @param context Transformation context
* @return Text for the transformed chunk
*/
open fun newText(
chunk: Chunk,
context: ChunkTransformationContext
): String = chunk.text
/**
* Final transform implementation; subclasses cannot override it.
* Applies the metadata and text transformations.
*/
final override fun transform(
chunk: Chunk,
context: ChunkTransformationContext
): Chunk
}Benefits of using AbstractChunkTransformer:
Prepends section and document titles to chunk text for better context.
/**
 * Singleton transformer (declared as an `object`, so it is referenced without
 * constructing an instance) that prepends titles to the chunk text.
 *
 * NOTE(review): signatures only — the value of [name] and the exact text
 * layout are not visible in this listing.
 */
object AddTitlesChunkTransformer : ChunkTransformer {
override val name: String
/**
* Transforms the chunk by prepending titles to its text.
* @param chunk Chunk to transform
* @param context Transformation context
* @return Chunk with titles added to its text
*/
override fun transform(chunk: Chunk, context: ChunkTransformationContext): Chunk
}Output Format:
Document Title
## Section Title
Original chunk text...
Use Cases:
Apply multiple transformers in sequence.
/**
 * Composite transformer that applies a list of transformers in sequence.
 *
 * [withTransformer] returns a chained transformer rather than `Unit`, and
 * [transformers] is exposed as a read-only `List`.
 * NOTE(review): method bodies are not shown in this listing; confirm whether
 * [withTransformer] leaves the receiver unchanged.
 */
class ChainedChunkTransformer(
/**
* Transformers to apply, in list order.
*/
val transformers: List<ChunkTransformer>
) : ChunkTransformer {
override val name: String
/**
* Applies all transformers in sequence.
* @param chunk Chunk to transform
* @param context Transformation context
* @return Fully transformed chunk
*/
override fun transform(chunk: Chunk, context: ChunkTransformationContext): Chunk
/**
* Adds a transformer to the end of the chain.
* @param transformer Transformer to append
* @return New chained transformer
*/
fun withTransformer(transformer: ChunkTransformer): ChainedChunkTransformer
}Execution Model:
Detect and tag content language for multilingual systems.
import com.embabel.agent.rag.ingestion.*
/**
 * Tags each chunk with a heuristically detected language.
 *
 * Detection is character-based only: it looks for script-specific code points
 * (Han, kana, Hangul) and for accented letters typical of a few European
 * languages, and falls back to English. It is a cheap approximation, not a
 * statistical language identifier: accented letters shared between languages
 * (for example `é` or `ü`) resolve to the first language checked.
 *
 * Metadata written: `language`, `language_confidence`, `is_english`,
 * `is_multilingual`.
 */
class LanguageDetectionTransformer : AbstractChunkTransformer() {

    override val name = "language-detector"

    override fun additionalMetadata(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): Map<String, Any> {
        val detectedLanguage = detectLanguage(chunk.text)
        return mapOf(
            "language" to detectedLanguage,
            "language_confidence" to calculateConfidence(chunk.text, detectedLanguage),
            "is_english" to (detectedLanguage == "en"),
            "is_multilingual" to containsMultipleLanguages(chunk.text),
        )
    }

    /**
     * Returns an ISO 639-1 style code (`ja`, `zh`, `ko`, `de`, `es`, `fr`, `it`, `en`).
     */
    private fun detectLanguage(text: String): String = when {
        // Kana is checked before Han: Japanese prose mixes kanji (Han) with kana,
        // so testing Han first would label Japanese text as Chinese.
        KANA.containsMatchIn(text) -> "ja"
        HAN.containsMatchIn(text) -> "zh"
        HANGUL.containsMatchIn(text) -> "ko"
        // Characters that belong to only one of the supported languages win over
        // the overlapping accent sets below (which contain ä/ü/é for several languages).
        GERMAN_ONLY.containsMatchIn(text) -> "de"
        SPANISH_ONLY.containsMatchIn(text) -> "es"
        FRENCH.containsMatchIn(text) -> "fr"
        GERMAN.containsMatchIn(text) -> "de"
        SPANISH.containsMatchIn(text) -> "es"
        ITALIAN.containsMatchIn(text) -> "it"
        // Default to English
        else -> "en"
    }

    /**
     * Fraction of the text's characters that belong to the script of [language].
     *
     * Whitespace, digits and punctuation count toward the total, so even
     * single-script text scores below 1.0. Returns 0.0 for empty text.
     */
    private fun calculateConfidence(text: String, language: String): Double {
        if (text.isEmpty()) return 0.0
        val script = when (language) {
            "zh" -> HAN
            // Japanese is written with kana and kanji, so both count.
            "ja" -> HAN_OR_KANA
            "ko" -> HANGUL
            // Latin script including accented letters, which are exactly the
            // characters that triggered a fr/de/es/it detection.
            else -> LATIN
        }
        return script.findAll(text).count().toDouble() / text.length
    }

    /**
     * True when the text mixes more than one script family.
     *
     * Han and kana are treated as one family because ordinary Japanese text
     * always contains both; counting them separately would flag every
     * Japanese chunk as multilingual.
     */
    private fun containsMultipleLanguages(text: String): Boolean =
        SCRIPT_FAMILIES.count { it.containsMatchIn(text) } > 1

    private companion object {
        // Compiled once instead of on every chunk.
        val HAN = Regex("[\\p{IsHan}]")
        val KANA = Regex("[\\p{IsHiragana}\\p{IsKatakana}]")
        val HAN_OR_KANA = Regex("[\\p{IsHan}\\p{IsHiragana}\\p{IsKatakana}]")
        val HANGUL = Regex("[\\p{IsHangul}]")
        val LATIN = Regex("\\p{IsLatin}")

        val GERMAN_ONLY = Regex("[ß]", RegexOption.IGNORE_CASE)
        val SPANISH_ONLY = Regex("[ñ¿¡]", RegexOption.IGNORE_CASE)
        val FRENCH = Regex("[àâäéèêëïîôùûüÿç]", RegexOption.IGNORE_CASE)
        val GERMAN = Regex("[äöüß]", RegexOption.IGNORE_CASE)
        val SPANISH = Regex("[áéíóúñ¿¡]", RegexOption.IGNORE_CASE)
        val ITALIAN = Regex("[àèéìíîòóùú]", RegexOption.IGNORE_CASE)

        val SCRIPT_FAMILIES = listOf(HAN_OR_KANA, HANGUL, LATIN)
    }
}
Use Cases:
Add semantic metadata using NLP techniques.
import com.embabel.agent.rag.ingestion.*
/**
 * Adds heuristic "semantic" metadata to each chunk: a lexicon-based sentiment,
 * a coarse content type, a readability estimate and a set of boolean flags for
 * entities (URLs, e-mails, dates, numbers) and structure (lists, tables, code).
 *
 * Every signal is a lightweight regex or word-list heuristic and is meant for
 * filtering and ranking, not as an authoritative NLP result.
 */
class SemanticEnrichmentTransformer : AbstractChunkTransformer() {

    override val name = "semantic-enricher"

    override fun additionalMetadata(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): Map<String, Any> {
        val text = chunk.text
        // Both values feed two metadata entries each; compute them once.
        val sentiment = analyzeSentiment(text)
        val readability = calculateReadability(text)
        return mapOf(
            // Sentiment analysis
            "sentiment" to sentiment.name,
            "sentiment_score" to sentiment.score,
            // Content classification
            "content_type" to classifyContent(text),
            "is_code_heavy" to isCodeHeavy(text),
            "is_conversational" to isConversational(text),
            // Complexity metrics
            "readability_score" to readability,
            "complexity" to assessComplexity(readability),
            // Entity detection
            "has_urls" to hasUrls(text),
            "has_emails" to hasEmails(text),
            "has_dates" to hasDates(text),
            "has_numbers" to hasNumbers(text),
            // Structure
            "has_lists" to hasLists(text),
            "has_tables" to hasTables(text),
            "has_code_blocks" to hasCodeBlocks(text),
        )
    }

    /**
     * Compares counts of positive and negative lexicon words.
     *
     * The text is tokenised on anything that is not a letter so that words
     * followed by punctuation ("good.", "error:") are still recognised.
     */
    private fun analyzeSentiment(text: String): Sentiment {
        val words = text.lowercase().split(NON_LETTERS)
        val positiveCount = words.count { it in POSITIVE_WORDS }
        val negativeCount = words.count { it in NEGATIVE_WORDS }
        return when {
            positiveCount > negativeCount -> Sentiment("positive", 0.7)
            negativeCount > positiveCount -> Sentiment("negative", -0.7)
            else -> Sentiment("neutral", 0.0)
        }
    }

    /** Picks the first matching category; earlier branches take precedence. */
    private fun classifyContent(text: String): String = when {
        hasCodeBlocks(text) -> "code_documentation"
        // No need to re-check for code blocks: the branch above already handled them.
        hasLists(text) -> "procedural"
        hasQuestions(text) -> "faq"
        hasTables(text) -> "reference"
        text.length < 200 -> "summary"
        else -> "explanation"
    }

    /**
     * True when more than 30% of the lines look like code.
     *
     * A line counts as code when it is a fence line, lies between fences,
     * contains a triple backtick, is indented by a tab or four spaces, or
     * contains a declaration keyword. The indentation check replaces the
     * original pattern `" [a-zA-Z]"`, which matched any space followed by a
     * letter and therefore almost every line of prose (presumably an
     * indentation pattern whose spaces had been collapsed).
     */
    private fun isCodeHeavy(text: String): Boolean {
        val lines = text.lines()
        var insideFence = false
        var codeLines = 0
        for (line in lines) {
            val isFence = line.trimStart().startsWith(FENCE)
            val looksLikeCode = isFence ||
                insideFence ||
                FENCE in line ||
                CODE_LINE_PATTERNS.any { it.containsMatchIn(line) }
            if (looksLikeCode) codeLines++
            if (isFence) insideFence = !insideFence
        }
        // lines() never returns an empty list, so the division is safe.
        return codeLines.toDouble() / lines.size > 0.3
    }

    /**
     * True when the text ends a line with a question mark, starts a line with
     * a first/second-person word, or addresses the reader with a modal
     * ("could you ...").
     */
    private fun isConversational(text: String): Boolean =
        CONVERSATIONAL_INDICATORS.any { it.containsMatchIn(text) }

    /**
     * Simplified Flesch Reading Ease score.
     *
     * Blank tokens are ignored, so empty text yields 0.0 and a trailing
     * sentence terminator does not add a phantom empty sentence.
     */
    private fun calculateReadability(text: String): Double {
        val words = text.split(WHITESPACE).filter { it.isNotEmpty() }
        val sentenceCount = text.split(SENTENCE_END).count { it.isNotBlank() }
        if (words.isEmpty() || sentenceCount == 0) return 0.0

        val avgWordsPerSentence = words.size.toDouble() / sentenceCount
        val avgSyllablesPerWord = estimateSyllables(words).toDouble() / words.size
        return 206.835 - 1.015 * avgWordsPerSentence - 84.6 * avgSyllablesPerWord
    }

    /** Buckets a readability score into `easy`, `moderate` or `difficult`. */
    private fun assessComplexity(readability: Double): String = when {
        readability >= 60 -> "easy"
        readability >= 30 -> "moderate"
        else -> "difficult"
    }

    /** Rough syllable estimate: one per vowel letter, at least one per word. */
    private fun estimateSyllables(words: List<String>): Int =
        words.sumOf { word ->
            maxOf(1, word.lowercase().count { it in "aeiouy" })
        }

    private fun hasUrls(text: String): Boolean = URL.containsMatchIn(text)

    private fun hasEmails(text: String): Boolean = EMAIL.containsMatchIn(text)

    private fun hasDates(text: String): Boolean = NUMERIC_DATE.containsMatchIn(text)

    private fun hasNumbers(text: String): Boolean = NUMBER.containsMatchIn(text)

    private fun hasLists(text: String): Boolean = LIST_BULLET.containsMatchIn(text)

    private fun hasTables(text: String): Boolean = TABLE_ROW.containsMatchIn(text)

    private fun hasCodeBlocks(text: String): Boolean = text.contains(FENCE)

    private fun hasQuestions(text: String): Boolean = QUESTION_LINE.containsMatchIn(text)

    /** Sentiment label with its fixed score (0.7, -0.7 or 0.0). */
    data class Sentiment(val name: String, val score: Double)

    private companion object {
        const val FENCE = "```"

        val POSITIVE_WORDS = setOf(
            "good", "great", "excellent", "success", "works",
            "helpful", "useful", "effective", "improved",
        )
        val NEGATIVE_WORDS = setOf(
            "bad", "error", "fail", "problem", "issue",
            "broken", "wrong", "incorrect", "bug",
        )

        // Compiled once instead of on every chunk (and every line).
        val NON_LETTERS = Regex("[^a-z]+")
        val WHITESPACE = Regex("\\s+")
        val SENTENCE_END = Regex("[.!?]")

        val CODE_LINE_PATTERNS = listOf(
            Regex("^(?:\\t| {4})\\s*\\S"),
            Regex("\\bfunction\\s+\\w+"),
            Regex("\\bclass\\s+\\w+"),
            Regex("\\bimport\\s+"),
            Regex("\\bpackage\\s+"),
        )

        val CONVERSATIONAL_INDICATORS = listOf(
            Regex("\\?\\s*$", RegexOption.MULTILINE),
            // Word boundary stops "we" matching "welcome"; MULTILINE lets the
            // check apply to every line instead of only the first.
            Regex(
                "^(you|your|we|our|let's)\\b",
                setOf(RegexOption.IGNORE_CASE, RegexOption.MULTILINE),
            ),
            Regex("\\b(can|could|should|would|will)\\s+you\\b", RegexOption.IGNORE_CASE),
        )

        val URL = Regex("https?://[^\\s]+")
        val EMAIL = Regex("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}")
        val NUMERIC_DATE = Regex("\\d{1,2}[/-]\\d{1,2}[/-]\\d{2,4}")
        val NUMBER = Regex("\\d+")
        val LIST_BULLET = Regex("^[*-]\\s", RegexOption.MULTILINE)
        val TABLE_ROW = Regex("\\|.*\\|", RegexOption.MULTILINE)
        val QUESTION_LINE = Regex("^.*\\?\\s*$", RegexOption.MULTILINE)
    }
}
Specialized transformer for code documentation.
import com.embabel.agent.rag.ingestion.*
/**
 * Analyses fenced code blocks inside a chunk.
 *
 * Metadata describes the blocks found (count, languages, line total, a rough
 * complexity bucket, and whether imports, functions or classes appear). When
 * at least one block is present the chunk text is prefixed with a
 * `Languages: ...` line so the language names become searchable.
 */
class CodeAnalysisTransformer : AbstractChunkTransformer() {

    override val name = "code-analyzer"

    override fun additionalMetadata(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): Map<String, Any> {
        val codeBlocks = extractCodeBlocks(chunk.text)
        return mapOf(
            "has_code" to codeBlocks.isNotEmpty(),
            "code_block_count" to codeBlocks.size,
            "code_languages" to codeBlocks.map { it.language }.distinct(),
            "code_lines" to codeBlocks.sumOf { it.lineCount },
            "code_complexity" to assessCodeComplexity(codeBlocks),
            "has_imports" to codeBlocks.any { hasImports(it.code) },
            "has_functions" to codeBlocks.any { hasFunctions(it.code) },
            "has_classes" to codeBlocks.any { hasClasses(it.code) },
            // The map's value type is non-null Any, so a nullable result cannot
            // be stored directly; "none" mirrors the code_complexity fallback.
            "primary_language" to (determinePrimaryLanguage(codeBlocks) ?: NO_CODE),
        )
    }

    override fun newText(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): String {
        // Add language tags to improve search
        val codeBlocks = extractCodeBlocks(chunk.text)
        if (codeBlocks.isEmpty()) return chunk.text

        val languages = codeBlocks.map { it.language }.distinct()
        val languageHeader = "Languages: ${languages.joinToString(", ")}\n\n"
        return languageHeader + chunk.text
    }

    /**
     * Finds all triple-backtick fenced blocks.
     *
     * The body is matched lazily up to the next fence, so code containing a
     * single backtick (template strings, shell substitution) is still
     * captured. The info string may contain `+`, `#`, `.` and `-` so tags such
     * as `c++` or `objective-c` are recognised; anything after the tag on the
     * fence line is ignored. Blocks without a tag are reported as `text`.
     */
    private fun extractCodeBlocks(text: String): List<CodeBlock> =
        FENCED_BLOCK.findAll(text).map { match ->
            val language = match.groupValues[1].ifEmpty { "text" }
            val code = match.groupValues[2]
            CodeBlock(language, code, countLines(code))
        }.toList()

    /**
     * Number of lines of code, ignoring the trailing newline that precedes the
     * closing fence (which would otherwise add one phantom empty line).
     */
    private fun countLines(code: String): Int {
        val trimmed = code.trimEnd()
        return if (trimmed.isEmpty()) 0 else trimmed.lines().size
    }

    /** Buckets the average block complexity into `none`/`simple`/`moderate`/`complex`. */
    private fun assessCodeComplexity(blocks: List<CodeBlock>): String {
        if (blocks.isEmpty()) return NO_CODE

        val avgComplexity = blocks.map { calculateBlockComplexity(it) }.average()
        return when {
            avgComplexity < 5 -> "simple"
            avgComplexity < 15 -> "moderate"
            else -> "complex"
        }
    }

    /**
     * Control-flow keyword count plus the deepest indentation level.
     *
     * A tab counts as a full indent level (four columns) so tab-indented and
     * space-indented code score the same.
     */
    private fun calculateBlockComplexity(block: CodeBlock): Int {
        val keywordCount = CONTROL_KEYWORD.findAll(block.code).count()
        val deepestIndent = block.code.lineSequence()
            .filter { it.isNotBlank() }
            .maxOfOrNull { indentWidth(it) }
            ?: 0
        return keywordCount + deepestIndent / INDENT_WIDTH
    }

    /** Leading whitespace width in columns, with a tab counted as [INDENT_WIDTH]. */
    private fun indentWidth(line: String): Int =
        line.takeWhile { it == ' ' || it == '\t' }
            .sumOf { if (it == '\t') INDENT_WIDTH else 1 }

    private fun hasImports(code: String): Boolean = IMPORT_LINE.containsMatchIn(code)

    private fun hasFunctions(code: String): Boolean = FUNCTION_DECLARATION.containsMatchIn(code)

    private fun hasClasses(code: String): Boolean = TYPE_DECLARATION.containsMatchIn(code)

    /** Language with the most lines of code, or `null` when there are no blocks. */
    private fun determinePrimaryLanguage(blocks: List<CodeBlock>): String? =
        blocks.groupBy { it.language }
            .maxByOrNull { (_, sameLanguage) -> sameLanguage.sumOf { it.lineCount } }
            ?.key

    /** One fenced block: its language tag, raw body and line count. */
    data class CodeBlock(
        val language: String,
        val code: String,
        val lineCount: Int
    )

    private companion object {
        const val INDENT_WIDTH = 4
        const val NO_CODE = "none"

        // Compiled once instead of per chunk / per keyword.
        val FENCED_BLOCK = Regex(
            "```([\\w+#.-]*)[^\\n]*\\n(.*?)```",
            RegexOption.DOT_MATCHES_ALL,
        )
        val CONTROL_KEYWORD = Regex(
            "\\b(?:if|else|for|while|switch|case|try|catch|throw|return)\\b",
        )
        // Leading whitespace is allowed, and C-style `#include` is recognised.
        val IMPORT_LINE = Regex(
            "^\\s*(?:#\\s*include\\b|(?:import|from|require|use|include)\\s+)",
            RegexOption.MULTILINE,
        )
        val FUNCTION_DECLARATION = Regex("\\b(?:function|def|fun|func|fn)\\s+\\w+")
        val TYPE_DECLARATION = Regex("\\b(?:class|interface|trait|struct)\\s+\\w+")
    }
}
Enhance chunks with hierarchical context for better retrieval.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.model.*
/**
 * Enriches a chunk with information about where it sits in the document.
 *
 * Metadata covers the section (title, id, parent/children), the document when
 * one is available (title, URI, ingestion timestamp) and simple size
 * statistics of the chunk text. The chunk text is prefixed with a Markdown
 * heading containing a `Document > Section` breadcrumb.
 */
class ContextAugmentationTransformer : AbstractChunkTransformer() {

    override val name = "context-augmenter"

    override fun additionalMetadata(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): Map<String, Any> {
        // Bound to a local so the `is` check below can smart-cast: Kotlin does
        // not smart-cast a public property declared in another module.
        val section = context.section
        val text = chunk.text
        return buildMap {
            // Section context
            put("section_title", section.title)
            put("section_id", section.id)

            // Document context
            context.document?.let { doc ->
                put("document_title", doc.title)
                put("document_uri", doc.uri)
                put("ingestion_timestamp", doc.ingestionTimestamp.toString())
            }

            // Hierarchy context
            put("has_parent", section.parentId != null)
            if (section is NavigableContainerSection) {
                val childCount = section.children.count()
                // A container can be empty, so derive the flag from the count.
                put("has_children", childCount > 0)
                put("child_count", childCount)
            }

            // Chunk statistics; empty tokens from leading or trailing
            // separators are not counted.
            put("char_count", text.length)
            put("word_count", text.split(WHITESPACE).count { it.isNotEmpty() })
            put("line_count", text.lines().size)
            put("paragraph_count", text.split(BLANK_LINE).count { it.isNotBlank() })
        }
    }

    override fun newText(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): String {
        val breadcrumb = buildBreadcrumb(context)
        val header = buildHeader(breadcrumb)
        return header + chunk.text
    }

    /** `Document title > Section title`, or just the section title without a document. */
    private fun buildBreadcrumb(context: ChunkTransformationContext): String =
        listOfNotNull(context.document?.title, context.section.title)
            .joinToString(" > ")

    /** Level-one Markdown heading followed by a blank line. */
    private fun buildHeader(breadcrumb: String): String =
        buildString {
            appendLine("# $breadcrumb")
            appendLine()
        }

    private companion object {
        // Compiled once instead of on every chunk.
        val WHITESPACE = Regex("\\s+")
        val BLANK_LINE = Regex("\\n\\s*\\n")
    }
}
Clean and normalize markdown content.
import com.embabel.agent.rag.ingestion.*
/**
 * Reports which Markdown features a chunk uses and, optionally, strips the
 * Markdown syntax from its text.
 *
 * @param stripFormatting when `false` (default) the text is returned unchanged
 *   and only metadata is added; when `true` headers, emphasis, links, images
 *   and blockquote markers are reduced to plain text
 * @param preserveLinks when stripping, keep link targets as `text (url)`
 *   instead of dropping them
 */
class MarkdownProcessingTransformer(
    private val stripFormatting: Boolean = false,
    private val preserveLinks: Boolean = true
) : AbstractChunkTransformer() {

    override val name = "markdown-processor"

    override fun additionalMetadata(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): Map<String, Any> {
        val text = chunk.text
        return mapOf(
            "has_headers" to hasHeaders(text),
            "has_links" to hasLinks(text),
            "has_images" to hasImages(text),
            "has_code" to hasCodeBlocks(text),
            "has_lists" to hasLists(text),
            "has_blockquotes" to hasBlockquotes(text),
            "markdown_complexity" to assessMarkdownComplexity(text),
        )
    }

    override fun newText(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): String {
        if (!stripFormatting) return chunk.text

        // Images are replaced first. `![alt](url)` contains a valid link
        // `[alt](url)`, so handling links first would leave "!alt (url)" behind
        // and the image placeholder would never be inserted.
        val withoutImages = IMAGE.replace(chunk.text, "[Image]")

        // Remove header markers
        val withoutHeaders = HEADER_MARKER.replace(withoutImages, "")

        // Handle emphasis; bold before italic so `**x**` is not read as two italics.
        val withoutEmphasis = EMPHASIS_PATTERNS.fold(withoutHeaders) { text, pattern ->
            pattern.replace(text, "$1")
        }

        // Handle links: keep "text (url)" or only the link text.
        val linkReplacement = if (preserveLinks) "$1 ($2)" else "$1"
        val withoutLinks = LINK.replace(withoutEmphasis, linkReplacement)

        // Clean up blockquotes
        val withoutQuotes = BLOCKQUOTE_MARKER.replace(withoutLinks, "")

        // Normalize whitespace
        return EXCESS_BLANK_LINES.replace(withoutQuotes, "\n\n").trim()
    }

    private fun hasHeaders(text: String): Boolean = HEADER_MARKER.containsMatchIn(text)

    /** True for `[text](url)` links; image syntax `![alt](url)` is not counted. */
    private fun hasLinks(text: String): Boolean = NON_IMAGE_LINK.containsMatchIn(text)

    /** True for `![alt](url)`, including images with an empty alt text. */
    private fun hasImages(text: String): Boolean = IMAGE.containsMatchIn(text)

    private fun hasCodeBlocks(text: String): Boolean = text.contains("```")

    private fun hasLists(text: String): Boolean = LIST_BULLET.containsMatchIn(text)

    private fun hasBlockquotes(text: String): Boolean = BLOCKQUOTE.containsMatchIn(text)

    /** `simple` for at most one feature, `moderate` for up to three, else `rich`. */
    private fun assessMarkdownComplexity(text: String): String {
        val features = listOf(
            hasHeaders(text),
            hasLinks(text),
            hasImages(text),
            hasCodeBlocks(text),
            hasLists(text),
            hasBlockquotes(text),
        ).count { it }

        return when {
            features <= 1 -> "simple"
            features <= 3 -> "moderate"
            else -> "rich"
        }
    }

    private companion object {
        // Compiled once instead of on every chunk.
        val HEADER_MARKER = Regex("^#+\\s", RegexOption.MULTILINE)
        val IMAGE = Regex("!\\[[^\\]]*\\]\\([^)]*\\)")

        // Link text may not contain brackets, except for the "[Image]"
        // placeholder produced for a linked image. This keeps a match from
        // spanning from an unrelated "[" to a later link.
        val LINK = Regex("\\[((?:[^\\[\\]]|\\[Image\\])+)\\]\\(([^)]+)\\)")
        val NON_IMAGE_LINK = Regex("(?<!!)\\[[^\\]]+\\]\\([^)]+\\)")

        // Delimiters must hug non-space text, so a list bullet ("* item") is
        // not taken as an opening asterisk; underscore delimiters must not sit
        // inside a word, so snake_case identifiers are left intact.
        val EMPHASIS_PATTERNS = listOf(
            Regex("\\*\\*(?=\\S)(.+?)(?<=\\S)\\*\\*"), // Bold
            Regex("\\*(?=\\S)(.+?)(?<=\\S)\\*"), // Italic
            Regex("(?<!\\w)__(?=\\S)(.+?)(?<=\\S)__(?!\\w)"), // Bold
            Regex("(?<!\\w)_(?=\\S)(.+?)(?<=\\S)_(?!\\w)"), // Italic
        )

        val BLOCKQUOTE_MARKER = Regex("^>\\s?", RegexOption.MULTILINE)
        val BLOCKQUOTE = Regex("^>\\s", RegexOption.MULTILINE)
        val LIST_BULLET = Regex("^[*-]\\s", RegexOption.MULTILINE)
        val EXCESS_BLANK_LINES = Regex("\\n{3,}")
    }
}
Apply transformations only when conditions are met.
import com.embabel.agent.rag.ingestion.*
/**
 * Decorator that runs [transformer] only for chunks accepted by [condition].
 *
 * Chunks rejected by the predicate are returned as the same instance,
 * untouched.
 *
 * @param condition predicate evaluated for every chunk with its context
 * @param transformer transformer applied when the predicate returns `true`
 */
class ConditionalTransformer(
    private val condition: (Chunk, ChunkTransformationContext) -> Boolean,
    private val transformer: ChunkTransformer
) : ChunkTransformer {

    override val name = "conditional-${transformer.name}"

    override fun transform(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): Chunk {
        // Guard clause: pass the chunk through when the predicate rejects it.
        val accepted = condition(chunk, context)
        if (!accepted) return chunk
        return transformer.transform(chunk, context)
    }
}
// Titles are prepended only to chunks longer than 500 characters
val lengthGated = ConditionalTransformer(
    transformer = AddTitlesChunkTransformer,
    condition = { candidate, _ -> candidate.text.length > 500 },
)

// Code analysis runs only on chunks that contain a fenced code block
val codeOnly = ConditionalTransformer(
    transformer = CodeAnalysisTransformer(),
    condition = { candidate, _ -> "```" in candidate.text },
)

// Semantic enrichment runs only when the document metadata marks it high priority
val priorityOnly = ConditionalTransformer(
    transformer = SemanticEnrichmentTransformer(),
    condition = { _, ctx ->
        val priority = ctx.document?.metadata?.get("priority")
        priority == "high"
    },
)
Wrap transformers to collect performance metrics.
import com.embabel.agent.rag.ingestion.*
import kotlin.system.measureTimeMillis
/**
 * Decorator that times every call to [delegate] and accumulates size
 * statistics in a [TransformationMetrics] collector.
 *
 * Nothing is recorded for a call in which the delegate throws; the exception
 * propagates to the caller.
 *
 * @param delegate transformer being measured
 * @param metricsCollector accumulator for the measurements; pass a shared
 *   instance to aggregate several monitored transformers into one set of totals
 */
class MonitoredTransformer(
    private val delegate: ChunkTransformer,
    private val metricsCollector: TransformationMetrics = TransformationMetrics()
) : ChunkTransformer {

    override val name = "monitored-${delegate.name}"

    override fun transform(
        chunk: Chunk,
        context: ChunkTransformationContext
    ): Chunk {
        // measureTimeMillis declares that it runs its block exactly once, so a
        // non-null val can be initialised inside it; no nullable var or `!!`.
        val result: Chunk
        val duration = measureTimeMillis {
            result = delegate.transform(chunk, context)
        }

        metricsCollector.record(
            transformerName = delegate.name,
            duration = duration,
            inputSize = chunk.text.length,
            outputSize = result.text.length
        )

        return result
    }

    /** Snapshot of the statistics gathered so far. */
    fun getMetrics(): Map<String, Any> = metricsCollector.getStats()

    /**
     * Running totals of time, call count and text sizes.
     *
     * Both [record] and [getStats] synchronise on the instance, so a snapshot
     * never mixes counters from before and after a concurrent update.
     */
    class TransformationMetrics {
        private var totalTime = 0L
        private var callCount = 0
        private var totalInputSize = 0L
        private var totalOutputSize = 0L

        /**
         * Adds one measurement to the totals.
         *
         * @param transformerName name of the measured transformer; accepted for
         *   API compatibility but not used, as totals are not split per name
         * @param duration elapsed time in milliseconds
         * @param inputSize length of the chunk text before transformation
         * @param outputSize length of the chunk text after transformation
         */
        @Synchronized
        fun record(
            transformerName: String,
            duration: Long,
            inputSize: Int,
            outputSize: Int
        ) {
            totalTime += duration
            callCount++
            totalInputSize += inputSize
            totalOutputSize += outputSize
        }

        /**
         * Returns totals and averages. Averages are 0 before the first call and
         * `size_increase_ratio` is 1.0 while no input has been seen.
         */
        @Synchronized
        fun getStats(): Map<String, Any> {
            val hasCalls = callCount > 0
            return mapOf(
                "total_time_ms" to totalTime,
                "call_count" to callCount,
                "avg_time_ms" to if (hasCalls) totalTime.toDouble() / callCount else 0.0,
                // 0L keeps the value a Long whether or not calls were recorded.
                "avg_input_size" to if (hasCalls) totalInputSize / callCount else 0L,
                "avg_output_size" to if (hasCalls) totalOutputSize / callCount else 0L,
                "size_increase_ratio" to if (totalInputSize > 0) {
                    totalOutputSize.toDouble() / totalInputSize
                } else 1.0
            )
        }
    }
}
Production-ready pipeline combining multiple transformers.
import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.ingestion.transform.*
// Build comprehensive transformation pipeline
val productionPipeline = ChainedChunkTransformer(
listOf(
// 1. Add contextual headers
AddTitlesChunkTransformer,
// 2. Detect and tag language
LanguageDetectionTransformer(),
// 3. Analyze code content (conditional)
ConditionalTransformer(
condition = { chunk, _ -> chunk.text.contains("```") },
transformer = CodeAnalysisTransformer()
),
// 4. Process markdown (conditional)
ConditionalTransformer(
condition = { _, context ->
context.document?.uri?.endsWith(".md") == true
},
transformer = MarkdownProcessingTransformer(stripFormatting = false)
),
// 5. Semantic enrichment
SemanticEnrichmentTransformer(),
// 6. Context augmentation
ContextAugmentationTransformer(),
// 7. Custom domain-specific enrichment
DomainSpecificTransformer()
)
)
// Wrap with monitoring
val monitoredPipeline = MonitoredTransformer(productionPipeline)
// Use with content chunker
val chunker = ContentChunker(
config = ContentChunker.Config(
maxChunkSize = 1500,
overlapSize = 200
),
chunkTransformer = monitoredPipeline
)
// After processing, check metrics
val metrics = monitoredPipeline.getMetrics()
println("Transformation metrics: $metrics")import com.embabel.agent.rag.ingestion.*
import com.embabel.agent.rag.model.*
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
/**
 * Unit tests for the example transformers: language tagging on plain English
 * text, and the call order of a two-step chained transformer.
 */
class TransformerTests {

    @Test
    fun `test language detection transformer`() {
        val chunk = Chunk.create(
            text = "This is English text with some content.",
            parentId = "section-1"
        )
        val context = contextFor(text = chunk.text, title = "Test Section")

        val transformed = LanguageDetectionTransformer().transform(chunk, context)

        assertEquals("en", transformed.metadata["language"])
        // Compared as a value rather than cast with `as Boolean`, so a missing
        // or non-Boolean entry produces an assertion failure with a message
        // instead of a cast or null exception.
        assertEquals(true, transformed.metadata["is_english"])
    }

    @Test
    fun `test chained transformer execution order`() {
        val calls = mutableListOf<String>()
        val chained = ChainedChunkTransformer(
            listOf(
                recordingTransformer("first", calls),
                recordingTransformer("second", calls),
            )
        )
        val chunk = Chunk.create(text = "test", parentId = "section-1")

        chained.transform(chunk, contextFor(text = "test", title = "Test"))

        assertEquals(listOf("first", "second"), calls)
    }

    /** Builds a document-less context around a single leaf section. */
    private fun contextFor(text: String, title: String): ChunkTransformationContext {
        val section = LeafSection(
            text = text,
            title = title,
            parentId = "doc-1",
            id = "section-1",
            uri = null
        )
        return ChunkTransformationContext(section = section, document = null)
    }

    /**
     * Transformer that appends [label] to [calls] each time its metadata hook
     * runs and contributes `label -> true` as metadata.
     */
    private fun recordingTransformer(
        label: String,
        calls: MutableList<String>
    ): ChunkTransformer = object : AbstractChunkTransformer() {
        override val name = label

        override fun additionalMetadata(
            chunk: Chunk,
            context: ChunkTransformationContext
        ): Map<String, Any> {
            calls.add(label)
            return mapOf(label to true)
        }
    }
}