CtrlK
CommunityDocumentationLog inGet started
Tessl Logo

tessl/maven-com-embabel-agent--embabel-agent-starter

Base starter module for the Embabel Agent Framework providing core dependencies for building agentic flows on the JVM with Spring Boot integration and GOAP-based intelligent path finding.

Overview
Eval results
Files

reference-streaming.mddocs/

Streaming API Reference

Complete reference for Java streaming API using StreamingPromptRunner.

StreamingPromptRunnerBuilder

package com.embabel.agent.api.streaming;

/**
 * Builder for creating streaming prompt runners
 */
public record StreamingPromptRunnerBuilder(/* internal fields */) {
    /**
     * Enable streaming mode
     * @return StreamingPromptRunner.Streaming instance for streaming operations
     */
    public StreamingPromptRunner.Streaming withStreaming();
}

StreamingPromptRunner.Streaming

public interface Streaming {
    /**
     * Stream tokens with a callback
     * @param consumer Function to process each token
     */
    void stream(Consumer<String> consumer);

    /**
     * Stream tokens as a Flux (reactive streams)
     * @return Flux of string tokens
     */
    Flux<String> streamAsFlux();

    /**
     * Collect all tokens into a complete response
     * @return Complete response as a single string
     */
    String collectAll();
}

SSE Configuration Properties

PropertyTypeDefaultDescription
embabel.agent.platform.sse.max-buffer-sizeint100Maximum buffer size per SSE stream
embabel.agent.platform.sse.max-process-buffersint1000Maximum number of process buffers
embabel:
  agent:
    platform:
      sse:
        max-buffer-size: 100
        max-process-buffers: 1000

Timeout Configuration

PropertyTypeDefaultDescription
spring.ai.mcp.client.request-timeoutDuration30sRequest timeout for streaming operations
spring:
  ai:
    mcp:
      client:
        request-timeout: 30s

Supported formats: 30s, 1m, 5m30s, PT30S

Virtual Threads Configuration

PropertyTypeDefaultDescription
spring.threads.virtual.enabledbooleantrueEnable virtual threads for concurrent streaming
spring:
  threads:
    virtual:
      enabled: true

Basic Streaming Example

import com.embabel.agent.api.streaming.StreamingPromptRunnerBuilder;
import com.embabel.agent.api.streaming.StreamingPromptRunner;

@Action
public void streamResponse(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    StreamingPromptRunner.Streaming streaming = builder.withStreaming();

    streaming.stream(token -> {
        System.out.print(token);
    });
}

Stream to Flux

import reactor.core.publisher.Flux;

@Action
public Flux<String> streamToUI(String userQuery) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        userQuery,
        llmOptions
    );

    return builder.withStreaming().streamAsFlux();
}

Collect All Tokens

@Action
public String streamAndCollect(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    return builder.withStreaming().collectAll();
}

Progress Tracking

@Action
public Result streamWithProgress(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    StreamingPromptRunner.Streaming streaming = builder.withStreaming();

    StringBuilder response = new StringBuilder();
    AtomicInteger tokenCount = new AtomicInteger(0);

    streaming.stream(token -> {
        response.append(token);
        int count = tokenCount.incrementAndGet();

        if (count % 10 == 0) {
            logger.info("Processed {} tokens", count);
        }
    });

    return new Result(response.toString(), tokenCount.get());
}

Token Filtering

@Action
public String streamWithFiltering(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    StreamingPromptRunner.Streaming streaming = builder.withStreaming();

    StringBuilder filtered = new StringBuilder();

    streaming.stream(token -> {
        if (!token.contains("REDACTED")) {
            filtered.append(token);
        }
    });

    return filtered.toString();
}

REST Controller with SSE

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class StreamingAgentController {
    private final AgentPlatform platform;

    @GetMapping(value = "/api/stream", produces = "text/event-stream")
    public Flux<String> streamResponse(@RequestParam String query) {
        StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
            aiService,
            query,
            options
        );

        return builder.withStreaming().streamAsFlux();
    }
}

Convert to Mono

import reactor.core.publisher.Mono;

@Action
public Mono<String> streamToMono(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    StreamingPromptRunner.Streaming streaming = builder.withStreaming();

    return Mono.fromCallable(streaming::collectAll);
}

Flux Transformations

import reactor.core.publisher.Flux;

@Action
public Flux<ProcessedToken> streamAndTransform(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    return builder.withStreaming()
        .streamAsFlux()
        .map(token -> new ProcessedToken(token, token.length()))
        .filter(processed -> processed.length() > 0);
}

Error Handling

Stream Error Recovery

@Action
public void streamWithErrorHandling(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    StreamingPromptRunner.Streaming streaming = builder.withStreaming();

    try {
        streaming.stream(token -> {
            try {
                processToken(token);
            } catch (Exception e) {
                logger.error("Error processing token: {}", token, e);
            }
        });
    } catch (StreamingException e) {
        logger.error("Streaming failed", e);
    }
}

Reactive Error Handling

@Action
public Flux<String> streamWithReactiveErrorHandling(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    return builder.withStreaming()
        .streamAsFlux()
        .onErrorResume(error -> {
            logger.error("Stream error", error);
            return Flux.just("[Error occurred]");
        })
        .retry(3);
}

Backpressure Configuration

@Action
public Flux<String> streamWithBackpressure(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    return builder.withStreaming()
        .streamAsFlux()
        .onBackpressureBuffer(1000)
        .limitRate(100);
}

Custom Timeout

import java.time.Duration;

@Action
public Flux<String> streamWithTimeout(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
        aiService,
        prompt,
        options
    );

    return builder.withStreaming()
        .streamAsFlux()
        .timeout(Duration.ofSeconds(30))
        .onErrorResume(TimeoutException.class, e ->
            Flux.just("[Response timeout]")
        );
}

Kotlin Alternative

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect

@Action
suspend fun streamResponse(prompt: String) {
    val stream: Flow<String> = aiService.streamPrompt(prompt)

    stream.collect { token ->
        println(token)
    }
}

Complete Example

package com.embabel.agent.api.agents;

import com.embabel.agent.api.annotation.*;
import com.embabel.agent.api.streaming.StreamingPromptRunnerBuilder;
import com.embabel.agent.api.streaming.StreamingPromptRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Agent(description = "Streaming response agent")
@Component
public class StreamingAgent {

    private final AiService aiService;

    public StreamingAgent(AiService aiService) {
        this.aiService = aiService;
    }

    @AchievesGoal(
        description = "Generate streaming response to user query",
        export = @Export(remote = true, local = true)
    )
    @Action
    public Flux<String> streamResponse(String userQuery) {
        StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
            aiService,
            userQuery,
            createOptions()
        );

        return builder.withStreaming()
            .streamAsFlux()
            .doOnNext(token -> logger.debug("Token: {}", token))
            .doOnComplete(() -> logger.info("Stream completed"))
            .doOnError(error -> logger.error("Stream error", error))
            .onErrorResume(error -> Flux.just("[Error: " + error.getMessage() + "]"));
    }

    private LlmOptions createOptions() {
        return LlmOptions.builder()
            .temperature(0.7)
            .maxTokens(2000)
            .build();
    }
}

Configuration Examples

Development

# application-dev.yml
embabel:
  agent:
    platform:
      sse:
        max-buffer-size: 50
        max-process-buffers: 100

spring:
  ai:
    mcp:
      client:
        request-timeout: 10s
  threads:
    virtual:
      enabled: true

Production

# application-prod.yml
embabel:
  agent:
    platform:
      sse:
        max-buffer-size: 200
        max-process-buffers: 5000

spring:
  ai:
    mcp:
      client:
        request-timeout: 60s
  threads:
    virtual:
      enabled: true

Environment Variables

# SSE buffer configuration
export EMBABEL_AGENT_PLATFORM_SSE_MAX_BUFFER_SIZE=100
export EMBABEL_AGENT_PLATFORM_SSE_MAX_PROCESS_BUFFERS=1000

# Request timeout
export SPRING_AI_MCP_CLIENT_REQUEST_TIMEOUT=30s

# Virtual threads
export SPRING_THREADS_VIRTUAL_ENABLED=true
tessl i tessl/maven-com-embabel-agent--embabel-agent-starter@0.3.1

docs

api-annotations.md

api-domain-model.md

api-invocation.md

api-tools.md

concepts-actions.md

concepts-agents.md

concepts-goals.md

concepts-invocation.md

concepts-tools.md

guides-creating-agents.md

guides-creating-tools.md

guides-defining-actions.md

guides-goal-achievement.md

guides-human-in-loop.md

guides-multimodal.md

index.md

integration-mcp.md

integration-model-providers.md

integration-spring-boot.md

LlmTool.md

quickstart.md

reference-component-scanning.md

reference-configuration-properties.md

reference-installation.md

reference-logging.md

reference-resilience.md

reference-streaming.md

tile.json