Base starter module for the Embabel Agent Framework providing core dependencies for building agentic flows on the JVM with Spring Boot integration and GOAP-based intelligent path finding.
Complete reference for Java streaming API using StreamingPromptRunner.
package com.embabel.agent.api.streaming;
/**
 * Builder for creating streaming prompt runners.
 * <p>
 * The record components are elided in this reference. The usage examples in this
 * document construct it with an AI service, a prompt string and LLM options, in that order.
 */
public record StreamingPromptRunnerBuilder(/* internal fields */) {
    /**
     * Enable streaming mode.
     *
     * @return StreamingPromptRunner.Streaming instance for streaming operations
     */
    public StreamingPromptRunner.Streaming withStreaming();
}

/**
 * Streaming operations exposed by the value returned from
 * {@code StreamingPromptRunnerBuilder.withStreaming()}.
 * Offers three ways to consume the same response: per-token callback, reactive
 * {@code Flux}, or a single collected string.
 */
public interface Streaming {
    /**
     * Stream tokens with a callback.
     * <p>
     * NOTE(review): the usage examples read state accumulated by the callback immediately
     * after this call returns, which presumes the call blocks until the stream ends - confirm.
     *
     * @param consumer Function to process each token
     */
    void stream(Consumer<String> consumer);

    /**
     * Stream tokens as a Flux (reactive streams).
     *
     * @return Flux of string tokens
     */
    Flux<String> streamAsFlux();

    /**
     * Collect all tokens into a complete response.
     *
     * @return Complete response as a single string
     */
    String collectAll();
}
| Property | Type | Default | Description |
|---|---|---|---|
| `embabel.agent.platform.sse.max-buffer-size` | int | 100 | Maximum buffer size per SSE stream |
| `embabel.agent.platform.sse.max-process-buffers` | int | 1000 | Maximum number of process buffers |
embabel:
  agent:
    platform:
      sse:
        max-buffer-size: 100
        max-process-buffers: 1000

| Property | Type | Default | Description |
|---|---|---|---|
| `spring.ai.mcp.client.request-timeout` | Duration | 30s | Request timeout for streaming operations |
spring:
  ai:
    mcp:
      client:
        request-timeout: 30s

Supported formats: 30s, 1m, 5m30s, PT30S
| Property | Type | Default | Description |
|---|---|---|---|
| `spring.threads.virtual.enabled` | boolean | true | Enable virtual threads for concurrent streaming |
spring:
  threads:
    virtual:
      enabled: true

import com.embabel.agent.api.streaming.StreamingPromptRunnerBuilder;
import com.embabel.agent.api.streaming.StreamingPromptRunner;
/**
 * Streams the LLM response for the given prompt and echoes every token to standard output.
 *
 * @param prompt prompt text handed to the streaming builder
 */
@Action
public void streamResponse(String prompt) {
    StreamingPromptRunner.Streaming tokenStream =
            new StreamingPromptRunnerBuilder(aiService, prompt, options).withStreaming();
    // print (not println) so consecutive tokens join into continuous text
    tokenStream.stream(token -> System.out.print(token));
}
import reactor.core.publisher.Flux;
/**
 * Exposes the LLM response to a UI layer as a reactive stream of tokens.
 *
 * @param userQuery query text handed to the streaming builder
 * @return the token stream produced by {@code streamAsFlux()}
 */
@Action
public Flux<String> streamToUI(String userQuery) {
    Flux<String> tokens = new StreamingPromptRunnerBuilder(aiService, userQuery, llmOptions)
            .withStreaming()
            .streamAsFlux();
    return tokens;
}
@Action
public String streamAndCollect(String prompt) {
    // Streams under the hood but hands the caller one complete string.
    StreamingPromptRunnerBuilder runnerBuilder =
            new StreamingPromptRunnerBuilder(aiService, prompt, options);
    String completeResponse = runnerBuilder.withStreaming().collectAll();
    return completeResponse;
}
@Action
public Result streamWithProgress(String prompt) {
    StreamingPromptRunner.Streaming streaming =
            new StreamingPromptRunnerBuilder(aiService, prompt, options).withStreaming();

    // Accumulate the full text while counting tokens; progress is logged on every tenth token.
    StringBuilder collected = new StringBuilder();
    AtomicInteger seen = new AtomicInteger();
    streaming.stream(token -> {
        collected.append(token);
        int soFar = seen.incrementAndGet();
        if (soFar % 10 != 0) {
            return;
        }
        logger.info("Processed {} tokens", soFar);
    });

    // NOTE(review): reading the accumulators here presumes stream() has finished - confirm.
    return new Result(collected.toString(), seen.get());
}
@Action
public String streamWithFiltering(String prompt) {
    StreamingPromptRunner.Streaming streaming =
            new StreamingPromptRunnerBuilder(aiService, prompt, options).withStreaming();

    StringBuilder kept = new StringBuilder();
    streaming.stream(token -> {
        // Drop any token carrying the redaction marker; all other tokens are kept in order.
        if (token.contains("REDACTED")) {
            return;
        }
        kept.append(token);
    });
    return kept.toString();
}
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
/**
 * REST controller that exposes a streamed LLM response as Server-Sent Events.
 * <p>
 * NOTE(review): {@code aiService} and {@code options} are not declared in this snippet;
 * they are assumed to be provided by the surrounding application - confirm.
 */
@RestController
public class StreamingAgentController {
    private final AgentPlatform platform;

    /**
     * Creates the controller. A final field must be assigned in a constructor,
     * so the platform is supplied here (single-constructor injection).
     *
     * @param platform agent platform this controller belongs to
     */
    public StreamingAgentController(AgentPlatform platform) {
        this.platform = platform;
    }

    /**
     * Streams the response for {@code query} token by token.
     *
     * @param query user query taken from the {@code query} request parameter
     * @return token stream served with content type {@code text/event-stream}
     */
    @GetMapping(value = "/api/stream", produces = "text/event-stream")
    public Flux<String> streamResponse(@RequestParam String query) {
        StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
                aiService,
                query,
                options
        );
        return builder.withStreaming().streamAsFlux();
    }
}
import reactor.core.publisher.Mono;
/**
 * Wraps the fully collected response in a {@code Mono}.
 *
 * @param prompt prompt text handed to the streaming builder
 * @return a Mono whose value is the result of {@code collectAll()}
 */
@Action
public Mono<String> streamToMono(String prompt) {
    StreamingPromptRunnerBuilder runnerBuilder =
            new StreamingPromptRunnerBuilder(aiService, prompt, options);
    StreamingPromptRunner.Streaming streaming = runnerBuilder.withStreaming();
    // Deferred: collectAll() is invoked when the Mono is subscribed to, not here.
    // NOTE(review): if collectAll() blocks, subscribers may need a blocking-friendly
    // scheduler - confirm.
    return Mono.fromCallable(() -> streaming.collectAll());
}
import reactor.core.publisher.Flux;
/**
 * Streams tokens, wraps each one with its length, and drops wrappers whose length is zero.
 *
 * @param prompt prompt text handed to the streaming builder
 * @return stream of processed tokens with a positive length
 */
@Action
public Flux<ProcessedToken> streamAndTransform(String prompt) {
    Flux<String> rawTokens = new StreamingPromptRunnerBuilder(aiService, prompt, options)
            .withStreaming()
            .streamAsFlux();
    Flux<ProcessedToken> wrapped = rawTokens.map(raw -> new ProcessedToken(raw, raw.length()));
    return wrapped.filter(candidate -> candidate.length() > 0);
}
@Action
public void streamWithErrorHandling(String prompt) {
    StreamingPromptRunner.Streaming streaming =
            new StreamingPromptRunnerBuilder(aiService, prompt, options).withStreaming();
    try {
        streaming.stream(token -> {
            // Best effort per token: a failure on one token is logged and streaming continues.
            try {
                processToken(token);
            } catch (Exception tokenFailure) {
                logger.error("Error processing token: {}", token, tokenFailure);
            }
        });
    } catch (StreamingException streamFailure) {
        // A failure of the stream as a whole is logged here and not rethrown.
        logger.error("Streaming failed", streamFailure);
    }
}
@Action
public Flux<String> streamWithReactiveErrorHandling(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
            aiService,
            prompt,
            options
    );
    return builder.withStreaming()
            .streamAsFlux()
            // retry must sit upstream of the fallback. With the catch-all onErrorResume
            // placed first, every error was already replaced by the fallback value, so
            // retry never saw an error signal and never resubscribed.
            // NOTE(review): each retry resubscribes, so tokens emitted before a failure
            // may be delivered again downstream - confirm this is acceptable.
            .retry(3)
            // Reached only once the three retries are exhausted.
            .onErrorResume(error -> {
                logger.error("Stream error", error);
                return Flux.just("[Error occurred]");
            });
}
@Action
public Flux<String> streamWithBackpressure(String prompt) {
    Flux<String> tokens = new StreamingPromptRunnerBuilder(aiService, prompt, options)
            .withStreaming()
            .streamAsFlux();
    // Buffer pending tokens with a capacity of 1000, and cap upstream requests at 100 per batch.
    return tokens
            .onBackpressureBuffer(1000)
            .limitRate(100);
}
import java.time.Duration;
/**
 * Streams the response, substituting a placeholder token if the stream times out.
 * <p>
 * The 30 second limit is applied by Reactor's {@code timeout} operator, which measures
 * the gap before the first item and between consecutive items rather than total duration.
 *
 * @param prompt prompt text handed to the streaming builder
 * @return token stream, or the single token {@code "[Response timeout]"} after a timeout
 */
@Action
public Flux<String> streamWithTimeout(String prompt) {
    StreamingPromptRunnerBuilder builder = new StreamingPromptRunnerBuilder(
            aiService,
            prompt,
            options
    );
    return builder.withStreaming()
            .streamAsFlux()
            .timeout(Duration.ofSeconds(30))
            // Fully qualified: this snippet imports only Duration, and the timeout
            // operator signals java.util.concurrent.TimeoutException.
            .onErrorResume(java.util.concurrent.TimeoutException.class, e ->
                    Flux.just("[Response timeout]")
            );
}
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
/**
 * Kotlin variant: collects the token flow for [prompt] and prints each token on its own line.
 */
@Action
suspend fun streamResponse(prompt: String) {
    val tokens: Flow<String> = aiService.streamPrompt(prompt)
    // collect suspends until the flow completes
    tokens.collect { println(it) }
}
package com.embabel.agent.api.agents;
import com.embabel.agent.api.annotation.*;
import com.embabel.agent.api.streaming.StreamingPromptRunnerBuilder;
import com.embabel.agent.api.streaming.StreamingPromptRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
/**
 * Agent that answers a user query as a reactive stream of tokens.
 * <p>
 * NOTE(review): {@code logger} is used below but is not declared in this class as shown -
 * confirm it is provided elsewhere.
 */
@Agent(description = "Streaming response agent")
@Component
public class StreamingAgent {

    private final AiService aiService;

    /**
     * @param aiService AI service passed to the streaming builder on every request
     */
    public StreamingAgent(AiService aiService) {
        this.aiService = aiService;
    }

    /**
     * Streams the response for {@code userQuery}, logging each token, completion and failure.
     * On error the stream ends with a single bracketed token carrying the error message.
     *
     * @param userQuery query text handed to the streaming builder
     * @return token stream with logging and an error fallback attached
     */
    @AchievesGoal(
            description = "Generate streaming response to user query",
            export = @Export(remote = true, local = true)
    )
    @Action
    public Flux<String> streamResponse(String userQuery) {
        LlmOptions llmOptions = createOptions();
        Flux<String> tokens = new StreamingPromptRunnerBuilder(aiService, userQuery, llmOptions)
                .withStreaming()
                .streamAsFlux();

        Flux<String> observed = tokens
                .doOnNext(token -> logger.debug("Token: {}", token))
                .doOnComplete(() -> logger.info("Stream completed"))
                .doOnError(failure -> logger.error("Stream error", failure));

        return observed.onErrorResume(failure -> Flux.just("[Error: " + failure.getMessage() + "]"));
    }

    /**
     * Builds the LLM options used for every request: temperature 0.7, at most 2000 tokens.
     */
    private LlmOptions createOptions() {
        return LlmOptions.builder()
                .temperature(0.7)
                .maxTokens(2000)
                .build();
    }
}
# application-dev.yml
embabel:
  agent:
    platform:
      sse:
        max-buffer-size: 50
        max-process-buffers: 100
spring:
  ai:
    mcp:
      client:
        request-timeout: 10s
  threads:
    virtual:
      enabled: true

# application-prod.yml
embabel:
  agent:
    platform:
      sse:
        max-buffer-size: 200
        max-process-buffers: 5000
spring:
  ai:
    mcp:
      client:
        request-timeout: 60s
  threads:
    virtual:
      enabled: true

# SSE buffer configuration
export EMBABEL_AGENT_PLATFORM_SSE_MAX_BUFFER_SIZE=100
export EMBABEL_AGENT_PLATFORM_SSE_MAX_PROCESS_BUFFERS=1000
# Request timeout
export SPRING_AI_MCP_CLIENT_REQUEST_TIMEOUT=30s
# Virtual threads
export SPRING_THREADS_VIRTUAL_ENABLED=true

tessl i tessl/maven-com-embabel-agent--embabel-agent-starter@0.3.1

docs