Decorators and context managers for automatic tracing of functions and code blocks. LangSmith's tracing system supports both synchronous and asynchronous code, with full support for generators, streaming, and nested traces.
The @traceable decorator automatically traces function execution, logging inputs, outputs, timing, and errors.
def traceable(
    run_type: Literal["chain", "llm", "tool", "retriever", "prompt", "embedding", "parser"] = "chain",
    *,
    name: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    tags: Optional[list[str]] = None,
    client: Optional[Client] = None,
    reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
    project_name: Optional[str] = None,
    process_inputs: Optional[Callable[[dict], dict]] = None,
    process_outputs: Optional[Callable[..., dict]] = None,
    process_chunk: Optional[Callable] = None,
    _invocation_params_fn: Optional[Callable[[dict], dict]] = None,
    dangerously_allow_filesystem: bool = False,
    enabled: Optional[bool] = None,
) -> Callable:
"""
Decorator to trace a function with LangSmith.
Automatically creates a run/span for each function call and logs inputs,
outputs, and errors. Works with both sync and async functions, and supports
generators and async generators.
Parameters:
- run_type: Type of run (e.g., "chain", "llm", "tool", "retriever", "prompt")
- name: Optional name for the run (defaults to function name)
- metadata: Optional metadata to attach to the run
- tags: Optional list of tags
- client: Optional LangSmith client for custom settings
- reduce_fn: Function to reduce generator output (for functions returning generators)
- project_name: Optional project name to log run to
- process_inputs: Custom function to serialize/process inputs before logging
- process_outputs: Custom function to serialize/process outputs before logging
- process_chunk: Custom function to process streaming chunks
- _invocation_params_fn: Function to extract invocation parameters
- dangerously_allow_filesystem: Allow filesystem access for attachments
- enabled: Whether tracing is enabled (overrides global setting)
Returns:
Decorated function that logs traces
"""from langsmith import traceable
# Basic usage
@traceable
def process_data(data: dict) -> dict:
    """Process some data."""
    result = transform(data)
    return {"output": result}

# With custom run type and metadata
@traceable(
    run_type="llm",
    name="CustomName",
    tags=["production", "v1"],
    metadata={"model": "gpt-4"}
)
def call_llm(prompt: str) -> str:
    """Call an LLM."""
    return llm.invoke(prompt)

# Async functions
@traceable
async def async_process(data: dict) -> dict:
    """Process data asynchronously."""
    result = await async_transform(data)
    return result

# Generator functions
@traceable(reduce_fn=lambda outputs: {"results": list(outputs)})
def stream_results(query: str):
    """Stream results as they're generated."""
    for item in generate_items(query):
        yield item

# Async generator functions
@traceable
async def async_stream_results(query: str):
    """Stream results asynchronously."""
    async for item in async_generate_items(query):
        yield item

# Custom input/output processing
@traceable(
    process_inputs=lambda inputs: {k: v for k, v in inputs.items() if k != "api_key"},
    process_outputs=lambda output: {"summary": str(output)[:100]}
)
def sensitive_operation(api_key: str, data: dict) -> str:
    """Operation with sensitive data."""
    return process_with_key(api_key, data)
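# Log to a dedicated project (a sketch: "experiments" is a placeholder project name,
# and process() stands in for your own logic)
@traceable(project_name="experiments")
def experimental_feature(data: dict) -> dict:
    """Runs from this function are logged to the "experiments" project."""
    return process(data)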
# Conditional tracing
@traceable(enabled=False)
def sometimes_traced(data: dict) -> dict:
    """Function whose tracing can be disabled."""
    return process(data)

The trace context manager provides manual control over run creation and lifecycle.
class trace:
    def __init__(
        self,
        name: str,
        run_type: Literal["chain", "llm", "tool", "retriever", "prompt"] = "chain",
        *,
        inputs: Optional[dict] = None,
        extra: Optional[dict] = None,
        project_name: Optional[str] = None,
        parent: Optional[Union[RunTree, str, Mapping, Literal["ignore"]]] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[Mapping[str, Any]] = None,
        client: Optional[Client] = None,
        run_id: Optional[ID_TYPE] = None,
        reference_example_id: Optional[ID_TYPE] = None,
        exceptions_to_handle: Optional[tuple[type[BaseException], ...]] = None,
        attachments: Optional[Attachments] = None,
    ):
"""
Context manager to manually manage a LangSmith run.
Can be used as both synchronous and asynchronous context manager.
Parameters:
- name: Name of the run
- run_type: Type of run (e.g., 'chain', 'llm', 'tool')
- inputs: Initial input data for the run
- extra: Extra metadata for the run
- project_name: Project name to associate run with
- parent: Parent run (can be RunTree, dotted order string, or tracing headers)
- tags: List of tags for the run
- metadata: Additional metadata
- client: LangSmith client for custom settings
- run_id: Preset identifier for the run
- reference_example_id: Associates run with a dataset example (for evaluation)
- exceptions_to_handle: Exception types to ignore (not mark as errors)
- attachments: File attachments for the run
"""
    def __enter__(self) -> RunTree:
        """
        Enter the context manager.

        Returns:
        RunTree object representing the run
        """

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit the context manager and finalize the run.
        Automatically logs errors if an exception occurred.
        """

    async def __aenter__(self) -> RunTree:
        """
        Enter the async context manager.

        Returns:
        RunTree object representing the run
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit the async context manager and finalize the run.
        """

from langsmith import trace, get_current_run_tree
# Synchronous usage
with trace("My Operation", run_type="tool") as run:
    # Do work
    result = do_something()
    # Manually set outputs
    run.end(outputs={"result": result})

# With inputs
with trace(
    "Process Data",
    run_type="chain",
    inputs={"data": input_data},
    tags=["production"],
    metadata={"version": "1.0"}
) as run:
    output = process(input_data)
    run.end(outputs={"output": output})

# Asynchronous usage
async with trace("Async Operation", run_type="tool") as run:
    result = await do_something_async()
    run.end(outputs={"result": result})
# With parent relationship
parent_run = get_current_run_tree()
with trace("Child Operation", parent=parent_run) as run:
    result = process_child()
    run.end(outputs={"result": result})

# Ignore specific exceptions
with trace(
    "Resilient Operation",
    exceptions_to_handle=(TimeoutError, ConnectionError)
) as run:
    # These exceptions won't mark the run as errored
    result = might_timeout()
    run.end(outputs={"result": result})

# Manual error handling
with trace("Operation", run_type="chain") as run:
    try:
        result = risky_operation()
        run.end(outputs={"result": result})
    except Exception as e:
        run.end(error=str(e))
        raise
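# Preset run identifier (a sketch: assumes `client` is a configured langsmith Client;
# knowing the id up front lets you attach feedback without looking the run up)
import uuid

run_id = uuid.uuid4()
with trace("Scored Operation", run_id=run_id) as run:
    result = do_something()
    run.end(outputs={"result": result})
client.create_feedback(run_id, key="quality", score=1)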
# Associate with a dataset example (for evaluation)
with trace(
    "Evaluate Example",
    reference_example_id="example-uuid-123"
) as run:
    result = process_example()
    run.end(outputs={"result": result})

Context manager to temporarily set tracing context for a block of code.
@contextmanager
def tracing_context(
    *,
    project_name: Optional[str] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    parent: Optional[Union[RunTree, Mapping, str, Literal[False]]] = None,
    enabled: Optional[Union[bool, Literal["local"]]] = None,
    client: Optional[Client] = None,
    replicas: Optional[Sequence[WriteReplica]] = None,
    distributed_parent_id: Optional[str] = None,
) -> Generator[None, None, None]:
"""
Context manager to temporarily set tracing context.
All traced operations within this context will inherit the specified settings.
Parameters:
- project_name: Name of the project to log the run to
- tags: Tags to add to all runs in this context
- metadata: Metadata to add to all runs in this context
- parent: Parent run (RunTree, request headers, or dotted order string); set to False to break parent chain
- enabled: Whether tracing is enabled (True/False/"local")
- client: Client to use for logging
- replicas: Sequence of WriteReplica dictionaries to send runs to
- distributed_parent_id: Distributed parent ID for distributed tracing
"""from langsmith import tracing_context, traceable
@traceable
def my_function():
    return "result"

# Set project for a block
with tracing_context(project_name="my-project"):
    my_function()  # Will log to "my-project"

# Add tags and metadata
with tracing_context(
    tags=["production", "v2"],
    metadata={"environment": "prod", "region": "us-west"}
):
    my_function()  # Will have these tags and metadata

# Temporarily disable tracing
with tracing_context(enabled=False):
    my_function()  # Will not be traced

# Local-only tracing (no network calls)
with tracing_context(enabled="local"):
    my_function()  # Traced locally but not sent to the server

# Break the parent chain (start a new trace)
with tracing_context(parent=False):
    my_function()  # Will be a root trace, not a child

# Custom client with specific settings
from langsmith import Client

custom_client = Client(api_key="custom-key")
with tracing_context(client=custom_client):
    my_function()  # Uses the custom client

# Distributed tracing across services
with tracing_context(distributed_parent_id="trace-123-from-service-a"):
    my_function()  # Connected to a trace from another service
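# Resume a trace from incoming request headers (a sketch: `incoming_headers` is a
# placeholder for HTTP request headers that carry LangSmith tracing headers)
with tracing_context(parent=incoming_headers):
    my_function()  # Continues the trace started by the calling service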
# Nested contexts (inner overrides outer)
with tracing_context(project_name="project-a", tags=["tag-a"]):
    with tracing_context(tags=["tag-b"]):
        my_function()  # Logs to "project-a" with tag "tag-b"

Get the current tracing context as a dictionary.
def get_tracing_context(
    context: Optional[contextvars.Context] = None
) -> dict[str, Any]:
    """
    Get the current tracing context.

    Parameters:
    - context: Optional specific context to read from

    Returns:
    Dictionary with keys: parent, project_name, tags, metadata, enabled,
    client, replicas, distributed_parent_id
    """

from langsmith import get_tracing_context, tracing_context
# Get the current context
ctx = get_tracing_context()
print(ctx["project_name"])  # Current project
print(ctx["tags"])          # Current tags
print(ctx["enabled"])       # Whether tracing is enabled

# Pass context between threads. New threads do not inherit contextvars, so capture
# the context in the parent thread and re-apply it in the worker.
import threading

def worker(ctx):
    with tracing_context(**ctx):
        print(f"Worker sees project: {get_tracing_context()['project_name']}")

with tracing_context(project_name="my-project"):
    ctx = get_tracing_context()
    thread = threading.Thread(target=worker, args=(ctx,))
    thread.start()
    thread.join()

Get the current run tree from the context.
def get_current_run_tree() -> Optional[RunTree]:
    """
    Get the current run tree from the context.

    Useful for accessing run metadata or manually manipulating the current run.

    Returns:
    The current RunTree if inside a traced context, None otherwise
    """

from langsmith import traceable, get_current_run_tree
@traceable
def my_function():
    # Get the current run
    run = get_current_run_tree()
    if run:
        print(f"Run ID: {run.id}")
        print(f"Run name: {run.name}")
        print(f"Project: {run.session_name}")
        # Add tags dynamically
        run.add_tags("dynamic-tag")
        # Add metadata dynamically
        run.add_metadata({"custom_field": "value"})
        # Add events
        run.add_event([{"event": "milestone_reached"}])
    return "result"
# Access the run to inspect the parent relationship
@traceable
def parent_function():
    return child_function()

@traceable
def child_function():
    run = get_current_run_tree()
    if run and run.parent_run_id:
        print(f"This is a child of {run.parent_run_id}")
    return "child result"
# Conditionally log based on run context
@traceable
def adaptive_function():
    run = get_current_run_tree()
    if run:
        # We're being traced, so add detailed logging
        run.add_metadata({"detailed": True})
    # Function logic
    return process()

Update metadata on the current run tree.
def set_run_metadata(**metadata: Any) -> None:
    """
    Update metadata on the current run tree.

    Convenience function for adding metadata to the current run without
    manually getting the run tree.

    Parameters:
    - **metadata: Key-value pairs to add to the current run's metadata
    """

from langsmith import traceable, set_run_metadata
@traceable
def process_request(user_id: str, request_type: str):
    # Add metadata early in execution
    set_run_metadata(
        user_id=user_id,
        request_type=request_type
    )

    # Add more metadata as we learn more
    if is_premium_user(user_id):
        set_run_metadata(tier="premium")

    result = process()

    # Add result metadata
    set_run_metadata(
        result_size=len(result),
        cache_hit=was_cached
    )
    return result

@traceable
def track_model_usage(prompt: str):
    # Track model information
    set_run_metadata(
        model="gpt-4",
        prompt_tokens=count_tokens(prompt)
    )
    response = call_model(prompt)

    # Add completion metadata
    set_run_metadata(
        completion_tokens=count_tokens(response),
        total_cost=calculate_cost(prompt, response)
    )
    return response

# Conditional metadata
@traceable
def conditional_metadata(debug: bool = False):
    if debug:
        set_run_metadata(debug_mode=True, verbose_logging=True)
    return process()

Nested traces are created automatically: when a traced function calls other traced functions, the callees become children of the caller's run.

from langsmith import traceable
@traceable(run_type="chain")
def orchestrator(inputs: dict):
# This will be the parent run
result1 = step1(inputs)
result2 = step2(result1)
result3 = step3(result2)
return result3
@traceable(run_type="tool")
def step1(data: dict):
# Automatically becomes a child of orchestrator
return process_step1(data)
@traceable(run_type="tool")
def step2(data: dict):
return process_step2(data)
@traceable(run_type="tool")
def step3(data: dict):
return process_step3(data)from langsmith import trace, traceable
@traceable
def parent_function():
    run = get_current_run_tree()
    # Pass the parent to a non-traced function
    manual_operation(parent=run)
    return "result"

def manual_operation(parent):
    with trace("Manual Operation", parent=parent) as run:
        result = do_work()
        run.end(outputs={"result": result})

Traces can also span services by passing the trace identifiers in request headers and reconnecting them on the receiving side.

import requests

from langsmith import traceable, get_current_run_tree, tracing_context
# Service A
@traceable
def service_a_handler(request):
    run = get_current_run_tree()
    # Pass the trace context to Service B
    response = requests.post(
        "http://service-b/process",
        json={"data": request},
        headers={
            "X-Trace-Id": str(run.trace_id),
            "X-Parent-Id": str(run.id),
        },
    )
    return response.json()

# Service B
@traceable
def service_b_handler(request, headers):
    # Extract the trace context from the headers
    trace_id = headers.get("X-Trace-Id")
    parent_id = headers.get("X-Parent-Id")
    with tracing_context(distributed_parent_id=parent_id):
        result = process(request)
    return result

Generator functions can stream output to callers while reduce_fn combines the yielded chunks into the logged result.

from langsmith import traceable
@traceable(
    reduce_fn=lambda chunks: {"full_response": "".join(chunks)}
)
def stream_llm_response(prompt: str):
    """Stream the LLM response and log the full output."""
    for chunk in llm.stream(prompt):
        yield chunk

# Use it
for chunk in stream_llm_response("Tell me a story"):
    print(chunk, end="", flush=True)
# The full response is logged to LangSmith
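# Process chunks before they are logged (a sketch: assumes process_chunk is applied
# to each yielded chunk and that llm.stream yields objects with a `text` attribute)
@traceable(
    run_type="llm",
    process_chunk=lambda chunk: getattr(chunk, "text", chunk)
)
def stream_plain_text(prompt: str):
    for chunk in llm.stream(prompt):
        yield chunk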
# Async streaming
@traceable(
    reduce_fn=lambda chunks: {"full_response": "".join(chunks)}
)
async def async_stream_response(prompt: str):
    """Stream an async LLM response."""
    async for chunk in llm.astream(prompt):
        yield chunk

Exceptions raised in traced code are logged to the run automatically; the exceptions_to_handle parameter of trace keeps selected exception types from marking the run as errored.

from langsmith import traceable
@traceable
def function_with_error():
    try:
        result = risky_operation()
        return result
    except ValueError:
        # The error is automatically logged to the run
        raise

# Ignore specific errors
from langsmith import trace

with trace(
    "Resilient Operation",
    exceptions_to_handle=(TimeoutError,)
) as run:
    try:
        result = might_timeout()
        run.end(outputs={"result": result})
    except TimeoutError:
        # This error won't mark the run as failed
        run.end(outputs={"result": "timeout"})