Utility functions and classes for context management, UUID generation, and data anonymization.
ThreadPoolExecutor that copies context to child threads, ensuring context variables like tracing context are available in worker threads.
class ContextThreadPoolExecutor(ThreadPoolExecutor):
"""
ThreadPoolExecutor that copies the context to child threads.
Ensures that context variables (like tracing context) are properly
propagated to worker threads. Essential for maintaining trace
parent-child relationships across threads.
"""
def submit(
self,
func: Callable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> Future[T]:
"""
Submit a function to the executor.
The function will execute in a worker thread with the current
context copied.
Parameters:
- func: Function to execute
- *args: Positional arguments for func
- **kwargs: Keyword arguments for func
Returns:
Future object representing the execution
"""
def map(
self,
fn: Callable[..., T],
*iterables: Iterable[Any],
timeout: Optional[float] = None,
chunksize: int = 1,
) -> Iterator[T]:
"""
Return an iterator equivalent to map(fn, *iterables).
Parameters:
- fn: Function to apply
- *iterables: Iterables to map over
- timeout: Maximum time to wait for results
- chunksize: Size of chunks for processing
Returns:
Iterator of results
"""

Generate a random RFC 9562-compliant UUID v7.
def uuid7() -> uuid.UUID:
"""
Generate a random RFC 9562-compliant UUID v7.
UUID v7 includes a timestamp component for sortability, making it
ideal for use as run IDs and other time-ordered identifiers.
Returns:
A random UUID v7 instance
"""

Generate a UUID v7 from a datetime object.
def uuid7_from_datetime(dt: datetime) -> uuid.UUID:
"""
Generate a UUID v7 from a datetime object.
The UUID's timestamp corresponds to the provided time, making it
useful for creating UUIDs that sort to a specific point in time.
Parameters:
- dt: A timezone-aware datetime (naive datetimes treated as UTC)
Returns:
A UUID v7 whose timestamp corresponds to the provided datetime
"""

Package version string constant.
__version__: str

The __version__ constant contains the version string for the langsmith package.
import langsmith
print(langsmith.__version__) # e.g., "0.6.2"
# Use for version checking
from packaging import version
if version.parse(langsmith.__version__) >= version.parse("0.6.0"):
# Use features from 0.6.0+
pass

Module for anonymizing/redacting sensitive data in traces.
class StringNode(TypedDict):
"""Represents a string extracted from nested data."""
value: str
"""The string value"""
path: list[Union[str, int]]
"""Path to the string in the data structure"""

class StringNodeRule(TypedDict):
"""Declarative rule for replacing sensitive data."""
pattern: re.Pattern
"""Regex pattern to match"""
replace: Optional[str]
"""Replacement string (default: "[redacted]")"""

class StringNodeProcessor(ABC):
"""Base class for custom node processors."""
@abstractmethod
def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]:
"""
Accept and return list of string nodes to mask.
Parameters:
- nodes: List of StringNode objects to process
Returns:
List of processed StringNode objects with masked values
"""

class RuleNodeProcessor(StringNodeProcessor):
"""Processor that uses regex rules."""
def __init__(self, rules: list[StringNodeRule]):
"""
Create a rule-based processor.
Parameters:
- rules: List of StringNodeRule dictionaries
"""
def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]:
"""
Mask nodes using configured rules.
Parameters:
- nodes: List of StringNode objects
Returns:
List of masked StringNode objects
"""

class CallableNodeProcessor(StringNodeProcessor):
"""Processor that uses a custom function."""
def __init__(
self,
func: Union[
Callable[[str], str],
Callable[[str, list[Union[str, int]]], str]
]
):
"""
Create a callable-based processor.
Parameters:
- func: Function that takes (value) or (value, path) and returns masked value
"""
def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]:
"""
Mask nodes using callable function.
Parameters:
- nodes: List of StringNode objects
Returns:
List of masked StringNode objects
"""

def create_anonymizer(
replacer: Union[
Callable[[str, list[Union[str, int]]], str],
list[StringNodeRule],
StringNodeProcessor,
],
*,
max_depth: Optional[int] = None,
) -> Callable[[Any], Any]:
"""
Create an anonymizer function that can be passed to Client.
The anonymizer traverses nested data structures and applies masking
rules to all string values.
Parameters:
- replacer: Either a list of rules, a callable, or a StringNodeProcessor
- max_depth: Maximum depth to traverse (default: 10)
Returns:
Anonymizer function that accepts data and returns anonymized data
"""

from langsmith import ContextThreadPoolExecutor, traceable
@traceable
def worker_function(item):
    """Handle one item inside a worker thread.

    Delegates to ``process`` (defined outside this snippet) and returns
    whatever it produces.
    """
    # Tracing context is preserved in the worker thread.
    outcome = process(item)
    return outcome
@traceable
def parallel_processing(items):
    """Process items in parallel with tracing.

    Every item is submitted to a ``ContextThreadPoolExecutor`` with four
    workers; results are gathered in submission order.

    Parameters:
    - items: Iterable of items passed one by one to ``worker_function``

    Returns:
    List with one ``worker_function`` result per item
    """
    with ContextThreadPoolExecutor(max_workers=4) as executor:
        # Submit work to thread pool
        futures = [executor.submit(worker_function, item) for item in items]
        # Get results; each .result() call blocks until that future is done
        results = [f.result() for f in futures]
    return results


from langsmith import ContextThreadPoolExecutor, traceable
@traceable
def process_item(item):
    """Transform one item under tracing.

    Hands the item to ``transform`` (defined outside this snippet) and
    returns its result.
    """
    transformed = transform(item)
    return transformed
def process_batch(items):
    """Process a batch using the thread pool's ``map``.

    Parameters:
    - items: Iterable of items passed one by one to ``process_item``

    Returns:
    List of ``process_item`` results
    """
    with ContextThreadPoolExecutor(max_workers=8) as executor:
        # Map function over items; list() drains the result iterator
        # before the executor is shut down.
        results = list(executor.map(process_item, items))
    return results


from langsmith import uuid7
# Generate UUID v7
generated_id = uuid7()  # not named ``id`` so the built-in id() stays usable
print(generated_id)  # e.g., 018e9e35-92d2-7890-abcd-ef1234567890

# Use as run ID
from langsmith import RunTree, Client

client = Client()
run = RunTree(
    name="My Run",
    run_type="chain",
    inputs={"data": "test"},
    run_id=uuid7(),
    client=client,
)


from langsmith import uuid7_from_datetime
from datetime import datetime, timezone
# Create UUID for specific time
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
timestamped_id = uuid7_from_datetime(dt)  # avoids shadowing the built-in id()

# UUIDs created this way sort by timestamp
id1 = uuid7_from_datetime(datetime(2024, 1, 1, tzinfo=timezone.utc))
id2 = uuid7_from_datetime(datetime(2024, 1, 2, tzinfo=timezone.utc))
assert id1 < id2  # True - sorts chronologically


from langsmith import Client
from langsmith import traceable  # needed by the @traceable decorator below
from langsmith.anonymizer import create_anonymizer
import re

# Define anonymization rules
anonymizer = create_anonymizer([
    {
        "pattern": re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b', re.I),
        "replace": "[EMAIL]"
    },
    {
        "pattern": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
        "replace": "[SSN]"
    },
    {
        "pattern": re.compile(r'\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14})\b'),
        "replace": "[CREDIT_CARD]"
    },
    {
        "pattern": re.compile(r'\bapi[_-]?key[_-]?([a-z0-9]+)\b', re.I),
        "replace": "api_key_[REDACTED]"
    }
])

# Use with Client
client = Client(anonymizer=anonymizer)


# Now all traces will have sensitive data redacted
@traceable
def process_user_data(email, ssn):
    """Return the given email and SSN in a dict (traced call)."""
    # email and ssn will be redacted in traces
    return {"email": email, "ssn": ssn}


from langsmith import Client
from langsmith.anonymizer import create_anonymizer
def mask_sensitive(text: str, path: list) -> str:
    """Mask a string value according to the keys found in its path.

    Rules are checked in order and the first match wins:
    - "password" in the path: the value becomes "[REDACTED]".
    - "ssn", "email" or "phone" in the path: the value becomes "[PII]".
    - "api_key" in the path and the value is longer than 8 characters:
      only the first and last four characters are kept.
    Anything else, including api_key values of 8 characters or fewer,
    is returned unchanged.

    Parameters:
    - text: The string value to mask
    - path: Keys/indices leading to the value in the data structure

    Returns:
    The masked (or untouched) string
    """
    if "password" in path:
        return "[REDACTED]"

    for pii_key in ("ssn", "email", "phone"):
        if pii_key in path:
            return "[PII]"

    if "api_key" in path and len(text) > 8:
        head, tail = text[:4], text[-4:]
        return head + "..." + tail

    return text
# Wrap the path-aware masking function and hand it to the client.
anonymizer = create_anonymizer(mask_sensitive)
client = Client(anonymizer=anonymizer)


from langsmith import Client
from langsmith.anonymizer import (
create_anonymizer,
StringNodeProcessor,
StringNode
)
import re  # the patterns below need it; the snippet's imports omit it


class CustomAnonymizer(StringNodeProcessor):
    """Custom anonymizer that combines regex patterns with a path rule."""

    def __init__(self):
        # Label -> compiled pattern; the upper-cased label is used as the
        # replacement placeholder (e.g. "[EMAIL]").
        self.patterns = {
            "email": re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b', re.I),
            "phone": re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'),
        }

    def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]:
        """Mask sensitive data in nodes.

        Parameters:
        - nodes: List of StringNode objects to process

        Returns:
        New list of nodes with masked values; paths are carried over as-is
        """
        masked = []
        for node in nodes:
            value = node["value"]
            # Apply patterns
            for pattern_name, pattern in self.patterns.items():
                value = pattern.sub(f"[{pattern_name.upper()}]", value)
            # Mask based on path; this overrides any pattern substitution
            if "password" in node["path"]:
                value = "[PASSWORD]"
            masked.append({"value": value, "path": node["path"]})
        return masked
# Pass a processor instance instead of a rule list or callable.
anonymizer = create_anonymizer(CustomAnonymizer())
client = Client(anonymizer=anonymizer)


from langsmith import Client
from langsmith.anonymizer import create_anonymizer
import re
import hashlib
def hash_pii(text: str, path: list) -> str:
    """Hash PII instead of redacting completely.

    The path is stringified and lower-cased once, then tested by substring:
    - If it contains "email", "ssn", "phone" or "name", the value is
      replaced by "[HASH:<first 16 hex chars of its SHA-256>]", so equal
      inputs map to equal placeholders. Because this is a substring test,
      a key such as "username" also counts as PII.
    - Otherwise, if it contains "api" and the value is longer than 10
      characters, everything but the first and last four characters is
      replaced by "*".
    - Otherwise the value is returned unchanged.

    Parameters:
    - text: The string value to mask
    - path: Keys/indices leading to the value in the data structure

    Returns:
    The hashed, partially masked, or untouched string
    """
    # Identify PII fields
    pii_fields = {"email", "ssn", "phone", "name"}

    # Stringify the path once; every check below reuses it.
    path_text = str(path).lower()

    if any(field in path_text for field in pii_fields):
        # Hash the value for anonymized but consistent tracking
        digest = hashlib.sha256(text.encode()).hexdigest()
        return f"[HASH:{digest[:16]}]"

    # Mask API keys partially
    if "api" in path_text and len(text) > 10:
        return text[:4] + "*" * (len(text) - 8) + text[-4:]

    return text
# max_depth=20 is passed explicitly for this example.
anonymizer = create_anonymizer(hash_pii, max_depth=20)
client = Client(anonymizer=anonymizer)


from langsmith import Client
from langsmith.anonymizer import create_anonymizer
import re
# Create anonymizer with depth limit
anonymizer = create_anonymizer(
    [
        {
            "pattern": re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b', re.I),
            "replace": "[EMAIL]"
        }
    ],
    max_depth=5  # Only traverse 5 levels deep
)
client = Client(anonymizer=anonymizer)


from langsmith.anonymizer import create_anonymizer
import re
# Create anonymizer
anonymizer = create_anonymizer([
    {
        "pattern": re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b', re.I),
        "replace": "[EMAIL]"
    }
])

# Test with sample data
test_data = {
    "user": {
        "name": "John Doe",
        "email": "john@example.com",
        "messages": [
            "Contact me at john@example.com",
            "Or at jane@example.com"
        ]
    }
}

# Anonymize
anonymized = anonymizer(test_data)
print(anonymized)
# {
#     "user": {
#         "name": "John Doe",
#         "email": "[EMAIL]",
#         "messages": [
#             "Contact me at [EMAIL]",
#             "Or at [EMAIL]"
#         ]
#     }
# }


from langsmith import Client
from langsmith.anonymizer import create_anonymizer
import os
import re  # required by re.compile below; the snippet only imported os

# Only anonymize in production
if os.getenv("ENV") == "production":
    anonymizer = create_anonymizer([
        {
            "pattern": re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b', re.I),
            "replace": "[EMAIL]"
        }
    ])
    client = Client(anonymizer=anonymizer)
else:
    # No anonymization in dev/test
    client = Client()


from langsmith import Client
from langsmith.anonymizer import create_anonymizer
import re
import os

# Development: partial masking (keeps the local part, hides the domain)
dev_rules = [
    {
        "pattern": re.compile(r'([A-Z0-9._%+-]+)@([A-Z0-9.-]+\.[A-Z]{2,})', re.I),
        "replace": r"\1@[DOMAIN]"
    }
]
dev_anonymizer = create_anonymizer(dev_rules)

# Production: full redaction of the whole address
prod_rules = [
    {
        "pattern": re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b', re.I),
        "replace": "[EMAIL]"
    }
]
prod_anonymizer = create_anonymizer(prod_rules)

# Choose based on environment
if os.getenv("ENV") == "production":
    anonymizer = prod_anonymizer
else:
    anonymizer = dev_anonymizer

client = Client(anonymizer=anonymizer)