
# Streaming Responses

The Anthropic Java SDK supports streaming message responses as they are generated. Streaming delivers response chunks incrementally instead of waiting for the complete response, which enables real-time interaction with Claude. The SDK offers synchronous and asynchronous streaming interfaces with event-based processing, plus helper utilities for accumulating streamed data.

## Streaming Methods

The SDK provides streaming variants of the message creation methods through the blocking `MessageService` interface and its asynchronous counterpart, `MessageServiceAsync`.

```java { .api }
package com.anthropic.services.blocking;

public interface MessageService {
    StreamResponse<RawMessageStreamEvent> createStreaming(MessageCreateParams params);
    StreamResponse<RawMessageStreamEvent> createStreaming(
        MessageCreateParams params,
        RequestOptions requestOptions
    );
}
```

```java { .api }
package com.anthropic.services.async;

public interface MessageServiceAsync {
    AsyncStreamResponse<RawMessageStreamEvent> createStreaming(MessageCreateParams params);
    AsyncStreamResponse<RawMessageStreamEvent> createStreaming(
        MessageCreateParams params,
        RequestOptions requestOptions
    );
}
```

The `createStreaming` methods return streaming response interfaces that emit `RawMessageStreamEvent` objects as the response is generated.

## StreamResponse Interface

The `StreamResponse` interface provides synchronous streaming using Java's `Stream` API.

```java { .api }
package com.anthropic.core.http;

public interface StreamResponse<T> extends Closeable {
    Stream<T> stream();
    void close();
}
```

**Methods:**
- `stream()` - Returns a Java `Stream<T>` of events that can be processed using standard stream operations
- `close()` - Closes the underlying connection and releases resources

**Usage Pattern:**

Use try-with-resources to ensure proper resource cleanup:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {
    streamResponse.stream().forEach(chunk -> {
        System.out.println(chunk);
    });
}
```

## AsyncStreamResponse Interface

The `AsyncStreamResponse` interface provides asynchronous streaming with a subscribe-based model.

```java { .api }
package com.anthropic.core.http;

public interface AsyncStreamResponse<T> {
    Subscription subscribe(Consumer<T> handler);
    Subscription subscribe(Consumer<T> handler, Executor executor);
    Subscription subscribe(Handler<T> handler);
    Subscription subscribe(Handler<T> handler, Executor executor);

    interface Handler<T> {
        void onNext(T item);
        void onComplete(Optional<Throwable> error);
    }

    interface Subscription {
        CompletableFuture<Void> onCompleteFuture();
        void cancel();
    }
}
```

**Methods:**
- `subscribe(Consumer<T>)` - Subscribe with a simple event handler
- `subscribe(Consumer<T>, Executor)` - Subscribe with a handler and custom executor
- `subscribe(Handler<T>)` - Subscribe with a handler that receives completion notifications
- `subscribe(Handler<T>, Executor)` - Subscribe with full handler and custom executor

**Handler Interface:**
- `onNext(T)` - Called for each streamed event
- `onComplete(Optional<Throwable>)` - Called when stream completes (error present if failed)

**Subscription Interface:**
- `onCompleteFuture()` - Returns a `CompletableFuture` that completes when streaming finishes
- `cancel()` - Cancels the stream subscription

## Stream Events

Each event emitted by a streaming response is a `RawMessageStreamEvent`, a union type that wraps exactly one of the event variants in the stream.

```java { .api }
package com.anthropic.models.messages;

public final class RawMessageStreamEvent {
    // Union type accessors
    Optional<RawMessageStartEvent> messageStart();
    Optional<RawMessageDeltaEvent> messageDelta();
    Optional<RawMessageStopEvent> messageStop();
    Optional<RawContentBlockStartEvent> contentBlockStart();
    Optional<RawContentBlockDeltaEvent> contentBlockDelta();
    Optional<RawContentBlockStopEvent> contentBlockStop();

    // Type checking
    boolean isMessageStart();
    boolean isMessageDelta();
    boolean isMessageStop();
    boolean isContentBlockStart();
    boolean isContentBlockDelta();
    boolean isContentBlockStop();

    // Type casting
    RawMessageStartEvent asMessageStart();
    RawMessageDeltaEvent asMessageDelta();
    RawMessageStopEvent asMessageStop();
    RawContentBlockStartEvent asContentBlockStart();
    RawContentBlockDeltaEvent asContentBlockDelta();
    RawContentBlockStopEvent asContentBlockStop();
}
```

### Event Variants

**RawMessageStartEvent** - Emitted when a new message starts:
```java { .api }
package com.anthropic.models.messages;

public final class RawMessageStartEvent {
    String type(); // "message_start"
    Message message(); // Initial message with metadata
}
```

**RawMessageDeltaEvent** - Emitted when message metadata changes:
```java { .api }
package com.anthropic.models.messages;

public final class RawMessageDeltaEvent {
    String type(); // "message_delta"
    MessageDelta delta(); // Changes to message metadata
    Usage usage(); // Updated token usage
}
```

**RawMessageStopEvent** - Emitted when message generation completes:
```java { .api }
package com.anthropic.models.messages;

public final class RawMessageStopEvent {
    String type(); // "message_stop"
}
```

**RawContentBlockStartEvent** - Emitted when a new content block starts:
```java { .api }
package com.anthropic.models.messages;

public final class RawContentBlockStartEvent {
    String type(); // "content_block_start"
    int index(); // Index of the content block
    ContentBlock contentBlock(); // Initial content block (TextBlock, ToolUseBlock, etc.)
}
```

**RawContentBlockDeltaEvent** - Emitted when content is added to a block:
```java { .api }
package com.anthropic.models.messages;

public final class RawContentBlockDeltaEvent {
    String type(); // "content_block_delta"
    int index(); // Index of the content block
    ContentBlockDelta delta(); // Incremental content (text, partial JSON, etc.)
}
```

**RawContentBlockStopEvent** - Emitted when a content block completes:
```java { .api }
package com.anthropic.models.messages;

public final class RawContentBlockStopEvent {
    String type(); // "content_block_stop"
    int index(); // Index of the completed content block
}
```

## MessageAccumulator

The `MessageAccumulator` helper class accumulates streaming events into a complete `Message` object.

```java { .api }
package com.anthropic.helpers;

public final class MessageAccumulator {
    static MessageAccumulator create();

    RawMessageStreamEvent accumulate(RawMessageStreamEvent event);
    Message message();
}
```

**Methods:**
- `create()` - Static factory method to create a new accumulator
- `accumulate(RawMessageStreamEvent)` - Process an event and return it (for chaining)
- `message()` - Retrieve the accumulated `Message` object

**Usage:**

The accumulator maintains state as events are processed and constructs a complete `Message` that mirrors what would be returned by the non-streaming API.

```java
MessageAccumulator accumulator = MessageAccumulator.create();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {
    streamResponse.stream()
        .peek(accumulator::accumulate)
        .forEach(event -> {
            // Process events as they arrive
        });
}

Message message = accumulator.message();
```

## BetaMessageAccumulator

The `BetaMessageAccumulator` accumulates beta streaming events into a `BetaMessage` object.

```java { .api }
package com.anthropic.helpers;

public final class BetaMessageAccumulator {
    static BetaMessageAccumulator create();

    BetaRawMessageStreamEvent accumulate(BetaRawMessageStreamEvent event);
    BetaMessage message();
    <T> StructuredMessage<T> message(Class<T> structuredOutputClass);
}
```

**Methods:**
- `create()` - Static factory method to create a new accumulator
- `accumulate(BetaRawMessageStreamEvent)` - Process a beta event and return it
- `message()` - Retrieve the accumulated `BetaMessage` object
- `message(Class<T>)` - Retrieve as `StructuredMessage<T>` for structured outputs

**Usage with Structured Outputs:**

When using structured outputs with streaming, accumulate the streamed JSON text and deserialize it once the stream has completed:

```java
BetaMessageAccumulator accumulator = BetaMessageAccumulator.create();

// `subscribe` is part of AsyncStreamResponse, so `client` here is an asynchronous client
client.beta().messages()
    .createStreaming(params)
    .subscribe(event -> {
        accumulator.accumulate(event);
        // Process streaming events
    })
    .onCompleteFuture()
    .join();

StructuredMessage<BookList> message = accumulator.message(BookList.class);
BookList books = message.content().get(0).text().get().text();
```

## Stream Handler Executor

For asynchronous streaming, handlers execute on a dedicated thread pool. You can configure the executor at the client level or per-subscription.

### Client-Level Configuration

```java { .api }
package com.anthropic.client.okhttp;

public final class AnthropicOkHttpClient {
    public static final class Builder {
        public Builder streamHandlerExecutor(Executor executor);
        public AnthropicClient build();
    }
}
```

**Example:**

```java
import java.util.concurrent.Executors;

AnthropicClient client = AnthropicOkHttpClient.builder()
    .fromEnv()
    .streamHandlerExecutor(Executors.newFixedThreadPool(4))
    .build();
```

### Per-Subscription Configuration

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

Executor executor = Executors.newFixedThreadPool(4);

client.async().messages().createStreaming(params)
    .subscribe(chunk -> System.out.println(chunk), executor);
```

The default executor is a cached thread pool suitable for most use cases. Configure a custom executor when you need specific thread management, resource limits, or integration with existing executor services.
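As one illustration of the custom-executor case, the sketch below builds a fixed-size pool whose threads carry a recognizable name, using only `java.util.concurrent`. The class `StreamExecutors`, the method `newBoundedExecutor`, and the `stream-handler` prefix are hypothetical names, not part of the SDK; the returned executor could be passed to `streamHandlerExecutor(...)` or to `subscribe(handler, executor)`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of the SDK): builds a fixed-size pool with named threads.
final class StreamExecutors {
    private StreamExecutors() {}

    static ExecutorService newBoundedExecutor(int threads, String namePrefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            // Named threads are easier to identify in logs and thread dumps.
            Thread thread = new Thread(runnable, namePrefix + "-" + counter.incrementAndGet());
            // Daemon threads do not keep the JVM alive on their own.
            thread.setDaemon(true);
            return thread;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = newBoundedExecutor(2, "stream-handler");
        // Wait for the task so the output appears before the JVM exits.
        executor.submit(() -> System.out.println("Running on " + Thread.currentThread().getName())).get();
        executor.shutdown();
    }
}
```

Because the pool is fixed-size, slow handlers queue up instead of spawning more threads. Shut the executor down once the client is no longer needed.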

## Synchronous Streaming Examples

### Basic Synchronous Streaming

Process events as they arrive using Java's Stream API:

```java
import com.anthropic.client.AnthropicClient;
import com.anthropic.client.okhttp.AnthropicOkHttpClient;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.MessageCreateParams;
import com.anthropic.models.messages.Model;
import com.anthropic.models.messages.RawMessageStreamEvent;

AnthropicClient client = AnthropicOkHttpClient.fromEnv();

MessageCreateParams params = MessageCreateParams.builder()
    .maxTokens(1024L)
    .addUserMessage("Write a short poem about recursion")
    .model(Model.CLAUDE_SONNET_4_5)
    .build();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        if (event.isContentBlockDelta()) {
            event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                System.out.print(textDelta.text());
            });
        }
    });
}
```

### Accumulating with Stream Processing

Use `MessageAccumulator` with `Stream.peek()` to accumulate while processing:

```java
import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.Message;

MessageAccumulator accumulator = MessageAccumulator.create();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream()
        .peek(accumulator::accumulate)
        .filter(event -> event.isContentBlockDelta())
        .flatMap(event -> event.asContentBlockDelta().delta().text().stream())
        .forEach(textDelta -> System.out.print(textDelta.text()));
}

Message message = accumulator.message();
System.out.println("\n\nStop reason: " + message.stopReason());
System.out.println("Tokens used: " + message.usage().outputTokens());
```
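The position of `peek` matters in this pipeline: it sits before `filter`, so the accumulator sees every event and not only the text deltas. The SDK-free sketch below illustrates that ordering with plain strings standing in for events. `PeekOrderDemo` and the event labels are made up for illustration, and a `List` plays the role of the accumulator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in: strings act as events, a List acts as the accumulator.
final class PeekOrderDemo {
    private PeekOrderDemo() {}

    // peek before filter: the stand-in accumulator observes every event.
    static int seenWhenPeekBeforeFilter(List<String> events) {
        List<String> seen = new ArrayList<>();
        events.stream()
            .peek(seen::add)
            .filter(event -> event.startsWith("delta:"))
            .forEach(event -> { });
        return seen.size();
    }

    // peek after filter: the stand-in accumulator only observes the filtered events.
    static int seenWhenPeekAfterFilter(List<String> events) {
        List<String> seen = new ArrayList<>();
        events.stream()
            .filter(event -> event.startsWith("delta:"))
            .peek(seen::add)
            .forEach(event -> { });
        return seen.size();
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("message_start", "delta:Hello", "delta: world", "message_stop");
        System.out.println(seenWhenPeekBeforeFilter(events)); // prints 4
        System.out.println(seenWhenPeekAfterFilter(events));  // prints 2
    }
}
```

With the real accumulator, placing `peek` after the filter would leave it without the start, stop, and metadata events it needs to build the final `Message`.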

### Filtering Specific Event Types

Process only specific event types in the stream:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    // Only process text deltas
    streamResponse.stream()
        .filter(event -> event.isContentBlockDelta())
        .map(RawMessageStreamEvent::asContentBlockDelta)
        .flatMap(deltaEvent -> deltaEvent.delta().text().stream())
        .forEach(textDelta -> System.out.print(textDelta.text()));
}
```

### Try-With-Resources Pattern

Always use try-with-resources to ensure proper cleanup:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        // Process events
    });

} catch (Exception e) {
    System.err.println("Streaming failed: " + e.getMessage());
}
// Stream automatically closed here
```

## Asynchronous Streaming Examples

### Basic Asynchronous Streaming

Subscribe to events with a simple consumer:

```java
import com.anthropic.client.AnthropicClientAsync;
import com.anthropic.client.okhttp.AnthropicOkHttpClientAsync;

AnthropicClientAsync client = AnthropicOkHttpClientAsync.fromEnv();

MessageCreateParams params = MessageCreateParams.builder()
    .maxTokens(1024L)
    .addUserMessage("Explain quantum computing")
    .model(Model.CLAUDE_SONNET_4_5)
    .build();

client.messages().createStreaming(params).subscribe(event -> {
    if (event.isContentBlockDelta()) {
        event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
            System.out.print(textDelta.text());
        });
    }
});
```

### Handling Completion and Errors

Use the `Handler` interface to receive completion notifications:

```java
import com.anthropic.core.http.AsyncStreamResponse;
import java.util.Optional;

client.messages().createStreaming(params).subscribe(
    new AsyncStreamResponse.Handler<RawMessageStreamEvent>() {
        @Override
        public void onNext(RawMessageStreamEvent event) {
            if (event.isContentBlockDelta()) {
                event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                    System.out.print(textDelta.text());
                });
            }
        }

        @Override
        public void onComplete(Optional<Throwable> error) {
            if (error.isPresent()) {
                System.err.println("\nStreaming failed: " + error.get().getMessage());
            } else {
                System.out.println("\n\nStreaming completed successfully");
            }
        }
    }
);
```

### Using Futures for Synchronization

Wait for streaming to complete using `CompletableFuture`:

```java
import java.util.concurrent.CompletableFuture;

CompletableFuture<Void> streamingComplete =
    client.messages().createStreaming(params)
        .subscribe(event -> {
            // Process events
        })
        .onCompleteFuture();

// Wait for completion
streamingComplete.join();

// Or handle completion asynchronously
streamingComplete.whenComplete((unused, error) -> {
    if (error != null) {
        System.err.println("Streaming failed: " + error.getMessage());
    } else {
        System.out.println("Streaming completed successfully");
    }
});
```

### Accumulating Asynchronous Streams

Use `MessageAccumulator` with asynchronous streaming:

```java
import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.Message;

MessageAccumulator accumulator = MessageAccumulator.create();

client.messages()
    .createStreaming(params)
    .subscribe(event -> {
        accumulator.accumulate(event);

        // Process events as they arrive
        if (event.isContentBlockDelta()) {
            event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                System.out.print(textDelta.text());
            });
        }
    })
    .onCompleteFuture()
    .thenRun(() -> {
        Message message = accumulator.message();
        System.out.println("\n\nFinal message: " + message);
    })
    .join();
```

### Custom Executor Configuration

Configure a custom executor for handler execution:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

Executor customExecutor = Executors.newFixedThreadPool(2);

client.messages()
    .createStreaming(params)
    .subscribe(
        event -> {
            // Handler runs on custom executor
            System.out.println("Thread: " + Thread.currentThread().getName());
        },
        customExecutor
    )
    .onCompleteFuture()
    .join();
```

### Cancelling Streams

Cancel an ongoing stream subscription:

```java
import com.anthropic.core.http.AsyncStreamResponse.Subscription;
import java.util.concurrent.TimeUnit;

Subscription subscription = client.messages()
    .createStreaming(params)
    .subscribe(event -> {
        // Process events
    });

// Cancel after some condition
if (shouldCancel) {
    subscription.cancel();
}

// Or cancel after timeout (CompletableFuture.orTimeout requires Java 9 or later)
subscription.onCompleteFuture()
    .orTimeout(30, TimeUnit.SECONDS)
    .exceptionally(ex -> {
        subscription.cancel();
        return null;
    });
```
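The timeout variant relies on `CompletableFuture.orTimeout`, which is available from Java 9 and completes the future exceptionally with a `TimeoutException`. The sketch below isolates that logic without the SDK so that only a timeout, rather than any stream failure, triggers cancellation. `TimeoutCancel`, `cancelOnTimeout`, and the `Runnable` standing in for `subscription::cancel` are illustrative names, not SDK API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of the SDK).
final class TimeoutCancel {
    private TimeoutCancel() {}

    // Runs `cancel` only if `done` does not complete within the timeout.
    // The returned future yields true when cancellation was triggered.
    static CompletableFuture<Boolean> cancelOnTimeout(
            CompletableFuture<Void> done, Runnable cancel, long timeout, TimeUnit unit) {
        return done.orTimeout(timeout, unit).handle((unused, error) -> {
            // Dependent stages may wrap the failure in a CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause()
                : error;
            if (cause instanceof TimeoutException) {
                cancel.run();
                return true;
            }
            return false;
        });
    }

    public static void main(String[] args) {
        // A future that never completes stands in for a stalled stream.
        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        boolean cancelled = cancelOnTimeout(
            neverCompletes,
            () -> System.out.println("cancelling subscription"),
            100, TimeUnit.MILLISECONDS).join();
        System.out.println("cancelled = " + cancelled);
    }
}
```

Note that `orTimeout` completes the very future it is called on, so in this sketch `done` itself fails with `TimeoutException` once the deadline passes.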

## Stream Event Processing Patterns

### Processing All Event Types

Handle all event types in a streaming response:

```java
streamResponse.stream().forEach(event -> {
    if (event.isMessageStart()) {
        RawMessageStartEvent start = event.asMessageStart();
        System.out.println("Message started: " + start.message().id());

    } else if (event.isMessageDelta()) {
        RawMessageDeltaEvent delta = event.asMessageDelta();
        System.out.println("Token usage: " + delta.usage().outputTokens());

    } else if (event.isMessageStop()) {
        System.out.println("Message generation complete");

    } else if (event.isContentBlockStart()) {
        RawContentBlockStartEvent start = event.asContentBlockStart();
        System.out.println("Content block " + start.index() + " started");

    } else if (event.isContentBlockDelta()) {
        RawContentBlockDeltaEvent delta = event.asContentBlockDelta();
        delta.delta().text().ifPresent(textDelta -> {
            System.out.print(textDelta.text());
        });

    } else if (event.isContentBlockStop()) {
        RawContentBlockStopEvent stop = event.asContentBlockStop();
        System.out.println("\nContent block " + stop.index() + " complete");
    }
});
```

### Extracting Text Content Only

Filter and extract only text content from the stream:

```java
streamResponse.stream()
    .filter(event -> event.isContentBlockDelta())
    .map(RawMessageStreamEvent::asContentBlockDelta)
    .flatMap(delta -> delta.delta().text().stream())
    .map(textDelta -> textDelta.text())
    .forEach(System.out::print);
```

### Tracking Usage Metrics

Monitor token usage as the response streams:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream()
        .filter(event -> event.isMessageDelta())
        .map(RawMessageStreamEvent::asMessageDelta)
        .forEach(delta -> {
            Usage usage = delta.usage();
            System.out.println("Tokens generated: " + usage.outputTokens());
        });
}
```

## Integration with Request Options

Streaming methods support per-request configuration through `RequestOptions`:

```java
import com.anthropic.core.RequestOptions;
import java.time.Duration;

RequestOptions options = RequestOptions.builder()
    .timeout(Duration.ofMinutes(5))
    .build();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params, options)) {

    streamResponse.stream().forEach(event -> {
        // Process with custom timeout
    });
}
```

## Error Handling

### Synchronous Streaming Errors

Handle errors using standard exception handling:

```java
import com.anthropic.errors.AnthropicException;

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        // Process events
    });

} catch (AnthropicException e) {
    System.err.println("API error: " + e.getMessage());
} catch (Exception e) {
    System.err.println("Unexpected error: " + e.getMessage());
}
```

### Asynchronous Streaming Errors

Handle errors through the completion handler:

```java
client.messages().createStreaming(params).subscribe(
    new AsyncStreamResponse.Handler<RawMessageStreamEvent>() {
        @Override
        public void onNext(RawMessageStreamEvent event) {
            // Process events
        }

        @Override
        public void onComplete(Optional<Throwable> error) {
            if (error.isPresent()) {
                Throwable ex = error.get();
                if (ex instanceof AnthropicException) {
                    System.err.println("API error: " + ex.getMessage());
                } else {
                    System.err.println("Unexpected error: " + ex.getMessage());
                }
            }
        }
    }
);
```

## Best Practices

### Resource Management

Always use try-with-resources for synchronous streaming to ensure connections are closed:

```java
// Good
try (StreamResponse<RawMessageStreamEvent> stream =
        client.messages().createStreaming(params)) {
    stream.stream().forEach(event -> {});
}

// Bad - stream might not be closed on error
StreamResponse<RawMessageStreamEvent> stream =
    client.messages().createStreaming(params);
stream.stream().forEach(event -> {});
```

### Accumulator Usage

Create a new `MessageAccumulator` for each streaming request:

```java
// Good - new accumulator per request
MessageAccumulator accumulator = MessageAccumulator.create();
try (StreamResponse<RawMessageStreamEvent> stream =
        client.messages().createStreaming(params)) {
    stream.stream().peek(accumulator::accumulate).forEach(event -> {});
}

// Bad - reusing accumulator across requests
MessageAccumulator accumulator = MessageAccumulator.create();
// First request
client.messages().createStreaming(params1).stream()
    .peek(accumulator::accumulate).forEach(event -> {});
// Second request - accumulator contains data from both requests!
client.messages().createStreaming(params2).stream()
    .peek(accumulator::accumulate).forEach(event -> {});
```

### Thread Safety

Stream handlers should be thread-safe when using asynchronous streaming:

```java
// Good - thread-safe accumulation
MessageAccumulator accumulator = MessageAccumulator.create(); // Thread-safe
client.messages().createStreaming(params)
    .subscribe(event -> accumulator.accumulate(event));

// Bad - non-thread-safe state modification
List<String> texts = new ArrayList<>(); // Not thread-safe
client.messages().createStreaming(params)
    .subscribe(event -> {
        event.contentBlockDelta().ifPresent(delta ->
            texts.add(delta.delta().text().orElseThrow().text()) // Race condition!
        );
    });
```
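If a handler does need to collect its own state, one option is a concurrent collection. The SDK-free sketch below shows a small collector that is safe to call from several threads. `TextCollector` and `onText` are hypothetical names; in a real handler, `onText` would be called with `textDelta.text()` from inside `subscribe(...)`.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper (not part of the SDK): collects text chunks from any thread.
final class TextCollector {
    // ConcurrentLinkedQueue supports concurrent add() without external locking.
    private final ConcurrentLinkedQueue<String> chunks = new ConcurrentLinkedQueue<>();

    void onText(String text) {
        chunks.add(text);
    }

    int chunkCount() {
        return chunks.size();
    }

    // Joins the chunks in insertion order.
    String joined() {
        return String.join("", chunks);
    }

    public static void main(String[] args) throws InterruptedException {
        TextCollector collector = new TextCollector();
        Runnable writer = () -> {
            for (int i = 0; i < 1000; i++) {
                collector.onText("x");
            }
        };
        Thread first = new Thread(writer);
        Thread second = new Thread(writer);
        first.start();
        second.start();
        first.join();
        second.join();
        System.out.println(collector.chunkCount()); // prints 2000
    }
}
```

The queue preserves insertion order, so chunks stay in stream order as long as the handler is invoked for one event at a time.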

### Error Recovery

For production applications, retry failed requests a bounded number of times with a growing delay between attempts:

```java
int maxRetries = 3;
int attempt = 0;

while (attempt < maxRetries) {
    try {
        try (StreamResponse<RawMessageStreamEvent> stream =
                client.messages().createStreaming(params)) {

            stream.stream().forEach(event -> {
                // Process events
            });
            break; // Success
        }
    } catch (AnthropicException e) {
        attempt++;
        if (attempt >= maxRetries) {
            throw e;
        }
        // Exponential backoff: 1s after the first failure, 2s after the second.
        // Thread.sleep throws InterruptedException, which the enclosing method must declare or handle.
        Thread.sleep(1000L << (attempt - 1));
    }
}
```
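The retry loop above can be factored into a reusable helper. The following is a minimal sketch, not an SDK facility: `Backoff`, `delayMillis`, and `retry` are hypothetical names, the delay doubles per attempt up to a cap, and any `RuntimeException` is treated as retryable for simplicity (real code would likely retry only selected failures).

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of the SDK): capped exponential backoff.
final class Backoff {
    private Backoff() {}

    // Delay after the given 1-based failed attempt: base, 2*base, 4*base, ... capped at maxMillis.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        // Bound the shift so modest base values cannot overflow.
        int shift = Math.min(Math.max(attempt - 1, 0), 20);
        return Math.min(baseMillis << shift, maxMillis);
    }

    // Calls `action` until it succeeds or maxAttempts is reached, sleeping between attempts.
    static <T> T retry(Supplier<T> action, int maxAttempts, long baseMillis, long maxMillis) {
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // Out of attempts: surface the last failure.
                }
                try {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", interrupted);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok after " + calls[0] + " calls";
        }, 5, 10, 1000);
        System.out.println(result); // prints "ok after 3 calls"
    }
}
```

Retrying a stream that has already emitted events starts the response over, so reset any per-attempt state, such as a `MessageAccumulator`, before each attempt.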

## Performance Considerations

- **Streaming vs Non-Streaming**: Use streaming for long responses (high `maxTokens`) to provide immediate feedback
- **Executor Configuration**: Use dedicated executors for async streaming when processing many concurrent streams
- **Accumulator Overhead**: Only use `MessageAccumulator` when you need the final `Message` object
- **Event Filtering**: Filter events early in the stream pipeline to reduce processing overhead
- **Resource Limits**: Configure appropriate timeouts for long-running streams

## Related Functionality

- [Message Creation](./message-creation.md) - Non-streaming message creation
- [Client Setup and Configuration](./client-setup.md) - Configuring timeout and executor settings
- [Error Handling](./error-handling.md) - Comprehensive error handling strategies
- [Structured Outputs](./structured-outputs.md) - Using streaming with structured outputs