
tessl/pypi-langchain

Building applications with LLMs through composability


docs/patterns/streaming.md

Streaming Patterns

Streaming delivers content in real time as it is generated by language models and agents, enabling responsive user interfaces and progressive rendering of responses.

Core Concepts

Streaming works at two levels in LangChain:

  1. Model Streaming: Stream tokens/chunks directly from the language model as they are generated
  2. Agent Streaming: Stream intermediate steps, tool calls, and final responses from agent execution

Both synchronous and asynchronous streaming are supported across all models and agents.

Model Streaming

Synchronous Model Streaming

Stream model responses synchronously using the stream() method:

def stream(
    messages: list[AnyMessage],
    **kwargs: Any
) -> Iterator[AIMessageChunk]

Basic Usage:

from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

model = init_chat_model("openai:gpt-4o")

# Stream response chunks
for chunk in model.stream([HumanMessage(content="Write a poem about Python")]):
    print(chunk.content, end="", flush=True)

Accumulating Chunks:

from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

model = init_chat_model("openai:gpt-4o")

# Collect all chunks
chunks = []
for chunk in model.stream([HumanMessage(content="Explain quantum computing")]):
    chunks.append(chunk)
    print(chunk.content, end="", flush=True)

# Combine chunks into final message
final_message = chunks[0]
for chunk in chunks[1:]:
    final_message += chunk

print(f"\n\nFinal message: {final_message.content}")
print(f"Total tokens: {final_message.usage_metadata['total_tokens']}")
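The accumulation above relies on message chunks supporting `+`: adding two chunks concatenates their content and merges their metadata. A minimal sketch of that behavior, using a toy `Chunk` class rather than the real `AIMessageChunk` type:

```python
from dataclasses import dataclass

@dataclass
class Chunk:
    """Toy stand-in for AIMessageChunk: content concatenates, token counts sum."""
    content: str
    tokens: int = 0

    def __add__(self, other: "Chunk") -> "Chunk":
        # Concatenate text and merge usage, as chunk addition does
        return Chunk(self.content + other.content, self.tokens + other.tokens)

chunks = [Chunk("Hello", 1), Chunk(", ", 1), Chunk("world", 1)]
final = chunks[0]
for c in chunks[1:]:
    final += c

print(final.content)  # Hello, world
print(final.tokens)   # 3
```

This is why the accumulation loop keeps every chunk: the combined message carries metadata (such as token usage) that no individual chunk has on its own.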

Asynchronous Model Streaming

Stream model responses asynchronously using the astream() method:

async def astream(
    messages: list[AnyMessage],
    **kwargs: Any
) -> AsyncIterator[AIMessageChunk]

Basic Async Streaming:

from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

model = init_chat_model("openai:gpt-4o")

# Async streaming
async for chunk in model.astream([HumanMessage(content="Write a story")]):
    print(chunk.content, end="", flush=True)

Parallel Async Streaming:

import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

model = init_chat_model("openai:gpt-4o")

async def stream_response(prompt: str):
    """Stream a single response."""
    print(f"\n=== {prompt} ===")
    async for chunk in model.astream([HumanMessage(content=prompt)]):
        print(chunk.content, end="", flush=True)
    print("\n")

async def stream_multiple():
    """Stream multiple responses in parallel."""
    await asyncio.gather(
        stream_response("What is Python?"),
        stream_response("What is JavaScript?"),
        stream_response("What is Rust?")
    )

# Run parallel streaming
asyncio.run(stream_multiple())
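The interleaving that `asyncio.gather` produces can be observed without calling a model by substituting a fake async token stream; the `fake_stream` generator below is illustrative, not part of LangChain:

```python
import asyncio

async def fake_stream(tokens):
    """Stand-in for model.astream(): yields tokens, pausing to let other tasks run."""
    for t in tokens:
        await asyncio.sleep(0)  # yield control so concurrent streams interleave
        yield t

async def collect(name, tokens, log):
    """Consume one stream, recording which stream each token came from."""
    async for t in fake_stream(tokens):
        log.append((name, t))

async def main():
    log = []
    await asyncio.gather(
        collect("a", ["x", "y"], log),
        collect("b", ["1", "2"], log),
    )
    return log

log = asyncio.run(main())
print(log)
```

Each stream's tokens arrive in order, but the two streams' entries are interleaved in the shared log, which is exactly what happens to terminal output in the parallel example above.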

Handling Tool Call Chunks

When models make tool calls, the tool call information is streamed incrementally:

from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
from langchain.tools import tool

@tool
def get_weather(city: str) -> str:
    """Get weather for a city."""
    return f"Sunny, 72°F in {city}"

model = init_chat_model("openai:gpt-4o")
model_with_tools = model.bind_tools([get_weather])

# Stream with tool calls
for chunk in model_with_tools.stream([
    HumanMessage(content="What's the weather in Paris?")
]):
    # Text content
    if chunk.content:
        print(chunk.content, end="", flush=True)

    # Tool call chunks
    if hasattr(chunk, 'tool_call_chunks') and chunk.tool_call_chunks:
        for tool_chunk in chunk.tool_call_chunks:
            if tool_chunk.get('name'):
                print(f"\n[Tool: {tool_chunk['name']}]")
            if tool_chunk.get('args'):
                print(f"[Args: {tool_chunk['args']}]", end="")
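Across chunks, a tool call's `args` arrive as fragments of a JSON string that only parses once the call is complete. A common pattern is to concatenate fragments per call index and parse at the end; the chunk dicts below are hand-written to mimic the streamed shape, not real model output:

```python
import json

# Simulated tool_call_chunks as they might arrive across stream chunks
tool_call_chunks = [
    {"index": 0, "name": "get_weather", "args": ""},
    {"index": 0, "name": None, "args": '{"ci'},
    {"index": 0, "name": None, "args": 'ty": "Paris"}'},
]

calls = {}  # index -> {"name": ..., "args": accumulated JSON string}
for tc in tool_call_chunks:
    entry = calls.setdefault(tc["index"], {"name": None, "args": ""})
    if tc.get("name"):
        entry["name"] = tc["name"]
    if tc.get("args"):
        entry["args"] += tc["args"]

# Parse only after the stream finishes, when each args string is complete JSON
parsed = {i: (c["name"], json.loads(c["args"])) for i, c in calls.items()}
print(parsed)  # {0: ('get_weather', {'city': 'Paris'})}
```

Attempting `json.loads` on an individual fragment would raise, which is why the example above only prints fragments during streaming rather than parsing them.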

Agent Streaming

Synchronous Agent Streaming

Stream agent execution including intermediate steps, tool calls, and final responses:

def stream(
    input: dict,
    config: dict | None = None
) -> Iterator[dict]

Basic Agent Streaming:

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool

@tool
def calculator(expression: str) -> float:
    """Evaluate a mathematical expression."""
    # Demo only: eval() is unsafe on untrusted input; use a math parser in production
    return eval(expression)

agent = create_agent(
    model="openai:gpt-4o",
    tools=[calculator],
    system_prompt="You are a helpful math assistant."
)

# Stream agent execution
for chunk in agent.stream({
    "messages": [HumanMessage(content="What is 123 * 456?")]
}):
    print(chunk)

Filtering Stream Events:

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool

@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    return f"Results for: {query}"

agent = create_agent(
    model="openai:gpt-4o",
    tools=[search_web]
)

# Stream and filter for specific events
for chunk in agent.stream({
    "messages": [HumanMessage(content="Search for Python tutorials")]
}):
    # Check for agent node output
    if "agent" in chunk:
        messages = chunk["agent"].get("messages", [])
        for msg in messages:
            if hasattr(msg, 'content') and msg.content:
                print(f"Agent: {msg.content}")
            if hasattr(msg, 'tool_calls') and msg.tool_calls:
                for tc in msg.tool_calls:
                    print(f"Calling tool: {tc['name']} with {tc['args']}")

    # Check for tool node output
    if "tools" in chunk:
        messages = chunk["tools"].get("messages", [])
        for msg in messages:
            print(f"Tool result: {msg.content}")
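The filtering logic above can be factored into a small helper that works on plain chunk dicts. The simulated stream below mirrors the `{node: {"messages": [...]}}` shape but is hand-built, not real agent output:

```python
from types import SimpleNamespace

def extract_messages(chunk, node):
    """Return the messages emitted by a given node in a stream chunk."""
    return chunk.get(node, {}).get("messages", [])

# Simulated stream chunks in the {node: {"messages": [...]}} shape
stream = [
    {"agent": {"messages": [SimpleNamespace(content="", tool_calls=[
        {"name": "search_web", "args": {"query": "Python tutorials"}}])]}},
    {"tools": {"messages": [SimpleNamespace(content="Results for: Python tutorials")]}},
    {"agent": {"messages": [SimpleNamespace(content="Here are some tutorials.",
                                            tool_calls=[])]}},
]

transcript = []
for chunk in stream:
    for msg in extract_messages(chunk, "agent"):
        if msg.content:
            transcript.append(f"Agent: {msg.content}")
        for tc in getattr(msg, "tool_calls", []):
            transcript.append(f"Calling tool: {tc['name']}")
    for msg in extract_messages(chunk, "tools"):
        transcript.append(f"Tool result: {msg.content}")

print(transcript)
```

Keeping the extraction in one helper makes it easy to route agent text, tool calls, and tool results to different parts of a UI.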

Asynchronous Agent Streaming

Stream agent execution asynchronously:

async def astream(
    input: dict,
    config: dict | None = None
) -> AsyncIterator[dict]

Basic Async Agent Streaming:

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool

@tool
async def fetch_data(url: str) -> str:
    """Fetch data from a URL."""
    # Async fetch implementation
    return f"Data from {url}"

agent = create_agent(
    model="openai:gpt-4o",
    tools=[fetch_data]
)

# Async streaming
async for chunk in agent.astream({
    "messages": [HumanMessage(content="Fetch data from example.com")]
}):
    print(chunk)

Real-time Progress Updates:

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
import asyncio

@tool
async def analyze_data(data: str) -> str:
    """Analyze data (simulated long-running task)."""
    await asyncio.sleep(2)
    return f"Analysis complete: {len(data)} characters processed"

agent = create_agent(
    model="openai:gpt-4o",
    tools=[analyze_data]
)

async def stream_with_progress():
    """Stream with real-time progress updates."""
    print("Starting agent execution...")

    async for chunk in agent.astream({
        "messages": [HumanMessage(content="Analyze this data: Hello world")]
    }):
        # Update UI/progress bar in real-time
        if "agent" in chunk:
            print("Agent thinking...")
        elif "tools" in chunk:
            print("Tool executing...")
        elif "__end__" in chunk:
            print("Done!")

asyncio.run(stream_with_progress())

Streaming with Persistence

Combine streaming with checkpointing for resumable conversations:

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver

# Create agent with checkpointer
checkpointer = MemorySaver()
agent = create_agent(
    model="openai:gpt-4o",
    checkpointer=checkpointer
)

# Stream with thread ID
config = {"configurable": {"thread_id": "conversation-1"}}

print("First message:")
for chunk in agent.stream({
    "messages": [HumanMessage(content="My name is Alice")]
}, config=config):
    if "agent" in chunk:
        messages = chunk["agent"].get("messages", [])
        for msg in messages:
            if hasattr(msg, 'content') and msg.content:
                print(msg.content, end="", flush=True)

print("\n\nSecond message:")
for chunk in agent.stream({
    "messages": [HumanMessage(content="What's my name?")]
}, config=config):
    if "agent" in chunk:
        messages = chunk["agent"].get("messages", [])
        for msg in messages:
            if hasattr(msg, 'content') and msg.content:
                print(msg.content, end="", flush=True)

Best Practices

Handle Incomplete Chunks

Streaming chunks may contain partial data. Always handle cases where fields are None or empty:

for chunk in model.stream(messages):
    # Safe content handling
    if chunk.content:
        print(chunk.content, end="", flush=True)

    # Safe tool call handling
    if hasattr(chunk, 'tool_call_chunks') and chunk.tool_call_chunks:
        for tc in chunk.tool_call_chunks:
            if tc.get('name'):
                print(f"\nTool: {tc['name']}")

Buffering for UI Updates

Buffer chunks to reduce UI update frequency:

from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

model = init_chat_model("openai:gpt-4o")

buffer = ""
buffer_size = 10  # Update UI every 10 characters

for chunk in model.stream([HumanMessage(content="Write a long story")]):
    buffer += chunk.content

    # Update UI when buffer reaches size
    if len(buffer) >= buffer_size:
        print(buffer, end="", flush=True)
        buffer = ""

# Print remaining buffer
if buffer:
    print(buffer, end="", flush=True)
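The same buffering can be packaged as a reusable generator that works over any iterable of text pieces; the `buffered` helper is illustrative, not a LangChain API:

```python
def buffered(chunks, size=10):
    """Yield accumulated text whenever at least `size` characters are ready."""
    buf = ""
    for piece in chunks:
        buf += piece
        if len(buf) >= size:
            yield buf
            buf = ""
    if buf:  # flush whatever remains at end of stream
        yield buf

# Works with any string iterator, e.g. (c.content for c in model.stream(...))
tokens = ["Once ", "upon ", "a ", "time, ", "there ", "was"]
updates = list(buffered(tokens, size=10))
print(updates)  # ['Once upon ', 'a time, there ', 'was']
```

The generator preserves the full text (joining the updates reproduces the original stream) while emitting far fewer UI updates than one per token.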

Error Handling in Streams

Always wrap streaming in try-except blocks:

from langchain.agents import create_agent
from langchain.messages import HumanMessage

agent = create_agent(model="openai:gpt-4o")

try:
    for chunk in agent.stream({
        "messages": [HumanMessage(content="Hello")]
    }):
        print(chunk)
except Exception as e:
    print(f"Streaming error: {e}")
    # Handle error gracefully

Common Mistakes

Mistake: Not Flushing Output

# Wrong - output buffered
for chunk in model.stream(messages):
    print(chunk.content, end="")

# Right - immediate output
for chunk in model.stream(messages):
    print(chunk.content, end="", flush=True)

Mistake: Blocking Async Streams

# Wrong - blocks event loop
async def bad_stream():
    for chunk in model.stream(messages):  # Should use astream
        print(chunk.content)

# Right - non-blocking
async def good_stream():
    async for chunk in model.astream(messages):
        print(chunk.content)

Mistake: Not Accumulating Chunks

# Wrong - loses metadata
for chunk in model.stream(messages):
    print(chunk.content)
# Usage metadata is lost

# Right - accumulate for metadata
chunks = []
for chunk in model.stream(messages):
    chunks.append(chunk)
    print(chunk.content, end="", flush=True)

final = chunks[0]
for chunk in chunks[1:]:
    final += chunk
print(f"\nTokens used: {final.usage_metadata['total_tokens']}")

Related Patterns

  • Async Patterns - Async execution patterns for concurrent operations
  • Persistence Patterns - Combining streaming with state persistence
  • Error Handling - Handling errors during streaming

Install with Tessl CLI

npx tessl i tessl/pypi-langchain
