
# Configuration and Runtime

LangGraph provides utilities for accessing configuration, stores, stream writers, and runtime context from within nodes and tasks. These functions enable nodes to interact with the execution environment and access run-scoped resources.

## Imports

```python
from langgraph.config import get_config, get_store, get_stream_writer
from langgraph.runtime import get_runtime, Runtime
```

## Capabilities

### Configuration Access

#### get_config Function

Get the current RunnableConfig from the execution context.

```python { .api }
def get_config() -> RunnableConfig:
    """
    Get current RunnableConfig from context.

    Retrieves the configuration for the current node execution, including
    thread_id, checkpoint_id, callbacks, and other execution parameters.

    Returns:
        RunnableConfig - Current configuration with fields:
            - configurable: dict with thread_id, checkpoint_id, etc.
            - callbacks: LangChain callbacks
            - recursion_limit: Maximum execution steps
            - max_concurrency: Maximum parallel tasks
            - tags: Execution tags
            - metadata: Execution metadata

    Raises:
        RuntimeError: If called outside of graph execution context

    Usage:
        def my_node(state):
            config = get_config()
            thread_id = config["configurable"]["thread_id"]
            print(f"Running in thread: {thread_id}")
            return state

    Note:
        Requires Python >= 3.11 for async contexts
    """
```
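Conceptually, this kind of accessor reads from execution-scoped state that the framework sets up around each node call. The following is an illustrative sketch using `contextvars` (not LangGraph's actual implementation; `get_config_sketch` and `run_node` are hypothetical names) showing why calling the accessor outside an execution context raises `RuntimeError`:

```python
from contextvars import ContextVar

# Hypothetical context variable standing in for the framework's config slot.
_config_var: ContextVar[dict] = ContextVar("config")

def get_config_sketch() -> dict:
    """Return the config for the current execution, or raise like get_config."""
    try:
        return _config_var.get()
    except LookupError:
        raise RuntimeError("Called outside of graph execution context") from None

def run_node(node, state, config):
    """Simulate the framework binding the config around a node invocation."""
    token = _config_var.set(config)
    try:
        return node(state)
    finally:
        _config_var.reset(token)  # config is no longer accessible afterwards

def my_node(state):
    thread_id = get_config_sketch()["configurable"]["thread_id"]
    return {**state, "thread": thread_id}

result = run_node(my_node, {"data": "x"}, {"configurable": {"thread_id": "t-1"}})
```

Because `ContextVar` values propagate into `asyncio` tasks, the same pattern works for async nodes, which is why the real accessors depend on Python >= 3.11 context propagation in async code.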

### Store Access

#### get_store Function

Access the LangGraph store from inside a node or task.

```python { .api }
def get_store() -> BaseStore:
    """
    Access LangGraph store from inside node or task.

    The store provides persistent cross-thread memory for storing and
    retrieving data that needs to be shared across different graph runs
    or threads.

    Returns:
        BaseStore - Store instance with methods:
            - get(namespace, key): Retrieve value
            - put(namespace, key, value): Store value
            - delete(namespace, key): Delete value
            - search(namespace, query): Search values

    Raises:
        RuntimeError: If called outside of graph execution context

    Note:
        Returns None if no store is configured on the graph

    Usage:
        from langgraph.config import get_store

        def my_node(state):
            store = get_store()

            # Get shared data
            user_prefs = store.get(("user", state["user_id"]), "preferences")

            # Update shared data
            store.put(
                ("user", state["user_id"]),
                "last_seen",
                datetime.now().isoformat()
            )

            return state

    Note:
        Store must be configured when compiling the graph:
        app = graph.compile(store=my_store)
    """
```
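To make the namespace/key semantics concrete, here is a minimal in-memory stand-in for the store interface (`TinyStore` is illustrative only — it mirrors the `get`/`put`/`delete` shape described above, not LangGraph's `InMemoryStore`):

```python
class TinyStore:
    """Illustrative namespaced key-value store: keys are scoped by a namespace tuple."""

    def __init__(self):
        self._data = {}  # {(namespace_tuple, key): value}

    def get(self, namespace: tuple, key: str):
        # Returns None when nothing is stored, like an unconfigured lookup.
        return self._data.get((namespace, key))

    def put(self, namespace: tuple, key: str, value) -> None:
        self._data[(namespace, key)] = value

    def delete(self, namespace: tuple, key: str) -> None:
        self._data.pop((namespace, key), None)

store = TinyStore()
store.put(("user", "u123"), "preferences", {"theme": "dark"})
prefs = store.get(("user", "u123"), "preferences")
```

The namespace tuple (e.g. `("user", "u123")`) is what lets different users or subsystems share one store without key collisions.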

### Stream Access

#### get_stream_writer Function

Access the StreamWriter from inside a node or task.

```python { .api }
def get_stream_writer() -> StreamWriter:
    """
    Access StreamWriter from inside node or task.

    Returns a callable that writes custom data to the output stream.
    Only useful when streaming with stream_mode="custom".

    Returns:
        StreamWriter - Callable[[Any], None] that writes to stream

    Usage:
        from langgraph.config import get_stream_writer

        def my_node(state):
            writer = get_stream_writer()

            # Emit progress updates
            writer({"type": "progress", "percent": 0})

            # Do work...
            for i in range(10):
                process_item(i)
                writer({"type": "progress", "percent": (i + 1) * 10})

            writer({"type": "complete", "result": "done"})

            return state

        # Stream with custom mode
        for chunk in app.stream(input, stream_mode="custom"):
            print(chunk)  # Progress updates and completion
    """
```
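Since the writer is just a callable, a no-op default makes nodes safe to run even when nobody is streaming, and a collecting writer shows what a `stream_mode="custom"` consumer would receive. A sketch of that pattern (hypothetical helper names, not LangGraph internals):

```python
from typing import Any, Callable

StreamWriter = Callable[[Any], None]

def noop_writer(chunk: Any) -> None:
    """Default writer: silently discard chunks when no one is streaming."""

def make_collecting_writer(sink: list) -> StreamWriter:
    """Writer that appends each chunk to a list, in the order a consumer sees them."""
    return sink.append

chunks: list = []
writer = make_collecting_writer(chunks)

# A node would call this exactly like the value returned by get_stream_writer().
writer({"type": "progress", "percent": 50})
writer({"type": "complete", "result": "done"})
```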

### Runtime Context

#### Runtime Class

Bundles run-scoped context and runtime utilities.

```python { .api }
class Runtime:
    """
    Bundles run-scoped context and runtime utilities.

    Type Parameters:
        ContextT: Type of the context object

    Attributes:
        context: Optional[ContextT]
            Static context for the run (read-only, thread-local)
        store: Optional[BaseStore]
            Store for persistent cross-thread memory
        stream_writer: StreamWriter
            Writer for custom stream data (default: no-op)
        previous: Any
            Previous return value (functional API only)
    """

    context: Any = None
    store: BaseStore | None = None
    stream_writer: StreamWriter = None  # Default no-op writer
    previous: Any = None

    def merge(self, other):
        """
        Merge two runtimes, with other taking precedence.

        Parameters:
            other: Runtime - Runtime to merge with

        Returns:
            Runtime - New merged runtime

        Usage:
            runtime1 = Runtime(context={"a": 1})
            runtime2 = Runtime(context={"b": 2})
            merged = runtime1.merge(runtime2)
            # merged.context has both "a" and "b"
        """

    def override(self, **overrides):
        """
        Create new runtime with specific fields overridden.

        Parameters:
            **overrides: Keyword arguments for fields to override

        Returns:
            Runtime - New runtime with overrides applied

        Usage:
            runtime = get_runtime()
            new_runtime = runtime.override(
                stream_writer=my_custom_writer
            )
        """
```
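The `override` semantics — a new runtime with some fields swapped, the original untouched — can be modeled with a frozen dataclass and `dataclasses.replace`. This is an illustrative model (`RuntimeSketch` is hypothetical, not the library's class):

```python
from dataclasses import dataclass, replace
from typing import Any

@dataclass(frozen=True)
class RuntimeSketch:
    """Immutable bundle of run-scoped values, modeled on Runtime's attributes."""
    context: Any = None
    store: Any = None
    stream_writer: Any = None
    previous: Any = None

    def override(self, **overrides) -> "RuntimeSketch":
        # Copy with the given fields replaced; the original stays unchanged.
        return replace(self, **overrides)

rt = RuntimeSketch(context={"api_key": "k"})
rt2 = rt.override(previous={"step": 1})
```

Keeping the runtime immutable means a node can safely hand an overridden copy (e.g. with a custom `stream_writer`) to a subgraph without affecting its own run.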

#### get_runtime Function

Get the runtime context for the current graph run.

```python { .api }
def get_runtime(context_schema=None) -> Runtime:
    """
    Get runtime for current graph run.

    Retrieves the Runtime object containing context, store, stream writer,
    and previous return value for the current execution.

    Parameters:
        context_schema: Optional[type[ContextT]]
            Expected type of context (for type checking)

    Returns:
        Runtime[ContextT] - Runtime with:
            - context: Run-scoped context (if configured)
            - store: Store instance (if configured)
            - stream_writer: Writer for custom streaming
            - previous: Previous return value (functional API)

    Usage:
        from langgraph.runtime import get_runtime

        def my_node(state):
            runtime = get_runtime()

            # Access context
            if runtime.context:
                api_key = runtime.context["api_key"]

            # Access store
            if runtime.store:
                data = runtime.store.get(("cache",), "key")

            # Write to stream
            runtime.stream_writer({"progress": 50})

            return state

    Note:
        Context must be configured when compiling:
        app = graph.compile(context_schema=MyContext)
    """
```

## Usage Examples

### Accessing Configuration

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_config

class State(TypedDict):
    data: str
    thread_info: str

def node_with_config(state: State) -> dict:
    """Access configuration from within node."""
    config = get_config()

    # Get thread ID
    thread_id = config["configurable"].get("thread_id", "unknown")

    # Get recursion limit
    limit = config.get("recursion_limit", "default")

    # Access tags and metadata
    tags = config.get("tags", [])
    metadata = config.get("metadata", {})

    return {
        "thread_info": f"Thread: {thread_id}, Limit: {limit}, Tags: {tags}"
    }

graph = StateGraph(State)
graph.add_node("process", node_with_config)
graph.add_edge(START, "process")
graph.add_edge("process", END)

app = graph.compile()

result = app.invoke(
    {"data": "test"},
    config={
        "configurable": {"thread_id": "thread-123"},
        "tags": ["production"],
        "metadata": {"user": "alice"}
    }
)
print(result["thread_info"])
```

### Using Store for Shared State

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_store
from langgraph.store.memory import InMemoryStore

class State(TypedDict):
    user_id: str
    greeting: str

def load_preferences(state: State) -> dict:
    """Load user preferences from store."""
    store = get_store()

    # Get user preferences
    prefs = store.get(("user", state["user_id"]), "preferences")

    if prefs:
        return {"greeting": f"Welcome back, {prefs.get('name', 'User')}!"}
    else:
        return {"greeting": "Welcome, new user!"}

def save_preferences(state: State) -> dict:
    """Save user preferences to store."""
    store = get_store()

    # Save user data
    store.put(
        ("user", state["user_id"]),
        "preferences",
        {"name": "Alice", "theme": "dark"}
    )

    return state

graph = StateGraph(State)
graph.add_node("load", load_preferences)
graph.add_node("save", save_preferences)
graph.add_edge(START, "load")
graph.add_edge("load", "save")
graph.add_edge("save", END)

# Compile with store
store = InMemoryStore()
app = graph.compile(store=store)

result = app.invoke({"user_id": "user123", "greeting": ""})
print(result["greeting"])
```

### Custom Stream Progress

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer
import time

class State(TypedDict):
    items: list[int]
    results: list[int]

def process_with_progress(state: State) -> dict:
    """Process items and emit progress updates."""
    writer = get_stream_writer()
    items = state["items"]
    results = []

    writer({"type": "started", "total": len(items)})

    for i, item in enumerate(items):
        # Process item
        result = item * 2
        results.append(result)

        # Emit progress
        writer({
            "type": "progress",
            "completed": i + 1,
            "total": len(items),
            "percent": ((i + 1) / len(items)) * 100
        })

        time.sleep(0.1)  # Simulate work

    writer({"type": "complete", "count": len(results)})

    return {"results": results}

graph = StateGraph(State)
graph.add_node("process", process_with_progress)
graph.add_edge(START, "process")
graph.add_edge("process", END)

app = graph.compile()

# Stream with custom mode
for chunk in app.stream(
    {"items": [1, 2, 3, 4, 5], "results": []},
    stream_mode="custom"
):
    if chunk["type"] == "progress":
        print(f"Progress: {chunk['percent']:.0f}%")
    elif chunk["type"] == "complete":
        print(f"Complete! Processed {chunk['count']} items")
```

### Using Runtime Context

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import get_runtime, Runtime

class APIContext(TypedDict):
    api_key: str
    base_url: str
    timeout: int

class State(TypedDict):
    query: str
    result: str

def api_call(state: State) -> dict:
    """Make API call using context."""
    runtime = get_runtime(context_schema=APIContext)

    # Access context
    api_key = runtime.context["api_key"]
    base_url = runtime.context["base_url"]
    timeout = runtime.context["timeout"]

    # Use context for API call
    # result = requests.get(
    #     f"{base_url}/search",
    #     params={"q": state["query"]},
    #     headers={"Authorization": f"Bearer {api_key}"},
    #     timeout=timeout
    # )

    return {"result": f"Called {base_url} with key {api_key[:8]}..."}

graph = StateGraph(State, context_schema=APIContext)
graph.add_node("call", api_call)
graph.add_edge(START, "call")
graph.add_edge("call", END)

app = graph.compile()

# Pass context in config
result = app.invoke(
    {"query": "test", "result": ""},
    config={
        "configurable": {
            "context": {
                "api_key": "sk_secret_key_12345",
                "base_url": "https://api.example.com",
                "timeout": 30
            }
        }
    }
)
print(result["result"])
```

### Functional API with Runtime

```python
from langgraph.func import entrypoint, task
from langgraph.runtime import get_runtime

@task
def fetch_data(query: str) -> dict:
    """Fetch data using runtime context."""
    runtime = get_runtime()

    # Access context
    if runtime.context:
        api_key = runtime.context.get("api_key")
        # Use api_key...

    # Write progress
    runtime.stream_writer({"status": "fetching", "query": query})

    # Simulate fetch
    result = {"query": query, "data": "result"}

    runtime.stream_writer({"status": "complete"})

    return result

@entrypoint()
def workflow(input: dict) -> dict:
    """Workflow using runtime."""
    runtime = get_runtime()

    # Access previous value (for chained workflows)
    if runtime.previous:
        print(f"Previous result: {runtime.previous}")

    # Use task
    result = fetch_data(input["query"]).result()

    return result

# Use workflow
result = workflow.invoke(
    {"query": "test"},
    config={
        "configurable": {
            "context": {"api_key": "secret"}
        }
    }
)
```

### Combining All Context Functions

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_config, get_store, get_stream_writer
from langgraph.runtime import get_runtime
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    user_id: str
    operation: str
    result: str

def comprehensive_node(state: State) -> dict:
    """Node using all context functions."""
    # Get configuration
    config = get_config()
    thread_id = config["configurable"].get("thread_id")

    # Get store
    store = get_store()
    user_data = store.get(("users", state["user_id"]), "profile")

    # Get stream writer
    writer = get_stream_writer()
    writer({"status": "starting", "thread": thread_id})

    # Get runtime
    runtime = get_runtime()

    # Use context if available
    if runtime.context:
        settings = runtime.context.get("settings", {})
    else:
        settings = {}

    # Perform operation
    writer({"status": "processing"})

    result = f"Processed {state['operation']} for user {state['user_id']}"

    # Update store
    store.put(
        ("users", state["user_id"]),
        "last_operation",
        state["operation"]
    )

    writer({"status": "complete"})

    return {"result": result}

graph = StateGraph(State)
graph.add_node("process", comprehensive_node)
graph.add_edge(START, "process")
graph.add_edge("process", END)

# Compile with checkpointer and store
app = graph.compile(
    checkpointer=MemorySaver(),
    store=InMemoryStore()
)

# Run with full context
for chunk in app.stream(
    {"user_id": "u123", "operation": "search", "result": ""},
    config={
        "configurable": {
            "thread_id": "thread-456",
            "context": {"settings": {"debug": True}}
        }
    },
    stream_mode=["custom", "values"]
):
    print(chunk)
```

## Configuration Keys

Important RunnableConfig keys:

```python
config = {
    "configurable": {
        "thread_id": str,        # Thread identifier for checkpointing
        "checkpoint_id": str,    # Specific checkpoint to load
        "checkpoint_ns": str,    # Checkpoint namespace
        "context": dict,         # Run-scoped context
    },
    "callbacks": list,           # LangChain callbacks
    "recursion_limit": int,      # Max execution steps (default: 25)
    "max_concurrency": int,      # Max parallel tasks
    "tags": list[str],           # Execution tags
    "metadata": dict,            # Execution metadata
    "run_name": str,             # Name for this run
}
```
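Since most of these keys are optional, nodes should read them defensively with defaults rather than indexing directly. A plain-dict sketch (the default of 25 for `recursion_limit` is the one stated above):

```python
config = {
    "configurable": {"thread_id": "thread-1"},
    "tags": ["production"],
}

# Nested keys under "configurable" need a two-step lookup with defaults.
thread_id = config.get("configurable", {}).get("thread_id", "unknown")
checkpoint_id = config.get("configurable", {}).get("checkpoint_id")  # None if unset
recursion_limit = config.get("recursion_limit", 25)
```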

## Notes

- All context functions (`get_config`, `get_store`, `get_stream_writer`, `get_runtime`) must be called from within graph execution
- Context functions use thread-local storage and require Python >= 3.11 for async support
- Store must be configured when compiling: `graph.compile(store=store)`
- Context schema must be set when compiling: `graph.compile(context_schema=MyContext)`
- StreamWriter only produces output when streaming with `stream_mode="custom"`
- `Runtime.previous` is only populated in functional API entrypoints
- Configuration is immutable during node execution
- Store operations are synchronous in the current implementation