CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-anthropic

The official Python library for the anthropic API

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/streaming.md

Streaming Interface

Real-time message streaming provides immediate access to Claude's responses as they're generated, enabling responsive user interfaces and real-time processing of partial responses, tool use events, and completion updates.

Capabilities

Message Stream Classes

class MessageStream:
    """Synchronous stream of message events.

    Used as a context manager; iterating the stream yields
    MessageStreamEvent items. The `on_*` registration methods return the
    stream itself — presumably `self`, enabling fluent chaining (TODO
    confirm against the SDK).
    """
    def __enter__(self) -> MessageStream: ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
    def __iter__(self) -> Iterator[MessageStreamEvent]: ...

    # Event-handler registration: each callback receives the matching
    # event type and returns None.
    def on_text(self, handler: Callable[[TextEvent], None]) -> MessageStream: ...
    def on_input_json(self, handler: Callable[[InputJsonEvent], None]) -> MessageStream: ...
    def on_message_stop(self, handler: Callable[[MessageStopEvent], None]) -> MessageStream: ...
    def on_content_block_start(self, handler: Callable[[ContentBlockStartEvent], None]) -> MessageStream: ...
    def on_content_block_delta(self, handler: Callable[[ContentBlockDeltaEvent], None]) -> MessageStream: ...
    def on_content_block_stop(self, handler: Callable[[ContentBlockStopEvent], None]) -> MessageStream: ...

    # Accessors for the assembled result once streaming has finished.
    def get_final_message(self) -> Message: ...
    def get_final_text(self) -> str: ...

    # Iterator over text fragments only (convenience over raw events).
    @property
    def text_stream(self) -> Iterator[str]: ...
    # Partially-assembled Message reflecting events received so far.
    @property
    def current_message_snapshot(self) -> Message: ...

class AsyncMessageStream:
    """Asynchronous counterpart of MessageStream.

    Used with `async with` / `async for`; handlers registered via `on_*`
    are awaitable callbacks. Registration methods return the stream —
    presumably `self`, for fluent chaining (TODO confirm against the SDK).
    """
    async def __aenter__(self) -> AsyncMessageStream: ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
    def __aiter__(self) -> AsyncIterator[MessageStreamEvent]: ...

    # Event-handler registration: each callback receives the matching
    # event type and is awaited.
    def on_text(self, handler: Callable[[TextEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_input_json(self, handler: Callable[[InputJsonEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_message_stop(self, handler: Callable[[MessageStopEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_start(self, handler: Callable[[ContentBlockStartEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_delta(self, handler: Callable[[ContentBlockDeltaEvent], Awaitable[None]]) -> AsyncMessageStream: ...
    def on_content_block_stop(self, handler: Callable[[ContentBlockStopEvent], Awaitable[None]]) -> AsyncMessageStream: ...

    # Awaitable accessors for the assembled result once streaming has finished.
    async def get_final_message(self) -> Message: ...
    async def get_final_text(self) -> str: ...

    # Async iterator over text fragments only.
    @property
    def text_stream(self) -> AsyncIterator[str]: ...
    # Partially-assembled Message reflecting events received so far.
    @property
    def current_message_snapshot(self) -> Message: ...

Stream Managers

class MessageStreamManager:
    """Factory for synchronous MessageStream objects.

    NOTE(review): the usage examples in this document call
    `client.messages.stream(...)` directly as a context manager; whether
    this manager type exposes a `stream(...)` method with this exact
    signature should be confirmed against the SDK.
    """
    def stream(
        self,
        max_tokens: int,              # required: hard cap on generated tokens
        messages: List[MessageParam],  # conversation history
        model: str,                    # model identifier
        *,
        metadata: Optional[MetadataParam] = None,
        stop_sequences: Optional[List[str]] = None,
        system: Optional[str] = None,          # optional system prompt
        temperature: Optional[float] = None,   # sampling temperature
        tool_choice: Optional[ToolChoiceParam] = None,
        tools: Optional[List[ToolParam]] = None,
        top_k: Optional[int] = None,
        top_p: Optional[float] = None,
        **kwargs
    ) -> MessageStream: ...

class AsyncMessageStreamManager:
    """Factory for AsyncMessageStream objects; async counterpart of
    MessageStreamManager with an identical parameter list.

    NOTE(review): as with MessageStreamManager, the examples call
    `client.messages.stream(...)` directly — confirm this method's exact
    location and signature against the SDK.
    """
    def stream(
        self,
        max_tokens: int,              # required: hard cap on generated tokens
        messages: List[MessageParam],  # conversation history
        model: str,                    # model identifier
        *,
        metadata: Optional[MetadataParam] = None,
        stop_sequences: Optional[List[str]] = None,
        system: Optional[str] = None,          # optional system prompt
        temperature: Optional[float] = None,   # sampling temperature
        tool_choice: Optional[ToolChoiceParam] = None,
        tools: Optional[List[ToolParam]] = None,
        top_k: Optional[int] = None,
        top_p: Optional[float] = None,
        **kwargs
    ) -> AsyncMessageStream: ...

Stream Event Types

Core Stream Events

class MessageStreamEvent(TypedDict):
    # Base shape shared by all stream events; `type` is the discriminator.
    type: str

class MessageStartEvent(MessageStreamEvent):
    # Emitted once at the start of the stream with the initial Message.
    type: Literal["message_start"]
    message: Message

class MessageDeltaEvent(MessageStreamEvent):
    # Top-level update to the in-flight message.
    # NOTE(review): `delta` is typed identically to `usage` here; in the
    # Messages API the delta normally carries stop_reason/stop_sequence —
    # confirm this type against the SDK.
    type: Literal["message_delta"]
    delta: MessageDeltaUsage
    usage: MessageDeltaUsage

class MessageStopEvent(MessageStreamEvent):
    # Emitted once when the message is complete.
    type: Literal["message_stop"]

class ContentBlockStartEvent(MessageStreamEvent):
    # A new content block begins at position `index` in the message.
    type: Literal["content_block_start"]
    index: int
    content_block: ContentBlock

class ContentBlockDeltaEvent(MessageStreamEvent):
    # Incremental update to the content block at `index`.
    type: Literal["content_block_delta"]
    index: int
    delta: Union[TextDelta, InputJSONDelta]

class ContentBlockStopEvent(MessageStreamEvent):
    # The content block at `index` is complete.
    type: Literal["content_block_stop"]
    index: int

Specific Event Types

class TextEvent(TypedDict):
    # Convenience event for `on_text` handlers.
    type: Literal["text"]
    text: str       # the newly received text fragment
    snapshot: str   # all text accumulated so far

class InputJsonEvent(TypedDict):
    # Convenience event for `on_input_json` handlers (tool-use input).
    type: Literal["input_json"]
    partial_json: str  # incremental JSON fragment of the tool input

class TextDelta(TypedDict):
    # Raw delta payload for text content blocks.
    type: Literal["text_delta"]
    text: str

class InputJSONDelta(TypedDict):
    # Raw delta payload for tool_use content blocks.
    type: Literal["input_json_delta"]
    partial_json: str

class MessageDeltaUsage(TypedDict):
    # Token usage reported with message_delta events.
    output_tokens: int

Raw Stream Events

class RawMessageStreamEvent(TypedDict):
    # Base shape for raw (server-sent) events; `type` is the discriminator.
    type: str

class RawMessageStartEvent(RawMessageStreamEvent):
    # Raw counterpart of MessageStartEvent.
    type: Literal["message_start"]
    message: Message

class RawMessageDeltaEvent(RawMessageStreamEvent):
    # Raw counterpart of MessageDeltaEvent.
    # NOTE(review): `delta` typed identically to `usage` — same caveat as
    # MessageDeltaEvent; confirm against the SDK.
    type: Literal["message_delta"]
    delta: MessageDeltaUsage
    usage: MessageDeltaUsage

class RawMessageStopEvent(RawMessageStreamEvent):
    # Raw counterpart of MessageStopEvent.
    type: Literal["message_stop"]

class RawContentBlockStartEvent(RawMessageStreamEvent):
    # Raw counterpart of ContentBlockStartEvent.
    type: Literal["content_block_start"]
    index: int
    content_block: ContentBlock

class RawContentBlockDeltaEvent(RawMessageStreamEvent):
    # Raw counterpart of ContentBlockDeltaEvent; delta is untagged here.
    type: Literal["content_block_delta"]
    index: int
    delta: RawContentBlockDelta

class RawContentBlockStopEvent(RawMessageStreamEvent):
    # Raw counterpart of ContentBlockStopEvent.
    type: Literal["content_block_stop"]
    index: int

class RawContentBlockDelta(TypedDict):
    # Untagged delta: exactly one of `text` / `partial_json` is expected
    # to be populated depending on `type`.
    type: str
    text: Optional[str]
    partial_json: Optional[str]

Usage Examples

Basic Text Streaming

from anthropic import Anthropic

# Client reads credentials from the environment (ANTHROPIC_API_KEY).
client = Anthropic()

# Open a streaming request, then print each text fragment the moment
# it arrives instead of waiting for the full completion.
stream_request = client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a short story about a robot"}
    ],
)
with stream_request as stream:
    for fragment in stream.text_stream:
        print(fragment, end="", flush=True)

Event Handler Pattern

def handle_text(event):
    # Called for every text fragment as it streams in.
    print(f"Text: {event.text}")

def handle_stop(event):
    # Called once, after the final event.
    print("Message completed!")

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Hello!"}
    ]
) as stream:
    stream.on_text(handle_text)
    stream.on_message_stop(handle_stop)

    # Process all events
    for event in stream:
        pass  # Events are handled by registered handlers

    # Get the final assembled message while the stream is still open:
    # the underlying response is closed when the `with` block exits, so
    # calling get_final_message() after the block is too late.
    final_message = stream.get_final_message()
    print(final_message.content[0].text)

Tool Use Streaming

tools = [
    {
        "name": "get_weather",
        "description": "Get current weather",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {"type": "string"}
            },
            "required": ["location"]
        }
    }
]

def handle_tool_use(event):
    # InputJsonEvent only carries the incremental JSON of the tool input
    # (see the InputJsonEvent definition above) — it has no `name` or
    # `input` attributes. The tool name arrives on the
    # content_block_start event instead (handled below).
    print(f"Partial tool input: {event.partial_json}")

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user", "content": "What's the weather in London?"}
    ]
) as stream:
    stream.on_input_json(handle_tool_use)

    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                print(f"Starting tool use: {event.content_block.name}")

Async Streaming

import asyncio
from anthropic import AsyncAnthropic

async def stream_chat():
    """Stream a short completion and stop as soon as the message ends."""
    client = AsyncAnthropic()

    async def handle_text(event):
        # Awaitable handler: invoked for each streamed text fragment.
        print(f"Received: {event.text}")

    request = client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Count to 10"}
        ],
    )
    async with request as stream:
        stream.on_text(handle_text)

        async for event in stream:
            if event.type != "message_stop":
                continue
            print("Finished!")
            break

asyncio.run(stream_chat())

Real-time Text Processing

accumulated_text = ""  # buffer holding the current, not-yet-complete line

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a poem about the ocean"}
    ]
) as stream:
    for text in stream.text_stream:
        # Fragments may split lines arbitrarily, so accumulate first.
        accumulated_text += text

        # Process text in real-time (e.g., update UI)
        if "\n" in text:  # New line completed
            lines = accumulated_text.split("\n")
            for line in lines[:-1]:  # Process complete lines
                print(f"Complete line: {line}")
            accumulated_text = lines[-1]  # Keep incomplete line

Streaming with Current Snapshot

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain quantum computing"}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            # Get current state of the message being built
            # NOTE(review): assumes the first content block is a text
            # block — `content[0].text` would fail for tool_use blocks.
            current_snapshot = stream.current_message_snapshot
            print(f"Current length: {len(current_snapshot.content[0].text)} chars")

    # Get final complete message
    final_message = stream.get_final_message()
    final_text = stream.get_final_text()

Error Handling in Streams

try:
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Hello!"}
        ]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="")

except Exception as e:
    # NOTE(review): catching bare Exception is very broad; prefer the
    # SDK's specific exception types (e.g. API/connection errors) so
    # programming errors are not swallowed — confirm available types.
    print(f"Streaming error: {e}")
    # Handle connection issues, rate limits, etc.

Custom Stream Processing

class CustomStreamHandler:
    """Accumulates word and sentence statistics from streamed text events."""

    def __init__(self):
        self.word_count = 0        # running whitespace-delimited word total
        self.sentences = []        # completed sentences, stripped
        self.current_sentence = ""  # text since the last sentence terminator

    def handle_text(self, event):
        """Fold one text fragment into the running statistics."""
        fragment = event.text
        self.current_sentence += fragment
        self.word_count += len(fragment.split())

        # Any fragment containing sentence-ending punctuation closes the
        # sentence currently being accumulated.
        if any(mark in fragment for mark in (".", "!", "?")):
            self.sentences.append(self.current_sentence.strip())
            self.current_sentence = ""

    def handle_stop(self, event):
        """Report the totals once the message has finished streaming."""
        print(f"Final stats: {self.word_count} words, {len(self.sentences)} sentences")

handler = CustomStreamHandler()  # collects word/sentence stats as events arrive

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Tell me about machine learning"}
    ]
) as stream:
    # Route streamed text and the final stop event to the stats handler.
    stream.on_text(handler.handle_text)
    stream.on_message_stop(handler.handle_stop)

    for event in stream:
        pass  # Let handlers process events

docs

batching.md

bedrock.md

beta.md

completions.md

configuration.md

errors.md

index.md

messages.md

models.md

streaming.md

tools.md

vertex.md

tile.json