CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-com-embabel-agent--embabel-agent-api

Fluent DSL and Kotlin DSL for building autonomous agents with planning capabilities on the JVM, featuring annotation-based and programmatic configuration for agentic flows with Spring Boot integration

Overview
Eval results
Files

docs/streaming.md

Streaming Support

Streaming enables real-time processing of LLM responses as they are generated. This provides a better user experience for long-running generations and lets callers process output incrementally instead of waiting for the complete response.

Capabilities

StreamingPromptRunner

Reactive streaming operations interface.

/**
 * A [PromptRunner] that returns LLM output incrementally as a Reactor [Flux].
 *
 * Obtained via [StreamingCapability.runner]. The examples on this page either
 * subscribe to the returned Flux or block on it with `blockLast()`.
 */
interface StreamingPromptRunner : PromptRunner {
    /**
     * Stream text generation for [prompt].
     *
     * Each emitted element is a chunk of generated text; the examples concatenate
     * chunks in arrival order to obtain the full response.
     */
    fun streamText(prompt: String): Flux<String>

    /**
     * Stream object creation: emits instances of [outputClass] for [prompt].
     *
     * NOTE(review): the examples treat the last emitted element as the final result;
     * whether earlier elements are partial snapshots depends on the implementation — confirm.
     */
    fun <T> streamObject(prompt: String, outputClass: Class<T>): Flux<T>

    /** Stream text generation from a prepared conversation of [messages]. */
    fun streamTextFromMessages(messages: List<Message>): Flux<String>
}

Basic Streaming Example:

/**
 * Generates text for [prompt], streaming chunks from the LLM when possible.
 *
 * Falls back to a single non-streaming `generateText` call when the prompt runner
 * does not support streaming at all, or supports it but not for plain text.
 *
 * @param prompt the prompt to send to the LLM
 * @param context the action context supplying the prompt runner
 * @return the full generated text (all chunks concatenated in arrival order)
 */
@Action(description = "Stream text generation")
fun streamGeneration(
    prompt: String,
    context: ActionContext
): String {
    val promptRunner = context.promptRunner()

    if (!promptRunner.supportsStreaming()) {
        // Fall back to non-streaming
        return promptRunner.generateText(prompt)
    }

    val streaming = promptRunner.streaming()
    if (!streaming.supportsTextStreaming()) {
        // Streaming is available, but not for plain text: fall back as well
        return promptRunner.generateText(prompt)
    }

    val chunks = mutableListOf<String>()

    // streaming() yields a StreamingCapability; runner() exposes streamText
    streaming.runner()
        .streamText(prompt)
        .doOnNext { chunk ->
            // Process chunk as it arrives
            chunks.add(chunk)
            // NOTE(review): other examples call updateProgress on ExecutingOperationContext;
            // confirm ActionContext exposes it too
            context.updateProgress("Generated ${chunks.size} chunks")
        }
        .blockLast() // Wait for completion

    return chunks.joinToString("")
}

StreamingCapability

Query and access streaming capabilities.

/**
 * Describes which streaming modes a prompt runner supports and provides access to the
 * [StreamingPromptRunner] that performs the streaming calls.
 *
 * Returned by `PromptRunner.streaming()`; callers check `PromptRunner.supportsStreaming()` first.
 */
interface StreamingCapability {
    /** Whether text can be streamed (see [StreamingPromptRunner.streamText]). */
    fun supportsTextStreaming(): Boolean

    /** Whether structured objects can be streamed (see [StreamingPromptRunner.streamObject]). */
    fun supportsObjectStreaming(): Boolean

    /** The runner exposing the streaming operations (`streamText`, `streamObject`, ...). */
    fun runner(): StreamingPromptRunner
}

Capability Check Example:

/**
 * Reports which streaming modes the current prompt runner offers.
 *
 * @param context the action context supplying the prompt runner
 * @return a [StreamingStatus]; every flag is `false` when streaming is not supported at all
 */
@Action(description = "Check streaming support")
fun checkStreamingSupport(context: ActionContext): StreamingStatus {
    val runner = context.promptRunner()

    return if (runner.supportsStreaming()) {
        // Only ask for the capability once overall support is confirmed
        val capability = runner.streaming()
        StreamingStatus(
            supported = true,
            textStreaming = capability.supportsTextStreaming(),
            objectStreaming = capability.supportsObjectStreaming()
        )
    } else {
        StreamingStatus(supported = false, textStreaming = false, objectStreaming = false)
    }
}

/**
 * Snapshot of a prompt runner's streaming support.
 *
 * @property supported whether the runner supports streaming at all
 * @property textStreaming whether text streaming is available (always `false` when [supported] is `false`
 *   in `checkStreamingSupport`)
 * @property objectStreaming whether structured-object streaming is available
 */
data class StreamingStatus(
    val supported: Boolean,
    val textStreaming: Boolean,
    val objectStreaming: Boolean
)

Streaming with Progress Updates

Update progress as content streams.

/**
 * Streams text for [prompt] and reports progress periodically while it arrives.
 *
 * A progress update is sent every 10 chunks and once more when the stream completes.
 * Assumes the prompt runner supports streaming (no fallback path here).
 *
 * @param prompt the prompt to send to the LLM
 * @param context execution context used for the prompt runner and progress updates
 * @return the full generated text
 */
@Action(description = "Stream with progress updates")
fun streamWithProgress(
    prompt: String,
    context: ExecutingOperationContext
): String {
    val result = StringBuilder()
    var chunkCount = 0

    context.promptRunner()
        .streaming()
        // streaming() yields a StreamingCapability; runner() exposes streamText
        .runner()
        .streamText(prompt)
        .doOnNext { chunk ->
            result.append(chunk)
            chunkCount++

            // Update progress every 10 chunks
            if (chunkCount % 10 == 0) {
                context.updateProgress("Received $chunkCount chunks, ${result.length} chars")
            }
        }
        .doOnComplete {
            context.updateProgress("Streaming complete: ${result.length} total chars")
        }
        .blockLast()

    return result.toString()
}

StreamingPromptRunnerOperations

Streaming-specific operations interface.

/**
 * Additional streaming operations beyond the basic [StreamingPromptRunner] calls.
 */
interface StreamingPromptRunnerOperations {
    /**
     * Stream creation of [outputClass] instances for [prompt], guided by [examples].
     *
     * NOTE(review): presumably the examples are few-shot samples included in the prompt — confirm
     * against the CreationExample definition.
     */
    fun <T> streamObjectCreation(
        prompt: String,
        outputClass: Class<T>,
        examples: List<CreationExample<T>>
    ): Flux<T>

    /** Stream text generated from multimodal [content]; each element is a text chunk. */
    fun streamMultimodal(content: MultimodalContent): Flux<String>
}

Streaming Object Creation

Stream structured objects as they are generated.

/**
 * Structured analysis produced by `streamObjectCreation`.
 *
 * @property findings individual findings reported by the model
 * @property summary free-text summary of the analysis
 * @property confidence model-reported confidence; NOTE(review): range (e.g. 0.0–1.0) not enforced here
 */
data class StreamingAnalysis(
    val findings: List<Finding>,
    val summary: String,
    val confidence: Double
)

/**
 * A single finding within a [StreamingAnalysis].
 *
 * @property category grouping label for the finding
 * @property description what was found
 * @property severity free-form severity label (not constrained to an enum here)
 */
data class Finding(
    val category: String,
    val description: String,
    val severity: String
)

/**
 * Streams a [StreamingAnalysis] of [data], logging each intermediate result.
 *
 * The last element emitted by the stream is returned as the final analysis.
 *
 * @param data the dataset whose `summary` is embedded in the prompt
 * @param context the action context supplying the prompt runner
 * @return the last analysis emitted by the stream
 * @throws IllegalStateException if the stream completes without emitting anything
 */
@Action(description = "Stream object creation")
fun streamObjectCreation(
    data: Dataset,
    context: ActionContext
): StreamingAnalysis {
    // Built explicitly so no source-code indentation leaks into the prompt text
    val prompt = "Analyze this dataset and provide findings:\n${data.summary}"

    val analysis = context.promptRunner()
        .streaming()
        // streaming() yields a StreamingCapability; runner() exposes streamObject
        .runner()
        .streamObject(prompt, StreamingAnalysis::class.java)
        .doOnNext { partial ->
            // Process partial results as they arrive (logger assumed to be defined by the enclosing class)
            logger.info("Received partial analysis: ${partial.findings.size} findings")
        }
        .blockLast() // last emitted element, or null if the stream was empty

    return analysis ?: throw IllegalStateException("No analysis generated")
}

Streaming with Event Emission

Emit events as streaming progresses.

/**
 * Streams text for [prompt] while emitting lifecycle events on the output channel.
 *
 * Events are sent in this order:
 * 1. a [StreamingStartedEvent] on subscription;
 * 2. one [StreamingChunkEvent] per chunk, carrying the accumulated length so far;
 * 3. finally, either a [StreamingCompletedEvent] with the full text or a [StreamingErrorEvent] on failure.
 *
 * @param prompt the prompt to send to the LLM
 * @param context execution context used for the prompt runner and event emission
 * @return the full generated text
 * @throws RuntimeException if the stream fails (`blockLast()` rethrows after the error event is sent)
 */
@Action(description = "Stream with events")
fun streamWithEvents(
    prompt: String,
    context: ExecutingOperationContext
): String {
    val result = StringBuilder()

    context.promptRunner()
        .streaming()
        // streaming() yields a StreamingCapability; runner() exposes streamText
        .runner()
        .streamText(prompt)
        .doOnSubscribe {
            context.sendOutputChannelEvent(
                StreamingStartedEvent(prompt)
            )
        }
        .doOnNext { chunk ->
            result.append(chunk)
            context.sendOutputChannelEvent(
                StreamingChunkEvent(chunk, result.length)
            )
        }
        .doOnComplete {
            context.sendOutputChannelEvent(
                StreamingCompletedEvent(result.toString())
            )
        }
        .doOnError { error ->
            context.sendOutputChannelEvent(
                StreamingErrorEvent(error.message ?: "Unknown error")
            )
        }
        .blockLast()

    return result.toString()
}

/** Sent when a streaming generation is subscribed to; carries the originating [prompt]. */
data class StreamingStartedEvent(val prompt: String) : OutputChannelEvent
/** Sent per chunk; [totalLength] is the accumulated text length including this [chunk]. */
data class StreamingChunkEvent(val chunk: String, val totalLength: Int) : OutputChannelEvent
/** Sent when the stream completes; [result] is the full concatenated text. */
data class StreamingCompletedEvent(val result: String) : OutputChannelEvent
/** Sent when the stream fails; [error] is the exception message (or a placeholder if it had none). */
data class StreamingErrorEvent(val error: String) : OutputChannelEvent

StreamingPromptRunnerBuilder

Builder for streaming prompt runners (Java-friendly).

/**
 * Java-friendly builder for a {@link StreamingPromptRunner}.
 *
 * <p>Signature listing only: method bodies are omitted on this page. Each {@code with*}
 * method returns the builder so calls can be chained, ending with {@link #build()}.
 */
public class StreamingPromptRunnerBuilder {
    /** Sets the model identifier to use (e.g. a constant such as {@code OpenAiModels.GPT_4_TURBO}). */
    public StreamingPromptRunnerBuilder withModel(String model);
    /** Sets the sampling temperature; NOTE(review): valid range depends on the model provider. */
    public StreamingPromptRunnerBuilder withTemperature(double temperature);
    /** Sets the maximum number of tokens to generate. */
    public StreamingPromptRunnerBuilder withMaxTokens(int maxTokens);
    /** Sets the system prompt sent ahead of user prompts. */
    public StreamingPromptRunnerBuilder withSystemPrompt(String systemPrompt);
    /** Creates the configured runner. */
    public StreamingPromptRunner build();
}

Java Usage Example:

/**
 * Streams a completion for {@code prompt} using a runner configured explicitly through
 * {@link StreamingPromptRunnerBuilder}, and returns the concatenated chunks.
 *
 * <p>{@code context} is not consulted: the model, temperature and system prompt are
 * fixed by the builder calls below.
 */
@Action(description = "Stream in Java")
public String streamInJava(String prompt, ActionContext context) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder()
        .withModel(OpenAiModels.GPT_4_TURBO)
        .withTemperature(0.7)
        .withSystemPrompt("You are a helpful assistant");
    StreamingPromptRunner streamingRunner = builder.build();

    StringBuilder accumulated = new StringBuilder();
    // Append every chunk in arrival order; blockLast() waits for the stream to finish
    streamingRunner.streamText(prompt)
        .doOnNext(accumulated::append)
        .blockLast();
    return accumulated.toString();
}

Parallel Streaming

Stream multiple prompts in parallel.

/**
 * Streams completions for several prompts concurrently.
 *
 * `parallelStream()` preserves encounter order for `map` + `toList`, so the result at index i
 * corresponds to `prompts[i]`.
 *
 * @param prompts the prompts to run; an empty list yields an empty result
 * @param context the action context supplying the prompt runner
 * @return one generated text per prompt, in the same order as [prompts]
 */
@Action(description = "Parallel streaming")
fun parallelStreaming(
    prompts: List<String>,
    context: ActionContext
): List<String> {
    // streaming() yields a StreamingCapability; runner() exposes streamText.
    // NOTE(review): the same runner is used from several threads; confirm it is thread-safe.
    val runner = context.promptRunner().streaming().runner()

    // NOTE(review): each element blocks a ForkJoinPool.commonPool() worker for the whole
    // stream, so real concurrency is bounded by the pool size.
    return prompts.parallelStream()
        .map { prompt ->
            val result = StringBuilder()
            runner.streamText(prompt)
                .doOnNext { chunk -> result.append(chunk) }
                .blockLast()
            result.toString()
        }
        .toList()
}

Streaming with Timeout

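Abort a stream that stalls. Reactor's `timeout(Duration)` fails the stream when no chunk arrives within the given duration of the previous one (or of subscription, for the first chunk); it does not cap the total generation time.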

/**
 * Streams text for [prompt], giving up if the model stalls.
 *
 * On timeout, the text received so far is returned with a "[Generation timed out]" marker appended.
 * Any other failure is rethrown unchanged.
 *
 * @param prompt the prompt to send to the LLM
 * @param context the action context supplying the prompt runner
 * @return the generated text, possibly truncated and marked as timed out
 */
@Action(description = "Stream with timeout")
fun streamWithTimeout(
    prompt: String,
    context: ActionContext
): String {
    val result = StringBuilder()

    try {
        context.promptRunner()
            .streaming()
            // streaming() yields a StreamingCapability; runner() exposes streamText
            .runner()
            .streamText(prompt)
            // Per-element timeout: fails if no chunk arrives within 2 minutes of the previous one
            .timeout(Duration.ofMinutes(2))
            .doOnNext { chunk -> result.append(chunk) }
            .blockLast()
    } catch (e: Exception) {
        // blockLast() rethrows checked exceptions (such as TimeoutException) wrapped in a
        // RuntimeException, so the timeout normally arrives as the cause, not as e itself.
        if (e !is TimeoutException && e.cause !is TimeoutException) throw e
        logger.warn("Streaming timed out: no chunk received within 2 minutes")
        result.append("\n[Generation timed out]")
    }

    return result.toString()
}

Streaming to UI

Stream content directly to the user interface.

/**
 * Streams text for [prompt] to the UI without blocking the caller.
 *
 * Each chunk is sent to the UI as an assistant message as soon as it arrives. The returned future:
 * - completes with the full text when the stream finishes;
 * - completes exceptionally if the stream fails.
 *
 * Cancelling the future (or completing it externally) disposes the underlying subscription.
 *
 * @param prompt the prompt to send to the LLM
 * @param context execution context used for the prompt runner and UI messages
 * @return a future of the full generated text
 */
@Action(description = "Stream to UI")
fun streamToUi(
    prompt: String,
    context: ExecutingOperationContext
): CompletableFuture<String> {
    val future = CompletableFuture<String>()
    val result = StringBuilder()

    // Explicit error consumer: with the no-arg subscribe(), Reactor would additionally report the
    // error as "dropped" even though the future already handles it.
    val subscription = context.promptRunner()
        .streaming()
        // streaming() yields a StreamingCapability; runner() exposes streamText
        .runner()
        .streamText(prompt)
        .subscribe(
            { chunk ->
                result.append(chunk)
                // Send chunk to UI
                context.sendMessage(Message.assistant(chunk))
            },
            { error -> future.completeExceptionally(error) },
            { future.complete(result.toString()) }
        )

    // Propagate cancellation of the future to the stream; disposing after normal completion is a no-op
    future.whenComplete { _, _ -> subscription.dispose() }

    return future
}

Incremental Processing

Process content incrementally as it streams.

/**
 * Splits streamed text into sentences as soon as each sentence is complete.
 *
 * A sentence ends at '.'; periods are dropped and surrounding whitespace is trimmed. Blank
 * fragments are skipped. Any trailing text without a final '.' is added as the last sentence.
 * NOTE(review): '.' inside numbers or abbreviations (e.g. "3.14", "e.g.") also splits.
 *
 * @param prompt the prompt to send to the LLM
 * @param context the action context supplying the prompt runner
 * @return the extracted sentences in order
 */
@Action(description = "Process incrementally")
fun processIncrementally(
    prompt: String,
    context: ActionContext
): ProcessedResult {
    val sentences = mutableListOf<String>()
    val buffer = StringBuilder()

    context.promptRunner()
        .streaming()
        // streaming() yields a StreamingCapability; runner() exposes streamText
        .runner()
        .streamText(prompt)
        .doOnNext { chunk ->
            buffer.append(chunk)

            // Everything up to and including the last '.' consists of complete sentences
            val sentenceEnd = buffer.lastIndexOf('.')
            if (sentenceEnd >= 0) {
                val completeSentences = buffer.substring(0, sentenceEnd + 1)
                sentences.addAll(
                    completeSentences.split('.')
                        .map { it.trim() }
                        .filter { it.isNotEmpty() }
                )

                // Keep the unfinished remainder for the next chunk
                buffer.delete(0, sentenceEnd + 1)
            }
        }
        .doOnComplete {
            // Process any remaining content, ignoring whitespace-only tails
            val remainder = buffer.toString().trim()
            if (remainder.isNotEmpty()) {
                sentences.add(remainder)
            }
        }
        .blockLast()

    return ProcessedResult(sentences)
}

/**
 * Result of `processIncrementally`.
 *
 * @property sentences sentences extracted from the streamed text, in arrival order
 */
data class ProcessedResult(
    val sentences: List<String>
)

Streaming Error Handling

Handle errors during streaming.

/**
 * Streams text for [prompt], recovering from a mid-stream failure instead of throwing.
 *
 * On failure the error is logged, and an "[Error occurred: ...]" marker is appended to the text
 * received so far. The returned result is flagged as unsuccessful.
 *
 * @param prompt the prompt to send to the LLM
 * @param context the action context supplying the prompt runner
 * @return the collected content plus error details. `success` is `false` whenever the stream
 *   failed, even if the exception carried no message.
 */
@Action(description = "Stream with error handling")
fun streamWithErrorHandling(
    prompt: String,
    context: ActionContext
): StreamResult {
    val result = StringBuilder()
    // Track the Throwable itself: its message may be null, which must not read as success
    var failure: Throwable? = null

    context.promptRunner()
        .streaming()
        // streaming() yields a StreamingCapability; runner() exposes streamText
        .runner()
        .streamText(prompt)
        .doOnError { throwable ->
            failure = throwable
            logger.error("Streaming error", throwable)
        }
        .onErrorResume { throwable ->
            // Recover with fallback
            Flux.just("[Error occurred: ${throwable.message}]")
        }
        // Placed after onErrorResume so the fallback element is appended to the content as well
        .doOnNext { chunk ->
            result.append(chunk)
        }
        .blockLast()

    val error = failure?.let { it.message ?: it.toString() }
    return StreamResult(
        content = result.toString(),
        error = error,
        success = failure == null
    )
}

/**
 * Outcome of a streaming call that recovers from errors.
 *
 * @property content the text collected from the stream
 * @property error a description of the failure, or `null` if none was recorded
 * @property success whether the stream completed without error, as determined by the producer
 */
data class StreamResult(
    val content: String,
    val error: String?,
    val success: Boolean
)

Reactive Streaming with Backpressure

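Buffer chunks when the consumer processes them more slowly than the model produces them. A buffer only helps when the consumer runs on a different thread from the producer (for example after `publishOn`); a synchronous consumer on the producer's thread simply blocks that thread.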

/**
 * Streams text for [prompt] into a deliberately slow consumer, buffering chunks in between.
 *
 * Chunks are buffered (up to 1000) and handed to a separate worker thread, so the slow consumer
 * does not block the thread producing chunks. If more than 1000 chunks are pending, Reactor
 * terminates the stream with an overflow error, which `blockLast()` rethrows.
 *
 * @param prompt the prompt to send to the LLM
 * @param context the action context supplying the prompt runner
 * @return the full generated text
 */
@Action(description = "Stream with backpressure")
fun streamWithBackpressure(
    prompt: String,
    context: ActionContext
): String {
    val result = StringBuilder()

    context.promptRunner()
        .streaming()
        // streaming() yields a StreamingCapability; runner() exposes streamText
        .runner()
        .streamText(prompt)
        .onBackpressureBuffer(1000) // Buffer up to 1000 chunks
        // Without this thread hop, blockLast() requests everything up front and doOnNext runs on the
        // producer's thread, so the buffer above never fills and the sleep stalls the producer.
        .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())
        .doOnNext { chunk ->
            // Slow processing simulation
            result.append(chunk)
            Thread.sleep(10) // Simulate slow consumer
        }
        .blockLast()

    return result.toString()
}

Types

/**
 * Streaming-related members of the prompt runner.
 *
 * Only these members are listed here; the examples also call others (e.g. `generateText`).
 */
interface PromptRunner {
    /** Whether this runner can stream at all; check before calling [streaming]. */
    fun supportsStreaming(): Boolean

    /** The streaming capability; call `runner()` on it to obtain the streaming operations. */
    fun streaming(): StreamingCapability
}

/** Streaming support flags plus access to the [StreamingPromptRunner] that performs streaming calls. */
interface StreamingCapability {
    /** Whether text streaming is supported. */
    fun supportsTextStreaming(): Boolean
    /** Whether structured-object streaming is supported. */
    fun supportsObjectStreaming(): Boolean
    /** The runner exposing `streamText` / `streamObject`. */
    fun runner(): StreamingPromptRunner
}

/**
 * Reactive Flux from Project Reactor — simplified excerpt.
 *
 * NOTE(review): this lists only members used on this page. Reactor's real `Flux` is an abstract
 * class with Java functional-interface parameters and many more operators (e.g. `onErrorResume`,
 * `publishOn`, `just`, which the examples also use).
 */
interface Flux<T> {
    /** Side effect for each emitted element; the element passes through unchanged. */
    fun doOnNext(consumer: (T) -> Unit): Flux<T>
    /** Side effect when the stream completes successfully. */
    fun doOnComplete(action: () -> Unit): Flux<T>
    /** Side effect when the stream fails; the error still propagates downstream. */
    fun doOnError(consumer: (Throwable) -> Unit): Flux<T>
    /** Side effect on subscription (in Reactor the callback receives the `Subscription`). */
    fun doOnSubscribe(action: () -> Unit): Flux<T>
    /**
     * Subscribes and blocks until termination; returns the last element, or null if none.
     * Per Reactor docs, errors are rethrown, with checked exceptions wrapped in a RuntimeException.
     */
    fun blockLast(): T?
    /** Starts consumption without blocking; the no-arg form installs no error handler. */
    fun subscribe(): Disposable
    /** Per Reactor docs: fails with TimeoutException if no element arrives within [duration] of the previous one. */
    fun timeout(duration: Duration): Flux<T>
    /** Buffers up to [capacity] elements for a slower downstream; per Reactor docs, overflow fails the stream. */
    fun onBackpressureBuffer(capacity: Int): Flux<T>
}

/** Handle returned by `Flux.subscribe()`; [dispose] cancels the subscription. */
interface Disposable {
    /** Cancels the associated subscription. */
    fun dispose()
}

Install with Tessl CLI

npx tessl i tessl/maven-com-embabel-agent--embabel-agent-api@0.3.0

docs

actions-goals.md

agent-definition.md

builtin-tools.md

chat.md

conditions.md

events.md

human-in-the-loop.md

index.md

invocation.md

io-binding.md

llm-interaction.md

models.md

planning-workflows.md

platform-management.md

runtime-context.md

state-management.md

streaming.md

subagents.md

tools.md

type-system.md

typed-operations.md

validation.md

tile.json