- Spec: pypi-langgraph-sdk
- Describes: pkg:pypi/langgraph-sdk@0.2.x
- Description: Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows
- Author: tessl
- File: docs/run-execution.md

# Run Execution

Execute assistant workflows on threads with support for streaming, interrupts, configuration, and completion handling. Runs represent individual executions of an assistant on a thread.

## Capabilities

### Streaming Execution

Execute runs with real-time streaming of execution events, state changes, and outputs.

```python { .api }
from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Any
from langgraph_sdk.schema import (
    StreamPart, StreamMode, Config, Context, Checkpoint,
    Command, QueryParamTypes
)

# Via client.runs
def stream(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]:
    """
    Stream the results of a run.

    Args:
        thread_id: The thread ID to stream the run on.
        assistant_id: The assistant ID or graph name to stream the run on.
        input: The input to the run.
        command: The command to run instead of input.
        stream_mode: The mode(s) to stream the run. Default is "values".
        stream_subgraphs: Whether to stream subgraphs.
        stream_resumable: Whether the stream is resumable.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        AsyncIterator[StreamPart]: The stream of the run.
    """
```
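Each `StreamPart` pairs an event name with a payload, so a minimal consumer can route parts by event. A hedged sketch: the handler set and the fabricated parts are illustrative, and `StreamPart` is re-declared locally so the snippet runs without a server.

```python
from typing import Any, Callable, NamedTuple


class StreamPart(NamedTuple):
    """Mirrors the SDK's StreamPart shape: an event name plus its payload."""
    event: str
    data: dict


def dispatch(part: StreamPart, handlers: dict[str, Callable[[dict], Any]]) -> Any:
    """Route one stream part to the handler registered for its event.

    Unknown events fall through to the optional "default" handler.
    """
    handler = handlers.get(part.event) or handlers.get("default")
    return handler(part.data) if handler else None


# Exercised with fabricated parts (no server needed):
seen: list[str] = []
handlers = {
    "values": lambda d: seen.append(f"state: {d}"),
    "error": lambda d: seen.append(f"error: {d}"),
    "default": lambda d: seen.append("other"),
}
for part in [StreamPart("values", {"messages": []}), StreamPart("metadata", {})]:
    dispatch(part, handlers)
```

In real usage the loop body would be `async for part in client.runs.stream(...)`; the routing logic stays the same.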
### Async Execution

Execute runs asynchronously and retrieve results when complete.

```python { .api }
from langgraph_sdk.schema import Run, QueryParamTypes

async def create(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] = "values",
    stream_subgraphs: bool = False,
    stream_resumable: bool = False,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Create a background run.

    Args:
        thread_id: The thread ID to create the run on.
        assistant_id: The assistant ID or graph name to create the run on.
        input: The input to the run.
        command: The command to run instead of input.
        stream_mode: The mode(s) to stream the run. Default is "values".
        stream_subgraphs: Whether to stream subgraphs.
        stream_resumable: Whether the stream is resumable.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The created run.
    """
```
### Synchronous Execution

Execute runs synchronously and wait for completion.

```python { .api }
async def wait(
    thread_id: str | None,
    assistant_id: str,
    *,
    input: Mapping[str, Any] | None = None,
    command: Command | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint: Checkpoint | None = None,
    checkpoint_id: str | None = None,  # deprecated
    webhook: str | None = None,
    webhook_mode: str | None = None,
    checkpoint_during: bool | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Create a run, wait for it to finish and return the final state.

    Args:
        thread_id: The thread ID to create the run on.
        assistant_id: The assistant ID or graph name to create the run on.
        input: The input to the run.
        command: The command to run instead of input.
        metadata: The metadata to add to the run.
        config: The config to use for the run.
        context: The context to add to the run.
        checkpoint: The checkpoint to resume from.
        checkpoint_id: Checkpoint to resume from. Deprecated, use checkpoint instead.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        checkpoint_during: Whether to checkpoint during the run.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: The completed run.
    """
```
### Batch Execution

Execute multiple runs concurrently with batch operations.

```python { .api }
from langgraph_sdk.schema import RunCreate

async def create_batch(
    payloads: list[RunCreate],
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Run]:
    """
    Create a batch of stateless background runs.

    Args:
        payloads: The payloads for the runs.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Run]: The created runs.
    """
```

### Run Management

Manage active and completed runs with listing, retrieval, and cancellation capabilities.

```python { .api }
from langgraph_sdk.schema import RunSelectField, RunStatus, CancelAction

async def list(
    thread_id: str,
    *,
    limit: int = 10,
    offset: int = 0,
    status: RunStatus | None = None,
    select: list[RunSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Run]:
    """
    Get all runs for a thread.

    Args:
        thread_id: The thread ID to get runs for.
        limit: The maximum number of runs to return.
        offset: The number of runs to skip.
        status: The status to filter by.
        select: Fields to include in the response.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Run]: The runs for the thread.
    """

async def get(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Run:
    """
    Get a run.

    Args:
        thread_id: The thread ID to get the run from.
        run_id: The run ID to get.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Run: Run object.
    """

async def cancel(
    thread_id: str,
    run_id: str,
    *,
    wait: bool = False,
    action: CancelAction = "interrupt",
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Cancel a run.

    Args:
        thread_id: The thread ID to cancel the run on.
        run_id: The run ID to cancel.
        wait: Whether to wait for the run to be cancelled.
        action: The type of cancellation. Options are "interrupt" or "rollback".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """

async def delete(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Delete a run.

    Args:
        thread_id: The thread ID to delete the run from.
        run_id: The run ID to delete.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """
```
### Run Streaming & Joining

Join ongoing runs and stream their execution events.

```python { .api }
async def join(
    thread_id: str,
    run_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> dict:
    """
    Block until a run is done. Returns the final state of the thread.

    Args:
        thread_id: The thread ID to join the run on.
        run_id: The run ID to join.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        dict: The final state of the thread.
    """

def join_stream(
    thread_id: str,
    run_id: str,
    *,
    cancel_on_disconnect: bool = False,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
    last_event_id: str | None = None,
) -> AsyncIterator[StreamPart]:
    """
    Stream output from a run in real-time, until the run is done.
    Output is not buffered, so any output produced before this call will
    not be received here.

    Args:
        thread_id: The thread ID to stream the run on.
        run_id: The run ID to stream.
        cancel_on_disconnect: Whether to cancel the run if the stream is disconnected.
        stream_mode: The mode(s) to stream the run.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
        last_event_id: The last event ID to start streaming from.

    Returns:
        AsyncIterator[StreamPart]: A stream of the run.
    """
```
## Types

```python { .api }
from typing import Literal, NamedTuple, TypedDict

class Run(TypedDict):
    """Run execution details."""
    run_id: str
    thread_id: str
    assistant_id: str
    created_at: str
    updated_at: str
    status: RunStatus
    kwargs: dict
    metadata: dict

class RunCreate(TypedDict):
    """Run creation parameters."""
    thread_id: str
    assistant_id: str
    input: dict
    config: Config
    metadata: dict
    multitask_strategy: MultitaskStrategy

class StreamPart(NamedTuple):
    """Stream event part."""
    event: str
    data: dict

RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]

StreamMode = Literal[
    "values", "messages", "updates", "events",
    "tasks", "checkpoints", "debug", "custom", "messages-tuple"
]

MultitaskStrategy = Literal["reject", "interrupt", "rollback", "enqueue"]

DisconnectMode = Literal["cancel", "continue"]

OnCompletionBehavior = Literal["delete", "keep"]

CancelAction = Literal["interrupt", "rollback"]

RunSelectField = Literal[
    "run_id", "thread_id", "assistant_id", "created_at",
    "updated_at", "status", "kwargs", "metadata"
]
```
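Given the `RunCreate` shape above, batch payloads can be assembled as plain dicts. A sketch under the assumption that fields not needed for a given run (e.g. `config`) may be omitted; `make_run_payload` is an illustrative helper, not part of the SDK, and the ID values are placeholders.

```python
from typing import Any


def make_run_payload(
    thread_id: str,
    assistant_id: str,
    input: dict[str, Any],
    *,
    multitask_strategy: str = "reject",
) -> dict[str, Any]:
    """Assemble a RunCreate-shaped payload, e.g. for create_batch.

    Validates multitask_strategy against the documented options.
    """
    allowed = {"reject", "interrupt", "rollback", "enqueue"}
    if multitask_strategy not in allowed:
        raise ValueError(f"multitask_strategy must be one of {sorted(allowed)}")
    return {
        "thread_id": thread_id,
        "assistant_id": assistant_id,
        "input": input,
        "metadata": {},
        "multitask_strategy": multitask_strategy,
    }


# Build three payloads that queue behind any run already active on the thread
payloads = [
    make_run_payload(
        "thread-1", "assistant-456", {"task": f"item-{i}"},
        multitask_strategy="enqueue",
    )
    for i in range(3)
]
```

Validating the literal values client-side surfaces typos before the request is sent, rather than as a server-side 422.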
## Usage Examples

### Streaming Execution

```python
# Basic streaming run
async for chunk in client.runs.stream(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"messages": [{"role": "human", "content": "Hello!"}]}
):
    if chunk.event == "messages":
        print(f"Message: {chunk.data}")
    elif chunk.event == "events":
        print(f"Event: {chunk.data}")

# Advanced streaming with configuration
async for chunk in client.runs.stream(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"query": "Explain AI"},
    config={"temperature": 0.7, "max_tokens": 1000},
    stream_mode="events",
    interrupt_before=["human_review"],
    multitask_strategy="enqueue"
):
    print(f"{chunk.event}: {chunk.data}")
```

### Asynchronous Execution

```python
# Start run asynchronously
run = await client.runs.create(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"task": "analyze_document", "doc_id": "doc-789"},
    metadata={"priority": "high"},
    webhook="https://myapp.com/webhooks/run-complete"
)

print(f"Started run {run['run_id']} with status {run['status']}")

# Check run status later
updated_run = await client.runs.get("thread-123", run["run_id"])
if updated_run["status"] == "success":
    print("Run completed successfully")
```

### Synchronous Execution

```python
# Execute and wait for completion
completed_run = await client.runs.wait(
    thread_id="thread-123",
    assistant_id="assistant-456",
    input={"calculation": "fibonacci", "n": 100},
    config={"timeout": 300}
)

print(f"Final status: {completed_run['status']}")
print(f"Result: {completed_run['kwargs'].get('result')}")
```
### Batch Operations

```python
# Create multiple runs
payloads = [
    {
        "thread_id": "thread-1",
        "assistant_id": "assistant-456",
        "input": {"task": f"process_item_{i}"}
    }
    for i in range(10)
]

batch_runs = await client.runs.create_batch(payloads)
print(f"Created {len(batch_runs)} runs")
```

### Run Management

```python
# List thread runs
runs = await client.runs.list("thread-123", limit=50)
active_runs = [r for r in runs if r["status"] in ["pending", "running"]]

# Cancel a run
if active_runs:
    await client.runs.cancel(
        "thread-123",
        active_runs[0]["run_id"],
        action="interrupt"
    )

# Join an ongoing run
result = await client.runs.join("thread-123", "run-789")

# Stream events from ongoing run
async for event in client.runs.join_stream("thread-123", "run-789"):
    print(f"Event: {event}")
```

### Error Handling

```python
try:
    async for chunk in client.runs.stream(
        thread_id="thread-123",
        assistant_id="assistant-456",
        input={"query": "test"}
    ):
        if chunk.event == "error":
            print(f"Execution error: {chunk.data}")
            break
        elif chunk.event == "interrupt":
            print(f"Execution interrupted: {chunk.data}")
            # Handle interrupt, possibly resume or cancel
except Exception as e:
    print(f"Stream error: {e}")
```
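Background runs eventually need their terminal status checked; besides `join`, a simple poll loop over `get` works. A hedged sketch: `wait_for_run` is an illustrative helper (not part of the SDK), written against any coroutine matching `get`'s `(thread_id, run_id)` shape so it can be exercised here with a stub instead of a live client, and a real deployment would use a much longer polling interval.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Statuses from which a run will not make further progress (per RunStatus)
TERMINAL = {"error", "success", "timeout", "interrupted"}


async def wait_for_run(
    get_run: Callable[[str, str], Awaitable[dict[str, Any]]],
    thread_id: str,
    run_id: str,
    *,
    interval: float = 0.01,
    max_polls: int = 100,
) -> dict[str, Any]:
    """Poll until the run reaches a terminal status, then return it."""
    for _ in range(max_polls):
        run = await get_run(thread_id, run_id)
        if run["status"] in TERMINAL:
            return run
        await asyncio.sleep(interval)
    raise TimeoutError(f"run {run_id} not terminal after {max_polls} polls")


# Exercised against a stub that flips to "success" on the third poll:
async def _demo() -> dict[str, Any]:
    calls = {"n": 0}

    async def fake_get(thread_id: str, run_id: str) -> dict[str, Any]:
        calls["n"] += 1
        status = "success" if calls["n"] >= 3 else "running"
        return {"run_id": run_id, "thread_id": thread_id, "status": status}

    return await wait_for_run(fake_get, "thread-123", "run-789")


result = asyncio.run(_demo())
```

Against a live client the call would be `wait_for_run(client.runs.get, thread_id, run_id, interval=2.0)`; the bounded `max_polls` guards against runs that never leave "pending".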