Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows
npx @tessl/cli install tessl/pypi-langgraph-sdk@0.2.0

A comprehensive Python SDK for interacting with the LangGraph Platform REST API. The SDK enables developers to build and manage AI assistants and conversational workflows with async and sync client interfaces, automatic local server discovery, streaming support, and fine-grained authentication and authorization management.
pip install langgraph-sdk

from langgraph_sdk import get_client, get_sync_client, Auth

from langgraph_sdk import get_client
# Connect to LangGraph server (auto-detects local server at localhost:8123)
client = await get_client()
# List all assistants
assistants = await client.assistants.search()
agent = assistants[0]
# Create a new conversation thread
thread = await client.threads.create()
# Start a streaming run
input_data = {"messages": [{"role": "human", "content": "Hello!"}]}
async for chunk in client.runs.stream(
thread['thread_id'],
agent['assistant_id'],
input=input_data
):
print(chunk)
# Close the client
await client.aclose()

The LangGraph SDK follows a resource-oriented design with distinct client managers:
get_client() and get_sync_client() create configured HTTP clients.

Both async and sync versions provide identical APIs, enabling integration into any Python application architecture.
Core client creation and HTTP operations for connecting to LangGraph servers with automatic discovery, custom authentication, and connection management.
from collections.abc import Mapping
from typing import Union, Optional
import httpx
# Type aliases
# Accepted forms for the ``timeout`` argument of get_client() and
# get_sync_client(): None, a single float, a 2- or 4-element tuple of
# optional floats, or a ready-made httpx.Timeout.
# NOTE(review): the meaning of each tuple position is not shown in this
# document -- confirm against the httpx timeout documentation.
TimeoutTypes = Union[
    None,
    float,
    tuple[Optional[float], Optional[float]],
    tuple[Optional[float], Optional[float], Optional[float], Optional[float]],
    httpx.Timeout,
]
def get_client(
    *,
    url: str | None = None,
    api_key: str | None = None,
    headers: Mapping[str, str] | None = None,
    timeout: TimeoutTypes | None = None,
) -> LangGraphClient:
    """Create the asynchronous LangGraph client.

    This is a regular (non-async) function: call it directly and then await
    the methods of the returned ``LangGraphClient``. Every argument is
    keyword-only and optional; per the quick-start in this document, calling
    it with no ``url`` auto-detects a local server at localhost:8123.

    Args:
        url: Server address to connect to, or None for auto-detection.
        api_key: Optional API key (presumably used to authenticate requests
            -- confirm against the SDK reference).
        headers: Optional extra string-to-string headers.
        timeout: Any of the forms allowed by ``TimeoutTypes``.

    Returns:
        A ``LangGraphClient``.
    """
    ...
def get_sync_client(
*,
url: str | None = None,
api_key: str | None = None,
headers: Mapping[str, str] | None = None,
timeout: TimeoutTypes | None = None,
) -> SyncLangGraphClient: ...

Create, configure, and manage AI assistants based on registered graphs. Assistants serve as the execution engines for conversational workflows, with support for versioning, configuration, and metadata management.
from collections.abc import Mapping
from langgraph_sdk.schema import (
Assistant, AssistantSelectField, AssistantSortBy, SortOrder,
Config, Context, Json, OnConflictBehavior, QueryParamTypes
)
# Via client.assistants
async def get(
    assistant_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant:
    """Return the assistant identified by ``assistant_id``.

    Exposed as ``client.assistants.get``. ``headers`` and ``params`` are
    optional keyword-only extras (presumably per-request HTTP headers and
    query parameters -- confirm against the SDK reference).
    """
    ...
async def create(
    graph_id: str | None,
    config: Config | None = None,
    *,
    context: Context | None = None,
    metadata: Json = None,
    assistant_id: str | None = None,
    if_exists: OnConflictBehavior | None = None,
    name: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant:
    """Create an assistant based on a registered graph and return it.

    Exposed as ``client.assistants.create``. ``graph_id`` is required (it may
    be None) and, together with ``config``, may be passed positionally; all
    remaining arguments are keyword-only and optional.

    Args:
        graph_id: Identifier of the graph the assistant is based on.
        config: Optional ``Config`` for the assistant.
        context: Optional ``Context`` value.
        metadata: Optional JSON metadata to attach.
        assistant_id: Optional explicit ID for the new assistant.
        if_exists: ``OnConflictBehavior`` value (presumably decides what
            happens when ``assistant_id`` is already taken -- confirm).
        name: Optional assistant name.
        headers: Optional extra request headers (assumed -- confirm).
        params: Optional extra query parameters (assumed -- confirm).
    """
    ...
async def update(
    assistant_id: str,
    *,
    graph_id: str | None = None,
    config: Config | None = None,
    context: Context | None = None,
    metadata: Json = None,
    name: str | None = None,
    headers: Mapping[str, str] | None = None,
    description: str | None = None,
    params: QueryParamTypes | None = None,
) -> Assistant:
    """Update the assistant ``assistant_id`` and return the resulting assistant.

    Exposed as ``client.assistants.update``. Only ``assistant_id`` is
    positional; every other field is keyword-only and defaults to None
    (presumably meaning "leave unchanged" -- confirm against the SDK
    reference).
    """
    ...
async def search(
*,
metadata: Json = None,
graph_id: str | None = None,
limit: int = 10,
offset: int = 0,
sort_by: AssistantSortBy | None = None,
sort_order: SortOrder | None = None,
select: list[AssistantSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Assistant]: ...

Manage conversation threads that maintain state across multiple interactions. Threads provide isolation for conversations and support state inspection, updates, and history tracking.
from collections.abc import Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
Thread, ThreadSelectField, ThreadSortBy, ThreadState, ThreadStatus,
ThreadUpdateStateResponse, Checkpoint, Json, OnConflictBehavior, QueryParamTypes
)
# Via client.threads
async def create(
    *,
    metadata: Json = None,
    thread_id: str | None = None,
    if_exists: OnConflictBehavior | None = None,
    supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
    graph_id: str | None = None,
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """Create a conversation thread and return it.

    Exposed as ``client.threads.create``. All arguments are keyword-only and
    optional, so ``await client.threads.create()`` is a valid call (as in the
    quick-start).

    Args:
        metadata: Optional JSON metadata to attach to the thread.
        thread_id: Optional explicit thread ID.
        if_exists: ``OnConflictBehavior`` value (presumably decides what
            happens when ``thread_id`` already exists -- confirm).
        supersteps: Optional sequence of superstep dicts (semantics not shown
            in this document -- see the SDK reference).
        graph_id: Optional graph identifier.
        ttl: Either an int or a mapping (units and mapping keys are not shown
            here -- confirm).
        headers: Optional extra request headers (assumed -- confirm).
        params: Optional extra query parameters (assumed -- confirm).
    """
    ...
async def get(
    thread_id: str,
    *,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """Return the thread identified by ``thread_id``.

    Exposed as ``client.threads.get``. ``select`` optionally lists
    ``ThreadSelectField`` values (presumably restricting which fields are
    returned -- confirm against the SDK reference).
    """
    ...
async def update(
    thread_id: str,
    *,
    metadata: Mapping[str, Any],
    ttl: int | Mapping[str, Any] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Thread:
    """Update the thread ``thread_id`` and return the resulting thread.

    Exposed as ``client.threads.update``. ``metadata`` is keyword-only and,
    unlike the other options, has no default, so it must always be supplied.
    ``ttl`` accepts an int or a mapping (units and keys are not shown in
    this document -- confirm).
    """
    ...
async def search(
    *,
    metadata: Json = None,
    values: Json = None,
    ids: Sequence[str] | None = None,
    status: ThreadStatus | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: ThreadSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[ThreadSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Thread]:
    """Search threads and return the matches as a list.

    Exposed as ``client.threads.search``. All arguments are keyword-only.
    ``metadata``, ``values``, ``ids`` and ``status`` are optional filters
    (exact matching rules are not shown in this document). Results are paged
    with ``limit`` (default 10) and ``offset`` (default 0) and can be ordered
    with ``sort_by`` / ``sort_order``.
    """
    ...
async def get_state(
    thread_id: str,
    *,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None, # deprecated
    subgraphs: bool = False,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> ThreadState:
    """Return the state of the thread ``thread_id`` as a ``ThreadState``.

    Exposed as ``client.threads.get_state``. ``checkpoint`` optionally
    selects a specific checkpoint; ``checkpoint_id`` is marked deprecated in
    this signature, so prefer ``checkpoint``. ``subgraphs`` defaults to False
    (presumably toggles inclusion of subgraph state -- confirm).
    """
    ...
async def update_state(
thread_id: str,
values: dict[str, Any] | Sequence[dict] | None,
*,
as_node: str | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None, # deprecated
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> ThreadUpdateStateResponse: ...

Execute assistant workflows on threads with support for streaming, interrupts, configuration, and completion handling. Runs represent individual executions of an assistant on a thread.
from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
Run, StreamPart, StreamMode, Config, Context, Checkpoint,
Command, CancelAction, QueryParamTypes
)
# Via client.runs
def stream(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None, # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]:
    """Start a run of ``assistant_id`` on ``thread_id`` and stream its output.

    Exposed as ``client.runs.stream``. This is a plain ``def`` that returns
    an async iterator, so consume it with ``async for`` (as the quick-start
    does) rather than awaiting it.

    Args:
        thread_id: Thread to run on; None is accepted (presumably for a run
            without a persistent thread -- confirm).
        assistant_id: Assistant to execute.
        input: Optional input mapping for the run.
        command: Optional ``Command`` value.
        stream_mode: One ``StreamMode`` or a sequence of them; defaults to
            ``"values"``.
        stream_subgraphs: Defaults to False.
        stream_resumable: Defaults to False.
        checkpoint_id: Marked deprecated in this signature; prefer
            ``checkpoint``.
        webhook, webhook_mode: Optional webhook settings (semantics not
            shown in this document).
    """
    ...
async def create(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None, # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """Create a run of ``assistant_id`` on ``thread_id`` and return the ``Run``.

    Exposed as ``client.runs.create``. It takes the same options as
    ``stream`` but returns the run record instead of an async iterator.
    ``thread_id`` may be None; ``checkpoint_id`` is marked deprecated in this
    signature, so prefer ``checkpoint``.
    """
    ...
async def wait(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None, # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    checkpoint_during: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """Run ``assistant_id`` on ``thread_id`` and wait for the result.

    Exposed as ``client.runs.wait``. Unlike ``stream`` and ``create`` it
    takes no ``stream_*`` options, and it adds ``checkpoint_during``.
    ``checkpoint_id`` is marked deprecated in this signature; prefer
    ``checkpoint``.

    NOTE(review): the upstream SDK reference appears to document ``wait`` as
    returning the run's final output values rather than a ``Run`` record --
    confirm the return annotation against the installed SDK version.
    """
    ...
async def cancel(
thread_id: str,
run_id: str,
*,
wait: bool = False,
action: CancelAction = "interrupt",
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None: ...

Create and manage cron jobs for automated execution of assistants on threads or with dynamic thread creation. Supports timezone handling, webhook notifications, and flexible scheduling.
from collections.abc import Mapping
from typing import Any
from langgraph_sdk.schema import (
Cron, CronSelectField, CronSortBy, SortOrder,
Config, Context, All, QueryParamTypes
)
# Via client.crons
async def create(
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron:
    """Create a cron job that runs ``assistant_id`` on a schedule.

    Exposed as ``client.crons.create``. No thread is passed here; use
    ``create_for_thread`` to bind the job to an existing thread.

    Args:
        assistant_id: Assistant to execute.
        schedule: Required keyword-only schedule string (presumably a cron
            expression -- confirm the exact format).
        input: Optional input mapping for each execution.
        interrupt_before, interrupt_after: ``All`` or a list of names
            (presumably node names -- confirm).
        webhook, webhook_mode: Optional webhook settings.
    """
    ...
async def create_for_thread(
    thread_id: str,
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron:
    """Create a cron job that runs ``assistant_id`` on the thread ``thread_id``.

    Exposed as ``client.crons.create_for_thread``. It takes the same options
    as ``create`` plus the leading ``thread_id``. ``schedule`` is a required
    keyword-only string (presumably a cron expression -- confirm the exact
    format).
    """
    ...
async def search(
    *,
    assistant_id: str | None = None,
    thread_id: str | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: CronSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[CronSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Cron]:
    """Search cron jobs and return the matches as a list.

    Exposed as ``client.crons.search``. All arguments are keyword-only.
    ``assistant_id`` and ``thread_id`` are optional filters. Results are
    paged with ``limit`` (default 10) and ``offset`` (default 0) and can be
    ordered with ``sort_by`` / ``sort_order``.
    """
    ...
async def delete(
cron_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None: ...

Cross-thread persistent memory system for storing and retrieving documents, configuration, and application data with namespacing, search capabilities, and flexible data organization.
from collections.abc import Mapping, Sequence
from typing import Any, Literal
from langgraph_sdk.schema import (
Item, SearchItemsResponse, ListNamespaceResponse, QueryParamTypes
)
# Via client.store
async def put_item(
    namespace: Sequence[str],
    /,
    key: str,
    value: Mapping[str, Any],
    index: Literal[False] | list[str] | None = None,
    ttl: int | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """Store ``value`` under ``key`` in ``namespace``; returns nothing.

    Exposed as ``client.store.put_item``. ``namespace`` is positional-only
    and is a sequence of strings.

    Args:
        namespace: Namespace path segments.
        key: Item key within the namespace.
        value: Mapping to store.
        index: ``False``, a list of strings, or None (presumably controls
            which fields are indexed for search -- confirm).
        ttl: Optional integer time-to-live (units not shown in this
            document -- confirm).
    """
    ...
async def get_item(
    namespace: Sequence[str],
    /,
    key: str,
    *,
    refresh_ttl: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Item:
    """Return the ``Item`` stored under ``key`` in ``namespace``.

    Exposed as ``client.store.get_item``. ``namespace`` is positional-only;
    the options after ``key`` are keyword-only. ``refresh_ttl`` is optional
    (presumably controls whether reading the item resets its TTL --
    confirm).
    """
    ...
async def search_items(
    namespace_prefix: Sequence[str],
    /,
    filter: Mapping[str, Any] | None = None,
    limit: int = 10,
    offset: int = 0,
    query: str | None = None,
    refresh_ttl: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> SearchItemsResponse:
    """Search stored items under ``namespace_prefix``.

    Exposed as ``client.store.search_items``. ``namespace_prefix`` is
    positional-only. ``filter`` and ``query`` are optional search criteria
    (matching semantics are not shown in this document). Results are paged
    with ``limit`` (default 10) and ``offset`` (default 0).
    """
    ...
async def delete_item(
    namespace: Sequence[str],
    /,
    key: str,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """Delete the item stored under ``key`` in ``namespace``; returns nothing.

    Exposed as ``client.store.delete_item``. ``namespace`` is
    positional-only and is a sequence of strings.
    """
    ...
async def list_namespaces(
*,
prefix: Sequence[str] | None = None,
suffix: Sequence[str] | None = None,
limit: int = 100,
offset: int = 0,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> ListNamespaceResponse: ...

Comprehensive authentication and authorization system supporting custom authentication handlers, fine-grained authorization rules, and flexible security policies for all resources and actions.
from typing import Callable, TypeVar
from collections.abc import Sequence
from langgraph_sdk.auth import types, exceptions
# Type variable bounded by types.Handler (not referenced elsewhere in this excerpt).
TH = TypeVar("TH", bound=types.Handler)
# Type variable bounded by types.Authenticator; lets Auth.authenticate return
# the same callable type it was given.
AH = TypeVar("AH", bound=types.Authenticator)
class Auth:
    """Entry point for custom authentication and authorization handlers.

    ``authenticate`` takes an authenticator callable and returns a value of
    the same type, so it can be applied as a decorator. Authorization
    handlers are reached through the ``on`` attribute created in
    ``__init__``.
    """

    # Expose the auth type definitions and exception classes on the class
    # itself so they can be reached as ``Auth.types`` / ``Auth.exceptions``.
    types = types
    exceptions = exceptions

    def __init__(self) -> None:
        self.on: _On = ...  # Authorization handlers

    def authenticate(self, fn: AH) -> AH: ...
# Authorization context classes
class _On:
assistants: _AssistantsOn
threads: _ThreadsOn
crons: _CronsOn
store: _StoreOn
value: type[dict[str, Any]]
def __call__(
self,
fn: Callable | None = None,
*,
resources: str | Sequence[str] | None = None,
actions: str | Sequence[str] | None = None,
) -> Callable: ...

Authentication & Authorization
# Client Types
class LangGraphClient:
    """Asynchronous top-level client with one attribute per resource manager.

    Release it with ``await client.aclose()`` when finished.
    """

    assistants: AssistantsClient
    threads: ThreadsClient
    runs: RunsClient
    crons: CronClient
    store: StoreClient

    async def aclose(self) -> None: ...
class SyncLangGraphClient:
    """Synchronous top-level client with one attribute per resource manager.

    Release it with ``client.close()`` when finished.
    """

    assistants: SyncAssistantsClient
    threads: SyncThreadsClient
    runs: SyncRunsClient
    crons: SyncCronClient
    store: SyncStoreClient

    def close(self) -> None: ...
# Core Data Models
class Assistant(TypedDict):
    """Dictionary shape of an assistant record."""

    assistant_id: str
    graph_id: str
    config: Config
    # Timestamps are carried as strings (exact format not shown here).
    created_at: str
    updated_at: str
    metadata: dict
class Thread(TypedDict):
    """Dictionary shape of a conversation thread record."""

    thread_id: str
    # Timestamps are carried as strings (exact format not shown here).
    created_at: str
    updated_at: str
    metadata: dict
    status: ThreadStatus
class Run(TypedDict):
    """Dictionary shape of a run: one execution of an assistant on a thread."""

    run_id: str
    thread_id: str
    assistant_id: str
    # Timestamps are carried as strings (exact format not shown here).
    created_at: str
    updated_at: str
    status: RunStatus
    kwargs: dict
class Cron(TypedDict):
    """Dictionary shape of a scheduled (cron) job record."""

    cron_id: str
    thread_id: str
    assistant_id: str
    schedule: str
    timezone: str
    # Timestamp carried as a string (exact format not shown here).
    created_at: str
class Item(TypedDict):
    """Dictionary shape of a store item: a value addressed by namespace and key."""

    namespace: list[str]
    key: str
    value: dict
    # Timestamps are carried as strings (exact format not shown here).
    created_at: str
    updated_at: str
# Status Types
# Allowed values of Run["status"].
RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]
# Allowed values of Thread["status"]; also the type of the `status` filter in threads.search.
ThreadStatus = Literal["idle", "busy", "interrupted", "error"]
# Allowed values for the `stream_mode` argument of runs.stream / runs.create.
StreamMode = Literal["values", "messages", "updates", "events", "tasks", "checkpoints", "debug"]
# Configuration Types
class Config(TypedDict, total=False):
    """Run/assistant configuration; ``total=False`` makes every key optional."""

    tags: list[str]
    recursion_limit: int
    configurable: dict