Low-level orchestration framework for building stateful, multi-actor applications with LLMs
Simplified workflow definition using task() and entrypoint() decorators with built-in retry, caching, and state management. The Functional API provides an alternative to StateGraph for simpler workflows.
Define reusable task functions with retry and cache support. Tasks can only be called from within an entrypoint or StateGraph node.
// task(): wrap an async function as a named task with optional retry and
// cache support. The first argument is either the task name or a full
// TaskOptions object; the returned function has the same call signature
// as `func` and may only be invoked from inside an entrypoint or a
// StateGraph node.
function task<ArgsT extends unknown[], OutputT>(
optionsOrName: TaskOptions | string,
func: (...args: ArgsT) => Promise<OutputT>
): (...args: ArgsT) => Promise<OutputT>;
interface TaskOptions {
name: string;
retry?: RetryPolicy;
cachePolicy?: CachePolicy;
}

import { task, entrypoint } from "@langchain/langgraph";
// A reusable task; tasks must be called from inside an entrypoint.
const addOne = task("add", async (a: number) => a + 1);

// Fan the task out over every input number; the calls run in parallel.
const workflow = entrypoint("example", async (numbers: number[]) => {
  // Invoking a task returns a promise — start them all before awaiting.
  const pending = numbers.map((value) => addOne(value));
  return await Promise.all(pending);
});
await workflow.invoke([1, 2, 3]); // Returns [2, 3, 4]

const fetchData = task({
name: "fetch",
retry: {
maxAttempts: 3,
initialInterval: 1000,
backoffFactor: 2
}
}, async (url: string) => {
const response = await fetch(url);
return response.json();
});
const workflow = entrypoint("dataFetcher", async (urls: string[]) => {
const results = await Promise.all(urls.map(url => fetchData(url)));
return results;
});

const expensiveComputation = task({
name: "compute",
cachePolicy: {
keyFunc: (input) => JSON.stringify(input)
}
}, async (data: any) => {
// Expensive operation
return processData(data);
});

const processItem = task("process", async (item: string) => {
await delay(100);
return `Processed: ${item}`;
});
const workflow = entrypoint("parallel", async (items: string[]) => {
// All tasks run in parallel
const results = await Promise.all(items.map(i => processItem(i)));
return results;
});

Define workflow entrypoints with state persistence and checkpointing.
// entrypoint(): wrap an async function as an invocable workflow (a Pregel
// instance). The first argument is either the workflow name or a full
// EntrypointOptions object (checkpointer/store/cache).
function entrypoint<InputT, OutputT>(
optionsOrName: EntrypointOptions | string,
func: (input: InputT) => Promise<OutputT>
): Pregel<...>;
interface EntrypointOptions {
name: string;
checkpointer?: BaseCheckpointSaver;
store?: BaseStore;
cache?: BaseCache;
}

import { entrypoint, task } from "@langchain/langgraph";
// Double the input; reusable from any entrypoint.
const processStep = task("step", async (value: number) => value * 2);

// Chain the task twice: input -> *2 -> *2.
const workflow = entrypoint("myWorkflow", async (input: number) => {
  const doubled = await processStep(input);
  return await processStep(doubled);
});
const result = await workflow.invoke(5); // Returns 20

import { MemorySaver } from "@langchain/langgraph-checkpoint";
// A checkpointer gives the workflow automatic state persistence per thread.
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { name: "statefulWorkflow", checkpointer },
  async (input: string) => {
    // State is checkpointed automatically around this call.
    return processInput(input);
  }
);
await workflow.invoke("data", {
configurable: { thread_id: "thread-1" }
});

import { InMemoryStore } from "@langchain/langgraph-checkpoint";
const store = new InMemoryStore();
const workflow = entrypoint({
name: "workflowWithMemory",
store
}, async (input: any) => {
// Access store in tasks
return await processWithMemory(input);
});

Return value to caller and separate state to persist.
// entrypoint.final(): lets the caller-visible return value and the
// checkpoint-persisted state differ.
interface EntrypointFunction {
final<ValueT, SaveT>(options: {
value: ValueT; // returned to the caller of invoke()/stream()
save?: SaveT; // persisted to the checkpoint; omit to persist nothing
}): EntrypointFinal<ValueT, SaveT>;
}
// The wrapper produced by entrypoint.final().
interface EntrypointFinal<ValueT, SaveT> {
value: ValueT;
save?: SaveT;
}
}

const workflow = entrypoint({
name: "counter",
checkpointer: new MemorySaver()
}, async (input: { increment: number }) => {
const prevState = getPreviousState<{ count: number }>();
const newCount = (prevState?.count || 0) + input.increment;
return entrypoint.final({
value: `Count is now ${newCount}`,
save: { count: newCount }
});
});
// First call
const result1 = await workflow.invoke(
{ increment: 5 },
{ configurable: { thread_id: "1" } }
);
console.log(result1); // "Count is now 5"
// Second call - continues from saved state
const result2 = await workflow.invoke(
{ increment: 3 },
{ configurable: { thread_id: "1" } }
);
console.log(result2); // "Count is now 8"

Functions for accessing execution context from within tasks and entrypoints.
Get previous checkpoint state within a task or entrypoint.
function getPreviousState<StateT>(): StateT | undefined;

import { getPreviousState, entrypoint } from "@langchain/langgraph";
// State saved via entrypoint.final(...) and read back on the next
// invocation of the same thread with getPreviousState<State>().
interface State {
counter: number; // incremented by 1 on every invocation
history: string[]; // every input string seen so far, in order
}
const workflow = entrypoint({
name: "stateful",
checkpointer: new MemorySaver()
}, async (input: string) => {
const prev = getPreviousState<State>();
const newState = {
counter: (prev?.counter || 0) + 1,
history: [...(prev?.history || []), input]
};
return entrypoint.final({
value: `Processed ${newState.counter} items`,
save: newState
});
});

Interrupt execution for human-in-the-loop.
function interrupt<I, R>(value: I): R;

import { interrupt, entrypoint } from "@langchain/langgraph";
// Human-in-the-loop: interrupt() suspends the run until it is resumed
// with a Command; the resume payload becomes interrupt()'s return value.
const workflow = entrypoint({
  name: "humanInLoop",
  checkpointer: new MemorySaver()
}, async (input: any) => {
  // Execution pauses here on the first invocation.
  const reviewData = interrupt({
    question: "Approve this action?",
    data: input
  });
  return reviewData.approved
    ? { status: "approved" }
    : { status: "rejected" };
});
// First invocation - will interrupt
await workflow.invoke(data, { configurable: { thread_id: "1" } });
// Resume with human input
await workflow.invoke(
new Command({ resume: { approved: true } }),
{ configurable: { thread_id: "1" } }
);

Write custom data to stream output.
function writer<T>(chunk: T): void;

import { writer, entrypoint } from "@langchain/langgraph";
// Emit progress updates over the custom stream while processing.
const workflow = entrypoint("streaming", async (input: any) => {
  writer({ progress: 0 });
  const afterStep1 = await processStep1(input);
  writer({ progress: 0.5, step: "step1" });
  const afterStep2 = await processStep2(afterStep1);
  writer({ progress: 1.0, step: "step2" });
  return afterStep2;
});
// Stream custom data
for await (const data of await workflow.stream(input, {
streamMode: "custom"
})) {
console.log(data); // { progress: 0 }, { progress: 0.5, ... }, etc.
}

Get store from current execution context.
function getStore(): BaseStore | undefined;

import { getStore, task } from "@langchain/langgraph";
// Persist per-user preferences when a store is attached to the run.
const savePreferences = task("save", async (userId: string, prefs: any) => {
  const store = getStore();
  // getStore() is undefined when no store was configured; do nothing then.
  if (!store) return;
  await store.put(["users", userId], "preferences", prefs);
});
const loadPreferences = task("load", async (userId: string) => {
const store = getStore();
if (store) {
const item = await store.get(["users", userId], "preferences");
return item?.value;
}
return null;
});

Get writer function from current execution context if custom stream mode is enabled.
function getWriter<T = any>(): ((chunk: T) => void) | undefined;

import { getWriter, task } from "@langchain/langgraph";
const streamingTask = task("process", async (input: any) => {
const writer = getWriter();
if (writer) {
writer({ status: "processing", progress: 0.5 });
}
return processData(input);
});

Get runtime config from current context.
function getConfig(): LangGraphRunnableConfig | undefined;

import { getConfig, task } from "@langchain/langgraph";
const contextAwareTask = task("aware", async (input: any) => {
const config = getConfig();
const userId = config?.configurable?.user_id;
if (userId) {
return processForUser(input, userId);
}
return processAnonymous(input);
});

Get input of currently executing task.
function getCurrentTaskInput(): unknown;

import { getCurrentTaskInput, task } from "@langchain/langgraph";
const processTask = task("process", async (input: any) => {
const taskInput = getCurrentTaskInput();
console.log("Current task input:", taskInput);
return processInput(input);
});

Manually push a message to a message stream. This is useful when you need to push a manually created message before the node has finished executing.
function pushMessage(
message: BaseMessage | BaseMessageLike,
options?: RunnableConfig & {
stateKey?: string | null;
}
): BaseMessage;

message - The message to push. The message must have an ID set, otherwise an error will be thrown.
options - Configuration options
stateKey - The key of the state to push the message to. Set to null to avoid persisting. Default: "messages"

import { pushMessage, entrypoint } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";
// Push messages to the stream mid-execution; every message needs an id.
const workflow = entrypoint("streaming", async (input: any) => {
  // Announce that work has begun before doing anything else.
  pushMessage(new AIMessage({
    content: "Processing started",
    id: "msg-1"
  }));
  const result = await processData(input);
  // Report completion, embedding the result in the message text.
  pushMessage(new AIMessage({
    content: `Processing complete: ${result}`,
    id: "msg-2"
  }));
  return result;
});
// Messages are automatically persisted to state after node execution

// Push message without persisting to state
pushMessage(message, { stateKey: null });

import { task, entrypoint } from "@langchain/langgraph";
// Simulate per-chunk work, then report the chunk's length.
const processChunk = task("process", async (chunk: string) => {
  await delay(100);
  return chunk.length;
});

// Sum the per-chunk results into a single total (0 for an empty list).
const combineResults = task("combine", async (results: number[]) => {
  let total = 0;
  for (const n of results) {
    total += n;
  }
  return total;
});
// Classic map/reduce: fan out over the chunks, then fold the results.
const mapReduce = entrypoint("mapReduce", async (data: string[]) => {
  // Map phase: every chunk is processed concurrently.
  const chunkResults = await Promise.all(
    data.map((chunk) => processChunk(chunk))
  );
  // Reduce phase: a single task combines the partial results.
  return await combineResults(chunkResults);
});
const result = await mapReduce.invoke(["hello", "world", "test"]);
console.log(result); // 14

import { task, entrypoint, getPreviousState } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";
// Cumulative state carried across batches on the same thread_id.
interface WorkflowState {
processedItems: string[]; // "Processed: <item>" strings from every batch so far
totalProcessed: number; // running count of items across all batches
}
// Tag a single item as processed.
const processItem = task("process", async (item: string) => `Processed: ${item}`);
// Process a batch in parallel and accumulate results across invocations
// of the same thread via entrypoint.final()/getPreviousState().
const workflow = entrypoint({
  name: "statefulProcessor",
  checkpointer: new MemorySaver()
}, async (items: string[]) => {
  // undefined on the first invocation of a thread.
  const prevState = getPreviousState<WorkflowState>();
  const results = await Promise.all(items.map((item) => processItem(item)));
  // Merge this batch into whatever earlier invocations saved.
  const newState: WorkflowState = {
    processedItems: (prevState?.processedItems ?? []).concat(results),
    totalProcessed: (prevState?.totalProcessed ?? 0) + results.length
  };
  return entrypoint.final({
    value: {
      results,
      totalProcessed: newState.totalProcessed
    },
    save: newState
  });
});
// First batch
await workflow.invoke(["a", "b"], {
configurable: { thread_id: "batch-1" }
});
// Second batch - continues from saved state
const result = await workflow.invoke(["c", "d"], {
configurable: { thread_id: "batch-1" }
});
console.log(result.totalProcessed); // 4

import { task, entrypoint } from "@langchain/langgraph";
// Task with exponential backoff and jitter; only transient failures retry.
const unreliableOperation = task({
  name: "unreliable",
  retry: {
    maxAttempts: 5,
    initialInterval: 500,
    backoffFactor: 2,
    jitter: true,
    // Non-transient errors fail immediately instead of retrying.
    retryOn: (error) => error.message.includes("transient")
  }
}, async (input: any) => {
  // externalAPI may fail transiently; the retry policy above handles that.
  return await externalAPI(input);
});
const workflow = entrypoint("resilient", async (input: any) => {
try {
const result = await unreliableOperation(input);
return { success: true, result };
} catch (error) {
return { success: false, error: error.message };
}
});

import { task, entrypoint, writer } from "@langchain/langgraph";
// Gate the pipeline: inputs without a data payload are rejected outright.
const validateInput = task("validate", async (input: any) => {
  if (!input.data) {
    throw new Error("Invalid input");
  }
  return input;
});
// Stamp the input with an enrichment flag and an ISO-8601 timestamp.
const enrichData = task("enrich", async (input: any) => {
  const timestamp = new Date().toISOString();
  return { ...input, enriched: true, timestamp };
});
// Produce the final result plus metadata carried over from enrichment.
const transform = task("transform", async (input: any) => {
  const result = processData(input.data);
  return {
    result,
    metadata: { enriched: input.enriched, timestamp: input.timestamp }
  };
});
const workflow = entrypoint("pipeline", async (input: any) => {
writer({ stage: "validation", progress: 0.1 });
const validated = await validateInput(input);
writer({ stage: "enrichment", progress: 0.4 });
const enriched = await enrichData(validated);
writer({ stage: "transformation", progress: 0.7 });
const transformed = await transform(enriched);
writer({ stage: "complete", progress: 1.0 });
return transformed;
});
// Monitor progress: with streamMode "custom", each chunk passed to
// writer() inside the workflow is yielded to this loop as it is emitted.
for await (const event of await workflow.stream(input, {
streamMode: "custom"
})) {
console.log(`${event.stage}: ${event.progress * 100}%`);
}