CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-langgraph-sdk

Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows

Overview
Eval results
Files

run-execution.mddocs/

Run Execution

Execute assistant workflows on threads with support for streaming, interrupts, configuration, and completion handling. Runs represent individual executions of an assistant on a thread.

Capabilities

Streaming Execution

Execute runs with real-time streaming of execution events, state changes, and outputs.

from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
    StreamPart, StreamMode, Config, Context, Checkpoint,
    Command, QueryParamTypes
)

# Via client.runs
def stream(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]:
    """
    Create a run and stream its output in real time.

    Although declared with a plain ``def``, this returns an async iterator
    of ``StreamPart`` (event, data) pairs — consume it with ``async for``.

    Args:
        thread_id: The thread ID to stream the run on. May be None —
            presumably for a stateless run with no backing thread; confirm
            against the SDK reference.
        assistant_id: The assistant ID or graph name to stream the run on.
        input: The input to the run.
        command: A command to run instead of ``input``.
        stream_mode: One or more modes to stream (see ``StreamMode``).
            Default is "values".
        stream_subgraphs: Whether to also stream events from subgraphs.
        stream_resumable: Whether the stream can be resumed after a disconnect.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated — use
            ``checkpoint`` instead.
        webhook: Webhook URL to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        AsyncIterator[StreamPart]: The stream of (event, data) parts for the run.
    """

Async Execution

Execute runs asynchronously and retrieve results when complete.

from langgraph_sdk.schema import Run, QueryParamTypes

async def create(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Create a background run and return immediately without waiting for it.

    The returned ``Run`` carries ``run_id`` and ``status``; poll with
    ``get()``, block with ``join()``, or register a ``webhook`` to be
    notified on completion.

    Args:
        thread_id: The thread ID to create the run on. May be None —
            presumably for a stateless run; confirm against the SDK reference.
        assistant_id: The assistant ID or graph name to create the run on.
        input: The input to the run.
        command: A command to run instead of ``input``.
        stream_mode: The mode(s) to stream the run. Default is "values".
        stream_subgraphs: Whether to also stream events from subgraphs.
        stream_resumable: Whether the stream can be resumed after a disconnect.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated — use
            ``checkpoint`` instead.
        webhook: Webhook URL to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The created run.
    """

Synchronous Execution

Execute runs and block until completion — the call is still awaited asynchronously, but it only returns once the run has finished.

async def wait(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    checkpoint_during: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Create a run, wait for it to finish, and return the completed run.

    Blocks (via await) until the run reaches a terminal status.

    Args:
        thread_id: The thread ID to create the run on. May be None —
            presumably for a stateless run; confirm against the SDK reference.
        assistant_id: The assistant ID or graph name to create the run on.
        input: The input to the run.
        command: A command to run instead of ``input``.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated — use
            ``checkpoint`` instead.
        webhook: Webhook URL to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        checkpoint_during: Whether to checkpoint during the run. None
            presumably falls back to the server default — confirm.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The completed run.
    """

Batch Execution

Execute multiple runs concurrently with batch operations.

from langgraph_sdk.schema import RunCreate

async def create_batch(
    payloads: list[RunCreate],
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Run]:
    """
    Create a batch of stateless background runs in a single request.

    Args:
        payloads: The ``RunCreate`` payloads for the runs, one per run.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Run]: The created runs, one per payload.
    """

Run Management

Manage active and completed runs with listing, retrieval, and cancellation capabilities.

from langgraph_sdk.schema import RunSelectField, RunStatus, CancelAction

async def list(
    thread_id: str,
    *,
    limit: int = 10,
    offset: int = 0,
    status: RunStatus | None = None,
    select: list[RunSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Run]:
    """
    Get runs for a thread, paginated via ``limit``/``offset``.

    Note: intentionally named ``list`` (shadowing the builtin) because it is
    accessed as ``client.runs.list(...)``.

    Args:
        thread_id: The thread ID to get runs for.
        limit: The maximum number of runs to return (default 10).
        offset: The number of runs to skip, for pagination.
        status: If given, only return runs with this status.
        select: Fields to include in the response; None presumably returns
            all fields — confirm against the SDK reference.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Run]: The runs for the thread.
    """

async def get(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Get a single run by ID, e.g. to poll its current ``status``.

    Args:
        thread_id: The thread ID the run belongs to.
        run_id: The run ID to get.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The run object.
    """

async def cancel(
    thread_id: str,
    run_id: str,
    *,
    wait: bool = False,
    action: CancelAction = "interrupt",
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Cancel a run.

    Args:
        thread_id: The thread ID to cancel the run on.
        run_id: The run ID to cancel.
        wait: Whether to block until the run is actually cancelled
            (default False: request cancellation and return).
        action: The type of cancellation. Options are "interrupt" or
            "rollback" (see ``CancelAction``).
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """

async def delete(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Delete a run from a thread.

    Args:
        thread_id: The thread ID to delete the run from.
        run_id: The run ID to delete.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """

Run Streaming & Joining

Join ongoing runs and stream their execution events.

async def join(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> dict:
    """
    Block until a run is done and return the final state of the thread.

    Unlike ``join_stream``, no intermediate events are delivered — only the
    final thread state.

    Args:
        thread_id: The thread ID the run is executing on.
        run_id: The run ID to join.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        dict: The final state of the thread.
    """

def join_stream(
    thread_id: str,
    run_id: str,
    *,
    cancel_on_disconnect: bool = False,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
    last_event_id: str | None = None,
) -> AsyncIterator[StreamPart]:
    """
    Stream output from an already-running run in real time, until it is done.

    Output is not buffered: anything the run produced before this call will
    not be received here. Although declared with a plain ``def``, this
    returns an async iterator — consume it with ``async for``.

    Args:
        thread_id: The thread ID the run is executing on.
        run_id: The run ID to stream.
        cancel_on_disconnect: Whether to cancel the run if this stream is
            disconnected.
        stream_mode: The mode(s) to stream; None presumably uses the run's
            original stream mode — confirm against the SDK reference.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
        last_event_id: Event ID to resume streaming from — presumably only
            meaningful for runs created with ``stream_resumable=True``; confirm.

    Returns:
        AsyncIterator[StreamPart]: A stream of (event, data) parts for the run.
    """

Types

class Run(TypedDict):
    """Run execution details, as returned by the runs API."""
    run_id: str        # Unique identifier of the run
    thread_id: str     # Thread the run executes on
    assistant_id: str  # Assistant (or graph) being executed
    created_at: str    # Creation timestamp (string; exact format not shown here)
    updated_at: str    # Last-update timestamp
    status: RunStatus  # Lifecycle state, e.g. "pending" -> "success"/"error"
    kwargs: dict       # Run arguments/payload; the usage examples also read
                       # results from here ("kwargs.get('result')") — confirm schema
    metadata: dict     # Caller-supplied metadata attached at creation

class RunCreate(TypedDict):
    """One run-creation payload, as passed to create_batch()."""
    thread_id: str     # Thread to run on
    assistant_id: str  # Assistant (or graph) to execute
    input: dict        # Input payload for the run
    config: Config     # Per-run configuration
    metadata: dict     # Metadata to attach to the run
    multitask_strategy: MultitaskStrategy  # How to handle concurrent runs on the thread

class StreamPart(NamedTuple):
    """One streamed event: an event name plus its data payload."""
    event: str  # Event type, e.g. "messages", "events", "error" (see usage examples)
    data: dict  # Payload for the event

# Lifecycle states a run can be in.
RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]

# Streaming modes accepted by stream() and join_stream().
StreamMode = Literal[
    "values", "messages", "updates", "events",
    "tasks", "checkpoints", "debug", "custom", "messages-tuple"
]

# How a new run interacts with runs already active on the same thread.
MultitaskStrategy = Literal["reject", "interrupt", "rollback", "enqueue"]

# Behavior when the client disconnects (cf. cancel_on_disconnect in join_stream).
DisconnectMode = Literal["cancel", "continue"]

# On-completion behavior — presumably whether the thread is deleted or kept; confirm.
OnCompletionBehavior = Literal["delete", "keep"]

# Cancellation semantics for cancel(action=...).
CancelAction = Literal["interrupt", "rollback"]

# Run fields selectable via list(select=[...]).
RunSelectField = Literal[
    "run_id", "thread_id", "assistant_id", "created_at",
    "updated_at", "status", "kwargs", "metadata"
]

Usage Examples

Streaming Execution

# Basic streaming run: iterate (event, data) StreamParts as they arrive.
async for chunk in client.runs.stream(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"messages": [{"role": "human", "content": "Hello!"}]}
):
    if chunk.event == "messages":
        print(f"Message: {chunk.data}")
    elif chunk.event == "events":
        print(f"Event: {chunk.data}")

# Advanced streaming with configuration
# NOTE(review): interrupt_before and multitask_strategy are not part of the
# stream() signature documented above — confirm against the SDK reference
# before relying on them.
async for chunk in client.runs.stream(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"query": "Explain AI"},
    config={"temperature": 0.7, "max_tokens": 1000},
    stream_mode="events",
    interrupt_before=["human_review"],
    multitask_strategy="enqueue"
):
    print(f"{chunk.event}: {chunk.data}")

Asynchronous Execution

# Start run asynchronously: create() returns immediately with a Run record;
# the webhook will be called when the run finishes.
run = await client.runs.create(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"task": "analyze_document", "doc_id": "doc-789"},
    metadata={"priority": "high"},
    webhook="https://myapp.com/webhooks/run-complete"
)

print(f"Started run {run['run_id']} with status {run['status']}")

# Check run status later by polling get() with the saved run_id.
updated_run = await client.runs.get("thread-123", run["run_id"])
if updated_run["status"] == "success":
    print("Run completed successfully")

Synchronous Execution

# Execute and wait for completion: wait() only returns once the run has
# reached a terminal status.
# NOTE(review): the config keys here ("timeout") are illustrative — confirm
# accepted keys against the Config schema in langgraph_sdk.schema.
completed_run = await client.runs.wait(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"calculation": "fibonacci", "n": 100},
    config={"timeout": 300}
)

print(f"Final status: {completed_run['status']}")
print(f"Result: {completed_run['kwargs'].get('result')}")

Batch Operations

# Create multiple runs in one request; each payload follows the RunCreate shape.
payloads = [
    {
        "thread_id": "thread-1",
        "assistant_id": "assistant-456",
        "input": {"task": f"process_item_{i}"}
    }
    for i in range(10)
]

batch_runs = await client.runs.create_batch(payloads)
print(f"Created {len(batch_runs)} runs")

Run Management

# List thread runs
runs = await client.runs.list("thread-123", limit=50)
active_runs = [r for r in runs if r["status"] in ["pending", "running"]]

# Cancel a run
if active_runs:
    await client.runs.cancel(
        "thread-123",
        active_runs[0]["run_id"],
        cancel_action="interrupt"
    )

# Join an ongoing run
result = await client.runs.join("thread-123", "run-789")

# Stream events from ongoing run
async for event in client.runs.join_stream("thread-123", "run-789"):
    print(f"Event: {event}")

Error Handling

# Execution problems arrive in-band as "error"/"interrupt" stream events;
# the except clause covers transport-level failures of the stream itself.
try:
    async for chunk in client.runs.stream(
        thread_id="thread-123",
        assistant_id="assistant-456",
        input={"query": "test"}
    ):
        if chunk.event == "error":
            print(f"Execution error: {chunk.data}")
            break
        elif chunk.event == "interrupt":
            print(f"Execution interrupted: {chunk.data}")
            # Handle interrupt, possibly resume or cancel
except Exception as e:
    print(f"Stream error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-langgraph-sdk

docs

assistant-management.md

authentication.md

client-management.md

index.md

persistent-storage.md

run-execution.md

scheduled-tasks.md

thread-management.md

tile.json