docs/patterns/async.md

Async Patterns

Asynchronous execution enables concurrent operations, improved performance, and better resource utilization in LLM applications. LangChain provides comprehensive async support across models, agents, and tools.

Core Concepts

Async patterns in LangChain work at four levels:

  1. Async Model Execution: Non-blocking model inference with ainvoke(), astream(), abatch()
  2. Async Agent Execution: Non-blocking agent runs with full tool calling support
  3. Async Tools: Tools that perform async I/O operations
  4. Concurrent Execution: Running multiple operations in parallel

All async methods follow Python's async/await patterns and are fully compatible with asyncio.
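The convention is uniform: each sync method has an `a`-prefixed async counterpart (`invoke`/`ainvoke`, `stream`/`astream`, `batch`/`abatch`). A minimal stdlib sketch of that three-method contract, with no LangChain dependency (`EchoModel` is a hypothetical stand-in, not a LangChain class):

```python
import asyncio
from typing import AsyncIterator

class EchoModel:
    """Hypothetical stand-in showing the ainvoke/astream/abatch contract."""

    async def ainvoke(self, text: str) -> str:
        # Non-blocking single call
        await asyncio.sleep(0)
        return f"echo: {text}"

    async def astream(self, text: str) -> AsyncIterator[str]:
        # Yield the response piece by piece
        for token in text.split():
            await asyncio.sleep(0)
            yield token

    async def abatch(self, texts: list[str]) -> list[str]:
        # Run all inputs concurrently
        return await asyncio.gather(*(self.ainvoke(t) for t in texts))

async def main() -> None:
    model = EchoModel()
    print(await model.ainvoke("hi"))                       # single call
    print([tok async for tok in model.astream("a b c")])   # streaming
    print(await model.abatch(["x", "y"]))                  # batch

asyncio.run(main())
```

The same awaiting, async-iteration, and gather mechanics apply unchanged when the object is a LangChain model or agent.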

Async Agent Execution

Basic Async Invocation

Execute agents asynchronously using ainvoke():

async def ainvoke(
    input: dict,
    config: dict | None = None
) -> dict

Basic Usage:

import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage

agent = create_agent(model="openai:gpt-4o")

async def run_agent():
    result = await agent.ainvoke({
        "messages": [HumanMessage(content="What is the capital of France?")]
    })
    print(result["messages"][-1].content)

asyncio.run(run_agent())

With Tools:

import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool

@tool
async def search_web(query: str) -> str:
    """Search the web for information."""
    # Simulate async API call
    await asyncio.sleep(1)
    return f"Results for: {query}"

agent = create_agent(
    model="openai:gpt-4o",
    tools=[search_web]
)

async def main():
    result = await agent.ainvoke({
        "messages": [HumanMessage(content="Search for Python tutorials")]
    })
    print(result["messages"][-1].content)

asyncio.run(main())

Async Streaming

Stream agent execution asynchronously using astream():

async def astream(
    input: dict,
    config: dict | None = None
) -> AsyncIterator[dict]

Basic Streaming:

import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage

agent = create_agent(model="openai:gpt-4o")

async def stream_agent():
    async for chunk in agent.astream({
        "messages": [HumanMessage(content="Write a poem")]
    }):
        if "agent" in chunk:
            messages = chunk["agent"].get("messages", [])
            for msg in messages:
                if hasattr(msg, 'content') and msg.content:
                    print(msg.content, end="", flush=True)

asyncio.run(stream_agent())

Streaming with Real-time Updates:

import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool

@tool
async def analyze_data(data: str) -> str:
    """Analyze data (long-running operation)."""
    await asyncio.sleep(2)
    return f"Analysis complete: {len(data)} characters"

agent = create_agent(
    model="openai:gpt-4o",
    tools=[analyze_data]
)

async def stream_with_updates():
    print("Starting analysis...")

    async for chunk in agent.astream({
        "messages": [HumanMessage(content="Analyze this: Hello world")]
    }):
        if "agent" in chunk:
            print("Agent thinking...")
        elif "tools" in chunk:
            print("Tool executing...")
        elif "__end__" in chunk:
            print("Complete!")

asyncio.run(stream_with_updates())

Async Batch Execution

Execute multiple agent requests in parallel using abatch():

async def abatch(
    inputs: list[dict],
    config: dict | None = None
) -> list[dict]

Batch Processing:

import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage

agent = create_agent(model="openai:gpt-4o")

async def batch_execution():
    # Execute multiple requests in parallel
    results = await agent.abatch([
        {"messages": [HumanMessage(content="What is Python?")]},
        {"messages": [HumanMessage(content="What is JavaScript?")]},
        {"messages": [HumanMessage(content="What is Rust?")]},
    ])

    for i, result in enumerate(results, 1):
        print(f"\nResult {i}:")
        print(result["messages"][-1].content)

asyncio.run(batch_execution())

Async Model Execution

Async Model Invocation

Execute models asynchronously:

async def ainvoke(
    messages: list[AnyMessage],
    **kwargs: Any
) -> AIMessage

Basic Usage:

import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

model = init_chat_model("openai:gpt-4o")

async def run_model():
    response = await model.ainvoke([
        HumanMessage(content="Explain async programming")
    ])
    print(response.content)

asyncio.run(run_model())

Async Model Streaming

Stream model responses asynchronously:

async def astream(
    messages: list[AnyMessage],
    **kwargs: Any
) -> AsyncIterator[AIMessageChunk]

Streaming Example:

import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

model = init_chat_model("openai:gpt-4o")

async def stream_model():
    async for chunk in model.astream([
        HumanMessage(content="Write a short story")
    ]):
        print(chunk.content, end="", flush=True)

asyncio.run(stream_model())

Async Model Batch

Execute multiple model requests in parallel:

async def abatch(
    messages: list[list[AnyMessage]],
    **kwargs: Any
) -> list[AIMessage]

Batch Example:

import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

model = init_chat_model("openai:gpt-4o")

async def batch_model():
    responses = await model.abatch([
        [HumanMessage(content="What is 2+2?")],
        [HumanMessage(content="What is 3+3?")],
        [HumanMessage(content="What is 4+4?")],
    ])

    for i, response in enumerate(responses, 1):
        print(f"Response {i}: {response.content}")

asyncio.run(batch_model())

Async Tools

Creating Async Tools

Define async tools using async functions:

import asyncio
from langchain.tools import tool
import httpx

@tool
async def fetch_weather(city: str) -> dict:
    """Fetch current weather data for a city.

    Args:
        city: Name of the city

    Returns:
        Weather data
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.weather.com/v1/{city}",
            timeout=10.0
        )
        return response.json()

@tool
async def search_database(query: str) -> list[dict]:
    """Search database asynchronously.

    Args:
        query: Search query

    Returns:
        Search results
    """
    # Simulate async database query
    await asyncio.sleep(0.5)
    return [{"id": 1, "title": f"Result for {query}"}]

Async Class-Based Tools

Create async tools using BaseTool:

from langchain.tools import BaseTool
from pydantic import BaseModel, Field
import httpx

class FetchURLInput(BaseModel):
    url: str = Field(description="URL to fetch")

class AsyncFetchTool(BaseTool):
    name: str = "fetch_url"
    description: str = "Fetch content from a URL asynchronously"
    args_schema: type[BaseModel] = FetchURLInput

    def _run(self, url: str) -> str:
        """Sync implementation (fallback)."""
        import requests
        response = requests.get(url, timeout=10)
        return response.text

    async def _arun(self, url: str) -> str:
        """Async implementation."""
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=10.0)
            return response.text

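The sync `_run` fallback matters because an async caller can still use it without blocking the event loop by delegating to a worker thread. A stdlib sketch of that dispatch (the `Fetcher` class and its return values are illustrative, not LangChain API):

```python
import asyncio
import time

class Fetcher:
    """Illustrative tool with a blocking sync path and an async wrapper."""

    def _run(self, url: str) -> str:
        # Blocking work (stands in for requests.get)
        time.sleep(0.1)
        return f"content of {url}"

    async def _arun(self, url: str) -> str:
        # Run the blocking sync path in a worker thread so the
        # event loop stays responsive
        return await asyncio.to_thread(self._run, url)

async def main() -> None:
    tool = Fetcher()
    # Both fetches overlap despite the blocking _run
    results = await asyncio.gather(
        tool._arun("https://example.com"),
        tool._arun("https://example.org"),
    )
    print(results)

asyncio.run(main())
```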
Concurrent Execution Patterns

Parallel Agent Execution

Run multiple agents concurrently:

import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage

async def run_agent_task(agent, prompt: str, task_name: str):
    """Run a single agent task."""
    print(f"Starting {task_name}...")
    result = await agent.ainvoke({
        "messages": [HumanMessage(content=prompt)]
    })
    print(f"{task_name} complete!")
    return result

async def parallel_agents():
    """Run multiple agents in parallel."""
    agent1 = create_agent(model="openai:gpt-4o")
    agent2 = create_agent(model="openai:gpt-4o")
    agent3 = create_agent(model="openai:gpt-4o")

    # Execute all agents concurrently
    results = await asyncio.gather(
        run_agent_task(agent1, "Summarize Python", "Task 1"),
        run_agent_task(agent2, "Summarize JavaScript", "Task 2"),
        run_agent_task(agent3, "Summarize Rust", "Task 3"),
    )

    for i, result in enumerate(results, 1):
        print(f"\nResult {i}:")
        print(result["messages"][-1].content)

asyncio.run(parallel_agents())

Concurrent Tool Execution

Execute multiple tools concurrently:

import asyncio
from langchain.tools import tool
import httpx

@tool
async def fetch_url_async(url: str) -> str:
    """Fetch URL content asynchronously."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        return response.text[:100]  # First 100 chars

async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    urls = [
        "https://example.com",
        "https://example.org",
        "https://example.net",
    ]

    # Execute all fetches concurrently
    results = await asyncio.gather(*[
        fetch_url_async.ainvoke({"url": url})
        for url in urls
    ])

    for url, result in zip(urls, results):
        print(f"{url}: {result[:50]}...")

asyncio.run(fetch_multiple_urls())

Pipeline Pattern

Chain async operations in a pipeline:

import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

async def fetch_data(query: str) -> str:
    """Fetch data asynchronously."""
    await asyncio.sleep(1)
    return f"Raw data for: {query}"

async def process_data(data: str) -> str:
    """Process data asynchronously."""
    await asyncio.sleep(1)
    return data.upper()

async def analyze_with_llm(data: str) -> str:
    """Analyze with LLM."""
    model = init_chat_model("openai:gpt-4o")
    response = await model.ainvoke([
        HumanMessage(content=f"Analyze this: {data}")
    ])
    return response.content

async def pipeline(query: str):
    """Execute async pipeline."""
    print(f"Processing: {query}")

    # Sequential async operations
    data = await fetch_data(query)
    print(f"Fetched: {data}")

    processed = await process_data(data)
    print(f"Processed: {processed}")

    analysis = await analyze_with_llm(processed)
    print(f"Analysis: {analysis}")

    return analysis

asyncio.run(pipeline("customer feedback"))

Parallel Pipelines

Run multiple pipelines concurrently:

import asyncio
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage

async def process_item(item: str, model) -> dict:
    """Process a single item through pipeline."""
    # Step 1: Preprocess
    await asyncio.sleep(0.5)
    preprocessed = item.upper()

    # Step 2: Analyze with LLM
    response = await model.ainvoke([
        HumanMessage(content=f"Analyze: {preprocessed}")
    ])

    return {
        "item": item,
        "analysis": response.content
    }

async def parallel_pipelines():
    """Process multiple items concurrently."""
    model = init_chat_model("openai:gpt-4o")
    items = ["item1", "item2", "item3", "item4"]

    # Process all items concurrently
    results = await asyncio.gather(*[
        process_item(item, model)
        for item in items
    ])

    for result in results:
        print(f"{result['item']}: {result['analysis']}")

asyncio.run(parallel_pipelines())

Error Handling in Async

Try-Except in Async

Handle errors in async operations:

import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage

async def safe_agent_call(agent, prompt: str):
    """Execute agent with error handling."""
    try:
        result = await agent.ainvoke({
            "messages": [HumanMessage(content=prompt)]
        })
        return result
    except Exception as e:
        print(f"Error: {e}")
        return None

async def main():
    agent = create_agent(model="openai:gpt-4o")
    result = await safe_agent_call(agent, "Hello")
    if result:
        print(result["messages"][-1].content)

asyncio.run(main())

Handling Multiple Errors

Handle errors in concurrent operations:

import asyncio
from langchain.agents import create_agent
from langchain.messages import HumanMessage

async def safe_task(agent, prompt: str, task_id: int):
    """Execute task with error handling."""
    try:
        result = await agent.ainvoke({
            "messages": [HumanMessage(content=prompt)]
        })
        return {"task_id": task_id, "result": result, "error": None}
    except Exception as e:
        return {"task_id": task_id, "result": None, "error": str(e)}

async def parallel_with_errors():
    """Handle errors in parallel execution."""
    agent = create_agent(model="openai:gpt-4o")

    results = await asyncio.gather(
        safe_task(agent, "Valid prompt", 1),
        safe_task(agent, "Another valid prompt", 2),
        safe_task(agent, "Yet another prompt", 3),
        return_exceptions=True  # Don't fail fast
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")
        elif result["error"]:
            print(f"Task {result['task_id']} error: {result['error']}")
        else:
            print(f"Task {result['task_id']} success")

asyncio.run(parallel_with_errors())

Best Practices

Use asyncio.gather for Parallel Execution

# Execute multiple operations concurrently
results = await asyncio.gather(
    operation1(),
    operation2(),
    operation3(),
)

Use asyncio.TaskGroup for Complex Coordination (Python 3.11+)

async def complex_workflow():
    """Use TaskGroup for complex async coordination."""
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(operation1())
        task2 = tg.create_task(operation2())
        task3 = tg.create_task(operation3())

    # All tasks complete here
    return task1.result(), task2.result(), task3.result()

Set Timeouts for Async Operations

async def with_timeout():
    """Execute with timeout."""
    try:
        result = await asyncio.wait_for(
            agent.ainvoke({"messages": [HumanMessage(content="Hello")]}),
            timeout=30.0
        )
        return result
    except asyncio.TimeoutError:
        print("Operation timed out")
        return None

Common Mistakes

Mistake: Using Sync Methods in Async Context

# Wrong - blocks event loop
async def bad_async():
    result = agent.invoke({"messages": [...]})  # Blocking!
    return result

# Right - non-blocking
async def good_async():
    result = await agent.ainvoke({"messages": [...]})
    return result

Mistake: Not Awaiting Async Calls

# Wrong - doesn't wait for completion
async def bad_await():
    result = agent.ainvoke({"messages": [...]})  # Returns coroutine
    print(result)  # Prints coroutine object, not result

# Right - awaits completion
async def good_await():
    result = await agent.ainvoke({"messages": [...]})
    print(result)  # Prints actual result

Mistake: Sequential Instead of Parallel

# Wrong - sequential (slow)
async def slow_execution():
    result1 = await operation1()
    result2 = await operation2()
    result3 = await operation3()
    return [result1, result2, result3]

# Right - parallel (fast)
async def fast_execution():
    results = await asyncio.gather(
        operation1(),
        operation2(),
        operation3(),
    )
    return results
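One caveat on the parallel version: an unbounded gather over many inputs can exhaust connections or hit provider rate limits. An `asyncio.Semaphore` caps how many calls are in flight at once. A stdlib sketch (the dummy `call_model` and the limit of 2 are illustrative):

```python
import asyncio

async def call_model(prompt: str) -> str:
    # Stands in for an agent/model ainvoke
    await asyncio.sleep(0.05)
    return f"answer: {prompt}"

async def bounded_batch(prompts: list[str], limit: int = 2) -> list[str]:
    sem = asyncio.Semaphore(limit)

    async def one(prompt: str) -> str:
        async with sem:  # at most `limit` calls run concurrently
            return await call_model(prompt)

    # Results come back in input order, like gather
    return await asyncio.gather(*(one(p) for p in prompts))

results = asyncio.run(bounded_batch([f"q{i}" for i in range(5)]))
print(results)
```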

Related Patterns

  • Streaming Patterns - Async streaming for real-time responses
  • Error Handling - Error handling in async operations
  • Persistence Patterns - Async persistence operations
