The official Python library for the Anthropic API
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Real-time message streaming provides immediate access to Claude's responses as they're generated, enabling responsive user interfaces and real-time processing of partial responses, tool use events, and completion updates.
class MessageStream:
    """Synchronous message stream: a context manager that is iterated for events.

    Iterating yields ``MessageStreamEvent`` items. Each ``on_*`` method takes
    a callback for one event type and returns a ``MessageStream``
    (presumably the same stream, so calls can be chained).

    NOTE(review): confirm the ``on_*`` registration methods against the
    installed SDK version before relying on them.
    """

    # Context-manager and iteration protocol.
    def __enter__(self) -> MessageStream: ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
    def __iter__(self) -> Iterator[MessageStreamEvent]: ...

    # Callback registration, one method per event type.
    def on_text(self, handler: Callable[[TextEvent], None]) -> MessageStream: ...
    def on_input_json(self, handler: Callable[[InputJsonEvent], None]) -> MessageStream: ...
    def on_message_stop(self, handler: Callable[[MessageStopEvent], None]) -> MessageStream: ...
    def on_content_block_start(self, handler: Callable[[ContentBlockStartEvent], None]) -> MessageStream: ...
    def on_content_block_delta(self, handler: Callable[[ContentBlockDeltaEvent], None]) -> MessageStream: ...
    def on_content_block_stop(self, handler: Callable[[ContentBlockStopEvent], None]) -> MessageStream: ...

    # Final results: the assembled message and its text.
    def get_final_message(self) -> Message: ...
    def get_final_text(self) -> str: ...

    # Iterator over the text chunks of the response.
    @property
    def text_stream(self) -> Iterator[str]: ...

    # Current state of the message being built.
    @property
    def current_message_snapshot(self) -> Message: ...
class AsyncMessageStream:
    """Asynchronous message stream: used with ``async with`` and ``async for``.

    It has the same members as the synchronous stream, with three
    differences: callbacks return an awaitable, the ``get_final_*`` methods
    are coroutines, and ``text_stream`` is an async iterator.
    """

    # Async context-manager and iteration protocol.
    async def __aenter__(self) -> AsyncMessageStream: ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
    def __aiter__(self) -> AsyncIterator[MessageStreamEvent]: ...

    # Callback registration; each handler returns an awaitable.
    def on_text(self, handler: Callable[[TextEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_input_json(self, handler: Callable[[InputJsonEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_message_stop(self, handler: Callable[[MessageStopEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_start(self, handler: Callable[[ContentBlockStartEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_delta(self, handler: Callable[[ContentBlockDeltaEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_stop(self, handler: Callable[[ContentBlockStopEvent], Awaitable[None]]) -> AsyncMessageStream: ...

    # Final results; these must be awaited.
    async def get_final_message(self) -> Message: ...
    async def get_final_text(self) -> str: ...

    # Async iterator over the text chunks of the response.
    @property
    def text_stream(self) -> AsyncIterator[str]: ...

    # Current state of the message being built.
    @property
    def current_message_snapshot(self) -> Message: ...


class MessageStreamManager:
    """Entry point that opens a synchronous message stream."""

    def stream(
        self,
        max_tokens: int,
        messages: List[MessageParam],
        model: str,
        *,
        metadata: Optional[MetadataParam] = None,
        stop_sequences: Optional[List[str]] = None,
        system: Optional[str] = None,
        temperature: Optional[float] = None,
        tool_choice: Optional[ToolChoiceParam] = None,
        tools: Optional[List[ToolParam]] = None,
        top_k: Optional[int] = None,
        top_p: Optional[float] = None,
        **kwargs
    ) -> MessageStream:
        """Start a streaming request and return its ``MessageStream``.

        ``max_tokens``, ``messages`` and ``model`` are required. Every
        parameter after ``*`` is keyword-only and defaults to ``None``.
        """
        ...
class AsyncMessageStreamManager:
    """Entry point that opens an asynchronous message stream."""

    def stream(
        self,
        max_tokens: int,
        messages: List[MessageParam],
        model: str,
        *,
        metadata: Optional[MetadataParam] = None,
        stop_sequences: Optional[List[str]] = None,
        system: Optional[str] = None,
        temperature: Optional[float] = None,
        tool_choice: Optional[ToolChoiceParam] = None,
        tools: Optional[List[ToolParam]] = None,
        top_k: Optional[int] = None,
        top_p: Optional[float] = None,
        **kwargs
    ) -> AsyncMessageStream:
        """Start a streaming request and return its ``AsyncMessageStream``.

        This is a plain (non-``async``) method. ``max_tokens``, ``messages``
        and ``model`` are required. Every parameter after ``*`` is
        keyword-only and defaults to ``None``.
        """
        ...


class MessageStreamEvent(TypedDict):
    """Base shape shared by all stream events; ``type`` names the event kind.

    NOTE(review): the usage examples read fields as attributes
    (``event.type``), which a plain ``TypedDict`` does not support. Confirm
    the real runtime type of events.
    """

    type: str
class MessageStartEvent(MessageStreamEvent):
    """Event of type ``message_start``; carries a ``Message``."""

    type: Literal["message_start"]
    message: Message


class MessageDeltaEvent(MessageStreamEvent):
    """Event of type ``message_delta``; carries a delta and usage figures."""

    type: Literal["message_delta"]
    # NOTE(review): ``delta`` is typed identically to ``usage`` here. Confirm
    # against the SDK whether it should be a separate delta type.
    delta: MessageDeltaUsage
    usage: MessageDeltaUsage


class MessageStopEvent(MessageStreamEvent):
    """Event of type ``message_stop``; has no payload beyond ``type``."""

    type: Literal["message_stop"]


class ContentBlockStartEvent(MessageStreamEvent):
    """Event of type ``content_block_start`` for the block at ``index``."""

    type: Literal["content_block_start"]
    index: int
    content_block: ContentBlock


class ContentBlockDeltaEvent(MessageStreamEvent):
    """Event of type ``content_block_delta`` for the block at ``index``."""

    type: Literal["content_block_delta"]
    index: int
    # Either a text fragment or a fragment of tool-input JSON.
    delta: Union[TextDelta, InputJSONDelta]
class ContentBlockStopEvent(MessageStreamEvent):
    """Event of type ``content_block_stop`` for the block at ``index``."""

    type: Literal["content_block_stop"]
    index: int


class TextEvent(TypedDict):
    """Payload handed to ``on_text`` handlers."""

    type: Literal["text"]
    text: str
    # Presumably the text accumulated so far. Confirm against the SDK.
    snapshot: str
class InputJsonEvent(TypedDict):
    """Payload handed to ``on_input_json`` handlers: one fragment of JSON."""

    type: Literal["input_json"]
    partial_json: str


class TextDelta(TypedDict):
    """Text variant of a content-block delta."""

    type: Literal["text_delta"]
    text: str


class InputJSONDelta(TypedDict):
    """JSON variant of a content-block delta."""

    type: Literal["input_json_delta"]
    partial_json: str
class MessageDeltaUsage(TypedDict):
    """Usage figures attached to a ``message_delta`` event."""

    output_tokens: int


class RawMessageStreamEvent(TypedDict):
    """Base shape shared by all raw stream events; ``type`` names the kind."""

    type: str
class RawMessageStartEvent(RawMessageStreamEvent):
    """Raw event of type ``message_start``; carries a ``Message``."""

    type: Literal["message_start"]
    message: Message


class RawMessageDeltaEvent(RawMessageStreamEvent):
    """Raw event of type ``message_delta``; carries a delta and usage figures."""

    type: Literal["message_delta"]
    # NOTE(review): ``delta`` is typed identically to ``usage`` here. Confirm
    # against the SDK whether it should be a separate delta type.
    delta: MessageDeltaUsage
    usage: MessageDeltaUsage


class RawMessageStopEvent(RawMessageStreamEvent):
    """Raw event of type ``message_stop``; has no payload beyond ``type``."""

    type: Literal["message_stop"]


class RawContentBlockStartEvent(RawMessageStreamEvent):
    """Raw event of type ``content_block_start`` for the block at ``index``."""

    type: Literal["content_block_start"]
    index: int
    content_block: ContentBlock


class RawContentBlockDeltaEvent(RawMessageStreamEvent):
    """Raw event of type ``content_block_delta`` for the block at ``index``."""

    type: Literal["content_block_delta"]
    index: int
    delta: RawContentBlockDelta


class RawContentBlockStopEvent(RawMessageStreamEvent):
    """Raw event of type ``content_block_stop`` for the block at ``index``."""

    type: Literal["content_block_stop"]
    index: int
class RawContentBlockDelta(TypedDict):
    """Delta payload of a raw ``content_block_delta`` event."""

    type: str
    # Both payload fields are optional.
    # Presumably which one is set depends on ``type``. Confirm against the SDK.
    text: Optional[str]
    partial_json: Optional[str]


from anthropic import Anthropic

# Synchronous client shared by the examples that follow.
client = Anthropic()
# Basic streaming: print every text chunk as soon as it arrives.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a short story about a robot"}
    ],
) as stream:
    for text in stream.text_stream:
        # flush=True so partial output shows up immediately.
        print(text, end="", flush=True)


def handle_text(event):
    """Print the text carried by a text event."""
    print(f"Text: {event.text}")


def handle_stop(event):
    """Announce that the message has finished; ``event`` itself is unused."""
    print("Message completed!")
# Event-handler streaming: register callbacks, then drain the stream.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Hello!"}
    ],
) as stream:
    stream.on_text(handle_text)
    stream.on_message_stop(handle_stop)

    # Process all events
    for event in stream:
        pass  # Events are handled by registered handlers

    # Get final assembled message
    final_message = stream.get_final_message()
    print(final_message.content[0].text)


# Tool definition passed to the tool-use streaming example.
tools = [
    {
        "name": "get_weather",
        "description": "Get current weather",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {"type": "string"}
            },
            "required": ["location"],
        },
    }
]


def handle_tool_use(event):
    """Print each fragment of tool-input JSON as it streams in.

    This handler is registered through ``on_input_json``, so ``event`` is an
    ``InputJsonEvent``. That type declares ``partial_json`` and has no
    ``name`` or ``input`` field. The tool name is available from the
    ``content_block_start`` event instead.
    """
    print(f"Tool input fragment: {event.partial_json}")
# Tool-use streaming: watch for the start of a tool_use content block.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user", "content": "What's the weather in London?"}
    ],
) as stream:
    stream.on_input_json(handle_tool_use)

    for event in stream:
        # Only content_block_start events carry a content_block to inspect.
        if event.type != "content_block_start":
            continue
        if event.content_block.type == "tool_use":
            print(f"Starting tool use: {event.content_block.name}")


import asyncio

from anthropic import AsyncAnthropic
async def stream_chat():
    """Stream one reply with the async client and stop at ``message_stop``."""
    client = AsyncAnthropic()

    async def handle_text(event):
        print(f"Received: {event.text}")

    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Count to 10"}
        ],
    ) as stream:
        stream.on_text(handle_text)

        async for event in stream:
            if event.type == "message_stop":
                print("Finished!")
                break


asyncio.run(stream_chat())

# Buffer holding the not-yet-complete line for the line-by-line example.
accumulated_text = ""
# Line-by-line processing: emit each line as soon as its newline arrives.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a poem about the ocean"}
    ],
) as stream:
    for text in stream.text_stream:
        accumulated_text += text

        # Process text in real-time (e.g., update UI)
        if "\n" in text:  # New line completed
            # Everything before the last "\n" is complete; the rest stays buffered.
            *complete_lines, accumulated_text = accumulated_text.split("\n")
            for line in complete_lines:
                print(f"Complete line: {line}")

# The stream need not end with a newline, so emit whatever is still
# buffered instead of silently dropping the last line.
if accumulated_text:
    print(f"Complete line: {accumulated_text}")
    accumulated_text = ""


# Snapshots: inspect the partially built message while it streams.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain quantum computing"}
    ],
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            # Get current state of the message being built
            current_snapshot = stream.current_message_snapshot
            print(f"Current length: {len(current_snapshot.content[0].text)} chars")

    # Get final complete message
    final_message = stream.get_final_message()
    final_text = stream.get_final_text()


# Error handling: catch the SDK's API errors and let unrelated exceptions
# (programming mistakes, KeyboardInterrupt, ...) propagate.
import anthropic

try:
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Hello!"}
        ],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="")
except anthropic.APIError as e:
    print(f"Streaming error: {e}")
# Handle connection issues, rate limits, etc.


class CustomStreamHandler:
    """Collect word and sentence statistics from streamed text events.

    Text arrives in arbitrary chunks, so counting is done per character:
    a word or sentence split across two chunks is still counted once.

    Attributes are documented inline in ``__init__``.
    """

    # Characters that end a sentence.
    _TERMINATORS = ".!?"

    def __init__(self):
        self.word_count = 0          # words seen so far
        self.sentences = []          # finished sentences, stripped
        self.current_sentence = ""   # text since the last finished sentence
        self._in_word = False        # True while the last char was non-space

    def handle_text(self, event):
        """Fold ``event.text`` into the running word and sentence tallies."""
        for char in event.text:
            # A word is counted at its first character, so a word continued
            # in the next chunk is not counted a second time.
            if char.isspace():
                self._in_word = False
            elif not self._in_word:
                self._in_word = True
                self.word_count += 1

            self.current_sentence += char
            if char not in self._TERMINATORS:
                continue

            finished = self.current_sentence.strip()
            if self.sentences and not finished.strip(self._TERMINATORS):
                # A run of punctuation ("...", "?!") extends the previous
                # sentence rather than forming a new one.
                self.sentences[-1] += finished
            else:
                self.sentences.append(finished)
            self.current_sentence = ""

    def handle_stop(self, event):
        """Flush any unterminated trailing sentence, then print the totals."""
        leftover = self.current_sentence.strip()
        if leftover:
            self.sentences.append(leftover)
        self.current_sentence = ""
        print(f"Final stats: {self.word_count} words, {len(self.sentences)} sentences")
# Wire a CustomStreamHandler into a stream and let it collect statistics.
handler = CustomStreamHandler()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Tell me about machine learning"}
    ],
) as stream:
    stream.on_text(handler.handle_text)
    stream.on_message_stop(handler.handle_stop)

    for event in stream:
        pass  # Let handlers process events