HTTP client abstraction for LangChain4j with synchronous/asynchronous execution and Server-Sent Events (SSE) streaming support
This guide covers Server-Sent Events (SSE) streaming for real-time data from servers, particularly important for streaming responses from language models.
import dev.langchain4j.http.client.*;
import dev.langchain4j.http.client.sse.*;
HttpRequest request = HttpRequest.builder()
.method(HttpMethod.POST)
.url("https://api.example.com/stream")
.addHeader("Accept", "text/event-stream")
.addHeader("Content-Type", "application/json")
.body("{\"prompt\":\"Tell me a story\"}")
.build();
client.execute(request, new ServerSentEventListener() {
@Override
public void onOpen(SuccessfulHttpResponse response) {
System.out.println("Stream opened with status: " + response.statusCode());
}
@Override
public void onEvent(ServerSentEvent event) {
System.out.println("Event type: " + event.event());
System.out.println("Event data: " + event.data());
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
@Override
public void onClose() {
System.out.println("Stream closed");
}
});Cancel the stream after a certain condition using the context parameter.
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger eventCount = new AtomicInteger(0);
client.execute(request, new ServerSentEventListener() {
@Override
public void onEvent(ServerSentEvent event, ServerSentEventContext context) {
System.out.println("Received: " + event.data());
// Cancel after receiving 10 events
if (eventCount.incrementAndGet() >= 10) {
System.out.println("Cancelling stream after 10 events");
context.parsingHandle().cancel();
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
@Override
public void onClose() {
System.out.println("Received " + eventCount.get() + " events");
}
});Cancel the stream when a specific condition is met in the data.
client.execute(request, new ServerSentEventListener() {
private boolean foundTarget = false;
@Override
public void onEvent(ServerSentEvent event, ServerSentEventContext context) {
String data = event.data();
System.out.println("Received: " + data);
// Cancel if we find a specific condition
if (data.contains("STOP")) {
System.out.println("Found stop signal, cancelling stream");
context.parsingHandle().cancel();
foundTarget = true;
}
}
@Override
public void onError(Throwable throwable) {
if (!foundTarget) {
System.err.println("Error: " + throwable.getMessage());
}
}
@Override
public void onClose() {
System.out.println("Stream closed");
}
});Handle different event types with different logic.
client.execute(request, new ServerSentEventListener() {
@Override
public void onEvent(ServerSentEvent event) {
String eventType = event.event();
String data = event.data();
if ("message".equals(eventType)) {
System.out.println("Message: " + data);
} else if ("error".equals(eventType)) {
System.err.println("Server error: " + data);
} else if ("done".equals(eventType)) {
System.out.println("Stream complete");
} else {
// Handle unnamed events (eventType is null)
System.out.println("Data: " + data);
}
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
});Collect all streamed data into a single result.
StringBuilder accumulatedData = new StringBuilder();
client.execute(request, new ServerSentEventListener() {
@Override
public void onEvent(ServerSentEvent event) {
String chunk = event.data();
accumulatedData.append(chunk);
System.out.print(chunk); // Print as we receive
}
@Override
public void onError(Throwable throwable) {
System.err.println("\nError occurred: " + throwable.getMessage());
}
@Override
public void onClose() {
System.out.println("\nComplete response: " + accumulatedData.toString());
}
});Implement a custom parser for non-standard SSE formats.
import java.io.*;
public class CustomSSEParser implements ServerSentEventParser {
@Override
public void parse(InputStream httpResponseBody, ServerSentEventListener listener) {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(httpResponseBody))) {
String line;
while ((line = reader.readLine()) != null) {
// Custom parsing implementation
if (line.startsWith("data: ")) {
String data = line.substring(6);
ServerSentEvent event = new ServerSentEvent(null, data);
listener.onEvent(event);
}
}
} catch (IOException e) {
listener.onError(e);
}
}
}
// Use custom parser
CustomSSEParser customParser = new CustomSSEParser();
client.execute(request, customParser, listener);public class CancellableSSEParser implements ServerSentEventParser {
@Override
public void parse(InputStream httpResponseBody, ServerSentEventListener listener) {
// Create a parsing handle for cancellation support
DefaultServerSentEventParsingHandle handle =
new DefaultServerSentEventParsingHandle(httpResponseBody);
ServerSentEventContext context = new ServerSentEventContext(handle);
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(httpResponseBody))) {
String line;
while ((line = reader.readLine()) != null && !handle.isCancelled()) {
if (line.startsWith("data: ")) {
String data = line.substring(6);
ServerSentEvent event = new ServerSentEvent(null, data);
// Pass context to listener for cancellation control
listener.onEvent(event, context);
}
}
} catch (IOException e) {
if (!handle.isCancelled()) {
listener.onError(e);
}
}
}
}event: message
data: First line of data
data: Second line of data
event: update
data: {"status": "processing"}
data: Data without explicit event typeevent: set the event type for the next eventdata: append to the event data (multiple data lines are joined with newlines): are comments and ignoredImportant: If any method of the ServerSentEventListener throws an exception, the stream processing will be terminated immediately and no further events will be processed.
client.execute(request, new ServerSentEventListener() {
@Override
public void onEvent(ServerSentEvent event) {
// If this throws an exception, stream processing stops
processEvent(event);
}
@Override
public void onError(Throwable throwable) {
// Handle errors
System.err.println("Error: " + throwable.getMessage());
}
});The onError(Throwable) method is called for:
After onError() is called, onClose() will also be called to signal the end of the stream.
public class StreamingLLMClient {
private final HttpClient client;
private final String apiUrl;
private final String apiKey;
public void streamCompletion(String prompt, Consumer<String> onChunk) {
HttpRequest request = HttpRequest.builder()
.method(HttpMethod.POST)
.url(apiUrl)
.addHeader("Authorization", "Bearer " + apiKey)
.addHeader("Accept", "text/event-stream")
.addHeader("Content-Type", "application/json")
.body(String.format("{\"prompt\":\"%s\",\"stream\":true}", prompt))
.build();
client.execute(request, new ServerSentEventListener() {
@Override
public void onEvent(ServerSentEvent event) {
if ("data".equals(event.event())) {
onChunk.accept(event.data());
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("Streaming error: " + throwable.getMessage());
}
});
}
}
// Usage
streamingClient.streamCompletion("Hello", chunk -> {
System.out.print(chunk);
});public CompletableFuture<String> streamAsync(HttpRequest request) {
CompletableFuture<String> future = new CompletableFuture<>();
StringBuilder result = new StringBuilder();
client.execute(request, new ServerSentEventListener() {
@Override
public void onEvent(ServerSentEvent event) {
result.append(event.data());
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onClose() {
future.complete(result.toString());
}
});
return future;
}
// Usage
streamAsync(request)
.thenAccept(result -> System.out.println("Complete: " + result))
.exceptionally(error -> {
System.err.println("Error: " + error.getMessage());
return null;
});Configure longer timeouts for SSE streams since there may be delays between events.
import java.time.Duration;
HttpClient client = HttpClientBuilderLoader.loadHttpClientBuilder()
.connectTimeout(Duration.ofSeconds(15))
.readTimeout(Duration.ofMinutes(5)) // Allow 5 minutes between events
.build();See Timeout Configuration Guide for more details.
Install with Tessl CLI
npx tessl i tessl/maven-dev-langchain4j--langchain4j-http-client@1.11.0