A comprehensive Python SDK for LangSmith, providing observability, evaluation, and testing capabilities for LLM applications. LangSmith enables developers to trace, debug, evaluate, and monitor AI applications through a unified API, with full support for synchronous and asynchronous operations.
Installation:

pip install langsmith

Basic imports:

import langsmith as ls
from langsmith import Client, traceable

For async operations:

from langsmith import AsyncClient

For evaluation:

from langsmith import evaluate, RunEvaluator, EvaluationResult

For testing:

from langsmith import test, expect

Quickstart example:

import langsmith as ls
from langsmith import traceable
# Configure tracing at startup
ls.configure(
    enabled=True,
    project_name="my-project"
)

# Trace a function
@traceable
def process_query(query: str) -> str:
    """Process a user query."""
    # Your LLM or processing logic here
    result = f"Processed: {query}"
    return result

# Use the traced function
result = process_query("What is LangSmith?")

# Get current run tree for metadata
from langsmith import get_current_run_tree, set_run_metadata

@traceable
def advanced_function(input_data):
    # Add metadata to current run
    set_run_metadata(user_id="123", version="2.0")

    # Get current run
    run = get_current_run_tree()
    if run:
        print(f"Run ID: {run.id}")
    return process(input_data)

LangSmith's architecture centers on runs (trace spans) organized in tree structures. The Client and AsyncClient classes provide comprehensive API access for managing runs, projects, datasets, examples, and feedback. The tracing system uses context variables to implicitly track parent-child relationships between runs, enabling automatic trace tree construction. The RunTree class represents individual trace spans with metadata, inputs, outputs, and timing information. Evaluation capabilities are built on top of the tracing infrastructure, allowing runs to be associated with dataset examples and evaluated by custom or built-in evaluators. The testing framework integrates with pytest to provide trace-aware test execution with expectations and assertions.
Synchronous client for interacting with the LangSmith API. Provides full CRUD operations for runs, projects, datasets, examples, feedback, annotation queues, and prompts. Includes built-in tracing support with auto-batching, caching, and data anonymization.
class Client:
    """Synchronous client for the LangSmith API (signature stubs).

    Construction configures the connection (``api_url``/``api_key``/
    ``session``), tracing behavior (``auto_batch_tracing``,
    ``tracing_error_callback``), and data handling (``hide_inputs``/
    ``hide_outputs``/``hide_metadata``, ``anonymizer``, ``cache``).
    Method bodies are elided -- this is API-reference material.
    """

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout_ms: Optional[Union[int, tuple[Optional[int], Optional[int], Optional[int], Optional[int]]]] = None,
        web_url: Optional[str] = None,
        session: Optional[requests.Session] = None,
        auto_batch_tracing: bool = True,
        info: Optional[ls_schemas.LangSmithInfo] = None,
        api_version: str = "v1",
        hide_inputs: Optional[Union[Callable[[dict], dict], bool]] = None,
        hide_outputs: Optional[Union[Callable[[dict], dict], bool]] = None,
        anonymizer: Optional[Callable[[Any], Any]] = None,
        hide_metadata: Optional[Union[Callable[[dict], dict], bool]] = None,
        cache: Union[Cache, bool] = False,
        tracing_error_callback: Optional[Callable[[Exception], None]] = None,
        workspace_id: Optional[str] = None,
    ): ...

    # Post a new run; returns nothing (may be batched per auto_batch_tracing).
    def create_run(
        self,
        name: str,
        inputs: dict,
        run_type: str,
        *,
        execution_order: Optional[int] = None,
        **kwargs
    ) -> None: ...

    # Patch an existing run, e.g. attach outputs/error/end_time on completion.
    def update_run(
        self,
        run_id: Union[str, UUID],
        *,
        end_time: Optional[datetime] = None,
        error: Optional[str] = None,
        outputs: Optional[dict] = None,
        **kwargs
    ) -> None: ...

    # Fetch a single run by ID (load_child_runs presumably also resolves
    # the child subtree -- confirm against implementation).
    def read_run(
        self,
        run_id: Union[str, UUID],
        load_child_runs: bool = False
    ) -> Run: ...

    # Lazily iterate runs matching the given project/type filters.
    def list_runs(
        self,
        *,
        project_name: Optional[str] = None,
        project_id: Optional[Union[str, UUID]] = None,
        run_type: Optional[str] = None,
        **kwargs
    ) -> Iterator[Run]: ...


# Asynchronous client for interacting with the LangSmith API. Provides
# async/await versions of most Client operations for high-performance
# concurrent operations.
class AsyncClient:
    """Asynchronous LangSmith API client (signature stubs).

    Async/await counterpart of ``Client``; usable as an async context
    manager so underlying HTTP resources are released on exit.
    """

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout_ms: Optional[Union[int, tuple[Optional[int], Optional[int], Optional[int], Optional[int]]]] = None,
        retry_config: Optional[Mapping[str, Any]] = None,
        web_url: Optional[str] = None,
        cache: Union[AsyncCache, bool] = False,
    ): ...

    async def __aenter__(self) -> "AsyncClient": ...

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

    # Post a new run (async). NOTE(review): positional order here is
    # (name, run_type, inputs), unlike Client.create_run's
    # (name, inputs, run_type) -- callers should prefer keywords.
    async def create_run(
        self,
        name: str,
        run_type: str,
        inputs: dict,
        **kwargs
    ) -> None: ...

    # Fetch a single run by ID (async).
    async def read_run(
        self,
        run_id: Union[str, UUID]
    ) -> Run: ...


# Decorators and context managers for automatic tracing of functions and
# code blocks. Supports both synchronous and asynchronous code, with
# generators and streaming support.
def traceable(
    run_type: Literal["chain", "llm", "tool", "retriever", "prompt", ...] = "chain",
    *,
    name: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    tags: Optional[list[str]] = None,
    client: Optional[Client] = None,
    reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
    project_name: Optional[str] = None,
    process_inputs: Optional[Callable[[dict], dict]] = None,
    process_outputs: Optional[Callable[..., dict]] = None,
) -> Callable:
    """Decorator that records a traced run around the wrapped callable.

    ``reduce_fn`` combines streamed/generator chunks into one output;
    ``process_inputs``/``process_outputs`` transform payloads before upload.
    NOTE(review): the ``...`` inside ``Literal`` is doc shorthand for other
    run types; usable both as ``@traceable`` and ``@traceable(...)`` per
    the quickstart example -- confirm against the real implementation.
    """
    ...
class trace:
    """Context manager that opens a run span around a ``with`` block.

    ``__enter__`` yields the live ``RunTree`` so the body can attach
    outputs and metadata; ``__exit__`` presumably finalizes the span,
    recording any exception -- confirm against implementation.
    """

    def __init__(
        self,
        name: str,
        run_type: Literal["chain", "llm", "tool", ...] = "chain",
        *,
        inputs: Optional[dict] = None,
        extra: Optional[dict] = None,
        project_name: Optional[str] = None,
        parent: Optional[Union[RunTree, str, Mapping]] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[Mapping[str, Any]] = None,
        client: Optional[Client] = None,
    ): ...

    def __enter__(self) -> RunTree: ...

    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
def tracing_context(
    *,
    project_name: Optional[str] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    parent: Optional[Union[RunTree, Mapping, str, Literal[False]]] = None,
    enabled: Optional[Union[bool, Literal["local"]]] = None,
    client: Optional[Client] = None,
) -> Generator[None, None, None]:
    """Scope tracing configuration to a block of code.

    Returns a generator per the annotation -- presumably wrapped with
    ``@contextlib.contextmanager`` in the implementation (TODO confirm).
    ``parent=False`` detaches from any ambient parent run; ``enabled``
    toggles emission (``"local"`` presumably traces without uploading).
    """
    ...
def get_tracing_context(
    context: Optional[contextvars.Context] = None
) -> dict[str, Any]:
    """Return the current tracing configuration as a plain dict.

    Reads from ``context`` when given; otherwise presumably from the
    active contextvars context (stub -- confirm against implementation).
    """
    ...
def get_current_run_tree() -> Optional[RunTree]: ...
def set_run_metadata(**metadata: Any) -> None: ...RunTree class for representing trace spans and global configuration for tracing behavior.
class RunTree(BaseModel):
    """A single trace span: identity, timing, payloads, and tree position.

    Pydantic model -- the bare annotations below are model fields.
    ``trace_id``/``parent_run_id``/``dotted_order`` encode this span's
    position within its trace tree.
    """

    name: str
    id: UUID
    run_type: str = "chain"
    start_time: datetime
    end_time: Optional[datetime] = None
    inputs: dict
    outputs: Optional[dict] = None
    error: Optional[str] = None
    extra: dict = Field(default_factory=dict)
    tags: Optional[list[str]] = None
    metadata: Optional[dict] = None
    parent_run_id: Optional[UUID] = None
    session_name: str  # presumably the project/session name -- confirm
    trace_id: UUID
    dotted_order: str

    # Mark the span finished, recording outputs or an error.
    def end(
        self,
        *,
        outputs: Optional[dict] = None,
        error: Optional[str] = None,
        **kwargs
    ) -> None: ...

    # Upload this run to the API (optionally without its children).
    def post(self, *, exclude_child_runs: bool = False) -> None: ...

    # Create a nested child span parented to this run.
    def create_child(
        self,
        name: str,
        run_type: str = "chain",
        *,
        inputs: Optional[dict] = None,
        **kwargs
    ) -> "RunTree": ...

    def add_tags(self, *tags: str) -> None: ...

    def add_metadata(self, metadata: dict) -> None: ...
def configure(
    client: Optional[Client] = ...,
    enabled: Optional[bool] = ...,
    project_name: Optional[str] = ...,
    tags: Optional[list[str]] = ...,
    metadata: Optional[dict[str, Any]] = ...,
) -> None:
    """Set process-wide tracing defaults.

    Defaults are the ``...`` sentinel rather than ``None`` -- presumably so
    omitted arguments leave the corresponding setting untouched while an
    explicit ``None`` clears it (confirm against implementation).
    """
    ...


# Evaluate target systems on datasets with custom evaluators. Supports both
# synchronous and asynchronous evaluation with summary evaluators.
def evaluate(
    target: Union[Callable, Runnable, str, uuid.UUID, TracerSession],
    /,
    data: Union[str, uuid.UUID, Iterable[Example], Dataset, None] = None,
    evaluators: Optional[Sequence[Union[RunEvaluator, Callable]]] = None,
    summary_evaluators: Optional[Sequence[Callable]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = None,
    num_repetitions: int = 1,
    client: Optional[Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    **kwargs: Any,
) -> ExperimentResults:
    """Run ``target`` over ``data`` and score the results with ``evaluators``.

    ``target`` is positional-only (note the ``/``); it may be a callable,
    a Runnable, or a reference to an existing experiment/session.
    ``summary_evaluators`` score the experiment as a whole;
    ``blocking=False`` presumably returns before all results finish --
    confirm against implementation.
    """
    ...
async def aevaluate(
    target: Union[AsyncCallable, AsyncIterable, Runnable, str, uuid.UUID, TracerSession],
    /,
    data: Union[str, uuid.UUID, AsyncIterable[Example], Iterable[Example], None] = None,
    evaluators: Optional[Sequence[Union[RunEvaluator, Callable]]] = None,
    **kwargs: Any,
) -> AsyncExperimentResults:
    """Async counterpart of ``evaluate``; additionally accepts async
    targets and async-iterable ``data``. Remaining options are forwarded
    via ``**kwargs`` (presumably the same set as ``evaluate``)."""
    ...
class RunEvaluator(Protocol):
    """Structural interface: anything exposing ``evaluate_run`` qualifies."""

    # Score a run (optionally against its reference example). May return a
    # single result, a results batch, or a plain dict.
    def evaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
    ) -> Union[EvaluationResult, EvaluationResults, dict]: ...
class EvaluationResult(BaseModel):
    """One evaluator verdict: a metric ``key`` plus a score and/or value.

    Pydantic model -- the bare annotations below are model fields.
    """

    key: str  # metric name
    score: Optional[Union[int, float, bool]] = None
    value: Optional[Union[str, dict, int, float, bool]] = None
    metadata: Optional[dict] = None
    comment: Optional[str] = None
    correction: Optional[dict] = None
    evaluator_info: dict = Field(default_factory=dict)
    source_run_id: Optional[Union[UUID, str]] = None  # presumably the evaluator's own run
    target_run_id: Optional[Union[UUID, str]] = None  # presumably the run being evaluated


# Pytest integration for tracing test cases with expectations API for
# approximate assertions and scoring.
def test(
    id: Optional[UUID] = None,
    output_keys: Optional[Sequence[str]] = None,
    client: Optional[Client] = None,
    test_suite_name: Optional[str] = None,
    metadata: Optional[dict] = None,
    repetitions: Optional[int] = None,
    split: Optional[Union[str, list[str]]] = None,
    cached_hosts: Optional[Sequence[str]] = None,
) -> Callable:
    """Decorator marking a pytest test for trace-aware execution.

    NOTE(review): the ``id`` parameter shadows the builtin of the same
    name; kept as-is because it is part of the public keyword interface.
    """
    ...
def unit(
    id: Optional[UUID] = None,
    output_keys: Optional[Sequence[str]] = None,
    **kwargs
) -> Callable:
    """Decorator for unit-style traced tests.

    Shares ``test``'s leading parameters and forwards the rest via
    ``**kwargs`` -- presumably a thin alias of ``test`` (confirm).
    """
    ...
class expect:
    """Namespace of static expectation helpers for traced tests.

    Each helper returns a ``_Matcher`` -- presumably recording a score or
    feedback on the current test's run rather than hard-asserting
    (confirm against implementation).
    """

    # Record a numeric score under ``key`` (default "score").
    @staticmethod
    def score(
        value: float,
        *,
        key: str = "score",
    ) -> _Matcher: ...

    # Wrap an arbitrary value for matcher-style comparisons.
    @staticmethod
    def value(value: Any) -> _Matcher: ...

    # Score semantic similarity between prediction and reference.
    @staticmethod
    def embedding_distance(
        prediction: str,
        reference: str,
        *,
        config: Optional[EmbeddingConfig] = None,
    ) -> _Matcher: ...

    # Score string edit distance between prediction and reference.
    @staticmethod
    def edit_distance(
        prediction: str,
        reference: str,
        *,
        config: Optional[EditDistanceConfig] = None,
    ) -> _Matcher: ...


# LRU cache implementations with TTL and background refresh for prompt and
# data caching.
class Cache:
    """LRU cache with TTL and periodic background refresh (stub).

    ``fetch_func`` presumably repopulates entries during refresh;
    ``shutdown()`` stops the background refresher.
    """

    def __init__(
        self,
        *,
        max_size: int = 100,
        ttl_seconds: Optional[float] = 3600.0,  # None presumably disables expiry
        refresh_interval_seconds: float = 60.0,
        fetch_func: Optional[Callable[[str], Any]] = None,
    ): ...

    def get(self, key: str) -> Optional[Any]: ...

    def set(self, key: str, value: Any) -> None: ...

    def invalidate(self, key: str) -> None: ...

    def clear(self) -> None: ...

    def shutdown(self) -> None: ...
class AsyncCache:
    """Async variant of ``Cache`` with an awaitable ``fetch_func``.

    NOTE: ``get``/``set`` are synchronous here; only the refresher
    lifecycle (``start``/``stop``) is async.
    """

    def __init__(
        self,
        *,
        max_size: int = 100,
        ttl_seconds: Optional[float] = 3600.0,
        refresh_interval_seconds: float = 60.0,
        fetch_func: Optional[Callable[[str], Awaitable[Any]]] = None,
    ): ...

    def get(self, key: str) -> Optional[Any]: ...

    def set(self, key: str, value: Any) -> None: ...

    async def start(self) -> None: ...

    async def stop(self) -> None: ...


# Utility functions and classes for context management, UUID generation, and
# data anonymization.
class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor subclass -- presumably propagates contextvars
    into worker threads so traced parent/child relationships survive
    ``submit`` (confirm); the signature matches the base class."""

    def submit(
        self,
        func: Callable[P, T],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Future[T]: ...
def uuid7() -> uuid.UUID: ...
def uuid7_from_datetime(dt: datetime) -> uuid.UUID: ...
__version__: str

Module for anonymizing and redacting sensitive data in traces. Provides tools to extract string nodes from nested data structures and apply masking rules.
def create_anonymizer(
    replacer: Union[
        Callable[[str, list[Union[str, int]]], str],
        list[StringNodeRule],
        StringNodeProcessor,
    ],
    *,
    max_depth: Optional[int] = None,
) -> Callable[[Any], Any]:
    """Build a callable that masks string nodes inside nested data.

    ``replacer`` may be a function ``(value, path) -> replacement``, a
    list of pattern/replace rules, or a ``StringNodeProcessor``;
    ``max_depth`` bounds traversal of nested structures.
    """
    ...
class StringNodeProcessor(ABC):
    """Strategy interface: mask a batch of extracted string nodes."""

    @abstractmethod
    def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]: ...


# Use with Client to automatically redact sensitive data from all traces:
from langsmith import Client
from langsmith.anonymizer import create_anonymizer
import re
anonymizer = create_anonymizer([
    {"pattern": re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b', re.I), "replace": "[EMAIL]"},
    {"pattern": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), "replace": "[SSN]"}
])

client = Client(anonymizer=anonymizer)

Key data types used throughout the LangSmith API including Run, Example, Dataset, Feedback, and more.