or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

classification.md · clustering.md · core-framework.md · evaluation.md · feature-processing.md · frequent-pattern-mining.md · index.md · linear-algebra.md · rdd-api.md · recommendation.md · regression.md
tile.json

docs/core-framework.md

Core Framework

The MLlib core framework provides the foundational abstractions for building machine learning pipelines using the DataFrame-based API. It follows the Estimator-Transformer pattern with type-safe parameter management.

Pipeline Architecture

Base Classes

abstract class PipelineStage extends Params with Logging {
  val uid: String
  def copy(extra: ParamMap): PipelineStage
  def transformSchema(schema: StructType): StructType
}

abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMap: ParamMap): M  
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Array[M]
}

abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
}

abstract class Model[M <: Model[M]] extends Transformer {
  val parent: Estimator[M]
  def copy(extra: ParamMap): M
}

Pipeline Components

class Pipeline(val uid: String) extends Estimator[PipelineModel] with MLWritable {
  def this() = this(Identifiable.randomUID("pipeline"))
  
  final val stages: Param[Array[PipelineStage]]
  
  def setStages(value: Array[PipelineStage]): Pipeline
  def getStages: Array[PipelineStage]
  def fit(dataset: Dataset[_]): PipelineModel
  def copy(extra: ParamMap): Pipeline
  def transformSchema(schema: StructType): StructType
  def write: MLWriter
}

class PipelineModel(override val uid: String, val stages: Array[Transformer]) 
  extends Model[PipelineModel] with MLWritable with Logging {
  
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): PipelineModel
  def write: MLWriter
}

Parameter System

Parameter Definition

trait Param[T] extends Serializable {
  def name: String
  def doc: String  
  def parent: String
  def defaultValue: Option[T]
  def isValid(value: T): Boolean
  def encode(value: T): String
  def decode(encoded: String): T
}

class IntParam(parent: Params, name: String, doc: String, isValid: Int => Boolean)
  extends Param[Int](parent, name, doc)

class DoubleParam(parent: Params, name: String, doc: String, isValid: Double => Boolean) 
  extends Param[Double](parent, name, doc)

class BooleanParam(parent: Params, name: String, doc: String)
  extends Param[Boolean](parent, name, doc)

class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array[String] => Boolean)
  extends Param[Array[String]](parent, name, doc)

Parameter Management

trait Params extends Identifiable with Serializable {
  def copy(extra: ParamMap): Params
  
  final def set(param: Param[_], value: Any): Params.this.type
  final def set[T](param: Param[T], value: T): Params.this.type
  final def set(paramPair: ParamPair[_]): Params.this.type
  final def setDefault(paramPairs: ParamPair[_]*): Params.this.type
  final def setDefault[T](param: Param[T], value: T): Params.this.type
  
  final def get[T](param: Param[T]): Option[T]
  final def getOrDefault[T](param: Param[T]): T
  final def $(param: Param[_]): Any
  final def isSet(param: Param[_]): Boolean
  final def isDefined(param: Param[_]): Boolean
  final def hasDefault[T](param: Param[T]): Boolean
  final def getDefault[T](param: Param[T]): Option[T]
  
  def params: Array[Param[_]]
  def explainParam(param: Param[_]): String
  def explainParams(): String
  final def extractParamMap(): ParamMap
  final def extractParamMap(extra: ParamMap): ParamMap
}

class ParamMap extends Serializable {
  def put[T](param: Param[T], value: T): ParamMap
  def put(paramPair: ParamPair[_]): ParamMap
  def put(paramPairs: ParamPair[_]*): ParamMap
  def get[T](param: Param[T]): Option[T]
  def apply[T](param: Param[T]): T
  def contains(param: Param[_]): Boolean
  def remove[T](param: Param[T]): ParamMap
  def filter(f: ParamPair[_] => Boolean): ParamMap
  def copy: ParamMap
  def toSeq: Seq[ParamPair[_]]
  def size: Int
}

case class ParamPair[T](param: Param[T], value: T)

Shared Parameters

Common Parameter Traits

trait HasFeaturesCol extends Params {
  final val featuresCol: Param[String]
  final def getFeaturesCol: String
  def setFeaturesCol(value: String): this.type
}

trait HasLabelCol extends Params {
  final val labelCol: Param[String] 
  final def getLabelCol: String
  def setLabelCol(value: String): this.type
}

trait HasPredictionCol extends Params {
  final val predictionCol: Param[String]
  final def getPredictionCol: String
  def setPredictionCol(value: String): this.type
}

trait HasRawPredictionCol extends Params {
  final val rawPredictionCol: Param[String]
  final def getRawPredictionCol: String
  def setRawPredictionCol(value: String): this.type
}

trait HasProbabilityCol extends Params {
  final val probabilityCol: Param[String]
  final def getProbabilityCol: String
  def setProbabilityCol(value: String): this.type
}

trait HasWeightCol extends Params {
  final val weightCol: Param[String]
  final def getWeightCol: String
  def setWeightCol(value: String): this.type
}

Algorithm Parameters

trait HasRegParam extends Params {
  final val regParam: DoubleParam
  final def getRegParam: Double
  def setRegParam(value: Double): this.type
}

trait HasMaxIter extends Params {
  final val maxIter: IntParam
  final def getMaxIter: Int  
  def setMaxIter(value: Int): this.type
}

trait HasTol extends Params {
  final val tol: DoubleParam
  final def getTol: Double
  def setTol(value: Double): this.type
}

trait HasStepSize extends Params {
  final val stepSize: DoubleParam
  final def getStepSize: Double
  def setStepSize(value: Double): this.type
}

trait HasSeed extends Params {
  final val seed: LongParam
  final def getSeed: Long
  def setSeed(value: Long): this.type
}

trait HasElasticNetParam extends Params {
  final val elasticNetParam: DoubleParam
  final def getElasticNetParam: Double
  def setElasticNetParam(value: Double): this.type
}

trait HasFitIntercept extends Params {
  final val fitIntercept: BooleanParam
  final def getFitIntercept: Boolean
  def setFitIntercept(value: Boolean): this.type
}

trait HasStandardization extends Params {
  final val standardization: BooleanParam
  final def getStandardization: Boolean
  def setStandardization(value: Boolean): this.type
}

trait HasThreshold extends Params {
  final val threshold: DoubleParam
  final def getThreshold: Double
  def setThreshold(value: Double): this.type
}

trait HasThresholds extends Params {
  final val thresholds: DoubleArrayParam
  final def getThresholds: Array[Double]  
  def setThresholds(value: Array[Double]): this.type
}

Supervised Learning Base Classes

Predictor Framework

abstract class Predictor[FeaturesType, Learner <: Predictor[FeaturesType, Learner, M], M <: PredictionModel[FeaturesType, M]]
  extends Estimator[M] with PredictorParams {
  
  def train(dataset: Dataset[_]): M
  override def fit(dataset: Dataset[_]): M
  def copy(extra: ParamMap): Learner
  override def transformSchema(schema: StructType): StructType
}

abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, M]]
  extends Model[M] with PredictorParams {
  
  val numFeatures: Int
  val parent: Estimator[M]
  
  def predict(features: FeaturesType): Double
  def predictRaw(features: FeaturesType): Vector
  
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): M
}

Classification Framework

abstract class Classifier[FeaturesType, E <: Classifier[FeaturesType, E, M], M <: ClassificationModel[FeaturesType, M]]
  extends Predictor[FeaturesType, E, M] with HasRawPredictionCol with HasProbabilityCol {
  
  def setRawPredictionCol(value: String): E
  def setProbabilityCol(value: String): E
}

abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[FeaturesType, M]]
  extends PredictionModel[FeaturesType, M] with HasRawPredictionCol with HasProbabilityCol {
  
  val numClasses: Int
  
  override def predictRaw(features: FeaturesType): Vector
  def raw2probabilityInPlace(rawPrediction: Vector): Vector
  def probability2predictionInPlace(probability: Vector): Vector
  
  def setRawPredictionCol(value: String): M
  def setProbabilityCol(value: String): M
}

abstract class ProbabilisticClassifier[FeaturesType, E <: ProbabilisticClassifier[FeaturesType, E, M], M <: ProbabilisticClassificationModel[FeaturesType, M]]
  extends Classifier[FeaturesType, E, M] {
}

abstract class ProbabilisticClassificationModel[FeaturesType, M <: ProbabilisticClassificationModel[FeaturesType, M]]
  extends ClassificationModel[FeaturesType, M] {
  
  def predictProbability(features: FeaturesType): Vector
}

Regression Framework

abstract class Regressor[FeaturesType, E <: Regressor[FeaturesType, E, M], M <: RegressionModel[FeaturesType, M]]
  extends Predictor[FeaturesType, E, M] {
}

abstract class RegressionModel[FeaturesType, M <: RegressionModel[FeaturesType, M]]
  extends PredictionModel[FeaturesType, M] {
}

Persistence Framework

MLWritable and MLReadable

trait MLWritable {
  def write: MLWriter
  def save(path: String): Unit
}

trait MLReadable[T] {
  def read: MLReader[T]
  def load(path: String): T
}

abstract class MLWriter {
  def save(path: String): Unit
  def overwrite(): MLWriter
  def option(key: String, value: String): MLWriter
  def session(sparkSession: SparkSession): MLWriter
}

abstract class MLReader[T] {
  def load(path: String): T
  def option(key: String, value: String): MLReader[T]
  def session(sparkSession: SparkSession): MLReader[T]
}

Default Persistence

trait DefaultParamsWritable extends MLWritable {
  def write: MLWriter = new DefaultParamsWriter(this)
}

trait DefaultParamsReadable[T] extends MLReadable[T] {
  def read: MLReader[T] = new DefaultParamsReader[T]
}

class DefaultParamsWriter(instance: Params) extends MLWriter {
  override def save(path: String): Unit
}

class DefaultParamsReader[T] extends MLReader[T] {
  override def load(path: String): T
}

Identifiable Trait

trait Identifiable {
  val uid: String
}

object Identifiable {
  def randomUID(prefix: String): String
}

Usage Examples

Creating a Simple Pipeline

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.classification.LogisticRegression

// Create pipeline stages
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(false)
  .setWithStd(true)

val lr = new LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("label")
  .setPredictionCol("prediction")

// Create and fit pipeline
val pipeline = new Pipeline().setStages(Array(assembler, scaler, lr))
val model = pipeline.fit(trainingData)

// Make predictions
val predictions = model.transform(testData)

Parameter Management

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()

// Set parameters individually
lr.setMaxIter(100)
  .setRegParam(0.01)
  .setElasticNetParam(0.5)

// Use ParamMap for batch parameter setting
val paramMap = ParamMap(
  lr.maxIter -> 50,
  lr.regParam -> 0.1,
  lr.elasticNetParam -> 0.0
)

val lr2 = lr.copy(paramMap)

// Extract current parameters
val currentParams = lr.extractParamMap()
println(lr.explainParams())

Custom Parameter Validation

import org.apache.spark.ml.param.{Param, Params, DoubleParam}
import org.apache.spark.ml.util.Identifiable

class MyEstimator(override val uid: String) extends Estimator[MyModel] with Params {
  def this() = this(Identifiable.randomUID("myEstimator"))
  
  // Custom parameter with validation
  final val customParam: DoubleParam = new DoubleParam(this, "customParam", 
    "custom parameter (must be positive)", (x: Double) => x > 0.0)
  
  def setCustomParam(value: Double): this.type = set(customParam, value)
  def getCustomParam: Double = $(customParam)
  
  setDefault(customParam -> 1.0)
  
  override def fit(dataset: Dataset[_]): MyModel = {
    // Training logic using getCustomParam
    new MyModel(uid)
  }
  
  override def copy(extra: ParamMap): MyEstimator = defaultCopy(extra)
  override def transformSchema(schema: StructType): StructType = schema
  def params: Array[Param[_]] = Array(customParam)
}