
tessl/maven-com-anthropic--anthropic-java

The Anthropic Java SDK provides convenient access to the Anthropic REST API from applications written in Java


Streaming Responses

The Anthropic Java SDK supports streaming message responses as they are generated. Streaming delivers response chunks incrementally rather than waiting for the complete response, which enables real-time interaction with Claude. The SDK offers both synchronous and asynchronous streaming interfaces, with event-based processing and helper utilities for accumulating streamed data.

Streaming Methods

The SDK provides streaming variants of message creation methods through the MessageService interface.

package com.anthropic.services.blocking;

public interface MessageService {
    StreamResponse<RawMessageStreamEvent> createStreaming(MessageCreateParams params);
    StreamResponse<RawMessageStreamEvent> createStreaming(
        MessageCreateParams params,
        RequestOptions requestOptions
    );
}

package com.anthropic.services.async;

public interface MessageServiceAsync {
    AsyncStreamResponse<RawMessageStreamEvent> createStreaming(MessageCreateParams params);
    AsyncStreamResponse<RawMessageStreamEvent> createStreaming(
        MessageCreateParams params,
        RequestOptions requestOptions
    );
}

The createStreaming methods return streaming response interfaces that emit RawMessageStreamEvent objects as the response is generated.

StreamResponse Interface

The StreamResponse interface provides synchronous streaming using Java's Stream API.

package com.anthropic.core.http;

public interface StreamResponse<T> extends Closeable {
    Stream<T> stream();
    void close();
}

Methods:

  • stream() - Returns a Java Stream<T> of events that can be processed using standard stream operations
  • close() - Closes the underlying connection and releases resources

Usage Pattern:

Use try-with-resources to ensure proper resource cleanup:

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params)) {
    streamResponse.stream().forEach(chunk -> {
        System.out.println(chunk);
    });
}
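
To make the cleanup guarantee concrete, here is a minimal sketch that does not use the SDK at all. `FakeStreamResponse` is a hypothetical stand-in for a closeable stream response, and the event names are plain strings. It shows that `close()` still runs when event processing throws partway through the stream.

```java
import java.util.stream.Stream;

final class TryWithResourcesSketch {
    // Hypothetical stand-in for a closeable stream response; not an SDK type.
    static final class FakeStreamResponse implements AutoCloseable {
        boolean closed = false;

        Stream<String> stream() {
            return Stream.of("message_start", "content_block_delta", "message_stop");
        }

        @Override
        public void close() {
            closed = true; // a real implementation would release the connection here
        }
    }

    // Fails on the second event and reports whether close() still ran.
    static boolean closedAfterFailure() {
        FakeStreamResponse response = new FakeStreamResponse();
        try (FakeStreamResponse r = response) {
            r.stream().forEach(event -> {
                if (event.equals("content_block_delta")) {
                    throw new IllegalStateException("handler failed on " + event);
                }
            });
        } catch (IllegalStateException e) {
            // The resource has already been closed by the time this block runs.
        }
        return response.closed;
    }

    public static void main(String[] args) {
        System.out.println("closed after failure: " + closedAfterFailure());
    }
}
```

The same ordering applies to any `AutoCloseable`: the resource is closed first, and only then does a matching `catch` block execute.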

AsyncStreamResponse Interface

The AsyncStreamResponse interface provides asynchronous streaming with a subscribe-based model.

package com.anthropic.core.http;

public interface AsyncStreamResponse<T> {
    Subscription subscribe(Consumer<T> handler);
    Subscription subscribe(Consumer<T> handler, Executor executor);
    Subscription subscribe(Handler<T> handler);
    Subscription subscribe(Handler<T> handler, Executor executor);

    interface Handler<T> {
        void onNext(T item);
        void onComplete(Optional<Throwable> error);
    }

    interface Subscription {
        CompletableFuture<Void> onCompleteFuture();
        void cancel();
    }
}

Methods:

  • subscribe(Consumer<T>) - Subscribe with a simple event handler
  • subscribe(Consumer<T>, Executor) - Subscribe with a handler and custom executor
  • subscribe(Handler<T>) - Subscribe with a handler that receives completion notifications
  • subscribe(Handler<T>, Executor) - Subscribe with full handler and custom executor

Handler Interface:

  • onNext(T) - Called for each streamed event
  • onComplete(Optional<Throwable>) - Called when stream completes (error present if failed)

Subscription Interface:

  • onCompleteFuture() - Returns a CompletableFuture that completes when streaming finishes
  • cancel() - Cancels the stream subscription
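
The contract between `onNext`, `onComplete`, and the completion future can be sketched without the SDK. The code below is an illustration under stated assumptions, not the SDK's implementation: `SubscribeSketch.Handler` only mirrors the shape listed above, and `subscribe` here takes a plain list of items rather than a network stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class SubscribeSketch {
    // Hypothetical mirror of the Handler shape listed above; not the SDK interface.
    interface Handler<T> {
        void onNext(T item);
        void onComplete(Optional<Throwable> error);
    }

    // Delivers each item on the executor, then signals completion exactly once.
    static <T> CompletableFuture<Void> subscribe(
            List<T> items, Handler<T> handler, Executor executor) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        executor.execute(() -> {
            Throwable failure = null;
            try {
                for (T item : items) {
                    handler.onNext(item);
                }
            } catch (RuntimeException e) {
                failure = e;
            }
            handler.onComplete(Optional.ofNullable(failure));
            if (failure == null) {
                done.complete(null);
            } else {
                done.completeExceptionally(failure);
            }
        });
        return done;
    }

    // Records the callbacks seen for a two-item stream.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        subscribe(List.of("a", "b"), new Handler<String>() {
            @Override
            public void onNext(String item) {
                log.add("next:" + item);
            }

            @Override
            public void onComplete(Optional<Throwable> error) {
                log.add(error.isPresent() ? "failed" : "completed");
            }
        }, Runnable::run).join(); // Runnable::run executes inline on the caller's thread
        return log;
    }
}
```

Passing `Runnable::run` as the executor keeps the sketch deterministic. With a real thread pool the callbacks would arrive on pool threads instead.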

Stream Events

Streaming responses emit RawMessageStreamEvent objects. RawMessageStreamEvent is a union type: each instance represents one of the event variants that can appear in the stream.

package com.anthropic.models.messages;

public final class RawMessageStreamEvent {
    // Union type accessors
    Optional<RawMessageStartEvent> messageStart();
    Optional<RawMessageDeltaEvent> messageDelta();
    Optional<RawMessageStopEvent> messageStop();
    Optional<RawContentBlockStartEvent> contentBlockStart();
    Optional<RawContentBlockDeltaEvent> contentBlockDelta();
    Optional<RawContentBlockStopEvent> contentBlockStop();

    // Type checking
    boolean isMessageStart();
    boolean isMessageDelta();
    boolean isMessageStop();
    boolean isContentBlockStart();
    boolean isContentBlockDelta();
    boolean isContentBlockStop();

    // Type casting
    RawMessageStartEvent asMessageStart();
    RawMessageDeltaEvent asMessageDelta();
    RawMessageStopEvent asMessageStop();
    RawContentBlockStartEvent asContentBlockStart();
    RawContentBlockDeltaEvent asContentBlockDelta();
    RawContentBlockStopEvent asContentBlockStop();
}
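
The three accessor families (`Optional` getters, `isX` checks, `asX` casts) follow a common union-type pattern. The sketch below reproduces that pattern on a hypothetical two-variant `MiniEvent` class, which is not an SDK type. In particular, the exception thrown by `asX` on the wrong variant is an assumption of the sketch; the SDK may throw a different type.

```java
import java.util.Optional;

// Hypothetical two-variant union modelled on the accessor pattern above; not an SDK type.
final class MiniEvent {
    private final String textDelta;  // set only for the "delta" variant
    private final String stopReason; // set only for the "stop" variant

    private MiniEvent(String textDelta, String stopReason) {
        this.textDelta = textDelta;
        this.stopReason = stopReason;
    }

    static MiniEvent ofDelta(String text) {
        return new MiniEvent(text, null);
    }

    static MiniEvent ofStop(String reason) {
        return new MiniEvent(null, reason);
    }

    // Type checks
    boolean isDelta() { return textDelta != null; }
    boolean isStop() { return stopReason != null; }

    // Optional accessors: empty when the event is a different variant
    Optional<String> delta() { return Optional.ofNullable(textDelta); }
    Optional<String> stop() { return Optional.ofNullable(stopReason); }

    // Casts: fail loudly when the event is a different variant
    String asDelta() {
        return delta().orElseThrow(() -> new IllegalStateException("not a delta event"));
    }

    String asStop() {
        return stop().orElseThrow(() -> new IllegalStateException("not a stop event"));
    }

    // Typical dispatch: check the variant first, then cast.
    static String describe(MiniEvent event) {
        if (event.isDelta()) {
            return "delta: " + event.asDelta();
        }
        return "stop: " + event.asStop();
    }
}
```

The `Optional` accessors suit stream pipelines (`flatMap` over `Optional.stream()`), while the `isX`/`asX` pair reads more naturally in `if`/`else` chains.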

Event Variants

RawMessageStartEvent - Emitted when a new message starts:

package com.anthropic.models.messages;

public final class RawMessageStartEvent {
    String type(); // "message_start"
    Message message(); // Initial message with metadata
}

RawMessageDeltaEvent - Emitted when message metadata changes:

package com.anthropic.models.messages;

public final class RawMessageDeltaEvent {
    String type(); // "message_delta"
    MessageDelta delta(); // Changes to message metadata
    MessageDeltaUsage usage(); // Updated token usage
}

RawMessageStopEvent - Emitted when message generation completes:

package com.anthropic.models.messages;

public final class RawMessageStopEvent {
    String type(); // "message_stop"
}

RawContentBlockStartEvent - Emitted when a new content block starts:

package com.anthropic.models.messages;

public final class RawContentBlockStartEvent {
    String type(); // "content_block_start"
    int index(); // Index of the content block
    ContentBlock contentBlock(); // Initial content block (TextBlock, ToolUseBlock, etc.)
}

RawContentBlockDeltaEvent - Emitted when content is added to a block:

package com.anthropic.models.messages;

public final class RawContentBlockDeltaEvent {
    String type(); // "content_block_delta"
    int index(); // Index of the content block
    ContentBlockDelta delta(); // Incremental content (text, partial JSON, etc.)
}

RawContentBlockStopEvent - Emitted when a content block completes:

package com.anthropic.models.messages;

public final class RawContentBlockStopEvent {
    String type(); // "content_block_stop"
    int index(); // Index of the completed content block
}
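
The variants above arrive in a predictable order. The sketch below checks a sequence of `type` strings against that order. The order itself is an assumption taken from the event names and Anthropic's public streaming documentation: one `message_start`, then zero or more content blocks (each a start, any number of deltas, and a stop), then any number of `message_delta` events, then one `message_stop`. `EventOrderSketch` is a hypothetical helper, and any event types outside the six variants above are not modelled.

```java
import java.util.List;

final class EventOrderSketch {
    // Returns true when the type strings follow the assumed order:
    // message_start, (content_block_start, content_block_delta*, content_block_stop)*,
    // message_delta*, message_stop.
    static boolean isWellFormed(List<String> types) {
        int i = 0;
        int n = types.size();
        if (i >= n || !types.get(i++).equals("message_start")) {
            return false;
        }
        while (i < n && types.get(i).equals("content_block_start")) {
            i++;
            while (i < n && types.get(i).equals("content_block_delta")) {
                i++;
            }
            if (i >= n || !types.get(i++).equals("content_block_stop")) {
                return false;
            }
        }
        while (i < n && types.get(i).equals("message_delta")) {
            i++;
        }
        // Exactly one element must remain, and it must be message_stop.
        return i == n - 1 && types.get(i).equals("message_stop");
    }
}
```

A check like this is mainly useful in tests or debugging. Production handlers usually dispatch on each event independently.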

MessageAccumulator

The MessageAccumulator helper class accumulates streaming events into a complete Message object.

package com.anthropic.helpers;

public final class MessageAccumulator {
    static MessageAccumulator create();

    RawMessageStreamEvent accumulate(RawMessageStreamEvent event);
    Message message();
}

Methods:

  • create() - Static factory method to create a new accumulator
  • accumulate(RawMessageStreamEvent) - Process an event and return it (for chaining)
  • message() - Retrieve the accumulated Message object

Usage:

The accumulator maintains state as events are processed and constructs a complete Message that mirrors what would be returned by the non-streaming API.

MessageAccumulator accumulator = MessageAccumulator.create();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params)) {
    streamResponse.stream()
        .peek(accumulator::accumulate)
        .forEach(event -> {
            // Process events as they arrive
        });
}

Message message = accumulator.message();
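
To show why `accumulate` returns its argument, here is a much-simplified sketch of the accumulation idea. `TextAccumulatorSketch` and its `Delta` type are hypothetical and only handle text fragments keyed by block index. The SDK's MessageAccumulator tracks the full message and may work differently internally.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

final class TextAccumulatorSketch {
    // Hypothetical delta event: a content block index plus a text fragment.
    static final class Delta {
        final int index;
        final String text;

        Delta(int index, String text) {
            this.index = index;
            this.text = text;
        }
    }

    private final Map<Integer, StringBuilder> blocks = new TreeMap<>();

    // Records the fragment and returns the event unchanged, so the call can sit in a pipeline.
    Delta accumulate(Delta delta) {
        blocks.computeIfAbsent(delta.index, k -> new StringBuilder()).append(delta.text);
        return delta;
    }

    // Text accumulated so far for one block; empty if the block was never seen.
    String text(int index) {
        StringBuilder sb = blocks.get(index);
        return sb == null ? "" : sb.toString();
    }

    // Feeds three fragments across two blocks through peek() and joins the results.
    static String demo() {
        TextAccumulatorSketch accumulator = new TextAccumulatorSketch();
        Stream.of(new Delta(0, "Hel"), new Delta(0, "lo"), new Delta(1, "!"))
            .peek(accumulator::accumulate)
            .forEach(delta -> { /* per-event processing would go here */ });
        return accumulator.text(0) + "|" + accumulator.text(1);
    }
}
```

Because the accumulator holds state for one message, a fresh instance per request keeps fragments from different responses apart.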

BetaMessageAccumulator

The BetaMessageAccumulator accumulates beta streaming events into a BetaMessage object.

package com.anthropic.helpers;

public final class BetaMessageAccumulator {
    static BetaMessageAccumulator create();

    BetaRawMessageStreamEvent accumulate(BetaRawMessageStreamEvent event);
    BetaMessage message();
    <T> StructuredMessage<T> message(Class<T> structuredOutputClass);
}

Methods:

  • create() - Static factory method to create a new accumulator
  • accumulate(BetaRawMessageStreamEvent) - Process a beta event and return it
  • message() - Retrieve the accumulated BetaMessage object
  • message(Class<T>) - Retrieve as StructuredMessage<T> for structured outputs

Usage with Structured Outputs:

When using structured outputs with streaming, accumulate the JSON strings and then deserialize:

BetaMessageAccumulator accumulator = BetaMessageAccumulator.create();

// `client` is an AnthropicClientAsync here; subscribe() belongs to the async streaming response
client.beta().messages()
    .createStreaming(params)
    .subscribe(event -> {
        accumulator.accumulate(event);
        // Process streaming events
    })
    .onCompleteFuture()
    .join();

StructuredMessage<BookList> message = accumulator.message(BookList.class);
BookList books = message.content().get(0).text().get().text();

Stream Handler Executor

For asynchronous streaming, handlers execute on a dedicated thread pool. You can configure the executor at the client level or per-subscription.

Client-Level Configuration

package com.anthropic.client.okhttp;

public final class AnthropicOkHttpClient {
    public static final class Builder {
        public Builder streamHandlerExecutor(Executor executor);
        public AnthropicClient build();
    }
}

Example:

import java.util.concurrent.Executors;

AnthropicClient client = AnthropicOkHttpClient.builder()
    .fromEnv()
    .streamHandlerExecutor(Executors.newFixedThreadPool(4))
    .build();

Per-Subscription Configuration

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

Executor executor = Executors.newFixedThreadPool(4);

client.async().messages().createStreaming(params)
    .subscribe(chunk -> System.out.println(chunk), executor);

The default executor is a cached thread pool suitable for most use cases. Configure a custom executor when you need specific thread management, resource limits, or integration with existing executor services.
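
When you do supply your own executor, naming its threads and shutting it down are worth handling explicitly. The sketch below uses only JDK classes. `HandlerExecutorSketch` and the `stream-handler-` thread name prefix are hypothetical, and the sketch assumes the caller owns the executor's lifecycle, since this page does not say whether the SDK shuts down an executor it is given.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class HandlerExecutorSketch {
    // Builds a bounded pool whose threads are named and marked as daemons.
    static ExecutorService newStreamHandlerPool(int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "stream-handler-" + counter.incrementAndGet());
            thread.setDaemon(true); // do not keep the JVM alive for idle handler threads
            return thread;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }

    // Runs one task on the pool and returns the name of the thread that executed it.
    static String runOnce() {
        ExecutorService pool = newStreamHandlerPool(2);
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
        } finally {
            pool.shutdown(); // an executor you create is yours to shut down
        }
    }
}
```

Named threads make handler activity easy to spot in logs and thread dumps, which helps when many streams run concurrently.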

Synchronous Streaming Examples

Basic Synchronous Streaming

Process events as they arrive using Java's Stream API:

import com.anthropic.client.AnthropicClient;
import com.anthropic.client.okhttp.AnthropicOkHttpClient;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.MessageCreateParams;
import com.anthropic.models.messages.Model;
import com.anthropic.models.messages.RawMessageStreamEvent;

AnthropicClient client = AnthropicOkHttpClient.fromEnv();

MessageCreateParams params = MessageCreateParams.builder()
    .maxTokens(1024L)
    .addUserMessage("Write a short poem about recursion")
    .model(Model.CLAUDE_SONNET_4_5)
    .build();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        if (event.isContentBlockDelta()) {
            event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                System.out.print(textDelta.text());
            });
        }
    });
}

Accumulating with Stream Processing

Use MessageAccumulator with Stream.peek() to accumulate while processing:

import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.Message;

MessageAccumulator accumulator = MessageAccumulator.create();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params)) {

    streamResponse.stream()
        .peek(accumulator::accumulate)
        .filter(event -> event.isContentBlockDelta())
        .flatMap(event -> event.asContentBlockDelta().delta().text().stream())
        .forEach(textDelta -> System.out.print(textDelta.text()));
}

Message message = accumulator.message();
System.out.println("\n\nStop reason: " + message.stopReason());
System.out.println("Tokens used: " + message.usage().outputTokens());

Filtering Specific Event Types

Process only specific event types in the stream:

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params)) {

    // Only process text deltas
    streamResponse.stream()
        .filter(event -> event.isContentBlockDelta())
        .map(RawMessageStreamEvent::asContentBlockDelta)
        .flatMap(deltaEvent -> deltaEvent.delta().text().stream())
        .forEach(textDelta -> System.out.print(textDelta.text()));
}

Try-With-Resources Pattern

Always use try-with-resources to ensure proper cleanup:

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        // Process events
    });

} catch (Exception e) {
    System.err.println("Streaming failed: " + e.getMessage());
}
// Stream automatically closed here

Asynchronous Streaming Examples

Basic Asynchronous Streaming

Subscribe to events with a simple consumer:

import com.anthropic.client.AnthropicClientAsync;
import com.anthropic.client.okhttp.AnthropicOkHttpClientAsync;

AnthropicClientAsync client = AnthropicOkHttpClientAsync.fromEnv();

MessageCreateParams params = MessageCreateParams.builder()
    .maxTokens(1024L)
    .addUserMessage("Explain quantum computing")
    .model(Model.CLAUDE_SONNET_4_5)
    .build();

client.messages().createStreaming(params).subscribe(event -> {
    if (event.isContentBlockDelta()) {
        event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
            System.out.print(textDelta.text());
        });
    }
});

Handling Completion and Errors

Use the Handler interface to receive completion notifications:

import com.anthropic.core.http.AsyncStreamResponse;
import java.util.Optional;

client.messages().createStreaming(params).subscribe(
    new AsyncStreamResponse.Handler<RawMessageStreamEvent>() {
        @Override
        public void onNext(RawMessageStreamEvent event) {
            if (event.isContentBlockDelta()) {
                event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                    System.out.print(textDelta.text());
                });
            }
        }

        @Override
        public void onComplete(Optional<Throwable> error) {
            if (error.isPresent()) {
                System.err.println("\nStreaming failed: " + error.get().getMessage());
            } else {
                System.out.println("\n\nStreaming completed successfully");
            }
        }
    }
);

Using Futures for Synchronization

Wait for streaming to complete using CompletableFuture:

import java.util.concurrent.CompletableFuture;

CompletableFuture<Void> streamingComplete =
    client.messages().createStreaming(params)
        .subscribe(event -> {
            // Process events
        })
        .onCompleteFuture();

// Wait for completion
streamingComplete.join();

// Or handle completion asynchronously
streamingComplete.whenComplete((unused, error) -> {
    if (error != null) {
        System.err.println("Streaming failed: " + error.getMessage());
    } else {
        System.out.println("Streaming completed successfully");
    }
});
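
One detail of `CompletableFuture` matters when inspecting failures: `join()` rethrows a failure wrapped in `CompletionException`, and dependent stages also see the wrapper. The sketch below demonstrates this with a plain JDK future standing in for `onCompleteFuture()`. How the SDK's future reports failures is not specified on this page, so treat `unwrap` as a defensive, hypothetical helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletionSketch {
    // Returns the underlying cause when the throwable is a CompletionException wrapper.
    static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    // join() throws CompletionException; the original failure is its cause.
    static String causeSeenByJoin() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("stream broke"));
        try {
            future.join();
            return "no error";
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    // A stage chained after thenRun() receives the failure wrapped, so unwrap it first.
    static String causeSeenByDependentStage() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        String[] seen = new String[1];
        future.thenRun(() -> { })
            .whenComplete((unused, error) -> seen[0] = unwrap(error).getMessage());
        future.completeExceptionally(new IllegalStateException("stream broke"));
        return seen[0];
    }
}
```

Unwrapping before an `instanceof` check avoids misclassifying an API error as an unexpected one.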

Accumulating Asynchronous Streams

Use MessageAccumulator with asynchronous streaming:

import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.Message;

MessageAccumulator accumulator = MessageAccumulator.create();

client.messages()
    .createStreaming(params)
    .subscribe(event -> {
        accumulator.accumulate(event);

        // Process events as they arrive
        if (event.isContentBlockDelta()) {
            event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                System.out.print(textDelta.text());
            });
        }
    })
    .onCompleteFuture()
    .thenRun(() -> {
        Message message = accumulator.message();
        System.out.println("\n\nFinal message: " + message);
    })
    .join();

Custom Executor Configuration

Configure a custom executor for handler execution:

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

Executor customExecutor = Executors.newFixedThreadPool(2);

client.messages()
    .createStreaming(params)
    .subscribe(
        event -> {
            // Handler runs on custom executor
            System.out.println("Thread: " + Thread.currentThread().getName());
        },
        customExecutor
    )
    .onCompleteFuture()
    .join();

Cancelling Streams

Cancel an ongoing stream subscription:

import com.anthropic.core.http.AsyncStreamResponse.Subscription;
import java.util.concurrent.TimeUnit;

Subscription subscription = client.messages()
    .createStreaming(params)
    .subscribe(event -> {
        // Process events
    });

// Cancel after some condition
if (shouldCancel) {
    subscription.cancel();
}

// Or cancel after timeout
subscription.onCompleteFuture()
    .orTimeout(30, TimeUnit.SECONDS)
    .exceptionally(ex -> {
        subscription.cancel();
        return null;
    });
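
The timeout pattern relies on `CompletableFuture.orTimeout`, which requires Java 9 or later and completes the very future it is called on with a `TimeoutException`. The sketch below exercises that behaviour with a plain JDK future that never completes, standing in for `onCompleteFuture()`. `TimeoutCancelSketch` and the `cancelled` flag are hypothetical; in real code the flag would be a call to `subscription.cancel()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class TimeoutCancelSketch {
    // Returns true when the timeout fired and the cancel action ran.
    static boolean cancelledOnTimeout() {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        // Stands in for onCompleteFuture() of a stream that never finishes.
        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();

        neverCompletes
            .orTimeout(50, TimeUnit.MILLISECONDS)
            .exceptionally(error -> {
                // Unwrap defensively in case the failure arrives wrapped.
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                    ? error.getCause()
                    : error;
                if (cause instanceof TimeoutException) {
                    cancelled.set(true); // real code would call subscription.cancel() here
                }
                return null;
            })
            .join();
        return cancelled.get();
    }
}
```

Checking for `TimeoutException` specifically keeps the cancel from also firing on unrelated stream failures, where the stream has already ended.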

Stream Event Processing Patterns

Processing All Event Types

Handle all event types in a streaming response:

streamResponse.stream().forEach(event -> {
    if (event.isMessageStart()) {
        RawMessageStartEvent start = event.asMessageStart();
        System.out.println("Message started: " + start.message().id());

    } else if (event.isMessageDelta()) {
        RawMessageDeltaEvent delta = event.asMessageDelta();
        System.out.println("Token usage: " + delta.usage().outputTokens());

    } else if (event.isMessageStop()) {
        System.out.println("Message generation complete");

    } else if (event.isContentBlockStart()) {
        RawContentBlockStartEvent start = event.asContentBlockStart();
        System.out.println("Content block " + start.index() + " started");

    } else if (event.isContentBlockDelta()) {
        RawContentBlockDeltaEvent delta = event.asContentBlockDelta();
        delta.delta().text().ifPresent(textDelta -> {
            System.out.print(textDelta.text());
        });

    } else if (event.isContentBlockStop()) {
        RawContentBlockStopEvent stop = event.asContentBlockStop();
        System.out.println("\nContent block " + stop.index() + " complete");
    }
});

Extracting Text Content Only

Filter and extract only text content from the stream:

streamResponse.stream()
    .filter(event -> event.isContentBlockDelta())
    .map(RawMessageStreamEvent::asContentBlockDelta)
    .flatMap(delta -> delta.delta().text().stream())
    .map(textDelta -> textDelta.text())
    .forEach(System.out::print);

Tracking Usage Metrics

Monitor token usage as the response streams:

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params)) {

    streamResponse.stream()
        .filter(event -> event.isMessageDelta())
        .map(RawMessageStreamEvent::asMessageDelta)
        .forEach(delta -> {
            // Read the count directly; no local variable or extra import is needed
            System.out.println("Tokens generated: " + delta.usage().outputTokens());
        });
}

Integration with Request Options

Streaming methods support per-request configuration through RequestOptions:

import com.anthropic.core.RequestOptions;
import java.time.Duration;

RequestOptions options = RequestOptions.builder()
    .timeout(Duration.ofMinutes(5))
    .build();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params, options)) {

    streamResponse.stream().forEach(event -> {
        // Process with custom timeout
    });
}

Error Handling

Synchronous Streaming Errors

Handle errors using standard exception handling:

import com.anthropic.errors.AnthropicException;

try (StreamResponse<RawMessageStreamEvent> streamResponse =
         client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        // Process events
    });

} catch (AnthropicException e) {
    System.err.println("API error: " + e.getMessage());
} catch (Exception e) {
    System.err.println("Unexpected error: " + e.getMessage());
}

Asynchronous Streaming Errors

Handle errors through the completion handler:

client.messages().createStreaming(params).subscribe(
    new AsyncStreamResponse.Handler<RawMessageStreamEvent>() {
        @Override
        public void onNext(RawMessageStreamEvent event) {
            // Process events
        }

        @Override
        public void onComplete(Optional<Throwable> error) {
            if (error.isPresent()) {
                Throwable ex = error.get();
                if (ex instanceof AnthropicException) {
                    System.err.println("API error: " + ex.getMessage());
                } else {
                    System.err.println("Unexpected error: " + ex.getMessage());
                }
            }
        }
    }
);

Best Practices

Resource Management

Always use try-with-resources for synchronous streaming to ensure connections are closed:

// Good
try (StreamResponse<RawMessageStreamEvent> stream =
         client.messages().createStreaming(params)) {
    stream.stream().forEach(event -> {});
}

// Bad - stream might not be closed on error
StreamResponse<RawMessageStreamEvent> stream =
    client.messages().createStreaming(params);
stream.stream().forEach(event -> {});

Accumulator Usage

Create a new MessageAccumulator for each streaming request:

// Good - new accumulator per request
MessageAccumulator accumulator = MessageAccumulator.create();
try (StreamResponse<RawMessageStreamEvent> stream =
         client.messages().createStreaming(params)) {
    stream.stream().peek(accumulator::accumulate).forEach(event -> {});
}

// Bad - reusing accumulator across requests
MessageAccumulator accumulator = MessageAccumulator.create();
// First request
client.messages().createStreaming(params1).stream()
    .peek(accumulator::accumulate).forEach(event -> {});
// Second request - accumulator contains data from both requests!
client.messages().createStreaming(params2).stream()
    .peek(accumulator::accumulate).forEach(event -> {});

Thread Safety

Stream handlers should be thread-safe when using asynchronous streaming:

// Good - thread-safe accumulation
MessageAccumulator accumulator = MessageAccumulator.create(); // Thread-safe
client.messages().createStreaming(params)
    .subscribe(event -> accumulator.accumulate(event));

// Bad - non-thread-safe state modification
List<String> texts = new ArrayList<>(); // Not thread-safe
client.messages().createStreaming(params)
    .subscribe(event -> {
        event.contentBlockDelta().ifPresent(delta ->
            texts.add(delta.delta().text().orElseThrow().text()) // Race condition!
        );
    });
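
If handler code does need to collect values that other threads will read, a concurrent collection is one straightforward option. The sketch below uses only JDK classes and simulates many handler invocations on a pool. `ThreadSafeCollectSketch` and the `chunk-` strings are hypothetical placeholders for text deltas.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ThreadSafeCollectSketch {
    // Adds one string per task from a 4-thread pool and returns how many were collected.
    static int collectFromManyThreads(int tasks) {
        Queue<String> texts = new ConcurrentLinkedQueue<>(); // safe for concurrent add()
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < tasks; i++) {
            final int n = i;
            pool.execute(() -> texts.add("chunk-" + n));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return texts.size();
    }
}
```

`Collections.synchronizedList(new ArrayList<>())` is an alternative when index-based access is needed afterwards.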

Error Recovery

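For production applications, retry failed streams with a bounded number of attempts and a growing delay between them. Note that each retry restarts the stream from the beginning: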

int maxRetries = 3;
int attempt = 0;

while (attempt < maxRetries) {
    try {
        try (StreamResponse<RawMessageStreamEvent> stream =
                 client.messages().createStreaming(params)) {

            stream.stream().forEach(event -> {
                // Process events
            });
            break; // Success
        }
    } catch (AnthropicException e) {
        attempt++;
        if (attempt >= maxRetries) {
            throw e;
        }
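        // Exponential backoff: 1s, then 2s. Thread.sleep throws InterruptedException,
        // so the enclosing method must declare or handle it.
        Thread.sleep(1000L << (attempt - 1));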
    }
}
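
The retry loop can also be factored into a reusable helper. The following is a minimal sketch using only JDK classes. `RetrySketch`, its method names, and the 30-second cap are hypothetical choices. It retries on any `RuntimeException` for simplicity, whereas real code would more likely catch a narrower type such as the `AnthropicException` used above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetrySketch {
    // Delay before retry number `attempt` (1-based): base, 2x base, 4x base, ... up to the cap.
    static long backoffMillis(int attempt, long baseMillis, long capMillis) {
        int shift = Math.min(attempt - 1, 20); // bound the shift to avoid overflow
        return Math.min(baseMillis << shift, capMillis);
    }

    // Runs the action, retrying on RuntimeException until maxAttempts is reached.
    static <T> T withRetries(Supplier<T> action, int maxAttempts, long baseMillis) {
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts: surface the last failure
                }
                try {
                    Thread.sleep(backoffMillis(attempt, baseMillis, 30_000L));
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("retry interrupted", interrupted);
                }
            }
        }
    }

    // An action that fails twice, then succeeds; returns the number of calls made.
    static int attemptsUntilSuccess() {
        AtomicInteger calls = new AtomicInteger();
        withRetries(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 5, 1L);
        return calls.get();
    }
}
```

Because a retried stream replays from the start, handlers that print or forward deltas should either buffer until success or tolerate repeated output.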

Performance Considerations

  • Streaming vs Non-Streaming: Use streaming for long responses (high maxTokens) to provide immediate feedback
  • Executor Configuration: Use dedicated executors for async streaming when processing many concurrent streams
  • Accumulator Overhead: Only use MessageAccumulator when you need the final Message object
  • Event Filtering: Filter events early in the stream pipeline to reduce processing overhead
  • Resource Limits: Configure appropriate timeouts for long-running streams

Related Functionality

  • Client Setup and Configuration - Configuring timeout and executor settings
  • Structured Outputs - Using streaming with structured outputs
