Comprehensive Python SDK for AI application observability and experimentation with OpenTelemetry-based tracing, automatic instrumentation, and dataset management.
Support for media uploads, data masking, multi-project setups, and advanced configuration options for complex production environments and specialized use cases.
Support for uploading and managing media files with traces and observations.
class LangfuseMedia:
    """Media object that can be attached to traces and observations."""

    def __init__(self, *, obj: Optional[object] = None, base64_data_uri: Optional[str] = None,
                 content_type: Optional[MediaContentType] = None, content_bytes: Optional[bytes] = None,
                 file_path: Optional[str] = None):
        """Initialize media object for Langfuse.

        All parameters are keyword-only and default to None.

        Args:
            obj: Source object to wrap (images, files, etc.)
            base64_data_uri: Base64 encoded data URI
            content_type: MIME type of the media content
            content_bytes: Raw bytes content
            file_path: Path to file to upload

        Note:
            Provide one of: obj, base64_data_uri, content_bytes + content_type, or file_path
        """

    @staticmethod
    def parse_reference_string(reference: str) -> ParsedMediaReference:
        """Parse media reference string into components.

        Args:
            reference: Media reference string from Langfuse

        Returns:
            ParsedMediaReference with parsed components
            (keys ``type``, ``id`` and ``url``)
        """

    @staticmethod
    def resolve_media_references(data: Any) -> Any:
        """Replace media references in data with actual content.

        Args:
            data: Data structure potentially containing media references

        Returns:
            Data with media references resolved to content
        """
Advanced utilities for working with trace context and IDs.
class Langfuse:
    """Trace-context and ID helpers exposed by the Langfuse client."""

    def create_trace_id(self) -> str:
        """Generate a unique trace ID.

        Returns:
            New trace ID string
        """

    def get_current_trace_id(self) -> Optional[str]:
        """Get current trace ID from execution context.

        Returns:
            Current trace ID or None if no active trace

        Raises:
            Exception: If no current trace found

        NOTE(review): the Returns and Raises entries disagree about what
        happens when there is no active trace (None vs. exception) --
        confirm against the installed SDK version and handle both.
        """

    def get_current_observation_id(self) -> Optional[str]:
        """Get current observation ID from execution context.

        Returns:
            Current observation ID or None if no active observation

        Raises:
            Exception: If no current observation found

        NOTE(review): the Returns and Raises entries disagree about what
        happens when there is no active observation (None vs. exception) --
        confirm against the installed SDK version and handle both.
        """

    def get_trace_url(self, trace_id: str) -> str:
        """Get URL to view trace in Langfuse UI.

        Args:
            trace_id: Trace ID to generate URL for

        Returns:
            URL string for viewing trace in Langfuse dashboard
        """
Methods for validating client configuration and connectivity.
class Langfuse:
    """Client validation helpers."""

    def auth_check(self) -> bool:
        """Verify API authentication credentials.

        Returns:
            True if authentication is valid, False otherwise

        Raises:
            Exception: If unable to perform authentication check
                (i.e. a failed check and an impossible check are reported
                differently: False vs. exception)
        """
Support for managing multiple Langfuse projects within a single application.
def get_client(*, public_key: Optional[str] = None) -> Langfuse:
    """Get or create Langfuse client for multi-project setups.

    Args:
        public_key: Project identifier for specific project
            (keyword-only; defaults to None)

    Returns:
        Langfuse client instance for the specified project

    Note:
        - Without public_key: Returns single client or disabled client if multiple exist
        - With public_key: Returns client for that specific project
        - Multi-project support is experimental
    """
# Special parameters for observe decorator in multi-project setups
def decorated_function(data, langfuse_public_key: Optional[str] = None):
    """Functions decorated with @observe accept langfuse_public_key parameter."""
    # Placeholder body: this stub only illustrates the extra keyword argument.
    pass
Protocol and utilities for implementing data masking in traces.
# Masking function protocol
class MaskFunction(Protocol):
    """Structural type for masking callables: invoked as ``mask(data=...)``."""

    def __call__(self, *, data: Any) -> Any:
        """Transform data to mask sensitive information.

        Args:
            data: Original data to mask (passed by keyword only)

        Returns:
            Masked version of the data
        """
class Langfuse:
    """Client constructor as it relates to data masking."""

    def __init__(self, *, mask: Optional[MaskFunction] = None, **kwargs):
        """Initialize Langfuse client with optional data masking.

        Args:
            mask: Function to apply data masking to all trace data;
                defaults to None
            **kwargs: Other initialization parameters
        """
Extended configuration for production and specialized environments.
class Langfuse:
    """Client constructor with the full set of configuration options."""

    def __init__(self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None,
                 host: str = "https://cloud.langfuse.com", tracing_enabled: bool = True,
                 environment: Optional[str] = None, timeout: int = 60, flush_at: int = 15,
                 flush_interval: float = 0.5, release: Optional[str] = None,
                 sample_rate: float = 1.0, mask: Optional[MaskFunction] = None,
                 additional_headers: Optional[Dict[str, str]] = None,
                 blocked_instrumentation_scopes: Optional[List[str]] = None,
                 media_upload_thread_count: int = 3):
        """Initialize Langfuse client with advanced configuration.

        All parameters are keyword-only.

        Args:
            public_key: Project public key
            secret_key: Project secret key
            host: Langfuse server URL (default "https://cloud.langfuse.com")
            tracing_enabled: Global tracing enable/disable (default True)
            environment: Environment tag for all traces
            timeout: Request timeout in seconds (default 60)
            flush_at: Number of events to batch before flushing (default 15)
            flush_interval: Time interval between flushes in seconds (default 0.5)
            release: Release/version identifier
            sample_rate: Sampling rate (0.0 to 1.0) for traces (default 1.0)
            mask: Data masking function
            additional_headers: Additional HTTP headers for requests
            blocked_instrumentation_scopes: OpenTelemetry scopes to block
            media_upload_thread_count: Number of threads for media uploads (default 3)
        """
Supporting types for advanced functionality.
# Media content types
MediaContentType = str  # MIME type strings


# Parsed media reference structure
class ParsedMediaReference(TypedDict):
    """Components of a parsed media reference string."""

    type: str
    id: str
    url: str


# Trace context structure
class TraceContext(TypedDict):
    """Identifiers that tie a new span to an existing trace."""

    trace_id: str
    parent_span_id: Optional[str]
# Map value type for flexible attributes
MapValue = Union[str, int, float, bool, List[Any], Dict[str, Any]]

from langfuse import Langfuse, LangfuseMedia
import base64
from PIL import Image

langfuse = Langfuse()

# Upload image file
image_media = LangfuseMedia(file_path="path/to/image.jpg")

with langfuse.start_as_current_span(name="image-analysis") as span:
    # Include media in trace
    span.update(
        input={"image": image_media, "prompt": "Analyze this image"},
        output={"description": "A beautiful landscape with mountains"}
    )

# Upload from bytes
with open("image.jpg", "rb") as f:
    image_bytes = f.read()

# content_bytes must be accompanied by content_type
bytes_media = LangfuseMedia(
    content_bytes=image_bytes,
    content_type="image/jpeg"
)


# Upload from base64 data URI
def image_to_base64_uri(image_path):
    """Return the file at *image_path* encoded as a base64 ``data:`` URI.

    The MIME type is fixed to ``image/jpeg``; only pass JPEG files.
    """
    with open(image_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode()
    return f"data:image/jpeg;base64,{encoded}"


uri_media = LangfuseMedia(base64_data_uri=image_to_base64_uri("image.jpg"))

# Upload Python object (e.g., matplotlib figure)
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot([1, 2, 3, 4], [1, 4, 2, 3])

figure_media = LangfuseMedia(obj=fig)

# The "image-analysis" span ended when its `with` block exited, so the figure
# is attached inside a span of its own rather than to the finished one.
with langfuse.start_as_current_span(name="plot-analysis") as span:
    span.update(input={"plot": figure_media})

import re
from typing import Any
class DataMasker:
    """Custom data masking implementation.

    Instances are callables with the keyword-only ``mask(data=...)`` shape.
    Strings are scanned for e-mail addresses, US-style phone numbers, SSNs
    and 16-digit card numbers; dicts, lists and tuples are walked
    recursively. Dictionary keys and every other value type are returned
    unchanged.
    """

    def __init__(self):
        # Common sensitive patterns, applied in insertion order.
        # The e-mail TLD class is [A-Za-z]: a '|' inside a character class is
        # a literal pipe, not alternation, so it must not appear there.
        self.patterns = {
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            'phone': re.compile(r'\b\d{3}-\d{3}-\d{4}\b'),
            'ssn': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
            'credit_card': re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b')
        }

    def __call__(self, *, data: Any) -> Any:
        """Mask sensitive data according to privacy requirements.

        Args:
            data: Arbitrary (possibly nested) payload.

        Returns:
            A masked copy for str/dict/list/tuple inputs; any other object
            is returned as-is.
        """
        return self._mask_recursive(data)

    def _mask_recursive(self, obj):
        """Recursively mask data in nested structures."""
        if isinstance(obj, str):
            return self._mask_string(obj)
        if isinstance(obj, dict):
            return {k: self._mask_recursive(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._mask_recursive(item) for item in obj]
        if isinstance(obj, tuple):
            items = [self._mask_recursive(item) for item in obj]
            # namedtuples take their fields positionally; plain tuples take
            # a single iterable.
            return type(obj)(*items) if hasattr(obj, "_fields") else tuple(items)
        return obj

    def _mask_string(self, text: str) -> str:
        """Apply masking patterns to string content."""
        masked = text
        for pattern_name, pattern in self.patterns.items():
            masked = pattern.sub(f'[MASKED_{pattern_name.upper()}]', masked)
        return masked
# Initialize Langfuse with data masking
masker = DataMasker()
langfuse = Langfuse(mask=masker)

# All trace data automatically masked
with langfuse.start_as_current_span(name="process-user-data") as span:
    user_input = "My email is john@example.com and phone is 555-123-4567"
    span.update(input=user_input)
    # Stored as: "My email is [MASKED_EMAIL] and phone is [MASKED_PHONE]"

# Initialize multiple clients for different projects
# One client per project; each is identified by its own key pair.
project_a = Langfuse(
    public_key="project-a-key",
    secret_key="project-a-secret",
    environment="production"
)
project_b = Langfuse(
    public_key="project-b-key",
    secret_key="project-b-secret",
    environment="staging"
)

# Use specific clients
# (process_for_project_a is a placeholder not defined in this snippet)
with project_a.start_as_current_span(name="project-a-task") as span:
    result_a = process_for_project_a()
    span.update(output=result_a)


# Use get_client for dynamic project selection
# (observe, process_data and data are not defined/imported in this snippet)
@observe  # Uses default client resolution
def shared_function(data, langfuse_public_key=None):
    # Function uses client associated with langfuse_public_key
    # (the parameter is declared for the decorator; the body never reads it)
    return process_data(data)


# Call with specific project
result = shared_function(data, langfuse_public_key="project-a-key")

# Production configuration with all options
# NOTE: this snippet uses os.getenv but does not itself import os.
production_langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host="https://your-langfuse-instance.com",
    environment="production",
    release="v1.2.3",
    sample_rate=0.1,             # Sample 10% of traces
    flush_at=50,                 # Batch 50 events before flushing
    flush_interval=2.0,          # Flush every 2 seconds
    timeout=30,                  # 30 second timeout
    media_upload_thread_count=5, # 5 threads for media uploads
    additional_headers={
        "X-Custom-Header": "custom-value",
        "Authorization-Proxy": "proxy-token"
    },
    blocked_instrumentation_scopes=[
        "httpx",     # Block httpx instrumentation
        "requests"   # Block requests instrumentation
    ]
)

# Manual trace context management
trace_id = langfuse.create_trace_id()
print(f"Created trace: {trace_id}")
# Use explicit trace context
with langfuse.start_as_current_span(name="main-process", trace_id=trace_id) as main_span:
# Get current context
current_trace = langfuse.get_current_trace_id()
current_observation = langfuse.get_current_observation_id()
print(f"Current trace: {current_trace}")
print(f"Current observation: {current_observation}")
# Create child with explicit parent
child_span = main_span.start_observation(
name="child-process",
as_type="span"
)
try:
result = child_process()
child_span.update(output=result)
finally:
child_span.end()
# Generate UI link
trace_url = langfuse.get_trace_url(trace_id)
print(f"View trace: {trace_url}")class SmartSampler:
"""Custom sampling logic for different scenarios."""
def __init__(self, base_rate: float = 0.1):
    """Store the sampling probabilities.

    Args:
        base_rate: Probability of sampling an ordinary request
            (default 0.1).
    """
    self.base_rate = base_rate
    self.error_rate = 1.0 # Always sample errors
    self.slow_request_rate = 0.5 # Sample 50% of slow requests
def should_sample(self, context):
    """Determine if request should be sampled.

    Args:
        context: Mapping that may contain ``has_error`` (truthy when the
            request failed) and ``execution_time`` (seconds; treated as 0
            when absent).

    Returns:
        True if the request should be traced. Failed requests are sampled
        at ``self.error_rate``, requests slower than 5 seconds at
        ``self.slow_request_rate``, everything else at ``self.base_rate``.
    """
    import random

    # Errors take precedence. With the default error_rate of 1.0 this is
    # always True because random.random() is in [0.0, 1.0), and it now
    # honours a lowered error_rate instead of ignoring the attribute.
    if context.get("has_error"):
        return random.random() < self.error_rate

    # Higher sampling for slow requests
    if context.get("execution_time", 0) > 5.0:
        return random.random() < self.slow_request_rate

    # Base sampling rate
    return random.random() < self.base_rate
# Implementation with conditional tracing
sampler = SmartSampler()
def traced_function(input_data):
start_time = time.time()
has_error = False
result = None
try:
result = process_data(input_data)
except Exception as e:
has_error = True
raise
finally:
execution_time = time.time() - start_time
# Determine if we should trace this execution
context = {
"has_error": has_error,
"execution_time": execution_time
}
if sampler.should_sample(context):
# Create trace retrospectively if needed
with langfuse.start_as_current_span(name="sampled-function") as span:
span.update(
input=input_data,
output=result,
metadata={
"execution_time": execution_time,
"sampled": True
}
)
if has_error:
span.update(level="ERROR")
return resultclass LangfuseHealthMonitor:
"""Monitor Langfuse client health and connectivity."""
def __init__(self, langfuse_client):
    """Keep a reference to the client whose health is being monitored.

    Args:
        langfuse_client: Client object; the diagnostics call its
            ``auth_check``, ``create_trace_id`` and
            ``start_as_current_span`` methods.
    """
    self.client = langfuse_client
def run_diagnostics(self):
    """Run comprehensive health checks.

    Three independent probes are run against ``self.client``:
    authentication, trace-ID generation and span creation. A probe that
    raises is reported with status ``"error"`` and does not stop the
    remaining probes.

    Returns:
        dict with keys:
            ``timestamp``: ISO-8601 UTC timestamp (timezone-aware).
            ``checks``: mapping of check name to
                ``{"status": "pass" | "fail" | "error", "message": str}``.
            ``overall_status``: ``"healthy"`` only if every check passed,
                otherwise ``"unhealthy"``.
    """
    from datetime import datetime, timezone

    checks = {}

    def record(name, failure_prefix, probe):
        # Run one probe and store its outcome; exceptions become "error".
        try:
            status, message = probe()
        except Exception as e:
            status, message = "error", f"{failure_prefix}: {str(e)}"
        checks[name] = {"status": status, "message": message}

    def probe_authentication():
        if self.client.auth_check():
            return "pass", "Authentication successful"
        return "fail", "Authentication failed"

    def probe_trace_creation():
        trace_id = self.client.create_trace_id()
        return "pass", f"Trace ID generated: {trace_id[:8]}..."

    def probe_span_creation():
        with self.client.start_as_current_span(name="health-check") as span:
            span.update(output="Health check successful")
        return "pass", "Span creation and management successful"

    record("authentication", "Auth check failed", probe_authentication)
    record("trace_creation", "Trace creation failed", probe_trace_creation)
    record("span_creation", "Span creation failed", probe_span_creation)

    # Overall status
    all_passed = all(check["status"] == "pass" for check in checks.values())

    return {
        # Timezone-aware so reports from different hosts are comparable.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "checks": checks,
        "overall_status": "healthy" if all_passed else "unhealthy",
    }
def continuous_monitoring(self, interval=300, max_iterations=None):
    """Run continuous health monitoring.

    Calls ``self.run_diagnostics()`` repeatedly, printing the overall
    status and, when unhealthy, the message of every non-passing check.

    Args:
        interval: Seconds to sleep between runs (default 300).
        max_iterations: Stop after this many runs. ``None`` (default)
            loops until interrupted.

    Returns:
        None. Ctrl+C (KeyboardInterrupt) ends the loop cleanly whether it
        arrives during a diagnostics run or during the sleep.
    """
    import time  # the example does not import time at module level

    completed = 0
    try:
        while max_iterations is None or completed < max_iterations:
            try:
                results = self.run_diagnostics()
                print(f"Health check: {results['overall_status']}")
                if results["overall_status"] != "healthy":
                    for check_name, check_result in results["checks"].items():
                        if check_result["status"] != "pass":
                            print(f" {check_name}: {check_result['message']}")
            except Exception as e:
                # Keep monitoring alive; one failed run must not end the loop.
                print(f"Health monitoring error: {e}")

            completed += 1
            if max_iterations is not None and completed >= max_iterations:
                break  # no point sleeping after the final run
            time.sleep(interval)
    except KeyboardInterrupt:
        # The process spends nearly all its time in sleep(), so the
        # interrupt has to be handled around the whole loop.
        return
# Usage
# (`langfuse` is a client instance created earlier in the document)
monitor = LangfuseHealthMonitor(langfuse)
health_report = monitor.run_diagnostics()
print(health_report)
import os
from typing import Optional
class LangfuseFactory:
    """Factory for creating environment-appropriate Langfuse clients."""

    # Used when no environment is given or the given one has no preset.
    _DEFAULT_ENVIRONMENT = "development"

    # Per-environment constructor keyword arguments.
    _CONFIGS = {
        "development": {
            "tracing_enabled": True,
            "sample_rate": 1.0,
            "flush_at": 1,  # Immediate flushing for dev
            "timeout": 10,
        },
        "staging": {
            "tracing_enabled": True,
            "sample_rate": 0.5,
            "flush_at": 10,
            "timeout": 20,
        },
        "production": {
            "tracing_enabled": True,
            "sample_rate": 0.1,  # Lower sampling in prod
            "flush_at": 50,
            "timeout": 30,
            "additional_headers": {"X-Service": "ai-service"},
        },
        "test": {
            "tracing_enabled": False,  # Disable for tests
        },
    }

    @classmethod
    def _resolve_environment(cls, environment: Optional[str] = None) -> str:
        """Return the normalized environment name to configure for.

        Precedence: explicit argument, then the ``ENVIRONMENT`` variable,
        then ``"development"``. Empty values are skipped at each step, so
        ``ENVIRONMENT=""`` no longer yields an empty environment tag.
        Names are trimmed and lowercased so that ``"Production"`` selects
        the production preset.
        """
        candidate = environment or os.getenv("ENVIRONMENT") or cls._DEFAULT_ENVIRONMENT
        return candidate.strip().lower() or cls._DEFAULT_ENVIRONMENT

    @classmethod
    def _settings_for(cls, env: str) -> dict:
        """Return a private copy of the preset for *env*.

        Unknown names fall back to the development preset. The copy is
        deep so callers cannot mutate the shared ``additional_headers``.
        """
        import copy

        preset = cls._CONFIGS.get(env, cls._CONFIGS[cls._DEFAULT_ENVIRONMENT])
        return copy.deepcopy(preset)

    @classmethod
    def create_for_environment(cls, environment: Optional[str] = None) -> "Langfuse":
        """Create Langfuse client configured for specific environment.

        Args:
            environment: Environment name. When omitted or empty, the
                ``ENVIRONMENT`` variable is used, then ``"development"``.

        Returns:
            A Langfuse client tagged with the resolved environment and
            built from that environment's preset (development preset for
            unrecognised names).
        """
        env = cls._resolve_environment(environment)
        return Langfuse(
            environment=env,
            **cls._settings_for(env)
        )
# Usage
# With no argument the environment name is taken from the ENVIRONMENT
# variable, defaulting to "development".
langfuse = LangfuseFactory.create_for_environment()

# Override for specific cases
production_client = LangfuseFactory.create_for_environment("production")
Install with Tessl CLI
npx tessl i tessl/pypi-langfuse