Low-level orchestration framework for building stateful, multi-actor applications with LLMs
State persistence and time-travel debugging with checkpoint savers and stores.
Checkpoint savers provide persistence for graph state, enabling:
Abstract interface for checkpoint persistence:
abstract class BaseCheckpointSaver {
abstract put(
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata
): Promise<RunnableConfig>;
abstract getTuple(
config: RunnableConfig
): Promise<CheckpointTuple | undefined>;
abstract list(
config: RunnableConfig,
options?: CheckpointListOptions
): AsyncIterableIterator<CheckpointTuple>;
abstract putWrites(
config: RunnableConfig,
writes: PendingWrite[],
taskId: string
): Promise<void>;
}

In-memory checkpoint storage for development and testing:
class MemorySaver extends BaseCheckpointSaver {
constructor();
}

Note: MemorySaver stores checkpoints in memory only. Data is lost when the process ends. For production, use persistent checkpoint savers like PostgresSaver or SqliteSaver from @langchain/langgraph-checkpoint-* packages.
import { StateGraph, Annotation, START, END } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";
const State = Annotation.Root({
count: Annotation<number>({ default: () => 0 }),
messages: Annotation<string[]>({
reducer: (a, b) => a.concat(b),
default: () => []
})
});
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("process", (s) => ({
count: s.count + 1,
messages: [`Processed ${s.count + 1}`]
}))
.addEdge(START, "process")
.addEdge("process", END)
.compile({ checkpointer });
// Each invocation saves state
await graph.invoke({ count: 0 }, {
configurable: { thread_id: "conversation-1" }
});
// State is persisted and can be resumed
const result = await graph.invoke({ count: 10 }, {
configurable: { thread_id: "conversation-1" }
});

Threads isolate state between different conversations or workflows.
// User 1 conversation
await graph.invoke(input1, {
configurable: { thread_id: "user-1" }
});
// User 2 conversation (independent state)
await graph.invoke(input2, {
configurable: { thread_id: "user-2" }
});
// Continue user 1
await graph.invoke(null, {
configurable: { thread_id: "user-1" }
});

// Per-user threads
{ thread_id: `user-${userId}` }
// Per-conversation threads
{ thread_id: `conversation-${conversationId}` }
// Hierarchical threads
{ thread_id: `org-${orgId}/user-${userId}/session-${sessionId}` }

const state = await graph.getState({
configurable: { thread_id: "thread-1" }
});
console.log(state.values); // Current state values
console.log(state.next); // Next nodes to execute
console.log(state.tasks); // Pending tasks
console.log(state.metadata); // Checkpoint metadata

interface StateSnapshot {
values: Record<string, unknown>;
next: string[];
config: RunnableConfig;
metadata?: CheckpointMetadata;
tasks: PregelTaskDescription[];
createdAt?: string;
parentConfig?: RunnableConfig;
}
interface CheckpointMetadata {
source: "input" | "loop" | "update";
step: number;
writes: Record<string, unknown> | null;
parents?: Record<string, string>;
[key: string]: unknown;
}

Access complete history of checkpoints:
// Get all checkpoints for a thread
const history = [];
for await (const snapshot of graph.getStateHistory({
configurable: { thread_id: "thread-1" }
})) {
history.push(snapshot);
}
// Checkpoints are returned in reverse chronological order (newest first)
console.log(history[0].values); // Most recent state
console.log(history[1].values); // Previous state
console.log(history[2].values); // State before that

interface CheckpointListOptions {
before?: RunnableConfig; // List checkpoints before this one
limit?: number; // Maximum number of checkpoints
filter?: Record<string, unknown>; // Filter by metadata
}

// Get last 10 checkpoints
const recent = [];
for await (const snapshot of graph.getStateHistory(config, { limit: 10 })) {
recent.push(snapshot);
}
// Get checkpoints before a specific point
const beforeCheckpoint = [];
for await (const snapshot of graph.getStateHistory(config, {
before: specificConfig
})) {
beforeCheckpoint.push(snapshot);
}
// Filter by metadata
for await (const snapshot of graph.getStateHistory(config, {
filter: { source: "loop" }
})) {
console.log(snapshot);
}

// Get state history
const snapshots = [];
for await (const snapshot of graph.getStateHistory({
configurable: { thread_id: "thread-1" }
})) {
snapshots.push(snapshot);
}
// Resume from 3rd checkpoint (index 2)
const previousSnapshot = snapshots[2];
await graph.invoke(null, previousSnapshot.config);
// This continues execution from that point in time

Update state programmatically without executing nodes:
// Update state as if a node ran
await graph.updateState(
{ configurable: { thread_id: "thread-1" } },
{ count: 100, messages: ["Manual update"] },
"processNode" // Act as if this node made the update
);
// Continue execution from updated state
const result = await graph.invoke(null, {
configurable: { thread_id: "thread-1" }
});

import { Command } from "@langchain/langgraph";
// Update and specify next node
await graph.updateState(
config,
new Command({
update: { count: 50 },
goto: "specificNode"
})
);

Stores provide long-term memory that persists across different workflow runs, separate from checkpoint state.
abstract class BaseStore {
abstract get(
namespace: NameSpacePath,
key: string
): Promise<Item | null>;
abstract put(
namespace: NameSpacePath,
key: string,
value: unknown
): Promise<void>;
abstract delete(
namespace: NameSpacePath,
key: string
): Promise<void>;
abstract search(
namespace: NameSpacePath,
options?: SearchOptions
): AsyncIterableIterator<Item>;
abstract listNamespaces(
prefix?: NameSpacePath,
options?: ListNamespacesOptions
): AsyncIterableIterator<NameSpacePath>;
abstract batch(
operations: Operation[]
): Promise<(Item | null)[]>;
}
type NameSpacePath = string[];
interface Item {
namespace: NameSpacePath;
key: string;
value: unknown;
createdAt: string;
updatedAt: string;
}

class InMemoryStore extends BaseStore {
constructor();
}

import { InMemoryStore } from "@langchain/langgraph-checkpoint";
import { getStore } from "@langchain/langgraph";
const store = new InMemoryStore();
// Graph node that demonstrates reading and writing per-user data in the
// long-term store. Loads the user's saved preferences, logs them, then
// writes the currently selected preferences back under the same key.
// Passes state through unchanged when no store is attached to the graph.
const node = async (state: State) => {
  const memoryStore = getStore();
  if (!memoryStore) return state;

  const namespace = ["users", state.userId];

  // Load user preferences
  const prefs = await memoryStore.get(namespace, "preferences");
  console.log("User preferences:", prefs?.value);

  // Update preferences
  await memoryStore.put(namespace, "preferences", {
    theme: state.selectedTheme,
    language: state.selectedLanguage,
    lastUpdated: new Date().toISOString()
  });

  return { preferencesLoaded: true };
};
const graph = new StateGraph(State)
.addNode("loadPreferences", node)
.compile({
checkpointer: new MemorySaver(),
store // Provide store to graph
});

// Organize data hierarchically
await store.put(["org", "acme", "users"], "user123", userData);
await store.put(["org", "acme", "settings"], "theme", themeData);
await store.put(["org", "beta", "users"], "user456", userData);

interface SearchOptions {
filter?: MatchCondition;
limit?: number;
offset?: number;
}

// Search within namespace
for await (const item of store.search(["users"], {
filter: { active: true },
limit: 100
})) {
console.log(item.key, item.value);
}

const operations = [
{ type: "get", namespace: ["users"], key: "user1" },
{ type: "put", namespace: ["users"], key: "user2", value: { name: "Alice" } },
{ type: "delete", namespace: ["users"], key: "user3" }
];
const results = await store.batch(operations);

// List all namespaces
for await (const ns of store.listNamespaces()) {
console.log("Namespace:", ns);
}
// List with depth limit
for await (const ns of store.listNamespaces([], { maxDepth: 2 })) {
console.log("Namespace:", ns);
}

interface Checkpoint {
v: number;
id: string;
ts: string;
channel_values: Record<string, unknown>;
channel_versions: Record<string, number>;
versions_seen: Record<string, Record<string, number>>;
pending_sends?: SendProtocol[];
}
interface CheckpointTuple {
config: RunnableConfig;
checkpoint: Checkpoint;
metadata: CheckpointMetadata;
parentConfig?: RunnableConfig;
pendingWrites?: PendingWrite[];
}

import {
StateGraph,
Annotation,
MessagesAnnotation,
messagesStateReducer,
START,
END
} from "@langchain/langgraph";
import { MemorySaver, InMemoryStore } from "@langchain/langgraph-checkpoint";
import { getStore } from "@langchain/langgraph";
import { HumanMessage, AIMessage, SystemMessage, BaseMessage } from "@langchain/core/messages";
import { ChatOpenAI } from "@langchain/openai";
// Extend MessagesAnnotation with user info
const ChatState = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: messagesStateReducer,
default: () => []
}),
userId: Annotation<string>(),
userName: Annotation<string>()
});
const model = new ChatOpenAI();
// Load user profile from store
// Look up the user's saved profile in the long-term store and surface
// their display name on graph state; falls back to "Guest" when the
// profile is missing, and is a no-op when the graph has no store.
const loadProfile = async (state: typeof ChatState.State) => {
  const profileStore = getStore();
  if (!profileStore) return state;

  const profile = await profileStore.get(["users", state.userId], "profile");
  const name = profile?.value?.name || "Guest";
  return { userName: name };
};
// Main chat node
const chat = async (state: typeof ChatState.State) => {
// Prepend user context
const systemMessage = new SystemMessage(
`You are chatting with ${state.userName}. Be friendly and personalized.`
);
const response = await model.invoke([
systemMessage,
...state.messages
]);
return { messages: [response] };
};
// Save conversation summary to store
const saveSummary = async (state: typeof ChatState.State) => {
const store = getStore();
if (!store) return state;
const summary = {
lastMessage: state.messages[state.messages.length - 1].content,
messageCount: state.messages.length,
timestamp: new Date().toISOString()
};
await store.put(
["users", state.userId],
"lastConversation",
summary
);
return state;
};
// Build chatbot
const checkpointer = new MemorySaver();
const store = new InMemoryStore();
const chatbot = new StateGraph(ChatState)
.addNode("loadProfile", loadProfile)
.addNode("chat", chat)
.addNode("saveSummary", saveSummary)
.addEdge(START, "loadProfile")
.addEdge("loadProfile", "chat")
.addEdge("chat", "saveSummary")
.addEdge("saveSummary", END)
.compile({ checkpointer, store });
// Usage
const userId = "user-123";
// First conversation
await chatbot.invoke({
messages: [new HumanMessage("Hi! My name is Alice.")],
userId
}, {
configurable: { thread_id: `chat-${userId}-1` }
});
// Continue conversation
await chatbot.invoke({
messages: [new HumanMessage("What's my name?")],
userId
}, {
configurable: { thread_id: `chat-${userId}-1` }
});
// Response will know the name is Alice
// New conversation in different thread but same user
await chatbot.invoke({
messages: [new HumanMessage("Hello again!")],
userId
}, {
configurable: { thread_id: `chat-${userId}-2` }
});
// Will load same user profile from store
// Check conversation history
const history = [];
for await (const snapshot of chatbot.getStateHistory({
configurable: { thread_id: `chat-${userId}-1` }
}, { limit: 5 })) {
history.push({
messages: snapshot.values.messages,
timestamp: snapshot.createdAt
});
}
console.log("Recent conversation snapshots:", history);
// Check what's in the store
const lastConv = await store.get(["users", userId], "lastConversation");
console.log("Last conversation summary:", lastConv?.value);

This guide provides comprehensive coverage of checkpointing and persistence in LangGraph.