TypeScript/JavaScript SDK for interacting with the LangGraph REST API server
The RunsClient provides comprehensive run execution and monitoring capabilities for LangGraph applications. It enables real-time streaming of run execution, batch processing, run lifecycle management, and flexible stream modes for different data requirements. Runs represent individual executions of LangGraph assistants within thread contexts.
The RunsClient supports:
class RunsClient<TStateType = DefaultValues, TUpdateType = TStateType, TCustomEventType = unknown> extends BaseClient {
/**
* Stream run execution with real-time updates and events
* @param threadId - Thread ID for execution context (null for new thread)
* @param assistantId - Assistant ID to execute
* @param payload - Stream configuration and input data
* @returns TypedAsyncGenerator for streaming events
*/
stream<
TStreamMode extends StreamMode[] = ["values"],
TSubgraphs extends boolean = false
>(
threadId: string | null,
assistantId: string,
payload?: RunsStreamPayload
): TypedAsyncGenerator<TStreamMode, TSubgraphs, TStateType, TUpdateType, TCustomEventType>;
/**
* Create a new run execution
* @param threadId - Thread ID for execution context
* @param assistantId - Assistant ID to execute
* @param payload - Run configuration and input data
* @returns Promise resolving to created run
*/
create(threadId: string, assistantId: string, payload?: RunsCreatePayload): Promise<Run>;
/**
* Create multiple runs in parallel for batch processing
* @param payloads - Array of run creation configurations with assistant IDs
* @returns Promise resolving to array of created runs
*/
createBatch(payloads: (RunsCreatePayload & { assistantId: string })[]): Promise<Run[]>;
/**
* Wait for run completion and return final state values
* @param threadId - Thread ID (null for new thread)
* @param assistantId - Assistant ID to execute
* @param payload - Wait configuration
* @returns Promise resolving to final thread state values
*/
wait(
threadId: string | null,
assistantId: string,
payload?: RunsWaitPayload
): Promise<ThreadState["values"]>;
/**
* List all runs for a specific thread with pagination
* @param threadId - Thread ID to list runs for
* @param options - Listing and filtering options
* @returns Promise resolving to array of runs
*/
list(threadId: string, options?: ListRunsOptions): Promise<Run[]>;
/**
* Get specific run by ID
* @param threadId - Thread ID containing the run
* @param runId - Run ID to retrieve
* @returns Promise resolving to run details
*/
get(threadId: string, runId: string): Promise<Run>;
/**
* Cancel a running execution
* @param threadId - Thread ID containing the run
* @param runId - Run ID to cancel
* @param wait - Whether to wait for cancellation to complete
* @param action - Cancellation action type
* @returns Promise resolving when cancellation completes
*/
cancel(
threadId: string,
runId: string,
wait?: boolean,
action?: CancelAction
): Promise<void>;
/**
* Wait for run to complete execution
* @param threadId - Thread ID containing the run
* @param runId - Run ID to wait for
* @param options - Join options including signal for cancellation
* @returns Promise resolving when run completes
*/
join(
threadId: string,
runId: string,
options?: { signal?: AbortSignal }
): Promise<void>;
/**
* Join an existing streaming run to receive ongoing events
* @param threadId - Thread ID (null if not specified)
* @param runId - Run ID to join
* @param options - Stream joining options
* @returns AsyncGenerator for streaming events
*/
joinStream<TStreamMode extends StreamMode[] = ["values"]>(
threadId: string | null,
runId: string,
options?: JoinStreamOptions<TStreamMode>
): AsyncGenerator<StreamEvent<TStreamMode, TStateType, TUpdateType, TCustomEventType>>;
/**
* Delete a run permanently
* @param threadId - Thread ID containing the run
* @param runId - Run ID to delete
* @returns Promise resolving when deletion completes
*/
delete(threadId: string, runId: string): Promise<void>;
}interface Run {
/** Unique run identifier */
run_id: string;
/** Thread ID for execution context */
thread_id: string;
/** Assistant ID being executed */
assistant_id: string;
/** Run creation timestamp */
created_at: string;
/** Run last update timestamp */
updated_at: string;
/** Current run status */
status: RunStatus;
/** Run metadata */
metadata: Metadata;
/** Run configuration */
config: Config;
/** Run kwargs */
kwargs: Record<string, any>;
/** Multitask strategy */
multitask_strategy: MultitaskStrategy;
}
type RunStatus = "pending" | "running" | "error" | "success" | "timeout" | "interrupted";
type CancelAction = "interrupt" | "rollback";
type MultitaskStrategy = "reject" | "interrupt" | "rollback" | "enqueue";type StreamMode =
| "values" // Current state values
| "updates" // State updates
| "messages" // Message updates
| "events" // All execution events
| "debug" // Debug information
| "metadata" // Metadata updates
| "custom"; // Custom events
// Stream Event Types
interface ValuesStreamEvent<ValuesType = DefaultValues> {
event: "values";
data: ValuesType;
timestamp: string;
tags?: string[];
}
interface UpdatesStreamEvent<UpdateType = DefaultValues> {
event: "updates";
data: Record<string, UpdateType>;
timestamp: string;
tags?: string[];
}
interface MessagesStreamEvent {
event: "messages";
data: Message[];
timestamp: string;
tags?: string[];
}
interface EventsStreamEvent {
event: "events";
data: {
event: string;
name: string;
data: Record<string, any>;
tags: string[];
};
timestamp: string;
}
interface DebugStreamEvent {
event: "debug";
data: {
type: string;
timestamp: string;
step: number;
payload: Record<string, any>;
};
timestamp: string;
}
interface MetadataStreamEvent {
event: "metadata";
data: Metadata;
timestamp: string;
}
interface CustomStreamEvent<TCustomEventType = unknown> {
event: "custom";
data: TCustomEventType;
timestamp: string;
}
type StreamEvent<
TStreamMode extends StreamMode[] = ["values"],
TStateType = DefaultValues,
TUpdateType = TStateType,
TCustomEventType = unknown
> =
| (TStreamMode extends readonly ("values" | "values" | "values")[] ? ValuesStreamEvent<TStateType> : never)
| (TStreamMode extends readonly ("updates" | "updates" | "updates")[] ? UpdatesStreamEvent<TUpdateType> : never)
| (TStreamMode extends readonly ("messages" | "messages" | "messages")[] ? MessagesStreamEvent : never)
| (TStreamMode extends readonly ("events" | "events" | "events")[] ? EventsStreamEvent : never)
| (TStreamMode extends readonly ("debug" | "debug" | "debug")[] ? DebugStreamEvent : never)
| (TStreamMode extends readonly ("metadata" | "metadata" | "metadata")[] ? MetadataStreamEvent : never)
| (TStreamMode extends readonly ("custom" | "custom" | "custom")[] ? CustomStreamEvent<TCustomEventType> : never);type TypedAsyncGenerator<
TStreamMode extends StreamMode[] = ["values"],
TSubgraphs extends boolean = false,
TStateType = DefaultValues,
TUpdateType = TStateType,
TCustomEventType = unknown
> = AsyncGenerator<StreamEvent<TStreamMode, TStateType, TUpdateType, TCustomEventType>>;interface RunsStreamPayload {
/** Input data for the run */
input?: Record<string, any>;
/** Run configuration */
config?: Config;
/** Stream modes to enable */
streamMode?: StreamMode | StreamMode[];
/** Enable subgraph streaming */
streamSubgraphs?: boolean;
/** Run metadata */
metadata?: Metadata;
/** Multitask strategy */
multitaskStrategy?: MultitaskStrategy;
/** Feedback configuration */
feedbackKeys?: string[];
/** Interrupt after nodes */
interruptAfter?: string[];
/** Interrupt before nodes */
interruptBefore?: string[];
/** Webhook configuration */
webhook?: string;
/** Request kwargs */
kwargs?: Record<string, any>;
}
interface RunsCreatePayload {
/** Input data for the run */
input?: Record<string, any>;
/** Run configuration */
config?: Config;
/** Run metadata */
metadata?: Metadata;
/** Multitask strategy */
multitaskStrategy?: MultitaskStrategy;
/** Webhook configuration */
webhook?: string;
/** Interrupt after nodes */
interruptAfter?: string[];
/** Interrupt before nodes */
interruptBefore?: string[];
/** Request kwargs */
kwargs?: Record<string, any>;
}
interface RunsWaitPayload extends RunsCreatePayload {
/** Maximum wait time in seconds */
timeout?: number;
}interface ListRunsOptions {
/** Maximum runs to return */
limit?: number;
/** Pagination offset */
offset?: number;
/** Filter by run status */
status?: RunStatus;
/** Filter by metadata */
metadata?: Record<string, any>;
/** Created after timestamp */
createdAfter?: string;
/** Created before timestamp */
createdBefore?: string;
}
interface JoinStreamOptions<TStreamMode extends StreamMode[] = ["values"]> {
/** Stream modes to join */
streamMode?: TStreamMode;
/** Enable subgraph streaming */
streamSubgraphs?: boolean;
/** Abort signal for cancellation */
signal?: AbortSignal;
}import { Client } from "@langchain/langgraph-sdk";
const client = new Client({
apiUrl: "https://api.langgraph.example.com",
apiKey: "your-api-key"
});
// Stream a simple run with values
const stream = client.runs.stream(
"thread_123",
"assistant_456",
{
input: { message: "Hello, how can you help me?" },
streamMode: "values"
}
);
for await (const chunk of stream) {
if (chunk.event === "values") {
console.log("Current state:", chunk.data);
}
}// Stream with multiple modes for comprehensive monitoring
const multiModeStream = client.runs.stream(
"thread_123",
"assistant_456",
{
input: {
message: "Analyze this data",
data: [1, 2, 3, 4, 5]
},
streamMode: ["values", "updates", "messages", "debug"],
streamSubgraphs: true,
metadata: { session_id: "session_789" }
}
);
for await (const chunk of multiModeStream) {
switch (chunk.event) {
case "values":
console.log("📊 State values:", chunk.data);
break;
case "updates":
console.log("🔄 State updates:", chunk.data);
break;
case "messages":
console.log("💬 Messages:", chunk.data);
break;
case "debug":
console.log("🐛 Debug info:", chunk.data);
break;
}
console.log("⏰ Timestamp:", chunk.timestamp);
if (chunk.tags) {
console.log("🏷️ Tags:", chunk.tags);
}
}// Create a run without streaming
const run = await client.runs.create(
"thread_123",
"assistant_456",
{
input: { query: "What's the weather like?" },
config: {
configurable: {
temperature: 0.7,
max_tokens: 1000
}
},
metadata: { priority: "high" },
multitaskStrategy: "interrupt"
}
);
console.log("Created run:", run.run_id);
console.log("Status:", run.status);
// Wait for completion
await client.runs.join("thread_123", run.run_id);
// Get updated run status
const updatedRun = await client.runs.get("thread_123", run.run_id);
console.log("Final status:", updatedRun.status);// Create multiple runs in parallel
const batchPayloads = [
{
assistantId: "assistant_456",
input: { task: "analyze_data_1" },
config: { tags: ["batch", "data_analysis"] }
},
{
assistantId: "assistant_789",
input: { task: "generate_report_1" },
config: { tags: ["batch", "reporting"] }
},
{
assistantId: "assistant_456",
input: { task: "analyze_data_2" },
config: { tags: ["batch", "data_analysis"] }
}
];
const batchRuns = await client.runs.createBatch(batchPayloads);
console.log(`Created ${batchRuns.length} runs in batch`);
// Monitor all runs
const runPromises = batchRuns.map(run =>
client.runs.join("thread_123", run.run_id)
);
await Promise.all(runPromises);
console.log("All batch runs completed");// Wait for completion with timeout
try {
const result = await client.runs.wait(
"thread_123",
"assistant_456",
{
input: { message: "Process this request" },
timeout: 30, // 30 seconds timeout
config: {
recursion_limit: 50
}
}
);
console.log("Final result:", result);
} catch (error) {
if (error.name === "TimeoutError") {
console.log("Run timed out");
} else {
console.error("Run failed:", error);
}
}// Join an existing stream
const existingStream = client.runs.joinStream(
"thread_123",
"run_456",
{
streamMode: ["values", "messages"],
streamSubgraphs: false
}
);
// Monitor with cancellation support
const abortController = new AbortController();
// Set timeout for stream monitoring
setTimeout(() => {
abortController.abort();
console.log("Stream monitoring cancelled");
}, 30000);
try {
for await (const chunk of existingStream) {
if (abortController.signal.aborted) {
break;
}
console.log("Stream event:", chunk.event);
console.log("Data:", chunk.data);
}
} catch (error) {
if (error.name === "AbortError") {
console.log("Stream was cancelled");
}
}
// Cancel a run if needed
await client.runs.cancel("thread_123", "run_456", true, "interrupt");interface CustomEvent {
type: "progress" | "warning" | "metric";
payload: Record<string, any>;
component: string;
}
// Stream with custom event handling
const customStream = client.runs.stream<
["values", "custom"],
false,
{ messages: Message[]; progress: number },
{ messages: Message[]; progress: number },
CustomEvent
>(
"thread_123",
"assistant_456",
{
input: { task: "complex_analysis" },
streamMode: ["values", "custom"],
metadata: { enable_custom_events: true }
}
);
for await (const chunk of customStream) {
switch (chunk.event) {
case "values":
console.log("Progress:", chunk.data.progress);
console.log("Messages count:", chunk.data.messages.length);
break;
case "custom":
const customEvent = chunk.data;
console.log(`Custom ${customEvent.type} from ${customEvent.component}:`);
console.log(customEvent.payload);
if (customEvent.type === "warning") {
console.warn("⚠️ Warning received:", customEvent.payload);
}
break;
}
}async function robustStreamProcessing() {
const maxRetries = 3;
let retryCount = 0;
while (retryCount < maxRetries) {
try {
const stream = client.runs.stream(
"thread_123",
"assistant_456",
{
input: { message: "Process this carefully" },
streamMode: ["values", "debug"],
config: {
tags: [`attempt_${retryCount + 1}`]
}
}
);
for await (const chunk of stream) {
if (chunk.event === "values") {
console.log("Processing:", chunk.data);
} else if (chunk.event === "debug") {
console.log("Debug:", chunk.data.payload);
}
}
console.log("Stream completed successfully");
break;
} catch (error) {
retryCount++;
console.error(`Attempt ${retryCount} failed:`, error);
if (retryCount >= maxRetries) {
console.error("All retry attempts exhausted");
throw error;
}
// Exponential backoff
const delay = Math.pow(2, retryCount) * 1000;
console.log(`Retrying in ${delay}ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
// Run listing and cleanup
async function manageRuns() {
// List recent runs
const runs = await client.runs.list("thread_123", {
limit: 50,
status: "success",
createdAfter: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString()
});
console.log(`Found ${runs.length} successful runs from last 24h`);
// Clean up old failed runs
const failedRuns = await client.runs.list("thread_123", {
status: "error",
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString()
});
for (const run of failedRuns) {
try {
await client.runs.delete("thread_123", run.run_id);
console.log(`Deleted failed run: ${run.run_id}`);
} catch (error) {
console.error(`Failed to delete run ${run.run_id}:`, error);
}
}
}The RunsClient provides comprehensive run execution capabilities with real-time streaming, flexible event handling, and robust lifecycle management, making it the core component for executing LangGraph assistants and monitoring their progress.
Install with Tessl CLI
npx tessl i tessl/npm-langchain--langgraph-sdk