Comprehensive Python SDK for AI application observability and experimentation with OpenTelemetry-based tracing, automatic instrumentation, and dataset management.
Comprehensive system for running experiments on datasets with automatic evaluation, result aggregation, and detailed reporting, enabling systematic testing of AI applications.
Core data structures for representing evaluation outcomes from experiments.
class Evaluation:
def __init__(self, *, name: str, value: Union[int, float, str, bool, None],
comment: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None,
data_type: Optional[ScoreDataType] = None, config_id: Optional[str] = None):
"""Initialize evaluation result.
Args:
name: Unique identifier for the evaluation metric
value: The evaluation score or result
comment: Human-readable explanation of the result
metadata: Additional structured metadata about evaluation
data_type: Score data type (NUMERIC, CATEGORICAL, BOOLEAN)
config_id: Langfuse score config ID
"""
# Attributes
name: str
value: Union[int, float, str, bool, None]
comment: Optional[str]
metadata: Optional[Dict[str, Any]]
data_type: Optional[ScoreDataType]
config_id: Optional[str]
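A minimal sketch of constructing evaluations with the three score data types; the names, values, and rubric metadata are illustrative, and string literals are used for data_type as in the examples further below.

from langfuse import Evaluation

# Numeric score with an explanatory comment
relevance = Evaluation(name="relevance", value=0.85, comment="Mostly on-topic")

# Boolean score with an explicit data type
exact_match = Evaluation(name="exact_match", value=True, data_type="BOOLEAN")

# Categorical score with structured metadata (rubric version is hypothetical)
tone = Evaluation(
    name="tone",
    value="formal",
    data_type="CATEGORICAL",
    metadata={"rubric_version": "v1"},
)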
Results from processing individual items in an experiment.

class ExperimentItemResult:
def __init__(self, *, item: ExperimentItem, output: Any,
evaluations: List[Evaluation], trace_id: Optional[str] = None,
dataset_run_id: Optional[str] = None):
"""Initialize experiment item result.
Args:
item: Original experiment item processed
output: Task function output for this item
evaluations: List of evaluation results
trace_id: Langfuse trace ID for execution
dataset_run_id: Dataset run ID if using Langfuse datasets
"""
# Attributes
item: ExperimentItem
output: Any
evaluations: List[Evaluation]
trace_id: Optional[str]
dataset_run_id: Optional[str]
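A brief sketch of reading an item result, assuming result is an ExperimentResult returned by run_experiment (documented below); it only illustrates the trace link, since outputs and evaluations are shown in the full examples further below.

for item_result in result.item_results:
    # trace_id links this item's execution to its Langfuse trace, when available
    if item_result.trace_id:
        print(f"{item_result.output!r} -> trace {item_result.trace_id}")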
Results from running an entire experiment with formatting and analysis capabilities.

class ExperimentResult:
def __init__(self, *, name: str, run_name: str, description: Optional[str] = None,
item_results: List[ExperimentItemResult],
run_evaluations: List[Evaluation], dataset_run_id: Optional[str] = None,
dataset_run_url: Optional[str] = None):
"""Initialize complete experiment result.
Args:
name: Experiment name
run_name: Current experiment run name
description: Optional experiment description
item_results: Results from individual dataset items
run_evaluations: Aggregate evaluation results for entire run
dataset_run_id: Dataset run ID (for Langfuse datasets)
dataset_run_url: URL to view results in Langfuse UI
"""
def format(self, *, include_item_results: bool = False) -> str:
"""Format results for human-readable display.
Args:
include_item_results: Whether to include detailed results for each item
Returns:
Formatted multi-line string with experiment overview and results
"""
# Attributes
name: str
run_name: str
description: Optional[str]
item_results: List[ExperimentItemResult]
run_evaluations: List[Evaluation]
dataset_run_id: Optional[str]
dataset_run_url: Optional[str]
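A short sketch of the run-level fields, assuming result comes from run_experiment; dataset_run_url is only populated when the experiment ran against a Langfuse dataset.

print(result.format())  # human-readable run summary

if result.dataset_run_url:
    # Link to the dataset run in the Langfuse UI
    print(f"View run '{result.run_name}' at {result.dataset_run_url}")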
Main method for executing experiments on data with automatic tracing and evaluation.

class Langfuse:
def run_experiment(self, *, name: str, data: ExperimentData,
task: TaskFunction, evaluators: Optional[List[EvaluatorFunction]] = None,
run_evaluators: Optional[List[RunEvaluatorFunction]] = None,
run_name: Optional[str] = None, run_description: Optional[str] = None,
experiment_config: Optional[Dict[str, Any]] = None) -> ExperimentResult:
"""Run experiment on dataset with automatic evaluation.
Args:
name: Experiment name
data: List of experiment items to process
task: Function to execute on each item
evaluators: List of item-level evaluator functions
run_evaluators: List of run-level evaluator functions
run_name: Name for this specific run
run_description: Description of this experiment run
experiment_config: Configuration metadata for experiment
Returns:
ExperimentResult with complete results and evaluations
"""Type definitions for experiment data structures and function interfaces.
Type definitions for experiment data structures and function interfaces.

# Data Types
LocalExperimentItem = TypedDict('LocalExperimentItem', {
'input': Any,
'expected_output': Any,
'metadata': Optional[Dict[str, Any]]
}, total=False)
ExperimentItem = Union[LocalExperimentItem, DatasetItemClient]
ExperimentData = Union[List[LocalExperimentItem], List[DatasetItemClient]]
# Function Protocols
class TaskFunction(Protocol):
def __call__(self, *, item: ExperimentItem, **kwargs) -> Union[Any, Awaitable[Any]]:
"""Execute task on experiment item.
Args:
item: Experiment item to process
**kwargs: Additional arguments
Returns:
Task output (can be async)
"""
class EvaluatorFunction(Protocol):
def __call__(self, *, input: Any, output: Any, expected_output: Any = None,
metadata: Optional[Dict[str, Any]] = None, **kwargs) -> Union[Evaluation, List[Evaluation], Awaitable[Union[Evaluation, List[Evaluation]]]]:
"""Evaluate task output for individual items.
Args:
input: Original input to task function
output: Task function output
expected_output: Expected output for comparison
metadata: Item metadata
**kwargs: Additional arguments
Returns:
Single evaluation or list of evaluations (can be async)
"""
class RunEvaluatorFunction(Protocol):
def __call__(self, *, item_results: List[ExperimentItemResult],
**kwargs) -> Union[Evaluation, List[Evaluation], Awaitable[Union[Evaluation, List[Evaluation]]]]:
"""Evaluate entire experiment run with aggregate metrics.
Args:
item_results: Results from all processed items
**kwargs: Additional arguments
Returns:
Aggregate evaluation results (can be async)
"""Helper functions for working with evaluators and experiment frameworks.
Helper functions for working with evaluators and experiment frameworks.

def create_evaluator_from_autoevals(autoevals_evaluator: Any,
**kwargs: Dict[str, Any]) -> EvaluatorFunction:
"""Create Langfuse evaluator from autoevals evaluator.
Args:
autoevals_evaluator: An autoevals evaluator instance
**kwargs: Additional arguments passed to evaluator
Returns:
Langfuse-compatible evaluator function
"""from langfuse import Langfuse, Evaluation
langfuse = Langfuse()
# Define task function
def generate_answer(*, item, **kwargs):
question = item["input"] if isinstance(item, dict) else item.input
# Your AI model call
answer = my_llm.generate(question)
return answer
# Define evaluator
def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
if not expected_output:
return Evaluation(name="accuracy", value=None, comment="No expected output")
is_correct = output.strip().lower() == expected_output.strip().lower()
return Evaluation(
name="accuracy",
value=1.0 if is_correct else 0.0,
comment="Exact match" if is_correct else "Different answer"
)
# Experiment data
experiment_data = [
{"input": "What is the capital of France?", "expected_output": "Paris"},
{"input": "What is the capital of Germany?", "expected_output": "Berlin"},
{"input": "What is the capital of Italy?", "expected_output": "Rome"}
]
# Run experiment
result = langfuse.run_experiment(
name="Capital Cities Test",
data=experiment_data,
task=generate_answer,
evaluators=[accuracy_evaluator]
)
# View results
print(result.format())
print(f"Average accuracy: {sum(eval.value for item in result.item_results for eval in item.evaluations if eval.name == 'accuracy') / len(result.item_results)}")def comprehensive_evaluator(*, input, output, expected_output=None, metadata=None, **kwargs):
"""Multiple evaluation metrics for a single item."""
evaluations = []
# Length check
evaluations.append(Evaluation(
name="output_length",
value=len(output),
comment=f"Output contains {len(output)} characters"
))
# Accuracy check
if expected_output:
is_correct = output.strip().lower() == expected_output.strip().lower()
evaluations.append(Evaluation(
name="accuracy",
value=is_correct,
data_type="BOOLEAN",
comment="Exact match" if is_correct else "Different answer"
))
# Custom scoring based on metadata
if metadata and "difficulty" in metadata:
difficulty_bonus = {"easy": 0, "medium": 0.1, "hard": 0.2}.get(metadata["difficulty"], 0)
evaluations.append(Evaluation(
name="difficulty_adjusted_score",
value=0.8 + difficulty_bonus,
comment=f"Base score with {metadata['difficulty']} difficulty bonus"
))
return evaluations

import asyncio
async def async_task(*, item, **kwargs):
"""Async task function."""
question = item["input"] if isinstance(item, dict) else item.input
response = await async_llm_client.generate(question)
return response
async def async_evaluator(*, input, output, expected_output=None, **kwargs):
"""Async evaluator using external API."""
try:
# Call external evaluation service
evaluation_result = await external_eval_api.evaluate(
question=input,
answer=output,
expected=expected_output
)
return Evaluation(
name="external_quality",
value=evaluation_result.score,
comment=evaluation_result.explanation,
metadata={"confidence": evaluation_result.confidence}
)
except Exception as e:
return Evaluation(
name="external_quality",
value=None,
comment=f"Evaluation failed: {str(e)}"
)
# Run with async functions
result = langfuse.run_experiment(
name="Async Experiment",
data=experiment_data,
task=async_task,
evaluators=[async_evaluator]
)

def statistical_run_evaluator(*, item_results, **kwargs):
"""Aggregate statistics across all experiment items."""
evaluations = []
# Calculate average scores for each metric
metric_scores = {}
for item_result in item_results:
for evaluation in item_result.evaluations:
if isinstance(evaluation.value, (int, float)):
if evaluation.name not in metric_scores:
metric_scores[evaluation.name] = []
metric_scores[evaluation.name].append(evaluation.value)
# Generate aggregate evaluations
for metric_name, scores in metric_scores.items():
if scores:
avg_score = sum(scores) / len(scores)
evaluations.append(Evaluation(
name=f"avg_{metric_name}",
value=avg_score,
comment=f"Average {metric_name} across {len(scores)} items: {avg_score:.3f}"
))
# Standard deviation
if len(scores) > 1:
variance = sum((x - avg_score) ** 2 for x in scores) / len(scores)
std_dev = variance ** 0.5
evaluations.append(Evaluation(
name=f"std_{metric_name}",
value=std_dev,
comment=f"Standard deviation of {metric_name}: {std_dev:.3f}"
))
return evaluations
# Use run evaluator
result = langfuse.run_experiment(
name="Statistical Analysis",
data=experiment_data,
task=generate_answer,
evaluators=[accuracy_evaluator],
run_evaluators=[statistical_run_evaluator]
)

# Run experiment
result = langfuse.run_experiment(
name="Quality Assessment",
data=experiment_data,
task=my_task,
evaluators=[accuracy_evaluator, quality_evaluator]
)
# Basic summary
print(result.format())
# Detailed report with individual items
detailed_report = result.format(include_item_results=True)
with open("experiment_report.txt", "w") as f:
f.write(detailed_report)
# Access individual results programmatically
for i, item_result in enumerate(result.item_results):
print(f"Item {i+1}:")
print(f" Input: {item_result.item}")
print(f" Output: {item_result.output}")
for evaluation in item_result.evaluations:
print(f" {evaluation.name}: {evaluation.value}")
if evaluation.comment:
print(f" Comment: {evaluation.comment}")
# Calculate custom metrics
accuracy_scores = []
for item_result in result.item_results:
for evaluation in item_result.evaluations:
if evaluation.name == "accuracy" and evaluation.value is not None:
accuracy_scores.append(evaluation.value)
if accuracy_scores:
avg_accuracy = sum(accuracy_scores) / len(accuracy_scores)
print(f"Overall accuracy: {avg_accuracy:.2%}")from langfuse.experiment import create_evaluator_from_autoevals
# Assuming you have autoevals installed
# Convert autoevals evaluator to Langfuse format
autoevals_evaluator = some_autoevals.Evaluator()
langfuse_evaluator = create_evaluator_from_autoevals(
autoevals_evaluator,
model="gpt-4" # Additional parameters for the evaluator
)
# Use in experiment
result = langfuse.run_experiment(
name="Autoevals Integration",
data=experiment_data,
task=my_task,
evaluators=[langfuse_evaluator]
)

Install with Tessl CLI
npx tessl i tessl/pypi-langfuse