Building applications with LLMs through composability
—
Complete API signatures for all major LangChain classes and functions, organized by module.
def create_agent(
model: str | BaseChatModel,
*,
tools: Sequence[BaseTool | Callable | dict] | None = None,
system_prompt: str | SystemMessage | None = None,
middleware: Sequence[AgentMiddleware] | None = None,
response_format: ResponseFormat | type | None = None,
state_schema: type[AgentState] | None = None,
context_schema: type | None = None,
checkpointer: Checkpointer | None = None,
store: BaseStore | None = None,
interrupt_before: list[str] | None = None,
interrupt_after: list[str] | None = None,
debug: bool = False,
name: str | None = None,
cache: BaseCache | None = None,
) -> CompiledStateGraph

Create an agent with a language model, optional tools, and optional middleware.
Parameters:
model: Model identifier string (e.g., "openai:gpt-4o") or BaseChatModel instance
tools: List of tools available to the agent (BaseTool instances, @tool decorated functions, or dicts)
system_prompt: System instructions as string or SystemMessage
middleware: List of middleware plugins for customizing behavior
response_format: Schema for structured output (Pydantic model, dataclass, TypedDict, or JSON schema)
state_schema: Custom state schema that extends AgentState
context_schema: Schema for runtime context
checkpointer: State persistence mechanism (e.g., MemorySaver)
store: Cross-thread data storage
interrupt_before: Node names to pause execution before
interrupt_after: Node names to pause execution after
debug: Enable verbose logging
name: Optional name for the graph (used when adding as subgraph)
cache: Cache for execution results

Returns: CompiledStateGraph - Runnable agent graph
class AgentState(TypedDict):
"""
Base state schema for agent execution.
Attributes:
messages: List of conversation messages
structured_response: Present when using response_format, contains the structured output
jump_to: Ephemeral field for control flow, used by middleware to redirect execution
"""
messages: list[AnyMessage]
structured_response: Any # Optional
jump_to: str # Optional, ephemeral

# Synchronous methods
agent.invoke(input: dict, config: dict | None = None) -> dict
agent.stream(input: dict, config: dict | None = None, stream_mode: str = "values") -> Iterator[dict]
agent.batch(inputs: list[dict], config: dict | list[dict] | None = None) -> list[dict]
# Asynchronous methods
agent.ainvoke(input: dict, config: dict | None = None) -> dict
agent.astream(input: dict, config: dict | None = None, stream_mode: str = "values") -> AsyncIterator[dict]
agent.abatch(inputs: list[dict], config: dict | list[dict] | None = None) -> list[dict]

# Strategy types
ToolStrategy # Use tool calls for structured output
ProviderStrategy # Use provider's native structured output (JSON mode)
AutoStrategy # Auto-detect best strategy (default)
# Union type
ResponseFormat = ToolStrategy | ProviderStrategy | AutoStrategy

class StructuredOutputError(Exception):
"""Base error for structured output failures."""
pass
class MultipleStructuredOutputsError(StructuredOutputError):
"""Raised when multiple output tools are called but only one expected."""
pass
class StructuredOutputValidationError(StructuredOutputError):
"""Raised when structured output fails schema validation."""
pass

from typing import Annotated
from langchain.agents.middleware.types import OmitFromInput, OmitFromOutput, PrivateStateAttr
# Field excluded from input schema
field: Annotated[int, OmitFromInput]
# Field excluded from output schema
field: Annotated[str, OmitFromOutput]
# Field completely private (not in input or output)
field: Annotated[dict, PrivateStateAttr]

def init_chat_model(
model: str | None = None,
*,
model_provider: str | None = None,
configurable_fields: Literal["any"] | list[str] | tuple[str, ...] | None = None,
config_prefix: str | None = None,
**kwargs: Any
) -> BaseChatModel

Initialize a chat model from any supported provider using string identifiers.
Parameters:
model: Model identifier in format "provider:model-name" (e.g., "openai:gpt-4o")
model_provider: Override provider detection
configurable_fields: Which parameters can be set at runtime via config["configurable"]
config_prefix: Prefix for configurable parameter names
**kwargs: Provider-specific parameters (temperature, max_tokens, api_key, etc.)

Returns: BaseChatModel instance
Common kwargs:
temperature (float): Controls randomness (0.0 = deterministic, 2.0 = maximum)
max_tokens (int): Maximum tokens to generate
timeout (float): Request timeout in seconds
max_retries (int): Maximum retry attempts
base_url (str): Custom API endpoint
rate_limiter (BaseRateLimiter): Rate limiter instance
API keys: Provider-specific parameters (e.g., openai_api_key, anthropic_api_key)

class BaseChatModel:
"""
Base class for chat models.
All chat models support synchronous and asynchronous execution,
streaming, and batch processing.
"""
def invoke(
self,
messages: list[AnyMessage],
**kwargs: Any
) -> AIMessage: ...
async def ainvoke(
self,
messages: list[AnyMessage],
**kwargs: Any
) -> AIMessage: ...
def stream(
self,
messages: list[AnyMessage],
**kwargs: Any
) -> Iterator[AIMessageChunk]: ...
async def astream(
self,
messages: list[AnyMessage],
**kwargs: Any
) -> AsyncIterator[AIMessageChunk]: ...
def batch(
self,
messages: list[list[AnyMessage]],
**kwargs: Any
) -> list[AIMessage]: ...
async def abatch(
self,
messages: list[list[AnyMessage]],
**kwargs: Any
) -> list[AIMessage]: ...
def bind_tools(
self,
tools: Sequence[BaseTool | dict],
**kwargs: Any
) -> BaseChatModel: ...
def with_structured_output(
self,
schema: type | dict,
**kwargs: Any
) -> Runnable: ...

def init_embeddings(
model: str,
*,
provider: str | None = None,
**kwargs: Any
) -> Embeddings

Initialize an embeddings model from any supported provider.
Parameters:
model: Model identifier in format "provider:model-name" (e.g., "openai:text-embedding-3-small")
provider: Override provider detection
**kwargs: Provider-specific parameters (api_key, dimensions, batch_size, etc.)

Returns: Embeddings instance
Common kwargs:
api_key (str): Provider API key (provider-specific)
dimensions (int): Output embedding dimensions (if supported)
batch_size (int): Batch size for embedding multiple documents
timeout (float): Request timeout in seconds
max_retries (int): Maximum retry attempts

class Embeddings:
"""
Base class for embeddings models.
All embeddings models support embedding single queries and
multiple documents, with both synchronous and asynchronous methods.
"""
def embed_query(self, text: str) -> list[float]: ...
def embed_documents(self, texts: list[str]) -> list[list[float]]: ...
async def aembed_query(self, text: str) -> list[float]: ...
async def aembed_documents(self, texts: list[str]) -> list[list[float]]: ...

class HumanMessage(BaseMessage):
"""
User/human input message.
Attributes:
content: str | list[ContentBlock] - Message content (text or multimodal)
id: str | None - Unique message identifier
name: str | None - Optional sender name
metadata: dict - Additional metadata
response_metadata: dict - Response-specific metadata
"""
content: str | list[ContentBlock]
id: str | None = None
name: str | None = None
metadata: dict = {}
response_metadata: dict = {}
class AIMessage(BaseMessage):
"""
LLM/assistant response message.
Attributes:
content: str | list[ContentBlock] - Message content
id: str | None - Unique message identifier
name: str | None - Optional assistant name
tool_calls: list[ToolCall] - Tool/function calls made by the AI
invalid_tool_calls: list[InvalidToolCall] - Malformed tool calls
usage_metadata: UsageMetadata | None - Token usage information
metadata: dict - Additional metadata
response_metadata: dict - Provider-specific response data
"""
content: str | list[ContentBlock]
id: str | None = None
name: str | None = None
tool_calls: list[ToolCall] = []
invalid_tool_calls: list[InvalidToolCall] = []
usage_metadata: UsageMetadata | None = None
metadata: dict = {}
response_metadata: dict = {}
class SystemMessage(BaseMessage):
"""
System instruction message.
Attributes:
content: str | list[ContentBlock] - System instruction content
id: str | None - Unique message identifier
name: str | None - Optional system message name
metadata: dict - Additional metadata
"""
content: str | list[ContentBlock]
id: str | None = None
name: str | None = None
metadata: dict = {}
class ToolMessage(BaseMessage):
"""
Tool execution result message.
Attributes:
content: str | list[ContentBlock] - Tool execution result
tool_call_id: str - ID linking to the corresponding ToolCall
name: str | None - Tool name
status: str | None - Execution status ("success", "error")
metadata: dict - Additional metadata
"""
content: str | list[ContentBlock]
tool_call_id: str
name: str | None = None
status: str | None = None
metadata: dict = {}
class RemoveMessage(BaseMessage):
"""
Directive to remove a message from context.
Attributes:
id: str - ID of the message to remove
"""
id: str

class UsageMetadata:
"""
Token usage information.
Attributes:
input_tokens: int - Number of input tokens
output_tokens: int - Number of output tokens
total_tokens: int - Total tokens (input + output)
input_token_details: InputTokenDetails | None - Detailed input breakdown
output_token_details: OutputTokenDetails | None - Detailed output breakdown
"""
input_tokens: int
output_tokens: int
total_tokens: int
input_token_details: InputTokenDetails | None = None
output_token_details: OutputTokenDetails | None = None

def trim_messages(
messages: list[BaseMessage],
*,
max_tokens: int | None = None,
token_counter: Callable | None = None,
strategy: Literal["first", "last"] = "last",
allow_partial: bool = False,
start_on: str | list[str] | None = None,
end_on: str | list[str] | None = None,
include_system: bool = True,
) -> list[BaseMessage]

Trim message list by token count or message count.
Parameters:
messages: List of messages to trim
max_tokens: Maximum token count to keep
token_counter: Function to count tokens (defaults to approximate counter)
strategy: Keep "first" or "last" messages
allow_partial: Allow partial message content
start_on: Message type(s) to start on
end_on: Message type(s) to end on
include_system: Whether to always include system messages

Returns: Trimmed list of messages
def tool(
func: Callable | None = None,
*,
name: str | None = None,
description: str | None = None,
return_direct: bool = False,
args_schema: type[BaseModel] | None = None,
infer_schema: bool = True
) -> BaseTool

Decorator to convert functions to LangChain tools.
Parameters:
func: Function to convert (or None if using decorator with arguments)
name: Custom tool name (defaults to function name)
description: Custom description (defaults to function docstring)
return_direct: Whether to return result directly to user
args_schema: Pydantic model for input validation
infer_schema: Whether to infer schema from type hints

Returns: BaseTool instance
Usage:
@tool
def function_name(arg1: type1, arg2: type2) -> return_type:
"""Tool description.
Args:
arg1: Description of arg1
arg2: Description of arg2
Returns:
Description of return value
"""
...

class BaseTool:
"""
Abstract base class for tools.
Subclasses must implement _run() or _arun() methods for execution.
"""
name: str
description: str
args_schema: type[BaseModel] | None = None
return_direct: bool = False
verbose: bool = False
def invoke(self, input: dict | str, config: dict | None = None) -> Any: ...
async def ainvoke(self, input: dict | str, config: dict | None = None) -> Any: ...
def _run(self, *args: Any, **kwargs: Any) -> Any: ...
async def _arun(self, *args: Any, **kwargs: Any) -> Any: ...

Required Attributes:
name: Tool identifier
description: Tool description for LLM

Optional Attributes:
args_schema: Pydantic model for input validation
return_direct: Whether to return result directly to user
verbose: Enable verbose logging

Methods:
invoke(): Execute tool synchronously (don't override)
ainvoke(): Execute tool asynchronously (don't override)
_run(): Implementation method for sync execution (override this)
_arun(): Implementation method for async execution (override this)

class ToolException(Exception):
"""
Exception for tool execution errors.
When raised in a tool, the error message is passed back
to the LLM in a ToolMessage for error handling.
"""
pass

def before_agent(func: Callable) -> AgentMiddleware:
"""Decorator for middleware that runs before agent execution."""
...
def after_agent(func: Callable) -> AgentMiddleware:
"""Decorator for middleware that runs after agent execution."""
...
def before_model(func: Callable) -> AgentMiddleware:
"""Decorator for middleware that runs before model invocation."""
...
def after_model(func: Callable) -> AgentMiddleware:
"""Decorator for middleware that runs after model invocation."""
...
def wrap_model_call(func: Callable) -> AgentMiddleware:
"""Decorator for middleware that wraps model invocation."""
...
def wrap_tool_call(func: Callable) -> AgentMiddleware:
"""Decorator for middleware that wraps tool invocation."""
...
def dynamic_prompt(func: Callable) -> AgentMiddleware:
"""Decorator for middleware that modifies the system prompt."""
...

from typing import Annotated
# Inject agent state
InjectedState = Annotated[AgentState, "injected_state"]
# Inject store
InjectedStore = Annotated[BaseStore, "injected_store"]
# Inject tool-specific argument (hidden from LLM)
InjectedToolArg = Annotated[Any, "injected_tool_arg"]
# Inject tool call ID
InjectedToolCallId = Annotated[str, "injected_tool_call_id"]
# Tool runtime context
class ToolRuntime:
"""Runtime context available to tools via dependency injection."""
state: AgentState
store: BaseStore | None
config: dict
tool_call_id: str

class InMemoryRateLimiter(BaseRateLimiter):
"""
In-memory rate limiter for controlling API request rates.
Attributes:
requests_per_second: Maximum requests per second
check_every_n_seconds: How often to check rate (default: 0.1)
max_bucket_size: Maximum bucket size for burst (default: 1.0)
"""
def __init__(
self,
*,
requests_per_second: float,
check_every_n_seconds: float = 0.1,
max_bucket_size: float = 1.0
): ...

Usage:
from langchain.rate_limiters import InMemoryRateLimiter
# Create rate limiter (10 requests per minute)
rate_limiter = InMemoryRateLimiter(requests_per_second=10/60)
# Use with model
model = init_chat_model("openai:gpt-4o", rate_limiter=rate_limiter)

# From langchain_core.language_models
from langchain_core.language_models import BaseChatModel
# From langchain_core.messages
from langchain_core.messages import (
BaseMessage,
AnyMessage,
AIMessage,
AIMessageChunk,
HumanMessage,
SystemMessage,
ToolMessage,
RemoveMessage
)
# From langchain_core.tools
from langchain_core.tools import BaseTool, ToolException
# From langchain_core.embeddings
from langchain_core.embeddings import Embeddings
# From langchain_core.rate_limiters
from langchain_core.rate_limiters import BaseRateLimiter
# From langgraph
from langgraph.checkpoint import Checkpointer
from langgraph.store import BaseStore
from langgraph.graph import CompiledStateGraph
# From typing
from typing import Any, Callable, Iterator, AsyncIterator, Sequence, Literal

# Agents
from langchain.agents import create_agent, AgentState
# Models
from langchain.chat_models import init_chat_model
from langchain.embeddings import init_embeddings
# Messages
from langchain.messages import (
HumanMessage, AIMessage, SystemMessage, ToolMessage, RemoveMessage,
trim_messages
)
# Tools
from langchain.tools import tool, BaseTool, ToolException
# Middleware
from langchain.agents.middleware import (
before_agent, after_agent, before_model, after_model,
wrap_model_call, wrap_tool_call, dynamic_prompt
)
# Rate Limiting
from langchain.rate_limiters import InMemoryRateLimiter
# Checkpointing (from langgraph)
from langgraph.checkpoint.memory import MemorySaver
# Store (from langgraph)
from langgraph.store import InMemoryStore

Install with Tessl CLI
npx tessl i tessl/pypi-langchain