LangSmith Python SDK

A comprehensive Python SDK for LangSmith, providing observability, evaluation, and testing capabilities for LLM applications. LangSmith enables developers to trace, debug, evaluate, and monitor AI applications through a unified API, with full support for synchronous and asynchronous operations.

Package Information

  • Package Name: langsmith
  • Language: Python
  • Installation: pip install langsmith

Core Imports

import langsmith as ls
from langsmith import Client, traceable

For async operations:

from langsmith import AsyncClient

For evaluation:

from langsmith import evaluate, RunEvaluator, EvaluationResult

For testing:

from langsmith import test, expect

Basic Usage

import langsmith as ls
from langsmith import traceable

# Configure tracing at startup
ls.configure(
    enabled=True,
    project_name="my-project"
)

# Trace a function
@traceable
def process_query(query: str) -> str:
    """Process a user query."""
    # Your LLM or processing logic here
    result = f"Processed: {query}"
    return result

# Use the traced function
result = process_query("What is LangSmith?")

# Get current run tree for metadata
from langsmith import get_current_run_tree, set_run_metadata

@traceable
def advanced_function(input_data):
    # Add metadata to current run
    set_run_metadata(user_id="123", version="2.0")

    # Get current run
    run = get_current_run_tree()
    if run:
        print(f"Run ID: {run.id}")

    return process_query(input_data)

Architecture

LangSmith's architecture centers on runs (trace spans) organized in tree structures. The Client and AsyncClient classes provide comprehensive API access for managing runs, projects, datasets, examples, and feedback. The tracing system uses context variables to implicitly track parent-child relationships between runs, enabling automatic trace tree construction. The RunTree class represents individual trace spans with metadata, inputs, outputs, and timing information. Evaluation capabilities are built on top of the tracing infrastructure, allowing runs to be associated with dataset examples and evaluated by custom or built-in evaluators. The testing framework integrates with pytest to provide trace-aware test execution with expectations and assertions.
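
As a sketch of how this plays out in practice, nested @traceable calls produce a root run with child runs attached automatically through the ambient context; the function and project names here are illustrative.

from langsmith import traceable, get_current_run_tree

@traceable(run_type="tool")
def lookup(term: str) -> str:
    # Becomes a child span because it is called inside answer()'s traced context
    return f"definition of {term}"

@traceable(run_type="chain", name="answer")
def answer(question: str) -> str:
    definition = lookup(question)
    run = get_current_run_tree()
    if run:
        # Each span has its own id plus the trace_id shared with the root run
        print(run.id, run.trace_id)
    return f"Answer based on: {definition}"

answer("observability")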

Capabilities

Client API

Synchronous client for interacting with the LangSmith API. Provides full CRUD operations for runs, projects, datasets, examples, feedback, annotation queues, and prompts. Includes built-in tracing support with auto-batching, caching, and data anonymization.

class Client:
    def __init__(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout_ms: Optional[Union[int, tuple[Optional[int], Optional[int], Optional[int], Optional[int]]]] = None,
        web_url: Optional[str] = None,
        session: Optional[requests.Session] = None,
        auto_batch_tracing: bool = True,
        info: Optional[ls_schemas.LangSmithInfo] = None,
        api_version: str = "v1",
        hide_inputs: Optional[Union[Callable[[dict], dict], bool]] = None,
        hide_outputs: Optional[Union[Callable[[dict], dict], bool]] = None,
        anonymizer: Optional[Callable[[Any], Any]] = None,
        hide_metadata: Optional[Union[Callable[[dict], dict], bool]] = None,
        cache: Union[Cache, bool] = False,
        tracing_error_callback: Optional[Callable[[Exception], None]] = None,
        workspace_id: Optional[str] = None,
    ): ...

    def create_run(
        self,
        name: str,
        inputs: dict,
        run_type: str,
        *,
        execution_order: Optional[int] = None,
        **kwargs
    ) -> None: ...

    def update_run(
        self,
        run_id: Union[str, UUID],
        *,
        end_time: Optional[datetime] = None,
        error: Optional[str] = None,
        outputs: Optional[dict] = None,
        **kwargs
    ) -> None: ...

    def read_run(
        self,
        run_id: Union[str, UUID],
        load_child_runs: bool = False
    ) -> Run: ...

    def list_runs(
        self,
        *,
        project_name: Optional[str] = None,
        project_id: Optional[Union[str, UUID]] = None,
        run_type: Optional[str] = None,
        **kwargs
    ) -> Iterator[Run]: ...
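
A minimal usage sketch, assuming credentials are supplied via environment variables and that a project named "my-project" already contains runs:

from langsmith import Client

client = Client()  # picks up API key and endpoint from the environment

# Iterate over recent LLM runs in a project
for run in client.list_runs(project_name="my-project", run_type="llm"):
    print(run.id, run.name)

# Fetch a single run, including its child runs
run = client.read_run("<run-uuid>", load_child_runs=True)  # replace with a real run id
print(run.outputs)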

Async Client API

Asynchronous client for interacting with the LangSmith API. Provides async/await counterparts to most Client operations for high-throughput, concurrent workloads.

class AsyncClient:
    def __init__(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout_ms: Optional[Union[int, tuple[Optional[int], Optional[int], Optional[int], Optional[int]]]] = None,
        retry_config: Optional[Mapping[str, Any]] = None,
        web_url: Optional[str] = None,
        cache: Union[AsyncCache, bool] = False,
    ): ...

    async def __aenter__(self) -> "AsyncClient": ...

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

    async def create_run(
        self,
        name: str,
        run_type: str,
        inputs: dict,
        **kwargs
    ) -> None: ...

    async def read_run(
        self,
        run_id: Union[str, UUID]
    ) -> Run: ...
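
A minimal async sketch; the client is used as an async context manager so its resources are released, and passing an explicit run id through **kwargs is an assumption here:

import asyncio
from uuid import uuid4
from langsmith import AsyncClient

async def main() -> None:
    async with AsyncClient() as client:
        # Record a run manually; extra fields such as id go through **kwargs
        await client.create_run(
            name="manual-chain",
            run_type="chain",
            inputs={"question": "What is LangSmith?"},
            id=uuid4(),
        )
        # Fetch an existing run by id (placeholder shown)
        run = await client.read_run("<run-uuid>")
        print(run.name, run.outputs)

asyncio.run(main())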

Tracing & Decorators

Decorators and context managers for automatic tracing of functions and code blocks. Works with both synchronous and asynchronous code, including generators and streaming.

def traceable(
    run_type: Literal["chain", "llm", "tool", "retriever", "prompt", ...] = "chain",
    *,
    name: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    tags: Optional[list[str]] = None,
    client: Optional[Client] = None,
    reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
    project_name: Optional[str] = None,
    process_inputs: Optional[Callable[[dict], dict]] = None,
    process_outputs: Optional[Callable[..., dict]] = None,
) -> Callable: ...

class trace:
    def __init__(
        self,
        name: str,
        run_type: Literal["chain", "llm", "tool", ...] = "chain",
        *,
        inputs: Optional[dict] = None,
        extra: Optional[dict] = None,
        project_name: Optional[str] = None,
        parent: Optional[Union[RunTree, str, Mapping]] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[Mapping[str, Any]] = None,
        client: Optional[Client] = None,
    ): ...

    def __enter__(self) -> RunTree: ...

    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

def tracing_context(
    *,
    project_name: Optional[str] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    parent: Optional[Union[RunTree, Mapping, str, Literal[False]]] = None,
    enabled: Optional[Union[bool, Literal["local"]]] = None,
    client: Optional[Client] = None,
) -> Generator[None, None, None]: ...

def get_tracing_context(
    context: Optional[contextvars.Context] = None
) -> dict[str, Any]: ...

def get_current_run_tree() -> Optional[RunTree]: ...

def set_run_metadata(**metadata: Any) -> None: ...
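
A short sketch of the context-manager forms, useful when a decorator doesn't fit; project names and tags are illustrative:

from langsmith import trace, tracing_context

# Trace an ad-hoc block of code
with trace("summarize", run_type="chain", inputs={"text": "hello"}) as run:
    summary = "hello (summarized)"
    run.end(outputs={"summary": summary})

# Override tracing settings for everything inside the block
with tracing_context(project_name="experiments", tags=["batch"], enabled=True):
    with trace("step", inputs={"n": 1}) as step_run:
        step_run.end(outputs={"ok": True})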

Run Management

RunTree class for representing trace spans and global configuration for tracing behavior.

class RunTree(BaseModel):
    name: str
    id: UUID
    run_type: str = "chain"
    start_time: datetime
    end_time: Optional[datetime] = None
    inputs: dict
    outputs: Optional[dict] = None
    error: Optional[str] = None
    extra: dict = Field(default_factory=dict)
    tags: Optional[list[str]] = None
    metadata: Optional[dict] = None
    parent_run_id: Optional[UUID] = None
    session_name: str
    trace_id: UUID
    dotted_order: str

    def end(
        self,
        *,
        outputs: Optional[dict] = None,
        error: Optional[str] = None,
        **kwargs
    ) -> None: ...

    def post(self, *, exclude_child_runs: bool = False) -> None: ...

    def create_child(
        self,
        name: str,
        run_type: str = "chain",
        *,
        inputs: Optional[dict] = None,
        **kwargs
    ) -> "RunTree": ...

    def add_tags(self, *tags: str) -> None: ...

    def add_metadata(self, metadata: dict) -> None: ...

def configure(
    client: Optional[Client] = ...,
    enabled: Optional[bool] = ...,
    project_name: Optional[str] = ...,
    tags: Optional[list[str]] = ...,
    metadata: Optional[dict[str, Any]] = ...,
) -> None: ...
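
When decorators are too implicit, a RunTree can be assembled by hand. A minimal sketch; the id, timing, and trace-ordering fields are assumed to be populated with defaults, and the patch() calls are an assumption (an update method not listed above) for pushing each run's end state:

from langsmith import RunTree

# Root span
pipeline = RunTree(
    name="pipeline",
    run_type="chain",
    inputs={"query": "What is LangSmith?"},
)
pipeline.post()  # send the run to LangSmith

# Child span
llm_call = pipeline.create_child(
    name="llm",
    run_type="llm",
    inputs={"prompt": "What is LangSmith?"},
)
llm_call.post()
llm_call.end(outputs={"text": "LangSmith traces and evaluates LLM apps."})
llm_call.patch()  # assumed update call

pipeline.add_metadata({"pipeline_version": "1"})
pipeline.end(outputs={"answer": "LangSmith traces and evaluates LLM apps."})
pipeline.patch()  # assumed update call to upload the outputs and end time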

Evaluation

Evaluate target systems on datasets with custom evaluators. Supports both synchronous and asynchronous evaluation with summary evaluators.

def evaluate(
    target: Union[Callable, Runnable, str, uuid.UUID, TracerSession],
    /,
    data: Union[str, uuid.UUID, Iterable[Example], Dataset, None] = None,
    evaluators: Optional[Sequence[Union[RunEvaluator, Callable]]] = None,
    summary_evaluators: Optional[Sequence[Callable]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = None,
    num_repetitions: int = 1,
    client: Optional[Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    **kwargs: Any,
) -> ExperimentResults: ...

async def aevaluate(
    target: Union[AsyncCallable, AsyncIterable, Runnable, str, uuid.UUID, TracerSession],
    /,
    data: Union[str, uuid.UUID, AsyncIterable[Example], Iterable[Example], None] = None,
    evaluators: Optional[Sequence[Union[RunEvaluator, Callable]]] = None,
    **kwargs: Any,
) -> AsyncExperimentResults: ...

class RunEvaluator(Protocol):
    def evaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
    ) -> Union[EvaluationResult, EvaluationResults, dict]: ...

class EvaluationResult(BaseModel):
    key: str
    score: Optional[Union[int, float, bool]] = None
    value: Optional[Union[str, dict, int, float, bool]] = None
    metadata: Optional[dict] = None
    comment: Optional[str] = None
    correction: Optional[dict] = None
    evaluator_info: dict = Field(default_factory=dict)
    source_run_id: Optional[Union[UUID, str]] = None
    target_run_id: Optional[Union[UUID, str]] = None
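
A minimal evaluation sketch, assuming a dataset named "qa-dataset" already exists; the evaluator is a plain callable receiving the run and its reference example:

from langsmith import evaluate
from langsmith.schemas import Example, Run

def predict(inputs: dict) -> dict:
    # The system under test: takes example inputs and returns outputs
    return {"answer": f"echo: {inputs['question']}"}

def exact_match(run: Run, example: Example) -> dict:
    # Compare the run's output to the example's reference output
    score = run.outputs["answer"] == example.outputs["answer"]
    return {"key": "exact_match", "score": int(score)}

results = evaluate(
    predict,
    data="qa-dataset",
    evaluators=[exact_match],
    experiment_prefix="baseline",
    max_concurrency=4,
)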

Testing

Pytest integration for tracing test cases, with an expectations API for approximate assertions and scoring.

def test(
    id: Optional[UUID] = None,
    output_keys: Optional[Sequence[str]] = None,
    client: Optional[Client] = None,
    test_suite_name: Optional[str] = None,
    metadata: Optional[dict] = None,
    repetitions: Optional[int] = None,
    split: Optional[Union[str, list[str]]] = None,
    cached_hosts: Optional[Sequence[str]] = None,
) -> Callable: ...

def unit(
    id: Optional[UUID] = None,
    output_keys: Optional[Sequence[str]] = None,
    **kwargs
) -> Callable: ...

class expect:
    @staticmethod
    def score(
        value: float,
        *,
        key: str = "score",
    ) -> _Matcher: ...

    @staticmethod
    def value(value: Any) -> _Matcher: ...

    @staticmethod
    def embedding_distance(
        prediction: str,
        reference: str,
        *,
        config: Optional[EmbeddingConfig] = None,
    ) -> _Matcher: ...

    @staticmethod
    def edit_distance(
        prediction: str,
        reference: str,
        *,
        config: Optional[EditDistanceConfig] = None,
    ) -> _Matcher: ...
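
A sketch of a traced pytest case; the comparison helper used on the returned matcher (to_be_less_than) is an assumption about the expectations API:

from langsmith import test, expect

def answer(question: str) -> str:
    # Stand-in for the system under test
    return "LangSmith is an observability and evaluation platform."

@test()
def test_short_answer() -> None:
    prediction = answer("What is LangSmith?")
    reference = "LangSmith is an observability and evaluation platform."

    # Log how close the prediction is to the reference
    expect.edit_distance(
        prediction=prediction, reference=reference
    ).to_be_less_than(0.2)  # to_be_less_than assumed as a matcher helper

    # Attach an explicit score to the test's run
    expect.score(0.9, key="helpfulness")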

Caching

LRU cache implementations with TTL and background refresh for prompt and data caching.

class Cache:
    def __init__(
        self,
        *,
        max_size: int = 100,
        ttl_seconds: Optional[float] = 3600.0,
        refresh_interval_seconds: float = 60.0,
        fetch_func: Optional[Callable[[str], Any]] = None,
    ): ...

    def get(self, key: str) -> Optional[Any]: ...

    def set(self, key: str, value: Any) -> None: ...

    def invalidate(self, key: str) -> None: ...

    def clear(self) -> None: ...

    def shutdown(self) -> None: ...

class AsyncCache:
    def __init__(
        self,
        *,
        max_size: int = 100,
        ttl_seconds: Optional[float] = 3600.0,
        refresh_interval_seconds: float = 60.0,
        fetch_func: Optional[Callable[[str], Awaitable[Any]]] = None,
    ): ...

    def get(self, key: str) -> Optional[Any]: ...

    def set(self, key: str, value: Any) -> None: ...

    async def start(self) -> None: ...

    async def stop(self) -> None: ...
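
A small sketch of the synchronous cache; the top-level import path follows the Core Imports pattern and is an assumption:

from langsmith import Cache  # import path assumed

def fetch_prompt(key: str) -> str:
    # Stand-in loader used for misses and background refresh
    return f"template for {key}"

cache = Cache(max_size=50, ttl_seconds=300.0, fetch_func=fetch_prompt)

cache.set("greeting", "Hello, {name}!")
print(cache.get("greeting"))  # "Hello, {name}!"

cache.invalidate("greeting")
cache.clear()
cache.shutdown()  # stop any background refresh work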

Utilities

Utility functions and classes for context management, UUID generation, and data anonymization.

class ContextThreadPoolExecutor(ThreadPoolExecutor):
    def submit(
        self,
        func: Callable[P, T],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Future[T]: ...

def uuid7() -> uuid.UUID: ...

def uuid7_from_datetime(dt: datetime) -> uuid.UUID: ...

__version__: str
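
A sketch of fanning traced work out across threads while keeping the ambient tracing context; the module path for these helpers is an assumption, since the section above does not state one:

from langsmith import traceable
from langsmith.utils import ContextThreadPoolExecutor  # assumed module path

@traceable
def score(item: str) -> int:
    return len(item)

@traceable
def score_all(items: list[str]) -> list[int]:
    # The executor copies context variables into worker threads, so the
    # child runs stay attached to score_all's run tree
    with ContextThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(score, items))

print(score_all(["a", "bb", "ccc"]))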

Data Anonymization

Module for anonymizing and redacting sensitive data in traces. Provides tools to extract string nodes from nested data structures and apply masking rules.

def create_anonymizer(
    replacer: Union[
        Callable[[str, list[Union[str, int]]], str],
        list[StringNodeRule],
        StringNodeProcessor,
    ],
    *,
    max_depth: Optional[int] = None,
) -> Callable[[Any], Any]: ...

class StringNodeProcessor(ABC):
    @abstractmethod
    def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]: ...

Use with Client to automatically redact sensitive data from all traces:

from langsmith import Client
from langsmith.anonymizer import create_anonymizer
import re

anonymizer = create_anonymizer([
    {"pattern": re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b', re.I), "replace": "[EMAIL]"},
    {"pattern": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), "replace": "[SSN]"}
])

client = Client(anonymizer=anonymizer)

Data Schemas

Key data types used throughout the LangSmith API including Run, Example, Dataset, Feedback, and more.
