pypi-anthropic

Description
The official Python library for the Anthropic API
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/pypi-anthropic@0.66.0

docs/streaming.md

1
# Streaming Interface
2
3
Real-time message streaming provides immediate access to Claude's responses as they're generated, enabling responsive user interfaces and real-time processing of partial responses, tool use events, and completion updates.
4
5
## Capabilities
6
7
### Message Stream Classes
8
9
```python { .api }
10
class MessageStream:
    """Synchronous message stream.

    Used as a context manager and iterated to receive ``MessageStreamEvent``
    objects. Each ``on_*`` method accepts a handler for one event kind and
    returns a ``MessageStream``.
    """

    # NOTE(review): confirm that the on_* handler-registration methods exist
    # in the documented SDK release (0.66.0) -- TODO verify against the SDK's
    # streaming helpers documentation.

    # Context-manager and iteration protocol.
    def __enter__(self) -> MessageStream: ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
    def __iter__(self) -> Iterator[MessageStreamEvent]: ...

    # Handler registration, one method per event kind.
    def on_text(self, handler: Callable[[TextEvent], None]) -> MessageStream: ...
    def on_input_json(self, handler: Callable[[InputJsonEvent], None]) -> MessageStream: ...
    def on_message_stop(self, handler: Callable[[MessageStopEvent], None]) -> MessageStream: ...
    def on_content_block_start(self, handler: Callable[[ContentBlockStartEvent], None]) -> MessageStream: ...
    def on_content_block_delta(self, handler: Callable[[ContentBlockDeltaEvent], None]) -> MessageStream: ...
    def on_content_block_stop(self, handler: Callable[[ContentBlockStopEvent], None]) -> MessageStream: ...

    # Final results: the assembled message, or just its text.
    def get_final_message(self) -> Message: ...
    def get_final_text(self) -> str: ...

    @property
    def text_stream(self) -> Iterator[str]: ...  # iterator of text pieces

    @property
    def current_message_snapshot(self) -> Message: ...  # message as built so far
29
30
class AsyncMessageStream:
    """Asynchronous message stream.

    Used with ``async with`` and ``async for`` to receive
    ``MessageStreamEvent`` objects. Each ``on_*`` method accepts a handler
    that returns an awaitable and itself returns an ``AsyncMessageStream``.
    """

    # NOTE(review): confirm that the on_* handler-registration methods exist
    # in the documented SDK release (0.66.0) -- TODO verify against the SDK's
    # streaming helpers documentation.

    # Async context-manager and iteration protocol.
    async def __aenter__(self) -> AsyncMessageStream: ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
    def __aiter__(self) -> AsyncIterator[MessageStreamEvent]: ...

    # Handler registration; the on_* methods themselves are not coroutines.
    def on_text(self, handler: Callable[[TextEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_input_json(self, handler: Callable[[InputJsonEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_message_stop(self, handler: Callable[[MessageStopEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_start(self, handler: Callable[[ContentBlockStartEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_delta(self, handler: Callable[[ContentBlockDeltaEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_stop(self, handler: Callable[[ContentBlockStopEvent], Awaitable[None]]) -> AsyncMessageStream: ...

    # Final results must be awaited.
    async def get_final_message(self) -> Message: ...
    async def get_final_text(self) -> str: ...

    @property
    def text_stream(self) -> AsyncIterator[str]: ...  # async iterator of text pieces

    @property
    def current_message_snapshot(self) -> Message: ...  # message as built so far
49
```
50
51
### Stream Managers
52
53
```python { .api }
54
class MessageStreamManager:
    """Opens synchronous message streams."""

    # NOTE(review): the usage examples call ``client.messages.stream(...)``
    # and use the result in a ``with`` statement -- confirm how this class
    # maps onto that call in the documented SDK release.
    def stream(
        self,
        max_tokens: int,
        messages: List[MessageParam],
        model: str,
        *,
        metadata: Optional[MetadataParam] = None,
        stop_sequences: Optional[List[str]] = None,
        system: Optional[str] = None,
        temperature: Optional[float] = None,
        tool_choice: Optional[ToolChoiceParam] = None,
        tools: Optional[List[ToolParam]] = None,
        top_k: Optional[int] = None,
        top_p: Optional[float] = None,
        **kwargs
    ) -> MessageStream:
        """Start a stream and return a ``MessageStream``.

        ``max_tokens``, ``messages`` and ``model`` are required. Every other
        named option is keyword-only and defaults to ``None``. Additional
        keyword arguments are accepted through ``**kwargs``.
        """
        ...
71
72
class AsyncMessageStreamManager:
    """Opens asynchronous message streams."""

    # NOTE(review): the async usage example calls
    # ``client.messages.stream(...)`` and uses the result in ``async with``
    # -- confirm how this class maps onto that call in the documented SDK
    # release.
    def stream(
        self,
        max_tokens: int,
        messages: List[MessageParam],
        model: str,
        *,
        metadata: Optional[MetadataParam] = None,
        stop_sequences: Optional[List[str]] = None,
        system: Optional[str] = None,
        temperature: Optional[float] = None,
        tool_choice: Optional[ToolChoiceParam] = None,
        tools: Optional[List[ToolParam]] = None,
        top_k: Optional[int] = None,
        top_p: Optional[float] = None,
        **kwargs
    ) -> AsyncMessageStream:
        """Start a stream and return an ``AsyncMessageStream``.

        This is a plain (non-``async``) method. ``max_tokens``, ``messages``
        and ``model`` are required. Every other named option is keyword-only
        and defaults to ``None``. Additional keyword arguments are accepted
        through ``**kwargs``.
        """
        ...
89
```
90
91
## Stream Event Types
92
93
### Core Stream Events
94
95
```python { .api }
96
class MessageStreamEvent(TypedDict):
    """Base for every stream event; ``type`` names the concrete event."""

    type: str


class MessageStartEvent(MessageStreamEvent):
    """``message_start`` event carrying a ``Message``."""

    type: Literal["message_start"]
    message: Message


class MessageDeltaEvent(MessageStreamEvent):
    """``message_delta`` event carrying a delta and usage figures."""

    type: Literal["message_delta"]
    # NOTE(review): ``delta`` is typed identically to ``usage``. The Messages
    # streaming API is believed to report stop_reason / stop_sequence in
    # ``delta`` -- confirm and give it a dedicated type.
    delta: MessageDeltaUsage
    usage: MessageDeltaUsage


class MessageStopEvent(MessageStreamEvent):
    """``message_stop`` event; carries no fields beyond ``type``."""

    type: Literal["message_stop"]


class ContentBlockStartEvent(MessageStreamEvent):
    """``content_block_start`` event carrying the block and its index."""

    type: Literal["content_block_start"]
    index: int
    content_block: ContentBlock


class ContentBlockDeltaEvent(MessageStreamEvent):
    """``content_block_delta`` event carrying a text or input-JSON delta."""

    type: Literal["content_block_delta"]
    index: int
    delta: Union[TextDelta, InputJSONDelta]


class ContentBlockStopEvent(MessageStreamEvent):
    """``content_block_stop`` event carrying the block index."""

    type: Literal["content_block_stop"]
    index: int
124
```
125
126
### Specific Event Types
127
128
```python { .api }
129
# NOTE(review): the usage examples read event fields as attributes
# (``event.text``), which a TypedDict does not support -- confirm whether
# these are model objects rather than plain dicts.


class TextEvent(TypedDict):
    """``text`` event: a text piece plus a ``snapshot`` string."""

    type: Literal["text"]
    text: str
    snapshot: str


class InputJsonEvent(TypedDict):
    """``input_json`` event: a fragment of partial JSON."""

    type: Literal["input_json"]
    partial_json: str


class TextDelta(TypedDict):
    """``text_delta`` payload of a content-block delta."""

    type: Literal["text_delta"]
    text: str


class InputJSONDelta(TypedDict):
    """``input_json_delta`` payload of a content-block delta."""

    type: Literal["input_json_delta"]
    partial_json: str


class MessageDeltaUsage(TypedDict):
    """Usage figures: the output token count."""

    output_tokens: int
148
```
149
150
### Raw Stream Events
151
152
```python { .api }
153
class RawMessageStreamEvent(TypedDict):
    """Base for every raw stream event; ``type`` names the concrete event."""

    type: str


class RawMessageStartEvent(RawMessageStreamEvent):
    """Raw ``message_start`` event carrying a ``Message``."""

    type: Literal["message_start"]
    message: Message


class RawMessageDeltaEvent(RawMessageStreamEvent):
    """Raw ``message_delta`` event carrying a delta and usage figures."""

    type: Literal["message_delta"]
    # NOTE(review): ``delta`` is typed identically to ``usage``. The Messages
    # streaming API is believed to report stop_reason / stop_sequence in
    # ``delta`` -- confirm and give it a dedicated type.
    delta: MessageDeltaUsage
    usage: MessageDeltaUsage


class RawMessageStopEvent(RawMessageStreamEvent):
    """Raw ``message_stop`` event; carries no fields beyond ``type``."""

    type: Literal["message_stop"]


class RawContentBlockStartEvent(RawMessageStreamEvent):
    """Raw ``content_block_start`` event carrying the block and its index."""

    type: Literal["content_block_start"]
    index: int
    content_block: ContentBlock


# Declared ahead of RawContentBlockDeltaEvent so the annotation below refers
# to a name that already exists.
class RawContentBlockDelta(TypedDict):
    """Raw delta payload with optional ``text`` and ``partial_json`` values."""

    type: str
    text: Optional[str]
    partial_json: Optional[str]


class RawContentBlockDeltaEvent(RawMessageStreamEvent):
    """Raw ``content_block_delta`` event carrying a ``RawContentBlockDelta``."""

    type: Literal["content_block_delta"]
    index: int
    delta: RawContentBlockDelta


class RawContentBlockStopEvent(RawMessageStreamEvent):
    """Raw ``content_block_stop`` event carrying the block index."""

    type: Literal["content_block_stop"]
    index: int
186
```
187
188
## Usage Examples
189
190
### Basic Text Streaming
191
192
```python
193
from anthropic import Anthropic
194
195
client = Anthropic()

# Open the stream as a context manager and print each text piece as it
# arrives; flush so output appears immediately instead of sitting in the
# stdout buffer.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a short story about a robot"},
    ],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
206
```
207
208
### Event Handler Pattern
209
210
```python
211
def handle_text(event):
    """Print the text carried by a text event."""
    print(f"Text: {event.text}")
213
214
def handle_stop(event):
    """Announce that the message has finished; the event itself is unused."""
    print("Message completed!")
216
217
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Hello!"},
    ],
) as stream:
    stream.on_text(handle_text)
    stream.on_message_stop(handle_stop)

    # Process all events
    for event in stream:
        pass  # Events are handled by registered handlers

    # Get final assembled message. Kept inside the ``with`` block so it runs
    # while the stream is still open.
    final_message = stream.get_final_message()
    print(final_message.content[0].text)
234
```
235
236
### Tool Use Streaming
237
238
```python
239
# A single tool definition: name, description and a JSON-schema style
# ``input_schema`` with one required string property, ``location``.
tools = [
    {
        "name": "get_weather",
        "description": "Get current weather",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {"type": "string"},
            },
            "required": ["location"],
        },
    },
]
252
253
def handle_tool_use(event):
    """Print the partial JSON fragment carried by an input-JSON event.

    This handler is registered with ``on_input_json``, so it receives
    ``InputJsonEvent`` objects. Those declare ``partial_json`` and have no
    ``name`` or ``input`` field.
    """
    print(f"Partial tool input: {event.partial_json}")
256
257
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user", "content": "What's the weather in London?"},
    ],
) as stream:
    stream.on_input_json(handle_tool_use)

    for event in stream:
        # Announce a tool call as soon as its content block opens.
        if event.type == "content_block_start" and event.content_block.type == "tool_use":
            print(f"Starting tool use: {event.content_block.name}")
271
```
272
273
### Async Streaming
274
275
```python
276
import asyncio
277
from anthropic import AsyncAnthropic
278
279
async def stream_chat():
    """Stream one reply and stop once the ``message_stop`` event arrives."""
    client = AsyncAnthropic()

    # Async streams take handlers that return awaitables, hence ``async def``.
    async def handle_text(event):
        print(f"Received: {event.text}")

    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Count to 10"},
        ],
    ) as stream:
        stream.on_text(handle_text)

        async for event in stream:
            if event.type == "message_stop":
                print("Finished!")
                break


asyncio.run(stream_chat())
300
```
301
302
### Real-time Text Processing
303
304
```python
305
accumulated_text = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a poem about the ocean"},
    ],
) as stream:
    for text in stream.text_stream:
        accumulated_text += text

        # Process text in real-time (e.g., update UI)
        if "\n" in text:  # New line completed
            # Everything before the last "\n" is complete; the remainder is
            # the start of the next line and stays buffered.
            *complete_lines, accumulated_text = accumulated_text.split("\n")
            for line in complete_lines:
                print(f"Complete line: {line}")

# The reply may not end with a newline, so emit whatever is still buffered
# once the stream has finished.
if accumulated_text:
    print(f"Complete line: {accumulated_text}")
323
```
324
325
### Streaming with Current Snapshot
326
327
```python
328
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain quantum computing"},
    ],
) as stream:
    for event in stream:
        # A content-block delta is either a text delta or an input-JSON
        # delta; only text deltas belong to a block that has ``.text``.
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            # Get current state of the message being built
            current_snapshot = stream.current_message_snapshot
            # Assumes event.index is the block's position in ``content`` --
            # TODO confirm against the streaming API reference.
            block = current_snapshot.content[event.index]
            print(f"Current length: {len(block.text)} chars")

    # Get final complete message. Kept inside the ``with`` block so it runs
    # while the stream is still open.
    final_message = stream.get_final_message()
    final_text = stream.get_final_text()
344
```
345
346
### Error Handling in Streams
347
348
```python
349
import anthropic

try:
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Hello!"},
        ],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="")

# Catch the SDK's own exception classes rather than ``Exception`` so that
# programming errors in the loop body still surface. RateLimitError is listed
# before APIStatusError because it is the more specific of the two.
except anthropic.APIConnectionError as e:
    # Handle connection issues
    print(f"Streaming error: connection failed: {e}")
except anthropic.RateLimitError as e:
    # Handle rate limits
    print(f"Streaming error: rate limited: {e}")
except anthropic.APIStatusError as e:
    # Handle any other error status returned by the API
    print(f"Streaming error: API returned status {e.status_code}: {e}")
363
```
364
365
### Custom Stream Processing
366
367
```python
368
class CustomStreamHandler:
    """Collect running word and sentence statistics from streamed text.

    Attributes:
        word_count: Whitespace-separated words seen so far.
        sentences: Completed sentences, stripped of surrounding whitespace.
        current_sentence: Text received since the last sentence terminator.

    Sentence detection is deliberately naive: any ``.``, ``!`` or ``?`` ends
    a sentence, so abbreviations and decimal numbers are split as well.
    """

    _TERMINATORS = ".!?"

    def __init__(self):
        self.word_count = 0
        self.sentences = []
        self.current_sentence = ""
        # True while the most recent chunk ended in the middle of a word.
        self._mid_word = False

    def handle_text(self, event):
        """Fold one streamed text chunk into the statistics.

        Args:
            event: Object whose ``text`` attribute holds the new chunk.
        """
        text = event.text
        if not text:
            return

        # Chunk boundaries can fall inside a word, so a chunk that continues
        # the previous word must not be counted as a new one.
        self.word_count += len(text.split())
        if self._mid_word and not text[0].isspace():
            self.word_count -= 1
        self._mid_word = not text[-1].isspace()

        # Cut at every terminator so text after it starts the next sentence
        # instead of being appended to the finished one.
        start = 0
        for pos, char in enumerate(text):
            if char in self._TERMINATORS:
                self.current_sentence += text[start:pos + 1]
                self._finish_sentence()
                start = pos + 1
        self.current_sentence += text[start:]

    def handle_stop(self, event):
        """Record any unterminated trailing sentence and print the totals."""
        self._finish_sentence()
        print(f"Final stats: {self.word_count} words, {len(self.sentences)} sentences")

    def _finish_sentence(self):
        """Move the buffered sentence into ``sentences`` if it has content."""
        sentence = self.current_sentence.strip()
        self.current_sentence = ""
        # Skip fragments made only of punctuation (e.g. the tail of "...").
        if any(char.isalnum() for char in sentence):
            self.sentences.append(sentence)
385
386
handler = CustomStreamHandler()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Tell me about machine learning"},
    ],
) as stream:
    # Bound methods are registered so the handlers share state on ``handler``.
    stream.on_text(handler.handle_text)
    stream.on_message_stop(handler.handle_stop)

    for event in stream:
        pass  # Let handlers process events
400
```