Building stateful, multi-actor applications with LLMs
A low-level orchestration framework for building, managing, and deploying long-running, stateful agents and workflows with large language models. LangGraph provides essential infrastructure for creating durable, fault-tolerant agent systems with comprehensive state management, human-in-the-loop capabilities, and seamless integration with LangChain's ecosystem.
pip install langgraphfrom langgraph.graph import StateGraph, MessageGraph, MessagesState, START, END
from langgraph.types import Command, Send, interrupt, StreamModefrom langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# Define state schema
class State(TypedDict):
messages: list[str]
count: int
# Create graph builder
builder = StateGraph(State)
# Add nodes (processing steps)
def process_node(state: State) -> State:
return {
"messages": state["messages"] + ["processed"],
"count": state["count"] + 1
}
builder.add_node("process", process_node)
# Define edges (flow control)
builder.add_edge(START, "process")
builder.add_edge("process", END)
# Compile to executable graph
graph = builder.compile()
# Execute synchronously
result = graph.invoke({"messages": [], "count": 0})
print(result) # {'messages': ['processed'], 'count': 1}
# Or stream output
for chunk in graph.stream({"messages": [], "count": 0}):
print(chunk)LangGraph is built on a graph-based execution model inspired by Google's Pregel and Apache Beam:
LangGraph provides two main API levels:
StateGraph, MessageGraph): Builder pattern for defining graphs declarativelyBuild stateful workflows using the StateGraph and MessageGraph classes. Define nodes as processing functions, add edges for flow control, and compile to executable graphs.
class StateGraph:
def __init__(
self,
state_schema: type,
context_schema: type | None = None,
input_schema: type | None = None,
output_schema: type | None = None
): ...
def add_node(self, key: str, action: RunnableLike) -> Self: ...
def add_edge(self, start_key: str | list[str], end_key: str) -> Self: ...
def add_conditional_edges(
self,
source: str,
path: Callable,
path_map: dict[Any, str] | None = None
) -> Self: ...
def compile(
self,
checkpointer: Checkpointer = None,
store: BaseStore = None,
cache: BaseCache = None,
interrupt_before: Sequence[str] | All = None,
interrupt_after: Sequence[str] | All = None,
debug: bool = False
) -> CompiledStateGraph: ...
class MessageGraph(StateGraph):
"""Pre-configured StateGraph for message-based workflows"""
...Execute compiled graphs synchronously or asynchronously with streaming support. Control execution flow, handle interrupts, and manage state during runtime.
class CompiledStateGraph:
def invoke(self, input: InputT, config: RunnableConfig | None = None) -> OutputT: ...
async def ainvoke(self, input: InputT, config: RunnableConfig | None = None) -> OutputT: ...
def stream(
self,
input: InputT,
config: RunnableConfig | None = None,
stream_mode: StreamMode = "updates"
) -> Iterator: ...
async def astream(
self,
input: InputT,
config: RunnableConfig | None = None,
stream_mode: StreamMode = "updates"
) -> AsyncIterator: ...Read and update graph state during and after execution. Access state snapshots, historical states, and modify state programmatically.
class CompiledStateGraph:
def get_state(self, config: RunnableConfig, subgraphs: bool = False) -> StateSnapshot: ...
async def aget_state(self, config: RunnableConfig, subgraphs: bool = False) -> StateSnapshot: ...
def get_state_history(
self,
config: RunnableConfig,
filter: dict | None = None,
before: RunnableConfig | None = None,
limit: int | None = None
) -> Iterator[StateSnapshot]: ...
def update_state(
self,
config: RunnableConfig,
values: dict | Any | None = None,
as_node: str | None = None
) -> RunnableConfig: ...Implement custom state management patterns using channels. Channels control how state values are stored, updated, and aggregated.
class LastValue:
"""Store the most recent value, overwriting previous values"""
...
class BinaryOperatorAggregate:
"""Aggregate values using a binary operator (e.g., addition, list concatenation)"""
...
class Topic:
"""Collect all values as a sequence"""
...
class EphemeralValue:
"""Temporary value cleared after each step"""
...Pause graph execution at specific points to allow human inspection and intervention. Resume execution with modified state or user input.
def interrupt(value: Any) -> Any:
"""Interrupt execution from within a node with a resumable value"""
...
class Command:
"""Command to update state and control flow upon resumption"""
graph: str | None = None
update: Any | None = None
resume: dict[str, Any] | Any | None = None
goto: Send | Sequence[Send] = ()Interrupts and Human-in-the-Loop
Define workflows using Python function decorators instead of graph builders. Create parallelizable tasks and entrypoints with automatic graph generation.
@task(
name: str | None = None,
retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
cache_policy: CachePolicy | None = None,
)
def my_task(...) -> T: ...
@entrypoint(
checkpointer: BaseCheckpointSaver | None = None,
store: BaseStore | None = None,
cache: BaseCache | None = None,
context_schema: type[ContextT] | None = None,
)
def my_workflow(input: InputT) -> OutputT: ...Specialized utilities for managing conversation messages in agent workflows. Merge, add, and manipulate message lists with automatic deduplication.
def add_messages(
left: Messages,
right: Messages,
format: Literal["langchain-openai"] | None = None
) -> Messages:
"""Merge message lists, updating existing messages by ID"""
...
class MessagesState(TypedDict):
"""Predefined state schema with messages field"""
messages: Annotated[list[MessageLikeRepresentation], add_messages]Configure automatic retry behavior for failed nodes and caching policies to optimize performance.
class RetryPolicy(NamedTuple):
"""Configuration for retrying failed nodes"""
initial_interval: float = 0.5
backoff_factor: float = 2.0
max_interval: float = 128.0
max_attempts: int = 3
jitter: bool = True
retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool] = default_retry_on
class CachePolicy:
"""Configuration for caching node results"""
key_func: Callable[..., str | bytes] = default_cache_key
ttl: int | None = NoneAccess runtime context, stores, and custom stream writers from within nodes.
def get_config() -> RunnableConfig:
"""Get current RunnableConfig in execution context"""
...
def get_store() -> BaseStore:
"""Get current BaseStore from within node"""
...
def get_stream_writer() -> StreamWriter:
"""Get StreamWriter for custom stream output"""
...
def get_runtime(context_schema: type[ContextT] | None = None) -> Runtime[ContextT]:
"""Get runtime for current graph execution"""
...Direct control over the graph execution engine for advanced use cases requiring fine-grained control over nodes, channels, and execution.
class Pregel:
"""Low-level graph execution engine"""
def __init__(
self,
nodes: dict,
channels: dict,
input_channels: str | list,
output_channels: str | list,
...
): ...
class NodeBuilder:
"""Builder for configuring Pregel nodes"""
def subscribe_to(self, *channels: str) -> Self: ...
def read_from(self, *channels: str) -> Self: ...
def do(self, action: RunnableLike) -> Self: ...
def write_to(self, *channels: str, mapper: Callable | None = None) -> Self: ...Core type definitions, constants, and data structures used throughout LangGraph.
# Constants
START: str # First (virtual) node
END: str # Last (virtual) node
TAG_NOSTREAM: str # Disable streaming tag
TAG_HIDDEN: str # Hide from tracing tag
# Stream modes
StreamMode = Literal["values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"]
# State snapshot
class StateSnapshot(NamedTuple):
values: dict[str, Any] | Any
next: tuple[str, ...]
config: RunnableConfig
metadata: CheckpointMetadata | None
created_at: str | None
parent_config: RunnableConfig | None
tasks: tuple[PregelTask, ...]
interrupts: tuple[Interrupt, ...]
# Send messages to specific nodes
class Send:
node: str
arg: AnyException classes and error codes for handling graph execution failures.
class GraphRecursionError(RecursionError):
"""Raised when graph exceeds maximum recursion steps"""
...
class InvalidUpdateError(Exception):
"""Raised when attempting invalid channel update"""
...
class EmptyChannelError(Exception):
"""Raised when reading from empty channel"""
...
class ErrorCode:
GRAPH_RECURSION_LIMIT: str
INVALID_CONCURRENT_GRAPH_UPDATE: str
INVALID_GRAPH_NODE_RETURN_VALUE: str
MULTIPLE_SUBGRAPHS: str
INVALID_CHAT_HISTORY: strInstall with Tessl CLI
npx tessl i tessl/pypi-langgraph