tessl/npm-langchain--langgraph

Low-level orchestration framework for building stateful, multi-actor applications with LLMs

State & Channels Guide

Deep dive into state management and channel patterns.

Channel Types and Use Cases

LastValue - Exclusive Write

Stores the last value written and throws an error if more than one write arrives in the same step. This is the default channel for fields declared without a reducer.

class LastValue<Value> extends BaseChannel<Value, Value, Value> {
  constructor(initialValueFactory?: () => Value);
  fromCheckpoint(checkpoint?: Value): this;
  update(values: Value[]): boolean;
  get(): Value;
  checkpoint(): Value;
  isAvailable(): boolean;
}

When to use: When only one node should write the field per step. An error is thrown if multiple nodes write to it in the same step.

const State = Annotation.Root({
  count: Annotation<number>  // Implicitly uses LastValue
});
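
The single-writer rule can be illustrated with a small stand-alone model. This is only a sketch of the semantics described above, not the library's implementation: the class name LastValueSketch and its error messages are hypothetical, and `writes` stands for all values written to the channel during one step.

```typescript
// Simplified model of a last-value channel (hypothetical, for illustration only).
class LastValueSketch<V> {
  private value: V[] = [];

  // `writes` holds every value written to this channel during one step.
  update(writes: V[]): boolean {
    if (writes.length === 0) return false; // nothing written this step
    if (writes.length > 1) {
      throw new Error("LastValue can only receive one value per step");
    }
    this.value = [writes[0]];
    return true;
  }

  get(): V {
    if (this.value.length === 0) throw new Error("channel is empty");
    return this.value[0];
  }
}

const count = new LastValueSketch<number>();
count.update([1]);        // one writer in the step: accepted
console.log(count.get()); // 1

let rejected = false;
try {
  count.update([2, 3]);   // two writers in the same step: rejected
} catch {
  rejected = true;
}
console.log(rejected);    // true
```

In this model the rejected step leaves the stored value untouched, so count.get() still returns 1 afterwards.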

AnyValue - Allow Concurrent Writes

Stores the last value received. If several values arrive in the same step, it assumes they are all equal and keeps the last one.

class AnyValue<Value> extends BaseChannel<Value, Value, Value> {
  constructor();
  fromCheckpoint(checkpoint?: Value): this;
  update(values: Value[]): boolean;
  get(): Value;
  checkpoint(): Value;
  isAvailable(): boolean;
}

When to use: When multiple nodes may write the same value in one step and this should not raise an error.

BinaryOperatorAggregate - Custom Reduction

Applies a reducer function to combine the current value with each incoming update, which allows custom aggregation logic.

class BinaryOperatorAggregate<ValueType, UpdateType = ValueType> {
  constructor(
    operator: BinaryOperator<ValueType, UpdateType>,
    initialValueFactory?: () => ValueType
  );
  fromCheckpoint(checkpoint?: ValueType): this;
  update(values: UpdateType[]): boolean;
  get(): ValueType;
  checkpoint(): ValueType;
  isAvailable(): boolean;
}

type BinaryOperator<ValueType, UpdateType> = (
  a: ValueType,
  b: UpdateType
) => ValueType;

Common patterns:

// Sum
total: Annotation<number>({
  reducer: (a, b) => a + b,
  default: () => 0
})

// Max
highScore: Annotation<number>({
  reducer: (a, b) => Math.max(a, b),
  default: () => -Infinity
})

// Array concatenation
items: Annotation<string[]>({
  reducer: (a, b) => a.concat(b),
  default: () => []
})

// Object merging
metadata: Annotation<Record<string, any>>({
  reducer: (a, b) => ({ ...a, ...b }),
  default: () => ({})
})

// Set union
tags: Annotation<Set<string>>({
  reducer: (a, b) => new Set([...a, ...b]),
  default: () => new Set()
})
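
To see why these reducers behave as aggregations, it can help to picture every update written during a step being folded into the current value one at a time. The sketch below models that fold in plain TypeScript. The helper applyUpdates is hypothetical and exists only for this illustration; the BinaryOperator type is repeated from above so the block is self-contained.

```typescript
type BinaryOperator<ValueType, UpdateType> = (a: ValueType, b: UpdateType) => ValueType;

// Fold each update into the current value, in order (hypothetical helper).
function applyUpdates<V, U>(op: BinaryOperator<V, U>, current: V, updates: U[]): V {
  return updates.reduce<V>((acc, update) => op(acc, update), current);
}

// Sum: starts from the default 0
const total = applyUpdates<number, number>((a, b) => a + b, 0, [10, 15]);

// Max: starts from the default -Infinity
const highScore = applyUpdates<number, number>((a, b) => Math.max(a, b), -Infinity, [10, 8]);

// Array concatenation: starts from the default []
const items = applyUpdates<string[], string[]>((a, b) => a.concat(b), [], [["x"], ["y", "z"]]);

// Object merging: later keys overwrite earlier ones
const metadata = applyUpdates<Record<string, unknown>, Record<string, unknown>>(
  (a, b) => ({ ...a, ...b }),
  {},
  [{ source: "node1" }, { output: "complete" }]
);

console.log(total);     // 25
console.log(highScore); // 10
console.log(items);     // ["x", "y", "z"]
console.log(metadata);  // { source: "node1", output: "complete" }
```

Sum, max, and concatenation of disjoint items give results that are easy to reason about when nodes run in parallel. Object merging depends on the order in which updates are applied whenever two updates set the same key.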

Topic - Accumulate All Values

A configurable PubSub Topic channel that accumulates messages.

class Topic<Value> extends BaseChannel<Array<Value>, Value | Value[]> {
  constructor(fields?: {
    unique?: boolean;
    accumulate?: boolean;
  });
  fromCheckpoint(checkpoint?: [Value[], Value[]]): this;
  update(values: Array<Value | Value[]>): boolean;
  get(): Array<Value>;
  checkpoint(): [Value[], Value[]];
  isAvailable(): boolean;
}

Parameters:

  • unique - If true, only values not already seen are added (compared by reference equality)
  • accumulate - If false, the channel is cleared after each step; if true, values are kept across steps

events: new Topic<Event>({
  unique: false,       // Allow duplicates
  accumulate: true     // Keep across steps
})
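
The interaction between unique and accumulate is easier to follow with a small model. The class below is a sketch under stated assumptions, not the library's Topic: TopicSketch is a hypothetical name, update is assumed to be called once per step with everything written in that step, and the set of already-seen values is assumed to persist even when the value list is cleared.

```typescript
// Simplified model of a topic channel (hypothetical, for illustration only).
class TopicSketch<V> {
  private values: V[] = [];
  private seen = new Set<V>();
  private unique: boolean;
  private accumulate: boolean;

  constructor(unique: boolean, accumulate: boolean) {
    this.unique = unique;
    this.accumulate = accumulate;
  }

  // Called once per step; each write is either a single value or an array of values.
  update(writes: Array<V | V[]>): void {
    if (!this.accumulate) this.values = []; // start every step empty
    for (const write of writes) {
      const batch = (Array.isArray(write) ? write : [write]) as V[];
      for (const item of batch) {
        if (this.unique) {
          if (this.seen.has(item)) continue; // skip values seen before
          this.seen.add(item);
        }
        this.values.push(item);
      }
    }
  }

  get(): V[] {
    return this.values.slice();
  }
}

// Duplicates allowed, values kept across steps
const events = new TopicSketch<string>(false, true);
events.update(["a", ["b", "a"]]);
events.update(["c"]);
console.log(events.get()); // ["a", "b", "a", "c"]

// Duplicates dropped, channel cleared at the start of each step
const perStep = new TopicSketch<string>(true, false);
perStep.update(["x", "x", "y"]);
const firstStep = perStep.get();
perStep.update(["z"]);
console.log(firstStep);     // ["x", "y"]
console.log(perStep.get()); // ["z"]
```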

EphemeralValue - Temporary State

Stores the value received in the immediately preceding step, then clears it.

class EphemeralValue<Value> extends BaseChannel<Value, Value, Value> {
  constructor(guard?: boolean);
  fromCheckpoint(checkpoint?: Value): this;
  update(values: Value[]): boolean;
  get(): Value;
  checkpoint(): Value;
  isAvailable(): boolean;
}

When to use: Temporary state that shouldn't persist across steps. Used internally for START/END nodes.
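
A minimal model of this lifetime is sketched below. It is an illustration, not the library's code: EphemeralValueSketch is a hypothetical name, a step with no writes is modeled as update([]), and the guard default of true is an assumption.

```typescript
// Simplified model of an ephemeral channel (hypothetical, for illustration only).
class EphemeralValueSketch<V> {
  private value: V[] = [];
  private guard: boolean;

  // Assumption: with guard enabled, more than one write per step is an error.
  constructor(guard: boolean = true) {
    this.guard = guard;
  }

  update(writes: V[]): boolean {
    if (writes.length === 0) {
      // No write this step: drop whatever the previous step stored.
      const hadValue = this.value.length > 0;
      this.value = [];
      return hadValue;
    }
    if (writes.length !== 1 && this.guard) {
      throw new Error("EphemeralValue can only receive one value per step");
    }
    this.value = [writes[writes.length - 1]];
    return true;
  }

  isAvailable(): boolean {
    return this.value.length > 0;
  }
}

const input = new EphemeralValueSketch<string>();
input.update(["hello"]);
const availableAfterWrite = input.isAvailable();
input.update([]); // next step, nothing written
const availableAfterIdleStep = input.isAvailable();

console.log(availableAfterWrite);    // true
console.log(availableAfterIdleStep); // false
```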

NamedBarrierValue - Fan-In Synchronization

Wait until all named values are received before making value available. Used for coordinating parallel node execution.

class NamedBarrierValue<Value> extends BaseChannel<void, Value, Value[]> {
  constructor(names: Set<Value>);
  fromCheckpoint(checkpoint?: Value[]): this;
  update(values: Value[]): boolean;
  get(): void;
  checkpoint(): Value[];
  consume(): boolean;
  isAvailable(): boolean;
}

When to use: Automatically used when multiple edges converge to one node.

// Wait for both node1 and node2 to complete
.addEdge("node1", "merge")
.addEdge("node2", "merge")
// NamedBarrierValue created automatically for "merge"
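
The waiting behavior can be modeled as a set of expected names and a set of names seen so far. The class below is a sketch of that idea only. NamedBarrierSketch is a hypothetical name and the error message is invented for the example.

```typescript
// Simplified model of a named barrier (hypothetical, for illustration only).
class NamedBarrierSketch<V> {
  private names: Set<V>;
  private seen = new Set<V>();

  constructor(names: Set<V>) {
    this.names = names;
  }

  // Each write is the name of a source that has finished.
  update(writes: V[]): boolean {
    let updated = false;
    for (const name of writes) {
      if (!this.names.has(name)) {
        throw new Error("Unexpected name: " + String(name));
      }
      if (!this.seen.has(name)) {
        this.seen.add(name);
        updated = true;
      }
    }
    return updated;
  }

  // Available only once every expected name has been seen.
  isAvailable(): boolean {
    return this.seen.size === this.names.size;
  }

  // Reset after the downstream node has been triggered.
  consume(): boolean {
    if (!this.isAvailable()) return false;
    this.seen = new Set<V>();
    return true;
  }
}

const barrier = new NamedBarrierSketch<string>(new Set(["node1", "node2"]));
barrier.update(["node1"]);
const afterFirst = barrier.isAvailable();
barrier.update(["node2"]);
const afterSecond = barrier.isAvailable();
barrier.consume();
const afterConsume = barrier.isAvailable();

console.log(afterFirst);   // false
console.log(afterSecond);  // true
console.log(afterConsume); // false
```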

DynamicBarrierValue - Dynamic Map-Reduce

Dynamic synchronization that switches between priming and waiting states.

class DynamicBarrierValue<Value> extends BaseChannel<
  void,
  Value | WaitForNames<Value>,
  [Value[] | undefined, Value[]]
> {
  constructor();
  fromCheckpoint(checkpoint?: [Value[] | undefined, Value[]]): this;
  update(values: (Value | WaitForNames<Value>)[]): boolean;
  get(): void;
  checkpoint(): [Value[] | undefined, Value[]];
  consume(): boolean;
  isAvailable(): boolean;
}

interface WaitForNames<Value> {
  __names: Value[];
}

When to use: Map-reduce patterns where the number of parallel tasks is determined at runtime (used with Send).
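
The priming and waiting states can be pictured with the following sketch. It is a simplified model restricted to string names, not the library's implementation: DynamicBarrierSketch and BarrierWrite are hypothetical names, and the exact error conditions are assumptions.

```typescript
// A write is either a name that has finished, or a priming object listing the names to wait for.
type BarrierWrite = string | { __names: string[] };

// Simplified model of a dynamic barrier (hypothetical, for illustration only).
class DynamicBarrierSketch {
  private names: Set<string> | undefined = undefined;
  private seen = new Set<string>();

  update(writes: BarrierWrite[]): boolean {
    const primers = writes.filter(
      (w): w is { __names: string[] } => typeof w !== "string"
    );
    if (primers.length > 1) {
      throw new Error("Only one set of names can be received per step");
    }
    if (primers.length === 1) {
      // Priming: learn which names to wait for.
      this.names = new Set(primers[0].__names);
      return true;
    }
    const names = this.names;
    if (names === undefined) return false; // not primed yet
    // Waiting: record each expected name as it arrives.
    let updated = false;
    for (const w of writes) {
      if (typeof w !== "string") continue;
      if (!names.has(w)) throw new Error("Unexpected name: " + w);
      if (!this.seen.has(w)) {
        this.seen.add(w);
        updated = true;
      }
    }
    return updated;
  }

  isAvailable(): boolean {
    return this.names !== undefined && this.seen.size === this.names.size;
  }

  // Return to the unprimed state once the barrier has fired.
  consume(): boolean {
    if (!this.isAvailable()) return false;
    this.names = undefined;
    this.seen = new Set<string>();
    return true;
  }
}

const dynamicBarrier = new DynamicBarrierSketch();
dynamicBarrier.update([{ __names: ["task-a", "task-b"] }]); // priming
dynamicBarrier.update(["task-a"]);
const openAfterOne = dynamicBarrier.isAvailable();
dynamicBarrier.update(["task-b"]);
const openAfterAll = dynamicBarrier.isAvailable();
dynamicBarrier.consume();
const openAfterConsume = dynamicBarrier.isAvailable();

console.log(openAfterOne);     // false
console.log(openAfterAll);     // true
console.log(openAfterConsume); // false
```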

Advanced State Patterns

Conditional Updates

Update only when a condition is met:

score: Annotation<number>({
  reducer: (current, update) => {
    // Only increase score
    return Math.max(current, update);
  },
  default: () => 0
})

// Only update if newer
timestamp: Annotation<Date>({
  reducer: (current, update) => {
    return update > current ? update : current;
  },
  default: () => new Date(0)
})

Temporal State

Track history with timestamps:

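// The second type parameter sets the update type: nodes return raw values,
// and the reducer wraps each one with a timestamp.
history: Annotation<Array<{timestamp: string, value: any}>, any[]>({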
  reducer: (current, update) => {
    const timestamped = update.map(v => ({
      timestamp: new Date().toISOString(),
      value: v
    }));
    return [...current, ...timestamped];
  },
  default: () => []
})

Windowed State

Keep only recent values:

recentEvents: Annotation<Event[]>({
  reducer: (current, update) => {
    const combined = [...current, ...update];
    // Keep last 10 events
    return combined.slice(-10);
  },
  default: () => []
})
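
As a quick check of how the window evolves, the same reducer shape can be run over a few updates in plain TypeScript. The factory keepLast is a hypothetical helper written for this sketch, and it assumes a positive limit, since slice(-0) would return the whole array.

```typescript
// Build a windowed reducer that keeps only the most recent `limit` items.
// Assumes limit > 0: slice(-0) is slice(0), which would keep everything.
const keepLast = <T>(limit: number) =>
  (current: T[], update: T[]): T[] => current.concat(update).slice(-limit);

const windowReducer = keepLast<number>(3);

let windowed: number[] = [];
for (const update of [[1, 2], [3, 4], [5]]) {
  windowed = windowReducer(windowed, update);
}

console.log(windowed); // [3, 4, 5]
```

Older items fall out of the window as soon as newer ones push the combined length past the limit, so the stored state stays bounded however long the graph runs.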

Prioritized State

Maintain priority queue:

tasks: Annotation<Task[]>({
  reducer: (current, update) => {
    const combined = [...current, ...update];
    return combined.sort((a, b) => b.priority - a.priority);
  },
  default: () => []
})

Message State Patterns

Message Reducers

function messagesStateReducer(
  left: Messages,
  right: Messages
): BaseMessage[];

const addMessages: typeof messagesStateReducer; // Alias

type Messages =
  | Array<BaseMessage | BaseMessageLike>
  | BaseMessage
  | BaseMessageLike;

The messagesStateReducer intelligently merges messages:

  • Appends new messages to the end
  • Updates existing messages by ID
  • Handles RemoveMessage for deletions
  • Supports REMOVE_ALL_MESSAGES constant
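
The merge rules listed above can be approximated with a small stand-alone function. This is a simplified model and not the library's reducer: the Msg and Removal shapes and the mergeMessages function are hypothetical stand-ins for BaseMessage, RemoveMessage, and messagesStateReducer, and details such as ID generation and error handling for unknown IDs are left out.

```typescript
// Hypothetical stand-ins for BaseMessage and RemoveMessage.
interface Msg { id: string; content: string; }
interface Removal { removeId: string; }

const REMOVE_ALL = "__remove_all__";

function mergeMessages(left: Msg[], right: Array<Msg | Removal>): Msg[] {
  let merged = left.slice();
  for (const item of right) {
    if ("removeId" in item) {
      const removeId = item.removeId;
      // Removal: drop everything, or only the message with the matching ID.
      merged = removeId === REMOVE_ALL ? [] : merged.filter((m) => m.id !== removeId);
      continue;
    }
    const index = merged.findIndex((m) => m.id === item.id);
    if (index >= 0) {
      merged[index] = item; // same ID: update in place
    } else {
      merged.push(item);    // new ID: append to the end
    }
  }
  return merged;
}

const mergedMessages = mergeMessages(
  [{ id: "1", content: "hi" }, { id: "2", content: "draft" }],
  [{ id: "2", content: "final" }, { id: "3", content: "new" }, { removeId: "1" }]
);
console.log(mergedMessages.map((m) => m.id + ":" + m.content)); // ["2:final", "3:new"]

const cleared = mergeMessages(mergedMessages, [{ removeId: REMOVE_ALL }]);
console.log(cleared.length); // 0
```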

Message Filtering

import { RemoveMessage } from "@langchain/core/messages";

const filterNode = (state: State) => {
  // Remove system messages
  const removals = state.messages
    .filter(m => m._getType() === "system")
    .map(m => new RemoveMessage({ id: m.id }));

  return { messages: removals };
};

Message Modification

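import { AIMessage } from "@langchain/core/messages";

// modifyContent is a placeholder for your own transformation function
const modifyNode = (state: State) => {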
  const lastMessage = state.messages[state.messages.length - 1];

  // Update existing message by ID
  return {
    messages: [new AIMessage({
      ...lastMessage,
      content: modifyContent(lastMessage.content),
      id: lastMessage.id  // Same ID = update in place
    })]
  };
};

Clear All Messages

const REMOVE_ALL_MESSAGES: "__remove_all__";

import { RemoveMessage } from "@langchain/core/messages";
import { REMOVE_ALL_MESSAGES } from "@langchain/langgraph";

const clearMessages = (state: any) => {
  return {
    messages: [new RemoveMessage({ id: REMOVE_ALL_MESSAGES })]
  };
};

pushMessage - Manual Message Streaming

function pushMessage(
  message: BaseMessage | BaseMessageLike,
  options?: RunnableConfig & { stateKey?: string | null }
): BaseMessage;

Manually pushes a message before the node finishes executing. The message must have an ID.

import { pushMessage } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";

const streamingNode = async (state: State) => {
  // Push intermediate message
  pushMessage(new AIMessage({
    content: "Processing started",
    id: "msg-1"
  }));

  await processData(state.data);

  // Push completion message
  pushMessage(new AIMessage({
    content: "Processing complete",
    id: "msg-2"
  }));

  return { done: true };
};

// Push without persisting to state
pushMessage(message, { stateKey: null });

Channel Selection Guide

Choose the appropriate channel based on update semantics:

| Channel | Write Behavior | Use Case |
| --- | --- | --- |
| LastValue | Single writer per step, strict | Default for simple fields |
| AnyValue | Multiple writers allowed | Identical values from multiple nodes |
| Topic | Accumulates all values | Event collection, message queues |
| BinaryOperatorAggregate | Custom reduction logic | Sum, concat, merge, custom aggregation |
| EphemeralValue | Cleared after each step | Temporary state, START/END |
| NamedBarrierValue | Synchronize named values | Fan-in, waiting for multiple nodes |
| DynamicBarrierValue | Dynamic map-reduce | Send-based parallel execution |

State Type Utilities

interface StateDefinition {
  [key: string]: BaseChannel | (() => BaseChannel);
}

type StateType<SD extends StateDefinition> = {
  [key in keyof SD]: ExtractValueType<SD[key]>;
};

type UpdateType<SD extends StateDefinition> = {
  [key in keyof SD]?: ExtractUpdateType<SD[key]>;
};

type NodeType<SD extends StateDefinition> = RunnableLike<
  StateType<SD>,
  UpdateType<SD> | Partial<StateType<SD>>
>;

Channel Utility Functions

function isBaseChannel(obj: unknown): obj is BaseChannel;

function emptyChannels<Cc extends Record<string, BaseChannel>>(
  channels: Cc,
  checkpoint: ReadonlyCheckpoint
): Cc;

function createCheckpoint<ValueType>(
  checkpoint: ReadonlyCheckpoint,
  channels: Record<string, BaseChannel<ValueType>> | undefined,
  step: number,
  options?: { id?: string }
): Checkpoint;

Complete Example: Complex State Management

import { StateGraph, Annotation, START, END } from "@langchain/langgraph";

// Rich state with multiple reducer types
const ComplexState = Annotation.Root({
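  // Last-write-wins field. A reducer is needed here because node1 and node2
  // both write this key in the same step, which a plain LastValue would reject.
  currentStep: Annotation<string>({
    reducer: (_current, update) => update,
    default: () => ""
  }),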

  // Array accumulation
  logs: Annotation<string[]>({
    reducer: (a, b) => a.concat(b),
    default: () => []
  }),

  // Numeric aggregation
  totalScore: Annotation<number>({
    reducer: (a, b) => a + b,
    default: () => 0
  }),
  maxScore: Annotation<number>({
    reducer: (a, b) => Math.max(a, b),
    default: () => -Infinity
  }),

  // Object merging
  metadata: Annotation<Record<string, any>>({
    reducer: (a, b) => ({ ...a, ...b }),
    default: () => ({})
  }),

  // Set operations
  tags: Annotation<Set<string>>({
    reducer: (a, b) => new Set([...a, ...b]),
    default: () => new Set()
  }),

  // Conditional update (only increase)
  version: Annotation<number>({
    reducer: (current, update) => Math.max(current, update),
    default: () => 1
  }),

  // Windowed history (last 5 items)
  recentEvents: Annotation<any[]>({
    reducer: (current, update) => {
      const combined = [...current, ...update];
      return combined.slice(-5);
    },
    default: () => []
  })
});

const node1 = (state: typeof ComplexState.State) => ({
  currentStep: "node1",
  logs: ["Node 1 executed"],
  totalScore: 10,
  maxScore: 10,
  metadata: { source: "node1", timestamp: Date.now() },
  tags: new Set(["processed"]),
  version: 2,
  recentEvents: [{ type: "node1", data: "..." }]
});

const node2 = (state: typeof ComplexState.State) => ({
  currentStep: "node2",
  logs: ["Node 2 executed"],
  totalScore: 15,
  maxScore: 8,  // Reducer keeps node1's 10, since max(10, 8) = 10
  metadata: { output: "complete" },
  tags: new Set(["validated"]),
  version: 1,  // Reducer keeps node1's 2, since max(2, 1) = 2
  recentEvents: [{ type: "node2", data: "..." }]
});

const graph = new StateGraph(ComplexState)
  .addNode("node1", node1)
  .addNode("node2", node2)
  .addEdge(START, "node1")
  .addEdge(START, "node2")  // Parallel execution
  .addEdge("node1", END)
  .addEdge("node2", END)
  .compile();

const result = await graph.invoke({});
// {
//   currentStep: "node1" or "node2" (last to complete),
//   logs: ["Node 1 executed", "Node 2 executed"],
//   totalScore: 25,
//   maxScore: 10,
//   metadata: { source: "node1", timestamp: ..., output: "complete" },
//   tags: Set(["processed", "validated"]),
//   version: 2,
//   recentEvents: [{ type: "node1", ... }, { type: "node2", ... }]
// }

This guide provides comprehensive coverage of state management and channels in LangGraph.
