# Streaming Responses

The Anthropic Java SDK supports streaming message responses as they are generated. Streaming delivers response chunks incrementally instead of waiting for the complete response, which enables real-time interaction with Claude. The SDK offers synchronous and asynchronous streaming interfaces with event-based processing, plus helper utilities for accumulating streamed data.

## Streaming Methods

The SDK provides streaming variants of the message creation methods through the `MessageService` and `MessageServiceAsync` interfaces.

```java { .api }
package com.anthropic.services.blocking;

public interface MessageService {
    StreamResponse<RawMessageStreamEvent> createStreaming(MessageCreateParams params);
    StreamResponse<RawMessageStreamEvent> createStreaming(
        MessageCreateParams params,
        RequestOptions requestOptions
    );
}
```

```java { .api }
package com.anthropic.services.async;

public interface MessageServiceAsync {
    AsyncStreamResponse<RawMessageStreamEvent> createStreaming(MessageCreateParams params);
    AsyncStreamResponse<RawMessageStreamEvent> createStreaming(
        MessageCreateParams params,
        RequestOptions requestOptions
    );
}
```

The `createStreaming` methods return streaming response interfaces that emit `RawMessageStreamEvent` objects as the response is generated.

## StreamResponse Interface

The `StreamResponse` interface provides synchronous streaming using Java's `Stream` API.

```java { .api }
package com.anthropic.core.http;

public interface StreamResponse<T> extends Closeable {
    Stream<T> stream();
    void close();
}
```

**Methods:**
- `stream()` - Returns a Java `Stream<T>` of events that can be processed using standard stream operations
- `close()` - Closes the underlying connection and releases resources

**Usage Pattern:**

Use try-with-resources to ensure proper resource cleanup:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {
    streamResponse.stream().forEach(chunk -> {
        System.out.println(chunk);
    });
}
```

## AsyncStreamResponse Interface

The `AsyncStreamResponse` interface provides asynchronous streaming with a subscribe-based model.

```java { .api }
package com.anthropic.core.http;

public interface AsyncStreamResponse<T> {
    Subscription subscribe(Consumer<T> handler);
    Subscription subscribe(Consumer<T> handler, Executor executor);
    Subscription subscribe(Handler<T> handler);
    Subscription subscribe(Handler<T> handler, Executor executor);

    interface Handler<T> {
        void onNext(T item);
        void onComplete(Optional<Throwable> error);
    }

    interface Subscription {
        CompletableFuture<Void> onCompleteFuture();
        void cancel();
    }
}
```

**Methods:**
- `subscribe(Consumer<T>)` - Subscribe with a simple event handler
- `subscribe(Consumer<T>, Executor)` - Subscribe with a handler and custom executor
- `subscribe(Handler<T>)` - Subscribe with a handler that receives completion notifications
- `subscribe(Handler<T>, Executor)` - Subscribe with full handler and custom executor

**Handler Interface:**
- `onNext(T)` - Called for each streamed event
- `onComplete(Optional<Throwable>)` - Called when stream completes (error present if failed)

**Subscription Interface:**
- `onCompleteFuture()` - Returns a `CompletableFuture` that completes when streaming finishes
- `cancel()` - Cancels the stream subscription

## Stream Events

Streaming responses emit `RawMessageStreamEvent` objects. `RawMessageStreamEvent` is a union type: each instance represents exactly one of the event types in the stream.

```java { .api }
package com.anthropic.models.messages;

public final class RawMessageStreamEvent {
    // Union type accessors
    Optional<RawMessageStartEvent> messageStart();
    Optional<RawMessageDeltaEvent> messageDelta();
    Optional<RawMessageStopEvent> messageStop();
    Optional<RawContentBlockStartEvent> contentBlockStart();
    Optional<RawContentBlockDeltaEvent> contentBlockDelta();
    Optional<RawContentBlockStopEvent> contentBlockStop();

    // Type checking
    boolean isMessageStart();
    boolean isMessageDelta();
    boolean isMessageStop();
    boolean isContentBlockStart();
    boolean isContentBlockDelta();
    boolean isContentBlockStop();

    // Type casting
    RawMessageStartEvent asMessageStart();
    RawMessageDeltaEvent asMessageDelta();
    RawMessageStopEvent asMessageStop();
    RawContentBlockStartEvent asContentBlockStart();
    RawContentBlockDeltaEvent asContentBlockDelta();
    RawContentBlockStopEvent asContentBlockStop();
}
```
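
To illustrate the three accessor styles listed above (`Optional` accessors, `is...` checks, and `as...` casts), here is a minimal JDK-only sketch of a two-variant union. `TextOrNumber` is a hypothetical class invented for this illustration and is not part of the SDK. It assumes that exactly one variant is set per value and that a cast to the wrong variant fails with an exception; the exception type the SDK actually throws may differ.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical two-variant union, for illustration only; not an SDK class.
final class TextOrNumber {
    private final String text;    // Non-null only for the text variant
    private final Integer number; // Non-null only for the number variant

    private TextOrNumber(String text, Integer number) {
        this.text = text;
        this.number = number;
    }

    static TextOrNumber ofText(String text) {
        return new TextOrNumber(Objects.requireNonNull(text), null);
    }

    static TextOrNumber ofNumber(int number) {
        return new TextOrNumber(null, number);
    }

    // Optional accessors: empty when the value is a different variant.
    Optional<String> text() { return Optional.ofNullable(text); }
    Optional<Integer> number() { return Optional.ofNullable(number); }

    // Type checks.
    boolean isText() { return text != null; }
    boolean isNumber() { return number != null; }

    // Casts: fail loudly when the value is a different variant.
    String asText() {
        if (text == null) throw new IllegalStateException("not a text variant");
        return text;
    }

    int asNumber() {
        if (number == null) throw new IllegalStateException("not a number variant");
        return number;
    }
}
```

The `Optional` accessors suit stream pipelines (`flatMap` over the present value), while the `is...`/`as...` pair suits explicit branching.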

### Event Variants

**RawMessageStartEvent** - Emitted when a new message starts:
```java { .api }
package com.anthropic.models.messages;

public final class RawMessageStartEvent {
    String type();     // "message_start"
    Message message(); // Initial message with metadata
}
```

**RawMessageDeltaEvent** - Emitted when message metadata changes:
```java { .api }
package com.anthropic.models.messages;

public final class RawMessageDeltaEvent {
    String type();        // "message_delta"
    MessageDelta delta(); // Changes to message metadata
    Usage usage();        // Updated token usage
}
```

**RawMessageStopEvent** - Emitted when message generation completes:
```java { .api }
package com.anthropic.models.messages;

public final class RawMessageStopEvent {
    String type(); // "message_stop"
}
```

**RawContentBlockStartEvent** - Emitted when a new content block starts:
```java { .api }
package com.anthropic.models.messages;

public final class RawContentBlockStartEvent {
    String type();               // "content_block_start"
    int index();                 // Index of the content block
    ContentBlock contentBlock(); // Initial content block (TextBlock, ToolUseBlock, etc.)
}
```

**RawContentBlockDeltaEvent** - Emitted when content is added to a block:
```java { .api }
package com.anthropic.models.messages;

public final class RawContentBlockDeltaEvent {
    String type();             // "content_block_delta"
    int index();               // Index of the content block
    ContentBlockDelta delta(); // Incremental content (text, partial JSON, etc.)
}
```

**RawContentBlockStopEvent** - Emitted when a content block completes:
```java { .api }
package com.anthropic.models.messages;

public final class RawContentBlockStopEvent {
    String type(); // "content_block_stop"
    int index();   // Index of the completed content block
}
```

## MessageAccumulator

The `MessageAccumulator` helper class accumulates streaming events into a complete `Message` object.

```java { .api }
package com.anthropic.helpers;

public final class MessageAccumulator {
    static MessageAccumulator create();

    RawMessageStreamEvent accumulate(RawMessageStreamEvent event);
    Message message();
}
```

**Methods:**
- `create()` - Static factory method to create a new accumulator
- `accumulate(RawMessageStreamEvent)` - Process an event and return it (for chaining)
- `message()` - Retrieve the accumulated `Message` object

**Usage:**

The accumulator maintains state as events are processed and constructs a complete `Message` that mirrors what would be returned by the non-streaming API.

```java
MessageAccumulator accumulator = MessageAccumulator.create();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {
    streamResponse.stream()
        .peek(accumulator::accumulate)
        .forEach(event -> {
            // Process events as they arrive
        });
}

Message message = accumulator.message();
```

## BetaMessageAccumulator

The `BetaMessageAccumulator` accumulates beta streaming events into a `BetaMessage` object.

```java { .api }
package com.anthropic.helpers;

public final class BetaMessageAccumulator {
    static BetaMessageAccumulator create();

    BetaRawMessageStreamEvent accumulate(BetaRawMessageStreamEvent event);
    BetaMessage message();
    <T> StructuredMessage<T> message(Class<T> structuredOutputClass);
}
```

**Methods:**
- `create()` - Static factory method to create a new accumulator
- `accumulate(BetaRawMessageStreamEvent)` - Process a beta event and return it
- `message()` - Retrieve the accumulated `BetaMessage` object
- `message(Class<T>)` - Retrieve as `StructuredMessage<T>` for structured outputs

**Usage with Structured Outputs:**

When using structured outputs with streaming, accumulate the streamed JSON fragments first, then deserialize the completed message into the target class:

```java
BetaMessageAccumulator accumulator = BetaMessageAccumulator.create();

// `client` is an asynchronous client here, since `subscribe` belongs to AsyncStreamResponse
client.beta().messages()
    .createStreaming(params)
    .subscribe(event -> {
        accumulator.accumulate(event);
        // Process streaming events
    })
    .onCompleteFuture()
    .join();

StructuredMessage<BookList> message = accumulator.message(BookList.class);
BookList books = message.content().get(0).text().get().text();
```

## Stream Handler Executor

For asynchronous streaming, handlers execute on a dedicated thread pool. You can configure the executor at the client level or per-subscription.

### Client-Level Configuration

```java { .api }
package com.anthropic.client.okhttp;

public final class AnthropicOkHttpClient {
    public static final class Builder {
        public Builder streamHandlerExecutor(Executor executor);
        public AnthropicClient build();
    }
}
```

**Example:**

```java
import java.util.concurrent.Executors;

AnthropicClient client = AnthropicOkHttpClient.builder()
    .fromEnv()
    .streamHandlerExecutor(Executors.newFixedThreadPool(4))
    .build();
```

### Per-Subscription Configuration

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

Executor executor = Executors.newFixedThreadPool(4);

client.async().messages().createStreaming(params)
    .subscribe(chunk -> System.out.println(chunk), executor);
```

The default executor is a cached thread pool suitable for most use cases. Configure a custom executor when you need specific thread management, resource limits, or integration with existing executor services.

## Synchronous Streaming Examples

### Basic Synchronous Streaming

Process events as they arrive using Java's Stream API:

```java
import com.anthropic.client.AnthropicClient;
import com.anthropic.client.okhttp.AnthropicOkHttpClient;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.MessageCreateParams;
import com.anthropic.models.messages.Model;
import com.anthropic.models.messages.RawMessageStreamEvent;

AnthropicClient client = AnthropicOkHttpClient.fromEnv();

MessageCreateParams params = MessageCreateParams.builder()
    .maxTokens(1024L)
    .addUserMessage("Write a short poem about recursion")
    .model(Model.CLAUDE_SONNET_4_5)
    .build();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        if (event.isContentBlockDelta()) {
            event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                System.out.print(textDelta.text());
            });
        }
    });
}
```

### Accumulating with Stream Processing

Use `MessageAccumulator` with `Stream.peek()` to accumulate while processing:

```java
import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.Message;

MessageAccumulator accumulator = MessageAccumulator.create();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream()
        .peek(accumulator::accumulate)
        .filter(event -> event.isContentBlockDelta())
        // Optional.stream() requires Java 9 or later
        .flatMap(event -> event.asContentBlockDelta().delta().text().stream())
        .forEach(textDelta -> System.out.print(textDelta.text()));
}

Message message = accumulator.message();
System.out.println("\n\nStop reason: " + message.stopReason());
System.out.println("Tokens used: " + message.usage().outputTokens());
```

### Filtering Specific Event Types

Process only specific event types in the stream:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    // Only process text deltas
    streamResponse.stream()
        .filter(event -> event.isContentBlockDelta())
        .map(RawMessageStreamEvent::asContentBlockDelta)
        .flatMap(deltaEvent -> deltaEvent.delta().text().stream())
        .forEach(textDelta -> System.out.print(textDelta.text()));
}
```

### Try-With-Resources Pattern

Always use try-with-resources to ensure proper cleanup:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        // Process events
    });

} catch (Exception e) {
    System.err.println("Streaming failed: " + e.getMessage());
}
// Stream automatically closed here
```

## Asynchronous Streaming Examples

### Basic Asynchronous Streaming

Subscribe to events with a simple consumer:

```java
import com.anthropic.client.AnthropicClientAsync;
import com.anthropic.client.okhttp.AnthropicOkHttpClientAsync;

AnthropicClientAsync client = AnthropicOkHttpClientAsync.fromEnv();

MessageCreateParams params = MessageCreateParams.builder()
    .maxTokens(1024L)
    .addUserMessage("Explain quantum computing")
    .model(Model.CLAUDE_SONNET_4_5)
    .build();

client.messages().createStreaming(params).subscribe(event -> {
    if (event.isContentBlockDelta()) {
        event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
            System.out.print(textDelta.text());
        });
    }
});
```

### Handling Completion and Errors

Use the `Handler` interface to receive completion notifications:

```java
import com.anthropic.core.http.AsyncStreamResponse;
import java.util.Optional;

client.messages().createStreaming(params).subscribe(
    new AsyncStreamResponse.Handler<RawMessageStreamEvent>() {
        @Override
        public void onNext(RawMessageStreamEvent event) {
            if (event.isContentBlockDelta()) {
                event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                    System.out.print(textDelta.text());
                });
            }
        }

        @Override
        public void onComplete(Optional<Throwable> error) {
            if (error.isPresent()) {
                System.err.println("\nStreaming failed: " + error.get().getMessage());
            } else {
                System.out.println("\n\nStreaming completed successfully");
            }
        }
    }
);
```

### Using Futures for Synchronization

Wait for streaming to complete using `CompletableFuture`:

```java
import java.util.concurrent.CompletableFuture;

CompletableFuture<Void> streamingComplete =
    client.messages().createStreaming(params)
        .subscribe(event -> {
            // Process events
        })
        .onCompleteFuture();

// Wait for completion
streamingComplete.join();

// Or handle completion asynchronously
streamingComplete.whenComplete((unused, error) -> {
    if (error != null) {
        System.err.println("Streaming failed: " + error.getMessage());
    } else {
        System.out.println("Streaming completed successfully");
    }
});
```

### Accumulating Asynchronous Streams

Use `MessageAccumulator` with asynchronous streaming:

```java
import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.Message;

MessageAccumulator accumulator = MessageAccumulator.create();

client.messages()
    .createStreaming(params)
    .subscribe(event -> {
        accumulator.accumulate(event);

        // Process events as they arrive
        if (event.isContentBlockDelta()) {
            event.asContentBlockDelta().delta().text().ifPresent(textDelta -> {
                System.out.print(textDelta.text());
            });
        }
    })
    .onCompleteFuture()
    .thenRun(() -> {
        Message message = accumulator.message();
        System.out.println("\n\nFinal message: " + message);
    })
    .join();
```

### Custom Executor Configuration

Configure a custom executor for handler execution:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

Executor customExecutor = Executors.newFixedThreadPool(2);

client.messages()
    .createStreaming(params)
    .subscribe(
        event -> {
            // Handler runs on custom executor
            System.out.println("Thread: " + Thread.currentThread().getName());
        },
        customExecutor
    )
    .onCompleteFuture()
    .join();
```

### Cancelling Streams

Cancel an ongoing stream subscription:

```java
import com.anthropic.core.http.AsyncStreamResponse.Subscription;
import java.util.concurrent.TimeUnit;

Subscription subscription = client.messages()
    .createStreaming(params)
    .subscribe(event -> {
        // Process events
    });

// Cancel after some condition
if (shouldCancel) {
    subscription.cancel();
}

// Or cancel after timeout (CompletableFuture.orTimeout requires Java 9 or later)
subscription.onCompleteFuture()
    .orTimeout(30, TimeUnit.SECONDS)
    .exceptionally(ex -> {
        subscription.cancel();
        return null;
    });
```
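
`CompletableFuture.orTimeout` was added in Java 9. On Java 8, a similar cancel-after-timeout effect could be obtained by scheduling the cancellation, as in the JDK-only sketch below. `TimeoutCancel` and `cancelAfter` are hypothetical names and not SDK APIs; the `done` future stands in for the result of `onCompleteFuture()`, and `cancelAction` stands in for a call such as `subscription.cancel()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the SDK.
final class TimeoutCancel {
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "timeout-cancel");
            t.setDaemon(true); // Do not keep the JVM alive for pending timers
            return t;
        });

    private TimeoutCancel() {}

    // Runs cancelAction if `done` has not completed within the timeout.
    static void cancelAfter(CompletableFuture<?> done, Runnable cancelAction,
                            long timeout, TimeUnit unit) {
        ScheduledFuture<?> timer = SCHEDULER.schedule(() -> {
            if (!done.isDone()) {
                cancelAction.run();
            }
        }, timeout, unit);
        // If the work finishes first, the timer is no longer needed.
        done.whenComplete((unused, error) -> timer.cancel(false));
    }
}
```

A call might look like `TimeoutCancel.cancelAfter(subscription.onCompleteFuture(), subscription::cancel, 30, TimeUnit.SECONDS)`, assuming the `Subscription` interface described earlier in this document.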

## Stream Event Processing Patterns

### Processing All Event Types

Handle all event types in a streaming response:

```java
streamResponse.stream().forEach(event -> {
    if (event.isMessageStart()) {
        RawMessageStartEvent start = event.asMessageStart();
        System.out.println("Message started: " + start.message().id());

    } else if (event.isMessageDelta()) {
        RawMessageDeltaEvent delta = event.asMessageDelta();
        System.out.println("Token usage: " + delta.usage().outputTokens());

    } else if (event.isMessageStop()) {
        System.out.println("Message generation complete");

    } else if (event.isContentBlockStart()) {
        RawContentBlockStartEvent start = event.asContentBlockStart();
        System.out.println("Content block " + start.index() + " started");

    } else if (event.isContentBlockDelta()) {
        RawContentBlockDeltaEvent delta = event.asContentBlockDelta();
        delta.delta().text().ifPresent(textDelta -> {
            System.out.print(textDelta.text());
        });

    } else if (event.isContentBlockStop()) {
        RawContentBlockStopEvent stop = event.asContentBlockStop();
        System.out.println("\nContent block " + stop.index() + " complete");
    }
});
```

### Extracting Text Content Only

Filter and extract only text content from the stream:

```java
streamResponse.stream()
    .filter(event -> event.isContentBlockDelta())
    .map(RawMessageStreamEvent::asContentBlockDelta)
    .flatMap(delta -> delta.delta().text().stream())
    .map(textDelta -> textDelta.text())
    .forEach(System.out::print);
```

### Tracking Usage Metrics

Monitor token usage as the response streams:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream()
        .filter(event -> event.isMessageDelta())
        .map(RawMessageStreamEvent::asMessageDelta)
        .forEach(delta -> {
            Usage usage = delta.usage();
            System.out.println("Tokens generated: " + usage.outputTokens());
        });
}
```

## Integration with Request Options

Streaming methods support per-request configuration through `RequestOptions`:

```java
import com.anthropic.core.RequestOptions;
import java.time.Duration;

RequestOptions options = RequestOptions.builder()
    .timeout(Duration.ofMinutes(5))
    .build();

try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params, options)) {

    streamResponse.stream().forEach(event -> {
        // Process with custom timeout
    });
}
```

## Error Handling

### Synchronous Streaming Errors

Handle errors using standard exception handling:

```java
try (StreamResponse<RawMessageStreamEvent> streamResponse =
        client.messages().createStreaming(params)) {

    streamResponse.stream().forEach(event -> {
        // Process events
    });

} catch (AnthropicException e) {
    System.err.println("API error: " + e.getMessage());
} catch (Exception e) {
    System.err.println("Unexpected error: " + e.getMessage());
}
```

### Asynchronous Streaming Errors

Handle errors through the completion handler:

```java
client.messages().createStreaming(params).subscribe(
    new AsyncStreamResponse.Handler<RawMessageStreamEvent>() {
        @Override
        public void onNext(RawMessageStreamEvent event) {
            // Process events
        }

        @Override
        public void onComplete(Optional<Throwable> error) {
            if (error.isPresent()) {
                Throwable ex = error.get();
                if (ex instanceof AnthropicException) {
                    System.err.println("API error: " + ex.getMessage());
                } else {
                    System.err.println("Unexpected error: " + ex.getMessage());
                }
            }
        }
    }
);
```

## Best Practices

### Resource Management

Always use try-with-resources for synchronous streaming to ensure connections are closed:

```java
// Good
try (StreamResponse<RawMessageStreamEvent> stream =
        client.messages().createStreaming(params)) {
    stream.stream().forEach(event -> {});
}

// Bad - stream might not be closed on error
StreamResponse<RawMessageStreamEvent> stream =
    client.messages().createStreaming(params);
stream.stream().forEach(event -> {});
```

### Accumulator Usage

Create a new `MessageAccumulator` for each streaming request:

```java
// Good - new accumulator per request
MessageAccumulator accumulator = MessageAccumulator.create();
try (StreamResponse<RawMessageStreamEvent> stream =
        client.messages().createStreaming(params)) {
    stream.stream().peek(accumulator::accumulate).forEach(event -> {});
}

// Bad - reusing accumulator across requests
MessageAccumulator accumulator = MessageAccumulator.create();
// First request
client.messages().createStreaming(params1).stream()
    .peek(accumulator::accumulate).forEach(event -> {});
// Second request - accumulator contains data from both requests!
client.messages().createStreaming(params2).stream()
    .peek(accumulator::accumulate).forEach(event -> {});
```

### Thread Safety

Stream handlers should be thread-safe when using asynchronous streaming:

```java
// Good - thread-safe accumulation
MessageAccumulator accumulator = MessageAccumulator.create(); // Thread-safe
client.messages().createStreaming(params)
    .subscribe(event -> accumulator.accumulate(event));

// Bad - non-thread-safe state modification
List<String> texts = new ArrayList<>(); // Not thread-safe
client.messages().createStreaming(params)
    .subscribe(event -> {
        event.contentBlockDelta().ifPresent(delta ->
            texts.add(delta.delta().text().orElseThrow().text()) // Race condition!
        );
    });
```
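
As a thread-safe counterpart to the "Bad" example above, the following JDK-only sketch collects text fragments into a synchronized list while several threads invoke the handler concurrently. `ThreadSafeCollector` and `runDemo` are hypothetical names for illustration; the worker threads merely stand in for handler threads, and no SDK types are used.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of the SDK.
final class ThreadSafeCollector {
    // synchronizedList makes each add() atomic with respect to other threads.
    private final List<String> texts = Collections.synchronizedList(new ArrayList<>());

    // Handler to pass wherever a Consumer<String> of text fragments is expected.
    Consumer<String> handler() {
        return texts::add;
    }

    // Snapshot copy, taken while holding the list's lock.
    List<String> snapshot() {
        synchronized (texts) {
            return new ArrayList<>(texts);
        }
    }

    // Simulates `threads` workers, each delivering `perThread` fragments.
    static int runDemo(int threads, int perThread) {
        ThreadSafeCollector collector = new ThreadSafeCollector();
        Consumer<String> handler = collector.handler();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    handler.accept("t" + id + "-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return collector.snapshot().size();
    }
}
```

With a plain `ArrayList`, concurrent `add` calls can lose elements or throw; the synchronized wrapper trades a small locking cost for correctness.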

### Error Recovery

For production applications, retry failed streams a bounded number of times with a delay between attempts:

```java
int maxRetries = 3;
int attempt = 0;

while (attempt < maxRetries) {
    try (StreamResponse<RawMessageStreamEvent> stream =
            client.messages().createStreaming(params)) {

        stream.stream().forEach(event -> {
            // Process events
        });
        break; // Success
    } catch (AnthropicException e) {
        attempt++;
        if (attempt >= maxRetries) {
            throw e;
        }
        // Linear backoff: wait 1 s, then 2 s.
        // Thread.sleep throws InterruptedException, which the caller must handle or declare.
        Thread.sleep(1000L * attempt);
    }
}
```
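
The retry loop above waits `1000 * attempt` milliseconds, so the delay grows linearly with the attempt number. If a delay that doubles after each failed attempt is preferred, it could be computed as in the sketch below. `Backoff` and `delayMillis` are hypothetical names, and the base and cap values in the usage note are illustrative assumptions rather than SDK defaults.

```java
// Hypothetical helper, not part of the SDK.
final class Backoff {
    private Backoff() {}

    // Delay before retry number `attempt` (1-based):
    // baseMillis * 2^(attempt - 1), capped at maxMillis.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 1 || baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff arguments");
        }
        long delay = baseMillis;
        // Double once per previous attempt; stop early at the cap to avoid overflow.
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

In the loop above, the sleep could then become `Thread.sleep(Backoff.delayMillis(attempt, 1000, 30_000))`, giving waits of 1 s, 2 s, 4 s, and so on up to 30 s.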

## Performance Considerations

- **Streaming vs Non-Streaming**: Use streaming for long responses (high `maxTokens`) to provide immediate feedback
- **Executor Configuration**: Use dedicated executors for async streaming when processing many concurrent streams
- **Accumulator Overhead**: Only use `MessageAccumulator` when you need the final `Message` object
- **Event Filtering**: Filter events early in the stream pipeline to reduce processing overhead
- **Resource Limits**: Configure appropriate timeouts for long-running streams

## Related Functionality

- [Message Creation](./message-creation.md) - Non-streaming message creation
- [Client Setup and Configuration](./client-setup.md) - Configuring timeout and executor settings
- [Error Handling](./error-handling.md) - Comprehensive error handling strategies
- [Structured Outputs](./structured-outputs.md) - Using streaming with structured outputs