MLlib provides comprehensive evaluation tools for assessing model performance, including metrics for classification, regression, and clustering tasks, along with model validation techniques such as cross-validation and train-validation split.
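All evaluators share the same workflow: configure the input columns and a metric with setters, then call evaluate on a DataFrame of predictions to get a single Double score; isLargerBetter tells the tuning utilities whether that score should be maximized or minimized. A minimal sketch, assuming a prepared predictions DataFrame with label and prediction columns:
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Configure the evaluator and compute one metric over the predictions
val rmse = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
.evaluate(predictions)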
class BinaryClassificationEvaluator(override val uid: String) extends Evaluator
with HasLabelCol with HasRawPredictionCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("binEval"))
final val metricName: Param[String]
def setMetricName(value: String): BinaryClassificationEvaluator
def setLabelCol(value: String): BinaryClassificationEvaluator
def setRawPredictionCol(value: String): BinaryClassificationEvaluator
override def evaluate(dataset: Dataset[_]): Double
override def isLargerBetter: Boolean
override def copy(extra: ParamMap): BinaryClassificationEvaluator
}

class MulticlassClassificationEvaluator(override val uid: String) extends Evaluator
with HasLabelCol with HasPredictionCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("mcEval"))
final val metricName: Param[String]
final val metricLabel: DoubleParam
final val beta: DoubleParam
final val eps: DoubleParam
def setMetricName(value: String): MulticlassClassificationEvaluator
def setMetricLabel(value: Double): MulticlassClassificationEvaluator
def setBeta(value: Double): MulticlassClassificationEvaluator
def setEps(value: Double): MulticlassClassificationEvaluator
def setLabelCol(value: String): MulticlassClassificationEvaluator
def setPredictionCol(value: String): MulticlassClassificationEvaluator
override def evaluate(dataset: Dataset[_]): Double
override def isLargerBetter: Boolean
override def copy(extra: ParamMap): MulticlassClassificationEvaluator
}

class RegressionEvaluator(override val uid: String) extends Evaluator
with HasLabelCol with HasPredictionCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("regEval"))
final val metricName: Param[String]
final val throughOrigin: BooleanParam
def setMetricName(value: String): RegressionEvaluator
def setThroughOrigin(value: Boolean): RegressionEvaluator
def setLabelCol(value: String): RegressionEvaluator
def setPredictionCol(value: String): RegressionEvaluator
override def evaluate(dataset: Dataset[_]): Double
override def isLargerBetter: Boolean
override def copy(extra: ParamMap): RegressionEvaluator
}

class ClusteringEvaluator(override val uid: String) extends Evaluator
with HasPredictionCol with HasFeaturesCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("clustering"))
final val metricName: Param[String]
final val distanceMeasure: Param[String]
def setMetricName(value: String): ClusteringEvaluator
def setDistanceMeasure(value: String): ClusteringEvaluator
def setPredictionCol(value: String): ClusteringEvaluator
def setFeaturesCol(value: String): ClusteringEvaluator
override def evaluate(dataset: Dataset[_]): Double
override def isLargerBetter: Boolean
override def copy(extra: ParamMap): ClusteringEvaluator
}

class CrossValidator(override val uid: String) extends Estimator[CrossValidatorModel]
with CrossValidatorParams with MLWritable with Logging {
def this() = this(Identifiable.randomUID("cv"))
def setEstimator(value: Estimator[_]): CrossValidator
def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
def setEvaluator(value: Evaluator): CrossValidator
def setNumFolds(value: Int): CrossValidator
def setSeed(value: Long): CrossValidator
def setParallelism(value: Int): CrossValidator
def setCollectSubModels(value: Boolean): CrossValidator
def setFoldCol(value: String): CrossValidator
override def fit(dataset: Dataset[_]): CrossValidatorModel
override def copy(extra: ParamMap): CrossValidator
def write: MLWriter
}
class CrossValidatorModel(override val uid: String, val bestModel: Model[_], val avgMetrics: Array[Double])
extends Model[CrossValidatorModel] with CrossValidatorParams with MLWritable {
lazy val subModels: Option[Array[Array[Model[_]]]]
override def transform(dataset: Dataset[_]): DataFrame
override def transformSchema(schema: StructType): StructType
override def copy(extra: ParamMap): CrossValidatorModel
def write: MLWriter
}

class TrainValidationSplit(override val uid: String) extends Estimator[TrainValidationSplitModel]
with TrainValidationSplitParams with MLWritable with Logging {
def this() = this(Identifiable.randomUID("tvs"))
def setEstimator(value: Estimator[_]): TrainValidationSplit
def setEstimatorParamMaps(value: Array[ParamMap]): TrainValidationSplit
def setEvaluator(value: Evaluator): TrainValidationSplit
def setTrainRatio(value: Double): TrainValidationSplit
def setSeed(value: Long): TrainValidationSplit
def setParallelism(value: Int): TrainValidationSplit
def setCollectSubModels(value: Boolean): TrainValidationSplit
override def fit(dataset: Dataset[_]): TrainValidationSplitModel
override def copy(extra: ParamMap): TrainValidationSplit
def write: MLWriter
}
class TrainValidationSplitModel(override val uid: String, val bestModel: Model[_], val validationMetrics: Array[Double])
extends Model[TrainValidationSplitModel] with TrainValidationSplitParams with MLWritable {
lazy val subModels: Option[Array[Model[_]]]
override def transform(dataset: Dataset[_]): DataFrame
override def transformSchema(schema: StructType): StructType
override def copy(extra: ParamMap): TrainValidationSplitModel
def write: MLWriter
}

class ParamGridBuilder {
def addGrid[T](param: Param[T], values: Array[T]): ParamGridBuilder
def addGrid[T](param: Param[T], values: java.util.List[T]): ParamGridBuilder
def baseOn(paramMap: ParamMap): ParamGridBuilder
def baseOn(paramPairs: ParamPair[_]*): ParamGridBuilder
def build(): Array[ParamMap]
}

abstract class Evaluator extends Params {
def evaluate(dataset: Dataset[_]): Double
def isLargerBetter: Boolean
def copy(extra: ParamMap): Evaluator
}

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// Train model (assuming data is prepared)
val lr = new LogisticRegression()
val model = lr.fit(trainingData)
val predictions = model.transform(testData)
// Binary classification evaluation
val binaryEvaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
// Area Under ROC Curve
val aucROC = binaryEvaluator.setMetricName("areaUnderROC").evaluate(predictions)
println(s"Area Under ROC: $aucROC")
// Area Under Precision-Recall Curve
val aucPR = binaryEvaluator.setMetricName("areaUnderPR").evaluate(predictions)
println(s"Area Under PR: $aucPR")
// Print model summary for additional metrics
if (model.hasSummary) {
// binarySummary exposes the binary-classification metrics directly
val summary = model.binarySummary
// ROC curve
summary.roc.show()
// Precision-Recall curve
summary.pr.show()
// Metrics by threshold
summary.fMeasureByThreshold.show()
summary.precisionByThreshold.show()
summary.recallByThreshold.show()
}

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val rf = new RandomForestClassifier()
val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)
val multiclassEvaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
// Overall accuracy
val accuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(rfPredictions)
println(s"Accuracy: $accuracy")
// Weighted metrics (accounts for class imbalance)
val weightedPrecision = multiclassEvaluator.setMetricName("weightedPrecision").evaluate(rfPredictions)
val weightedRecall = multiclassEvaluator.setMetricName("weightedRecall").evaluate(rfPredictions)
val weightedF1 = multiclassEvaluator.setMetricName("f1").evaluate(rfPredictions)
println(s"Weighted Precision: $weightedPrecision")
println(s"Weighted Recall: $weightedRecall")
println(s"Weighted F1-Score: $weightedF1")
// Precision and recall for specific class (e.g., class 1.0)
val precisionClass1 = multiclassEvaluator
.setMetricName("precisionByLabel")
.setMetricLabel(1.0)
.evaluate(rfPredictions)
val recallClass1 = multiclassEvaluator
.setMetricName("recallByLabel")
.setMetricLabel(1.0)
.evaluate(rfPredictions)
println(s"Precision for class 1: $precisionClass1")
println(s"Recall for class 1: $recallClass1")
// Weighted F-measure with custom beta (beta=2 emphasizes recall over precision);
// the beta param applies to the "weightedFMeasure" metric, not to "f1"
val f2Score = multiclassEvaluator
.setMetricName("weightedFMeasure")
.setBeta(2.0)
.evaluate(rfPredictions)
println(s"F2-Score: $f2Score")

import org.apache.spark.ml.regression.{LinearRegression, RandomForestRegressor}
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Train models
val lr = new LinearRegression()
val lrModel = lr.fit(trainingData)
val lrPredictions = lrModel.transform(testData)
val rf = new RandomForestRegressor()
val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)
val regressionEvaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
// Evaluate different metrics for linear regression
val metrics = Array("rmse", "mse", "mae", "r2")
println("Linear Regression Metrics:")
metrics.foreach { metric =>
val result = regressionEvaluator.setMetricName(metric).evaluate(lrPredictions)
println(s"$metric: $result")
}
println("\nRandom Forest Regression Metrics:")
metrics.foreach { metric =>
val result = regressionEvaluator.setMetricName(metric).evaluate(rfPredictions)
println(s"$metric: $result")
}
// Explained variance
val explainedVariance = regressionEvaluator
.setMetricName("var")
.evaluate(lrPredictions)
println(s"Explained Variance: $explainedVariance")
// Compare models
val lrRMSE = regressionEvaluator.setMetricName("rmse").evaluate(lrPredictions)
val rfRMSE = regressionEvaluator.setMetricName("rmse").evaluate(rfPredictions)
println(s"\nModel Comparison (RMSE):")
println(s"Linear Regression: $lrRMSE")
println(s"Random Forest: $rfRMSE")
println(s"Best Model: ${if (lrRMSE < rfRMSE) "Linear Regression" else "Random Forest"}")import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Train clustering models with different k values
val kValues = Array(2, 3, 4, 5, 6)
val clusteringEvaluator = new ClusteringEvaluator()
.setPredictionCol("prediction")
.setFeaturesCol("features")
.setMetricName("silhouette")
.setDistanceMeasure("squaredEuclidean")
println("Clustering Evaluation (Silhouette Score):")
kValues.foreach { k =>
val kmeans = new KMeans().setK(k).setSeed(42)
val model = kmeans.fit(data)
val predictions = model.transform(data)
val silhouette = clusteringEvaluator.evaluate(predictions)
println(s"k=$k: Silhouette Score = $silhouette")
}
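// ClusteringEvaluator also supports cosine distance; swapping this evaluator into the
// loop above gives silhouette scores under cosine similarity (a sketch, not run here)
val cosineEvaluator = new ClusteringEvaluator()
.setPredictionCol("prediction")
.setFeaturesCol("features")
.setMetricName("silhouette")
.setDistanceMeasure("cosine")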
// Find optimal k using elbow method (inertia/cost)
println("\nElbow Method (Within Set Sum of Squared Errors):")
kValues.foreach { k =>
val kmeans = new KMeans().setK(k).setSeed(42)
val model = kmeans.fit(data)
// computeCost was removed in Spark 3.0; the training cost is available on the model summary
val cost = model.summary.trainingCost
println(s"k=$k: WSSSE = $cost")
}

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val rf = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setSeed(42)
// Create parameter grid
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(10, 20, 30))
.addGrid(rf.maxDepth, Array(5, 10, 15))
.addGrid(rf.maxBins, Array(16, 32))
.build()
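// ParamGridBuilder.baseOn pins parameters that stay fixed across every combination
// (illustrative alternative grid; the featureSubsetStrategy value is an assumption)
val sqrtFeaturesGrid = new ParamGridBuilder()
.baseOn(rf.featureSubsetStrategy -> "sqrt")
.addGrid(rf.numTrees, Array(10, 20, 30))
.build()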
// Create evaluator
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("f1")
// Create cross-validator
val cv = new CrossValidator()
.setEstimator(rf)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(5)
.setParallelism(2) // Parallel execution
.setSeed(42)
// Fit cross-validator
val cvModel = cv.fit(trainingData)
// Get results
println(s"Best F1 Score: ${cvModel.avgMetrics.max}")
println("All cross-validation scores:")
cvModel.avgMetrics.zipWithIndex.foreach { case (metric, idx) =>
println(s" Param set $idx: $metric")
}
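// Each avgMetrics entry corresponds to one ParamMap in the grid; pairing them
// shows which parameter combination produced which score
cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach { case (params, score) =>
println(s"$params -> $score")
}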
// Get best model parameters
val bestModel = cvModel.bestModel.asInstanceOf[RandomForestClassificationModel]
println(s"Best parameters:")
println(s" Number of trees: ${bestModel.getNumTrees}")
println(s" Max depth: ${bestModel.getMaxDepth}")
println(s" Max bins: ${bestModel.getMaxBins}")
// Evaluate on test data
val finalPredictions = cvModel.transform(testData)
val testScore = evaluator.evaluate(finalPredictions)
println(s"Test F1 Score: $testScore")import org.apache.spark.ml.tuning.TrainValidationSplit
import org.apache.spark.ml.regression.{LinearRegression, RandomForestRegressor}
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Compare different algorithms
val lr = new LinearRegression()
val rf = new RandomForestRegressor()
// Parameter grids for each algorithm
val lrParamGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.001, 0.01, 0.1))
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.build()
val rfParamGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(10, 20, 30))
.addGrid(rf.maxDepth, Array(5, 10))
.build()
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
// Train-validation split for linear regression
val lrTvs = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(evaluator)
.setEstimatorParamMaps(lrParamGrid)
.setTrainRatio(0.8)
.setSeed(42)
val lrTvsModel = lrTvs.fit(data)
// Train-validation split for random forest
val rfTvs = new TrainValidationSplit()
.setEstimator(rf)
.setEvaluator(evaluator)
.setEstimatorParamMaps(rfParamGrid)
.setTrainRatio(0.8)
.setSeed(42)
val rfTvsModel = rfTvs.fit(data)
// Compare validation metrics
val lrBestScore = lrTvsModel.validationMetrics.min // RMSE: lower is better
val rfBestScore = rfTvsModel.validationMetrics.min
println(s"Linear Regression - Best validation RMSE: $lrBestScore")
println(s"Random Forest - Best validation RMSE: $rfBestScore")
val bestAlgorithm = if (lrBestScore < rfBestScore) {
println("Linear Regression performs better")
lrTvsModel
} else {
println("Random Forest performs better")
rfTvsModel
}
// Final evaluation on test data
val testPredictions = bestAlgorithm.transform(testData)
val testRMSE = evaluator.evaluate(testPredictions)
println(s"Final test RMSE: $testRMSE")import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Custom evaluation function for classification
def customClassificationMetrics(predictions: DataFrame): Unit = {
// Confusion matrix
val confusionMatrix = predictions
.groupBy("label", "prediction")
.count()
.orderBy("label", "prediction")
println("Confusion Matrix:")
confusionMatrix.show()
// Per-class metrics
val classMetrics = predictions
.groupBy("label")
.agg(
count("*").alias("total"),
sum(when(col("label") === col("prediction"), 1).otherwise(0)).alias("correct")
)
.withColumn("accuracy", col("correct") / col("total"))
println("Per-class Accuracy:")
classMetrics.show()
// Overall metrics
val totalPredictions = predictions.count()
val correctPredictions = predictions
.filter(col("label") === col("prediction"))
.count()
val overallAccuracy = correctPredictions.toDouble / totalPredictions
println(s"Overall Accuracy: $overallAccuracy")
}
// Custom evaluation for regression
def customRegressionMetrics(predictions: DataFrame): Unit = {
import org.apache.spark.sql.functions._
val metrics = predictions
.select(
mean(abs(col("label") - col("prediction"))).alias("mae"),
sqrt(mean(pow(col("label") - col("prediction"), 2))).alias("rmse"),
mean(pow(col("label") - col("prediction"), 2)).alias("mse"),
corr("label", "prediction").alias("correlation")
)
println("Custom Regression Metrics:")
metrics.show()
// Residual analysis
val residuals = predictions
.withColumn("residual", col("label") - col("prediction"))
.withColumn("abs_residual", abs(col("residual")))
.withColumn("squared_residual", pow(col("residual"), 2))
val residualStats = residuals
.select(
mean("residual").alias("mean_residual"),
stddev("residual").alias("std_residual"),
min("residual").alias("min_residual"),
max("residual").alias("max_residual")
)
println("Residual Statistics:")
residualStats.show()
}
// Usage with model predictions
val predictions = model.transform(testData)
customClassificationMetrics(predictions)
// or
// customRegressionMetrics(predictions)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
// vector_to_array (Spark 3.0+) converts the probability vector column for aggregation
import org.apache.spark.ml.functions.vector_to_array
def monitorModelPerformance(predictions: DataFrame,
evaluator: BinaryClassificationEvaluator,
timeCol: String = "timestamp"): Unit = {
// Performance over time
val performanceOverTime = predictions
.withColumn("date", to_date(col(timeCol)))
.groupBy("date")
.agg(
count("*").alias("num_predictions"),
mean(when(col("label") === col("prediction"), 1.0).otherwise(0.0)).alias("accuracy"),
mean("rawPrediction").alias("avg_confidence")
)
.orderBy("date")
println("Performance Over Time:")
performanceOverTime.show()
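// The supplied evaluator can also score the full prediction set (e.g., area under ROC)
val overallScore = evaluator.evaluate(predictions)
println(s"Overall ${evaluator.getMetricName}: $overallScore")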
// Prediction distribution
val predictionDist = predictions
.groupBy("prediction")
.count()
.orderBy("prediction")
println("Prediction Distribution:")
predictionDist.show()
// Confidence analysis (for probabilistic models)
if (predictions.columns.contains("probability")) {
val confidenceAnalysis = predictions
.withColumn("max_probability",
expr("transform(probability.values, x -> x)").getItem(0))
.groupBy("prediction")
.agg(
count("*").alias("count"),
mean("max_probability").alias("avg_confidence"),
min("max_probability").alias("min_confidence"),
max("max_probability").alias("max_confidence")
)
println("Confidence Analysis by Prediction:")
confidenceAnalysis.show()
}
}
// Monitor model performance
monitorModelPerformance(predictions, binaryEvaluator)
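The Evaluator base class documented above can also be extended to wrap a custom metric so that it plugs into CrossValidator and TrainValidationSplit like the built-in evaluators. A minimal sketch, assuming label and prediction columns; the class name MeanAbsoluteErrorEvaluator is illustrative, not part of MLlib:
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
class MeanAbsoluteErrorEvaluator(override val uid: String) extends Evaluator {
def this() = this(Identifiable.randomUID("maeEval"))
// Mean absolute difference between label and prediction
override def evaluate(dataset: Dataset[_]): Double =
dataset.select(mean(abs(col("label") - col("prediction")))).head().getDouble(0)
// Lower MAE is better, so tuning tools should minimize this score
override def isLargerBetter: Boolean = false
override def copy(extra: ParamMap): MeanAbsoluteErrorEvaluator = defaultCopy(extra)
}
// Usable anywhere a built-in evaluator is accepted, e.g.
// new CrossValidator().setEvaluator(new MeanAbsoluteErrorEvaluator())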