or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.mdconfiguration.mderrors.mdfunctional-api.mdindex.mdmessage-graph.mdpregel.mdstate-graph.mdtypes-primitives.md

channels.mddocs/

0

# Channels

1

2

Channels are the state management primitives in LangGraph. They control how state updates are applied, enabling various patterns like last-value semantics, aggregation, barriers, and pub/sub. Each channel type provides different behavior for merging updates from multiple sources.

3

4

## Imports

5

6

```python

7

from langgraph.channels import (

8

BaseChannel,

9

LastValue,

10

AnyValue,

11

EphemeralValue,

12

UntrackedValue,

13

BinaryOperatorAggregate,

14

Topic,

15

NamedBarrierValue,

16

NamedBarrierValueAfterFinish,

17

LastValueAfterFinish

18

)

19

```

20

21

## Capabilities

22

23

### Base Channel

24

25

Abstract base class for all channels. Defines the interface that all channel types must implement.

26

27

```python { .api }

28

class BaseChannel:

29

"""

30

Abstract base class for all channel types.

31

32

Type Parameters:

33

Value: Type of value stored in the channel

34

Update: Type of update received by the channel

35

Checkpoint: Type of checkpoint data for serialization

36

"""

37

38

@property

39

def ValueType(self):

40

"""

41

Get the type of value stored in this channel.

42

43

Returns:

44

Type - The value type

45

"""

46

47

@property

48

def UpdateType(self):

49

"""

50

Get the type of update this channel accepts.

51

52

Returns:

53

Type - The update type

54

"""

55

56

def __init__(self, typ, key=''):

57

"""

58

Initialize a channel.

59

60

Parameters:

61

typ: Type of value stored in this channel

62

key: Optional channel identifier (default: '')

63

"""

64

65

def from_checkpoint(self, checkpoint):

66

"""

67

Restore channel state from a checkpoint.

68

69

Parameters:

70

checkpoint: Checkpoint data (serialized state)

71

72

Returns:

73

Self - This channel instance with restored state

74

"""

75

76

def get(self):

77

"""

78

Get the current value from the channel.

79

80

Returns:

81

Value - Current channel value

82

83

Raises:

84

EmptyChannelError: If channel has no value

85

"""

86

87

def update(self, values):

88

"""

89

Update the channel with a sequence of values.

90

91

Parameters:

92

values: Sequence[Update] - Updates to apply

93

94

Returns:

95

bool - True if channel was updated

96

"""

97

98

def copy(self):

99

"""

100

Create a copy of this channel.

101

102

Returns:

103

BaseChannel - Copy of the channel

104

"""

105

106

def checkpoint(self):

107

"""

108

Create a checkpoint of the current channel state.

109

110

Returns:

111

Checkpoint - Serializable checkpoint data

112

"""

113

114

def is_available(self):

115

"""

116

Check if the channel has a value (is not empty).

117

118

Returns:

119

bool - True if channel has a value

120

"""

121

122

def consume(self):

123

"""

124

Notify the channel that a task has consumed its value.

125

Used by channels like EphemeralValue to clear after consumption.

126

127

Returns:

128

bool - True if channel state changed

129

"""

130

131

def finish(self):

132

"""

133

Notify the channel that the current run is finishing.

134

Used by channels like LastValueAfterFinish to finalize updates.

135

136

Returns:

137

bool - True if channel state changed

138

"""

139

```

140

141

### LastValue Channel

142

143

Stores only the most recent value received. If multiple updates arrive, only the last one is kept.

144

145

```python { .api }

146

class LastValue:

147

"""

148

Channel that stores only the last value received.

149

150

If multiple values are provided via update(), keeps only the last one.

151

This is the most common channel type for simple state fields.

152

153

Type Parameters:

154

Value: Type of value to store

155

156

Usage:

157

# Direct construction (for Pregel)

158

channel = LastValue(int)

159

channel.update([1, 2, 3])

160

channel.get() # Returns 3

161

162

# In Annotated context (for StateGraph)

163

from typing import Annotated

164

data: Annotated[int, LastValue(int)]

165

"""

166

167

def __init__(self, typ, key=''):

168

"""

169

Initialize a LastValue channel.

170

171

Parameters:

172

typ: Any - The type of value to store

173

key: str - Optional key for the channel (default: '')

174

"""

175

```

176

177

### LastValueAfterFinish Channel

178

179

Similar to LastValue, but only commits the update when finish() is called. Used for values that should only be finalized at the end of a step.

180

181

```python { .api }

182

class LastValueAfterFinish:

183

"""

184

Channel that stores the last value but only commits on finish().

185

186

Updates are staged but not visible via get() until finish() is called.

187

Useful for output values that should only be set at step completion.

188

189

Usage:

190

channel = LastValueAfterFinish(int)

191

channel.update([1, 2, 3])

192

channel.get() # Raises EmptyChannelError

193

channel.finish()

194

channel.get() # Returns 3

195

196

**Note:** After finish() makes the value available and it's consumed, calling consume() will clear both the value and finished flag, resetting the channel for the next cycle.

197

"""

198

199

def __init__(self, typ, key=''):

200

"""

201

Initialize a LastValueAfterFinish channel.

202

203

Parameters:

204

typ: Any - The type of value to store

205

key: str - Optional key for the channel (default: '')

206

"""

207

```

208

209

### AnyValue Channel

210

211

Stores the last value received, but assumes all values in a batch are equivalent. Used when multiple sources might provide the same value.

212

213

```python { .api }

214

class AnyValue:

215

"""

216

Channel that stores the last value, assuming all updates are equivalent.

217

218

Similar to LastValue but semantically indicates that all values in an

219

update batch should be the same. Useful for consensus scenarios.

220

221

Type Parameters:

222

Value: Type of value to store

223

224

Usage:

225

channel = AnyValue(int)

226

channel.update([5, 5, 5]) # All same value

227

channel.get() # Returns 5

228

229

**Special behavior:** Unlike LastValue which ignores empty updates, AnyValue clears its value when updated with an empty sequence.

230

"""

231

232

def __init__(self, typ, key=''):

233

"""

234

Initialize an AnyValue channel.

235

236

Parameters:

237

typ: Any - The type of value to store

238

key: str - Optional key for the channel (default: '')

239

"""

240

```

241

242

### EphemeralValue Channel

243

244

Stores a value that is automatically cleared after being consumed. Useful for one-time signals or events.

245

246

```python { .api }

247

class EphemeralValue:

248

"""

249

Channel that clears its value after being consumed.

250

251

The value is cleared in two scenarios:

252

1. When update() is called with an empty sequence

253

2. When a new update() call provides new values (replaces old value)

254

255

Note: consume() is a no-op for EphemeralValue - clearing happens via update().

256

257

Type Parameters:

258

Value: Type of value to store

259

260

Usage:

261

# Direct construction

262

channel = EphemeralValue(int)

263

channel.update([42])

264

channel.get() # Returns 42

265

266

# Clear the value by calling update with empty sequence

267

channel.update([])

268

channel.get() # Raises EmptyChannelError

269

270

# In Annotated context (for StateGraph)

271

from typing import Annotated

272

user_input: Annotated[str | None, EphemeralValue(str)]

273

"""

274

275

def __init__(self, typ, guard=True):

276

"""

277

Initialize an EphemeralValue channel.

278

279

Parameters:

280

typ: Any - The type of value to store

281

guard: bool, default True

282

Whether to enforce single-value updates. When True, raises

283

InvalidUpdateError if multiple values are received in one step.

284

Set to False to allow multiple concurrent updates (keeps last one).

285

"""

286

```

287

288

### UntrackedValue Channel

289

290

Stores a value that is not included in checkpoints. The value is available during execution but not persisted.

291

292

```python { .api }

293

class UntrackedValue:

294

"""

295

Channel that stores a value without checkpointing it.

296

297

Values are available during execution but not saved to checkpoints.

298

Useful for transient state that doesn't need persistence, like caches

299

or temporary computation results.

300

301

Type Parameters:

302

Value: Type of value to store

303

304

Usage:

305

# Direct construction

306

channel = UntrackedValue(dict)

307

channel.update([{"key": "value"}])

308

channel.get() # Returns {"key": "value"}

309

# Value not included in checkpoint

310

311

# In Annotated context (for StateGraph)

312

from typing import Annotated

313

cache: Annotated[dict | None, UntrackedValue(dict)]

314

"""

315

316

def __init__(self, typ, guard=True):

317

"""

318

Initialize an UntrackedValue channel.

319

320

Parameters:

321

typ: type[Value] - The type of value to store

322

guard: bool, default True

323

Whether to enforce single-value updates. When True, raises

324

InvalidUpdateError if multiple values are received in one step.

325

Set to False to allow multiple concurrent updates (keeps last one).

326

"""

327

```

328

329

### BinaryOperatorAggregate Channel

330

331

Applies a binary operator to aggregate multiple values. Common operators include addition, concatenation, or custom merge functions.

332

333

```python { .api }

334

class BinaryOperatorAggregate:

335

"""

336

Channel that aggregates values using a binary operator.

337

338

Each update is combined with the current value using the provided operator.

339

Common use cases: list concatenation, numeric addition, set union.

340

341

Type Parameters:

342

Value: Type of value to store and aggregate

343

344

Parameters:

345

typ: Type - The type of values to aggregate

346

operator: Callable[[Value, Value], Value] - Binary operator function

347

that combines two values into one

348

349

Usage:

350

from operator import add

351

352

# Numeric addition

353

channel = BinaryOperatorAggregate(int, add)

354

channel.update([1, 2, 3])

355

channel.get() # Returns 6 (1 + 2 + 3)

356

357

# List concatenation

358

from operator import concat

359

channel = BinaryOperatorAggregate(list, concat)

360

channel.update([[1, 2], [3, 4]])

361

channel.get() # Returns [1, 2, 3, 4]

362

"""

363

364

def __init__(self, typ, operator):

365

"""

366

Initialize a BinaryOperatorAggregate channel.

367

368

Parameters:

369

typ: Type - Type of values to aggregate

370

operator: Callable[[Value, Value], Value] - Aggregation function

371

"""

372

373

**Note:** Abstract collection types are automatically converted to concrete types:

374

- `collections.abc.Sequence` → `list`

375

- `collections.abc.Set` → `set`

376

- `collections.abc.Mapping` → `dict`

377

```

378

379

### Topic Channel

380

381

Pub/sub channel for collecting values from multiple sources. By default keeps only values from the current update batch, but can accumulate values across updates.

382

383

```python { .api }

384

class Topic:

385

"""

386

Pub/sub channel for collecting values from multiple sources.

387

388

By default (accumulate=False), Topic keeps only values from the current update

389

batch, clearing previous values. With accumulate=True, it keeps all values across

390

all updates. Useful for message passing, event streams, or collecting results.

391

392

Type Parameters:

393

Value: Type of individual values in the topic

394

395

Usage:

396

# Default behavior - keeps only current batch

397

channel = Topic(int)

398

channel.update([1, 2])

399

channel.update([3])

400

channel.get() # Returns [3] (only latest batch)

401

402

# Accumulating behavior - keeps all values

403

channel = Topic(int, accumulate=True)

404

channel.update([1, 2])

405

channel.update([3])

406

channel.get() # Returns [1, 2, 3] (all values)

407

408

# For TypedDict-style messages

409

channel = Topic(dict, accumulate=True)

410

channel.update([{"a": 1}])

411

channel.update([{"b": 2}])

412

channel.get() # Returns [{"a": 1}, {"b": 2}]

413

414

# In Annotated context (for StateGraph)

415

from typing import Annotated

416

events: Annotated[list[dict], Topic(dict, accumulate=True)]

417

"""

418

419

def __init__(self, typ, accumulate=False):

420

"""

421

Initialize a Topic channel.

422

423

Parameters:

424

typ: type[Value] - The type of individual values in the topic

425

accumulate: bool - Whether to accumulate values across updates (default: False).

426

If False, values from previous updates are cleared.

427

If True, values persist and accumulate indefinitely.

428

"""

429

```

430

431

### NamedBarrierValue Channel

432

433

Waits for updates from multiple named sources before making a value available. Useful for synchronization across parallel branches.

434

435

```python { .api }

436

class NamedBarrierValue:

437

"""

438

Channel that waits for multiple named sources before triggering.

439

440

Acts as a barrier that requires updates from all expected sources before

441

making the value available. Useful for fan-in patterns where you need to

442

wait for all parallel branches to complete.

443

444

Type Parameters:

445

Value: Type of value to store

446

447

Usage:

448

# Direct construction

449

channel = NamedBarrierValue(int, {"branch1", "branch2", "branch3"})

450

channel.update([("branch1", 1)])

451

channel.is_available() # False (not all sources reported)

452

channel.update([("branch2", 2)])

453

channel.is_available() # False

454

channel.update([("branch3", 3)])

455

channel.is_available() # True

456

channel.get() # Returns None (barrier triggered)

457

458

# In Annotated context (for StateGraph)

459

from typing import Annotated

460

result: Annotated[int, NamedBarrierValue(int, ["branch1", "branch2"])]

461

462

Note: NamedBarrierValue.get() returns None when the barrier is satisfied.

463

It's used for synchronization, not value passing.

464

"""

465

466

def __init__(self, typ, names):

467

"""

468

Initialize a NamedBarrierValue channel.

469

470

Parameters:

471

typ: type[Value] - The type of value to store

472

names: set[Value] - Names of sources to wait for

473

"""

474

475

def consume(self) -> bool:

476

"""

477

Consume the barrier value, resetting the seen sources.

478

479

Called after the barrier is satisfied to reset it for the next cycle.

480

Clears the set of seen source names.

481

482

Returns:

483

bool - True if all sources were seen (barrier was triggered), False otherwise

484

"""

485

```

486

487

### NamedBarrierValueAfterFinish Channel

488

489

Similar to NamedBarrierValue, but only commits when finish() is called. Combines barrier synchronization with after-finish semantics.

490

491

```python { .api }

492

class NamedBarrierValueAfterFinish:

493

"""

494

Channel that combines named barrier with after-finish semantics.

495

496

Waits for updates from all named sources, but only makes the value

497

available after finish() is called.

498

499

Usage:

500

channel = NamedBarrierValueAfterFinish(int, {"branch1", "branch2"})

501

channel.update([("branch1", 1)])

502

channel.update([("branch2", 2)])

503

channel.is_available() # False (finish not called)

504

channel.finish()

505

channel.is_available() # True

506

channel.get() # Returns None (barrier triggered)

507

"""

508

509

def __init__(self, typ, names):

510

"""

511

Initialize a NamedBarrierValueAfterFinish channel.

512

513

Parameters:

514

typ: type[Value] - The type of value to store

515

516

names: set[Value] - Names of sources to wait for

517

"""

518

519

def finish(self) -> bool:

520

"""

521

Mark the barrier as finished, allowing value to be consumed.

522

523

Must be called after all sources have updated before the barrier

524

value becomes available.

525

526

Returns:

527

bool - True if all sources were seen and finished, False otherwise

528

"""

529

530

def consume(self) -> bool:

531

"""

532

Consume the barrier value, resetting state for next cycle.

533

534

Can only consume after finish() is called and all sources have reported.

535

Resets both the seen sources and finished flag.

536

537

Returns:

538

bool - True if barrier was finished and all sources seen, False otherwise

539

"""

540

```

541

542

## Usage Examples

543

544

### Using LastValue in State Schema

545

546

```python

547

from typing import TypedDict

548

from langgraph.graph import StateGraph

549

550

class State(TypedDict):

551

# LastValue is the default channel type

552

counter: int

553

message: str

554

555

# All fields use LastValue by default

556

graph = StateGraph(State)

557

```

558

559

### Using BinaryOperatorAggregate with Annotated

560

561

```python

562

from typing import TypedDict, Annotated

563

from operator import add

564

from langgraph.graph import StateGraph

565

566

class State(TypedDict):

567

# Use add operator to accumulate values

568

total: Annotated[int, add]

569

items: list[str]

570

571

def node1(state: State) -> dict:

572

return {"total": 5} # Adds 5 to current total

573

574

def node2(state: State) -> dict:

575

return {"total": 3} # Adds 3 to current total

576

577

graph = StateGraph(State)

578

graph.add_node("node1", node1)

579

graph.add_node("node2", node2)

580

```

581

582

### Using EphemeralValue for User Input

583

584

```python

585

from typing import TypedDict, Annotated

586

from langgraph.channels import EphemeralValue

587

from langgraph.graph import StateGraph

588

589

class State(TypedDict):

590

count: int

591

# User input is cleared after being read

592

user_input: Annotated[str, EphemeralValue()]

593

594

def process(state: State) -> dict:

595

if "user_input" in state:

596

# Process user input

597

print(f"Got input: {state['user_input']}")

598

return {"count": state["count"] + 1}

599

600

graph = StateGraph(State)

601

graph.add_node("process", process)

602

```

603

604

### Using Topic for Message Collection

605

606

```python

607

from typing import TypedDict, Annotated

608

from langgraph.channels import Topic

609

from langgraph.graph import StateGraph

610

611

class State(TypedDict):

612

# Collect all messages as a list

613

events: Annotated[list[dict], Topic()]

614

615

def node1(state: State) -> dict:

616

return {"events": [{"source": "node1", "data": "hello"}]}

617

618

def node2(state: State) -> dict:

619

return {"events": [{"source": "node2", "data": "world"}]}

620

621

# After both nodes run:

622

# state["events"] == [

623

# {"source": "node1", "data": "hello"},

624

# {"source": "node2", "data": "world"}

625

# ]

626

```

627

628

### Using NamedBarrierValue for Synchronization

629

630

```python

631

from typing import TypedDict, Annotated

632

from langgraph.channels import NamedBarrierValue

633

from langgraph.graph import StateGraph, START, END

634

from langgraph.types import Send

635

636

class State(TypedDict):

637

items: list[int]

638

# Wait for all parallel branches to complete

639

result: Annotated[int, NamedBarrierValue(["branch1", "branch2", "branch3"])]

640

641

def fan_out(state: State) -> list[Send]:

642

return [

643

Send("branch1", state),

644

Send("branch2", state),

645

Send("branch3", state)

646

]

647

648

def branch1(state: State) -> dict:

649

return {("result", "branch1"): 1}

650

651

def branch2(state: State) -> dict:

652

return {("result", "branch2"): 2}

653

654

def branch3(state: State) -> dict:

655

return {("result", "branch3"): 3}

656

657

def aggregate(state: State) -> dict:

658

# Only called after all branches report

659

return {"result": state["result"]}

660

661

graph = StateGraph(State)

662

graph.add_node("branch1", branch1)

663

graph.add_node("branch2", branch2)

664

graph.add_node("branch3", branch3)

665

graph.add_node("aggregate", aggregate)

666

667

graph.add_conditional_edges(START, fan_out)

668

graph.add_edge("branch1", "aggregate")

669

graph.add_edge("branch2", "aggregate")

670

graph.add_edge("branch3", "aggregate")

671

graph.add_edge("aggregate", END)

672

```

673

674

### Custom Channel with Direct Instantiation

675

676

```python

677

from langgraph.pregel import Pregel

678

from langgraph.channels import LastValue, BinaryOperatorAggregate

679

from operator import add

680

681

# Low-level Pregel construction with explicit channels

682

graph = Pregel(

683

nodes={

684

"node1": ...,

685

"node2": ...

686

},

687

channels={

688

"field1": LastValue(),

689

"field2": BinaryOperatorAggregate(int, add),

690

"field3": LastValue()

691

},

692

input_channels=["field1"],

693

output_channels=["field3"]

694

)

695

```

696

697

### UntrackedValue for Temporary State

698

699

```python

700

from typing import TypedDict, Annotated

701

from langgraph.channels import UntrackedValue

702

from langgraph.graph import StateGraph

703

704

class State(TypedDict):

705

data: dict

706

# Temporary cache not saved to checkpoints

707

cache: Annotated[dict, UntrackedValue()]

708

709

def process(state: State) -> dict:

710

# Use cache if available

711

if "cache" in state and state["cache"]:

712

result = state["cache"]["computed"]

713

else:

714

# Expensive computation

715

result = expensive_function(state["data"])

716

717

return {

718

"data": {"result": result},

719

"cache": {"computed": result}

720

}

721

722

# Cache is available during execution but not in checkpoints

723

```

724

725

## Notes

726

727

- Channels are typically used implicitly through StateGraph's state schema with Annotated types

728

- Direct channel instantiation is mainly used with low-level Pregel API

729

- The default channel type for state fields is LastValue

730

- Reducers in Annotated types (like `Annotated[int, add]`) create BinaryOperatorAggregate channels

731

- Channel behavior affects how concurrent updates from parallel nodes are merged

732

- Some channels (EphemeralValue, LastValueAfterFinish) have lifecycle methods that are automatically called by the execution engine

733