Low-level orchestration framework for building stateful, multi-actor applications with LLMs
Understanding LangGraph's core building blocks: graphs, nodes, edges, and state.
A graph is a workflow consisting of nodes (steps) connected by edges (transitions). LangGraph provides two graph types:
High-level graph with automatic state management. Use this for most applications.
class StateGraph<StateDefinition> {
constructor(stateDefinition: StateDefinition);
addNode(name: string, action: NodeType<StateDefinition>): this;
addEdge(from: string, to: string): this;
addConditionalEdges(source: string, path: Function, pathMap?: object): this;
compile(options?: PregelOptions): CompiledStateGraph;
}When to use: Any application where you need shared state between nodes.
import { StateGraph, Annotation } from "@langchain/langgraph";
const State = Annotation.Root({
value: Annotation<number>
});
const graph = new StateGraph(State)
.addNode("step1", (s) => ({ value: s.value + 1 }))
.addNode("step2", (s) => ({ value: s.value * 2 }))
.compile();Lower-level graph for manual state control. Use when you need fine-grained control.
class Graph<N extends string = typeof START | typeof END> {
addNode(key: string, action: Runnable): this;
addEdge(startKey: string, endKey: string): this;
compile(): CompiledGraph;
}When to use: Advanced scenarios requiring custom state handling logic.
Nodes are the processing units in your graph. They receive state and return updates.
type NodeFunction = (state: State, config?: RunnableConfig) => StateUpdate | Promise<StateUpdate>;// Synchronous node
const syncNode = (state: State) => {
return { count: state.count + 1 };
};
// Async node
const asyncNode = async (state: State) => {
const result = await fetchData(state.query);
return { data: result };
};
// Node with config
const configNode = (state: State, config: RunnableConfig) => {
const userId = config.configurable?.user_id;
return { user: userId };
};
// Node returning multiple fields
const multiFieldNode = (state: State) => {
return {
processed: true,
timestamp: new Date().toISOString(),
result: computeResult(state)
};
};Nodes execute in order determined by edges. Within a step, nodes that can run in parallel will execute concurrently.
const graph = new StateGraph(State)
.addNode("parallel1", node1)
.addNode("parallel2", node2)
.addNode("sequential", node3)
.addEdge(START, "parallel1")
.addEdge(START, "parallel2") // Runs in parallel with parallel1
.addEdge("parallel1", "sequential")
.addEdge("parallel2", "sequential") // Sequential waits for both
.compile();Edges define transitions between nodes. Three types exist:
Fixed transitions from one node to another.
.addEdge("nodeA", "nodeB")Use for: Sequential workflows where the next step is always the same.
Dynamic routing based on state.
const routingFunction = (state: State) => {
if (state.value > 10) return "high";
if (state.value > 5) return "medium";
return "low";
};
graph.addConditionalEdges("router", routingFunction, {
high: "highHandler",
medium: "mediumHandler",
low: "lowHandler"
});Use for: Branching logic, validation flows, decision trees.
const START: "__start__"; // Graph entry point
const END: "__end__"; // Graph exit pointEvery graph must have at least one edge from START and one edge to END.
State is the data that flows through your graph and is shared between nodes.
Define state using Annotation.Root():
import { Annotation } from "@langchain/langgraph";
const State = Annotation.Root({
// Simple field (last value wins)
count: Annotation<number>,
// Field with default value
status: Annotation<string>({
default: () => "pending"
}),
// Field with reducer (custom aggregation)
items: Annotation<string[]>({
reducer: (current, update) => current.concat(update),
default: () => []
})
});type State = typeof StateAnnotation.State;
// { count: number, status: string, items: string[] }
type Update = typeof StateAnnotation.Update;
// { count?: number, status?: string, items?: string[] }Reducers determine how multiple writes to the same field are combined:
// Array concatenation
reducer: (a, b) => a.concat(b)
// Numeric addition
reducer: (a, b) => a + b
// Object merging
reducer: (a, b) => ({ ...a, ...b })
// Set union
reducer: (a, b) => new Set([...a, ...b])
// Custom logic
reducer: (current, update) => {
if (update > current) return update;
return current;
}Default behavior: Without a reducer, last value wins. If multiple nodes write to the same field in one step, an error is thrown.
Channels are the internal state storage primitives. Most users don't interact with channels directly—they're created automatically from your Annotation.
You typically don't need to think about channels unless:
See: State Management API for channel details.
LangGraph provides special support for message-based workflows:
Pre-built state annotation for chat applications:
import { MessagesAnnotation } from "@langchain/langgraph";
// Equivalent to:
// Annotation.Root({
// messages: Annotation<BaseMessage[]>({
// reducer: messagesStateReducer,
// default: () => []
// })
// })function messagesStateReducer(
left: Messages,
right: Messages
): BaseMessage[];
const addMessages: typeof messagesStateReducer; // AliasIntelligently merges messages:
import { HumanMessage, AIMessage, SystemMessage } from "@langchain/core/messages";
const chatGraph = new StateGraph(MessagesAnnotation)
.addNode("bot", async (state) => {
const response = await llm.invoke(state.messages);
return { messages: [response] };
})
.compile();
await chatGraph.invoke({
messages: [
new SystemMessage("You are a helpful assistant"),
new HumanMessage("Hello!")
]
});Compilation converts your graph definition into an executable runtime:
const app = graph.compile(options);interface PregelOptions {
checkpointer?: BaseCheckpointSaver; // For persistence
store?: BaseStore; // For long-term memory
cache?: BaseCache; // For caching
interruptBefore?: string[]; // Pause before nodes
interruptAfter?: string[]; // Pause after nodes
}class CompiledStateGraph {
invoke(input: Input, options?: PregelOptions): Promise<Output>;
stream(input: Input, options?: PregelOptions): IterableReadableStream;
getState(config: RunnableConfig): Promise<StateSnapshot>;
updateState(config: RunnableConfig, values: Update): Promise<RunnableConfig>;
getStateHistory(config: RunnableConfig): AsyncIterableIterator<StateSnapshot>;
}Understanding how LangGraph executes your graph:
invoke() or stream()A superstep is one iteration of the execution loop. Multiple nodes can execute in a single superstep if they're independent.
// This has 3 supersteps:
// 1. START → nodeA
// 2. nodeA → nodeB, nodeC (parallel)
// 3. nodeB, nodeC → END
const graph = new StateGraph(State)
.addNode("nodeA", nodeA)
.addNode("nodeB", nodeB)
.addNode("nodeC", nodeC)
.addEdge(START, "nodeA")
.addEdge("nodeA", "nodeB")
.addEdge("nodeA", "nodeC")
.addEdge("nodeB", END)
.addEdge("nodeC", END)
.compile();Maximum number of supersteps before termination:
await graph.invoke(input, {
recursionLimit: 50 // Default: 25
});Prevents infinite loops in cyclic graphs.
LangGraph provides full TypeScript type inference:
const State = Annotation.Root({
count: Annotation<number>,
items: Annotation<string[]>
});
// Types are automatically inferred
const node = (state: typeof State.State) => {
// state.count is number
// state.items is string[]
return {
count: state.count + 1, // ✓ Type-safe
items: ["new"] // ✓ Type-safe
// invalid: "wrong" // ✗ Type error
};
};import { StateType, UpdateType } from "@langchain/langgraph";
const State = Annotation.Root({
data: Annotation<{ value: number }>
});
type S = StateType<typeof State.spec>; // { data: { value: number } }
type U = UpdateType<typeof State.spec>; // { data?: { value: number } }.addEdge(START, "split")
.addEdge("split", "worker1")
.addEdge("split", "worker2")
.addEdge("split", "worker3")
.addEdge("worker1", "merge")
.addEdge("worker2", "merge")
.addEdge("worker3", "merge")
.addEdge("merge", END)const validator = (state: State) => {
return state.isValid ? "process" : "error";
};
graph
.addNode("validate", validateNode)
.addNode("process", processNode)
.addNode("error", errorNode)
.addConditionalEdges("validate", validator, {
process: "process",
error: "error"
});const shouldContinue = (state: State) => {
return state.done ? END : "process";
};
graph
.addNode("process", processNode)
.addEdge(START, "process")
.addConditionalEdges("process", shouldContinue, {
process: "process" // Loop back
});