# Run Execution

Execute assistant workflows on threads with support for streaming, interrupts, configuration, and completion handling. Runs represent individual executions of an assistant on a thread.

## Capabilities

### Streaming Execution

Execute runs with real-time streaming of execution events, state changes, and outputs.

```python { .api }
from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
    StreamPart, StreamMode, Config, Context, Checkpoint,
    Command, QueryParamTypes
)

# Via client.runs
def stream(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]:
    """
    Stream the results of a run.

    Args:
        thread_id: The thread ID to stream the run on.
        assistant_id: The assistant ID or graph name to stream the run on.
        input: The input to the run.
        command: The command to run instead of input.
        stream_mode: The mode(s) to stream the run. Default is "values".
        stream_subgraphs: Whether to stream subgraphs.
        stream_resumable: Whether the stream is resumable.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        AsyncIterator[StreamPart]: The stream of the run.
    """
```

### Async Execution

Execute runs asynchronously and retrieve results when complete.

```python { .api }
from langgraph_sdk.schema import Run, QueryParamTypes

async def create(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Create a background run.

    Args:
        thread_id: The thread ID to create the run on.
        assistant_id: The assistant ID or graph name to create the run on.
        input: The input to the run.
        command: The command to run instead of input.
        stream_mode: The mode(s) to stream the run. Default is "values".
        stream_subgraphs: Whether to stream subgraphs.
        stream_resumable: Whether the stream is resumable.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The created run.
    """
```

### Synchronous Execution

Execute runs synchronously and wait for completion.

```python { .api }
async def wait(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    checkpoint_during: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Create a run, wait for it to finish and return the final state.

    Args:
        thread_id: The thread ID to create the run on.
        assistant_id: The assistant ID or graph name to create the run on.
        input: The input to the run.
        command: The command to run instead of input.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        checkpoint_during: Whether to checkpoint during the run.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The completed run.
    """
```

### Batch Execution

Execute multiple runs concurrently with batch operations.

```python { .api }
from langgraph_sdk.schema import RunCreate

async def create_batch(
    payloads: list[RunCreate],
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Run]:
    """
    Create a batch of stateless background runs.

    Args:
        payloads: The payloads for the runs.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Run]: The created runs.
    """
```

### Run Management

Manage active and completed runs with listing, retrieval, and cancellation capabilities.

```python { .api }
from langgraph_sdk.schema import RunSelectField, RunStatus, CancelAction

async def list(
    thread_id: str,
    *,
    limit: int = 10,
    offset: int = 0,
    status: RunStatus | None = None,
    select: list[RunSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Run]:
    """
    Get all runs for a thread.

    Args:
        thread_id: The thread ID to get runs for.
        limit: The maximum number of runs to return.
        offset: The number of runs to skip.
        status: The status to filter by.
        select: Fields to include in the response.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Run]: The runs for the thread.
    """

async def get(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Get a run.

    Args:
        thread_id: The thread ID to get the run from.
        run_id: The run ID to get.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: Run object.
    """

async def cancel(
    thread_id: str,
    run_id: str,
    *,
    wait: bool = False,
    action: CancelAction = "interrupt",
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Cancel a run.

    Args:
        thread_id: The thread ID to cancel the run on.
        run_id: The run ID to cancel.
        wait: Whether to wait for the run to be cancelled.
        action: The type of cancellation. Options are "interrupt" or "rollback".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """

async def delete(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Delete a run.

    Args:
        thread_id: The thread ID to delete the run from.
        run_id: The run ID to delete.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """
```

### Run Streaming & Joining

Join ongoing runs and stream their execution events.

```python { .api }
async def join(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> dict:
    """
    Block until a run is done. Returns the final state of the thread.

    Args:
        thread_id: The thread ID to join the run on.
        run_id: The run ID to join.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        dict: The final state of the thread.
    """

def join_stream(
    thread_id: str,
    run_id: str,
    *,
    cancel_on_disconnect: bool = False,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
    last_event_id: str | None = None,
) -> AsyncIterator[StreamPart]:
    """
    Stream output from a run in real-time, until the run is done.
    Output is not buffered, so any output produced before this call will
    not be received here.

    Args:
        thread_id: The thread ID to stream the run on.
        run_id: The run ID to stream.
        cancel_on_disconnect: Whether to cancel the run if the stream is disconnected.
        stream_mode: The mode(s) to stream the run.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
        last_event_id: The last event ID to start streaming from.

    Returns:
        AsyncIterator[StreamPart]: A stream of the run.
    """
```

## Types

```python { .api }
class Run(TypedDict):
    """Run execution details."""
    run_id: str
    thread_id: str
    assistant_id: str
    created_at: str
    updated_at: str
    status: RunStatus
    kwargs: dict
    metadata: dict

class RunCreate(TypedDict):
    """Run creation parameters."""
    thread_id: str
    assistant_id: str
    input: dict
    config: Config
    metadata: dict
    multitask_strategy: MultitaskStrategy

class StreamPart(NamedTuple):
    """Stream event part."""
    event: str
    data: dict

RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]

StreamMode = Literal[
    "values", "messages", "updates", "events",
    "tasks", "checkpoints", "debug", "custom", "messages-tuple"
]

MultitaskStrategy = Literal["reject", "interrupt", "rollback", "enqueue"]

DisconnectMode = Literal["cancel", "continue"]

OnCompletionBehavior = Literal["delete", "keep"]

CancelAction = Literal["interrupt", "rollback"]

RunSelectField = Literal[
    "run_id", "thread_id", "assistant_id", "created_at",
    "updated_at", "status", "kwargs", "metadata"
]
```

## Usage Examples

### Streaming Execution

```python
# Basic streaming run
async for chunk in client.runs.stream(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"messages": [{"role": "human", "content": "Hello!"}]}
):
    if chunk.event == "messages":
        print(f"Message: {chunk.data}")
    elif chunk.event == "events":
        print(f"Event: {chunk.data}")

# Advanced streaming with configuration
async for chunk in client.runs.stream(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"query": "Explain AI"},
    config={"temperature": 0.7, "max_tokens": 1000},
    stream_mode="events",
    interrupt_before=["human_review"],
    multitask_strategy="enqueue"
):
    print(f"{chunk.event}: {chunk.data}")
```

### Asynchronous Execution

```python
# Start run asynchronously
run = await client.runs.create(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"task": "analyze_document", "doc_id": "doc-789"},
    metadata={"priority": "high"},
    webhook="https://myapp.com/webhooks/run-complete"
)

print(f"Started run {run['run_id']} with status {run['status']}")

# Check run status later
updated_run = await client.runs.get("thread-123", run["run_id"])
if updated_run["status"] == "success":
    print("Run completed successfully")
```

### Synchronous Execution

```python
# Execute and wait for completion
completed_run = await client.runs.wait(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"calculation": "fibonacci", "n": 100},
    config={"timeout": 300}
)

print(f"Final status: {completed_run['status']}")
print(f"Result: {completed_run['kwargs'].get('result')}")
```

### Batch Operations

```python
# Create multiple runs
payloads = [
    {
        "thread_id": "thread-1",
        "assistant_id": "assistant-456",
        "input": {"task": f"process_item_{i}"}
    }
    for i in range(10)
]

batch_runs = await client.runs.create_batch(payloads)
print(f"Created {len(batch_runs)} runs")
```

### Run Management

```python
# List thread runs
runs = await client.runs.list("thread-123", limit=50)
active_runs = [r for r in runs if r["status"] in ["pending", "running"]]

# Cancel a run
if active_runs:
    await client.runs.cancel(
        "thread-123",
        active_runs[0]["run_id"],
        action="interrupt"
    )

# Join an ongoing run
result = await client.runs.join("thread-123", "run-789")

# Stream events from ongoing run
async for event in client.runs.join_stream("thread-123", "run-789"):
    print(f"Event: {event}")
```

### Error Handling

```python
try:
    async for chunk in client.runs.stream(
        thread_id="thread-123",
        assistant_id="assistant-456",
        input={"query": "test"}
    ):
        if chunk.event == "error":
            print(f"Execution error: {chunk.data}")
            break
        elif chunk.event == "interrupt":
            print(f"Execution interrupted: {chunk.data}")
            # Handle interrupt, possibly resume or cancel
except Exception as e:
    print(f"Stream error: {e}")
```