tessl/pypi-langgraph-sdk

Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows

Describes: pkg:pypi/langgraph-sdk@0.2.x

To install, run

npx @tessl/cli install tessl/pypi-langgraph-sdk@0.2.0


LangGraph SDK

A comprehensive Python SDK for interacting with the LangGraph Platform REST API. The SDK enables developers to build and manage AI assistants and conversational workflows with async and sync client interfaces, automatic local server discovery, streaming support, and fine-grained authentication and authorization management.

Package Information

  • Package Name: langgraph-sdk
  • Language: Python
  • Installation: pip install langgraph-sdk

Core Imports

from langgraph_sdk import get_client, get_sync_client, Auth

Basic Usage

from langgraph_sdk import get_client

# Connect to LangGraph server (auto-detects local server at localhost:8123)
client = get_client()

# List all assistants
assistants = await client.assistants.search()
agent = assistants[0]

# Create a new conversation thread
thread = await client.threads.create()

# Start a streaming run
input_data = {"messages": [{"role": "human", "content": "Hello!"}]}
async for chunk in client.runs.stream(
    thread['thread_id'],
    agent['assistant_id'],
    input=input_data
):
    print(chunk)

# Close the client
await client.aclose()

Architecture

The LangGraph SDK follows a resource-oriented design with distinct client managers:

  • Client Factory: get_client() and get_sync_client() create configured HTTP clients
  • Resource Clients: Dedicated managers for assistants, threads, runs, crons, and store operations
  • Authentication System: Pluggable auth handlers for custom security implementations
  • Streaming Support: Server-sent events for real-time execution monitoring
  • Type System: Comprehensive TypedDict schemas for all API interactions

Both async and sync versions provide identical APIs, enabling integration into any Python application architecture.
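For example, the synchronous client exposes the same resource managers with the same call shapes; a minimal sketch, assuming a local server is running at the default address:

from langgraph_sdk import get_sync_client

# Same resource managers as the async client; only await/async-for is dropped
client = get_sync_client(url="http://localhost:8123")

assistants = client.assistants.search()
thread = client.threads.create()

for chunk in client.runs.stream(
    thread["thread_id"],
    assistants[0]["assistant_id"],
    input={"messages": [{"role": "human", "content": "Hello!"}]},
):
    print(chunk)

client.close()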

Capabilities

Client Management

Core client creation and HTTP operations for connecting to LangGraph servers with automatic discovery, custom authentication, and connection management.

from collections.abc import Mapping
from typing import Union, Optional
import httpx

# Type aliases
TimeoutTypes = Union[
    None,
    float,
    tuple[Optional[float], Optional[float]],
    tuple[Optional[float], Optional[float], Optional[float], Optional[float]],
    httpx.Timeout,
]

def get_client(
    *,
    url: str | None = None,
    api_key: str | None = None,
    headers: Mapping[str, str] | None = None,
    timeout: TimeoutTypes | None = None,
) -> LangGraphClient: ...

def get_sync_client(
    *,
    url: str | None = None,
    api_key: str | None = None,
    headers: Mapping[str, str] | None = None,
    timeout: TimeoutTypes | None = None,
) -> SyncLangGraphClient: ...
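As a sketch, a client can also be configured explicitly instead of relying on local auto-discovery; the URL, API key, and header below are placeholders, and the four-element timeout tuple follows httpx's (connect, read, write, pool) convention:

from langgraph_sdk import get_client

# Explicit configuration for a remote deployment (placeholder values)
client = get_client(
    url="https://my-deployment.example.com",
    api_key="lsv2_example_key",
    headers={"x-request-source": "docs-example"},
    timeout=(5.0, 30.0, 30.0, 5.0),
)

# With no arguments, the factory falls back to local server discovery
local_client = get_client()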


Assistant Management

Create, configure, and manage AI assistants based on registered graphs. Assistants serve as the execution engines for conversational workflows, with support for versioning, configuration, and metadata management.

from collections.abc import Mapping
from langgraph_sdk.schema import (
    Assistant, AssistantSelectField, AssistantSortBy, SortOrder,
    Config, Context, Json, OnConflictBehavior, QueryParamTypes
)

# Via client.assistants
async def get(
    assistant_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant: ...

async def create(
    graph_id: str | None,
    config: Config | None = None,
    *,
    context: Context | None = None,
    metadata: Json = None,
    assistant_id: str | None = None,
    if_exists: OnConflictBehavior | None = None,
    name: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant: ...

async def update(
    assistant_id: str,
    *,
    graph_id: str | None = None,
    config: Config | None = None,
    context: Context | None = None,
    metadata: Json = None,
    name: str | None = None,
    headers: Mapping[str, str] | None = None,
    description: str | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant: ...

async def search(
    *,
    metadata: Json = None,
    graph_id: str | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: AssistantSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[AssistantSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Assistant]: ...
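A usage sketch built on the signatures above; the graph_id "agent" and the configurable/metadata keys are placeholders for values defined by your deployment, and client is assumed to be created as in Basic Usage:

# Create an assistant from a registered graph
assistant = await client.assistants.create(
    graph_id="agent",
    config={"configurable": {"model": "gpt-4o"}},
    metadata={"team": "support"},
    name="Support agent",
)

# Update its metadata in place
assistant = await client.assistants.update(
    assistant["assistant_id"],
    metadata={"team": "support", "tier": "premium"},
)

# Search assistants for that graph, filtered by metadata
matches = await client.assistants.search(
    graph_id="agent",
    metadata={"team": "support"},
    limit=20,
)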


Thread Management

Manage conversation threads that maintain state across multiple interactions. Threads provide isolation for conversations and support state inspection, updates, and history tracking.

from collections.abc import Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
    Thread, ThreadSelectField, ThreadSortBy, ThreadState, ThreadStatus,
    ThreadUpdateStateResponse, Checkpoint, Json, OnConflictBehavior,
    QueryParamTypes, SortOrder
)

# Via client.threads
async def create(
    *,
    metadata: Json = None,
    thread_id: str | None = None,
    if_exists: OnConflictBehavior | None = None,
    supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
    graph_id: str | None = None,
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread: ...

async def get(
    thread_id: str,
    *,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread: ...

async def update(
    thread_id: str,
    *,
    metadata: Mapping[str, Any],
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread: ...

async def search(
    *,
    metadata: Json = None,
    values: Json = None,
    ids: Sequence[str] | None = None,
    status: ThreadStatus | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: ThreadSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Thread]: ...

async def get_state(
    thread_id: str,
    *,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    subgraphs: bool = False,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadState: ...

async def update_state(
    thread_id: str,
    values: dict[str, Any] | Sequence[dict] | None,
    *,
    as_node: str | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadUpdateStateResponse: ...
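A sketch of common thread operations, reusing the client from Basic Usage; the metadata keys and the as_node value are placeholders:

# Create a thread tagged with application metadata
thread = await client.threads.create(metadata={"user_id": "user-123"})

# Inspect the latest checkpointed state
state = await client.threads.get_state(thread["thread_id"])
print(state["values"], state["next"])

# Apply a state update as if a given node had produced it
await client.threads.update_state(
    thread["thread_id"],
    {"messages": [{"role": "human", "content": "Corrected input"}]},
    as_node="agent",
)

# Find idle threads for this user
idle = await client.threads.search(
    metadata={"user_id": "user-123"},
    status="idle",
    limit=5,
)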


Run Execution

Execute assistant workflows on threads with support for streaming, interrupts, configuration, and completion handling. Runs represent individual executions of an assistant on a thread.

from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
    Run, StreamPart, StreamMode, Config, Context, Checkpoint,
    Command, CancelAction, QueryParamTypes
)

# Via client.runs
def stream(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]: ...

async def create(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run: ...

async def wait(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    checkpoint_during: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run: ...

async def cancel(
    thread_id: str,
    run_id: str,
    *,
    wait: bool = False,
    action: CancelAction = "interrupt",
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None: ...
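A sketch of the non-streaming paths, reusing thread and assistant from the earlier examples: create schedules a background run, wait blocks until completion, and cancel interrupts a run in flight:

# Fire-and-forget: enqueue a background run on an existing thread
run = await client.runs.create(
    thread["thread_id"],
    assistant["assistant_id"],
    input={"messages": [{"role": "human", "content": "Summarize our conversation"}]},
)

# Blocking call: run to completion and return the final result
result = await client.runs.wait(
    thread["thread_id"],
    assistant["assistant_id"],
    input={"messages": [{"role": "human", "content": "One more question"}]},
)

# Interrupt the background run if it is no longer needed
await client.runs.cancel(thread["thread_id"], run["run_id"], action="interrupt")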


Scheduled Tasks

Create and manage cron jobs for automated execution of assistants on threads or with dynamic thread creation. Supports timezone handling, webhook notifications, and flexible scheduling.

from collections.abc import Mapping
from typing import Any
from langgraph_sdk.schema import (
    Cron, CronSelectField, CronSortBy, SortOrder,
    Config, Context, All, QueryParamTypes
)

# Via client.crons
async def create(
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron: ...

async def create_for_thread(
    thread_id: str,
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron: ...

async def search(
    *,
    assistant_id: str | None = None,
    thread_id: str | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: CronSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[CronSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Cron]: ...

async def delete(
    cron_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None: ...
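A sketch of a daily schedule on an existing thread; the five-field cron expression and the input payload are placeholders:

# Run the assistant on this thread every day at 09:00
cron = await client.crons.create_for_thread(
    thread["thread_id"],
    assistant["assistant_id"],
    schedule="0 9 * * *",
    input={"messages": [{"role": "human", "content": "Daily status check"}]},
)

# List scheduled jobs for this assistant
jobs = await client.crons.search(assistant_id=assistant["assistant_id"])

# Remove the job once it is no longer needed
await client.crons.delete(cron["cron_id"])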


Persistent Storage

Cross-thread persistent memory system for storing and retrieving documents, configuration, and application data with namespacing, search capabilities, and flexible data organization.

from collections.abc import Mapping, Sequence
from typing import Any, Literal
from langgraph_sdk.schema import (
    Item, SearchItemsResponse, ListNamespaceResponse, QueryParamTypes
)

# Via client.store
async def put_item(
    namespace: Sequence[str],
    /,
    key: str,
    value: Mapping[str, Any],
    index: Literal[False] | list[str] | None = None,
    ttl: int | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None: ...

async def get_item(
    namespace: Sequence[str],
    /,
    key: str,
    *,
    refresh_ttl: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Item: ...

async def search_items(
    namespace_prefix: Sequence[str],
    /,
    filter: Mapping[str, Any] | None = None,
    limit: int = 10,
    offset: int = 0,
    query: str | None = None,
    refresh_ttl: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> SearchItemsResponse: ...

async def delete_item(
    namespace: Sequence[str],
    /,
    key: str,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None: ...

async def list_namespaces(
    *,
    prefix: Sequence[str] | None = None,
    suffix: Sequence[str] | None = None,
    limit: int = 100,
    offset: int = 0,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ListNamespaceResponse: ...
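A sketch of cross-thread memory; the namespace hierarchy, keys, and values are placeholders:

# Write a document under a hierarchical namespace
await client.store.put_item(
    ("users", "user-123", "preferences"),
    key="ui",
    value={"theme": "dark", "language": "en"},
)

# Read it back
item = await client.store.get_item(("users", "user-123", "preferences"), key="ui")
print(item["value"])

# Search within a namespace prefix with a structured filter
results = await client.store.search_items(
    ("users", "user-123"),
    filter={"theme": "dark"},
    limit=5,
)

# Enumerate namespaces under a prefix
namespaces = await client.store.list_namespaces(prefix=("users",), limit=10)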


Authentication & Authorization

Comprehensive authentication and authorization system supporting custom authentication handlers, fine-grained authorization rules, and flexible security policies for all resources and actions.

from typing import Any, Callable, TypeVar
from collections.abc import Sequence
from langgraph_sdk.auth import types, exceptions

TH = TypeVar("TH", bound=types.Handler)
AH = TypeVar("AH", bound=types.Authenticator)

class Auth:
    types = types
    exceptions = exceptions

    def __init__(self) -> None:
        self.on: _On = ...  # Authorization handlers

    def authenticate(self, fn: AH) -> AH: ...

    # Authorization context classes
    class _On:
        assistants: _AssistantsOn
        threads: _ThreadsOn
        crons: _CronsOn
        store: _StoreOn
        value: type[dict[str, Any]]

        def __call__(
            self,
            fn: Callable | None = None,
            *,
            resources: str | Sequence[str] | None = None,
            actions: str | Sequence[str] | None = None,
        ) -> Callable: ...
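A sketch of wiring the Auth object into custom security: one authentication handler plus one global authorization handler that scopes every resource and action to the calling user. The bearer-token check, the owner metadata field, and the specific helper types used (Auth.types.MinimalUserDict, Auth.types.AuthContext, Auth.exceptions.HTTPException) are assumptions made to illustrate the pattern:

from langgraph_sdk import Auth

auth = Auth()

@auth.authenticate
async def authenticate(authorization: str) -> Auth.types.MinimalUserDict:
    # Placeholder check: accept a single static bearer token
    if authorization != "Bearer secret-token":
        raise Auth.exceptions.HTTPException(status_code=401, detail="Invalid token")
    return {"identity": "user-123"}

@auth.on
async def authorize(ctx: Auth.types.AuthContext, value: dict):
    # Tag created objects with the caller and filter reads to the same owner
    filters = {"owner": ctx.user.identity}
    metadata = value.setdefault("metadata", {})
    metadata.update(filters)
    return filters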

Authentication & Authorization

Core Types

from typing import Literal, TypedDict

# Client Types
class LangGraphClient:
    assistants: AssistantsClient
    threads: ThreadsClient
    runs: RunsClient
    crons: CronClient
    store: StoreClient
    async def aclose(self) -> None: ...

class SyncLangGraphClient:
    assistants: SyncAssistantsClient
    threads: SyncThreadsClient
    runs: SyncRunsClient
    crons: SyncCronClient
    store: SyncStoreClient
    def close(self) -> None: ...

# Core Data Models
class Assistant(TypedDict):
    assistant_id: str
    graph_id: str
    config: Config
    created_at: str
    updated_at: str
    metadata: dict

class Thread(TypedDict):
    thread_id: str
    created_at: str
    updated_at: str
    metadata: dict
    status: ThreadStatus

class Run(TypedDict):
    run_id: str
    thread_id: str
    assistant_id: str
    created_at: str
    updated_at: str
    status: RunStatus
    kwargs: dict

class Cron(TypedDict):
    cron_id: str
    thread_id: str
    assistant_id: str
    schedule: str
    timezone: str
    created_at: str

class Item(TypedDict):
    namespace: list[str]
    key: str
    value: dict
    created_at: str
    updated_at: str

# Status Types
RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]
ThreadStatus = Literal["idle", "busy", "interrupted", "error"]
StreamMode = Literal["values", "messages", "updates", "events", "tasks", "checkpoints", "debug"]

# Configuration Types
class Config(TypedDict, total=False):
    tags: list[str]
    recursion_limit: int
    configurable: dict