# State Graph API

StateGraph is the primary high-level API for building stateful, multi-actor workflows in LangGraph. It provides a declarative interface for defining graphs whose nodes communicate through shared state, with support for conditional routing, parallel execution, and complex control flow patterns.

## Imports

```python
from langgraph.graph import StateGraph, START, END
from langgraph.constants import TAG_NOSTREAM, TAG_HIDDEN
```

**Note:** `TAG_NOSTREAM` and `TAG_HIDDEN` are imported from `langgraph.constants`, not `langgraph.graph`.

## Capabilities

### StateGraph Class

Main class for constructing stateful graphs. Nodes read from and write to shared state channels, with automatic state merging using reducer functions.

```python { .api }
class StateGraph:
    """
    A graph whose nodes communicate by reading and writing to a shared state.

    Type Parameters:
        StateT: The type of the state schema (TypedDict, Pydantic model, or dataclass)
        ContextT: Optional type for run-scoped context (default: None)
        InputT: Type for graph input (default: StateT)
        OutputT: Type for graph output (default: StateT)
    """

    def __init__(
        self,
        state_schema,
        context_schema=None,
        *,
        input_schema=None,
        output_schema=None
    ):
        """
        Initialize a StateGraph.

        Parameters:
            state_schema: TypedDict, Pydantic model, or dataclass defining state structure.
                Fields can be annotated with reducers using Annotated[type, reducer_func].
            context_schema: Optional schema for run-scoped read-only context
            input_schema: Optional schema for graph input (subset of state_schema)
            output_schema: Optional schema for graph output (subset of state_schema)
        """
```
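To make the reducer rule concrete, here is a minimal plain-Python sketch of how an update could be merged into state: fields annotated with `Annotated[type, reducer_func]` combine old and new values, while other fields are overwritten. It does not import LangGraph, and `merge_update` is a hypothetical helper written for illustration, not the library's implementation.

```python
from operator import add
from typing import Annotated, TypedDict, get_type_hints


class State(TypedDict):
    total: Annotated[int, add]  # reducer: the update is added to the old value
    items: list[str]            # no reducer: the update replaces the old value


def merge_update(schema, state: dict, update: dict) -> dict:
    """Hypothetical helper: apply one node update to a state dict."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] exposes its extra arguments as __metadata__
        metadata = getattr(hints[key], "__metadata__", ())
        reducer = metadata[0] if metadata and callable(metadata[0]) else None
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"total": 10, "items": ["a"]}
state = merge_update(State, state, {"total": 5, "items": ["b"]})
print(state)  # {'total': 15, 'items': ['b']}
```

The sketch only models the merge step; channel setup, validation, and parallel writes are out of scope.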

#### Adding Nodes

```python { .api }
def add_node(
    self,
    node,
    action=None,
    *,
    defer=False,
    metadata=None,
    input_schema=None,
    retry_policy=None,
    cache_policy=None,
    destinations=None
):
    """
    Add a node to the graph.

    Parameters:
        node: str or callable
            - If str and action is provided: node name
            - If callable: used as both name (func.__name__) and action
        action: Optional[Callable[[StateT], StateT | dict | None]]
            The function to execute for this node. Should accept state and return
            updates to merge into state (or None for no updates).
        defer: bool, default False
            If True, execution of the node is deferred until the run is about to end
        metadata: Optional[dict]
            Metadata to attach to the node
        input_schema: Optional schema to validate/coerce node input
        retry_policy: Optional[RetryPolicy | Sequence[RetryPolicy]]
            Retry configuration for node failures
        cache_policy: Optional[CachePolicy]
            Cache configuration for node results
        destinations: Optional[list[str]]
            Pre-declared nodes this node can route to (e.g. when it returns a Command).
            Used for graph rendering; does not affect execution.

    Returns:
        Self (for method chaining)
    """
```

#### Adding Edges

```python { .api }
def add_edge(self, start_key, end_key):
    """
    Add a directed edge from one node to another.

    Parameters:
        start_key: str | list[str] - Source node name (or START), or list of source nodes for waiting edges
        end_key: str - Destination node name (or END)

    Returns:
        Self (for method chaining)
    """
```

```python { .api }
def add_conditional_edges(
    self,
    source,
    path,
    path_map=None
):
    """
    Add conditional routing from a source node.

    Parameters:
        source: str - Source node name (or START)
        path: Callable[[StateT], str | list[str] | Send | list[Send]]
            Function that determines next node(s) based on state.
            Can return:
            - str: Single next node name
            - list[str]: Multiple next nodes (parallel execution)
            - Send: Message to specific node with custom state
            - list[Send]: Multiple Send messages
        path_map: Optional[dict[str, str]]
            Mapping from path return values to node names.
            If None, path must return actual node names.

    Returns:
        Self (for method chaining)
    """
```
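The relationship between `path` and `path_map` can be sketched in plain Python. This is an illustration only: `resolve_targets` is a hypothetical helper, the node names are made up, and `Send` objects are left out.

```python
from typing import Callable, Optional, Union


def resolve_targets(
    path: Callable[[dict], Union[str, list]],
    state: dict,
    path_map: Optional[dict] = None,
) -> list:
    """Hypothetical helper: turn a path function's result into node names."""
    result = path(state)
    keys = result if isinstance(result, list) else [result]
    if path_map is None:
        # Without a path_map, return values are used directly as node names
        return keys
    return [path_map[key] for key in keys]


def check_value(state: dict) -> str:
    return "high" if state["value"] > 10 else "low"


path_map = {"high": "process_high", "low": "process_low"}
with_map = resolve_targets(check_value, {"value": 42}, path_map)
without_map = resolve_targets(check_value, {"value": 3})
print(with_map, without_map)  # ['process_high'] ['low']
```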

```python { .api }
def add_sequence(self, nodes):
    """
    Add a sequence of nodes with edges connecting them in order.

    Parameters:
        nodes: Sequence[Callable | tuple[str, Callable]]
            Sequence of node callables or (name, action) tuples.
            When no name is given, it is inferred from the callable.
            Creates the nodes and the edges between consecutive nodes.

    Returns:
        Self (for method chaining)
    """
```
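As a rough illustration of what a sequence expands to, the sketch below derives node names and the consecutive edges from a list of callables or `(name, action)` tuples. `expand_sequence` and the node functions are hypothetical; the sketch covers only the node-to-node edges and assumes nothing about how the graph is entered or exited.

```python
def load(state: dict) -> dict:
    return {"rows": [1, 2, 3]}


def clean(state: dict) -> dict:
    return {"rows": [r for r in state["rows"] if r > 1]}


def expand_sequence(nodes):
    """Hypothetical helper: names and consecutive edges for a node sequence."""
    names = [n[0] if isinstance(n, tuple) else n.__name__ for n in nodes]
    edges = list(zip(names, names[1:]))
    return names, edges


names, edges = expand_sequence([load, clean, ("save", lambda state: {})])
print(names)  # ['load', 'clean', 'save']
print(edges)  # [('load', 'clean'), ('clean', 'save')]
```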

#### Setting Entry and Exit Points

```python { .api }
def set_entry_point(self, key):
    """
    Set the first node to execute when the graph starts.
    Equivalent to add_edge(START, key).

    Parameters:
        key: str - Node name to start execution

    Returns:
        Self (for method chaining)
    """
```

```python { .api }
def set_conditional_entry_point(self, path, path_map=None):
    """
    Set conditional entry point based on initial state.
    Equivalent to add_conditional_edges(START, path, path_map).

    Parameters:
        path: Callable[[StateT], str | list[str] | Send | list[Send]]
            Function determining first node(s) to execute
        path_map: Optional[dict[str, str]]
            Mapping from path return values to node names

    Returns:
        Self (for method chaining)
    """
```

```python { .api }
def set_finish_point(self, key):
    """
    Mark a node as a finish point.
    Equivalent to add_edge(key, END).

    Parameters:
        key: str - Node name that should end execution

    Returns:
        Self (for method chaining)
    """
```

#### Graph Validation

```python { .api }
def validate(self, interrupt=None):
    """
    Validate the graph structure.

    Parameters:
        interrupt: Optional list of node names where interrupts can occur

    Raises:
        ValueError: If graph structure is invalid (e.g., unreachable nodes)

    Returns:
        Self (for method chaining)
    """
```

#### Graph Compilation

```python { .api }
def compile(
    self,
    checkpointer=None,
    *,
    cache=None,
    store=None,
    interrupt_before=None,
    interrupt_after=None,
    debug=False,
    name=None
):
    """
    Compile the graph into an executable CompiledStateGraph.

    Parameters:
        checkpointer: Optional[BaseCheckpointSaver]
            Checkpointer for persisting state. Required for:
            - Human-in-the-loop workflows
            - State history
            - Resuming from failures
        cache: Optional cache backend for node results
        store: Optional[BaseStore]
            Store for persistent cross-thread memory
        interrupt_before: Optional[list[str] | Literal["*"]]
            Nodes to interrupt before executing.
            Use "*" to interrupt before all nodes.
        interrupt_after: Optional[list[str] | Literal["*"]]
            Nodes to interrupt after executing.
            Use "*" to interrupt after all nodes.
        debug: bool, default False
            Enable debug mode with additional logging
        name: Optional[str]
            Name for the compiled graph

    Returns:
        CompiledStateGraph - Executable graph that implements Runnable interface
    """
```
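The `"*"` shorthand for `interrupt_before` and `interrupt_after` can be pictured as a simple expansion over the graph's node names. The sketch below is an assumption-laden illustration: `resolve_interrupts` is a hypothetical helper, and rejecting unknown names is a choice made for the example rather than documented behaviour.

```python
from typing import Optional, Sequence, Union


def resolve_interrupts(
    spec: Optional[Union[str, Sequence[str]]],
    node_names: Sequence[str],
) -> list:
    """Hypothetical helper: expand an interrupt spec into concrete node names."""
    if spec is None:
        return []
    if spec == "*":
        return list(node_names)  # interrupt at every node
    unknown = [name for name in spec if name not in node_names]
    if unknown:
        raise ValueError(f"Unknown interrupt nodes: {unknown}")
    return list(spec)


all_nodes = ["process", "approval"]
print(resolve_interrupts("*", all_nodes))           # ['process', 'approval']
print(resolve_interrupts(["approval"], all_nodes))  # ['approval']
```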

### CompiledStateGraph Class

Compiled and executable version of StateGraph. Extends Pregel and implements the LangChain Runnable interface for execution, streaming, and state management.

```python { .api }
class CompiledStateGraph:
    """
    Compiled version of StateGraph, ready for execution.
    Inherits all methods from Pregel including invoke, stream, get_state, etc.
    """
```

#### Execution Methods

```python { .api }
def invoke(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any:
    """
    Execute the graph synchronously and return the final output.

    Parameters:
        input: InputT | Command | None
            Input to the graph. Can be:
            - State input matching input_schema
            - Command object to control execution (e.g., resume from interrupt)
            - None to resume from last checkpoint
        config: RunnableConfig | None
            Configuration for the run. Must include thread_id in configurable
            if using checkpointing.
        context: ContextT | None
            Static run-scoped context available to all nodes via get_runtime().
            Added in v0.6.0.
        stream_mode: StreamMode, default "values"
            Internal streaming mode used during execution.
        print_mode: StreamMode | Sequence[StreamMode], default ()
            Stream mode(s) to print to stdout during execution.
            Use for debugging to see intermediate steps.
        output_keys: str | Sequence[str] | None
            Specific state keys to include in output. If None, returns all keys.
        interrupt_before: All | Sequence[str] | None
            Override graph's interrupt_before for this run.
            Use "*" to interrupt before all nodes.
        interrupt_after: All | Sequence[str] | None
            Override graph's interrupt_after for this run.
            Use "*" to interrupt after all nodes.
        durability: Durability | None
            Persistence mode: "sync", "async", or "exit".
            Default is "async" (persist asynchronously during execution).

    Returns:
        dict[str, Any] | Any - Final graph output according to output_schema

    Raises:
        GraphRecursionError: If max steps exceeded
        GraphInterrupt: If execution is interrupted

    Example:
        app = graph.compile(checkpointer=MemorySaver())
        config = {"configurable": {"thread_id": "1"}}
        result = app.invoke({"messages": []}, config)
    """
```

```python { .api }
def stream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    **kwargs: Any
) -> Iterator[dict[str, Any] | Any]:
    """
    Stream graph execution, yielding outputs as they are produced.

    Parameters:
        input: InputT | Command | None
            Input to the graph (same as invoke)
        config: RunnableConfig | None
            Configuration for the run
        context: ContextT | None
            Static run-scoped context
        stream_mode: StreamMode | Sequence[StreamMode] | None
            How to emit outputs. Options:
            - "values": Full state after each step (default)
            - "updates": Node updates as {node_name: update}
            - "custom": Custom data written via StreamWriter
            - "messages": LLM messages token-by-token
            - "checkpoints": Checkpoint creation events
            - "tasks": Task start/finish events
            - "debug": Checkpoints and tasks combined
            Can pass list for multiple modes: ["values", "updates"]
        output_keys: str | Sequence[str] | None
            Specific state keys to include in output
        interrupt_before: All | Sequence[str] | None
            Override interrupt_before for this run
        interrupt_after: All | Sequence[str] | None
            Override interrupt_after for this run
        durability: Durability | None
            Persistence mode
        subgraphs: bool, default False
            Whether to stream subgraph execution events

    Yields:
        dict[str, Any] | Any - Output chunks according to stream_mode:
        - "values": Full state values after each step
        - "updates": {"node_name": update_dict}
        - "custom": Custom data from StreamWriter
        - Multiple modes: Tuples of (mode, chunk)

    Example:
        for chunk in app.stream({"value": 1}, stream_mode=["values", "updates"]):
            print(chunk)
        # ("values", {"value": 2})
        # ("updates", {"node1": {"value": 2}})
    """
```
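The difference between the `"values"` and `"updates"` modes, and the `(mode, chunk)` tuples produced when several modes are requested, can be sketched with a toy generator. `toy_stream` is a hypothetical stand-in that runs nodes one after another with plain overwrites; it is not how the library schedules or merges work, and the real method may emit additional chunks such as the initial state.

```python
from typing import Callable, Iterator, Sequence, Union


def toy_stream(
    nodes: Sequence[tuple],
    state: dict,
    stream_mode: Union[str, Sequence[str]] = "values",
) -> Iterator:
    """Hypothetical stand-in for stream(): nodes is a list of (name, fn) pairs."""
    single = isinstance(stream_mode, str)
    modes = [stream_mode] if single else list(stream_mode)
    unsupported = set(modes) - {"values", "updates"}
    if unsupported:
        raise ValueError(f"Sketch only supports values/updates, got {unsupported}")
    for name, fn in nodes:
        update = fn(state)
        state = {**state, **update}  # plain overwrite; no reducers in this sketch
        for mode in modes:
            chunk = dict(state) if mode == "values" else {name: update}
            # A single mode yields bare chunks; several modes yield (mode, chunk)
            yield chunk if single else (mode, chunk)


nodes = [("double", lambda s: {"value": s["value"] * 2})]
chunks = list(toy_stream(nodes, {"value": 1}, ["values", "updates"]))
print(chunks)  # [('values', {'value': 2}), ('updates', {'double': {'value': 2}})]
```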

```python { .api }
def get_state_history(
    self,
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[StateSnapshot]:
    """
    Get historical state snapshots for a thread.

    Requires a checkpointer to be configured on the graph.

    Parameters:
        config: RunnableConfig
            Must include thread_id in configurable to identify the thread
        filter: dict[str, Any] | None
            Filter checkpoints by metadata fields.
            Example: {"source": "update", "step": 3}
        before: RunnableConfig | None
            Get states before this checkpoint config (for pagination)
        limit: int | None
            Maximum number of states to return

    Yields:
        StateSnapshot - Historical states in reverse chronological order (newest first).
        Each snapshot contains:
        - values: State values at that point
        - next: Tuple of next node(s) to execute
        - config: Config for this checkpoint
        - metadata: Checkpoint metadata
        - created_at: Timestamp
        - parent_config: Parent checkpoint config
        - tasks: Pending tasks at this point

    Example:
        config = {"configurable": {"thread_id": "1"}}
        for state in app.get_state_history(config, limit=5):
            print(f"Step: {state.metadata['step']}, Values: {state.values}")
    """
```
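The ordering, `filter`, and `limit` semantics described for `get_state_history` can be sketched over an in-memory list. `Snapshot` and `toy_history` are hypothetical stand-ins for illustration; a real checkpointer stores and queries snapshots differently, and `before` is not modelled here.

```python
from typing import Iterator, NamedTuple, Optional


class Snapshot(NamedTuple):
    values: dict
    metadata: dict


def toy_history(
    snapshots: list,
    *,
    filter: Optional[dict] = None,
    limit: Optional[int] = None,
) -> Iterator[Snapshot]:
    """Hypothetical stand-in: snapshots are stored oldest first, yielded newest first."""
    yielded = 0
    for snap in reversed(snapshots):
        # Keep only snapshots whose metadata matches every filter field
        if filter and any(snap.metadata.get(k) != v for k, v in filter.items()):
            continue
        if limit is not None and yielded >= limit:
            return
        yielded += 1
        yield snap


stored = [Snapshot({"value": i}, {"step": i, "source": "loop"}) for i in range(4)]
latest_steps = [s.metadata["step"] for s in toy_history(stored, limit=2)]
step_one = list(toy_history(stored, filter={"step": 1}))
print(latest_steps)  # [3, 2]
```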

#### State Management Methods

```python { .api }
def get_state(
    self,
    config: RunnableConfig,
    *,
    subgraphs: bool = False
) -> StateSnapshot:
    """
    Get the current state of the graph for a specific thread.

    Requires a checkpointer to be configured on the graph.

    Parameters:
        config: RunnableConfig
            Must include thread_id in configurable to identify the thread
        subgraphs: bool, default False
            Whether to include subgraph state in the snapshot

    Returns:
        StateSnapshot - Current state containing:
        - values: Current state values
        - next: Tuple of next node(s) to execute
        - config: Config for current checkpoint
        - metadata: Checkpoint metadata
        - created_at: Timestamp
        - parent_config: Parent checkpoint config
        - tasks: Pending tasks
        - interrupts: Any interrupt information

    Example:
        config = {"configurable": {"thread_id": "1"}}
        state = app.get_state(config)
        print(f"Current values: {state.values}")
        print(f"Next nodes: {state.next}")
    """

def update_state(
    self,
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
    task_id: str | None = None
) -> RunnableConfig:
    """
    Update the state of the graph programmatically.

    Requires a checkpointer to be configured on the graph. This is critical for
    human-in-the-loop workflows where you need to modify state between steps.

    Parameters:
        config: RunnableConfig
            Must include thread_id in configurable to identify the thread
        values: dict[str, Any] | Any | None
            State updates to apply. If dict, merged with current state.
            If matches output_schema type, replaces state entirely.
            Can be None to only update checkpoint without changing state.
        as_node: str | None, default None
            Apply update as if it came from this node. Affects which
            edges are traversed next. If None, update is applied as external input.
        task_id: str | None, default None
            ID of specific task to update. Used with Send API for dynamic fanouts.

    Returns:
        RunnableConfig - Updated config with new checkpoint_id

    Example:
        # Update state and continue execution
        config = {"configurable": {"thread_id": "1"}}
        app.invoke({"value": 1}, config)  # Graph pauses at interrupt

        # Modify state externally
        new_config = app.update_state(config, {"value": 100})

        # Resume with modified state
        result = app.invoke(None, new_config)
    """
```

#### Async Methods

```python { .api }
async def ainvoke(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any:
    """
    Async version of invoke(). Runs the graph asynchronously until completion.

    All parameters and return values same as invoke(). Use for async/await workflows.

    Example:
        app = graph.compile(checkpointer=MemorySaver())
        result = await app.ainvoke({"messages": []}, config)
    """

async def astream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    **kwargs: Any
) -> AsyncIterator[dict[str, Any] | Any]:
    """
    Async version of stream(). Yields outputs asynchronously as they are produced.

    All parameters and return values same as stream(). Use for async/await workflows.

    Example:
        async for chunk in app.astream({"value": 1}, stream_mode="updates"):
            print(chunk)
    """

async def aget_state(
    self,
    config: RunnableConfig,
    *,
    subgraphs: bool = False
) -> StateSnapshot:
    """
    Async version of get_state(). Get current state asynchronously.

    All parameters and return values same as get_state().
    """

async def aget_state_history(
    self,
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[StateSnapshot]:
    """
    Async version of get_state_history(). Get historical states asynchronously.

    All parameters and return values same as get_state_history().

    Example:
        config = {"configurable": {"thread_id": "1"}}
        async for state in app.aget_state_history(config, limit=5):
            print(f"Step: {state.metadata['step']}")
    """

async def aupdate_state(
    self,
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
    task_id: str | None = None
) -> RunnableConfig:
    """
    Async version of update_state(). Update state asynchronously.

    All parameters and return values same as update_state().
    """
```
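The docstring examples for `astream` and `aget_state_history` use `async for`, which only works inside a coroutine. The sketch below shows the surrounding boilerplate with a toy async generator; `toy_astream` is a hypothetical stand-in for the real method and produces made-up chunks.

```python
import asyncio
from typing import AsyncIterator


async def toy_astream(values: list) -> AsyncIterator[dict]:
    """Hypothetical stand-in for astream(): yields one update chunk per value."""
    for value in values:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {"node": {"value": value}}


async def collect() -> list:
    # `async for` must run inside a coroutine
    return [chunk async for chunk in toy_astream([1, 2])]


chunks = asyncio.run(collect())
print(chunks)  # [{'node': {'value': 1}}, {'node': {'value': 2}}]
```

In an application that already runs an event loop (for example a web server), the coroutine would be awaited directly instead of being passed to `asyncio.run`.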

#### Graph Introspection and Utilities

```python { .api }
def get_graph(
    self,
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph:
    """
    Get a drawable representation of the computation graph.

    Parameters:
        config: RunnableConfig | None
            Optional config for graph generation
        xray: int | bool, default False
            If True, includes internal subgraph details.
            If int, specifies recursion depth for nested subgraph expansion.

    Returns:
        Graph - Drawable graph object that can be rendered/visualized

    Example:
        app = graph.compile()
        graph_viz = app.get_graph()
        # Can be rendered in Jupyter or saved to file
    """

async def aget_graph(
    self,
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph:
    """
    Async version of get_graph(). Get graph representation asynchronously.

    All parameters and return values same as get_graph().
    """

def clear_cache(self, nodes: Sequence[str] | None = None) -> None:
    """
    Clear the cache for specified nodes or all nodes.

    Parameters:
        nodes: Sequence[str] | None
            Node names to clear cache for. If None, clears all node caches.

    Example:
        app.clear_cache(["expensive_node"])  # Clear specific node
        app.clear_cache()  # Clear all caches
    """

async def aclear_cache(self, nodes: Sequence[str] | None = None) -> None:
    """
    Async version of clear_cache(). Clear cache asynchronously.

    All parameters same as clear_cache().
    """

def get_subgraphs(
    self,
    *,
    namespace: str | None = None,
    recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]:
    """
    Get the subgraphs of the graph.

    Useful for working with nested graphs and inspecting the graph hierarchy.

    Parameters:
        namespace: str | None
            The namespace to filter the subgraphs by. If provided, only
            returns subgraphs matching this namespace.
        recurse: bool, default False
            Whether to recurse into subgraphs. If False, only returns
            immediate subgraphs.

    Yields:
        tuple[str, PregelProtocol] - (namespace, subgraph) pairs

    Example:
        for namespace, subgraph in app.get_subgraphs():
            print(f"Subgraph: {namespace}")
    """

async def aget_subgraphs(
    self,
    *,
    namespace: str | None = None,
    recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]:
    """
    Async version of get_subgraphs(). Get subgraphs asynchronously.

    All parameters and return values same as get_subgraphs().
    """

def bulk_update_state(
    self,
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig:
    """
    Apply updates to the graph state in bulk.

    Requires a checkpointer to be set. Useful for batch operations where
    you need to apply multiple sequential updates efficiently.

    Parameters:
        config: RunnableConfig
            Must include thread_id in configurable to identify the thread
        supersteps: Sequence[Sequence[StateUpdate]]
            List of supersteps, each containing a list of updates to
            apply sequentially. Each StateUpdate is a tuple of
            (values, as_node, task_id) where task_id is optional.

    Returns:
        RunnableConfig - The updated config

    Raises:
        ValueError: If no checkpointer is set or no updates are provided
        InvalidUpdateError: If an invalid update is provided

    Example:
        from langgraph.types import StateUpdate

        supersteps = [
            [StateUpdate({"value": 1}, "node1", None)],
            [StateUpdate({"value": 2}, "node2", None)]
        ]
        config = app.bulk_update_state(config, supersteps)
    """

async def abulk_update_state(
    self,
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig:
    """
    Async version of bulk_update_state(). Apply bulk updates asynchronously.

    All parameters and return values same as bulk_update_state().
    """

def validate(self) -> Self:
    """
    Validate the compiled graph structure.

    Checks for common errors like disconnected nodes, invalid edges,
    and other structural issues.

    Returns:
        Self - Returns self for method chaining

    Example:
        app = graph.compile().validate()
    """

def copy(self, update: dict[str, Any] | None = None) -> Self:
    """
    Create a copy of the graph with optional updates.

    Parameters:
        update: dict[str, Any] | None
            Optional dictionary of attributes to update in the copy

    Returns:
        Self - A new instance with updated attributes

    Example:
        new_app = app.copy({"debug": True})
    """

def with_config(self, config: RunnableConfig | None = None, **kwargs: Any) -> Self:
    """
    Create a copy of the graph with an updated config.

    Binds a config to the graph, useful for setting default configuration
    values that will be merged with runtime configs.

    Parameters:
        config: RunnableConfig | None
            Config to merge with the graph's existing config
        **kwargs: Any
            Additional config values to merge

    Returns:
        Self - A new instance with bound config

    Example:
        app_with_config = app.with_config(
            {"configurable": {"thread_id": "default"}}
        )
    """
```

#### Schema and Introspection Methods

```python { .api }
def get_input_schema(self, config: RunnableConfig | None = None) -> type[BaseModel]:
    """
    Get the Pydantic BaseModel schema for graph input.

    Parameters:
        config: RunnableConfig | None
            Optional config for schema generation

    Returns:
        type[BaseModel] - Pydantic model class describing valid input

    Example:
        InputSchema = app.get_input_schema()
        print(InputSchema.model_json_schema())
    """

def get_output_schema(self, config: RunnableConfig | None = None) -> type[BaseModel]:
    """
    Get the Pydantic BaseModel schema for graph output.

    Parameters:
        config: RunnableConfig | None
            Optional config for schema generation

    Returns:
        type[BaseModel] - Pydantic model class describing output structure

    Example:
        OutputSchema = app.get_output_schema()
        print(OutputSchema.model_json_schema())
    """

def get_context_jsonschema(self) -> dict[str, Any] | None:
    """
    Get JSON schema for the context (if context_schema is defined).

    Returns:
        dict[str, Any] | None - JSON schema for context, or None if no
        context schema was specified

    Example:
        if context_schema := app.get_context_jsonschema():
            print(f"Context schema: {context_schema}")
    """

def get_input_jsonschema(self, config: RunnableConfig | None = None) -> dict[str, Any]:
    """
    Get JSON schema for graph input.

    Parameters:
        config: RunnableConfig | None
            Optional config for schema generation

    Returns:
        dict[str, Any] - JSON schema describing valid input structure

    Example:
        schema = app.get_input_jsonschema()
        print(schema)
    """

def get_output_jsonschema(self, config: RunnableConfig | None = None) -> dict[str, Any]:
    """
    Get JSON schema for graph output.

    Parameters:
        config: RunnableConfig | None
            Optional config for schema generation

    Returns:
        dict[str, Any] - JSON schema describing output structure

    Example:
        schema = app.get_output_jsonschema()
        print(schema)
    """

def attach_node(self, key, node):
    """
    Internal: Attach a node to the compiled graph.

    Parameters:
        key: str - Node identifier
        node: Runnable - Node implementation
    """

def attach_edge(self, starts, end):
    """
    Internal: Attach an edge to the compiled graph.

    Parameters:
        starts: str | list[str] - Source node(s)
        end: str - Destination node
    """

def attach_branch(self, start, name, branch, *, with_reader=True):
    """
    Internal: Attach a conditional branch to the compiled graph.

    Parameters:
        start: str - Source node
        name: str - Branch name
        branch: Branch implementation
        with_reader: bool, default True - Whether to attach state reader
    """
```

### Special Constants

```python { .api }
START: str
"""
The first (virtual) node in the graph.
Value: "__start__"

Use as source in add_edge to create entry point:
    graph.add_edge(START, "first_node")
"""

END: str
"""
The last (virtual) node in the graph.
Value: "__end__"

Use as destination in add_edge to mark exit point:
    graph.add_edge("last_node", END)
"""

TAG_NOSTREAM: str
"""
Tag to disable streaming for a chat model.

When added to a node's metadata or tags, prevents streaming output
from chat models within that node.

Usage:
    graph.add_node(
        "chat",
        chatbot_node,
        metadata={"tags": [TAG_NOSTREAM]}
    )
"""

TAG_HIDDEN: str
"""
Tag to hide a node/edge from certain tracing/streaming environments.

When applied, excludes the node or edge from LangSmith tracing and
certain streaming outputs.

Usage:
    graph.add_node(
        "internal",
        internal_node,
        metadata={"tags": [TAG_HIDDEN]}
    )
"""
```

## Usage Examples

### Basic State Graph

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    messages: list[str]
    counter: int

def process(state: State) -> dict:
    return {
        "messages": state["messages"] + ["processed"],
        "counter": state["counter"] + 1
    }

graph = StateGraph(State)
graph.add_node("process", process)
graph.add_edge(START, "process")
graph.add_edge("process", END)

app = graph.compile()
result = app.invoke({"messages": [], "counter": 0})
```

### Conditional Routing

```python
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    value: int
    result: str

def check_value(state: State) -> Literal["high", "low"]:
    return "high" if state["value"] > 10 else "low"

def process_high(state: State) -> dict:
    return {"result": "high value"}

def process_low(state: State) -> dict:
    return {"result": "low value"}

graph = StateGraph(State)
graph.add_node("high", process_high)
graph.add_node("low", process_low)

# Conditional routing from START
graph.add_conditional_edges(
    START,
    check_value,
    {"high": "high", "low": "low"}
)

graph.add_edge("high", END)
graph.add_edge("low", END)

app = graph.compile()
```

### State with Reducers

```python
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # Using reducer to accumulate values
    total: Annotated[int, add]
    items: list[str]

def add_item(state: State) -> dict:
    return {
        "total": 5,  # Will be added to existing total
        "items": state["items"] + ["new"]
    }

graph = StateGraph(State)
graph.add_node("add", add_item)
graph.add_edge(START, "add")
graph.add_edge("add", END)

app = graph.compile()
result = app.invoke({"total": 10, "items": []})
# result["total"] == 15 (10 + 5 via reducer)
# result["items"] == ["new"]
```

### Parallel Execution with Send

```python
from operator import add
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class State(TypedDict):
    items: list[int]
    # Reducer so that parallel writes to "results" are concatenated
    # instead of conflicting with each other
    results: Annotated[list[int], add]
    total: int

def fan_out(state: State) -> list[Send]:
    # Send each item to process_item in parallel
    return [Send("process_item", {"value": item}) for item in state["items"]]

def process_item(state: dict) -> dict:
    return {"results": [state["value"] * 2]}

def aggregate(state: State) -> dict:
    # "results" is already merged by the reducer; write a separate key
    # so the results are not appended a second time
    return {"total": sum(state["results"])}

graph = StateGraph(State)
graph.add_node("process_item", process_item)
graph.add_node("aggregate", aggregate)

graph.add_conditional_edges(START, fan_out)
graph.add_edge("process_item", "aggregate")
graph.add_edge("aggregate", END)

app = graph.compile()
result = app.invoke({"items": [1, 2, 3], "results": []})
# result["results"] == [2, 4, 6]
# result["total"] == 12
```

### With Checkpointing and Interrupts

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    value: int
    approved: bool

def process(state: State) -> dict:
    return {"value": state["value"] + 1}

def needs_approval(state: State) -> dict:
    return {"approved": False}

# Create checkpointer for persistence
checkpointer = MemorySaver()

graph = StateGraph(State)
graph.add_node("process", process)
graph.add_node("approval", needs_approval)

graph.add_edge(START, "process")
graph.add_edge("process", "approval")
graph.add_edge("approval", END)

# Compile with interruption before approval node
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["approval"]
)

# First execution - stops at approval
config = {"configurable": {"thread_id": "1"}}
result = app.invoke({"value": 0, "approved": True}, config)
# Execution interrupted before "approval" node

# Get state to inspect
state = app.get_state(config)
# state.next == ("approval",)

# Resume execution
result = app.invoke(None, config)
# Continues from approval node
```