0
# Streaming
1
2
Real-time streaming of message responses as they are generated by the Anthropic API. The streaming functionality provides both synchronous and asynchronous interfaces for receiving incremental updates during message generation, including text deltas, thinking processes, tool use, and other content blocks.
3
4
## Core Imports
5
6
```python
7
from anthropic import Anthropic, AsyncAnthropic
8
from anthropic import MessageStream, AsyncMessageStream
9
from anthropic import MessageStreamManager, AsyncMessageStreamManager
10
from anthropic import BetaMessageStream, BetaAsyncMessageStream
11
from anthropic import BetaMessageStreamManager, BetaAsyncMessageStreamManager
12
```
13
14
## Basic Usage
15
16
### Basic Synchronous Streaming
17
18
```python
19
from anthropic import Anthropic
20
21
client = Anthropic()
22
23
with client.messages.stream(
24
model="claude-sonnet-4-5-20250929",
25
max_tokens=1024,
26
messages=[{"role": "user", "content": "Tell me a story"}]
27
) as stream:
28
for event in stream:
29
if event.type == "text":
30
print(event.text, end="", flush=True)
31
32
# Get the complete message after streaming
33
message = stream.get_final_message()
34
```
35
36
### Basic Asynchronous Streaming
37
38
```python
39
from anthropic import AsyncAnthropic
40
import asyncio
41
42
client = AsyncAnthropic()
43
44
async def main():
45
async with client.messages.stream(
46
model="claude-sonnet-4-5-20250929",
47
max_tokens=1024,
48
messages=[{"role": "user", "content": "Tell me a story"}]
49
) as stream:
50
async for event in stream:
51
if event.type == "text":
52
print(event.text, end="", flush=True)
53
54
message = await stream.get_final_message()
55
56
asyncio.run(main())
57
```
58
59
### Text Stream Iterator
60
61
For simple text-only streaming, use the `text_stream` property:
62
63
```python
64
with client.messages.stream(
65
model="claude-sonnet-4-5-20250929",
66
max_tokens=1024,
67
messages=[{"role": "user", "content": "Count to 10"}]
68
) as stream:
69
for text in stream.text_stream:
70
print(text, end="", flush=True)
71
print()
72
```
73
74
## Capabilities
75
76
### Stream Management Classes
77
78
Context managers that wrap streaming responses and handle resource cleanup automatically.
79
80
```python { .api }
81
class MessageStreamManager:
82
"""
83
Synchronous context manager for MessageStream.
84
85
Returned by client.messages.stream() to enable context manager usage.
86
Automatically closes the stream when exiting the context.
87
"""
88
89
def __enter__(self) -> MessageStream: ...
90
def __exit__(
91
self,
92
exc_type: type[BaseException] | None,
93
exc: BaseException | None,
94
exc_tb: TracebackType | None
95
) -> None: ...
96
97
class AsyncMessageStreamManager:
98
"""
99
Asynchronous context manager for AsyncMessageStream.
100
101
Returned by async client.messages.stream() to enable async context manager usage.
102
Automatically closes the stream when exiting the context.
103
"""
104
105
async def __aenter__(self) -> AsyncMessageStream: ...
106
async def __aexit__(
107
self,
108
exc_type: type[BaseException] | None,
109
exc: BaseException | None,
110
exc_tb: TracebackType | None
111
) -> None: ...
112
113
class BetaMessageStreamManager(Generic[ResponseFormatT]):
114
"""
115
Synchronous context manager for BetaMessageStream.
116
117
Supports structured output parsing when ResponseFormatT is provided.
118
"""
119
120
def __enter__(self) -> BetaMessageStream[ResponseFormatT]: ...
121
def __exit__(
122
self,
123
exc_type: type[BaseException] | None,
124
exc: BaseException | None,
125
exc_tb: TracebackType | None
126
) -> None: ...
127
128
class BetaAsyncMessageStreamManager(Generic[ResponseFormatT]):
129
"""
130
Asynchronous context manager for BetaAsyncMessageStream.
131
132
Supports structured output parsing when ResponseFormatT is provided.
133
"""
134
135
async def __aenter__(self) -> BetaAsyncMessageStream[ResponseFormatT]: ...
136
async def __aexit__(
137
self,
138
exc_type: type[BaseException] | None,
139
exc: BaseException | None,
140
exc_tb: TracebackType | None
141
) -> None: ...
142
```
143
144
### Message Stream Classes
145
146
High-level wrappers that accumulate streaming events and provide access to the final message.
147
148
```python { .api }
149
class MessageStream:
150
"""
151
Synchronous message stream with automatic event accumulation.
152
153
Provides both raw stream events and convenient text_stream iterator.
154
Accumulates content into a final Message object.
155
"""
156
157
text_stream: Iterator[str]
158
"""Iterator over just the text deltas in the stream."""
159
160
@property
161
def response(self) -> httpx.Response:
162
"""The underlying HTTP response object."""
163
...
164
165
@property
166
def request_id(self) -> str | None:
167
"""The request ID from response headers."""
168
...
169
170
@property
171
def current_message_snapshot(self) -> Message:
172
"""Current accumulated message state."""
173
...
174
175
def __iter__(self) -> Iterator[MessageStreamEvent]:
176
"""Iterate over stream events."""
177
...
178
179
def __enter__(self) -> Self:
180
"""Enter context manager."""
181
...
182
183
def __exit__(
184
self,
185
exc_type: type[BaseException] | None,
186
exc: BaseException | None,
187
exc_tb: TracebackType | None
188
) -> None:
189
"""Exit context manager and close stream."""
190
...
191
192
def close(self) -> None:
193
"""
194
Close the response and release the connection.
195
196
Automatically called if the response body is read to completion.
197
"""
198
...
199
200
def get_final_message(self) -> Message:
201
"""
202
Waits until the stream has been read to completion and returns
203
the accumulated Message object.
204
"""
205
...
206
207
def get_final_text(self) -> str:
208
"""
209
Returns all text content blocks concatenated together.
210
211
Raises RuntimeError if no text content blocks were returned.
212
"""
213
...
214
215
def until_done(self) -> None:
216
"""Blocks until the stream has been consumed."""
217
...
218
219
class AsyncMessageStream:
220
"""
221
Asynchronous message stream with automatic event accumulation.
222
223
Async equivalent of MessageStream with the same functionality.
224
"""
225
226
text_stream: AsyncIterator[str]
227
"""Async iterator over just the text deltas in the stream."""
228
229
@property
230
def response(self) -> httpx.Response:
231
"""The underlying HTTP response object."""
232
...
233
234
@property
235
def request_id(self) -> str | None:
236
"""The request ID from response headers."""
237
...
238
239
@property
240
def current_message_snapshot(self) -> Message:
241
"""Current accumulated message state."""
242
...
243
244
def __aiter__(self) -> AsyncIterator[MessageStreamEvent]:
245
"""Async iterate over stream events."""
246
...
247
248
async def __aenter__(self) -> Self:
249
"""Enter async context manager."""
250
...
251
252
async def __aexit__(
253
self,
254
exc_type: type[BaseException] | None,
255
exc: BaseException | None,
256
exc_tb: TracebackType | None
257
) -> None:
258
"""Exit async context manager and close stream."""
259
...
260
261
async def close(self) -> None:
262
"""
263
Close the response and release the connection.
264
265
Automatically called if the response body is read to completion.
266
"""
267
...
268
269
async def get_final_message(self) -> Message:
270
"""
271
Waits until the stream has been read to completion and returns
272
the accumulated Message object.
273
"""
274
...
275
276
async def get_final_text(self) -> str:
277
"""
278
Returns all text content blocks concatenated together.
279
280
Raises RuntimeError if no text content blocks were returned.
281
"""
282
...
283
284
async def until_done(self) -> None:
285
"""Waits until the stream has been consumed."""
286
...
287
```
288
289
### Beta Message Stream Classes
290
291
Extended streaming classes with support for beta features like structured outputs and advanced content types.
292
293
```python { .api }
294
class BetaMessageStream(Generic[ResponseFormatT]):
295
"""
296
Synchronous beta message stream with structured output support.
297
298
Supports parsing of structured outputs when output_format is provided.
299
"""
300
301
text_stream: Iterator[str]
302
"""Iterator over just the text deltas in the stream."""
303
304
@property
305
def response(self) -> httpx.Response:
306
"""The underlying HTTP response object."""
307
...
308
309
@property
310
def request_id(self) -> str | None:
311
"""The request ID from response headers."""
312
...
313
314
@property
315
def current_message_snapshot(self) -> ParsedBetaMessage[ResponseFormatT]:
316
"""Current accumulated message state with parsed output."""
317
...
318
319
def __iter__(self) -> Iterator[ParsedBetaMessageStreamEvent[ResponseFormatT]]:
320
"""Iterate over stream events."""
321
...
322
323
def __enter__(self) -> Self:
324
"""Enter context manager."""
325
...
326
327
def __exit__(
328
self,
329
exc_type: type[BaseException] | None,
330
exc: BaseException | None,
331
exc_tb: TracebackType | None
332
) -> None:
333
"""Exit context manager and close stream."""
334
...
335
336
def close(self) -> None:
337
"""
338
Close the response and release the connection.
339
340
Automatically called if the response body is read to completion.
341
"""
342
...
343
344
def get_final_message(self) -> ParsedBetaMessage[ResponseFormatT]:
345
"""
346
Waits until the stream has been read to completion and returns
347
the accumulated ParsedBetaMessage object with structured output.
348
"""
349
...
350
351
def get_final_text(self) -> str:
352
"""
353
Returns all text content blocks concatenated together.
354
355
Raises RuntimeError if no text content blocks were returned.
356
"""
357
...
358
359
def until_done(self) -> None:
360
"""Blocks until the stream has been consumed."""
361
...
362
363
class BetaAsyncMessageStream(Generic[ResponseFormatT]):
364
"""
365
Asynchronous beta message stream with structured output support.
366
367
Async equivalent of BetaMessageStream with the same functionality.
368
"""
369
370
text_stream: AsyncIterator[str]
371
"""Async iterator over just the text deltas in the stream."""
372
373
@property
374
def response(self) -> httpx.Response:
375
"""The underlying HTTP response object."""
376
...
377
378
@property
379
def request_id(self) -> str | None:
380
"""The request ID from response headers."""
381
...
382
383
@property
384
def current_message_snapshot(self) -> ParsedBetaMessage[ResponseFormatT]:
385
"""Current accumulated message state with parsed output."""
386
...
387
388
def __aiter__(self) -> AsyncIterator[ParsedBetaMessageStreamEvent[ResponseFormatT]]:
389
"""Async iterate over stream events."""
390
...
391
392
async def __aenter__(self) -> Self:
393
"""Enter async context manager."""
394
...
395
396
async def __aexit__(
397
self,
398
exc_type: type[BaseException] | None,
399
exc: BaseException | None,
400
exc_tb: TracebackType | None
401
) -> None:
402
"""Exit async context manager and close stream."""
403
...
404
405
async def close(self) -> None:
406
"""
407
Close the response and release the connection.
408
409
Automatically called if the response body is read to completion.
410
"""
411
...
412
413
async def get_final_message(self) -> ParsedBetaMessage[ResponseFormatT]:
414
"""
415
Waits until the stream has been read to completion and returns
416
the accumulated ParsedBetaMessage object with structured output.
417
"""
418
...
419
420
async def get_final_text(self) -> str:
421
"""
422
Returns all text content blocks concatenated together.
423
424
Raises RuntimeError if no text content blocks were returned.
425
"""
426
...
427
428
async def until_done(self) -> None:
429
"""Waits until the stream has been consumed."""
430
...
431
```
432
433
### Stream Event Types
434
435
Event objects emitted during streaming that represent different types of content and state changes.
436
437
```python { .api }
438
class TextEvent:
439
"""
440
Emitted when a text delta is received.
441
442
Provides both the incremental delta and complete accumulated text.
443
"""
444
445
type: Literal["text"]
446
text: str
447
"""The text delta"""
448
449
snapshot: str
450
"""The entire accumulated text"""
451
452
class CitationEvent:
453
"""
454
Emitted when a citation is added to text content.
455
"""
456
457
type: Literal["citation"]
458
citation: Citation
459
"""The new citation"""
460
461
snapshot: List[Citation]
462
"""All of the accumulated citations"""
463
464
class ThinkingEvent:
465
"""
466
Emitted when a thinking delta is received during extended thinking.
467
"""
468
469
type: Literal["thinking"]
470
thinking: str
471
"""The thinking delta"""
472
473
snapshot: str
474
"""The accumulated thinking so far"""
475
476
class SignatureEvent:
477
"""
478
Emitted when the signature of a thinking block is received.
479
"""
480
481
type: Literal["signature"]
482
signature: str
483
"""The signature of the thinking block"""
484
485
class InputJsonEvent:
486
"""
487
Emitted when tool input JSON is being streamed.
488
489
Provides partial JSON string and continuously parsed object.
490
"""
491
492
type: Literal["input_json"]
493
partial_json: str
494
"""
495
A partial JSON string delta
496
497
e.g. '"San Francisco,'
498
"""
499
500
snapshot: object
501
"""
502
The currently accumulated parsed object.
503
504
e.g. {'location': 'San Francisco, CA'}
505
"""
506
507
class MessageStopEvent:
508
"""
509
Emitted when message generation completes.
510
511
Contains the final accumulated message.
512
"""
513
514
type: Literal["message_stop"]
515
message: Message
516
517
class ContentBlockStopEvent:
518
"""
519
Emitted when a content block completes.
520
521
Contains the final content block.
522
"""
523
524
type: Literal["content_block_stop"]
525
content_block: ContentBlock
526
527
MessageStreamEvent = Union[
528
TextEvent,
529
CitationEvent,
530
ThinkingEvent,
531
SignatureEvent,
532
InputJsonEvent,
533
RawMessageStartEvent,
534
RawMessageDeltaEvent,
535
MessageStopEvent,
536
RawContentBlockStartEvent,
537
RawContentBlockDeltaEvent,
538
ContentBlockStopEvent,
539
]
540
"""Union of all possible message stream events."""
541
```
542
543
### Beta Stream Event Types
544
545
Extended event types for beta streaming features including structured outputs.
546
547
```python { .api }
548
class ParsedBetaTextEvent:
549
"""
550
Beta text event with structured output parsing support.
551
"""
552
553
type: Literal["text"]
554
text: str
555
"""The text delta"""
556
557
snapshot: str
558
"""The entire accumulated text"""
559
560
def parsed_snapshot(self) -> Dict[str, Any]:
561
"""
562
Parse the accumulated text as JSON.
563
564
Uses partial mode for incomplete JSON structures.
565
"""
566
...
567
568
class BetaCitationEvent:
569
"""
570
Beta citation event (same as standard CitationEvent).
571
"""
572
573
type: Literal["citation"]
574
citation: Citation
575
"""The new citation"""
576
577
snapshot: List[Citation]
578
"""All of the accumulated citations"""
579
580
class BetaThinkingEvent:
581
"""
582
Beta thinking event (same as standard ThinkingEvent).
583
"""
584
585
type: Literal["thinking"]
586
thinking: str
587
"""The thinking delta"""
588
589
snapshot: str
590
"""The accumulated thinking so far"""
591
592
class BetaSignatureEvent:
593
"""
594
Beta signature event (same as standard SignatureEvent).
595
"""
596
597
type: Literal["signature"]
598
signature: str
599
"""The signature of the thinking block"""
600
601
class BetaInputJsonEvent:
602
"""
603
Beta input JSON event (same as standard InputJsonEvent).
604
"""
605
606
type: Literal["input_json"]
607
partial_json: str
608
"""A partial JSON string delta"""
609
610
snapshot: object
611
"""The currently accumulated parsed object."""
612
613
class ParsedBetaMessageStopEvent(Generic[ResponseFormatT]):
614
"""
615
Beta message stop event with structured output.
616
"""
617
618
type: Literal["message_stop"]
619
message: ParsedBetaMessage[ResponseFormatT]
620
621
class ParsedBetaContentBlockStopEvent(Generic[ResponseFormatT]):
622
"""
623
Beta content block stop event with structured output.
624
"""
625
626
type: Literal["content_block_stop"]
627
content_block: ParsedBetaContentBlock[ResponseFormatT]
628
629
ParsedBetaMessageStreamEvent = Union[
630
ParsedBetaTextEvent,
631
BetaCitationEvent,
632
BetaThinkingEvent,
633
BetaSignatureEvent,
634
BetaInputJsonEvent,
635
BetaRawMessageStartEvent,
636
BetaRawMessageDeltaEvent,
637
ParsedBetaMessageStopEvent[ResponseFormatT],
638
BetaRawContentBlockStartEvent,
639
BetaRawContentBlockDeltaEvent,
640
ParsedBetaContentBlockStopEvent[ResponseFormatT],
641
]
642
"""Union of all possible beta message stream events."""
643
```
644
645
## Usage Examples
646
647
### Event Filtering and Handling
648
649
Process specific event types from the stream:
650
651
```python
652
from anthropic import Anthropic
653
654
client = Anthropic()
655
656
with client.messages.stream(
657
model="claude-sonnet-4-5-20250929",
658
max_tokens=1024,
659
messages=[{"role": "user", "content": "Explain quantum computing"}]
660
) as stream:
661
for event in stream:
662
if event.type == "text":
663
print(event.text, end="", flush=True)
664
elif event.type == "content_block_stop":
665
print("\n\nContent block finished:", event.content_block.type)
666
elif event.type == "message_stop":
667
print("\n\nMessage complete!")
668
print("Stop reason:", event.message.stop_reason)
669
print("Tokens used:", event.message.usage.output_tokens)
670
```
671
672
### Streaming with Thinking
673
674
Access extended thinking process during streaming:
675
676
```python
677
from anthropic import Anthropic
678
679
client = Anthropic()
680
681
with client.messages.stream(
682
model="claude-sonnet-4-5-20250929",
683
max_tokens=3200,
684
thinking={"type": "enabled", "budget_tokens": 1600},
685
messages=[{"role": "user", "content": "Create a haiku about Anthropic."}]
686
) as stream:
687
thinking_started = False
688
text_started = False
689
690
for event in stream:
691
if event.type == "thinking":
692
if not thinking_started:
693
print("Thinking:\n---------")
694
thinking_started = True
695
print(event.thinking, end="", flush=True)
696
697
elif event.type == "text":
698
if not text_started:
699
print("\n\nText:\n-----")
700
text_started = True
701
print(event.text, end="", flush=True)
702
```
703
704
### Structured Output Streaming
705
706
Stream structured outputs with incremental parsing:
707
708
```python
709
import pydantic
710
from anthropic import Anthropic
711
712
class Order(pydantic.BaseModel):
713
product_name: str
714
price: float
715
quantity: int
716
717
client = Anthropic()
718
719
prompt = """
720
Extract the product name, price, and quantity from this customer message:
721
"Hi, I'd like to order 2 packs of Green Tea for 5.50 dollars each."
722
"""
723
724
with client.beta.messages.stream(
725
model="claude-sonnet-4-5-20250929-structured-outputs",
726
messages=[{"role": "user", "content": prompt}],
727
betas=["structured-outputs-2025-09-17"],
728
max_tokens=1024,
729
output_format=Order,
730
) as stream:
731
for event in stream:
732
if event.type == "text":
733
# Access incrementally parsed structured output
734
print(event.parsed_snapshot())
735
736
# Get final parsed output
737
final_message = stream.get_final_message()
738
if final_message.parsed_output:
739
order = final_message.parsed_output
740
print(f"Product: {order.product_name}")
741
print(f"Price: ${order.price}")
742
print(f"Quantity: {order.quantity}")
743
```
744
745
### Async Streaming with Error Handling
746
747
Handle errors gracefully in async streaming:
748
749
```python
750
from anthropic import AsyncAnthropic
751
import asyncio
752
753
client = AsyncAnthropic()
754
755
async def stream_with_error_handling():
756
try:
757
async with client.messages.stream(
758
model="claude-sonnet-4-5-20250929",
759
max_tokens=1024,
760
messages=[{"role": "user", "content": "Tell me a joke"}]
761
) as stream:
762
async for event in stream:
763
if event.type == "text":
764
print(event.text, end="", flush=True)
765
766
message = await stream.get_final_message()
767
print(f"\n\nFinal message ID: {message.id}")
768
769
except Exception as e:
770
print(f"Error during streaming: {e}")
771
772
asyncio.run(stream_with_error_handling())
773
```
774
775
### Tool Use Streaming
776
777
Stream tool use and input JSON as it's generated:
778
779
```python
780
from anthropic import Anthropic
781
782
client = Anthropic()
783
784
tools = [
785
{
786
"name": "get_weather",
787
"description": "Get the weather for a location",
788
"input_schema": {
789
"type": "object",
790
"properties": {
791
"location": {"type": "string"},
792
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
793
},
794
"required": ["location"]
795
}
796
}
797
]
798
799
with client.messages.stream(
800
model="claude-sonnet-4-5-20250929",
801
max_tokens=1024,
802
tools=tools,
803
messages=[{"role": "user", "content": "What's the weather in San Francisco?"}]
804
) as stream:
805
for event in stream:
806
if event.type == "content_block_start":
807
if event.content_block.type == "tool_use":
808
print(f"\nTool: {event.content_block.name}")
809
810
elif event.type == "input_json":
811
print(f"\nPartial input: {event.snapshot}")
812
813
elif event.type == "content_block_stop":
814
if event.content_block.type == "tool_use":
815
print(f"\nFinal input: {event.content_block.input}")
816
```
817
818
### Manual Stream Lifecycle
819
820
Control stream lifecycle without context manager:
821
822
```python
823
from anthropic import Anthropic
824
825
client = Anthropic()
826
827
# Start stream without context manager
828
stream_manager = client.messages.stream(
829
model="claude-sonnet-4-5-20250929",
830
max_tokens=1024,
831
messages=[{"role": "user", "content": "Count to 5"}]
832
)
833
834
try:
835
stream = stream_manager.__enter__()
836
837
for event in stream:
838
if event.type == "text":
839
print(event.text, end="", flush=True)
840
841
message = stream.get_final_message()
842
print(f"\n\nMessage ID: {message.id}")
843
844
finally:
845
stream_manager.__exit__(None, None, None)
846
```
847
848
### Accessing Response Metadata
849
850
Access HTTP response information during streaming:
851
852
```python
853
from anthropic import Anthropic
854
855
client = Anthropic()
856
857
with client.messages.stream(
858
model="claude-sonnet-4-5-20250929",
859
max_tokens=1024,
860
messages=[{"role": "user", "content": "Hello"}]
861
) as stream:
862
# Access response metadata
863
print(f"Request ID: {stream.request_id}")
864
print(f"Response headers: {stream.response.headers}")
865
866
for event in stream:
867
if event.type == "text":
868
print(event.text, end="", flush=True)
869
870
# Access final snapshot before closing
871
final_snapshot = stream.current_message_snapshot
872
print(f"\n\nFinal token count: {final_snapshot.usage.output_tokens}")
873
```
874
875
## Types
876
877
### Type Aliases and Backward Compatibility
878
879
```python { .api }
880
# Backward compatibility aliases for beta types
881
BetaTextEvent = ParsedBetaTextEvent
882
BetaMessageStopEvent = ParsedBetaMessageStopEvent[object]
883
BetaMessageStreamEvent = ParsedBetaMessageStreamEvent
884
BetaContentBlockStopEvent = ParsedBetaContentBlockStopEvent[object]
885
```
886
887
### Generic Type Parameters
888
889
```python { .api }
890
ResponseFormatT = TypeVar('ResponseFormatT')
891
"""
892
Type variable for structured output format in beta streaming.
893
894
Can be a Pydantic model or other response format definition.
895
"""
896
```
897