CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-langgraph-sdk

Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows

Overview
Eval results
Files

docs/thread-management.md

Thread Management

Manage conversation threads that maintain state across multiple interactions. Threads provide isolation for conversations and support state inspection, updates, and history tracking.

Capabilities

Thread CRUD Operations

Core operations for creating, reading, updating, and deleting conversation threads with metadata and configuration support.

from collections.abc import Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
    Thread, ThreadSelectField, Json, OnConflictBehavior, QueryParamTypes
)

# Via client.threads
async def create(
    *,
    metadata: Json = None,
    thread_id: str | None = None,
    if_exists: OnConflictBehavior | None = None,
    supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
    graph_id: str | None = None,
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """
    Create a thread.

    Called as ``client.threads.create(...)``. Every argument is keyword-only
    and optional.

    Args:
        metadata: Metadata to attach to the new thread.
        thread_id: ID of the thread. If None, ID will be a UUID.
        if_exists: How to handle a create call for a thread ID that already
            exists. The argument itself defaults to None; the documented
            default behavior is "create" (TODO confirm that this is a valid
            OnConflictBehavior value).
        supersteps: Optional supersteps to apply to the thread, given as a
            sequence of dicts that each map a string key to a sequence of
            dicts.
        graph_id: Optional graph_id to apply to the thread.
        ttl: Time-to-live configuration for the thread, given either as an
            integer or as a mapping of settings (the unit and the accepted
            mapping keys are not shown here; confirm against the SDK
            reference).
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Thread: The created thread.
    """

async def get(
    thread_id: str,
    *,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """
    Get a thread by ID.

    Called as ``client.threads.get(thread_id, ...)``.

    Args:
        thread_id: The ID of the thread to get.
        select: Names of the thread fields (ThreadSelectField values) to
            include in the response.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Thread: Thread object.
    """

async def update(
    thread_id: str,
    *,
    metadata: Mapping[str, Any],
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """
    Update a thread.

    Called as ``client.threads.update(thread_id, metadata=...)``.

    Args:
        thread_id: ID of the thread to update.
        metadata: Metadata to merge with the thread. Required keyword
            argument (it has no default).
        ttl: Time-to-live configuration for the thread, given either as an
            integer or as a mapping of settings.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Thread: The updated thread.
    """

async def delete(
    thread_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Delete a thread.

    Called as ``client.threads.delete(thread_id)``.

    Args:
        thread_id: The ID of the thread to delete.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        None
    """

Thread Discovery & Search

Search and enumerate threads with filtering, pagination, and sorting capabilities.

from langgraph_sdk.schema import (
    Thread, ThreadSelectField, ThreadSortBy, ThreadStatus, SortOrder,
    Json, QueryParamTypes
)

async def search(
    *,
    metadata: Json = None,
    values: Json = None,
    ids: Sequence[str] | None = None,
    status: ThreadStatus | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: ThreadSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Thread]:
    """
    Search for threads.

    Called as ``client.threads.search(...)``. Every argument is keyword-only
    and optional.

    Args:
        metadata: Metadata to filter by. Exact match filter for each KV pair.
        values: Values to filter by. Exact match filter for each KV pair.
        ids: Thread IDs to filter by.
        status: Status to filter by (a ThreadStatus value).
        limit: Maximum number of threads to return. Defaults to 10.
        offset: Offset to start from. Defaults to 0.
        sort_by: Field to sort by (a ThreadSortBy value).
        sort_order: Order to sort by.
        select: Names of the thread fields (ThreadSelectField values) to
            include in the response.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Thread]: List of threads matching the query.
    """

async def count(
    *,
    metadata: Json = None,
    values: Json = None,
    status: ThreadStatus | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> int:
    """
    Count threads.

    Called as ``client.threads.count(...)``. The filter arguments are all
    optional, and the usage example calls it with no arguments.

    Args:
        metadata: Metadata to filter by. Exact match filter for each KV pair.
        values: Values to filter by. Exact match filter for each KV pair.
        status: Status to filter by (a ThreadStatus value).
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        int: Number of threads that match the query.
    """

Thread Operations

Advanced thread operations including copying, state management, and history tracking.

async def copy(
    thread_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Copy a thread.

    Accepts only the source thread ID plus the request options below; this
    signature has no parameter for metadata or for a target thread ID.

    Args:
        thread_id: The ID of the thread to copy.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        None
    """

State Management

Inspect and manipulate thread state, including current values, checkpoints, and state updates.

from langgraph_sdk.schema import (
    ThreadState, ThreadUpdateStateResponse, Checkpoint, QueryParamTypes
)

async def get_state(
    thread_id: str,
    *,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    subgraphs: bool = False,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadState:
    """
    Get state for a thread.

    Called as ``client.threads.get_state(thread_id, ...)``.

    Args:
        thread_id: The ID of the thread to get the state for.
        checkpoint: The checkpoint to get the state for.
        checkpoint_id: Checkpoint to get the state for. Deprecated, use checkpoint instead.
        subgraphs: Whether to include subgraphs. Defaults to False.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        ThreadState: The state of the thread.
    """

async def update_state(
    thread_id: str,
    values: dict[str, Any] | Sequence[dict] | None,
    *,
    as_node: str | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadUpdateStateResponse:
    """
    Update state for a thread.

    Called as ``client.threads.update_state(thread_id, values, ...)``.

    Args:
        thread_id: The ID of the thread to update.
        values: The values to update the state with: a single dict, a
            sequence of dicts, or None. Required (it has no default).
        as_node: The node to update the state as.
        checkpoint: The checkpoint to update the state for.
        checkpoint_id: Checkpoint to update the state for. Deprecated, use checkpoint instead.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        ThreadUpdateStateResponse: The response from updating the thread state.
    """

async def get_history(
    thread_id: str,
    *,
    limit: int = 10,
    before: str | Checkpoint | None = None,
    metadata: Mapping[str, Any] | None = None,
    checkpoint: Checkpoint | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[ThreadState]:
    """
    Get the state history of a thread.

    Called as ``client.threads.get_history(thread_id, ...)``.

    Args:
        thread_id: The ID of the thread to get the state history for.
        limit: The number of states to return. Defaults to 10.
        before: Return states before this checkpoint, given either as a
            string or as a Checkpoint.
        metadata: Metadata to filter by.
        checkpoint: Return states for this subgraph. If empty defaults to root.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[ThreadState]: The state history for the thread.
    """

Thread Streaming

Real-time streaming of thread events and state changes.

from collections.abc import AsyncIterator, Sequence
from langgraph_sdk.schema import StreamPart, ThreadStreamMode, QueryParamTypes

async def join_stream(
    thread_id: str,
    *,
    last_event_id: str | None = None,
    stream_mode: ThreadStreamMode | Sequence[ThreadStreamMode] = "run_modes",
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]:
    """
    Get a stream of events for a thread.

    The result is consumed with ``async for``, as in
    ``async for event in client.threads.join_stream(thread_id): ...``.

    Args:
        thread_id: The ID of the thread to get the stream for.
        last_event_id: The ID of the last event to get.
            NOTE(review): presumably used to resume the stream relative to
            that event; confirm against the SDK reference.
        stream_mode: The mode of the stream: a single ThreadStreamMode value
            or a sequence of them. Defaults to "run_modes".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        AsyncIterator[StreamPart]: A stream of events for the thread.
    """

Types

class Thread(TypedDict):
    """Thread definition and status.

    Dictionary shape returned by the create, get, update and search
    operations documented on this page.
    """
    thread_id: str  # identifier passed to the other thread operations
    created_at: str  # timestamp held as a string (exact format not shown here)
    updated_at: str  # timestamp held as a string (exact format not shown here)
    metadata: dict  # key-value metadata, as supplied via create/update
    status: ThreadStatus  # one of the ThreadStatus literal values
    config: Config  # NOTE(review): Config is not defined on this page

class ThreadState(TypedDict):
    """Thread execution state.

    Returned by get_state, and as list elements by get_history.
    """
    values: dict  # state values; read as state['values'] in the usage examples
    next: list[str]  # presumably the names of the nodes to run next (TODO confirm)
    checkpoint: Checkpoint  # checkpoint reference for this state
    metadata: dict
    created_at: str  # timestamp held as a string (exact format not shown here)
    parent_checkpoint: Checkpoint  # NOTE(review): confirm whether this can be None for the first state

class ThreadUpdateStateResponse(TypedDict):
    """Response from state update operation.

    Returned by update_state.
    """
    checkpoint: Checkpoint  # presumably the checkpoint produced by the update (TODO confirm)
    status: str  # NOTE(review): the allowed values are not listed on this page

class Checkpoint(TypedDict):
    """Execution checkpoint reference.

    Accepted by the ``checkpoint`` argument of get_state, update_state and
    get_history, and by the ``before`` argument of get_history.
    """
    thread_id: str  # thread the checkpoint belongs to
    checkpoint_ns: str  # presumably a namespace selecting a subgraph (TODO confirm)
    checkpoint_id: str  # identifier of the checkpoint itself

# Allowed values for Thread.status and for the `status` filter of search/count.
ThreadStatus = Literal["idle", "busy", "interrupted", "error"]

# Allowed values for join_stream's `stream_mode`; "run_modes" is its default.
ThreadStreamMode = Literal["run_modes", "lifecycle", "state_update"]

# Field names accepted by the `select` argument of get/search.
ThreadSelectField = Literal[
    "thread_id", "created_at", "updated_at", "metadata", "status", "config"
]

# Field names accepted by the `sort_by` argument of search.
ThreadSortBy = Literal["created_at", "updated_at", "thread_id"]

Usage Examples

Creating and Managing Threads

# Create a new thread with metadata
thread = await client.threads.create(
    metadata={"user_id": "user-123", "session_type": "chat"}
)

# Get thread details
thread = await client.threads.get("thread-456")

# Update thread metadata
updated = await client.threads.update(
    "thread-456",
    metadata={"last_interaction": "2023-12-01T10:30:00Z"}
)

Thread Discovery

# Search threads by user
user_threads = await client.threads.search(
    metadata={"user_id": "user-123"},
    status="idle",
    limit=20
)

# Get active threads
active_threads = await client.threads.search(
    status="busy",
    sort_by="updated_at"
)

# Count total threads
total = await client.threads.count()

State Management

# Get current thread state
state = await client.threads.get_state("thread-456")
print(f"Current state: {state['values']}")

# Update thread state
response = await client.threads.update_state(
    "thread-456",
    values={"user_preferences": {"theme": "dark"}},
    as_node="preference_manager"
)

# Get thread execution history
history = await client.threads.get_history("thread-456", limit=50)
for checkpoint in history:
    print(f"Checkpoint {checkpoint['checkpoint']['checkpoint_id']}: {checkpoint['values']}")

Thread Operations

# Copy a thread
copied = await client.threads.copy(
    "thread-456",
    metadata={"source_thread": "thread-456", "copy_reason": "backup"}
)

# Stream thread events
async for event in client.threads.join_stream("thread-456", mode="state_update"):
    print(f"Thread event: {event}")

Thread Lifecycle

# Create, use, and cleanup
thread = await client.threads.create(metadata={"purpose": "temp-calculation"})

# ... perform operations ...

# Delete when done
await client.threads.delete(thread["thread_id"])

Install with Tessl CLI

npx tessl i tessl/pypi-langgraph-sdk

docs

assistant-management.md

authentication.md

client-management.md

index.md

persistent-storage.md

run-execution.md

scheduled-tasks.md

thread-management.md

tile.json