Spark MLlib's core framework provides the foundational abstractions for building machine learning pipelines on the DataFrame-based API (the org.apache.spark.ml package). It follows the Estimator/Transformer pattern with type-safe parameter management.
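As a minimal sketch of that pattern (training and test are assumed DataFrames with the expected label and feature columns):
import org.apache.spark.ml.classification.LogisticRegression
val estimator = new LogisticRegression().setMaxIter(10) // an Estimator
val model = estimator.fit(training) // fit() learns a Model
val scored = model.transform(test) // a Model is a Transformer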
abstract class PipelineStage extends Params with Logging {
val uid: String
def copy(extra: ParamMap): PipelineStage
def transformSchema(schema: StructType): StructType
}
abstract class Estimator[M <: Model[M]] extends PipelineStage {
def fit(dataset: Dataset[_]): M
def fit(dataset: Dataset[_], paramMap: ParamMap): M
def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Array[M]
}
abstract class Transformer extends PipelineStage {
def transform(dataset: Dataset[_]): DataFrame
}
abstract class Model[M <: Model[M]] extends Transformer {
val parent: Estimator[M]
def copy(extra: ParamMap): M
}

class Pipeline(val uid: String) extends Estimator[PipelineModel] with MLWritable {
def this() = this(Identifiable.randomUID("pipeline"))
final val stages: Param[Array[PipelineStage]]
def setStages(value: Array[PipelineStage]): Pipeline
def getStages: Array[PipelineStage]
def fit(dataset: Dataset[_]): PipelineModel
def copy(extra: ParamMap): Pipeline
def transformSchema(schema: StructType): StructType
def write: MLWriter
}
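Fitting a Pipeline fits each Estimator stage in order; the resulting PipelineModel holds the fitted transformers, which can be inspected afterwards. A sketch, reusing the assembler and lr stages built in the usage examples at the end of this section:
import org.apache.spark.ml.classification.LogisticRegressionModel
val pipeline = new Pipeline().setStages(Array(assembler, lr))
val pipelineModel = pipeline.fit(trainingData)
// stages holds the fitted transformers in execution order
val lrModel = pipelineModel.stages.last.asInstanceOf[LogisticRegressionModel]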
class PipelineModel(override val uid: String, val stages: Array[Transformer])
extends Model[PipelineModel] with MLWritable with Logging {
override def transform(dataset: Dataset[_]): DataFrame
override def transformSchema(schema: StructType): StructType
override def copy(extra: ParamMap): PipelineModel
def write: MLWriter
}

class Param[T](val parent: Params, val name: String, val doc: String, val isValid: T => Boolean) extends Serializable {
def defaultValue: Option[T]
def encode(value: T): String
def decode(encoded: String): T
}
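Validators are usually taken from org.apache.spark.ml.param.ParamValidators rather than hand-written; a sketch using the DoubleParam subclass defined just below, assumed to run inside a Params subclass (so this is available; the param name is illustrative):
import org.apache.spark.ml.param.ParamValidators
// ParamValidators supplies common checks: gt, gtEq, lt, ltEq, inRange, inArray
val mixing = new DoubleParam(this, "mixingParam",
"mixing parameter, must lie in [0, 1]", ParamValidators.inRange(0.0, 1.0))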
class IntParam(parent: Params, name: String, doc: String, isValid: Int => Boolean)
extends Param[Int](parent, name, doc, isValid)
class LongParam(parent: Params, name: String, doc: String, isValid: Long => Boolean)
extends Param[Long](parent, name, doc, isValid)
class DoubleParam(parent: Params, name: String, doc: String, isValid: Double => Boolean)
extends Param[Double](parent, name, doc, isValid)
class BooleanParam(parent: Params, name: String, doc: String)
extends Param[Boolean](parent, name, doc, (_: Boolean) => true)
class DoubleArrayParam(parent: Params, name: String, doc: String, isValid: Array[Double] => Boolean)
extends Param[Array[Double]](parent, name, doc, isValid)
class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array[String] => Boolean)
extends Param[Array[String]](parent, name, doc, isValid)

trait Params extends Identifiable with Serializable {
def copy(extra: ParamMap): Params
final def set(param: Param[_], value: Any): Params.this.type
final def set[T](param: Param[T], value: T): Params.this.type
final def set(paramPair: ParamPair[_]): Params.this.type
final def setDefault(paramPairs: ParamPair[_]*): Params.this.type
final def setDefault[T](param: Param[T], value: T): Params.this.type
final def get[T](param: Param[T]): Option[T]
final def getOrDefault[T](param: Param[T]): T
final def $[T](param: Param[T]): T
final def isSet(param: Param[_]): Boolean
final def isDefined(param: Param[_]): Boolean
final def hasDefault[T](param: Param[T]): Boolean
final def getDefault[T](param: Param[T]): Option[T]
def params: Array[Param[_]]
def explainParam(param: Param[_]): String
def explainParams(): String
final def extractParamMap(): ParamMap
final def extractParamMap(extra: ParamMap): ParamMap
}
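A brief sketch of parameter introspection, using LogisticRegression for illustration:
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
lr.isSet(lr.regParam) // false: never explicitly set
lr.hasDefault(lr.regParam) // true: defaults to 0.0
lr.getOrDefault(lr.regParam) // 0.0
println(lr.explainParam(lr.regParam)) // doc string plus default/current value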
class ParamMap extends Serializable {
def put[T](param: Param[T], value: T): ParamMap
def put(paramPair: ParamPair[_]): ParamMap
def put(paramPairs: ParamPair[_]*): ParamMap
def get[T](param: Param[T]): Option[T]
def apply[T](param: Param[T]): T
def contains(param: Param[_]): Boolean
def remove[T](param: Param[T]): ParamMap
def filter(f: ParamPair[_] => Boolean): ParamMap
def copy: ParamMap
def toSeq: Seq[ParamPair[_]]
def size: Int
}
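A ParamMap can also be passed directly to fit() to override parameters for a single call, leaving the estimator instance untouched (a sketch; training is an assumed DataFrame):
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
val lr = new LogisticRegression()
// Overrides apply only to this fit() call; lr itself is unchanged
val model = lr.fit(training, ParamMap(lr.maxIter -> 20, lr.regParam -> 0.05))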
case class ParamPair[T](param: Param[T], value: T)

trait HasFeaturesCol extends Params {
final val featuresCol: Param[String]
final def getFeaturesCol: String
def setFeaturesCol(value: String): this.type
}
trait HasLabelCol extends Params {
final val labelCol: Param[String]
final def getLabelCol: String
def setLabelCol(value: String): this.type
}
trait HasPredictionCol extends Params {
final val predictionCol: Param[String]
final def getPredictionCol: String
def setPredictionCol(value: String): this.type
}
trait HasRawPredictionCol extends Params {
final val rawPredictionCol: Param[String]
final def getRawPredictionCol: String
def setRawPredictionCol(value: String): this.type
}
trait HasProbabilityCol extends Params {
final val probabilityCol: Param[String]
final def getProbabilityCol: String
def setProbabilityCol(value: String): this.type
}
trait HasWeightCol extends Params {
final val weightCol: Param[String]
final def getWeightCol: String
def setWeightCol(value: String): this.type
}

trait HasRegParam extends Params {
final val regParam: DoubleParam
final def getRegParam: Double
def setRegParam(value: Double): this.type
}
trait HasMaxIter extends Params {
final val maxIter: IntParam
final def getMaxIter: Int
def setMaxIter(value: Int): this.type
}
trait HasTol extends Params {
final val tol: DoubleParam
final def getTol: Double
def setTol(value: Double): this.type
}
trait HasStepSize extends Params {
final val stepSize: DoubleParam
final def getStepSize: Double
def setStepSize(value: Double): this.type
}
trait HasSeed extends Params {
final val seed: LongParam
final def getSeed: Long
def setSeed(value: Long): this.type
}
trait HasElasticNetParam extends Params {
final val elasticNetParam: DoubleParam
final def getElasticNetParam: Double
def setElasticNetParam(value: Double): this.type
}
trait HasFitIntercept extends Params {
final val fitIntercept: BooleanParam
final def getFitIntercept: Boolean
def setFitIntercept(value: Boolean): this.type
}
trait HasStandardization extends Params {
final val standardization: BooleanParam
final def getStandardization: Boolean
def setStandardization(value: Boolean): this.type
}
trait HasThreshold extends Params {
final val threshold: DoubleParam
final def getThreshold: Double
def setThreshold(value: Double): this.type
}
trait HasThresholds extends Params {
final val thresholds: DoubleArrayParam
final def getThresholds: Array[Double]
def setThresholds(value: Array[Double]): this.type
}

trait PredictorParams extends Params with HasLabelCol with HasFeaturesCol with HasPredictionCol

abstract class Predictor[FeaturesType, Learner <: Predictor[FeaturesType, Learner, M], M <: PredictionModel[FeaturesType, M]]
extends Estimator[M] with PredictorParams {
def train(dataset: Dataset[_]): M
override def fit(dataset: Dataset[_]): M
def copy(extra: ParamMap): Learner
override def transformSchema(schema: StructType): StructType
}
abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, M]]
extends Model[M] with PredictorParams {
val numFeatures: Int
val parent: Estimator[M]
def predict(features: FeaturesType): Double
def predictRaw(features: FeaturesType): Vector
override def transform(dataset: Dataset[_]): DataFrame
override def transformSchema(schema: StructType): StructType
def copy(extra: ParamMap): M
}

abstract class Classifier[FeaturesType, E <: Classifier[FeaturesType, E, M], M <: ClassificationModel[FeaturesType, M]]
extends Predictor[FeaturesType, E, M] with HasRawPredictionCol with HasProbabilityCol {
def setRawPredictionCol(value: String): E
def setProbabilityCol(value: String): E
}
abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[FeaturesType, M]]
extends PredictionModel[FeaturesType, M] with HasRawPredictionCol with HasProbabilityCol {
val numClasses: Int
override def predictRaw(features: FeaturesType): Vector
def raw2probabilityInPlace(rawPrediction: Vector): Vector
def probability2prediction(probability: Vector): Double
def setRawPredictionCol(value: String): M
def setProbabilityCol(value: String): M
}
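After fitting, a classification model's transform() appends the three output columns under their default names. A sketch, assuming lrModel is a fitted LogisticRegressionModel and test is a DataFrame with a features column:
// rawPrediction: per-class margins or confidences (Vector)
// probability: per-class probabilities (Vector)
// prediction: predicted label index (Double)
val predictions = lrModel.transform(test)
predictions.select("rawPrediction", "probability", "prediction").show(5)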
abstract class ProbabilisticClassifier[FeaturesType, E <: ProbabilisticClassifier[FeaturesType, E, M], M <: ProbabilisticClassificationModel[FeaturesType, M]]
extends Classifier[FeaturesType, E, M] {
}
abstract class ProbabilisticClassificationModel[FeaturesType, M <: ProbabilisticClassificationModel[FeaturesType, M]]
extends ClassificationModel[FeaturesType, M] {
def predictProbability(features: FeaturesType): Vector
}

abstract class Regressor[FeaturesType, E <: Regressor[FeaturesType, E, M], M <: RegressionModel[FeaturesType, M]]
extends Predictor[FeaturesType, E, M] {
}
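Concrete pairs such as LinearRegression and LinearRegressionModel follow this shape; a brief sketch (training assumed as before):
import org.apache.spark.ml.regression.LinearRegression
val linReg = new LinearRegression().setMaxIter(50).setRegParam(0.1)
val linModel = linReg.fit(training)
println(linModel.coefficients) // learned weights
println(linModel.intercept)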
abstract class RegressionModel[FeaturesType, M <: RegressionModel[FeaturesType, M]]
extends PredictionModel[FeaturesType, M] {
}

trait MLWritable {
def write: MLWriter
def save(path: String): Unit
}
trait MLReadable[T] {
def read: MLReader[T]
def load(path: String): T
}
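Together these traits give every stage and model a uniform persistence path; a sketch with an illustrative path, assuming model is a fitted PipelineModel:
import org.apache.spark.ml.PipelineModel
// overwrite() replaces any existing data at the target path
model.write.overwrite().save("/tmp/spark-lr-pipeline")
// Loading goes through the companion object's MLReader
val restored = PipelineModel.load("/tmp/spark-lr-pipeline")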
abstract class MLWriter {
def save(path: String): Unit
def overwrite(): MLWriter
def option(key: String, value: String): MLWriter
def session(sparkSession: SparkSession): MLWriter
}
abstract class MLReader[T] {
def load(path: String): T
def option(key: String, value: String): MLReader[T]
def session(sparkSession: SparkSession): MLReader[T]
}

trait DefaultParamsWritable extends MLWritable {
def write: MLWriter = new DefaultParamsWriter(this)
}
trait DefaultParamsReadable[T] extends MLReadable[T] {
def read: MLReader[T] = new DefaultParamsReader[T]
}
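A stage whose only state is its params typically mixes in DefaultParamsWritable and pairs it with a DefaultParamsReadable companion. A hypothetical no-op transformer as a sketch:
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
class MyCleaner(override val uid: String) extends Transformer with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("myCleaner"))
// save() comes from DefaultParamsWritable; a real stage would transform data
override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF()
override def transformSchema(schema: StructType): StructType = schema
override def copy(extra: ParamMap): MyCleaner = defaultCopy(extra)
}
object MyCleaner extends DefaultParamsReadable[MyCleaner] // provides load()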
class DefaultParamsWriter(instance: Params) extends MLWriter {
override def save(path: String): Unit
}
class DefaultParamsReader[T] extends MLReader[T] {
override def load(path: String): T
}

trait Identifiable {
val uid: String
}
object Identifiable {
def randomUID(prefix: String): String
}

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.classification.LogisticRegression
// trainingData and testData are assumed to be DataFrames with columns
// "feature1", "feature2", "feature3", and "label"
// Create pipeline stages
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithMean(false)
.setWithStd(true)
val lr = new LogisticRegression()
.setFeaturesCol("scaledFeatures")
.setLabelCol("label")
.setPredictionCol("prediction")
// Create and fit pipeline
val pipeline = new Pipeline().setStages(Array(assembler, scaler, lr))
val model = pipeline.fit(trainingData)
// Make predictions
val predictions = model.transform(testData)

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
// Set parameters individually
lr.setMaxIter(100)
.setRegParam(0.01)
.setElasticNetParam(0.5)
// Use ParamMap for batch parameter setting
val paramMap = ParamMap(
lr.maxIter -> 50,
lr.regParam -> 0.1,
lr.elasticNetParam -> 0.0
)
val lr2 = lr.copy(paramMap)
// Extract current parameters
val currentParams = lr.extractParamMap()
println(lr.explainParams())

import org.apache.spark.ml.Estimator
import org.apache.spark.ml.param.{DoubleParam, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types.StructType
class MyEstimator(override val uid: String) extends Estimator[MyModel] {
def this() = this(Identifiable.randomUID("myEstimator"))
// Custom parameter with validation
final val customParam: DoubleParam = new DoubleParam(this, "customParam",
"custom parameter (must be positive)", (x: Double) => x > 0.0)
def setCustomParam(value: Double): this.type = set(customParam, value)
def getCustomParam: Double = $(customParam)
setDefault(customParam -> 1.0)
override def fit(dataset: Dataset[_]): MyModel = {
// Training logic using getCustomParam goes here; finally copy the
// estimator's params onto the model and link it back to its parent
copyValues(new MyModel(uid)).setParent(this)
}
override def copy(extra: ParamMap): MyEstimator = defaultCopy(extra)
override def transformSchema(schema: StructType): StructType = schema
// (No params override is needed: the Params trait collects declared
// Param fields automatically via reflection.)
}
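For completeness, a minimal companion model for the estimator above (a hypothetical sketch; a real model would carry learned state such as coefficients and apply it in transform()):
import org.apache.spark.ml.Model
import org.apache.spark.sql.DataFrame
class MyModel(override val uid: String) extends Model[MyModel] {
// Identity transform for brevity; a real model applies its learned state
override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF()
override def transformSchema(schema: StructType): StructType = schema
override def copy(extra: ParamMap): MyModel = defaultCopy(extra)
}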