# State Graph API

StateGraph is the primary high-level API for building stateful, multi-actor workflows in LangGraph. It provides a declarative interface for defining graphs where nodes communicate through shared state, with support for conditional routing, parallel execution, and complex control flow patterns.

## Imports

```python
from langgraph.graph import StateGraph, START, END
from langgraph.constants import TAG_NOSTREAM, TAG_HIDDEN
```

**Note:** `TAG_NOSTREAM` and `TAG_HIDDEN` are imported from `langgraph.constants`, not `langgraph.graph`.

## Capabilities

### StateGraph Class

Main class for constructing stateful graphs. Nodes read from and write to shared state channels, with automatic state merging using reducer functions.
```python { .api }
class StateGraph:
    """
    A graph whose nodes communicate by reading and writing to a shared state.

    Type Parameters:
        StateT: The type of the state schema (TypedDict, Pydantic model, or dataclass)
        ContextT: Optional type for run-scoped context (default: None)
        InputT: Type for graph input (default: StateT)
        OutputT: Type for graph output (default: StateT)
    """

    def __init__(
        self,
        state_schema,
        context_schema=None,
        *,
        input_schema=None,
        output_schema=None
    ):
        """
        Initialize a StateGraph.

        Parameters:
            state_schema: TypedDict, Pydantic model, or dataclass defining state structure.
                Fields can be annotated with reducers using Annotated[type, reducer_func].
            context_schema: Optional schema for run-scoped read-only context
            input_schema: Optional schema for graph input (subset of state_schema)
            output_schema: Optional schema for graph output (subset of state_schema)
        """
```
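The reducer behavior declared via `Annotated[type, reducer_func]` can be illustrated without running a graph. The `apply_update` helper below is a hypothetical sketch of the merge rule for illustration only, not LangGraph's actual implementation: for each key a node returns, a declared reducer combines the old and new values, while keys without a reducer are simply overwritten.

```python
from typing import Annotated, TypedDict, get_type_hints
import operator

class State(TypedDict):
    total: Annotated[int, operator.add]  # reducer: accumulate values
    note: str                            # no reducer: last write wins

def apply_update(state: dict, update: dict) -> dict:
    # Hypothetical sketch of the merge rule, not LangGraph's real code
    hints = get_type_hints(State, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        meta = getattr(hints[key], "__metadata__", ())
        merged[key] = meta[0](state[key], value) if meta else value
    return merged

merged = apply_update({"total": 10, "note": "a"}, {"total": 5, "note": "b"})
print(merged)  # {'total': 15, 'note': 'b'}
```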
#### Adding Nodes

```python { .api }
def add_node(
    self,
    node,
    action=None,
    *,
    defer=False,
    metadata=None,
    input_schema=None,
    retry_policy=None,
    cache_policy=None,
    destinations=None
):
    """
    Add a node to the graph.

    Parameters:
        node: str or callable
            - If str and action is provided: node name
            - If callable: used as both name (func.__name__) and action
        action: Optional[Callable[[StateT], StateT | dict | None]]
            The function to execute for this node. Should accept state and return
            updates to merge into state (or None for no updates).
        defer: bool, default False
            If True, defer executing this node until all other pending nodes
            have finished (useful for aggregating the results of a fan-out)
        metadata: Optional[dict]
            Metadata to attach to the node
        input_schema: Optional schema to validate/coerce node input
        retry_policy: Optional[RetryPolicy | Sequence[RetryPolicy]]
            Retry configuration for node failures
        cache_policy: Optional[CachePolicy]
            Cache configuration for node results
        destinations: Optional[list[str]]
            Pre-declared destination nodes for type checking

    Returns:
        Self (for method chaining)
    """
```
#### Adding Edges

```python { .api }
def add_edge(self, start_key, end_key):
    """
    Add a directed edge from one node to another.

    Parameters:
        start_key: str | list[str] - Source node name (or START), or list of source nodes for waiting edges
        end_key: str - Destination node name (or END)

    Returns:
        Self (for method chaining)
    """
```
```python { .api }
def add_conditional_edges(
    self,
    source,
    path,
    path_map=None
):
    """
    Add conditional routing from a source node.

    Parameters:
        source: str - Source node name
        path: Callable[[StateT], str | list[str] | Send | list[Send]]
            Function that determines next node(s) based on state.
            Can return:
            - str: Single next node name
            - list[str]: Multiple next nodes (parallel execution)
            - Send: Message to specific node with custom state
            - list[Send]: Multiple Send messages
        path_map: Optional[dict[str, str]]
            Mapping from path return values to node names.
            If None, path must return actual node names.

    Returns:
        Self (for method chaining)
    """
```
```python { .api }
def add_sequence(self, nodes):
    """
    Add a sequence of nodes with edges connecting them in order.

    Parameters:
        nodes: Sequence[str | tuple[str, Callable]]
            List of node names or (name, action) tuples.
            Creates nodes and edges in sequence.

    Returns:
        Self (for method chaining)
    """
```
#### Setting Entry and Exit Points

```python { .api }
def set_entry_point(self, key):
    """
    Set the first node to execute when the graph starts.
    Equivalent to add_edge(START, key).

    Parameters:
        key: str - Node name to start execution

    Returns:
        Self (for method chaining)
    """
```
```python { .api }
def set_conditional_entry_point(self, path, path_map=None):
    """
    Set conditional entry point based on initial state.
    Equivalent to add_conditional_edges(START, path, path_map).

    Parameters:
        path: Callable[[StateT], str | list[str] | Send | list[Send]]
            Function determining first node(s) to execute
        path_map: Optional[dict[str, str]]
            Mapping from path return values to node names

    Returns:
        Self (for method chaining)
    """
```
```python { .api }
def set_finish_point(self, key):
    """
    Mark a node as a finish point.
    Equivalent to add_edge(key, END).

    Parameters:
        key: str - Node name that should end execution

    Returns:
        Self (for method chaining)
    """
```
#### Graph Validation

```python { .api }
def validate(self, interrupt=None):
    """
    Validate the graph structure.

    Parameters:
        interrupt: Optional list of node names where interrupts can occur

    Raises:
        ValueError: If graph structure is invalid (e.g., unreachable nodes)

    Returns:
        Self (for method chaining)
    """
```
#### Graph Compilation

```python { .api }
def compile(
    self,
    checkpointer=None,
    *,
    cache=None,
    store=None,
    interrupt_before=None,
    interrupt_after=None,
    debug=False,
    name=None
):
    """
    Compile the graph into an executable CompiledStateGraph.

    Parameters:
        checkpointer: Optional[BaseCheckpointSaver]
            Checkpointer for persisting state. Required for:
            - Human-in-the-loop workflows
            - State history
            - Resuming from failures
        cache: Optional cache backend for node results
        store: Optional[BaseStore]
            Store for persistent cross-thread memory
        interrupt_before: Optional[list[str] | Literal["*"]]
            Nodes to interrupt before executing.
            Use "*" to interrupt before all nodes.
        interrupt_after: Optional[list[str] | Literal["*"]]
            Nodes to interrupt after executing.
            Use "*" to interrupt after all nodes.
        debug: bool, default False
            Enable debug mode with additional logging
        name: Optional[str]
            Name for the compiled graph

    Returns:
        CompiledStateGraph - Executable graph that implements Runnable interface
    """
```
### CompiledStateGraph Class

Compiled and executable version of StateGraph. Extends Pregel and implements the LangChain Runnable interface for execution, streaming, and state management.

```python { .api }
class CompiledStateGraph:
    """
    Compiled version of StateGraph, ready for execution.
    Inherits all methods from Pregel including invoke, stream, get_state, etc.
    """
```
#### Execution Methods

```python { .api }
def invoke(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any:
    """
    Execute the graph synchronously and return the final output.

    Parameters:
        input: InputT | Command | None
            Input to the graph. Can be:
            - State input matching input_schema
            - Command object to control execution (e.g., resume from interrupt)
            - None to resume from last checkpoint
        config: RunnableConfig | None
            Configuration for the run. Must include thread_id in configurable
            if using checkpointing.
        context: ContextT | None
            Static run-scoped context available to all nodes via get_runtime().
            Added in v0.6.0.
        stream_mode: StreamMode, default "values"
            Internal streaming mode used during execution.
        print_mode: StreamMode | Sequence[StreamMode], default ()
            Stream mode(s) to print to stdout during execution.
            Use for debugging to see intermediate steps.
        output_keys: str | Sequence[str] | None
            Specific state keys to include in output. If None, returns all keys.
        interrupt_before: All | Sequence[str] | None
            Override graph's interrupt_before for this run.
            Use "*" to interrupt before all nodes.
        interrupt_after: All | Sequence[str] | None
            Override graph's interrupt_after for this run.
            Use "*" to interrupt after all nodes.
        durability: Durability | None
            Persistence mode: "sync", "async", or "exit".
            Default is "async" (persist asynchronously during execution).

    Returns:
        dict[str, Any] | Any - Final graph output according to output_schema

    Raises:
        GraphRecursionError: If max steps exceeded
        GraphInterrupt: If execution is interrupted

    Example:
        app = graph.compile(checkpointer=MemorySaver())
        config = {"configurable": {"thread_id": "1"}}
        result = app.invoke({"messages": []}, config)
    """
```
```python { .api }
def stream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    **kwargs: Any
) -> Iterator[dict[str, Any] | Any]:
    """
    Stream graph execution, yielding outputs as they are produced.

    Parameters:
        input: InputT | Command | None
            Input to the graph (same as invoke)
        config: RunnableConfig | None
            Configuration for the run
        context: ContextT | None
            Static run-scoped context
        stream_mode: StreamMode | Sequence[StreamMode] | None
            How to emit outputs. Options:
            - "values": Full state after each step (default)
            - "updates": Node updates as {node_name: update}
            - "custom": Custom data written via StreamWriter
            - "messages": LLM messages token-by-token
            - "checkpoints": Checkpoint creation events
            - "tasks": Task start/finish events
            - "debug": Checkpoints and tasks combined
            Can pass list for multiple modes: ["values", "updates"]
        output_keys: str | Sequence[str] | None
            Specific state keys to include in output
        interrupt_before: All | Sequence[str] | None
            Override interrupt_before for this run
        interrupt_after: All | Sequence[str] | None
            Override interrupt_after for this run
        durability: Durability | None
            Persistence mode
        subgraphs: bool, default False
            Whether to stream subgraph execution events

    Yields:
        dict[str, Any] | Any - Output chunks according to stream_mode:
        - "values": Full state dict after each step
        - "updates": {"node_name": update_dict}
        - "custom": Custom data from StreamWriter
        - Multiple modes: Tuples of (mode, chunk)

    Example:
        for chunk in app.stream({"value": 1}, stream_mode=["values", "updates"]):
            print(chunk)
        # ("values", {"value": 2})
        # ("updates", {"node1": {"value": 2}})
    """
```
```python { .api }
def get_state_history(
    self,
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[StateSnapshot]:
    """
    Get historical state snapshots for a thread.

    Requires a checkpointer to be configured on the graph.

    Parameters:
        config: RunnableConfig
            Must include thread_id in configurable to identify the thread
        filter: dict[str, Any] | None
            Filter checkpoints by metadata fields.
            Example: {"source": "update", "step": 3}
        before: RunnableConfig | None
            Get states before this checkpoint config (for pagination)
        limit: int | None
            Maximum number of states to return

    Yields:
        StateSnapshot - Historical states in reverse chronological order (newest first).
        Each snapshot contains:
        - values: State values at that point
        - next: Tuple of next node(s) to execute
        - config: Config for this checkpoint
        - metadata: Checkpoint metadata
        - created_at: Timestamp
        - parent_config: Parent checkpoint config
        - tasks: Pending tasks at this point

    Example:
        config = {"configurable": {"thread_id": "1"}}
        for state in app.get_state_history(config, limit=5):
            print(f"Step: {state.metadata['step']}, Values: {state.values}")
    """
```
#### State Management Methods

```python { .api }
def get_state(
    self,
    config: RunnableConfig,
    *,
    subgraphs: bool = False
) -> StateSnapshot:
    """
    Get the current state of the graph for a specific thread.

    Requires a checkpointer to be configured on the graph.

    Parameters:
        config: RunnableConfig
            Must include thread_id in configurable to identify the thread
        subgraphs: bool, default False
            Whether to include subgraph state in the snapshot

    Returns:
        StateSnapshot - Current state containing:
        - values: Current state values
        - next: Tuple of next node(s) to execute
        - config: Config for current checkpoint
        - metadata: Checkpoint metadata
        - created_at: Timestamp
        - parent_config: Parent checkpoint config
        - tasks: Pending tasks
        - interrupts: Any interrupt information

    Example:
        config = {"configurable": {"thread_id": "1"}}
        state = app.get_state(config)
        print(f"Current values: {state.values}")
        print(f"Next nodes: {state.next}")
    """

def update_state(
    self,
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
    task_id: str | None = None
) -> RunnableConfig:
    """
    Update the state of the graph programmatically.

    Requires a checkpointer to be configured on the graph. This is critical for
    human-in-the-loop workflows where you need to modify state between steps.

    Parameters:
        config: RunnableConfig
            Must include thread_id in configurable to identify the thread
        values: dict[str, Any] | Any | None
            State updates to apply. If dict, merged with current state.
            If matches output_schema type, replaces state entirely.
            Can be None to only update checkpoint without changing state.
        as_node: str | None, default None
            Apply update as if it came from this node. Affects which
            edges are traversed next. If None, update is applied as external input.
        task_id: str | None, default None
            ID of specific task to update. Used with Send API for dynamic fanouts.

    Returns:
        RunnableConfig - Updated config with new checkpoint_id

    Example:
        # Update state and continue execution
        config = {"configurable": {"thread_id": "1"}}
        app.invoke({"value": 1}, config)  # Graph pauses at interrupt

        # Modify state externally
        new_config = app.update_state(config, {"value": 100})

        # Resume with modified state
        result = app.invoke(None, new_config)
    """
```
#### Async Methods

```python { .api }
async def ainvoke(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any:
    """
    Async version of invoke(). Runs the graph asynchronously until completion.

    All parameters and return values same as invoke(). Use for async/await workflows.

    Example:
        app = graph.compile(checkpointer=MemorySaver())
        result = await app.ainvoke({"messages": []}, config)
    """

async def astream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    **kwargs: Any
) -> AsyncIterator[dict[str, Any] | Any]:
    """
    Async version of stream(). Yields outputs asynchronously as they are produced.

    All parameters and return values same as stream(). Use for async/await workflows.

    Example:
        async for chunk in app.astream({"value": 1}, stream_mode="updates"):
            print(chunk)
    """

async def aget_state(
    self,
    config: RunnableConfig,
    *,
    subgraphs: bool = False
) -> StateSnapshot:
    """
    Async version of get_state(). Get current state asynchronously.

    All parameters and return values same as get_state().
    """

async def aget_state_history(
    self,
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[StateSnapshot]:
    """
    Async version of get_state_history(). Get historical states asynchronously.

    All parameters and return values same as get_state_history().

    Example:
        config = {"configurable": {"thread_id": "1"}}
        async for state in app.aget_state_history(config, limit=5):
            print(f"Step: {state.metadata['step']}")
    """

async def aupdate_state(
    self,
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
    task_id: str | None = None
) -> RunnableConfig:
    """
    Async version of update_state(). Update state asynchronously.

    All parameters and return values same as update_state().
    """
```
#### Graph Introspection and Utilities

```python { .api }
def get_graph(
    self,
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph:
    """
    Get a drawable representation of the computation graph.

    Parameters:
        config: RunnableConfig | None
            Optional config for graph generation
        xray: int | bool, default False
            If True, includes internal subgraph details.
            If int, specifies recursion depth for nested subgraph expansion.

    Returns:
        Graph - Drawable graph object that can be rendered/visualized

    Example:
        app = graph.compile()
        graph_viz = app.get_graph()
        # Can be rendered in Jupyter or saved to file
    """

async def aget_graph(
    self,
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph:
    """
    Async version of get_graph(). Get graph representation asynchronously.

    All parameters and return values same as get_graph().
    """

def clear_cache(self, nodes: Sequence[str] | None = None) -> None:
    """
    Clear the cache for specified nodes or all nodes.

    Parameters:
        nodes: Sequence[str] | None
            Node names to clear cache for. If None, clears all node caches.

    Example:
        app.clear_cache(["expensive_node"])  # Clear specific node
        app.clear_cache()                    # Clear all caches
    """

async def aclear_cache(self, nodes: Sequence[str] | None = None) -> None:
    """
    Async version of clear_cache(). Clear cache asynchronously.

    All parameters same as clear_cache().
    """

def get_subgraphs(
    self,
    *,
    namespace: str | None = None,
    recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]:
    """
    Get the subgraphs of the graph.

    Useful for working with nested graphs and inspecting the graph hierarchy.

    Parameters:
        namespace: str | None
            The namespace to filter the subgraphs by. If provided, only
            returns subgraphs matching this namespace.
        recurse: bool, default False
            Whether to recurse into subgraphs. If False, only returns
            immediate subgraphs.

    Yields:
        tuple[str, PregelProtocol] - (namespace, subgraph) pairs

    Example:
        for namespace, subgraph in app.get_subgraphs():
            print(f"Subgraph: {namespace}")
    """

async def aget_subgraphs(
    self,
    *,
    namespace: str | None = None,
    recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]:
    """
    Async version of get_subgraphs(). Get subgraphs asynchronously.

    All parameters and return values same as get_subgraphs().
    """

def bulk_update_state(
    self,
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig:
    """
    Apply updates to the graph state in bulk.

    Requires a checkpointer to be set. Useful for batch operations where
    you need to apply multiple sequential updates efficiently.

    Parameters:
        config: RunnableConfig
            Must include thread_id in configurable to identify the thread
        supersteps: Sequence[Sequence[StateUpdate]]
            List of supersteps, each containing a list of updates to
            apply sequentially. Each StateUpdate is a tuple of
            (values, as_node, task_id) where task_id is optional.

    Returns:
        RunnableConfig - The updated config

    Raises:
        ValueError: If no checkpointer is set or no updates are provided
        InvalidUpdateError: If an invalid update is provided

    Example:
        from langgraph.types import StateUpdate

        supersteps = [
            [StateUpdate({"value": 1}, "node1", None)],
            [StateUpdate({"value": 2}, "node2", None)]
        ]
        config = app.bulk_update_state(config, supersteps)
    """

async def abulk_update_state(
    self,
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig:
    """
    Async version of bulk_update_state(). Apply bulk updates asynchronously.

    All parameters and return values same as bulk_update_state().
    """

def validate(self) -> Self:
    """
    Validate the compiled graph structure.

    Checks for common errors like disconnected nodes, invalid edges,
    and other structural issues.

    Returns:
        Self - Returns self for method chaining

    Example:
        app = graph.compile().validate()
    """

def copy(self, update: dict[str, Any] | None = None) -> Self:
    """
    Create a copy of the graph with optional updates.

    Parameters:
        update: dict[str, Any] | None
            Optional dictionary of attributes to update in the copy

    Returns:
        Self - A new instance with updated attributes

    Example:
        new_app = app.copy({"debug": True})
    """

def with_config(self, config: RunnableConfig | None = None, **kwargs: Any) -> Self:
    """
    Create a copy of the graph with an updated config.

    Binds a config to the graph, useful for setting default configuration
    values that will be merged with runtime configs.

    Parameters:
        config: RunnableConfig | None
            Config to merge with the graph's existing config
        **kwargs: Any
            Additional config values to merge

    Returns:
        Self - A new instance with bound config

    Example:
        app_with_config = app.with_config(
            {"configurable": {"thread_id": "default"}}
        )
    """
```
#### Schema and Introspection Methods

```python { .api }
def get_input_schema(self, config: RunnableConfig | None = None) -> type[BaseModel]:
    """
    Get the Pydantic BaseModel schema for graph input.

    Parameters:
        config: RunnableConfig | None
            Optional config for schema generation

    Returns:
        type[BaseModel] - Pydantic model class describing valid input

    Example:
        InputSchema = app.get_input_schema()
        print(InputSchema.model_json_schema())
    """

def get_output_schema(self, config: RunnableConfig | None = None) -> type[BaseModel]:
    """
    Get the Pydantic BaseModel schema for graph output.

    Parameters:
        config: RunnableConfig | None
            Optional config for schema generation

    Returns:
        type[BaseModel] - Pydantic model class describing output structure

    Example:
        OutputSchema = app.get_output_schema()
        print(OutputSchema.model_json_schema())
    """

def get_context_jsonschema(self) -> dict[str, Any] | None:
    """
    Get JSON schema for the context (if context_schema is defined).

    Returns:
        dict[str, Any] | None - JSON schema for context, or None if no
        context schema was specified

    Example:
        if context_schema := app.get_context_jsonschema():
            print(f"Context schema: {context_schema}")
    """

def get_input_jsonschema(self, config: RunnableConfig | None = None) -> dict[str, Any]:
    """
    Get JSON schema for graph input.

    Parameters:
        config: RunnableConfig | None
            Optional config for schema generation

    Returns:
        dict[str, Any] - JSON schema describing valid input structure

    Example:
        schema = app.get_input_jsonschema()
        print(schema)
    """

def get_output_jsonschema(self, config: RunnableConfig | None = None) -> dict[str, Any]:
    """
    Get JSON schema for graph output.

    Parameters:
        config: RunnableConfig | None
            Optional config for schema generation

    Returns:
        dict[str, Any] - JSON schema describing output structure

    Example:
        schema = app.get_output_jsonschema()
        print(schema)
    """

def attach_node(self, key, node):
    """
    Internal: Attach a node to the compiled graph.

    Parameters:
        key: str - Node identifier
        node: Runnable - Node implementation
    """

def attach_edge(self, starts, end):
    """
    Internal: Attach an edge to the compiled graph.

    Parameters:
        starts: str | list[str] - Source node(s)
        end: str - Destination node
    """

def attach_branch(self, start, name, branch, *, with_reader=True):
    """
    Internal: Attach a conditional branch to the compiled graph.

    Parameters:
        start: str - Source node
        name: str - Branch name
        branch: Branch implementation
        with_reader: bool, default True - Whether to attach state reader
    """
```
### Special Constants

```python { .api }
START: str
"""
The first (virtual) node in the graph.
Value: "__start__"

Use as source in add_edge to create entry point:
    graph.add_edge(START, "first_node")
"""

END: str
"""
The last (virtual) node in the graph.
Value: "__end__"

Use as destination in add_edge to mark exit point:
    graph.add_edge("last_node", END)
"""

TAG_NOSTREAM: str
"""
Tag to disable streaming for a chat model.

When added to a node's metadata or tags, prevents streaming output
from chat models within that node.

Usage:
    graph.add_node(
        "chat",
        chatbot_node,
        metadata={"tags": [TAG_NOSTREAM]}
    )
"""

TAG_HIDDEN: str
"""
Tag to hide a node/edge from certain tracing/streaming environments.

When applied, excludes the node or edge from LangSmith tracing and
certain streaming outputs.

Usage:
    graph.add_node(
        "internal",
        internal_node,
        metadata={"tags": [TAG_HIDDEN]}
    )
"""
```
## Usage Examples

### Basic State Graph

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    messages: list[str]
    counter: int

def process(state: State) -> dict:
    return {
        "messages": state["messages"] + ["processed"],
        "counter": state["counter"] + 1
    }

graph = StateGraph(State)
graph.add_node("process", process)
graph.add_edge(START, "process")
graph.add_edge("process", END)

app = graph.compile()
result = app.invoke({"messages": [], "counter": 0})
```
### Conditional Routing

```python
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    value: int
    result: str

def check_value(state: State) -> Literal["high", "low"]:
    return "high" if state["value"] > 10 else "low"

def process_high(state: State) -> dict:
    return {"result": "high value"}

def process_low(state: State) -> dict:
    return {"result": "low value"}

graph = StateGraph(State)
graph.add_node("high", process_high)
graph.add_node("low", process_low)

# Conditional routing from START
graph.add_conditional_edges(
    START,
    check_value,
    {"high": "high", "low": "low"}
)

graph.add_edge("high", END)
graph.add_edge("low", END)

app = graph.compile()
```
### State with Reducers

```python
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # Using reducer to accumulate values
    total: Annotated[int, add]
    items: list[str]

def add_item(state: State) -> dict:
    return {
        "total": 5,  # Will be added to existing total
        "items": state["items"] + ["new"]
    }

graph = StateGraph(State)
graph.add_node("add", add_item)
graph.add_edge(START, "add")
graph.add_edge("add", END)

app = graph.compile()
result = app.invoke({"total": 10, "items": []})
# result["total"] == 15 (10 + 5 via reducer)
# result["items"] == ["new"]
```
### Parallel Execution with Send

```python
from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class State(TypedDict):
    items: list[int]
    # A reducer is required so parallel writes to "results" are
    # accumulated instead of conflicting
    results: Annotated[list[int], operator.add]

def fan_out(state: State) -> list[Send]:
    # Send each item to process_item in parallel
    return [Send("process_item", {"value": item}) for item in state["items"]]

def process_item(state: dict) -> dict:
    return {"results": [state["value"] * 2]}

def aggregate(state: State) -> dict:
    # Results are already merged by the reducer; returning them again
    # would duplicate them, so return no update here
    return {}

graph = StateGraph(State)
graph.add_node("process_item", process_item)
graph.add_node("aggregate", aggregate)

graph.add_conditional_edges(START, fan_out)
graph.add_edge("process_item", "aggregate")
graph.add_edge("aggregate", END)

app = graph.compile()
result = app.invoke({"items": [1, 2, 3], "results": []})
# result["results"] == [2, 4, 6]
```
### With Checkpointing and Interrupts

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    value: int
    approved: bool

def process(state: State) -> dict:
    return {"value": state["value"] + 1}

def needs_approval(state: State) -> dict:
    return {"approved": False}

# Create checkpointer for persistence
checkpointer = MemorySaver()

graph = StateGraph(State)
graph.add_node("process", process)
graph.add_node("approval", needs_approval)

graph.add_edge(START, "process")
graph.add_edge("process", "approval")
graph.add_edge("approval", END)

# Compile with interruption before approval node
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["approval"]
)

# First execution - stops at approval
config = {"configurable": {"thread_id": "1"}}
result = app.invoke({"value": 0, "approved": True}, config)
# Execution interrupted before "approval" node

# Get state to inspect
state = app.get_state(config)
# state.next == ("approval",)

# Resume execution
result = app.invoke(None, config)
# Continues from approval node
```