Common AI framework utilities for the Embabel Agent system, including LLM configuration, output converters, prompt contributors, and embedding service abstractions.
---
Reactive streaming converter for the JSONL format with thinking-block detection. It returns Project Reactor Flux streams with per-line error recovery.
Dependencies: requires the `com.embabel.common.core.streaming` package (embabel-common-core) for the `StreamingEvent` and `ThinkingState` types.
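For builds outside the Tessl CLI, a hedged Gradle sketch (the coordinates are an assumption inferred from the module names on this page; verify them before use):

```kotlin
// build.gradle.kts
dependencies {
    implementation("com.embabel.agent:embabel-agent-common:0.3.0")  // assumed coordinates
    implementation("com.embabel.common:embabel-common-core:0.3.0")  // StreamingEvent, ThinkingState (assumed)
}
```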
- JSONL format: one JSON object per line
- StreamingEvent: sealed interface for data or thinking events
- Reactive streams: Project Reactor Flux for backpressure support
- Thinking blocks: LLM reasoning content extraction
```kotlin
class StreamingJacksonOutputConverter<T>(
    clazz: Class<T>,
    objectMapper: ObjectMapper,
    propertyFilter: Predicate<String> = Predicate { true }
) : FilteringJacksonOutputConverter<T> {
    fun convertStream(jsonlContent: String): Flux<T>
    fun convertStreamWithThinking(text: String): Flux<StreamingEvent<T>>
    override fun getFormat(): String
}
```

Basic usage:

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import reactor.core.publisher.Flux

data class Event(val type: String, val message: String, val timestamp: Long)
val converter = StreamingJacksonOutputConverter(
Event::class.java,
ObjectMapper().registerKotlinModule()
)
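// The optional propertyFilter is presumably applied to JSON property
// names (hedged reading of FilteringJacksonOutputConverter, not a
// confirmed API detail). For example, to exclude underscore-prefixed
// properties:
val filteredConverter = StreamingJacksonOutputConverter(
    Event::class.java,
    ObjectMapper().registerKotlinModule(),
    java.util.function.Predicate<String> { name -> !name.startsWith("_") }
)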
// JSONL input (one JSON per line)
val jsonl = """
{"type":"login","message":"User logged in","timestamp":1234567890}
{"type":"action","message":"Button clicked","timestamp":1234567891}
{"type":"logout","message":"User logged out","timestamp":1234567892}
""".trimIndent()
// Convert to stream
val stream: Flux<Event> = converter.convertStream(jsonl)
// Subscribe
stream.subscribe { event ->
println("${event.type}: ${event.message}")
}

// Block and collect all
val events = stream.collectList().block()
// Process as list
events?.forEach { println(it) }
```

Extract LLM reasoning separately from data.
XML-style (recommended):

```
<think>
Reasoning content here
</think>
```

Legacy prefix:

```
[THINKING] Reasoning content
```

```kotlin
val mixedResponse = """
<think>
Analyzing user activity log...
</think>
{"type":"login","message":"User logged in","timestamp":1234567890}
<think>
Processing next event...
</think>
{"type":"action","message":"Button clicked","timestamp":1234567891}
{"type":"logout","message":"User logged out","timestamp":1234567892}
""".trimIndent()
val stream: Flux<StreamingEvent<Event>> =
converter.convertStreamWithThinking(mixedResponse)

stream.subscribe { event ->
when (event) {
is StreamingEvent.Object -> {
println("Data: ${event.item}")
}
is StreamingEvent.Thinking -> {
println("Thinking: ${event.content}")
println("State: ${event.state}")
}
}
}
```

Data only:

```kotlin
stream
.filter { it is StreamingEvent.Object }
.map { (it as StreamingEvent.Object).item }
.subscribe { println("Event: $it") }Thinking only:
stream
.filter { it is StreamingEvent.Thinking }
.map { (it as StreamingEvent.Thinking).content }
.subscribe { println("Thought: $it") }sealed interface StreamingEvent<T> {
data class Object<T>(val item: T) : StreamingEvent<T>
data class Thinking(
val content: String,
val state: ThinkingState
) : StreamingEvent<Nothing>
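// Note: Thinking extends StreamingEvent<Nothing>, which can only satisfy
// Flux<StreamingEvent<T>> if the type parameter is covariant; the real
// declaration is presumably StreamingEvent<out T> (hedged reading).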
}

enum class ThinkingState {
NONE, // No thinking detected
START, // Start of thinking block
END, // End of thinking block
BOTH, // Start and end in same line
CONTINUATION // Multi-line thinking continuation
}
```

Usage:

```kotlin
stream.subscribe { event ->
if (event is StreamingEvent.Thinking) {
when (event.state) {
ThinkingState.START -> println(">>> Thinking started")
ThinkingState.END -> println("<<< Thinking ended")
ThinkingState.CONTINUATION -> println("... ${event.content}")
ThinkingState.BOTH -> println("! Quick thought: ${event.content}")
ThinkingState.NONE -> {}
}
}
}
```

Per-line error recovery automatically skips invalid lines.

```kotlin
val mixedQuality = """
{"type":"login","message":"Valid","timestamp":1234567890}
{invalid json here}
{"type":"action","message":"Also valid","timestamp":1234567891}
not json at all
{"type":"logout","message":"Still valid","timestamp":1234567892}
""".trimIndent()
val stream = converter.convertStream(mixedQuality)
// Only valid lines processed, invalid skipped
stream.subscribe { event ->
println("Valid: $event")
}

// Subscribe with error and completion handlers
stream.subscribe(
{ event -> println("Success: $event") },
{ error -> println("Stream error: ${error.message}") },
{ println("Stream complete") }
)
```

Leverage Project Reactor operators for stream processing.
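Downstream operators can also layer their own recovery on top of the converter's per-line skipping; a sketch using Reactor's onErrorContinue (standard Reactor, not a converter feature):

```kotlin
stream
    .map { event ->
        // any exception thrown here is handed to onErrorContinue
        check(event.timestamp > 0) { "bad timestamp" }
        event
    }
    .onErrorContinue { error, value -> println("Skipping $value: ${error.message}") }
    .subscribe { println("Processed: $it") }
```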
```kotlin
// Filter by event type
stream
.filter { it.type == "login" }
.subscribe { println("Login: $it") }stream
.map { event -> event.copy(message = event.message.uppercase()) }
.subscribe { println(it) }
```

```kotlin
// Take only the first five events
stream.take(5).subscribe { println(it) }
```

```kotlin
// Collect into batches of ten
stream
.buffer(10)
.subscribe { batch -> println("Batch: ${batch.size} events") }
```

```kotlin
// Group events into one-second windows
import java.time.Duration

stream
.window(Duration.ofSeconds(1))
.subscribe { window ->
window.collectList().subscribe { events ->
println("Window: ${events.size} events")
}
}
```

```kotlin
// Expand each event into multiple emissions
stream
.flatMap { event ->
Flux.just(event, event.copy(message = "Duplicate"))
}
.subscribe { println(it) }
```

```kotlin
// Pair each event with a sequence number
import reactor.core.publisher.Flux
// Tuple2 destructuring requires reactor-kotlin-extensions
import reactor.kotlin.core.util.function.component1
import reactor.kotlin.core.util.function.component2

val numbers = Flux.range(1, 100)
stream.zipWith(numbers).subscribe { (event, num) ->
println("$num: $event")
}
```

Flux naturally supports backpressure for slow consumers.
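Demand can also be bounded explicitly with Reactor's limitRate, which caps how many elements are requested from upstream at a time (standard Reactor, independent of this converter):

```kotlin
stream
    .limitRate(32) // request at most 32 elements upstream per batch
    .subscribe { event -> processExpensively(event) }
```

The example below instead simulates a slow consumer by delaying each element: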
```kotlin
import java.time.Duration

stream
.delayElements(Duration.ofMillis(100)) // Slow processing
.subscribe { event ->
// Process without memory issues
processExpensively(event)
}
```

```kotlin
// Keep a running count with scan
data class EventWithCount(val event: Event, val count: Int)
stream
.scan(Pair(null as Event?, 0)) { acc, event ->
Pair(event, acc.second + 1)
}
.skip(1)
.map { EventWithCount(it.first!!, it.second) }
.subscribe { println("Event #${it.count}: ${it.event}") }import reactor.core.scheduler.Schedulers
stream
.parallel()
.runOn(Schedulers.parallel())
.map { event ->
// Process in parallel
expensiveOperation(event)
}
.sequential()
.subscribe { println("Processed: $it") }stream
.groupBy { it.type }
.subscribe { group ->
group.subscribe { event ->
println("Type ${group.key()}: $event")
}
}
```

```kotlin
// Throttle to roughly ten events per second
import java.time.Duration

stream
.delayElements(Duration.ofMillis(100))
.subscribe { /* Max 10 per second */ }
```

Stream LLM output into typed events with Spring AI's ChatClient:

```kotlin
import org.springframework.ai.chat.client.ChatClient

val chatClient: ChatClient = // ... configured
// Stream from LLM
val llmStream: Flux<String> = chatClient
.prompt()
.user("Generate events as JSONL...")
.stream()
.content()
// Convert to typed stream
val eventStream: Flux<Event> = llmStream
.collectList() // Mono<List<String>>; Flux has no flatMapMany, Mono does
.map { chunks -> chunks.joinToString("") }
.flatMapMany { jsonl -> converter.convertStream(jsonl) }
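// collectList() gathers the complete LLM response before parsing, since
// streamed chunks can split a JSON line mid-object; incremental parsing
// would have to reassemble full lines across chunk boundaries first.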
eventStream.subscribe { println("Event: $it") }
```

With thinking:

```kotlin
val llmStream: Flux<String> = chatClient
.prompt()
.user("Analyze and output JSONL with thinking...")
.stream()
.content()
val eventStream = llmStream
.collectList() // gather all chunks before parsing, as above
.map { it.joinToString("") }
.flatMapMany { converter.convertStreamWithThinking(it) }
eventStream.subscribe { event ->
when (event) {
is StreamingEvent.Object -> saveToDatabase(event.item)
is StreamingEvent.Thinking -> logThinking(event.content)
}
}
```

Get instructions for LLM prompts:

```kotlin
val instructions = converter.getFormat()
val prompt = """
Generate user events as JSONL.
$instructions
Events:
- User logged in at 10:00
- User clicked button at 10:05
- User logged out at 11:00
""".trimIndent()Format includes:
convertStreamWithThinking)Use batch processing for large streams:
stream.buffer(100).subscribe { batch ->
saveBatch(batch) // More efficient than one-by-one
}
```

Apply backpressure for slow consumers:

```kotlin
stream.onBackpressureBuffer(1000).subscribe { /* ... */ }
```

Parallel processing for CPU-intensive work:

```kotlin
stream.parallel().runOn(Schedulers.parallel()).map { /* ... */ }
```

Early termination if you don't need all data:

```kotlin
stream.take(100).subscribe { /* only the first 100 */ }
```

→ See Integration Patterns for complete examples
→ See Performance for optimization strategies
Install with Tessl CLI:

```
npx tessl i tessl/maven-com-embabel-agent--embabel-agent-common@0.3.0
```