0
# Thread Management
1
2
Manage conversation threads that maintain state across multiple interactions. Threads provide isolation for conversations and support state inspection, updates, and history tracking.
3
4
## Capabilities
5
6
### Thread CRUD Operations
7
8
Core operations for creating, reading, updating, and deleting conversation threads with metadata and configuration support.
9
10
```python { .api }
11
from collections.abc import Mapping, Sequence
12
from typing import Any
13
from langgraph_sdk.schema import (
14
Thread, ThreadSelectField, Json, OnConflictBehavior, QueryParamTypes
15
)
16
17
# Via client.threads
18
async def create(
19
*,
20
metadata: Json = None,
21
thread_id: str | None = None,
22
if_exists: OnConflictBehavior | None = None,
23
supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
24
graph_id: str | None = None,
25
ttl: int | Mapping[str, Any] | None = None,
26
headers: Mapping[str, str] | None = None,
27
params: QueryParamTypes | None = None,
28
) -> Thread:
29
"""
30
Create a thread.
31
32
Args:
33
metadata: Metadata to merge with the thread.
34
thread_id: ID of the thread. If None, ID will be a UUID.
35
if_exists: How to handle duplicate creates. Defaults to "create".
36
supersteps: Optional supersteps to apply to the thread.
37
graph_id: Optional graph_id to apply to the thread.
38
ttl: Metadata key-value pairs to be used to configure the thread's time-to-live.
39
headers: Optional custom headers to include with the request.
40
params: Optional query parameters to include with the request.
41
42
Returns:
43
Thread: The created thread.
44
"""
45
46
async def get(
47
thread_id: str,
48
*,
49
select: list[ThreadSelectField] | None = None,
50
headers: Mapping[str, str] | None = None,
51
params: QueryParamTypes | None = None,
52
) -> Thread:
53
"""
54
Get a thread by ID.
55
56
Args:
57
thread_id: The ID of the thread to get.
58
select: Fields to include in the response.
59
headers: Optional custom headers to include with the request.
60
params: Optional query parameters to include with the request.
61
62
Returns:
63
Thread: Thread object.
64
"""
65
66
async def update(
67
thread_id: str,
68
*,
69
metadata: Mapping[str, Any],
70
ttl: int | Mapping[str, Any] | None = None,
71
headers: Mapping[str, str] | None = None,
72
params: QueryParamTypes | None = None,
73
) -> Thread:
74
"""
75
Update a thread.
76
77
Args:
78
thread_id: ID of the thread to update.
79
metadata: Metadata to merge with the thread.
80
ttl: Time-to-live configuration for the thread.
81
headers: Optional custom headers to include with the request.
82
params: Optional query parameters to include with the request.
83
84
Returns:
85
Thread: The updated thread.
86
"""
87
88
async def delete(
89
thread_id: str,
90
*,
91
headers: Mapping[str, str] | None = None,
92
params: QueryParamTypes | None = None,
93
) -> None:
94
"""
95
Delete a thread.
96
97
Args:
98
thread_id: The ID of the thread to delete.
99
headers: Optional custom headers to include with the request.
100
params: Optional query parameters to include with the request.
101
"""
102
```
103
104
### Thread Discovery & Search
105
106
Search and enumerate threads with filtering, pagination, and sorting capabilities.
107
108
```python { .api }
109
from langgraph_sdk.schema import (
110
Thread, ThreadSelectField, ThreadSortBy, ThreadStatus, SortOrder,
111
Json, QueryParamTypes
112
)
113
114
async def search(
115
*,
116
metadata: Json = None,
117
values: Json = None,
118
ids: Sequence[str] | None = None,
119
status: ThreadStatus | None = None,
120
limit: int = 10,
121
offset: int = 0,
122
sort_by: ThreadSortBy | None = None,
123
sort_order: SortOrder | None = None,
124
select: list[ThreadSelectField] | None = None,
125
headers: Mapping[str, str] | None = None,
126
params: QueryParamTypes | None = None,
127
) -> list[Thread]:
128
"""
129
Search for threads.
130
131
Args:
132
metadata: Metadata to filter by. Exact match filter for each KV pair.
133
values: Values to filter by. Exact match filter for each KV pair.
134
ids: Thread IDs to filter by.
135
status: Status to filter by.
136
limit: Limit the number of threads to return.
137
offset: Offset to start from.
138
sort_by: Field to sort by.
139
sort_order: Order to sort by.
140
select: Fields to include in the response.
141
headers: Optional custom headers to include with the request.
142
params: Optional query parameters to include with the request.
143
144
Returns:
145
list[Thread]: List of threads matching the query.
146
"""
147
148
async def count(
149
*,
150
metadata: Json = None,
151
values: Json = None,
152
status: ThreadStatus | None = None,
153
headers: Mapping[str, str] | None = None,
154
params: QueryParamTypes | None = None,
155
) -> int:
156
"""
157
Count threads.
158
159
Args:
160
metadata: Metadata to filter by. Exact match filter for each KV pair.
161
values: Values to filter by. Exact match filter for each KV pair.
162
status: Status to filter by.
163
headers: Optional custom headers to include with the request.
164
params: Optional query parameters to include with the request.
165
166
Returns:
167
int: Number of threads that match the query.
168
"""
169
```
170
171
### Thread Operations
172
173
Advanced thread operations including copying, state management, and history tracking.
174
175
```python { .api }
176
async def copy(
177
thread_id: str,
178
*,
179
headers: Mapping[str, str] | None = None,
180
params: QueryParamTypes | None = None,
181
) -> None:
182
"""
183
Copy a thread.
184
185
Args:
186
thread_id: The ID of the thread to copy.
187
headers: Optional custom headers to include with the request.
188
params: Optional query parameters to include with the request.
189
"""
190
```
191
192
### State Management
193
194
Inspect and manipulate thread state, including current values, checkpoints, and state updates.
195
196
```python { .api }
197
from langgraph_sdk.schema import (
198
ThreadState, ThreadUpdateStateResponse, Checkpoint, QueryParamTypes
199
)
200
201
async def get_state(
202
thread_id: str,
203
*,
204
checkpoint: Checkpoint | None = None,
205
checkpoint_id: str | None = None, # deprecated
206
subgraphs: bool = False,
207
headers: Mapping[str, str] | None = None,
208
params: QueryParamTypes | None = None,
209
) -> ThreadState:
210
"""
211
Get state for a thread.
212
213
Args:
214
thread_id: The ID of the thread to get the state for.
215
checkpoint: The checkpoint to get the state for.
216
checkpoint_id: Checkpoint to get the state for. Deprecated, use checkpoint instead.
217
subgraphs: Whether to include subgraphs.
218
headers: Optional custom headers to include with the request.
219
params: Optional query parameters to include with the request.
220
221
Returns:
222
ThreadState: The thread of the state.
223
"""
224
225
async def update_state(
226
thread_id: str,
227
values: dict[str, Any] | Sequence[dict] | None,
228
*,
229
as_node: str | None = None,
230
checkpoint: Checkpoint | None = None,
231
checkpoint_id: str | None = None, # deprecated
232
headers: Mapping[str, str] | None = None,
233
params: QueryParamTypes | None = None,
234
) -> ThreadUpdateStateResponse:
235
"""
236
Update state for a thread.
237
238
Args:
239
thread_id: The ID of the thread to update.
240
values: The values to update the state with.
241
as_node: The node to update the state as.
242
checkpoint: The checkpoint to update the state for.
243
checkpoint_id: Checkpoint to update the state for. Deprecated, use checkpoint instead.
244
headers: Optional custom headers to include with the request.
245
params: Optional query parameters to include with the request.
246
247
Returns:
248
ThreadUpdateStateResponse: The response from updating the thread state.
249
"""
250
251
async def get_history(
252
thread_id: str,
253
*,
254
limit: int = 10,
255
before: str | Checkpoint | None = None,
256
metadata: Mapping[str, Any] | None = None,
257
checkpoint: Checkpoint | None = None,
258
headers: Mapping[str, str] | None = None,
259
params: QueryParamTypes | None = None,
260
) -> list[ThreadState]:
261
"""
262
Get the state history of a thread.
263
264
Args:
265
thread_id: The ID of the thread to get the state history for.
266
checkpoint: Return states for this subgraph. If empty defaults to root.
267
before: Return states before this checkpoint.
268
limit: The number of states to return.
269
metadata: Metadata to filter by.
270
headers: Optional custom headers to include with the request.
271
params: Optional query parameters to include with the request.
272
273
Returns:
274
list[ThreadState]: The state history for the thread.
275
"""
276
```
277
278
### Thread Streaming
279
280
Real-time streaming of thread events and state changes.
281
282
```python { .api }
283
from collections.abc import AsyncIterator, Sequence
284
from langgraph_sdk.schema import StreamPart, ThreadStreamMode, QueryParamTypes
285
286
async def join_stream(
287
thread_id: str,
288
*,
289
last_event_id: str | None = None,
290
stream_mode: ThreadStreamMode | Sequence[ThreadStreamMode] = "run_modes",
291
headers: Mapping[str, str] | None = None,
292
params: QueryParamTypes | None = None,
293
) -> AsyncIterator[StreamPart]:
294
"""
295
Get a stream of events for a thread.
296
297
Args:
298
thread_id: The ID of the thread to get the stream for.
299
last_event_id: The ID of the last event to get.
300
stream_mode: The mode of the stream.
301
headers: Optional custom headers to include with the request.
302
params: Optional query parameters to include with the request.
303
304
Returns:
305
AsyncIterator[StreamPart]: A stream of events for the thread.
306
"""
307
```
308
309
## Types
310
311
```python { .api }
312
class Thread(TypedDict):
313
"""Thread definition and status."""
314
thread_id: str
315
created_at: str
316
updated_at: str
317
metadata: dict
318
status: ThreadStatus
319
config: Config
320
321
class ThreadState(TypedDict):
322
"""Thread execution state."""
323
values: dict
324
next: list[str]
325
checkpoint: Checkpoint
326
metadata: dict
327
created_at: str
328
parent_checkpoint: Checkpoint
329
330
class ThreadUpdateStateResponse(TypedDict):
331
"""Response from state update operation."""
332
checkpoint: Checkpoint
333
status: str
334
335
class Checkpoint(TypedDict):
336
"""Execution checkpoint reference."""
337
thread_id: str
338
checkpoint_ns: str
339
checkpoint_id: str
340
341
ThreadStatus = Literal["idle", "busy", "interrupted", "error"]
342
343
ThreadStreamMode = Literal["run_modes", "lifecycle", "state_update"]
344
345
ThreadSelectField = Literal[
346
"thread_id", "created_at", "updated_at", "metadata", "status", "config"
347
]
348
349
ThreadSortBy = Literal["created_at", "updated_at", "thread_id"]
350
```
351
352
## Usage Examples
353
354
### Creating and Managing Threads
355
356
```python
357
# Create a new thread with metadata
358
thread = await client.threads.create(
359
metadata={"user_id": "user-123", "session_type": "chat"}
360
)
361
362
# Get thread details
363
thread = await client.threads.get("thread-456")
364
365
# Update thread metadata
366
updated = await client.threads.update(
367
"thread-456",
368
metadata={"last_interaction": "2023-12-01T10:30:00Z"}
369
)
370
```
371
372
### Thread Discovery
373
374
```python
375
# Search threads by user
376
user_threads = await client.threads.search(
377
metadata={"user_id": "user-123"},
378
status="idle",
379
limit=20
380
)
381
382
# Get active threads
383
active_threads = await client.threads.search(
384
status="busy",
385
sort_by="updated_at"
386
)
387
388
# Count total threads
389
total = await client.threads.count()
390
```
391
392
### State Management
393
394
```python
395
# Get current thread state
396
state = await client.threads.get_state("thread-456")
397
print(f"Current state: {state['values']}")
398
399
# Update thread state
400
response = await client.threads.update_state(
401
"thread-456",
402
values={"user_preferences": {"theme": "dark"}},
403
as_node="preference_manager"
404
)
405
406
# Get thread execution history
407
history = await client.threads.get_history("thread-456", limit=50)
408
for checkpoint in history:
409
print(f"Checkpoint {checkpoint['checkpoint']['checkpoint_id']}: {checkpoint['values']}")
410
```
411
412
### Thread Operations
413
414
```python
415
# Copy a thread
416
copied = await client.threads.copy(
417
"thread-456",
418
metadata={"source_thread": "thread-456", "copy_reason": "backup"}
419
)
420
421
# Stream thread events
422
async for event in client.threads.join_stream("thread-456", mode="state_update"):
423
print(f"Thread event: {event}")
424
```
425
426
### Thread Lifecycle
427
428
```python
429
# Create, use, and cleanup
430
thread = await client.threads.create(metadata={"purpose": "temp-calculation"})
431
432
# ... perform operations ...
433
434
# Delete when done
435
await client.threads.delete(thread["thread_id"])
436
```