Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows
Manage conversation threads that maintain state across multiple interactions. Threads provide isolation for conversations and support state inspection, updates, and history tracking.
Core operations for creating, reading, updating, and deleting conversation threads with metadata and configuration support.
from collections.abc import Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
Thread, ThreadSelectField, Json, OnConflictBehavior, QueryParamTypes
)
# Via client.threads
async def create(
    *,
    metadata: Json = None,
    thread_id: str | None = None,
    if_exists: OnConflictBehavior | None = None,
    supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
    graph_id: str | None = None,
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """
    Create a thread.

    Every argument is keyword-only and optional.

    Args:
        metadata: Metadata to merge with the thread.
        thread_id: ID of the thread. If None, ID will be a UUID.
        if_exists: How to handle duplicate creates. Defaults to "create".
            NOTE(review): the parameter itself defaults to None; confirm
            which OnConflictBehavior value is applied in that case.
        supersteps: Optional supersteps to apply to the thread.
        graph_id: Optional graph_id to apply to the thread.
        ttl: Time-to-live configuration for the thread, given either as an
            int or as a mapping of key-value pairs.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Thread: The created thread.
    """
async def get(
    thread_id: str,
    *,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """
    Get a thread by ID.

    Args:
        thread_id: The ID of the thread to get.
        select: Fields to include in the response, given as a list of
            ThreadSelectField values.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Thread: Thread object.
    """
async def update(
    thread_id: str,
    *,
    metadata: Mapping[str, Any],
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """
    Update a thread.

    Args:
        thread_id: ID of the thread to update.
        metadata: Metadata to merge with the thread. Required; must be
            passed by keyword.
        ttl: Time-to-live configuration for the thread, given either as an
            int or as a mapping of key-value pairs.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Thread: The updated thread.
    """
async def delete(
thread_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None:
"""
Delete a thread.
Args:
thread_id: The ID of the thread to delete.
headers: Optional custom headers to include with the request.
params: Optional query parameters to include with the request.
"""

Search and enumerate threads with filtering, pagination, and sorting capabilities.
from langgraph_sdk.schema import (
Thread, ThreadSelectField, ThreadSortBy, ThreadStatus, SortOrder,
Json, QueryParamTypes
)
async def search(
    *,
    metadata: Json = None,
    values: Json = None,
    ids: Sequence[str] | None = None,
    status: ThreadStatus | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: ThreadSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Thread]:
    """
    Search for threads.

    Every argument is keyword-only and optional.

    Args:
        metadata: Metadata to filter by. Exact match filter for each KV pair.
        values: Values to filter by. Exact match filter for each KV pair.
        ids: Thread IDs to filter by.
        status: Status to filter by.
        limit: Limit the number of threads to return. Defaults to 10.
        offset: Offset to start from. Defaults to 0.
        sort_by: Field to sort by.
        sort_order: Order to sort by.
        select: Fields to include in the response.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Thread]: List of threads matching the query.
    """
async def count(
*,
metadata: Json = None,
values: Json = None,
status: ThreadStatus | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> int:
"""
Count threads.
Args:
metadata: Metadata to filter by. Exact match filter for each KV pair.
values: Values to filter by. Exact match filter for each KV pair.
status: Status to filter by.
headers: Optional custom headers to include with the request.
params: Optional query parameters to include with the request.
Returns:
int: Number of threads that match the query.
"""

Advanced thread operations including copying, state management, and history tracking.
async def copy(
thread_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None:
"""
Copy a thread.
Args:
thread_id: The ID of the thread to copy.
headers: Optional custom headers to include with the request.
params: Optional query parameters to include with the request.
"""

Inspect and manipulate thread state, including current values, checkpoints, and state updates.
from langgraph_sdk.schema import (
ThreadState, ThreadUpdateStateResponse, Checkpoint, QueryParamTypes
)
async def get_state(
    thread_id: str,
    *,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    subgraphs: bool = False,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadState:
    """
    Get state for a thread.

    Args:
        thread_id: The ID of the thread to get the state for.
        checkpoint: The checkpoint to get the state for.
        checkpoint_id: Checkpoint to get the state for. Deprecated, use
            checkpoint instead.
        subgraphs: Whether to include subgraphs. Defaults to False.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        ThreadState: The state of the thread.
    """
async def update_state(
    thread_id: str,
    values: dict[str, Any] | Sequence[dict] | None,
    *,
    as_node: str | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadUpdateStateResponse:
    """
    Update state for a thread.

    Args:
        thread_id: The ID of the thread to update.
        values: The values to update the state with. Required; may be a
            dict, a sequence of dicts, or None.
        as_node: The node to update the state as.
        checkpoint: The checkpoint to update the state for.
        checkpoint_id: Checkpoint to update the state for. Deprecated, use
            checkpoint instead.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        ThreadUpdateStateResponse: The response from updating the thread state.
    """
async def get_history(
thread_id: str,
*,
limit: int = 10,
before: str | Checkpoint | None = None,
metadata: Mapping[str, Any] | None = None,
checkpoint: Checkpoint | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[ThreadState]:
"""
Get the state history of a thread.
Args:
thread_id: The ID of the thread to get the state history for.
checkpoint: Return states for this subgraph. If empty defaults to root.
before: Return states before this checkpoint.
limit: The number of states to return.
metadata: Metadata to filter by.
headers: Optional custom headers to include with the request.
params: Optional query parameters to include with the request.
Returns:
list[ThreadState]: The state history for the thread.
"""

Real-time streaming of thread events and state changes.
from collections.abc import AsyncIterator, Sequence
from langgraph_sdk.schema import StreamPart, ThreadStreamMode, QueryParamTypes
async def join_stream(
thread_id: str,
*,
last_event_id: str | None = None,
stream_mode: ThreadStreamMode | Sequence[ThreadStreamMode] = "run_modes",
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]:
"""
Get a stream of events for a thread.
Args:
thread_id: The ID of the thread to get the stream for.
last_event_id: The ID of the last event to get.
stream_mode: The mode of the stream.
headers: Optional custom headers to include with the request.
params: Optional query parameters to include with the request.
Returns:
AsyncIterator[StreamPart]: A stream of events for the thread.
"""

class Thread(TypedDict):
"""Thread definition and status."""
thread_id: str
created_at: str
updated_at: str
metadata: dict
status: ThreadStatus
config: Config
class ThreadState(TypedDict):
    """Thread execution state.

    This is the type returned by ``get_state`` and, as list items, by
    ``get_history``.
    """

    values: dict
    next: list[str]
    # Reference to the checkpoint this state is associated with.
    checkpoint: Checkpoint
    metadata: dict
    created_at: str
    parent_checkpoint: Checkpoint
class ThreadUpdateStateResponse(TypedDict):
    """Response from state update operation.

    This is the return type of ``update_state``.
    """

    checkpoint: Checkpoint
    status: str
class Checkpoint(TypedDict):
    """Execution checkpoint reference.

    Accepted by the ``checkpoint`` argument of ``get_state``,
    ``update_state`` and ``get_history``, and carried inside ``ThreadState``.
    """

    thread_id: str
    checkpoint_ns: str
    checkpoint_id: str
# Allowed thread statuses; used by Thread.status and by the status filter
# of search() and count().
ThreadStatus = Literal["idle", "busy", "interrupted", "error"]
# Allowed values for the stream_mode argument of join_stream().
ThreadStreamMode = Literal["run_modes", "lifecycle", "state_update"]
# Field names accepted by the select argument of get() and search();
# they mirror the keys of Thread.
ThreadSelectField = Literal[
    "thread_id", "created_at", "updated_at", "metadata", "status", "config"
]
ThreadSortBy = Literal["created_at", "updated_at", "thread_id"]

# Create a new thread with metadata
thread = await client.threads.create(
metadata={"user_id": "user-123", "session_type": "chat"}
)
# Get thread details
thread = await client.threads.get("thread-456")
# Update thread metadata
updated = await client.threads.update(
"thread-456",
metadata={"last_interaction": "2023-12-01T10:30:00Z"}
)

# Search threads by user
user_threads = await client.threads.search(
metadata={"user_id": "user-123"},
status="idle",
limit=20
)
# Get active threads
active_threads = await client.threads.search(
status="busy",
sort_by="updated_at"
)
# Count total threads
total = await client.threads.count()

# Get current thread state
state = await client.threads.get_state("thread-456")
print(f"Current state: {state['values']}")
# Update thread state
response = await client.threads.update_state(
"thread-456",
values={"user_preferences": {"theme": "dark"}},
as_node="preference_manager"
)
# Get thread execution history
history = await client.threads.get_history("thread-456", limit=50)
for checkpoint in history:
    print(f"Checkpoint {checkpoint['checkpoint']['checkpoint_id']}: {checkpoint['values']}")

# Copy a thread
await client.threads.copy("thread-456")
# Stream thread events
async for event in client.threads.join_stream("thread-456", stream_mode="state_update"):
    print(f"Thread event: {event}")

# Create, use, and cleanup
thread = await client.threads.create(metadata={"purpose": "temp-calculation"})
# ... perform operations ...
# Delete when done
await client.threads.delete(thread["thread_id"])

Install with Tessl CLI
npx tessl i tessl/pypi-langgraph-sdk