Model Evaluation

Comprehensive model evaluation capabilities including benchmark evaluations, custom scorers, and LLM-as-judge for foundation models.

Capabilities

BenchMarkEvaluator

Standard benchmark evaluations for foundation models including MMLU, BBH, MATH, GSM8K, and more.

class BenchMarkEvaluator:
    """
    Standard benchmark evaluations for foundation models.

    Parameters:
        benchmark: Union[Benchmark, str] - Benchmark name (required)
            - Enum value or string: "MMLU", "BBH", "MATH", "GSM8K", etc.
        subtasks: Optional[List[str]] - Specific subtasks to evaluate
            - Example for MMLU: ["abstract_algebra", "anatomy", "astronomy"]
            - If not provided: runs all subtasks
        evaluate_base_model: Optional[bool] - Also evaluate base model (default: False)
            - Compares fine-tuned vs base model
        model: Union[str, ModelPackage] - Model to evaluate (required)
            - Model name, ARN, or ModelPackage object
        s3_output_path: Optional[str] - S3 path for results
        region: Optional[str] - AWS region
        sagemaker_session: Optional[Session] - SageMaker session
        base_eval_name: Optional[str] - Base evaluation name
        mlflow_resource_arn: Optional[str] - MLflow tracking server ARN
        mlflow_experiment_name: Optional[str] - MLflow experiment name
        mlflow_run_name: Optional[str] - MLflow run name
        networking: Optional[VpcConfig] - VPC configuration
        kms_key_id: Optional[str] - KMS key for encryption
        model_package_group: Optional[Union[str, ModelPackageGroup]] - Model package group

    Methods:
        evaluate(subtask=None) -> EvaluationPipelineExecution
            Run benchmark evaluation.
            
            Parameters:
                subtask: Optional[str] - Specific subtask to run
            
            Returns:
                EvaluationPipelineExecution: Evaluation execution object
            
            Raises:
                ValueError: Invalid benchmark or subtask
                ClientError: AWS API errors

    Class Methods:
        get_all(session=None, region=None) -> List[str]
            Get all available benchmarks.
            
            Parameters:
                session: Optional[Session] - SageMaker session
                region: Optional[str] - AWS region
            
            Returns:
                List[str]: List of benchmark names

    Attributes:
        hyperparameters: Dynamic hyperparameters object with validation
            - batch_size: int
            - max_tokens: int
            - temperature: float
            - top_p: float

    Supported Benchmarks:
        - MMLU: Massive Multitask Language Understanding (57 subtasks)
        - BBH: BIG-Bench Hard (27 subtasks)
        - MATH: Mathematics problem solving
        - GSM8K: Grade school math (8K problems)
        - HellaSwag: Commonsense reasoning
        - WinoGrande: Commonsense reasoning with pronoun resolution
        - ARC: AI2 Reasoning Challenge (easy and challenge)
        - TruthfulQA: Truthfulness evaluation
        - PIQA: Physical interaction QA
        - BoolQ: Boolean questions
        - RACE: Reading comprehension

    Notes:
        - Benchmarks run as SageMaker Pipeline executions
        - Results include per-subtask and aggregate metrics
        - Evaluation can take 30 minutes to several hours
        - Costs depend on model size and inference time
        - Compare base vs fine-tuned to measure improvement
    """

Usage:

from botocore.exceptions import ClientError
from sagemaker.train.evaluate import BenchMarkEvaluator, get_benchmarks, get_benchmark_properties

# List available benchmarks
benchmarks = BenchMarkEvaluator.get_all()
print(f"Available benchmarks: {benchmarks}")

# Get benchmark details
properties = get_benchmark_properties("MMLU")
print(f"MMLU subtasks: {properties['subtasks']}")
print(f"Dataset info: {properties['dataset_info']}")

# Create benchmark evaluator
evaluator = BenchMarkEvaluator(
    benchmark="MMLU",
    subtasks=["abstract_algebra", "anatomy", "astronomy", "college_mathematics"],
    model="meta-llama/Llama-2-7b-hf-fine-tuned",
    s3_output_path="s3://my-bucket/evaluation-results",
    evaluate_base_model=True,  # Compare with base model
    mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123:mlflow-tracking-server/my-server",
    mlflow_experiment_name="llm-evaluation",
    mlflow_run_name="mmlu-evaluation-v1"
)

# Configure hyperparameters
evaluator.hyperparameters.batch_size = 8
evaluator.hyperparameters.max_tokens = 512
evaluator.hyperparameters.temperature = 0.1  # Low temperature for deterministic answers

# Run evaluation with error handling
try:
    execution = evaluator.evaluate()
    print(f"Evaluation started: {execution.arn}")
    
    # Wait for completion
    execution.wait(poll=60)  # Check every minute
    
    # Check status
    if execution.status == "Succeeded":
        print(f"Evaluation completed successfully")
        print(f"Results: {execution.s3_output_path}")
    elif execution.status == "Failed":
        print(f"Evaluation failed: {execution.failure_reason}")
    
except ValueError as e:
    print(f"Invalid benchmark configuration: {e}")
except ClientError as e:
    print(f"AWS API error: {e}")

Run Specific Subtask:

# Evaluate only specific subtask
evaluator = BenchMarkEvaluator(
    benchmark="MMLU",
    model="my-model"
)

# Run single subtask
execution = evaluator.evaluate(subtask="college_mathematics")
execution.wait()
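
If you prefer a separate execution per subtask (for example, to track each one as its own MLflow run), you can loop over the subtask list returned by get_benchmark_properties. A minimal sketch, run sequentially here to stay within the concurrent-evaluation limit:

from sagemaker.train.evaluate import BenchMarkEvaluator, get_benchmark_properties

# Pull the full subtask list for the benchmark
subtasks = get_benchmark_properties("MMLU")["subtasks"]

statuses = {}
for subtask in subtasks[:3]:  # first few subtasks only, for illustration
    evaluator = BenchMarkEvaluator(benchmark="MMLU", model="my-model")
    execution = evaluator.evaluate(subtask=subtask)
    execution.wait()
    statuses[subtask] = execution.status

print(statuses)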

CustomScorerEvaluator

Custom scorer or preset metrics evaluations for tailored model assessment.

class CustomScorerEvaluator:
    """
    Custom scorer or preset metrics evaluations.

    Parameters:
        evaluator: Union[str, CustomScorer] - Custom scorer or preset metric name (required)
            - Preset: "accuracy", "f1", "precision", "recall", "bleu", "rouge", "meteor", "bertscore"
            - CustomScorer: User-defined scorer object
        dataset: Union[str, DataSet] - Evaluation dataset (required)
            - S3 URI, dataset ARN, or DataSet object
        evaluate_base_model: Optional[bool] - Also evaluate base model (default: False)
        model: Union[str, ModelPackage] - Model to evaluate (required)
        s3_output_path: Optional[str] - S3 path for results
        region: Optional[str] - AWS region
        sagemaker_session: Optional[Session] - SageMaker session
        base_eval_name: Optional[str] - Base evaluation name
        mlflow_resource_arn: Optional[str] - MLflow tracking server ARN
        mlflow_experiment_name: Optional[str] - MLflow experiment name
        mlflow_run_name: Optional[str] - MLflow run name
        networking: Optional[VpcConfig] - VPC configuration
        kms_key_id: Optional[str] - KMS key for encryption
        model_package_group: Optional[Union[str, ModelPackageGroup]] - Model package group

    Methods:
        evaluate() -> EvaluationPipelineExecution
            Run custom scorer evaluation.
            
            Returns:
                EvaluationPipelineExecution: Evaluation execution
            
            Raises:
                ValueError: Invalid evaluator or dataset
                ClientError: AWS API errors

    Attributes:
        hyperparameters: Dynamic hyperparameters object
            - average: str (for classification metrics) - "micro", "macro", "weighted"
            - threshold: float (for binary classification)
            - Additional metric-specific parameters

    Preset Metrics:
        Classification:
            - accuracy: Overall accuracy
            - f1: F1 score (harmonic mean of precision/recall)
            - precision: Precision score
            - recall: Recall score
        
        Text Generation:
            - bleu: BLEU score for translation quality
            - rouge: ROUGE score for summarization (rouge-1, rouge-2, rouge-l)
            - meteor: METEOR score for translation
            - bertscore: BERTScore semantic similarity

    Notes:
        - Preset metrics cover common use cases
        - Custom scorers for domain-specific evaluation
        - Dataset must include predictions and ground truth
        - Results include per-sample and aggregate metrics
    """

Usage:

from sagemaker.train.evaluate import CustomScorerEvaluator, get_builtin_metrics

# List built-in metrics
builtin_metrics = get_builtin_metrics()
print(f"Available metrics: {list(builtin_metrics)}")

# Using preset metric
evaluator = CustomScorerEvaluator(
    evaluator="f1",  # F1 score
    dataset="s3://my-bucket/eval-data.jsonl",
    model="my-fine-tuned-model",
    s3_output_path="s3://my-bucket/evaluation-results"
)

# Configure hyperparameters for F1
evaluator.hyperparameters.average = "weighted"  # Weighted by class support
evaluator.hyperparameters.threshold = 0.5  # Classification threshold

# Run evaluation
try:
    execution = evaluator.evaluate()
    execution.wait()
    
    print(f"F1 evaluation completed")
    print(f"Results: {execution.s3_output_path}")
    
except ValueError as e:
    print(f"Configuration error: {e}")

Using Custom Scorer:

# Define custom scorer class
class DomainSpecificScorer:
    """Custom scorer for domain-specific metrics."""
    
    def score(self, predictions, ground_truth, metadata=None):
        """
        Calculate custom metrics.
        
        Args:
            predictions: List of model predictions
            ground_truth: List of true labels
            metadata: Optional metadata dictionary
        
        Returns:
            Dict with metric name -> value
        """
        # Placeholder scoring logic so the example runs end to end;
        # replace with your own domain-specific calculations
        matches = sum(p == t for p, t in zip(predictions, ground_truth))
        custom_metric = matches / max(len(predictions), 1)
        secondary_metric = sum(len(str(p)) for p in predictions) / max(len(predictions), 1)
        
        return {
            "domain_metric": custom_metric,
            "secondary_metric": secondary_metric,
            "sample_count": len(predictions)
        }

# Use custom scorer
custom_evaluator = CustomScorerEvaluator(
    evaluator=DomainSpecificScorer(),
    dataset="s3://my-bucket/test-data.jsonl",
    model="my-model",
    s3_output_path="s3://my-bucket/custom-eval-results"
)

execution = custom_evaluator.evaluate()
execution.wait()

BLEU Score for Translation:

# Evaluate translation model with BLEU
bleu_evaluator = CustomScorerEvaluator(
    evaluator="bleu",
    dataset="s3://bucket/translation-test.jsonl",
    model="my-translation-model"
)

# Configure BLEU parameters
bleu_evaluator.hyperparameters.max_order = 4  # BLEU-4
bleu_evaluator.hyperparameters.smooth = True

execution = bleu_evaluator.evaluate()

ROUGE Score for Summarization:

# Evaluate summarization with ROUGE
rouge_evaluator = CustomScorerEvaluator(
    evaluator="rouge",
    dataset="s3://bucket/summarization-test.jsonl",
    model="my-summarization-model"
)

# Configure ROUGE parameters
rouge_evaluator.hyperparameters.rouge_types = ["rouge1", "rouge2", "rougeL"]
rouge_evaluator.hyperparameters.use_stemmer = True

execution = rouge_evaluator.evaluate()

LLMAsJudgeEvaluator

LLM-as-judge evaluations using foundation models to assess quality, safety, and other attributes.

class LLMAsJudgeEvaluator:
    """
    LLM-as-judge evaluations using foundation models.

    Parameters:
        evaluator_model: Union[str, ModelPackage] - Judge model (required)
            - Bedrock model ID (e.g., "anthropic.claude-3-sonnet-20240229-v1:0")
            - ModelPackage from Model Registry
        dataset: Union[str, DataSet] - Evaluation dataset (required)
            - S3 URI, dataset ARN, or DataSet object
        builtin_metrics: Optional[List[str]] - Built-in judge metrics
            - See Built-in Judge Metrics below
        custom_metrics: Optional[List[CustomMetric]] - Custom judge metrics
            - User-defined evaluation criteria
        evaluate_base_model: Optional[bool] - Also evaluate base model (default: False)
        model: Union[str, ModelPackage] - Model to evaluate (required)
        s3_output_path: Optional[str] - S3 path for results
        region: Optional[str] - AWS region
        sagemaker_session: Optional[Session] - SageMaker session
        base_eval_name: Optional[str] - Base evaluation name
        mlflow_resource_arn: Optional[str] - MLflow tracking server ARN
        mlflow_experiment_name: Optional[str] - MLflow experiment name
        mlflow_run_name: Optional[str] - MLflow run name
        networking: Optional[VpcConfig] - VPC configuration
        kms_key_id: Optional[str] - KMS key for encryption
        model_package_group: Optional[Union[str, ModelPackageGroup]] - Model package group

    Methods:
        evaluate() -> EvaluationPipelineExecution
            Run LLM-as-judge evaluation.
            
            Returns:
                EvaluationPipelineExecution: Evaluation execution
            
            Raises:
                ValueError: Invalid evaluator model or metrics
                ClientError: AWS API errors

    Built-in Judge Metrics:
        - helpfulness: Response helpfulness to user query
        - relevance: Answer relevance to question
        - correctness: Factual correctness of response
        - coherence: Logical flow and consistency
        - harmfulness: Harmful or unsafe content detection
        - toxicity: Toxic language detection
        - stereotypes: Stereotype and bias detection
        - refusal: Refusal rate for inappropriate requests
        - completeness: Response completeness
        - conciseness: Response conciseness

    Notes:
        - Uses LLM to evaluate model outputs
        - More flexible than rule-based metrics
        - Evaluator model must support required task
        - Custom metrics allow domain-specific evaluation
        - Results include per-sample scores and reasoning
    """

Usage:

from sagemaker.train.evaluate import LLMAsJudgeEvaluator

# Using built-in metrics
evaluator = LLMAsJudgeEvaluator(
    evaluator_model="anthropic.claude-3-sonnet-20240229-v1:0",  # Bedrock model
    builtin_metrics=["helpfulness", "relevance", "correctness", "harmfulness"],
    dataset="s3://my-bucket/eval-dataset.jsonl",
    model="my-fine-tuned-llm",
    s3_output_path="s3://my-bucket/judge-results",
    mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123:mlflow-tracking-server/server",
    mlflow_experiment_name="llm-quality-eval"
)

# Run evaluation
try:
    execution = evaluator.evaluate()
    print(f"LLM-as-judge evaluation started: {execution.arn}")
    
    # Wait for completion (can take 1-3 hours for large datasets)
    execution.wait(poll=120)  # Check every 2 minutes
    
    if execution.status == "Succeeded":
        print("Evaluation completed")
        # Results include scores for each metric
        # Download from s3_output_path for analysis
    
except ValueError as e:
    print(f"Invalid configuration: {e}")

Custom Judge Metrics:

# Define custom evaluation criteria
custom_metrics = [
    {
        "name": "technical_accuracy",
        "description": "Evaluate technical accuracy of code explanations",
        "prompt_template": """
You are an expert software engineer. Evaluate the technical accuracy of the following code explanation.

Question: {question}
Explanation: {response}

Rate the technical accuracy on a scale of 1-5:
1 - Completely incorrect
2 - Mostly incorrect with few correct points
3 - Partially correct
4 - Mostly correct with minor issues
5 - Completely accurate

Provide your rating and brief justification.

Rating:"""
    },
    {
        "name": "code_quality",
        "description": "Evaluate code quality and best practices",
        "prompt_template": """
Evaluate the quality of the following code based on best practices.

Code: {response}

Consider: readability, efficiency, error handling, documentation.

Rating (1-5):"""
    }
]

# Create evaluator with custom metrics
evaluator = LLMAsJudgeEvaluator(
    evaluator_model="anthropic.claude-3-sonnet-20240229-v1:0",
    custom_metrics=custom_metrics,
    dataset="s3://my-bucket/code-eval-dataset.jsonl",
    model="my-code-generation-model",
    s3_output_path="s3://my-bucket/code-eval-results"
)

execution = evaluator.evaluate()
execution.wait()

Compare Base vs Fine-Tuned:

# Evaluate both models
evaluator = BenchMarkEvaluator(
    benchmark="MMLU",
    subtasks=["college_computer_science", "college_mathematics"],
    model="my-fine-tuned-llama",
    evaluate_base_model=True,  # Also evaluate base Llama-2-7b
    s3_output_path="s3://bucket/comparison"
)

execution = evaluator.evaluate()
execution.wait()

# Results include metrics for both models
# Compare to measure fine-tuning improvement
# Example output:
# {
#   "fine_tuned": {"accuracy": 0.72, "score": 72.0},
#   "base": {"accuracy": 0.54, "score": 54.0},
#   "improvement": 0.18
# }
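
Once an execution succeeds, its metrics are written as JSON under s3_output_path (see the EvaluationPipelineExecution notes below). A hedged sketch for pulling them down with boto3; the exact file layout under the prefix is not specified here, so list the prefix and inspect what is there:

import json
import boto3
from urllib.parse import urlparse

# Split the execution's s3_output_path into bucket and key prefix
parsed = urlparse(execution.s3_output_path)
bucket, prefix = parsed.netloc, parsed.path.lstrip("/")

s3 = boto3.client("s3")
results = {}
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".json"):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            results[obj["Key"]] = json.loads(body)

# Inspect whichever files hold the per-model metrics
for key, payload in results.items():
    print(key, payload)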

EvaluationPipelineExecution

Manages evaluation pipeline executions with status tracking and result retrieval.

class EvaluationPipelineExecution:
    """
    Evaluation pipeline execution management.

    Class Methods:
        start(eval_type, name, pipeline_definition, role_arn, s3_output_path, 
              session=None, region=None, tags=None) -> EvaluationPipelineExecution
            Start evaluation execution.
            
            Parameters:
                eval_type: str - Evaluation type (required)
                    - "benchmark", "custom_scorer", "llm_as_judge"
                name: str - Execution name (required)
                pipeline_definition: Dict - Pipeline definition (required)
                role_arn: str - IAM role ARN (required)
                s3_output_path: str - S3 output path (required)
                session: Optional[Session] - SageMaker session
                region: Optional[str] - AWS region
                tags: Optional[List[Dict]] - Tags
            
            Returns:
                EvaluationPipelineExecution: Started execution
        
        get_all(eval_type, session=None, region=None) -> List[EvaluationPipelineExecution]
            Get all executions of a type.
            
            Parameters:
                eval_type: str - Evaluation type
                session: Optional[Session] - SageMaker session
                region: Optional[str] - AWS region
            
            Returns:
                List[EvaluationPipelineExecution]: All executions
        
        get(pipeline_execution_arn, session=None, region=None) -> EvaluationPipelineExecution
            Get specific execution.
            
            Parameters:
                pipeline_execution_arn: str - Execution ARN (required)
                session: Optional[Session] - SageMaker session
                region: Optional[str] - AWS region
            
            Returns:
                EvaluationPipelineExecution: Execution object

    Methods:
        wait(poll=30, timeout=None) -> None
            Wait for execution to complete.
            
            Parameters:
                poll: int - Polling interval in seconds (default: 30)
                timeout: Optional[int] - Timeout in seconds
            
            Raises:
                TimeoutError: If timeout exceeded
                RuntimeError: If execution failed
        
        stop() -> None
            Stop the execution immediately.
            
            Raises:
                ClientError: If execution already completed
        
        refresh() -> None
            Refresh execution status from API.
        
        get_status() -> PipelineExecutionStatus
            Get combined overall and per-step status (see PipelineExecutionStatus below).

    Attributes:
        name: str - Execution name
        arn: str - Execution ARN
        status: str - Current status
            - "Executing": In progress
            - "Succeeded": Completed successfully
            - "Failed": Execution failed
            - "Stopped": Manually stopped
        eval_type: str - Type of evaluation
        s3_output_path: str - S3 path for results
        creation_time: datetime - Creation timestamp
        failure_reason: Optional[str] - Failure reason if failed

    Notes:
        - Evaluation runs as SageMaker Pipeline
        - Long-running (minutes to hours)
        - Results written to S3 in JSON format
        - Can monitor status via SageMaker console
    """

Usage:

from sagemaker.train.evaluate import EvaluationPipelineExecution

# List all benchmark evaluations
executions = EvaluationPipelineExecution.get_all(
    eval_type="benchmark",
    session=session,
    region="us-west-2"
)

print(f"Total evaluations: {len(executions)}")
for execution in executions[:5]:
    print(f"  {execution.name}: {execution.status}")

# Get specific execution by ARN
execution = EvaluationPipelineExecution.get(
    pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123:pipeline/eval-pipeline/execution/abc123",
    session=session,
    region="us-west-2"
)

# Wait with custom timeout (2 hours)
try:
    execution.wait(poll=60, timeout=7200)
    
    if execution.status == "Succeeded":
        print("Evaluation completed successfully")
        # Download results from execution.s3_output_path
    
except TimeoutError:
    print("Evaluation exceeded 2 hour timeout")
    execution.stop()  # Stop if taking too long

# Refresh status
execution.refresh()
print(f"Current status: {execution.status}")

Evaluation Functions

get_benchmarks

def get_benchmarks() -> Type[Benchmark]:
    """
    Get benchmark enum for selecting available benchmarks.

    Returns:
        Type[Benchmark]: Enum class with available benchmarks as attributes

    Usage:
        Access benchmark names as enum values for type safety.
    
    Example:
        Benchmark = get_benchmarks()
        evaluator = BenchMarkEvaluator(benchmark=Benchmark.MMLU, ...)
    """

get_benchmark_properties

def get_benchmark_properties(benchmark: Union[Benchmark, str]) -> Dict[str, Any]:
    """
    Get properties for a specific benchmark.

    Parameters:
        benchmark: Union[Benchmark, str] - Benchmark enum value or string name

    Returns:
        Dict[str, Any]: Benchmark properties including:
            - description: str - Benchmark description
            - subtasks: List[str] - Available subtasks
            - dataset_info: Dict - Dataset information (size, source)
            - metrics: List[str] - Evaluation metrics computed
            - citation: str - Academic citation
            - task_type: str - Task type (classification, generation, etc.)
            - num_samples: int - Total samples in benchmark

    Raises:
        ValueError: If benchmark name invalid

    Example:
        props = get_benchmark_properties("MMLU")
        print(f"Subtasks: {props['subtasks']}")
        print(f"Metrics: {props['metrics']}")
    """

get_builtin_metrics

def get_builtin_metrics() -> Type[BuiltInMetric]:
    """
    Get built-in metrics enum for custom scorer evaluation.

    Returns:
        Type[BuiltInMetric]: Enum class with available built-in metrics

    Usage:
        Access metric names as enum values.

    Example:
        BuiltInMetric = get_builtin_metrics()
        evaluator = CustomScorerEvaluator(
            evaluator=BuiltInMetric.F1,
            ...
        )
    """

Usage:

from sagemaker.train.evaluate import (
    get_benchmarks,
    get_benchmark_properties,
    get_builtin_metrics
)

# List all benchmarks
Benchmark = get_benchmarks()
print(f"Available benchmarks: {list(Benchmark)}")

# Get detailed benchmark information
mmlu_props = get_benchmark_properties(Benchmark.MMLU)
print(f"MMLU Description: {mmlu_props['description']}")
print(f"MMLU Subtasks ({len(mmlu_props['subtasks'])}): {mmlu_props['subtasks'][:5]}...")
print(f"MMLU Metrics: {mmlu_props['metrics']}")
print(f"Dataset size: {mmlu_props['num_samples']} samples")

bbh_props = get_benchmark_properties("BBH")
print(f"\nBBH Subtasks: {bbh_props['subtasks']}")

# List built-in metrics for custom evaluation
BuiltInMetric = get_builtin_metrics()
print(f"\nAvailable metrics: {list(BuiltInMetric)}")

# Use enum for type safety
evaluator = BenchMarkEvaluator(
    benchmark=Benchmark.MATH,  # Type-safe
    model="my-model"
)

Advanced Usage

Multi-Benchmark Evaluation

from sagemaker.train.evaluate import BenchMarkEvaluator
from concurrent.futures import ThreadPoolExecutor, as_completed

# Evaluate multiple benchmarks in parallel
benchmarks = ["MMLU", "BBH", "MATH", "GSM8K"]
model_name = "my-fine-tuned-model"

def run_benchmark(benchmark_name):
    """Run single benchmark evaluation."""
    evaluator = BenchMarkEvaluator(
        benchmark=benchmark_name,
        model=model_name,
        s3_output_path=f"s3://bucket/results/{benchmark_name.lower()}"
    )
    execution = evaluator.evaluate()
    execution.wait()
    return {
        "benchmark": benchmark_name,
        "status": execution.status,
        "results_path": execution.s3_output_path
    }

# Run all benchmarks
results = []
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(run_benchmark, bm) for bm in benchmarks]
    
    for future in as_completed(futures):
        try:
            result = future.result()
            results.append(result)
            print(f"{result['benchmark']}: {result['status']}")
        except Exception as e:
            print(f"Evaluation failed: {e}")

# Aggregate results
successful = [r for r in results if r['status'] == 'Succeeded']
print(f"\nCompleted {len(successful)}/{len(benchmarks)} evaluations")

Pipeline Integration

from sagemaker.mlops.workflow import Pipeline, LambdaStep, ConditionStep, LambdaOutput
from sagemaker.core.workflow import ConditionEquals

# Evaluation as pipeline step
def run_evaluation_lambda(event, context):
    """Lambda to run evaluation."""
    from sagemaker.train.evaluate import BenchMarkEvaluator
    
    evaluator = BenchMarkEvaluator(
        benchmark="MMLU",
        model=event["model_name"],
        s3_output_path=event["output_path"]
    )
    execution = evaluator.evaluate()
    execution.wait()
    
    return {
        "status": execution.status,
        "arn": execution.arn,
        "results_path": execution.s3_output_path
    }

# Include in pipeline
eval_step = LambdaStep(
    name="EvaluateModel",
    lambda_func="arn:aws:lambda:us-west-2:123:function:run-evaluation",
    inputs={
        "model_name": train_step.properties.ModelArtifacts.S3ModelArtifacts,
        "output_path": "s3://bucket/eval"
    },
    outputs=[
        LambdaOutput(output_name="status", output_type="String"),
        LambdaOutput(output_name="results_path", output_type="String")
    ]
)

# Conditional deployment based on evaluation
condition = ConditionEquals(
    left=eval_step.properties.Outputs["status"],
    right="Succeeded"
)

condition_step = ConditionStep(
    name="CheckEvaluation",
    conditions=[condition],
    if_steps=[deploy_step],
    else_steps=[fail_step]
)

pipeline = Pipeline(
    name="train-evaluate-deploy",
    steps=[train_step, eval_step, condition_step]
)

Continuous Evaluation

import boto3
from datetime import datetime

# Schedule daily evaluation
events = boto3.client('events')

# Create EventBridge rule
events.put_rule(
    Name='daily-model-evaluation',
    ScheduleExpression='cron(0 3 * * ? *)',  # 3 AM UTC daily
    State='ENABLED',
    Description='Daily model benchmark evaluation'
)

# Lambda to trigger evaluation
lambda_code = """
def lambda_handler(event, context):
    from datetime import datetime
    from sagemaker.train.evaluate import BenchMarkEvaluator
    
    # Get current production model (get_production_model_arn is a placeholder
    # for however you look up the model, e.g. from the Model Registry)
    model_arn = get_production_model_arn()
    
    # Run evaluation
    evaluator = BenchMarkEvaluator(
        benchmark="MMLU",
        model=model_arn,
        s3_output_path=f"s3://bucket/daily-eval/{datetime.now().date()}"
    )
    
    execution = evaluator.evaluate()
    
    # Don't wait - runs asynchronously
    return {
        'execution_arn': execution.arn,
        'status': 'started'
    }
"""

# Add Lambda as target
events.put_targets(
    Rule='daily-model-evaluation',
    Targets=[{
        'Id': '1',
        'Arn': lambda_function_arn
    }]
)
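
EventBridge can only invoke the function if the Lambda's resource policy allows it, which the snippet above does not set up. A minimal sketch granting that permission with boto3 (lambda_function_arn and the rule name come from the code above):

# Allow the EventBridge rule to invoke the Lambda function
lambda_client = boto3.client('lambda')
rule_arn = events.describe_rule(Name='daily-model-evaluation')['Arn']

lambda_client.add_permission(
    FunctionName=lambda_function_arn,
    StatementId='allow-daily-model-evaluation-rule',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule_arn
)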

Support Classes

StepDetail

class StepDetail:
    """
    Pipeline step details for tracking execution progress.

    Represents status and timing information for a single step.

    Fields:
        name: str - Name of the pipeline step
        status: str - Status ("Completed", "Executing", "Waiting", "Failed", "Stopped")
        start_time: Optional[str] - ISO format timestamp when step started
        end_time: Optional[str] - ISO format timestamp when step ended
        display_name: Optional[str] - Human-readable display name
        failure_reason: Optional[str] - Detailed reason if the step failed

    Notes:
        - Part of PipelineExecutionStatus
        - Track individual step progress
        - Identify bottlenecks and failures
    """

PipelineExecutionStatus

class PipelineExecutionStatus:
    """
    Combined pipeline execution status with step details.

    Aggregates overall execution status with detailed step information.

    Fields:
        overall_status: str - Overall execution status
            - "Starting": Pipeline starting
            - "Executing": Steps running
            - "Completed": All steps succeeded
            - "Failed": One or more steps failed
            - "Stopped": Execution stopped
        step_details: List[StepDetail] - List of individual step details
        failure_reason: Optional[str] - Detailed reason if execution failed

    Notes:
        - Provides complete execution state
        - Use to debug failed evaluations
        - Monitor progress for long-running evaluations
    """

Usage:

# Get detailed execution status
execution = evaluator.evaluate()

# Poll for status with step details
import time
from datetime import datetime

while execution.status in ["Starting", "Executing"]:
    status = execution.get_status()
    
    print(f"\nOverall: {status.overall_status}")
    print("Step details:")
    for step in status.step_details:
        duration = ""
        if step.start_time and step.end_time:
            start = datetime.fromisoformat(step.start_time)
            end = datetime.fromisoformat(step.end_time)
            duration = f" ({(end-start).total_seconds():.0f}s)"
        
        print(f"  {step.name}: {step.status}{duration}")
        if step.failure_reason:
            print(f"    Failure: {step.failure_reason}")
    
    time.sleep(60)
    execution.refresh()

print(f"\nFinal status: {execution.status}")

Validation and Constraints

Evaluation Constraints

  • Maximum concurrent evaluations: 10 per account
  • Maximum dataset size: 10 GB
  • Maximum evaluation time: 7 days
  • Minimum dataset samples: 10 (recommended 100+; see the pre-flight sketch after this list)
  • Maximum custom metrics: 10 per evaluation
  • Prompt template size: Maximum 10 KB
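
A minimal pre-flight check against the limits above; the thresholds are copied from this list, and the helper name is only illustrative:

def check_evaluation_inputs(records, custom_metrics=None):
    """Raise early if the documented evaluation limits are clearly exceeded."""
    if len(records) < 10:
        raise ValueError("At least 10 samples are required (100+ recommended)")
    if custom_metrics is not None and len(custom_metrics) > 10:
        raise ValueError("At most 10 custom metrics are allowed per evaluation")

# Example usage with the records and custom_metrics from earlier examples
check_evaluation_inputs(records, custom_metrics=custom_metrics)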

Benchmark Constraints

  • Available benchmarks: Check with get_benchmarks()
  • Subtask availability: Varies by benchmark
  • Model requirements: Must support text generation
  • Token limits: Model-dependent (typically 512-4096)
  • Batch size: 1-32 (affects evaluation speed)

LLM-as-Judge Constraints

  • Supported judge models: Bedrock models only
  • Maximum prompts: 10,000 per evaluation
  • Judge model costs: Per-token pricing applies
  • Evaluation time: ~0.5-2 seconds per sample (a rough runtime estimate is sketched after this list)
  • Custom metric limit: 10 metrics per evaluation
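
Given the ~0.5-2 seconds per sample figure above, a rough wall-clock estimate for the judge step (ignoring pipeline startup overhead, which that figure does not cover):

def estimate_judge_runtime_minutes(num_samples):
    """Rough lower/upper bound in minutes at 0.5-2 seconds per sample."""
    return num_samples * 0.5 / 60, num_samples * 2.0 / 60

low, high = estimate_judge_runtime_minutes(10_000)  # the per-evaluation prompt cap
print(f"Expected judge runtime: roughly {low:.0f}-{high:.0f} minutes")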

Common Error Scenarios

  1. Benchmark Not Available:

    • Cause: Benchmark not supported in region
    • Solution: Check get_benchmarks() for region, use different benchmark
  2. Model Format Incompatible:

    • Cause: Model doesn't support evaluation format
    • Solution: Ensure model supports text generation with proper prompting
  3. Dataset Format Error:

    • Cause: Dataset doesn't match expected format
    • Solution: Check dataset schema (needs prompt/response/label fields)
  4. Evaluation Timeout:

    • Cause: Model inference too slow
    • Solution: Optimize model, reduce batch size, use faster judge model
  5. Insufficient Quota:

    • Cause: Exceeded concurrent evaluation limit
    • Solution: Wait for current evaluations to complete or request a quota increase (a quota pre-check is sketched below)
  6. Judge Model Access Denied:

    • Cause: Bedrock model not available or no permissions
    • Solution: Enable Bedrock model access, check IAM permissions
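
For scenario 5, one way to avoid hitting the 10-concurrent-evaluation limit is to count in-flight executions before launching a new one. A sketch using the documented get_all and status attributes (pass the eval_type that matches the evaluation you are about to start):

from sagemaker.train.evaluate import EvaluationPipelineExecution

def count_running_evaluations(eval_type="benchmark"):
    """Count executions of the given type that are still in progress."""
    executions = EvaluationPipelineExecution.get_all(eval_type=eval_type)
    return sum(1 for e in executions if e.status == "Executing")

if count_running_evaluations("benchmark") >= 10:
    print("Concurrent evaluation limit reached; wait or request a quota increase")
else:
    execution = evaluator.evaluate()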