or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

channels.mdconfiguration.mderrors.mdfunctional-api.mdindex.mdmessage-graph.mdpregel.mdstate-graph.mdtypes-primitives.md
tile.json

tessl/pypi-langgraph

Building stateful, multi-actor applications with LLMs

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/langgraph@1.0.x

To install, run

npx @tessl/cli install tessl/pypi-langgraph@1.0.0

index.mddocs/

LangGraph

LangGraph is a library for building stateful, multi-actor applications with Large Language Models (LLMs). It provides low-level orchestration infrastructure for creating durable, long-running workflows with built-in support for persistence, human-in-the-loop interactions, and complex control flow. LangGraph enables developers to build sophisticated agent systems that can handle failures, maintain memory across sessions, and coordinate multiple actors through a flexible graph-based architecture.

Package Information

  • Package Name: langgraph
  • Package Type: Library
  • Language: Python
  • Installation: pip install langgraph
  • Version: 1.0.1

Core Imports

from langgraph.graph import StateGraph, START, END

Import tags for controlling behavior:

from langgraph.constants import TAG_NOSTREAM, TAG_HIDDEN

Common imports for working with messages:

from langgraph.graph import MessagesState, add_messages

For functional API:

from langgraph.func import entrypoint, task

For low-level graph execution:

from langgraph.pregel import Pregel

Basic Usage

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END

# Define the state schema
class State(TypedDict):
    messages: list[str]
    counter: int

# Create a graph
graph = StateGraph(State)

# Define node functions
def process_message(state: State) -> State:
    """Process messages and increment counter."""
    return {
        "messages": state["messages"] + ["processed"],
        "counter": state["counter"] + 1
    }

def check_complete(state: State) -> str:
    """Route based on counter."""
    if state["counter"] >= 3:
        return END
    return "process"

# Add nodes
graph.add_node("process", process_message)

# Add edges
graph.add_edge(START, "process")
graph.add_conditional_edges("process", check_complete)

# Compile the graph
app = graph.compile()

# Run the graph
result = app.invoke({
    "messages": ["hello"],
    "counter": 0
})

print(result)  # Final state after execution

Architecture

LangGraph is built around several key components that work together to enable stateful, multi-actor workflows:

  • StateGraph: High-level API for building graphs where nodes communicate through shared state. Nodes read from and write to state channels, enabling coordination between multiple actors.

  • Pregel: Low-level execution engine inspired by Google's Pregel system. Handles graph traversal, state management, checkpointing, and parallelization of node execution.

  • Channels: State management primitives that control how state updates are applied. Different channel types support various patterns like last-value, aggregation, barriers, and topics.

  • Checkpointing: Persistent state storage that enables durable execution. Graphs can be paused, resumed, and replayed from any checkpoint, supporting long-running workflows and human-in-the-loop patterns.

  • Functional API: Decorator-based API using @entrypoint and @task that provides an alternative to explicit graph construction, enabling more natural Python code with automatic parallelization.

  • Runtime Context: Thread-local context that provides access to configuration, stores, stream writers, and other runtime utilities from within node functions.

This architecture enables LangGraph to support complex agent workflows with features like conditional routing, parallel execution, state persistence, error recovery, and human intervention at any point in execution.

Capabilities

Graph Construction

Build stateful graphs using StateGraph where nodes communicate through shared state. Supports conditional routing, parallel execution, and complex workflows.

class StateGraph:
    def __init__(
        self,
        state_schema,
        context_schema=None,
        *,
        input_schema=None,
        output_schema=None
    ): ...

    def add_node(
        self,
        node,
        action=None,
        *,
        defer=False,
        metadata=None,
        input_schema=None,
        retry_policy=None,
        cache_policy=None,
        destinations=None
    ): ...

    def add_edge(self, start_key, end_key): ...

    def add_conditional_edges(self, source, path, path_map=None): ...

    def compile(
        self,
        checkpointer=None,
        *,
        cache=None,
        store=None,
        interrupt_before=None,
        interrupt_after=None,
        debug=False,
        name=None
    ): ...
# Special node identifiers
START: str  # The first (virtual) node - "__start__"
END: str    # The last (virtual) node - "__end__"

# Tags for controlling behavior
TAG_NOSTREAM: str  # Tag to disable streaming for a chat model
TAG_HIDDEN: str    # Tag to hide node/edge from tracing/streaming

State Graph API

Functional API

Define workflows using Python decorators for a more natural programming style with automatic parallelization of tasks.

def task(
    func=None,
    *,
    name=None,
    retry_policy=None,
    cache_policy=None
): ...

class entrypoint:
    def __init__(
        self,
        checkpointer=None,
        store=None,
        cache=None,
        context_schema=None,
        cache_policy=None,
        retry_policy=None
    ): ...

    def __call__(self, func): ...

Functional API

State Management Channels

Channel primitives for managing how state updates are applied, supporting patterns like last-value, aggregation, barriers, and pub/sub.

class LastValue: ...
class AnyValue: ...
class EphemeralValue: ...
class UntrackedValue: ...
class BinaryOperatorAggregate:
    def __init__(self, typ, operator): ...
class Topic: ...
class NamedBarrierValue: ...

Channels

Graph Execution Engine

Low-level Pregel execution engine providing fine-grained control over graph execution, streaming, and state management.

class Pregel:
    def invoke(self, input, config=None, **kwargs): ...
    def stream(
        self,
        input,
        config=None,
        *,
        stream_mode=None,
        output_keys=None,
        interrupt_before=None,
        interrupt_after=None,
        debug=None,
        subgraphs=False
    ): ...
    def get_state(self, config, *, subgraphs=False): ...
    def update_state(self, config, values, as_node=None): ...
    def get_state_history(
        self,
        config,
        *,
        filter=None,
        before=None,
        limit=None
    ): ...

Pregel Engine

Types and Primitives

Core types for control flow, state snapshots, task management, and retry/cache policies.

class Send:
    """Send message to specific node."""
    def __init__(self, node: str, arg: Any): ...

class Command:
    """Command for state updates and navigation."""
    graph: str | None = None
    update: Any | None = None
    resume: dict[str, Any] | Any | None = None
    goto: Send | Sequence[Send] = ()

def interrupt(value: Any) -> Any:
    """Interrupt graph execution with resumable value."""
    ...

class RetryPolicy:
    initial_interval: float = 0.5
    backoff_factor: float = 2.0
    max_interval: float = 128.0
    max_attempts: int = 3
    jitter: bool = True
    retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]

class StateSnapshot:
    values: dict[str, Any] | Any
    next: tuple[str, ...]
    config: RunnableConfig
    metadata: CheckpointMetadata | None
    created_at: str | None
    parent_config: RunnableConfig | None
    tasks: tuple[PregelTask, ...]
    interrupts: tuple[Interrupt, ...]

Types and Primitives

Managed Values

Built-in managed state values that provide execution context information within nodes and tasks.

IsLastStep: Annotated[bool, IsLastStepManager]
"""
Managed value that returns True when execution is on the last step.

Include in your state TypedDict to access execution context. The value is automatically
populated by the runtime and indicates when the graph is on its final step before
hitting the recursion limit.

Usage:
    from typing import TypedDict
    from langgraph.managed import IsLastStep
    from langgraph.graph import StateGraph

    class State(TypedDict):
        data: str
        is_last: IsLastStep  # Add as a state field

    def my_node(state: State) -> dict:
        if state["is_last"]:  # Access like any other state field
            # Perform final cleanup or summary
            return {"data": "final"}
        return {"data": "continue"}

    graph = StateGraph(State)
    graph.add_node("process", my_node)
    # ... The 'is_last' field will be automatically managed
"""

RemainingSteps: Annotated[int, RemainingStepsManager]
"""
Managed value that returns the number of remaining execution steps.

Include in your state TypedDict to track how many more steps can execute before
hitting the recursion limit. Useful for progress tracking or conditional logic.

Usage:
    from typing import TypedDict
    from langgraph.managed import RemainingSteps
    from langgraph.graph import StateGraph

    class State(TypedDict):
        data: str
        steps_left: RemainingSteps  # Add as a state field

    def my_node(state: State) -> dict:
        print(f"Steps remaining: {state['steps_left']}")
        return {"data": "processed"}

    graph = StateGraph(State)
    graph.add_node("process", my_node)
    # ... The 'steps_left' field will be automatically managed
"""

Note: Managed values must be declared in the state TypedDict schema. They are automatically populated by the runtime and are read-only (nodes cannot modify them). The graph identifies managed values by their type annotation and handles them specially.

Error Handling

Exception classes for graph errors, interrupts, and validation failures.

class GraphRecursionError(RecursionError): ...
class InvalidUpdateError(Exception): ...
class GraphInterrupt(Exception): ...
class EmptyChannelError(Exception): ...
class EmptyInputError(Exception): ...
class TaskNotFound(Exception): ...

class ErrorCode:
    GRAPH_RECURSION_LIMIT
    INVALID_CONCURRENT_GRAPH_UPDATE
    INVALID_GRAPH_NODE_RETURN_VALUE
    MULTIPLE_SUBGRAPHS
    INVALID_CHAT_HISTORY

Error Handling

Configuration and Runtime

Access configuration, stores, stream writers, and runtime context from within nodes and tasks.

def get_config() -> RunnableConfig:
    """Get current RunnableConfig from context."""
    ...

def get_store() -> BaseStore:
    """Access LangGraph store from inside node or task."""
    ...

def get_stream_writer() -> StreamWriter:
    """Access StreamWriter from inside node or task."""
    ...

def get_runtime(context_schema=None) -> Runtime:
    """Get runtime context for current graph run."""
    ...

Configuration and Runtime

Message Handling

Utilities for message-based workflows with built-in message merging and state management.

def add_messages(
    left: Messages,
    right: Messages,
    *,
    format: Literal["langchain-openai"] | None = None
) -> Messages:
    """Merge two lists of messages, updating by ID."""
    ...

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

class MessageGraph(StateGraph):
    """Deprecated: Use StateGraph with messages key instead."""
    ...

Message Handling