Low-level orchestration framework for building stateful, multi-actor applications with LLMs
In-depth guide for building complex workflows with StateGraph and Graph.
Only include what nodes need to share:
const State = Annotation.Root({
input: Annotation<string>,
output: Annotation<string>
});const State = Annotation.Root({
// Core data
query: Annotation<string>,
results: Annotation<any[]>({
reducer: (a, b) => a.concat(b),
default: () => []
}),
// Metadata
metadata: Annotation<Record<string, any>>({
reducer: (a, b) => ({ ...a, ...b}),
default: () => ({})
}),
// Tracking
attempts: Annotation<number>({ default: () => 0 }),
errors: Annotation<string[]>({
reducer: (a, b) => a.concat(b),
default: () => []
})
});const InternalState = Annotation.Root({
query: Annotation<string>,
intermediateResults: Annotation<any[]>(),
processingSteps: Annotation<string[]>()
});
const InputState = Annotation.Root({
query: Annotation<string>
});
const OutputState = Annotation.Root({
results: Annotation<any[]>
});
const graph = new StateGraph(InternalState, {
input: InputState,
output: OutputState
});const validateNode = (state: State) => {
const errors = [];
if (!state.query) errors.push("Query required");
if (state.query.length < 3) errors.push("Query too short");
return {
isValid: errors.length === 0,
errors
};
};const transformNode = (state: State) => {
return {
processed: state.data.map(item => ({
...item,
normalized: normalize(item.value)
}))
};
};const aggregateNode = (state: State) => {
return {
summary: {
total: state.results.length,
successful: state.results.filter(r => r.success).length,
failed: state.results.filter(r => !r.success).length
}
};
};const router = (state: State) => {
const typeHandlers = {
query: "queryHandler",
command: "commandHandler",
event: "eventHandler"
};
return typeHandlers[state.type] || "defaultHandler";
};const priorityRouter = (state: State) => {
if (state.priority === "urgent") return "urgentQueue";
if (state.priority === "high") return "highQueue";
return "normalQueue";
};const complexRouter = (state: State) => {
if (state.requiresAuth && !state.isAuthenticated) {
return "authenticate";
}
if (state.requiresValidation && !state.isValidated) {
return "validate";
}
if (state.isReady) {
return "process";
}
return "prepare";
};Parallel processing with join:
graph
.addNode("split", splitNode)
.addNode("processA", processANode)
.addNode("processB", processBNode)
.addNode("join", joinNode)
.addEdge(START, "split")
.addEdge("split", "processA")
.addEdge("split", "processB")
.addEdge("processA", "join")
.addEdge("processB", "join")
.addEdge("join", END);const afterValidation = (state: State) => {
return state.isValid ? "process" : "handleError";
};
graph
.addNode("validate", validateNode)
.addNode("process", processNode)
.addNode("handleError", errorHandler)
.addConditionalEdges("validate", afterValidation)
.addEdge("handleError", END)
.addEdge("process", END);const shouldContinue = (state: State) => {
if (state.done) return END;
if (state.attempts >= state.maxAttempts) return "failed";
return "process";
};
graph
.addNode("process", processNode)
.addNode("failed", failureNode)
.addEdge(START, "process")
.addConditionalEdges("process", shouldContinue, {
process: "process",
failed: "failed"
});import { Send } from "@langchain/langgraph";
const State = Annotation.Root({
items: Annotation<string[]>,
results: Annotation<any[]>({
reducer: (a, b) => a.concat(b),
default: () => []
})
});
const fanOut = (state: State) => {
return state.items.map(item =>
new Send("process", { items: [item], results: [] })
);
};
const processNode = (state: State) => ({
results: [processItem(state.items[0])]
});
graph
.addNode("process", processNode)
.addConditionalEdges(START, fanOut)
.addEdge("process", END);const dynamicRouter = (state: State) => {
const sends = [];
if (state.needsValidation) {
sends.push(new Send("validate", state));
}
if (state.needsEnrichment) {
sends.push(new Send("enrich", state));
}
return sends.length > 0 ? sends : END;
};const subworkflow = new StateGraph(SubState)
.addNode("step1", step1Node)
.addNode("step2", step2Node)
.compile();
const mainGraph = new StateGraph(MainState)
.addNode("prepare", prepareNode)
.addNode("subprocess", subworkflow) // Use as node
.addNode("finalize", finalizeNode)
.addEdge(START, "prepare")
.addEdge("prepare", "subprocess")
.addEdge("subprocess", "finalize")
.addEdge("finalize", END);For advanced use cases requiring manual state control:
class Graph<N extends string = typeof START | typeof END> {
constructor();
addNode<K extends string>(
key: K,
action: RunnableLike<RunInput, RunOutput>,
options?: AddNodeOptions
): Graph<N | K>;
addEdge(
startKey: N | typeof START,
endKey: N | typeof END
): this;
addConditionalEdges(
source: N,
path: RunnableLike<RunInput, string | Send | (string | Send)[]>,
pathMap?: Record<string, N | typeof END>
): this;
setEntryPoint(key: N): this;
setFinishPoint(key: N): this;
compile(options?: {
checkpointer?: BaseCheckpointSaver | false;
interruptBefore?: N[];
interruptAfter?: N[];
name?: string;
}): CompiledGraph<N>;
}Use Graph when you need fine-grained control over state passing between nodes.
Add multiple nodes in sequence:
addSequence<K extends string>(
nodes: Record<K, RunnableLike<S, U>>
): this;const graph = new StateGraph(State)
.addSequence({
step1: (s) => ({ value: s.value + 1 }),
step2: (s) => ({ value: s.value * 2 }),
step3: (s) => ({ value: s.value + 10 })
})
.addEdge(START, "step1")
.addEdge("step3", END)
.compile();Add multiple nodes at once:
addNode<K extends string>(
nodes: Record<K, RunnableLike<S, U>>
): this;
addNode<K extends string>(
nodes: [key: K, action: RunnableLike<S, U>, options?: StateGraphAddNodeOptions][]
): this;// Object syntax
graph.addNode({
node1: node1Fn,
node2: node2Fn,
node3: node3Fn
});
// Array syntax with options
graph.addNode([
["node1", node1Fn, { retryPolicy: { maxAttempts: 3 } }],
["node2", node2Fn, { cachePolicy: true }]
]);Use Zod schemas for state validation:
import { z } from "zod";
const StateSchema = z.object({
count: z.number().default(0),
messages: z.array(z.string()).default([])
});
const graph = new StateGraph(StateSchema)
.addNode("increment", (state) => ({
count: state.count + 1,
messages: state.messages.concat(["incremented"])
}))
.compile();
// Input is validated against schema
await graph.invoke({ count: 5 });Low-level conditional edge implementation:
class Branch<IO, N extends string> {
constructor(options: {
path: RunnableLike<IO, BranchPathReturnValue>;
pathMap?: Record<string, N | typeof END> | (N | typeof END)[];
});
}
type BranchPathReturnValue =
| string
| Send
| (string | Send)[]
| Promise<string | Send | (string | Send)[]>;interface StateGraphAddNodeOptions<Nodes extends string = string> {
retryPolicy?: RetryPolicy;
cachePolicy?: CachePolicy | boolean;
input?: AnnotationRoot<any>;
metadata?: Record<string, unknown>;
subgraphs?: Pregel<any, any>[];
ends?: Nodes[];
defer?: boolean;
}graph.addNode("fetchData", fetchDataNode, {
retryPolicy: {
maxAttempts: 3,
initialInterval: 1000,
backoffFactor: 2,
retryOn: (error) => error.message.includes("timeout")
},
cachePolicy: {
keyFunc: (state) => state.userId,
ttl: 300000 // 5 minutes
},
ends: ["processData", "handleError"] // For Command routing
});import { StateGraph, Annotation, Send, Command, START, END } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";
// Complex state with multiple field types
const WorkflowState = Annotation.Root({
// Input
items: Annotation<string[]>,
// Processing state
processed: Annotation<any[]>({
reducer: (a, b) => a.concat(b),
default: () => []
}),
// Aggregated results
successCount: Annotation<number>({
reducer: (a, b) => a + b,
default: () => 0
}),
// Error tracking
errors: Annotation<Array<{ item: string, error: string }>>({
reducer: (a, b) => a.concat(b),
default: () => []
}),
// Metadata
metadata: Annotation<Record<string, any>>({
reducer: (a, b) => ({ ...a, ...b }),
default: () => ({})
})
});
// Fan-out to process items in parallel
const fanOut = (state: typeof WorkflowState.State) => {
return state.items.map(item =>
new Send("processItem", {
items: [item],
processed: [],
successCount: 0,
errors: [],
metadata: {}
})
);
};
// Process individual item with error handling
const processItem = async (state: typeof WorkflowState.State) => {
const item = state.items[0];
try {
const result = await riskyOperation(item);
return {
processed: [result],
successCount: 1,
metadata: { [`item_${item}`]: "success" }
};
} catch (error) {
return {
errors: [{ item, error: error.message }],
metadata: { [`item_${item}`]: "failed" }
};
}
};
// Aggregate results
const aggregate = (state: typeof WorkflowState.State) => {
const total = state.items.length;
const success = state.successCount;
const failed = state.errors.length;
return {
metadata: {
...state.metadata,
summary: {
total,
success,
failed,
successRate: success / total
}
}
};
};
// Conditional routing based on results
const shouldRetry = (state: typeof WorkflowState.State) => {
const failedItems = state.errors.map(e => e.item);
if (failedItems.length === 0) {
return END;
}
if (state.metadata.retryAttempt >= 3) {
return "reportFailures";
}
// Retry failed items
return new Command({
update: {
items: failedItems,
metadata: { retryAttempt: (state.metadata.retryAttempt || 0) + 1 }
},
goto: "retry"
});
};
// Build the graph
const graph = new StateGraph(WorkflowState)
.addNode("processItem", processItem, {
retryPolicy: {
maxAttempts: 2,
initialInterval: 500
}
})
.addNode("aggregate", aggregate)
.addNode("retry", (s) => s) // Placeholder for retry logic
.addNode("reportFailures", reportFailuresNode)
.addConditionalEdges(START, fanOut)
.addEdge("processItem", "aggregate")
.addConditionalEdges("aggregate", shouldRetry, {
retry: "retry",
reportFailures: "reportFailures"
})
.addEdge("retry", START) // Loop back
.addEdge("reportFailures", END)
.compile({
checkpointer: new MemorySaver(),
interruptBefore: ["reportFailures"] // Human review before final failure report
});
// Execute
const result = await graph.invoke({
items: ["item1", "item2", "item3", "item4", "item5"]
}, {
configurable: { thread_id: "batch-1" }
});This guide provides comprehensive coverage of graph construction patterns in LangGraph.