or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/langsmith@0.6.x

docs

index.md
tile.json

tessl/pypi-langsmith

tessl install tessl/pypi-langsmith@0.6.1

Python SDK for LangSmith Observability and Evaluation Platform

manual-tracing.mddocs/tracing/

Manual Tracing

Manual control over run creation and lifecycle using the trace context manager and RunTree class. Use these when you need fine-grained control beyond what @traceable provides.

On This Page

Core Classes:

  • trace Context Manager - Manual run management
  • RunTree Class - Low-level run representation

RunTree Methods:

Examples:

→ @traceable Decorator for automatic tracing | → Back to Index

When to Use Manual Tracing

Use manual tracing (this page) when:

  • You need fine-grained control over run lifecycle
  • Tracing specific code blocks within functions
  • Building complex trace trees programmatically
  • Need to set outputs conditionally based on execution path
  • Working with non-standard control flow (error handling, retries)

Use @traceable decorator (decorators.md) when:

  • You want automatic tracing with minimal code
  • Tracing entire functions start-to-finish
  • Working with generators or async functions
  • You need automatic input/output capture

trace Context Manager

The trace context manager provides manual control over run creation and lifecycle.

class trace:
    def __init__(
        self,
        name: str,
        run_type: Literal["chain", "llm", "tool", "retriever", "prompt"] = "chain",
        *,
        inputs: Optional[dict] = None,
        extra: Optional[dict] = None,
        project_name: Optional[str] = None,
        parent: Optional[Union[RunTree, str, Mapping, Literal["ignore"]]] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[Mapping[str, Any]] = None,
        client: Optional[Client] = None,
        run_id: Optional[ID_TYPE] = None,
        reference_example_id: Optional[ID_TYPE] = None,
        exceptions_to_handle: Optional[tuple[type[BaseException], ...]] = None,
        attachments: Optional[Attachments] = None,
    ):
        """
        Context manager to manually manage a LangSmith run.

        Can be used as both synchronous and asynchronous context manager.

        Parameters:
        - name: Name of the run
        - run_type: Type of run (e.g., 'chain', 'llm', 'tool')
        - inputs: Initial input data for the run
        - extra: Extra metadata for the run
        - project_name: Project name to associate run with
        - parent: Parent run (can be RunTree, dotted order string, or tracing headers)
        - tags: List of tags for the run
        - metadata: Additional metadata
        - client: LangSmith client for custom settings
        - run_id: Preset identifier for the run
        - reference_example_id: Associates run with a dataset example (for evaluation)
        - exceptions_to_handle: Exception types to ignore (not mark as errors)
        - attachments: File attachments for the run
        """

    def __enter__(self) -> RunTree:
        """
        Enter the context manager.

        Returns:
        RunTree object representing the run
        """

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit the context manager and finalize the run.

        Automatically logs errors if an exception occurred.
        """

    async def __aenter__(self) -> RunTree:
        """
        Enter the async context manager.

        Returns:
        RunTree object representing the run
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit the async context manager and finalize the run.
        """

Basic Usage

from langsmith import trace

# Synchronous usage
with trace("My Operation", run_type="tool") as run:
    # Do work
    result = do_something()

    # Manually set outputs
    run.end(outputs={"result": result})

With Inputs and Metadata

with trace(
    "Process Data",
    run_type="chain",
    inputs={"data": input_data},
    tags=["production"],
    metadata={"version": "1.0"}
) as run:
    output = process(input_data)
    run.end(outputs={"output": output})

Async Usage

async with trace("Async Operation", run_type="tool") as run:
    result = await do_something_async()
    run.end(outputs={"result": result})

With Parent Relationship

from langsmith import get_current_run_tree

parent_run = get_current_run_tree()

with trace("Child Operation", parent=parent_run) as run:
    result = process_child()
    run.end(outputs={"result": result})

Ignore Specific Exceptions

with trace(
    "Resilient Operation",
    exceptions_to_handle=(TimeoutError, ConnectionError)
) as run:
    # These exceptions won't mark the run as errored
    result = might_timeout()
    run.end(outputs={"result": result})

Manual Error Handling

with trace("Operation", run_type="chain") as run:
    try:
        result = risky_operation()
        run.end(outputs={"result": result})
    except Exception as e:
        run.end(error=str(e))
        raise

Associate with Dataset Example

# For evaluation
with trace(
    "Evaluate Example",
    reference_example_id="example-uuid-123"
) as run:
    result = process_example()
    run.end(outputs={"result": result})

RunTree Class

RunTree represents a single run (trace span) with full control over its lifecycle.

class RunTree(BaseModel):
    """
    Run schema with back-references for posting runs.

    Represents a single run/span in a trace tree with inputs, outputs,
    timing, metadata, and parent-child relationships.
    """

    # Core identity fields
    name: str
    id: UUID
    run_type: str = "chain"

    # Timing fields
    start_time: datetime
    end_time: Optional[datetime] = None

    # Data fields
    inputs: dict
    outputs: Optional[dict] = None
    error: Optional[str] = None

    # Metadata fields
    extra: dict = Field(default_factory=dict)
    tags: Optional[list[str]] = None
    metadata: Optional[dict] = None

    # Tree structure fields
    parent_run_id: Optional[UUID] = None
    session_name: str  # Project name
    session_id: Optional[UUID] = None  # Project ID
    trace_id: UUID
    dotted_order: str

    # Events
    events: list[dict] = Field(default_factory=list)

    # Reference fields
    reference_example_id: Optional[UUID] = None

    # Client reference
    client: Optional[Client] = Field(default=None, exclude=True)

Lifecycle Methods

def end(
    self,
    *,
    outputs: Optional[dict] = None,
    error: Optional[str] = None,
    end_time: Optional[datetime] = None,
    events: Optional[Sequence[dict]] = None,
) -> None:
    """
    End the run with outputs or error.

    Parameters:
    - outputs: Output data from the run
    - error: Error message if the run failed
    - end_time: When the run ended (defaults to now)
    - events: Additional events to log
    """

def post(self, *, exclude_child_runs: bool = False) -> None:
    """
    Post the run to LangSmith.

    Parameters:
    - exclude_child_runs: Whether to exclude child runs from the submission
    """

def patch(self) -> None:
    """Patch/update the run in LangSmith."""

def wait(self) -> None:
    """Wait for the run to be posted."""

Child Run Methods

def create_child(
    self,
    name: str,
    run_type: str = "chain",
    *,
    inputs: Optional[dict] = None,
    **kwargs
) -> "RunTree":
    """
    Create a child run.

    Parameters:
    - name: Name of the child run
    - run_type: Type of run
    - inputs: Input data
    - **kwargs: Additional RunTree parameters

    Returns:
    New RunTree instance
    """

Metadata Methods

def add_tags(self, *tags: str) -> None:
    """Add tags to the run."""

def add_metadata(self, metadata: dict) -> None:
    """Add metadata to the run."""

def add_event(self, events: Union[dict, Sequence[dict]]) -> None:
    """Add events to the run."""

def add_outputs(self, outputs: dict) -> None:
    """Add outputs to the run."""

Attachment Methods

def add_attachment(
    self,
    key: str,
    *,
    mime_type: str,
    data: Optional[bytes] = None,
    path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Add a file attachment to the run.

    Parameters:
    - key: Key for the attachment
    - mime_type: MIME type (e.g., "image/png")
    - data: Binary data of the attachment
    - path: Path to file to attach (alternative to data)
    """

Usage Examples

Creating a RunTree Manually

from langsmith import RunTree, Client

client = Client()

# Create a root run
run = RunTree(
    name="My Operation",
    run_type="chain",
    inputs={"query": "test input"},
    client=client,
    project_name="my-project"
)

# Do some work
result = process()

# End the run
run.end(outputs={"result": result})

# Post to LangSmith
run.post()

Creating Child Runs

from langsmith import RunTree, Client

client = Client()

# Parent run
parent = RunTree(
    name="Parent Operation",
    run_type="chain",
    inputs={"input": "data"},
    client=client
)

# Create child runs
child1 = parent.create_child(
    name="Child 1",
    run_type="tool",
    inputs={"data": "child1 input"}
)
child1.end(outputs={"result": "child1 result"})

child2 = parent.create_child(
    name="Child 2",
    run_type="tool",
    inputs={"data": "child2 input"}
)
child2.end(outputs={"result": "child2 result"})

# End parent
parent.end(outputs={"final": "result"})

# Post the tree (includes children)
parent.post()

Adding Metadata and Tags

from langsmith import RunTree, Client

client = Client()
run = RunTree(
    name="My Run",
    run_type="chain",
    inputs={"input": "data"},
    client=client
)

# Add tags
run.add_tags("production", "v1.0", "important")

# Add metadata
run.add_metadata({
    "user_id": "123",
    "region": "us-west",
    "model": "gpt-4"
})

# Add events
run.add_event({"event": "cache_miss"})

result = process()
run.end(outputs={"result": result})
run.post()

Working with Attachments

from langsmith import RunTree, Client

client = Client()
run = RunTree(
    name="Image Processing",
    run_type="tool",
    inputs={"image_url": "http://example.com/image.png"},
    client=client
)

# Add image as attachment
run.add_attachment(
    "input_image",
    mime_type="image/png",
    path="./input.png"
)

result_image = process_image()

# Add result image
run.add_attachment(
    "output_image",
    mime_type="image/png",
    data=result_image
)

run.end(outputs={"status": "processed"})
run.post()

Manual Tree Construction

from langsmith import RunTree, Client

client = Client()

# Build a complex trace tree manually
root = RunTree(
    name="Complex Pipeline",
    run_type="chain",
    inputs={"request": "data"},
    client=client
)

# Step 1: Parse input
parse = root.create_child(
    name="Parse Input",
    run_type="parser",
    inputs={"raw": "data"}
)
parsed = parse_input()
parse.end(outputs={"parsed": parsed})

# Step 2: Parallel processing
parallel_parent = root.create_child(
    name="Parallel Processing",
    run_type="chain",
    inputs={"data": parsed}
)

process1 = parallel_parent.create_child(
    name="Process 1",
    run_type="tool",
    inputs={"data": parsed["part1"]}
)
result1 = do_process1()
process1.end(outputs={"result": result1})

process2 = parallel_parent.create_child(
    name="Process 2",
    run_type="tool",
    inputs={"data": parsed["part2"]}
)
result2 = do_process2()
process2.end(outputs={"result": result2})

parallel_parent.end(outputs={"results": [result1, result2]})

# Step 3: Combine results
combine = root.create_child(
    name="Combine Results",
    run_type="chain",
    inputs={"results": [result1, result2]}
)
final = combine_results(result1, result2)
combine.end(outputs={"final": final})

# End root
root.end(outputs={"result": final})
root.post()

Working with Run Tree Lifecycle

Incrementally build up run outputs over multiple phases:

from langsmith import RunTree, Client
import time

client = Client()

# Create run
run = RunTree(
    name="Long Operation",
    run_type="chain",
    inputs={"input": "data"},
    client=client
)

# Do work in phases
phase1_result = do_phase1()
run.add_outputs({"phase1": phase1_result})
run.add_event({"event": "phase1_complete"})

time.sleep(1)

phase2_result = do_phase2()
run.add_outputs({"phase2": phase2_result})
run.add_event({"event": "phase2_complete"})

time.sleep(1)

# Final outputs
run.end(outputs={"final_result": combine(phase1_result, phase2_result)})

# Post asynchronously in background
run.post()

# Or wait for posting to complete
run.wait()

Related Documentation