
tessl/npm-langchain--langgraph

Low-level orchestration framework for building stateful, multi-actor applications with LLMs


Execution & Streaming

The Pregel execution engine runs graph workflows with support for streaming, interruption, checkpointing, and parallel execution. Multiple stream modes provide different levels of visibility into graph execution.

Pregel Class

Core execution engine for graph workflows. Implements superstep-based message-passing computation with checkpointing and streaming support.

class Pregel<
  Nodes extends Record<string, PregelNode>,
  Channels extends Record<string, BaseChannel>,
  ContextType = unknown,
  InputType = any,
  OutputType = any
> extends Runnable<InputType, OutputType> {
  nodes: Nodes;
  channels: Channels;
  inputChannels: string | string[];
  outputChannels: string | string[];
  streamChannels?: string | string[];
  streamMode?: StreamMode | StreamMode[];
  interruptAfter?: string[] | All;
  interruptBefore?: string[] | All;
  stepTimeout?: number;
  checkpointer?: BaseCheckpointSaver | false;
  retryPolicy?: RetryPolicy;
  store?: BaseStore;
  cache?: BaseCache;
  name?: string;

  constructor(params: PregelParams<Nodes, Channels>);

  invoke(
    input: InputType,
    options?: PregelOptions
  ): Promise<OutputType>;

  stream(
    input: InputType,
    options?: PregelOptions
  ): Promise<IterableReadableStream<StreamOutput>>;

  streamEvents(
    input: InputType,
    options: PregelOptions & { version: "v1" | "v2" }
  ): IterableReadableStream<StreamEvent>;

  getState(
    config: LangGraphRunnableConfig,
    options?: GetStateOptions
  ): Promise<StateSnapshot>;

  getStateHistory(
    config: LangGraphRunnableConfig,
    options?: CheckpointListOptions
  ): AsyncIterableIterator<StateSnapshot>;

  updateState(
    config: LangGraphRunnableConfig,
    values: UpdateType | Command,
    asNode?: string
  ): Promise<RunnableConfig>;

  bulkUpdateState(
    config: LangGraphRunnableConfig,
    supersteps: Array<{
      updates: Array<{ values?: UpdateType | Command; asNode?: string }>;
    }>
  ): Promise<RunnableConfig>;

  getSubgraphs(
    namespace?: string,
    recurse?: boolean
  ): IterableIterator<[string, Pregel]>; // Deprecated

  getSubgraphsAsync(
    namespace?: string,
    recurse?: boolean
  ): AsyncIterableIterator<[string, Pregel]>; // Deprecated

  withConfig(config: Partial<PregelOptions>): this;

  validate(): this;

  clearCache(): Promise<void>;
}
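
To make the superstep model concrete, here is a toy sketch. It is not the library's implementation, and `ToyNode`, `runSupersteps`, and the channel layout are hypothetical names chosen for illustration. In each step, every node whose trigger channel was written in the previous step runs against the same snapshot of the channels, and all writes are applied together before the next step begins.

```typescript
// Toy model of superstep execution. All names here are hypothetical
// and are not part of @langchain/langgraph.
type ToyChannels = Record<string, number>;

interface ToyNode {
  trigger: string; // channel whose update schedules this node
  run: (channels: ToyChannels) => ToyChannels; // returns the writes to apply
}

function runSupersteps(
  nodes: Record<string, ToyNode>,
  input: ToyChannels,
  maxSteps: number
): { channels: ToyChannels; steps: number } {
  let channels: ToyChannels = { ...input };
  let updated = new Set<string>(Object.keys(input));
  let steps = 0;

  while (updated.size > 0) {
    if (steps >= maxSteps) {
      throw new Error(`Step limit of ${maxSteps} reached`);
    }
    // Plan: select every node triggered by a channel written in the last step.
    const active = Object.values(nodes).filter((n) => updated.has(n.trigger));
    // Execute: all active nodes read the same snapshot of the channels.
    const writes = active.map((n) => n.run(channels));
    // Update: apply the writes together, then move on to the next superstep.
    updated = new Set<string>();
    for (const write of writes) {
      for (const [key, value] of Object.entries(write)) {
        channels = { ...channels, [key]: value };
        updated.add(key);
      }
    }
    if (active.length > 0) steps += 1;
  }
  return { channels, steps };
}

const toyResult = runSupersteps(
  {
    step1: { trigger: "input", run: (c) => ({ mid: c.input + 1 }) },
    step2: { trigger: "mid", run: (c) => ({ output: c.mid * 2 }) },
  },
  { input: 5 },
  25
);
console.log(toyResult.channels.output); // 12
console.log(toyResult.steps); // 2
```

The `maxSteps` guard plays the same role in this toy as a step limit does in a real engine: a cyclic graph that never stops writing would otherwise loop forever.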

PregelParams

Constructor parameters for Pregel.

interface PregelParams<
  Nodes extends Record<string, PregelNode>,
  Channels extends Record<string, BaseChannel>
> {
  nodes: Nodes;
  channels: Channels;
  inputChannels: string | string[];
  outputChannels: string | string[];
  streamChannels?: string | string[];
  streamMode?: StreamMode | StreamMode[];
  interruptAfter?: string[] | All;
  interruptBefore?: string[] | All;
  stepTimeout?: number;
  checkpointer?: BaseCheckpointSaver | false;
  retryPolicy?: RetryPolicy;
  store?: BaseStore;
  cache?: BaseCache;
  name?: string;
  description?: string;
  autoValidate?: boolean;
  userInterrupt?: unknown;
  builder?: unknown;
}

PregelOptions

Execution options for invoke/stream operations.

interface PregelOptions<
  Nodes extends Record<string, PregelNode> = Record<string, PregelNode>,
  Channels extends Record<string, BaseChannel> = Record<string, BaseChannel>,
  ContextType extends Record<string, any> = Record<string, any>,
  TStreamMode extends StreamMode | StreamMode[] | undefined = StreamMode | StreamMode[] | undefined,
  TSubgraphs extends boolean = boolean,
  TEncoding extends "text/event-stream" | undefined = "text/event-stream" | undefined
> extends RunnableConfig<ContextType> {
  streamMode?: TStreamMode;
  inputKeys?: keyof Channels | Array<keyof Channels>;
  outputKeys?: keyof Channels | Array<keyof Channels>;
  interruptBefore?: All | Array<keyof Nodes>;
  interruptAfter?: All | Array<keyof Nodes>;
  debug?: boolean;
  subgraphs?: TSubgraphs;
  maxConcurrency?: number;
  recursionLimit?: number;
  encoding?: TEncoding;
  signal?: AbortSignal;
}

Stream Modes

Different modes for streaming graph execution results.

type StreamMode =
  | "values"     // Stream complete state after each step
  | "updates"    // Stream only state changes after each step
  | "debug"      // Stream detailed execution events for debugging
  | "messages"   // Stream messages from within nodes
  | "custom"     // Stream custom events from within nodes
  | "checkpoints" // Stream checkpoint snapshots
  | "tasks";     // Stream task execution details

Values Mode

Streams the complete state after each superstep.

for await (const chunk of await graph.stream(input, { streamMode: "values" })) {
  console.log(chunk); // Complete state: { messages: [...], count: 5 }
}

Updates Mode

Streams only the state changes (deltas) produced by each node.

for await (const chunk of await graph.stream(input, { streamMode: "updates" })) {
  console.log(chunk); // { nodeName: { messages: ["new message"] } }
}
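
The two modes carry the same information in different shapes. The toy sketch below folds a list of "updates"-style chunks into "values"-style snapshots. It assumes every state key uses last-value-wins semantics (no reducers), and `applyUpdates` is a hypothetical helper, not a library function.

```typescript
// Toy illustration: folding "updates" chunks into "values"-style snapshots.
// Assumes every state key is overwritten by the latest write (no reducers).
type ToyState = Record<string, unknown>;
type UpdateChunk = Record<string, ToyState>; // { nodeName: partialState }

function applyUpdates(initial: ToyState, chunks: UpdateChunk[]): ToyState[] {
  const snapshots: ToyState[] = [initial];
  let current = initial;
  for (const chunk of chunks) {
    // A chunk may contain writes from several nodes that ran in the same step.
    for (const partial of Object.values(chunk)) {
      current = { ...current, ...partial };
    }
    snapshots.push(current);
  }
  return snapshots;
}

const snapshots = applyUpdates({ count: 5 }, [
  { step1: { count: 6 } },
  { step2: { count: 12 } },
]);
console.log(snapshots); // [ { count: 5 }, { count: 6 }, { count: 12 } ]
```

In practice, "updates" is usually the cheaper choice when the state is large and the consumer only needs to know what changed.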

Debug Mode

Streams detailed execution events including task execution, channel updates, and checkpoints.

for await (const event of await graph.stream(input, { streamMode: "debug" })) {
  console.log(event);
  // { type: "task", timestamp: "...", step: 1, payload: {...} }
}

Messages Mode

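Streams LLM messages token by token, as [message, metadata] tuples, from chat model calls made inside nodes. Data written with the writer function is delivered in "custom" mode, not here.

// `model` is assumed to be a LangChain chat model defined elsewhere,
// and State is assumed to have a `messages` key.
const graph = new StateGraph(State)
  .addNode("generate", async (state) => {
    const response = await model.invoke(state.messages);
    return { messages: [response] };
  })
  .addEdge("__start__", "generate")
  .compile();

for await (const [message, metadata] of await graph.stream(input, { streamMode: "messages" })) {
  console.log(message.content); // Chunk of model output
  console.log(metadata.langgraph_node); // "generate"
}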

Custom Mode

Streams custom data written via the writer function.

for await (const data of await graph.stream(input, { streamMode: "custom" })) {
  console.log(data); // Custom streamed data
}

Checkpoints Mode

Streams checkpoint snapshots at each step.

for await (const snapshot of await graph.stream(input, { streamMode: "checkpoints" })) {
  console.log(snapshot);
  // { values: {...}, next: ["node1"], config: {...}, tasks: [...] }
}

Tasks Mode

Streams task creation and completion events.

for await (const taskEvent of await graph.stream(input, { streamMode: "tasks" })) {
  if ("input" in taskEvent) {
    // Task created
    console.log(`Task ${taskEvent.name} created`);
  } else {
    // Task completed
    console.log(`Task ${taskEvent.name} completed with result:`, taskEvent.result);
  }
}

Multiple Stream Modes

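Pass an array to stream several modes at once. Each chunk is then a [mode, data] tuple; with subgraphs: true, the namespace is prepended, giving [namespace, mode, data].

for await (const chunk of await graph.stream(input, {
  streamMode: ["values", "updates", "debug"]
})) {
  const [mode, data] = chunk;
  console.log(`Mode: ${mode}, Data:`, data);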
}
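
When the same consumer handles streams with and without subgraph namespaces, it can help to normalize both tuple shapes first. The sketch below is a hypothetical helper (`normalizeChunk` is not a library function) and assumes the two shapes are [mode, data] and [namespace, mode, data], with the namespace given as an array of strings.

```typescript
// Hypothetical helper; the tuple shapes are the assumptions stated above.
type ModeChunk = [string, unknown] | [string[], string, unknown];

interface NormalizedChunk {
  namespace: string[]; // empty for chunks emitted without a namespace
  mode: string;
  data: unknown;
}

function normalizeChunk(chunk: ModeChunk): NormalizedChunk {
  if (chunk.length === 3) {
    const [namespace, mode, data] = chunk;
    return { namespace, mode, data };
  }
  const [mode, data] = chunk;
  return { namespace: [], mode, data };
}

const plain = normalizeChunk(["values", { count: 1 }]);
const nested = normalizeChunk([["child"], "updates", { step1: { count: 2 } }]);
console.log(plain.mode, plain.namespace.length); // values 0
console.log(nested.mode, nested.namespace[0]); // updates child
```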

State Snapshots

interface StateSnapshot {
  values: Record<string, unknown>;
  next: string[];
  config: RunnableConfig;
  metadata?: CheckpointMetadata;
  tasks: PregelTaskDescription[];
  createdAt?: string;
  parentConfig?: RunnableConfig;
}

interface PregelTaskDescription {
  id: string;
  name: string;
  interrupts: Interrupt[];
  state?: unknown;
}

interface GetStateOptions {
  subgraphs?: boolean;
}
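
A snapshot's next and tasks fields are enough to tell whether a run has finished, is paused on an interrupt, or simply has nodes scheduled. The sketch below uses simplified local copies of the shapes above so that it is self-contained; `describeSnapshot` and the `Toy*` types are hypothetical, and real code would import the library's own types.

```typescript
// Simplified local copies of the documented shapes (hypothetical names).
interface ToyInterrupt {
  value?: unknown;
}
interface ToyTask {
  id: string;
  name: string;
  interrupts: ToyInterrupt[];
}
interface ToySnapshot {
  values: Record<string, unknown>;
  next: string[];
  tasks: ToyTask[];
}

type RunStatus = "finished" | "interrupted" | "pending";

function describeSnapshot(snapshot: ToySnapshot): RunStatus {
  // No scheduled nodes means the run reached the end of the graph.
  if (snapshot.next.length === 0) return "finished";
  // A task that recorded an interrupt is waiting for external input.
  const hasInterrupt = snapshot.tasks.some((t) => t.interrupts.length > 0);
  return hasInterrupt ? "interrupted" : "pending";
}

const done = describeSnapshot({ values: { count: 1 }, next: [], tasks: [] });
const paused = describeSnapshot({
  values: { count: 1 },
  next: ["review"],
  tasks: [{ id: "t1", name: "review", interrupts: [{ value: "approve?" }] }],
});
console.log(done, paused); // finished interrupted
```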

Channel Utilities

Static utility class for channel operations.

class Channel {
  static subscribeTo(
    channel: string,
    options?: SingleChannelSubscriptionOptions
  ): PregelNode;

  static subscribeTo(
    channels: string[],
    options?: MultipleChannelSubscriptionOptions
  ): PregelNode;

  static writeTo(
    channels: string[],
    writes?: Record<string, WriteValue>
  ): ChannelWrite;
}

interface SingleChannelSubscriptionOptions {
  key?: string;
  tags?: string[];
}

interface MultipleChannelSubscriptionOptions {
  tags?: string[];
}

type WriteValue = Runnable | ((input: unknown) => unknown) | unknown;

class ChannelWrite {
  // Channel write operation for updating channels
}

PregelNode

Node definition in Pregel graph.

class PregelNode<RunInput = unknown, RunOutput = unknown> extends Runnable<RunInput, RunOutput> {
  channels: string[] | Record<string, string>;
  triggers: string[];
  writers: ChannelWrite[];
  tags?: string[];
  metadata?: Record<string, unknown>;
  retryPolicy?: RetryPolicy;
  cachePolicy?: CachePolicy;
  subgraphs?: Pregel<any, any>[];
  ends?: string[];
  bound?: Runnable<RunInput, RunOutput>;
  mapper?: (input: Record<string, any>) => RunInput;

  constructor(params: {
    channels: string[] | Record<string, string>;
    triggers: string[];
    writers?: ChannelWrite[];
    tags?: string[];
    metadata?: Record<string, unknown>;
    retryPolicy?: RetryPolicy;
    cachePolicy?: CachePolicy;
    subgraphs?: Pregel<any, any>[];
    ends?: string[];
    bound?: Runnable<RunInput, RunOutput>;
    mapper?: (input: Record<string, any>) => RunInput;
  });

  pipe<NewRunOutput>(
    coerceable: RunnableLike<RunOutput, NewRunOutput>
  ): PregelNode<RunInput, NewRunOutput>;
}

Durability Modes

type Durability = "exit" | "async" | "sync";

• "exit" - Checkpoint only when graph exits
• "async" - Asynchronous checkpointing (doesn't block execution)
• "sync" - Synchronous checkpointing (blocks until checkpoint is saved)

Usage Examples

Basic Invocation

import { StateGraph, Annotation } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";

const State = Annotation.Root({
  count: Annotation<number>
});

const graph = new StateGraph(State)
  .addNode("increment", (state) => ({ count: state.count + 1 }))
  .addEdge("__start__", "increment")
  .addEdge("increment", "__end__")
  .compile({ checkpointer: new MemorySaver() });

const result = await graph.invoke(
  { count: 0 },
  { configurable: { thread_id: "1" } }
);
console.log(result); // { count: 1 }

Streaming Values

const graph = new StateGraph(State)
  .addNode("step1", (s) => ({ count: s.count + 1 }))
  .addNode("step2", (s) => ({ count: s.count * 2 }))
  .addEdge("__start__", "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "__end__")
  .compile();

for await (const state of await graph.stream({ count: 5 }, {
  streamMode: "values"
})) {
  console.log(state);
  // First: { count: 5 } (state after the input is applied)
  // Then: { count: 6 }
  // Finally: { count: 12 }
}

Streaming Updates

for await (const update of await graph.stream({ count: 5 }, {
  streamMode: "updates"
})) {
  console.log(update);
  // First: { step1: { count: 6 } }
  // Then: { step2: { count: 12 } }
}

State History Navigation

const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
  .addNode("increment", (s) => ({ count: s.count + 1 }))
  .addEdge("__start__", "increment")
  .addEdge("increment", "__end__")
  .compile({ checkpointer });

// Execute multiple times
await graph.invoke({ count: 0 }, { configurable: { thread_id: "1" } });
await graph.invoke({ count: 5 }, { configurable: { thread_id: "1" } });

// Get state history
const config = { configurable: { thread_id: "1" } };
for await (const snapshot of graph.getStateHistory(config)) {
  console.log(snapshot.values); // Newest checkpoint first, starting with { count: 6 }
}

Getting Current State

const state = await graph.getState({
  configurable: { thread_id: "1" }
});
console.log(state.values); // Current state
console.log(state.next); // Next nodes to execute
console.log(state.tasks); // Pending tasks

Updating State Externally

// Update state as if a specific node ran
await graph.updateState(
  { configurable: { thread_id: "1" } },
  { count: 100 },
  "increment" // Act as if this node made the update
);

// Continue execution from updated state
const result = await graph.invoke(null, {
  configurable: { thread_id: "1" }
});

Bulk State Updates

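// Apply a sequence of supersteps, each containing one or more updates
await graph.bulkUpdateState(
  { configurable: { thread_id: "1" } },
  [
    { updates: [{ values: { count: 10 }, asNode: "step1" }] },
    { updates: [{ values: { count: 20 }, asNode: "step2" }] }
  ]
);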

Interrupts

const graph = new StateGraph(State)
  .addNode("process", (s) => ({ count: s.count + 1 }))
  .addNode("review", (s) => s)
  .addEdge("__start__", "process")
  .addEdge("process", "review")
  .addEdge("review", "__end__")
  .compile({
    checkpointer: new MemorySaver(),
    interruptBefore: ["review"]
  });

// First invocation stops before "review"
const config = { configurable: { thread_id: "1" } };
await graph.invoke({ count: 0 }, config);

const state = await graph.getState(config);
console.log(state.next); // ["review"]

// Resume execution
await graph.invoke(null, config);

Subgraph Streaming

for await (const chunk of await graph.stream(input, {
  streamMode: "values",
  subgraphs: true
})) {
  const [namespace, data] = chunk;
  console.log(`Namespace: ${namespace.join(".")}, Data:`, data);
}

Debug Mode

for await (const event of await graph.stream(input, {
  streamMode: "debug"
})) {
  if (event.type === "task") {
    console.log(`Task: ${event.payload.name}`);
  } else if (event.type === "checkpoint") {
    console.log("Checkpoint saved");
  }
}

Custom Stream Data

import { writer } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("process", async (state) => {
    writer({ progress: 0.25 });
    // ... do work
    writer({ progress: 0.5 });
    // ... do more work
    writer({ progress: 1.0 });
    return { count: state.count + 1 };
  })
  .addEdge("__start__", "process")
  .compile();

for await (const customData of await graph.stream(input, {
  streamMode: "custom"
})) {
  console.log(customData); // { progress: 0.25 }, { progress: 0.5 }, etc.
}

Recursion Limits

const result = await graph.invoke(input, {
  recursionLimit: 100 // Maximum 100 steps
});

Concurrency Control

const result = await graph.invoke(input, {
  maxConcurrency: 5 // Run at most 5 parallel tasks
});

Timeout Control

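const graph = new StateGraph(State)
  .addNode("slow", async (s) => {
    await new Promise(resolve => setTimeout(resolve, 10000));
    return s;
  })
  .addEdge("__start__", "slow")
  .compile();

// stepTimeout is a property of the compiled graph, in milliseconds
graph.stepTimeout = 5000; // 5 second timeout per step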

Abort Signals

const controller = new AbortController();

setTimeout(() => controller.abort(), 5000); // Abort after 5 seconds

try {
  await graph.invoke(input, { signal: controller.signal });
} catch (error) {
  console.log("Execution aborted");
}
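
Aborting is only effective if long-running work notices the signal. The sketch below shows one way a node body could cooperate, assuming the node has access to the same AbortSignal (for example through its config argument, which is an assumption here). `sumWithAbort` is a hypothetical helper that uses only the standard AbortController API.

```typescript
// Hypothetical helper that checks the signal between units of work.
function sumWithAbort(items: number[], signal: AbortSignal): number {
  let total = 0;
  for (const item of items) {
    if (signal.aborted) {
      throw new Error("Execution aborted");
    }
    total += item;
  }
  return total;
}

const liveController = new AbortController();
const liveTotal = sumWithAbort([1, 2, 3], liveController.signal);
console.log(liveTotal); // 6

const abortedController = new AbortController();
abortedController.abort();
let abortMessage = "";
try {
  sumWithAbort([1, 2, 3], abortedController.signal);
} catch (error) {
  abortMessage = (error as Error).message;
}
console.log(abortMessage); // Execution aborted
```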

Getting Graph Structure

The getGraph() and getGraphAsync() methods return a drawable representation of the graph structure for visualization.

// Get the graph structure (getGraph() is deprecated; prefer getGraphAsync())
const graphStructure = await graph.getGraphAsync(config);

// Structure includes nodes, edges, and entry/exit points
console.log("Nodes:", graphStructure.nodes);
console.log("Edges:", graphStructure.edges);

Clearing Cache

The clearCache() method clears the task cache when using a cache policy.

import { task, entrypoint } from "@langchain/langgraph";

const cachedTask = task({
  name: "expensive",
  cachePolicy: {
    keyFunc: (input) => JSON.stringify(input),
    ttl: 300 // 5 minutes, assuming the TTL is expressed in seconds
  }
}, async (input) => {
  return expensiveComputation(input);
});

const workflow = entrypoint("workflow", async (input) => {
  return await cachedTask(input);
});

// Clear the cache
await workflow.clearCache();

// Next invocation will recompute cached tasks
const result = await workflow.invoke(input);

Install with Tessl CLI

npx tessl i tessl/npm-langchain--langgraph@1.0.1
