MLlib provides a comprehensive set of classification algorithms for supervised learning tasks with categorical labels. All classifiers follow the Estimator-Transformer pattern and support the Pipeline API.
class LogisticRegression(override val uid: String) extends Classifier[Vector, LogisticRegression, LogisticRegressionModel]
with LogisticRegressionParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("logreg"))
def setRegParam(value: Double): LogisticRegression
def setElasticNetParam(value: Double): LogisticRegression
def setMaxIter(value: Int): LogisticRegression
def setTol(value: Double): LogisticRegression
def setFitIntercept(value: Boolean): LogisticRegression
def setFamily(value: String): LogisticRegression
def setStandardization(value: Boolean): LogisticRegression
def setThreshold(value: Double): LogisticRegression
def setThresholds(value: Array[Double]): LogisticRegression
def setWeightCol(value: String): LogisticRegression
def setAggregationDepth(value: Int): LogisticRegression
// Bounded optimization methods (expert parameters)
def setLowerBoundsOnCoefficients(value: Matrix): LogisticRegression
def setUpperBoundsOnCoefficients(value: Matrix): LogisticRegression
def setLowerBoundsOnIntercepts(value: Vector): LogisticRegression
def setUpperBoundsOnIntercepts(value: Vector): LogisticRegression
override def fit(dataset: Dataset[_]): LogisticRegressionModel
override def copy(extra: ParamMap): LogisticRegression
}

class LogisticRegressionModel private[spark] (
override val uid: String,
val coefficientMatrix: Matrix,
val interceptVector: Vector,
override val numClasses: Int,
private val isMultinomial: Boolean)
extends ProbabilisticClassificationModel[Vector, LogisticRegressionModel] with LogisticRegressionParams
with MLWritable {
// Convenience constructor for binary classification (deprecated)
private[spark] def this(uid: String, coefficients: Vector, intercept: Double) =
this(uid, new DenseMatrix(1, coefficients.size, coefficients.toArray, isTransposed = true),
Vectors.dense(intercept), 2, isMultinomial = false)
// Model coefficients for binary classification (throws exception if multinomial)
def coefficients: Vector
def intercept: Double
lazy val summary: LogisticRegressionTrainingSummary
def hasSummary: Boolean
def binarySummary: BinaryLogisticRegressionTrainingSummary
def evaluate(dataset: Dataset[_]): LogisticRegressionSummary
override def predict(features: Vector): Double
override def predictRaw(features: Vector): Vector
override def predictProbability(features: Vector): Vector
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): LogisticRegressionModel
def write: MLWriter
}
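
The bounded-optimization setters on LogisticRegression (setLowerBoundsOnCoefficients and friends) enable box-constrained fitting, for example forcing coefficients to be non-negative. A minimal sketch, assuming a binary problem with three features and an assumed DataFrame trainingData containing "label" and "features" columns:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Matrices
// For binary models the bound matrix has shape 1 x numFeatures;
// a lower bound of 0.0 on every entry forces non-negative coefficients
val boundedLr = new LogisticRegression()
  .setLowerBoundsOnCoefficients(Matrices.dense(1, 3, Array(0.0, 0.0, 0.0)))
  .setMaxIter(50)
val boundedModel = boundedLr.fit(trainingData)  // trainingData is assumed
println(s"Constrained coefficients: ${boundedModel.coefficients}")
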
class LogisticRegressionTrainingSummary(predictions: DataFrame, predictionCol: String,
labelCol: String, featuresCol: String,
val objectiveHistory: Array[Double])
extends LogisticRegressionSummary(predictions, predictionCol, labelCol, featuresCol) {
def totalIterations: Int
}
class LogisticRegressionSummary(predictions: DataFrame, predictionCol: String,
labelCol: String, featuresCol: String) extends ClassificationSummary {
def probabilityCol: String
// Binary-only metrics (exposed via BinaryLogisticRegressionSummary)
def fMeasureByThreshold: DataFrame
def precisionByThreshold: DataFrame
def recallByThreshold: DataFrame
def roc: DataFrame
def areaUnderROC: Double
def pr: DataFrame
// Per-label metrics (inherited from ClassificationSummary)
def fMeasureByLabel: Array[Double]
def precisionByLabel: Array[Double]
def recallByLabel: Array[Double]
}

class DecisionTreeClassifier(override val uid: String)
extends Classifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel]
with DecisionTreeClassifierParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("dtc"))
def setMaxDepth(value: Int): DecisionTreeClassifier
def setMaxBins(value: Int): DecisionTreeClassifier
def setMinInstancesPerNode(value: Int): DecisionTreeClassifier
def setMinInfoGain(value: Double): DecisionTreeClassifier
def setMaxMemoryInMB(value: Int): DecisionTreeClassifier
def setCacheNodeIds(value: Boolean): DecisionTreeClassifier
def setCheckpointInterval(value: Int): DecisionTreeClassifier
def setImpurity(value: String): DecisionTreeClassifier
def setSeed(value: Long): DecisionTreeClassifier
override def fit(dataset: Dataset[_]): DecisionTreeClassificationModel
override def copy(extra: ParamMap): DecisionTreeClassifier
}

class DecisionTreeClassificationModel(override val uid: String, val rootNode: Node,
val numFeatures: Int, val numClasses: Int)
extends ClassificationModel[Vector, DecisionTreeClassificationModel]
with DecisionTreeClassifierParams with TreeEnsembleModel with MLWritable {
override def predict(features: Vector): Double
override def predictRaw(features: Vector): Vector
override def predictProbability(features: Vector): Vector
def depth: Int
def numNodes: Int
def toDebugString: String
override def copy(extra: ParamMap): DecisionTreeClassificationModel
def write: MLWriter
}
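
Single decision trees are not covered by the worked examples at the end of this section, so here is a minimal sketch, assuming a DataFrame trainingData with "indexedLabel" and "features" columns:
import org.apache.spark.ml.classification.DecisionTreeClassifier
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setMaxDepth(5)
  .setImpurity("gini")  // or "entropy"
val dtModel = dt.fit(trainingData)  // trainingData is assumed
println(s"Depth: ${dtModel.depth}, nodes: ${dtModel.numNodes}")
println(dtModel.toDebugString)  // human-readable if/else structure of the tree
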
class RandomForestClassifier(override val uid: String)
extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel]
with RandomForestClassifierParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("rfc"))
def setNumTrees(value: Int): RandomForestClassifier
def setMaxDepth(value: Int): RandomForestClassifier
def setMaxBins(value: Int): RandomForestClassifier
def setMinInstancesPerNode(value: Int): RandomForestClassifier
def setMinInfoGain(value: Double): RandomForestClassifier
def setMaxMemoryInMB(value: Int): RandomForestClassifier
def setCacheNodeIds(value: Boolean): RandomForestClassifier
def setCheckpointInterval(value: Int): RandomForestClassifier
def setImpurity(value: String): RandomForestClassifier
def setSubsamplingRate(value: Double): RandomForestClassifier
def setSeed(value: Long): RandomForestClassifier
def setFeatureSubsetStrategy(value: String): RandomForestClassifier
override def fit(dataset: Dataset[_]): RandomForestClassificationModel
override def copy(extra: ParamMap): RandomForestClassifier
}

class RandomForestClassificationModel(override val uid: String, private val _trees: Array[DecisionTreeClassificationModel],
val numFeatures: Int, val numClasses: Int)
extends ClassificationModel[Vector, RandomForestClassificationModel]
with RandomForestClassifierParams with TreeEnsembleModel with MLWritable {
def trees: Array[DecisionTreeClassificationModel]
def treeWeights: Array[Double]
def featureImportances: Vector
override def predict(features: Vector): Double
override def predictRaw(features: Vector): Vector
override def predictProbability(features: Vector): Vector
def totalNumNodes: Int
def toDebugString: String
override def copy(extra: ParamMap): RandomForestClassificationModel
def write: MLWriter
}

class GBTClassifier(override val uid: String)
extends Classifier[Vector, GBTClassifier, GBTClassificationModel]
with GBTClassifierParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("gbtc"))
def setMaxDepth(value: Int): GBTClassifier
def setMaxBins(value: Int): GBTClassifier
def setMinInstancesPerNode(value: Int): GBTClassifier
def setMinInfoGain(value: Double): GBTClassifier
def setMaxMemoryInMB(value: Int): GBTClassifier
def setCacheNodeIds(value: Boolean): GBTClassifier
def setCheckpointInterval(value: Int): GBTClassifier
def setLossType(value: String): GBTClassifier
def setMaxIter(value: Int): GBTClassifier
def setStepSize(value: Double): GBTClassifier
def setSubsamplingRate(value: Double): GBTClassifier
def setSeed(value: Long): GBTClassifier
def setFeatureSubsetStrategy(value: String): GBTClassifier
def setValidationTol(value: Double): GBTClassifier
def setValidationIndicatorCol(value: String): GBTClassifier
override def fit(dataset: Dataset[_]): GBTClassificationModel
override def copy(extra: ParamMap): GBTClassifier
}

class GBTClassificationModel(override val uid: String, private val _trees: Array[DecisionTreeRegressionModel],
private val _treeWeights: Array[Double], val numFeatures: Int)
extends ClassificationModel[Vector, GBTClassificationModel]
with GBTClassifierParams with TreeEnsembleModel with MLWritable {
def trees: Array[DecisionTreeRegressionModel]
def treeWeights: Array[Double]
def featureImportances: Vector
def totalNumNodes: Int
def getNumTrees: Int
override def predict(features: Vector): Double
override def predictRaw(features: Vector): Vector
def toDebugString: String
override def copy(extra: ParamMap): GBTClassificationModel
def write: MLWriter
}
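
GBTClassifier supports binary classification only, with "logistic" as the sole loss type. A minimal sketch against the same assumed trainingData:
import org.apache.spark.ml.classification.GBTClassifier
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setMaxIter(50)    // number of boosting iterations, one tree per iteration
  .setStepSize(0.1)  // shrinkage (learning rate)
  .setMaxDepth(5)
val gbtModel = gbt.fit(trainingData)  // trainingData is assumed
println(s"Trees: ${gbtModel.getNumTrees}, total nodes: ${gbtModel.totalNumNodes}")
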
class NaiveBayes(override val uid: String)
extends Classifier[Vector, NaiveBayes, NaiveBayesModel]
with NaiveBayesParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("nb"))
def setModelType(value: String): NaiveBayes
def setSmoothing(value: Double): NaiveBayes
def setThresholds(value: Array[Double]): NaiveBayes
def setWeightCol(value: String): NaiveBayes
override def fit(dataset: Dataset[_]): NaiveBayesModel
override def copy(extra: ParamMap): NaiveBayes
}

class NaiveBayesModel(override val uid: String, val pi: Vector, val theta: Matrix, val sigma: Matrix)
extends ClassificationModel[Vector, NaiveBayesModel] with NaiveBayesParams with MLWritable {
val numFeatures: Int
val numClasses: Int
override def predict(features: Vector): Double
override def predictRaw(features: Vector): Vector
override def predictProbability(features: Vector): Vector
override def copy(extra: ParamMap): NaiveBayesModel
def write: MLWriter
}
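
A minimal Naive Bayes sketch; the default "multinomial" model type expects non-negative feature values such as term counts (trainingData again assumed):
import org.apache.spark.ml.classification.NaiveBayes
val nb = new NaiveBayes()
  .setModelType("multinomial")  // also: "bernoulli", "complement", "gaussian"
  .setSmoothing(1.0)            // additive (Laplace) smoothing
val nbModel = nb.fit(trainingData)  // trainingData is assumed
println(s"Log class priors (pi): ${nbModel.pi}")
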
class MultilayerPerceptronClassifier(override val uid: String)
extends Classifier[Vector, MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel]
with MultilayerPerceptronParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("mlpc"))
def setLayers(value: Array[Int]): MultilayerPerceptronClassifier
def setBlockSize(value: Int): MultilayerPerceptronClassifier
def setSeed(value: Long): MultilayerPerceptronClassifier
def setMaxIter(value: Int): MultilayerPerceptronClassifier
def setTol(value: Double): MultilayerPerceptronClassifier
def setStepSize(value: Double): MultilayerPerceptronClassifier
def setSolver(value: String): MultilayerPerceptronClassifier
def setInitialWeights(value: Vector): MultilayerPerceptronClassifier
override def fit(dataset: Dataset[_]): MultilayerPerceptronClassificationModel
override def copy(extra: ParamMap): MultilayerPerceptronClassifier
}

class MultilayerPerceptronClassificationModel(override val uid: String, val layers: Array[Int],
val weights: Vector)
extends ClassificationModel[Vector, MultilayerPerceptronClassificationModel]
with MultilayerPerceptronParams with MLWritable {
val numFeatures: Int
val numClasses: Int
override def predict(features: Vector): Double
override def predictRaw(features: Vector): Vector
override def predictProbability(features: Vector): Vector
override def copy(extra: ParamMap): MultilayerPerceptronClassificationModel
def write: MLWriter
}

class LinearSVC(override val uid: String)
extends Classifier[Vector, LinearSVC, LinearSVCModel]
with LinearSVCParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("linearsvc"))
def setRegParam(value: Double): LinearSVC
def setMaxIter(value: Int): LinearSVC
def setTol(value: Double): LinearSVC
def setFitIntercept(value: Boolean): LinearSVC
def setStandardization(value: Boolean): LinearSVC
def setThreshold(value: Double): LinearSVC
def setWeightCol(value: String): LinearSVC
def setAggregationDepth(value: Int): LinearSVC
override def fit(dataset: Dataset[_]): LinearSVCModel
override def copy(extra: ParamMap): LinearSVC
}

class LinearSVCModel(override val uid: String, val coefficients: Vector, val intercept: Double)
extends ClassificationModel[Vector, LinearSVCModel] with LinearSVCParams with MLWritable {
val numClasses: Int = 2
val numFeatures: Int
override def predict(features: Vector): Double
override def predictRaw(features: Vector): Vector
override def copy(extra: ParamMap): LinearSVCModel
def write: MLWriter
}
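
LinearSVC optimizes a linear support vector machine with hinge loss and handles binary labels only; wrap it in OneVsRest (listed next) for multiclass problems. A minimal sketch with the same assumed trainingData:
import org.apache.spark.ml.classification.LinearSVC
val svc = new LinearSVC()
  .setRegParam(0.1)
  .setMaxIter(100)
val svcModel = svc.fit(trainingData)  // trainingData is assumed
println(s"Coefficients: ${svcModel.coefficients}, intercept: ${svcModel.intercept}")
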
class OneVsRest(override val uid: String)
extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {
def this() = this(Identifiable.randomUID("oneVsRest"))
def setClassifier(value: Classifier[_, _, _]): OneVsRest
def setLabelCol(value: String): OneVsRest
def setFeaturesCol(value: String): OneVsRest
def setPredictionCol(value: String): OneVsRest
def setRawPredictionCol(value: String): OneVsRest
def setParallelism(value: Int): OneVsRest
override def fit(dataset: Dataset[_]): OneVsRestModel
override def copy(extra: ParamMap): OneVsRest
override def transformSchema(schema: StructType): StructType
def write: MLWriter
}

class OneVsRestModel(override val uid: String, private val labelMetadata: Metadata, val models: Array[_ <: ClassificationModel[_, _]])
extends Model[OneVsRestModel] with OneVsRestParams with MLWritable {
val numClasses: Int
override def transform(dataset: Dataset[_]): DataFrame
override def transformSchema(schema: StructType): StructType
override def copy(extra: ParamMap): OneVsRestModel
def write: MLWriter
}

abstract class ClassificationSummary(predictions: DataFrame, predictionCol: String,
labelCol: String, featuresCol: String) extends Serializable {
lazy val accuracy: Double
lazy val weightedPrecision: Double
lazy val weightedRecall: Double
lazy val weightedFMeasure: Double
lazy val weightedTruePositiveRate: Double = weightedRecall
lazy val weightedFalsePositiveRate: Double
def fMeasureByLabel(beta: Double = 1.0): Array[Double]
def precisionByLabel: Array[Double]
def recallByLabel: Array[Double]
def truePositiveRateByLabel: Array[Double] = recallByLabel
def falsePositiveRateByLabel: Array[Double]
def labels: Array[Double]
}

trait BinaryClassificationSummary extends ClassificationSummary {
def scoreCol: String
def roc: DataFrame
def areaUnderROC: Double
def pr: DataFrame
def fMeasureByThreshold: DataFrame
def precisionByThreshold: DataFrame
def recallByThreshold: DataFrame
}

// End-to-end pipeline: feature assembly, label indexing, logistic regression, evaluation
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier}
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// Prepare features
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
// Index labels if they are strings
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
// Create classifier
val lr = new LogisticRegression()
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
.setMaxIter(100)
.setRegParam(0.1)
// Create pipeline
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, assembler, lr))
// Split data (assumes `data` is a DataFrame with columns feature1-feature3 and label)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train model
val model = pipeline.fit(trainingData)
// Make predictions
val predictions = model.transform(testData)
// Evaluate
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("indexedLabel")
.setRawPredictionCol("rawPrediction")
.setMetricName("areaUnderROC")
val auc = evaluator.evaluate(predictions)
println(s"Area under ROC curve: $auc")import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val rf = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
.setNumTrees(100)
.setMaxDepth(10)
.setMaxBins(32)
.setMinInstancesPerNode(1)
.setMinInfoGain(0.0)
.setSubsamplingRate(1.0)
.setFeatureSubsetStrategy("auto")
.setSeed(42)
// Assumes trainingData already contains "indexedLabel" and "features" columns
val rfModel = rf.fit(trainingData)
// Get feature importances
val featureImportances = rfModel.featureImportances
println(s"Feature importances: $featureImportances")
val predictions = rfModel.transform(testData)
// Evaluate multiclass metrics
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
val metrics = Array("accuracy", "weightedPrecision", "weightedRecall", "f1")
metrics.foreach { metric =>
evaluator.setMetricName(metric)
val result = evaluator.evaluate(predictions)
println(s"$metric: $result")
}

// Multilayer perceptron (feedforward neural network) classifier
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
// Define network architecture: input layer (4 features) -> 2 hidden layers (5, 4 nodes) -> output layer (3 classes)
val layers = Array[Int](4, 5, 4, 3)
val mlp = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100)
.setStepSize(0.03)
.setSolver("l-bfgs")
// Assumes the feature vectors are 4-dimensional and labels span 3 classes
val mlpModel = mlp.fit(trainingData)
val mlpPredictions = mlpModel.transform(testData)
// Show predictions
mlpPredictions.select("features", "label", "prediction", "probability").show(20)

// One-vs-Rest: multiclass classification via per-class binary models
import org.apache.spark.ml.classification.{OneVsRest, LogisticRegression}
// Create base classifier
val classifier = new LogisticRegression()
.setMaxIter(10)
.setTol(1E-6)
.setFitIntercept(true)
// Create One-vs-Rest wrapper
val ovr = new OneVsRest()
.setClassifier(classifier)
.setLabelCol("label")
.setFeaturesCol("features")
val ovrModel = ovr.fit(trainingData)
val ovrPredictions = ovrModel.transform(testData)
// The model contains one binary classifier per class
println(s"Number of classes: ${ovrModel.models.length}")import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
val lr = new LogisticRegression()
.setMaxIter(100)
.setRegParam(0.01)
.setElasticNetParam(0.0)
val lrModel = lr.fit(trainingData)
// Get training summary
val summary = lrModel.summary
println(s"Total iterations: ${summary.totalIterations}")
println(s"Objective history: ${summary.objectiveHistory.mkString(", ")}")
// Binary classification metrics (only available when the model is binary)
summary match {
  case binarySummary: org.apache.spark.ml.classification.BinaryLogisticRegressionSummary =>
    println(s"Area Under ROC: ${binarySummary.areaUnderROC}")
    // Show ROC curve points
    binarySummary.roc.show()
    // Show precision-recall curve
    binarySummary.pr.show()
  case _ => // multinomial models do not expose threshold-based metrics
}
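
A common follow-up is to re-tune the decision threshold from the summary's F-measure curve. A minimal sketch for binary models; fMeasureByThreshold exposes "threshold" and "F-Measure" columns:
import org.apache.spark.sql.functions.max
// Pick the threshold that maximizes F-measure and apply it to the model
val fMeasure = lrModel.binarySummary.fMeasureByThreshold
val maxF = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where(fMeasure("F-Measure") === maxF)
  .select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)
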
// Evaluate on test data
val testSummary = lrModel.evaluate(testData)
println(s"Test Accuracy: ${testSummary.accuracy}")
println(s"Test Weighted Precision: ${testSummary.weightedPrecision}")
println(s"Test Weighted Recall: ${testSummary.weightedRecall}")