Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows.
Execute assistant workflows on threads with support for streaming, interrupts, configuration, and completion handling. Runs represent individual executions of an assistant on a thread.
Execute runs with real-time streaming of execution events, state changes, and outputs.
from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
StreamPart, StreamMode, Config, Context, Checkpoint,
Command, QueryParamTypes
)
# Via client.runs
def stream(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated: use `checkpoint` instead
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]:
    """Stream the results of a run.

    Args:
        thread_id: The thread ID to stream the run on.
        assistant_id: The assistant ID or graph name to stream the run on.
        input: The input to the run.
        command: The command to run instead of input.
        stream_mode: The mode(s) to stream the run. Default is "values".
        stream_subgraphs: Whether to stream subgraphs.
        stream_resumable: Whether the stream is resumable.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        AsyncIterator[StreamPart]: The stream of the run.
    """

# Execute runs asynchronously and retrieve results when complete.
from langgraph_sdk.schema import Run, QueryParamTypes
async def create(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated: use `checkpoint` instead
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """Create a background run.

    Args:
        thread_id: The thread ID to create the run on.
        assistant_id: The assistant ID or graph name to create the run on.
        input: The input to the run.
        command: The command to run instead of input.
        stream_mode: The mode(s) to stream the run. Default is "values".
        stream_subgraphs: Whether to stream subgraphs.
        stream_resumable: Whether the stream is resumable.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The created run.
    """

# Execute runs synchronously and wait for completion.
async def wait(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated: use `checkpoint` instead
    webhook: str | None = None,
    webhook_mode: str | None = None,
    checkpoint_during: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """Create a run, wait for it to finish and return the final state.

    Args:
        thread_id: The thread ID to create the run on.
        assistant_id: The assistant ID or graph name to create the run on.
        input: The input to the run.
        command: The command to run instead of input.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        checkpoint_during: Whether to checkpoint during the run.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The completed run.
    """

# Execute multiple runs concurrently with batch operations.
from langgraph_sdk.schema import RunCreate
async def create_batch(
    payloads: list[RunCreate],
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Run]:
    """Create a batch of stateless background runs.

    Args:
        payloads: The payloads for the runs.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Run]: The created runs.
    """

# Manage active and completed runs with listing, retrieval, and cancellation capabilities.
from langgraph_sdk.schema import RunSelectField, RunStatus, CancelAction
async def list(
    thread_id: str,
    *,
    limit: int = 10,
    offset: int = 0,
    status: RunStatus | None = None,
    select: list[RunSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Run]:
    """Get all runs for a thread.

    Args:
        thread_id: The thread ID to get runs for.
        limit: The maximum number of runs to return.
        offset: The number of runs to skip.
        status: The status to filter by.
        select: Fields to include in the response.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Run]: The runs for the thread.
    """
async def get(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """Get a run.

    Args:
        thread_id: The thread ID to get the run from.
        run_id: The run ID to get.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: Run object.
    """
async def cancel(
    thread_id: str,
    run_id: str,
    *,
    wait: bool = False,
    action: CancelAction = "interrupt",
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """Cancel a run.

    Args:
        thread_id: The thread ID to cancel the run on.
        run_id: The run ID to cancel.
        wait: Whether to wait for the run to be cancelled.
        action: The type of cancellation. Options are "interrupt" or "rollback".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """
async def delete(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """Delete a run.

    Args:
        thread_id: The thread ID to delete the run from.
        run_id: The run ID to delete.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """

# Join ongoing runs and stream their execution events.
async def join(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> dict:
    """Block until a run is done. Returns the final state of the thread.

    Args:
        thread_id: The thread ID to join the run on.
        run_id: The run ID to join.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        dict: The final state of the thread.
    """
def join_stream(
    thread_id: str,
    run_id: str,
    *,
    cancel_on_disconnect: bool = False,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
    last_event_id: str | None = None,
) -> AsyncIterator[StreamPart]:
    """Stream output from a run in real-time, until the run is done.

    Output is not buffered, so any output produced before this call will
    not be received here.

    Args:
        thread_id: The thread ID to stream the run on.
        run_id: The run ID to stream.
        cancel_on_disconnect: Whether to cancel the run if the stream is disconnected.
        stream_mode: The mode(s) to stream the run.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
        last_event_id: The last event ID to start streaming from.

    Returns:
        AsyncIterator[StreamPart]: A stream of the run.
    """


# Schema types used by client.runs.
# NOTE(review): several aliases (RunStatus, StreamMode, MultitaskStrategy) are
# referenced above their definitions in this spec; runnable code would need
# them defined first (or lazy annotations) — verify against langgraph_sdk.schema.
class Run(TypedDict):
    """Run execution details."""

    run_id: str  # unique run identifier
    thread_id: str  # thread the run executed on
    assistant_id: str  # assistant that produced the run
    created_at: str  # creation timestamp
    updated_at: str  # last-update timestamp
    status: RunStatus
    kwargs: dict  # run arguments and results
    metadata: dict  # arbitrary user metadata


class RunCreate(TypedDict):
    """Run creation parameters (one payload for create_batch)."""

    thread_id: str
    assistant_id: str
    input: dict
    config: Config
    metadata: dict
    multitask_strategy: MultitaskStrategy


class StreamPart(NamedTuple):
    """One event in a run's stream: an event name plus its payload."""

    event: str
    data: dict


RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]
StreamMode = Literal[
    "values", "messages", "updates", "events",
    "tasks", "checkpoints", "debug", "custom", "messages-tuple",
]
MultitaskStrategy = Literal["reject", "interrupt", "rollback", "enqueue"]
DisconnectMode = Literal["cancel", "continue"]
OnCompletionBehavior = Literal["delete", "keep"]
CancelAction = Literal["interrupt", "rollback"]
RunSelectField = Literal[
    "run_id", "thread_id", "assistant_id", "created_at",
    "updated_at", "status", "kwargs", "metadata",
]

# Basic streaming run
async for chunk in client.runs.stream(
thread_id="thread-123",
assistant_id="assistant-456",
input={"messages": [{"role": "human", "content": "Hello!"}]}
):
if chunk.event == "messages":
print(f"Message: {chunk.data}")
elif chunk.event == "events":
print(f"Event: {chunk.data}")
# Advanced streaming with configuration
async for chunk in client.runs.stream(
thread_id="thread-123",
assistant_id="assistant-456",
input={"query": "Explain AI"},
config={"temperature": 0.7, "max_tokens": 1000},
stream_mode="events",
interrupt_before=["human_review"],
multitask_strategy="enqueue"
):
print(f"{chunk.event}: {chunk.data}")# Start run asynchronously
run = await client.runs.create(
thread_id="thread-123",
assistant_id="assistant-456",
input={"task": "analyze_document", "doc_id": "doc-789"},
metadata={"priority": "high"},
webhook="https://myapp.com/webhooks/run-complete"
)
print(f"Started run {run['run_id']} with status {run['status']}")
# Check run status later
updated_run = await client.runs.get("thread-123", run["run_id"])
if updated_run["status"] == "success":
print("Run completed successfully")# Execute and wait for completion
completed_run = await client.runs.wait(
thread_id="thread-123",
assistant_id="assistant-456",
input={"calculation": "fibonacci", "n": 100},
config={"timeout": 300}
)
print(f"Final status: {completed_run['status']}")
print(f"Result: {completed_run['kwargs'].get('result')}")# Create multiple runs
payloads = [
{
"thread_id": "thread-1",
"assistant_id": "assistant-456",
"input": {"task": f"process_item_{i}"}
}
for i in range(10)
]
batch_runs = await client.runs.create_batch(payloads)
print(f"Created {len(batch_runs)} runs")# List thread runs
runs = await client.runs.list("thread-123", limit=50)
active_runs = [r for r in runs if r["status"] in ["pending", "running"]]
# Cancel a run
if active_runs:
await client.runs.cancel(
"thread-123",
active_runs[0]["run_id"],
cancel_action="interrupt"
)
# Join an ongoing run
result = await client.runs.join("thread-123", "run-789")
# Stream events from ongoing run
async for event in client.runs.join_stream("thread-123", "run-789"):
print(f"Event: {event}")try:
async for chunk in client.runs.stream(
thread_id="thread-123",
assistant_id="assistant-456",
input={"query": "test"}
):
if chunk.event == "error":
print(f"Execution error: {chunk.data}")
break
elif chunk.event == "interrupt":
print(f"Execution interrupted: {chunk.data}")
# Handle interrupt, possibly resume or cancel
except Exception as e:
print(f"Stream error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-langgraph-sdk