CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-dev-langchain4j--langchain4j-http-client

HTTP client abstraction for LangChain4j with synchronous/asynchronous execution and Server-Sent Events (SSE) streaming support

Overview
Eval results
Files

asynchronous-sse-streams.mddocs/guides/

Asynchronous SSE Streaming Guide

This guide covers Server-Sent Events (SSE) streaming for real-time data from servers, particularly important for streaming responses from language models.

Basic SSE Streaming

import dev.langchain4j.http.client.*;
import dev.langchain4j.http.client.sse.*;

HttpRequest request = HttpRequest.builder()
    .method(HttpMethod.POST)
    .url("https://api.example.com/stream")
    .addHeader("Accept", "text/event-stream")
    .addHeader("Content-Type", "application/json")
    .body("{\"prompt\":\"Tell me a story\"}")
    .build();

client.execute(request, new ServerSentEventListener() {
    @Override
    public void onOpen(SuccessfulHttpResponse response) {
        System.out.println("Stream opened with status: " + response.statusCode());
    }

    @Override
    public void onEvent(ServerSentEvent event) {
        System.out.println("Event type: " + event.event());
        System.out.println("Event data: " + event.data());
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onClose() {
        System.out.println("Stream closed");
    }
});

Stream Cancellation

Cancel the stream after a certain condition using the context parameter.

import java.util.concurrent.atomic.AtomicInteger;

AtomicInteger eventCount = new AtomicInteger(0);

client.execute(request, new ServerSentEventListener() {
    @Override
    public void onEvent(ServerSentEvent event, ServerSentEventContext context) {
        System.out.println("Received: " + event.data());

        // Cancel after receiving 10 events
        if (eventCount.incrementAndGet() >= 10) {
            System.out.println("Cancelling stream after 10 events");
            context.parsingHandle().cancel();
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onClose() {
        System.out.println("Received " + eventCount.get() + " events");
    }
});

Conditional Stream Cancellation

Cancel the stream when a specific condition is met in the data.

client.execute(request, new ServerSentEventListener() {
    private boolean foundTarget = false;

    @Override
    public void onEvent(ServerSentEvent event, ServerSentEventContext context) {
        String data = event.data();
        System.out.println("Received: " + data);

        // Cancel if we find a specific condition
        if (data.contains("STOP")) {
            System.out.println("Found stop signal, cancelling stream");
            context.parsingHandle().cancel();
            foundTarget = true;
        }
    }

    @Override
    public void onError(Throwable throwable) {
        if (!foundTarget) {
            System.err.println("Error: " + throwable.getMessage());
        }
    }

    @Override
    public void onClose() {
        System.out.println("Stream closed");
    }
});

Processing Typed Events

Handle different event types with different logic.

client.execute(request, new ServerSentEventListener() {
    @Override
    public void onEvent(ServerSentEvent event) {
        String eventType = event.event();
        String data = event.data();

        if ("message".equals(eventType)) {
            System.out.println("Message: " + data);
        } else if ("error".equals(eventType)) {
            System.err.println("Server error: " + data);
        } else if ("done".equals(eventType)) {
            System.out.println("Stream complete");
        } else {
            // Handle unnamed events (eventType is null)
            System.out.println("Data: " + data);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }
});

Accumulating Streamed Data

Collect all streamed data into a single result.

StringBuilder accumulatedData = new StringBuilder();

client.execute(request, new ServerSentEventListener() {
    @Override
    public void onEvent(ServerSentEvent event) {
        String chunk = event.data();
        accumulatedData.append(chunk);
        System.out.print(chunk); // Print as we receive
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("\nError occurred: " + throwable.getMessage());
    }

    @Override
    public void onClose() {
        System.out.println("\nComplete response: " + accumulatedData.toString());
    }
});

Custom SSE Parser

Implement a custom parser for non-standard SSE formats.

import java.io.*;

public class CustomSSEParser implements ServerSentEventParser {
    @Override
    public void parse(InputStream httpResponseBody, ServerSentEventListener listener) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(httpResponseBody))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Custom parsing implementation
                if (line.startsWith("data: ")) {
                    String data = line.substring(6);
                    ServerSentEvent event = new ServerSentEvent(null, data);
                    listener.onEvent(event);
                }
            }
        } catch (IOException e) {
            listener.onError(e);
        }
    }
}

// Use custom parser
CustomSSEParser customParser = new CustomSSEParser();
client.execute(request, customParser, listener);

Custom Parser with Cancellation Support

public class CancellableSSEParser implements ServerSentEventParser {
    @Override
    public void parse(InputStream httpResponseBody, ServerSentEventListener listener) {
        // Create a parsing handle for cancellation support
        DefaultServerSentEventParsingHandle handle =
            new DefaultServerSentEventParsingHandle(httpResponseBody);

        ServerSentEventContext context = new ServerSentEventContext(handle);

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(httpResponseBody))) {
            String line;
            while ((line = reader.readLine()) != null && !handle.isCancelled()) {
                if (line.startsWith("data: ")) {
                    String data = line.substring(6);
                    ServerSentEvent event = new ServerSentEvent(null, data);

                    // Pass context to listener for cancellation control
                    listener.onEvent(event, context);
                }
            }
        } catch (IOException e) {
            if (!handle.isCancelled()) {
                listener.onError(e);
            }
        }
    }
}

SSE Format Examples

Standard SSE Format

event: message
data: First line of data
data: Second line of data

event: update
data: {"status": "processing"}

data: Data without explicit event type

Parsing Rules

  1. Event Type: Lines starting with event: set the event type for the next event
  2. Event Data: Lines starting with data: append to the event data (multiple data lines are joined with newlines)
  3. Event Dispatch: Empty lines trigger dispatch of the accumulated event
  4. Whitespace: Leading whitespace after the colon is trimmed
  5. Comments: Lines starting with : are comments and ignored
  6. Unknown Fields: Lines with unknown field names are ignored

Exception Handling

Listener Exception Behavior

Important: If any method of the ServerSentEventListener throws an exception, the stream processing will be terminated immediately and no further events will be processed.

client.execute(request, new ServerSentEventListener() {
    @Override
    public void onEvent(ServerSentEvent event) {
        // If this throws an exception, stream processing stops
        processEvent(event);
    }

    @Override
    public void onError(Throwable throwable) {
        // Handle errors
        System.err.println("Error: " + throwable.getMessage());
    }
});

Stream Error Types

The onError(Throwable) method is called for:

  • I/O errors while reading the stream
  • Network errors
  • Parsing errors
  • Connection timeouts

After onError() is called, onClose() will also be called to signal the end of the stream.

Common Patterns

Streaming LLM Response

public class StreamingLLMClient {
    private final HttpClient client;
    private final String apiUrl;
    private final String apiKey;

    public void streamCompletion(String prompt, Consumer<String> onChunk) {
        HttpRequest request = HttpRequest.builder()
            .method(HttpMethod.POST)
            .url(apiUrl)
            .addHeader("Authorization", "Bearer " + apiKey)
            .addHeader("Accept", "text/event-stream")
            .addHeader("Content-Type", "application/json")
            .body(String.format("{\"prompt\":\"%s\",\"stream\":true}", prompt))
            .build();

        client.execute(request, new ServerSentEventListener() {
            @Override
            public void onEvent(ServerSentEvent event) {
                if ("data".equals(event.event())) {
                    onChunk.accept(event.data());
                }
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Streaming error: " + throwable.getMessage());
            }
        });
    }
}

// Usage
streamingClient.streamCompletion("Hello", chunk -> {
    System.out.print(chunk);
});

Async Callback Pattern

public CompletableFuture<String> streamAsync(HttpRequest request) {
    CompletableFuture<String> future = new CompletableFuture<>();
    StringBuilder result = new StringBuilder();

    client.execute(request, new ServerSentEventListener() {
        @Override
        public void onEvent(ServerSentEvent event) {
            result.append(event.data());
        }

        @Override
        public void onError(Throwable throwable) {
            future.completeExceptionally(throwable);
        }

        @Override
        public void onClose() {
            future.complete(result.toString());
        }
    });

    return future;
}

// Usage
streamAsync(request)
    .thenAccept(result -> System.out.println("Complete: " + result))
    .exceptionally(error -> {
        System.err.println("Error: " + error.getMessage());
        return null;
    });

Configuration for SSE

Configure longer timeouts for SSE streams since there may be delays between events.

import java.time.Duration;

HttpClient client = HttpClientBuilderLoader.loadHttpClientBuilder()
    .connectTimeout(Duration.ofSeconds(15))
    .readTimeout(Duration.ofMinutes(5))  // Allow 5 minutes between events
    .build();

See Timeout Configuration Guide for more details.

Related Documentation

Install with Tessl CLI

npx tessl i tessl/maven-dev-langchain4j--langchain4j-http-client@1.11.0

docs

index.md

installation.md

quick-start.md

tile.json