0
# Runs Management and Streaming
1
2
The RunsClient provides comprehensive run execution and monitoring capabilities for LangGraph applications. It enables real-time streaming of run execution, batch processing, run lifecycle management, and flexible stream modes for different data requirements. Runs represent individual executions of LangGraph assistants within thread contexts.
3
4
## Core Functionality
5
6
The RunsClient supports:
7
8
- **Streaming Execution**: Real-time streaming of run execution with various stream modes
9
- **Run Lifecycle**: Create, get, list, cancel, and delete run operations
10
- **Batch Processing**: Create multiple runs simultaneously for parallel execution
11
- **Stream Management**: Join existing streams and manage streaming sessions
12
- **Wait Operations**: Block until run completion with configurable timeouts
13
- **Type Safety**: Full TypeScript support with generic types for state and events
14
15
## RunsClient API
16
17
```typescript { .api }
18
/**
 * Client for executing, streaming, and managing runs.
 *
 * Type parameters:
 * - TStateType: state shape carried by "values" stream events
 * - TUpdateType: update shape carried by "updates" stream events
 * - TCustomEventType: payload shape carried by "custom" stream events
 */
class RunsClient<TStateType = DefaultValues, TUpdateType = TStateType, TCustomEventType = unknown> extends BaseClient {
  /**
   * Stream run execution with real-time updates and events
   * @param threadId - Thread ID for execution context (null for new thread)
   * @param assistantId - Assistant ID to execute
   * @param payload - Stream configuration and input data
   * @returns TypedAsyncGenerator for streaming events
   */
  stream<
    TStreamMode extends StreamMode[] = ["values"],
    TSubgraphs extends boolean = false
  >(
    threadId: string | null,
    assistantId: string,
    payload?: RunsStreamPayload
  ): TypedAsyncGenerator<TStreamMode, TSubgraphs, TStateType, TUpdateType, TCustomEventType>;

  /**
   * Create a new run execution
   * @param threadId - Thread ID for execution context
   * @param assistantId - Assistant ID to execute
   * @param payload - Run configuration and input data
   * @returns Promise resolving to created run
   */
  create(threadId: string, assistantId: string, payload?: RunsCreatePayload): Promise<Run>;

  /**
   * Create multiple runs in parallel for batch processing
   * @param payloads - Array of run creation configurations with assistant IDs
   * @returns Promise resolving to array of created runs
   */
  createBatch(payloads: (RunsCreatePayload & { assistantId: string })[]): Promise<Run[]>;

  /**
   * Wait for run completion and return final state values
   * @param threadId - Thread ID (null for new thread)
   * @param assistantId - Assistant ID to execute
   * @param payload - Wait configuration
   * @returns Promise resolving to final thread state values
   */
  wait(
    threadId: string | null,
    assistantId: string,
    payload?: RunsWaitPayload
  ): Promise<ThreadState["values"]>;

  /**
   * List all runs for a specific thread with pagination
   * @param threadId - Thread ID to list runs for
   * @param options - Listing and filtering options
   * @returns Promise resolving to array of runs
   */
  list(threadId: string, options?: ListRunsOptions): Promise<Run[]>;

  /**
   * Get specific run by ID
   * @param threadId - Thread ID containing the run
   * @param runId - Run ID to retrieve
   * @returns Promise resolving to run details
   */
  get(threadId: string, runId: string): Promise<Run>;

  /**
   * Cancel a running execution
   * @param threadId - Thread ID containing the run
   * @param runId - Run ID to cancel
   * @param wait - Whether to wait for cancellation to complete
   * @param action - Cancellation action type
   * @returns Promise resolving when cancellation completes
   */
  cancel(
    threadId: string,
    runId: string,
    wait?: boolean,
    action?: CancelAction
  ): Promise<void>;

  /**
   * Wait for run to complete execution
   * @param threadId - Thread ID containing the run
   * @param runId - Run ID to wait for
   * @param options - Join options including signal for cancellation
   * @returns Promise resolving when run completes
   */
  join(
    threadId: string,
    runId: string,
    options?: { signal?: AbortSignal }
  ): Promise<void>;

  /**
   * Join an existing streaming run to receive ongoing events
   * @param threadId - Thread ID (null if not specified)
   * @param runId - Run ID to join
   * @param options - Stream joining options
   * @returns AsyncGenerator for streaming events
   */
  joinStream<TStreamMode extends StreamMode[] = ["values"]>(
    threadId: string | null,
    runId: string,
    options?: JoinStreamOptions<TStreamMode>
  ): AsyncGenerator<StreamEvent<TStreamMode, TStateType, TUpdateType, TCustomEventType>>;

  /**
   * Delete a run permanently
   * @param threadId - Thread ID containing the run
   * @param runId - Run ID to delete
   * @returns Promise resolving when deletion completes
   */
  delete(threadId: string, runId: string): Promise<void>;
}
129
```
130
131
## Core Types
132
133
### Run Interface
134
135
```typescript { .api }
136
/** A single execution of an assistant within a thread. */
interface Run {
  /** Unique run identifier */
  run_id: string;
  /** Thread ID for execution context */
  thread_id: string;
  /** Assistant ID being executed */
  assistant_id: string;
  /** Run creation timestamp */
  created_at: string;
  /** Run last update timestamp */
  updated_at: string;
  /** Current run status */
  status: RunStatus;
  /** Run metadata */
  metadata: Metadata;
  /** Run configuration */
  config: Config;
  /** Run kwargs */
  kwargs: Record<string, any>;
  /** Multitask strategy */
  multitask_strategy: MultitaskStrategy;
}
158
159
/** Status values a run can report. */
type RunStatus = "pending" | "running" | "error" | "success" | "timeout" | "interrupted";

/** Actions accepted by `RunsClient.cancel`. */
type CancelAction = "interrupt" | "rollback";

/** Strategies accepted in `multitaskStrategy` / reported in `Run.multitask_strategy`. */
type MultitaskStrategy = "reject" | "interrupt" | "rollback" | "enqueue";
164
```
165
166
### Stream Types
167
168
```typescript { .api }
169
/** Stream modes that can be requested; each maps to one stream event type. */
type StreamMode =
  | "values"    // Current state values
  | "updates"   // State updates
  | "messages"  // Message updates
  | "events"    // All execution events
  | "debug"     // Debug information
  | "metadata"  // Metadata updates
  | "custom";   // Custom events
177
178
// Stream Event Types

/** Event emitted for the "values" stream mode. */
interface ValuesStreamEvent<ValuesType = DefaultValues> {
  event: "values";
  data: ValuesType;
  timestamp: string;
  tags?: string[];
}

/** Event emitted for the "updates" stream mode; `data` is a string-keyed map of updates. */
interface UpdatesStreamEvent<UpdateType = DefaultValues> {
  event: "updates";
  data: Record<string, UpdateType>;
  timestamp: string;
  tags?: string[];
}

/** Event emitted for the "messages" stream mode. */
interface MessagesStreamEvent {
  event: "messages";
  data: Message[];
  timestamp: string;
  tags?: string[];
}

/** Event emitted for the "events" stream mode; note that `tags` lives inside `data`. */
interface EventsStreamEvent {
  event: "events";
  data: {
    event: string;
    name: string;
    data: Record<string, any>;
    tags: string[];
  };
  timestamp: string;
}

/** Event emitted for the "debug" stream mode; carries no top-level `tags`. */
interface DebugStreamEvent {
  event: "debug";
  data: {
    type: string;
    timestamp: string;
    step: number;
    payload: Record<string, any>;
  };
  timestamp: string;
}

/** Event emitted for the "metadata" stream mode. */
interface MetadataStreamEvent {
  event: "metadata";
  data: Metadata;
  timestamp: string;
}

/** Event emitted for the "custom" stream mode. */
interface CustomStreamEvent<TCustomEventType = unknown> {
  event: "custom";
  data: TCustomEventType;
  timestamp: string;
}

/**
 * Union of the event types produced for the requested stream modes.
 *
 * Each branch asks "is this mode among the requested modes?" by testing the
 * literal against `TStreamMode[number]` (the union of the tuple's elements).
 * Testing the whole tuple against a single-mode array instead would only match
 * when every requested mode is that one mode, so a multi-mode request such as
 * ["values", "updates"] would collapse to `never`.
 */
type StreamEvent<
  TStreamMode extends StreamMode[] = ["values"],
  TStateType = DefaultValues,
  TUpdateType = TStateType,
  TCustomEventType = unknown
> =
  | ("values" extends TStreamMode[number] ? ValuesStreamEvent<TStateType> : never)
  | ("updates" extends TStreamMode[number] ? UpdatesStreamEvent<TUpdateType> : never)
  | ("messages" extends TStreamMode[number] ? MessagesStreamEvent : never)
  | ("events" extends TStreamMode[number] ? EventsStreamEvent : never)
  | ("debug" extends TStreamMode[number] ? DebugStreamEvent : never)
  | ("metadata" extends TStreamMode[number] ? MetadataStreamEvent : never)
  | ("custom" extends TStreamMode[number] ? CustomStreamEvent<TCustomEventType> : never);
247
```
248
249
### TypedAsyncGenerator
250
251
```typescript { .api }
252
/**
 * Async generator yielding the stream events for the requested stream modes.
 *
 * NOTE(review): TSubgraphs is accepted but not used on the right-hand side, so
 * in this definition it does not change the yielded event type — confirm
 * whether subgraph streaming is meant to alter the event shape.
 */
type TypedAsyncGenerator<
  TStreamMode extends StreamMode[] = ["values"],
  TSubgraphs extends boolean = false,
  TStateType = DefaultValues,
  TUpdateType = TStateType,
  TCustomEventType = unknown
> = AsyncGenerator<StreamEvent<TStreamMode, TStateType, TUpdateType, TCustomEventType>>;
259
```
260
261
## Payload Types
262
263
### Stream and Create Payloads
264
265
```typescript { .api }
266
/** Options accepted by `RunsClient.stream`. All fields are optional. */
interface RunsStreamPayload {
  /** Input data for the run */
  input?: Record<string, any>;
  /** Run configuration */
  config?: Config;
  /** Stream modes to enable */
  streamMode?: StreamMode | StreamMode[];
  /** Enable subgraph streaming */
  streamSubgraphs?: boolean;
  /** Run metadata */
  metadata?: Metadata;
  /** Multitask strategy */
  multitaskStrategy?: MultitaskStrategy;
  /** Feedback configuration */
  feedbackKeys?: string[];
  /** Interrupt after nodes */
  interruptAfter?: string[];
  /** Interrupt before nodes */
  interruptBefore?: string[];
  /** Webhook configuration */
  webhook?: string;
  /** Request kwargs */
  kwargs?: Record<string, any>;
}
290
291
/** Options accepted by `RunsClient.create` and, with `assistantId` added, `createBatch`. */
interface RunsCreatePayload {
  /** Input data for the run */
  input?: Record<string, any>;
  /** Run configuration */
  config?: Config;
  /** Run metadata */
  metadata?: Metadata;
  /** Multitask strategy */
  multitaskStrategy?: MultitaskStrategy;
  /** Webhook configuration */
  webhook?: string;
  /** Interrupt after nodes */
  interruptAfter?: string[];
  /** Interrupt before nodes */
  interruptBefore?: string[];
  /** Request kwargs */
  kwargs?: Record<string, any>;
}
309
310
/** Options accepted by `RunsClient.wait`: the create options plus a timeout. */
interface RunsWaitPayload extends RunsCreatePayload {
  /** Maximum wait time in seconds */
  timeout?: number;
}
314
```
315
316
### Query and Options
317
318
```typescript { .api }
319
/** Pagination and filter options accepted by `RunsClient.list`. */
interface ListRunsOptions {
  /** Maximum runs to return */
  limit?: number;
  /** Pagination offset */
  offset?: number;
  /** Filter by run status */
  status?: RunStatus;
  /** Filter by metadata */
  metadata?: Record<string, any>;
  /** Created after timestamp */
  createdAfter?: string;
  /** Created before timestamp */
  createdBefore?: string;
}
333
334
/** Options accepted by `RunsClient.joinStream`. */
interface JoinStreamOptions<TStreamMode extends StreamMode[] = ["values"]> {
  /** Stream modes to join */
  streamMode?: TStreamMode;
  /** Enable subgraph streaming */
  streamSubgraphs?: boolean;
  /** Abort signal for cancellation */
  signal?: AbortSignal;
}
342
```
343
344
## Usage Examples
345
346
### Basic Streaming
347
348
```typescript
349
import { Client } from "@langchain/langgraph-sdk";

const client = new Client({
  apiUrl: "https://api.langgraph.example.com",
  apiKey: "your-api-key"
});

// Stream a simple run with values
const stream = client.runs.stream(
  "thread_123",
  "assistant_456",
  {
    input: { message: "Hello, how can you help me?" },
    streamMode: "values"
  }
);

// Each chunk is a stream event; `event` identifies which mode produced it.
for await (const chunk of stream) {
  if (chunk.event === "values") {
    console.log("Current state:", chunk.data);
  }
}
371
```
372
373
### Multi-Mode Streaming
374
375
```typescript
376
// Stream with multiple modes for comprehensive monitoring
const multiModeStream = client.runs.stream(
  "thread_123",
  "assistant_456",
  {
    input: {
      message: "Analyze this data",
      data: [1, 2, 3, 4, 5]
    },
    streamMode: ["values", "updates", "messages", "debug"],
    streamSubgraphs: true,
    metadata: { session_id: "session_789" }
  }
);

for await (const chunk of multiModeStream) {
  switch (chunk.event) {
    case "values":
      console.log("📊 State values:", chunk.data);
      break;

    case "updates":
      console.log("🔄 State updates:", chunk.data);
      break;

    case "messages":
      console.log("💬 Messages:", chunk.data);
      break;

    case "debug":
      console.log("🐛 Debug info:", chunk.data);
      break;
  }

  console.log("⏰ Timestamp:", chunk.timestamp);

  // Debug events declare no `tags` field, so narrow before reading it.
  if ("tags" in chunk && chunk.tags) {
    console.log("🏷️ Tags:", chunk.tags);
  }
}
415
```
416
417
### Run Creation and Management
418
419
```typescript
420
// Create a run without streaming
const run = await client.runs.create(
  "thread_123",
  "assistant_456",
  {
    input: { query: "What's the weather like?" },
    config: {
      configurable: {
        temperature: 0.7,
        max_tokens: 1000
      }
    },
    metadata: { priority: "high" },
    multitaskStrategy: "interrupt"
  }
);

console.log("Created run:", run.run_id);
console.log("Status:", run.status);

// Wait for completion
await client.runs.join("thread_123", run.run_id);

// Get updated run status (the `run` object above is a snapshot from creation time)
const updatedRun = await client.runs.get("thread_123", run.run_id);
console.log("Final status:", updatedRun.status);
446
```
447
448
### Batch Processing
449
450
```typescript
451
// Create multiple runs in parallel
const batchPayloads = [
  {
    assistantId: "assistant_456",
    input: { task: "analyze_data_1" },
    config: { tags: ["batch", "data_analysis"] }
  },
  {
    assistantId: "assistant_789",
    input: { task: "generate_report_1" },
    config: { tags: ["batch", "reporting"] }
  },
  {
    assistantId: "assistant_456",
    input: { task: "analyze_data_2" },
    config: { tags: ["batch", "data_analysis"] }
  }
];

const batchRuns = await client.runs.createBatch(batchPayloads);

console.log(`Created ${batchRuns.length} runs in batch`);

// Monitor all runs.
// createBatch takes no thread ID, so join each run on the thread it reports
// rather than on a hard-coded one.
const runPromises = batchRuns.map(run =>
  client.runs.join(run.thread_id, run.run_id)
);

await Promise.all(runPromises);
console.log("All batch runs completed");
481
```
482
483
### Wait Operations
484
485
```typescript
486
// Wait for completion with timeout
try {
  const result = await client.runs.wait(
    "thread_123",
    "assistant_456",
    {
      input: { message: "Process this request" },
      timeout: 30, // 30 seconds timeout
      config: {
        recursion_limit: 50
      }
    }
  );

  console.log("Final result:", result);
} catch (error) {
  // Caught values are `unknown` under strict settings; narrow before reading `.name`.
  if (error instanceof Error && error.name === "TimeoutError") {
    console.log("Run timed out");
  } else {
    console.error("Run failed:", error);
  }
}
508
```
509
510
### Stream Joining and Cancellation
511
512
```typescript
513
// Create the controller first so its signal can be handed to joinStream;
// otherwise aborting would not reach the underlying request.
const abortController = new AbortController();

// Join an existing stream
const existingStream = client.runs.joinStream(
  "thread_123",
  "run_456",
  {
    streamMode: ["values", "messages"],
    streamSubgraphs: false,
    signal: abortController.signal
  }
);

// Set timeout for stream monitoring
const monitorTimeout = setTimeout(() => {
  abortController.abort();
  console.log("Stream monitoring cancelled");
}, 30000);

try {
  for await (const chunk of existingStream) {
    // Stop promptly even if an event arrives after the abort fired.
    if (abortController.signal.aborted) {
      break;
    }

    console.log("Stream event:", chunk.event);
    console.log("Data:", chunk.data);
  }
} catch (error) {
  if (error instanceof Error && error.name === "AbortError") {
    console.log("Stream was cancelled");
  } else {
    // Surface unexpected failures instead of dropping them silently.
    console.error("Stream failed:", error);
  }
} finally {
  // Do not leave the timer pending once the stream has ended.
  clearTimeout(monitorTimeout);
}

// Cancel a run if needed
await client.runs.cancel("thread_123", "run_456", true, "interrupt");
549
```
550
551
### Advanced Streaming with Custom Events
552
553
```typescript
554
// Named AnalysisEvent rather than CustomEvent to avoid clashing with the DOM's
// global CustomEvent type.
interface AnalysisEvent {
  type: "progress" | "warning" | "metric";
  payload: Record<string, any>;
  component: string;
}

interface AnalysisState {
  messages: Message[];
  progress: number;
}

// stream() only declares TStreamMode and TSubgraphs; the state, update, and
// custom-event types are class-level generics on RunsClient.
// NOTE(review): assumes Client accepts the same three type parameters and
// forwards them to `client.runs` — confirm against the SDK.
const typedClient = new Client<AnalysisState, AnalysisState, AnalysisEvent>({
  apiUrl: "https://api.langgraph.example.com",
  apiKey: "your-api-key"
});

// Stream with custom event handling
const customStream = typedClient.runs.stream<["values", "custom"], false>(
  "thread_123",
  "assistant_456",
  {
    input: { task: "complex_analysis" },
    streamMode: ["values", "custom"],
    metadata: { enable_custom_events: true }
  }
);

for await (const chunk of customStream) {
  switch (chunk.event) {
    case "values":
      console.log("Progress:", chunk.data.progress);
      console.log("Messages count:", chunk.data.messages.length);
      break;

    // Braces give the `const` its own scope instead of leaking it across cases.
    case "custom": {
      const customEvent = chunk.data;
      console.log(`Custom ${customEvent.type} from ${customEvent.component}:`);
      console.log(customEvent.payload);

      if (customEvent.type === "warning") {
        console.warn("⚠️ Warning received:", customEvent.payload);
      }
      break;
    }
  }
}
595
```
596
597
### Error Handling and Recovery
598
599
```typescript
600
/**
 * Streams a run, retrying the whole stream up to three times on failure.
 *
 * Waits 2s, then 4s between attempts (2^attempt seconds). Returns once a
 * stream finishes without throwing.
 *
 * @throws the last error when every attempt has failed
 */
async function robustStreamProcessing() {
  const maxRetries = 3;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const stream = client.runs.stream(
        "thread_123",
        "assistant_456",
        {
          input: { message: "Process this carefully" },
          streamMode: ["values", "debug"],
          config: {
            tags: [`attempt_${attempt}`]
          }
        }
      );

      for await (const chunk of stream) {
        if (chunk.event === "values") {
          console.log("Processing:", chunk.data);
        } else if (chunk.event === "debug") {
          console.log("Debug:", chunk.data.payload);
        }
      }

      console.log("Stream completed successfully");
      return;
    } catch (error) {
      console.error(`Attempt ${attempt} failed:`, error);

      if (attempt >= maxRetries) {
        console.error("All retry attempts exhausted");
        throw error;
      }

      // Exponential backoff
      const delay = 2 ** attempt * 1000;
      console.log(`Retrying in ${delay}ms...`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}
645
646
// Run listing and cleanup
/**
 * Reports the successful runs from the last 24 hours on "thread_123", then
 * deletes that thread's errored runs created more than 7 days ago.
 *
 * Deletions run one at a time; a failed deletion is logged and the loop
 * continues with the next run.
 */
async function manageRuns() {
  const DAY_MS = 24 * 60 * 60 * 1000;

  // List recent runs
  const runs = await client.runs.list("thread_123", {
    limit: 50,
    status: "success",
    createdAfter: new Date(Date.now() - DAY_MS).toISOString()
  });

  console.log(`Found ${runs.length} successful runs from last 24h`);

  // Clean up old failed runs
  const failedRuns = await client.runs.list("thread_123", {
    status: "error",
    createdBefore: new Date(Date.now() - 7 * DAY_MS).toISOString()
  });

  for (const run of failedRuns) {
    try {
      await client.runs.delete("thread_123", run.run_id);
      console.log(`Deleted failed run: ${run.run_id}`);
    } catch (error) {
      console.error(`Failed to delete run ${run.run_id}:`, error);
    }
  }
}
672
```
673
674
The RunsClient provides comprehensive run execution capabilities with real-time streaming, flexible event handling, and robust lifecycle management, making it the core component for executing LangGraph assistants and monitoring their progress.