Low-level orchestration framework for building stateful, multi-actor applications with LLMs
Dynamic routing and state updates are built on two primitives: Send, for targeted node invocation, and Command, for combining state updates with routing directives. These primitives enable sophisticated control-flow patterns without explicit conditional edges.
A message or packet to send to a specific node in the graph. Send enables dynamic invocation of nodes with custom state, which supports patterns like map-reduce, where the same node is invoked multiple times in parallel with different inputs.
/**
 * A packet addressed to one node of the graph, carrying the state that node
 * should be invoked with.
 *
 * @typeParam Node - Name of the target node.
 * @typeParam Args - Type of the payload passed along with the invocation.
 */
class Send<Node extends string = string, Args = any> {
/** Literal discriminator; always the string "Send". */
lg_name: "Send";
/** Name of the node to invoke. */
node: Node;
/** Payload for the invocation. */
args: Args;
/**
 * @param node - Name of the node to invoke.
 * @param args - Payload for that invocation.
 */
constructor(node: Node, args: Args);
/** Returns a plain-object form exposing `lg_name`, `node` and `args`. */
toJSON(): {
lg_name: "Send";
node: Node;
args: Args;
};
}
/** Type guard: narrows an arbitrary value to `Send`. */
function _isSend(x: unknown): x is Send;
import { Annotation, Send, StateGraph } from "@langchain/langgraph";
// State shared by the joke example.
const ChainState = Annotation.Root({
  // Topics to write jokes about.
  subjects: Annotation<string[]>,
  // Starts empty; every update is appended to what is already there.
  jokes: Annotation<string[]>({
    default: () => [],
    reducer: (existing, incoming) => existing.concat(incoming),
  }),
});
/**
 * Routing function: emits one `Send` per entry of `state.subjects`, each
 * targeting "generate_joke" with a single-element `subjects` list.
 */
const continueToJokes = (state: typeof ChainState.State) =>
  state.subjects.map((topic) => new Send("generate_joke", { subjects: [topic] }));
// Build the graph: the conditional edge leaving "__start__" is driven by
// continueToJokes, and "generate_joke" has a direct edge to "__end__".
const graph = new StateGraph(ChainState)
// The node reads only the first entry of `subjects` and returns a single joke string.
.addNode("generate_joke", (state) => ({
jokes: [`Joke about ${state.subjects[0]}`]
}))
.addConditionalEdges("__start__", continueToJokes)
.addEdge("generate_joke", "__end__")
.compile();
// Invoke with two subjects.
const result = await graph.invoke({ subjects: ["cats", "dogs"] });
// { subjects: ["cats", "dogs"], jokes: ["Joke about cats", "Joke about dogs"] }

// State for the fan-out example: the input `items` plus a `results` list that
// starts empty and grows by concatenation.
const State = Annotation.Root({
  items: Annotation<string[]>,
  results: Annotation<string[]>({
    reducer: (a, b) => a.concat(b),
    default: () => []
  })
});
/**
 * Routing function: turns every entry of `state.items` into a `Send` aimed at
 * the "process" node, each carrying a one-item `items` list and empty `results`.
 */
const fanOut = (state: typeof State.State) => {
  // Dynamically create parallel tasks
  const toTask = (entry: string) =>
    new Send("process", { items: [entry], results: [] });
  return state.items.map((entry) => toTask(entry));
};
// Fan-out graph: "__start__" is routed by fanOut and "process" leads straight to "__end__".
const graph = new StateGraph(State)
  .addNode("process", (state) => ({
    results: [`Processed: ${state.items[0]}`]
  }))
  .addConditionalEdges("__start__", fanOut)
  .addEdge("process", "__end__")
  .compile();

/**
 * Conditional routing: collects a `Send` for "human_review" and/or "validate"
 * depending on the `needsReview` / `needsValidation` flags, forwarding the
 * whole state to each. Returns "__end__" when neither flag is set.
 */
const routeLogic = (state: any) => {
  const targets: Send[] = [
    ...(state.needsReview ? [new Send("human_review", state)] : []),
    ...(state.needsValidation ? [new Send("validate", state)] : []),
  ];
  if (targets.length === 0) {
    return "__end__";
  }
  return targets;
};
// Register both possible targets and use routeLogic as the conditional edge leaving "__start__".
const graph = new StateGraph(State)
.addNode("human_review", reviewNode)
.addNode("validate", validateNode)
.addConditionalEdges("__start__", routeLogic)
.compile();
Combines state updates with routing directives. Command allows nodes to return both state updates AND routing information, eliminating the need for separate conditional edges.
/**
 * Pairs a state update with a routing directive so that a node can return both at once.
 *
 * @typeParam Resume - Type of the resume value.
 * @typeParam Update - Shape of the state update.
 * @typeParam Nodes - Node names that `goto` may target.
 */
class Command<
Resume = unknown,
Update extends Record<string, unknown> = Record<string, unknown>,
Nodes extends string = string
> {
/** Literal discriminator; always the string "Command". */
readonly lg_name: "Command";
// NOTE(review): the purpose of this flag is not shown in this document — confirm against the library.
lc_direct_tool_output: true;
/** Resume value for use with interrupt(). */
resume?: Resume;
/** Target graph: the current graph, a specific graph name, or `Command.PARENT`. */
graph?: string;
/** State updates to apply, either as an object or as a list of `[string, unknown]` tuples. */
update?: Update | [string, unknown][];
/** Routing directive: a node name, a `Send`, or an array mixing both. */
goto?: Nodes | Send<Nodes> | (Nodes | Send<Nodes>)[];
/** Value to pass as `graph` in order to address the parent graph. */
static PARENT: "__parent__";
/** @param args - Any combination of `resume`, `graph`, `update` and `goto`; all are optional. */
constructor(args: {
resume?: Resume;
graph?: string;
update?: Update | [string, unknown][];
goto?: Nodes | Send<Nodes> | (Nodes | Send<Nodes>)[];
});
/** Plain-object form; the declared shape carries `lg_name`, `update`, `resume` and `goto`. */
toJSON(): {
lg_name: "Command";
update?: Update | [string, unknown][];
resume?: Resume;
goto?: Nodes | Send<Nodes> | (Nodes | Send<Nodes>)[];
};
}
function isCommand(x: unknown): x is Command;

- update - State updates to apply (written as if the node returned this value)
- goto - Routing directive (node name, array of nodes, Send object, or array of Sends)
- graph - Target graph (current graph, specific graph name, or Command.PARENT for parent graph)
- resume - Resume value for use with interrupt()

import { Annotation, Command } from "@langchain/langgraph";
// Single-field state used by the Command routing example.
const StateAnnotation = Annotation.Root({ foo: Annotation<string> });
/**
 * Logs "Called A", sets `foo` to "a", and routes to "nodeB" when
 * `Math.random()` exceeds 0.5, otherwise to "nodeC".
 */
const nodeA = (state: typeof StateAnnotation.State) => {
  console.log("Called A");
  const pickB = Math.random() > 0.5;
  return new Command({
    goto: pickB ? "nodeB" : "nodeC",
    update: { foo: "a" },
  });
};
/** Returns an update that appends "|b" to the current `foo`. */
const nodeB = (state: typeof StateAnnotation.State) => {
  const foo = state.foo + "|b";
  return { foo };
};
/** Returns an update that appends "|c" to the current `foo`. */
const nodeC = (state: typeof StateAnnotation.State) => {
  const foo = state.foo + "|c";
  return { foo };
};
// nodeA has no outgoing edges; it routes through the Command it returns, and
// `ends` lists the two nodes that Command can name.
// NOTE(review): StateGraph is used here, but the import shown for this example
// lists only Annotation and Command — confirm the import line.
const graph = new StateGraph(StateAnnotation)
.addNode("nodeA", nodeA, { ends: ["nodeB", "nodeC"] })
.addNode("nodeB", nodeB)
.addNode("nodeC", nodeC)
.addEdge("__start__", "nodeA")
.compile();
// Start with an empty `foo`.
await graph.invoke({ foo: "" });
// Randomly: { foo: "a|b" } or { foo: "a|c" }

/** Sets `processed` to true and routes to both "validate" and "log". */
const router = (state: any) => {
  return new Command({
    update: { processed: true },
    goto: ["validate", "log"] // Go to both nodes
  });
};

/**
 * Sets `started` to true and fans out one `Send` to "process" per entry of
 * `state.items`, each carrying that entry as `item`.
 */
const dynamicRouter = (state: any) => {
  // `state` is `any`, so the callback parameter gets no contextual type;
  // annotate it explicitly to stay valid under `noImplicitAny`.
  const sends = state.items.map((item: unknown) =>
    new Send("process", { item })
  );
  return new Command({
    update: { started: true },
    goto: sends
  });
};

// In a subgraph node, route to parent graph
/**
 * Returns `{ processed: true }` unless `state.needsEscalation` is truthy; in
 * that case it returns a Command that sets `escalated` and targets
 * "parent_node" with `graph: Command.PARENT`.
 */
const subgraphNode = (state: any) => {
  if (!state.needsEscalation) {
    return { processed: true };
  }
  return new Command({
    update: { escalated: true },
    graph: Command.PARENT,
    goto: "parent_node"
  });
};

// State for the value-based routing example.
const State = Annotation.Root({
  value: Annotation<number>,
  path: Annotation<string>,
});
/**
 * Chooses the next node from `state.value`: above 100 goes to "highValue",
 * above 50 to "mediumValue", anything else to "lowValue". The matching label
 * ("high" / "medium" / "low") is written to `path`.
 */
const decisionNode = (state: typeof State.State) => {
  const [nextNode, pathTaken]: [string, string] =
    state.value > 100
      ? ["highValue", "high"]
      : state.value > 50
        ? ["mediumValue", "medium"]
        : ["lowValue", "low"];
  return new Command({
    update: { path: pathTaken },
    goto: nextNode
  });
};
// "decide" routes through the Command it returns; `ends` lists the three nodes it can name.
const graph = new StateGraph(State)
.addNode("decide", decisionNode, {
ends: ["highValue", "mediumValue", "lowValue"]
})
// Each target node rewrites `value` in its own way.
.addNode("highValue", (s) => ({ value: s.value * 2 }))
.addNode("mediumValue", (s) => ({ value: s.value * 1.5 }))
.addNode("lowValue", (s) => ({ value: s.value + 10 }))
.addEdge("__start__", "decide")
.compile();
// State for the self-loop example: a counter plus the number of iterations so far.
const State = Annotation.Root({
count: Annotation<number>,
iterations: Annotation<number>
});
/**
 * Self-looping node. While `iterations` is below 5 it increments both `count`
 * and `iterations` and routes back to "loop"; otherwise it re-emits the current
 * `count` and routes to "__end__".
 */
const loop = (state: typeof State.State) => {
  const finished = !(state.iterations < 5);
  if (finished) {
    return new Command({
      update: { count: state.count },
      goto: "__end__"
    });
  }
  const next = {
    count: state.count + 1,
    iterations: state.iterations + 1
  };
  return new Command({
    update: next,
    goto: "loop" // Loop back to self
  });
};
// "loop" is the only node: it is entered from "__start__" and lists itself in `ends`.
// NOTE(review): loop also returns goto: "__end__"; confirm whether `ends` should list "__end__" too.
const graph = new StateGraph(State)
.addNode("loop", loop, { ends: ["loop"] })
.addEdge("__start__", "loop")
.compile();
Function to interrupt node execution for human-in-the-loop workflows.
/**
 * Interrupts execution of the calling node for human-in-the-loop workflows.
 *
 * @typeParam I - Type of the value handed out with the interrupt.
 * @typeParam R - Type of the value the call evaluates to.
 * @param value - Payload attached to the interrupt.
 * @returns A value of type `R` — presumably the `resume` value of the Command used to continue; confirm.
 */
function interrupt<I, R>(value: I): R;
/** One interrupt record: an optional `id` and the optional payload `value`. */
type Interrupt<Value = any> = {
id?: string;
value?: Value;
};
/**
 * Type guard: narrows `values` to an object holding an array of `Interrupt`
 * records under the `INTERRUPT` key.
 */
function isInterrupted<Value = unknown>(
values: unknown
): values is { [INTERRUPT]: Interrupt<Value>[] };
/** Key ("__interrupt__") used by the `isInterrupted` result shape. */
const INTERRUPT: "__interrupt__";
import { interrupt } from "@langchain/langgraph";
/**
 * Asks for human approval via `interrupt` and maps the answer to a status.
 *
 * @param state - Current state; forwarded unchanged as the `context` of the request.
 * @returns `{ status: "approved" }` when the answer's `approved` flag is truthy,
 *          otherwise `{ status: "rejected" }`.
 */
const reviewNode = async (state: any) => {
  // Request human review. The type arguments are spelled out because `R`
  // cannot be inferred from the argument, and an untyped result would not
  // allow the `.approved` access below.
  const reviewData = interrupt<
    { question: string; context: unknown },
    { approved: boolean }
  >({
    question: "Approve this action?",
    context: state
  });
  return { status: reviewData.approved ? "approved" : "rejected" };
};
// Compile with a checkpointer and a single "review" node.
// NOTE(review): StateGraph, MemorySaver, isInterrupted, INTERRUPT and Command are used
// below, but the import shown for this example lists only `interrupt`; `input` is not
// defined in this snippet either — confirm.
const graph = new StateGraph(State)
.addNode("review", reviewNode)
.compile({ checkpointer: new MemorySaver() });
// First call - will interrupt
const result = await graph.invoke(input, {
configurable: { thread_id: "1" }
});
if (isInterrupted(result)) {
console.log("Waiting for human input");
console.log(result[INTERRUPT][0].value); // { question: "Approve...", context: {...} }
// Resume with human input
// The same thread_id ("1") is passed again, this time with a Command carrying `resume`.
const finalResult = await graph.invoke(
new Command({
resume: { approved: true }
}),
{ configurable: { thread_id: "1" } }
);
}
// Node with two interrupt points: the value from the first feeds processStep1,
// whose output becomes the payload of the second.
const multiStepNode = async (state: any) => {
// First review point
const review1 = interrupt({
step: 1,
data: state.data1
});
// Process with review1 result
const intermediate = processStep1(review1);
// Second review point
const review2 = interrupt({
step: 2,
data: intermediate
});
return { result: finalProcess(review2) };
};
Traditional conditional edges using path functions.
/**
 * Options describing one conditional branch.
 *
 * @typeParam IO - Input type received by the path function.
 * @typeParam N - Union of node names.
 */
interface BranchOptions<IO, N extends string> {
/** Name of the source node. */
source: N;
/** Path function; produces a `BranchPathReturnValue`. */
path: RunnableLike<IO, BranchPathReturnValue>;
/** Optional: either a record from string keys to node names (or END), or a list of node names (or END). */
pathMap?: Record<string, N | typeof END> | (N | typeof END)[];
}
/**
 * Everything a path function may return: a node name, a `Send`, an array
 * mixing both, or a promise of any of those.
 */
type BranchPathReturnValue =
  | string
  | Send
  | (string | Send)[]
  | Promise<string | Send | (string | Send)[]>;

/** Returns the key "high" when `state.value` exceeds 50, otherwise "low". */
const routeByValue = (state: any): string => {
  return state.value > 50 ? "high" : "low";
};

// The third argument maps the keys returned by routeByValue onto node names.
graph.addConditionalEdges(
  "check",
  routeByValue,
  {
    high: "processHigh",
    low: "processLow"
  }
);

/** Emits one `Send` to "process" per entry of `state.items`. */
const dynamicRoute = (state: any): Send[] => {
  // `state` is `any`, so the callback parameter needs an explicit type
  // to stay valid under `noImplicitAny`.
  return state.items.map((item: unknown) =>
    new Send("process", { item })
  );
};

graph.addConditionalEdges("start", dynamicRoute);

/**
 * Mixed routing driven by `state.mode`: "batch" fans out one `Send` per item,
 * "single" returns the node name "processSingle", and any other mode returns
 * the two node names "validate" and "log".
 */
const flexibleRoute = (state: any): string | Send | (string | Send)[] => {
  if (state.mode === "batch") {
    return state.items.map((item: unknown) => new Send("process", { item }));
  } else if (state.mode === "single") {
    return "processSingle";
  } else {
    return ["validate", "log"];
  }
};

const START: "__start__";
const END: "__end__";
const INTERRUPT: "__interrupt__";
import { Annotation, Command, Send } from "@langchain/langgraph";
// Shared state for the orchestrator / worker example.
const OrchestratorState = Annotation.Root({
  task: Annotation<string>,
  subtasks: Annotation<string[]>,
  // Starts as {}; each update is shallow-merged over the previous value, so
  // keys in the update win.
  results: Annotation<Record<string, any>>({
    default: () => ({}),
    reducer: (merged, update) => ({ ...merged, ...update }),
  }),
  status: Annotation<string>,
});
/**
 * Splits `state.task` with `analyzeTask`, records the subtasks with status
 * "processing", and dispatches one `Send` to "worker" per subtask. Each send
 * carries the subtask as `task` and an id of the form `worker_<index>`.
 */
const orchestrator = (state: typeof OrchestratorState.State) => {
  // Break down task into subtasks
  const subtasks = analyzeTask(state.task);
  return new Command({
    update: { subtasks, status: "processing" },
    // Send to worker agents in parallel
    goto: subtasks.map((piece, index) =>
      new Send("worker", { task: piece, id: `worker_${index}` })
    ),
  });
};
/**
 * Runs `processSubtask` on `state.task` and reports the outcome under the key
 * `state.id` in `results`, then routes to "aggregator".
 */
const worker = (state: any) => {
  const outcome = processSubtask(state.task);
  const results = { [state.id]: outcome };
  return new Command({ update: { results }, goto: "aggregator" });
};
/**
 * Combines worker results once there are at least as many result keys as
 * subtasks. Until then it only reports status "waiting". Afterwards it routes
 * to "human_review" (status "review_needed") when `needsReview` accepts the
 * combined result, or to "__end__" (status "completed") otherwise.
 */
const aggregator = (state: typeof OrchestratorState.State) => {
  const received = Object.keys(state.results).length;
  const expected = state.subtasks.length;
  if (received < expected) {
    // Still waiting for more results
    return { status: "waiting" };
  }
  const reviewRequired = needsReview(combineResults(state.results));
  return reviewRequired
    ? new Command({ update: { status: "review_needed" }, goto: "human_review" })
    : new Command({ update: { status: "completed" }, goto: "__end__" });
};
// Wire the orchestrator example. The first three nodes route through the Commands
// they return, so each one lists its Command targets in `ends`.
const graph = new StateGraph(OrchestratorState)
.addNode("orchestrator", orchestrator, {
ends: ["worker"]
})
.addNode("worker", worker, {
ends: ["aggregator"]
})
// NOTE(review): aggregator also returns goto: "__end__"; confirm whether `ends` should list "__end__" too.
.addNode("aggregator", aggregator, {
ends: ["human_review"]
})
.addNode("human_review", humanReviewNode)
.addEdge("__start__", "orchestrator")
.addEdge("human_review", "__end__")
.compile();