Building applications with LLMs through composability
—
Streaming enables real-time delivery of content as it's generated by language models and agents. This provides responsive user interfaces and allows progressive rendering of responses.
Streaming works at two levels in LangChain: the model level, where chat models emit partial AIMessageChunk objects token by token, and the agent level, where agents emit step-by-step updates (model output, tool results) as execution progresses.
Both synchronous and asynchronous streaming are supported across all models and agents.
Stream model responses synchronously using the stream() method:
def stream(
messages: list[AnyMessage],
**kwargs: Any
) -> Iterator[AIMessageChunk]Basic Usage:
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
# Stream response chunks
for chunk in model.stream([HumanMessage(content="Write a poem about Python")]):
print(chunk.content, end="", flush=True)Accumulating Chunks:
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
# Collect all chunks
chunks = []
for chunk in model.stream([HumanMessage(content="Explain quantum computing")]):
chunks.append(chunk)
print(chunk.content, end="", flush=True)
# Combine chunks into final message
final_message = chunks[0]
for chunk in chunks[1:]:
final_message += chunk
print(f"\n\nFinal message: {final_message.content}")
print(f"Total tokens: {final_message.usage_metadata['total_tokens']}")Stream model responses asynchronously using the astream() method:
async def astream(
messages: list[AnyMessage],
**kwargs: Any
) -> AsyncIterator[AIMessageChunk]Basic Async Streaming:
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
# Async streaming
async for chunk in model.astream([HumanMessage(content="Write a story")]):
print(chunk.content, end="", flush=True)Parallel Async Streaming:
import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
async def stream_response(prompt: str):
"""Stream a single response."""
print(f"\n=== {prompt} ===")
async for chunk in model.astream([HumanMessage(content=prompt)]):
print(chunk.content, end="", flush=True)
print("\n")
async def stream_multiple():
"""Stream multiple responses in parallel."""
await asyncio.gather(
stream_response("What is Python?"),
stream_response("What is JavaScript?"),
stream_response("What is Rust?")
)
# Run parallel streaming
asyncio.run(stream_multiple())When models make tool calls, the tool call information is streamed incrementally:
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
from langchain.tools import tool
@tool
def get_weather(city: str) -> str:
"""Get weather for a city."""
return f"Sunny, 72°F in {city}"
model = init_chat_model("openai:gpt-4o")
model_with_tools = model.bind_tools([get_weather])
# Stream with tool calls
for chunk in model_with_tools.stream([
HumanMessage(content="What's the weather in Paris?")
]):
# Text content
if chunk.content:
print(chunk.content, end="", flush=True)
# Tool call chunks
if hasattr(chunk, 'tool_call_chunks') and chunk.tool_call_chunks:
for tool_chunk in chunk.tool_call_chunks:
if tool_chunk.get('name'):
print(f"\n[Tool: {tool_chunk['name']}]")
if tool_chunk.get('args'):
print(f"[Args: {tool_chunk['args']}]", end="")Stream agent execution including intermediate steps, tool calls, and final responses:
def stream(
input: dict,
config: dict | None = None
) -> Iterator[dict]Basic Agent Streaming:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
@tool
def calculator(expression: str) -> float:
"""Evaluate a mathematical expression."""
return eval(expression)
agent = create_agent(
model="openai:gpt-4o",
tools=[calculator],
system_prompt="You are a helpful math assistant."
)
# Stream agent execution
for chunk in agent.stream({
"messages": [HumanMessage(content="What is 123 * 456?")]
}):
print(chunk)Filtering Stream Events:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
@tool
def search_web(query: str) -> str:
"""Search the web for information."""
return f"Results for: {query}"
agent = create_agent(
model="openai:gpt-4o",
tools=[search_web]
)
# Stream and filter for specific events
for chunk in agent.stream({
"messages": [HumanMessage(content="Search for Python tutorials")]
}):
# Check for agent node output
if "agent" in chunk:
messages = chunk["agent"].get("messages", [])
for msg in messages:
if hasattr(msg, 'content') and msg.content:
print(f"Agent: {msg.content}")
if hasattr(msg, 'tool_calls') and msg.tool_calls:
for tc in msg.tool_calls:
print(f"Calling tool: {tc['name']} with {tc['args']}")
# Check for tool node output
if "tools" in chunk:
messages = chunk["tools"].get("messages", [])
for msg in messages:
print(f"Tool result: {msg.content}")Stream agent execution asynchronously:
async def astream(
input: dict,
config: dict | None = None
) -> AsyncIterator[dict]Basic Async Agent Streaming:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
@tool
async def fetch_data(url: str) -> str:
"""Fetch data from a URL."""
# Async fetch implementation
return f"Data from {url}"
agent = create_agent(
model="openai:gpt-4o",
tools=[fetch_data]
)
# Async streaming
async for chunk in agent.astream({
"messages": [HumanMessage(content="Fetch data from example.com")]
}):
print(chunk)Real-time Progress Updates:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
import asyncio
@tool
async def analyze_data(data: str) -> str:
"""Analyze data (simulated long-running task)."""
await asyncio.sleep(2)
return f"Analysis complete: {len(data)} characters processed"
agent = create_agent(
model="openai:gpt-4o",
tools=[analyze_data]
)
async def stream_with_progress():
"""Stream with real-time progress updates."""
print("Starting agent execution...")
async for chunk in agent.astream({
"messages": [HumanMessage(content="Analyze this data: Hello world")]
}):
# Update UI/progress bar in real-time
if "agent" in chunk:
print("Agent thinking...")
elif "tools" in chunk:
print("Tool executing...")
elif "__end__" in chunk:
print("Done!")
asyncio.run(stream_with_progress())Combine streaming with checkpointing for resumable conversations:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
# Create agent with checkpointer
checkpointer = MemorySaver()
agent = create_agent(
model="openai:gpt-4o",
checkpointer=checkpointer
)
# Stream with thread ID
config = {"configurable": {"thread_id": "conversation-1"}}
print("First message:")
for chunk in agent.stream({
"messages": [HumanMessage(content="My name is Alice")]
}, config=config):
if "agent" in chunk:
messages = chunk["agent"].get("messages", [])
for msg in messages:
if hasattr(msg, 'content') and msg.content:
print(msg.content, end="", flush=True)
print("\n\nSecond message:")
for chunk in agent.stream({
"messages": [HumanMessage(content="What's my name?")]
}, config=config):
if "agent" in chunk:
messages = chunk["agent"].get("messages", [])
for msg in messages:
if hasattr(msg, 'content') and msg.content:
print(msg.content, end="", flush=True)Streaming chunks may contain partial data. Always handle cases where fields are None or empty:
for chunk in model.stream(messages):
# Safe content handling
if chunk.content:
print(chunk.content, end="", flush=True)
# Safe tool call handling
if hasattr(chunk, 'tool_call_chunks') and chunk.tool_call_chunks:
for tc in chunk.tool_call_chunks:
if tc.get('name'):
print(f"\nTool: {tc['name']}")Buffer chunks to reduce UI update frequency:
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
buffer = ""
buffer_size = 10 # Update UI every 10 characters
for chunk in model.stream([HumanMessage(content="Write a long story")]):
buffer += chunk.content
# Update UI when buffer reaches size
if len(buffer) >= buffer_size:
print(buffer, end="", flush=True)
buffer = ""
# Print remaining buffer
if buffer:
print(buffer, end="", flush=True)Always wrap streaming in try-except blocks:
from langchain.agents import create_agent
from langchain.messages import HumanMessage

agent = create_agent(model="openai:gpt-4o")

try:
    for chunk in agent.stream({
        "messages": [HumanMessage(content="Hello")]
    }):
        print(chunk)
except Exception as e:
    # Network/provider errors can surface mid-stream; report and recover
    print(f"Streaming error: {e}")
    # Handle error gracefully

# Wrong - output buffered
for chunk in model.stream(messages):
    print(chunk.content, end="")

# Right - immediate output (flush=True forces each chunk to the terminal)
for chunk in model.stream(messages):
    print(chunk.content, end="", flush=True)

# Wrong - blocks event loop
async def bad_stream():
    for chunk in model.stream(messages):  # Should use astream
        print(chunk.content)


# Right - non-blocking: astream yields control back to the event loop
async def good_stream():
    async for chunk in model.astream(messages):
        print(chunk.content)


# Wrong - loses metadata
for chunk in model.stream(messages):
print(chunk.content)
# Usage metadata is lost
# Right - accumulate for metadata
chunks = []
for chunk in model.stream(messages):
chunks.append(chunk)
print(chunk.content, end="", flush=True)
final = chunks[0]
for chunk in chunks[1:]:
final += chunk
print(f"\nTokens used: {final.usage_metadata['total_tokens']}")Install with Tessl CLI
# Install this package via the Tessl CLI (requires Node.js / npx)
npx tessl i tessl/pypi-langchain@1.2.1