CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-langchain--langgraph

Low-level orchestration framework for building stateful, multi-actor applications with LLMs

Overview
Eval results
Files

docs/guides/graph-construction.md

Graph Construction Guide

In-depth guide for building complex workflows with StateGraph and Graph.

State Definition Strategies

Minimal State

Only include what nodes need to share:

// Smallest useful state: two string fields and nothing else.
// Neither field declares a reducer or a default value.
const State = Annotation.Root({
  input: Annotation<string>,
  output: Annotation<string>
});

Rich State with Metadata

// Richer state: core data plus bookkeeping fields, each with an explicit
// reducer describing how a node's partial update is combined with the
// current value, and a default used before the first write.
const State = Annotation.Root({
  // Core data
  query: Annotation<string>,
  // Each update is appended to the results collected so far.
  results: Annotation<any[]>({
    reducer: (a, b) => a.concat(b),
    default: () => []
  }),

  // Metadata
  // Shallow merge: keys in the update overwrite keys with the same name.
  metadata: Annotation<Record<string, any>>({
    reducer: (a, b) => ({ ...a, ...b}),
    default: () => ({})
  }),

  // Tracking
  // A default is only honoured when a reducer is supplied alongside it, so
  // an explicit "last write wins" reducer is used to start the counter at 0.
  attempts: Annotation<number>({
    reducer: (_previous, next) => next,
    default: () => 0
  }),
  // Error messages accumulate across nodes; nothing here removes entries.
  errors: Annotation<string[]>({
    reducer: (a, b) => a.concat(b),
    default: () => []
  })
});

Separate Input/Output Schemas

// Full working state used while the graph runs.
const InternalState = Annotation.Root({
  query: Annotation<string>,
  intermediateResults: Annotation<any[]>(),
  processingSteps: Annotation<string[]>()
});

// Shape accepted from the caller: only the query.
const InputState = Annotation.Root({
  query: Annotation<string>
});

// Shape handed back to the caller: only the results.
// NOTE(review): `results` is declared here but not on InternalState above —
// confirm that nodes are expected to write a `results` key.
const OutputState = Annotation.Root({
  results: Annotation<any[]>
});

// The second argument narrows the graph's public input and output schemas
// while nodes keep working against InternalState.
const graph = new StateGraph(InternalState, {
  input: InputState,
  output: OutputState
});

Node Design Patterns

Validation Node

/**
 * Validates the incoming query and reports every problem found.
 *
 * @param state - Current graph state; only `state.query` is read.
 * @returns A partial update with `isValid` (true when no problems were
 *   found) and `errors` (the list of human-readable problems).
 */
const validateNode = (state: State) => {
  const errors: string[] = [];
  if (!state.query) errors.push("Query required");
  // Guard against null/undefined so a missing query is reported as an
  // error instead of throwing on `.length`.
  if (state.query != null && state.query.length < 3) errors.push("Query too short");

  return {
    isValid: errors.length === 0,
    errors
  };
};

Transformation Node

/**
 * Produces a `processed` list in which every entry of `state.data` is
 * shallow-copied and given a `normalized` field computed from its `value`.
 */
const transformNode = (state: State) => {
  const processed = state.data.map(entry => {
    // Copy first, then attach the derived field to the copy.
    const enriched = { ...entry };
    enriched.normalized = normalize(entry.value);
    return enriched;
  });

  return { processed };
};

Aggregation Node

/**
 * Summarises `state.results` into a total count plus how many entries have
 * a truthy `success` flag and how many do not.
 */
const aggregateNode = (state: State) => {
  const { results } = state;

  // Tally both outcomes in a single pass over the results.
  const tally = results.reduce(
    (counts, entry) => {
      if (entry.success) {
        counts.successful += 1;
      } else {
        counts.failed += 1;
      }
      return counts;
    },
    { successful: 0, failed: 0 }
  );

  return {
    summary: {
      total: results.length,
      successful: tally.successful,
      failed: tally.failed
    }
  };
};

Conditional Routing Patterns

Type-Based Routing

/**
 * Picks the handler node for a message based on `state.type`.
 *
 * @param state - Current graph state; only `state.type` is read.
 * @returns The handler name for a known type, otherwise "defaultHandler".
 */
const router = (state: State) => {
  // A Map is used instead of a plain object so that inherited members such
  // as "constructor" or "toString" are never mistaken for handlers.
  const typeHandlers = new Map<string, string>([
    ["query", "queryHandler"],
    ["command", "commandHandler"],
    ["event", "eventHandler"]
  ]);
  return typeHandlers.get(state.type) ?? "defaultHandler";
};

Priority-Based Routing

/**
 * Maps `state.priority` to a queue node: "urgent" and "high" have dedicated
 * queues, everything else falls through to the normal queue.
 */
const priorityRouter = (state: State) => {
  switch (state.priority) {
    case "urgent":
      return "urgentQueue";
    case "high":
      return "highQueue";
    default:
      return "normalQueue";
  }
};

Multi-Condition Routing

/**
 * Routes on several flags, checked in priority order: outstanding
 * authentication first, then outstanding validation, then readiness.
 */
const complexRouter = (state: State) => {
  const authPending = state.requiresAuth && !state.isAuthenticated;
  if (authPending) return "authenticate";

  const validationPending = state.requiresValidation && !state.isValidated;
  if (validationPending) return "validate";

  return state.isReady ? "process" : "prepare";
};

Advanced Topologies

Diamond Pattern

Parallel processing with join:

// Diamond topology: "split" has two outgoing edges (processA, processB) and
// both of those branches have an edge into "join".
graph
  .addNode("split", splitNode)
  .addNode("processA", processANode)
  .addNode("processB", processBNode)
  .addNode("join", joinNode)
  .addEdge(START, "split")
  // Fan out to both branches.
  .addEdge("split", "processA")
  .addEdge("split", "processB")
  // Both branches converge on the same node.
  .addEdge("processA", "join")
  .addEdge("processB", "join")
  .addEdge("join", END);

Pipeline with Fallback

/**
 * Chooses the next node after validation: the processing step when the
 * state is valid, the error handler otherwise.
 */
const afterValidation = (state: State) => {
  if (state.isValid) {
    return "process";
  }
  return "handleError";
};

// Pipeline with a fallback branch: validation decides between the normal
// processing step and the error handler; both branches then finish.
graph
  .addNode("validate", validateNode)
  .addNode("process", processNode)
  .addNode("handleError", errorHandler)
  // Entry point: without an edge from START the "validate" node is unreachable.
  .addEdge(START, "validate")
  .addConditionalEdges("validate", afterValidation)
  .addEdge("handleError", END)
  .addEdge("process", END);

Loop with Exit Condition

/**
 * Loop controller: stop when the work is done, divert to the failure node
 * once the attempt budget is used up, otherwise run "process" again.
 */
const shouldContinue = (state: State) => {
  if (state.done) {
    return END;
  }

  const budgetExhausted = state.attempts >= state.maxAttempts;
  return budgetExhausted ? "failed" : "process";
};

// Self-loop on "process" that exits either normally (END) or via "failed".
graph
  .addNode("process", processNode)
  .addNode("failed", failureNode)
  .addEdge(START, "process")
  // Every value the routing function can return needs an entry in the path
  // map, including END for the normal exit.
  .addConditionalEdges("process", shouldContinue, {
    process: "process",
    failed: "failed",
    [END]: END
  })
  // Finish explicitly after the failure handler has run.
  .addEdge("failed", END);

Dynamic Execution with Send

Map-Reduce

import { Send } from "@langchain/langgraph";

// State for the map-reduce example: the work list plus an accumulator.
const State = Annotation.Root({
  items: Annotation<string[]>,
  // Every node update is appended, so parallel branches all contribute.
  results: Annotation<any[]>({
    reducer(collected, incoming) {
      return collected.concat(incoming);
    },
    default() {
      return [];
    }
  })
});

/**
 * Map step: emits one Send per item, each targeting the "process" node with
 * a state that contains just that single item and an empty result list.
 *
 * `State` is a runtime value, so its state type is spelled
 * `typeof State.State`.
 */
const fanOut = (state: typeof State.State) => {
  return state.items.map(item =>
    new Send("process", { items: [item], results: [] })
  );
};

/**
 * Worker for a single fanned-out item: processes the first (and only) entry
 * of `state.items` and returns it wrapped in an array so the `results`
 * reducer can append it.
 *
 * `State` is a runtime value, so its state type is spelled
 * `typeof State.State`.
 */
const processNode = (state: typeof State.State) => ({
  results: [processItem(state.items[0])]
});

// Wire the map-reduce: the conditional edge from START calls fanOut, whose
// Send objects each target the "process" node; "process" then leads to END.
graph
  .addNode("process", processNode)
  .addConditionalEdges(START, fanOut)
  .addEdge("process", END);

Conditional Parallel Execution

/**
 * Builds the list of branches to run in parallel from the state's flags:
 * "validate" when `needsValidation` is set, "enrich" when `needsEnrichment`
 * is set. Each branch receives the full state. Returns END when neither
 * flag is set.
 */
const dynamicRouter = (state: State) => {
  const branches = [
    ...(state.needsValidation ? [new Send("validate", state)] : []),
    ...(state.needsEnrichment ? [new Send("enrich", state)] : [])
  ];

  if (branches.length === 0) {
    return END;
  }
  return branches;
};

Subgraphs and Composition

Nested Workflows

// Inner workflow: a two-step pipeline compiled on its own. It needs its own
// entry and exit edges; without them its nodes are unreachable.
const subworkflow = new StateGraph(SubState)
  .addNode("step1", step1Node)
  .addNode("step2", step2Node)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", END)
  .compile();

// Outer workflow: the compiled subgraph is registered like any other node.
const mainGraph = new StateGraph(MainState)
  .addNode("prepare", prepareNode)
  .addNode("subprocess", subworkflow)  // Use as node
  .addNode("finalize", finalizeNode)
  .addEdge(START, "prepare")
  .addEdge("prepare", "subprocess")
  .addEdge("subprocess", "finalize")
  .addEdge("finalize", END);

Graph Class (Low-Level)

For advanced use cases requiring manual state control:

/**
 * Signature summary of the low-level graph builder.
 *
 * `N` is the union of node names known to the graph at the type level; it
 * starts as `typeof START | typeof END`.
 * NOTE(review): `RunInput` and `RunOutput` are not declared in this excerpt —
 * presumably class-level type parameters; confirm against the API reference.
 */
class Graph<N extends string = typeof START | typeof END> {
  constructor();

  /**
   * Registers `action` under `key`.
   * The return type widens `N` to include the new key.
   */
  addNode<K extends string>(
    key: K,
    action: RunnableLike<RunInput, RunOutput>,
    options?: AddNodeOptions
  ): Graph<N | K>;

  /** Adds an edge from `startKey` to `endKey`. */
  addEdge(
    startKey: N | typeof START,
    endKey: N | typeof END
  ): this;

  /**
   * Adds conditional edges leaving `source`. `path` yields a node name, a
   * Send, or an array of either; the optional `pathMap` maps string keys to
   * node names or END.
   */
  addConditionalEdges(
    source: N,
    path: RunnableLike<RunInput, string | Send | (string | Send)[]>,
    pathMap?: Record<string, N | typeof END>
  ): this;

  /** Sets the entry point to `key`. */
  setEntryPoint(key: N): this;
  /** Sets the finish point to `key`. */
  setFinishPoint(key: N): this;

  /**
   * Compiles the builder into a `CompiledGraph`.
   * Options: a checkpointer (or `false`), node names to interrupt before or
   * after, and an optional name.
   */
  compile(options?: {
    checkpointer?: BaseCheckpointSaver | false;
    interruptBefore?: N[];
    interruptAfter?: N[];
    name?: string;
  }): CompiledGraph<N>;
}

Use Graph when you need fine-grained control over state passing between nodes.

Sequence Helper

Add multiple nodes in sequence:

/**
 * Signature: adds several nodes from a record keyed by node name and returns
 * the same builder for chaining.
 * NOTE(review): presumably nodes are linked in the record's key order —
 * confirm against the API reference.
 */
addSequence<K extends string>(
  nodes: Record<K, RunnableLike<S, U>>
): this;
// Usage: three steps registered in one call. The edges from START to the
// first step and from the last step to END are still added explicitly.
const graph = new StateGraph(State)
  .addSequence({
    step1: (s) => ({ value: s.value + 1 }),
    step2: (s) => ({ value: s.value * 2 }),
    step3: (s) => ({ value: s.value + 10 })
  })
  .addEdge(START, "step1")
  .addEdge("step3", END)
  .compile();

Batch Node Addition

Add multiple nodes at once:

/** Overload: register several nodes from a record of name → action. */
addNode<K extends string>(
  nodes: Record<K, RunnableLike<S, U>>
): this;

/**
 * Overload: register several nodes from an array of
 * `[key, action, options?]` tuples, allowing per-node options.
 */
addNode<K extends string>(
  nodes: [key: K, action: RunnableLike<S, U>, options?: StateGraphAddNodeOptions][]
): this;
// Object syntax: keys become node names; no per-node options are passed.
graph.addNode({
  node1: node1Fn,
  node2: node2Fn,
  node3: node3Fn
});

// Array syntax with options: each tuple is [name, action, options].
graph.addNode([
  ["node1", node1Fn, { retryPolicy: { maxAttempts: 3 } }],
  ["node2", node2Fn, { cachePolicy: true }]
]);

Zod Schema Integration

Use Zod schemas for state validation:

import { z } from "zod";
import { StateGraph, START, END } from "@langchain/langgraph";

// State described as a Zod object; both fields have defaults so callers may
// omit them.
const StateSchema = z.object({
  count: z.number().default(0),
  messages: z.array(z.string()).default([])
});

const graph = new StateGraph(StateSchema)
  .addNode("increment", (state) => ({
    count: state.count + 1,
    messages: state.messages.concat(["incremented"])
  }))
  // The node must be connected to START and END to be reachable and to finish.
  .addEdge(START, "increment")
  .addEdge("increment", END)
  .compile();

// Input is validated against schema
await graph.invoke({ count: 5 });

Branch Class

Low-level conditional edge implementation:

/**
 * Signature summary of the low-level conditional-edge object.
 *
 * `path` computes the destination(s); the optional `pathMap` is either a
 * record from string keys to node names (or END) or a plain list of node
 * names (or END).
 */
class Branch<IO, N extends string> {
  constructor(options: {
    path: RunnableLike<IO, BranchPathReturnValue>;
    pathMap?: Record<string, N | typeof END> | (N | typeof END)[];
  });
}

/**
 * Values a branch path function may produce: a single destination name, a
 * single Send, an array mixing both, or a Promise resolving to any of those.
 */
type BranchPathReturnValue =
  | string
  | Send
  | (string | Send)[]
  | Promise<string | Send | (string | Send)[]>;

Node Options

StateGraphAddNodeOptions

/** Per-node options accepted when adding a node to a StateGraph. */
interface StateGraphAddNodeOptions<Nodes extends string = string> {
  /** Automatic retry on failure. */
  retryPolicy?: RetryPolicy;
  /** Cache node results; a boolean is also accepted. */
  cachePolicy?: CachePolicy | boolean;
  /** Separate input schema for this node. */
  input?: AnnotationRoot<any>;
  /** Custom metadata attached to the node. */
  metadata?: Record<string, unknown>;
  /** Nested subgraphs. */
  subgraphs?: Pregel<any, any>[];
  /** Possible destination nodes for Command. */
  ends?: Nodes[];
  /** Defer execution of the node. */
  defer?: boolean;
}
  • retryPolicy: Automatic retry on failure
  • cachePolicy: Cache node results
  • input: Separate input schema for node
  • metadata: Custom metadata
  • subgraphs: Nested subgraphs
  • ends: Possible destination nodes for Command
  • defer: Defer execution of the node until the run is about to end

Example with Options

// Registers "fetchData" with a retry policy, a cache policy, and the list of
// nodes it may route to.
graph.addNode("fetchData", fetchDataNode, {
  retryPolicy: {
    maxAttempts: 3,
    initialInterval: 1000,
    backoffFactor: 2,
    // Only errors whose message mentions "timeout" are retried.
    retryOn: (error) => error.message.includes("timeout")
  },
  cachePolicy: {
    // NOTE(review): assumes keyFunc receives the node's state — confirm the
    // argument shape against the CachePolicy reference.
    keyFunc: (state) => state.userId,
    // NOTE(review): 300000 is 5 minutes only if ttl is in milliseconds —
    // confirm the unit; if it is seconds this should be 300.
    ttl: 300000  // 5 minutes
  },
  ends: ["processData", "handleError"]  // For Command routing
});

Complete Advanced Example

import { StateGraph, Annotation, Send, Command, START, END } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";

// Complex state with multiple field types
// State for the batch workflow. Every field except `items` has a reducer
// that combines a node's update with the current value, plus a starting
// value.
const WorkflowState = Annotation.Root({
  // Input: the work list.
  items: Annotation<string[]>,

  // Processing state: results are appended as they arrive.
  processed: Annotation<any[]>({
    reducer(soFar, incoming) {
      return soFar.concat(incoming);
    },
    default() {
      return [];
    }
  }),

  // Aggregated results: a running sum of successes.
  successCount: Annotation<number>({
    reducer(soFar, increment) {
      return soFar + increment;
    },
    default() {
      return 0;
    }
  }),

  // Error tracking: failures are appended, never removed.
  errors: Annotation<Array<{ item: string, error: string }>>({
    reducer(soFar, incoming) {
      return soFar.concat(incoming);
    },
    default() {
      return [];
    }
  }),

  // Metadata: shallow merge, newer keys overwrite older ones.
  metadata: Annotation<Record<string, any>>({
    reducer(current, patch) {
      return { ...current, ...patch };
    },
    default() {
      return {};
    }
  })
});

// Fan-out to process items in parallel
/**
 * Emits one Send per entry in `state.items`, each targeting "processItem"
 * with a fresh state that holds only that entry and empty accumulators.
 */
const fanOut = (state: typeof WorkflowState.State) => {
  const toSend = (item: string) => {
    const isolatedState = {
      items: [item],
      processed: [],
      successCount: 0,
      errors: [],
      metadata: {}
    };
    return new Send("processItem", isolatedState);
  };

  return state.items.map(item => toSend(item));
};

// Process individual item with error handling
/**
 * Processes the single item carried in `state.items[0]`.
 *
 * @returns On success, the result plus a success count of 1 and a
 *   per-item "success" marker in metadata. On failure, an error record and
 *   a per-item "failed" marker; the failure is reported through state
 *   rather than rethrown.
 */
const processItem = async (state: typeof WorkflowState.State) => {
  const item = state.items[0];

  try {
    const result = await riskyOperation(item);
    return {
      processed: [result],
      successCount: 1,
      metadata: { [`item_${item}`]: "success" }
    };
  } catch (error: unknown) {
    // Anything can be thrown, so narrow before reading `.message`; this
    // keeps the `error` field a string even for non-Error throwables.
    const message = error instanceof Error ? error.message : String(error);
    return {
      errors: [{ item, error: message }],
      metadata: { [`item_${item}`]: "failed" }
    };
  }
};

// Aggregate results
/**
 * Writes a `summary` entry into metadata with the item count, the number
 * of successes and failures, and the success rate.
 *
 * @returns A partial update containing only `metadata`.
 */
const aggregate = (state: typeof WorkflowState.State) => {
  const total = state.items.length;
  const success = state.successCount;
  const failed = state.errors.length;

  return {
    metadata: {
      ...state.metadata,
      summary: {
        total,
        success,
        failed,
        // An empty batch would otherwise divide by zero and yield NaN.
        successRate: total === 0 ? 0 : success / total
      }
    }
  };
};

// Conditional routing based on results
// A routing function may only return destination names (or Send objects).
// State updates belong in a node, so the retry bookkeeping lives in
// `prepareRetry` below rather than in a Command returned from here.
const shouldRetry = (state: typeof WorkflowState.State) => {
  const maxRetryAttempts = 3;

  if (state.errors.length === 0) {
    return END;
  }

  if ((state.metadata.retryAttempt ?? 0) >= maxRetryAttempts) {
    return "reportFailures";
  }

  return "retry";
};

// Retry node: queue the failed items for another round and bump the counter.
// NOTE(review): `errors` uses an append-only reducer, so entries from earlier
// rounds stay in state even if a later attempt succeeds — items are
// de-duplicated here, but consider a resettable reducer for `errors`.
const prepareRetry = (state: typeof WorkflowState.State) => ({
  items: [...new Set(state.errors.map(e => e.item))],
  metadata: { retryAttempt: (state.metadata.retryAttempt ?? 0) + 1 }
});

// Build the graph
const graph = new StateGraph(WorkflowState)
  .addNode("processItem", processItem, {
    retryPolicy: {
      maxAttempts: 2,
      initialInterval: 500
    }
  })
  .addNode("aggregate", aggregate)
  .addNode("retry", prepareRetry)
  .addNode("reportFailures", reportFailuresNode)
  .addConditionalEdges(START, fanOut)
  .addEdge("processItem", "aggregate")
  // Every value shouldRetry can return needs an entry, including END.
  .addConditionalEdges("aggregate", shouldRetry, {
    retry: "retry",
    reportFailures: "reportFailures",
    [END]: END
  })
  // Loop back by fanning out again; START cannot be the target of an edge.
  .addConditionalEdges("retry", fanOut)
  .addEdge("reportFailures", END)
  .compile({
    checkpointer: new MemorySaver(),
    interruptBefore: ["reportFailures"]  // Human review before final failure report
  });

// Execute
// Execute
// The first argument is the input state; the second carries run
// configuration, here a `thread_id` under `configurable`.
// NOTE(review): the graph was compiled with a checkpointer and
// interruptBefore on "reportFailures" — presumably the thread_id identifies
// the checkpointed run so it can be resumed after the pause; confirm.
const result = await graph.invoke({
  items: ["item1", "item2", "item3", "item4", "item5"]
}, {
  configurable: { thread_id: "batch-1" }
});

This guide provides comprehensive coverage of graph construction patterns in LangGraph.

Install with Tessl CLI

npx tessl i tessl/npm-langchain--langgraph@1.0.1

docs

index.md

tile.json