Apache Spark MLlib is a scalable machine learning library that provides high-level APIs for common machine learning algorithms and utilities
npx @tessl/cli install tessl/maven-org-apache-spark--spark-mllib_2-12@2.4.0

Apache Spark MLlib is a scalable machine learning library built on top of Apache Spark's distributed computing framework. It provides comprehensive machine learning capabilities through unified APIs for classification, regression, clustering, collaborative filtering, dimensionality reduction, and feature processing across large datasets.
build.sbt: libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.8"

// DataFrame-based API (recommended)
import org.apache.spark.ml._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.regression._
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.linalg.{Vector, Vectors, Matrix, Matrices}
// RDD-based API (legacy)
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.linalg.{Vector => MLlibVector, Vectors => MLlibVectors}
// Spark SQL and core
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
// Create Spark session
val spark = SparkSession.builder()
.appName("MLlib Example")
.getOrCreate()
import spark.implicits._
// Load training data: a small DataFrame with raw feature columns and a binary label
// (libsvm files can also be read via spark.read.format("libsvm"), which already
//  yields "label" and "features" columns and would not need the assembler below)
val data = Seq(
  (1.1, 0.1, 1.0, 1.0),
  (2.0, 1.0, -1.0, 0.0),
  (2.0, 1.3, 1.0, 0.0),
  (0.0, 1.2, -0.5, 1.0)
).toDF("feature1", "feature2", "feature3", "label")
// Create feature vector assembler
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
// Create classifier
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// Create and fit pipeline
val pipeline = new Pipeline().setStages(Array(assembler, lr))
val model = pipeline.fit(data)
// Make predictions
val predictions = model.transform(data)
predictions.select("features", "label", "probability", "prediction").show()

MLlib provides two API layers:

spark.ml: the modern, high-level API built on Spark DataFrames, recommended for new applications.
spark.mllib: the original, lower-level API built on Spark RDDs, kept in maintenance mode for backward compatibility.
Pipeline-based machine learning with standardized interfaces for building and deploying ML workflows.
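Fitted pipelines and models can be persisted and reloaded through the MLWritable/MLReadable interfaces. A minimal sketch, assuming the fitted PipelineModel (model) and DataFrame (data) from the quick-start example and a placeholder path:

import org.apache.spark.ml.PipelineModel

// Save the fitted pipeline model (path is a placeholder)
model.write.overwrite().save("/tmp/spark-lr-pipeline-model")

// Reload it later and reuse it for scoring
val reloaded = PipelineModel.load("/tmp/spark-lr-pipeline-model")
reloaded.transform(data).select("prediction").show()

The core pipeline abstractions: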
abstract class PipelineStage extends Params with Logging {
def copy(extra: ParamMap): PipelineStage
def transformSchema(schema: StructType): StructType
}
abstract class Estimator[M <: Model[M]] extends PipelineStage {
def fit(dataset: Dataset[_]): M
def fit(dataset: Dataset[_], paramMap: ParamMap): M
def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Array[M]
}
abstract class Transformer extends PipelineStage {
def transform(dataset: Dataset[_]): DataFrame
}
class Pipeline(val uid: String) extends Estimator[PipelineModel] {
def setStages(value: Array[PipelineStage]): Pipeline
def getStages: Array[PipelineStage]
def fit(dataset: Dataset[_]): PipelineModel
}Supervised learning algorithms for predicting categorical labels including logistic regression, decision trees, random forests, gradient boosted trees, naive Bayes, neural networks, and support vector machines.
class LogisticRegression(override val uid: String) extends Classifier[Vector, LogisticRegression, LogisticRegressionModel]
with LogisticRegressionParams with DefaultParamsWritable {
def setRegParam(value: Double): LogisticRegression
def setElasticNetParam(value: Double): LogisticRegression
def setMaxIter(value: Int): LogisticRegression
def setTol(value: Double): LogisticRegression
def setFitIntercept(value: Boolean): LogisticRegression
def setThreshold(value: Double): LogisticRegression
def setThresholds(value: Array[Double]): LogisticRegression
}
class RandomForestClassifier(override val uid: String) extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel]
with RandomForestClassifierParams with DefaultParamsWritable {
def setNumTrees(value: Int): RandomForestClassifier
def setMaxDepth(value: Int): RandomForestClassifier
def setMinInstancesPerNode(value: Int): RandomForestClassifier
def setFeatureSubsetStrategy(value: String): RandomForestClassifier
}

Supervised learning algorithms for predicting continuous values including linear regression, generalized linear models, decision trees, random forests, gradient boosted trees, and survival regression.
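A minimal linear regression sketch, assuming a DataFrame named training with "label" and "features" columns:

import org.apache.spark.ml.regression.LinearRegression

val linReg = new LinearRegression()
  .setMaxIter(100)
  .setRegParam(0.1)
  .setElasticNetParam(0.5)

val linRegModel = linReg.fit(training)

// Inspect the fitted model and its training summary
println(s"coefficients: ${linRegModel.coefficients}, intercept: ${linRegModel.intercept}")
println(s"RMSE: ${linRegModel.summary.rootMeanSquaredError}, r2: ${linRegModel.summary.r2}")

The main regressor signatures: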
class LinearRegression(override val uid: String) extends Regressor[Vector, LinearRegression, LinearRegressionModel]
with LinearRegressionParams with DefaultParamsWritable {
def setRegParam(value: Double): LinearRegression
def setElasticNetParam(value: Double): LinearRegression
def setMaxIter(value: Int): LinearRegression
def setTol(value: Double): LinearRegression
def setFitIntercept(value: Boolean): LinearRegression
def setSolver(value: String): LinearRegression
}
class RandomForestRegressor(override val uid: String) extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel]
with RandomForestRegressorParams with DefaultParamsWritable {
def setNumTrees(value: Int): RandomForestRegressor
def setMaxDepth(value: Int): RandomForestRegressor
def setMinInstancesPerNode(value: Int): RandomForestRegressor
}

Unsupervised learning algorithms for discovering data patterns including K-means, bisecting K-means, Gaussian mixture models, latent Dirichlet allocation, and power iteration clustering.
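A brief K-means sketch, assuming a DataFrame named dataset with a "features" vector column:

import org.apache.spark.ml.clustering.KMeans

val kmeans = new KMeans()
  .setK(3)
  .setSeed(1L)
  .setMaxIter(20)

val kmModel = kmeans.fit(dataset)

// Cluster centers and per-row cluster assignments (added as a "prediction" column)
kmModel.clusterCenters.foreach(println)
kmModel.transform(dataset).select("features", "prediction").show(5)

The main clustering signatures: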
class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMeansParams with DefaultParamsWritable {
def setK(value: Int): KMeans
def setMaxIter(value: Int): KMeans
def setTol(value: Double): KMeans
def setInitMode(value: String): KMeans
def setInitSteps(value: Int): KMeans
def setSeed(value: Long): KMeans
}
class GaussianMixture(override val uid: String) extends Estimator[GaussianMixtureModel]
with GaussianMixtureParams with DefaultParamsWritable {
def setK(value: Int): GaussianMixture
def setMaxIter(value: Int): GaussianMixture
def setTol(value: Double): GaussianMixture
def setSeed(value: Long): GaussianMixture
}

Comprehensive feature engineering including scaling, normalization, encoding, selection, extraction, and transformation for preparing data for machine learning.
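A sketch of a typical feature-preparation step, assuming a DataFrame named raw with numeric columns "age" and "income" (hypothetical names):

import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

// Combine raw numeric columns into a single vector column
val featureAssembler = new VectorAssembler()
  .setInputCols(Array("age", "income"))
  .setOutputCol("rawFeatures")

// Standardize to zero mean and unit variance (StandardScaler is an Estimator: fit, then transform)
val scaler = new StandardScaler()
  .setInputCol("rawFeatures")
  .setOutputCol("features")
  .setWithMean(true)
  .setWithStd(true)

val assembled = featureAssembler.transform(raw)
val scaled = scaler.fit(assembled).transform(assembled)

Representative feature transformer signatures: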
class VectorAssembler(override val uid: String) extends Transformer
with HasInputCols with HasOutputCol with DefaultParamsWritable {
def setInputCols(value: Array[String]): VectorAssembler
def setOutputCol(value: String): VectorAssembler
def transform(dataset: Dataset[_]): DataFrame
}
class StandardScaler(override val uid: String) extends Estimator[StandardScalerModel]
with StandardScalerParams with DefaultParamsWritable {
def setWithMean(value: Boolean): StandardScaler
def setWithStd(value: Boolean): StandardScaler
def fit(dataset: Dataset[_]): StandardScalerModel
}

Comprehensive evaluation metrics and validation techniques including cross-validation, train-validation split, and specialized evaluators for different task types.
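A sketch of hyperparameter tuning with cross-validation, assuming the logistic regression (lr), pipeline, and data from the quick-start example:

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Grid of candidate regularization settings for the lr stage
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1, 0.5))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.8))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator().setMetricName("areaUnderROC"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// fit selects the best parameter combination by cross-validated AUC
val cvModel = cv.fit(data)
val best = cvModel.bestModel

The main evaluation and tuning signatures: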
class BinaryClassificationEvaluator(override val uid: String) extends Evaluator
with HasLabelCol with HasRawPredictionCol with DefaultParamsWritable {
def setMetricName(value: String): BinaryClassificationEvaluator
def evaluate(dataset: Dataset[_]): Double
}
class CrossValidator(override val uid: String) extends Estimator[CrossValidatorModel]
with CrossValidatorParams with MLWritable {
def setEstimator(value: Estimator[_]): CrossValidator
def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
def setEvaluator(value: Evaluator): CrossValidator
def setNumFolds(value: Int): CrossValidator
}

Collaborative filtering algorithms for building recommendation systems using alternating least squares matrix factorization.
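A recommendation sketch with ALS, assuming a DataFrame named ratings with integer "userId" and "movieId" columns and a numeric "rating" column (hypothetical schema):

import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setRank(10)
  .setMaxIter(10)
  .setRegParam(0.1)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

val alsModel = als.fit(ratings)

// Top-10 item recommendations for every user
val userRecs = alsModel.recommendForAllUsers(10)
userRecs.show(5, truncate = false)

The main ALS signatures: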
class ALS(override val uid: String) extends Estimator[ALSModel] with ALSParams with DefaultParamsWritable {
def setRank(value: Int): ALS
def setMaxIter(value: Int): ALS
def setRegParam(value: Double): ALS
def setAlpha(value: Double): ALS
def setImplicitPrefs(value: Boolean): ALS
def setNonnegative(value: Boolean): ALS
}

Algorithms for discovering frequent patterns and association rules in transactional data including FP-Growth and PrefixSpan.
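A frequent pattern mining sketch, assuming a DataFrame named transactions with an array column "items":

import org.apache.spark.ml.fpm.FPGrowth

val fp = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.3)
  .setMinConfidence(0.6)

val fpModel = fp.fit(transactions)

// Frequent itemsets and the association rules derived from them
fpModel.freqItemsets.show()
fpModel.associationRules.show()

The main FP-Growth signatures: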
class FPGrowth(override val uid: String) extends Estimator[FPGrowthModel]
with FPGrowthParams with DefaultParamsWritable {
def setMinSupport(value: Double): FPGrowth
def setNumPartitions(value: Int): FPGrowth
def setMinConfidence(value: Double): FPGrowth
}

Distributed linear algebra operations with support for dense and sparse vectors and matrices, plus specialized distributed matrix types.
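A brief sketch of constructing local vectors with the Vectors factory:

import org.apache.spark.ml.linalg.Vectors

// Dense vector: all values stored explicitly
val dv = Vectors.dense(1.0, 0.0, 3.0)

// Sparse vector: size 3 with non-zero values at indices 0 and 2
val sv = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))

// L2 norm of a vector
val l2 = Vectors.norm(dv, 2.0)

The core vector interfaces: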
trait Vector extends Serializable {
def size: Int
def toArray: Array[Double]
def apply(i: Int): Double
def copy: Vector
def foreachActive(f: (Int, Double) => Unit): Unit
}
object Vectors {
def dense(values: Array[Double]): Vector
def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
def zeros(size: Int): Vector
def norm(vector: Vector, p: Double): Double
}

Original MLlib API built on RDDs, providing direct algorithm access and distributed linear algebra operations.
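A minimal RDD-based sketch, assuming an existing SparkContext sc and a text file of comma-separated feature values (hypothetical path):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse each line into an mllib Vector
val points = sc.textFile("data/points.csv")
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .cache()

// Train K-means with k = 3 over 20 iterations
val rddKMeansModel = KMeans.train(points, 3, 20)
rddKMeansModel.clusterCenters.foreach(println)

Representative RDD-based signatures: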
// Classification
class LogisticRegressionWithLBFGS extends GeneralizedLinearAlgorithm[LogisticRegressionModel] {
  def setNumClasses(numClasses: Int): this.type
  def run(input: RDD[LabeledPoint]): LogisticRegressionModel
}
// Clustering
object KMeans {
def train(data: RDD[org.apache.spark.mllib.linalg.Vector],
k: Int, maxIterations: Int): KMeansModel
}

// Core parameter types
trait Param[T] extends Serializable {
def name: String
def doc: String
def parent: String
}
class ParamMap extends Serializable {
def put[T](param: Param[T], value: T): ParamMap
def get[T](param: Param[T]): Option[T]
def apply[T](param: Param[T]): T
}
case class ParamPair[T](param: Param[T], value: T)
// Model summary types
abstract class TrainingSummary extends Serializable {
def predictions: DataFrame
def predictionCol: String
def labelCol: String
def featuresCol: String
}
// Linear algebra types (ml.linalg)
sealed trait Vector extends Serializable
case class DenseVector(values: Array[Double]) extends Vector
case class SparseVector(size: Int, indices: Array[Int], values: Array[Double]) extends Vector
sealed trait Matrix extends Serializable
case class DenseMatrix(numRows: Int, numCols: Int, values: Array[Double],
isTransposed: Boolean = false) extends Matrix
case class SparseMatrix(numRows: Int, numCols: Int, colPtrs: Array[Int],
rowIndices: Array[Int], values: Array[Double],
isTransposed: Boolean = false) extends Matrix
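Parameters can also be supplied at fit time through a ParamMap rather than setter methods. A sketch, assuming the LogisticRegression estimator (lr) and DataFrame (data) from the quick-start example:

import org.apache.spark.ml.param.ParamMap

// Override maxIter and regParam for this fit call only
val overrides = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.05)
val tunedModel = lr.fit(data, overrides)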