The official Python library for the Anthropic API
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Practical patterns for real-time streaming. For complete reference, see Streaming API and Streaming Guide.
from anthropic import Anthropic

client = Anthropic()

# Basic streaming: .text_stream yields only the text deltas, ready to print.
with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
print()  # New line after stream ends

That's it! The `.text_stream` property automatically filters out non-text events and gives you text deltas ready to print.
# Stream text live, then fetch the fully-assembled message for its metadata.
with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    messages=[{"role": "user", "content": "What is 2+2?"}]
) as stream:
    # Stream the text
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Get complete message with metadata
    message = stream.get_final_message()
    print(f"\n\nToken usage: {message.usage.output_tokens}")
    print(f"Stop reason: {message.stop_reason}")

For more control, iterate over all stream events:
with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
for event in stream:
if event.type == "message_start":
print(f"[Stream started: {event.message.id}]")
elif event.type == "content_block_start":
print(f"\n[Content block {event.index} started]")
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "content_block_stop":
print(f"\n[Content block {event.index} stopped]")
elif event.type == "message_delta":
print(f"\n[Stop reason: {event.delta.stop_reason}]")
print(f"[Tokens used: {event.usage.output_tokens}]")
elif event.type == "message_stop":
            print("\n[Stream completed]")

Detect when Claude wants to use tools:
# Tool schema Claude may call.
tools = [{
    "name": "get_weather",
    "description": "Get weather",
    "input_schema": {
        "type": "object",
        "properties": {
            "location": {"type": "string"}
        },
        "required": ["location"]
    }
}]

# Tool input arrives incrementally as input_json_delta events.
with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in SF?"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                print(f"\n[Tool call: {event.content_block.name}]")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                # Partial JSON fragments of the tool input.
                print(event.delta.partial_json, end="")

    message = stream.get_final_message()

# Process tool calls
for block in message.content:
    if block.type == "tool_use":
        print(f"\nTool: {block.name}")
        print(f"Input: {block.input}")

Monitor token usage in real-time:
with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{"role": "user", "content": "Write a long essay"}]
) as stream:
current_tokens = 0
for event in stream:
if event.type == "message_delta":
current_tokens = event.usage.output_tokens
print(f"\r[Tokens: {current_tokens}]", end="")
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

For async applications:
import asyncio

from anthropic import AsyncAnthropic


async def stream_response():
    """Stream a single response with the async client and print it."""
    client = AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku"}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
    print()
asyncio.run(stream_response())

Run multiple streams in parallel:
import asyncio

from anthropic import AsyncAnthropic


async def stream_question(client: AsyncAnthropic, question: str) -> str:
    """Stream a question and return final text"""
    async with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        messages=[{"role": "user", "content": question}]
    ) as stream:
        # Consume stream; we only want the assembled result.
        async for _ in stream:
            pass
        return stream.get_final_text()


async def main():
    """Fan out several questions concurrently and print the answers."""
    client = AsyncAnthropic()
    questions = [
        "What is Python?",
        "What is JavaScript?",
        "What is Rust?"
    ]
    # Run all streams concurrently
    results = await asyncio.gather(*[
        stream_question(client, q) for q in questions
    ])
    for question, answer in zip(questions, results):
        print(f"\nQ: {question}")
        print(f"A: {answer}")
asyncio.run(main())

Buffer output for smoother display:
import time
with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{"role": "user", "content": "Write a story"}]
) as stream:
buffer = ""
for text in stream.text_stream:
buffer += text
# Flush every 10 characters or at punctuation
if len(buffer) >= 10 or text in ".!?\n":
print(buffer, end="", flush=True)
buffer = ""
time.sleep(0.02) # Smooth animation
# Flush remaining
if buffer:
        print(buffer, end="", flush=True)

Get partial message during streaming:
# Inspect the partially-accumulated message while events are still arriving.
with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Count to 10"}]
) as stream:
    for event in stream:
        # Access current accumulated message
        snapshot = stream.current_message_snapshot
        if snapshot.content:
            # Only the first content block is inspected here.
            current_text = snapshot.content[0].text if snapshot.content[0].type == "text" else ""
            print(f"\rCurrent length: {len(current_text)}", end="")

from anthropic import APIError, APITimeoutError
try:
with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
except APITimeoutError:
print("\n[Stream timed out]")
except APIError as e:
    print(f"\n[Stream error: {e.message}]")

conversation = []
def stream_turn(user_message: str):
    """Stream a conversation turn"""
    conversation.append({"role": "user", "content": user_message})
    print(f"\nUser: {user_message}")
    print("Claude: ", end="")

    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        messages=conversation
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        message = stream.get_final_message()

    # Record the assistant turn so the next call has full context.
    conversation.append({
        "role": "assistant",
        "content": message.content
    })
    print()  # New line


# Conversation
stream_turn("Hi, I'm Alice")
stream_turn("What's my name?")  # Claude remembers "Alice"

with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
system="You are a helpful Python expert. Be concise.",
messages=[{"role": "user", "content": "Explain list comprehensions"}]
) as stream:
for text in stream.text_stream:
        print(text, end="", flush=True)

# Creative streaming
with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
temperature=0.8,
messages=[{"role": "user", "content": "Write a creative story"}]
) as stream:
for text in stream.text_stream:
        print(text, end="", flush=True)

Stream with extended thinking (beta):
with client.beta.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=2048,
thinking={"type": "enabled", "budget_tokens": 1000},
messages=[{"role": "user", "content": "Solve this complex problem: ..."}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "thinking_delta":
print(f"[Thinking: {event.delta.thinking}]", end="")
elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
# Access underlying HTTP response
request_id = stream.response.headers.get("request-id")
print(f"[Request ID: {request_id}]")
for text in stream.text_stream:
        print(text, end="", flush=True)

Handle keyboard interrupts gracefully:
import signal
import sys
def signal_handler(sig, frame):
print("\n[Streaming interrupted]")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
try:
with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{"role": "user", "content": "Write a very long story"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
except KeyboardInterrupt:
    print("\n[Streaming stopped]")

For advanced use cases without context manager:
stream_manager = client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
)
stream = stream_manager.__enter__()
try:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
message = stream.get_final_message()
finally:
    stream_manager.__exit__(None, None, None)

# Good - automatic cleanup
with client.messages.stream(...) as stream:
    for text in stream.text_stream:
        print(text, end="")

# Bad - manual cleanup required
stream = client.messages.stream(...)
# ... easy to forget cleanup

import httpx
# Raise the client-wide timeout: long streams can exceed the default.
client = Anthropic(
timeout=httpx.Timeout(120.0) # 2 minutes for long streams
)

Always handle potential interruptions gracefully for better UX.
When handling many concurrent streams, use AsyncAnthropic for better performance.