Model Evaluation

MLlib provides comprehensive tools for assessing model performance, including evaluation metrics for classification, regression, and clustering tasks, along with model selection techniques such as cross-validation and train-validation split.
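
All evaluators share the same contract: evaluate takes a DataFrame of predictions and returns a single Double score, and isLargerBetter tells you how to compare scores. A minimal sketch of that pattern, assuming a predictions DataFrame with label and prediction columns already exists:

import org.apache.spark.ml.evaluation.RegressionEvaluator

// Compute a single score from a predictions DataFrame
val evaluator = new RegressionEvaluator().setMetricName("rmse")
val score = evaluator.evaluate(predictions)

// RMSE is an error metric, so lower is better
println(s"rmse = $score (larger is better: ${evaluator.isLargerBetter})")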

Evaluators

BinaryClassificationEvaluator

class BinaryClassificationEvaluator(override val uid: String) extends Evaluator
  with HasLabelCol with HasRawPredictionCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("binEval"))

  final val metricName: Param[String]

  def setMetricName(value: String): BinaryClassificationEvaluator
  def setLabelCol(value: String): BinaryClassificationEvaluator
  def setRawPredictionCol(value: String): BinaryClassificationEvaluator

  override def evaluate(dataset: Dataset[_]): Double
  override def isLargerBetter: Boolean
  override def copy(extra: ParamMap): BinaryClassificationEvaluator
}

MulticlassClassificationEvaluator

class MulticlassClassificationEvaluator(override val uid: String) extends Evaluator
  with HasLabelCol with HasPredictionCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("mcEval"))

  final val metricName: Param[String]
  final val metricLabel: DoubleParam
  final val beta: DoubleParam
  final val eps: DoubleParam

  def setMetricName(value: String): MulticlassClassificationEvaluator
  def setMetricLabel(value: Double): MulticlassClassificationEvaluator
  def setBeta(value: Double): MulticlassClassificationEvaluator
  def setEps(value: Double): MulticlassClassificationEvaluator
  def setLabelCol(value: String): MulticlassClassificationEvaluator
  def setPredictionCol(value: String): MulticlassClassificationEvaluator

  override def evaluate(dataset: Dataset[_]): Double
  override def isLargerBetter: Boolean
  override def copy(extra: ParamMap): MulticlassClassificationEvaluator
}

RegressionEvaluator

class RegressionEvaluator(override val uid: String) extends Evaluator
  with HasLabelCol with HasPredictionCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("regEval"))

  final val metricName: Param[String]
  final val throughOrigin: BooleanParam

  def setMetricName(value: String): RegressionEvaluator
  def setThroughOrigin(value: Boolean): RegressionEvaluator
  def setLabelCol(value: String): RegressionEvaluator
  def setPredictionCol(value: String): RegressionEvaluator

  override def evaluate(dataset: Dataset[_]): Double
  override def isLargerBetter: Boolean
  override def copy(extra: ParamMap): RegressionEvaluator
}

ClusteringEvaluator

class ClusteringEvaluator(override val uid: String) extends Evaluator
  with HasPredictionCol with HasFeaturesCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("clustering"))

  final val metricName: Param[String]
  final val distanceMeasure: Param[String]

  def setMetricName(value: String): ClusteringEvaluator
  def setDistanceMeasure(value: String): ClusteringEvaluator
  def setPredictionCol(value: String): ClusteringEvaluator
  def setFeaturesCol(value: String): ClusteringEvaluator

  override def evaluate(dataset: Dataset[_]): Double
  override def isLargerBetter: Boolean
  override def copy(extra: ParamMap): ClusteringEvaluator
}

Model Selection and Tuning

CrossValidator

class CrossValidator(override val uid: String) extends Estimator[CrossValidatorModel]
  with CrossValidatorParams with MLWritable with Logging {

  def this() = this(Identifiable.randomUID("cv"))

  def setEstimator(value: Estimator[_]): CrossValidator
  def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
  def setEvaluator(value: Evaluator): CrossValidator
  def setNumFolds(value: Int): CrossValidator
  def setSeed(value: Long): CrossValidator
  def setParallelism(value: Int): CrossValidator
  def setCollectSubModels(value: Boolean): CrossValidator
  def setFoldCol(value: String): CrossValidator

  override def fit(dataset: Dataset[_]): CrossValidatorModel
  override def copy(extra: ParamMap): CrossValidator
  def write: MLWriter
}

class CrossValidatorModel(override val uid: String, val bestModel: Model[_], val avgMetrics: Array[Double])
  extends Model[CrossValidatorModel] with CrossValidatorParams with MLWritable {

  lazy val subModels: Option[Array[Array[Model[_]]]]

  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): CrossValidatorModel
  def write: MLWriter
}
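
In newer Spark releases, setFoldCol lets you supply your own fold assignments instead of relying on a random k-fold split; the column must contain integers in the range [0, numFolds). A hedged sketch, assuming a trainingData DataFrame plus an estimator, evaluator, and parameter grid like those in the cross-validation usage example below:

import org.apache.spark.sql.functions.rand

// Assign every row to one of three folds; any deterministic integer column in [0, numFolds) works
val foldedData = trainingData.withColumn("fold", (rand(42) * 3).cast("int"))

val cvWithFolds = new CrossValidator()
  .setEstimator(rf)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
  .setFoldCol("fold")

val cvFoldModel = cvWithFolds.fit(foldedData)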

TrainValidationSplit

class TrainValidationSplit(override val uid: String) extends Estimator[TrainValidationSplitModel]
  with TrainValidationSplitParams with MLWritable with Logging {

  def this() = this(Identifiable.randomUID("tvs"))

  def setEstimator(value: Estimator[_]): TrainValidationSplit
  def setEstimatorParamMaps(value: Array[ParamMap]): TrainValidationSplit
  def setEvaluator(value: Evaluator): TrainValidationSplit
  def setTrainRatio(value: Double): TrainValidationSplit
  def setSeed(value: Long): TrainValidationSplit
  def setParallelism(value: Int): TrainValidationSplit
  def setCollectSubModels(value: Boolean): TrainValidationSplit

  override def fit(dataset: Dataset[_]): TrainValidationSplitModel
  override def copy(extra: ParamMap): TrainValidationSplit
  def write: MLWriter
}

class TrainValidationSplitModel(override val uid: String, val bestModel: Model[_], val validationMetrics: Array[Double])
  extends Model[TrainValidationSplitModel] with TrainValidationSplitParams with MLWritable {

  lazy val subModels: Option[Array[Model[_]]]

  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): TrainValidationSplitModel
  def write: MLWriter
}

ParamGridBuilder

class ParamGridBuilder {
  def addGrid[T](param: Param[T], values: Array[T]): ParamGridBuilder
  def addGrid[T](param: Param[T], values: java.util.List[T]): ParamGridBuilder
  def baseOn(paramMap: ParamMap): ParamGridBuilder
  def baseOn(paramPairs: ParamPair[_]*): ParamGridBuilder
  def build(): Array[ParamMap]
}
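
baseOn fixes parameters that should be identical in every generated ParamMap, while addGrid varies the rest. A short sketch, assuming a LogisticRegression estimator:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.ParamGridBuilder

val lr = new LogisticRegression()

// Every ParamMap fixes maxIter = 50; only regParam and elasticNetParam vary
val grid = new ParamGridBuilder()
  .baseOn(lr.maxIter -> 50)
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5))
  .build()  // 2 x 2 = 4 parameter maps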

Base Evaluator

abstract class Evaluator extends Params {
  def evaluate(dataset: Dataset[_]): Double
  def isLargerBetter: Boolean
  def copy(extra: ParamMap): Evaluator
}
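
Custom metrics can be plugged into CrossValidator or TrainValidationSplit by subclassing Evaluator. A minimal, hypothetical sketch that scores mean absolute percentage error (the class name and column names are illustrative assumptions, not part of MLlib):

import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

class MapeEvaluator(override val uid: String) extends Evaluator {

  def this() = this(Identifiable.randomUID("mapeEval"))

  // Mean absolute percentage error over "label" and "prediction" columns (labels must be non-zero)
  override def evaluate(dataset: Dataset[_]): Double = {
    dataset
      .select(mean(abs((col("label") - col("prediction")) / col("label"))))
      .head()
      .getDouble(0)
  }

  // MAPE is an error, so smaller values are better
  override def isLargerBetter: Boolean = false

  override def copy(extra: ParamMap): MapeEvaluator = defaultCopy(extra)
}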

Usage Examples

Binary Classification Evaluation

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Train model (assuming data is prepared)
val lr = new LogisticRegression()
val model = lr.fit(trainingData)
val predictions = model.transform(testData)

// Binary classification evaluation
val binaryEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")

// Area Under ROC Curve
val aucROC = binaryEvaluator.setMetricName("areaUnderROC").evaluate(predictions)
println(s"Area Under ROC: $aucROC")

// Area Under Precision-Recall Curve
val aucPR = binaryEvaluator.setMetricName("areaUnderPR").evaluate(predictions)
println(s"Area Under PR: $aucPR")

// The training summary provides additional metrics (binary models only)
if (model.hasSummary) {
  val summary = model.binarySummary
  
  // ROC curve
  summary.roc.show()
  
  // Precision-Recall curve  
  summary.pr.show()
  
  // Metrics by threshold
  summary.fMeasureByThreshold.show()
  summary.precisionByThreshold.show()
  summary.recallByThreshold.show()
}

Multiclass Classification Evaluation

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val rf = new RandomForestClassifier()
val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)

val multiclassEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

// Overall accuracy
val accuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(rfPredictions)
println(s"Accuracy: $accuracy")

// Weighted metrics (accounts for class imbalance)
val weightedPrecision = multiclassEvaluator.setMetricName("weightedPrecision").evaluate(rfPredictions)
val weightedRecall = multiclassEvaluator.setMetricName("weightedRecall").evaluate(rfPredictions)
val weightedF1 = multiclassEvaluator.setMetricName("f1").evaluate(rfPredictions)

println(s"Weighted Precision: $weightedPrecision")
println(s"Weighted Recall: $weightedRecall")
println(s"Weighted F1-Score: $weightedF1")

// Precision and recall for specific class (e.g., class 1.0)
val precisionClass1 = multiclassEvaluator
  .setMetricName("precisionByLabel")
  .setMetricLabel(1.0)
  .evaluate(rfPredictions)

val recallClass1 = multiclassEvaluator
  .setMetricName("recallByLabel")
  .setMetricLabel(1.0)
  .evaluate(rfPredictions)

println(s"Precision for class 1: $precisionClass1")
println(s"Recall for class 1: $recallClass1")

// Weighted F-measure with custom beta (beta = 2 emphasizes recall over precision);
// beta applies to the "weightedFMeasure" and "fMeasureByLabel" metrics
val f2Score = multiclassEvaluator
  .setMetricName("weightedFMeasure")
  .setBeta(2.0)
  .evaluate(rfPredictions)

println(s"Weighted F2-Score: $f2Score")

Regression Evaluation

import org.apache.spark.ml.regression.{LinearRegression, RandomForestRegressor}
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Train models
val lr = new LinearRegression()
val lrModel = lr.fit(trainingData)
val lrPredictions = lrModel.transform(testData)

val rf = new RandomForestRegressor()
val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)

val regressionEvaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

// Evaluate different metrics for linear regression
val metrics = Array("rmse", "mse", "mae", "r2")
println("Linear Regression Metrics:")
metrics.foreach { metric =>
  val result = regressionEvaluator.setMetricName(metric).evaluate(lrPredictions)
  println(s"$metric: $result")
}

println("\nRandom Forest Regression Metrics:")
metrics.foreach { metric =>
  val result = regressionEvaluator.setMetricName(metric).evaluate(rfPredictions)
  println(s"$metric: $result")
}

// Explained variance
val explainedVariance = regressionEvaluator
  .setMetricName("var")
  .evaluate(lrPredictions)
println(s"Explained Variance: $explainedVariance")

// Compare models
val lrRMSE = regressionEvaluator.setMetricName("rmse").evaluate(lrPredictions)
val rfRMSE = regressionEvaluator.setMetricName("rmse").evaluate(rfPredictions)

println(s"\nModel Comparison (RMSE):")
println(s"Linear Regression: $lrRMSE")
println(s"Random Forest: $rfRMSE")
println(s"Best Model: ${if (lrRMSE < rfRMSE) "Linear Regression" else "Random Forest"}")

Clustering Evaluation

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Train clustering models with different k values
val kValues = Array(2, 3, 4, 5, 6)
val clusteringEvaluator = new ClusteringEvaluator()
  .setPredictionCol("prediction")
  .setFeaturesCol("features")
  .setMetricName("silhouette")
  .setDistanceMeasure("squaredEuclidean")

println("Clustering Evaluation (Silhouette Score):")
kValues.foreach { k =>
  val kmeans = new KMeans().setK(k).setSeed(42)
  val model = kmeans.fit(data)
  val predictions = model.transform(data)
  
  val silhouette = clusteringEvaluator.evaluate(predictions)
  println(s"k=$k: Silhouette Score = $silhouette")
}

// Find a reasonable k with the elbow method (within-cluster sum of squared errors).
// KMeansModel.computeCost is deprecated in recent Spark releases; the training summary exposes the same cost.
println("\nElbow Method (Within Set Sum of Squared Errors):")
kValues.foreach { k =>
  val kmeans = new KMeans().setK(k).setSeed(42)
  val model = kmeans.fit(data)
  val cost = model.summary.trainingCost
  println(s"k=$k: WSSSE = $cost")
}
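
ClusteringEvaluator also supports cosine distance, which can be more appropriate when vector direction matters more than magnitude. A brief sketch, assuming predictions from one of the fitted KMeans models above:

// Silhouette with cosine distance instead of squared Euclidean
val cosineSilhouette = new ClusteringEvaluator()
  .setMetricName("silhouette")
  .setDistanceMeasure("cosine")
  .evaluate(predictions)

println(s"Silhouette (cosine) = $cosineSilhouette")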

Cross-Validation for Hyperparameter Tuning

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setSeed(42)

// Create parameter grid
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(10, 20, 30))
  .addGrid(rf.maxDepth, Array(5, 10, 15))
  .addGrid(rf.maxBins, Array(16, 32))
  .build()

// Create evaluator
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("f1")

// Create cross-validator
val cv = new CrossValidator()
  .setEstimator(rf)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
  .setParallelism(2)  // Parallel execution
  .setSeed(42)

// Fit cross-validator
val cvModel = cv.fit(trainingData)

// Get results
println(s"Best F1 Score: ${cvModel.avgMetrics.max}")
println("All cross-validation scores:")
cvModel.avgMetrics.zipWithIndex.foreach { case (metric, idx) =>
  println(s"  Param set $idx: $metric")
}

// Get best model parameters
val bestModel = cvModel.bestModel.asInstanceOf[RandomForestClassificationModel]
println(s"Best parameters:")
println(s"  Number of trees: ${bestModel.getNumTrees}")
println(s"  Max depth: ${bestModel.getMaxDepth}")
println(s"  Max bins: ${bestModel.getMaxBins}")

// Evaluate on test data
val finalPredictions = cvModel.transform(testData)
val testScore = evaluator.evaluate(finalPredictions)
println(s"Test F1 Score: $testScore")

Train-Validation Split

import org.apache.spark.ml.tuning.{TrainValidationSplit, ParamGridBuilder}
import org.apache.spark.ml.regression.{LinearRegression, RandomForestRegressor}
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Compare different algorithms
val lr = new LinearRegression()
val rf = new RandomForestRegressor()

// Parameter grids for each algorithm
val lrParamGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.001, 0.01, 0.1))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

val rfParamGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(10, 20, 30))
  .addGrid(rf.maxDepth, Array(5, 10))
  .build()

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")

// Train-validation split for linear regression
val lrTvs = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(lrParamGrid)
  .setTrainRatio(0.8)
  .setSeed(42)

val lrTvsModel = lrTvs.fit(data)

// Train-validation split for random forest
val rfTvs = new TrainValidationSplit()
  .setEstimator(rf)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(rfParamGrid)
  .setTrainRatio(0.8)
  .setSeed(42)

val rfTvsModel = rfTvs.fit(data)

// Compare validation metrics
val lrBestScore = lrTvsModel.validationMetrics.min // RMSE: lower is better
val rfBestScore = rfTvsModel.validationMetrics.min

println(s"Linear Regression - Best validation RMSE: $lrBestScore")
println(s"Random Forest - Best validation RMSE: $rfBestScore")

val bestAlgorithm = if (lrBestScore < rfBestScore) {
  println("Linear Regression performs better")
  lrTvsModel
} else {
  println("Random Forest performs better") 
  rfTvsModel
}

// Final evaluation on test data
val testPredictions = bestAlgorithm.transform(testData)
val testRMSE = evaluator.evaluate(testPredictions)
println(s"Final test RMSE: $testRMSE")

Custom Evaluation Metrics

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Custom evaluation function for classification
def customClassificationMetrics(predictions: DataFrame): Unit = {
  // Confusion matrix
  val confusionMatrix = predictions
    .groupBy("label", "prediction")
    .count()
    .orderBy("label", "prediction")
  
  println("Confusion Matrix:")
  confusionMatrix.show()
  
  // Per-class metrics
  val classMetrics = predictions
    .groupBy("label")
    .agg(
      count("*").alias("total"),
      sum(when(col("label") === col("prediction"), 1).otherwise(0)).alias("correct")
    )
    .withColumn("accuracy", col("correct") / col("total"))
  
  println("Per-class Accuracy:")
  classMetrics.show()
  
  // Overall metrics
  val totalPredictions = predictions.count()
  val correctPredictions = predictions
    .filter(col("label") === col("prediction"))
    .count()
  
  val overallAccuracy = correctPredictions.toDouble / totalPredictions
  println(s"Overall Accuracy: $overallAccuracy")
}

// Custom evaluation for regression
def customRegressionMetrics(predictions: DataFrame): Unit = {
  import org.apache.spark.sql.functions._
  
  val metrics = predictions
    .select(
      mean(abs(col("label") - col("prediction"))).alias("mae"),
      sqrt(mean(pow(col("label") - col("prediction"), 2))).alias("rmse"),
      mean(pow(col("label") - col("prediction"), 2)).alias("mse"),
      corr("label", "prediction").alias("correlation")
    )
  
  println("Custom Regression Metrics:")
  metrics.show()
  
  // Residual analysis
  val residuals = predictions
    .withColumn("residual", col("label") - col("prediction"))
    .withColumn("abs_residual", abs(col("residual")))
    .withColumn("squared_residual", pow(col("residual"), 2))
  
  val residualStats = residuals
    .select(
      mean("residual").alias("mean_residual"),
      stddev("residual").alias("std_residual"),
      min("residual").alias("min_residual"),
      max("residual").alias("max_residual")
    )
  
  println("Residual Statistics:")
  residualStats.show()
}

// Usage with model predictions
val predictions = model.transform(testData)
customClassificationMetrics(predictions)
// or
// customRegressionMetrics(predictions)

Model Performance Monitoring

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def monitorModelPerformance(predictions: DataFrame,
                            evaluator: BinaryClassificationEvaluator,
                            timeCol: String = "timestamp"): Unit = {

  // Overall score from the supplied evaluator (e.g. areaUnderROC)
  val overallScore = evaluator.evaluate(predictions)
  println(s"Overall ${evaluator.getMetricName}: $overallScore")

  // Performance over time (assumes the predictions carry a timestamp column)
  val performanceOverTime = predictions
    .withColumn("date", to_date(col(timeCol)))
    .groupBy("date")
    .agg(
      count("*").alias("num_predictions"),
      mean(when(col("label") === col("prediction"), 1.0).otherwise(0.0)).alias("accuracy")
    )
    .orderBy("date")

  println("Performance Over Time:")
  performanceOverTime.show()

  // Prediction distribution
  val predictionDist = predictions
    .groupBy("prediction")
    .count()
    .orderBy("prediction")

  println("Prediction Distribution:")
  predictionDist.show()

  // Confidence analysis (for probabilistic models); vector_to_array requires Spark 3.0+
  if (predictions.columns.contains("probability")) {
    import org.apache.spark.ml.functions.vector_to_array

    val confidenceAnalysis = predictions
      .withColumn("max_probability", array_max(vector_to_array(col("probability"))))
      .groupBy("prediction")
      .agg(
        count("*").alias("count"),
        mean("max_probability").alias("avg_confidence"),
        min("max_probability").alias("min_confidence"),
        max("max_probability").alias("max_confidence")
      )

    println("Confidence Analysis by Prediction:")
    confidenceAnalysis.show()
  }
}

// Monitor model performance (assumes the predictions DataFrame includes a "timestamp" column)
monitorModelPerformance(predictions, binaryEvaluator)