Low-level orchestration framework for building stateful, multi-actor applications with LLMs
Channels are state management primitives in LangGraph that provide different update semantics for how state values are stored and updated during graph execution. Each channel type implements specific behavior for handling concurrent writes and state aggregation.
Abstract base class defining the channel interface.
abstract class BaseChannel<
ValueType = unknown,
UpdateType = unknown,
CheckpointType = unknown
> {
abstract lc_graph_name: string;
abstract fromCheckpoint(checkpoint?: CheckpointType): this;
abstract update(values: UpdateType[]): boolean;
abstract get(): ValueType;
abstract checkpoint(): CheckpointType | undefined;
consume(): boolean;
finish(): boolean;
isAvailable(): boolean;
}Stores only the most recent value received. Can receive at most one value per step. If multiple nodes attempt to write to this channel in a single step, an error is thrown.
class LastValue<Value> extends BaseChannel<Value, Value, Value> {
lc_graph_name: "LastValue";
constructor(initialValueFactory?: () => Value);
fromCheckpoint(checkpoint?: Value): this;
update(values: Value[]): boolean;
get(): Value;
checkpoint(): Value;
isAvailable(): boolean;
}import { LastValue } from "@langchain/langgraph";
import { StateGraph, Annotation } from "@langchain/langgraph";
// Used implicitly when defining state without a reducer
const State = Annotation.Root({
count: Annotation<number> // Uses LastValue internally
});
// Or explicitly
const State = Annotation.Root({
count: new LastValue<number>(() => 0)
});
const graph = new StateGraph(State)
.addNode("increment", (state) => ({ count: state.count + 1 }))
.addNode("double", (state) => ({ count: state.count * 2 }))
.compile();
// This would error if both nodes write to count in the same stepStores the last value received, but only makes it available after finish() is called. Once made available, clears the value. Used internally for deferred node execution.
class LastValueAfterFinish<Value> extends BaseChannel<
Value,
Value,
[Value, boolean]
> {
lc_graph_name: "LastValueAfterFinish";
fromCheckpoint(checkpoint?: [Value, boolean]): this;
update(values: Value[]): boolean;
get(): Value;
checkpoint(): [Value, boolean] | undefined;
consume(): boolean;
finish(): boolean;
isAvailable(): boolean;
}Stores the last value received, but assumes that if multiple values are received, they are all equal. Unlike LastValue, if multiple nodes write to this channel in a single step, values will be continuously overwritten without error.
class AnyValue<Value> extends BaseChannel<Value, Value, Value> {
lc_graph_name: "AnyValue";
constructor();
fromCheckpoint(checkpoint?: Value): this;
update(values: Value[]): boolean;
get(): Value;
checkpoint(): Value;
isAvailable(): boolean;
}import { AnyValue } from "@langchain/langgraph";
// Useful when multiple nodes write identical values
const channels = {
status: new AnyValue<string>()
};
// Both nodes can write "completed" without causing an errorA configurable PubSub Topic channel that accumulates messages. Supports unique value filtering and optional accumulation across steps.
class Topic<Value> extends BaseChannel<
Array<Value>,
Value | Value[],
[Value[], Value[]]
> {
lc_graph_name: "Topic";
constructor(fields?: {
unique?: boolean;
accumulate?: boolean;
});
fromCheckpoint(checkpoint?: [Value[], Value[]]): this;
update(values: Array<Value | Value[]>): boolean;
get(): Array<Value>;
checkpoint(): [Value[], Value[]];
isAvailable(): boolean;
}unique - If true, only unique values (using reference equality) will be added to the topicaccumulate - If false, the channel will be emptied after each stepimport { Topic } from "@langchain/langgraph";
import { StateGraph, Annotation } from "@langchain/langgraph";
const State = Annotation.Root({
events: new Topic<string>({ accumulate: true, unique: false })
});
const graph = new StateGraph(State)
.addNode("producer1", () => ({ events: ["event1", "event2"] }))
.addNode("producer2", () => ({ events: ["event3"] }))
.compile();
// Result: { events: ["event1", "event2", "event3"] }Stores the result of applying a binary operator (reducer function) to the current value and each new value. Enables custom aggregation logic like summing numbers, concatenating arrays, or merging objects.
class BinaryOperatorAggregate<
ValueType,
UpdateType = ValueType
> extends BaseChannel<ValueType, UpdateType, ValueType> {
lc_graph_name: "BinaryOperatorAggregate";
constructor(
operator: BinaryOperator<ValueType, UpdateType>,
initialValueFactory?: () => ValueType
);
fromCheckpoint(checkpoint?: ValueType): this;
update(values: UpdateType[]): boolean;
get(): ValueType;
checkpoint(): ValueType;
isAvailable(): boolean;
}
type BinaryOperator<ValueType, UpdateType> = (
a: ValueType,
b: UpdateType
) => ValueType;import { BinaryOperatorAggregate } from "@langchain/langgraph";
import { StateGraph, Annotation } from "@langchain/langgraph";
// Sum reducer
const State = Annotation.Root({
total: Annotation<number>({
reducer: (a, b) => a + b,
default: () => 0
})
});
// Array concatenation reducer
const MessagesState = Annotation.Root({
messages: Annotation<string[]>({
reducer: (left, right) => left.concat(right),
default: () => []
})
});
// Object merge reducer
const ConfigState = Annotation.Root({
config: Annotation<Record<string, any>>({
reducer: (a, b) => ({ ...a, ...b }),
default: () => ({})
})
});Stores the value received in the step immediately preceding, then clears after. Useful for temporary state that shouldn't persist across steps.
class EphemeralValue<Value> extends BaseChannel<Value, Value, Value> {
lc_graph_name: "EphemeralValue";
constructor(guard?: boolean);
fromCheckpoint(checkpoint?: Value): this;
update(values: Value[]): boolean;
get(): Value;
checkpoint(): Value;
isAvailable(): boolean;
}guard - If true (default), throws error if multiple values are received in one stepimport { EphemeralValue } from "@langchain/langgraph";
// Used internally for START/END nodes
const channels = {
__start__: new EphemeralValue<any>(),
__end__: new EphemeralValue<any>()
};A synchronization channel that waits until all named values are received before making the value available. Used for coordinating parallel node execution (fan-in pattern).
class NamedBarrierValue<Value> extends BaseChannel<
void,
Value,
Value[]
> {
lc_graph_name: "NamedBarrierValue";
constructor(names: Set<Value>);
fromCheckpoint(checkpoint?: Value[]): this;
update(values: Value[]): boolean;
get(): void;
checkpoint(): Value[];
consume(): boolean;
isAvailable(): boolean;
}import { NamedBarrierValue } from "@langchain/langgraph";
// Wait for both node1 and node2 to complete
const barrierChannel = new NamedBarrierValue(new Set(["node1", "node2"]));
// Used internally when you add edges from multiple nodes to one nodeA named barrier that only makes the value available after finish() is called. Used internally for deferred parallel execution.
class NamedBarrierValueAfterFinish<Value> extends BaseChannel<
void,
Value,
[Value[], boolean]
> {
lc_graph_name: "NamedBarrierValueAfterFinish";
constructor(names: Set<Value>);
fromCheckpoint(checkpoint?: [Value[], boolean]): this;
update(values: Value[]): boolean;
get(): void;
checkpoint(): [Value[], boolean];
consume(): boolean;
finish(): boolean;
isAvailable(): boolean;
}A dynamic synchronization channel that switches between "priming" and "waiting" states. In the priming state, it can't be read. When it receives a WaitForNames update, it switches to waiting state and collects named values until all are received.
class DynamicBarrierValue<Value> extends BaseChannel<
void,
Value | WaitForNames<Value>,
[Value[] | undefined, Value[]]
> {
lc_graph_name: "DynamicBarrierValue";
constructor();
fromCheckpoint(checkpoint?: [Value[] | undefined, Value[]]): this;
update(values: (Value | WaitForNames<Value>)[]): boolean;
get(): void;
checkpoint(): [Value[] | undefined, Value[]];
consume(): boolean;
isAvailable(): boolean;
}
interface WaitForNames<Value> {
__names: Value[];
}Used for map-reduce patterns where the number of parallel tasks is determined dynamically.
A dynamic barrier with an additional finished flag. Only makes the value available after both all names are received AND finish() is called.
class DynamicBarrierValueAfterFinish<Value> extends BaseChannel<
void,
Value | WaitForNames<Value>,
[Value[] | undefined, Value[], boolean]
> {
lc_graph_name: "DynamicBarrierValueAfterFinish";
constructor();
fromCheckpoint(checkpoint?: [Value[] | undefined, Value[], boolean]): this;
update(values: (Value | WaitForNames<Value>)[]): boolean;
get(): void;
checkpoint(): [Value[] | undefined, Value[], boolean];
consume(): boolean;
finish(): boolean;
isAvailable(): boolean;
}function isBaseChannel(obj: unknown): obj is BaseChannel;
function emptyChannels<Cc extends Record<string, BaseChannel>>(
channels: Cc,
checkpoint: ReadonlyCheckpoint
): Cc;
function createCheckpoint<ValueType>(
checkpoint: ReadonlyCheckpoint,
channels: Record<string, BaseChannel<ValueType>> | undefined,
step: number,
options?: { id?: string }
): Checkpoint;Choose the appropriate channel based on your update semantics:
const State = Annotation.Root({
items: Annotation<string[]>({
reducer: (left, right) => left.concat(right),
default: () => []
})
});const State = Annotation.Root({
sum: Annotation<number>({
reducer: (a, b) => a + b,
default: () => 0
}),
max: Annotation<number>({
reducer: (a, b) => Math.max(a, b),
default: () => -Infinity
})
});const State = Annotation.Root({
metadata: Annotation<Record<string, any>>({
reducer: (a, b) => ({ ...a, ...b }),
default: () => ({})
})
});const State = Annotation.Root({
tags: Annotation<Set<string>>({
reducer: (a, b) => new Set([...a, ...b]),
default: () => new Set()
})
});interface Counter {
count: number;
lastUpdate: string;
}
const State = Annotation.Root({
counter: Annotation<Counter>({
reducer: (current, update) => ({
count: current.count + update.count,
lastUpdate: update.lastUpdate
}),
default: () => ({ count: 0, lastUpdate: new Date().toISOString() })
})
});