Building stateful, multi-actor applications with LLMs
npx @tessl/cli install tessl/pypi-langgraph@1.0.0

LangGraph is a library for building stateful, multi-actor applications with Large Language Models (LLMs). It provides low-level orchestration infrastructure for creating durable, long-running workflows with built-in support for persistence, human-in-the-loop interactions, and complex control flow. LangGraph enables developers to build sophisticated agent systems that can handle failures, maintain memory across sessions, and coordinate multiple actors through a flexible graph-based architecture.
pip install langgraph

from langgraph.graph import StateGraph, START, END

Import tags for controlling behavior:
from langgraph.constants import TAG_NOSTREAM, TAG_HIDDEN

Common imports for working with messages:
from langgraph.graph import MessagesState, add_messages

For functional API:
from langgraph.func import entrypoint, task

For low-level graph execution:
from langgraph.pregel import Pregel

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
# Define the state schema
class State(TypedDict):
    """Shared state that the example graph's nodes read and update."""

    # Message log; the "process" node appends "processed" on each pass.
    messages: list[str]
    # Number of passes so far; routing stops once it reaches 3.
    counter: int
# Create a graph builder whose shared state follows the State schema
graph = StateGraph(State)
# Define node functions
def process_message(state: State) -> State:
    """Process messages and increment counter.

    Builds a new state instead of mutating ``state``: ``messages`` gains a
    trailing ``"processed"`` entry and ``counter`` goes up by one.
    """
    return {
        "messages": state["messages"] + ["processed"],
        "counter": state["counter"] + 1,
    }
def check_complete(state: State) -> str:
    """Route based on counter.

    Returns ``END`` once ``counter`` has reached 3; otherwise returns
    ``"process"``, the name of the node to run again.
    """
    if state["counter"] >= 3:
        return END
    return "process"
# Register process_message as the node named "process"
graph.add_node("process", process_message)
# Enter at "process", then let check_complete pick what follows it
graph.add_edge(START, "process")
graph.add_conditional_edges("process", check_complete)
# Compile the graph
app = graph.compile()
# Run the graph from an initial state
result = app.invoke(
    {
        "messages": ["hello"],
        "counter": 0,
    }
)
print(result) # Final state after execution

LangGraph is built around several key components that work together to enable stateful, multi-actor workflows:
StateGraph: High-level API for building graphs where nodes communicate through shared state. Nodes read from and write to state channels, enabling coordination between multiple actors.
Pregel: Low-level execution engine inspired by Google's Pregel system. Handles graph traversal, state management, checkpointing, and parallelization of node execution.
Channels: State management primitives that control how state updates are applied. Different channel types support various patterns like last-value, aggregation, barriers, and topics.
Checkpointing: Persistent state storage that enables durable execution. Graphs can be paused, resumed, and replayed from any checkpoint, supporting long-running workflows and human-in-the-loop patterns.
Functional API: Decorator-based API using @entrypoint and @task that provides an alternative to explicit graph construction, enabling more natural Python code with automatic parallelization.
Runtime Context: Thread-local context that provides access to configuration, stores, stream writers, and other runtime utilities from within node functions.
This architecture enables LangGraph to support complex agent workflows with features like conditional routing, parallel execution, state persistence, error recovery, and human intervention at any point in execution.
Build stateful graphs using StateGraph where nodes communicate through shared state. Supports conditional routing, parallel execution, and complex workflows.
class StateGraph:
    """Builder for graphs whose nodes communicate through shared state.

    API outline only: every method body is elided with ``...``.
    """

    # Everything after ``context_schema`` is keyword-only.
    def __init__(
        self,
        state_schema,
        context_schema=None,
        *,
        input_schema=None,
        output_schema=None,
    ): ...

    # Register a node; options after ``action`` are keyword-only.
    def add_node(
        self,
        node,
        action=None,
        *,
        defer=False,
        metadata=None,
        input_schema=None,
        retry_policy=None,
        cache_policy=None,
        destinations=None,
    ): ...

    # Add an edge from ``start_key`` to ``end_key``.
    def add_edge(self, start_key, end_key): ...

    # Add edges out of ``source`` whose target is chosen by ``path``.
    def add_conditional_edges(self, source, path, path_map=None): ...

    # Compile the graph; options after ``checkpointer`` are keyword-only.
    def compile(
        self,
        checkpointer=None,
        *,
        cache=None,
        store=None,
        interrupt_before=None,
        interrupt_after=None,
        debug=False,
        name=None,
    ): ...


# Special node identifiers
START: str # The first (virtual) node - "__start__"
END: str # The last (virtual) node - "__end__"
# Tags for controlling behavior
TAG_NOSTREAM: str # Tag to disable streaming for a chat model
TAG_HIDDEN: str # Tag to hide node/edge from tracing/streaming
# Define workflows using Python decorators for a more natural programming style with automatic parallelization of tasks.
def task(
    func=None,
    *,
    name=None,
    retry_policy=None,
    cache_policy=None,
):
    """Signature of the ``@task`` decorator (outline only; body elided).

    Every option after ``func`` is keyword-only.
    """
    ...
class entrypoint:
    """Signature of the ``@entrypoint`` decorator class (outline only)."""

    # All options are optional and default to None.
    def __init__(
        self,
        checkpointer=None,
        store=None,
        cache=None,
        context_schema=None,
        cache_policy=None,
        retry_policy=None,
    ): ...

    # Receives the function being decorated.
    def __call__(self, func): ...


# Channel primitives for managing how state updates are applied, supporting patterns like last-value, aggregation, barriers, and pub/sub.
# Channel types (outline only; class bodies are elided with `...`).
class LastValue: ...
class AnyValue: ...
class EphemeralValue: ...
class UntrackedValue: ...


class BinaryOperatorAggregate:
    # Takes a type and an operator; the body is elided in this outline.
    def __init__(self, typ, operator): ...


class Topic: ...
class NamedBarrierValue: ...


# Low-level Pregel execution engine providing fine-grained control over graph execution, streaming, and state management.
class Pregel:
    """Method signatures of the low-level execution engine (outline only).

    Every method body is elided with ``...``.
    """

    def invoke(self, input, config=None, **kwargs): ...

    # Options after ``config`` are keyword-only.
    def stream(
        self,
        input,
        config=None,
        *,
        stream_mode=None,
        output_keys=None,
        interrupt_before=None,
        interrupt_after=None,
        debug=None,
        subgraphs=False,
    ): ...

    def get_state(self, config, *, subgraphs=False): ...

    def update_state(self, config, values, as_node=None): ...

    # ``filter``, ``before`` and ``limit`` are keyword-only.
    def get_state_history(
        self,
        config,
        *,
        filter=None,
        before=None,
        limit=None,
    ): ...


# Core types for control flow, state snapshots, task management, and retry/cache policies.
class Send:
    """Send message to specific node."""

    # ``node`` is the target node's name and ``arg`` the value sent to it;
    # the body is elided in this outline.
    def __init__(self, node: str, arg: Any): ...
class Command:
    """Command for state updates and navigation."""

    graph: str | None = None
    update: Any | None = None
    resume: dict[str, Any] | Any | None = None
    # A destination may be a Send, a plain node name, or a sequence mixing
    # the two; the empty tuple is the default.
    goto: Send | Sequence[Send | str] | str = ()
def interrupt(value: Any) -> Any:
    """Interrupt graph execution with resumable value."""
    # Outline only: the body is elided.
    ...
class RetryPolicy:
initial_interval: float = 0.5
backoff_factor: float = 2.0
max_interval: float = 128.0
max_attempts: int = 3
jitter: bool = True
retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]
class StateSnapshot:
    """Fields of a state snapshot (outline only; no defaults are shown)."""

    values: dict[str, Any] | Any
    next: tuple[str, ...]
    config: RunnableConfig
    metadata: CheckpointMetadata | None
    created_at: str | None
    parent_config: RunnableConfig | None
    tasks: tuple[PregelTask, ...]
    interrupts: tuple[Interrupt, ...]


# Built-in managed state values that provide execution context information within nodes and tasks.
IsLastStep: Annotated[bool, IsLastStepManager]
"""
Managed value that returns True when execution is on the last step.

Include in your state TypedDict to access execution context. The value is automatically
populated by the runtime and indicates when the graph is on its final step before
hitting the recursion limit.

Usage:
    from typing import TypedDict
    from langgraph.managed import IsLastStep
    from langgraph.graph import StateGraph

    class State(TypedDict):
        data: str
        is_last: IsLastStep  # Add as a state field

    def my_node(state: State) -> dict:
        if state["is_last"]:  # Access like any other state field
            # Perform final cleanup or summary
            return {"data": "final"}
        return {"data": "continue"}

    graph = StateGraph(State)
    graph.add_node("process", my_node)
    # ... The 'is_last' field will be automatically managed
"""
RemainingSteps: Annotated[int, RemainingStepsManager]
"""
Managed value that returns the number of remaining execution steps.

Include in your state TypedDict to track how many more steps can execute before
hitting the recursion limit. Useful for progress tracking or conditional logic.

Usage:
    from typing import TypedDict
    from langgraph.managed import RemainingSteps
    from langgraph.graph import StateGraph

    class State(TypedDict):
        data: str
        steps_left: RemainingSteps  # Add as a state field

    def my_node(state: State) -> dict:
        print(f"Steps remaining: {state['steps_left']}")
        return {"data": "processed"}

    graph = StateGraph(State)
    graph.add_node("process", my_node)
    # ... The 'steps_left' field will be automatically managed
"""

# Note: Managed values must be declared in the state TypedDict schema. They are
# automatically populated by the runtime and are read-only (nodes cannot modify
# them). The graph identifies managed values by their type annotation and
# handles them specially.
Exception classes for graph errors, interrupts, and validation failures.
# Outline of the exception types; class bodies are elided with `...`.
# GraphRecursionError derives from the built-in RecursionError; the others
# derive directly from Exception.
class GraphRecursionError(RecursionError): ...
class InvalidUpdateError(Exception): ...
class GraphInterrupt(Exception): ...
class EmptyChannelError(Exception): ...
class EmptyInputError(Exception): ...
class TaskNotFound(Exception): ...
class ErrorCode:
    """Names of the error codes (outline only)."""

    # NOTE(review): each value is assumed to mirror its member name as a
    # string - confirm against langgraph.errors before relying on the values.
    GRAPH_RECURSION_LIMIT = "GRAPH_RECURSION_LIMIT"
    INVALID_CONCURRENT_GRAPH_UPDATE = "INVALID_CONCURRENT_GRAPH_UPDATE"
    INVALID_GRAPH_NODE_RETURN_VALUE = "INVALID_GRAPH_NODE_RETURN_VALUE"
    MULTIPLE_SUBGRAPHS = "MULTIPLE_SUBGRAPHS"
    INVALID_CHAT_HISTORY = "INVALID_CHAT_HISTORY"


# Access configuration, stores, stream writers, and runtime context from within nodes and tasks.
def get_config() -> RunnableConfig:
    """Get current RunnableConfig from context."""
    ...


def get_store() -> BaseStore:
    """Access LangGraph store from inside node or task."""
    ...


def get_stream_writer() -> StreamWriter:
    """Access StreamWriter from inside node or task."""
    ...


def get_runtime(context_schema=None) -> Runtime:
    """Get runtime context for current graph run."""
    ...


# Utilities for message-based workflows with built-in message merging and state management.
def add_messages(
    left: Messages,
    right: Messages,
    *,
    format: Literal["langchain-openai"] | None = None,
) -> Messages:
    """Merge two lists of messages, updating by ID."""
    # Outline only: the body is elided. ``format`` is keyword-only.
    ...
class MessagesState(TypedDict):
    # The Annotated metadata attaches add_messages to the ``messages`` key.
    messages: Annotated[list[AnyMessage], add_messages]
class MessageGraph(StateGraph):
    """Deprecated: Use StateGraph with messages key instead."""

    ...