# Configuration and Runtime

LangGraph provides utilities for accessing configuration, stores, stream writers, and runtime context from within nodes and tasks. These functions enable nodes to interact with the execution environment and access run-scoped resources.

## Imports

```python
from langgraph.config import get_config, get_store, get_stream_writer
from langgraph.runtime import get_runtime, Runtime
```

## Capabilities

### Configuration Access

#### get_config Function

Get the current RunnableConfig from the execution context.

```python { .api }
def get_config() -> RunnableConfig:
    """
    Get current RunnableConfig from context.

    Retrieves the configuration for the current node execution, including
    thread_id, checkpoint_id, callbacks, and other execution parameters.

    Returns:
        RunnableConfig - Current configuration with fields:
            - configurable: dict with thread_id, checkpoint_id, etc.
            - callbacks: LangChain callbacks
            - recursion_limit: Maximum execution steps
            - max_concurrency: Maximum parallel tasks
            - tags: Execution tags
            - metadata: Execution metadata

    Raises:
        RuntimeError: If called outside of graph execution context

    Usage:
        def my_node(state):
            config = get_config()
            thread_id = config["configurable"]["thread_id"]
            print(f"Running in thread: {thread_id}")
            return state

    Note:
        Requires Python >= 3.11 for async contexts
    """
```
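
The rule that `get_config()` only works during graph execution is characteristic of accessors built on context variables. The following is a minimal stand-alone sketch of that pattern using only the standard library. The names `_current_config`, `get_current_config`, and `run_node` are hypothetical and are not LangGraph internals.

```python
from contextvars import ContextVar
from typing import Any, Callable

# Hypothetical stand-in for the slot that holds the per-run configuration.
_current_config: ContextVar[dict[str, Any]] = ContextVar("current_config")


def get_current_config() -> dict[str, Any]:
    """Return the active config, or fail loudly when no run is active."""
    try:
        return _current_config.get()
    except LookupError:
        raise RuntimeError("called outside of an execution context") from None


def run_node(
    node: Callable[[dict], dict], state: dict, config: dict[str, Any]
) -> dict:
    """Expose `config` for the duration of one node call, then restore."""
    token = _current_config.set(config)
    try:
        return node(state)
    finally:
        _current_config.reset(token)


def my_node(state: dict) -> dict:
    # The node takes no config argument; it reads it from the context.
    thread_id = get_current_config()["configurable"]["thread_id"]
    return {**state, "thread": thread_id}


result = run_node(my_node, {"data": "x"}, {"configurable": {"thread_id": "t-1"}})
```

Calling `get_current_config()` after `run_node` returns raises `RuntimeError`, because the variable is reset once the node finishes.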

### Store Access

#### get_store Function

Access the LangGraph store from inside a node or task.

```python { .api }
def get_store() -> BaseStore:
    """
    Access LangGraph store from inside node or task.

    The store provides persistent cross-thread memory for storing and
    retrieving data that needs to be shared across different graph runs
    or threads.

    Returns:
        BaseStore - Store instance with methods:
            - get(namespace, key): Retrieve an item (or None if missing)
            - put(namespace, key, value): Store a dict value
            - delete(namespace, key): Delete value
            - search(namespace, query): Search values

    Raises:
        RuntimeError: If called outside of graph execution context

    Usage:
        from datetime import datetime

        from langgraph.config import get_store

        def my_node(state):
            store = get_store()

            # Get shared data
            user_prefs = store.get(("user", state["user_id"]), "preferences")

            # Update shared data (stored values are dicts)
            store.put(
                ("user", state["user_id"]),
                "last_seen",
                {"timestamp": datetime.now().isoformat()}
            )

            return state

    Note:
        Returns None if no store is configured on the graph.
        Configure the store when compiling the graph:
        app = graph.compile(store=my_store)
    """
```
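
Because the store may be absent and a lookup may find nothing, nodes often need a guarded read. The sketch below shows one way to write that guard. `FakeItem` and `FakeStore` are hypothetical in-memory stand-ins so the example runs without LangGraph, and `read_value` is an illustrative helper, not a library function. It assumes a found item exposes its payload as `.value`.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeItem:
    """Hypothetical stand-in for a stored item; payload lives in `.value`."""
    value: dict[str, Any]


class FakeStore:
    """Hypothetical in-memory store offering only get/put."""

    def __init__(self) -> None:
        self._data: dict[tuple[tuple[str, ...], str], FakeItem] = {}

    def get(self, namespace: tuple[str, ...], key: str) -> Optional[FakeItem]:
        return self._data.get((namespace, key))

    def put(self, namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None:
        self._data[(namespace, key)] = FakeItem(dict(value))


def read_value(
    store: Optional[FakeStore],
    namespace: tuple[str, ...],
    key: str,
    default: dict[str, Any],
) -> dict[str, Any]:
    """Return the stored dict, or `default` if the store or item is missing."""
    if store is None:  # No store configured on the graph
        return default
    item = store.get(namespace, key)
    return item.value if item is not None else default


store = FakeStore()
store.put(("user", "u1"), "preferences", {"theme": "dark"})

found = read_value(store, ("user", "u1"), "preferences", {})
missing = read_value(store, ("user", "u2"), "preferences", {"theme": "light"})
no_store = read_value(None, ("user", "u1"), "preferences", {"theme": "light"})
```

Returning a caller-supplied default keeps the node body free of repeated `None` checks.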

### Stream Access

#### get_stream_writer Function

Access the StreamWriter from inside a node or task.

```python { .api }
def get_stream_writer() -> StreamWriter:
    """
    Access StreamWriter from inside node or task.

    Returns a callable that writes custom data to the output stream.
    Only useful when streaming with stream_mode="custom".

    Returns:
        StreamWriter - Callable[[Any], None] that writes to stream

    Usage:
        from langgraph.config import get_stream_writer

        def my_node(state):
            writer = get_stream_writer()

            # Emit progress updates
            writer({"type": "progress", "percent": 0})

            # Do work...
            for i in range(10):
                process_item(i)
                writer({"type": "progress", "percent": (i + 1) * 10})

            writer({"type": "complete", "result": "done"})

            return state

        # Stream with custom mode
        for chunk in app.stream(input, stream_mode="custom"):
            print(chunk)  # Progress updates and completion
    """
```

### Runtime Context

#### Runtime Class

Bundles run-scoped context and runtime utilities.

```python { .api }
class Runtime:
    """
    Bundles run-scoped context and runtime utilities.

    Type Parameters:
        ContextT: Type of the context object

    Attributes:
        context: Optional[ContextT]
            Static context for the run (read-only, scoped to the run)
        store: Optional[BaseStore]
            Store for persistent cross-thread memory
        stream_writer: StreamWriter
            Writer for custom stream data (default: no-op)
        previous: Any
            Previous return value (functional API only)
    """

    context: Any = None
    store: BaseStore | None = None
    stream_writer: StreamWriter = ...  # Defaults to a no-op writer
    previous: Any = None

    def merge(self, other):
        """
        Merge two runtimes, with other taking precedence.

        Parameters:
            other: Runtime - Runtime to merge with

        Returns:
            Runtime - New merged runtime

        Usage:
            runtime1 = Runtime(context={"a": 1})
            runtime2 = Runtime(context={"b": 2})
            merged = runtime1.merge(runtime2)
            # merged.context == {"b": 2}: a field set on `other` replaces
            # the one on `self`; contexts are not deep-merged
        """

    def override(self, **overrides):
        """
        Create new runtime with specific fields overridden.

        Parameters:
            **overrides: Keyword arguments for fields to override

        Returns:
            Runtime - New runtime with overrides applied

        Usage:
            runtime = get_runtime()
            new_runtime = runtime.override(
                stream_writer=my_custom_writer
            )
        """
```

#### get_runtime Function

Get the runtime context for the current graph run.

```python { .api }
def get_runtime(context_schema=None) -> Runtime:
    """
    Get runtime for current graph run.

    Retrieves the Runtime object containing context, store, stream writer,
    and previous return value for the current execution.

    Parameters:
        context_schema: Optional[type[ContextT]]
            Expected type of context (for type checking)

    Returns:
        Runtime[ContextT] - Runtime with:
            - context: Run-scoped context (if configured)
            - store: Store instance (if configured)
            - stream_writer: Writer for custom streaming
            - previous: Previous return value (functional API)

    Usage:
        from langgraph.runtime import get_runtime

        def my_node(state):
            runtime = get_runtime()

            # Access context
            if runtime.context:
                api_key = runtime.context["api_key"]

            # Access store
            if runtime.store:
                data = runtime.store.get(("cache",), "key")

            # Write to stream
            runtime.stream_writer({"progress": 50})

            return state

    Note:
        The context schema is declared when building the graph:
        graph = StateGraph(State, context_schema=MyContext)
    """
```

## Usage Examples

### Accessing Configuration

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_config

class State(TypedDict):
    data: str
    thread_info: str

def node_with_config(state: State) -> dict:
    """Access configuration from within node."""
    config = get_config()

    # Get thread ID
    thread_id = config["configurable"].get("thread_id", "unknown")

    # Get recursion limit
    limit = config.get("recursion_limit", "default")

    # Access tags and metadata
    tags = config.get("tags", [])
    metadata = config.get("metadata", {})

    return {
        "thread_info": f"Thread: {thread_id}, Limit: {limit}, Tags: {tags}"
    }

graph = StateGraph(State)
graph.add_node("process", node_with_config)
graph.add_edge(START, "process")
graph.add_edge("process", END)

app = graph.compile()

result = app.invoke(
    {"data": "test"},
    config={
        "configurable": {"thread_id": "thread-123"},
        "tags": ["production"],
        "metadata": {"user": "alice"}
    }
)
print(result["thread_info"])
```

### Using Store for Shared State

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_store
from langgraph.store.memory import InMemoryStore

class State(TypedDict):
    user_id: str
    greeting: str

def load_preferences(state: State) -> dict:
    """Load user preferences from store."""
    store = get_store()

    # Get user preferences (an item wrapping the stored dict, or None)
    prefs = store.get(("user", state["user_id"]), "preferences")

    if prefs:
        # The stored dict is available on the item's `value` attribute
        return {"greeting": f"Welcome back, {prefs.value.get('name', 'User')}!"}
    else:
        return {"greeting": "Welcome, new user!"}

def save_preferences(state: State) -> dict:
    """Save user preferences to store."""
    store = get_store()

    # Save user data
    store.put(
        ("user", state["user_id"]),
        "preferences",
        {"name": "Alice", "theme": "dark"}
    )

    return state

graph = StateGraph(State)
graph.add_node("load", load_preferences)
graph.add_node("save", save_preferences)
graph.add_edge(START, "load")
graph.add_edge("load", "save")
graph.add_edge("save", END)

# Compile with store
store = InMemoryStore()
app = graph.compile(store=store)

result = app.invoke({"user_id": "user123", "greeting": ""})
print(result["greeting"])
```

### Custom Stream Progress

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer
import time

class State(TypedDict):
    items: list[int]
    results: list[int]

def process_with_progress(state: State) -> dict:
    """Process items and emit progress updates."""
    writer = get_stream_writer()
    items = state["items"]
    results = []

    writer({"type": "started", "total": len(items)})

    for i, item in enumerate(items):
        # Process item
        result = item * 2
        results.append(result)

        # Emit progress
        writer({
            "type": "progress",
            "completed": i + 1,
            "total": len(items),
            "percent": ((i + 1) / len(items)) * 100
        })

        time.sleep(0.1)  # Simulate work

    writer({"type": "complete", "count": len(results)})

    return {"results": results}

graph = StateGraph(State)
graph.add_node("process", process_with_progress)
graph.add_edge(START, "process")
graph.add_edge("process", END)

app = graph.compile()

# Stream with custom mode
for chunk in app.stream(
    {"items": [1, 2, 3, 4, 5], "results": []},
    stream_mode="custom"
):
    if chunk["type"] == "progress":
        print(f"Progress: {chunk['percent']:.0f}%")
    elif chunk["type"] == "complete":
        print(f"Complete! Processed {chunk['count']} items")
```
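
The consumer loop above indexes `chunk["type"]` directly, which fails on any payload that lacks that key. One option is to move the dispatch into a small function that tolerates unknown payloads. The sketch below does that with plain dicts shaped like the ones `process_with_progress` emits. `describe_chunk` is a hypothetical helper, not part of LangGraph.

```python
from typing import Any, Optional


def describe_chunk(chunk: dict[str, Any]) -> Optional[str]:
    """Turn one custom-stream payload into a log line, or None to skip it."""
    kind = chunk.get("type")
    if kind == "started":
        return f"Started: {chunk['total']} items"
    if kind == "progress":
        return (
            f"Progress: {chunk['percent']:.0f}% "
            f"({chunk['completed']}/{chunk['total']})"
        )
    if kind == "complete":
        return f"Complete! Processed {chunk['count']} items"
    return None  # Unknown payloads are skipped rather than raising


# Sample payloads in the same shape as the writer calls above.
sample_chunks = [
    {"type": "started", "total": 2},
    {"type": "progress", "completed": 1, "total": 2, "percent": 50.0},
    {"type": "progress", "completed": 2, "total": 2, "percent": 100.0},
    {"type": "complete", "count": 2},
    {"note": "payload without a type"},
]

lines = [
    line for chunk in sample_chunks
    if (line := describe_chunk(chunk)) is not None
]
```

In a real consumer, the list comprehension would be replaced by the `app.stream(..., stream_mode="custom")` loop, calling `describe_chunk` on each chunk.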

### Using Runtime Context

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import get_runtime, Runtime

class APIContext(TypedDict):
    api_key: str
    base_url: str
    timeout: int

class State(TypedDict):
    query: str
    result: str

def api_call(state: State) -> dict:
    """Make API call using context."""
    runtime = get_runtime(context_schema=APIContext)

    # Access context
    api_key = runtime.context["api_key"]
    base_url = runtime.context["base_url"]
    timeout = runtime.context["timeout"]

    # Use context for API call
    # result = requests.get(
    #     f"{base_url}/search",
    #     params={"q": state["query"]},
    #     headers={"Authorization": f"Bearer {api_key}"},
    #     timeout=timeout
    # )

    return {"result": f"Called {base_url} with key {api_key[:8]}..."}

graph = StateGraph(State, context_schema=APIContext)
graph.add_node("call", api_call)
graph.add_edge(START, "call")
graph.add_edge("call", END)

app = graph.compile()

# Pass context with the `context` argument when invoking
result = app.invoke(
    {"query": "test", "result": ""},
    context={
        "api_key": "sk_secret_key_12345",
        "base_url": "https://api.example.com",
        "timeout": 30
    }
)
print(result["result"])
```
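
When the context is a plain dict described by a `TypedDict`, a missing key only surfaces as a `KeyError` at the point of use inside the node. A node can check up front instead. The sketch below uses the standard `__required_keys__` attribute of `TypedDict` classes. `missing_context_keys` is a hypothetical helper, and `APIContext` is repeated here only to keep the block self-contained.

```python
from typing import Any, Mapping, Optional, TypedDict


class APIContext(TypedDict):
    api_key: str
    base_url: str
    timeout: int


def missing_context_keys(
    schema: type, context: Optional[Mapping[str, Any]]
) -> list[str]:
    """List the required keys of `schema` that `context` does not provide."""
    required = getattr(schema, "__required_keys__", frozenset())
    present = context.keys() if context is not None else ()
    return sorted(key for key in required if key not in present)


partial = missing_context_keys(APIContext, {"api_key": "sk_example"})
absent = missing_context_keys(APIContext, None)
complete = missing_context_keys(
    APIContext,
    {"api_key": "sk_example", "base_url": "https://api.example.com", "timeout": 30},
)
```

A node could call this on `runtime.context` and raise a descriptive error when the returned list is not empty.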

### Functional API with Runtime

```python
from langgraph.func import entrypoint, task
from langgraph.runtime import get_runtime

@task
def fetch_data(query: str) -> dict:
    """Fetch data using runtime context."""
    runtime = get_runtime()

    # Access context
    if runtime.context:
        api_key = runtime.context.get("api_key")
        # Use api_key...

    # Write progress
    runtime.stream_writer({"status": "fetching", "query": query})

    # Simulate fetch
    result = {"query": query, "data": "result"}

    runtime.stream_writer({"status": "complete"})

    return result

@entrypoint()
def workflow(input: dict) -> dict:
    """Workflow using runtime."""
    runtime = get_runtime()

    # Access the previous return value, if one was saved for this thread
    if runtime.previous:
        print(f"Previous result: {runtime.previous}")

    # Use task
    result = fetch_data(input["query"]).result()

    return result

# Use workflow, passing context with the `context` argument
result = workflow.invoke(
    {"query": "test"},
    context={"api_key": "secret"}
)
```

### Combining All Context Functions

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_config, get_store, get_stream_writer
from langgraph.runtime import get_runtime

class State(TypedDict):
    user_id: str
    operation: str
    result: str

def comprehensive_node(state: State) -> dict:
    """Node using all context functions."""
    # Get configuration
    config = get_config()
    thread_id = config["configurable"].get("thread_id")

    # Get store
    store = get_store()
    user_data = store.get(("users", state["user_id"]), "profile")

    # Get stream writer
    writer = get_stream_writer()
    writer({"status": "starting", "thread": thread_id})

    # Get runtime
    runtime = get_runtime()

    # Use context if available
    if runtime.context:
        settings = runtime.context.get("settings", {})
    else:
        settings = {}

    # Perform operation
    writer({"status": "processing"})

    result = f"Processed {state['operation']} for user {state['user_id']}"

    # Update store (stored values are dicts)
    store.put(
        ("users", state["user_id"]),
        "last_operation",
        {"operation": state["operation"]}
    )

    writer({"status": "complete"})

    return {"result": result}

graph = StateGraph(State)
graph.add_node("process", comprehensive_node)
graph.add_edge(START, "process")
graph.add_edge("process", END)

# Compile with store
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver

app = graph.compile(
    checkpointer=MemorySaver(),
    store=InMemoryStore()
)

# Run with full context
for chunk in app.stream(
    {"user_id": "u123", "operation": "search", "result": ""},
    config={"configurable": {"thread_id": "thread-456"}},
    context={"settings": {"debug": True}},
    stream_mode=["custom", "values"]
):
    print(chunk)
```

## Configuration Keys

Important RunnableConfig keys:

```python
config = {
    "configurable": {
        "thread_id": str,      # Thread identifier for checkpointing
        "checkpoint_id": str,  # Specific checkpoint to load
        "checkpoint_ns": str,  # Checkpoint namespace
        "context": dict,       # Run-scoped context
    },
    "callbacks": list,         # LangChain callbacks
    "recursion_limit": int,    # Max execution steps (default: 25)
    "max_concurrency": int,    # Max parallel tasks
    "tags": list[str],         # Execution tags
    "metadata": dict,          # Execution metadata
    "run_name": str,           # Name for this run
}
```
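
Since the config is an ordinary nested dict, building and reading it can be wrapped in two small helpers. The sketch below uses only keys from the listing above. `make_run_config` and `thread_id_of` are hypothetical names for this illustration, and the default of 25 mirrors the recursion limit noted in the listing.

```python
from typing import Any, Optional


def make_run_config(
    thread_id: str,
    *,
    tags: Optional[list[str]] = None,
    recursion_limit: int = 25,
    **metadata: Any,
) -> dict[str, Any]:
    """Build a config dict, including optional keys only when provided."""
    config: dict[str, Any] = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": recursion_limit,
    }
    if tags:
        config["tags"] = list(tags)
    if metadata:
        config["metadata"] = dict(metadata)
    return config


def thread_id_of(config: dict[str, Any], default: str = "unknown") -> str:
    """Read the thread ID without raising when keys are absent."""
    return config.get("configurable", {}).get("thread_id", default)


config = make_run_config("thread-123", tags=["production"], user="alice")
```

The resulting dict can be passed as `config=` to `invoke` or `stream`, and `thread_id_of` gives nodes a safe way to read the thread ID from the value returned by `get_config()`.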

## Notes

- All context functions (`get_config`, `get_store`, `get_stream_writer`, `get_runtime`) must be called from within graph execution
- Context functions rely on context variables (`contextvars`) rather than thread-local storage, and require Python >= 3.11 for async support
- Store must be configured when compiling: `graph.compile(store=store)`
- Context schema must be set when building the graph: `StateGraph(State, context_schema=MyContext)`
- StreamWriter only produces output when streaming with `stream_mode="custom"`
- `Runtime.previous` is only populated in functional API entrypoints
- Configuration is immutable during node execution
- The store methods shown here (`get`, `put`, `delete`, `search`) are synchronous; `BaseStore` also defines async counterparts (`aget`, `aput`, `adelete`, `asearch`)