Apache Spark MLlib is a scalable machine learning library that provides high-level APIs for common machine learning algorithms and utilities. Its evaluation tooling covers metrics for classification, regression, and clustering tasks, along with model-validation techniques such as cross-validation and train-validation split.
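All evaluators share the contract defined by the abstract Evaluator class listed below: evaluate reduces a predictions DataFrame to a single Double, and isLargerBetter tells callers whether higher scores rank better. A minimal sketch of that common pattern, assuming a hypothetical predictions DataFrame with "label" and "prediction" columns:

import org.apache.spark.ml.evaluation.RegressionEvaluator

// Hypothetical: predictions was produced by some fitted model's transform()
val evaluator = new RegressionEvaluator().setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"RMSE = $rmse (larger is better: ${evaluator.isLargerBetter})")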
class BinaryClassificationEvaluator(override val uid: String) extends Evaluator
    with HasLabelCol with HasRawPredictionCol with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("binEval"))
  final val metricName: Param[String]
  def setMetricName(value: String): BinaryClassificationEvaluator
  def setLabelCol(value: String): BinaryClassificationEvaluator
  def setRawPredictionCol(value: String): BinaryClassificationEvaluator
  override def evaluate(dataset: Dataset[_]): Double
  override def isLargerBetter: Boolean
  override def copy(extra: ParamMap): BinaryClassificationEvaluator
}

class MulticlassClassificationEvaluator(override val uid: String) extends Evaluator
    with HasLabelCol with HasPredictionCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("mcEval"))
final val metricName: Param[String]
final val metricLabel: DoubleParam
final val beta: DoubleParam
final val eps: DoubleParam
def setMetricName(value: String): MulticlassClassificationEvaluator
def setMetricLabel(value: Double): MulticlassClassificationEvaluator
def setBeta(value: Double): MulticlassClassificationEvaluator
def setEps(value: Double): MulticlassClassificationEvaluator
def setLabelCol(value: String): MulticlassClassificationEvaluator
def setPredictionCol(value: String): MulticlassClassificationEvaluator
override def evaluate(dataset: Dataset[_]): Double
override def isLargerBetter: Boolean
override def copy(extra: ParamMap): MulticlassClassificationEvaluator
}class RegressionEvaluator(override val uid: String) extends Evaluator
with HasLabelCol with HasPredictionCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("regEval"))
final val metricName: Param[String]
final val throughOrigin: BooleanParam
def setMetricName(value: String): RegressionEvaluator
def setThroughOrigin(value: Boolean): RegressionEvaluator
def setLabelCol(value: String): RegressionEvaluator
def setPredictionCol(value: String): RegressionEvaluator
override def evaluate(dataset: Dataset[_]): Double
override def isLargerBetter: Boolean
override def copy(extra: ParamMap): RegressionEvaluator
}class ClusteringEvaluator(override val uid: String) extends Evaluator
with HasPredictionCol with HasFeaturesCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("clustering"))
final val metricName: Param[String]
final val distanceMeasure: Param[String]
def setMetricName(value: String): ClusteringEvaluator
def setDistanceMeasure(value: String): ClusteringEvaluator
def setPredictionCol(value: String): ClusteringEvaluator
def setFeaturesCol(value: String): ClusteringEvaluator
override def evaluate(dataset: Dataset[_]): Double
override def isLargerBetter: Boolean
override def copy(extra: ParamMap): ClusteringEvaluator
}class CrossValidator(override val uid: String) extends Estimator[CrossValidatorModel]
with CrossValidatorParams with MLWritable with Logging {
def this() = this(Identifiable.randomUID("cv"))
def setEstimator(value: Estimator[_]): CrossValidator
def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
def setEvaluator(value: Evaluator): CrossValidator
def setNumFolds(value: Int): CrossValidator
def setSeed(value: Long): CrossValidator
def setParallelism(value: Int): CrossValidator
def setCollectSubModels(value: Boolean): CrossValidator
def setFoldCol(value: String): CrossValidator
override def fit(dataset: Dataset[_]): CrossValidatorModel
override def copy(extra: ParamMap): CrossValidator
def write: MLWriter
}
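setFoldCol (available in recent Spark releases) replaces random k-fold splitting with user-supplied fold assignments; the column must contain integers in [0, numFolds). A minimal sketch, assuming a hypothetical trainingData with an "id" column and an estimator, evaluator, and paramGrid defined elsewhere:

import org.apache.spark.sql.functions.{abs, col, hash}

// Hypothetical: derive a deterministic fold id in [0, 3) from an "id" column
val withFolds = trainingData.withColumn("fold", abs(hash(col("id"))) % 3)

val cvWithFolds = new CrossValidator()
  .setEstimator(estimator)           // assumed defined elsewhere
  .setEvaluator(evaluator)           // assumed defined elsewhere
  .setEstimatorParamMaps(paramGrid)  // assumed defined elsewhere
  .setNumFolds(3)
  .setFoldCol("fold")                // rows with fold == k form fold k's validation set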
class CrossValidatorModel(override val uid: String, val bestModel: Model[_], val avgMetrics: Array[Double])
    extends Model[CrossValidatorModel] with CrossValidatorParams with MLWritable {
  lazy val subModels: Option[Array[Array[Model[_]]]]
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): CrossValidatorModel
  def write: MLWriter
}

class TrainValidationSplit(override val uid: String) extends Estimator[TrainValidationSplitModel]
    with TrainValidationSplitParams with MLWritable with Logging {
def this() = this(Identifiable.randomUID("tvs"))
def setEstimator(value: Estimator[_]): TrainValidationSplit
def setEstimatorParamMaps(value: Array[ParamMap]): TrainValidationSplit
def setEvaluator(value: Evaluator): TrainValidationSplit
def setTrainRatio(value: Double): TrainValidationSplit
def setSeed(value: Long): TrainValidationSplit
def setParallelism(value: Int): TrainValidationSplit
def setCollectSubModels(value: Boolean): TrainValidationSplit
override def fit(dataset: Dataset[_]): TrainValidationSplitModel
override def copy(extra: ParamMap): TrainValidationSplit
def write: MLWriter
}
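Since TrainValidationSplit and CrossValidator (and their fitted models) mix in MLWritable, tuned models can be persisted and reloaded. A short sketch, assuming a fitted CrossValidatorModel named cvModel and a hypothetical path:

import org.apache.spark.ml.tuning.CrossValidatorModel

cvModel.write.overwrite().save("/tmp/cv-model")          // hypothetical path
val reloaded = CrossValidatorModel.load("/tmp/cv-model")
val rescored = reloaded.transform(testData)              // delegates to bestModel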
class TrainValidationSplitModel(override val uid: String, val bestModel: Model[_], val validationMetrics: Array[Double])
    extends Model[TrainValidationSplitModel] with TrainValidationSplitParams with MLWritable {
  lazy val subModels: Option[Array[Model[_]]]
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): TrainValidationSplitModel
  def write: MLWriter
}

class ParamGridBuilder {
  def addGrid[T](param: Param[T], values: Array[T]): ParamGridBuilder
  def addGrid[T](param: Param[T], values: java.util.List[T]): ParamGridBuilder
  def baseOn(paramMap: ParamMap): ParamGridBuilder
  def baseOn(paramPairs: ParamPair[_]*): ParamGridBuilder
  def build(): Array[ParamMap]
}

abstract class Evaluator extends Params {
  def evaluate(dataset: Dataset[_]): Double
  def isLargerBetter: Boolean
  def copy(extra: ParamMap): Evaluator
}

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// Train model (assuming data is prepared)
val lr = new LogisticRegression()
val model = lr.fit(trainingData)
val predictions = model.transform(testData)

// Binary classification evaluation
val binaryEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")

// Area under the ROC curve
val aucROC = binaryEvaluator.setMetricName("areaUnderROC").evaluate(predictions)
println(s"Area Under ROC: $aucROC")

// Area under the precision-recall curve
val aucPR = binaryEvaluator.setMetricName("areaUnderPR").evaluate(predictions)
println(s"Area Under PR: $aucPR")

// The training summary exposes additional threshold-based metrics;
// binarySummary avoids the explicit cast to BinaryLogisticRegressionSummary
if (model.hasSummary) {
  val summary = model.binarySummary
  // ROC curve
  summary.roc.show()
  // Precision-recall curve
  summary.pr.show()
  // Metrics by threshold
  summary.fMeasureByThreshold.show()
  summary.precisionByThreshold.show()
  summary.recallByThreshold.show()
}

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val rf = new RandomForestClassifier()
val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)

val multiclassEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

// Overall accuracy
val accuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(rfPredictions)
println(s"Accuracy: $accuracy")

// Weighted metrics (account for class imbalance)
val weightedPrecision = multiclassEvaluator.setMetricName("weightedPrecision").evaluate(rfPredictions)
val weightedRecall = multiclassEvaluator.setMetricName("weightedRecall").evaluate(rfPredictions)
val weightedF1 = multiclassEvaluator.setMetricName("f1").evaluate(rfPredictions)
println(s"Weighted Precision: $weightedPrecision")
println(s"Weighted Recall: $weightedRecall")
println(s"Weighted F1-Score: $weightedF1")

// Precision and recall for a specific class (e.g., class 1.0)
val precisionClass1 = multiclassEvaluator
  .setMetricName("precisionByLabel")
  .setMetricLabel(1.0)
  .evaluate(rfPredictions)
val recallClass1 = multiclassEvaluator
  .setMetricName("recallByLabel")
  .setMetricLabel(1.0)
  .evaluate(rfPredictions)
println(s"Precision for class 1: $precisionClass1")
println(s"Recall for class 1: $recallClass1")

// F-measure with custom beta (beta=2 emphasizes recall over precision);
// note: beta only affects "weightedFMeasure" and "fMeasureByLabel", not "f1"
val f2Score = multiclassEvaluator
  .setMetricName("weightedFMeasure")
  .setBeta(2.0)
  .evaluate(rfPredictions)
println(s"F2-Score: $f2Score")

import org.apache.spark.ml.regression.{LinearRegression, RandomForestRegressor}
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Train models
val lr = new LinearRegression()
val lrModel = lr.fit(trainingData)
val lrPredictions = lrModel.transform(testData)

val rf = new RandomForestRegressor()
val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)

val regressionEvaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

// Evaluate several metrics for linear regression
val metrics = Array("rmse", "mse", "mae", "r2")
println("Linear Regression Metrics:")
metrics.foreach { metric =>
  val result = regressionEvaluator.setMetricName(metric).evaluate(lrPredictions)
  println(s"$metric: $result")
}

println("\nRandom Forest Regression Metrics:")
metrics.foreach { metric =>
  val result = regressionEvaluator.setMetricName(metric).evaluate(rfPredictions)
  println(s"$metric: $result")
}

// Explained variance
val explainedVariance = regressionEvaluator
  .setMetricName("var")
  .evaluate(lrPredictions)
println(s"Explained Variance: $explainedVariance")

// Compare models
val lrRMSE = regressionEvaluator.setMetricName("rmse").evaluate(lrPredictions)
val rfRMSE = regressionEvaluator.setMetricName("rmse").evaluate(rfPredictions)
println(s"\nModel Comparison (RMSE):")
println(s"Linear Regression: $lrRMSE")
println(s"Random Forest: $rfRMSE")
println(s"Best Model: ${if (lrRMSE < rfRMSE) "Linear Regression" else "Random Forest"}")

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Train clustering models with different k values
val kValues = Array(2, 3, 4, 5, 6)

val clusteringEvaluator = new ClusteringEvaluator()
  .setPredictionCol("prediction")
  .setFeaturesCol("features")
  .setMetricName("silhouette")
  .setDistanceMeasure("squaredEuclidean")

println("Clustering Evaluation (Silhouette Score):")
kValues.foreach { k =>
  val kmeans = new KMeans().setK(k).setSeed(42)
  val model = kmeans.fit(data)
  val predictions = model.transform(data)
  val silhouette = clusteringEvaluator.evaluate(predictions)
  println(s"k=$k: Silhouette Score = $silhouette")
}
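ClusteringEvaluator also accepts "cosine" as a distance measure, which can be more appropriate when the direction of the feature vectors matters more than their magnitude. A one-line variant of the evaluator above:

// Silhouette computed with cosine distance instead of squared Euclidean
val cosineEvaluator = new ClusteringEvaluator()
  .setMetricName("silhouette")
  .setDistanceMeasure("cosine")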
// Find a reasonable k using the elbow method (within-cluster cost)
println("\nElbow Method (Within Set Sum of Squared Errors):")
kValues.foreach { k =>
  val kmeans = new KMeans().setK(k).setSeed(42)
  val model = kmeans.fit(data)
  // computeCost is deprecated in recent Spark; summary.trainingCost
  // reports the same WSSSE on the training data
  val cost = model.summary.trainingCost
  println(s"k=$k: WSSSE = $cost")
}

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setSeed(42)

// Create parameter grid
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(10, 20, 30))
  .addGrid(rf.maxDepth, Array(5, 10, 15))
  .addGrid(rf.maxBins, Array(16, 32))
  .build()
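ParamGridBuilder.baseOn (see the API listing above) pins parameters that should stay constant across every grid point. A sketch variant of the grid above, using RandomForestClassifier's featureSubsetStrategy param:

// Same grid, but fix featureSubsetStrategy for every combination
val pinnedGrid = new ParamGridBuilder()
  .baseOn(rf.featureSubsetStrategy -> "sqrt")
  .addGrid(rf.numTrees, Array(10, 20, 30))
  .addGrid(rf.maxDepth, Array(5, 10, 15))
  .build()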
// Create evaluator
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("f1")

// Create cross-validator
val cv = new CrossValidator()
  .setEstimator(rf)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
  .setParallelism(2) // Parallel execution
  .setSeed(42)

// Fit cross-validator
val cvModel = cv.fit(trainingData)

// Get results
println(s"Best F1 Score: ${cvModel.avgMetrics.max}")
println("All cross-validation scores:")
cvModel.avgMetrics.zipWithIndex.foreach { case (metric, idx) =>
  println(s"  Param set $idx: $metric")
}
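avgMetrics is ordered to match the estimator param maps, so zipping the two shows which parameter combination produced each score:

// Pair each parameter combination with its mean cross-validated F1
cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach { case (params, metric) =>
  println(s"$params => $metric")
}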
// Get best model parameters (import added for the cast below)
import org.apache.spark.ml.classification.RandomForestClassificationModel
val bestModel = cvModel.bestModel.asInstanceOf[RandomForestClassificationModel]
println(s"Best parameters:")
println(s"  Number of trees: ${bestModel.getNumTrees}")
println(s"  Max depth: ${bestModel.getMaxDepth}")
println(s"  Max bins: ${bestModel.getMaxBins}")

// Evaluate on test data
val finalPredictions = cvModel.transform(testData)
val testScore = evaluator.evaluate(finalPredictions)
println(s"Test F1 Score: $testScore")

import org.apache.spark.ml.tuning.TrainValidationSplit
import org.apache.spark.ml.regression.{LinearRegression, RandomForestRegressor}
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Compare different algorithms
val lr = new LinearRegression()
val rf = new RandomForestRegressor()

// Parameter grids for each algorithm
val lrParamGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.001, 0.01, 0.1))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

val rfParamGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(10, 20, 30))
  .addGrid(rf.maxDepth, Array(5, 10))
  .build()

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")

// Train-validation split for linear regression
val lrTvs = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(lrParamGrid)
  .setTrainRatio(0.8)
  .setSeed(42)
val lrTvsModel = lrTvs.fit(data)

// Train-validation split for random forest
val rfTvs = new TrainValidationSplit()
  .setEstimator(rf)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(rfParamGrid)
  .setTrainRatio(0.8)
  .setSeed(42)
val rfTvsModel = rfTvs.fit(data)
// Compare validation metrics
val lrBestScore = lrTvsModel.validationMetrics.min // RMSE: lower is better
val rfBestScore = rfTvsModel.validationMetrics.min
println(s"Linear Regression - Best validation RMSE: $lrBestScore")
println(s"Random Forest - Best validation RMSE: $rfBestScore")

val bestAlgorithm = if (lrBestScore < rfBestScore) {
  println("Linear Regression performs better")
  lrTvsModel
} else {
  println("Random Forest performs better")
  rfTvsModel
}
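Either branch yields a TrainValidationSplitModel whose bestModel can be pattern-matched to recover the tuned hyperparameters. A sketch for the linear-regression case:

import org.apache.spark.ml.regression.LinearRegressionModel

bestAlgorithm.bestModel match {
  case m: LinearRegressionModel =>
    println(s"Chosen regParam: ${m.getRegParam}, elasticNetParam: ${m.getElasticNetParam}")
  case other =>
    println(s"Best model type: ${other.getClass.getSimpleName}")
}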
// Final evaluation on test data
val testPredictions = bestAlgorithm.transform(testData)
val testRMSE = evaluator.evaluate(testPredictions)
println(s"Final test RMSE: $testRMSE")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

// Custom evaluation function for classification
def customClassificationMetrics(predictions: DataFrame): Unit = {
  // Confusion matrix
  val confusionMatrix = predictions
    .groupBy("label", "prediction")
    .count()
    .orderBy("label", "prediction")
  println("Confusion Matrix:")
  confusionMatrix.show()

  // Per-class metrics
  val classMetrics = predictions
    .groupBy("label")
    .agg(
      count("*").alias("total"),
      sum(when(col("label") === col("prediction"), 1).otherwise(0)).alias("correct")
    )
    .withColumn("accuracy", col("correct") / col("total"))
  println("Per-class Accuracy:")
  classMetrics.show()

  // Overall metrics
  val totalPredictions = predictions.count()
  val correctPredictions = predictions
    .filter(col("label") === col("prediction"))
    .count()
  val overallAccuracy = correctPredictions.toDouble / totalPredictions
  println(s"Overall Accuracy: $overallAccuracy")
}
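For classification, the RDD-based MulticlassMetrics class computes a confusion matrix and per-class statistics directly and can replace the hand-rolled aggregations above. A sketch:

import org.apache.spark.mllib.evaluation.MulticlassMetrics

// MulticlassMetrics expects an RDD of (prediction, label) pairs
val predictionAndLabels = predictions
  .select(col("prediction"), col("label"))
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

val mm = new MulticlassMetrics(predictionAndLabels)
println(mm.confusionMatrix)
println(s"Weighted F1: ${mm.weightedFMeasure}")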
// Custom evaluation for regression
def customRegressionMetrics(predictions: DataFrame): Unit = {
  val metrics = predictions
    .select(
      mean(abs(col("label") - col("prediction"))).alias("mae"),
      sqrt(mean(pow(col("label") - col("prediction"), 2))).alias("rmse"),
      mean(pow(col("label") - col("prediction"), 2)).alias("mse"),
      corr("label", "prediction").alias("correlation")
    )
  println("Custom Regression Metrics:")
  metrics.show()

  // Residual analysis
  val residuals = predictions
    .withColumn("residual", col("label") - col("prediction"))
    .withColumn("abs_residual", abs(col("residual")))
    .withColumn("squared_residual", pow(col("residual"), 2))
  val residualStats = residuals
    .select(
      mean("residual").alias("mean_residual"),
      stddev("residual").alias("std_residual"),
      min("residual").alias("min_residual"),
      max("residual").alias("max_residual")
    )
  println("Residual Statistics:")
  residualStats.show()
}
// Usage with model predictions
val predictions = model.transform(testData)
customClassificationMetrics(predictions)
// or
// customRegressionMetrics(predictions)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.functions.vector_to_array

def monitorModelPerformance(predictions: DataFrame,
                            evaluator: BinaryClassificationEvaluator,
                            timeCol: String = "timestamp"): Unit = {
  // Overall score from the supplied evaluator
  println(s"Overall ${evaluator.getMetricName}: ${evaluator.evaluate(predictions)}")

  // Performance over time
  // (rawPrediction is a vector column and cannot be averaged directly,
  // so confidence is analyzed separately below via the probability column)
  val performanceOverTime = predictions
    .withColumn("date", to_date(col(timeCol)))
    .groupBy("date")
    .agg(
      count("*").alias("num_predictions"),
      mean(when(col("label") === col("prediction"), 1.0).otherwise(0.0)).alias("accuracy")
    )
    .orderBy("date")
  println("Performance Over Time:")
  performanceOverTime.show()

  // Prediction distribution
  val predictionDist = predictions
    .groupBy("prediction")
    .count()
    .orderBy("prediction")
  println("Prediction Distribution:")
  predictionDist.show()

  // Confidence analysis (for probabilistic models)
  if (predictions.columns.contains("probability")) {
    val confidenceAnalysis = predictions
      // probability is an ML Vector; vector_to_array (Spark 3.0+) makes it SQL-friendly
      .withColumn("max_probability",
        array_max(vector_to_array(col("probability"))))
      .groupBy("prediction")
      .agg(
        count("*").alias("count"),
        mean("max_probability").alias("avg_confidence"),
        min("max_probability").alias("min_confidence"),
        max("max_probability").alias("max_confidence")
      )
    println("Confidence Analysis by Prediction:")
    confidenceAnalysis.show()
  }
}
// Monitor model performance
monitorModelPerformance(predictions, binaryEvaluator)

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-mllib-2-12