Low-level orchestration framework for building stateful, multi-actor applications with LLMs
The Pregel execution engine runs graph workflows with support for streaming, interruption, checkpointing, and parallel execution. Multiple stream modes provide different levels of visibility into graph execution.
Core execution engine for graph workflows. Implements superstep-based message-passing computation with checkpointing and streaming support.
/**
 * Core execution engine for graph workflows. Implements superstep-based
 * message-passing computation with checkpointing and streaming support.
 */
class Pregel<
  Nodes extends Record<string, PregelNode>,
  Channels extends Record<string, BaseChannel>,
  ContextType = unknown,
  InputType = any,
  OutputType = any
> extends Runnable<InputType, OutputType> {
  /** Node definitions keyed by node name. */
  nodes: Nodes;
  /** Channel definitions keyed by channel name. */
  channels: Channels;
  /** Channel(s) that receive graph input. */
  inputChannels: string | string[];
  /** Channel(s) read to produce graph output. */
  outputChannels: string | string[];
  /** Channel(s) emitted while streaming — presumably defaults to outputChannels; confirm. */
  streamChannels?: string | string[];
  /** Default stream mode(s) used by stream(). */
  streamMode?: StreamMode | StreamMode[];
  /** Nodes after which execution pauses (or the All sentinel). */
  interruptAfter?: string[] | All;
  /** Nodes before which execution pauses (or the All sentinel). */
  interruptBefore?: string[] | All;
  /** Per-step timeout in milliseconds. */
  stepTimeout?: number;
  /** Checkpoint saver; `false` disables checkpointing. */
  checkpointer?: BaseCheckpointSaver | false;
  /** Default retry policy applied to node execution. */
  retryPolicy?: RetryPolicy;
  /** Long-term key/value store shared across runs. */
  store?: BaseStore;
  /** Task-result cache used with node cache policies. */
  cache?: BaseCache;
  /** Optional graph name. */
  name?: string;

  constructor(params: PregelParams<Nodes, Channels>);

  /** Runs the graph to completion and resolves with the final output. */
  invoke(
    input: InputType,
    options?: PregelOptions
  ): Promise<OutputType>;

  /** Runs the graph, yielding chunks according to the configured stream mode(s). */
  stream(
    input: InputType,
    options?: PregelOptions
  ): IterableReadableStream<StreamOutput>;

  /** Streams fine-grained execution events (v1 or v2 event schema). */
  streamEvents(
    input: InputType,
    options: PregelOptions & { version: "v1" | "v2" }
  ): IterableReadableStream<StreamEvent>;

  /** Returns the current state snapshot for the given config/thread. */
  getState(
    config: LangGraphRunnableConfig,
    options?: GetStateOptions
  ): Promise<StateSnapshot>;

  /** Iterates checkpointed state snapshots (most recent first — see history example). */
  getStateHistory(
    config: LangGraphRunnableConfig,
    options?: CheckpointListOptions
  ): AsyncIterableIterator<StateSnapshot>;

  /** Applies a state update, optionally attributed as if node `asNode` produced it. */
  updateState(
    config: LangGraphRunnableConfig,
    values: UpdateType | Command,
    asNode?: string
  ): Promise<RunnableConfig>;

  /** Applies several [node, update] pairs in one call. */
  bulkUpdateState(
    config: LangGraphRunnableConfig,
    supersteps: Array<[string, UpdateType | Command]>
  ): Promise<void>;

  /** @deprecated Synchronous subgraph iteration. */
  getSubgraphs(
    namespace?: string,
    recurse?: boolean
  ): IterableIterator<[string, Pregel]>;

  /** @deprecated Asynchronous subgraph iteration. */
  getSubgraphsAsync(
    namespace?: string,
    recurse?: boolean
  ): AsyncIterableIterator<[string, Pregel]>;

  /** Returns a copy bound to the given default options. */
  withConfig(config: Partial<PregelOptions>): this;
  /** Validates graph wiring; returns this for chaining. */
  validate(): this;
  /** Clears the task cache. */
  clearCache(): void;
}

Constructor parameters for Pregel.
/**
 * Parameters accepted by `new Pregel(...)`. Mirrors the public fields of
 * Pregel plus a few construction-only options.
 */
interface PregelParams<
  Nodes extends Record<string, PregelNode>,
  Channels extends Record<string, BaseChannel>
> {
  nodes: Nodes;
  channels: Channels;
  inputChannels: string | string[];
  outputChannels: string | string[];
  streamChannels?: string | string[];
  streamMode?: StreamMode | StreamMode[];
  interruptAfter?: string[] | All;
  interruptBefore?: string[] | All;
  stepTimeout?: number;
  checkpointer?: BaseCheckpointSaver | false;
  retryPolicy?: RetryPolicy;
  store?: BaseStore;
  cache?: BaseCache;
  name?: string;
  /** Human-readable description of the graph. */
  description?: string;
  /** Presumably runs validate() automatically on construction — TODO confirm. */
  autoValidate?: boolean;
  userInterrupt?: unknown;
  builder?: unknown;
}

Execution options for invoke/stream operations.
/**
 * Per-run options accepted by invoke()/stream(). Extends RunnableConfig
 * with Pregel-specific controls.
 */
interface PregelOptions<
  Nodes extends Record<string, PregelNode> = Record<string, PregelNode>,
  Channels extends Record<string, BaseChannel> = Record<string, BaseChannel>,
  ContextType extends Record<string, any> = Record<string, any>,
  TStreamMode extends StreamMode | StreamMode[] | undefined = StreamMode | StreamMode[] | undefined,
  TSubgraphs extends boolean = boolean,
  TEncoding extends "text/event-stream" | undefined = "text/event-stream" | undefined
> extends RunnableConfig<ContextType> {
  /** Stream mode(s) for this run; see StreamMode. */
  streamMode?: TStreamMode;
  /** Channel key(s) to read input from. */
  inputKeys?: keyof Channels | Array<keyof Channels>;
  /** Channel key(s) to emit as output. */
  outputKeys?: keyof Channels | Array<keyof Channels>;
  /** Nodes to pause before (or the All sentinel). */
  interruptBefore?: All | Array<keyof Nodes>;
  /** Nodes to pause after (or the All sentinel). */
  interruptAfter?: All | Array<keyof Nodes>;
  /** Enables debug output for this run. */
  debug?: boolean;
  /** When true, also stream from subgraphs; chunks gain a namespace prefix. */
  subgraphs?: TSubgraphs;
  /** Maximum number of tasks run in parallel. */
  maxConcurrency?: number;
  /** Maximum number of steps before execution is aborted. */
  recursionLimit?: number;
  /** Optional wire encoding for the streamed output. */
  encoding?: TEncoding;
  /** Abort signal used to cancel execution. */
  signal?: AbortSignal;
}

Different modes for streaming graph execution results.
type StreamMode =
| "values" // Stream complete state after each step
| "updates" // Stream only state changes after each step
| "debug" // Stream detailed execution events for debugging
| "messages" // Stream messages from within nodes
| "custom" // Stream custom events from within nodes
| "checkpoints" // Stream checkpoint snapshots
| "tasks"; // Stream task execution detailsStreams the complete state after each superstep.
for await (const chunk of await graph.stream(input, { streamMode: "values" })) {
console.log(chunk); // Complete state: { messages: [...], count: 5 }
}

Streams only the state changes (deltas) produced by each node.
for await (const chunk of await graph.stream(input, { streamMode: "updates" })) {
console.log(chunk); // { nodeName: { messages: ["new message"] } }
}

Streams detailed execution events including task execution, channel updates, and checkpoints.
for await (const event of await graph.stream(input, { streamMode: "debug" })) {
console.log(event);
// { type: "task", task: {...}, checkpoint: {...} }
}

Streams messages emitted from within nodes (when using writer function).
const graph = new StateGraph(State)
.addNode("generate", async (state, config) => {
config.writer?.("Progress: 50%");
return { result: "done" };
})
.compile();
for await (const [message, metadata] of await graph.stream(input, { streamMode: "messages" })) {
console.log(message); // "Progress: 50%"
}

Streams custom data written via the writer function.
for await (const data of await graph.stream(input, { streamMode: "custom" })) {
console.log(data); // Custom streamed data
}

Streams checkpoint snapshots at each step.
for await (const snapshot of await graph.stream(input, { streamMode: "checkpoints" })) {
console.log(snapshot);
// { values: {...}, next: ["node1"], config: {...}, tasks: [...] }
}

Streams task creation and completion events.
for await (const taskEvent of await graph.stream(input, { streamMode: "tasks" })) {
if ("input" in taskEvent) {
// Task created
console.log(`Task ${taskEvent.name} created`);
} else {
// Task completed
console.log(`Task ${taskEvent.name} completed with result:`, taskEvent.result);
}
}

Stream multiple modes simultaneously.
for await (const chunk of await graph.stream(input, {
streamMode: ["values", "updates", "debug"]
})) {
const [namespace, mode, data] = chunk;
console.log(`Mode: ${mode}, Data:`, data);
}

/** Snapshot of graph state at a checkpoint. */
interface StateSnapshot {
  /** Current channel values. */
  values: Record<string, unknown>;
  /** Names of the nodes to execute next. */
  next: string[];
  /** Config identifying this checkpoint (usable to resume). */
  config: RunnableConfig;
  /** Checkpoint metadata, when available. */
  metadata?: CheckpointMetadata;
  /** Pending tasks for this step. */
  tasks: PregelTaskDescription[];
  /** Creation timestamp — presumably ISO 8601; confirm format. */
  createdAt?: string;
  /** Config of the parent checkpoint, if any. */
  parentConfig?: RunnableConfig;
}
/** Describes a single task (one node execution) within a superstep. */
interface PregelTaskDescription {
  /** Unique task identifier. */
  id: string;
  /** Name of the node being executed. */
  name: string;
  /** Interrupts raised by this task. */
  interrupts: Interrupt[];
  /** Associated task state, when present. */
  state?: unknown;
}
/** Options for Pregel.getState(). */
interface GetStateOptions {
  /** Presumably includes subgraph state in the snapshot — TODO confirm. */
  subgraphs?: boolean;
}

Static utility class for channel operations.
/** Utility with static helpers for subscribing to and writing channels. */
class Channel {
  /** Creates a PregelNode subscribed to a single channel. */
  static subscribeTo(
    channel: string,
    options?: SingleChannelSubscriptionOptions
  ): PregelNode;
  /** Creates a PregelNode subscribed to multiple channels. */
  static subscribeTo(
    channels: string[],
    options?: MultipleChannelSubscriptionOptions
  ): PregelNode;
  /** Creates a ChannelWrite targeting the given channels. */
  static writeTo(
    channels: string[],
    writes?: Record<string, WriteValue>
  ): ChannelWrite;
}
/** Options when subscribing to a single channel. */
interface SingleChannelSubscriptionOptions {
  /** Presumably the input key the channel value is exposed under — TODO confirm. */
  key?: string;
  /** Tags attached to the resulting node. */
  tags?: string[];
}
/** Options when subscribing to multiple channels. */
interface MultipleChannelSubscriptionOptions {
  /** Tags attached to the resulting node. */
  tags?: string[];
}
type WriteValue = Runnable | ((input: unknown) => unknown) | unknown;
class ChannelWrite {
  // Channel write operation for updating channels
}

Node definition in Pregel graph.
/**
 * Node definition in a Pregel graph: a Runnable bound to the channels it
 * reads, the channels that trigger it, and the writes it performs.
 */
class PregelNode<RunInput = unknown, RunOutput = unknown> extends Runnable<RunInput, RunOutput> {
  /** Channels read as input: a list, or a mapping of input key -> channel name. */
  channels: string[] | Record<string, string>;
  /** Channels whose updates trigger this node. */
  triggers: string[];
  /** Channel writes associated with this node. */
  writers: ChannelWrite[];
  tags?: string[];
  metadata?: Record<string, unknown>;
  /** Retry policy for this node's execution. */
  retryPolicy?: RetryPolicy;
  /** Cache policy controlling task-result caching. */
  cachePolicy?: CachePolicy;
  /** Subgraphs associated with this node, if any. */
  subgraphs?: Pregel<any, any>[];
  ends?: string[];
  /** Underlying runnable executed by the node. */
  bound?: Runnable<RunInput, RunOutput>;
  /** Presumably maps raw channel values to the bound runnable's input — TODO confirm. */
  mapper?: (input: Record<string, any>) => RunInput;

  constructor(params: {
    channels: string[] | Record<string, string>;
    triggers: string[];
    writers?: ChannelWrite[];
    tags?: string[];
    metadata?: Record<string, unknown>;
    retryPolicy?: RetryPolicy;
    cachePolicy?: CachePolicy;
    subgraphs?: Pregel<any, any>[];
    ends?: string[];
    bound?: Runnable<RunInput, RunOutput>;
    mapper?: (input: Record<string, any>) => RunInput;
  });

  /** Pipes this node into another runnable, producing a new node. */
  pipe<NewRunOutput>(
    coerceable: RunnableLike<RunOutput, NewRunOutput>
  ): PregelNode<RunInput, NewRunOutput>;
}

type Durability = "exit" | "async" | "sync";

"exit" - Checkpoint only when graph exits
"async" - Asynchronous checkpointing (doesn't block execution)
"sync" - Synchronous checkpointing (blocks until checkpoint is saved)

import { StateGraph, Annotation } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";
const State = Annotation.Root({
count: Annotation<number>
});
const graph = new StateGraph(State)
.addNode("increment", (state) => ({ count: state.count + 1 }))
.addEdge("__start__", "increment")
.addEdge("increment", "__end__")
.compile({ checkpointer: new MemorySaver() });
const result = await graph.invoke(
{ count: 0 },
{ configurable: { thread_id: "1" } }
);
console.log(result); // { count: 1 }

const graph = new StateGraph(State)
.addNode("step1", (s) => ({ count: s.count + 1 }))
.addNode("step2", (s) => ({ count: s.count * 2 }))
.addEdge("__start__", "step1")
.addEdge("step1", "step2")
.addEdge("step2", "__end__")
.compile();
for await (const state of await graph.stream({ count: 5 }, {
streamMode: "values"
})) {
console.log(state);
// First: { count: 6 }
// Then: { count: 12 }
}

for await (const update of await graph.stream({ count: 5 }, {
streamMode: "updates"
})) {
console.log(update);
// First: { step1: { count: 6 } }
// Then: { step2: { count: 12 } }
}

const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("increment", (s) => ({ count: s.count + 1 }))
.addEdge("__start__", "increment")
.addEdge("increment", "__end__")
.compile({ checkpointer });
// Execute multiple times
await graph.invoke({ count: 0 }, { configurable: { thread_id: "1" } });
await graph.invoke({ count: 5 }, { configurable: { thread_id: "1" } });
// Get state history
const config = { configurable: { thread_id: "1" } };
for await (const snapshot of graph.getStateHistory(config)) {
console.log(snapshot.values); // { count: 6 }, then { count: 1 }
}

const state = await graph.getState({
configurable: { thread_id: "1" }
});
console.log(state.values); // Current state
console.log(state.next); // Next nodes to execute
console.log(state.tasks); // Pending tasks

// Update state as if a specific node ran
await graph.updateState(
{ configurable: { thread_id: "1" } },
{ count: 100 },
"increment" // Act as if this node made the update
);
// Continue execution from updated state
const result = await graph.invoke(null, {
configurable: { thread_id: "1" }
});

// Apply multiple updates at once
await graph.bulkUpdateState(
{ configurable: { thread_id: "1" } },
[
["step1", { count: 10 }],
["step2", { count: 20 }]
]
);

const graph = new StateGraph(State)
.addNode("process", (s) => ({ count: s.count + 1 }))
.addNode("review", (s) => s)
.addEdge("__start__", "process")
.addEdge("process", "review")
.addEdge("review", "__end__")
.compile({
checkpointer: new MemorySaver(),
interruptBefore: ["review"]
});
// First invocation stops before "review"
const config = { configurable: { thread_id: "1" } };
await graph.invoke({ count: 0 }, config);
const state = await graph.getState(config);
console.log(state.next); // ["review"]
// Resume execution
await graph.invoke(null, config);

for await (const chunk of await graph.stream(input, {
streamMode: "values",
subgraphs: true
})) {
const [namespace, data] = chunk;
console.log(`Namespace: ${namespace.join(".")}, Data:`, data);
}

for await (const event of await graph.stream(input, {
streamMode: "debug"
})) {
if (event.type === "task") {
console.log(`Task: ${event.task.name}`);
} else if (event.type === "checkpoint") {
console.log("Checkpoint saved");
}
}

import { writer } from "@langchain/langgraph";
const graph = new StateGraph(State)
.addNode("process", async (state) => {
writer({ progress: 0.25 });
// ... do work
writer({ progress: 0.5 });
// ... do more work
writer({ progress: 1.0 });
return { count: state.count + 1 };
})
.compile();
for await (const customData of await graph.stream(input, {
streamMode: "custom"
})) {
console.log(customData); // { progress: 0.25 }, { progress: 0.5 }, etc.
}

const result = await graph.invoke(input, {
recursionLimit: 100 // Maximum 100 steps
});

const result = await graph.invoke(input, {
maxConcurrency: 5 // Run at most 5 parallel tasks
});

const graph = new StateGraph(State)
.addNode("slow", async (s) => {
await new Promise(resolve => setTimeout(resolve, 10000));
return s;
})
.compile({ stepTimeout: 5000 }); // 5 second timeout per step

const controller = new AbortController();
setTimeout(() => controller.abort(), 5000); // Abort after 5 seconds
try {
await graph.invoke(input, { signal: controller.signal });
} catch (error) {
console.log("Execution aborted");
}

The getGraph() and getGraphAsync() methods return a drawable representation of the graph structure for visualization.
// Get graph structure (deprecated - use getGraphAsync instead)
const graphStructure = graph.getGraph(config);
// Get graph structure async (preferred)
const graphStructure = await graph.getGraphAsync(config);
// Structure includes nodes, edges, and entry/exit points
console.log("Nodes:", graphStructure.nodes);
console.log("Edges:", graphStructure.edges);

The clearCache() method clears the task cache when using a cache policy.
import { task, entrypoint } from "@langchain/langgraph";
const cachedTask = task({
name: "expensive",
cachePolicy: {
keyFunc: (input) => JSON.stringify(input),
ttl: 300000 // 5 minutes
}
}, async (input) => {
return expensiveComputation(input);
});
const workflow = entrypoint("workflow", async (input) => {
return await cachedTask(input);
});
// Clear the cache
await workflow.clearCache();
// Next invocation will recompute cached tasks
const result = await workflow.invoke(input);