MLlib provides a comprehensive set of regression algorithms for supervised learning tasks with continuous target variables. All regressors follow the Estimator-Transformer pattern and support the Pipeline API.
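Because each regressor is an Estimator whose fit() returns a Model (a Transformer), any of them can be used as a Pipeline stage. The sketch below is illustrative only; the input DataFrame rawData and its columns feature1, feature2, and label are assumptions, not part of the API listings that follow.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
// Assemble raw columns into a feature vector, then fit a regressor, all in one Pipeline
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2"))
  .setOutputCol("features")
val lr = new LinearRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")
val pipeline = new Pipeline().setStages(Array(assembler, lr))
// fit() returns a PipelineModel (a Transformer); transform() appends a "prediction" column
val pipelineModel = pipeline.fit(rawData)
val scored = pipelineModel.transform(rawData)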
class LinearRegression(override val uid: String) extends Regressor[Vector, LinearRegression, LinearRegressionModel]
with LinearRegressionParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("linReg"))
def setRegParam(value: Double): LinearRegression
def setFitIntercept(value: Boolean): LinearRegression
def setStandardization(value: Boolean): LinearRegression
def setElasticNetParam(value: Double): LinearRegression
def setMaxIter(value: Int): LinearRegression
def setTol(value: Double): LinearRegression
def setWeightCol(value: String): LinearRegression
def setSolver(value: String): LinearRegression
def setAggregationDepth(value: Int): LinearRegression
def setLoss(value: String): LinearRegression
def setEpsilon(value: Double): LinearRegression
override def fit(dataset: Dataset[_]): LinearRegressionModel
override def copy(extra: ParamMap): LinearRegression
}

class LinearRegressionModel private[ml] (
override val uid: String,
val coefficients: Vector,
val intercept: Double,
val scale: Double)
extends RegressionModel[Vector, LinearRegressionModel] with LinearRegressionParams with GeneralMLWritable {
// Convenience constructor for backward compatibility
private[ml] def this(uid: String, coefficients: Vector, intercept: Double) =
this(uid, coefficients, intercept, 1.0)
lazy val summary: LinearRegressionTrainingSummary
def hasSummary: Boolean
def evaluate(dataset: Dataset[_]): LinearRegressionSummary
override def predict(features: Vector): Double
override def copy(extra: ParamMap): LinearRegressionModel
def write: MLWriter
}
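The fitted model keeps a lazily computed training summary, and evaluate() recomputes the same metrics on any other dataset. A minimal sketch, assuming a fitted lrModel and a held-out testData DataFrame with matching label and features columns:
// Training-time metrics (available when the model was produced by fit())
if (lrModel.hasSummary) {
  val trainSummary = lrModel.summary
  println(s"train RMSE: ${trainSummary.rootMeanSquaredError}, r2: ${trainSummary.r2}")
}
// The same metrics on held-out data
val testSummary = lrModel.evaluate(testData)
println(s"test RMSE: ${testSummary.rootMeanSquaredError}, MAE: ${testSummary.meanAbsoluteError}")
testSummary.residuals.show(5)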
class LinearRegressionTrainingSummary(predictions: DataFrame, predictionCol: String, labelCol: String,
featuresCol: String, val objectiveHistory: Array[Double],
val totalIterations: Int, val solver: String)
extends LinearRegressionSummary(predictions, predictionCol, labelCol, featuresCol) {
val coefficientStandardErrors: Array[Double]
val tValues: Array[Double]
val pValues: Array[Double]
}
class LinearRegressionSummary(predictions: DataFrame, predictionCol: String,
labelCol: String, featuresCol: String) extends Serializable {
val residuals: DataFrame
val rootMeanSquaredError: Double
val meanSquaredError: Double
val meanAbsoluteError: Double
val r2: Double
val explainedVariance: Double
val numInstances: Long
val degreesOfFreedom: Long
val devianceResiduals: Array[Double]
}

class GeneralizedLinearRegression(override val uid: String)
extends Regressor[Vector, GeneralizedLinearRegression, GeneralizedLinearRegressionModel]
with GeneralizedLinearRegressionParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("glm"))
def setFamily(value: String): GeneralizedLinearRegression
def setVariancePower(value: Double): GeneralizedLinearRegression
def setLink(value: String): GeneralizedLinearRegression
def setLinkPower(value: Double): GeneralizedLinearRegression
def setFitIntercept(value: Boolean): GeneralizedLinearRegression
def setMaxIter(value: Int): GeneralizedLinearRegression
def setTol(value: Double): GeneralizedLinearRegression
def setRegParam(value: Double): GeneralizedLinearRegression
def setWeightCol(value: String): GeneralizedLinearRegression
def setSolver(value: String): GeneralizedLinearRegression
def setLinkPredictionCol(value: String): GeneralizedLinearRegression
def setOffsetCol(value: String): GeneralizedLinearRegression
def setAggregationDepth(value: Int): GeneralizedLinearRegression
override def fit(dataset: Dataset[_]): GeneralizedLinearRegressionModel
override def copy(extra: ParamMap): GeneralizedLinearRegression
}

class GeneralizedLinearRegressionModel(override val uid: String, val coefficients: Vector, val intercept: Double)
extends RegressionModel[Vector, GeneralizedLinearRegressionModel]
with GeneralizedLinearRegressionParams with MLWritable {
lazy val summary: GeneralizedLinearRegressionTrainingSummary
def hasSummary: Boolean
def evaluate(dataset: Dataset[_]): GeneralizedLinearRegressionSummary
override def predict(features: Vector): Double
override def copy(extra: ParamMap): GeneralizedLinearRegressionModel
def write: MLWriter
}
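GeneralizedLinearRegressionModel.evaluate() works the same way, returning a GeneralizedLinearRegressionSummary for held-out data. A minimal sketch, assuming a fitted glrModel and a testData DataFrame:
// Score held-out data and inspect residuals
val glrTestSummary = glrModel.evaluate(testData)
println(s"evaluated ${glrTestSummary.numInstances} rows")
glrTestSummary.residuals.show(5)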
class GeneralizedLinearRegressionTrainingSummary(predictions: DataFrame, predictionCol: String,
labelCol: String, featuresCol: String,
val objectiveHistory: Array[Double], val solver: String,
val totalIterations: Int, val aic: Double, val deviance: Double,
val nullDeviance: Double, val dispersion: Double,
val degreesOfFreedom: Long, val residualDegreeOfFreedom: Long,
val residualDegreeOfFreedomNull: Long, val coefficientStandardErrors: Array[Double],
val tValues: Array[Double], val pValues: Array[Double])
extends GeneralizedLinearRegressionSummary(predictions, predictionCol, labelCol, featuresCol) {
}
class GeneralizedLinearRegressionSummary(predictions: DataFrame, predictionCol: String,
labelCol: String, featuresCol: String) extends Serializable {
val residuals: DataFrame
val rank: Long
val numInstances: Long
}

class DecisionTreeRegressor(override val uid: String)
extends Regressor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel]
with DecisionTreeRegressorParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("dtr"))
def setMaxDepth(value: Int): DecisionTreeRegressor
def setMaxBins(value: Int): DecisionTreeRegressor
def setMinInstancesPerNode(value: Int): DecisionTreeRegressor
def setMinInfoGain(value: Double): DecisionTreeRegressor
def setMaxMemoryInMB(value: Int): DecisionTreeRegressor
def setCacheNodeIds(value: Boolean): DecisionTreeRegressor
def setCheckpointInterval(value: Int): DecisionTreeRegressor
def setImpurity(value: String): DecisionTreeRegressor
def setVarianceCol(value: String): DecisionTreeRegressor
def setSeed(value: Long): DecisionTreeRegressor
override def fit(dataset: Dataset[_]): DecisionTreeRegressionModel
override def copy(extra: ParamMap): DecisionTreeRegressor
}

class DecisionTreeRegressionModel(override val uid: String, val rootNode: Node, val numFeatures: Int)
extends RegressionModel[Vector, DecisionTreeRegressionModel]
with DecisionTreeRegressorParams with DecisionTreeModel with MLWritable {
override def predict(features: Vector): Double
def depth: Int
def numNodes: Int
def featureImportances: Vector
def toDebugString: String
override def copy(extra: ParamMap): DecisionTreeRegressionModel
def write: MLWriter
}

class RandomForestRegressor(override val uid: String)
extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel]
with RandomForestRegressorParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("rfr"))
def setNumTrees(value: Int): RandomForestRegressor
def setMaxDepth(value: Int): RandomForestRegressor
def setMaxBins(value: Int): RandomForestRegressor
def setMinInstancesPerNode(value: Int): RandomForestRegressor
def setMinInfoGain(value: Double): RandomForestRegressor
def setMaxMemoryInMB(value: Int): RandomForestRegressor
def setCacheNodeIds(value: Boolean): RandomForestRegressor
def setCheckpointInterval(value: Int): RandomForestRegressor
def setImpurity(value: String): RandomForestRegressor
def setSubsamplingRate(value: Double): RandomForestRegressor
def setSeed(value: Long): RandomForestRegressor
def setFeatureSubsetStrategy(value: String): RandomForestRegressor
override def fit(dataset: Dataset[_]): RandomForestRegressionModel
override def copy(extra: ParamMap): RandomForestRegressor
}

class RandomForestRegressionModel(override val uid: String, private val _trees: Array[DecisionTreeRegressionModel],
val numFeatures: Int)
extends RegressionModel[Vector, RandomForestRegressionModel]
with RandomForestRegressorParams with TreeEnsembleModel with MLWritable {
def trees: Array[DecisionTreeRegressionModel]
def treeWeights: Array[Double]
def featureImportances: Vector
override def predict(features: Vector): Double
def totalNumNodes: Int
def toDebugString: String
override def copy(extra: ParamMap): RandomForestRegressionModel
def write: MLWriter
}

class GBTRegressor(override val uid: String)
extends Regressor[Vector, GBTRegressor, GBTRegressionModel]
with GBTRegressorParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("gbtr"))
def setMaxDepth(value: Int): GBTRegressor
def setMaxBins(value: Int): GBTRegressor
def setMinInstancesPerNode(value: Int): GBTRegressor
def setMinInfoGain(value: Double): GBTRegressor
def setMaxMemoryInMB(value: Int): GBTRegressor
def setCacheNodeIds(value: Boolean): GBTRegressor
def setCheckpointInterval(value: Int): GBTRegressor
def setLossType(value: String): GBTRegressor
def setMaxIter(value: Int): GBTRegressor
def setStepSize(value: Double): GBTRegressor
def setSubsamplingRate(value: Double): GBTRegressor
def setSeed(value: Long): GBTRegressor
def setFeatureSubsetStrategy(value: String): GBTRegressor
def setValidationTol(value: Double): GBTRegressor
def setValidationIndicatorCol(value: String): GBTRegressor
override def fit(dataset: Dataset[_]): GBTRegressionModel
override def copy(extra: ParamMap): GBTRegressor
}

class GBTRegressionModel(override val uid: String, private val _trees: Array[DecisionTreeRegressionModel],
private val _treeWeights: Array[Double], val numFeatures: Int)
extends RegressionModel[Vector, GBTRegressionModel]
with GBTRegressorParams with TreeEnsembleModel with MLWritable {
def trees: Array[DecisionTreeRegressionModel]
def treeWeights: Array[Double]
def featureImportances: Vector
def totalNumNodes: Int
def getNumTrees: Int
override def predict(features: Vector): Double
def toDebugString: String
override def copy(extra: ParamMap): GBTRegressionModel
def write: MLWriter
}

class AFTSurvivalRegression(override val uid: String)
extends Regressor[Vector, AFTSurvivalRegression, AFTSurvivalRegressionModel]
with AFTSurvivalRegressionParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("aft"))
def setCensorCol(value: String): AFTSurvivalRegression
def setQuantileProbabilities(value: Array[Double]): AFTSurvivalRegression
def setQuantilesCol(value: String): AFTSurvivalRegression
def setFitIntercept(value: Boolean): AFTSurvivalRegression
def setMaxIter(value: Int): AFTSurvivalRegression
def setTol(value: Double): AFTSurvivalRegression
def setAggregationDepth(value: Int): AFTSurvivalRegression
override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel
override def copy(extra: ParamMap): AFTSurvivalRegression
}

class AFTSurvivalRegressionModel(override val uid: String, val coefficients: Vector,
val intercept: Double, val scale: Double)
extends RegressionModel[Vector, AFTSurvivalRegressionModel]
with AFTSurvivalRegressionParams with MLWritable {
def predictQuantiles(features: Vector): Vector
override def predict(features: Vector): Double
override def copy(extra: ParamMap): AFTSurvivalRegressionModel
def write: MLWriter
}

class IsotonicRegression(override val uid: String)
extends Regressor[Vector, IsotonicRegression, IsotonicRegressionModel]
with IsotonicRegressionParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("isoReg"))
def setIsotonic(value: Boolean): IsotonicRegression
def setWeightCol(value: String): IsotonicRegression
def setFeatureIndex(value: Int): IsotonicRegression
override def fit(dataset: Dataset[_]): IsotonicRegressionModel
override def copy(extra: ParamMap): IsotonicRegression
}

class IsotonicRegressionModel(override val uid: String, val boundaries: Array[Double],
val predictions: Array[Double], val numFeatures: Int)
extends RegressionModel[Vector, IsotonicRegressionModel]
with IsotonicRegressionParams with MLWritable {
override def predict(features: Vector): Double
override def copy(extra: ParamMap): IsotonicRegressionModel
def write: MLWriter
}

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.feature.VectorAssembler
// Prepare features
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val data = assembler.transform(rawData)
// Create and configure linear regression
val lr = new LinearRegression()
.setLabelCol("label")
.setFeaturesCol("features")
.setRegParam(0.1)
.setElasticNetParam(0.8)
.setMaxIter(100)
.setTol(1E-6)
// Split data
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 42)
// Train model
val lrModel = lr.fit(trainingData)
// Print coefficients and intercept
println(s"Coefficients: ${lrModel.coefficients}")
println(s"Intercept: ${lrModel.intercept}")
// Make predictions
val predictions = lrModel.transform(testData)
predictions.select("features", "label", "prediction").show()
// Get training summary
val trainingSummary = lrModel.summary
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"R-squared: ${trainingSummary.r2}")
println(s"Mean Absolute Error: ${trainingSummary.meanAbsoluteError}")import org.apache.spark.ml.regression.GeneralizedLinearRegression
// Create GLM with Poisson family and log link
val glr = new GeneralizedLinearRegression()
.setFamily("poisson")
.setLink("log")
.setMaxIter(10)
.setRegParam(0.3)
val glrModel = glr.fit(trainingData)
// Print model summary
val glrSummary = glrModel.summary
println(s"Coefficients: ${glrModel.coefficients}")
println(s"Intercept: ${glrModel.intercept}")
println(s"AIC: ${glrSummary.aic}")
println(s"Deviance: ${glrSummary.deviance}")
// Statistical significance tests
println("Coefficient Standard Errors:")
glrSummary.coefficientStandardErrors.zipWithIndex.foreach {
case (se, idx) => println(s" Coefficient $idx: $se")
}
println("T-Values:")
glrSummary.tValues.zipWithIndex.foreach {
case (t, idx) => println(s" Coefficient $idx: $t")
}
println("P-Values:")
glrSummary.pValues.zipWithIndex.foreach {
case (p, idx) => println(s" Coefficient $idx: $p")
}

import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator
val rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(100)
.setMaxDepth(10)
.setMaxBins(32)
.setMinInstancesPerNode(1)
.setMinInfoGain(0.0)
.setSubsamplingRate(1.0)
.setFeatureSubsetStrategy("auto")
.setSeed(42)
val rfModel = rf.fit(trainingData)
// Get feature importances
val featureImportances = rfModel.featureImportances
println(s"Feature importances: $featureImportances")
// Make predictions
val rfPredictions = rfModel.transform(testData)
// Evaluate model
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
val metrics = Array("rmse", "mse", "mae", "r2")
metrics.foreach { metric =>
evaluator.setMetricName(metric)
val result = evaluator.evaluate(rfPredictions)
println(s"$metric: $result")
}
// Print tree structure information
println(s"Total number of trees: ${rfModel.getNumTrees}")
println(s"Total number of nodes: ${rfModel.totalNumNodes}")import org.apache.spark.ml.regression.GBTRegressor
val gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(100)
.setMaxDepth(5)
.setStepSize(0.1)
.setSubsamplingRate(1.0)
.setFeatureSubsetStrategy("auto")
.setSeed(42)
// Optional early stopping: requires a boolean "isValidation" column marking validation rows.
// Setters mutate the estimator in place, so only enable this when that column exists:
// gbt.setValidationTol(0.01).setValidationIndicatorCol("isValidation")
val gbtModel = gbt.fit(trainingData)
// Print model information
println(s"Feature importances: ${gbtModel.featureImportances}")
println(s"Number of trees: ${gbtModel.getNumTrees}")
println(s"Tree weights: ${gbtModel.treeWeights.mkString(", ")}")
val gbtPredictions = gbtModel.transform(testData)
// Evaluate
val rmse = evaluator.setMetricName("rmse").evaluate(gbtPredictions)
println(s"RMSE on test data: $rmse")import org.apache.spark.ml.regression.AFTSurvivalRegression
// Data should have label (survival time), censor indicator, and features
val aft = new AFTSurvivalRegression()
.setLabelCol("survival_time")
.setCensorCol("censor")
.setFeaturesCol("features")
.setQuantileProbabilities(Array(0.1, 0.5, 0.9))
.setQuantilesCol("quantiles")
val aftModel = aft.fit(trainingData)
// Print model parameters
println(s"Coefficients: ${aftModel.coefficients}")
println(s"Intercept: ${aftModel.intercept}")
println(s"Scale: ${aftModel.scale}")
// Make predictions including quantiles
val aftPredictions = aftModel.transform(testData)
aftPredictions.select("survival_time", "prediction", "quantiles").show()

import org.apache.spark.ml.regression.IsotonicRegression
// Isotonic regression fits a monotonic (non-decreasing or non-increasing) function to the data
val iso = new IsotonicRegression()
.setLabelCol("label")
.setFeaturesCol("features")
.setIsotonic(true) // true for increasing, false for decreasing
.setFeatureIndex(0) // which feature to use (for vector inputs)
val isoModel = iso.fit(trainingData)
// Print isotonic boundaries and predictions
println(s"Boundaries: ${isoModel.boundaries.mkString(", ")}")
println(s"Predictions: ${isoModel.predictions.mkString(", ")}")
val isoPredictions = isoModel.transform(testData)
isoPredictions.select("features", "label", "prediction").show()

import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.regression.{RandomForestRegressor, RandomForestRegressionModel}
import org.apache.spark.ml.evaluation.RegressionEvaluator
val rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
// Build parameter grid
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(10, 20, 50))
.addGrid(rf.maxDepth, Array(5, 10, 15))
.addGrid(rf.minInstancesPerNode, Array(1, 2, 5))
.build()
// Create evaluator
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
// Create cross-validator
val cv = new CrossValidator()
.setEstimator(rf)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
.setParallelism(2)
// Train model with cross-validation
val cvModel = cv.fit(trainingData)
// Get best model
val bestModel = cvModel.bestModel.asInstanceOf[RandomForestRegressionModel]
println(s"Best model num trees: ${bestModel.getNumTrees}")
println(s"Best model max depth: ${bestModel.getMaxDepth}")
// Evaluate on test data
val finalPredictions = cvModel.transform(testData)
val finalRMSE = evaluator.evaluate(finalPredictions)
println(s"Final RMSE on test data: $finalRMSE")