or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assistant-management.md authentication.md client-management.md index.md persistent-storage.md run-execution.md scheduled-tasks.md thread-management.md

thread-management.mddocs/

0

# Thread Management

1

2

Manage conversation threads that maintain state across multiple interactions. Threads provide isolation for conversations and support state inspection, updates, and history tracking.

3

4

## Capabilities

5

6

### Thread CRUD Operations

7

8

Core operations for creating, reading, updating, and deleting conversation threads with metadata and configuration support.

9

10

```python { .api }

11

from collections.abc import Mapping, Sequence

12

from typing import Any

13

from langgraph_sdk.schema import (

14

Thread, ThreadSelectField, Json, OnConflictBehavior, QueryParamTypes

15

)

16

17

# Via client.threads

18

async def create(

19

*,

20

metadata: Json = None,

21

thread_id: str | None = None,

22

if_exists: OnConflictBehavior | None = None,

23

supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,

24

graph_id: str | None = None,

25

ttl: int | Mapping[str, Any] | None = None,

26

headers: Mapping[str, str] | None = None,

27

params: QueryParamTypes | None = None,

28

) -> Thread:

29

"""

30

Create a thread.

31

32

Args:

33

metadata: Metadata to merge with the thread.

34

thread_id: ID of the thread. If None, ID will be a UUID.

35

if_exists: How to handle duplicate creates. Defaults to "raise"; use "do_nothing" to return the existing thread instead.

36

supersteps: Optional supersteps to apply to the thread.

37

graph_id: Optional graph_id to apply to the thread.

38

ttl: Time-to-live configuration for the thread, given as an integer or a mapping of TTL settings.

39

headers: Optional custom headers to include with the request.

40

params: Optional query parameters to include with the request.

41

42

Returns:

43

Thread: The created thread.

44

"""

45

46

async def get(

47

thread_id: str,

48

*,

49

select: list[ThreadSelectField] | None = None,

50

headers: Mapping[str, str] | None = None,

51

params: QueryParamTypes | None = None,

52

) -> Thread:

53

"""

54

Get a thread by ID.

55

56

Args:

57

thread_id: The ID of the thread to get.

58

select: Fields to include in the response.

59

headers: Optional custom headers to include with the request.

60

params: Optional query parameters to include with the request.

61

62

Returns:

63

Thread: Thread object.

64

"""

65

66

async def update(

67

thread_id: str,

68

*,

69

metadata: Mapping[str, Any],

70

ttl: int | Mapping[str, Any] | None = None,

71

headers: Mapping[str, str] | None = None,

72

params: QueryParamTypes | None = None,

73

) -> Thread:

74

"""

75

Update a thread.

76

77

Args:

78

thread_id: ID of the thread to update.

79

metadata: Metadata to merge with the thread.

80

ttl: Time-to-live configuration for the thread.

81

headers: Optional custom headers to include with the request.

82

params: Optional query parameters to include with the request.

83

84

Returns:

85

Thread: The updated thread.

86

"""

87

88

async def delete(

89

thread_id: str,

90

*,

91

headers: Mapping[str, str] | None = None,

92

params: QueryParamTypes | None = None,

93

) -> None:

94

"""

95

Delete a thread.

96

97

Args:

98

thread_id: The ID of the thread to delete.

99

headers: Optional custom headers to include with the request.

100

params: Optional query parameters to include with the request.

101

"""

102

```

103

104

### Thread Discovery & Search

105

106

Search and enumerate threads with filtering, pagination, and sorting capabilities.

107

108

```python { .api }

109

from langgraph_sdk.schema import (

110

Thread, ThreadSelectField, ThreadSortBy, ThreadStatus, SortOrder,

111

Json, QueryParamTypes

112

)

113

114

async def search(

115

*,

116

metadata: Json = None,

117

values: Json = None,

118

ids: Sequence[str] | None = None,

119

status: ThreadStatus | None = None,

120

limit: int = 10,

121

offset: int = 0,

122

sort_by: ThreadSortBy | None = None,

123

sort_order: SortOrder | None = None,

124

select: list[ThreadSelectField] | None = None,

125

headers: Mapping[str, str] | None = None,

126

params: QueryParamTypes | None = None,

127

) -> list[Thread]:

128

"""

129

Search for threads.

130

131

Args:

132

metadata: Metadata to filter by. Exact match filter for each KV pair.

133

values: Values to filter by. Exact match filter for each KV pair.

134

ids: Thread IDs to filter by.

135

status: Status to filter by.

136

limit: Limit the number of threads to return.

137

offset: Offset to start from.

138

sort_by: Field to sort by.

139

sort_order: Order to sort by.

140

select: Fields to include in the response.

141

headers: Optional custom headers to include with the request.

142

params: Optional query parameters to include with the request.

143

144

Returns:

145

list[Thread]: List of threads matching the query.

146

"""

147

148

async def count(

149

*,

150

metadata: Json = None,

151

values: Json = None,

152

status: ThreadStatus | None = None,

153

headers: Mapping[str, str] | None = None,

154

params: QueryParamTypes | None = None,

155

) -> int:

156

"""

157

Count threads.

158

159

Args:

160

metadata: Metadata to filter by. Exact match filter for each KV pair.

161

values: Values to filter by. Exact match filter for each KV pair.

162

status: Status to filter by.

163

headers: Optional custom headers to include with the request.

164

params: Optional query parameters to include with the request.

165

166

Returns:

167

int: Number of threads that match the query.

168

"""

169

```

170

171

### Thread Operations

172

173

Advanced thread operations including copying, state management, and history tracking.

174

175

```python { .api }

176

async def copy(

177

thread_id: str,

178

*,

179

headers: Mapping[str, str] | None = None,

180

params: QueryParamTypes | None = None,

181

) -> None:

182

"""

183

Copy a thread.

184

185

Args:

186

thread_id: The ID of the thread to copy.

187

headers: Optional custom headers to include with the request.

188

params: Optional query parameters to include with the request.

189

"""

190

```

191

192

### State Management

193

194

Inspect and manipulate thread state, including current values, checkpoints, and state updates.

195

196

```python { .api }

197

from langgraph_sdk.schema import (

198

ThreadState, ThreadUpdateStateResponse, Checkpoint, QueryParamTypes

199

)

200

201

async def get_state(

202

thread_id: str,

203

*,

204

checkpoint: Checkpoint | None = None,

205

checkpoint_id: str | None = None, # deprecated

206

subgraphs: bool = False,

207

headers: Mapping[str, str] | None = None,

208

params: QueryParamTypes | None = None,

209

) -> ThreadState:

210

"""

211

Get state for a thread.

212

213

Args:

214

thread_id: The ID of the thread to get the state for.

215

checkpoint: The checkpoint to get the state for.

216

checkpoint_id: Checkpoint to get the state for. Deprecated, use checkpoint instead.

217

subgraphs: Whether to include subgraphs.

218

headers: Optional custom headers to include with the request.

219

params: Optional query parameters to include with the request.

220

221

Returns:

222

ThreadState: The state of the thread.

223

"""

224

225

async def update_state(

226

thread_id: str,

227

values: dict[str, Any] | Sequence[dict] | None,

228

*,

229

as_node: str | None = None,

230

checkpoint: Checkpoint | None = None,

231

checkpoint_id: str | None = None, # deprecated

232

headers: Mapping[str, str] | None = None,

233

params: QueryParamTypes | None = None,

234

) -> ThreadUpdateStateResponse:

235

"""

236

Update state for a thread.

237

238

Args:

239

thread_id: The ID of the thread to update.

240

values: The values to update the state with.

241

as_node: The node to update the state as.

242

checkpoint: The checkpoint to update the state for.

243

checkpoint_id: Checkpoint to update the state for. Deprecated, use checkpoint instead.

244

headers: Optional custom headers to include with the request.

245

params: Optional query parameters to include with the request.

246

247

Returns:

248

ThreadUpdateStateResponse: The response from updating the thread state.

249

"""

250

251

async def get_history(

252

thread_id: str,

253

*,

254

limit: int = 10,

255

before: str | Checkpoint | None = None,

256

metadata: Mapping[str, Any] | None = None,

257

checkpoint: Checkpoint | None = None,

258

headers: Mapping[str, str] | None = None,

259

params: QueryParamTypes | None = None,

260

) -> list[ThreadState]:

261

"""

262

Get the state history of a thread.

263

264

Args:

265

thread_id: The ID of the thread to get the state history for.

266

checkpoint: Return states for this subgraph. If empty defaults to root.

267

before: Return states before this checkpoint.

268

limit: The number of states to return.

269

metadata: Metadata to filter by.

270

headers: Optional custom headers to include with the request.

271

params: Optional query parameters to include with the request.

272

273

Returns:

274

list[ThreadState]: The state history for the thread.

275

"""

276

```

277

278

### Thread Streaming

279

280

Real-time streaming of thread events and state changes.

281

282

```python { .api }

283

from collections.abc import AsyncIterator, Sequence

284

from langgraph_sdk.schema import StreamPart, ThreadStreamMode, QueryParamTypes

285

286

async def join_stream(

287

thread_id: str,

288

*,

289

last_event_id: str | None = None,

290

stream_mode: ThreadStreamMode | Sequence[ThreadStreamMode] = "run_modes",

291

headers: Mapping[str, str] | None = None,

292

params: QueryParamTypes | None = None,

293

) -> AsyncIterator[StreamPart]:

294

"""

295

Get a stream of events for a thread.

296

297

Args:

298

thread_id: The ID of the thread to get the stream for.

299

last_event_id: The ID of the last event to get.

300

stream_mode: The mode of the stream.

301

headers: Optional custom headers to include with the request.

302

params: Optional query parameters to include with the request.

303

304

Returns:

305

AsyncIterator[StreamPart]: A stream of events for the thread.

306

"""

307

```

308

309

## Types

310

311

```python { .api }

312

class Thread(TypedDict):

313

"""Thread definition and status."""

314

thread_id: str

315

created_at: str

316

updated_at: str

317

metadata: dict

318

status: ThreadStatus

319

config: Config

320

321

class ThreadState(TypedDict):

322

"""Thread execution state."""

323

values: dict

324

next: list[str]

325

checkpoint: Checkpoint

326

metadata: dict

327

created_at: str

328

parent_checkpoint: Checkpoint

329

330

class ThreadUpdateStateResponse(TypedDict):

331

"""Response from state update operation."""

332

checkpoint: Checkpoint

333

status: str

334

335

class Checkpoint(TypedDict):

336

"""Execution checkpoint reference."""

337

thread_id: str

338

checkpoint_ns: str

339

checkpoint_id: str

340

341

ThreadStatus = Literal["idle", "busy", "interrupted", "error"]

342

343

ThreadStreamMode = Literal["run_modes", "lifecycle", "state_update"]

344

345

ThreadSelectField = Literal[

346

"thread_id", "created_at", "updated_at", "metadata", "status", "config"

347

]

348

349

ThreadSortBy = Literal["created_at", "updated_at", "thread_id"]

350

```

351

352

## Usage Examples

353

354

### Creating and Managing Threads

355

356

```python

357

# Create a new thread with metadata

358

thread = await client.threads.create(

359

metadata={"user_id": "user-123", "session_type": "chat"}

360

)

361

362

# Get thread details

363

thread = await client.threads.get("thread-456")

364

365

# Update thread metadata

366

updated = await client.threads.update(

367

"thread-456",

368

metadata={"last_interaction": "2023-12-01T10:30:00Z"}

369

)

370

```

371

372

### Thread Discovery

373

374

```python

375

# Search threads by user

376

user_threads = await client.threads.search(

377

metadata={"user_id": "user-123"},

378

status="idle",

379

limit=20

380

)

381

382

# Get active threads

383

active_threads = await client.threads.search(

384

status="busy",

385

sort_by="updated_at"

386

)

387

388

# Count total threads

389

total = await client.threads.count()

390

```

391

392

### State Management

393

394

```python

395

# Get current thread state

396

state = await client.threads.get_state("thread-456")

397

print(f"Current state: {state['values']}")

398

399

# Update thread state

400

response = await client.threads.update_state(

401

"thread-456",

402

values={"user_preferences": {"theme": "dark"}},

403

as_node="preference_manager"

404

)

405

406

# Get thread execution history

407

history = await client.threads.get_history("thread-456", limit=50)

408

for checkpoint in history:

409

print(f"Checkpoint {checkpoint['checkpoint']['checkpoint_id']}: {checkpoint['values']}")

410

```

411

412

### Thread Operations

413

414

```python

415

# Copy a thread

416

await client.threads.copy(

417

"thread-456",

418

# copy() accepts only thread_id, headers, and params, and returns None

419

)

420

421

# Stream thread events

422

async for event in client.threads.join_stream("thread-456", stream_mode="state_update"):

423

print(f"Thread event: {event}")

424

```

425

426

### Thread Lifecycle

427

428

```python

429

# Create, use, and cleanup

430

thread = await client.threads.create(metadata={"purpose": "temp-calculation"})

431

432

# ... perform operations ...

433

434

# Delete when done

435

await client.threads.delete(thread["thread_id"])

436

```