HTTP client abstraction for LangChain4j with synchronous/asynchronous execution and Server-Sent Events (SSE) streaming support
This document provides advanced usage patterns for sophisticated HTTP client scenarios.
import dev.langchain4j.http.client.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * {@link HttpClient} decorator that guards every request with a circuit breaker.
 *
 * <p>States:
 * <ul>
 *   <li>CLOSED - requests pass through; {@code failureThreshold} consecutive failures open
 *       the circuit.</li>
 *   <li>OPEN - requests are rejected until more than {@code retryTimeoutMs} has elapsed
 *       since the last recorded failure.</li>
 *   <li>HALF_OPEN - requests pass through as probes; three successes close the circuit,
 *       one failure re-opens it.</li>
 * </ul>
 *
 * <p>All three {@code execute} overloads go through the same admission check and the same
 * success/failure bookkeeping.
 */
public class CircuitBreakerHttpClient implements HttpClient {

    /** Successful probes required while HALF_OPEN before the circuit closes. */
    private static final int HALF_OPEN_SUCCESS_THRESHOLD = 3;

    private final HttpClient delegate;
    private final int failureThreshold;
    private final long retryTimeoutMs;
    // NOTE(review): stored for constructor compatibility but not consulted by the state
    // machine; confirm the intended semantics before relying on it.
    private final long resetTimeoutMs;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicLong lastFailureTime = new AtomicLong(0);
    private volatile CircuitState state = CircuitState.CLOSED;

    /** Public so that callers of {@link #getState()} can name the returned type. */
    public enum CircuitState {
        CLOSED,   // Normal operation
        OPEN,     // Failing, rejecting requests
        HALF_OPEN // Testing if service recovered
    }

    /**
     * @param delegate         client that performs the actual requests
     * @param failureThreshold consecutive failures that open the circuit
     * @param retryTimeoutMs   time after the last failure before a probe is allowed
     * @param resetTimeoutMs   currently unused (see field note)
     */
    public CircuitBreakerHttpClient(HttpClient delegate, int failureThreshold,
                                    long retryTimeoutMs, long resetTimeoutMs) {
        this.delegate = delegate;
        this.failureThreshold = failureThreshold;
        this.retryTimeoutMs = retryTimeoutMs;
        this.resetTimeoutMs = resetTimeoutMs;
    }

    /**
     * Executes the request synchronously.
     *
     * @throws RuntimeException with message "Circuit breaker is OPEN" when the request is
     *                          rejected, or whatever the delegate throws
     */
    @Override
    public SuccessfulHttpResponse execute(HttpRequest request) {
        if (!tryAcquirePermission()) {
            throw new RuntimeException("Circuit breaker is OPEN");
        }
        try {
            SuccessfulHttpResponse response = delegate.execute(request);
            onSuccess();
            return response;
        } catch (RuntimeException e) {
            onFailure();
            throw e;
        }
    }

    /** Streams the request; a rejected request is reported through {@code listener.onError}. */
    @Override
    public void execute(HttpRequest request, ServerSentEventListener listener) {
        if (!tryAcquirePermission()) {
            listener.onError(new RuntimeException("Circuit breaker is OPEN"));
            return;
        }
        delegate.execute(request, tracking(listener));
    }

    /** Same as the two-argument streaming overload, with a caller-supplied parser. */
    @Override
    public void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener) {
        if (!tryAcquirePermission()) {
            listener.onError(new RuntimeException("Circuit breaker is OPEN"));
            return;
        }
        delegate.execute(request, parser, tracking(listener));
    }

    /**
     * Decides whether a request may proceed, moving OPEN to HALF_OPEN once the retry
     * timeout has elapsed.
     */
    private synchronized boolean tryAcquirePermission() {
        if (state != CircuitState.OPEN) {
            return true;
        }
        if (System.currentTimeMillis() - lastFailureTime.get() > retryTimeoutMs) {
            state = CircuitState.HALF_OPEN;
            System.out.println("Circuit breaker entering HALF_OPEN state");
            return true;
        }
        return false;
    }

    /** Wraps a listener so that stream termination updates the breaker state. */
    private ServerSentEventListener tracking(ServerSentEventListener listener) {
        return new ServerSentEventListener() {
            @Override
            public void onOpen(SuccessfulHttpResponse response) {
                listener.onOpen(response);
            }

            @Override
            public void onEvent(ServerSentEvent event, ServerSentEventContext context) {
                listener.onEvent(event, context);
            }

            @Override
            public void onEvent(ServerSentEvent event) {
                listener.onEvent(event);
            }

            @Override
            public void onError(Throwable throwable) {
                onFailure();
                listener.onError(throwable);
            }

            @Override
            public void onClose() {
                onSuccess();
                listener.onClose();
            }
        };
    }

    // synchronized: the state check and the counter updates must happen as one step.
    private synchronized void onSuccess() {
        if (state == CircuitState.HALF_OPEN) {
            int successes = successCount.incrementAndGet();
            if (successes >= HALF_OPEN_SUCCESS_THRESHOLD) {
                state = CircuitState.CLOSED;
                failureCount.set(0);
                successCount.set(0);
                System.out.println("Circuit breaker CLOSED");
            }
        } else {
            failureCount.set(0);
        }
    }

    // synchronized for the same reason as onSuccess().
    private synchronized void onFailure() {
        lastFailureTime.set(System.currentTimeMillis());
        int failures = failureCount.incrementAndGet();
        if (state == CircuitState.HALF_OPEN) {
            state = CircuitState.OPEN;
            successCount.set(0);
            System.out.println("Circuit breaker back to OPEN state");
        } else if (failures >= failureThreshold) {
            state = CircuitState.OPEN;
            System.out.println("Circuit breaker OPENED after " + failures + " failures");
        }
    }

    /** Returns the current circuit state. */
    public CircuitState getState() {
        return state;
    }
}
// Usage
int failureThreshold = 5;     // Open after 5 failures
long retryTimeoutMs = 30000;  // Retry after 30 seconds
long resetTimeoutMs = 60000;  // Passed as the constructor's resetTimeoutMs argument
CircuitBreakerHttpClient circuitClient = new CircuitBreakerHttpClient(
        HttpClientFactory.createDefault(), failureThreshold, retryTimeoutMs, resetTimeoutMs);
try {
    SuccessfulHttpResponse response = circuitClient.execute(request);
    System.out.println("Request successful");
} catch (RuntimeException e) {
    System.err.println("Request failed or circuit open: " + e.getMessage());
}

import java.util.concurrent.atomic.AtomicLong;
/**
 * {@link HttpClient} decorator that limits the request rate with a token bucket.
 *
 * <p>The bucket holds at most {@code requestsPerSecond} tokens and gains one token per
 * refill interval. Each {@code execute} call consumes one token, blocking the calling
 * thread until a token is available.
 */
public class RateLimitedHttpClient implements HttpClient {

    private static final long NANOS_PER_SECOND = 1_000_000_000L;
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final HttpClient delegate;
    private final int maxTokens;
    // Nanosecond resolution so that rates above 1000 requests/second do not truncate to a
    // zero-length interval.
    private final long refillIntervalNanos;
    private final AtomicLong tokens;
    // System.nanoTime() value up to which tokens have already been credited.
    private final AtomicLong lastRefillTime;

    /**
     * @param delegate          client that performs the actual requests
     * @param requestsPerSecond bucket capacity and refill rate; must be positive
     * @throws IllegalArgumentException if {@code requestsPerSecond} is not positive
     */
    public RateLimitedHttpClient(HttpClient delegate, int requestsPerSecond) {
        if (requestsPerSecond <= 0) {
            throw new IllegalArgumentException(
                    "requestsPerSecond must be positive, got " + requestsPerSecond);
        }
        this.delegate = delegate;
        this.maxTokens = requestsPerSecond;
        this.refillIntervalNanos = Math.max(1L, NANOS_PER_SECOND / requestsPerSecond);
        this.tokens = new AtomicLong(maxTokens);
        this.lastRefillTime = new AtomicLong(System.nanoTime());
    }

    @Override
    public SuccessfulHttpResponse execute(HttpRequest request) {
        acquireToken();
        return delegate.execute(request);
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventListener listener) {
        acquireToken();
        delegate.execute(request, listener);
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener) {
        acquireToken();
        delegate.execute(request, parser, listener);
    }

    /**
     * Blocks until one token has been taken from the bucket.
     *
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt
     *                          flag is restored first
     */
    private void acquireToken() {
        while (true) {
            refillTokens();
            long currentTokens = tokens.get();
            if (currentTokens > 0) {
                if (tokens.compareAndSet(currentTokens, currentTokens - 1)) {
                    return;
                }
                // Lost the race for this token; loop and try again.
            } else {
                // Wait for token refill
                try {
                    Thread.sleep(refillIntervalNanos / NANOS_PER_MILLI,
                            (int) (refillIntervalNanos % NANOS_PER_MILLI));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while waiting for rate limit", e);
                }
            }
        }
    }

    /** Credits the tokens earned since the last refill, capped at the bucket capacity. */
    private void refillTokens() {
        long now = System.nanoTime();
        long lastRefill = lastRefillTime.get();
        long elapsed = now - lastRefill;
        if (elapsed < refillIntervalNanos) {
            return;
        }
        long tokensToAdd = elapsed / refillIntervalNanos;
        // Advance only by the whole intervals credited so the remainder counts toward the
        // next token instead of being discarded.
        long creditedUntil = lastRefill + tokensToAdd * refillIntervalNanos;
        if (lastRefillTime.compareAndSet(lastRefill, creditedUntil)) {
            // Atomic update: a plain get-then-set could overwrite a concurrent decrement.
            tokens.updateAndGet(current -> Math.min(maxTokens, current + tokensToAdd));
        }
    }
}
// Usage
RateLimitedHttpClient rateLimitedClient = new RateLimitedHttpClient(
        HttpClientFactory.createDefault(),
        10 // 10 requests per second
);

/**
 * Transforms an outgoing request before it is handed to the underlying client.
 * Has a single abstract method, so it can be supplied as a lambda or method reference.
 */
@FunctionalInterface
public interface HttpRequestInterceptor {
    /**
     * @param request the request about to be sent
     * @return the request that should be sent instead
     */
    HttpRequest intercept(HttpRequest request);
}
/**
 * Transforms a successful response before it is returned to the caller.
 * Has a single abstract method, so it can be supplied as a lambda or method reference.
 */
@FunctionalInterface
public interface HttpResponseInterceptor {
    /**
     * @param response the response received from the underlying client
     * @return the response that should be returned instead
     */
    SuccessfulHttpResponse intercept(SuccessfulHttpResponse response);
}
/**
 * {@link HttpClient} decorator that passes each outgoing request through a list of
 * {@link HttpRequestInterceptor}s and, for synchronous calls only, each response through a
 * list of {@link HttpResponseInterceptor}s. Interceptors run in list order, each receiving
 * the previous one's result.
 */
public class InterceptingHttpClient implements HttpClient {
    private final HttpClient delegate;
    private final List<HttpRequestInterceptor> requestInterceptors;
    private final List<HttpResponseInterceptor> responseInterceptors;

    /** Copies both interceptor lists, so later changes to the arguments have no effect. */
    public InterceptingHttpClient(HttpClient delegate,
                                  List<HttpRequestInterceptor> requestInterceptors,
                                  List<HttpResponseInterceptor> responseInterceptors) {
        this.delegate = delegate;
        this.requestInterceptors = new ArrayList<>(requestInterceptors);
        this.responseInterceptors = new ArrayList<>(responseInterceptors);
    }

    /** Intercepts the request, executes it, then intercepts the response. */
    @Override
    public SuccessfulHttpResponse execute(HttpRequest request) {
        SuccessfulHttpResponse current = delegate.execute(prepare(request));
        for (HttpResponseInterceptor step : responseInterceptors) {
            current = step.intercept(current);
        }
        return current;
    }

    /** Streaming variant: only request interceptors are applied. */
    @Override
    public void execute(HttpRequest request, ServerSentEventListener listener) {
        delegate.execute(prepare(request), listener);
    }

    /** Streaming variant with a custom parser: only request interceptors are applied. */
    @Override
    public void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener) {
        delegate.execute(prepare(request), parser, listener);
    }

    /** Runs the request through every request interceptor in order. */
    private HttpRequest prepare(HttpRequest original) {
        HttpRequest current = original;
        for (HttpRequestInterceptor step : requestInterceptors) {
            current = step.intercept(current);
        }
        return current;
    }
}
// Example: Add timestamp header
/**
 * Request interceptor that rebuilds the request with an added header carrying the current
 * {@code System.currentTimeMillis()} value.
 */
public class TimestampInterceptor implements HttpRequestInterceptor {
    private static final String TIMESTAMP_HEADER = "X-Request-Timestamp";

    /** Copies method, URL, headers, body and form data, then adds the timestamp header. */
    @Override
    public HttpRequest intercept(HttpRequest request) {
        return HttpRequest.builder()
                .method(request.method())
                .url(request.url())
                .headers(request.headers())
                .addHeader(TIMESTAMP_HEADER, Long.toString(System.currentTimeMillis()))
                .body(request.body())
                .formDataFields(request.formDataFields())
                .formDataFiles(request.formDataFiles())
                .build();
    }
}
// Usage
HttpClient baseClient = HttpClientFactory.createDefault();
List<HttpRequestInterceptor> requestInterceptors = List.of(new TimestampInterceptor());
List<HttpResponseInterceptor> responseInterceptors = List.of();
InterceptingHttpClient client =
        new InterceptingHttpClient(baseClient, requestInterceptors, responseInterceptors);

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
 * {@link HttpClient} decorator that caches responses to synchronous GET requests for a
 * fixed time-to-live. The cache key is the request URL alone; streaming requests and
 * non-GET requests always go straight to the delegate.
 */
public class CachingHttpClient implements HttpClient {
    private final HttpClient delegate;
    private final ConcurrentHashMap<String, CacheEntry> cache;
    private final long ttlMs;

    /** A cached response together with the wall-clock time it was stored. */
    private static class CacheEntry {
        final SuccessfulHttpResponse response;
        final long timestamp;

        CacheEntry(SuccessfulHttpResponse response, long timestamp) {
            this.response = response;
            this.timestamp = timestamp;
        }

        /** True once strictly more than {@code ttlMs} has passed since the entry was stored. */
        boolean isExpired(long ttlMs) {
            long age = System.currentTimeMillis() - timestamp;
            return age > ttlMs;
        }
    }

    /**
     * @param delegate client that performs the actual requests
     * @param ttl      time-to-live of a cached response
     * @param unit     unit of {@code ttl}
     */
    public CachingHttpClient(HttpClient delegate, long ttl, TimeUnit unit) {
        this.delegate = delegate;
        this.cache = new ConcurrentHashMap<>();
        this.ttlMs = unit.toMillis(ttl);
    }

    /** Serves GET requests from the cache when a fresh entry exists; otherwise delegates. */
    @Override
    public SuccessfulHttpResponse execute(HttpRequest request) {
        // Only cache GET requests
        boolean cacheable = request.method() == HttpMethod.GET;
        if (!cacheable) {
            return delegate.execute(request);
        }

        String url = request.url();
        CacheEntry cached = cache.get(url);
        boolean fresh = cached != null && !cached.isExpired(ttlMs);
        if (fresh) {
            System.out.println("Cache HIT for: " + url);
            return cached.response;
        }

        System.out.println("Cache MISS for: " + url);
        SuccessfulHttpResponse fetched = delegate.execute(request);
        cache.put(url, new CacheEntry(fetched, System.currentTimeMillis()));
        return fetched;
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventListener listener) {
        // Don't cache streaming requests
        delegate.execute(request, listener);
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener) {
        delegate.execute(request, parser, listener);
    }

    /** Removes every cached response. */
    public void clearCache() {
        cache.clear();
    }

    /** Removes only the entries whose time-to-live has elapsed. */
    public void evictExpired() {
        cache.values().removeIf(candidate -> candidate.isExpired(ttlMs));
    }
}
// Usage
long ttl = 5; // TTL
CachingHttpClient cachingClient =
        new CachingHttpClient(HttpClientFactory.createDefault(), ttl, TimeUnit.MINUTES);

// First request - cache miss
SuccessfulHttpResponse response1 = cachingClient.execute(request);

// Second request - cache hit
SuccessfulHttpResponse response2 = cachingClient.execute(request);

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
/**
 * {@link HttpClient} decorator that counts requests, successes and failures and records
 * the elapsed time of successful requests.
 *
 * <p>All three {@code execute} overloads are counted. For streaming requests, success is
 * recorded when the listener's {@code onClose} fires and failure when {@code onError}
 * fires; the recorded duration runs from the {@code execute} call to {@code onClose}.
 */
public class MetricsHttpClient implements HttpClient {
    private final HttpClient delegate;
    private final LongAdder totalRequests = new LongAdder();
    private final LongAdder successfulRequests = new LongAdder();
    private final LongAdder failedRequests = new LongAdder();
    private final AtomicLong totalResponseTimeMs = new AtomicLong(0);

    public MetricsHttpClient(HttpClient delegate) {
        this.delegate = delegate;
    }

    /** Executes synchronously, recording the outcome and, on success, the duration. */
    @Override
    public SuccessfulHttpResponse execute(HttpRequest request) {
        totalRequests.increment();
        long startTime = System.currentTimeMillis();
        try {
            SuccessfulHttpResponse response = delegate.execute(request);
            successfulRequests.increment();
            long duration = System.currentTimeMillis() - startTime;
            totalResponseTimeMs.addAndGet(duration);
            return response;
        } catch (RuntimeException e) {
            failedRequests.increment();
            throw e;
        }
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventListener listener) {
        totalRequests.increment();
        delegate.execute(request, instrumented(listener, System.currentTimeMillis()));
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener) {
        totalRequests.increment();
        delegate.execute(request, parser, instrumented(listener, System.currentTimeMillis()));
    }

    /**
     * Wraps a listener so that stream termination is recorded.
     *
     * @param listener  the caller's listener, which still receives every callback
     * @param startTime {@code System.currentTimeMillis()} taken when the request started
     */
    private ServerSentEventListener instrumented(ServerSentEventListener listener, long startTime) {
        return new ServerSentEventListener() {
            @Override
            public void onOpen(SuccessfulHttpResponse response) {
                listener.onOpen(response);
            }

            @Override
            public void onEvent(ServerSentEvent event, ServerSentEventContext context) {
                listener.onEvent(event, context);
            }

            @Override
            public void onEvent(ServerSentEvent event) {
                listener.onEvent(event);
            }

            @Override
            public void onError(Throwable throwable) {
                failedRequests.increment();
                listener.onError(throwable);
            }

            @Override
            public void onClose() {
                successfulRequests.increment();
                // Record the duration so the average, which divides by the success count,
                // is not dragged down by streaming successes.
                totalResponseTimeMs.addAndGet(System.currentTimeMillis() - startTime);
                listener.onClose();
            }
        };
    }

    /** Returns a snapshot of the counters plus the derived success rate and average time. */
    public Metrics getMetrics() {
        long total = totalRequests.sum();
        long successful = successfulRequests.sum();
        long failed = failedRequests.sum();
        long totalTime = totalResponseTimeMs.get();
        double successRate = total > 0 ? (double) successful / total * 100 : 0;
        double avgResponseTime = successful > 0 ? (double) totalTime / successful : 0;
        return new Metrics(total, successful, failed, successRate, avgResponseTime);
    }

    /** Immutable snapshot of the collected metrics. */
    public static class Metrics {
        public final long totalRequests;
        public final long successfulRequests;
        public final long failedRequests;
        /** Successful requests as a percentage (0-100) of total requests. */
        public final double successRate;
        /** Mean elapsed time of successful requests, in milliseconds. */
        public final double avgResponseTimeMs;

        public Metrics(long total, long successful, long failed, double successRate, double avgResponseTime) {
            this.totalRequests = total;
            this.successfulRequests = successful;
            this.failedRequests = failed;
            this.successRate = successRate;
            this.avgResponseTimeMs = avgResponseTime;
        }

        @Override
        public String toString() {
            return String.format(
                    "Total: %d, Successful: %d, Failed: %d, Success Rate: %.2f%%, Avg Response Time: %.2fms",
                    totalRequests, successfulRequests, failedRequests, successRate, avgResponseTimeMs
            );
        }
    }
}
// Usage
MetricsHttpClient metricsClient = new MetricsHttpClient(HttpClientFactory.createDefault());

// Make requests...
int remaining = 100;
while (remaining > 0) {
    remaining--;
    try {
        metricsClient.execute(request);
    } catch (Exception e) {
        // Handle error
    }
}
// Get metrics
System.out.println(metricsClient.getMetrics());

/**
 * Fluent builder that stacks {@link HttpClient} decorators on top of a base client.
 * Each {@code withX} call wraps the client built so far, so the decorator added last is
 * the outermost one and sees a request first.
 */
public class CompositeHttpClientBuilder {
    private HttpClient client;

    /** @param baseClient the innermost client, which performs the actual requests */
    public CompositeHttpClientBuilder(HttpClient baseClient) {
        this.client = baseClient;
    }

    public CompositeHttpClientBuilder withLogging(boolean logRequests, boolean logResponses) {
        return wrapWith(new LoggingHttpClient(client, logRequests, logResponses));
    }

    public CompositeHttpClientBuilder withRetry(int maxRetries) {
        return wrapWith(new RetryingHttpClient(client, maxRetries));
    }

    public CompositeHttpClientBuilder withCircuitBreaker(int failureThreshold,
                                                         long retryTimeoutMs,
                                                         long resetTimeoutMs) {
        return wrapWith(new CircuitBreakerHttpClient(
                client, failureThreshold, retryTimeoutMs, resetTimeoutMs));
    }

    public CompositeHttpClientBuilder withRateLimit(int requestsPerSecond) {
        return wrapWith(new RateLimitedHttpClient(client, requestsPerSecond));
    }

    public CompositeHttpClientBuilder withCache(long ttl, TimeUnit unit) {
        return wrapWith(new CachingHttpClient(client, ttl, unit));
    }

    public CompositeHttpClientBuilder withMetrics() {
        return wrapWith(new MetricsHttpClient(client));
    }

    /** Returns the fully decorated client. */
    public HttpClient build() {
        return client;
    }

    /** Makes {@code decorated} the current client and returns this builder for chaining. */
    private CompositeHttpClientBuilder wrapWith(HttpClient decorated) {
        this.client = decorated;
        return this;
    }
}
// Usage
// Decorators are applied innermost first: the cache wraps the base client directly and
// logging ends up as the outermost layer.
CompositeHttpClientBuilder builder =
        new CompositeHttpClientBuilder(HttpClientFactory.createDefault());
builder.withCache(5, TimeUnit.MINUTES);
builder.withRateLimit(10);
builder.withRetry(3);
builder.withCircuitBreaker(5, 30000, 60000);
builder.withMetrics();
builder.withLogging(true, false);
HttpClient client = builder.build();

import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
/**
 * Executes batches of HTTP requests concurrently on a fixed-size thread pool.
 * Call {@link #shutdown()} when the executor is no longer needed.
 */
public class ParallelRequestExecutor {
    private final HttpClient client;
    private final ExecutorService executor;

    /**
     * @param client         client used for every request
     * @param maxConcurrency size of the fixed thread pool
     */
    public ParallelRequestExecutor(HttpClient client, int maxConcurrency) {
        this.client = client;
        this.executor = Executors.newFixedThreadPool(maxConcurrency);
    }

    /**
     * Submits every request to the pool and waits for all of them to finish.
     *
     * @param requests requests to execute
     * @return the responses, in the same order as {@code requests}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if any request completed exceptionally
     */
    public List<SuccessfulHttpResponse> executeAll(List<HttpRequest> requests)
            throws InterruptedException, ExecutionException {
        List<CompletableFuture<SuccessfulHttpResponse>> futures = new ArrayList<>(requests.size());
        for (HttpRequest request : requests) {
            futures.add(CompletableFuture.supplyAsync(() -> client.execute(request), executor));
        }

        // Wait for all to complete
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();

        // Collect results
        List<SuccessfulHttpResponse> responses = new ArrayList<>(futures.size());
        for (CompletableFuture<SuccessfulHttpResponse> future : futures) {
            responses.add(future.get());
        }
        return responses;
    }

    /**
     * Stops accepting work and waits up to 60 seconds for running tasks, then forces
     * termination. If the calling thread is interrupted while waiting, termination is
     * forced immediately and the interrupt flag is restored.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
// Usage
int maxConcurrency = 10; // max 10 concurrent requests
ParallelRequestExecutor parallelExecutor =
        new ParallelRequestExecutor(HttpClientFactory.createDefault(), maxConcurrency);

HttpRequest first = HttpRequest.builder().method(HttpMethod.GET).url("https://api.example.com/1").build();
HttpRequest second = HttpRequest.builder().method(HttpMethod.GET).url("https://api.example.com/2").build();
HttpRequest third = HttpRequest.builder().method(HttpMethod.GET).url("https://api.example.com/3").build();
List<HttpRequest> requests = List.of(first, second, third);

List<SuccessfulHttpResponse> responses = parallelExecutor.executeAll(requests);
parallelExecutor.shutdown();

Install with Tessl CLI:
npx tessl i tessl/maven-dev-langchain4j--langchain4j-http-client@1.11.0