
# LangGraph


LangGraph is a library for building stateful, multi-actor applications with Large Language Models (LLMs). It provides low-level orchestration infrastructure for creating durable, long-running workflows with built-in support for persistence, human-in-the-loop interactions, and complex control flow. LangGraph enables developers to build sophisticated agent systems that can handle failures, maintain memory across sessions, and coordinate multiple actors through a flexible graph-based architecture.

## Package Information

- **Package Name**: langgraph
- **Package Type**: Library
- **Language**: Python
- **Installation**: `pip install langgraph`
- **Version**: 1.0.1

## Core Imports

```python
from langgraph.graph import StateGraph, START, END
```

Import tags for controlling behavior:

```python
from langgraph.constants import TAG_NOSTREAM, TAG_HIDDEN
```

Common imports for working with messages:

```python
from langgraph.graph import MessagesState, add_messages
```

For the functional API:

```python
from langgraph.func import entrypoint, task
```

For low-level graph execution:

```python
from langgraph.pregel import Pregel
```

## Basic Usage

```python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END

# Define the state schema
class State(TypedDict):
    messages: list[str]
    counter: int

# Create a graph
graph = StateGraph(State)

# Define node functions
def process_message(state: State) -> State:
    """Process messages and increment counter."""
    return {
        "messages": state["messages"] + ["processed"],
        "counter": state["counter"] + 1
    }

def check_complete(state: State) -> str:
    """Route based on counter."""
    if state["counter"] >= 3:
        return END
    return "process"

# Add nodes
graph.add_node("process", process_message)

# Add edges
graph.add_edge(START, "process")
graph.add_conditional_edges("process", check_complete)

# Compile the graph
app = graph.compile()

# Run the graph
result = app.invoke({
    "messages": ["hello"],
    "counter": 0
})

print(result)  # Final state after execution
```

## Architecture

LangGraph is built around several key components that work together to enable stateful, multi-actor workflows:

- **StateGraph**: High-level API for building graphs where nodes communicate through shared state. Nodes read from and write to state channels, enabling coordination between multiple actors.

- **Pregel**: Low-level execution engine inspired by Google's Pregel system. Handles graph traversal, state management, checkpointing, and parallelization of node execution.

- **Channels**: State management primitives that control how state updates are applied. Different channel types support various patterns like last-value, aggregation, barriers, and topics.

- **Checkpointing**: Persistent state storage that enables durable execution. Graphs can be paused, resumed, and replayed from any checkpoint, supporting long-running workflows and human-in-the-loop patterns.

- **Functional API**: Decorator-based API using `@entrypoint` and `@task` that provides an alternative to explicit graph construction, enabling more natural Python code with automatic parallelization.

- **Runtime Context**: Thread-local context that provides access to configuration, stores, stream writers, and other runtime utilities from within node functions.

This architecture enables LangGraph to support complex agent workflows with features like conditional routing, parallel execution, state persistence, error recovery, and human intervention at any point in execution.
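
To make the division of labour above concrete, here is a minimal, library-free sketch of a step loop in which a node reads shared state, returns a partial update, and a router picks the next node. `run_toy_graph`, the `TOY_END` sentinel, and the overwrite-on-update rule are illustrative assumptions, not LangGraph's implementation; checkpointing, channels, and parallel execution are left out.

```python
from typing import Callable

State = dict
NodeFn = Callable[[State], dict]
RouterFn = Callable[[State], str]

TOY_END = "__end__"  # stand-in sentinel used only by this sketch


def run_toy_graph(
    nodes: dict[str, NodeFn],
    routers: dict[str, RouterFn],
    entry: str,
    state: State,
    recursion_limit: int = 25,
) -> State:
    """Run one node per step until a router returns TOY_END."""
    current = entry
    for _ in range(recursion_limit):
        update = nodes[current](state)     # node reads state, returns a partial update
        state = {**state, **update}        # assumed rule: updated keys overwrite old ones
        current = routers[current](state)  # conditional edge chooses the next node
        if current == TOY_END:
            return state
    raise RecursionError("toy graph exceeded its step limit")


def process(state: State) -> dict:
    return {"counter": state["counter"] + 1}


def route(state: State) -> str:
    return TOY_END if state["counter"] >= 3 else "process"


final = run_toy_graph({"process": process}, {"process": route}, "process", {"counter": 0})
print(final)  # {'counter': 3}
```

The step limit plays the role of a recursion limit: a router that never returns the end sentinel causes the loop to fail instead of spinning forever.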

## Capabilities

### Graph Construction

Build stateful graphs using StateGraph where nodes communicate through shared state. Supports conditional routing, parallel execution, and complex workflows.

```python { .api }
class StateGraph:
    def __init__(
        self,
        state_schema,
        context_schema=None,
        *,
        input_schema=None,
        output_schema=None
    ): ...

    def add_node(
        self,
        node,
        action=None,
        *,
        defer=False,
        metadata=None,
        input_schema=None,
        retry_policy=None,
        cache_policy=None,
        destinations=None
    ): ...

    def add_edge(self, start_key, end_key): ...

    def add_conditional_edges(self, source, path, path_map=None): ...

    def compile(
        self,
        checkpointer=None,
        *,
        cache=None,
        store=None,
        interrupt_before=None,
        interrupt_after=None,
        debug=False,
        name=None
    ): ...
```

```python { .api }
# Special node identifiers
START: str  # The first (virtual) node - "__start__"
END: str  # The last (virtual) node - "__end__"

# Tags for controlling behavior
TAG_NOSTREAM: str  # Tag to disable streaming for a chat model
TAG_HIDDEN: str  # Tag to hide node/edge from tracing/streaming
```

[State Graph API](./state-graph.md)

### Functional API

Define workflows using Python decorators for a more natural programming style with automatic parallelization of tasks.

```python { .api }
def task(
    func=None,
    *,
    name=None,
    retry_policy=None,
    cache_policy=None
): ...

class entrypoint:
    def __init__(
        self,
        checkpointer=None,
        store=None,
        cache=None,
        context_schema=None,
        cache_policy=None,
        retry_policy=None
    ): ...

    def __call__(self, func): ...
```

[Functional API](./functional-api.md)
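
Parallelization of tasks is commonly achieved by having each task call return a future right away, so that several tasks run concurrently until their results are requested. The sketch below mimics that pattern with the standard library only; `toy_task` and `toy_workflow` are hypothetical stand-ins and do not use or reproduce `langgraph.func`.

```python
import functools
from concurrent.futures import Future, ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=4)


def toy_task(func):
    """Hypothetical decorator: calling the function schedules it and returns a Future."""
    @functools.wraps(func)
    def submit(*args, **kwargs) -> Future:
        return _pool.submit(func, *args, **kwargs)
    return submit


@toy_task
def square(x: int) -> int:
    return x * x


def toy_workflow(numbers: list[int]) -> int:
    futures = [square(n) for n in numbers]   # all tasks are scheduled up front
    return sum(f.result() for f in futures)  # block only when results are needed


total = toy_workflow([1, 2, 3])
print(total)  # 14
```

Scheduling every call before collecting any result is what lets independent tasks overlap; calling `.result()` inside the list comprehension would serialize them instead.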

### State Management Channels

Channel primitives for managing how state updates are applied, supporting patterns like last-value, aggregation, barriers, and pub/sub.

```python { .api }
class LastValue: ...
class AnyValue: ...
class EphemeralValue: ...
class UntrackedValue: ...
class BinaryOperatorAggregate:
    def __init__(self, typ, operator): ...
class Topic: ...
class NamedBarrierValue: ...
```

[Channels](./channels.md)
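
As a rough illustration of the difference between the last-value and aggregation patterns, the sketch below models two toy channels. `ToyLastValue` and `ToyAggregate` are simplified stand-ins written for this example; the real channel classes have a richer interface, and the update rules shown here are assumptions about their general behavior.

```python
import operator
from typing import Any, Callable, Sequence


class ToyLastValue:
    """Keeps the single value written in a step; rejects concurrent writes."""

    def __init__(self) -> None:
        self.value: Any = None

    def update(self, writes: Sequence[Any]) -> None:
        if len(writes) > 1:
            raise ValueError("last-value channel received more than one write in a step")
        if writes:
            self.value = writes[0]


class ToyAggregate:
    """Folds every write into the current value with a binary operator."""

    def __init__(self, initial: Any, op: Callable[[Any, Any], Any]) -> None:
        self.value = initial
        self.op = op

    def update(self, writes: Sequence[Any]) -> None:
        for write in writes:
            self.value = self.op(self.value, write)


latest = ToyLastValue()
latest.update(["draft"])
latest.update(["final"])  # a later step replaces the earlier value

log = ToyAggregate([], operator.add)
log.update([["a"], ["b"]])  # two writers in the same step are both kept
log.update([["c"]])

print(latest.value, log.value)  # final ['a', 'b', 'c']
```

In a `StateGraph` schema, aggregation is usually requested by annotating a field with a reducer, for example `Annotated[list, operator.add]`.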

### Graph Execution Engine

Low-level Pregel execution engine providing fine-grained control over graph execution, streaming, and state management.

```python { .api }
class Pregel:
    def invoke(self, input, config=None, **kwargs): ...
    def stream(
        self,
        input,
        config=None,
        *,
        stream_mode=None,
        output_keys=None,
        interrupt_before=None,
        interrupt_after=None,
        debug=None,
        subgraphs=False
    ): ...
    def get_state(self, config, *, subgraphs=False): ...
    def update_state(self, config, values, as_node=None): ...
    def get_state_history(
        self,
        config,
        *,
        filter=None,
        before=None,
        limit=None
    ): ...
```

[Pregel Engine](./pregel.md)

### Types and Primitives

Core types for control flow, state snapshots, task management, and retry/cache policies.

```python { .api }
class Send:
    """Send a message to a specific node."""
    def __init__(self, node: str, arg: Any): ...

class Command:
    """Command for state updates and navigation."""
    graph: str | None = None
    update: Any | None = None
    resume: dict[str, Any] | Any | None = None
    goto: Send | Sequence[Send | str] | str = ()

def interrupt(value: Any) -> Any:
    """Interrupt graph execution with a resumable value."""
    ...

class RetryPolicy:
    initial_interval: float = 0.5
    backoff_factor: float = 2.0
    max_interval: float = 128.0
    max_attempts: int = 3
    jitter: bool = True
    retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]

class StateSnapshot:
    values: dict[str, Any] | Any
    next: tuple[str, ...]
    config: RunnableConfig
    metadata: CheckpointMetadata | None
    created_at: str | None
    parent_config: RunnableConfig | None
    tasks: tuple[PregelTask, ...]
    interrupts: tuple[Interrupt, ...]
```

[Types and Primitives](./types-primitives.md)
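
To see how the `RetryPolicy` fields could interact, the sketch below computes the wait before each retry under a plain exponential-backoff rule, `min(max_interval, initial_interval * backoff_factor ** n)`. That formula is an assumption chosen for illustration, jitter is ignored, `max_attempts` is assumed to count the first attempt, and `backoff_schedule` is a hypothetical helper that is not part of LangGraph.

```python
def backoff_schedule(
    initial_interval: float = 0.5,
    backoff_factor: float = 2.0,
    max_interval: float = 128.0,
    max_attempts: int = 3,
) -> list[float]:
    """Return the wait in seconds before each retry, without jitter.

    Assumes max_attempts counts the first attempt, so there are
    max_attempts - 1 retries.
    """
    return [
        min(max_interval, initial_interval * backoff_factor ** n)
        for n in range(max_attempts - 1)
    ]


print(backoff_schedule())                # [0.5, 1.0]
print(backoff_schedule(max_attempts=5))  # [0.5, 1.0, 2.0, 4.0]
```

Under these assumptions the defaults shown above give two short retries; `max_interval` only matters once the exponential term grows past it.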

### Managed Values

Built-in managed state values that provide execution context information within nodes and tasks.

```python { .api }
IsLastStep: Annotated[bool, IsLastStepManager]
"""
Managed value that returns True when execution is on the last step.

Include in your state TypedDict to access execution context. The value is automatically
populated by the runtime and indicates when the graph is on its final step before
hitting the recursion limit.

Usage:
    from typing import TypedDict
    from langgraph.managed import IsLastStep
    from langgraph.graph import StateGraph

    class State(TypedDict):
        data: str
        is_last: IsLastStep  # Add as a state field

    def my_node(state: State) -> dict:
        if state["is_last"]:  # Access like any other state field
            # Perform final cleanup or summary
            return {"data": "final"}
        return {"data": "continue"}

    graph = StateGraph(State)
    graph.add_node("process", my_node)
    # ... The 'is_last' field will be automatically managed
"""

RemainingSteps: Annotated[int, RemainingStepsManager]
"""
Managed value that returns the number of remaining execution steps.

Include in your state TypedDict to track how many more steps can execute before
hitting the recursion limit. Useful for progress tracking or conditional logic.

Usage:
    from typing import TypedDict
    from langgraph.managed import RemainingSteps
    from langgraph.graph import StateGraph

    class State(TypedDict):
        data: str
        steps_left: RemainingSteps  # Add as a state field

    def my_node(state: State) -> dict:
        print(f"Steps remaining: {state['steps_left']}")
        return {"data": "processed"}

    graph = StateGraph(State)
    graph.add_node("process", my_node)
    # ... The 'steps_left' field will be automatically managed
"""
```

**Note:** Managed values must be declared in the state TypedDict schema. They are automatically populated by the runtime and are read-only (nodes cannot modify them). The graph identifies managed values by their type annotation and handles them specially.
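
The relationship between the recursion limit and these two values can be pictured as a countdown. The sketch assumes `steps_left = limit - step` and `is_last = (steps_left == 1)`; the exact off-by-one convention inside LangGraph may differ, and `toy_managed_values` is a hypothetical helper written for this example.

```python
from typing import Iterator


def toy_managed_values(recursion_limit: int) -> Iterator[tuple[int, int, bool]]:
    """Yield (step, steps_left, is_last) for each step up to the limit."""
    for step in range(recursion_limit):
        steps_left = recursion_limit - step
        yield step, steps_left, steps_left == 1


for step, steps_left, is_last in toy_managed_values(3):
    print(step, steps_left, is_last)
# 0 3 False
# 1 2 False
# 2 1 True
```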

### Error Handling

Exception classes for graph errors, interrupts, and validation failures.

```python { .api }
class GraphRecursionError(RecursionError): ...
class InvalidUpdateError(Exception): ...
class GraphInterrupt(Exception): ...
class EmptyChannelError(Exception): ...
class EmptyInputError(Exception): ...
class TaskNotFound(Exception): ...

class ErrorCode:
    GRAPH_RECURSION_LIMIT
    INVALID_CONCURRENT_GRAPH_UPDATE
    INVALID_GRAPH_NODE_RETURN_VALUE
    MULTIPLE_SUBGRAPHS
    INVALID_CHAT_HISTORY
```

[Error Handling](./errors.md)

### Configuration and Runtime

Access configuration, stores, stream writers, and runtime context from within nodes and tasks.

```python { .api }
def get_config() -> RunnableConfig:
    """Get current RunnableConfig from context."""
    ...

def get_store() -> BaseStore:
    """Access LangGraph store from inside node or task."""
    ...

def get_stream_writer() -> StreamWriter:
    """Access StreamWriter from inside node or task."""
    ...

def get_runtime(context_schema=None) -> Runtime:
    """Get runtime context for current graph run."""
    ...
```

[Configuration and Runtime](./configuration.md)
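
Accessor functions of this kind let a node reach run-scoped objects without receiving them as arguments. One common way to build such accessors in Python is the `contextvars` module; the sketch below shows that general pattern with hypothetical names (`_current_config`, `toy_get_config`, `toy_run`) and makes no claim about how LangGraph implements its own.

```python
from contextvars import ContextVar
from typing import Any, Callable

_current_config: ContextVar[dict[str, Any]] = ContextVar("_current_config")


def toy_get_config() -> dict[str, Any]:
    """Return the config for the active run, or fail if called outside one."""
    try:
        return _current_config.get()
    except LookupError:
        raise RuntimeError("toy_get_config() called outside of a run") from None


def toy_run(node: Callable[[dict], dict], state: dict, config: dict[str, Any]) -> dict:
    """Set the run-scoped config, call the node, then restore the previous value."""
    token = _current_config.set(config)
    try:
        return node(state)
    finally:
        _current_config.reset(token)


def greet(state: dict) -> dict:
    user = toy_get_config()["configurable"]["user"]  # no config parameter needed
    return {"greeting": f"hello {user}"}


result = toy_run(greet, {}, {"configurable": {"user": "ada"}})
print(result)  # {'greeting': 'hello ada'}
```

Resetting the variable in a `finally` block keeps one run's configuration from leaking into the next, which is why the accessor raises when called outside a run.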

### Message Handling

Utilities for message-based workflows with built-in message merging and state management.

```python { .api }
def add_messages(
    left: Messages,
    right: Messages,
    *,
    format: Literal["langchain-openai"] | None = None
) -> Messages:
    """Merge two lists of messages, updating by ID."""
    ...

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

class MessageGraph(StateGraph):
    """Deprecated: Use StateGraph with messages key instead."""
    ...
```

[Message Handling](./message-graph.md)
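
The merge-by-ID behavior described in the `add_messages` docstring can be approximated as follows. This is a simplified model over plain dicts with an `"id"` key; `toy_add_messages` is a hypothetical function, and the real reducer works on message objects and handles more cases, such as messages without IDs.

```python
def toy_add_messages(left: list[dict], right: list[dict]) -> list[dict]:
    """Append new messages; replace existing ones that share an id."""
    merged = list(left)
    index_by_id = {msg["id"]: i for i, msg in enumerate(merged)}
    for msg in right:
        if msg["id"] in index_by_id:
            merged[index_by_id[msg["id"]]] = msg  # same id: overwrite in place
        else:
            index_by_id[msg["id"]] = len(merged)
            merged.append(msg)  # new id: append at the end
    return merged


history = [{"id": "1", "text": "hi"}, {"id": "2", "text": "helo"}]
update = [{"id": "2", "text": "hello"}, {"id": "3", "text": "bye"}]
print(toy_add_messages(history, update))
```

Replacing in place preserves the original ordering of the conversation, so a corrected message stays where the earlier version was instead of moving to the end.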
