The Anthropic Java SDK provides convenient access to the Anthropic REST API from applications written in Java.
The Anthropic Java SDK provides comprehensive streaming support for processing message responses as they are generated. Streaming enables real-time interaction with Claude, delivering response chunks incrementally rather than waiting for complete responses. The SDK offers both synchronous and asynchronous streaming interfaces with event-based processing and helper utilities for accumulating streamed data.
The SDK provides streaming variants of the message creation methods through the MessageService interface (blocking) and the MessageServiceAsync interface (asynchronous).
package com.anthropic.services.blocking;
public interface MessageService {
StreamResponse<RawMessageStreamEvent> createStreaming(MessageCreateParams params);
StreamResponse<RawMessageStreamEvent> createStreaming(
MessageCreateParams params,
RequestOptions requestOptions
);
}
package com.anthropic.services.async;
public interface MessageServiceAsync {
AsyncStreamResponse<RawMessageStreamEvent> createStreaming(MessageCreateParams params);
AsyncStreamResponse<RawMessageStreamEvent> createStreaming(
MessageCreateParams params,
RequestOptions requestOptions
);
}
The createStreaming methods return streaming response interfaces that emit RawMessageStreamEvent objects as the response is generated.
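To make "emit events as the response is generated" concrete, here is a minimal, SDK-free sketch of the pattern the streaming responses below follow: a closeable response whose stream() pulls one event at a time, so each event is handled before the next one is read, and the connection is released when the try block exits. SimulatedStreamResponse and the string chunks are hypothetical stand-ins (a list iterator plays the role of the network); this is not the SDK's StreamResponse implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IncrementalStreamSketch {

    public static void main(String[] args) {
        System.out.println(run(List.of("Hel", "lo")));
    }

    // Consumes a simulated response the way the SDK examples do: try-with-resources
    // around stream().forEach(...). Returns the order in which things happened.
    static List<String> run(List<String> chunks) {
        List<String> log = new ArrayList<>();
        try (SimulatedStreamResponse response = new SimulatedStreamResponse(chunks, log)) {
            response.stream().forEach(chunk -> log.add("handled " + chunk));
        }
        return log;
    }

    // Hypothetical stand-in for a streaming response (NOT the SDK's StreamResponse).
    static final class SimulatedStreamResponse implements AutoCloseable {
        private final Iterator<String> source;
        private final List<String> log;

        SimulatedStreamResponse(List<String> chunks, List<String> log) {
            this.source = chunks.iterator();
            this.log = log;
        }

        // Lazily pulls one chunk at a time; nothing is read ahead of the consumer.
        Stream<String> stream() {
            Iterator<String> pulling = new Iterator<String>() {
                @Override
                public boolean hasNext() {
                    return source.hasNext();
                }

                @Override
                public String next() {
                    String chunk = source.next();
                    log.add("received " + chunk);
                    return chunk;
                }
            };
            return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(pulling, Spliterator.ORDERED), false);
        }

        // A real response would release the HTTP connection here.
        @Override
        public void close() {
            log.add("closed");
        }
    }
}
```

Running main prints [received Hel, handled Hel, received lo, handled lo, closed]: receiving and handling interleave one event at a time, and close() runs when the try block exits.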
The StreamResponse interface provides synchronous streaming using Java's Stream API.
package com.anthropic.core.http;
public interface StreamResponse<T> extends Closeable {
Stream<T> stream();
void close();
}
Methods:
stream() - Returns a Java Stream<T> of events that can be processed using standard stream operations
close() - Closes the underlying connection and releases resources
Usage Pattern:
Use try-with-resources to ensure proper resource cleanup:
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
streamResponse.stream().forEach(chunk -> {
System.out.println(chunk);
});
}
The AsyncStreamResponse interface provides asynchronous streaming with a subscribe-based model.
package com.anthropic.core.http;
public interface AsyncStreamResponse<T> {
Subscription subscribe(Consumer<T> handler);
Subscription subscribe(Consumer<T> handler, Executor executor);
Subscription subscribe(Handler<T> handler);
Subscription subscribe(Handler<T> handler, Executor executor);
interface Handler<T> {
void onNext(T item);
void onComplete(Optional<Throwable> error);
}
interface Subscription {
CompletableFuture<Void> onCompleteFuture();
void cancel();
}
}
Methods:
subscribe(Consumer<T>) - Subscribe with a simple event handler
subscribe(Consumer<T>, Executor) - Subscribe with a handler and a custom executor
subscribe(Handler<T>) - Subscribe with a handler that also receives completion notifications
subscribe(Handler<T>, Executor) - Subscribe with a full handler and a custom executor
Handler Interface:
onNext(T) - Called for each streamed event
onComplete(Optional<Throwable>) - Called when the stream completes (the error is present if streaming failed)
Subscription Interface:
onCompleteFuture() - Returns a CompletableFuture that completes when streaming finishes
cancel() - Cancels the stream subscription
Streaming responses emit RawMessageStreamEvent objects. RawMessageStreamEvent is a union type covering the different event types in the stream.
package com.anthropic.models.messages;
public final class RawMessageStreamEvent {
// Union type accessors
Optional<RawMessageStartEvent> messageStart();
Optional<RawMessageDeltaEvent> messageDelta();
Optional<RawMessageStopEvent> messageStop();
Optional<RawContentBlockStartEvent> contentBlockStart();
Optional<RawContentBlockDeltaEvent> contentBlockDelta();
Optional<RawContentBlockStopEvent> contentBlockStop();
// Type checking
boolean isMessageStart();
boolean isMessageDelta();
boolean isMessageStop();
boolean isContentBlockStart();
boolean isContentBlockDelta();
boolean isContentBlockStop();
// Type casting
RawMessageStartEvent asMessageStart();
RawMessageDeltaEvent asMessageDelta();
RawMessageStopEvent asMessageStop();
RawContentBlockStartEvent asContentBlockStart();
RawContentBlockDeltaEvent asContentBlockDelta();
RawContentBlockStopEvent asContentBlockStop();
}
RawMessageStartEvent - Emitted when a new message starts:
package com.anthropic.models.messages;
public final class RawMessageStartEvent {
String type(); // "message_start"
Message message(); // Initial message with metadata
}
RawMessageDeltaEvent - Emitted when message metadata changes:
package com.anthropic.models.messages;
public final class RawMessageDeltaEvent {
String type(); // "message_delta"
MessageDelta delta(); // Changes to message metadata
MessageDeltaUsage usage(); // Cumulative token usage for the message so far
}
RawMessageStopEvent - Emitted when message generation completes:
package com.anthropic.models.messages;
public final class RawMessageStopEvent {
String type(); // "message_stop"
}
RawContentBlockStartEvent - Emitted when a new content block starts:
package com.anthropic.models.messages;
public final class RawContentBlockStartEvent {
String type(); // "content_block_start"
int index(); // Index of the content block
ContentBlock contentBlock(); // Initial content block (TextBlock, ToolUseBlock, etc.)
}
RawContentBlockDeltaEvent - Emitted when content is added to a block:
package com.anthropic.models.messages;
public final class RawContentBlockDeltaEvent {
String type(); // "content_block_delta"
int index(); // Index of the content block
ContentBlockDelta delta(); // Incremental content (text, partial JSON, etc.)
}
RawContentBlockStopEvent - Emitted when a content block completes:
package com.anthropic.models.messages;
public final class RawContentBlockStopEvent {
String type(); // "content_block_stop"
int index(); // Index of the completed content block
}
The MessageAccumulator helper class accumulates streaming events into a complete Message object.
package com.anthropic.helpers;
public final class MessageAccumulator {
static MessageAccumulator create();
RawMessageStreamEvent accumulate(RawMessageStreamEvent event);
Message message();
}
Methods:
create() - Static factory method to create a new accumulator
accumulate(RawMessageStreamEvent) - Process an event and return it (for chaining)
message() - Retrieve the accumulated Message object
Usage:
The accumulator maintains state as events are processed and constructs a complete Message that mirrors what would be returned by the non-streaming API.
MessageAccumulator accumulator = MessageAccumulator.create();
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
streamResponse.stream()
.peek(accumulator::accumulate)
.forEach(event -> {
// Process events as they arrive
});
}
Message message = accumulator.message();
The BetaMessageAccumulator accumulates beta streaming events into a BetaMessage object.
package com.anthropic.helpers;
public final class BetaMessageAccumulator {
static BetaMessageAccumulator create();
BetaRawMessageStreamEvent accumulate(BetaRawMessageStreamEvent event);
BetaMessage message();
<T> StructuredMessage<T> message(Class<T> structuredOutputClass);
}
Methods:
create() - Static factory method to create a new accumulator
accumulate(BetaRawMessageStreamEvent) - Process a beta event and return it
message() - Retrieve the accumulated BetaMessage object
message(Class<T>) - Retrieve as StructuredMessage<T> for structured outputs
Usage with Structured Outputs:
When using structured outputs with streaming, accumulate the streamed events, then deserialize the completed JSON into your output class:
BetaMessageAccumulator accumulator = BetaMessageAccumulator.create();
client.beta().messages() // client is an AnthropicClientAsync here: subscribe() needs an async stream
.createStreaming(params)
.subscribe(event -> {
accumulator.accumulate(event);
// Process streaming events
})
.onCompleteFuture()
.join();
StructuredMessage<BookList> message = accumulator.message(BookList.class);
BookList books = message.content().get(0).text().get().text();
For asynchronous streaming, handlers execute on a dedicated thread pool. You can configure the executor at the client level or per-subscription.
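The dispatch model described in this paragraph (a client-wide default executor that each subscription may override) can be sketched with plain java.util.concurrent. SimulatedAsyncStream, namedPool, and handlerThreads are hypothetical names rather than SDK APIs, and this simplified subscribe() returns the CompletableFuture directly instead of a subscription object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ExecutorDispatchSketch {

    public static void main(String[] args) {
        ExecutorService clientPool = namedPool("client-pool");
        ExecutorService subscriptionPool = namedPool("subscription-pool");
        SimulatedAsyncStream<String> stream =
            new SimulatedAsyncStream<>(List.of("a", "b"), clientPool);

        System.out.println(handlerThreads(stream, null));             // [client-pool, client-pool]
        System.out.println(handlerThreads(stream, subscriptionPool)); // [subscription-pool, subscription-pool]

        clientPool.shutdown();
        subscriptionPool.shutdown();
    }

    // Hypothetical model of an async stream (NOT the SDK's AsyncStreamResponse).
    static final class SimulatedAsyncStream<T> {
        private final List<T> events;
        private final Executor defaultExecutor; // plays the role of the client-level executor

        SimulatedAsyncStream(List<T> events, Executor defaultExecutor) {
            this.events = events;
            this.defaultExecutor = defaultExecutor;
        }

        CompletableFuture<Void> subscribe(Consumer<T> handler) {
            return subscribe(handler, defaultExecutor);
        }

        // Delivers every event, in order, to the handler on the given executor.
        // The future completes after the last event, or exceptionally if the handler throws.
        CompletableFuture<Void> subscribe(Consumer<T> handler, Executor executor) {
            return CompletableFuture.runAsync(() -> events.forEach(handler), executor);
        }
    }

    // Single-thread pool whose thread carries a recognizable name.
    static ExecutorService namedPool(String name) {
        return Executors.newFixedThreadPool(1, runnable -> new Thread(runnable, name));
    }

    // Records which thread ran the handler for each event.
    static List<String> handlerThreads(SimulatedAsyncStream<String> stream, Executor override) {
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> handler = event -> names.add(Thread.currentThread().getName());
        CompletableFuture<Void> done = (override == null)
            ? stream.subscribe(handler)
            : stream.subscribe(handler, override);
        done.join(); // blocking is for the demo only; real code can chain whenComplete()
        return names;
    }
}
```

The handler never runs on the calling thread: it runs on the default pool unless a subscription supplies its own executor.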
package com.anthropic.client.okhttp;
public final class AnthropicOkHttpClient {
public static final class Builder {
public Builder streamHandlerExecutor(Executor executor);
public AnthropicClient build();
}
}
Example:
import java.util.concurrent.Executors;
AnthropicClient client = AnthropicOkHttpClient.builder()
.fromEnv()
.streamHandlerExecutor(Executors.newFixedThreadPool(4))
.build();
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
Executor executor = Executors.newFixedThreadPool(4);
client.async().messages().createStreaming(params)
.subscribe(chunk -> System.out.println(chunk), executor);
The default executor is a cached thread pool suitable for most use cases. Configure a custom executor when you need specific thread management, resource limits, or integration with existing executor services.
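One concrete reason to supply your own executor is a resource limit. The SDK-free sketch below runs several simulated stream handlers on a pool capped at two threads and records the peak number running at once; maxConcurrentHandlers and the 20 ms sleep are illustrative assumptions, not SDK behavior. It also shows the other half of owning an executor: pools created with Executors.newFixedThreadPool, like the ones in the examples above, use non-daemon threads, so you must shut them down or they keep the JVM alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedHandlerPoolSketch {

    public static void main(String[] args) {
        // Six concurrent "streams", but at most two handlers may run at once.
        System.out.println("Peak concurrent handlers: " + maxConcurrentHandlers(6, 2));
    }

    // Runs `streams` simulated handlers on a fixed pool of `poolSize` threads and
    // returns the highest number of handlers observed running simultaneously.
    static int maxConcurrentHandlers(int streams, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> handlers = new ArrayList<>();
        try {
            for (int i = 0; i < streams; i++) {
                handlers.add(CompletableFuture.runAsync(() -> {
                    peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                    sleepQuietly(20); // stands in for processing a burst of events
                    active.decrementAndGet();
                }, pool));
            }
            // Wait for every simulated stream to finish.
            CompletableFuture.allOf(handlers.toArray(new CompletableFuture<?>[0])).join();
        } finally {
            pool.shutdown(); // an executor you create is yours to shut down
        }
        return peak.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}
```

The peak never exceeds the pool size; extra handlers simply wait in the executor's queue until a thread frees up.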
Process events as they arrive using Java's Stream API:
import com.anthropic.client.AnthropicClient;
import com.anthropic.client.okhttp.AnthropicOkHttpClient;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.MessageCreateParams;
import com.anthropic.models.messages.Model;
import com.anthropic.models.messages.RawMessageStreamEvent;
AnthropicClient client = AnthropicOkHttpClient.fromEnv();
MessageCreateParams params = MessageCreateParams.builder()
.maxTokens(1024L)
.addUserMessage("Write a short poem about recursion")
.model(Model.CLAUDE_SONNET_4_5)
.build();
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
streamResponse.stream().forEach(event -> {
if (event.isContentBlockDelta()) {
event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
System.out.print(textDelta.text());
});
}
});
}
Use MessageAccumulator with Stream.peek() to accumulate while processing:
import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.Message;
MessageAccumulator accumulator = MessageAccumulator.create();
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
streamResponse.stream()
.peek(accumulator::accumulate)
.filter(event -> event.isContentBlockDelta())
.flatMap(event -> event.asContentBlockDelta().delta().text().stream())
.forEach(textDelta -> System.out.print(textDelta.text()));
}
Message message = accumulator.message();
System.out.println("\n\nStop reason: " + message.stopReason());
System.out.println("Output tokens: " + message.usage().outputTokens());
Process only specific event types in the stream:
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
// Only process text deltas
streamResponse.stream()
.filter(event -> event.isContentBlockDelta())
.map(RawMessageStreamEvent::asContentBlockDelta)
.flatMap(deltaEvent -> deltaEvent.delta().text().stream())
.forEach(textDelta -> System.out.print(textDelta.text()));
}
Always use try-with-resources to ensure proper cleanup:
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
streamResponse.stream().forEach(event -> {
// Process events
});
} catch (Exception e) {
System.err.println("Streaming failed: " + e.getMessage());
}
// Stream automatically closed here
Subscribe to events with a simple consumer:
import com.anthropic.client.AnthropicClientAsync;
import com.anthropic.client.okhttp.AnthropicOkHttpClientAsync;
AnthropicClientAsync client = AnthropicOkHttpClientAsync.fromEnv();
MessageCreateParams params = MessageCreateParams.builder()
.maxTokens(1024L)
.addUserMessage("Explain quantum computing")
.model(Model.CLAUDE_SONNET_4_5)
.build();
client.messages().createStreaming(params).subscribe(event -> {
if (event.isContentBlockDelta()) {
event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
System.out.print(textDelta.text());
});
}
});
Use the Handler interface to receive completion notifications:
import com.anthropic.core.http.AsyncStreamResponse;
import java.util.Optional;
client.messages().createStreaming(params).subscribe(
new AsyncStreamResponse.Handler<RawMessageStreamEvent>() {
@Override
public void onNext(RawMessageStreamEvent event) {
if (event.isContentBlockDelta()) {
event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
System.out.print(textDelta.text());
});
}
}
@Override
public void onComplete(Optional<Throwable> error) {
if (error.isPresent()) {
System.err.println("\nStreaming failed: " + error.get().getMessage());
} else {
System.out.println("\n\nStreaming completed successfully");
}
}
}
);
Wait for streaming to complete using CompletableFuture:
import java.util.concurrent.CompletableFuture;
CompletableFuture<Void> streamingComplete =
client.messages().createStreaming(params)
.subscribe(event -> {
// Process events
})
.onCompleteFuture();
// Wait for completion
streamingComplete.join();
// Or handle completion asynchronously
streamingComplete.whenComplete((unused, error) -> {
if (error != null) {
System.err.println("Streaming failed: " + error.getMessage());
} else {
System.out.println("Streaming completed successfully");
}
});
Use MessageAccumulator with asynchronous streaming:
import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.Message;
MessageAccumulator accumulator = MessageAccumulator.create();
client.messages()
.createStreaming(params)
.subscribe(event -> {
accumulator.accumulate(event);
// Process events as they arrive
if (event.isContentBlockDelta()) {
event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
System.out.print(textDelta.text());
});
}
})
.onCompleteFuture()
.thenRun(() -> {
Message message = accumulator.message();
System.out.println("\n\nFinal message: " + message);
})
.join();
Configure a custom executor for handler execution:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
ExecutorService customExecutor = Executors.newFixedThreadPool(2);
client.messages()
.createStreaming(params)
.subscribe(
event -> {
// Handler runs on custom executor
System.out.println("Thread: " + Thread.currentThread().getName());
},
customExecutor
)
.onCompleteFuture()
.join();
// Shut down executors you create yourself so their threads don't keep the JVM alive
customExecutor.shutdown();
Cancel an ongoing stream subscription:
import com.anthropic.core.http.AsyncStreamResponse.Subscription;
import java.util.concurrent.TimeUnit;
Subscription subscription = client.messages()
.createStreaming(params)
.subscribe(event -> {
// Process events
});
// Cancel after some condition
if (shouldCancel) {
subscription.cancel();
}
// Or cancel after timeout
subscription.onCompleteFuture()
.orTimeout(30, TimeUnit.SECONDS)
.exceptionally(ex -> {
subscription.cancel();
return null;
});
Handle all event types in a streaming response:
streamResponse.stream().forEach(event -> {
if (event.isMessageStart()) {
RawMessageStartEvent start = event.asMessageStart();
System.out.println("Message started: " + start.message().id());
} else if (event.isMessageDelta()) {
RawMessageDeltaEvent delta = event.asMessageDelta();
System.out.println("Token usage: " + delta.usage().outputTokens());
} else if (event.isMessageStop()) {
System.out.println("Message generation complete");
} else if (event.isContentBlockStart()) {
RawContentBlockStartEvent start = event.asContentBlockStart();
System.out.println("Content block " + start.index() + " started");
} else if (event.isContentBlockDelta()) {
RawContentBlockDeltaEvent delta = event.asContentBlockDelta();
delta.delta().text().ifPresent(textDelta -> {
System.out.print(textDelta.text());
});
} else if (event.isContentBlockStop()) {
RawContentBlockStopEvent stop = event.asContentBlockStop();
System.out.println("\nContent block " + stop.index() + " complete");
}
});
Filter and extract only text content from the stream:
streamResponse.stream()
.filter(event -> event.isContentBlockDelta())
.map(RawMessageStreamEvent::asContentBlockDelta)
.flatMap(delta -> delta.delta().text().stream())
.map(textDelta -> textDelta.text())
.forEach(System.out::print);
Read the cumulative token usage reported by message_delta events (these typically arrive near the end of the stream):
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
streamResponse.stream()
.filter(event -> event.isMessageDelta())
.map(RawMessageStreamEvent::asMessageDelta)
.forEach(delta ->
// usage() counts are cumulative for the message so far
System.out.println("Output tokens: " + delta.usage().outputTokens())
);
}
Streaming methods support per-request configuration through RequestOptions:
import com.anthropic.core.RequestOptions;
import java.time.Duration;
RequestOptions options = RequestOptions.builder()
.timeout(Duration.ofMinutes(5))
.build();
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params, options)) {
streamResponse.stream().forEach(event -> {
// Process with custom timeout
});
}
Handle errors using standard exception handling:
import com.anthropic.errors.AnthropicException;
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
streamResponse.stream().forEach(event -> {
// Process events
});
} catch (AnthropicException e) {
System.err.println("API error: " + e.getMessage());
} catch (Exception e) {
System.err.println("Unexpected error: " + e.getMessage());
}
Handle errors through the completion handler:
client.messages().createStreaming(params).subscribe(
new AsyncStreamResponse.Handler<RawMessageStreamEvent>() {
@Override
public void onNext(RawMessageStreamEvent event) {
// Process events
}
@Override
public void onComplete(Optional<Throwable> error) {
if (error.isPresent()) {
Throwable ex = error.get();
if (ex instanceof AnthropicException) {
System.err.println("API error: " + ex.getMessage());
} else {
System.err.println("Unexpected error: " + ex.getMessage());
}
}
}
}
);
Always use try-with-resources for synchronous streaming to ensure connections are closed:
// Good
try (StreamResponse<RawMessageStreamEvent> stream =
client.messages().createStreaming(params)) {
stream.stream().forEach(event -> {});
}
// Bad - nothing guarantees the stream is closed, especially if processing throws
StreamResponse<RawMessageStreamEvent> stream =
client.messages().createStreaming(params);
stream.stream().forEach(event -> {});
Create a new MessageAccumulator for each streaming request:
// Good - new accumulator per request
MessageAccumulator accumulator = MessageAccumulator.create();
try (StreamResponse<RawMessageStreamEvent> stream =
client.messages().createStreaming(params)) {
stream.stream().peek(accumulator::accumulate).forEach(event -> {});
}
// Bad - reusing accumulator across requests
MessageAccumulator accumulator = MessageAccumulator.create();
// First request
client.messages().createStreaming(params1).stream()
.peek(accumulator::accumulate).forEach(event -> {});
// Second request - the accumulator still holds state from the first request!
client.messages().createStreaming(params2).stream()
.peek(accumulator::accumulate).forEach(event -> {});
Stream handlers should be thread-safe when using asynchronous streaming:
// Good - thread-safe accumulation
MessageAccumulator accumulator = MessageAccumulator.create(); // Thread-safe
client.messages().createStreaming(params)
.subscribe(event -> accumulator.accumulate(event));
// Bad - non-thread-safe state modification
List<String> texts = new ArrayList<>(); // Not thread-safe
client.messages().createStreaming(params)
.subscribe(event -> {
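event.contentBlockDelta().ifPresent(delta ->
delta.delta().text().ifPresent(textDelta ->
texts.add(textDelta.text()) // Race condition!
)
);
});
Implement proper error handling for production applications. A retry sends a new request, so its stream starts over from the beginning and events from a failed attempt may be processed again: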
int maxRetries = 3;
int attempt = 0;
while (attempt < maxRetries) {
try {
try (StreamResponse<RawMessageStreamEvent> stream =
client.messages().createStreaming(params)) {
stream.stream().forEach(event -> {
// Process events
});
break; // Success
}
} catch (AnthropicException e) {
attempt++;
if (attempt >= maxRetries) {
throw e;
}
Thread.sleep(1000L * (1L << (attempt - 1))); // Exponential backoff: 1 s, 2 s, 4 s, ...
}
}
maxTokens) to provide immediate feedback
MessageAccumulator when you need the final Message object
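The accumulate-while-streaming pattern can be modelled without the SDK. The sketch below rebuilds each content block's text from indexed deltas and records the stop reason from message_delta. Event and TextAccumulator are hypothetical, simplified types, not MessageAccumulator's actual implementation, and rejecting a second message_start is this sketch's own way of enforcing one accumulator per request.

```java
import java.util.ArrayList;
import java.util.List;

class AccumulatorSketch {

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("message_start", -1, null),
            new Event("content_block_start", 0, null),
            new Event("content_block_delta", 0, "Hi "),
            new Event("content_block_delta", 0, "there"),
            new Event("content_block_stop", 0, null),
            new Event("message_delta", -1, "end_turn"),
            new Event("message_stop", -1, null));

        TextAccumulator accumulator = new TextAccumulator();
        events.stream()
            .peek(accumulator::accumulate)                     // accumulate every event
            .filter(e -> e.type.equals("content_block_delta")) // ...while printing text live
            .forEach(e -> System.out.print(e.value));
        System.out.println();
        System.out.println(accumulator.blockTexts() + " " + accumulator.stopReason());
    }

    // Hypothetical, simplified stream event (NOT the SDK's RawMessageStreamEvent).
    static final class Event {
        final String type;  // e.g. "content_block_delta"
        final int index;    // content block index; -1 for message-level events
        final String value; // delta text, or the stop reason for message_delta; else null

        Event(String type, int index, String value) {
            this.type = type;
            this.index = index;
            this.value = value;
        }
    }

    // Hypothetical accumulator for one message (NOT the SDK's MessageAccumulator).
    static final class TextAccumulator {
        private final List<StringBuilder> blocks = new ArrayList<>();
        private String stopReason;
        private boolean started;

        // Returns the event so it can also be used in Stream.map(...).
        Event accumulate(Event e) {
            switch (e.type) {
                case "message_start":
                    // This sketch refuses reuse outright: one accumulator per request.
                    if (started) {
                        throw new IllegalStateException("accumulator already used for a message");
                    }
                    started = true;
                    break;
                case "content_block_start":
                    blocks.add(e.index, new StringBuilder()); // blocks arrive in index order
                    break;
                case "content_block_delta":
                    blocks.get(e.index).append(e.value);
                    break;
                case "message_delta":
                    stopReason = e.value;
                    break;
                default:
                    break; // content_block_stop / message_stop add nothing in this sketch
            }
            return e;
        }

        List<String> blockTexts() {
            List<String> texts = new ArrayList<>();
            for (StringBuilder block : blocks) {
                texts.add(block.toString());
            }
            return texts;
        }

        String stopReason() {
            return stopReason;
        }
    }
}
```

Running main prints Hi there as the deltas arrive, then [Hi there] end_turn from the accumulated state.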