Common AI framework utilities for the Embabel Agent system including LLM configuration, output converters, prompt contributors, and embedding service abstractions.
Reactive streaming converters for JSONL format with thinking block detection. Returns Project Reactor Flux streams with error recovery.
Dependencies: Requires com.embabel.common.core.streaming package (embabel-common-core) for StreamingEvent and ThinkingState types.
JSONL Format: One JSON object per line
StreamingEvent: Sealed interface for data or thinking events
Reactive Streams: Project Reactor Flux for backpressure support
Thinking Blocks: LLM reasoning content extraction
class StreamingJacksonOutputConverter<T>(
clazz: Class<T>,
objectMapper: ObjectMapper,
propertyFilter: Predicate<String> = Predicate { true }
) : FilteringJacksonOutputConverter<T> {
fun convertStream(jsonlContent: String): Flux<T>
fun convertStreamWithThinking(text: String): Flux<StreamingEvent<T>>
override fun getFormat(): String
}

import reactor.core.publisher.Flux
data class Event(val type: String, val message: String, val timestamp: Long)
val converter = StreamingJacksonOutputConverter(
Event::class.java,
ObjectMapper().registerKotlinModule()
)
// JSONL input (one JSON per line)
val jsonl = """
{"type":"login","message":"User logged in","timestamp":1234567890}
{"type":"action","message":"Button clicked","timestamp":1234567891}
{"type":"logout","message":"User logged out","timestamp":1234567892}
""".trimIndent()
// Convert to stream
val stream: Flux<Event> = converter.convertStream(jsonl)
// Subscribe
stream.subscribe { event ->
println("${event.type}: ${event.message}")
}

// Block and collect all
val events = stream.collectList().block()
// Process as list
events?.forEach { println(it) }

Extract LLM reasoning separately from data.
XML-style (recommended):
<think>
Reasoning content here
</think>

Legacy prefix:
[THINKING] Reasoning content

val mixedResponse = """
<think>
Analyzing user activity log...
</think>
{"type":"login","message":"User logged in","timestamp":1234567890}
<think>
Processing next event...
</think>
{"type":"action","message":"Button clicked","timestamp":1234567891}
{"type":"logout","message":"User logged out","timestamp":1234567892}
""".trimIndent()
val stream: Flux<StreamingEvent<Event>> =
converter.convertStreamWithThinking(mixedResponse)

stream.subscribe { event ->
when (event) {
is StreamingEvent.Object -> {
println("Data: ${event.item}")
}
is StreamingEvent.Thinking -> {
println("Thinking: ${event.content}")
println("State: ${event.state}")
}
}
}

Data only:
stream
.filter { it is StreamingEvent.Object }
.map { (it as StreamingEvent.Object).item }
.subscribe { println("Event: $it") }

Thinking only:
stream
.filter { it is StreamingEvent.Thinking }
.map { (it as StreamingEvent.Thinking).content }
.subscribe { println("Thought: $it") }

sealed interface StreamingEvent<T> {
data class Object<T>(val item: T) : StreamingEvent<T>
data class Thinking(
val content: String,
val state: ThinkingState
) : StreamingEvent<Nothing>
}

enum class ThinkingState {
NONE, // No thinking detected
START, // Start of thinking block
END, // End of thinking block
BOTH, // Start and end in same line
CONTINUATION // Multi-line thinking continuation
}

Usage:
stream.subscribe { event ->
if (event is StreamingEvent.Thinking) {
when (event.state) {
ThinkingState.START -> println(">>> Thinking started")
ThinkingState.END -> println("<<< Thinking ended")
ThinkingState.CONTINUATION -> println("... ${event.content}")
ThinkingState.BOTH -> println("! Quick thought: ${event.content}")
ThinkingState.NONE -> {}
}
}
}

Per-line error recovery automatically skips invalid lines.
val mixedQuality = """
{"type":"login","message":"Valid","timestamp":1234567890}
{invalid json here}
{"type":"action","message":"Also valid","timestamp":1234567891}
not json at all
{"type":"logout","message":"Still valid","timestamp":1234567892}
""".trimIndent()
val stream = converter.convertStream(mixedQuality)
// Only valid lines processed, invalid skipped
stream.subscribe { event ->
println("Valid: $event")
}

stream.subscribe(
{ event -> println("Success: $event") },
{ error -> println("Stream error: ${error.message}") },
{ println("Stream complete") }
)

Leverage Project Reactor operators for stream processing.
stream
.filter { it.type == "login" }
.subscribe { println("Login: $it") }

stream
.map { event -> event.copy(message = event.message.uppercase()) }
.subscribe { println(it) }

stream.take(5).subscribe { println(it) }

stream
.buffer(10)
.subscribe { batch -> println("Batch: ${batch.size} events") }

import java.time.Duration
stream
.window(Duration.ofSeconds(1))
.subscribe { window ->
window.collectList().subscribe { events ->
println("Window: ${events.size} events")
}
}

stream
.flatMap { event ->
Flux.just(event, event.copy(message = "Duplicate"))
}
.subscribe { println(it) }

import reactor.core.publisher.Flux
val numbers = Flux.range(1, 100)
stream.zipWith(numbers).subscribe { (event, num) ->
println("$num: $event")
}

Flux naturally supports backpressure for slow consumers.
import java.time.Duration
stream
.delayElements(Duration.ofMillis(100)) // Slow processing
.subscribe { event ->
// Process without memory issues
processExpensively(event)
}

data class EventWithCount(val event: Event, val count: Int)
stream
.scan(Pair(null as Event?, 0)) { acc, event ->
Pair(event, acc.second + 1)
}
.skip(1)
.map { EventWithCount(it.first!!, it.second) }
.subscribe { println("Event #${it.count}: ${it.event}") }

import reactor.core.scheduler.Schedulers
stream
.parallel()
.runOn(Schedulers.parallel())
.map { event ->
// Process in parallel
expensiveOperation(event)
}
.sequential()
.subscribe { println("Processed: $it") }

stream
.groupBy { it.type }
.subscribe { group ->
group.subscribe { event ->
println("Type ${group.key()}: $event")
}
}

import java.time.Duration
stream
.delayElements(Duration.ofMillis(100))
.subscribe { /* Max 10 per second */ }

import org.springframework.ai.chat.client.ChatClient
val chatClient: ChatClient = // ... configured
// Stream from LLM
val llmStream: Flux<String> = chatClient
.prompt()
.user("Generate events as JSONL...")
.stream()
.content()
// Convert to typed stream
val eventStream: Flux<Event> = llmStream
.buffer()
.map { chunks -> chunks.joinToString("") }
.flatMapMany { jsonl -> converter.convertStream(jsonl) }
eventStream.subscribe { println("Event: $it") }

val llmStream: Flux<String> = chatClient
.prompt()
.user("Analyze and output JSONL with thinking...")
.stream()
.content()
val eventStream = llmStream
.buffer()
.map { it.joinToString("") }
.flatMapMany { converter.convertStreamWithThinking(it) }
eventStream.subscribe { event ->
when (event) {
is StreamingEvent.Object -> saveToDatabase(event.item)
is StreamingEvent.Thinking -> logThinking(event.content)
}
}

Get instructions for LLM prompts.
val instructions = converter.getFormat()
val prompt = """
Generate user events as JSONL.
$instructions
Events:
- User logged in at 10:00
- User clicked button at 10:05
- User logged out at 11:00
""".trimIndent()

Format includes:
convertStreamWithThinking)

Use batch processing for large streams:
stream.buffer(100).subscribe { batch ->
saveBatch(batch) // More efficient than one-by-one
}

Apply backpressure for slow consumers:
stream.onBackpressureBuffer(1000).subscribe { /* ... */ }

Parallel processing for CPU-intensive work:
stream.parallel().runOn(Schedulers.parallel()).map { /* ... */ }

Early termination if you don't need all data:
stream.take(100).subscribe { /* Only first 100 */ }

→ See Integration Patterns for complete examples
→ See Performance for optimization strategies
tessl i tessl/maven-com-embabel-agent--embabel-agent-common@0.3.1