Streaming

Reactive streaming converters for JSONL output with thinking-block detection. Conversion returns Project Reactor Flux streams with per-line error recovery.

Dependencies: Requires com.embabel.common.core.streaming package (embabel-common-core) for StreamingEvent and ThinkingState types.

Core Concepts

  • JSONL Format: One JSON object per line
  • StreamingEvent: Sealed interface for data or thinking events
  • Reactive Streams: Project Reactor Flux for backpressure support
  • Thinking Blocks: LLM reasoning content extraction

Basic Streaming

class StreamingJacksonOutputConverter<T>(
    clazz: Class<T>,
    objectMapper: ObjectMapper,
    propertyFilter: Predicate<String> = Predicate { true }
) : FilteringJacksonOutputConverter<T> {
    fun convertStream(jsonlContent: String): Flux<T>
    fun convertStreamWithThinking(text: String): Flux<StreamingEvent<T>>
    override fun getFormat(): String
}

Simple Example

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import reactor.core.publisher.Flux

data class Event(val type: String, val message: String, val timestamp: Long)

val converter = StreamingJacksonOutputConverter(
    Event::class.java,
    ObjectMapper().registerKotlinModule()
)

// JSONL input (one JSON per line)
val jsonl = """
{"type":"login","message":"User logged in","timestamp":1234567890}
{"type":"action","message":"Button clicked","timestamp":1234567891}
{"type":"logout","message":"User logged out","timestamp":1234567892}
""".trimIndent()

// Convert to stream
val stream: Flux<Event> = converter.convertStream(jsonl)

// Subscribe
stream.subscribe { event ->
    println("${event.type}: ${event.message}")
}

Collect Results

// Block and collect all
val events = stream.collectList().block()

// Process as list
events?.forEach { println(it) }
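
Property Filtering

The constructor also accepts an optional propertyFilter, which defaults to accepting everything. A minimal sketch, assuming the predicate is called with JSON property names and returning false excludes that property from conversion and the generated format instructions:

import java.util.function.Predicate

// Hypothetical filter: drop the "timestamp" property
val filteredConverter = StreamingJacksonOutputConverter(
    Event::class.java,
    ObjectMapper().registerKotlinModule(),
    Predicate { propertyName -> propertyName != "timestamp" }
)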

Thinking Blocks

Extract LLM reasoning separately from data.

Thinking Tags

XML-style (recommended):

<think>
Reasoning content here
</think>

Legacy prefix:

[THINKING] Reasoning content

Mixed Content Example

val mixedResponse = """
<think>
Analyzing user activity log...
</think>
{"type":"login","message":"User logged in","timestamp":1234567890}
<think>
Processing next event...
</think>
{"type":"action","message":"Button clicked","timestamp":1234567891}
{"type":"logout","message":"User logged out","timestamp":1234567892}
""".trimIndent()

val stream: Flux<StreamingEvent<Event>> =
    converter.convertStreamWithThinking(mixedResponse)

Process Events

stream.subscribe { event ->
    when (event) {
        is StreamingEvent.Object -> {
            println("Data: ${event.item}")
        }
        is StreamingEvent.Thinking -> {
            println("Thinking: ${event.content}")
            println("State: ${event.state}")
        }
    }
}

Filter Event Types

Data only:

stream
    .filter { it is StreamingEvent.Object }
    .map { (it as StreamingEvent.Object).item }
    .subscribe { println("Event: $it") }

Thinking only:

stream
    .filter { it is StreamingEvent.Thinking }
    .map { (it as StreamingEvent.Thinking).content }
    .subscribe { println("Thought: $it") }

Event Types

sealed interface StreamingEvent<T> {
    data class Object<T>(val item: T) : StreamingEvent<T>
    data class Thinking(
        val content: String,
        val state: ThinkingState
    ) : StreamingEvent<Nothing>
}

Thinking States

enum class ThinkingState {
    NONE,          // No thinking detected
    START,         // Start of thinking block
    END,           // End of thinking block
    BOTH,          // Start and end in same line
    CONTINUATION   // Multi-line thinking continuation
}

Usage:

stream.subscribe { event ->
    if (event is StreamingEvent.Thinking) {
        when (event.state) {
            ThinkingState.START -> println(">>> Thinking started")
            ThinkingState.END -> println("<<< Thinking ended")
            ThinkingState.CONTINUATION -> println("... ${event.content}")
            ThinkingState.BOTH -> println("! Quick thought: ${event.content}")
            ThinkingState.NONE -> {}
        }
    }
}
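
Reassembling Thinking Blocks

The states can also be used to stitch multi-line thinking back into complete blocks. A sketch, assuming each Thinking event carries one line of reasoning and END (or BOTH) marks the last line of a block:

// Buffer thinking lines until the block ends, then emit the joined text
val thoughts: Flux<String> = stream
    .filter { it is StreamingEvent.Thinking }
    .map { it as StreamingEvent.Thinking }
    .bufferUntil { it.state == ThinkingState.END || it.state == ThinkingState.BOTH }
    .map { block -> block.joinToString("\n") { it.content } }

thoughts.subscribe { println("Complete thought:\n$it") }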

Error Handling

Per-line error recovery automatically skips invalid lines.

Invalid Lines Skipped

val mixedQuality = """
{"type":"login","message":"Valid","timestamp":1234567890}
{invalid json here}
{"type":"action","message":"Also valid","timestamp":1234567891}
not json at all
{"type":"logout","message":"Still valid","timestamp":1234567892}
""".trimIndent()

val stream = converter.convertStream(mixedQuality)

// Only valid lines processed, invalid skipped
stream.subscribe { event ->
    println("Valid: $event")
}

Error Callbacks

stream.subscribe(
    { event -> println("Success: $event") },
    { error -> println("Stream error: ${error.message}") },
    { println("Stream complete") }
)
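
Stream-Level Fallback

Because invalid lines are skipped per line, stream-level failures are rare, but a fallback can still be added with standard Reactor operators. A brief sketch:

// Fall back to an empty stream if the stream itself fails
converter.convertStream(mixedQuality)
    .onErrorResume { error ->
        println("Recovering from: ${error.message}")
        Flux.empty()
    }
    .subscribe { println("Valid: $it") }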

Reactive Operations

Leverage Project Reactor operators for stream processing.

Filter

stream
    .filter { it.type == "login" }
    .subscribe { println("Login: $it") }

Map

stream
    .map { event -> event.copy(message = event.message.uppercase()) }
    .subscribe { println(it) }

Take

stream.take(5).subscribe { println(it) }

Buffer

stream
    .buffer(10)
    .subscribe { batch -> println("Batch: ${batch.size} events") }

Window

import java.time.Duration

stream
    .window(Duration.ofSeconds(1))
    .subscribe { window ->
        window.collectList().subscribe { events ->
            println("Window: ${events.size} events")
        }
    }

FlatMap

stream
    .flatMap { event ->
        Flux.just(event, event.copy(message = "Duplicate"))
    }
    .subscribe { println(it) }

Zip

import reactor.core.publisher.Flux
import reactor.kotlin.core.util.function.component1 // from reactor-kotlin-extensions,
import reactor.kotlin.core.util.function.component2 // needed to destructure the Tuple2

val numbers = Flux.range(1, 100)
stream.zipWith(numbers).subscribe { (event, num) ->
    println("$num: $event")
}

Backpressure

Flux naturally supports backpressure for slow consumers.

import java.time.Duration

stream
    .delayElements(Duration.ofMillis(100)) // Slow processing
    .subscribe { event ->
        // Process without memory issues
        processExpensively(event)
    }
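
For explicit demand control, a custom subscriber can request items in small batches instead of the default unbounded request. A sketch using Reactor's BaseSubscriber:

import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber

// Request a few items up front, then one more as each finishes
stream.subscribe(object : BaseSubscriber<Event>() {
    override fun hookOnSubscribe(subscription: Subscription) {
        request(10)
    }

    override fun hookOnNext(event: Event) {
        processExpensively(event)
        request(1)
    }
})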

Advanced Patterns

Stateful Processing

data class EventWithCount(val event: Event, val count: Int)

stream
    .scan(Pair(null as Event?, 0)) { acc, event ->
        Pair(event, acc.second + 1)
    }
    .skip(1)
    .map { EventWithCount(it.first!!, it.second) }
    .subscribe { println("Event #${it.count}: ${it.event}") }

Parallel Processing

import reactor.core.scheduler.Schedulers

stream
    .parallel()
    .runOn(Schedulers.parallel())
    .map { event ->
        // Process in parallel
        expensiveOperation(event)
    }
    .sequential()
    .subscribe { println("Processed: $it") }

Grouped Processing

stream
    .groupBy { it.type }
    .subscribe { group ->
        group.subscribe { event ->
            println("Type ${group.key()}: $event")
        }
    }
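
The nested subscribe above is fine for logging; to keep everything in one pipeline, flatMap the groups back together. A sketch that counts events per type:

stream
    .groupBy { it.type }
    .flatMap { group ->
        group.count().map { count -> group.key() to count }
    }
    .subscribe { (type, count) -> println("$type: $count events") }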

Rate Limiting

import java.time.Duration

stream
    .delayElements(Duration.ofMillis(100))
    .subscribe { /* Max 10 per second */ }

Spring AI Integration

Streaming LLM Response

import org.springframework.ai.chat.client.ChatClient

val chatClient: ChatClient = // ... configured

// Stream from LLM
val llmStream: Flux<String> = chatClient
    .prompt()
    .user("Generate events as JSONL...")
    .stream()
    .content()

// Convert to typed stream
val eventStream: Flux<Event> = llmStream
    .buffer()
    .map { chunks -> chunks.joinToString("") }
    .flatMapMany { jsonl -> converter.convertStream(jsonl) }

eventStream.subscribe { println("Event: $it") }
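
Buffering the whole response before converting is simplest, but since each JSONL line is independent you can also convert lines as they complete. A sketch (not part of this package's API) that carries partial lines across chunks:

// Emit each completed line as soon as its newline arrives, then convert it.
// Simplification: a final line without a trailing newline is not emitted.
val incremental: Flux<Event> = llmStream
    .scan(Pair("", emptyList<String>())) { (carry, _), chunk ->
        val parts = (carry + chunk).split("\n")
        Pair(parts.last(), parts.dropLast(1))
    }
    .flatMapIterable { (_, completeLines) -> completeLines }
    .filter { it.isNotBlank() }
    .flatMap { line -> converter.convertStream(line) }

incremental.subscribe { println("Event: $it") }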

With Thinking Extraction

val llmStream: Flux<String> = chatClient
    .prompt()
    .user("Analyze and output JSONL with thinking...")
    .stream()
    .content()

val eventStream = llmStream
    .buffer()
    .map { it.joinToString("") }
    .flatMapMany { converter.convertStreamWithThinking(it) }

eventStream.subscribe { event ->
    when (event) {
        is StreamingEvent.Object -> saveToDatabase(event.item)
        is StreamingEvent.Thinking -> logThinking(event.content)
    }
}

Format Instructions

Get instructions for LLM prompts.

val instructions = converter.getFormat()

val prompt = """
    Generate user events as JSONL.

    $instructions

    Events:
    - User logged in at 10:00
    - User clicked button at 10:05
    - User logged out at 11:00
""".trimIndent()

Format includes:

  • JSONL specification (one JSON per line)
  • JSON schema for the type
  • Thinking block syntax (if using convertStreamWithThinking)
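
Putting it together with the Spring AI client from earlier (a sketch; the prompt and resulting events are illustrative):

// Ask the model for JSONL using the format instructions, then convert the reply
val response = chatClient
    .prompt()
    .user(prompt)
    .call()
    .content()

converter.convertStream(response).subscribe { println("Generated: $it") }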

Performance Tips

Use batch processing for large streams:

stream.buffer(100).subscribe { batch ->
    saveBatch(batch) // More efficient than one-by-one
}

Apply backpressure for slow consumers:

stream.onBackpressureBuffer(1000).subscribe { /* ... */ }

Parallel processing for CPU-intensive work:

stream.parallel().runOn(Schedulers.parallel()).map { /* ... */ }

Early termination if you don't need all data:

stream.take(100).subscribe { /* Only first 100 */ }

→ See Integration Patterns for complete examples
→ See Performance for optimization strategies
