# Channels

Channels are the state management primitives in LangGraph. They control how state updates are applied, enabling patterns such as last-value semantics, aggregation, barriers, and pub/sub. Each channel type defines its own behavior for merging updates from multiple sources.

## Imports

```python
from langgraph.channels import (
    BaseChannel,
    LastValue,
    AnyValue,
    EphemeralValue,
    UntrackedValue,
    BinaryOperatorAggregate,
    Topic,
    NamedBarrierValue,
    NamedBarrierValueAfterFinish,
    LastValueAfterFinish,
)
```

## Capabilities

### Base Channel

Abstract base class for all channels. Defines the interface that all channel types must implement.

```python { .api }
class BaseChannel:
    """
    Abstract base class for all channel types.

    Type Parameters:
        Value: Type of value stored in the channel
        Update: Type of update received by the channel
        Checkpoint: Type of checkpoint data for serialization
    """

    @property
    def ValueType(self):
        """
        Get the type of value stored in this channel.

        Returns:
            Type - The value type
        """

    @property
    def UpdateType(self):
        """
        Get the type of update this channel accepts.

        Returns:
            Type - The update type
        """

    def __init__(self, typ, key=''):
        """
        Initialize a channel.

        Parameters:
            typ: Type of value stored in this channel
            key: Optional channel identifier (default: '')
        """

    def from_checkpoint(self, checkpoint):
        """
        Restore channel state from a checkpoint.

        Parameters:
            checkpoint: Checkpoint data (serialized state)

        Returns:
            Self - A channel of the same type initialized from the checkpoint
        """

    def get(self):
        """
        Get the current value from the channel.

        Returns:
            Value - Current channel value

        Raises:
            EmptyChannelError: If channel has no value
        """

    def update(self, values):
        """
        Update the channel with a sequence of values.

        Parameters:
            values: Sequence[Update] - Updates to apply

        Returns:
            bool - True if channel was updated
        """

    def copy(self):
        """
        Create a copy of this channel.

        Returns:
            BaseChannel - Copy of the channel
        """

    def checkpoint(self):
        """
        Create a checkpoint of the current channel state.

        Returns:
            Checkpoint - Serializable checkpoint data
        """

    def is_available(self):
        """
        Check if the channel has a value (is not empty).

        Returns:
            bool - True if channel has a value
        """

    def consume(self):
        """
        Notify the channel that a task has consumed its value.
        Used by channels like NamedBarrierValue to reset after consumption.

        Returns:
            bool - True if channel state changed
        """

    def finish(self):
        """
        Notify the channel that the current run is finishing.
        Used by channels like LastValueAfterFinish to finalize updates.

        Returns:
            bool - True if channel state changed
        """
```

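The interface above implies a simple lifecycle: write with `update()`, read with `get()`, snapshot with `checkpoint()`, and rebuild with `from_checkpoint()`. The sketch below walks through that cycle with a toy class. `ToyLastValue`, `_MISSING`, and the local `EmptyChannelError` are illustrative stand-ins written for this example; they are assumptions, not LangGraph's implementation.

```python
class EmptyChannelError(Exception):
    """Local stand-in for the library error of the same name."""


_MISSING = object()  # hypothetical sentinel meaning "no value yet"


class ToyLastValue:
    """Toy channel that mimics the documented interface (illustration only)."""

    def __init__(self, typ, key=""):
        self.typ = typ
        self.key = key
        self.value = _MISSING

    def update(self, values):
        if not values:
            return False  # nothing was written, so nothing changed
        self.value = values[-1]
        return True

    def get(self):
        if self.value is _MISSING:
            raise EmptyChannelError(self.key)
        return self.value

    def is_available(self):
        return self.value is not _MISSING

    def checkpoint(self):
        return self.value  # the stored value doubles as the checkpoint

    def from_checkpoint(self, checkpoint):
        restored = ToyLastValue(self.typ, self.key)
        restored.value = checkpoint
        return restored


channel = ToyLastValue(int, key="counter")
was_empty = not channel.is_available()
channel.update([7])
snapshot = channel.checkpoint()
restored = channel.from_checkpoint(snapshot)
print(restored.get())
```

The restored object is a separate instance, so later writes to `channel` do not affect `restored` in this sketch.
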
### LastValue Channel

Stores the most recent value received. It accepts at most one update per step; passing several values to a single update() call raises InvalidUpdateError.

```python { .api }
class LastValue:
    """
    Channel that stores only the last value received.

    Accepts at most one value per update() call and raises
    InvalidUpdateError if several values arrive in the same step.
    This is the most common channel type for simple state fields.

    Type Parameters:
        Value: Type of value to store

    Usage:
        # Direct construction (for Pregel)
        channel = LastValue(int)
        channel.update([1])
        channel.update([3])  # A later step replaces the earlier value
        channel.get()  # Returns 3

        # In Annotated context (for StateGraph)
        from typing import Annotated
        data: Annotated[int, LastValue(int)]
    """

    def __init__(self, typ, key=''):
        """
        Initialize a LastValue channel.

        Parameters:
            typ: Any - The type of value to store
            key: str - Optional key for the channel (default: '')
        """
```

### LastValueAfterFinish Channel

Similar to LastValue, but only commits the update when finish() is called. Used for values that should only be finalized at the end of a step.

```python { .api }
class LastValueAfterFinish:
    """
    Channel that stores the last value but only commits on finish().

    Updates are staged but not visible via get() until finish() is called.
    Useful for output values that should only be set at step completion.

    Usage:
        channel = LastValueAfterFinish(int)
        channel.update([1, 2, 3])
        channel.get()  # Raises EmptyChannelError
        channel.finish()
        channel.get()  # Returns 3

    Note: After finish() makes the value available and it's consumed,
    calling consume() will clear both the value and finished flag,
    resetting the channel for the next cycle.
    """

    def __init__(self, typ, key=''):
        """
        Initialize a LastValueAfterFinish channel.

        Parameters:
            typ: Any - The type of value to store
            key: str - Optional key for the channel (default: '')
        """
```

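The stage-then-commit behavior described above can be pictured with a small state machine. This is a sketch under stated assumptions: `ToyAfterFinish` and `_MISSING` are hypothetical names invented here, and the class only imitates the documented update(), finish(), and consume() semantics rather than reproducing the library's code.

```python
_MISSING = object()  # hypothetical sentinel for "nothing staged"


class ToyAfterFinish:
    """Toy model of a value that becomes visible only after finish()."""

    def __init__(self):
        self.value = _MISSING
        self.finished = False

    def update(self, values):
        if not values:
            return False
        self.finished = False  # a new write must be committed again
        self.value = values[-1]
        return True

    def finish(self):
        if not self.finished and self.value is not _MISSING:
            self.finished = True
            return True
        return False

    def is_available(self):
        return self.finished and self.value is not _MISSING

    def consume(self):
        if self.finished:
            # Clear both the value and the flag for the next cycle.
            self.value = _MISSING
            self.finished = False
            return True
        return False


ch = ToyAfterFinish()
ch.update([1, 2, 3])
visible_while_staged = ch.is_available()
ch.finish()
visible_after_finish = ch.is_available()
ch.consume()
visible_after_consume = ch.is_available()
```
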
### AnyValue Channel

Stores the last value received, but assumes all values in a batch are equivalent. Used when multiple sources might provide the same value.

```python { .api }
class AnyValue:
    """
    Channel that stores the last value, assuming all updates are equivalent.

    Similar to LastValue but semantically indicates that all values in an
    update batch should be the same. Useful for consensus scenarios.

    Type Parameters:
        Value: Type of value to store

    Usage:
        channel = AnyValue(int)
        channel.update([5, 5, 5])  # All same value
        channel.get()  # Returns 5

    Special behavior: Unlike LastValue, which ignores empty updates,
    AnyValue clears its value when updated with an empty sequence.
    """

    def __init__(self, typ, key=''):
        """
        Initialize an AnyValue channel.

        Parameters:
            typ: Any - The type of value to store
            key: str - Optional key for the channel (default: '')
        """
```

### EphemeralValue Channel

Stores a value that lasts for a single step: it is replaced by the next write and cleared by the next update that carries no values. Useful for one-time signals or events.

```python { .api }
class EphemeralValue:
    """
    Channel that holds a value only until the next update.

    The value is cleared in two scenarios:
    1. When update() is called with an empty sequence
    2. When a new update() call provides new values (replaces old value)

    Note: consume() is a no-op for EphemeralValue - clearing happens via update().

    Type Parameters:
        Value: Type of value to store

    Usage:
        # Direct construction
        channel = EphemeralValue(int)
        channel.update([42])
        channel.get()  # Returns 42

        # Clear the value by calling update with empty sequence
        channel.update([])
        channel.get()  # Raises EmptyChannelError

        # In Annotated context (for StateGraph)
        from typing import Annotated
        user_input: Annotated[str | None, EphemeralValue(str)]
    """

    def __init__(self, typ, guard=True):
        """
        Initialize an EphemeralValue channel.

        Parameters:
            typ: Any - The type of value to store
            guard: bool, default True
                Whether to enforce single-value updates. When True, raises
                InvalidUpdateError if multiple values are received in one step.
                Set to False to allow multiple concurrent updates (keeps last one).
        """
```

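The interaction between empty updates and the `guard` flag is easy to misread, so here is a rough sketch of the decision made on each update. The function `ephemeral_step`, the `EMPTY` sentinel, and the local `InvalidUpdateError` class are hypothetical names for this illustration; the function models the rules stated in the docstring above and is not the library's code.

```python
class InvalidUpdateError(Exception):
    """Local stand-in for the library error of the same name."""


EMPTY = object()  # hypothetical sentinel for "channel holds nothing"


def ephemeral_step(values, guard=True):
    """Return what the channel would hold after one update() call."""
    if not values:
        return EMPTY  # no writes this step: the old value is dropped
    if guard and len(values) != 1:
        raise InvalidUpdateError("only one value per step when guard=True")
    return values[-1]  # with guard=False the last write wins


held = ephemeral_step([42])
cleared = ephemeral_step([])
last_write = ephemeral_step([1, 2], guard=False)

try:
    ephemeral_step([1, 2])
    guard_rejected = False
except InvalidUpdateError:
    guard_rejected = True
```
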
### UntrackedValue Channel

Stores a value that is not included in checkpoints. The value is available during execution but not persisted.

```python { .api }
class UntrackedValue:
    """
    Channel that stores a value without checkpointing it.

    Values are available during execution but not saved to checkpoints.
    Useful for transient state that doesn't need persistence, like caches
    or temporary computation results.

    Type Parameters:
        Value: Type of value to store

    Usage:
        # Direct construction
        channel = UntrackedValue(dict)
        channel.update([{"key": "value"}])
        channel.get()  # Returns {"key": "value"}
        # Value not included in checkpoint

        # In Annotated context (for StateGraph)
        from typing import Annotated
        cache: Annotated[dict | None, UntrackedValue(dict)]
    """

    def __init__(self, typ, guard=True):
        """
        Initialize an UntrackedValue channel.

        Parameters:
            typ: type[Value] - The type of value to store
            guard: bool, default True
                Whether to enforce single-value updates. When True, raises
                InvalidUpdateError if multiple values are received in one step.
                Set to False to allow multiple concurrent updates (keeps last one).
        """
```

### BinaryOperatorAggregate Channel

Applies a binary operator to aggregate multiple values. Common operators include addition, concatenation, or custom merge functions.

```python { .api }
class BinaryOperatorAggregate:
    """
    Channel that aggregates values using a binary operator.

    Each update is combined with the current value using the provided operator.
    Common use cases: list concatenation, numeric addition, set union.

    Type Parameters:
        Value: Type of value to store and aggregate

    Parameters:
        typ: Type - The type of values to aggregate
        operator: Callable[[Value, Value], Value] - Binary operator function
            that combines two values into one

    Usage:
        from operator import add

        # Numeric addition
        channel = BinaryOperatorAggregate(int, add)
        channel.update([1, 2, 3])
        channel.get()  # Returns 6 (1 + 2 + 3)

        # List concatenation
        from operator import concat
        channel = BinaryOperatorAggregate(list, concat)
        channel.update([[1, 2], [3, 4]])
        channel.get()  # Returns [1, 2, 3, 4]
    """

    def __init__(self, typ, operator):
        """
        Initialize a BinaryOperatorAggregate channel.

        Parameters:
            typ: Type - Type of values to aggregate
            operator: Callable[[Value, Value], Value] - Aggregation function

        Note: Abstract collection types are automatically converted to
        concrete types:
        - collections.abc.Sequence -> list
        - collections.abc.Set -> set
        - collections.abc.Mapping -> dict
        """
```

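Combining each update with the current value is a left fold. The following sketch expresses that idea with `functools.reduce`; the helper name `aggregate` is an assumption made for this example, and the starting values (`0`, `[]`, `set()`) are chosen to mirror an empty instance of the channel's type.

```python
from functools import reduce
from operator import add, concat, or_


def aggregate(current, updates, operator):
    """Fold a batch of updates into the current value, left to right."""
    return reduce(operator, updates, current)


# Numeric addition, starting from int() == 0
total = aggregate(0, [1, 2, 3], add)

# A later batch keeps building on the stored value
total_after_second_batch = aggregate(total, [10], add)

# List concatenation, starting from an empty list
merged = aggregate([], [[1, 2], [3, 4]], concat)

# Set union, starting from an empty set
tags = aggregate(set(), [{"a"}, {"b"}, {"a"}], or_)
```

Because the fold runs left to right, operators that are not commutative (such as `concat`) produce results that depend on the order in which updates arrive.
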
### Topic Channel

Pub/sub channel for collecting values from multiple sources. By default keeps only values from the current update batch, but can accumulate values across updates.

```python { .api }
class Topic:
    """
    Pub/sub channel for collecting values from multiple sources.

    By default (accumulate=False), Topic keeps only values from the current update
    batch, clearing previous values. With accumulate=True, it keeps all values across
    all updates. Useful for message passing, event streams, or collecting results.

    Type Parameters:
        Value: Type of individual values in the topic

    Usage:
        # Default behavior - keeps only current batch
        channel = Topic(int)
        channel.update([1, 2])
        channel.update([3])
        channel.get()  # Returns [3] (only latest batch)

        # Accumulating behavior - keeps all values
        channel = Topic(int, accumulate=True)
        channel.update([1, 2])
        channel.update([3])
        channel.get()  # Returns [1, 2, 3] (all values)

        # For TypedDict-style messages
        channel = Topic(dict, accumulate=True)
        channel.update([{"a": 1}])
        channel.update([{"b": 2}])
        channel.get()  # Returns [{"a": 1}, {"b": 2}]

        # In Annotated context (for StateGraph)
        from typing import Annotated
        events: Annotated[list[dict], Topic(dict, accumulate=True)]
    """

    def __init__(self, typ, accumulate=False):
        """
        Initialize a Topic channel.

        Parameters:
            typ: type[Value] - The type of individual values in the topic
            accumulate: bool - Whether to accumulate values across updates (default: False).
                If False, values from previous updates are cleared.
                If True, values persist and accumulate indefinitely.
        """
```

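The difference between the two modes comes down to whether the stored list is reset before each batch is appended. A minimal sketch of that rule follows; `topic_update` is a hypothetical helper written for this illustration and it leaves out everything else a real channel does.

```python
def topic_update(stored, batch, accumulate=False):
    """Return the values a topic would hold after receiving one batch."""
    kept = list(stored) if accumulate else []  # reset unless accumulating
    kept.extend(batch)
    return kept


# accumulate=False: each batch replaces the previous one
latest_only = topic_update([], [1, 2])
latest_only = topic_update(latest_only, [3])

# accumulate=True: batches pile up across updates
everything = topic_update([], [1, 2], accumulate=True)
everything = topic_update(everything, [3], accumulate=True)
```
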
### NamedBarrierValue Channel

Waits for updates from multiple named sources before making a value available. Useful for synchronization across parallel branches.

```python { .api }
class NamedBarrierValue:
    """
    Channel that waits for multiple named sources before triggering.

    Acts as a barrier that requires updates from all expected sources before
    making the value available. Each update is the name of the source that
    is reporting; a name outside the expected set raises InvalidUpdateError.
    Useful for fan-in patterns where you need to wait for all parallel
    branches to complete.

    Type Parameters:
        Value: Type of the source names

    Usage:
        # Direct construction
        channel = NamedBarrierValue(str, {"branch1", "branch2", "branch3"})
        channel.update(["branch1"])
        channel.is_available()  # False (not all sources reported)
        channel.update(["branch2"])
        channel.is_available()  # False
        channel.update(["branch3"])
        channel.is_available()  # True
        channel.get()  # Returns None (barrier triggered)

        # In Annotated context (for StateGraph)
        from typing import Annotated
        result: Annotated[str, NamedBarrierValue(str, {"branch1", "branch2"})]

    Note: NamedBarrierValue.get() returns None when the barrier is satisfied.
    It's used for synchronization, not value passing.
    """

    def __init__(self, typ, names):
        """
        Initialize a NamedBarrierValue channel.

        Parameters:
            typ: type[Value] - The type of the source names
            names: set[Value] - Names of sources to wait for
        """

    def consume(self) -> bool:
        """
        Consume the barrier value, resetting the seen sources.

        Called after the barrier is satisfied to reset it for the next cycle.
        Clears the set of seen source names.

        Returns:
            bool - True if all sources were seen (barrier was triggered), False otherwise
        """
```

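A barrier of this kind reduces to comparing a set of seen names against a set of expected names. The sketch below models that bookkeeping. `ToyBarrier` is a hypothetical class for illustration, it assumes each update is the name of the reporting source, and it raises a plain `ValueError` where a real channel would use its own error type.

```python
class ToyBarrier:
    """Toy barrier that opens once every expected name has reported."""

    def __init__(self, names):
        self.names = set(names)
        self.seen = set()

    def update(self, values):
        updated = False
        for name in values:
            if name not in self.names:
                raise ValueError(f"unexpected source: {name!r}")
            if name not in self.seen:
                self.seen.add(name)
                updated = True
        return updated

    def is_available(self):
        return self.seen == self.names

    def consume(self):
        if self.seen == self.names:
            self.seen = set()  # reset for the next cycle
            return True
        return False


barrier = ToyBarrier({"branch1", "branch2"})
barrier.update(["branch1"])
open_after_one = barrier.is_available()
barrier.update(["branch2"])
open_after_all = barrier.is_available()
was_consumed = barrier.consume()
open_after_consume = barrier.is_available()
```

Reporting the same name twice does not change the state in this sketch, which is why `update()` returns `False` for a repeated name.
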
### NamedBarrierValueAfterFinish Channel

Similar to NamedBarrierValue, but only commits when finish() is called. Combines barrier synchronization with after-finish semantics.

```python { .api }
class NamedBarrierValueAfterFinish:
    """
    Channel that combines named barrier with after-finish semantics.

    Waits for updates from all named sources, but only makes the value
    available after finish() is called. As with NamedBarrierValue, each
    update is the name of the source that is reporting.

    Usage:
        channel = NamedBarrierValueAfterFinish(str, {"branch1", "branch2"})
        channel.update(["branch1"])
        channel.update(["branch2"])
        channel.is_available()  # False (finish not called)
        channel.finish()
        channel.is_available()  # True
        channel.get()  # Returns None (barrier triggered)
    """

    def __init__(self, typ, names):
        """
        Initialize a NamedBarrierValueAfterFinish channel.

        Parameters:
            typ: type[Value] - The type of the source names
            names: set[Value] - Names of sources to wait for
        """

    def finish(self) -> bool:
        """
        Mark the barrier as finished, allowing value to be consumed.

        Must be called after all sources have updated before the barrier
        value becomes available.

        Returns:
            bool - True if all sources were seen and finished, False otherwise
        """

    def consume(self) -> bool:
        """
        Consume the barrier value, resetting state for next cycle.

        Can only consume after finish() is called and all sources have reported.
        Resets both the seen sources and finished flag.

        Returns:
            bool - True if barrier was finished and all sources seen, False otherwise
        """
```

## Usage Examples

### Using LastValue in State Schema

```python
from typing import TypedDict
from langgraph.graph import StateGraph

class State(TypedDict):
    # LastValue is the default channel type
    counter: int
    message: str

# All fields use LastValue by default
graph = StateGraph(State)
```

### Using BinaryOperatorAggregate with Annotated

```python
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph

class State(TypedDict):
    # Use add operator to accumulate values
    total: Annotated[int, add]
    items: list[str]

def node1(state: State) -> dict:
    return {"total": 5}  # Adds 5 to current total

def node2(state: State) -> dict:
    return {"total": 3}  # Adds 3 to current total

graph = StateGraph(State)
graph.add_node("node1", node1)
graph.add_node("node2", node2)
```

### Using EphemeralValue for User Input

```python
from typing import TypedDict, Annotated
from langgraph.channels import EphemeralValue
from langgraph.graph import StateGraph

class State(TypedDict):
    count: int
    # User input lasts for one step; typ is a required argument
    user_input: Annotated[str, EphemeralValue(str)]

def process(state: State) -> dict:
    if "user_input" in state:
        # Process user input
        print(f"Got input: {state['user_input']}")
    return {"count": state["count"] + 1}

graph = StateGraph(State)
graph.add_node("process", process)
```

### Using Topic for Message Collection

```python
from typing import TypedDict, Annotated
from langgraph.channels import Topic
from langgraph.graph import StateGraph

class State(TypedDict):
    # Collect all messages as a list; accumulate=True keeps values across steps
    events: Annotated[list[dict], Topic(dict, accumulate=True)]

def node1(state: State) -> dict:
    return {"events": [{"source": "node1", "data": "hello"}]}

def node2(state: State) -> dict:
    return {"events": [{"source": "node2", "data": "world"}]}

# After both nodes run:
# state["events"] == [
#     {"source": "node1", "data": "hello"},
#     {"source": "node2", "data": "world"}
# ]
```

### Using NamedBarrierValue for Synchronization

```python
from typing import TypedDict, Annotated
from langgraph.channels import NamedBarrierValue
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class State(TypedDict):
    items: list[int]
    # Barrier that opens once every named branch has reported
    result: Annotated[str, NamedBarrierValue(str, {"branch1", "branch2", "branch3"})]

def fan_out(state: State) -> list[Send]:
    return [
        Send("branch1", state),
        Send("branch2", state),
        Send("branch3", state),
    ]

def branch1(state: State) -> dict:
    return {"result": "branch1"}  # Report this branch's name to the barrier

def branch2(state: State) -> dict:
    return {"result": "branch2"}

def branch3(state: State) -> dict:
    return {"result": "branch3"}

def aggregate(state: State) -> dict:
    # Runs after the branches; the barrier carries no value (get() returns None)
    return {"items": state["items"]}

graph = StateGraph(State)
graph.add_node("branch1", branch1)
graph.add_node("branch2", branch2)
graph.add_node("branch3", branch3)
graph.add_node("aggregate", aggregate)

graph.add_conditional_edges(START, fan_out)
graph.add_edge("branch1", "aggregate")
graph.add_edge("branch2", "aggregate")
graph.add_edge("branch3", "aggregate")
graph.add_edge("aggregate", END)
```

### Custom Channel with Direct Instantiation

```python
from langgraph.pregel import Pregel
from langgraph.channels import LastValue, BinaryOperatorAggregate
from operator import add

# Low-level Pregel construction with explicit channels
graph = Pregel(
    nodes={
        "node1": ...,
        "node2": ...
    },
    channels={
        # typ is a required argument; int is an illustrative choice here
        "field1": LastValue(int),
        "field2": BinaryOperatorAggregate(int, add),
        "field3": LastValue(int)
    },
    input_channels=["field1"],
    output_channels=["field3"]
)
```

### UntrackedValue for Temporary State

```python
from typing import TypedDict, Annotated
from langgraph.channels import UntrackedValue
from langgraph.graph import StateGraph

class State(TypedDict):
    data: dict
    # Temporary cache not saved to checkpoints; typ is a required argument
    cache: Annotated[dict, UntrackedValue(dict)]

def expensive_function(data: dict) -> int:
    # Hypothetical placeholder standing in for a costly computation
    return len(data)

def process(state: State) -> dict:
    # Use cache if available
    if "cache" in state and state["cache"]:
        result = state["cache"]["computed"]
    else:
        # Expensive computation
        result = expensive_function(state["data"])

    return {
        "data": {"result": result},
        "cache": {"computed": result}
    }

# Cache is available during execution but not in checkpoints
```

## Notes

- Channels are typically used implicitly through StateGraph's state schema with Annotated types
- Direct channel instantiation is mainly used with the low-level Pregel API
- The default channel type for state fields is LastValue
- Reducers in Annotated types (like `Annotated[int, add]`) create BinaryOperatorAggregate channels
- Channel behavior determines how concurrent updates from parallel nodes are merged
- Some channels (for example NamedBarrierValue and LastValueAfterFinish) rely on lifecycle methods such as consume() and finish(), which the execution engine calls automatically