Low-level orchestration framework for building stateful, multi-actor applications with LLMs
Deep dive into state management and channel patterns.
Store last value, error on concurrent writes. Used by default for fields without a reducer.
class LastValue<Value> extends BaseChannel<Value, Value, Value> {
constructor(initialValueFactory?: () => Value);
fromCheckpoint(checkpoint?: Value): this;
update(values: Value[]): boolean;
get(): Value;
checkpoint(): Value;
isAvailable(): boolean;
}
When to use: When only one node should write per step. Throws an error if multiple nodes write simultaneously.
const State = Annotation.Root({
count: Annotation<number> // Implicitly uses LastValue
});
Stores the last value received, but assumes all values are equal if multiple are received.
class AnyValue<Value> extends BaseChannel<Value, Value, Value> {
constructor();
fromCheckpoint(checkpoint?: Value): this;
update(values: Value[]): boolean;
get(): Value;
checkpoint(): Value;
isAvailable(): boolean;
}
When to use: When multiple nodes may write identical values in the same step and this should not cause an error.
Apply reducer function for custom aggregation logic.
/**
 * Channel that applies a reducer function to combine incoming updates with
 * the stored value (custom aggregation logic).
 *
 * NOTE(review): every other channel declaration in this document extends
 * BaseChannel; this signature omits the `extends` clause — confirm against
 * the library and align if it was dropped by accident.
 */
class BinaryOperatorAggregate<ValueType, UpdateType = ValueType> {
/**
 * @param operator - Reducer that takes the current value and one update and
 *   returns the new value.
 * @param initialValueFactory - Optional factory for the starting value.
 */
constructor(
operator: BinaryOperator<ValueType, UpdateType>,
initialValueFactory?: () => ValueType
);
/** Accepts an optional checkpointed value and returns `this`. */
fromCheckpoint(checkpoint?: ValueType): this;
/**
 * Receives the updates for the channel.
 * Presumably the boolean reports whether the channel changed — confirm.
 */
update(values: UpdateType[]): boolean;
/** Returns the current aggregated value. */
get(): ValueType;
/** Returns the value to persist; it has the same type as the channel value. */
checkpoint(): ValueType;
/** Presumably reports whether a value is currently readable — confirm. */
isAvailable(): boolean;
}
type BinaryOperator<ValueType, UpdateType> = (
a: ValueType,
b: UpdateType
) => ValueType;
Common patterns:
// Sum
total: Annotation<number>({
reducer: (a, b) => a + b,
default: () => 0
})
// Max
highScore: Annotation<number>({
reducer: (a, b) => Math.max(a, b),
default: () => -Infinity
})
// Array concatenation
items: Annotation<string[]>({
reducer: (a, b) => a.concat(b),
default: () => []
})
// Object merging
metadata: Annotation<Record<string, any>>({
reducer: (a, b) => ({ ...a, ...b }),
default: () => ({})
})
// Set union
tags: Annotation<Set<string>>({
reducer: (a, b) => new Set([...a, ...b]),
default: () => new Set()
})
A configurable PubSub Topic channel that accumulates messages.
class Topic<Value> extends BaseChannel<Array<Value>, Value | Value[]> {
constructor(fields?: {
unique?: boolean;
accumulate?: boolean;
});
fromCheckpoint(checkpoint?: [Value[], Value[]]): this;
update(values: Array<Value | Value[]>): boolean;
get(): Array<Value>;
checkpoint(): [Value[], Value[]];
isAvailable(): boolean;
}
Parameters:
- unique - If true, only unique values (compared by reference equality) are added
- accumulate - If false, the channel is cleared after each step

events: new Topic<Event>({
unique: false, // Allow duplicates
accumulate: true // Keep across steps
})
Stores the value received in the immediately preceding step, then clears it.
class EphemeralValue<Value> extends BaseChannel<Value, Value, Value> {
constructor(guard?: boolean);
fromCheckpoint(checkpoint?: Value): this;
update(values: Value[]): boolean;
get(): Value;
checkpoint(): Value;
isAvailable(): boolean;
}
When to use: Temporary state that shouldn't persist across steps. Used internally for START/END nodes.
Wait until all named values are received before making value available. Used for coordinating parallel node execution.
class NamedBarrierValue<Value> extends BaseChannel<void, Value, Value[]> {
constructor(names: Set<Value>);
fromCheckpoint(checkpoint?: Value[]): this;
update(values: Value[]): boolean;
get(): void;
checkpoint(): Value[];
consume(): boolean;
isAvailable(): boolean;
}
When to use: Automatically used when multiple edges converge to one node.
// Wait for both node1 and node2 to complete
.addEdge("node1", "merge")
.addEdge("node2", "merge")
// NamedBarrierValue created automatically for "merge"
Dynamic synchronization that switches between priming and waiting states.
class DynamicBarrierValue<Value> extends BaseChannel<
void,
Value | WaitForNames<Value>,
[Value[] | undefined, Value[]]
> {
constructor();
fromCheckpoint(checkpoint?: [Value[] | undefined, Value[]]): this;
update(values: (Value | WaitForNames<Value>)[]): boolean;
get(): void;
checkpoint(): [Value[] | undefined, Value[]];
consume(): boolean;
isAvailable(): boolean;
}
interface WaitForNames<Value> {
__names: Value[];
}
When to use: Map-reduce patterns where the number of parallel tasks is determined dynamically (used with Send).
Only update if condition met:
score: Annotation<number>({
reducer: (current, update) => {
// Only increase score
return Math.max(current, update);
},
default: () => 0
})
// Only update if newer
timestamp: Annotation<Date>({
reducer: (current, update) => {
return update > current ? update : current;
},
default: () => new Date(0)
})
Track history with timestamps:
// Nodes return raw values; the reducer stamps each one as it is appended.
// The second type argument declares the update type (raw values). With a
// single type argument, updates would be typed as already-timestamped
// entries and the reducer would wrap them a second time.
history: Annotation<Array<{ timestamp: string; value: any }>, any[]>({
reducer: (current, update) => {
// Stamp every incoming raw value with the time it was reduced
const timestamped = update.map(v => ({
timestamp: new Date().toISOString(),
value: v
}));
// Return a new array; existing entries keep their original timestamps
return [...current, ...timestamped];
},
default: () => []
})
Keep only recent values:
recentEvents: Annotation<Event[]>({
reducer: (current, update) => {
const combined = [...current, ...update];
// Keep last 10 events
return combined.slice(-10);
},
default: () => []
})
Maintain priority queue:
tasks: Annotation<Task[]>({
reducer: (current, update) => {
const combined = [...current, ...update];
return combined.sort((a, b) => b.priority - a.priority);
},
default: () => []
})
function messagesStateReducer(
left: Messages,
right: Messages
): BaseMessage[];
const addMessages: typeof messagesStateReducer; // Alias
type Messages =
| Array<BaseMessage | BaseMessageLike>
| BaseMessage
| BaseMessageLike;
The messagesStateReducer intelligently merges messages:
- RemoveMessage for deletions
- REMOVE_ALL_MESSAGES constant

import { RemoveMessage } from "@langchain/core/messages";
const filterNode = (state: State) => {
// Remove system messages
const removals = state.messages
.filter(m => m._getType() === "system")
.map(m => new RemoveMessage({ id: m.id }));
return { messages: removals };
};
const modifyNode = (state: State) => {
const lastMessage = state.messages[state.messages.length - 1];
// Update existing message by ID
return {
messages: [new AIMessage({
...lastMessage,
content: modifyContent(lastMessage.content),
id: lastMessage.id // Same ID = update in place
})]
};
};
const REMOVE_ALL_MESSAGES: "__remove_all__";
import { RemoveMessage } from "@langchain/core/messages";
import { REMOVE_ALL_MESSAGES } from "@langchain/langgraph";
const clearMessages = (state: any) => {
return {
messages: [new RemoveMessage({ id: REMOVE_ALL_MESSAGES })]
};
};
function pushMessage(
message: BaseMessage | BaseMessageLike,
options?: RunnableConfig & { stateKey?: string | null }
): BaseMessage;
Manually push a message before the node completes execution. The message must have an ID.
import { pushMessage } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";
const streamingNode = async (state: State) => {
// Push intermediate message
pushMessage(new AIMessage({
content: "Processing started",
id: "msg-1"
}));
await processData(state.data);
// Push completion message
pushMessage(new AIMessage({
content: "Processing complete",
id: "msg-2"
}));
return { done: true };
};
// Push without persisting to state
pushMessage(message, { stateKey: null });
Choose the appropriate channel based on update semantics:
| Channel | Write Behavior | Use Case |
|---|---|---|
| LastValue | Single writer per step, strict | Default for simple fields |
| AnyValue | Multiple writers allowed | Identical values from multiple nodes |
| Topic | Accumulates all values | Event collection, message queues |
| BinaryOperatorAggregate | Custom reduction logic | Sum, concat, merge, custom aggregation |
| EphemeralValue | Cleared after each step | Temporary state, START/END |
| NamedBarrierValue | Synchronize named values | Fan-in, waiting for multiple nodes |
| DynamicBarrierValue | Dynamic map-reduce | Send-based parallel execution |
interface StateDefinition {
[key: string]: BaseChannel | (() => BaseChannel);
}
type StateType<SD extends StateDefinition> = {
[key in keyof SD]: ExtractValueType<SD[key]>;
};
type UpdateType<SD extends StateDefinition> = {
[key in keyof SD]?: ExtractUpdateType<SD[key]>;
};
type NodeType<SD extends StateDefinition> = RunnableLike<
StateType<SD>,
UpdateType<SD> | Partial<StateType<SD>>
>;
function isBaseChannel(obj: unknown): obj is BaseChannel;
function emptyChannels<Cc extends Record<string, BaseChannel>>(
channels: Cc,
checkpoint: ReadonlyCheckpoint
): Cc;
function createCheckpoint<ValueType>(
checkpoint: ReadonlyCheckpoint,
channels: Record<string, BaseChannel<ValueType>> | undefined,
step: number,
options?: { id?: string }
): Checkpoint;
import { StateGraph, Annotation, START, END } from "@langchain/langgraph";
// Rich state with multiple reducer types
// node1 and node2 run in the same step and both write every field below, so
// each field needs a reducer: a bare Annotation<T> uses LastValue, which
// errors when more than one node writes in a single step.
const ComplexState = Annotation.Root({
// Last-write-wins field (reducer keeps the most recent update, so parallel
// writers are accepted instead of raising a concurrent-write error)
currentStep: Annotation<string>({
reducer: (_current, update) => update
}),
// Array accumulation
logs: Annotation<string[]>({
reducer: (a, b) => a.concat(b),
default: () => []
}),
// Numeric aggregation
totalScore: Annotation<number>({
reducer: (a, b) => a + b,
default: () => 0
}),
// Running maximum; -Infinity so the first update always wins
maxScore: Annotation<number>({
reducer: (a, b) => Math.max(a, b),
default: () => -Infinity
}),
// Object merging (later keys override earlier ones)
metadata: Annotation<Record<string, any>>({
reducer: (a, b) => ({ ...a, ...b }),
default: () => ({})
}),
// Set operations (union)
tags: Annotation<Set<string>>({
reducer: (a, b) => new Set([...a, ...b]),
default: () => new Set()
}),
// Conditional update (only increase)
version: Annotation<number>({
reducer: (current, update) => Math.max(current, update),
default: () => 1
}),
// Windowed history (last 5 items)
recentEvents: Annotation<any[]>({
reducer: (current, update) => {
const combined = [...current, ...update];
return combined.slice(-5);
},
default: () => []
})
});
const node1 = (state: typeof ComplexState.State) => ({
currentStep: "node1",
logs: ["Node 1 executed"],
totalScore: 10,
maxScore: 10,
metadata: { source: "node1", timestamp: Date.now() },
tags: new Set(["processed"]),
version: 2,
recentEvents: [{ type: "node1", data: "..." }]
});
const node2 = (state: typeof ComplexState.State) => ({
currentStep: "node2",
logs: ["Node 2 executed"],
totalScore: 15,
maxScore: 8, // Won't update (max is 10)
metadata: { output: "complete" },
tags: new Set(["validated"]),
version: 1, // Won't update (current is 2)
recentEvents: [{ type: "node2", data: "..." }]
});
const graph = new StateGraph(ComplexState)
.addNode("node1", node1)
.addNode("node2", node2)
.addEdge(START, "node1")
.addEdge(START, "node2") // Parallel execution
.addEdge("node1", END)
.addEdge("node2", END)
.compile();
const result = await graph.invoke({});
// {
// currentStep: "node1" or "node2" (last to complete),
// logs: ["Node 1 executed", "Node 2 executed"],
// totalScore: 25,
// maxScore: 10,
// metadata: { source: "node1", timestamp: ..., output: "complete" },
// tags: Set(["processed", "validated"]),
// version: 2,
// recentEvents: [{ type: "node1", ... }, { type: "node2", ... }]
// }
This guide provides comprehensive coverage of state management and channels in LangGraph.