A low-level orchestration framework for building stateful, multi-actor applications with LLMs.
State persistence and time-travel capabilities through checkpoint savers and stores. Supports state snapshots, history navigation, resumption from any checkpoint, and long-term memory storage.
Checkpoint savers provide persistence for graph state, enabling time-travel debugging, state resumption, and long-running workflows.
Abstract base class for checkpoint persistence implementations.
// Abstract contract for checkpoint persistence backends. Implementations
// store and retrieve graph state snapshots addressed by the thread_id (and
// checkpoint id) carried in RunnableConfig.configurable.
abstract class BaseCheckpointSaver {
  /**
   * Persist a checkpoint and its metadata.
   * Resolves to the config that addresses the stored checkpoint.
   */
  abstract put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata
  ): Promise<RunnableConfig>;
  /** Fetch the checkpoint tuple addressed by config, or undefined when none exists. */
  abstract getTuple(
    config: RunnableConfig
  ): Promise<CheckpointTuple | undefined>;
  /**
   * Iterate a thread's stored checkpoints, optionally bounded/filtered.
   * Ordering is implementation-defined here — presumably newest first, matching
   * getStateHistory in the examples below; confirm per backend.
   */
  abstract list(
    config: RunnableConfig,
    options?: CheckpointListOptions
  ): AsyncIterableIterator<CheckpointTuple>;
  /** Record intermediate channel writes produced by the task identified by taskId. */
  abstract putWrites(
    config: RunnableConfig,
    writes: PendingWrite[],
    taskId: string
  ): Promise<void>;
}
In-memory checkpoint storage for development and testing.
// Non-durable BaseCheckpointSaver backed by process memory: all checkpoints
// are lost when the process exits. Intended for development and testing.
class MemorySaver extends BaseCheckpointSaver {
  constructor();
  /** Store a checkpoint in memory; resolves to the config addressing it. */
  put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata
  ): Promise<RunnableConfig>;
  /** Look up the checkpoint tuple for config, or undefined when absent. */
  getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined>;
  /** Iterate stored checkpoints for the thread addressed by config. */
  list(
    config: RunnableConfig,
    options?: CheckpointListOptions
  ): AsyncIterableIterator<CheckpointTuple>;
  /** Record intermediate writes for the given task. */
  putWrites(
    config: RunnableConfig,
    writes: PendingWrite[],
    taskId: string
  ): Promise<void>;
}
import { StateGraph, Annotation } from "@langchain/langgraph";
// Example: basic checkpointing. Invocations sharing a thread_id read and
// extend the same persisted state.
import { MemorySaver } from "@langchain/langgraph-checkpoint";
const State = Annotation.Root({
  count: Annotation<number>
});
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
  .addNode("increment", (s) => ({ count: s.count + 1 }))
  .addEdge("__start__", "increment")
  .addEdge("increment", "__end__")
  .compile({ checkpointer });  // attaching the checkpointer enables persistence
// First execution
await graph.invoke({ count: 0 }, {
  configurable: { thread_id: "user-123" }
});
// Second execution - resumes from checkpoint
await graph.invoke({ count: 5 }, {
  configurable: { thread_id: "user-123" }
});
// Thread 1
// Example: thread isolation — each thread_id owns an independent line of
// checkpoints, so concurrent conversations/workflows never interfere.
await graph.invoke(input1, {
  configurable: { thread_id: "thread-1" }
});
// Thread 2 (independent)
await graph.invoke(input2, {
  configurable: { thread_id: "thread-2" }
});
// Thread 1 continues
// Passing null as input resumes from the thread's latest checkpoint.
await graph.invoke(null, {
  configurable: { thread_id: "thread-1" }
});
State snapshot data structure.
// Serializable snapshot of graph channel state at one point in execution.
interface Checkpoint {
  /** Version of the checkpoint data format. */
  v: number;
  /** Unique identifier for this checkpoint. */
  id: string;
  /** Creation timestamp (string; presumably ISO-8601 — TODO confirm). */
  ts: string;
  /** Current value of every state channel. */
  channel_values: Record<string, unknown>;
  /** Version counter per channel, bumped when the channel is written. */
  channel_versions: Record<string, number>;
  /** Per-node map of the channel versions that node has already seen. */
  versions_seen: Record<string, Record<string, number>>;
  /** Queued Send packets not yet delivered — presumably; verify against runtime. */
  pending_sends?: SendProtocol[];
}
Metadata associated with checkpoints.
// Descriptive metadata recorded alongside each checkpoint.
interface CheckpointMetadata {
  /** What produced the checkpoint: initial input, a loop step, or a manual state update. */
  source: "input" | "loop" | "update";
  /** Step counter within the run (starting index not shown here — TODO confirm). */
  step: number;
  /** Writes made at this step, or null when there were none. */
  writes: Record<string, unknown> | null;
  /** Parent checkpoint ids — presumably for nested/subgraph checkpoints; verify. */
  parents?: Record<string, string>;
  /** Arbitrary extra metadata; usable as a filter in CheckpointListOptions. */
  [key: string]: unknown;
}
Complete checkpoint with config and metadata.
// A checkpoint bundled with its addressing config, metadata, and lineage.
interface CheckpointTuple {
  /** Config (thread_id / checkpoint id) that addresses this checkpoint. */
  config: RunnableConfig;
  checkpoint: Checkpoint;
  metadata: CheckpointMetadata;
  /** Config of the parent checkpoint, when one exists. */
  parentConfig?: RunnableConfig;
  /** Writes recorded via putWrites that are associated with this checkpoint. */
  pendingWrites?: PendingWrite[];
}
Options for listing checkpoints.
// Options accepted by BaseCheckpointSaver.list().
interface CheckpointListOptions {
  /** Only yield checkpoints created before the one addressed by this config. */
  before?: RunnableConfig;
  /** Maximum number of checkpoints to yield. */
  limit?: number;
  /** Match against checkpoint metadata, e.g. { source: "loop" }. */
  filter?: Record<string, unknown>;
}
Write operation pending execution.
// A single queued channel write: [channel name, value].
type PendingWrite = [string, unknown];
import { MemorySaver } from "@langchain/langgraph-checkpoint";
// Example: retrieving the full checkpoint history of a thread. A checkpoint
// is saved after each step, so the history shows every intermediate state.
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
  .addNode("step1", (s) => ({ value: s.value + 1 }))
  .addNode("step2", (s) => ({ value: s.value * 2 }))
  .addEdge("__start__", "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "__end__")
  .compile({ checkpointer });
// Execute
await graph.invoke({ value: 5 }, {
  configurable: { thread_id: "1" }
});
// Get all checkpoints (yielded newest first, per the expected output below)
const config = { configurable: { thread_id: "1" } };
const history = [];
for await (const snapshot of graph.getStateHistory(config)) {
  history.push(snapshot);
}
console.log(history);
// [
//   { values: { value: 12 }, next: [], ... },        // Final state
//   { values: { value: 6 }, next: ["step2"], ... },  // After step1
//   { values: { value: 5 }, next: ["step1"], ... }   // Initial state
// ]
// Get state at specific point
// Example: time travel — re-run the graph from an earlier checkpoint.
const config = { configurable: { thread_id: "1" } };
const states = [];
for await (const snapshot of graph.getStateHistory(config)) {
  states.push(snapshot);
}
// Resume from second checkpoint
// Each snapshot carries the config (checkpoint id) needed to branch from it;
// invoking with null input continues from that stored state.
const resumeConfig = states[1].config;
await graph.invoke(null, resumeConfig);
const recentCheckpoints = [];
// Example: bound and filter history by checkpoint metadata.
for await (const snapshot of graph.getStateHistory(config, {
  limit: 10,
  filter: { source: "loop" }  // only checkpoints produced by loop steps
})) {
  recentCheckpoints.push(snapshot);
}
Long-term memory storage across workflow runs.
Abstract interface for long-term memory storage.
// Abstract key-value store for long-term memory that persists across runs
// and threads. Items live under hierarchical namespaces (string-array paths).
abstract class BaseStore {
  /** Get a single item, or null when absent. */
  abstract get(
    namespace: NameSpacePath,
    key: string
  ): Promise<Item | null>;
  /** Create or overwrite the item at (namespace, key). */
  abstract put(
    namespace: NameSpacePath,
    key: string,
    value: unknown
  ): Promise<void>;
  /** Remove the item at (namespace, key). */
  abstract delete(
    namespace: NameSpacePath,
    key: string
  ): Promise<void>;
  /** Iterate items within a namespace, optionally filtered and paginated. */
  abstract search(
    namespace: NameSpacePath,
    options?: SearchOptions
  ): AsyncIterableIterator<Item>;
  /** Enumerate namespaces, optionally restricted to those under a prefix. */
  abstract listNamespaces(
    prefix?: NameSpacePath,
    options?: ListNamespacesOptions
  ): AsyncIterableIterator<NameSpacePath>;
  /**
   * Execute several operations together; results align with operations by
   * index (null presumably for non-get operations — TODO confirm).
   */
  abstract batch(
    operations: Operation[]
  ): Promise<(Item | null)[]>;
}
In-memory implementation of BaseStore.
// BaseStore implementation backed by process memory; contents are lost when
// the process exits. Intended for development and testing.
class InMemoryStore extends BaseStore {
  constructor();
  get(namespace: NameSpacePath, key: string): Promise<Item | null>;
  put(namespace: NameSpacePath, key: string, value: unknown): Promise<void>;
  delete(namespace: NameSpacePath, key: string): Promise<void>;
  search(
    namespace: NameSpacePath,
    options?: SearchOptions
  ): AsyncIterableIterator<Item>;
  listNamespaces(
    prefix?: NameSpacePath,
    options?: ListNamespacesOptions
  ): AsyncIterableIterator<NameSpacePath>;
  batch(operations: Operation[]): Promise<(Item | null)[]>;
}
Batched store implementation for efficiency.
// Wrapper that forwards operations to an underlying BaseStore. Given the
// option names, operations are presumably queued and flushed via batch()
// once maxBatchSize is reached or maxBatchWaitMs elapses — confirm against
// the implementation.
class AsyncBatchedStore extends BaseStore {
  constructor(store: BaseStore, options?: {
    maxBatchSize?: number;
    maxBatchWaitMs?: number;
  });
  get(namespace: NameSpacePath, key: string): Promise<Item | null>;
  put(namespace: NameSpacePath, key: string, value: unknown): Promise<void>;
  delete(namespace: NameSpacePath, key: string): Promise<void>;
  search(
    namespace: NameSpacePath,
    options?: SearchOptions
  ): AsyncIterableIterator<Item>;
  listNamespaces(
    prefix?: NameSpacePath,
    options?: ListNamespacesOptions
  ): AsyncIterableIterator<NameSpacePath>;
  batch(operations: Operation[]): Promise<(Item | null)[]>;
}
Store item with namespace, key, value.
// A stored record together with its location and write timestamps.
interface Item {
  /** Hierarchical path the item lives under. */
  namespace: NameSpacePath;
  /** Key unique within the namespace. */
  key: string;
  /** The stored payload. */
  value: unknown;
  /** Creation timestamp (string; format not shown here — TODO confirm). */
  createdAt: string;
  /** Last-modification timestamp (same format as createdAt). */
  updatedAt: string;
}
Hierarchical namespace path.
// Hierarchical path addressing a group of items, e.g. ["users", "123"].
type NameSpacePath = string[];
Store operation types.
// Discriminated union (on `type`) of operations accepted by BaseStore.batch().
type Operation =
  | GetOperation
  | PutOperation
  | DeleteOperation
  | SearchOperation
  | ListNamespacesOperation;
/** Read one item. */
interface GetOperation {
  type: "get";
  namespace: NameSpacePath;
  key: string;
}
/** Create or overwrite one item. */
interface PutOperation {
  type: "put";
  namespace: NameSpacePath;
  key: string;
  value: unknown;
}
/** Delete one item. */
interface DeleteOperation {
  type: "delete";
  namespace: NameSpacePath;
  key: string;
}
/** Search within one namespace. */
interface SearchOperation {
  type: "search";
  namespace: NameSpacePath;
  options?: SearchOptions;
}
/** Enumerate namespaces under an optional prefix. */
interface ListNamespacesOperation {
  type: "listNamespaces";
  prefix?: NameSpacePath;
  options?: ListNamespacesOptions;
}
Options for searching store items.
// Options for BaseStore.search().
interface SearchOptions {
  /** Conditions an item's value must match, e.g. { age: 30 }. */
  filter?: MatchCondition;
  /** Maximum number of items to yield. */
  limit?: number;
  /** Number of matching items to skip (pagination). */
  offset?: number;
}
// Open-ended field/value map used as a search filter.
interface MatchCondition {
  [key: string]: unknown;
}
Options for listing namespaces.
// Options for BaseStore.listNamespaces().
interface ListNamespacesOptions {
  /** Maximum namespace depth to descend to. */
  maxDepth?: number;
  /** Maximum number of namespaces to yield. */
  limit?: number;
  /** Number of namespaces to skip (pagination). */
  offset?: number;
}
import { InMemoryStore } from "@langchain/langgraph-checkpoint";
// Example: reading/writing long-term memory from inside graph nodes. The
// store passed to compile() is made available to nodes via getStore().
import { getStore } from "@langchain/langgraph";
const store = new InMemoryStore();
const graph = new StateGraph(State)
  .addNode("save", async (state) => {
    const store = getStore();  // node-scoped accessor for the compiled store
    if (store) {
      await store.put(["user", state.userId], "preferences", {
        theme: "dark",
        language: "en"
      });
    }
    return state;
  })
  .addNode("load", async (state) => {
    const store = getStore();
    if (store) {
      const prefs = await store.get(["user", state.userId], "preferences");
      return { ...state, preferences: prefs?.value };  // prefs is null on a miss
    }
    return state;
  })
  .compile({ store });
await graph.invoke({ userId: "123" });
const store = new InMemoryStore();
// Example: using a store directly — put, filtered search, batch, namespaces.
// Save items
await store.put(["users"], "user1", { name: "Alice", age: 30 });
await store.put(["users"], "user2", { name: "Bob", age: 25 });
// Search
for await (const item of store.search(["users"], {
  filter: { age: 30 },  // matches user1 only
  limit: 10
})) {
  console.log(item.key, item.value);
}
// Several operations submitted together; results align by index.
const operations: Operation[] = [
  { type: "get", namespace: ["users"], key: "user1" },
  { type: "put", namespace: ["users"], key: "user3", value: { name: "Charlie" } },
  { type: "delete", namespace: ["users"], key: "user2" }
];
const results = await store.batch(operations);
// Enumerate namespaces up to two levels deep.
for await (const ns of store.listNamespaces([], { maxDepth: 2 })) {
  console.log(ns); // ["users"], ["config"], etc.
}
Caching for expensive computations.
Abstract interface for cache implementations.
// Abstract cache consulted by compiled graphs for nodes with a cachePolicy.
abstract class BaseCache {
  /** Return the cached value for (namespace, key), or undefined on a miss. */
  abstract lookup(
    namespace: string[],
    key: string
  ): Promise<unknown | undefined>;
  /** Insert or overwrite the cached value for (namespace, key). */
  abstract update(
    namespace: string[],
    key: string,
    value: unknown
  ): Promise<void>;
}
const graph = new StateGraph(State)
.addNode("expensive", expensiveComputation, {
cachePolicy: {
keyFunc: (input) => JSON.stringify(input)
}
})
.compile({ cache: myCache });import { StateGraph, Annotation, MemorySaver } from "@langchain/langgraph";
// End-to-end example: short-term memory (checkpointer, scoped per thread_id)
// combined with long-term memory (store, shared across threads per user).
import { InMemoryStore } from "@langchain/langgraph-checkpoint";
const ChatState = Annotation.Root({
  messages: Annotation<string[]>({
    reducer: (a, b) => a.concat(b),  // node returns are appended, not replaced
    default: () => []
  }),
  userId: Annotation<string>
});
const checkpointer = new MemorySaver();
const store = new InMemoryStore();
const chatbot = new StateGraph(ChatState)
  .addNode("loadHistory", async (state) => {
    // NOTE(review): getStore is not imported in this snippet; the earlier
    // example imports it from "@langchain/langgraph" — assumed same here.
    const store = getStore();
    if (store) {
      const history = await store.get(
        ["users", state.userId],
        "chat_history"
      );
      // NOTE(review): Item.value is typed unknown, so history?.value?.messages
      // needs a cast or narrowing to typecheck under strict mode.
      return {
        messages: history?.value?.messages || []
      };
    }
    return state;
  })
  .addNode("respond", async (state) => {
    const response = await generateResponse(state.messages);
    return { messages: [response] };  // reducer appends the new reply
  })
  .addNode("saveHistory", async (state) => {
    const store = getStore();
    if (store) {
      await store.put(
        ["users", state.userId],
        "chat_history",
        { messages: state.messages }
      );
    }
    return state;
  })
  .addEdge("__start__", "loadHistory")
  .addEdge("loadHistory", "respond")
  .addEdge("respond", "saveHistory")
  .addEdge("saveHistory", "__end__")
  .compile({ checkpointer, store });
// Use with persistence
await chatbot.invoke({
  messages: ["Hello"],
  userId: "user-123"
}, {
  configurable: { thread_id: "conversation-1" }
});