tessl install tessl/pypi-langsmith@0.6.1Python SDK for LangSmith Observability and Evaluation Platform
Manual control over run creation and lifecycle using the trace context manager and RunTree class. Use these when you need fine-grained control beyond what @traceable provides.
Core Classes:
RunTree Methods:
Examples:
→ @traceable Decorator for automatic tracing | → Back to Index
Use manual tracing (this page) when:
Use @traceable decorator (decorators.md) when:
The trace context manager provides manual control over run creation and lifecycle.
class trace:
def __init__(
self,
name: str,
run_type: Literal["chain", "llm", "tool", "retriever", "prompt"] = "chain",
*,
inputs: Optional[dict] = None,
extra: Optional[dict] = None,
project_name: Optional[str] = None,
parent: Optional[Union[RunTree, str, Mapping, Literal["ignore"]]] = None,
tags: Optional[list[str]] = None,
metadata: Optional[Mapping[str, Any]] = None,
client: Optional[Client] = None,
run_id: Optional[ID_TYPE] = None,
reference_example_id: Optional[ID_TYPE] = None,
exceptions_to_handle: Optional[tuple[type[BaseException], ...]] = None,
attachments: Optional[Attachments] = None,
):
"""
Context manager to manually manage a LangSmith run.
Can be used as both synchronous and asynchronous context manager.
Parameters:
- name: Name of the run
- run_type: Type of run (e.g., 'chain', 'llm', 'tool')
- inputs: Initial input data for the run
- extra: Extra metadata for the run
- project_name: Project name to associate run with
- parent: Parent run (can be RunTree, dotted order string, or tracing headers)
- tags: List of tags for the run
- metadata: Additional metadata
- client: LangSmith client for custom settings
- run_id: Preset identifier for the run
- reference_example_id: Associates run with a dataset example (for evaluation)
- exceptions_to_handle: Exception types to ignore (not mark as errors)
- attachments: File attachments for the run
"""
def __enter__(self) -> RunTree:
"""
Enter the context manager.
Returns:
RunTree object representing the run
"""
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""
Exit the context manager and finalize the run.
Automatically logs errors if an exception occurred.
"""
async def __aenter__(self) -> RunTree:
"""
Enter the async context manager.
Returns:
RunTree object representing the run
"""
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""
Exit the async context manager and finalize the run.
"""from langsmith import trace
# Synchronous usage
with trace("My Operation", run_type="tool") as run:
# Do work
result = do_something()
# Manually set outputs
run.end(outputs={"result": result})with trace(
"Process Data",
run_type="chain",
inputs={"data": input_data},
tags=["production"],
metadata={"version": "1.0"}
) as run:
output = process(input_data)
run.end(outputs={"output": output})async with trace("Async Operation", run_type="tool") as run:
result = await do_something_async()
run.end(outputs={"result": result})from langsmith import get_current_run_tree
parent_run = get_current_run_tree()
with trace("Child Operation", parent=parent_run) as run:
result = process_child()
run.end(outputs={"result": result})with trace(
"Resilient Operation",
exceptions_to_handle=(TimeoutError, ConnectionError)
) as run:
# These exceptions won't mark the run as errored
result = might_timeout()
run.end(outputs={"result": result})with trace("Operation", run_type="chain") as run:
try:
result = risky_operation()
run.end(outputs={"result": result})
except Exception as e:
run.end(error=str(e))
raise# For evaluation
with trace(
"Evaluate Example",
reference_example_id="example-uuid-123"
) as run:
result = process_example()
run.end(outputs={"result": result})RunTree represents a single run (trace span) with full control over its lifecycle.
class RunTree(BaseModel):
"""
Run schema with back-references for posting runs.
Represents a single run/span in a trace tree with inputs, outputs,
timing, metadata, and parent-child relationships.
"""
# Core identity fields
name: str
id: UUID
run_type: str = "chain"
# Timing fields
start_time: datetime
end_time: Optional[datetime] = None
# Data fields
inputs: dict
outputs: Optional[dict] = None
error: Optional[str] = None
# Metadata fields
extra: dict = Field(default_factory=dict)
tags: Optional[list[str]] = None
metadata: Optional[dict] = None
# Tree structure fields
parent_run_id: Optional[UUID] = None
session_name: str # Project name
session_id: Optional[UUID] = None # Project ID
trace_id: UUID
dotted_order: str
# Events
events: list[dict] = Field(default_factory=list)
# Reference fields
reference_example_id: Optional[UUID] = None
# Client reference
client: Optional[Client] = Field(default=None, exclude=True)def end(
self,
*,
outputs: Optional[dict] = None,
error: Optional[str] = None,
end_time: Optional[datetime] = None,
events: Optional[Sequence[dict]] = None,
) -> None:
"""
End the run with outputs or error.
Parameters:
- outputs: Output data from the run
- error: Error message if the run failed
- end_time: When the run ended (defaults to now)
- events: Additional events to log
"""
def post(self, *, exclude_child_runs: bool = False) -> None:
"""
Post the run to LangSmith.
Parameters:
- exclude_child_runs: Whether to exclude child runs from the submission
"""
def patch(self) -> None:
"""Patch/update the run in LangSmith."""
def wait(self) -> None:
"""Wait for the run to be posted."""def create_child(
self,
name: str,
run_type: str = "chain",
*,
inputs: Optional[dict] = None,
**kwargs
) -> "RunTree":
"""
Create a child run.
Parameters:
- name: Name of the child run
- run_type: Type of run
- inputs: Input data
- **kwargs: Additional RunTree parameters
Returns:
New RunTree instance
"""def add_tags(self, *tags: str) -> None:
"""Add tags to the run."""
def add_metadata(self, metadata: dict) -> None:
"""Add metadata to the run."""
def add_event(self, events: Union[dict, Sequence[dict]]) -> None:
"""Add events to the run."""
def add_outputs(self, outputs: dict) -> None:
"""Add outputs to the run."""def add_attachment(
self,
key: str,
*,
mime_type: str,
data: Optional[bytes] = None,
path: Optional[Union[str, Path]] = None,
) -> None:
"""
Add a file attachment to the run.
Parameters:
- key: Key for the attachment
- mime_type: MIME type (e.g., "image/png")
- data: Binary data of the attachment
- path: Path to file to attach (alternative to data)
"""from langsmith import RunTree, Client
client = Client()
# Create a root run
run = RunTree(
name="My Operation",
run_type="chain",
inputs={"query": "test input"},
client=client,
project_name="my-project"
)
# Do some work
result = process()
# End the run
run.end(outputs={"result": result})
# Post to LangSmith
run.post()from langsmith import RunTree, Client
client = Client()
# Parent run
parent = RunTree(
name="Parent Operation",
run_type="chain",
inputs={"input": "data"},
client=client
)
# Create child runs
child1 = parent.create_child(
name="Child 1",
run_type="tool",
inputs={"data": "child1 input"}
)
child1.end(outputs={"result": "child1 result"})
child2 = parent.create_child(
name="Child 2",
run_type="tool",
inputs={"data": "child2 input"}
)
child2.end(outputs={"result": "child2 result"})
# End parent
parent.end(outputs={"final": "result"})
# Post the tree (includes children)
parent.post()from langsmith import RunTree, Client
client = Client()
run = RunTree(
name="My Run",
run_type="chain",
inputs={"input": "data"},
client=client
)
# Add tags
run.add_tags("production", "v1.0", "important")
# Add metadata
run.add_metadata({
"user_id": "123",
"region": "us-west",
"model": "gpt-4"
})
# Add events
run.add_event({"event": "cache_miss"})
result = process()
run.end(outputs={"result": result})
run.post()from langsmith import RunTree, Client
client = Client()
run = RunTree(
name="Image Processing",
run_type="tool",
inputs={"image_url": "http://example.com/image.png"},
client=client
)
# Add image as attachment
run.add_attachment(
"input_image",
mime_type="image/png",
path="./input.png"
)
result_image = process_image()
# Add result image
run.add_attachment(
"output_image",
mime_type="image/png",
data=result_image
)
run.end(outputs={"status": "processed"})
run.post()from langsmith import RunTree, Client
client = Client()
# Build a complex trace tree manually
root = RunTree(
name="Complex Pipeline",
run_type="chain",
inputs={"request": "data"},
client=client
)
# Step 1: Parse input
parse = root.create_child(
name="Parse Input",
run_type="parser",
inputs={"raw": "data"}
)
parsed = parse_input()
parse.end(outputs={"parsed": parsed})
# Step 2: Parallel processing
parallel_parent = root.create_child(
name="Parallel Processing",
run_type="chain",
inputs={"data": parsed}
)
process1 = parallel_parent.create_child(
name="Process 1",
run_type="tool",
inputs={"data": parsed["part1"]}
)
result1 = do_process1()
process1.end(outputs={"result": result1})
process2 = parallel_parent.create_child(
name="Process 2",
run_type="tool",
inputs={"data": parsed["part2"]}
)
result2 = do_process2()
process2.end(outputs={"result": result2})
parallel_parent.end(outputs={"results": [result1, result2]})
# Step 3: Combine results
combine = root.create_child(
name="Combine Results",
run_type="chain",
inputs={"results": [result1, result2]}
)
final = combine_results(result1, result2)
combine.end(outputs={"final": final})
# End root
root.end(outputs={"result": final})
root.post()Incrementally build up run outputs over multiple phases:
from langsmith import RunTree, Client
import time
client = Client()
# Create run
run = RunTree(
name="Long Operation",
run_type="chain",
inputs={"input": "data"},
client=client
)
# Do work in phases
phase1_result = do_phase1()
run.add_outputs({"phase1": phase1_result})
run.add_event({"event": "phase1_complete"})
time.sleep(1)
phase2_result = do_phase2()
run.add_outputs({"phase2": phase2_result})
run.add_event({"event": "phase2_complete"})
time.sleep(1)
# Final outputs
run.end(outputs={"final_result": combine(phase1_result, phase2_result)})
# Post asynchronously in background
run.post()
# Or wait for posting to complete
run.wait()@traceable