tessl/npm-langchain--langgraph-sdk

TypeScript/JavaScript SDK for interacting with the LangGraph REST API server

Runs Management and Streaming

The RunsClient provides comprehensive run execution and monitoring capabilities for LangGraph applications. It enables real-time streaming of run execution, batch processing, run lifecycle management, and flexible stream modes for different data requirements. Runs represent individual executions of LangGraph assistants within thread contexts.

Core Functionality

The RunsClient supports:

  • Streaming Execution: Real-time streaming of run execution with various stream modes
  • Run Lifecycle: Create, get, list, cancel, and delete run operations
  • Batch Processing: Create multiple runs simultaneously for parallel execution
  • Stream Management: Join existing streams and manage streaming sessions
  • Wait Operations: Block until run completion with configurable timeouts
  • Type Safety: Full TypeScript support with generic types for state and events

RunsClient API

class RunsClient<TStateType = DefaultValues, TUpdateType = TStateType, TCustomEventType = unknown> extends BaseClient {
  /**
   * Stream run execution with real-time updates and events
   * @param threadId - Thread ID for execution context (null for new thread)
   * @param assistantId - Assistant ID to execute
   * @param payload - Stream configuration and input data
   * @returns TypedAsyncGenerator for streaming events
   */
  stream<
    TStreamMode extends StreamMode[] = ["values"],
    TSubgraphs extends boolean = false
  >(
    threadId: string | null,
    assistantId: string,
    payload?: RunsStreamPayload
  ): TypedAsyncGenerator<TStreamMode, TSubgraphs, TStateType, TUpdateType, TCustomEventType>;

  /**
   * Create a new run execution
   * @param threadId - Thread ID for execution context
   * @param assistantId - Assistant ID to execute
   * @param payload - Run configuration and input data
   * @returns Promise resolving to created run
   */
  create(threadId: string, assistantId: string, payload?: RunsCreatePayload): Promise<Run>;

  /**
   * Create multiple runs in parallel for batch processing
   * @param payloads - Array of run creation configurations with assistant IDs
   * @returns Promise resolving to array of created runs
   */
  createBatch(payloads: (RunsCreatePayload & { assistantId: string })[]): Promise<Run[]>;

  /**
   * Wait for run completion and return final state values
   * @param threadId - Thread ID (null for new thread)
   * @param assistantId - Assistant ID to execute
   * @param payload - Wait configuration
   * @returns Promise resolving to final thread state values
   */
  wait(
    threadId: string | null,
    assistantId: string,
    payload?: RunsWaitPayload
  ): Promise<ThreadState["values"]>;

  /**
   * List all runs for a specific thread with pagination
   * @param threadId - Thread ID to list runs for
   * @param options - Listing and filtering options
   * @returns Promise resolving to array of runs
   */
  list(threadId: string, options?: ListRunsOptions): Promise<Run[]>;

  /**
   * Get specific run by ID
   * @param threadId - Thread ID containing the run
   * @param runId - Run ID to retrieve
   * @returns Promise resolving to run details
   */
  get(threadId: string, runId: string): Promise<Run>;

  /**
   * Cancel a running execution
   * @param threadId - Thread ID containing the run
   * @param runId - Run ID to cancel
   * @param wait - Whether to wait for cancellation to complete
   * @param action - Cancellation action type
   * @returns Promise resolving when cancellation completes
   */
  cancel(
    threadId: string,
    runId: string,
    wait?: boolean,
    action?: CancelAction
  ): Promise<void>;

  /**
   * Wait for run to complete execution
   * @param threadId - Thread ID containing the run
   * @param runId - Run ID to wait for
   * @param options - Join options including signal for cancellation
   * @returns Promise resolving when run completes
   */
  join(
    threadId: string,
    runId: string,
    options?: { signal?: AbortSignal }
  ): Promise<void>;

  /**
   * Join an existing streaming run to receive ongoing events
   * @param threadId - Thread ID (null if not specified)
   * @param runId - Run ID to join
   * @param options - Stream joining options
   * @returns AsyncGenerator for streaming events
   */
  joinStream<TStreamMode extends StreamMode[] = ["values"]>(
    threadId: string | null,
    runId: string,
    options?: JoinStreamOptions<TStreamMode>
  ): AsyncGenerator<StreamEvent<TStreamMode, TStateType, TUpdateType, TCustomEventType>>;

  /**
   * Delete a run permanently
   * @param threadId - Thread ID containing the run
   * @param runId - Run ID to delete
   * @returns Promise resolving when deletion completes
   */
  delete(threadId: string, runId: string): Promise<void>;
}

Core Types

Run Interface

interface Run {
  /** Unique run identifier */
  run_id: string;
  /** Thread ID for execution context */
  thread_id: string;
  /** Assistant ID being executed */
  assistant_id: string;
  /** Run creation timestamp */
  created_at: string;
  /** Run last update timestamp */
  updated_at: string;
  /** Current run status */
  status: RunStatus;
  /** Run metadata */
  metadata: Metadata;
  /** Run configuration */
  config: Config;
  /** Run kwargs */
  kwargs: Record<string, any>;
  /** Multitask strategy */
  multitask_strategy: MultitaskStrategy;
}

type RunStatus = "pending" | "running" | "error" | "success" | "timeout" | "interrupted";

type CancelAction = "interrupt" | "rollback";

type MultitaskStrategy = "reject" | "interrupt" | "rollback" | "enqueue";
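
These unions are useful for driving control flow on the caller's side. As a sketch (the helper below is not part of the SDK; only the `RunStatus` union itself comes from this page), a run can be classified as terminal before deciding whether to poll again:

```typescript
// Mirror of the RunStatus union declared above.
type RunStatus = "pending" | "running" | "error" | "success" | "timeout" | "interrupted";

// Statuses after which a run's state will not change again.
const TERMINAL_STATUSES: ReadonlySet<RunStatus> = new Set([
  "error",
  "success",
  "timeout",
  "interrupted",
]);

// Returns true when polling for this run can stop.
function isTerminal(status: RunStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}

console.log(isTerminal("running")); // → false
```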

Stream Types

type StreamMode =
  | "values"      // Current state values
  | "updates"     // State updates
  | "messages"    // Message updates
  | "events"      // All execution events
  | "debug"       // Debug information
  | "metadata"    // Metadata updates
  | "custom";     // Custom events

// Stream Event Types
interface ValuesStreamEvent<ValuesType = DefaultValues> {
  event: "values";
  data: ValuesType;
  timestamp: string;
  tags?: string[];
}

interface UpdatesStreamEvent<UpdateType = DefaultValues> {
  event: "updates";
  data: Record<string, UpdateType>;
  timestamp: string;
  tags?: string[];
}

interface MessagesStreamEvent {
  event: "messages";
  data: Message[];
  timestamp: string;
  tags?: string[];
}

interface EventsStreamEvent {
  event: "events";
  data: {
    event: string;
    name: string;
    data: Record<string, any>;
    tags: string[];
  };
  timestamp: string;
}

interface DebugStreamEvent {
  event: "debug";
  data: {
    type: string;
    timestamp: string;
    step: number;
    payload: Record<string, any>;
  };
  timestamp: string;
}

interface MetadataStreamEvent {
  event: "metadata";
  data: Metadata;
  timestamp: string;
}

interface CustomStreamEvent<TCustomEventType = unknown> {
  event: "custom";
  data: TCustomEventType;
  timestamp: string;
}

type StreamEvent<
  TStreamMode extends StreamMode[] = ["values"],
  TStateType = DefaultValues,
  TUpdateType = TStateType,
  TCustomEventType = unknown
> =
  | ("values" extends TStreamMode[number] ? ValuesStreamEvent<TStateType> : never)
  | ("updates" extends TStreamMode[number] ? UpdatesStreamEvent<TUpdateType> : never)
  | ("messages" extends TStreamMode[number] ? MessagesStreamEvent : never)
  | ("events" extends TStreamMode[number] ? EventsStreamEvent : never)
  | ("debug" extends TStreamMode[number] ? DebugStreamEvent : never)
  | ("metadata" extends TStreamMode[number] ? MetadataStreamEvent : never)
  | ("custom" extends TStreamMode[number] ? CustomStreamEvent<TCustomEventType> : never);
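
Because every event interface carries a literal `event` field, TypeScript narrows the `data` type automatically when you switch on it. A minimal sketch with simplified stand-in event shapes (the two-member union below is an illustration, not the SDK's full `StreamEvent`):

```typescript
// Simplified stand-ins for the stream event shapes above.
type Chunk =
  | { event: "values"; data: { progress: number } }
  | { event: "custom"; data: { type: string } };

// Switching on the `event` discriminant narrows `data` to the matching shape.
function describe(chunk: Chunk): string {
  switch (chunk.event) {
    case "values":
      return `progress=${chunk.data.progress}`;
    case "custom":
      return `custom:${chunk.data.type}`;
  }
}

console.log(describe({ event: "values", data: { progress: 0.5 } })); // → progress=0.5
```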

TypedAsyncGenerator

type TypedAsyncGenerator<
  TStreamMode extends StreamMode[] = ["values"],
  TSubgraphs extends boolean = false,
  TStateType = DefaultValues,
  TUpdateType = TStateType,
  TCustomEventType = unknown
> = AsyncGenerator<StreamEvent<TStreamMode, TStateType, TUpdateType, TCustomEventType>>;
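
Since `TypedAsyncGenerator` is an ordinary `AsyncGenerator`, all standard consumption patterns apply. For instance, a small generic helper (hypothetical, not part of the SDK) can drain any generator into an array for batch inspection:

```typescript
// Drain any async generator into an array.
async function collect<T>(gen: AsyncGenerator<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of gen) {
    out.push(item);
  }
  return out;
}

// Toy generator standing in for a stream of events.
async function* demo(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
}

collect(demo()).then((items) => console.log(items)); // → [ 1, 2, 3 ]
```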

Payload Types

Stream and Create Payloads

interface RunsStreamPayload {
  /** Input data for the run */
  input?: Record<string, any>;
  /** Run configuration */
  config?: Config;
  /** Stream modes to enable */
  streamMode?: StreamMode | StreamMode[];
  /** Enable subgraph streaming */
  streamSubgraphs?: boolean;
  /** Run metadata */
  metadata?: Metadata;
  /** Multitask strategy */
  multitaskStrategy?: MultitaskStrategy;
  /** Feedback configuration */
  feedbackKeys?: string[];
  /** Interrupt after nodes */
  interruptAfter?: string[];
  /** Interrupt before nodes */
  interruptBefore?: string[];
  /** Webhook configuration */
  webhook?: string;
  /** Request kwargs */
  kwargs?: Record<string, any>;
}

interface RunsCreatePayload {
  /** Input data for the run */
  input?: Record<string, any>;
  /** Run configuration */
  config?: Config;
  /** Run metadata */
  metadata?: Metadata;
  /** Multitask strategy */
  multitaskStrategy?: MultitaskStrategy;
  /** Webhook configuration */
  webhook?: string;
  /** Interrupt after nodes */
  interruptAfter?: string[];
  /** Interrupt before nodes */
  interruptBefore?: string[];
  /** Request kwargs */
  kwargs?: Record<string, any>;
}

interface RunsWaitPayload extends RunsCreatePayload {
  /** Maximum wait time in seconds */
  timeout?: number;
}

Query and Options

interface ListRunsOptions {
  /** Maximum runs to return */
  limit?: number;
  /** Pagination offset */
  offset?: number;
  /** Filter by run status */
  status?: RunStatus;
  /** Filter by metadata */
  metadata?: Record<string, any>;
  /** Created after timestamp */
  createdAfter?: string;
  /** Created before timestamp */
  createdBefore?: string;
}

interface JoinStreamOptions<TStreamMode extends StreamMode[] = ["values"]> {
  /** Stream modes to join */
  streamMode?: TStreamMode;
  /** Enable subgraph streaming */
  streamSubgraphs?: boolean;
  /** Abort signal for cancellation */
  signal?: AbortSignal;
}
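
The `signal` option accepts any standard `AbortSignal`. One convenient pattern, assuming a runtime that provides `AbortSignal.timeout` (Node 17.3+ and modern browsers), is a signal that aborts on its own after a deadline, without wiring up an `AbortController` by hand:

```typescript
// A signal that aborts automatically after 30 seconds; it can be passed
// as `options.signal` to join() or joinStream().
const deadline: AbortSignal = AbortSignal.timeout(30_000);

console.log(deadline.aborted); // → false (deadline has not expired yet)
```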

Usage Examples

Basic Streaming

import { Client } from "@langchain/langgraph-sdk";

const client = new Client({
  apiUrl: "https://api.langgraph.example.com",
  apiKey: "your-api-key"
});

// Stream a simple run with values
const stream = client.runs.stream(
  "thread_123",
  "assistant_456",
  {
    input: { message: "Hello, how can you help me?" },
    streamMode: "values"
  }
);

for await (const chunk of stream) {
  if (chunk.event === "values") {
    console.log("Current state:", chunk.data);
  }
}

Multi-Mode Streaming

// Stream with multiple modes for comprehensive monitoring
const multiModeStream = client.runs.stream(
  "thread_123",
  "assistant_456",
  {
    input: {
      message: "Analyze this data",
      data: [1, 2, 3, 4, 5]
    },
    streamMode: ["values", "updates", "messages", "debug"],
    streamSubgraphs: true,
    metadata: { session_id: "session_789" }
  }
);

for await (const chunk of multiModeStream) {
  switch (chunk.event) {
    case "values":
      console.log("📊 State values:", chunk.data);
      break;

    case "updates":
      console.log("🔄 State updates:", chunk.data);
      break;

    case "messages":
      console.log("💬 Messages:", chunk.data);
      break;

    case "debug":
      console.log("🐛 Debug info:", chunk.data);
      break;
  }

  console.log("⏰ Timestamp:", chunk.timestamp);
  if (chunk.tags) {
    console.log("🏷️ Tags:", chunk.tags);
  }
}

Run Creation and Management

// Create a run without streaming
const run = await client.runs.create(
  "thread_123",
  "assistant_456",
  {
    input: { query: "What's the weather like?" },
    config: {
      configurable: {
        temperature: 0.7,
        max_tokens: 1000
      }
    },
    metadata: { priority: "high" },
    multitaskStrategy: "interrupt"
  }
);

console.log("Created run:", run.run_id);
console.log("Status:", run.status);

// Wait for completion
await client.runs.join("thread_123", run.run_id);

// Get updated run status
const updatedRun = await client.runs.get("thread_123", run.run_id);
console.log("Final status:", updatedRun.status);

Batch Processing

// Create multiple runs in parallel
const batchPayloads = [
  {
    assistantId: "assistant_456",
    input: { task: "analyze_data_1" },
    config: { tags: ["batch", "data_analysis"] }
  },
  {
    assistantId: "assistant_789",
    input: { task: "generate_report_1" },
    config: { tags: ["batch", "reporting"] }
  },
  {
    assistantId: "assistant_456",
    input: { task: "analyze_data_2" },
    config: { tags: ["batch", "data_analysis"] }
  }
];

const batchRuns = await client.runs.createBatch(batchPayloads);

console.log(`Created ${batchRuns.length} runs in batch`);

// Monitor all runs
const runPromises = batchRuns.map(run =>
  client.runs.join(run.thread_id, run.run_id)
);

await Promise.all(runPromises);
console.log("All batch runs completed");

Wait Operations

// Wait for completion with timeout
try {
  const result = await client.runs.wait(
    "thread_123",
    "assistant_456",
    {
      input: { message: "Process this request" },
      timeout: 30, // 30 seconds timeout
      config: {
        recursion_limit: 50
      }
    }
  );

  console.log("Final result:", result);
} catch (error) {
  if (error instanceof Error && error.name === "TimeoutError") {
    console.log("Run timed out");
  } else {
    console.error("Run failed:", error);
  }
}

Stream Joining and Cancellation

// Join an existing stream
const existingStream = client.runs.joinStream(
  "thread_123",
  "run_456",
  {
    streamMode: ["values", "messages"],
    streamSubgraphs: false
  }
);

// Monitor with cancellation support
const abortController = new AbortController();

// Set timeout for stream monitoring
setTimeout(() => {
  abortController.abort();
  console.log("Stream monitoring cancelled");
}, 30000);

try {
  for await (const chunk of existingStream) {
    if (abortController.signal.aborted) {
      break;
    }

    console.log("Stream event:", chunk.event);
    console.log("Data:", chunk.data);
  }
} catch (error) {
  if (error instanceof Error && error.name === "AbortError") {
    console.log("Stream was cancelled");
  }
}

// Cancel a run if needed
await client.runs.cancel("thread_123", "run_456", true, "interrupt");

Advanced Streaming with Custom Events

interface CustomEvent {
  type: "progress" | "warning" | "metric";
  payload: Record<string, any>;
  component: string;
}

// Stream with custom event handling. The state and custom event types are
// fixed when the client is instantiated (RunsClient<TStateType, TUpdateType,
// TCustomEventType>); stream() itself only takes the stream-mode and
// subgraph type arguments.
const customStream = client.runs.stream<["values", "custom"], false>(
  "thread_123",
  "assistant_456",
  {
    input: { task: "complex_analysis" },
    streamMode: ["values", "custom"],
    metadata: { enable_custom_events: true }
  }
);

for await (const chunk of customStream) {
  switch (chunk.event) {
    case "values":
      console.log("Progress:", chunk.data.progress);
      console.log("Messages count:", chunk.data.messages.length);
      break;

    case "custom": {
      const customEvent = chunk.data;
      console.log(`Custom ${customEvent.type} from ${customEvent.component}:`);
      console.log(customEvent.payload);

      if (customEvent.type === "warning") {
        console.warn("⚠️ Warning received:", customEvent.payload);
      }
      break;
    }
  }
}

Error Handling and Recovery

async function robustStreamProcessing() {
  const maxRetries = 3;
  let retryCount = 0;

  while (retryCount < maxRetries) {
    try {
      const stream = client.runs.stream(
        "thread_123",
        "assistant_456",
        {
          input: { message: "Process this carefully" },
          streamMode: ["values", "debug"],
          config: {
            tags: [`attempt_${retryCount + 1}`]
          }
        }
      );

      for await (const chunk of stream) {
        if (chunk.event === "values") {
          console.log("Processing:", chunk.data);
        } else if (chunk.event === "debug") {
          console.log("Debug:", chunk.data.payload);
        }
      }

      console.log("Stream completed successfully");
      break;

    } catch (error) {
      retryCount++;
      console.error(`Attempt ${retryCount} failed:`, error);

      if (retryCount >= maxRetries) {
        console.error("All retry attempts exhausted");
        throw error;
      }

      // Exponential backoff
      const delay = Math.pow(2, retryCount) * 1000;
      console.log(`Retrying in ${delay}ms...`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}

// Run listing and cleanup
async function manageRuns() {
  // List recent runs
  const runs = await client.runs.list("thread_123", {
    limit: 50,
    status: "success",
    createdAfter: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString()
  });

  console.log(`Found ${runs.length} successful runs from last 24h`);

  // Clean up old failed runs
  const failedRuns = await client.runs.list("thread_123", {
    status: "error",
    createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString()
  });

  for (const run of failedRuns) {
    try {
      await client.runs.delete("thread_123", run.run_id);
      console.log(`Deleted failed run: ${run.run_id}`);
    } catch (error) {
      console.error(`Failed to delete run ${run.run_id}:`, error);
    }
  }
}

The RunsClient provides comprehensive run execution capabilities with real-time streaming, flexible event handling, and robust lifecycle management, making it the core component for executing LangGraph assistants and monitoring their progress.

Install with Tessl CLI

npx tessl i tessl/npm-langchain--langgraph-sdk
