Common AI framework utilities for the Embabel Agent system, including LLM configuration, output converters, prompt contributors, and embedding service abstractions.
Optimization strategies and best practices for Embabel Agent Common.
Development: Use cheaper models for iteration.

```kotlin
val devOptions = LlmOptions.withModel("gpt-3.5-turbo")
    .withMaxTokens(500)
// ~10x cheaper than GPT-4
```

Production: Use a model appropriate to the task.

```kotlin
// Complexity is an illustrative enum (LOW, MEDIUM, HIGH)
val taskOptions = when (complexity) {
    Complexity.LOW -> LlmOptions.withModel("gpt-3.5-turbo")
    Complexity.MEDIUM -> LlmOptions.withModel("gpt-4")
    Complexity.HIGH -> LlmOptions.withModel("gpt-4-turbo")
}
```

Deterministic tasks (data extraction, classification): use a low temperature.

```kotlin
val extractionOptions = LlmOptions.withModel("gpt-4")
    .withTemperature(0.2)
// More consistent, faster inference
```

Creative tasks (writing, brainstorming): use a higher temperature.

```kotlin
val creativeOptions = LlmOptions.withModel("gpt-4")
    .withTemperature(0.8)
```

Minimize tokens for faster responses and lower costs:
```kotlin
val options = LlmOptions.withModel("gpt-4")
    .withMaxTokens(500) // Not 2000 if you only need brief responses
```

Calculate actual needs:

```kotlin
fun calculateTokenBudget(expectedWords: Int): Int {
    // English averages roughly 1.3 tokens per word
    return (expectedWords * 1.3).toInt()
}

val options = LlmOptions.withModel("gpt-4")
    .withMaxTokens(calculateTokenBudget(300))
```
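For structured output the budget also has to cover JSON syntax and field names, so pad the word-based estimate. A sketch; the 20% headroom is an assumption to tune per task:

```kotlin
// Hypothetical variant: word estimate plus headroom for JSON overhead
fun calculateJsonTokenBudget(expectedWords: Int, headroom: Double = 1.2): Int =
    (expectedWords * 1.3 * headroom).toInt()
```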
DON'T - Multiple individual calls:

```kotlin
val embeddings = texts.map { text ->
    embeddingService.embed(text) // N network calls
}
```

DO - Single batch call:

```kotlin
val embeddings = embeddingService.embed(texts) // 1 network call
```

Improvement: 5-10x faster for large batches.
Process multiple items efficiently:

```kotlin
fun processBatch(items: List<String>): List<Result> {
    return items.chunked(10).flatMap { batch ->
        // Chunking alone doesn't add parallelism; it bounds memory
        // and gives natural batch boundaries for the variants below
        batch.map { item -> process(item) }
    }
}
```

Parallel batching:

```kotlin
fun processBatchParallel(items: List<String>): List<Result> {
    return items.chunked(10).flatMap { batch ->
        batch.parallelStream()
            .map { item -> process(item) }
            .toList()
    }
}
```
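If kotlinx-coroutines is already on the classpath, the same pattern reads more idiomatically with structured concurrency. A sketch, assuming `process` is safe to call concurrently:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

suspend fun processBatchConcurrent(items: List<String>): List<Result> =
    items.chunked(10).flatMap { batch ->
        coroutineScope {
            // Launch each item in the chunk concurrently, then wait;
            // chunk size (10) bounds the parallelism
            batch.map { item -> async(Dispatchers.IO) { process(item) } }
                .awaitAll()
        }
    }
```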
Simple cache:

```kotlin
// Note: grows without bound; fine for short-lived processes,
// risky for long-running services
private val cache = ConcurrentHashMap<String, String>()

fun callWithCache(prompt: String): String {
    return cache.getOrPut(prompt) {
        llmClient.call(prompt)
    }
}
```
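A bounded cache avoids that growth. If Caffeine happens to be available, a sketch; the size and TTL values are assumptions to tune:

```kotlin
import com.github.benmanes.caffeine.cache.Caffeine
import java.time.Duration

// Size- and time-bounded cache; eviction is handled by the library
private val boundedCache = Caffeine.newBuilder()
    .maximumSize(10_000)
    .expireAfterWrite(Duration.ofMinutes(30))
    .build<String, String>()

fun callWithBoundedCache(prompt: String): String =
    boundedCache.get(prompt) { p -> llmClient.call(p) }
```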
Time-based cache:

```kotlin
data class CachedResponse(val content: String, val timestamp: Instant)

private val cache = ConcurrentHashMap<String, CachedResponse>()

fun callWithTimedCache(prompt: String, ttl: Duration): String {
    val cached = cache[prompt]
    if (cached != null) {
        val age = Duration.between(cached.timestamp, Instant.now())
        if (age < ttl) {
            return cached.content
        }
    }
    val response = llmClient.call(prompt)
    cache[prompt] = CachedResponse(response, Instant.now())
    return response
}
```
Cache by content hash:

```kotlin
class CachedEmbeddingService(
    private val delegate: EmbeddingService
) : EmbeddingService by delegate {
    // Keyed by exact text; hash the text first if inputs are very large
    private val cache = ConcurrentHashMap<String, FloatArray>()

    override fun embed(text: String): FloatArray {
        return cache.getOrPut(text) {
            delegate.embed(text)
        }
    }

    override fun embed(texts: List<String>): List<FloatArray> {
        // Only call the delegate for texts we haven't seen before
        val toEmbed = texts.filter { it !in cache }.distinct()
        if (toEmbed.isNotEmpty()) {
            val newEmbeddings = delegate.embed(toEmbed)
            toEmbed.zip(newEmbeddings).forEach { (text, embedding) ->
                cache[text] = embedding
            }
        }
        // Read results back from the cache to preserve input order
        // (appending cached and fresh results separately would reorder them)
        return texts.map { cache.getValue(it) }
    }
}
```

Improvement: Near-instant retrieval for repeated content.
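Because the decorator implements the same interface, wiring it in is one line wherever an `EmbeddingService` is available (the `openAiEmbeddings` name below is illustrative):

```kotlin
// Decorate the real service once; callers don't change
val embeddingService: EmbeddingService = CachedEmbeddingService(openAiEmbeddings)
```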
Handle slow consumers (these snippets use Project Reactor and assume `stream` is a `Flux` of events):

```kotlin
stream
    .onBackpressureBuffer(1000) // Buffer up to 1000 items
    .subscribe { event ->
        slowProcess(event)
    }
```

Drop excess items:

```kotlin
stream
    .onBackpressureDrop() // Drop if the consumer is too slow
    .subscribe { event ->
        processQuickly(event)
    }
```
CPU-bound work:

```kotlin
stream
    .parallel()
    .runOn(Schedulers.parallel())
    .map { event ->
        cpuIntensiveOperation(event)
    }
    .sequential()
    .subscribe { result -> save(result) }
```

I/O-bound work:

```kotlin
stream
    .parallel()
    .runOn(Schedulers.boundedElastic())
    .flatMap { event ->
        Mono.fromCallable { databaseOperation(event) }
    }
    .sequential()
    .subscribe { result -> process(result) }
```
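For I/O, `Flux.flatMap` also accepts a concurrency argument, which caps in-flight work without the parallel/sequential pair. A sketch; the limit of 16 is an assumption:

```kotlin
stream
    .flatMap({ event ->
        Mono.fromCallable { databaseOperation(event) }
            .subscribeOn(Schedulers.boundedElastic())
    }, 16) // At most 16 concurrent database calls
    .subscribe { result -> process(result) }
```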
Don't process more than needed:

```kotlin
stream
    .take(100) // Only the first 100 items
    .subscribe { event -> process(event) }
```

Stop on a condition:

```kotlin
stream
    .takeWhile { event -> event.timestamp < cutoff }
    .subscribe { event -> process(event) }
```
DON'T - Create new converter each time:

```kotlin
fun convert(response: String): Person? {
    val converter = JacksonOutputConverter(Person::class.java, objectMapper)
    return converter.convert(response)
}
```

DO - Reuse converter instance:

```kotlin
private val personConverter = JacksonOutputConverter(Person::class.java, objectMapper)

fun convert(response: String): Person? {
    return personConverter.convert(response)
}
```

Reason: Schema generation is expensive and happens once at creation.
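When many target types need converters, one cached instance per class keeps the schema cost amortized. A hypothetical helper, assuming `JacksonOutputConverter<T>` is generic in its target type:

```kotlin
class ConverterRegistry(private val objectMapper: ObjectMapper) {
    private val converters = ConcurrentHashMap<Class<*>, JacksonOutputConverter<*>>()

    @Suppress("UNCHECKED_CAST")
    fun <T> forType(type: Class<T>): JacksonOutputConverter<T> =
        converters.getOrPut(type) {
            JacksonOutputConverter(type, objectMapper)
        } as JacksonOutputConverter<T>
}
```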
DON'T - Create new mapper each time:

```kotlin
val mapper = ObjectMapper().registerKotlinModule()
val converter = JacksonOutputConverter(Person::class.java, mapper)
```

DO - Shared mapper instance:

```kotlin
@Configuration
class JacksonConfig {
    @Bean
    fun objectMapper(): ObjectMapper {
        return ObjectMapper()
            .registerKotlinModule()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    }
}

@Service
class MyService(private val objectMapper: ObjectMapper) {
    private val converter = JacksonOutputConverter(Person::class.java, objectMapper)
}
```

Reason: ObjectMapper is thread-safe and expensive to create.
DON'T - Load all into memory:

```kotlin
val events = converter.convertStream(hugeJsonl).collectList().block()
// OutOfMemoryError for large data
```

DO - Process incrementally:

```kotlin
converter.convertStream(hugeJsonl)
    .buffer(100) // Process in batches
    .subscribe { batch ->
        processBatch(batch)
    }
```
Optimize FloatArray storage:

```kotlin
// For many embeddings, one packed primitive array avoids per-array
// object overhead and improves cache locality
class EmbeddingStore(numDocs: Int, private val dimensions: Int) {
    private val embeddings = FloatArray(numDocs * dimensions)

    fun setEmbedding(docId: Int, embedding: FloatArray) {
        embedding.copyInto(embeddings, destinationOffset = docId * dimensions)
    }

    fun getEmbedding(docId: Int): FloatArray {
        val start = docId * dimensions
        return embeddings.copyOfRange(start, start + dimensions)
    }
}
// More memory-efficient than List<FloatArray>
```
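A packed store like this typically feeds similarity search; the standard cosine similarity over `FloatArray` for reference:

```kotlin
import kotlin.math.sqrt

// cos(a, b) = (a . b) / (|a| * |b|)
fun cosineSimilarity(a: FloatArray, b: FloatArray): Double {
    require(a.size == b.size) { "Dimension mismatch" }
    var dot = 0.0
    var normA = 0.0
    var normB = 0.0
    for (i in a.indices) {
        dot += a[i] * b[i]
        normA += a[i] * a[i]
        normB += b[i] * b[i]
    }
    return dot / (sqrt(normA) * sqrt(normB))
}
```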
val prompt = """
You are an AI assistant. Please be helpful and courteous.
Extract the person's name, age, and email from the text.
Make sure to format it as JSON. Be careful to get all details.
The JSON should have fields for name, age, and email.
${converter.jsonSchema}
Text: $text
"""DO - Be concise:
val prompt = """
Extract person info as JSON:
${converter.jsonSchema}
$text
"""Improvement: Lower token costs, faster responses
Only include schema when needed:

```kotlin
// For structured output
val prompt = "Extract data: ${converter.jsonSchema}\n$text"

// For simple tasks, skip the schema
val prompt = "Summarize: $text"
```
Tune the underlying HTTP client:

```kotlin
@Configuration
class HttpClientConfig {
    @Bean
    fun httpClient(): HttpClient {
        return HttpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
    }
}
```
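The connect timeout only bounds connection establishment. On recent Reactor Netty versions, a response timeout bounds the whole exchange; a sketch, with the 30-second value as an assumption:

```kotlin
@Bean
fun httpClient(): HttpClient {
    return HttpClient.create()
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
        .responseTimeout(Duration.ofSeconds(30)) // Bound the full request/response
}
```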
Track request performance:

```kotlin
// Note: not thread-safe as written; guard with synchronization
// or a concurrent collection if requests are recorded in parallel
class PerformanceMetrics {
    private val requestDurations = mutableListOf<Long>()
    private val tokenCounts = mutableListOf<Int>()

    fun recordRequest(durationMs: Long, tokens: Int) {
        requestDurations.add(durationMs)
        tokenCounts.add(tokens)
    }

    fun getStats(): Stats {
        // Assumes at least one request has been recorded
        return Stats(
            avgDuration = requestDurations.average(),
            p95Duration = requestDurations.sorted()[(requestDurations.size * 0.95).toInt()],
            avgTokens = tokenCounts.average(),
            totalRequests = requestDurations.size
        )
    }
}

data class Stats(
    val avgDuration: Double,
    val p95Duration: Long,
    val avgTokens: Double,
    val totalRequests: Int
)
```

Track LLM costs by model:

```kotlin
class CostMonitor(private val pricing: PricingModel) {
    private val costByModel = ConcurrentHashMap<String, Double>()

    // PricingModel is a collaborator assumed to expose costOf(inputTokens, outputTokens)
    fun recordUsage(model: String, inputTokens: Int, outputTokens: Int) {
        val cost = pricing.costOf(inputTokens, outputTokens)
        costByModel.merge(model, cost) { old, new -> old + new }
    }

    fun getTotalCost(): Double = costByModel.values.sum()

    fun getCostByModel(): Map<String, Double> = costByModel.toMap()
}
```
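`PricingModel` itself isn't defined in these snippets; a minimal assumed shape, with per-token rates taken from your provider's price sheet:

```kotlin
// Hypothetical: rates are per single token (price sheets usually quote per 1M)
class PricingModel(
    private val inputCostPerToken: Double,
    private val outputCostPerToken: Double
) {
    fun costOf(inputTokens: Int, outputTokens: Int): Double =
        inputTokens * inputCostPerToken + outputTokens * outputCostPerToken
}
```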
Time individual operations:

```kotlin
inline fun <T> measureTime(operation: String, block: () -> T): T {
    val start = System.currentTimeMillis()
    return try {
        block()
    } finally {
        val duration = System.currentTimeMillis() - start
        logger.info("$operation took ${duration}ms")
    }
}

// Usage
val result = measureTime("LLM call") {
    llmClient.call(prompt)
}
```
Benchmark alternative strategies:

```kotlin
@Test
fun `benchmark embedding strategies`() {
    val texts = (1..1000).map { "Document $it" }

    // Individual calls
    val time1 = measureTimeMillis {
        texts.forEach { embeddingService.embed(it) }
    }

    // Batch call
    val time2 = measureTimeMillis {
        embeddingService.embed(texts)
    }

    println("Individual: ${time1}ms")
    println("Batch: ${time2}ms")
    println("Speedup: ${time1.toDouble() / time2}x")
}
```

Performance targets:

| Operation | Target | Notes |
|---|---|---|
| Embedding (batch 100) | < 2s | Using text-embedding-ada-002 |
| GPT-3.5 call (500 tokens) | < 3s | With low temperature |
| GPT-4 call (500 tokens) | < 8s | With low temperature |
| Streaming JSONL (1000 lines) | < 1s | Parsing only, no LLM |
| Schema generation | < 50ms | Cached after first use |
| Conversion (valid JSON) | < 10ms | Per object |
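Budgets like these can be spot-checked in tests. A sketch using the conversion target from the table, with `personConverter` as defined earlier and `sampleJson` as a placeholder fixture:

```kotlin
// assertTrue from org.junit.jupiter.api.Assertions
@Test
fun `conversion stays within budget`() {
    val runs = 100
    val elapsed = measureTimeMillis {
        repeat(runs) { personConverter.convert(sampleJson) }
    }
    // Target from the table: < 10ms per object
    assertTrue(elapsed / runs < 10, "Conversion averaged ${elapsed / runs}ms per object")
}
```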
Reduce input tokens: keep prompts concise and include the JSON schema only when structured output is required.

Reduce output tokens: set withMaxTokens to the smallest budget that fits the expected response.

Model pricing changes frequently; compare your provider's current per-1M-token rates.

Use cheaper models when possible:

```kotlin
val simpleTask = LlmOptions.withModel("gpt-3.5-turbo")
// 60x cheaper than GPT-4
```
Install:

```
tessl i tessl/maven-com-embabel-agent--embabel-agent-common@0.3.1
```