0
# LangGraph
1
2
LangGraph is a library for building stateful, multi-actor applications with Large Language Models (LLMs). It provides low-level orchestration infrastructure for creating durable, long-running workflows with built-in support for persistence, human-in-the-loop interactions, and complex control flow. LangGraph enables developers to build sophisticated agent systems that can handle failures, maintain memory across sessions, and coordinate multiple actors through a flexible graph-based architecture.
3
4
## Package Information
5
6
- **Package Name**: langgraph
7
- **Package Type**: Library
8
- **Language**: Python
9
- **Installation**: `pip install langgraph`
10
- **Version**: 1.0.1
11
12
## Core Imports
13
14
```python
15
from langgraph.graph import StateGraph, START, END
16
```
17
18
Import tags for controlling behavior:
19
20
```python
21
from langgraph.constants import TAG_NOSTREAM, TAG_HIDDEN
22
```
23
24
Common imports for working with messages:
25
26
```python
27
from langgraph.graph import MessagesState, add_messages
28
```
29
30
For functional API:
31
32
```python
33
from langgraph.func import entrypoint, task
34
```
35
36
For low-level graph execution:
37
38
```python
39
from langgraph.pregel import Pregel
40
```
41
42
## Basic Usage
43
44
```python
45
from typing import TypedDict, Annotated
46
from langgraph.graph import StateGraph, START, END
47
48
# Define the state schema
49
class State(TypedDict):
50
messages: list[str]
51
counter: int
52
53
# Create a graph
54
graph = StateGraph(State)
55
56
# Define node functions
57
def process_message(state: State) -> State:
58
"""Process messages and increment counter."""
59
return {
60
"messages": state["messages"] + ["processed"],
61
"counter": state["counter"] + 1
62
}
63
64
def check_complete(state: State) -> str:
65
"""Route based on counter."""
66
if state["counter"] >= 3:
67
return END
68
return "process"
69
70
# Add nodes
71
graph.add_node("process", process_message)
72
73
# Add edges
74
graph.add_edge(START, "process")
75
graph.add_conditional_edges("process", check_complete)
76
77
# Compile the graph
78
app = graph.compile()
79
80
# Run the graph
81
result = app.invoke({
82
"messages": ["hello"],
83
"counter": 0
84
})
85
86
print(result) # Final state after execution
87
```
88
89
## Architecture
90
91
LangGraph is built around several key components that work together to enable stateful, multi-actor workflows:
92
93
- **StateGraph**: High-level API for building graphs where nodes communicate through shared state. Nodes read from and write to state channels, enabling coordination between multiple actors.
94
95
- **Pregel**: Low-level execution engine inspired by Google's Pregel system. Handles graph traversal, state management, checkpointing, and parallelization of node execution.
96
97
- **Channels**: State management primitives that control how state updates are applied. Different channel types support various patterns like last-value, aggregation, barriers, and topics.
98
99
- **Checkpointing**: Persistent state storage that enables durable execution. Graphs can be paused, resumed, and replayed from any checkpoint, supporting long-running workflows and human-in-the-loop patterns.
100
101
- **Functional API**: Decorator-based API using `@entrypoint` and `@task` that provides an alternative to explicit graph construction, enabling more natural Python code with automatic parallelization.
102
103
- **Runtime Context**: Thread-local context that provides access to configuration, stores, stream writers, and other runtime utilities from within node functions.
104
105
This architecture enables LangGraph to support complex agent workflows with features like conditional routing, parallel execution, state persistence, error recovery, and human intervention at any point in execution.
106
107
## Capabilities
108
109
### Graph Construction
110
111
Build stateful graphs using StateGraph where nodes communicate through shared state. Supports conditional routing, parallel execution, and complex workflows.
112
113
```python { .api }
114
class StateGraph:
115
def __init__(
116
self,
117
state_schema,
118
context_schema=None,
119
*,
120
input_schema=None,
121
output_schema=None
122
): ...
123
124
def add_node(
125
self,
126
node,
127
action=None,
128
*,
129
defer=False,
130
metadata=None,
131
input_schema=None,
132
retry_policy=None,
133
cache_policy=None,
134
destinations=None
135
): ...
136
137
def add_edge(self, start_key, end_key): ...
138
139
def add_conditional_edges(self, source, path, path_map=None): ...
140
141
def compile(
142
self,
143
checkpointer=None,
144
*,
145
cache=None,
146
store=None,
147
interrupt_before=None,
148
interrupt_after=None,
149
debug=False,
150
name=None
151
): ...
152
```
153
154
```python { .api }
155
# Special node identifiers
156
START: str # The first (virtual) node - "__start__"
157
END: str # The last (virtual) node - "__end__"
158
159
# Tags for controlling behavior
160
TAG_NOSTREAM: str # Tag to disable streaming for a chat model
161
TAG_HIDDEN: str # Tag to hide node/edge from tracing/streaming
162
```
163
164
[State Graph API](./state-graph.md)
165
166
### Functional API
167
168
Define workflows using Python decorators for a more natural programming style with automatic parallelization of tasks.
169
170
```python { .api }
171
def task(
172
func=None,
173
*,
174
name=None,
175
retry_policy=None,
176
cache_policy=None
177
): ...
178
179
class entrypoint:
180
def __init__(
181
self,
182
checkpointer=None,
183
store=None,
184
cache=None,
185
context_schema=None,
186
cache_policy=None,
187
retry_policy=None
188
): ...
189
190
def __call__(self, func): ...
191
```
192
193
[Functional API](./functional-api.md)
194
195
### State Management Channels
196
197
Channel primitives for managing how state updates are applied, supporting patterns like last-value, aggregation, barriers, and pub/sub.
198
199
```python { .api }
200
class LastValue: ...
201
class AnyValue: ...
202
class EphemeralValue: ...
203
class UntrackedValue: ...
204
class BinaryOperatorAggregate:
205
def __init__(self, typ, operator): ...
206
class Topic: ...
207
class NamedBarrierValue: ...
208
```
209
210
[Channels](./channels.md)
211
212
### Graph Execution Engine
213
214
Low-level Pregel execution engine providing fine-grained control over graph execution, streaming, and state management.
215
216
```python { .api }
217
class Pregel:
218
def invoke(self, input, config=None, **kwargs): ...
219
def stream(
220
self,
221
input,
222
config=None,
223
*,
224
stream_mode=None,
225
output_keys=None,
226
interrupt_before=None,
227
interrupt_after=None,
228
debug=None,
229
subgraphs=False
230
): ...
231
def get_state(self, config, *, subgraphs=False): ...
232
def update_state(self, config, values, as_node=None): ...
233
def get_state_history(
234
self,
235
config,
236
*,
237
filter=None,
238
before=None,
239
limit=None
240
): ...
241
```
242
243
[Pregel Engine](./pregel.md)
244
245
### Types and Primitives
246
247
Core types for control flow, state snapshots, task management, and retry/cache policies.
248
249
```python { .api }
250
class Send:
251
"""Send message to specific node."""
252
def __init__(self, node: str, arg: Any): ...
253
254
class Command:
255
"""Command for state updates and navigation."""
256
graph: str | None = None
257
update: Any | None = None
258
resume: dict[str, Any] | Any | None = None
259
goto: Send | Sequence[Send] = ()
260
261
def interrupt(value: Any) -> Any:
262
"""Interrupt graph execution with resumable value."""
263
...
264
265
class RetryPolicy:
266
initial_interval: float = 0.5
267
backoff_factor: float = 2.0
268
max_interval: float = 128.0
269
max_attempts: int = 3
270
jitter: bool = True
271
retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]
272
273
class StateSnapshot:
274
values: dict[str, Any] | Any
275
next: tuple[str, ...]
276
config: RunnableConfig
277
metadata: CheckpointMetadata | None
278
created_at: str | None
279
parent_config: RunnableConfig | None
280
tasks: tuple[PregelTask, ...]
281
interrupts: tuple[Interrupt, ...]
282
```
283
284
[Types and Primitives](./types-primitives.md)
285
286
### Managed Values
287
288
Built-in managed state values that provide execution context information within nodes and tasks.
289
290
```python { .api }
291
IsLastStep: Annotated[bool, IsLastStepManager]
292
"""
293
Managed value that returns True when execution is on the last step.
294
295
Include in your state TypedDict to access execution context. The value is automatically
296
populated by the runtime and indicates when the graph is on its final step before
297
hitting the recursion limit.
298
299
Usage:
300
from typing import TypedDict
301
from langgraph.managed import IsLastStep
302
from langgraph.graph import StateGraph
303
304
class State(TypedDict):
305
data: str
306
is_last: IsLastStep # Add as a state field
307
308
def my_node(state: State) -> dict:
309
if state["is_last"]: # Access like any other state field
310
# Perform final cleanup or summary
311
return {"data": "final"}
312
return {"data": "continue"}
313
314
graph = StateGraph(State)
315
graph.add_node("process", my_node)
316
# ... The 'is_last' field will be automatically managed
317
"""
318
319
RemainingSteps: Annotated[int, RemainingStepsManager]
320
"""
321
Managed value that returns the number of remaining execution steps.
322
323
Include in your state TypedDict to track how many more steps can execute before
324
hitting the recursion limit. Useful for progress tracking or conditional logic.
325
326
Usage:
327
from typing import TypedDict
328
from langgraph.managed import RemainingSteps
329
from langgraph.graph import StateGraph
330
331
class State(TypedDict):
332
data: str
333
steps_left: RemainingSteps # Add as a state field
334
335
def my_node(state: State) -> dict:
336
print(f"Steps remaining: {state['steps_left']}")
337
return {"data": "processed"}
338
339
graph = StateGraph(State)
340
graph.add_node("process", my_node)
341
# ... The 'steps_left' field will be automatically managed
342
"""
343
```
344
345
**Note:** Managed values must be declared in the state TypedDict schema. They are automatically populated by the runtime and are read-only (nodes cannot modify them). The graph identifies managed values by their type annotation and handles them specially.
346
347
### Error Handling
348
349
Exception classes for graph errors, interrupts, and validation failures.
350
351
```python { .api }
352
class GraphRecursionError(RecursionError): ...
353
class InvalidUpdateError(Exception): ...
354
class GraphInterrupt(Exception): ...
355
class EmptyChannelError(Exception): ...
356
class EmptyInputError(Exception): ...
357
class TaskNotFound(Exception): ...
358
359
class ErrorCode:
360
GRAPH_RECURSION_LIMIT
361
INVALID_CONCURRENT_GRAPH_UPDATE
362
INVALID_GRAPH_NODE_RETURN_VALUE
363
MULTIPLE_SUBGRAPHS
364
INVALID_CHAT_HISTORY
365
```
366
367
[Error Handling](./errors.md)
368
369
### Configuration and Runtime
370
371
Access configuration, stores, stream writers, and runtime context from within nodes and tasks.
372
373
```python { .api }
374
def get_config() -> RunnableConfig:
375
"""Get current RunnableConfig from context."""
376
...
377
378
def get_store() -> BaseStore:
379
"""Access LangGraph store from inside node or task."""
380
...
381
382
def get_stream_writer() -> StreamWriter:
383
"""Access StreamWriter from inside node or task."""
384
...
385
386
def get_runtime(context_schema=None) -> Runtime:
387
"""Get runtime context for current graph run."""
388
...
389
```
390
391
[Configuration and Runtime](./configuration.md)
392
393
### Message Handling
394
395
Utilities for message-based workflows with built-in message merging and state management.
396
397
```python { .api }
398
def add_messages(
399
left: Messages,
400
right: Messages,
401
*,
402
format: Literal["langchain-openai"] | None = None
403
) -> Messages:
404
"""Merge two lists of messages, updating by ID."""
405
...
406
407
class MessagesState(TypedDict):
408
messages: Annotated[list[AnyMessage], add_messages]
409
410
class MessageGraph(StateGraph):
411
"""Deprecated: Use StateGraph with messages key instead."""
412
...
413
```
414
415
[Message Handling](./message-graph.md)
416