# LangGraph SDK

A comprehensive Python SDK for interacting with the LangGraph Platform REST API. The SDK enables developers to build and manage AI assistants and conversational workflows with async and sync client interfaces, automatic local server discovery, streaming support, and fine-grained authentication and authorization management.

## Package Information

- **Package Name**: langgraph-sdk
- **Language**: Python
- **Installation**: `pip install langgraph-sdk`

## Core Imports

```python
from langgraph_sdk import get_client, get_sync_client, Auth
```

## Basic Usage

```python
import asyncio

from langgraph_sdk import get_client


async def main() -> None:
    # Connect to LangGraph server (auto-detects local server at localhost:8123).
    # get_client() is a regular function, so it is not awaited.
    client = get_client()

    # List all assistants
    assistants = await client.assistants.search()
    agent = assistants[0]

    # Create a new conversation thread
    thread = await client.threads.create()

    # Start a streaming run
    input_data = {"messages": [{"role": "human", "content": "Hello!"}]}
    async for chunk in client.runs.stream(
        thread["thread_id"],
        agent["assistant_id"],
        input=input_data,
    ):
        print(chunk)

    # Close the client
    await client.aclose()


asyncio.run(main())
```

## Architecture

The LangGraph SDK follows a resource-oriented design with distinct client managers:

- **Client Factory**: `get_client()` and `get_sync_client()` create configured HTTP clients
- **Resource Clients**: Dedicated managers for assistants, threads, runs, crons, and store operations
- **Authentication System**: Pluggable auth handlers for custom security implementations
- **Streaming Support**: Server-sent events for real-time execution monitoring
- **Type System**: Comprehensive TypedDict schemas for all API interactions

Both async and sync versions provide identical APIs, enabling integration into any Python application architecture.

## Capabilities

### Client Management

Core client creation and HTTP operations for connecting to LangGraph servers with automatic discovery, custom authentication, and connection management.

```python { .api }
from collections.abc import Mapping
from typing import Union, Optional
import httpx

# Type aliases
TimeoutTypes = Union[
    None,
    float,
    tuple[Optional[float], Optional[float]],
    tuple[Optional[float], Optional[float], Optional[float], Optional[float]],
    httpx.Timeout,
]

def get_client(
    *,
    url: str | None = None,
    api_key: str | None = None,
    headers: Mapping[str, str] | None = None,
    timeout: TimeoutTypes | None = None,
) -> LangGraphClient: ...

def get_sync_client(
    *,
    url: str | None = None,
    api_key: str | None = None,
    headers: Mapping[str, str] | None = None,
    timeout: TimeoutTypes | None = None,
) -> SyncLangGraphClient: ...
```

[Client Management](./client-management.md)
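
Because the sync client mirrors the async one, the Basic Usage flow translates almost line for line. The sketch below is one way to write it against a client returned by `get_sync_client()`. The helper name `run_once` is illustrative and not part of the SDK, and the client is passed in as a parameter so the function can also be exercised with a stub.

```python
from typing import Any


def run_once(client: Any, text: str) -> list[Any]:
    """Send one message through the first available assistant (sync client).

    `client` is assumed to be the object returned by get_sync_client().
    """
    assistants = client.assistants.search(limit=1)
    if not assistants:
        raise RuntimeError("no assistants are registered on the server")

    thread = client.threads.create()
    chunks = []
    # The sync client's stream() is a regular iterator, so a plain for loop works.
    for chunk in client.runs.stream(
        thread["thread_id"],
        assistants[0]["assistant_id"],
        input={"messages": [{"role": "human", "content": text}]},
    ):
        chunks.append(chunk)
    return chunks
```

Against a running server this could be called as `run_once(get_sync_client(), "Hello!")`, followed by `client.close()` once the client is no longer needed.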

### Assistant Management

Create, configure, and manage AI assistants based on registered graphs. Assistants serve as the execution engines for conversational workflows, with support for versioning, configuration, and metadata management.

```python { .api }
from collections.abc import Mapping
from langgraph_sdk.schema import (
    Assistant, AssistantSelectField, AssistantSortBy, SortOrder,
    Config, Context, Json, OnConflictBehavior, QueryParamTypes
)

# Via client.assistants
async def get(
    assistant_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant: ...

async def create(
    graph_id: str | None,
    config: Config | None = None,
    *,
    context: Context | None = None,
    metadata: Json = None,
    assistant_id: str | None = None,
    if_exists: OnConflictBehavior | None = None,
    name: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant: ...

async def update(
    assistant_id: str,
    *,
    graph_id: str | None = None,
    config: Config | None = None,
    context: Context | None = None,
    metadata: Json = None,
    name: str | None = None,
    headers: Mapping[str, str] | None = None,
    description: str | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant: ...

async def search(
    *,
    metadata: Json = None,
    graph_id: str | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: AssistantSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[AssistantSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Assistant]: ...
```

[Assistant Management](./assistant-management.md)

### Thread Management

Manage conversation threads that maintain state across multiple interactions. Threads provide isolation for conversations and support state inspection, updates, and history tracking.

```python { .api }
from collections.abc import Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
    Thread, ThreadSelectField, ThreadSortBy, ThreadState, ThreadStatus, SortOrder,
    ThreadUpdateStateResponse, Checkpoint, Json, OnConflictBehavior, QueryParamTypes
)

# Via client.threads
async def create(
    *,
    metadata: Json = None,
    thread_id: str | None = None,
    if_exists: OnConflictBehavior | None = None,
    supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
    graph_id: str | None = None,
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread: ...

async def get(
    thread_id: str,
    *,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread: ...

async def update(
    thread_id: str,
    *,
    metadata: Mapping[str, Any],
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread: ...

async def search(
    *,
    metadata: Json = None,
    values: Json = None,
    ids: Sequence[str] | None = None,
    status: ThreadStatus | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: ThreadSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Thread]: ...

async def get_state(
    thread_id: str,
    *,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    subgraphs: bool = False,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadState: ...

async def update_state(
    thread_id: str,
    values: dict[str, Any] | Sequence[dict] | None,
    *,
    as_node: str | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadUpdateStateResponse: ...
```

[Thread Management](./thread-management.md)
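
As an illustration of state inspection and updates, the sketch below creates a thread idempotently, reads its current values, and writes an update. The helper names and the `"notes"` state key are hypothetical; the keys a thread accepts depend on the graph behind it. It assumes `ThreadState` exposes the snapshot under `"values"`.

```python
from typing import Any


async def load_thread_values(client: Any, thread_id: str) -> Any:
    """Create the thread if needed, then return its current state values."""
    # if_exists="do_nothing" asks the server not to raise when the thread exists.
    await client.threads.create(thread_id=thread_id, if_exists="do_nothing")
    state = await client.threads.get_state(thread_id)
    return state["values"]


async def append_note(client: Any, thread_id: str, note: str) -> None:
    """Write a state update; "notes" is a hypothetical graph state field."""
    await client.threads.update_state(thread_id, {"notes": [note]})
```

Passing `as_node` to `update_state` would attribute the write to a specific graph node; it is omitted here to keep the sketch short.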

### Run Execution

Execute assistant workflows on threads with support for streaming, interrupts, configuration, and completion handling. Runs represent individual executions of an assistant on a thread.

```python { .api }
from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
    Run, StreamPart, StreamMode, Config, Context, Checkpoint,
    Command, CancelAction, QueryParamTypes
)

# Via client.runs
def stream(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]: ...

async def create(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run: ...

# wait() blocks until the run finishes and returns the final state values,
# not a Run record.
async def wait(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    checkpoint_during: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[dict] | dict[str, Any]: ...

async def cancel(
    thread_id: str,
    run_id: str,
    *,
    wait: bool = False,
    action: CancelAction = "interrupt",
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None: ...
```

[Run Execution](./run-execution.md)
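
When only the end result of a streamed run matters, one option is to keep the last `"values"` payload and ignore everything else. The sketch below does that, assuming each `StreamPart` exposes `event` and `data` attributes. `final_values` is an illustrative helper, not an SDK function.

```python
from typing import Any


async def final_values(
    client: Any, thread_id: str, assistant_id: str, text: str
) -> Any:
    """Stream a run in "values" mode and return the last state snapshot seen."""
    last = None
    async for part in client.runs.stream(
        thread_id,
        assistant_id,
        input={"messages": [{"role": "human", "content": text}]},
        stream_mode="values",
    ):
        # Each part carries an event name and its payload; events other than
        # "values" are skipped here.
        if part.event == "values":
            last = part.data
    return last
```

If intermediate output is not needed at all, `client.runs.wait(...)` covers the same case in a single awaited call.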

### Scheduled Tasks

Create and manage cron jobs for automated execution of assistants on threads or with dynamic thread creation. Supports timezone handling, webhook notifications, and flexible scheduling.

```python { .api }
from collections.abc import Mapping
from typing import Any
from langgraph_sdk.schema import (
    Cron, CronSelectField, CronSortBy, SortOrder,
    Config, Context, All, QueryParamTypes
)

# Via client.crons
async def create(
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron: ...

async def create_for_thread(
    thread_id: str,
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron: ...

async def search(
    *,
    assistant_id: str | None = None,
    thread_id: str | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: CronSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[CronSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Cron]: ...

async def delete(
    cron_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None: ...
```

[Scheduled Tasks](./scheduled-tasks.md)
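
The `schedule` argument is a cron expression string. As a hedged example, the sketch below builds a standard five-field expression for weekdays and passes it to `crons.create`, which starts a new thread per run. The helper names and the input message are made up for illustration.

```python
from typing import Any


def weekday_schedule(hour: int, minute: int = 0) -> str:
    """Build a five-field cron expression for Monday through Friday."""
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute 0-59")
    return f"{minute} {hour} * * 1-5"


async def schedule_weekday_digest(client: Any, assistant_id: str, hour: int) -> Any:
    """Create a cron job that is not bound to an existing thread."""
    return await client.crons.create(
        assistant_id,
        schedule=weekday_schedule(hour),
        input={"messages": [{"role": "human", "content": "Summarize yesterday."}]},
    )
```

To run on one fixed thread instead, the same arguments could be passed to `crons.create_for_thread` with a `thread_id` in front. Remember to call `crons.delete` for jobs that are no longer needed.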

### Persistent Storage

Cross-thread persistent memory system for storing and retrieving documents, configuration, and application data with namespacing, search capabilities, and flexible data organization.

```python { .api }
from collections.abc import Mapping, Sequence
from typing import Any, Literal
from langgraph_sdk.schema import (
    Item, SearchItemsResponse, ListNamespaceResponse, QueryParamTypes
)

# Via client.store
async def put_item(
    namespace: Sequence[str],
    /,
    key: str,
    value: Mapping[str, Any],
    index: Literal[False] | list[str] | None = None,
    ttl: int | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None: ...

async def get_item(
    namespace: Sequence[str],
    /,
    key: str,
    *,
    refresh_ttl: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Item: ...

async def search_items(
    namespace_prefix: Sequence[str],
    /,
    filter: Mapping[str, Any] | None = None,
    limit: int = 10,
    offset: int = 0,
    query: str | None = None,
    refresh_ttl: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> SearchItemsResponse: ...

async def delete_item(
    namespace: Sequence[str],
    /,
    key: str,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None: ...

async def list_namespaces(
    *,
    prefix: Sequence[str] | None = None,
    suffix: Sequence[str] | None = None,
    limit: int = 100,
    offset: int = 0,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ListNamespaceResponse: ...
```

[Persistent Storage](./persistent-storage.md)
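
Namespaces are sequences of strings, which makes per-user partitioning straightforward. The sketch below stores and reads back a value under a `("users", <user_id>)` namespace. That layout and the helper names are assumptions chosen for the example, not a convention required by the store.

```python
from typing import Any


def user_namespace(user_id: str) -> tuple[str, str]:
    """Namespace layout assumed for this sketch: ("users", <user_id>)."""
    return ("users", user_id)


async def save_preference(
    client: Any, user_id: str, key: str, value: dict[str, Any]
) -> None:
    """Write one item; the namespace is positional, key and value are named."""
    await client.store.put_item(user_namespace(user_id), key=key, value=value)


async def load_preference(client: Any, user_id: str, key: str) -> dict[str, Any]:
    """Read one item back and return only its stored value."""
    item = await client.store.get_item(user_namespace(user_id), key=key)
    return item["value"]
```

Because every user's items share the `("users",)` prefix, `search_items(("users",))` or `list_namespaces(prefix=("users",))` could then enumerate across users.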

### Authentication & Authorization

Comprehensive authentication and authorization system supporting custom authentication handlers, fine-grained authorization rules, and flexible security policies for all resources and actions.

```python { .api }
from typing import Any, Callable, TypeVar
from collections.abc import Sequence
from langgraph_sdk.auth import types, exceptions

TH = TypeVar("TH", bound=types.Handler)
AH = TypeVar("AH", bound=types.Authenticator)

class Auth:
    types = types
    exceptions = exceptions

    def __init__(self) -> None:
        self.on: _On = ...  # Authorization handlers

    def authenticate(self, fn: AH) -> AH: ...

# Authorization context classes
class _On:
    assistants: _AssistantsOn
    threads: _ThreadsOn
    crons: _CronsOn
    store: _StoreOn
    value: type[dict[str, Any]]

    def __call__(
        self,
        fn: Callable | None = None,
        *,
        resources: str | Sequence[str] | None = None,
        actions: str | Sequence[str] | None = None,
    ) -> Callable: ...
```

[Authentication & Authorization](./authentication.md)

## Core Types

```python { .api }
from typing import Literal, TypedDict

# Client Types
class LangGraphClient:
    assistants: AssistantsClient
    threads: ThreadsClient
    runs: RunsClient
    crons: CronClient
    store: StoreClient
    async def aclose(self) -> None: ...

class SyncLangGraphClient:
    assistants: SyncAssistantsClient
    threads: SyncThreadsClient
    runs: SyncRunsClient
    crons: SyncCronClient
    store: SyncStoreClient
    def close(self) -> None: ...

# Core Data Models
class Assistant(TypedDict):
    assistant_id: str
    graph_id: str
    config: Config
    created_at: str
    updated_at: str
    metadata: dict

class Thread(TypedDict):
    thread_id: str
    created_at: str
    updated_at: str
    metadata: dict
    status: ThreadStatus

class Run(TypedDict):
    run_id: str
    thread_id: str
    assistant_id: str
    created_at: str
    updated_at: str
    status: RunStatus
    kwargs: dict

class Cron(TypedDict):
    cron_id: str
    thread_id: str
    assistant_id: str
    schedule: str
    timezone: str
    created_at: str

class Item(TypedDict):
    namespace: list[str]
    key: str
    value: dict
    created_at: str
    updated_at: str

# Status Types
RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]
ThreadStatus = Literal["idle", "busy", "interrupted", "error"]
StreamMode = Literal["values", "messages", "updates", "events", "tasks", "checkpoints", "debug"]

# Configuration Types
class Config(TypedDict, total=False):
    tags: list[str]
    recursion_limit: int
    configurable: dict
```
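
The `RunStatus` values split naturally into in-progress (`"pending"`, `"running"`) and finished states. The sketch below polls a run until it leaves the in-progress set. It assumes the runs client offers a `get(thread_id, run_id)` method returning a `Run`, which is not listed in this document; `wait_for_run`, `interval`, and `max_polls` are illustrative names.

```python
import asyncio
from typing import Any

# RunStatus values after which the run makes no further progress on its own.
TERMINAL_STATUSES = frozenset({"error", "success", "timeout", "interrupted"})


async def wait_for_run(
    client: Any,
    thread_id: str,
    run_id: str,
    *,
    interval: float = 1.0,
    max_polls: int = 60,
) -> dict[str, Any]:
    """Poll a run until its status is terminal, or give up after max_polls."""
    for _ in range(max_polls):
        run = await client.runs.get(thread_id, run_id)  # assumed SDK call
        if run["status"] in TERMINAL_STATUSES:
            return run
        await asyncio.sleep(interval)
    raise TimeoutError(f"run {run_id} still in progress after {max_polls} polls")
```

Polling is mostly useful for runs started with `runs.create`; for runs the caller starts and consumes immediately, `runs.stream` or `runs.wait` avoids the loop altogether.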