0
# Types and Primitives
1
2
LangGraph provides a comprehensive set of types and primitives for controlling graph execution flow, managing state snapshots, configuring retry and cache policies, and coordinating parallel execution.
3
4
## Imports
5
6
```python
7
from langgraph.types import (
8
Send,
9
Command,
10
interrupt,
11
RetryPolicy,
12
CachePolicy,
13
Interrupt,
14
StateSnapshot,
15
StateUpdate,
16
PregelTask,
17
PregelExecutableTask,
18
StreamMode,
19
StreamWriter,
20
Durability,
21
All,
22
Checkpointer
23
)
24
from langgraph.managed import IsLastStep, RemainingSteps
25
from langgraph.typing import (
26
StateT, StateT_co, StateT_contra,
27
ContextT, InputT, OutputT
28
)
29
```
30
31
## Capabilities
32
33
### Control Flow Primitives
34
35
#### Send Class
36
37
Send messages to specific nodes with custom state, enabling dynamic parallelization and fan-out patterns.
38
39
```python { .api }
40
class Send:
41
"""
42
Message to send to a specific node with custom state.
43
44
Used in conditional edges to dynamically route execution to nodes
45
with custom input, enabling fan-out and parallel processing patterns.
46
47
IMPORTANT: When using Send for parallel execution where multiple nodes
48
write to the same state key, you MUST use Annotated with a reducer function
49
(e.g., operator.add for lists). Otherwise you'll get InvalidUpdateError:
50
"Can receive only one value per step. Use an Annotated key to handle multiple values."
51
52
Attributes:
53
node: str - Target node name
54
arg: Any - State/message to send to the node
55
"""
56
57
def __init__(self, node, arg):
58
"""
59
Initialize a Send message.
60
61
Parameters:
62
node: str - Name of the target node
63
arg: Any - State or data to send to the node
64
"""
65
66
node: str
67
arg: Any
68
```
69
70
#### Command Class
71
72
Commands for updating state and controlling graph navigation, including support for parent graph communication.
73
74
```python { .api }
75
class Command:
76
"""
77
Command for state updates and navigation control.
78
79
Enables nodes to update state and control execution flow simultaneously.
80
Can target parent graphs for nested graph communication.
81
82
Type Parameters:
83
N: Type of node identifiers (typically str)
84
85
Attributes:
86
graph: Optional[str] - Target graph name or Command.PARENT
87
update: Optional[Any] - State updates to apply
88
resume: Optional[dict[str, Any] | Any] - Resume value for interrupt
89
goto: Send | Sequence[Send | N] | N - Navigation target(s)
90
91
Class Attributes:
92
PARENT: Literal["__parent__"] - Reference to parent graph
93
"""
94
95
graph: str | None = None
96
update: Any | None = None
97
resume: dict[str, Any] | Any | None = None
98
goto: Send | Sequence[Send] = ()
99
100
PARENT: ClassVar[Literal["__parent__"]] = "__parent__"
101
```
102
103
#### Interrupt Function
104
105
Interrupt graph execution with a resumable value from within a node.
106
107
```python { .api }
108
def interrupt(value: Any) -> Any:
109
"""
110
Interrupt graph execution with a resumable exception.
111
112
Raises an internal exception that pauses execution and stores the value.
113
Execution can be resumed later, optionally providing a resume value.
114
115
Parameters:
116
value: Any - Value to store with the interrupt. Can be retrieved
117
via StateSnapshot.interrupts.
118
119
Returns:
120
Any - The resume value when execution is resumed.
121
If no resume value is provided, returns None.
122
123
Usage:
124
def approval_node(state):
125
# Request human approval
126
user_input = interrupt("Please approve or reject")
127
128
# user_input contains the resume value
129
if user_input == "approve":
130
return {"approved": True}
131
else:
132
return {"approved": False}
133
134
# Later, resume with:
135
# app.invoke(Command(resume="approve"), config)
136
"""
137
```
138
139
### Policy Configuration
140
141
#### RetryPolicy
142
143
Configuration for retrying node execution on failures with exponential backoff.
144
145
```python { .api }
146
class RetryPolicy:
147
"""
148
Configuration for retrying nodes on failure.
149
150
Implements exponential backoff with jitter for reliable retries.
151
152
Attributes:
153
initial_interval: float - Time in seconds before first retry (default: 0.5)
154
backoff_factor: float - Multiplier for retry interval (default: 2.0)
155
max_interval: float - Maximum time between retries (default: 128.0)
156
max_attempts: int - Maximum retry attempts (default: 3)
157
jitter: bool - Add random jitter to retry timing (default: True)
158
retry_on: Exception type(s) or predicate - When to retry:
159
- type[Exception]: Retry on this exception type
160
- Sequence[type[Exception]]: Retry on any of these types
161
- Callable[[Exception], bool]: Retry if function returns True
162
163
Usage:
164
# Retry on any exception
165
policy = RetryPolicy(max_attempts=5)
166
167
# Retry on specific exception
168
policy = RetryPolicy(
169
max_attempts=3,
170
retry_on=ValueError
171
)
172
173
# Retry on multiple exceptions
174
policy = RetryPolicy(
175
retry_on=(ValueError, KeyError)
176
)
177
178
# Custom retry logic
179
policy = RetryPolicy(
180
retry_on=lambda e: isinstance(e, ValueError) and "temporary" in str(e)
181
)
182
"""
183
184
initial_interval: float = 0.5
185
backoff_factor: float = 2.0
186
max_interval: float = 128.0
187
max_attempts: int = 3
188
jitter: bool = True
189
retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]
190
```
191
192
#### CachePolicy
193
194
Configuration for caching node results to avoid redundant computation.
195
196
```python { .api }
197
class CachePolicy:
198
"""
199
Configuration for caching node results.
200
201
Type Parameters:
202
KeyFuncT: Type of the key generation function
203
204
Attributes:
205
key_func: Callable - Function to generate cache key from input
206
ttl: Optional[int] - Time to live in seconds (None = no expiration)
207
208
Usage:
209
# Cache based on input dict
210
policy = CachePolicy(
211
key_func=lambda x: f"{x['id']}:{x['version']}",
212
ttl=3600 # 1 hour
213
)
214
215
# Cache based on entire input
216
policy = CachePolicy(
217
key_func=str,
218
ttl=None # Never expires
219
)
220
221
# Custom key function
222
def complex_key(input_data):
223
return f"{input_data['user']}:{input_data['timestamp']}"
224
225
policy = CachePolicy(
226
key_func=complex_key,
227
ttl=1800 # 30 minutes
228
)
229
"""
230
231
key_func: Callable # Function to generate cache key
232
ttl: int | None = None # Time to live in seconds
233
```
234
235
### State Management Types
236
237
#### StateSnapshot
238
239
Snapshot of graph state at the beginning of a step, including pending tasks and interrupts.
240
241
```python { .api }
242
class StateSnapshot:
243
"""
244
Snapshot of graph state at the beginning of a step.
245
246
Captures complete execution context including state values,
247
pending tasks, and configuration.
248
249
Attributes:
250
values: dict[str, Any] | Any
251
Current state channel values. Dict for TypedDict states,
252
object for Pydantic/dataclass states.
253
next: tuple[str, ...]
254
Tuple of node names to execute next. Empty if no pending nodes.
255
config: RunnableConfig
256
Configuration for this snapshot, including thread_id and
257
checkpoint_id.
258
metadata: Optional[CheckpointMetadata]
259
Metadata associated with this checkpoint (step number, source, etc.)
260
created_at: Optional[str]
261
ISO timestamp when snapshot was created
262
parent_config: Optional[RunnableConfig]
263
Config of parent checkpoint (for nested graphs)
264
tasks: tuple[PregelTask, ...]
265
Pending tasks to execute, including task IDs, names, and state
266
interrupts: tuple[Interrupt, ...]
267
Pending interrupts with values and IDs
268
269
Usage:
270
state = app.get_state(config)
271
print(state.values) # Current state
272
print(state.next) # Next nodes to execute
273
print(state.tasks) # Pending tasks
274
275
# Check if execution is complete
276
if not state.next:
277
print("Graph execution complete")
278
"""
279
280
values: dict[str, Any] | Any
281
next: tuple[str, ...]
282
config: RunnableConfig
283
metadata: CheckpointMetadata | None
284
created_at: str | None
285
parent_config: RunnableConfig | None
286
tasks: tuple[PregelTask, ...]
287
interrupts: tuple[Interrupt, ...]
288
```
289
290
#### StateUpdate
291
292
Information about a state update for bulk operations.
293
294
```python { .api }
295
class StateUpdate:
296
"""
297
State update information for bulk_update_state.
298
299
Attributes:
300
values: Optional[dict[str, Any]] - State values to update
301
as_node: Optional[str] - Act as if update came from this node
302
task_id: Optional[str] - ID of task performing the update
303
"""
304
305
values: dict[str, Any] | None
306
as_node: str | None
307
task_id: str | None
308
```
309
310
#### Interrupt Class
311
312
Information about an interrupt in the graph execution.
313
314
```python { .api }
315
class Interrupt:
316
"""
317
Information about an interrupt.
318
319
Attributes:
320
value: Any - Value associated with the interrupt
321
id: str - Unique identifier for the interrupt
322
"""
323
324
value: Any
325
id: str
326
327
@classmethod
328
def from_ns(cls, value, ns):
329
"""
330
Create an Interrupt from a value and namespace.
331
332
Parameters:
333
value: Any - Interrupt value
334
ns: str - Namespace string
335
336
Returns:
337
Interrupt - New interrupt instance
338
"""
339
```
340
341
### Task Types
342
343
#### PregelTask
344
345
Represents a task in the Pregel execution model, including task state and results.
346
347
```python { .api }
348
class PregelTask:
349
"""
350
A Pregel task representing a unit of work.
351
352
Attributes:
353
id: str - Unique task identifier
354
name: str - Task/node name
355
path: tuple[str | int | tuple, ...]
356
Path in the execution tree (for nested graphs)
357
error: Optional[Exception] - Exception if task failed
358
interrupts: tuple[Interrupt, ...]
359
Interrupts raised by this task
360
state: Optional[RunnableConfig | StateSnapshot]
361
Task state (for subgraphs) or configuration
362
result: Optional[Any] - Task result if completed
363
364
Usage:
365
state = app.get_state(config)
366
for task in state.tasks:
367
print(f"Task {task.name}: {task.id}")
368
if task.error:
369
print(f" Error: {task.error}")
370
if task.interrupts:
371
print(f" Interrupts: {task.interrupts}")
372
"""
373
374
id: str
375
name: str
376
path: tuple[str | int | tuple, ...]
377
error: Exception | None = None
378
interrupts: tuple[Interrupt, ...] = ()
379
state: None | RunnableConfig | StateSnapshot = None
380
result: Any | None = None
381
```
382
383
#### PregelExecutableTask
384
385
Executable task with all information needed for execution (internal use).
386
387
```python { .api }
388
class PregelExecutableTask:
389
"""
390
Executable task with complete execution context.
391
392
This is primarily used internally by the Pregel execution engine.
393
394
Attributes:
395
name: str - Task name
396
input: Any - Task input
397
proc: Runnable - Runnable to execute
398
writes: deque[tuple[str, Any]] - Pending writes
399
config: RunnableConfig - Task configuration
400
triggers: Sequence[str] - Channel triggers
401
retry_policy: Sequence[RetryPolicy] - Retry configurations
402
cache_key: Optional[CacheKey] - Cache key if caching enabled
403
id: str - Task ID
404
path: tuple[str | int | tuple, ...] - Execution path
405
writers: Sequence[Runnable] - Write handlers
406
subgraphs: Sequence[PregelProtocol] - Nested subgraphs
407
"""
408
409
name: str
410
input: Any
411
proc: Runnable
412
writes: deque[tuple[str, Any]]
413
config: RunnableConfig
414
triggers: Sequence[str]
415
retry_policy: Sequence[RetryPolicy]
416
cache_key: CacheKey | None
417
id: str
418
path: tuple[str | int | tuple, ...]
419
writers: Sequence[Runnable] = ()
420
subgraphs: Sequence[PregelProtocol] = ()
421
```
422
423
### Stream and Execution Types
424
425
#### StreamMode Type
426
427
Defines how the stream method should emit outputs.
428
429
```python { .api }
430
StreamMode = Literal["values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"]
431
"""
432
Stream mode options for controlling how graph execution is streamed:
433
434
- "values": Emit the full state after each step/superstep
435
- "updates": Emit node-level updates as they occur, in format {node_name: update_dict}
436
- "checkpoints": Emit checkpoint creation events (metadata about state persistence)
437
- "tasks": Emit task-level events showing when individual tasks start and complete
438
- "debug": Emit both checkpoints and tasks for comprehensive debugging
439
- "messages": Emit LLM message tokens as they are generated (for streaming chat models)
440
- "custom": Emit custom data written via StreamWriter within nodes
441
442
Usage:
443
# Single mode
444
for chunk in app.stream(input, stream_mode="updates"):
445
print(chunk)
446
447
# Multiple modes - returns tuples of (mode, data)
448
for chunk in app.stream(input, stream_mode=["values", "tasks"]):
449
mode, data = chunk
450
if mode == "values":
451
print(f"State: {data}")
452
elif mode == "tasks":
453
print(f"Task: {data}")
454
"""
455
```
456
457
#### StreamWriter Type
458
459
Callable for writing custom data to the output stream.
460
461
```python { .api }
462
StreamWriter = Callable[[Any], None]
463
"""
464
Callable that writes custom data to the output stream.
465
466
Retrieved via get_stream_writer() within nodes. Enables nodes to
467
emit progress updates, logs, or custom events during execution.
468
469
Usage:
470
from langgraph.config import get_stream_writer
471
472
def my_node(state):
473
writer = get_stream_writer()
474
writer({"progress": 25})
475
# Do work...
476
writer({"progress": 100})
477
return state
478
479
# Stream with custom mode
480
for chunk in app.stream(input, stream_mode="custom"):
481
print(chunk) # {"progress": 25}, then {"progress": 100}
482
"""
483
```
484
485
#### Durability Type
486
487
Durability mode for graph execution (internal).
488
489
```python { .api }
490
Durability = Literal["sync", "async", "exit"]
491
"""
492
Durability mode for graph execution.
493
494
- "sync": Changes are persisted synchronously before the next step starts
495
- "async": Changes are persisted asynchronously while the next step executes
496
- "exit": Changes are persisted only when the graph exits
497
498
This is primarily used internally by the execution engine.
499
"""
500
```
501
502
#### Checkpointer Type
503
504
Type alias for checkpointer configurations.
505
506
```python { .api }
507
Checkpointer = None | bool | BaseCheckpointSaver
508
"""
509
Type of checkpointer to use for a subgraph.
510
511
- True: Enables persistent checkpointing for this subgraph
512
- False: Disables checkpointing, even if parent graph has a checkpointer
513
- None: Inherits checkpointer from parent graph
514
- BaseCheckpointSaver: Uses specific checkpointer instance
515
516
Usage:
517
# Inherit from parent
518
graph.compile(checkpointer=None)
519
520
# Enable checkpointing
521
graph.compile(checkpointer=True)
522
523
# Disable checkpointing
524
graph.compile(checkpointer=False)
525
526
# Use specific checkpointer
527
from langgraph.checkpoint.memory import MemorySaver
528
graph.compile(checkpointer=MemorySaver())
529
"""
530
```
531
532
#### All Type
533
534
Special value to indicate all nodes.
535
536
```python { .api }
537
All = Literal["*"]
538
"""
539
Special value to interrupt on all nodes.
540
541
Usage:
542
# Interrupt before all nodes
543
app = graph.compile(interrupt_before="*")
544
545
# Interrupt after all nodes
546
app = graph.compile(interrupt_after="*")
547
"""
548
```
549
550
### Managed Values
551
552
Special state values automatically managed by LangGraph.
553
554
```python { .api }
555
IsLastStep = Annotated[bool, IsLastStepManager]
556
"""
557
Managed value that indicates if the current step is the last step.
558
559
Automatically set to True when the graph is about to reach the
560
recursion limit.
561
562
Usage:
563
from typing import TypedDict, Annotated
564
from langgraph.managed import IsLastStep
565
566
class State(TypedDict):
567
data: str
568
is_last: IsLastStep
569
570
def node(state):
571
if state["is_last"]:
572
# Last step - finalize
573
return {"data": "final"}
574
return {"data": "continue"}
575
"""
576
577
RemainingSteps = Annotated[int, RemainingStepsManager]
578
"""
579
Managed value for the number of remaining steps.
580
581
Automatically decremented with each step. Useful for limiting
582
iteration or providing step-based behavior.
583
584
Usage:
585
from typing import TypedDict, Annotated
586
from langgraph.managed import RemainingSteps
587
588
class State(TypedDict):
589
data: str
590
steps_left: RemainingSteps
591
592
def node(state):
593
print(f"Steps remaining: {state['steps_left']}")
594
return {"data": "processed"}
595
"""
596
```
597
598
### Type Variables
599
600
Type variables for generic typing of graphs and components.
601
602
```python { .api }
603
StateT = TypeVar("StateT", bound=StateLike)
604
"""
605
Type variable for state in graph.
606
607
Used for generic typing of StateGraph and Pregel.
608
Bound to StateLike (dict-like objects, Pydantic models, dataclasses).
609
"""
610
611
StateT_co = TypeVar("StateT_co", bound=StateLike, covariant=True)
612
"""
613
Covariant state type variable.
614
615
Used for read-only state access in type signatures.
616
"""
617
618
StateT_contra = TypeVar("StateT_contra", bound=StateLike, contravariant=True)
619
"""
620
Contravariant state type variable.
621
622
Used for write-only state access in type signatures.
623
"""
624
625
ContextT = TypeVar("ContextT", bound=StateLike | None, default=None)
626
"""
627
Type variable for graph run-scoped context.
628
629
Context is read-only and available throughout the graph run.
630
Defaults to None if not specified.
631
"""
632
633
InputT = TypeVar("InputT", bound=StateLike, default=StateT)
634
"""
635
Type variable for input to state graph.
636
637
Typically a subset of the state schema. Defaults to StateT.
638
"""
639
640
OutputT = TypeVar("OutputT", bound=StateLike, default=StateT)
641
"""
642
Type variable for output of state graph.
643
644
Typically a subset of the state schema. Defaults to StateT.
645
"""
646
```
647
648
## Usage Examples
649
650
### Using Send for Dynamic Parallelization
651
652
```python
653
from typing import TypedDict, Annotated
654
from operator import add
655
from langgraph.graph import StateGraph, START, END
656
from langgraph.types import Send
657
658
class State(TypedDict):
659
items: list[int]
660
# IMPORTANT: Parallel writes require Annotated with a reducer function
661
# Without this, you'll get InvalidUpdateError when multiple nodes write to 'results'
662
results: Annotated[list[int], add]
663
664
def fan_out(state: State) -> list[Send]:
665
"""Create parallel tasks for each item."""
666
return [
667
Send("process", {"value": item})
668
for item in state["items"]
669
]
670
671
def process(state: dict) -> dict:
672
"""Process single item."""
673
return {"results": [state["value"] * 2]}
674
675
graph = StateGraph(State)
676
graph.add_node("process", process)
677
graph.add_conditional_edges(START, fan_out)
678
graph.add_edge("process", END)
679
680
app = graph.compile()
681
result = app.invoke({"items": [1, 2, 3], "results": []})
682
# result["results"] == [2, 4, 6]
683
```
684
685
### Using Command for State Updates and Navigation
686
687
```python
688
from typing import TypedDict
689
from langgraph.graph import StateGraph, START, END
690
from langgraph.types import Command
691
692
class State(TypedDict):
693
stage: str
694
value: int
695
696
def node(state: State) -> Command:
697
"""Return command with state update and navigation."""
698
return Command(
699
update={"value": state["value"] + 1},
700
goto="next_node" if state["value"] < 5 else END
701
)
702
703
graph = StateGraph(State)
704
graph.add_node("node", node)
705
graph.add_node("next_node", lambda s: {"stage": "next"})
706
graph.add_edge(START, "node")
707
```
708
709
### Using Interrupt for Human-in-the-Loop
710
711
```python
712
from typing import TypedDict
713
from langgraph.graph import StateGraph, START, END
714
from langgraph.types import interrupt, Command
715
from langgraph.checkpoint.memory import MemorySaver
716
717
class State(TypedDict):
718
data: str
719
approved: bool
720
721
def needs_approval(state: State) -> dict:
722
"""Request human approval."""
723
response = interrupt({
724
"question": "Approve this data?",
725
"data": state["data"]
726
})
727
728
return {"approved": response == "yes"}
729
730
graph = StateGraph(State)
731
graph.add_node("approval", needs_approval)
732
graph.add_edge(START, "approval")
733
graph.add_edge("approval", END)
734
735
checkpointer = MemorySaver()
736
app = graph.compile(checkpointer=checkpointer)
737
738
config = {"configurable": {"thread_id": "1"}}
739
740
# First run - interrupts
741
result = app.invoke({"data": "important", "approved": False}, config)
742
743
# Check interrupts
744
state = app.get_state(config)
745
print(state.interrupts[0].value) # {"question": "Approve...", "data": "important"}
746
747
# Resume with approval
748
result = app.invoke(Command(resume="yes"), config)
749
# result["approved"] == True
750
```
751
752
### Using RetryPolicy with Nodes
753
754
```python
755
from langgraph.graph import StateGraph, START, END
756
from langgraph.types import RetryPolicy
757
758
class State(TypedDict):
759
attempts: int
760
result: str
761
762
def unreliable_node(state: State) -> dict:
763
"""Node that might fail."""
764
if state["attempts"] < 2:
765
raise ValueError("Temporary failure")
766
return {"result": "success"}
767
768
graph = StateGraph(State)
769
graph.add_node(
770
"work",
771
unreliable_node,
772
retry_policy=RetryPolicy(
773
max_attempts=3,
774
initial_interval=1.0,
775
retry_on=ValueError
776
)
777
)
778
graph.add_edge(START, "work")
779
graph.add_edge("work", END)
780
781
app = graph.compile()
782
# Will retry on ValueError up to 3 times
783
```
784
785
### Using Managed Values
786
787
```python
788
from typing import TypedDict, Annotated
789
from langgraph.graph import StateGraph, START, END
790
from langgraph.managed import IsLastStep, RemainingSteps
791
792
class State(TypedDict):
793
data: list[int]
794
is_last: IsLastStep
795
steps_left: RemainingSteps
796
797
def process(state: State) -> dict:
798
"""Process with awareness of remaining steps."""
799
print(f"Steps remaining: {state['steps_left']}")
800
801
if state["is_last"]:
802
# Last step - finalize
803
return {"data": state["data"] + [999]}
804
805
# Normal processing
806
return {"data": state["data"] + [len(state["data"])]}
807
808
graph = StateGraph(State)
809
graph.add_node("process", process)
810
graph.add_edge(START, "process")
811
graph.add_edge("process", "process") # Loop
812
813
app = graph.compile()
814
815
# Run with recursion limit
816
result = app.invoke(
817
{"data": []},
818
config={"recursion_limit": 5}
819
)
820
# Will loop until is_last is True
821
```
822
823
### Inspecting StateSnapshot
824
825
```python
826
from langgraph.checkpoint.memory import MemorySaver
827
828
graph = StateGraph(dict)
829
# ... add nodes ...
830
831
app = graph.compile(checkpointer=MemorySaver())
832
config = {"configurable": {"thread_id": "1"}}
833
834
# Run graph
835
app.invoke({"value": 1}, config)
836
837
# Get detailed state snapshot
838
state = app.get_state(config)
839
840
print(f"Values: {state.values}")
841
print(f"Next nodes: {state.next}")
842
print(f"Created: {state.created_at}")
843
print(f"Tasks: {len(state.tasks)}")
844
845
for task in state.tasks:
846
print(f" Task: {task.name} [{task.id}]")
847
if task.error:
848
print(f" Error: {task.error}")
849
850
for interrupt in state.interrupts:
851
print(f" Interrupt: {interrupt.value} [{interrupt.id}]")
852
```
853
854
## Notes
855
856
- `Send` enables dynamic fan-out where the number and targets of parallel tasks are determined at runtime
857
- `Command` provides a unified way to update state and control navigation in a single return value
858
- `interrupt()` pauses execution and can be resumed with `Command(resume=value)`
859
- `RetryPolicy` implements exponential backoff with jitter for robust error handling
860
- `CachePolicy` requires a cache backend to be configured in the graph
861
- Managed values (`IsLastStep`, `RemainingSteps`) are automatically updated by the execution engine
862
- `StateSnapshot` provides complete visibility into graph state at any point
863
- Type variables enable type-safe graph construction with IDEs and type checkers
864