Fluent DSL and Kotlin DSL for building autonomous agents with planning capabilities on the JVM, featuring annotation-based and programmatic configuration of agentic flows, with Spring Boot integration.
Streaming enables real-time processing of LLM responses as they are generated. This provides better user experience for long-running generations and allows incremental processing.
Reactive streaming operations interface.
interface StreamingPromptRunner : PromptRunner {
    /** Stream generated text as incremental chunks. */
    fun streamText(prompt: String): Flux<String>
    /** Stream typed object creation; emits progressively refined instances of [outputClass]. */
    fun <T> streamObject(prompt: String, outputClass: Class<T>): Flux<T>
    /** Stream text generation driven by a full message history. */
    fun streamTextFromMessages(messages: List<Message>): Flux<String>
}
Basic Streaming Example:
/**
 * Streams a text generation, accumulating chunks as they arrive and reporting
 * progress, then returns the fully assembled text. Falls back to a blocking
 * generateText call when the runner does not support streaming.
 */
@Action(description = "Stream text generation")
fun streamGeneration(
    prompt: String,
    context: ActionContext
): String {
    val runner = context.promptRunner()
    if (!runner.supportsStreaming()) {
        // Streaming unavailable: use the plain blocking API instead.
        return runner.generateText(prompt)
    }
    val received = mutableListOf<String>()
    runner.streaming()
        .streamText(prompt)
        .doOnNext { piece ->
            // Accumulate each chunk and surface progress to the caller.
            received += piece
            context.updateProgress("Generated ${received.size} chunks")
        }
        .blockLast() // Block until the stream completes
    return received.joinToString("")
}
Query and access streaming capabilities.
interface StreamingCapability {
    /** Whether incremental text streaming is available. */
    fun supportsTextStreaming(): Boolean
    /** Whether progressive typed-object streaming is available. */
    fun supportsObjectStreaming(): Boolean
    /** The prompt runner to use for streaming calls. */
    fun runner(): StreamingPromptRunner
}
Capability Check Example:
/**
 * Reports which streaming modes the current prompt runner supports.
 */
@Action(description = "Check streaming support")
fun checkStreamingSupport(context: ActionContext): StreamingStatus {
    val runner = context.promptRunner()
    return if (!runner.supportsStreaming()) {
        // No streaming at all: every capability flag is off.
        StreamingStatus(
            supported = false,
            textStreaming = false,
            objectStreaming = false
        )
    } else {
        val capabilities = runner.streaming()
        StreamingStatus(
            supported = true,
            textStreaming = capabilities.supportsTextStreaming(),
            objectStreaming = capabilities.supportsObjectStreaming()
        )
    }
}
/** Snapshot of streaming support for the current runner. */
data class StreamingStatus(
    val supported: Boolean,      // true when any streaming is available
    val textStreaming: Boolean,  // incremental text chunks supported
    val objectStreaming: Boolean // progressive typed objects supported
)
Update progress as content streams.
/**
 * Streams text while periodically publishing progress updates (every tenth
 * chunk) and a final completion notice, then returns the assembled text.
 */
@Action(description = "Stream with progress updates")
fun streamWithProgress(
    prompt: String,
    context: ExecutingOperationContext
): String {
    val assembled = StringBuilder()
    var received = 0
    context.promptRunner()
        .streaming()
        .streamText(prompt)
        .doOnNext { piece ->
            assembled.append(piece)
            received += 1
            // Throttle progress reporting to every tenth chunk.
            if (received % 10 == 0) {
                context.updateProgress("Received $received chunks, ${assembled.length} chars")
            }
        }
        .doOnComplete {
            context.updateProgress("Streaming complete: ${assembled.length} total chars")
        }
        .blockLast()
    return assembled.toString()
}
Streaming-specific operations interface.
interface StreamingPromptRunnerOperations {
    /** Stream typed object creation, guided by few-shot [examples]. */
    fun <T> streamObjectCreation(
        prompt: String,
        outputClass: Class<T>,
        examples: List<CreationExample<T>>
    ): Flux<T>
    /** Stream text generated from multimodal (e.g. text plus image) content. */
    fun streamMultimodal(content: MultimodalContent): Flux<String>
}
Stream structured objects as they are generated.
/** Analysis result that can be delivered progressively while streaming. */
data class StreamingAnalysis(
    val findings: List<Finding>,
    val summary: String,
    val confidence: Double // model-reported confidence — presumably 0.0..1.0, TODO confirm
)
/** A single analysis finding. */
data class Finding(
    val category: String,
    val description: String,
    val severity: String
)
/**
 * Streams progressive [StreamingAnalysis] objects for a dataset and returns
 * the final (most complete) one.
 *
 * The last element the stream emits is the most complete analysis, so it is
 * taken directly from blockLast() instead of being captured in a mutable var.
 *
 * @throws IllegalStateException if the stream completes without emitting anything
 */
@Action(description = "Stream object creation")
fun streamObjectCreation(
    data: Dataset,
    context: ActionContext
): StreamingAnalysis {
    val analysis = context.promptRunner()
        .streaming()
        .streamObject("""
Analyze this dataset and provide findings:
${data.summary}
""", StreamingAnalysis::class.java)
        .doOnNext { partial ->
            // Log each partial result as it arrives.
            logger.info("Received partial analysis: ${partial.findings.size} findings")
        }
        .blockLast() // last emitted (most complete) analysis, or null if the stream was empty
    return analysis ?: throw IllegalStateException("No analysis generated")
}
Emit events as streaming progresses.
/**
 * Streams text while publishing lifecycle events (started / chunk / completed /
 * error) to the output channel, then returns the full assembled text.
 */
@Action(description = "Stream with events")
fun streamWithEvents(
    prompt: String,
    context: ExecutingOperationContext
): String {
    val result = StringBuilder()
    context.promptRunner()
        .streaming()
        .streamText(prompt)
        .doOnSubscribe {
            // Announce that streaming has begun.
            context.sendOutputChannelEvent(
                StreamingStartedEvent(prompt)
            )
        }
        .doOnNext { chunk ->
            result.append(chunk)
            // Publish every chunk together with the running total length.
            context.sendOutputChannelEvent(
                StreamingChunkEvent(chunk, result.length)
            )
        }
        .doOnComplete {
            context.sendOutputChannelEvent(
                StreamingCompletedEvent(result.toString())
            )
        }
        .doOnError { error ->
            context.sendOutputChannelEvent(
                StreamingErrorEvent(error.message ?: "Unknown error")
            )
        }
        .blockLast()
    return result.toString()
}
/** Event published when streaming begins. */
data class StreamingStartedEvent(val prompt: String) : OutputChannelEvent
/** Event published for each received chunk, with the running total length. */
data class StreamingChunkEvent(val chunk: String, val totalLength: Int) : OutputChannelEvent
/** Event published when the stream completes, carrying the full result. */
data class StreamingCompletedEvent(val result: String) : OutputChannelEvent
/** Event published when streaming fails. */
data class StreamingErrorEvent(val error: String) : OutputChannelEvent
Builder for streaming prompt runners (Java-friendly).
public class StreamingPromptRunnerBuilder {
    /** Select the LLM to stream from. */
    public StreamingPromptRunnerBuilder withModel(String model);
    /** Set the sampling temperature for generation. */
    public StreamingPromptRunnerBuilder withTemperature(double temperature);
    /** Set the upper bound on generated tokens. */
    public StreamingPromptRunnerBuilder withMaxTokens(int maxTokens);
    /** Set the system prompt prepended to every request. */
    public StreamingPromptRunnerBuilder withSystemPrompt(String systemPrompt);
    /** Build the configured streaming runner. */
    public StreamingPromptRunner build();
}
Java Usage Example:
/**
 * Streams a generation from Java, assembling the chunks into a single string.
 */
@Action(description = "Stream in Java")
public String streamInJava(String prompt, ActionContext context) {
    StreamingPromptRunner runner = new StreamingPromptRunnerBuilder()
            .withModel(OpenAiModels.GPT_4_TURBO)
            .withTemperature(0.7)
            .withSystemPrompt("You are a helpful assistant")
            .build();
    StringBuilder result = new StringBuilder();
    runner.streamText(prompt)
            .doOnNext(result::append) // append each chunk as it arrives
            .blockLast();             // block until the stream completes
    return result.toString();
}
Stream multiple prompts in parallel.
/**
 * Streams several prompts concurrently via a parallel stream and returns each
 * assembled result.
 *
 * NOTE(review): a single streaming runner is shared across parallelStream
 * worker threads — confirm it is thread-safe before relying on this pattern.
 */
@Action(description = "Parallel streaming")
fun parallelStreaming(
    prompts: List<String>,
    context: ActionContext
): List<String> {
    val streaming = context.promptRunner().streaming()
    return prompts.parallelStream()
        .map { prompt ->
            val result = StringBuilder()
            // Each worker blocks on its own stream and collects the chunks.
            streaming.streamText(prompt)
                .doOnNext { chunk -> result.append(chunk) }
                .blockLast()
            result.toString()
        }
        .toList()
}
Handle streaming with timeouts.
/**
 * Streams text with a 2-minute overall timeout; on timeout the partial content
 * gathered so far is returned with a trailing truncation marker.
 *
 * NOTE(review): with real Project Reactor, blockLast() may wrap the checked
 * java.util.concurrent.TimeoutException in a RuntimeException — confirm this
 * catch clause matches the runtime behavior of the Flux implementation in use.
 */
@Action(description = "Stream with timeout")
fun streamWithTimeout(
    prompt: String,
    context: ActionContext
): String {
    val result = StringBuilder()
    try {
        context.promptRunner()
            .streaming()
            .streamText(prompt)
            .timeout(Duration.ofMinutes(2))
            .doOnNext { chunk -> result.append(chunk) }
            .blockLast()
    } catch (e: TimeoutException) {
        // Keep whatever was generated so far and flag the truncation.
        logger.warn("Streaming timed out after 2 minutes")
        result.append("\n[Generation timed out]")
    }
    return result.toString()
}
Stream content directly to user interface.
/**
 * Streams chunks straight to the user interface as assistant messages and
 * completes the returned future with the full text (or the error).
 *
 * Non-blocking: subscribe() starts the stream and the method returns at once.
 * NOTE(review): the Disposable from subscribe() is discarded, so the stream
 * cannot be cancelled — confirm this is acceptable for callers.
 */
@Action(description = "Stream to UI")
fun streamToUi(
    prompt: String,
    context: ExecutingOperationContext
): CompletableFuture<String> {
    val future = CompletableFuture<String>()
    val result = StringBuilder()
    context.promptRunner()
        .streaming()
        .streamText(prompt)
        .doOnNext { chunk ->
            result.append(chunk)
            // Push each chunk to the UI as it arrives.
            context.sendMessage(Message.assistant(chunk))
        }
        .doOnComplete {
            future.complete(result.toString())
        }
        .doOnError { error ->
            future.completeExceptionally(error)
        }
        .subscribe()
    return future
}
Process content incrementally as it streams.
/**
 * Streams text and extracts complete sentences incrementally: whenever the
 * buffer contains a '.', everything up to and including the last '.' is split
 * into sentences and the remainder is carried over for the next chunk.
 *
 * Note: split('.') drops the delimiter, so extracted sentences lose their
 * trailing period, and a '.' at buffer index 0 is not treated as a sentence
 * end (the check is sentenceEnd > 0).
 */
@Action(description = "Process incrementally")
fun processIncrementally(
    prompt: String,
    context: ActionContext
): ProcessedResult {
    val sentences = mutableListOf<String>()
    val buffer = StringBuilder()
    context.promptRunner()
        .streaming()
        .streamText(prompt)
        .doOnNext { chunk ->
            buffer.append(chunk)
            // Extract complete sentences up to the last period seen so far.
            val text = buffer.toString()
            val sentenceEnd = text.lastIndexOf('.')
            if (sentenceEnd > 0) {
                val completeSentences = text.substring(0, sentenceEnd + 1)
                sentences.addAll(completeSentences.split('.').filter { it.isNotBlank() })
                // Keep remainder (partial sentence) in buffer.
                buffer.clear()
                buffer.append(text.substring(sentenceEnd + 1))
            }
        }
        .doOnComplete {
            // Flush any trailing partial sentence.
            if (buffer.isNotEmpty()) {
                sentences.add(buffer.toString())
            }
        }
        .blockLast()
    return ProcessedResult(sentences)
}
/** Sentences extracted incrementally from a streamed generation. */
data class ProcessedResult(
    val sentences: List<String>
)
Handle errors during streaming.
/**
 * Streams text, recording any error and recovering with a fallback chunk so
 * the caller always receives content plus a success flag.
 *
 * Fix: onErrorResume must sit UPSTREAM of the doOnNext that appends to
 * `result`. In the original ordering the fallback element was emitted below
 * doOnNext, so the recovery text never reached the returned content.
 */
@Action(description = "Stream with error handling")
fun streamWithErrorHandling(
    prompt: String,
    context: ActionContext
): StreamResult {
    val result = StringBuilder()
    var error: String? = null
    context.promptRunner()
        .streaming()
        .streamText(prompt)
        .doOnError { throwable ->
            // Record the failure before the stream is replaced by the fallback.
            error = throwable.message
            logger.error("Streaming error", throwable)
        }
        .onErrorResume { throwable ->
            // Recover with fallback
            Flux.just("[Error occurred: ${throwable.message}]")
        }
        .doOnNext { chunk ->
            result.append(chunk)
        }
        .blockLast()
    return StreamResult(
        content = result.toString(),
        error = error,
        success = error == null
    )
}
/** Outcome of a streamed generation, including any recorded error. */
data class StreamResult(
    val content: String,  // assembled text
    val error: String?,   // error message, or null on success
    val success: Boolean  // true when no error was recorded
)
Handle backpressure in streaming scenarios.
/**
 * Streams text with an explicit backpressure buffer so a fast producer does
 * not overwhelm a slow consumer.
 *
 * NOTE(review): Thread.sleep inside a reactive callback is for demonstration
 * only — it blocks the emitting thread.
 */
@Action(description = "Stream with backpressure")
fun streamWithBackpressure(
    prompt: String,
    context: ActionContext
): String {
    val result = StringBuilder()
    context.promptRunner()
        .streaming()
        .streamText(prompt)
        .onBackpressureBuffer(1000) // Buffer up to 1000 chunks
        .doOnNext { chunk ->
            // Slow processing simulation
            result.append(chunk)
            Thread.sleep(10) // Simulate slow consumer
        }
        .blockLast()
    return result.toString()
}
/** Base prompt runner with streaming discovery. */
interface PromptRunner {
    /** Check if streaming is supported */
    fun supportsStreaming(): Boolean
    /** Get streaming capability */
    fun streaming(): StreamingCapability
}
/** Reference summary of the streaming capability interface. */
interface StreamingCapability {
    /** Whether incremental text streaming is available. */
    fun supportsTextStreaming(): Boolean
    /** Whether progressive typed-object streaming is available. */
    fun supportsObjectStreaming(): Boolean
    /** The prompt runner to use for streaming calls. */
    fun runner(): StreamingPromptRunner
}
/** Reactive Flux from Project Reactor (simplified surface used by these examples). */
interface Flux<T> {
    /** Side effect for each emitted element. */
    fun doOnNext(consumer: (T) -> Unit): Flux<T>
    /** Side effect when the stream completes normally. */
    fun doOnComplete(action: () -> Unit): Flux<T>
    /** Side effect when the stream terminates with an error. */
    fun doOnError(consumer: (Throwable) -> Unit): Flux<T>
    /** Side effect when the stream is subscribed to. */
    fun doOnSubscribe(action: () -> Unit): Flux<T>
    /** Block until completion and return the last element, or null if empty. */
    fun blockLast(): T?
    /** Start the stream without blocking. */
    fun subscribe(): Disposable
    /** Fail the stream if it does not complete within [duration]. */
    fun timeout(duration: Duration): Flux<T>
    /** Buffer up to [capacity] elements when the consumer lags. */
    fun onBackpressureBuffer(capacity: Int): Flux<T>
}
/** Handle for cancelling an active subscription. */
interface Disposable {
    /** Cancel the subscription and release its resources. */
    fun dispose()
}
Install with Tessl CLI
npx tessl i tessl/maven-com-embabel-agent--embabel-agent-api@0.3.0
docs