LangGraph is a low-level orchestration framework for building stateful, multi-actor applications with LLMs.
Below are frequently used patterns and best practices for building LangGraph applications.
Collect items from multiple nodes:
const State = Annotation.Root({
results: Annotation<string[]>({
reducer: (current, update) => [...current, ...update],
default: () => []
})
});
const node1 = (state: State) => ({ results: ["result1"] });
const node2 = (state: State) => ({ results: ["result2"] });
// Final state.results = ["result1", "result2"]Sum, multiply, or find max/min:
const State = Annotation.Root({
  total: Annotation<number>({
    // Running sum across all updates.
    reducer: (a, b) => a + b,
    default: () => 0
  }),
  max: Annotation<number>({
    // Keep the largest value seen so far.
    reducer: (a, b) => Math.max(a, b),
    default: () => -Infinity
  })
});

Combine metadata or configuration:
const State = Annotation.Root({
metadata: Annotation<Record<string, any>>({
reducer: (a, b) => ({ ...a, ...b }),
default: () => ({})
})
});
const node1 = (s: State) => ({ metadata: { source: "api" } });
const node2 = (s: State) => ({ metadata: { timestamp: Date.now() } });
// Final metadata = { source: "api", timestamp: 1234567890 }Maintain unique collections:
const State = Annotation.Root({
  tags: Annotation<Set<string>>({
    // Union of both sets; duplicates are dropped automatically.
    reducer: (a, b) => new Set([...a, ...b]),
    default: () => new Set()
  })
});

Update based on current value:
const State = Annotation.Root({
  score: Annotation<number>({
    reducer: (current, update) => {
      // Only update if new score is higher
      return Math.max(current, update);
    },
    default: () => 0
  })
});

Route based on a condition:
const router = (state: State) => {
  return state.isValid ? "success" : "failure";
};
graph
  .addNode("validate", validateNode)
  .addNode("success", successNode)
  .addNode("failure", failureNode)
  // The map translates the router's return value into a target node.
  .addConditionalEdges("validate", router, {
    success: "success",
    failure: "failure"
  });

Route to one of many paths:
const classifier = (state: State) => {
  if (state.type === "urgent") return "highPriority";
  if (state.type === "normal") return "mediumPriority";
  return "lowPriority";
};
graph.addConditionalEdges("classify", classifier, {
  highPriority: "urgentHandler",
  mediumPriority: "normalHandler",
  lowPriority: "lowHandler"
});

Always provide a default path:
const router = (state: State) => {
  const handler = state.handlers.get(state.requestType);
  // Fall back to "default" when the request type is unknown.
  return handler || "default";
};
graph.addConditionalEdges("route", router, {
  typeA: "handlerA",
  typeB: "handlerB",
  default: "defaultHandler"
});

Use Send for map-reduce patterns:
import { Send } from "@langchain/langgraph";
const fanOut = (state: State) => {
  // Create one Send per item
  return state.items.map(item =>
    new Send("process", { item, results: [] })
  );
};
const graph = new StateGraph(State)
  .addNode("process", processNode)
  .addNode("aggregate", aggregateNode)
  // Each Send spawns a parallel "process" invocation with its own payload.
  .addConditionalEdges(START, fanOut)
  .addEdge("process", "aggregate")
  .compile();

Process data through stages:
const graph = new StateGraph(State)
  .addNode("extract", extractNode)
  .addNode("transform", transformNode)
  .addNode("load", loadNode)
  // Linear ETL pipeline: extract -> transform -> load.
  .addEdge(START, "extract")
  .addEdge("extract", "transform")
  .addEdge("transform", "load")
  .addEdge("load", END)
  .compile();

Execute independent tasks concurrently:
const graph = new StateGraph(State)
.addNode("taskA", taskANode)
.addNode("taskB", taskBNode)
.addNode("taskC", taskCNode)
.addNode("combine", combineNode)
.addEdge(START, "taskA")
.addEdge(START, "taskB")
.addEdge(START, "taskC")
.addEdge("taskA", "combine")
.addEdge("taskB", "combine")
.addEdge("taskC", "combine")
.addEdge("combine", END)
.compile();const State = Annotation.Root({
attempts: Annotation<number>({ default: () => 0 }),
success: Annotation<boolean>({ default: () => false })
});
const shouldRetry = (state: State) => {
if (state.success) return END;
if (state.attempts >= 3) return "failed";
return "retry";
};
const retryNode = (state: State) => ({
attempts: state.attempts + 1,
success: Math.random() > 0.5 // Simulated operation
});
graph
.addNode("retry", retryNode)
.addNode("failed", failedNode)
.addEdge(START, "retry")
.addConditionalEdges("retry", shouldRetry, {
retry: "retry",
failed: "failed"
});const State = Annotation.Root({
data: Annotation<any>,
isValid: Annotation<boolean>(),
validationErrors: Annotation<string[]>({
reducer: (a, b) => a.concat(b),
default: () => []
})
});
const afterValidation = (state: State) => {
return state.isValid ? "process" : "fix";
};
graph
.addNode("validate", validateNode)
.addNode("fix", autoFixNode)
.addNode("process", processNode)
.addConditionalEdges("validate", afterValidation, {
process: "process",
fix: "fix"
})
.addEdge("fix", "validate"); // Loop back after fixReasoning and acting iteratively:
import { createReactAgent } from "@langchain/langgraph/prebuilt";
const agent = createReactAgent({
llm: model,
tools: [searchTool, calculatorTool]
});
// Agent will:
// 1. Reason about the task
// 2. Decide which tool to use
// 3. Execute tool
// 4. Observe result
// 5. Repeat until task completeBuild your own agent loop:
import { ToolNode, toolsCondition } from "@langchain/langgraph/prebuilt";
const callModel = async (state: MessagesState) => {
  const response = await model.invoke(state.messages);
  return { messages: [response] };
};
const graph = new StateGraph(MessagesAnnotation)
  .addNode("agent", callModel)
  .addNode("tools", new ToolNode(tools))
  .addEdge(START, "agent")
  .addConditionalEdges("agent", toolsCondition) // Returns "tools" or END
  .addEdge("tools", "agent") // Feed tool results back to the model
  .compile();

Multiple specialized agents:
const State = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: messagesStateReducer,
default: () => []
}),
task: Annotation<string>()
});
const router = (state: State) => {
const lastMessage = state.messages[state.messages.length - 1];
if (needsResearch(lastMessage)) return "researcher";
if (needsCoding(lastMessage)) return "coder";
return END;
};
graph
.addNode("researcher", researchAgent)
.addNode("coder", coderAgent)
.addNode("supervisor", supervisorNode)
.addEdge(START, "supervisor")
.addConditionalEdges("supervisor", router, {
researcher: "researcher",
coder: "coder"
})
.addEdge("researcher", "supervisor")
.addEdge("coder", "supervisor");import { MemorySaver } from "@langchain/langgraph-checkpoint";
const memory = new MemorySaver();
const chatbot = new StateGraph(MessagesAnnotation)
  .addNode("chat", chatNode)
  .addEdge(START, "chat")
  .addEdge("chat", END)
  .compile({ checkpointer: memory });
// Each user gets their own thread
await chatbot.invoke(
  { messages: [new HumanMessage("Hi")] },
  { configurable: { thread_id: "user-123" } }
);
// Continue conversation
await chatbot.invoke(
  { messages: [new HumanMessage("Tell me more")] },
  { configurable: { thread_id: "user-123" } }
);

Save and restore from specific points:
// Get current state
const state = await graph.getState({
  configurable: { thread_id: "thread-1" }
});
// Get state history
for await (const snapshot of graph.getStateHistory({
  configurable: { thread_id: "thread-1" }
})) {
  console.log(snapshot.values);
}
// Resume from a specific checkpoint. NOTE: capture the desired
// `snapshot.config` inside the loop above — the loop variable itself
// is not in scope here.
await graph.invoke(null, snapshot.config);

Store data across sessions:
import { InMemoryStore } from "@langchain/langgraph-checkpoint";
import { getStore } from "@langchain/langgraph";
const store = new InMemoryStore();
const node = async (state: State) => {
const userStore = getStore();
// Load user preferences
const prefs = await userStore.get(
["users", state.userId],
"preferences"
);
// Save updated preferences
await userStore.put(
["users", state.userId],
"preferences",
{ theme: "dark", lang: "en" }
);
return state;
};
const graph = new StateGraph(State)
.addNode("process", node)
.compile({ store });const resilientNode = async (state: State) => {
try {
const result = await riskyOperation(state.data);
return { result, error: null };
} catch (error) {
return { result: null, error: error.message };
}
};const State = Annotation.Root({
data: Annotation<any>(),
error: Annotation<string | null>({ default: () => null })
});
const checkError = (state: State) => {
return state.error ? "handleError" : "continue";
};
graph
.addNode("process", processNode)
.addNode("handleError", errorHandler)
.addNode("continue", continueNode)
.addConditionalEdges("process", checkError, {
handleError: "handleError",
continue: "continue"
});const processWithRetry = task({
name: "process",
retry: {
maxAttempts: 3,
initialInterval: 1000,
backoffFactor: 2,
jitter: true
}
}, async (data: any) => {
return await unreliableAPI(data);
});import { writer } from "@langchain/langgraph";
const longRunningNode = async (state: State) => {
writer({ progress: 0, stage: "starting" });
await step1();
writer({ progress: 0.33, stage: "step1" });
await step2();
writer({ progress: 0.66, stage: "step2" });
await step3();
writer({ progress: 1.0, stage: "complete" });
return { done: true };
};
// Consume progress
for await (const progress of await graph.stream(input, {
streamMode: "custom"
})) {
console.log(`${progress.stage}: ${progress.progress * 100}%`);
}for await (const chunk of await graph.stream(input, {
streamMode: "updates"
})) {
// chunk = { nodeName: stateUpdate }
updateUI(chunk);
}import { describe, it, expect } from "vitest";
describe("processNode", () => {
it("should increment count", () => {
const state = { count: 5 };
const result = processNode(state);
expect(result.count).toBe(6);
});
});it("should complete workflow", async () => {
const result = await graph.invoke({ count: 0 });
expect(result.count).toBe(10);
expect(result.done).toBe(true);
});const mockLLM = {
invoke: async (messages) => new AIMessage("Mocked response")
};
const agent = createReactAgent({
llm: mockLLM,
tools: [mockTool]
});
const result = await agent.invoke({
messages: [new HumanMessage("Test")]
});const expensiveTask = task({
name: "compute",
cachePolicy: {
keyFunc: (input) => JSON.stringify(input),
ttl: 300000 // 5 minutes
}
}, async (input: any) => {
return await expensiveComputation(input);
});Maximize throughput with concurrent nodes:
// These run in parallel automatically
.addEdge(START, "node1")
.addEdge(START, "node2")
.addEdge(START, "node3")Process items in batches:
const batchNode = async (state: State) => {
  // Bound concurrency: items within a batch run in parallel,
  // but batches run one after another.
  const batches = chunk(state.items, 10); // 10 items per batch
  const results = [];
  for (const batch of batches) {
    const batchResults = await Promise.all(
      batch.map(item => processItem(item))
    );
    results.push(...batchResults);
  }
  return { results };
};