# Types and Primitives

LangGraph provides a comprehensive set of types and primitives for controlling graph execution flow, managing state snapshots, configuring retry and cache policies, and coordinating parallel execution.

## Imports

```python
from langgraph.types import (
    Send,
    Command,
    interrupt,
    RetryPolicy,
    CachePolicy,
    Interrupt,
    StateSnapshot,
    StateUpdate,
    PregelTask,
    PregelExecutableTask,
    StreamMode,
    StreamWriter,
    Durability,
    All,
    Checkpointer
)
from langgraph.managed import IsLastStep, RemainingSteps
from langgraph.typing import (
    StateT, StateT_co, StateT_contra,
    ContextT, InputT, OutputT
)
```

## Capabilities

### Control Flow Primitives

#### Send Class

Send messages to specific nodes with custom state, enabling dynamic parallelization and fan-out patterns.

```python { .api }
class Send:
    """
    Message to send to a specific node with custom state.

    Used in conditional edges to dynamically route execution to nodes
    with custom input, enabling fan-out and parallel processing patterns.

    IMPORTANT: When using Send for parallel execution where multiple nodes
    write to the same state key, you MUST use Annotated with a reducer function
    (e.g., operator.add for lists). Otherwise you'll get InvalidUpdateError:
    "Can receive only one value per step. Use an Annotated key to handle multiple values."

    Attributes:
        node: str - Target node name
        arg: Any - State/message to send to the node
    """

    def __init__(self, node, arg):
        """
        Initialize a Send message.

        Parameters:
            node: str - Name of the target node
            arg: Any - State or data to send to the node
        """

    node: str
    arg: Any
```
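The reducer requirement described above can be seen without LangGraph at all. A minimal sketch of how a reducer such as `operator.add` merges parallel writes to the same state key (the `merge` helper is hypothetical, for illustration only, not library code):

```python
from operator import add

# Two parallel "nodes" each write to the same state key.
writes = [{"results": [2]}, {"results": [4]}, {"results": [6]}]

def merge(writes, reducers):
    """Hypothetical merge: apply each key's reducer across all writes."""
    state = {}
    for w in writes:
        for key, value in w.items():
            if key in state:
                if key not in reducers:
                    # Without a reducer, two writes to one key are an error
                    raise ValueError("Can receive only one value per step.")
                state[key] = reducers[key](state[key], value)
            else:
                state[key] = value
    return state

merged = merge(writes, {"results": add})
# merged == {"results": [2, 4, 6]}
```

With no reducer registered for `results`, the same three writes would raise, which is the shape of the `InvalidUpdateError` the docstring warns about.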

#### Command Class

Commands for updating state and controlling graph navigation, including support for parent graph communication.

```python { .api }
class Command:
    """
    Command for state updates and navigation control.

    Enables nodes to update state and control execution flow simultaneously.
    Can target parent graphs for nested graph communication.

    Type Parameters:
        N: Type of node identifiers (typically str)

    Attributes:
        graph: Optional[str] - Target graph name or Command.PARENT
        update: Optional[Any] - State updates to apply
        resume: Optional[dict[str, Any] | Any] - Resume value for interrupt
        goto: Send | Sequence[Send | N] | N - Navigation target(s)

    Class Attributes:
        PARENT: Literal["__parent__"] - Reference to parent graph
    """

    graph: str | None = None
    update: Any | None = None
    resume: dict[str, Any] | Any | None = None
    goto: Send | Sequence[Send | N] | N = ()

    PARENT: ClassVar[Literal["__parent__"]] = "__parent__"
```

#### Interrupt Function

Interrupt graph execution with a resumable value from within a node.

```python { .api }
def interrupt(value: Any) -> Any:
    """
    Interrupt graph execution with a resumable exception.

    Raises an internal exception that pauses execution and stores the value.
    Execution can be resumed later, optionally providing a resume value.

    Parameters:
        value: Any - Value to store with the interrupt. Can be retrieved
            via StateSnapshot.interrupts.

    Returns:
        Any - The resume value when execution is resumed.
            If no resume value is provided, returns None.

    Usage:
        def approval_node(state):
            # Request human approval
            user_input = interrupt("Please approve or reject")

            # user_input contains the resume value
            if user_input == "approve":
                return {"approved": True}
            else:
                return {"approved": False}

        # Later, resume with:
        # app.invoke(Command(resume="approve"), config)
    """
```

### Policy Configuration

#### RetryPolicy

Configuration for retrying node execution on failures with exponential backoff.

```python { .api }
class RetryPolicy:
    """
    Configuration for retrying nodes on failure.

    Implements exponential backoff with jitter for reliable retries.

    Attributes:
        initial_interval: float - Time in seconds before first retry (default: 0.5)
        backoff_factor: float - Multiplier for retry interval (default: 2.0)
        max_interval: float - Maximum time between retries (default: 128.0)
        max_attempts: int - Maximum retry attempts (default: 3)
        jitter: bool - Add random jitter to retry timing (default: True)
        retry_on: Exception type(s) or predicate - When to retry:
            - type[Exception]: Retry on this exception type
            - Sequence[type[Exception]]: Retry on any of these types
            - Callable[[Exception], bool]: Retry if function returns True

    Usage:
        # Retry on any exception
        policy = RetryPolicy(max_attempts=5)

        # Retry on specific exception
        policy = RetryPolicy(
            max_attempts=3,
            retry_on=ValueError
        )

        # Retry on multiple exceptions
        policy = RetryPolicy(
            retry_on=(ValueError, KeyError)
        )

        # Custom retry logic
        policy = RetryPolicy(
            retry_on=lambda e: isinstance(e, ValueError) and "temporary" in str(e)
        )
    """

    initial_interval: float = 0.5
    backoff_factor: float = 2.0
    max_interval: float = 128.0
    max_attempts: int = 3
    jitter: bool = True
    retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]
```
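With the defaults above, the wait before each retry grows geometrically and is capped at `max_interval`. A small sketch of the schedule those parameters imply (an illustration of the documented fields, not the library's internal code; jitter omitted for clarity):

```python
def backoff_schedule(initial_interval=0.5, backoff_factor=2.0,
                     max_interval=128.0, max_attempts=3):
    """Seconds to wait before each retry (jitter omitted for clarity)."""
    # max_attempts total attempts means max_attempts - 1 retries
    return [min(initial_interval * backoff_factor ** n, max_interval)
            for n in range(max_attempts - 1)]

print(backoff_schedule())                # [0.5, 1.0]
print(backoff_schedule(max_attempts=6))  # [0.5, 1.0, 2.0, 4.0, 8.0]
```

The cap matters for large attempt counts: without `max_interval`, the interval would keep doubling indefinitely.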

#### CachePolicy

Configuration for caching node results to avoid redundant computation.

```python { .api }
class CachePolicy:
    """
    Configuration for caching node results.

    Type Parameters:
        KeyFuncT: Type of the key generation function

    Attributes:
        key_func: Callable - Function to generate cache key from input
        ttl: Optional[int] - Time to live in seconds (None = no expiration)

    Usage:
        # Cache based on input dict
        policy = CachePolicy(
            key_func=lambda x: f"{x['id']}:{x['version']}",
            ttl=3600  # 1 hour
        )

        # Cache based on entire input
        policy = CachePolicy(
            key_func=str,
            ttl=None  # Never expires
        )

        # Custom key function
        def complex_key(input_data):
            return f"{input_data['user']}:{input_data['timestamp']}"

        policy = CachePolicy(
            key_func=complex_key,
            ttl=1800  # 30 minutes
        )
    """

    key_func: Callable  # Function to generate cache key
    ttl: int | None = None  # Time to live in seconds
```
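The `key_func`/`ttl` pair can be pictured as a small keyed store. A minimal sketch of the semantics (the `TTLCache` class is a stand-in for illustration, not the library's cache backend):

```python
import time

class TTLCache:
    """Minimal illustration of key_func + ttl semantics."""
    def __init__(self, key_func, ttl=None):
        self.key_func = key_func
        self.ttl = ttl          # None means entries never expire
        self._store = {}

    def get_or_compute(self, value, compute):
        key = self.key_func(value)
        if key in self._store:
            result, stored_at = self._store[key]
            if self.ttl is None or time.monotonic() - stored_at < self.ttl:
                return result   # cache hit: skip recomputation
        result = compute(value)
        self._store[key] = (result, time.monotonic())
        return result

cache = TTLCache(key_func=lambda x: f"{x['id']}:{x['version']}", ttl=3600)
calls = []
def expensive(x):
    calls.append(x)
    return x["id"] * 10

cache.get_or_compute({"id": 1, "version": "a"}, expensive)
cache.get_or_compute({"id": 1, "version": "a"}, expensive)  # hit: not recomputed
# len(calls) == 1
```

Note that only the key matters for identity: two inputs that map to the same key share a cache entry, which is why `key_func` should cover every field that affects the node's result.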

### State Management Types

#### StateSnapshot

Snapshot of graph state at the beginning of a step, including pending tasks and interrupts.

```python { .api }
class StateSnapshot:
    """
    Snapshot of graph state at the beginning of a step.

    Captures complete execution context including state values,
    pending tasks, and configuration.

    Attributes:
        values: dict[str, Any] | Any
            Current state channel values. Dict for TypedDict states,
            object for Pydantic/dataclass states.
        next: tuple[str, ...]
            Tuple of node names to execute next. Empty if no pending nodes.
        config: RunnableConfig
            Configuration for this snapshot, including thread_id and
            checkpoint_id.
        metadata: Optional[CheckpointMetadata]
            Metadata associated with this checkpoint (step number, source, etc.)
        created_at: Optional[str]
            ISO timestamp when snapshot was created
        parent_config: Optional[RunnableConfig]
            Config of parent checkpoint (for nested graphs)
        tasks: tuple[PregelTask, ...]
            Pending tasks to execute, including task IDs, names, and state
        interrupts: tuple[Interrupt, ...]
            Pending interrupts with values and IDs

    Usage:
        state = app.get_state(config)
        print(state.values)  # Current state
        print(state.next)    # Next nodes to execute
        print(state.tasks)   # Pending tasks

        # Check if execution is complete
        if not state.next:
            print("Graph execution complete")
    """

    values: dict[str, Any] | Any
    next: tuple[str, ...]
    config: RunnableConfig
    metadata: CheckpointMetadata | None
    created_at: str | None
    parent_config: RunnableConfig | None
    tasks: tuple[PregelTask, ...]
    interrupts: tuple[Interrupt, ...]
```
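The emptiness of `next` is the usual signal that a run finished rather than paused. A sketch of that check with a stand-in snapshot (a plain namedtuple here, not the real class):

```python
from collections import namedtuple

# Stand-in for StateSnapshot with just the fields this check needs
Snapshot = namedtuple("Snapshot", ["values", "next"])

def run_status(snapshot):
    """Classify a snapshot: an empty `next` tuple means nothing left to execute."""
    return "complete" if not snapshot.next else f"pending: {snapshot.next}"

print(run_status(Snapshot({"value": 1}, ())))             # complete
print(run_status(Snapshot({"value": 1}, ("approval",))))  # pending: ('approval',)
```

A snapshot with non-empty `next` typically means the graph was interrupted or is mid-execution; inspecting `tasks` and `interrupts` tells you which.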

#### StateUpdate

Information about a state update for bulk operations.

```python { .api }
class StateUpdate:
    """
    State update information for bulk_update_state.

    Attributes:
        values: Optional[dict[str, Any]] - State values to update
        as_node: Optional[str] - Act as if update came from this node
        task_id: Optional[str] - ID of task performing the update
    """

    values: dict[str, Any] | None
    as_node: str | None
    task_id: str | None
```

#### Interrupt Class

Information about an interrupt in the graph execution.

```python { .api }
class Interrupt:
    """
    Information about an interrupt.

    Attributes:
        value: Any - Value associated with the interrupt
        id: str - Unique identifier for the interrupt
    """

    value: Any
    id: str

    @classmethod
    def from_ns(cls, value, ns):
        """
        Create an Interrupt from a value and namespace.

        Parameters:
            value: Any - Interrupt value
            ns: str - Namespace string

        Returns:
            Interrupt - New interrupt instance
        """
```

### Task Types

#### PregelTask

Represents a task in the Pregel execution model, including task state and results.

```python { .api }
class PregelTask:
    """
    A Pregel task representing a unit of work.

    Attributes:
        id: str - Unique task identifier
        name: str - Task/node name
        path: tuple[str | int | tuple, ...]
            Path in the execution tree (for nested graphs)
        error: Optional[Exception] - Exception if task failed
        interrupts: tuple[Interrupt, ...]
            Interrupts raised by this task
        state: Optional[RunnableConfig | StateSnapshot]
            Task state (for subgraphs) or configuration
        result: Optional[Any] - Task result if completed

    Usage:
        state = app.get_state(config)
        for task in state.tasks:
            print(f"Task {task.name}: {task.id}")
            if task.error:
                print(f"  Error: {task.error}")
            if task.interrupts:
                print(f"  Interrupts: {task.interrupts}")
    """

    id: str
    name: str
    path: tuple[str | int | tuple, ...]
    error: Exception | None = None
    interrupts: tuple[Interrupt, ...] = ()
    state: None | RunnableConfig | StateSnapshot = None
    result: Any | None = None
```

#### PregelExecutableTask

Executable task with all information needed for execution (internal use).

```python { .api }
class PregelExecutableTask:
    """
    Executable task with complete execution context.

    This is primarily used internally by the Pregel execution engine.

    Attributes:
        name: str - Task name
        input: Any - Task input
        proc: Runnable - Runnable to execute
        writes: deque[tuple[str, Any]] - Pending writes
        config: RunnableConfig - Task configuration
        triggers: Sequence[str] - Channel triggers
        retry_policy: Sequence[RetryPolicy] - Retry configurations
        cache_key: Optional[CacheKey] - Cache key if caching enabled
        id: str - Task ID
        path: tuple[str | int | tuple, ...] - Execution path
        writers: Sequence[Runnable] - Write handlers
        subgraphs: Sequence[PregelProtocol] - Nested subgraphs
    """

    name: str
    input: Any
    proc: Runnable
    writes: deque[tuple[str, Any]]
    config: RunnableConfig
    triggers: Sequence[str]
    retry_policy: Sequence[RetryPolicy]
    cache_key: CacheKey | None
    id: str
    path: tuple[str | int | tuple, ...]
    writers: Sequence[Runnable] = ()
    subgraphs: Sequence[PregelProtocol] = ()
```

### Stream and Execution Types

#### StreamMode Type

Defines how the stream method should emit outputs.

```python { .api }
StreamMode = Literal["values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"]
"""
Stream mode options for controlling how graph execution is streamed:

- "values": Emit the full state after each step/superstep
- "updates": Emit node-level updates as they occur, in format {node_name: update_dict}
- "checkpoints": Emit checkpoint creation events (metadata about state persistence)
- "tasks": Emit task-level events showing when individual tasks start and complete
- "debug": Emit both checkpoints and tasks for comprehensive debugging
- "messages": Emit LLM message tokens as they are generated (for streaming chat models)
- "custom": Emit custom data written via StreamWriter within nodes

Usage:
    # Single mode
    for chunk in app.stream(input, stream_mode="updates"):
        print(chunk)

    # Multiple modes - returns tuples of (mode, data)
    for chunk in app.stream(input, stream_mode=["values", "tasks"]):
        mode, data = chunk
        if mode == "values":
            print(f"State: {data}")
        elif mode == "tasks":
            print(f"Task: {data}")
"""
```
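When several modes are requested, each chunk arrives tagged with its mode, so a consumer dispatches on the first tuple element. A small sketch over a simulated stream (the chunk contents are made up for illustration):

```python
# Simulated chunks, shaped as a multi-mode stream would yield them: (mode, data)
chunks = [
    ("values", {"progress": 0}),
    ("custom", {"log": "started"}),
    ("values", {"progress": 100}),
]

states, events = [], []
for mode, data in chunks:
    if mode == "values":
        states.append(data)   # full state after each step
    elif mode == "custom":
        events.append(data)   # data written via StreamWriter

# states == [{"progress": 0}, {"progress": 100}]
# events == [{"log": "started"}]
```

With a single mode, chunks are the bare data values instead, so code that switches between the two configurations must handle both shapes.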

#### StreamWriter Type

Callable for writing custom data to the output stream.

```python { .api }
StreamWriter = Callable[[Any], None]
"""
Callable that writes custom data to the output stream.

Retrieved via get_stream_writer() within nodes. Enables nodes to
emit progress updates, logs, or custom events during execution.

Usage:
    from langgraph.config import get_stream_writer

    def my_node(state):
        writer = get_stream_writer()
        writer({"progress": 25})
        # Do work...
        writer({"progress": 100})
        return state

    # Stream with custom mode
    for chunk in app.stream(input, stream_mode="custom"):
        print(chunk)  # {"progress": 25}, then {"progress": 100}
"""
```

#### Durability Type

Durability mode for graph execution (internal).

```python { .api }
Durability = Literal["sync", "async", "exit"]
"""
Durability mode for graph execution.

- "sync": Changes are persisted synchronously before the next step starts
- "async": Changes are persisted asynchronously while the next step executes
- "exit": Changes are persisted only when the graph exits

This is primarily used internally by the execution engine.
"""
```

#### Checkpointer Type

Type alias for checkpointer configurations.

```python { .api }
Checkpointer = None | bool | BaseCheckpointSaver
"""
Type of checkpointer to use for a subgraph.

- True: Enables persistent checkpointing for this subgraph
- False: Disables checkpointing, even if parent graph has a checkpointer
- None: Inherits checkpointer from parent graph
- BaseCheckpointSaver: Uses specific checkpointer instance

Usage:
    # Inherit from parent
    graph.compile(checkpointer=None)

    # Enable checkpointing
    graph.compile(checkpointer=True)

    # Disable checkpointing
    graph.compile(checkpointer=False)

    # Use specific checkpointer
    from langgraph.checkpoint.memory import MemorySaver
    graph.compile(checkpointer=MemorySaver())
"""
```

#### All Type

Special value to indicate all nodes.

```python { .api }
All = Literal["*"]
"""
Special value to interrupt on all nodes.

Usage:
    # Interrupt before all nodes
    app = graph.compile(interrupt_before="*")

    # Interrupt after all nodes
    app = graph.compile(interrupt_after="*")
"""
```

### Managed Values

Special state values automatically managed by LangGraph.

```python { .api }
IsLastStep = Annotated[bool, IsLastStepManager]
"""
Managed value that indicates if the current step is the last step.

Automatically set to True when the graph is about to reach the
recursion limit.

Usage:
    from typing import TypedDict, Annotated
    from langgraph.managed import IsLastStep

    class State(TypedDict):
        data: str
        is_last: IsLastStep

    def node(state):
        if state["is_last"]:
            # Last step - finalize
            return {"data": "final"}
        return {"data": "continue"}
"""

RemainingSteps = Annotated[int, RemainingStepsManager]
"""
Managed value for the number of remaining steps.

Automatically decremented with each step. Useful for limiting
iteration or providing step-based behavior.

Usage:
    from typing import TypedDict, Annotated
    from langgraph.managed import RemainingSteps

    class State(TypedDict):
        data: str
        steps_left: RemainingSteps

    def node(state):
        print(f"Steps remaining: {state['steps_left']}")
        return {"data": "processed"}
"""
```

### Type Variables

Type variables for generic typing of graphs and components.

```python { .api }
StateT = TypeVar("StateT", bound=StateLike)
"""
Type variable for state in graph.

Used for generic typing of StateGraph and Pregel.
Bound to StateLike (dict-like objects, Pydantic models, dataclasses).
"""

StateT_co = TypeVar("StateT_co", bound=StateLike, covariant=True)
"""
Covariant state type variable.

Used for read-only state access in type signatures.
"""

StateT_contra = TypeVar("StateT_contra", bound=StateLike, contravariant=True)
"""
Contravariant state type variable.

Used for write-only state access in type signatures.
"""

ContextT = TypeVar("ContextT", bound=StateLike | None, default=None)
"""
Type variable for graph run-scoped context.

Context is read-only and available throughout the graph run.
Defaults to None if not specified.
"""

InputT = TypeVar("InputT", bound=StateLike, default=StateT)
"""
Type variable for input to state graph.

Typically a subset of the state schema. Defaults to StateT.
"""

OutputT = TypeVar("OutputT", bound=StateLike, default=StateT)
"""
Type variable for output of state graph.

Typically a subset of the state schema. Defaults to StateT.
"""
```

## Usage Examples

### Using Send for Dynamic Parallelization

```python
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class State(TypedDict):
    items: list[int]
    # IMPORTANT: Parallel writes require Annotated with a reducer function
    # Without this, you'll get InvalidUpdateError when multiple nodes write to 'results'
    results: Annotated[list[int], add]

def fan_out(state: State) -> list[Send]:
    """Create parallel tasks for each item."""
    return [
        Send("process", {"value": item})
        for item in state["items"]
    ]

def process(state: dict) -> dict:
    """Process single item."""
    return {"results": [state["value"] * 2]}

graph = StateGraph(State)
graph.add_node("process", process)
graph.add_conditional_edges(START, fan_out)
graph.add_edge("process", END)

app = graph.compile()
result = app.invoke({"items": [1, 2, 3], "results": []})
# result["results"] == [2, 4, 6]
```

### Using Command for State Updates and Navigation

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command

class State(TypedDict):
    stage: str
    value: int

def node(state: State) -> Command:
    """Return command with state update and navigation."""
    return Command(
        update={"value": state["value"] + 1},
        goto="next_node" if state["value"] < 5 else END
    )

graph = StateGraph(State)
graph.add_node("node", node)
graph.add_node("next_node", lambda s: {"stage": "next"})
graph.add_edge(START, "node")

app = graph.compile()
```

### Using Interrupt for Human-in-the-Loop

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    data: str
    approved: bool

def needs_approval(state: State) -> dict:
    """Request human approval."""
    response = interrupt({
        "question": "Approve this data?",
        "data": state["data"]
    })

    return {"approved": response == "yes"}

graph = StateGraph(State)
graph.add_node("approval", needs_approval)
graph.add_edge(START, "approval")
graph.add_edge("approval", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}

# First run - interrupts
result = app.invoke({"data": "important", "approved": False}, config)

# Check interrupts
state = app.get_state(config)
print(state.interrupts[0].value)  # {"question": "Approve...", "data": "important"}

# Resume with approval
result = app.invoke(Command(resume="yes"), config)
# result["approved"] == True
```

### Using RetryPolicy with Nodes

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import RetryPolicy

class State(TypedDict):
    result: str

# Retries re-run the node with the same input, so track attempts externally
_attempts = {"count": 0}

def unreliable_node(state: State) -> dict:
    """Node that fails transiently on its first two attempts."""
    _attempts["count"] += 1
    if _attempts["count"] < 3:
        raise ValueError("Temporary failure")
    return {"result": "success"}

graph = StateGraph(State)
graph.add_node(
    "work",
    unreliable_node,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        retry_on=ValueError
    )
)
graph.add_edge(START, "work")
graph.add_edge("work", END)

app = graph.compile()
# Will retry on ValueError up to 3 times
```

### Using Managed Values

```python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.managed import IsLastStep, RemainingSteps

class State(TypedDict):
    data: list[int]
    is_last: IsLastStep
    steps_left: RemainingSteps

def process(state: State) -> dict:
    """Process with awareness of remaining steps."""
    print(f"Steps remaining: {state['steps_left']}")

    if state["is_last"]:
        # Last step - finalize
        return {"data": state["data"] + [999]}

    # Normal processing
    return {"data": state["data"] + [len(state["data"])]}

graph = StateGraph(State)
graph.add_node("process", process)
graph.add_edge(START, "process")
graph.add_edge("process", "process")  # Loop

app = graph.compile()

# Run with recursion limit
result = app.invoke(
    {"data": []},
    config={"recursion_limit": 5}
)
# Will loop until is_last is True
```

### Inspecting StateSnapshot

```python
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver

graph = StateGraph(dict)
# ... add nodes ...

app = graph.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "1"}}

# Run graph
app.invoke({"value": 1}, config)

# Get detailed state snapshot
state = app.get_state(config)

print(f"Values: {state.values}")
print(f"Next nodes: {state.next}")
print(f"Created: {state.created_at}")
print(f"Tasks: {len(state.tasks)}")

for task in state.tasks:
    print(f"  Task: {task.name} [{task.id}]")
    if task.error:
        print(f"    Error: {task.error}")

for interrupt in state.interrupts:
    print(f"  Interrupt: {interrupt.value} [{interrupt.id}]")
```

## Notes

- `Send` enables dynamic fan-out where the number and targets of parallel tasks are determined at runtime
- `Command` provides a unified way to update state and control navigation in a single return value
- `interrupt()` pauses execution and can be resumed with `Command(resume=value)`
- `RetryPolicy` implements exponential backoff with jitter for robust error handling
- `CachePolicy` requires a cache backend to be configured in the graph
- Managed values (`IsLastStep`, `RemainingSteps`) are automatically updated by the execution engine
- `StateSnapshot` provides complete visibility into graph state at any point
- Type variables enable type-safe graph construction with IDEs and type checkers