
tessl/npm-langchain--langgraph

Low-level orchestration framework for building stateful, multi-actor applications with LLMs

Channels

Channels are state management primitives in LangGraph that provide different update semantics for how state values are stored and updated during graph execution. Each channel type implements specific behavior for handling concurrent writes and state aggregation.

Base Channel

Abstract base class defining the channel interface.

abstract class BaseChannel<
  ValueType = unknown,
  UpdateType = unknown,
  CheckpointType = unknown
> {
  abstract lc_graph_name: string;

  abstract fromCheckpoint(checkpoint?: CheckpointType): this;

  abstract update(values: UpdateType[]): boolean;

  abstract get(): ValueType;

  abstract checkpoint(): CheckpointType | undefined;

  consume(): boolean;

  finish(): boolean;

  isAvailable(): boolean;
}
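
To make the lifecycle of these methods concrete, here is a minimal, self-contained sketch. It does not subclass the real BaseChannel: `ChannelLike` and `MaxChannel` are hypothetical names introduced only for illustration, and the convention that `update` returns `true` when the stored value changed is an assumption of this sketch.

```typescript
// Hypothetical stand-in for the interface above; not the library's BaseChannel.
interface ChannelLike<V, U, C> {
  update(values: U[]): boolean;
  get(): V;
  checkpoint(): C | undefined;
  fromCheckpoint(checkpoint?: C): ChannelLike<V, U, C>;
  isAvailable(): boolean;
}

// Toy channel that keeps the largest number written so far.
class MaxChannel implements ChannelLike<number, number, number> {
  private value: number | undefined;

  // Receives every write made during one step.
  update(values: number[]): boolean {
    if (values.length === 0) return false; // nothing written this step
    const candidate = Math.max(...values);
    if (this.value !== undefined && candidate <= this.value) return false;
    this.value = candidate;
    return true;
  }

  get(): number {
    if (this.value === undefined) throw new Error("MaxChannel is empty");
    return this.value;
  }

  // Snapshot of the state that should survive between runs.
  checkpoint(): number | undefined {
    return this.value;
  }

  // Builds a fresh channel from a snapshot.
  fromCheckpoint(checkpoint?: number): MaxChannel {
    const restored = new MaxChannel();
    restored.value = checkpoint;
    return restored;
  }

  isAvailable(): boolean {
    return this.value !== undefined;
  }
}

const ch = new MaxChannel();
ch.update([3, 7]);
ch.update([5]);
const restored = ch.fromCheckpoint(ch.checkpoint());
console.log(restored.get()); // 7
```

The sketch separates the three concerns the interface implies: applying a step's writes, reading the current value, and saving or restoring state.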

LastValue

Stores only the most recent value received. Can receive at most one value per step. If multiple nodes attempt to write to this channel in a single step, an error is thrown.

class LastValue<Value> extends BaseChannel<Value, Value, Value> {
  lc_graph_name: "LastValue";

  constructor(initialValueFactory?: () => Value);

  fromCheckpoint(checkpoint?: Value): this;

  update(values: Value[]): boolean;

  get(): Value;

  checkpoint(): Value;

  isAvailable(): boolean;
}

Usage

import { Annotation, LastValue, StateGraph, START } from "@langchain/langgraph";

// Used implicitly when defining state without a reducer
const ImplicitState = Annotation.Root({
  count: Annotation<number>  // Uses LastValue internally
});

// Or explicitly, with an initial value factory
const State = Annotation.Root({
  count: new LastValue<number>(() => 0)
});

const graph = new StateGraph(State)
  .addNode("increment", (state) => ({ count: state.count + 1 }))
  .addNode("double", (state) => ({ count: state.count * 2 }))
  // Both nodes start from START, so they run in the same step
  .addEdge(START, "increment")
  .addEdge(START, "double")
  .compile();

// Invoking this graph errors, because both nodes write to count in the same step
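
The single-writer rule can also be illustrated without the library. The class below is a simplified model of the behavior described above, not the real LastValue: `ToyLastValue` and its error message are assumptions made for this sketch.

```typescript
// Toy model of LastValue semantics; not the library class.
class ToyLastValue<V> {
  private value: V | undefined;

  // `values` holds every write made to the channel in one step.
  update(values: V[]): boolean {
    if (values.length === 0) return false;
    if (values.length > 1) {
      throw new Error("LastValue can receive only one value per step");
    }
    this.value = values[0];
    return true;
  }

  get(): V {
    if (this.value === undefined) throw new Error("channel is empty");
    return this.value;
  }
}

const count = new ToyLastValue<number>();
count.update([1]); // one writer in this step: accepted

let rejected = false;
try {
  count.update([2, 4]); // two writers in the same step: rejected
} catch {
  rejected = true;
}
console.log(count.get(), rejected); // 1 true
```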

LastValueAfterFinish

Stores the last value received, but only makes it available after finish() is called. Once made available, clears the value. Used internally for deferred node execution.

class LastValueAfterFinish<Value> extends BaseChannel<
  Value,
  Value,
  [Value, boolean]
> {
  lc_graph_name: "LastValueAfterFinish";

  fromCheckpoint(checkpoint?: [Value, boolean]): this;

  update(values: Value[]): boolean;

  get(): Value;

  checkpoint(): [Value, boolean] | undefined;

  consume(): boolean;

  finish(): boolean;

  isAvailable(): boolean;
}
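
The gating behavior can be pictured with a small model. This is a sketch under assumptions, not the library's implementation: `ToyLastValueAfterFinish` is a hypothetical name, and the detail that a new write re-arms the gate is an assumption of this sketch.

```typescript
// Toy model of a channel that hides its value until finish() is called.
class ToyLastValueAfterFinish<V> {
  private value: V | undefined;
  private finished = false;

  update(values: V[]): boolean {
    if (values.length === 0) return false;
    this.finished = false; // assumption: a new write closes the gate again
    this.value = values[values.length - 1];
    return true;
  }

  // Opens the gate if there is a value waiting behind it.
  finish(): boolean {
    if (this.finished || this.value === undefined) return false;
    this.finished = true;
    return true;
  }

  isAvailable(): boolean {
    return this.finished && this.value !== undefined;
  }

  // Clears the value once it has been made available and read.
  consume(): boolean {
    if (!this.finished) return false;
    this.finished = false;
    this.value = undefined;
    return true;
  }
}

const deferred = new ToyLastValueAfterFinish<string>();
deferred.update(["result"]);
const beforeFinish = deferred.isAvailable(); // false
deferred.finish();
const afterFinish = deferred.isAvailable(); // true
deferred.consume();
const afterConsume = deferred.isAvailable(); // false
console.log(beforeFinish, afterFinish, afterConsume);
```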

AnyValue

Stores the last value received, on the assumption that all values received in a step are equal. Unlike LastValue, it does not throw when multiple nodes write to it in a single step; the last value written is kept.

class AnyValue<Value> extends BaseChannel<Value, Value, Value> {
  lc_graph_name: "AnyValue";

  constructor();

  fromCheckpoint(checkpoint?: Value): this;

  update(values: Value[]): boolean;

  get(): Value;

  checkpoint(): Value;

  isAvailable(): boolean;
}

Usage

import { AnyValue } from "@langchain/langgraph";

// Useful when multiple nodes write identical values
const channels = {
  status: new AnyValue<string>()
};

// Both nodes can write "completed" without causing an error
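
For contrast with LastValue, here is a minimal model of the "last write wins" behavior. `ToyAnyValue` is a hypothetical class written for this sketch, and it ignores what the real channel does in a step with no writes.

```typescript
// Toy model of AnyValue semantics; not the library class.
class ToyAnyValue<V> {
  private value: V | undefined;

  // Accepts any number of writes in one step and keeps the last one.
  update(values: V[]): boolean {
    if (values.length === 0) return false;
    this.value = values[values.length - 1];
    return true;
  }

  get(): V {
    if (this.value === undefined) throw new Error("channel is empty");
    return this.value;
  }
}

const status = new ToyAnyValue<string>();
status.update(["completed", "completed"]); // two writers, same value, no error
console.log(status.get()); // completed
```

Because no equality check is performed in this model, writers that disagree would silently overwrite one another, which is why the channel is meant for values that are known to be identical.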

Topic

A configurable PubSub Topic channel that accumulates messages. Supports unique value filtering and optional accumulation across steps.

class Topic<Value> extends BaseChannel<
  Array<Value>,
  Value | Value[],
  [Value[], Value[]]
> {
  lc_graph_name: "Topic";

  constructor(fields?: {
    unique?: boolean;
    accumulate?: boolean;
  });

  fromCheckpoint(checkpoint?: [Value[], Value[]]): this;

  update(values: Array<Value | Value[]>): boolean;

  get(): Array<Value>;

  checkpoint(): [Value[], Value[]];

  isAvailable(): boolean;
}

Parameters

  • unique - If true, only unique values (using reference equality) will be added to the topic
  • accumulate - If false, the channel will be emptied after each step

Usage

import { Annotation, StateGraph, START, Topic } from "@langchain/langgraph";

const State = Annotation.Root({
  events: new Topic<string>({ accumulate: true, unique: false })
});

const graph = new StateGraph(State)
  .addNode("producer1", () => ({ events: ["event1", "event2"] }))
  .addNode("producer2", () => ({ events: ["event3"] }))
  // Run both producers in the same step
  .addEdge(START, "producer1")
  .addEdge(START, "producer2")
  .compile();

// Result after both producers run: { events: ["event1", "event2", "event3"] }
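
How the two options interact is easier to see in a small model. The class below is a sketch of the described behavior, not the real Topic: `ToyTopic` is a hypothetical name, both options are assumed to default to `false`, and uniqueness is tracked with a JavaScript `Set`.

```typescript
// Toy model of Topic semantics; not the library class.
class ToyTopic<V> {
  private values: V[] = [];
  private seen = new Set<V>();
  private unique: boolean;
  private accumulate: boolean;

  constructor(fields: { unique?: boolean; accumulate?: boolean } = {}) {
    this.unique = fields.unique ?? false;
    this.accumulate = fields.accumulate ?? false;
  }

  // `writes` holds every write made in one step; arrays are flattened one level.
  update(writes: Array<V | V[]>): boolean {
    if (!this.accumulate) this.values = []; // start each step empty
    let added = 0;
    for (const write of writes) {
      const items = (Array.isArray(write) ? write : [write]) as V[];
      for (const item of items) {
        if (this.unique && this.seen.has(item)) continue; // skip repeats
        this.seen.add(item);
        this.values.push(item);
        added += 1;
      }
    }
    return added > 0;
  }

  get(): V[] {
    return [...this.values];
  }
}

const perStep = new ToyTopic<string>();
perStep.update(["a", ["b", "c"]]);
perStep.update(["d"]);
console.log(JSON.stringify(perStep.get())); // ["d"]

const log = new ToyTopic<string>({ accumulate: true, unique: true });
log.update(["a", ["b", "a"]]);
log.update(["b", "c"]);
console.log(JSON.stringify(log.get())); // ["a","b","c"]
```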

BinaryOperatorAggregate

Stores the result of applying a binary operator (reducer function) to the current value and each new value. Enables custom aggregation logic like summing numbers, concatenating arrays, or merging objects.

class BinaryOperatorAggregate<
  ValueType,
  UpdateType = ValueType
> extends BaseChannel<ValueType, UpdateType, ValueType> {
  lc_graph_name: "BinaryOperatorAggregate";

  constructor(
    operator: BinaryOperator<ValueType, UpdateType>,
    initialValueFactory?: () => ValueType
  );

  fromCheckpoint(checkpoint?: ValueType): this;

  update(values: UpdateType[]): boolean;

  get(): ValueType;

  checkpoint(): ValueType;

  isAvailable(): boolean;
}

type BinaryOperator<ValueType, UpdateType> = (
  a: ValueType,
  b: UpdateType
) => ValueType;

Usage

// Annotation with a reducer creates a BinaryOperatorAggregate channel internally
import { Annotation } from "@langchain/langgraph";

// Sum reducer
const State = Annotation.Root({
  total: Annotation<number>({
    reducer: (a, b) => a + b,
    default: () => 0
  })
});

// Array concatenation reducer
const MessagesState = Annotation.Root({
  messages: Annotation<string[]>({
    reducer: (left, right) => left.concat(right),
    default: () => []
  })
});

// Object merge reducer
const ConfigState = Annotation.Root({
  config: Annotation<Record<string, any>>({
    reducer: (a, b) => ({ ...a, ...b }),
    default: () => ({})
  })
});
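
Conceptually, the channel folds each update into the current value with the operator. The helper below is a library-free sketch of that fold; `applyUpdates` is a hypothetical function written for illustration, and the inputs are made up.

```typescript
type BinaryOperator<V, U> = (a: V, b: U) => V;

// Folds a step's updates into the current value, one at a time.
function applyUpdates<V, U>(
  current: V,
  updates: U[],
  operator: BinaryOperator<V, U>
): V {
  let acc = current;
  for (const update of updates) {
    acc = operator(acc, update);
  }
  return acc;
}

// Sum reducer: 0 + 1 + 2 + 3
const total = applyUpdates(0, [1, 2, 3], (a: number, b: number) => a + b);

// Array concatenation reducer
const messages = applyUpdates<string[], string[]>(
  [],
  [["hi"], ["there", "!"]],
  (left, right) => left.concat(right)
);

// Object merge reducer: later updates override earlier keys
const config = applyUpdates<Record<string, unknown>, Record<string, unknown>>(
  { retries: 1 },
  [{ retries: 3 }, { verbose: true }],
  (a, b) => ({ ...a, ...b })
);

console.log(total); // 6
console.log(JSON.stringify(messages)); // ["hi","there","!"]
console.log(JSON.stringify(config)); // {"retries":3,"verbose":true}
```

Note that the result of a non-commutative reducer, such as concatenation or merge, depends on the order in which updates are applied.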

EphemeralValue

Stores the value written in the immediately preceding step and clears it afterwards. Useful for temporary state that should not persist across steps.

class EphemeralValue<Value> extends BaseChannel<Value, Value, Value> {
  lc_graph_name: "EphemeralValue";

  constructor(guard?: boolean);

  fromCheckpoint(checkpoint?: Value): this;

  update(values: Value[]): boolean;

  get(): Value;

  checkpoint(): Value;

  isAvailable(): boolean;
}

Parameters

  • guard - If true (default), throws error if multiple values are received in one step

Usage

import { EphemeralValue } from "@langchain/langgraph";

// Used internally for START/END nodes
const channels = {
  __start__: new EphemeralValue<any>(),
  __end__: new EphemeralValue<any>()
};
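
The "visible for one step" behavior can be modeled as follows. This is a simplified sketch, not the library class: `ToyEphemeralValue` is a hypothetical name, and a step with no writes is represented by calling `update([])`.

```typescript
// Toy model of EphemeralValue semantics; not the library class.
class ToyEphemeralValue<V> {
  private value: V | undefined;
  private guard: boolean;

  constructor(guard = true) {
    this.guard = guard;
  }

  // Called once per step with that step's writes (possibly none).
  update(values: V[]): boolean {
    if (values.length === 0) {
      const hadValue = this.value !== undefined;
      this.value = undefined; // a step without writes clears the channel
      return hadValue;
    }
    if (this.guard && values.length > 1) {
      throw new Error("EphemeralValue can receive only one value per step");
    }
    this.value = values[values.length - 1];
    return true;
  }

  isAvailable(): boolean {
    return this.value !== undefined;
  }
}

const input = new ToyEphemeralValue<string>();
input.update(["hello"]);
const afterWrite = input.isAvailable(); // true
input.update([]);
const afterIdleStep = input.isAvailable(); // false
console.log(afterWrite, afterIdleStep);
```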

NamedBarrierValue

A synchronization channel that waits until all named values are received before making the value available. Used for coordinating parallel node execution (fan-in pattern).

class NamedBarrierValue<Value> extends BaseChannel<
  void,
  Value,
  Value[]
> {
  lc_graph_name: "NamedBarrierValue";

  constructor(names: Set<Value>);

  fromCheckpoint(checkpoint?: Value[]): this;

  update(values: Value[]): boolean;

  get(): void;

  checkpoint(): Value[];

  consume(): boolean;

  isAvailable(): boolean;
}

Usage

import { NamedBarrierValue } from "@langchain/langgraph";

// Wait for both node1 and node2 to complete
const barrierChannel = new NamedBarrierValue(new Set(["node1", "node2"]));

// Used internally when you add edges from multiple nodes to one node
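
A barrier of this kind can be modeled with two sets: the names expected and the names seen so far. The class below is a sketch of that idea, not the real NamedBarrierValue; `ToyNamedBarrier` and its error message are assumptions made for illustration.

```typescript
// Toy model of a named barrier; not the library class.
class ToyNamedBarrier<N> {
  private names: Set<N>;
  private seen = new Set<N>();

  constructor(names: Set<N>) {
    this.names = names;
  }

  // Records which of the expected names have reported in.
  update(values: N[]): boolean {
    let changed = false;
    for (const name of values) {
      if (!this.names.has(name)) {
        throw new Error(`Unexpected name: ${String(name)}`);
      }
      if (!this.seen.has(name)) {
        this.seen.add(name);
        changed = true;
      }
    }
    return changed;
  }

  // The barrier opens only when every expected name has been seen.
  isAvailable(): boolean {
    return this.seen.size === this.names.size;
  }

  // Resets the barrier after the waiting node has been triggered.
  consume(): boolean {
    if (!this.isAvailable()) return false;
    this.seen.clear();
    return true;
  }
}

const barrier = new ToyNamedBarrier(new Set(["node1", "node2"]));
barrier.update(["node1"]);
const afterFirst = barrier.isAvailable(); // false
barrier.update(["node2"]);
const afterBoth = barrier.isAvailable(); // true
barrier.consume();
const afterConsume = barrier.isAvailable(); // false
console.log(afterFirst, afterBoth, afterConsume);
```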

NamedBarrierValueAfterFinish

A named barrier that only makes the value available after finish() is called. Used internally for deferred parallel execution.

class NamedBarrierValueAfterFinish<Value> extends BaseChannel<
  void,
  Value,
  [Value[], boolean]
> {
  lc_graph_name: "NamedBarrierValueAfterFinish";

  constructor(names: Set<Value>);

  fromCheckpoint(checkpoint?: [Value[], boolean]): this;

  update(values: Value[]): boolean;

  get(): void;

  checkpoint(): [Value[], boolean];

  consume(): boolean;

  finish(): boolean;

  isAvailable(): boolean;
}

DynamicBarrierValue

A dynamic synchronization channel that switches between "priming" and "waiting" states. In the priming state, it can't be read. When it receives a WaitForNames update, it switches to waiting state and collects named values until all are received.

class DynamicBarrierValue<Value> extends BaseChannel<
  void,
  Value | WaitForNames<Value>,
  [Value[] | undefined, Value[]]
> {
  lc_graph_name: "DynamicBarrierValue";

  constructor();

  fromCheckpoint(checkpoint?: [Value[] | undefined, Value[]]): this;

  update(values: (Value | WaitForNames<Value>)[]): boolean;

  get(): void;

  checkpoint(): [Value[] | undefined, Value[]];

  consume(): boolean;

  isAvailable(): boolean;
}

interface WaitForNames<Value> {
  __names: Value[];
}

Usage

Used for map-reduce patterns where the number of parallel tasks is determined dynamically.
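
The priming and waiting states described above can be sketched as a small state machine. This is an illustrative model only: `ToyDynamicBarrier` and `isWaitForNames` are hypothetical names, and the choice to throw when a name arrives during priming is an assumption of this sketch.

```typescript
interface WaitForNames<N> {
  __names: N[];
}

// Distinguishes a WaitForNames update from a plain name.
function isWaitForNames<N>(
  value: N | WaitForNames<N>
): value is WaitForNames<N> {
  return typeof value === "object" && value !== null && "__names" in value;
}

// Toy model of a dynamic barrier; not the library class.
class ToyDynamicBarrier<N> {
  private names: Set<N> | undefined = undefined; // undefined means "priming"
  private seen = new Set<N>();

  update(values: Array<N | WaitForNames<N>>): void {
    for (const value of values) {
      if (isWaitForNames<N>(value)) {
        // Switch to "waiting" for the announced names.
        this.names = new Set(value.__names);
        this.seen = new Set<N>();
      } else if (this.names === undefined) {
        throw new Error("Cannot record a name while priming");
      } else if (!this.names.has(value)) {
        throw new Error("Unexpected name");
      } else {
        this.seen.add(value);
      }
    }
  }

  isAvailable(): boolean {
    return this.names !== undefined && this.seen.size === this.names.size;
  }

  // Returns to "priming" once the barrier has been used.
  consume(): boolean {
    if (!this.isAvailable()) return false;
    this.names = undefined;
    this.seen = new Set<N>();
    return true;
  }
}

const dynamicBarrier = new ToyDynamicBarrier<string>();
dynamicBarrier.update([{ __names: ["task-a", "task-b"] }]); // decided at runtime
dynamicBarrier.update(["task-a"]);
const half = dynamicBarrier.isAvailable(); // false
dynamicBarrier.update(["task-b"]);
const ready = dynamicBarrier.isAvailable(); // true
dynamicBarrier.consume();
const reset = dynamicBarrier.isAvailable(); // false
console.log(half, ready, reset);
```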

DynamicBarrierValueAfterFinish

A dynamic barrier with an additional finished flag. The value becomes available only after all names have been received and finish() has been called.

class DynamicBarrierValueAfterFinish<Value> extends BaseChannel<
  void,
  Value | WaitForNames<Value>,
  [Value[] | undefined, Value[], boolean]
> {
  lc_graph_name: "DynamicBarrierValueAfterFinish";

  constructor();

  fromCheckpoint(checkpoint?: [Value[] | undefined, Value[], boolean]): this;

  update(values: (Value | WaitForNames<Value>)[]): boolean;

  get(): void;

  checkpoint(): [Value[] | undefined, Value[], boolean];

  consume(): boolean;

  finish(): boolean;

  isAvailable(): boolean;
}

Utility Functions

function isBaseChannel(obj: unknown): obj is BaseChannel;

function emptyChannels<Cc extends Record<string, BaseChannel>>(
  channels: Cc,
  checkpoint: ReadonlyCheckpoint
): Cc;

function createCheckpoint<ValueType>(
  checkpoint: ReadonlyCheckpoint,
  channels: Record<string, BaseChannel<ValueType>> | undefined,
  step: number,
  options?: { id?: string }
): Checkpoint;

Channel Selection Guide

Choose the appropriate channel based on your update semantics:

  • LastValue: Single writer per step, strict concurrency control
  • AnyValue: Multiple writers allowed, assumes values are equal
  • Topic: Multiple writers, accumulates all values as array
  • BinaryOperatorAggregate: Custom reduction logic (sum, concat, merge)
  • EphemeralValue: Temporary state cleared after each step
  • NamedBarrierValue: Synchronize multiple parallel writers (fan-in)
  • DynamicBarrierValue: Dynamic map-reduce patterns

Common Patterns

Array Concatenation

const State = Annotation.Root({
  items: Annotation<string[]>({
    reducer: (left, right) => left.concat(right),
    default: () => []
  })
});

Numeric Aggregation

const State = Annotation.Root({
  sum: Annotation<number>({
    reducer: (a, b) => a + b,
    default: () => 0
  }),
  max: Annotation<number>({
    reducer: (a, b) => Math.max(a, b),
    default: () => -Infinity
  })
});

Object Merging

const State = Annotation.Root({
  metadata: Annotation<Record<string, any>>({
    reducer: (a, b) => ({ ...a, ...b }),
    default: () => ({})
  })
});

Set Union

const State = Annotation.Root({
  tags: Annotation<Set<string>>({
    reducer: (a, b) => new Set([...a, ...b]),
    default: () => new Set()
  })
});

Custom Business Logic

interface Counter {
  count: number;
  lastUpdate: string;
}

const State = Annotation.Root({
  counter: Annotation<Counter>({
    reducer: (current, update) => ({
      count: current.count + update.count,
      lastUpdate: update.lastUpdate
    }),
    default: () => ({ count: 0, lastUpdate: new Date().toISOString() })
  })
});

Install with Tessl CLI

npx tessl i tessl/npm-langchain--langgraph
