# Pregel Execution Engine

Pregel is the low-level execution engine that powers LangGraph. It provides fine-grained control over graph execution, streaming, state management, and checkpointing. While most users interact with StateGraph, advanced use cases can use Pregel directly for maximum flexibility.

## Imports

```python
from langgraph.pregel import Pregel, NodeBuilder
```

## Capabilities

### Pregel Class

Core graph execution engine that implements the Runnable interface from LangChain. Handles graph traversal, state management, parallelization, and checkpointing.

```python { .api }
class Pregel:
    """
    Low-level graph execution engine inspired by Google's Pregel.

    Type Parameters:
        StateT: Type of the graph state
        ContextT: Type of run-scoped context
        InputT: Type of graph input
        OutputT: Type of graph output
    """

    def __init__(
        self,
        *,
        nodes,
        channels,
        input_channels,
        output_channels,
        stream_channels=None,
        stream_mode="values",
        trigger_to_nodes=None,
        interrupt_before_nodes=None,
        interrupt_after_nodes=None,
        debug=False,
        checkpointer=None,
        store=None,
        auto_validate=True,
        cache=None,
        cache_policy=None,
        config=None,
        context_schema=None,
        name=None,
        retry_policy=(),
        step_timeout=None,
        stream_eager=False,
        **kwargs
    ):
        """
        Initialize a Pregel graph.

        This is typically not called directly - use StateGraph.compile() instead.

        Parameters:
            nodes: dict[str, PregelNode] - Graph nodes
            channels: dict[str, BaseChannel] - State channels
            input_channels: str | list[str] - Input channel names
            output_channels: str | list[str] - Output channel names
            stream_channels: Optional[list[str]] - Channels to stream
            stream_mode: StreamMode - Default streaming mode
            trigger_to_nodes: Optional[Mapping[str, Sequence[str]]]
                Maps trigger names to node sequences for advanced graph control
            interrupt_before_nodes: Optional[list[str]] - Nodes to interrupt before
            interrupt_after_nodes: Optional[list[str]] - Nodes to interrupt after
            debug: bool - Enable debug mode
            checkpointer: Optional[BaseCheckpointSaver] - Checkpointer
            store: Optional[BaseStore] - Store for cross-thread memory
            auto_validate: bool, default True
                Automatically validate graph structure on construction
            cache: Optional[BaseCache]
                Cache instance for node results
            cache_policy: Optional[CachePolicy]
                Default cache policy for nodes
            config: Optional[RunnableConfig]
                Default configuration for all runs
            context_schema: Optional[type[ContextT]]
                Schema for the context type
            name: str, default "LangGraph"
                Name of the graph
            retry_policy: RetryPolicy | Sequence[RetryPolicy], default ()
                Retry policies to apply to all nodes
            step_timeout: Optional[float]
                Timeout in seconds for each step
            stream_eager: bool, default False
                Enable eager streaming of outputs
        """
```
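The execution model follows the bulk-synchronous-parallel pattern of Google's Pregel: in each superstep, every node triggered by a channel written in the previous superstep runs against the current channel values, and all writes are applied together before the next superstep begins. This can be sketched in plain Python (a conceptual illustration only, not LangGraph's implementation; the `(trigger, fn, output)` node shape is a hypothetical stand-in):

```python
# Conceptual sketch of Pregel's superstep loop (not the real implementation).
# Each node is (trigger_channel, fn, output_channel); channels is a plain dict.

def run_supersteps(nodes, channels, max_steps=25):
    updated = set(channels)             # channels written in the last superstep
    for _ in range(max_steps):
        # 1. Select the nodes triggered by channels updated last superstep
        active = [n for n in nodes if n[0] in updated]
        if not active:
            break                       # no active nodes: execution is done
        # 2. Run all active nodes against the *current* channel values
        writes = [(out, fn(channels[trig])) for trig, fn, out in active]
        # 3. Apply all writes together before the next superstep
        updated = {out for out, _ in writes}
        for out, value in writes:
            channels[out] = value
    return channels

state = run_supersteps(
    nodes=[("input", lambda x: x + 1, "mid"), ("mid", lambda x: x * 2, "output")],
    channels={"input": 3},
)
# state == {"input": 3, "mid": 4, "output": 8}
```

The loop terminates when a superstep triggers no nodes, which is why execution stops naturally once no channel that any node subscribes to has been written.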
#### Execution Methods

```python { .api }
def invoke(self, input, config=None, **kwargs):
    """
    Execute the graph and return the final output.

    Parameters:
        input: Any - Input to the graph (dict, state object, etc.)
        config: Optional[RunnableConfig] - Configuration for the run
        context: Optional[ContextT]
            Static context for the run (added in v0.6.0).
            Available to all nodes via get_runtime().
        durability: Optional[Durability]
            Durability mode for persistence:
            - "sync": Persist before each step
            - "async": Persist asynchronously during execution (default)
            - "exit": Persist only when graph exits
        **kwargs: Additional keyword arguments passed to nodes

    Returns:
        OutputT - Final graph output

    Raises:
        GraphRecursionError: If max steps exceeded
        GraphInterrupt: If execution is interrupted
    """

async def ainvoke(self, input, config=None, **kwargs):
    """
    Asynchronously execute the graph and return the final output.

    Parameters:
        input: Any - Input to the graph
        config: Optional[RunnableConfig] - Configuration for the run
        context: Optional[ContextT]
            Static context for the run (added in v0.6.0).
            Available to all nodes via get_runtime().
        durability: Optional[Durability]
            Durability mode for persistence:
            - "sync": Persist before each step
            - "async": Persist asynchronously during execution (default)
            - "exit": Persist only when graph exits
        **kwargs: Additional keyword arguments

    Returns:
        OutputT - Final graph output
    """
```
#### Streaming Methods

```python { .api }
def stream(
    self,
    input,
    config=None,
    *,
    stream_mode=None,
    output_keys=None,
    interrupt_before=None,
    interrupt_after=None,
    debug=None,
    subgraphs=False
):
    """
    Stream graph execution, yielding outputs as they're produced.

    Parameters:
        input: Any - Input to the graph
        config: Optional[RunnableConfig] - Configuration for the run
        stream_mode: Optional[StreamMode | list[StreamMode]]
            How to emit outputs. Options:
            - "values": Emit full state after each step
            - "updates": Emit only node updates
            - "custom": Emit custom data via StreamWriter
            - "messages": Emit LLM messages token-by-token
            - "checkpoints": Emit checkpoint creation events
            - "tasks": Emit task start/finish events
            - "debug": Emit checkpoints and tasks
            A list may be passed to use multiple modes simultaneously.
        output_keys: Optional[str | list[str]]
            Specific state keys to include in output
        interrupt_before: Optional[list[str] | Literal["*"]]
            Override interrupt_before for this run
        interrupt_after: Optional[list[str] | Literal["*"]]
            Override interrupt_after for this run
        durability: Optional[Durability]
            Durability mode for persistence:
            - "sync": Persist before each step
            - "async": Persist asynchronously during execution (default)
            - "exit": Persist only when graph exits
        debug: Optional[bool] - Override debug mode for this run
        subgraphs: bool, default False
            Whether to stream subgraph execution

    Yields:
        Chunks of output according to stream_mode.
        For "values": dict with full state
        For "updates": dict with node name and update
        For "checkpoints": checkpoint metadata
        For "tasks": task execution info
    """

async def astream(
    self,
    input,
    config=None,
    *,
    stream_mode=None,
    output_keys=None,
    interrupt_before=None,
    interrupt_after=None,
    debug=None,
    subgraphs=False
):
    """
    Asynchronously stream graph execution.

    Parameters: Same as stream(), including:
        durability: Optional[Durability]
            Durability mode for persistence:
            - "sync": Persist before each step
            - "async": Persist asynchronously during execution (default)
            - "exit": Persist only when graph exits

    Yields:
        Async iterator of output chunks
    """
```
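The difference between the "values" and "updates" modes is easiest to see side by side: "updates" emits each node's delta as it completes, while "values" emits the full merged state after each step. A self-contained simulation of that behavior (plain Python, no LangGraph imports; `simulate_stream` is a hypothetical helper for illustration):

```python
# Illustration of "values" vs "updates" stream modes (plain Python, no LangGraph).
# "updates" yields each node's delta; "values" yields the accumulated state.

def simulate_stream(steps, state, stream_mode):
    for node_name, fn in steps:
        update = fn(state)              # what the node returns
        state = {**state, **update}     # full state after merging the update
        if "updates" in stream_mode:
            yield ("updates", {node_name: update})
        if "values" in stream_mode:
            yield ("values", state)

chunks = list(simulate_stream(
    steps=[("node1", lambda s: {"value": s["value"] + 1}),
           ("node2", lambda s: {"value": s["value"] * 2})],
    state={"value": 1},
    stream_mode=["updates", "values"],
))
# chunks == [("updates", {"node1": {"value": 2}}), ("values", {"value": 2}),
#            ("updates", {"node2": {"value": 4}}), ("values", {"value": 4})]
```

As in the real API, passing multiple modes tags each chunk with its mode so consumers can demultiplex a single stream.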
#### State Management Methods

```python { .api }
def get_state(self, config, *, subgraphs=False):
    """
    Get the current state snapshot for a thread.

    Parameters:
        config: RunnableConfig - Must include thread_id in configurable
        subgraphs: bool, default False - Include subgraph states

    Returns:
        StateSnapshot - Current state with:
        - values: Current state values
        - next: Tuple of next node(s) to execute
        - config: Config for this snapshot
        - metadata: Checkpoint metadata
        - created_at: Timestamp
        - parent_config: Parent checkpoint config
        - tasks: Pending tasks
        - interrupts: Pending interrupts
    """

async def aget_state(self, config, *, subgraphs=False):
    """
    Asynchronously get the current state snapshot.

    Parameters: Same as get_state()

    Returns:
        StateSnapshot - Current state
    """
```
```python { .api }
def get_state_history(
    self,
    config,
    *,
    filter=None,
    before=None,
    limit=None
):
    """
    Get historical state snapshots for a thread.

    Parameters:
        config: RunnableConfig - Must include thread_id
        filter: Optional[dict] - Filter checkpoints by metadata
        before: Optional[RunnableConfig] - Get states before this config
        limit: Optional[int] - Maximum number of states to return

    Yields:
        StateSnapshot - Historical states in reverse chronological order
    """

async def aget_state_history(
    self,
    config,
    *,
    filter=None,
    before=None,
    limit=None
):
    """
    Asynchronously get historical state snapshots.

    Parameters: Same as get_state_history()

    Yields:
        StateSnapshot - Historical states
    """
```
```python { .api }
def update_state(self, config, values, as_node=None, task_id=None):
    """
    Update the state of a thread manually.

    Parameters:
        config: RunnableConfig - Must include thread_id
        values: dict[str, Any] | Any | None - State updates to apply.
            Can be None when used with as_node=END to clear tasks.
        as_node: Optional[str] - Act as if the update came from this node
        task_id: Optional[str]
            Associate the update with this task ID

    Returns:
        RunnableConfig - Config for the new checkpoint

    Usage:
        # Update state manually
        config = {"configurable": {"thread_id": "1"}}
        new_config = graph.update_state(
            config,
            {"counter": 5},
            as_node="my_node"
        )
    """

async def aupdate_state(self, config, values, as_node=None, task_id=None):
    """
    Asynchronously update thread state.

    Parameters: Same as update_state()
        config: RunnableConfig - Must include thread_id
        values: dict[str, Any] | Any | None - State updates to apply
        as_node: Optional[str] - Act as if the update came from this node
        task_id: Optional[str]
            Associate the update with this task ID

    Returns:
        RunnableConfig - Config for the new checkpoint
    """
```
```python { .api }
def bulk_update_state(
    self,
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig:
    """
    Apply updates to the graph state in bulk. Requires a checkpointer to be set.

    Parameters:
        config: RunnableConfig - The config to apply the updates to. Must include thread_id.
        supersteps: Sequence[Sequence[StateUpdate]] - A list of supersteps, each including
            a list of updates to apply sequentially to a graph state. Each update is
            a tuple of the form (values, as_node, task_id) where task_id is optional.

    Returns:
        RunnableConfig - The updated config.

    Raises:
        ValueError: If no checkpointer is set or no updates are provided.
        InvalidUpdateError: If an invalid update is provided.

    StateUpdate type:
        NamedTuple with fields:
        - values: dict[str, Any] | None
        - as_node: str | None
        - task_id: str | None
    """

async def abulk_update_state(
    self,
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig:
    """
    Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.

    Parameters:
        config: RunnableConfig - The config to apply the updates to. Must include thread_id.
        supersteps: Sequence[Sequence[StateUpdate]] - A list of supersteps, each including
            a list of updates to apply sequentially to a graph state. Each update is
            a tuple of the form (values, as_node, task_id) where task_id is optional.

    Returns:
        RunnableConfig - The updated config.

    Raises:
        ValueError: If no checkpointer is set or no updates are provided.
        InvalidUpdateError: If an invalid update is provided.
    """
```
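Each superstep in a bulk update is a batch of updates recorded as one checkpoint. The folding can be sketched in plain Python, using a NamedTuple mirroring the (values, as_node, task_id) shape described above (a conceptual sketch only; merging by dict update stands in for LangGraph's channel reducers, and checkpoints here are plain state copies):

```python
from typing import NamedTuple, Optional

class StateUpdate(NamedTuple):
    values: Optional[dict]
    as_node: Optional[str] = None
    task_id: Optional[str] = None

def apply_supersteps(state, supersteps):
    checkpoints = []
    for superstep in supersteps:
        # Apply every update in the superstep sequentially...
        for update in superstep:
            if update.values is not None:
                state = {**state, **update.values}
        # ...then record one checkpoint for the whole superstep.
        checkpoints.append(dict(state))
    return state, checkpoints

state, checkpoints = apply_supersteps(
    {"count": 0},
    [
        [StateUpdate({"count": 1}, "node_a")],
        [StateUpdate({"count": 2}, "node_a"), StateUpdate({"flag": True}, "node_b")],
    ],
)
# state == {"count": 2, "flag": True}; one checkpoint per superstep
```

This is why grouping updates into supersteps matters: updates in the same inner list share a checkpoint, while separate supersteps produce distinct entries in the state history.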
#### Graph Introspection Methods

```python { .api }
def get_graph(self, config=None, *, xray=False):
    """
    Get the graph structure.

    Parameters:
        config: Optional[RunnableConfig]
        xray: int | bool, default False
            Whether to recursively include subgraph structures.
            - False: Only return the main graph structure
            - True: Include all subgraphs recursively
            - int: Include subgraphs up to the specified depth

    Returns:
        Graph - Graph structure with nodes and edges
    """

async def aget_graph(self, config=None, *, xray=False):
    """
    Asynchronously get the graph structure.

    Parameters:
        config: Optional[RunnableConfig]
        xray: int | bool, default False
            Whether to recursively include subgraph structures.
            - False: Only return the main graph structure
            - True: Include all subgraphs recursively
            - int: Include subgraphs up to the specified depth

    Returns:
        Graph - Graph structure
    """
```
```python { .api }
def get_subgraphs(self, namespace=None, *, recurse=False):
    """
    Get information about subgraphs.

    Parameters:
        namespace: str | None - Subgraph namespace to query
        recurse: bool, default False - Recursively get nested subgraphs

    Returns:
        list[tuple] - List of (namespace, subgraph) tuples
    """

async def aget_subgraphs(self, namespace=None, *, recurse=False):
    """
    Asynchronously get subgraph information.

    Parameters: Same as get_subgraphs()

    Returns:
        list[tuple] - List of (namespace, subgraph) tuples
    """
```
#### Schema Methods

```python { .api }
def get_input_schema(self, config=None):
    """
    Get the Pydantic schema for graph input.

    Parameters:
        config: Optional[RunnableConfig]

    Returns:
        Type[BaseModel] - Pydantic model for input
    """

def get_output_schema(self, config=None):
    """
    Get the Pydantic schema for graph output.

    Parameters:
        config: Optional[RunnableConfig]

    Returns:
        Type[BaseModel] - Pydantic model for output
    """

def get_input_jsonschema(self, config=None):
    """
    Get the JSON schema for graph input.

    Parameters:
        config: Optional[RunnableConfig]

    Returns:
        dict - JSON schema describing input
    """

def get_output_jsonschema(self, config=None):
    """
    Get the JSON schema for graph output.

    Parameters:
        config: Optional[RunnableConfig]

    Returns:
        dict - JSON schema describing output
    """

def get_config_jsonschema(self, include=None):
    """
    Get the JSON schema for configuration.

    Parameters:
        include: Optional[list[str]] - Fields to include

    Returns:
        dict - JSON schema for config
    """

def get_context_jsonschema(self):
    """
    Get the JSON schema for context.

    Returns:
        dict - JSON schema for context
    """
```
#### Properties

```python { .api }
@property
def InputType(self):
    """
    Get the input type annotation.

    Returns:
        Type - Input type
    """

@property
def OutputType(self):
    """
    Get the output type annotation.

    Returns:
        Type - Output type
    """

@property
def stream_channels_list(self):
    """
    Get the list of stream channel names.

    Returns:
        list[str] - Channel names
    """

@property
def stream_channels_asis(self):
    """
    Get stream channels as-is (without transformation).

    Returns:
        dict - Channel mappings
    """
```
#### Advanced Methods

```python { .api }
def validate(self):
    """
    Validate the graph structure.

    Ensures that all nodes are reachable, channels are properly configured,
    and the graph structure is valid for execution.

    Returns:
        Self - Returns self for method chaining

    Raises:
        ValueError: If the graph structure is invalid

    Usage:
        pregel = Pregel(...)
        pregel.validate()  # Raises if invalid
    """

def copy(self, update=None):
    """
    Create a copy of the Pregel graph.

    Parameters:
        update: Optional[dict[str, Any]]
            Dictionary of attributes to update in the copy

    Returns:
        Self - Copied Pregel instance

    Usage:
        # Create a copy with the same configuration
        pregel_copy = pregel.copy()

        # Create a copy with updated attributes
        pregel_copy = pregel.copy({"debug": True})
    """

def with_config(self, config=None, **kwargs):
    """
    Create a copy of the graph with updated configuration.

    Parameters:
        config: Optional[RunnableConfig]
            Configuration to merge with the existing config
        **kwargs: Additional config fields to override

    Returns:
        Self - New Pregel instance with updated config

    Usage:
        # Add callbacks to config
        pregel_with_callbacks = pregel.with_config(
            callbacks=[MyCallback()]
        )

        # Update recursion limit
        pregel_limited = pregel.with_config(recursion_limit=50)
    """

def clear_cache(self, nodes=None):
    """
    Clear cached results for nodes.

    Parameters:
        nodes: Optional[Sequence[str]]
            Node names to clear the cache for. If None, clears all nodes.

    Raises:
        ValueError: If no cache is configured on the graph

    Usage:
        # Clear cache for all nodes
        pregel.clear_cache()

        # Clear cache for specific nodes
        pregel.clear_cache(["node1", "node2"])

    Note:
        Requires a cache to be configured when creating the graph.
    """

async def aclear_cache(self, nodes=None):
    """
    Asynchronously clear cached results for nodes.

    Parameters:
        nodes: Optional[Sequence[str]]
            Node names to clear the cache for. If None, clears all nodes.

    Raises:
        ValueError: If no cache is configured on the graph

    Usage:
        # Clear cache for all nodes
        await pregel.aclear_cache()

        # Clear cache for specific nodes
        await pregel.aclear_cache(["node1", "node2"])
    """
```
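The cache machinery can be pictured as keyed memoization of node results: a key function maps a node's input to a cache key, cached results are returned without re-running the node, and clear_cache drops entries for selected nodes. A self-contained sketch of that idea (plain Python; a conceptual stand-in, not LangGraph's actual BaseCache/CachePolicy implementation):

```python
# Keyed memoization of node results, in the spirit of CachePolicy(key_func=...).
class NodeCache:
    def __init__(self):
        self._entries = {}              # (node_name, key) -> cached result

    def run(self, node_name, fn, value, key_func=str):
        key = (node_name, key_func(value))
        if key not in self._entries:    # cache miss: execute the node and store
            self._entries[key] = fn(value)
        return self._entries[key]       # cache hit: skip re-execution

    def clear(self, nodes=None):
        """Drop entries for the given node names, or all entries if None."""
        if nodes is None:
            self._entries.clear()
        else:
            names = set(nodes)
            self._entries = {k: v for k, v in self._entries.items()
                             if k[0] not in names}

cache = NodeCache()
calls = []
slow = lambda x: calls.append(x) or x * 2   # records each real execution
assert cache.run("double", slow, 21) == 42
assert cache.run("double", slow, 21) == 42  # served from cache
assert calls == [21]                        # the node ran only once
cache.clear(["double"])                     # next run will recompute
```

This also shows why clear_cache raises when no cache is configured: without a cache object there are no entries to drop.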
### NodeBuilder Class

Fluent API for building Pregel nodes with subscriptions, reads, writes, and metadata.

```python { .api }
class NodeBuilder:
    """
    Builder for creating Pregel nodes using a fluent interface.

    Provides a chainable API for configuring node subscriptions, reads,
    writes, retry policies, and cache policies.
    """

    def subscribe_only(self, channel):
        """
        Subscribe to a single channel (shorthand).

        Parameters:
            channel: str - Channel to subscribe to

        Returns:
            NodeBuilder - Self for chaining
        """

    def subscribe_to(self, *channels, read=True):
        """
        Subscribe to channels (triggers node execution).

        Parameters:
            *channels: str - Channel names to subscribe to
            read: bool, default True - Whether to read from the channels

        Returns:
            NodeBuilder - Self for chaining
        """

    def read_from(self, *channels):
        """
        Read from channels without subscribing (passive read).

        Parameters:
            *channels: str - Channel names to read from

        Returns:
            NodeBuilder - Self for chaining
        """

    def do(self, node):
        """
        Set the runnable to execute for this node.

        Parameters:
            node: Runnable - The node implementation

        Returns:
            NodeBuilder - Self for chaining
        """

    def write_to(self, *channels, **kwargs):
        """
        Add channel writes (output mappings).

        Parameters:
            *channels: str - Channel names for direct writes
            **kwargs: Mapped writes (channel_name=value_key)

        Returns:
            NodeBuilder - Self for chaining
        """

    def meta(self, *tags, **metadata):
        """
        Add tags and metadata to the node.

        Parameters:
            *tags: str - Tags to add
            **metadata: Metadata key-value pairs

        Returns:
            NodeBuilder - Self for chaining
        """

    def add_retry_policies(self, *policies):
        """
        Add retry policies for node failures.

        Parameters:
            *policies: RetryPolicy - Retry policy configurations

        Returns:
            NodeBuilder - Self for chaining
        """

    def add_cache_policy(self, policy):
        """
        Add a cache policy for node results.

        Parameters:
            policy: CachePolicy - Cache configuration

        Returns:
            NodeBuilder - Self for chaining
        """

    def build(self):
        """
        Build the PregelNode.

        Returns:
            PregelNode - Configured node ready for Pregel
        """
```
## Usage Examples

### Direct Pregel Construction

```python
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue

# Define a node: subscribe to "input", double the value, write to "output".
# Pregel nodes are PregelNode instances, so build them with NodeBuilder
# rather than passing a bare runnable.
process = (
    NodeBuilder()
    .subscribe_only("input")
    .do(lambda x: x * 2)
    .write_to("output")
    .build()
)

# Construct Pregel directly
graph = Pregel(
    nodes={"process": process},
    channels={
        "input": LastValue(int),
        "output": LastValue(int),
    },
    input_channels=["input"],
    output_channels=["output"],
)

result = graph.invoke({"input": 5})
# result == {"output": 10}
```
### Streaming with Multiple Modes

```python
from langgraph.graph import StateGraph, START, END

# Create a graph
graph = StateGraph(dict)
graph.add_node("node1", lambda s: {"value": s["value"] + 1})
graph.add_node("node2", lambda s: {"value": s["value"] * 2})
graph.add_edge(START, "node1")
graph.add_edge("node1", "node2")
graph.add_edge("node2", END)

app = graph.compile()

# Stream with multiple modes
for chunk in app.stream(
    {"value": 1},
    stream_mode=["values", "updates"]
):
    print(chunk)
# First iteration: ("values", {"value": 2})
# Second iteration: ("updates", {"node1": {"value": 2}})
# Third iteration: ("values", {"value": 4})
# Fourth iteration: ("updates", {"node2": {"value": 4}})
```
### State Management

```python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

# Create graph with checkpointer
graph = StateGraph(dict)
graph.add_node("increment", lambda s: {"count": s["count"] + 1})
graph.add_edge(START, "increment")
graph.add_edge("increment", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# Run with thread_id
config = {"configurable": {"thread_id": "thread-1"}}
result1 = app.invoke({"count": 0}, config)
# result1 == {"count": 1}

# Get state
state = app.get_state(config)
# state.values == {"count": 1}
# state.next == ()  # No pending nodes

# Continue from checkpoint
result2 = app.invoke({"count": state.values["count"]}, config)
# result2 == {"count": 2}

# Get history
for state in app.get_state_history(config, limit=5):
    print(state.values, state.created_at)
```
### Manual State Updates

```python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

graph = StateGraph(dict)
graph.add_node("process", lambda s: {"result": s["input"] * 2})
graph.add_edge(START, "process")
graph.add_edge("process", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}

# Initial run
result = app.invoke({"input": 5}, config)
# result == {"input": 5, "result": 10}

# Manually update state
new_config = app.update_state(
    config,
    {"input": 20, "result": None},
    as_node="process"
)

# Resume execution
result = app.invoke(None, new_config)
# result == {"input": 20, "result": 40}
```
### Using NodeBuilder (Advanced)

```python
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue
from langgraph.types import RetryPolicy, CachePolicy
from langchain_core.runnables import RunnableLambda

# Build a node with the fluent API. Because the node subscribes to one
# channel and reads another, its input is a dict of channel values.
node = (
    NodeBuilder()
    .subscribe_to("input_channel")
    .read_from("config_channel")
    .do(RunnableLambda(lambda x: x["input_channel"] * 2))
    .write_to("output_channel")
    .add_retry_policies(RetryPolicy(max_attempts=3))
    .add_cache_policy(CachePolicy(key_func=str))
    .meta("important", priority=1)
    .build()
)

# Use in Pregel
graph = Pregel(
    nodes={"my_node": node},
    channels={
        "input_channel": LastValue(int),
        "config_channel": LastValue(dict),
        "output_channel": LastValue(int)
    },
    input_channels="input_channel",
    output_channels="output_channel"
)
```
### Interrupting and Resuming

```python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

graph = StateGraph(dict)
graph.add_node("step1", lambda s: {"stage": "step1_complete"})
graph.add_node("step2", lambda s: {"stage": "step2_complete"})
graph.add_node("step3", lambda s: {"stage": "step3_complete"})
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)

checkpointer = MemorySaver()
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["step2"]
)

config = {"configurable": {"thread_id": "1"}}

# Run until interrupt
result = app.invoke({"stage": "start"}, config)
# Stops before step2

# Check state
state = app.get_state(config)
# state.next == ("step2",)
# state.values["stage"] == "step1_complete"

# Resume execution
result = app.invoke(None, config)
# Continues from step2
# result["stage"] == "step3_complete"
```
### Custom Streaming

```python
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer

def node_with_custom_stream(state):
    writer = get_stream_writer()

    # Write custom data to the stream
    writer({"type": "progress", "percent": 25})
    # Do work...
    writer({"type": "progress", "percent": 50})
    # More work...
    writer({"type": "progress", "percent": 100})

    return {"result": "done"}

graph = StateGraph(dict)
graph.add_node("work", node_with_custom_stream)
graph.add_edge(START, "work")
graph.add_edge("work", END)

app = graph.compile()

# Stream custom data
for chunk in app.stream({"input": "data"}, stream_mode="custom"):
    print(chunk)
# {"type": "progress", "percent": 25}
# {"type": "progress", "percent": 50}
# {"type": "progress", "percent": 100}
```
## Notes

- Pregel is typically used indirectly through StateGraph.compile()
- Direct Pregel construction provides maximum flexibility for advanced use cases
- All async methods follow the same signature as their sync counterparts, with an 'a' prefix
- Streaming supports multiple simultaneous modes for comprehensive observability
- State management requires a checkpointer for persistence
- Thread IDs in config enable multi-tenant applications with isolated state
- Pregel implements the LangChain Runnable interface for composability
1013