# Pregel Execution Engine

Pregel is the low-level execution engine that powers LangGraph. It provides fine-grained control over graph execution, streaming, state management, and checkpointing. While most users interact with StateGraph, advanced use cases can directly use Pregel for maximum flexibility.

## Imports

```python
from langgraph.pregel import Pregel, NodeBuilder
```

## Capabilities

### Pregel Class

The core graph execution engine. It implements LangChain's Runnable interface and handles graph traversal, state management, parallelization, and checkpointing.

```python { .api }
class Pregel:
    """
    Low-level graph execution engine inspired by Google's Pregel.

    Type Parameters:
        StateT: Type of the graph state
        ContextT: Type of run-scoped context
        InputT: Type of graph input
        OutputT: Type of graph output
    """

    def __init__(
        self,
        *,
        nodes,
        channels,
        input_channels,
        output_channels,
        stream_channels=None,
        stream_mode="values",
        trigger_to_nodes=None,
        interrupt_before_nodes=None,
        interrupt_after_nodes=None,
        debug=False,
        checkpointer=None,
        store=None,
        auto_validate=True,
        cache=None,
        cache_policy=None,
        config=None,
        context_schema=None,
        name="LangGraph",
        retry_policy=(),
        step_timeout=None,
        stream_eager=False,
        **kwargs
    ):
        """
        Initialize a Pregel graph.

        This is typically not called directly - use StateGraph.compile() instead.

        Parameters:
            nodes: dict[str, PregelNode] - Graph nodes
            channels: dict[str, BaseChannel] - State channels
            input_channels: str | list[str] - Input channel names
            output_channels: str | list[str] - Output channel names
            stream_channels: Optional[list[str]] - Channels to stream
            stream_mode: StreamMode - Default streaming mode
            trigger_to_nodes: Optional[Mapping[str, Sequence[str]]]
                Maps trigger names to node sequences for advanced graph control
            interrupt_before_nodes: Optional[list[str]] - Nodes to interrupt before
            interrupt_after_nodes: Optional[list[str]] - Nodes to interrupt after
            debug: bool - Enable debug mode
            checkpointer: Optional[BaseCheckpointSaver] - Checkpointer
            store: Optional[BaseStore] - Store for cross-thread memory
            auto_validate: bool, default True
                Automatically validate graph structure on construction
            cache: Optional[BaseCache]
                Cache instance for node results
            cache_policy: Optional[CachePolicy]
                Default cache policy for nodes
            config: Optional[RunnableConfig]
                Default configuration for all runs
            context_schema: Optional[type[ContextT]]
                Schema for the context type
            name: str, default "LangGraph"
                Name of the graph
            retry_policy: RetryPolicy | Sequence[RetryPolicy], default ()
                Retry policies to apply to all nodes
            step_timeout: Optional[float]
                Timeout in seconds for each step
            stream_eager: bool, default False
                Enable eager streaming of outputs
        """
```
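The docstring's reference to Google's Pregel points at a bulk-synchronous execution model: in each superstep, every node whose subscribed channel changed runs against the same snapshot, and its writes become visible only when the step ends. The sketch below is a toy illustration of that idea in plain Python. `run_supersteps` and its tuple-based node specs are hypothetical and are not LangGraph's implementation.

```python
def run_supersteps(nodes, channels, updated, max_steps=25):
    """Toy bulk-synchronous loop (illustrative only, not LangGraph's code).

    nodes:    {name: (subscribed_channel, fn, output_channel)}
    channels: {channel_name: value}, mutated in place
    updated:  set of channel names written by the initial input
    """
    for step in range(max_steps):
        # Plan: pick every node whose subscribed channel changed last step.
        ready = [spec for spec in nodes.values() if spec[0] in updated]
        if not ready:
            return step  # nothing left to trigger, so the run is finished
        # Execute: all ready nodes read the same snapshot of the channels.
        snapshot = dict(channels)
        writes = {out: fn(snapshot[src]) for src, fn, out in ready}
        # Update: writes become visible only after the whole step completes.
        channels.update(writes)
        updated = set(writes)
    raise RuntimeError("step limit exceeded")


channels = {"a": 2}
nodes = {
    "double": ("a", lambda x: x * 2, "b"),
    "plus_one": ("b", lambda x: x + 1, "c"),
}
steps = run_supersteps(nodes, channels, updated={"a"})
print(channels, steps)
```

The step limit in this sketch plays the role that the recursion limit plays for a real graph: a cycle of nodes that keep triggering each other ends with an error instead of looping forever.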

#### Execution Methods

```python { .api }
def invoke(self, input, config=None, **kwargs):
    """
    Execute the graph and return the final output.

    Parameters:
        input: Any - Input to the graph (dict, state object, etc.)
        config: Optional[RunnableConfig] - Configuration for the run
        context: Optional[ContextT]
            Static context for the run (added in v0.6.0).
            Available to all nodes via get_runtime().
        durability: Optional[Durability]
            Durability mode for persistence:
            - "sync": Persist synchronously before the next step starts
            - "async": Persist asynchronously while the next step executes (default)
            - "exit": Persist only when graph exits
        **kwargs: Additional keyword arguments passed to nodes

    Returns:
        OutputT - Final graph output

    Raises:
        GraphRecursionError: If max steps exceeded
        GraphInterrupt: If execution is interrupted
    """

async def ainvoke(self, input, config=None, **kwargs):
    """
    Asynchronously execute the graph and return the final output.

    Parameters:
        input: Any - Input to the graph
        config: Optional[RunnableConfig] - Configuration for the run
        context: Optional[ContextT]
            Static context for the run (added in v0.6.0).
            Available to all nodes via get_runtime().
        durability: Optional[Durability]
            Durability mode for persistence:
            - "sync": Persist synchronously before the next step starts
            - "async": Persist asynchronously while the next step executes (default)
            - "exit": Persist only when graph exits
        **kwargs: Additional keyword arguments

    Returns:
        OutputT - Final graph output
    """
```

#### Streaming Methods

```python { .api }
def stream(
    self,
    input,
    config=None,
    *,
    stream_mode=None,
    output_keys=None,
    interrupt_before=None,
    interrupt_after=None,
    durability=None,
    debug=None,
    subgraphs=False
):
    """
    Stream graph execution, yielding outputs as they're produced.

    Parameters:
        input: Any - Input to the graph
        config: Optional[RunnableConfig] - Configuration for the run
        stream_mode: Optional[StreamMode | list[StreamMode]]
            How to emit outputs. Options:
            - "values": Emit full state after each step
            - "updates": Emit only node updates
            - "custom": Emit custom data via StreamWriter
            - "messages": Emit LLM messages token-by-token
            - "checkpoints": Emit checkpoint creation events
            - "tasks": Emit task start/finish events
            - "debug": Emit checkpoints and tasks
            Can pass list for multiple modes simultaneously.
        output_keys: Optional[str | list[str]]
            Specific state keys to include in output
        interrupt_before: Optional[list[str] | Literal["*"]]
            Override interrupt_before for this run
        interrupt_after: Optional[list[str] | Literal["*"]]
            Override interrupt_after for this run
        durability: Optional[Durability]
            Durability mode for persistence:
            - "sync": Persist synchronously before the next step starts
            - "async": Persist asynchronously while the next step executes (default)
            - "exit": Persist only when graph exits
        debug: Optional[bool] - Override debug mode for this run
        subgraphs: bool, default False
            Whether to stream subgraph execution

    Yields:
        Chunks of output according to stream_mode.
        For "values": dict with full state
        For "updates": dict with node name and update
        For "checkpoints": checkpoint metadata
        For "tasks": task execution info
    """

async def astream(
    self,
    input,
    config=None,
    *,
    stream_mode=None,
    output_keys=None,
    interrupt_before=None,
    interrupt_after=None,
    durability=None,
    debug=None,
    subgraphs=False
):
    """
    Asynchronously stream graph execution.

    Parameters: Same as stream(), including:
        durability: Optional[Durability]
            Durability mode for persistence:
            - "sync": Persist synchronously before the next step starts
            - "async": Persist asynchronously while the next step executes (default)
            - "exit": Persist only when graph exits

    Yields:
        Async iterator of output chunks
    """
```
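To make the difference between "values" and "updates" concrete, here is a toy generator in plain Python. `toy_stream` is a hypothetical stand-in that runs functions in sequence; it is not LangGraph's implementation and only mimics the chunk shapes described above, including the (mode, chunk) tuples produced when a list of modes is passed. Emitting the input state as the first "values" chunk is an assumption of this sketch.

```python
def toy_stream(steps, state, stream_mode="values"):
    """Run (name, fn) steps in order and yield chunks per stream mode.

    Illustrative only: a single mode yields bare chunks, a list of modes
    yields (mode, chunk) tuples.
    """
    modes = [stream_mode] if isinstance(stream_mode, str) else list(stream_mode)
    tagged = not isinstance(stream_mode, str)

    def emit(mode, chunk):
        return (mode, chunk) if tagged else chunk

    if "values" in modes:
        yield emit("values", dict(state))          # state as supplied by the caller
    for name, fn in steps:
        update = fn(state)
        state = {**state, **update}
        if "updates" in modes:
            yield emit("updates", {name: update})  # only what the node wrote
        if "values" in modes:
            yield emit("values", dict(state))      # full state after the step


steps = [
    ("node1", lambda s: {"value": s["value"] + 1}),
    ("node2", lambda s: {"value": s["value"] * 2}),
]
chunks = list(toy_stream(steps, {"value": 1}, stream_mode=["values", "updates"]))
for chunk in chunks:
    print(chunk)
```

With a single mode such as `stream_mode="updates"`, the same generator yields the bare dictionaries without the mode tag.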

#### State Management Methods

```python { .api }
def get_state(self, config, *, subgraphs=False):
    """
    Get the current state snapshot for a thread.

    Parameters:
        config: RunnableConfig - Must include thread_id in configurable
        subgraphs: bool, default False - Include subgraph states

    Returns:
        StateSnapshot - Current state with:
            - values: Current state values
            - next: Tuple of next node(s) to execute
            - config: Config for this snapshot
            - metadata: Checkpoint metadata
            - created_at: Timestamp
            - parent_config: Parent checkpoint config
            - tasks: Pending tasks
            - interrupts: Pending interrupts
    """

async def aget_state(self, config, *, subgraphs=False):
    """
    Asynchronously get the current state snapshot.

    Parameters: Same as get_state()

    Returns:
        StateSnapshot - Current state
    """
```

```python { .api }
def get_state_history(
    self,
    config,
    *,
    filter=None,
    before=None,
    limit=None
):
    """
    Get historical state snapshots for a thread.

    Parameters:
        config: RunnableConfig - Must include thread_id
        filter: Optional[dict] - Filter checkpoints by metadata
        before: Optional[RunnableConfig] - Get states before this config
        limit: Optional[int] - Maximum number of states to return

    Yields:
        StateSnapshot - Historical states in reverse chronological order
    """

async def aget_state_history(
    self,
    config,
    *,
    filter=None,
    before=None,
    limit=None
):
    """
    Asynchronously get historical state snapshots.

    Parameters: Same as get_state_history()

    Yields:
        StateSnapshot - Historical states
    """
```
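The way `filter`, `before`, and `limit` combine with the reverse chronological ordering can be sketched without a checkpointer. The function below is a plain-Python toy: `toy_history` and its snapshot dictionaries are hypothetical, and `before` is simplified to a step number rather than a RunnableConfig.

```python
from itertools import islice


def toy_history(snapshots, *, filter=None, before=None, limit=None):
    """Yield snapshots newest first, mimicking the parameters described above.

    snapshots: list of dicts with "step" and "metadata" keys, oldest first.
    before:    simplified stand-in for a checkpoint config; here a step number.
    """
    newest_first = reversed(snapshots)
    matching = (
        snap for snap in newest_first
        if (before is None or snap["step"] < before)
        and all(snap["metadata"].get(k) == v for k, v in (filter or {}).items())
    )
    yield from islice(matching, limit)  # islice(..., None) applies no limit


snapshots = [
    {"step": 0, "metadata": {"source": "input"}},
    {"step": 1, "metadata": {"source": "loop"}},
    {"step": 2, "metadata": {"source": "loop"}},
    {"step": 3, "metadata": {"source": "update"}},
]
recent_loops = list(toy_history(snapshots, filter={"source": "loop"}, limit=1))
print(recent_loops)
```

Because results arrive newest first, `limit=1` returns the most recent matching snapshot rather than the oldest.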

```python { .api }
def update_state(self, config, values, as_node=None, task_id=None):
    """
    Update the state of a thread manually.

    Parameters:
        config: RunnableConfig - Must include thread_id
        values: dict[str, Any] | Any | None - State updates to apply. Can be None when used with as_node=END to clear tasks.
        as_node: Optional[str] - Act as if update came from this node
        task_id: Optional[str]
            Associate update with this task ID

    Returns:
        RunnableConfig - Config for the new checkpoint

    Usage:
        # Update state manually
        config = {"configurable": {"thread_id": "1"}}
        new_config = graph.update_state(
            config,
            {"counter": 5},
            as_node="my_node"
        )
    """

async def aupdate_state(self, config, values, as_node=None, task_id=None):
    """
    Asynchronously update thread state.

    Parameters: Same as update_state()
        config: RunnableConfig - Must include thread_id
        values: dict[str, Any] | Any | None - State updates to apply
        as_node: Optional[str] - Act as if update came from this node
        task_id: Optional[str]
            Associate update with this task ID

    Returns:
        RunnableConfig - Config for the new checkpoint
    """
```

```python { .api }
def bulk_update_state(
    self,
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig:
    """
    Apply updates to the graph state in bulk. Requires a checkpointer to be set.

    Parameters:
        config: RunnableConfig - The config to apply the updates to. Must include thread_id.
        supersteps: Sequence[Sequence[StateUpdate]] - A list of supersteps, each including
            a list of updates to apply sequentially to a graph state. Each update is
            a tuple of the form (values, as_node, task_id) where task_id is optional.

    Returns:
        RunnableConfig - The updated config.

    Raises:
        ValueError: If no checkpointer is set or no updates are provided.
        InvalidUpdateError: If an invalid update is provided.

    StateUpdate type:
        NamedTuple with fields:
            - values: dict[str, Any] | None
            - as_node: str | None
            - task_id: str | None
    """

async def abulk_update_state(
    self,
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig:
    """
    Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.

    Parameters:
        config: RunnableConfig - The config to apply the updates to. Must include thread_id.
        supersteps: Sequence[Sequence[StateUpdate]] - A list of supersteps, each including
            a list of updates to apply sequentially to a graph state. Each update is
            a tuple of the form (values, as_node, task_id) where task_id is optional.

    Returns:
        RunnableConfig - The updated config.

    Raises:
        ValueError: If no checkpointer is set or no updates are provided.
        InvalidUpdateError: If an invalid update is provided.
    """
```
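The nested `supersteps` argument is easier to read with a small example of its shape. The sketch below is a plain-Python toy: `ToyStateUpdate` mirrors the three fields listed above, and `apply_supersteps` is a hypothetical helper that merges updates into a dict. Recording one saved state per superstep is an assumption of the toy model, not a statement about LangGraph's checkpoint layout.

```python
from typing import Any, NamedTuple, Optional


class ToyStateUpdate(NamedTuple):
    """Hypothetical mirror of the StateUpdate fields listed above."""
    values: Optional[dict[str, Any]]
    as_node: Optional[str] = None
    task_id: Optional[str] = None


def apply_supersteps(state, supersteps):
    """Apply each superstep in order and record the state after each one."""
    if not supersteps:
        raise ValueError("No supersteps provided")
    checkpoints = []
    for updates in supersteps:
        for update in updates:
            # A None payload carries no values, so it leaves the state as is.
            state = {**state, **(update.values or {})}
        checkpoints.append(dict(state))
    return state, checkpoints


final, checkpoints = apply_supersteps(
    {"count": 0},
    [
        # Superstep 1: a single update attributed to "increment"
        [ToyStateUpdate({"count": 1}, as_node="increment")],
        # Superstep 2: two updates applied one after the other
        [
            ToyStateUpdate({"count": 2}, "increment"),
            ToyStateUpdate({"done": True}, "finish"),
        ],
    ],
)
print(final, len(checkpoints))
```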

#### Graph Introspection Methods

```python { .api }
def get_graph(self, config=None, *, xray=False):
    """
    Get the graph structure.

    Parameters:
        config: Optional[RunnableConfig]
        xray: int | bool, default False
            Whether to recursively include subgraph structures.
            - False: Only return main graph structure
            - True: Include all subgraphs recursively
            - int: Include subgraphs up to specified depth

    Returns:
        Graph - Graph structure with nodes and edges
    """

async def aget_graph(self, config=None, *, xray=False):
    """
    Asynchronously get the graph structure.

    Parameters:
        config: Optional[RunnableConfig]
        xray: int | bool, default False
            Whether to recursively include subgraph structures.
            - False: Only return main graph structure
            - True: Include all subgraphs recursively
            - int: Include subgraphs up to specified depth

    Returns:
        Graph - Graph structure
    """
```
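The three forms of `xray` can be illustrated with a toy recursive expansion. In this sketch, `expand` and the nested-dict graph are hypothetical and unrelated to the Graph object that `get_graph` returns; the point is only how False, True, and an integer depth differ.

```python
def expand(graph, xray=False):
    """List node names, descending into subgraphs according to `xray`.

    graph: {node_name: subgraph_dict_or_None}  (hypothetical toy structure)
    xray:  False -> top level only, True -> unlimited depth, int -> depth limit
    """
    # bool is a subclass of int, so test for True/False by identity before
    # treating xray as a numeric depth.
    if xray is True:
        depth = None
    elif xray is False:
        depth = 0
    else:
        depth = xray

    names = []
    for name, sub in graph.items():
        if sub is not None and (depth is None or depth > 0):
            next_xray = True if depth is None else depth - 1
            names += [f"{name}:{child}" for child in expand(sub, next_xray)]
        else:
            names.append(name)
    return names


graph = {"plan": None, "research": {"search": None, "summarize": {"draft": None}}}
top_level = expand(graph)
one_level = expand(graph, xray=1)
everything = expand(graph, xray=True)
print(top_level, one_level, everything, sep="\n")
```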

```python { .api }
def get_subgraphs(self, namespace=None, *, recurse=False):
    """
    Get information about subgraphs.

    Parameters:
        namespace: str | None - Subgraph namespace to query
        recurse: bool, default False - Recursively get nested subgraphs

    Returns:
        list[tuple] - List of (namespace, subgraph) tuples
    """

async def aget_subgraphs(self, namespace=None, *, recurse=False):
    """
    Asynchronously get subgraph information.

    Parameters: Same as get_subgraphs()

    Returns:
        list[tuple] - List of (namespace, subgraph) tuples
    """
```

#### Schema Methods

```python { .api }
def get_input_schema(self, config=None):
    """
    Get the Pydantic schema for graph input.

    Parameters:
        config: Optional[RunnableConfig]

    Returns:
        Type[BaseModel] - Pydantic model for input
    """

def get_output_schema(self, config=None):
    """
    Get the Pydantic schema for graph output.

    Parameters:
        config: Optional[RunnableConfig]

    Returns:
        Type[BaseModel] - Pydantic model for output
    """

def get_input_jsonschema(self, config=None):
    """
    Get JSON schema for graph input.

    Parameters:
        config: Optional[RunnableConfig]

    Returns:
        dict - JSON schema describing input
    """

def get_output_jsonschema(self, config=None):
    """
    Get JSON schema for graph output.

    Parameters:
        config: Optional[RunnableConfig]

    Returns:
        dict - JSON schema describing output
    """

def get_config_jsonschema(self, include=None):
    """
    Get JSON schema for configuration.

    Parameters:
        include: Optional[list[str]] - Fields to include

    Returns:
        dict - JSON schema for config
    """

def get_context_jsonschema(self):
    """
    Get JSON schema for context.

    Returns:
        dict - JSON schema for context
    """
```

#### Properties

```python { .api }
@property
def InputType(self):
    """
    Get the input type annotation.

    Returns:
        Type - Input type
    """

@property
def OutputType(self):
    """
    Get the output type annotation.

    Returns:
        Type - Output type
    """

@property
def stream_channels_list(self):
    """
    Get list of stream channel names.

    Returns:
        list[str] - Channel names
    """

@property
def stream_channels_asis(self):
    """
    Get stream channels as-is (without transformation).

    Returns:
        dict - Channel mappings
    """
```

#### Advanced Methods

```python { .api }
def validate(self):
    """
    Validate the graph structure.

    Ensures that all nodes are reachable, channels are properly configured,
    and the graph structure is valid for execution.

    Returns:
        Self - Returns self for method chaining

    Raises:
        ValueError: If graph structure is invalid

    Usage:
        pregel = Pregel(...)
        pregel.validate()  # Raises if invalid
    """

def copy(self, update=None):
    """
    Create a copy of the Pregel graph.

    Parameters:
        update: Optional[dict[str, Any]]
            Dictionary of attributes to update in the copy

    Returns:
        Self - Copied Pregel instance

    Usage:
        # Create a copy with same configuration
        pregel_copy = pregel.copy()

        # Create a copy with updated attributes
        pregel_copy = pregel.copy({"debug": True})
    """

def with_config(self, config=None, **kwargs):
    """
    Create a copy of the graph with updated configuration.

    Parameters:
        config: Optional[RunnableConfig]
            Configuration to merge with existing config
        **kwargs: Additional config fields to override

    Returns:
        Self - New Pregel instance with updated config

    Usage:
        # Add callbacks to config
        pregel_with_callbacks = pregel.with_config(
            callbacks=[MyCallback()]
        )

        # Update recursion limit
        pregel_limited = pregel.with_config(recursion_limit=50)
    """

def clear_cache(self, nodes=None):
    """
    Clear cached results for nodes.

    Parameters:
        nodes: Optional[Sequence[str]]
            Node names to clear cache for. If None, clears all nodes.

    Raises:
        ValueError: If no cache is configured on the graph

    Usage:
        # Clear cache for all nodes
        pregel.clear_cache()

        # Clear cache for specific nodes
        pregel.clear_cache(["node1", "node2"])

    Note:
        Requires a cache to be configured when creating the graph.
    """

async def aclear_cache(self, nodes=None):
    """
    Asynchronously clear cached results for nodes.

    Parameters:
        nodes: Optional[Sequence[str]]
            Node names to clear cache for. If None, clears all nodes.

    Raises:
        ValueError: If no cache is configured on the graph

    Usage:
        # Clear cache for all nodes
        await pregel.aclear_cache()

        # Clear cache for specific nodes
        await pregel.aclear_cache(["node1", "node2"])
    """
```

### NodeBuilder Class

Fluent API for building Pregel nodes with subscriptions, reads, writes, and metadata.

```python { .api }
class NodeBuilder:
    """
    Builder for creating Pregel nodes using a fluent interface.

    Provides a chainable API for configuring node subscriptions, reads,
    writes, retry policies, and cache policies.
    """

    def subscribe_only(self, channel):
        """
        Subscribe to a single channel (shorthand).

        Parameters:
            channel: str - Channel to subscribe to

        Returns:
            NodeBuilder - Self for chaining
        """

    def subscribe_to(self, *channels, read=True):
        """
        Subscribe to channels (triggers node execution).

        Parameters:
            *channels: str - Channel names to subscribe to
            read: bool, default True - Whether to read from channels

        Returns:
            NodeBuilder - Self for chaining
        """

    def read_from(self, *channels):
        """
        Read from channels without subscribing (passive read).

        Parameters:
            *channels: str - Channel names to read from

        Returns:
            NodeBuilder - Self for chaining
        """

    def do(self, node):
        """
        Set the runnable to execute for this node.

        Parameters:
            node: Runnable - The node implementation

        Returns:
            NodeBuilder - Self for chaining
        """

    def write_to(self, *channels, **kwargs):
        """
        Add channel writes (output mappings).

        Parameters:
            *channels: str - Channel names for direct writes
            **kwargs: Mapped writes (channel_name=value_key)

        Returns:
            NodeBuilder - Self for chaining
        """

    def meta(self, *tags, **metadata):
        """
        Add tags and metadata to the node.

        Parameters:
            *tags: str - Tags to add
            **metadata: Metadata key-value pairs

        Returns:
            NodeBuilder - Self for chaining
        """

    def add_retry_policies(self, *policies):
        """
        Add retry policies for node failures.

        Parameters:
            *policies: RetryPolicy - Retry policy configurations

        Returns:
            NodeBuilder - Self for chaining
        """

    def add_cache_policy(self, policy):
        """
        Add cache policy for node results.

        Parameters:
            policy: CachePolicy - Cache configuration

        Returns:
            NodeBuilder - Self for chaining
        """

    def build(self):
        """
        Build the PregelNode.

        Returns:
            PregelNode - Configured node ready for Pregel
        """
```
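The chaining style described above rests on one convention: every configuration method returns the builder itself, and only `build()` returns the finished object. The class below is a hypothetical miniature written to show that convention and the difference between subscribing and passively reading. `ToyNodeBuilder` is not LangGraph's NodeBuilder, and the dict it builds is not a PregelNode.

```python
class ToyNodeBuilder:
    """Hypothetical miniature of a fluent builder (not LangGraph's NodeBuilder)."""

    def __init__(self):
        self._triggers = []   # channels that cause the node to run
        self._reads = []      # channels passed to the node as input
        self._writes = []     # channels that receive the node's result
        self._fn = None

    def subscribe_to(self, *channels, read=True):
        self._triggers.extend(channels)
        if read:
            self._reads.extend(channels)
        return self  # returning self is what makes chaining possible

    def read_from(self, *channels):
        self._reads.extend(channels)  # read without triggering
        return self

    def do(self, fn):
        self._fn = fn
        return self

    def write_to(self, *channels):
        self._writes.extend(channels)
        return self

    def build(self):
        if self._fn is None:
            raise ValueError("call do() before build()")
        return {
            "triggers": tuple(self._triggers),
            "reads": tuple(self._reads),
            "writes": tuple(self._writes),
            "fn": self._fn,
        }


node = (
    ToyNodeBuilder()
    .subscribe_to("question")
    .read_from("settings")
    .do(lambda inputs: inputs["question"].upper())
    .write_to("answer")
    .build()
)
print(node["triggers"], node["reads"], node["writes"])
```

In the toy, "settings" appears among the reads but not the triggers, which is the distinction the `read_from` docstring calls a passive read.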

## Usage Examples

### Direct Pregel Construction

```python
from langgraph.channels import LastValue
from langgraph.pregel import NodeBuilder, Pregel

# Pregel nodes must declare what they read and write, so wrap the
# function with NodeBuilder instead of passing a bare runnable
process = (
    NodeBuilder()
    .subscribe_only("input")  # run when "input" changes; receives its raw value
    .do(lambda x: x * 2)
    .write_to("output")
    .build()
)

# Construct Pregel directly; each channel is given the type it holds
graph = Pregel(
    nodes={"process": process},
    channels={
        "input": LastValue(int),
        "output": LastValue(int),
    },
    # Lists of channel names make invoke() accept and return dicts
    input_channels=["input"],
    output_channels=["output"],
)

result = graph.invoke({"input": 5})
# result == {"output": 10}
```

### Streaming with Multiple Modes

```python
from langgraph.graph import StateGraph, START, END

# Create a graph
graph = StateGraph(dict)
graph.add_node("node1", lambda s: {"value": s["value"] + 1})
graph.add_node("node2", lambda s: {"value": s["value"] * 2})
graph.add_edge(START, "node1")
graph.add_edge("node1", "node2")
graph.add_edge("node2", END)

app = graph.compile()

# Stream with multiple modes; each chunk is a (mode, payload) tuple
for chunk in app.stream(
    {"value": 1},
    stream_mode=["values", "updates"]
):
    print(chunk)
# "values" first emits the input state; each step then yields its
# "updates" chunk followed by the resulting "values" chunk:
# ("values", {"value": 1})
# ("updates", {"node1": {"value": 2}})
# ("values", {"value": 2})
# ("updates", {"node2": {"value": 4}})
# ("values", {"value": 4})
```

### State Management

```python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

# Create graph with checkpointer
graph = StateGraph(dict)
graph.add_node("increment", lambda s: {"count": s["count"] + 1})
graph.add_edge(START, "increment")
graph.add_edge("increment", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# Run with thread_id
config = {"configurable": {"thread_id": "thread-1"}}
result1 = app.invoke({"count": 0}, config)
# result1 == {"count": 1}

# Get state
state = app.get_state(config)
# state.values == {"count": 1}
# state.next == ()  # No pending nodes

# Run again on the same thread, feeding the saved count back in as input
result2 = app.invoke({"count": state.values["count"]}, config)
# result2 == {"count": 2}

# Get history (newest snapshot first)
for state in app.get_state_history(config, limit=5):
    print(state.values, state.created_at)
```

### Manual State Updates

```python
from typing import Optional, TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver


# A typed schema gives each key its own channel, so "input" is kept
# when the node writes only "result"
class State(TypedDict):
    input: int
    result: Optional[int]


graph = StateGraph(State)
graph.add_node("process", lambda s: {"result": s["input"] * 2})
graph.add_edge(START, "process")
graph.add_edge("process", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}

# Initial run
result = app.invoke({"input": 5}, config)
# result == {"input": 5, "result": 10}

# Manually update state, recording the write as if "process" had produced it
new_config = app.update_state(
    config,
    {"input": 20, "result": None},
    as_node="process"
)

# Inspect the new checkpoint. Because the update is attributed to "process",
# only nodes downstream of it would run next, and this graph has none,
# so "process" is not re-executed on the new input.
state = app.get_state(new_config)
# state.values["input"] == 20
# state.next == ()
```

### Using NodeBuilder (Advanced)

```python
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue
from langgraph.types import RetryPolicy, CachePolicy
from langchain_core.runnables import RunnableLambda

# Build a node with fluent API
node = (
    NodeBuilder()
    .subscribe_to("input_channel")
    .read_from("config_channel")
    # subscribe_to passes the node a dict keyed by channel name
    .do(RunnableLambda(lambda inputs: inputs["input_channel"] * 2))
    .write_to("output_channel")
    .add_retry_policies(RetryPolicy(max_attempts=3))
    .add_cache_policy(CachePolicy(key_func=str))
    .meta("important", priority=1)
    .build()
)

# Use in Pregel; each channel is given the type it holds
graph = Pregel(
    nodes={"my_node": node},
    channels={
        "input_channel": LastValue(int),
        "config_channel": LastValue(dict),
        "output_channel": LastValue(int)
    },
    input_channels="input_channel",
    output_channels="output_channel"
)
```

### Interrupting and Resuming

```python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

graph = StateGraph(dict)
graph.add_node("step1", lambda s: {"stage": "step1_complete"})
graph.add_node("step2", lambda s: {"stage": "step2_complete"})
graph.add_node("step3", lambda s: {"stage": "step3_complete"})
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)

checkpointer = MemorySaver()
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["step2"]
)

config = {"configurable": {"thread_id": "1"}}

# Run until interrupt
result = app.invoke({"stage": "start"}, config)
# Stops before step2

# Check state
state = app.get_state(config)
# state.next == ("step2",)
# state.values["stage"] == "step1_complete"

# Resume execution
result = app.invoke(None, config)
# Continues from step2
# result["stage"] == "step3_complete"
```
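The pause-and-resume flow above depends on the saved position surviving between calls. The following plain-Python toy shows that mechanism in isolation: `run_until_interrupt` and its checkpoint dictionary are hypothetical, and they stand in for the checkpointer and thread state without reproducing LangGraph's behavior in detail.

```python
def run_until_interrupt(steps, checkpoint, interrupt_before=()):
    """Run named steps from the saved position, pausing before listed nodes.

    checkpoint: {"state": dict, "next": int, "resuming": bool} (toy structure)
    Returns the state reached so far; the checkpoint is updated in place.
    """
    while checkpoint["next"] < len(steps):
        name, fn = steps[checkpoint["next"]]
        # Pause before the node unless this call is resuming from that pause.
        if name in interrupt_before and not checkpoint["resuming"]:
            checkpoint["resuming"] = True
            return checkpoint["state"]
        checkpoint["resuming"] = False
        checkpoint["state"] = {**checkpoint["state"], **fn(checkpoint["state"])}
        checkpoint["next"] += 1
    return checkpoint["state"]


steps = [
    ("step1", lambda s: {"stage": "step1_complete"}),
    ("step2", lambda s: {"stage": "step2_complete"}),
    ("step3", lambda s: {"stage": "step3_complete"}),
]
checkpoint = {"state": {"stage": "start"}, "next": 0, "resuming": False}

# First call stops before "step2"; the second call picks up from there.
paused = run_until_interrupt(steps, checkpoint, interrupt_before=["step2"])
pending = steps[checkpoint["next"]][0]
resumed = run_until_interrupt(steps, checkpoint, interrupt_before=["step2"])
print(paused, pending, resumed)
```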

### Custom Streaming

```python
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer

def node_with_custom_stream(state):
    writer = get_stream_writer()

    # Write custom data to stream
    writer({"type": "progress", "percent": 25})
    # Do work...
    writer({"type": "progress", "percent": 50})
    # More work...
    writer({"type": "progress", "percent": 100})

    return {"result": "done"}

graph = StateGraph(dict)
graph.add_node("work", node_with_custom_stream)
graph.add_edge(START, "work")
graph.add_edge("work", END)

app = graph.compile()

# Stream custom data
for chunk in app.stream({"input": "data"}, stream_mode="custom"):
    print(chunk)
    # {"type": "progress", "percent": 25}
    # {"type": "progress", "percent": 50}
    # {"type": "progress", "percent": 100}
```

## Notes

- Pregel is typically used indirectly through StateGraph.compile()
- Direct Pregel construction provides maximum flexibility for advanced use cases
- Each async method takes the same parameters as its sync counterpart and is named with an 'a' prefix (for example, ainvoke and astream)
- Streaming supports multiple simultaneous modes, so one run can report both full state and per-node updates
- State management methods such as get_state and update_state require a checkpointer for persistence
- Thread IDs in config keep each thread's state isolated, which enables multi-tenant applications
- Pregel implements the LangChain Runnable interface for composability
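The note about thread IDs can be illustrated with a toy store keyed by `thread_id`. `ToyCheckpointer` and `increment` below are hypothetical names written for this sketch. They are not LangGraph classes, and a real checkpointer stores far more than a single dict per thread.

```python
class ToyCheckpointer:
    """Hypothetical in-memory store keyed by thread_id (not a LangGraph class)."""

    def __init__(self):
        self._threads = {}

    def load(self, config):
        thread_id = config["configurable"]["thread_id"]
        # Unknown threads start from a fresh state.
        return dict(self._threads.get(thread_id, {"count": 0}))

    def save(self, config, state):
        self._threads[config["configurable"]["thread_id"]] = dict(state)


def increment(checkpointer, config):
    """Load the thread's state, bump the counter, and persist the result."""
    state = checkpointer.load(config)
    state["count"] += 1
    checkpointer.save(config, state)
    return state


saver = ToyCheckpointer()
alice = {"configurable": {"thread_id": "alice"}}
bob = {"configurable": {"thread_id": "bob"}}

increment(saver, alice)
increment(saver, alice)
increment(saver, bob)
print(saver.load(alice), saver.load(bob))
```

Each config addresses its own entry, so runs for one thread never read or overwrite another thread's state.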