pypi-langgraph-sdk

Description
Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/pypi-langgraph-sdk@0.2.0

thread-management.md docs/

1
# Thread Management
2
3
Manage conversation threads that maintain state across multiple interactions. Threads provide isolation for conversations and support state inspection, updates, and history tracking.
4
5
## Capabilities
6
7
### Thread CRUD Operations
8
9
Core operations for creating, reading, updating, and deleting conversation threads with metadata and configuration support.
10
11
```python { .api }
12
from collections.abc import Mapping, Sequence
13
from typing import Any
14
from langgraph_sdk.schema import (
15
Thread, ThreadSelectField, Json, OnConflictBehavior, QueryParamTypes
16
)
17
18
# Via client.threads
19
async def create(
20
*,
21
metadata: Json = None,
22
thread_id: str | None = None,
23
if_exists: OnConflictBehavior | None = None,
24
supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
25
graph_id: str | None = None,
26
ttl: int | Mapping[str, Any] | None = None,
27
headers: Mapping[str, str] | None = None,
28
params: QueryParamTypes | None = None,
29
) -> Thread:
30
"""
31
Create a thread.
32
33
Args:
34
metadata: Metadata to attach to the new thread.
35
thread_id: ID of the thread. If None, ID will be a UUID.
36
if_exists: How to handle duplicate creates. Must be "raise" (error if the thread already exists) or "do_nothing" (return the existing thread). Defaults to "raise".
37
supersteps: Optional supersteps to apply to the thread.
38
graph_id: Optional graph_id to apply to the thread.
39
ttl: Optional time-to-live for the thread. Either an integer number of minutes or a mapping of TTL settings (e.g. "ttl" and an optional "strategy").
40
headers: Optional custom headers to include with the request.
41
params: Optional query parameters to include with the request.
42
43
Returns:
44
Thread: The created thread.
45
"""
46
47
async def get(
48
thread_id: str,
49
*,
50
select: list[ThreadSelectField] | None = None,
51
headers: Mapping[str, str] | None = None,
52
params: QueryParamTypes | None = None,
53
) -> Thread:
54
"""
55
Get a thread by ID.
56
57
Args:
58
thread_id: The ID of the thread to get.
59
select: Fields to include in the response.
60
headers: Optional custom headers to include with the request.
61
params: Optional query parameters to include with the request.
62
63
Returns:
64
Thread: Thread object.
65
"""
66
67
async def update(
68
thread_id: str,
69
*,
70
metadata: Mapping[str, Any],
71
ttl: int | Mapping[str, Any] | None = None,
72
headers: Mapping[str, str] | None = None,
73
params: QueryParamTypes | None = None,
74
) -> Thread:
75
"""
76
Update a thread.
77
78
Args:
79
thread_id: ID of the thread to update.
80
metadata: Metadata to merge with the thread.
81
ttl: Time-to-live configuration for the thread.
82
headers: Optional custom headers to include with the request.
83
params: Optional query parameters to include with the request.
84
85
Returns:
86
Thread: The updated thread.
87
"""
88
89
async def delete(
90
thread_id: str,
91
*,
92
headers: Mapping[str, str] | None = None,
93
params: QueryParamTypes | None = None,
94
) -> None:
95
"""
96
Delete a thread.
97
98
Args:
99
thread_id: The ID of the thread to delete.
100
headers: Optional custom headers to include with the request.
101
params: Optional query parameters to include with the request.
102
"""
103
```
104
105
### Thread Discovery & Search
106
107
Search and enumerate threads with filtering, pagination, and sorting capabilities.
108
109
```python { .api }
110
from langgraph_sdk.schema import (
111
Thread, ThreadSelectField, ThreadSortBy, ThreadStatus, SortOrder,
112
Json, QueryParamTypes
113
)
114
115
async def search(
116
*,
117
metadata: Json = None,
118
values: Json = None,
119
ids: Sequence[str] | None = None,
120
status: ThreadStatus | None = None,
121
limit: int = 10,
122
offset: int = 0,
123
sort_by: ThreadSortBy | None = None,
124
sort_order: SortOrder | None = None,
125
select: list[ThreadSelectField] | None = None,
126
headers: Mapping[str, str] | None = None,
127
params: QueryParamTypes | None = None,
128
) -> list[Thread]:
129
"""
130
Search for threads.
131
132
Args:
133
metadata: Metadata to filter by. Exact match filter for each KV pair.
134
values: Values to filter by. Exact match filter for each KV pair.
135
ids: Thread IDs to filter by.
136
status: Status to filter by.
137
limit: Limit the number of threads to return.
138
offset: Offset to start from.
139
sort_by: Field to sort by.
140
sort_order: Order to sort by.
141
select: Fields to include in the response.
142
headers: Optional custom headers to include with the request.
143
params: Optional query parameters to include with the request.
144
145
Returns:
146
list[Thread]: List of threads matching the query.
147
"""
148
149
async def count(
150
*,
151
metadata: Json = None,
152
values: Json = None,
153
status: ThreadStatus | None = None,
154
headers: Mapping[str, str] | None = None,
155
params: QueryParamTypes | None = None,
156
) -> int:
157
"""
158
Count threads.
159
160
Args:
161
metadata: Metadata to filter by. Exact match filter for each KV pair.
162
values: Values to filter by. Exact match filter for each KV pair.
163
status: Status to filter by.
164
headers: Optional custom headers to include with the request.
165
params: Optional query parameters to include with the request.
166
167
Returns:
168
int: Number of threads that match the query.
169
"""
170
```
171
172
### Thread Operations
173
174
Additional thread-level operations, such as copying an existing thread. State inspection, state updates, and history tracking are covered under State Management below.
175
176
```python { .api }
177
async def copy(
178
thread_id: str,
179
*,
180
headers: Mapping[str, str] | None = None,
181
params: QueryParamTypes | None = None,
182
) -> None:
183
"""
184
Copy a thread.
185
186
Args:
187
thread_id: The ID of the thread to copy.
188
headers: Optional custom headers to include with the request.
189
params: Optional query parameters to include with the request.
190
"""
191
```
192
193
### State Management
194
195
Inspect and manipulate thread state, including current values, checkpoints, and state updates.
196
197
```python { .api }
198
from langgraph_sdk.schema import (
199
ThreadState, ThreadUpdateStateResponse, Checkpoint, QueryParamTypes
200
)
201
202
async def get_state(
203
thread_id: str,
204
*,
205
checkpoint: Checkpoint | None = None,
206
checkpoint_id: str | None = None, # deprecated
207
subgraphs: bool = False,
208
headers: Mapping[str, str] | None = None,
209
params: QueryParamTypes | None = None,
210
) -> ThreadState:
211
"""
212
Get state for a thread.
213
214
Args:
215
thread_id: The ID of the thread to get the state for.
216
checkpoint: The checkpoint to get the state for.
217
checkpoint_id: Checkpoint to get the state for. Deprecated, use checkpoint instead.
218
subgraphs: Whether to include subgraphs.
219
headers: Optional custom headers to include with the request.
220
params: Optional query parameters to include with the request.
221
222
Returns:
223
ThreadState: The state of the thread.
224
"""
225
226
async def update_state(
227
thread_id: str,
228
values: dict[str, Any] | Sequence[dict] | None,
229
*,
230
as_node: str | None = None,
231
checkpoint: Checkpoint | None = None,
232
checkpoint_id: str | None = None, # deprecated
233
headers: Mapping[str, str] | None = None,
234
params: QueryParamTypes | None = None,
235
) -> ThreadUpdateStateResponse:
236
"""
237
Update state for a thread.
238
239
Args:
240
thread_id: The ID of the thread to update.
241
values: The values to update the state with.
242
as_node: The node to update the state as.
243
checkpoint: The checkpoint to update the state for.
244
checkpoint_id: Checkpoint to update the state for. Deprecated, use checkpoint instead.
245
headers: Optional custom headers to include with the request.
246
params: Optional query parameters to include with the request.
247
248
Returns:
249
ThreadUpdateStateResponse: The response from updating the thread state.
250
"""
251
252
async def get_history(
253
thread_id: str,
254
*,
255
limit: int = 10,
256
before: str | Checkpoint | None = None,
257
metadata: Mapping[str, Any] | None = None,
258
checkpoint: Checkpoint | None = None,
259
headers: Mapping[str, str] | None = None,
260
params: QueryParamTypes | None = None,
261
) -> list[ThreadState]:
262
"""
263
Get the state history of a thread.
264
265
Args:
266
thread_id: The ID of the thread to get the state history for.
267
checkpoint: Return states for this subgraph. If empty defaults to root.
268
before: Return states before this checkpoint.
269
limit: The number of states to return.
270
metadata: Metadata to filter by.
271
headers: Optional custom headers to include with the request.
272
params: Optional query parameters to include with the request.
273
274
Returns:
275
list[ThreadState]: The state history for the thread.
276
"""
277
```
278
279
### Thread Streaming
280
281
Real-time streaming of thread events and state changes.
282
283
```python { .api }
284
from collections.abc import AsyncIterator, Sequence
285
from langgraph_sdk.schema import StreamPart, ThreadStreamMode, QueryParamTypes
286
287
async def join_stream(
288
thread_id: str,
289
*,
290
last_event_id: str | None = None,
291
stream_mode: ThreadStreamMode | Sequence[ThreadStreamMode] = "run_modes",
292
headers: Mapping[str, str] | None = None,
293
params: QueryParamTypes | None = None,
294
) -> AsyncIterator[StreamPart]:
295
"""
296
Get a stream of events for a thread.
297
298
Args:
299
thread_id: The ID of the thread to get the stream for.
300
last_event_id: The ID of the last event to get.
301
stream_mode: The mode of the stream.
302
headers: Optional custom headers to include with the request.
303
params: Optional query parameters to include with the request.
304
305
Returns:
306
AsyncIterator[StreamPart]: A stream of events for the thread.
307
"""
308
```
309
310
## Types
311
312
```python { .api }
313
class Thread(TypedDict):
314
"""Thread definition and status."""
315
thread_id: str
316
created_at: str
317
updated_at: str
318
metadata: dict
319
status: ThreadStatus
320
config: Config
321
322
class ThreadState(TypedDict):
323
"""Thread execution state."""
324
values: dict
325
next: list[str]
326
checkpoint: Checkpoint
327
metadata: dict
328
created_at: str
329
parent_checkpoint: Checkpoint
330
331
class ThreadUpdateStateResponse(TypedDict):
332
"""Response from state update operation."""
333
checkpoint: Checkpoint
334
status: str
335
336
class Checkpoint(TypedDict):
337
"""Execution checkpoint reference."""
338
thread_id: str
339
checkpoint_ns: str
340
checkpoint_id: str
341
342
ThreadStatus = Literal["idle", "busy", "interrupted", "error"]
343
344
ThreadStreamMode = Literal["run_modes", "lifecycle", "state_update"]
345
346
ThreadSelectField = Literal[
347
"thread_id", "created_at", "updated_at", "metadata", "status", "config"
348
]
349
350
ThreadSortBy = Literal["created_at", "updated_at", "thread_id"]
351
```
352
353
## Usage Examples
354
355
### Creating and Managing Threads
356
357
```python
358
# Create a new thread with metadata
359
thread = await client.threads.create(
360
metadata={"user_id": "user-123", "session_type": "chat"}
361
)
362
363
# Get thread details
364
thread = await client.threads.get("thread-456")
365
366
# Update thread metadata
367
updated = await client.threads.update(
368
"thread-456",
369
metadata={"last_interaction": "2023-12-01T10:30:00Z"}
370
)
371
```
372
373
### Thread Discovery
374
375
```python
376
# Search threads by user
377
user_threads = await client.threads.search(
378
metadata={"user_id": "user-123"},
379
status="idle",
380
limit=20
381
)
382
383
# Get active threads
384
active_threads = await client.threads.search(
385
status="busy",
386
sort_by="updated_at"
387
)
388
389
# Count total threads
390
total = await client.threads.count()
391
```
392
393
### State Management
394
395
```python
396
# Get current thread state
397
state = await client.threads.get_state("thread-456")
398
print(f"Current state: {state['values']}")
399
400
# Update thread state
401
response = await client.threads.update_state(
402
"thread-456",
403
values={"user_preferences": {"theme": "dark"}},
404
as_node="preference_manager"
405
)
406
407
# Get thread execution history
408
history = await client.threads.get_history("thread-456", limit=50)
409
for checkpoint in history:
410
print(f"Checkpoint {checkpoint['checkpoint']['checkpoint_id']}: {checkpoint['values']}")
411
```
412
413
### Thread Operations
414
415
```python
416
# Copy a thread
417
copied = await client.threads.copy(
418
"thread-456",
419
# NOTE: copy() accepts only thread_id plus optional headers/params; it has no metadata argument
420
)
421
422
# Stream thread events
423
async for event in client.threads.join_stream("thread-456", stream_mode="state_update"):
424
print(f"Thread event: {event}")
425
```
426
427
### Thread Lifecycle
428
429
```python
430
# Create, use, and cleanup
431
thread = await client.threads.create(metadata={"purpose": "temp-calculation"})
432
433
# ... perform operations ...
434
435
# Delete when done
436
await client.threads.delete(thread["thread_id"])
437
```