CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-langchain--langgraph

Low-level orchestration framework for building stateful, multi-actor applications with LLMs

Overview
Eval results
Files

docs/api/functional-api.md

Functional API

Simplified workflow definition using the task() and entrypoint() wrapper functions, with built-in retry, caching, and state management. The Functional API is an alternative to StateGraph for simpler workflows.

task()

Define reusable task functions with retry and cache support. Tasks can only be called from within an entrypoint or StateGraph node.

/**
 * Wraps an async function as a reusable task with optional retry and cache support.
 * Tasks can only be called from within an entrypoint or a StateGraph node.
 *
 * @param optionsOrName - The task name, or a `TaskOptions` object carrying the
 *   name plus optional retry and cache policies.
 * @param func - The async function to run when the task is called.
 * @returns A function taking the same arguments as `func` and resolving to its output.
 */
function task<ArgsT extends unknown[], OutputT>(
  optionsOrName: TaskOptions | string,
  func: (...args: ArgsT) => Promise<OutputT>
): (...args: ArgsT) => Promise<OutputT>;

/** Options object accepted by `task()` in place of a plain name string. */
interface TaskOptions {
  /** Name of the task. */
  name: string;
  /** Optional retry policy applied to the task. */
  retry?: RetryPolicy;
  /** Optional cache policy applied to the task. */
  cachePolicy?: CachePolicy;
}

Usage Examples

Basic Task

import { task, entrypoint } from "@langchain/langgraph";

// A task that increments a single number.
const addOne = task("add", async (a: number) => {
  return a + 1;
});

// Fan each number out to its own `addOne` call and wait for all of them.
const workflow = entrypoint("example", async (numbers: number[]) => {
  return Promise.all(numbers.map((value) => addOne(value)));
});

await workflow.invoke([1, 2, 3]); // Returns [2, 3, 4]

Task with Retry

// Task with a retry policy: at most 3 attempts, backoff factor 2.
const fetchData = task({
  name: "fetch",
  retry: {
    maxAttempts: 3,
    initialInterval: 1000,
    backoffFactor: 2
  }
}, async (url: string) => {
  const response = await fetch(url);
  // fetch() only rejects on network failure. Throw on HTTP error statuses so
  // they surface as task failures instead of being parsed as data.
  if (!response.ok) {
    throw new Error(`Request to ${url} failed with status ${response.status}`);
  }
  return response.json();
});

// Fetch every URL through the retrying task and wait for all of them.
const workflow = entrypoint("dataFetcher", async (urls: string[]) => {
  const results = await Promise.all(urls.map(url => fetchData(url)));
  return results;
});

Task with Caching

const expensiveComputation = task({
  name: "compute",
  cachePolicy: {
    // Serializes the input to JSON (presumably used as the cache key - confirm).
    keyFunc: (input) => {
      return JSON.stringify(input);
    }
  }
}, async (data: any) => {
  // Expensive operation
  const computed = processData(data);
  return computed;
});

Parallel Task Execution

// Waits briefly, then labels the item as processed.
const processItem = task("process", async (item: string) => {
  await delay(100);
  const label = `Processed: ${item}`;
  return label;
});

const workflow = entrypoint("parallel", async (items: string[]) => {
  // Start every task first, then await them together so they run in parallel
  const pending = items.map((entry) => processItem(entry));
  return Promise.all(pending);
});

entrypoint()

Define workflow entrypoints with state persistence and checkpointing.

/**
 * Defines a workflow entrypoint with state persistence and checkpointing.
 *
 * @param optionsOrName - The workflow name, or an `EntrypointOptions` object
 *   carrying the name plus an optional checkpointer, store, and cache.
 * @param func - Async function that receives the workflow input and produces its output.
 * @returns A `Pregel` instance (type arguments elided here); the examples in
 *   this document call `invoke()` and `stream()` on it.
 */
function entrypoint<InputT, OutputT>(
  optionsOrName: EntrypointOptions | string,
  func: (input: InputT) => Promise<OutputT>
): Pregel<...>;

/** Options object accepted by `entrypoint()` in place of a plain name string. */
interface EntrypointOptions {
  /** Name of the workflow. */
  name: string;
  /** Optional checkpoint saver used to persist workflow state. */
  checkpointer?: BaseCheckpointSaver;
  /** Optional store made available to the workflow and its tasks. */
  store?: BaseStore;
  /** Optional cache for the workflow. */
  cache?: BaseCache;
}

Usage Examples

Basic Entrypoint

import { entrypoint, task } from "@langchain/langgraph";

// Doubles the value it is given.
const processStep = task("step", async (value: number) => {
  return value * 2;
});

// Runs the doubling task twice in sequence: input -> x2 -> x4.
const workflow = entrypoint("myWorkflow", async (input: number) => {
  const doubled = await processStep(input);
  return processStep(doubled);
});

const result = await workflow.invoke(5); // Returns 20

Entrypoint with Checkpointing

import { MemorySaver } from "@langchain/langgraph-checkpoint";

// Checkpointer handed to the entrypoint so workflow state is persisted.
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { name: "statefulWorkflow", checkpointer },
  async (input: string) => {
    // Workflow with automatic state persistence
    const output = processInput(input);
    return output;
  }
);

// The invocation is tagged with a thread_id via `configurable`.
await workflow.invoke("data", { configurable: { thread_id: "thread-1" } });

Entrypoint with Store

import { InMemoryStore } from "@langchain/langgraph-checkpoint";

// Store instance handed to the entrypoint below.
const store = new InMemoryStore();

const workflow = entrypoint(
  { name: "workflowWithMemory", store },
  async (input: any) => {
    // Access store in tasks
    const outcome = await processWithMemory(input);
    return outcome;
  }
);

entrypoint.final()

Return one value to the caller while saving a separate value as the state to persist.

/** Shape of the `final` helper exposed on `entrypoint`. */
interface EntrypointFunction {
  /**
   * Bundles the value to return to the caller with a separate value to persist.
   *
   * @param options - `value` is returned to the caller; `save` is the optional
   *   state to persist (the examples here read it back with `getPreviousState()`).
   * @returns An `EntrypointFinal` holding both values.
   */
  final<ValueT, SaveT>(options: {
    value: ValueT;
    save?: SaveT;
  }): EntrypointFinal<ValueT, SaveT>;
}

/** Result of `entrypoint.final()`: a caller-facing value plus optional state to save. */
interface EntrypointFinal<ValueT, SaveT> {
  /** Value returned to the caller. */
  value: ValueT;
  /** Optional state to persist, separate from `value`. */
  save?: SaveT;
}

Usage Examples

const workflow = entrypoint({
  name: "counter",
  checkpointer: new MemorySaver()
}, async (input: { increment: number }) => {
  // State saved by the previous run on this thread, if any.
  const prevState = getPreviousState<{ count: number }>();
  // `??` falls back to 0 only when there is no saved count.
  const newCount = (prevState?.count ?? 0) + input.increment;

  // Return a message to the caller, but persist only the counter.
  return entrypoint.final({
    value: `Count is now ${newCount}`,
    save: { count: newCount }
  });
});

// First call
const result1 = await workflow.invoke(
  { increment: 5 },
  { configurable: { thread_id: "1" } }
);
console.log(result1); // "Count is now 5"

// Second call - continues from saved state
const result2 = await workflow.invoke(
  { increment: 3 },
  { configurable: { thread_id: "1" } }
);
console.log(result2); // "Count is now 8"

Runtime Utilities

Functions for accessing execution context from within tasks and entrypoints.

getPreviousState()

Get previous checkpoint state within a task or entrypoint.

/**
 * Gets the previous checkpoint state from within a task or entrypoint.
 *
 * @returns The previous state typed as `StateT`, or `undefined`
 *   (presumably when nothing has been saved yet - confirm).
 */
function getPreviousState<StateT>(): StateT | undefined;

Usage

import { getPreviousState, entrypoint } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";

/** State persisted between invocations of the workflow below. */
interface State {
  counter: number;
  history: string[];
}

const workflow = entrypoint({
  name: "stateful",
  checkpointer: new MemorySaver()
}, async (input: string) => {
  const prev = getPreviousState<State>();

  // `??` supplies defaults only when no previous state exists.
  const newState: State = {
    counter: (prev?.counter ?? 0) + 1,
    history: [...(prev?.history ?? []), input]
  };

  // Report progress to the caller and persist the full state.
  return entrypoint.final({
    value: `Processed ${newState.counter} items`,
    save: newState
  });
});

interrupt()

Interrupt execution for human-in-the-loop.

/**
 * Interrupts execution for human-in-the-loop review.
 *
 * @param value - Payload describing what is being asked of the human.
 * @returns A value typed as `R`; in the usage example this is the object
 *   passed as `resume` when the workflow is invoked again with a `Command`.
 */
function interrupt<I, R>(value: I): R;

Usage

import { interrupt, entrypoint, Command } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";

const workflow = entrypoint({
  name: "humanInLoop",
  checkpointer: new MemorySaver()
}, async (input: any) => {
  // Annotate the expected resume payload so `.approved` is type-checked.
  const reviewData: { approved: boolean } = interrupt({
    question: "Approve this action?",
    data: input
  });

  return { status: reviewData.approved ? "approved" : "rejected" };
});

// First invocation - will interrupt
await workflow.invoke(data, { configurable: { thread_id: "1" } });

// Resume with human input
await workflow.invoke(
  new Command({ resume: { approved: true } }),
  { configurable: { thread_id: "1" } }
);

writer()

Write custom data to stream output.

/**
 * Writes custom data to the stream output.
 *
 * @param chunk - The data to emit; the usage example reads it back by
 *   streaming with `streamMode: "custom"`.
 */
function writer<T>(chunk: T): void;

Usage

import { writer, entrypoint } from "@langchain/langgraph";

// Emits a progress chunk before the first step and after each step.
const workflow = entrypoint("streaming", async (input: any) => {
  writer({ progress: 0 });

  const afterStep1 = await processStep1(input);
  writer({ progress: 0.5, step: "step1" });

  const afterStep2 = await processStep2(afterStep1);
  writer({ progress: 1.0, step: "step2" });

  return afterStep2;
});

// Stream custom data
const customStream = await workflow.stream(input, { streamMode: "custom" });
for await (const chunk of customStream) {
  console.log(chunk); // { progress: 0 }, { progress: 0.5, ... }, etc.
}

getStore()

Get store from current execution context.

/**
 * Gets the store from the current execution context.
 *
 * @returns The store, or `undefined` (callers in the usage example check before use).
 */
function getStore(): BaseStore | undefined;

Usage

import { getStore, task } from "@langchain/langgraph";

// Writes the preferences under ["users", userId]; does nothing without a store.
const savePreferences = task("save", async (userId: string, prefs: any) => {
  const store = getStore();
  if (!store) return;
  await store.put(["users", userId], "preferences", prefs);
});

// Reads the preferences back; resolves to null when no store is available.
const loadPreferences = task("load", async (userId: string) => {
  const store = getStore();
  if (!store) return null;
  const item = await store.get(["users", userId], "preferences");
  return item?.value;
});

getWriter()

Get writer function from current execution context if custom stream mode is enabled.

/**
 * Gets the writer function from the current execution context, if custom
 * stream mode is enabled.
 *
 * @returns A function that accepts a chunk, or `undefined`.
 */
function getWriter<T = any>(): ((chunk: T) => void) | undefined;

Usage

import { getWriter, task } from "@langchain/langgraph";

const streamingTask = task("process", async (input: any) => {
  // getWriter() may return undefined, so only emit when a writer exists.
  getWriter()?.({ status: "processing", progress: 0.5 });
  return processData(input);
});

getConfig()

Get runtime config from current context.

/**
 * Gets the runtime config from the current context.
 *
 * @returns The config, or `undefined`.
 */
function getConfig(): LangGraphRunnableConfig | undefined;

Usage

import { getConfig, task } from "@langchain/langgraph";

// Chooses a per-user or anonymous code path based on `configurable.user_id`.
const contextAwareTask = task("aware", async (input: any) => {
  const userId = getConfig()?.configurable?.user_id;
  return userId ? processForUser(input, userId) : processAnonymous(input);
});

getCurrentTaskInput()

Get input of currently executing task.

/**
 * Gets the input of the currently executing task.
 *
 * @returns The task input, typed as `unknown`; narrow it before use.
 */
function getCurrentTaskInput(): unknown;

Usage

import { getCurrentTaskInput, task } from "@langchain/langgraph";

// Logs the input reported by the execution context, then processes the argument.
const processTask = task("process", async (input: any) => {
  console.log("Current task input:", getCurrentTaskInput());
  return processInput(input);
});

pushMessage()

Manually push a message to a message stream. This is useful when you need to push a manually created message before the node has finished executing.

/**
 * Manually pushes a message to a message stream, e.g. before the node has
 * finished executing.
 *
 * @param message - The message to push. It must have an ID set, otherwise an
 *   error is thrown.
 * @param options - Configuration options. `stateKey` is the state key to push
 *   the message to; set it to `null` to avoid persisting. Default: "messages".
 * @returns A `BaseMessage` (presumably the pushed message - confirm).
 */
function pushMessage(
  message: BaseMessage | BaseMessageLike,
  options?: RunnableConfig & {
    stateKey?: string | null;
  }
): BaseMessage;

Parameters

  • message - The message to push. The message must have an ID set, otherwise an error will be thrown.
  • options - Configuration options
    • stateKey - The key of the state to push the message to. Set to null to avoid persisting. Default: "messages"

Usage

import { pushMessage, entrypoint } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";

const workflow = entrypoint("streaming", async (input: any) => {
  // Push a message during execution
  pushMessage(new AIMessage({ content: "Processing started", id: "msg-1" }));

  // Continue processing
  const result = await processData(input);

  // Push completion message
  const completion = new AIMessage({
    content: `Processing complete: ${result}`,
    id: "msg-2"
  });
  pushMessage(completion);

  return result;
});

// Messages are automatically persisted to state after node execution

Streaming Without Persistence

// Push message without persisting to state:
// a null stateKey means the message is not written to any state key.
pushMessage(message, { stateKey: null });

Complete Examples

Map-Reduce with Tasks

import { task, entrypoint } from "@langchain/langgraph";

// Promise-based sleep used to simulate slow work; defined here so the example runs as written.
const delay = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Map step: resolve to the length of one chunk after a short pause.
const processChunk = task("process", async (chunk: string) => {
  await delay(100);
  return chunk.length;
});

// Reduce step: sum the per-chunk results.
const combineResults = task("combine", async (results: number[]) => {
  return results.reduce((a, b) => a + b, 0);
});

const mapReduce = entrypoint("mapReduce", async (data: string[]) => {
  // Map phase - parallel processing
  const chunkResults = await Promise.all(
    data.map(chunk => processChunk(chunk))
  );

  // Reduce phase
  const finalResult = await combineResults(chunkResults);

  return finalResult;
});

const result = await mapReduce.invoke(["hello", "world", "test"]);
console.log(result); // 14

Stateful Workflow with Persistence

import { task, entrypoint, getPreviousState } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";

/** State persisted between runs of the stateful processor workflow. */
interface WorkflowState {
  /** Processed items accumulated across runs. */
  processedItems: string[];
  /** Running count of items processed across runs. */
  totalProcessed: number;
}

// Labels a single item as processed.
const processItem = task("process", async (item: string) => {
  return `Processed: ${item}`;
});

const workflow = entrypoint({
  name: "statefulProcessor",
  checkpointer: new MemorySaver()
}, async (items: string[]) => {
  const prevState = getPreviousState<WorkflowState>();

  const results = await Promise.all(items.map(i => processItem(i)));

  // `??` supplies defaults only when no previous state exists.
  const newState: WorkflowState = {
    processedItems: [...(prevState?.processedItems ?? []), ...results],
    totalProcessed: (prevState?.totalProcessed ?? 0) + results.length
  };

  // Return this batch's results and the running total; persist the full state.
  return entrypoint.final({
    value: {
      results,
      totalProcessed: newState.totalProcessed
    },
    save: newState
  });
});

// First batch
await workflow.invoke(["a", "b"], {
  configurable: { thread_id: "batch-1" }
});

// Second batch - continues from saved state
const result = await workflow.invoke(["c", "d"], {
  configurable: { thread_id: "batch-1" }
});
console.log(result.totalProcessed); // 4

Error Handling with Retry

import { task, entrypoint } from "@langchain/langgraph";

const unreliableOperation = task({
  name: "unreliable",
  retry: {
    maxAttempts: 5,
    initialInterval: 500,
    backoffFactor: 2,
    jitter: true,
    // Guard with instanceof so a thrown non-Error value cannot crash the predicate.
    retryOn: (error) =>
      error instanceof Error && error.message.includes("transient")
  }
}, async (input: any) => {
  // Operation that might fail transiently
  const result = await externalAPI(input);
  return result;
});

// Converts a failure of the task into a `{ success: false }` result instead of throwing.
const workflow = entrypoint("resilient", async (input: any) => {
  try {
    const result = await unreliableOperation(input);
    return { success: true, result };
  } catch (error: unknown) {
    // Catch variables are `unknown` under strict mode; narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    return { success: false, error: message };
  }
});

Multi-Step Workflow

import { task, entrypoint, writer } from "@langchain/langgraph";

// Rejects inputs that carry no `data` field; otherwise passes them through.
const validateInput = task("validate", async (input: any) => {
  if (input.data) {
    return input;
  }
  throw new Error("Invalid input");
});

// Adds an `enriched` flag and an ISO timestamp to the input.
const enrichData = task("enrich", async (input: any) => {
  return { ...input, enriched: true, timestamp: new Date().toISOString() };
});

// Processes `data` and carries the enrichment fields along as metadata.
const transform = task("transform", async (input: any) => {
  const result = processData(input.data);
  const metadata = { enriched: input.enriched, timestamp: input.timestamp };
  return { result, metadata };
});

// Runs validate -> enrich -> transform, emitting a progress chunk before each stage.
const workflow = entrypoint("pipeline", async (input: any) => {
  writer({ stage: "validation", progress: 0.1 });
  const checked = await validateInput(input);

  writer({ stage: "enrichment", progress: 0.4 });
  const withExtras = await enrichData(checked);

  writer({ stage: "transformation", progress: 0.7 });
  const output = await transform(withExtras);

  writer({ stage: "complete", progress: 1.0 });
  return output;
});

// Monitor progress
const progressStream = await workflow.stream(input, { streamMode: "custom" });
for await (const event of progressStream) {
  console.log(`${event.stage}: ${event.progress * 100}%`);
}

Install with Tessl CLI

npx tessl i tessl/npm-langchain--langgraph@1.0.1

docs

api

channels.md

control-flow-api.md

execution-api.md

functional-api.md

graph-api.md

graph-construction-full.md

imports.md

persistence-api.md

persistence-full.md

prebuilt.md

remote.md

state-management.md

types.md

zod.md

index.md

tile.json