Building applications with LLMs through composability
—
Asynchronous execution enables concurrent operations, improved performance, and better resource utilization in LLM applications. LangChain provides comprehensive async support across models, agents, and tools.
Async patterns in LangChain work at three levels:
ainvoke(), astream(), abatch()
All async methods follow Python's async/await patterns and are fully compatible with asyncio.
Execute agents asynchronously using ainvoke():
async def ainvoke(
input: dict,
config: dict | None = None
) -> dict
Basic Usage:
import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
agent = create_agent(model="openai:gpt-4o")
async def run_agent():
    """Invoke the agent once and print the final reply."""
    response = await agent.ainvoke(
        {"messages": [HumanMessage(content="What is the capital of France?")]}
    )
    print(response["messages"][-1].content)
asyncio.run(run_agent())
With Tools:
import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
@tool
async def search_web(query: str) -> str:
    """Search the web for information."""
    # Stand-in for a real async API call.
    await asyncio.sleep(1)
    return f"Results for: {query}"
agent = create_agent(
model="openai:gpt-4o",
tools=[search_web]
)
async def main():
    """Ask the agent a tool-using question and print the answer."""
    outcome = await agent.ainvoke(
        {"messages": [HumanMessage(content="Search for Python tutorials")]}
    )
    print(outcome["messages"][-1].content)
asyncio.run(main())
Stream agent execution asynchronously using astream():
async def astream(
input: dict,
config: dict | None = None
) -> AsyncIterator[dict]
Basic Streaming:
import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
agent = create_agent(model="openai:gpt-4o")
async def stream_agent():
    """Stream the agent's output, printing message content as it arrives."""
    payload = {"messages": [HumanMessage(content="Write a poem")]}
    async for update in agent.astream(payload):
        if "agent" not in update:
            continue
        for message in update["agent"].get("messages", []):
            # Print only messages that actually carry content.
            if getattr(message, 'content', None):
                print(message.content, end="", flush=True)
asyncio.run(stream_agent())
Streaming with Real-time Updates:
import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
@tool
async def analyze_data(data: str) -> str:
    """Analyze data (long-running operation)."""
    # Simulated slow work.
    await asyncio.sleep(2)
    return f"Analysis complete: {len(data)} characters"
agent = create_agent(
model="openai:gpt-4o",
tools=[analyze_data]
)
async def stream_with_updates():
    """Report coarse progress while the agent streams through its steps."""
    print("Starting analysis...")
    request = {"messages": [HumanMessage(content="Analyze this: Hello world")]}
    async for step in agent.astream(request):
        if "agent" in step:
            print("Agent thinking...")
        elif "tools" in step:
            print("Tool executing...")
        elif "__end__" in step:
            print("Complete!")
asyncio.run(stream_with_updates())
Execute multiple agent requests in parallel using abatch():
async def abatch(
inputs: list[dict],
config: dict | None = None
) -> list[dict]
Batch Processing:
import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
agent = create_agent(model="openai:gpt-4o")
async def batch_execution():
    """Fan three questions out through abatch() and print each answer."""
    prompts = [
        {"messages": [HumanMessage(content="What is Python?")]},
        {"messages": [HumanMessage(content="What is JavaScript?")]},
        {"messages": [HumanMessage(content="What is Rust?")]},
    ]
    # abatch() runs the requests concurrently and preserves input order.
    answers = await agent.abatch(prompts)
    for index, answer in enumerate(answers, 1):
        print(f"\nResult {index}:")
        print(answer["messages"][-1].content)
asyncio.run(batch_execution())
Execute models asynchronously:
async def ainvoke(
messages: list[AnyMessage],
**kwargs: Any
) -> AIMessage
Basic Usage:
import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
async def run_model():
    """Ask the model a single question and print its reply."""
    reply = await model.ainvoke(
        [HumanMessage(content="Explain async programming")]
    )
    print(reply.content)
asyncio.run(run_model())
Stream model responses asynchronously:
async def astream(
messages: list[AnyMessage],
**kwargs: Any
) -> AsyncIterator[AIMessageChunk]
Streaming Example:
import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
async def stream_model():
    """Print model output piece-by-piece as chunks arrive."""
    prompt = [HumanMessage(content="Write a short story")]
    async for piece in model.astream(prompt):
        print(piece.content, end="", flush=True)
asyncio.run(stream_model())
Execute multiple model requests in parallel:
async def abatch(
messages: list[list[AnyMessage]],
**kwargs: Any
) -> list[AIMessage]
Batch Example:
import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
async def batch_model():
    """Send three prompts through abatch() and print each response."""
    prompts = [
        [HumanMessage(content="What is 2+2?")],
        [HumanMessage(content="What is 3+3?")],
        [HumanMessage(content="What is 4+4?")],
    ]
    replies = await model.abatch(prompts)
    for idx, reply in enumerate(replies, 1):
        print(f"Response {idx}: {reply.content}")
asyncio.run(batch_model())
Define async tools using async functions:
import asyncio
from langchain.tools import tool
import httpx
@tool
async def fetch_weather(city: str) -> dict:
    """Fetch current weather data for a city.
    Args:
        city: Name of the city
    Returns:
        Weather data
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.weather.com/v1/{city}",
            timeout=10.0
        )
        # Fail loudly on HTTP errors (4xx/5xx) instead of trying to
        # decode an error page as JSON, which would raise a confusing
        # JSONDecodeError downstream.
        response.raise_for_status()
        return response.json()
@tool
async def search_database(query: str) -> list[dict]:
"""Search database asynchronously.
Args:
query: Search query
Returns:
Search results
"""
# Simulate async database query
await asyncio.sleep(0.5)
return [{"id": 1, "title": f"Result for {query}"}]Create async tools using BaseTool:
from langchain.tools import BaseTool
from pydantic import BaseModel, Field
import httpx
# Argument schema for the fetch_url tool: a single required URL string.
class FetchURLInput(BaseModel):
    url: str = Field(description="URL to fetch")
class AsyncFetchTool(BaseTool):
name: str = "fetch_url"
description: str = "Fetch content from a URL asynchronously"
args_schema: type[BaseModel] = FetchURLInput
def _run(self, url: str) -> str:
"""Sync implementation (fallback)."""
import requests
response = requests.get(url, timeout=10)
return response.text
async def _arun(self, url: str) -> str:
"""Async implementation."""
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
return response.textRun multiple agents concurrently:
import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
async def run_agent_task(agent, prompt: str, task_name: str):
    """Run a single agent task."""
    print(f"Starting {task_name}...")
    reply = await agent.ainvoke(
        {"messages": [HumanMessage(content=prompt)]}
    )
    print(f"{task_name} complete!")
    return reply
async def parallel_agents():
    """Run multiple agents in parallel."""
    agents = [create_agent(model="openai:gpt-4o") for _ in range(3)]
    # Start all three summaries at once; gather preserves order.
    outcomes = await asyncio.gather(
        run_agent_task(agents[0], "Summarize Python", "Task 1"),
        run_agent_task(agents[1], "Summarize JavaScript", "Task 2"),
        run_agent_task(agents[2], "Summarize Rust", "Task 3"),
    )
    for idx, outcome in enumerate(outcomes, 1):
        print(f"\nResult {idx}:")
        print(outcome["messages"][-1].content)
asyncio.run(parallel_agents())
Execute multiple tools concurrently:
import asyncio
from langchain.tools import tool
import httpx
@tool
async def fetch_url_async(url: str) -> str:
    """Fetch URL content asynchronously."""
    async with httpx.AsyncClient() as client:
        reply = await client.get(url, timeout=10.0)
        # Trim to the first 100 characters for display.
        return reply.text[:100]
async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    targets = [
        "https://example.com",
        "https://example.org",
        "https://example.net",
    ]
    # Kick off every fetch at once and wait for all of them.
    pages = await asyncio.gather(
        *(fetch_url_async.ainvoke({"url": target}) for target in targets)
    )
    for target, page in zip(targets, pages):
        print(f"{target}: {page[:50]}...")
asyncio.run(fetch_multiple_urls())
Chain async operations in a pipeline:
import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
async def fetch_data(query: str) -> str:
    """Fetch data asynchronously."""
    # Simulated I/O latency.
    await asyncio.sleep(1)
    return f"Raw data for: {query}"
async def process_data(data: str) -> str:
    """Process data asynchronously."""
    # Simulated processing time.
    await asyncio.sleep(1)
    return data.upper()
async def analyze_with_llm(data: str) -> str:
    """Analyze with LLM."""
    llm = init_chat_model("openai:gpt-4o")
    reply = await llm.ainvoke(
        [HumanMessage(content=f"Analyze this: {data}")]
    )
    return reply.content
async def pipeline(query: str):
    """Execute async pipeline: fetch, then process, then analyze."""
    print(f"Processing: {query}")
    # Each stage depends on the previous one, so these awaits are sequential.
    raw = await fetch_data(query)
    print(f"Fetched: {raw}")
    cleaned = await process_data(raw)
    print(f"Processed: {cleaned}")
    verdict = await analyze_with_llm(cleaned)
    print(f"Analysis: {verdict}")
    return verdict
asyncio.run(pipeline("customer feedback"))
Run multiple pipelines concurrently:
import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
async def process_item(item: str, model) -> dict:
    """Process a single item through pipeline."""
    # Step 1: preprocess (simulated delay).
    await asyncio.sleep(0.5)
    normalized = item.upper()
    # Step 2: analyze the preprocessed text with the LLM.
    reply = await model.ainvoke(
        [HumanMessage(content=f"Analyze: {normalized}")]
    )
    return {
        "item": item,
        "analysis": reply.content,
    }
async def parallel_pipelines():
    """Process multiple items concurrently."""
    llm = init_chat_model("openai:gpt-4o")
    work = ["item1", "item2", "item3", "item4"]
    # One shared model instance; all four pipelines run at the same time.
    outcomes = await asyncio.gather(
        *(process_item(entry, llm) for entry in work)
    )
    for outcome in outcomes:
        print(f"{outcome['item']}: {outcome['analysis']}")
asyncio.run(parallel_pipelines())
Handle errors in async operations:
import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
async def safe_agent_call(agent, prompt: str):
    """Execute agent with error handling."""
    try:
        return await agent.ainvoke(
            {"messages": [HumanMessage(content=prompt)]}
        )
    except Exception as exc:  # demo: catch-all so the caller just gets None
        print(f"Error: {exc}")
        return None
async def main():
    """Run one guarded agent call and print the reply, if any."""
    agent = create_agent(model="openai:gpt-4o")
    outcome = await safe_agent_call(agent, "Hello")
    if outcome:
        print(outcome["messages"][-1].content)
asyncio.run(main())
Handle errors in concurrent operations:
import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
async def safe_task(agent, prompt: str, task_id: int):
    """Execute task with error handling."""
    try:
        reply = await agent.ainvoke(
            {"messages": [HumanMessage(content=prompt)]}
        )
    except Exception as exc:  # report the failure rather than raising
        return {"task_id": task_id, "result": None, "error": str(exc)}
    return {"task_id": task_id, "result": reply, "error": None}
async def parallel_with_errors():
    """Handle errors in parallel execution."""
    agent = create_agent(model="openai:gpt-4o")
    outcomes = await asyncio.gather(
        safe_task(agent, "Valid prompt", 1),
        safe_task(agent, "Another valid prompt", 2),
        safe_task(agent, "Yet another prompt", 3),
        return_exceptions=True,  # Don't fail fast
    )
    for outcome in outcomes:
        # gather may hand back a raw exception when return_exceptions=True.
        if isinstance(outcome, Exception):
            print(f"Task failed: {outcome}")
        elif outcome["error"]:
            print(f"Task {outcome['task_id']} error: {outcome['error']}")
        else:
            print(f"Task {outcome['task_id']} success")
asyncio.run(parallel_with_errors())
# Execute multiple operations concurrently
results = await asyncio.gather(
operation1(),
operation2(),
operation3(),
)
async def complex_workflow():
"""Use TaskGroup for complex async coordination."""
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(operation1())
task2 = tg.create_task(operation2())
task3 = tg.create_task(operation3())
# All tasks complete here
return task1.result(), task2.result(), task3.result()
async def with_timeout():
"""Execute with timeout."""
try:
result = await asyncio.wait_for(
agent.ainvoke({"messages": [HumanMessage(content="Hello")]}),
timeout=30.0
)
return result
except asyncio.TimeoutError:
print("Operation timed out")
return None
# Wrong - blocks event loop
async def bad_async():
result = agent.invoke({"messages": [...]}) # Blocking!
return result
# Right - non-blocking
async def good_async():
result = await agent.ainvoke({"messages": [...]})
return result
# Wrong - doesn't wait for completion
async def bad_await():
result = agent.ainvoke({"messages": [...]}) # Returns coroutine
print(result) # Prints coroutine object, not result
# Right - awaits completion
async def good_await():
result = await agent.ainvoke({"messages": [...]})
print(result) # Prints actual result
# Wrong - sequential (slow)
async def slow_execution():
result1 = await operation1()
result2 = await operation2()
result3 = await operation3()
return [result1, result2, result3]
# Right - parallel (fast)
async def fast_execution():
results = await asyncio.gather(
operation1(),
operation2(),
operation3(),
)
return results
Install with Tessl CLI
npx tessl i tessl/pypi-langchain@1.2.1