Framework for evaluating DSPy programs with built-in metrics, parallel execution, and result tracking. Essential for measuring and improving AI system performance.
Evaluates DSPy programs on datasets with parallel execution support.
class Evaluate:
"""
Evaluate DSPy programs on datasets.
Runs a program on evaluation examples and computes scores
using a metric function. Supports parallel execution, progress
tracking, and result export.
"""
def __init__(
self,
devset: list,
metric: callable = None,
num_threads: int = None,
display_progress: bool = False,
display_table: bool = False,
max_errors: int = None,
provide_traceback: bool = None,
failure_score: float = 0.0,
save_as_csv: str = None,
save_as_json: str = None,
**kwargs
):
"""
Initialize evaluator.
Args:
devset (list): Evaluation dataset (list of Example instances)
metric (callable | None): Metric function(example, pred, trace) -> bool/float
num_threads (int | None): Number of threads for parallel evaluation
display_progress (bool): Show progress bar during evaluation (default: False)
display_table (bool | int): Display results table (True/False or number of rows)
max_errors (int | None): Maximum errors before stopping evaluation
provide_traceback (bool | None): Include error tracebacks in output
failure_score (float): Score assigned to failed examples (default: 0.0)
save_as_csv (str | None): Path to save results as CSV
save_as_json (str | None): Path to save results as JSON
**kwargs: Additional evaluation parameters
"""
pass
def __call__(self, program, **kwargs):
"""
Evaluate program on dataset.
Args:
program: Module to evaluate
**kwargs: Override evaluation parameters
Returns:
EvaluationResult with score and detailed results
"""
pass
Usage:
import dspy
# Configure
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
# Define metric
def validate_answer(example, pred, trace=None):
"""Check if prediction matches expected answer."""
return example.answer.lower() == pred.answer.lower()
# Prepare evaluation set
devset = [
dspy.Example(question="What is 2+2?", answer="4").with_inputs("question"),
dspy.Example(question="Capital of France?", answer="Paris").with_inputs("question"),
# ... more examples
]
# Create evaluator
evaluator = dspy.Evaluate(
devset=devset,
metric=validate_answer,
num_threads=4,
display_progress=True,
display_table=10 # Show the first 10 rows of the results table
)
# Evaluate program
program = MyModule()
result = evaluator(program)
print(f"Score: {result.score}%")
print(f"Passed: {len([r for r in result.results if r[2] > 0])}")
# Save results
evaluator = dspy.Evaluate(
devset=devset,
metric=validate_answer,
save_as_csv="results.csv",
save_as_json="results.json"
)
result = evaluator(program)
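Evaluation parameters can also be overridden for a single call via **kwargs; a minimal sketch (assuming __call__ accepts the same keyword arguments as the constructor, as described above):
# Reuse the evaluator but adjust settings for this run only
result = evaluator(program, num_threads=8, display_table=5)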
Result container from Evaluate.
class EvaluationResult:
"""
Result from program evaluation.
Contains overall score and detailed per-example results.
Subclass of Prediction for compatibility.
"""
score: float
"""Overall score as percentage (e.g., 67.5 for 67.5%)."""
results: list
"""List of (example, prediction, score) tuples for each evaluated example."""Usage:
import dspy
evaluator = dspy.Evaluate(devset=devset, metric=my_metric)
result = evaluator(program)
# Access overall score
print(f"Accuracy: {result.score}%")
# Analyze individual results
for example, pred, score in result.results:
if score == 0: # Failed cases
print(f"Failed on: {example.inputs()}")
print(f"Expected: {example.labels()}")
print(f"Got: {pred}")
# Filter successful cases
successful = [(ex, pred, s) for ex, pred, s in result.results if s > 0]
print(f"Passed {len(successful)}/{len(result.results)} examples")Compute exact match after text normalization.
def EM(prediction: str, answers_list: list) -> bool:
"""
Exact match metric with normalization.
Normalizes prediction and reference answers (lowercase, remove
punctuation, etc.) and checks for exact match.
Args:
prediction (str): Predicted answer
answers_list (list[str]): List of acceptable reference answers
Returns:
True if prediction exactly matches any reference answer
"""
pass
Usage:
from dspy.evaluate import EM
import dspy
# Direct usage
result = EM("The Eiffel Tower", ["Eiffel Tower", "Tour Eiffel"])
print(result) # True
result = EM("paris", ["Paris", "Paris, France"])
print(result) # True (case-insensitive)
# As evaluation metric
def em_metric(example, pred, trace=None):
return EM(pred.answer, [example.answer])
evaluator = dspy.Evaluate(devset=devset, metric=em_metric)
Compute token-level F1 score.
def F1(prediction: str, answers_list: list) -> float:
"""
Token-level F1 score.
Computes maximum F1 score between prediction and any reference
answer based on token overlap.
Args:
prediction (str): Predicted answer
answers_list (list[str]): List of reference answers
Returns:
Maximum F1 score (0.0 to 1.0)
"""
pass
Usage:
from dspy.evaluate import F1
# Direct usage
score = F1("The quick brown fox", ["quick brown fox jumps"])
print(score) # ~0.86 ("the" is dropped by normalization, leaving 3 shared tokens)
# As threshold metric
def f1_metric(example, pred, trace=None):
f1 = F1(pred.answer, [example.answer])
return f1 > 0.7 # Pass if F1 > 0.7
import dspy
evaluator = dspy.Evaluate(devset=devset, metric=f1_metric)
HotPotQA-style F1 with special handling.
def HotPotF1(prediction: str, answers_list: list) -> float:
"""
HotPotQA F1 metric.
F1 score with special handling for yes/no/noanswer cases,
following HotPotQA evaluation protocol.
Args:
prediction (str): Predicted answer
answers_list (list[str]): Reference answers
Returns:
F1 score (0.0 to 1.0)
"""
pass
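Usage (a sketch; assumes HotPotF1 is importable from dspy.evaluate alongside EM and F1):
from dspy.evaluate import HotPotF1
import dspy

# Direct usage; yes/no/noanswer predictions are compared under HotPotQA rules
score = HotPotF1("yes", ["yes"])
print(score)  # Expected 1.0 for a matching yes/no answer

# As a thresholded evaluation metric
def hotpot_metric(example, pred, trace=None):
    return HotPotF1(pred.answer, [example.answer]) > 0.7

evaluator = dspy.Evaluate(devset=devset, metric=hotpot_metric)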
Normalize text for comparison.
def normalize_text(s: str) -> str:
"""
Normalize text for comparison.
Applies:
- NFD Unicode normalization
- Lowercase conversion
- Punctuation removal
- Article removal (a, an, the)
- Whitespace collapsing
Args:
s (str): Input text
Returns:
Normalized text
"""
pass
Usage:
from dspy.evaluate import normalize_text
# Normalize for comparison
text1 = normalize_text("The Quick, Brown Fox!")
text2 = normalize_text("quick brown fox")
print(text1 == text2) # True
# Custom metric with normalization
def normalized_match(example, pred, trace=None):
pred_norm = normalize_text(pred.answer)
ans_norm = normalize_text(example.answer)
return pred_norm == ans_norm
Metric function for exact match evaluation.
def answer_exact_match(
example,
pred,
trace=None,
frac: float = 1.0
) -> bool:
"""
Answer exact match metric.
Compares example.answer with pred.answer using exact match
or F1 threshold.
Args:
example: Example with 'answer' field
pred: Prediction with 'answer' field
trace: Unused, for compatibility
frac (float): Threshold (1.0 = exact match, <1.0 = F1 threshold)
Returns:
True if match/threshold met
"""
pass
Usage:
from dspy.evaluate import answer_exact_match
import dspy
# Exact match
def metric(example, pred, trace=None):
return answer_exact_match(example, pred, frac=1.0)
# F1 threshold
def metric_f1(example, pred, trace=None):
return answer_exact_match(example, pred, frac=0.8)
evaluator = dspy.Evaluate(devset=devset, metric=metric_f1)
Check if answer appears in retrieved passages.
def answer_passage_match(example, pred, trace=None) -> bool:
"""
Answer passage match metric.
Checks if example.answer appears in any passage in pred.context.
Useful for evaluating retrieval quality.
Args:
example: Example with 'answer' field
pred: Prediction with 'context' field (list of passages)
trace: Unused
Returns:
True if answer found in any passage
"""
pass
Usage:
from dspy.evaluate import answer_passage_match
import dspy
def retrieval_metric(example, pred, trace=None):
"""Check if retrieved passages contain answer."""
return answer_passage_match(example, pred)
# Evaluate RAG system
evaluator = dspy.Evaluate(devset=devset, metric=retrieval_metric)
result = evaluator(rag_program)
print(f"Retrieval quality: {result.score}%")LM-based semantic F1 metric.
class SemanticF1:
"""
Semantic F1 using LM evaluation.
Uses language model to evaluate semantic similarity
between prediction and reference answers.
"""
def __init__(self, lm=None):
"""
Initialize semantic F1 metric.
Args:
lm: Language model for evaluation (uses default if None)
"""
pass
def __call__(self, example, pred, trace=None) -> float:
"""
Compute semantic F1.
Args:
example: Example with reference
pred: Prediction to evaluate
trace: Unused
Returns:
Semantic F1 score (0.0 to 1.0)
"""
pass
Usage:
import dspy
# Create semantic metric
semantic_f1 = dspy.SemanticF1(lm=dspy.LM('openai/gpt-4o'))
# Use in evaluation
evaluator = dspy.Evaluate(devset=devset, metric=semantic_f1)
result = evaluator(program)
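Since SemanticF1 returns a float in the range 0.0 to 1.0, it can also be wrapped in a pass/fail metric; a small sketch (the 0.7 cutoff is illustrative):
def semantic_metric(example, pred, trace=None):
    # Pass only when semantic overlap with the reference is high
    return semantic_f1(example, pred) >= 0.7

evaluator = dspy.Evaluate(devset=devset, metric=semantic_metric)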
Check if answer is complete and grounded in context.
class CompleteAndGrounded:
"""
Complete and grounded metric.
Uses LM to check if answer is both complete (addresses the
question fully) and grounded (supported by context).
"""
def __init__(self, lm=None):
"""
Initialize metric.
Args:
lm: Language model for evaluation
"""
pass
def __call__(self, example, pred, trace=None) -> bool:
"""
Evaluate completeness and grounding.
Args:
example: Example with question and context
pred: Prediction with answer
trace: Unused
Returns:
True if answer is complete and grounded
"""
pass
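Usage (a minimal sketch mirroring the SemanticF1 example above and the signature documented here; the judge LM choice is illustrative):
import dspy

# Judge answers from a RAG program for completeness and grounding
complete_grounded = dspy.CompleteAndGrounded(lm=dspy.LM('openai/gpt-4o'))
evaluator = dspy.Evaluate(devset=devset, metric=complete_grounded)
result = evaluator(rag_program)
print(f"Complete and grounded: {result.score}%")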
Simple evaluation workflow:
import dspy
# Define metric
def accuracy(example, pred, trace=None):
return example.answer == pred.answer
# Evaluate
evaluator = dspy.Evaluate(
devset=test_set,
metric=accuracy,
display_progress=True
)
score = evaluator(program)
print(f"Accuracy: {score.score}%")Evaluate with multiple metrics:
import dspy
from dspy.evaluate import F1
def multi_metric(example, pred, trace=None):
"""Evaluate on multiple criteria."""
# Exact match
exact = example.answer.lower() == pred.answer.lower()
# F1 score
f1 = F1(pred.answer, [example.answer])
# Length constraint
word_count = len(pred.answer.split())
appropriate_length = 10 <= word_count <= 100
# Combined score
score = 0.0
if exact:
score += 1.0
elif f1 > 0.7:
score += 0.5
if appropriate_length:
score += 0.5
return score / 1.5 # Normalize to 0-1
evaluator = dspy.Evaluate(devset=devset, metric=multi_metric)
result = evaluator(program)
Evaluate specific components:
import dspy
from dspy.evaluate import answer_passage_match, answer_exact_match
# Evaluate retrieval quality
def retrieval_quality(example, pred, trace=None):
"""Check if retrieved passages contain answer."""
if not hasattr(pred, 'context'):
return False
return answer_passage_match(example, pred)
# Evaluate generation quality
def generation_quality(example, pred, trace=None):
"""Check answer quality given context."""
return answer_exact_match(example, pred)
# Evaluate separately
retrieval_eval = dspy.Evaluate(devset=devset, metric=retrieval_quality)
generation_eval = dspy.Evaluate(devset=devset, metric=generation_quality)
ret_score = retrieval_eval(program)
gen_score = generation_eval(program)
print(f"Retrieval: {ret_score.score}%")
print(f"Generation: {gen_score.score}%")Analyze evaluation failures:
import dspy
def detailed_metric(example, pred, trace=None):
correct = example.answer.lower() == pred.answer.lower()
return correct
evaluator = dspy.Evaluate(
devset=devset,
metric=detailed_metric,
display_table=False,
provide_traceback=True
)
result = evaluator(program)
# Analyze failures
failures = [(ex, pred, score) for ex, pred, score in result.results if score == 0]
print(f"Failed on {len(failures)} examples:")
for ex, pred, _ in failures[:5]: # Show first 5
print(f"\nInput: {ex.inputs()}")
print(f"Expected: {ex.answer}")
print(f"Got: {pred.answer}")
if hasattr(pred, 'reasoning'):
print(f"Reasoning: {pred.reasoning}")Compare multiple programs:
import dspy
from dspy.evaluate import answer_exact_match
def metric(example, pred, trace=None):
return answer_exact_match(example, pred)
# Evaluate baseline
evaluator = dspy.Evaluate(devset=devset, metric=metric)
baseline_score = evaluator(baseline_program)
# Evaluate optimized versions
optimized_scores = {}
for name, program in optimized_programs.items():
score = evaluator(program)
optimized_scores[name] = score.score
print(f"{name}: {score.score}%")
# Show improvement
print(f"\nBaseline: {baseline_score.score}%")
for name, score in optimized_scores.items():
improvement = score - baseline_score.score
print(f"{name}: {score}% ({improvement:+.1f}%)")Perform k-fold cross-validation:
import dspy
import random
def cross_validate(program_class, dataset, k=5, metric=None):
"""K-fold cross-validation."""
dataset = list(dataset)  # Work on a copy so the caller's list is not reordered
random.shuffle(dataset)
fold_size = len(dataset) // k
scores = []
for i in range(k):
# Split data
val_start = i * fold_size
val_end = (i + 1) * fold_size
valset = dataset[val_start:val_end]
trainset = dataset[:val_start] + dataset[val_end:]
# Train/optimize
program = program_class()
optimizer = dspy.BootstrapFewShot(metric=metric)
compiled = optimizer.compile(program, trainset=trainset)
# Evaluate
evaluator = dspy.Evaluate(devset=valset, metric=metric)
result = evaluator(compiled)
scores.append(result.score)
print(f"Fold {i+1}: {result.score}%")
# Aggregate
avg_score = sum(scores) / len(scores)
print(f"\nAverage: {avg_score}%")
return scores
# Run cross-validation
scores = cross_validate(MyProgramClass, dataset, k=5, metric=my_metric)
Create comprehensive benchmark:
import dspy
from dspy.evaluate import answer_exact_match, F1
class BenchmarkSuite:
"""Comprehensive evaluation suite."""
def __init__(self):
self.benchmarks = {
"accuracy": self.accuracy_metric,
"f1_score": self.f1_metric,
"latency": self.latency_metric,
"cost": self.cost_metric,
}
def accuracy_metric(self, example, pred, trace=None):
return answer_exact_match(example, pred)
def f1_metric(self, example, pred, trace=None):
f1 = F1(pred.answer, [example.answer])
return f1 > 0.7
def latency_metric(self, example, pred, trace=None):
# Check prediction metadata for timing (assumes custom instrumentation attached a '_metadata' dict)
if hasattr(pred, '_metadata'):
latency = pred._metadata.get('latency', 0)
return latency < 2.0 # Under 2 seconds
return True
def cost_metric(self, example, pred, trace=None):
# Check token usage (assumes LM usage tracking is enabled;
# get_lm_usage() typically reports usage keyed by model name)
usage = pred.get_lm_usage() or {}
total_tokens = sum(u.get('total_tokens', 0) for u in usage.values())
return total_tokens < 1000 # Under 1000 tokens
def evaluate_all(self, program, devset):
"""Run all benchmarks."""
results = {}
for name, metric in self.benchmarks.items():
evaluator = dspy.Evaluate(devset=devset, metric=metric)
score = evaluator(program)
results[name] = score.score
print(f"{name}: {score.score}%")
return results
# Use benchmark suite
suite = BenchmarkSuite()
results = suite.evaluate_all(program, test_set)