Apache Spark MLlib machine learning library providing scalable algorithms for classification, regression, clustering, and collaborative filtering.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-mllib-2-13@3.5.0

Apache Spark MLlib is a comprehensive machine learning library designed for large-scale data processing and analysis. It provides a unified API for implementing machine learning algorithms including classification, regression, clustering, and collaborative filtering for recommendation systems. MLlib supports end-to-end machine learning workflows through its Pipeline API and is built on Spark's distributed computing framework for scalable production deployments.
build.sbt: libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.5.6"

pom.xml:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-mllib_2.13</artifactId>
  <version>3.5.6</version>
</dependency>

import org.apache.spark.ml._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.regression._
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.linalg._

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.Pipeline

// Quick start: read a CSV file, index the label column, assemble three feature
// columns into a vector, and fit a logistic-regression pipeline.

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("MLlib Example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Load data; the first row is treated as a header and column types are inferred
val data = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/data.csv")

// Feature preparation: combine the three input columns into one "features" column
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

// Map the "label" column to the "labelIndex" column consumed by the classifier
val indexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("labelIndex")

// Create classifier, wired to the columns produced by the two stages above
val lr = new LogisticRegression()
  .setLabelCol("labelIndex")
  .setFeaturesCol("features")

// Create pipeline; stages run in the order given
val pipeline = new Pipeline()
  .setStages(Array(indexer, assembler, lr))

// Fit model
val model = pipeline.fit(data)

// Make predictions.
// Note: the model is applied to the same DataFrame it was fitted on, so the
// rows shown below are predictions on the training data.
val predictions = model.transform(data)
predictions.select("prediction", "labelIndex", "features").show()

Apache Spark MLlib is built around several key abstractions:
- Estimator, Transformer, and Model abstractions

The design follows these key patterns:
- Estimator[M] with a fit() method that returns a Model[M]
- Transformer with a transform() method
- Pipeline for complex workflows

Supervised learning algorithms for predicting categorical labels including logistic regression, decision trees, random forests, gradient-boosted trees, and neural networks.
// Core classification interface.
// Learner is the concrete estimator type and M is the model type it produces;
// both bounds refer back to themselves so subclasses keep their own types.
abstract class Classifier[
  FeaturesType,
  Learner <: Classifier[FeaturesType, Learner, M],
  M <: ClassificationModel[FeaturesType, M]
] extends Predictor[FeaturesType, Learner, M]

// Main algorithms
class LogisticRegression extends ProbabilisticClassifier[Vector, LogisticRegression, LogisticRegressionModel]
class DecisionTreeClassifier extends ProbabilisticClassifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel]
class RandomForestClassifier extends ProbabilisticClassifier[Vector, RandomForestClassifier, RandomForestClassificationModel]
// GBTClassifier is a ProbabilisticClassifier, like the other classifiers listed here
class GBTClassifier extends ProbabilisticClassifier[Vector, GBTClassifier, GBTClassificationModel]

Supervised learning algorithms for predicting continuous values including linear regression, decision trees, random forests, and generalized linear models.
// Core regression interface.
// Learner is the concrete estimator type and M is the model type it produces;
// both bounds refer back to themselves so subclasses keep their own types.
abstract class Regressor[
  FeaturesType,
  Learner <: Regressor[FeaturesType, Learner, M],
  M <: RegressionModel[FeaturesType, M]
] extends Predictor[FeaturesType, Learner, M]

// Main algorithms
class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegressionModel]
class DecisionTreeRegressor extends Regressor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel]
class RandomForestRegressor extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel]
class GBTRegressor extends Regressor[Vector, GBTRegressor, GBTRegressionModel]

Unsupervised learning algorithms for discovering patterns and groupings in data including K-means, Gaussian mixture models, and topic modeling.
// Main clustering algorithms.
// Each is an Estimator whose type argument names the model class that fit() produces.
class KMeans extends Estimator[KMeansModel] with KMeansParams
class GaussianMixture extends Estimator[GaussianMixtureModel] with GaussianMixtureParams
class BisectingKMeans extends Estimator[BisectingKMeansModel] with BisectingKMeansParams
class LDA extends Estimator[LDAModel] with LDAParams

Comprehensive feature extraction, transformation, selection, and engineering capabilities including text processing, scaling, dimensionality reduction, and categorical encoding.
// Feature transformation base classes
abstract class Transformer extends PipelineStage
abstract class Estimator[M <: Model[M]] extends PipelineStage

// Key feature transformers.
// VectorAssembler is a plain Transformer; the others are Estimators that must be
// fitted first and produce the model type named in their type argument.
class VectorAssembler extends Transformer
class StandardScaler extends Estimator[StandardScalerModel] with StandardScalerParams
// The shared parameter traits for these two are named *Base, not *Params
class StringIndexer extends Estimator[StringIndexerModel] with StringIndexerBase
class OneHotEncoder extends Estimator[OneHotEncoderModel] with OneHotEncoderBase
class PCA extends Estimator[PCAModel] with PCAParams

Tools for assessing model performance including classification metrics, regression metrics, clustering evaluation, and cross-validation.
// Evaluation base class
abstract class Evaluator extends Params

// Main evaluators.
// BinaryClassificationEvaluator reads the raw prediction column (not the features
// column); the other label-based evaluators read the prediction column.
class BinaryClassificationEvaluator extends Evaluator with HasLabelCol with HasRawPredictionCol
class MulticlassClassificationEvaluator extends Evaluator with HasLabelCol with HasPredictionCol
class RegressionEvaluator extends Evaluator with HasLabelCol with HasPredictionCol
// Clustering has no label column; it is evaluated from features and predictions
class ClusteringEvaluator extends Evaluator with HasFeaturesCol with HasPredictionCol

Framework for building complex ML workflows and automated hyperparameter optimization including pipelines, cross-validation, and parameter grids.
// Pipeline framework
class Pipeline extends Estimator[PipelineModel]
// Model already extends Transformer, so no separate Transformer mix-in is needed
class PipelineModel extends Model[PipelineModel]

// Hyperparameter tuning
class CrossValidator extends Estimator[CrossValidatorModel]
class TrainValidationSplit extends Estimator[TrainValidationSplitModel]
class ParamGridBuilder

High-performance vector and matrix operations with support for dense and sparse representations, distributed matrices, and common linear algebra operations.
// Core linear algebra types.
// Vector and Matrix are sealed, each with a dense and a sparse implementation.
sealed trait Vector extends Serializable
class DenseVector(val values: Array[Double]) extends Vector
class SparseVector(val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector

sealed trait Matrix extends Serializable
class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix
class SparseMatrix(val numRows: Int, val numCols: Int, val colPtrs: Array[Int], val rowIndices: Array[Int], val values: Array[Double]) extends Matrix

// Factory objects
object Vectors
object Matrices

Matrix factorization-based recommendation systems using Alternating Least Squares for building recommendation engines from user-item interaction data.
// ALS estimator; fitting it produces an ALSModel
class ALS extends Estimator[ALSModel] with ALSParams

// Fitted ALS model: recommendation queries plus the learned factor tables
class ALSModel extends Model[ALSModel] with ALSModelParams {
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  def userFactors: DataFrame
  def itemFactors: DataFrame
}

Data mining algorithms for discovering frequent patterns, itemsets, and sequential patterns including FP-Growth for market basket analysis and PrefixSpan for sequential pattern discovery.
// FP-Growth for frequent itemset mining
class FPGrowth extends Estimator[FPGrowthModel] with FPGrowthParams
class FPGrowthModel extends Model[FPGrowthModel] with FPGrowthParams {
  def transform(dataset: Dataset[_]): DataFrame
  def freqItemsets: DataFrame
  def associationRules: DataFrame
}

// PrefixSpan for sequential pattern mining.
// Unlike FPGrowth, PrefixSpan is not an Estimator and has no model class:
// the mining method is called directly on a configured PrefixSpan instance.
class PrefixSpan extends Params {
  def findFrequentSequentialPatterns(dataset: Dataset[_]): DataFrame
}

Statistical analysis and hypothesis testing including correlation analysis, summary statistics, Chi-square tests, ANOVA, and kernel density estimation.
// DataFrame-based statistics live in separate objects under org.apache.spark.ml.stat;
// each returns its result as a DataFrame.

// Correlation matrix of a vector column
object Correlation {
  def corr(dataset: Dataset[_], column: String, method: String): DataFrame
  def corr(dataset: Dataset[_], column: String): DataFrame
}

// Chi-square independence test of each feature against the label
object ChiSquareTest {
  def test(dataset: DataFrame, featuresCol: String, labelCol: String): DataFrame
}

// ANOVA F-test of each feature against the label
object ANOVATest {
  def test(dataset: DataFrame, featuresCol: String, labelCol: String): DataFrame
}

// Column-level summary statistics over a vector column
object Summarizer {
  def metrics(metrics: String*): SummaryBuilder
  def mean(col: Column): Column
  def variance(col: Column): Column
}

// Pipeline stage base types
// PipelineStage, Estimator, Transformer and Model are abstract classes, not traits,
// so a concrete stage extends exactly one of them and mixes in parameter traits.
abstract class PipelineStage extends Params with Logging

// An Estimator is fitted on a dataset and produces a model of type M
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M]
}

// A Transformer maps one dataset to a DataFrame
abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
  def transformSchema(schema: StructType): StructType
}

// A Model is a Transformer that records the Estimator that produced it
abstract class Model[M <: Model[M]] extends Transformer {
  def parent: Estimator[M]
  def hasParent: Boolean
}

// Parameter system
trait Params {
  // Declared as returning Params; concrete classes narrow the return type to themselves
  def copy(extra: ParamMap): Params
  def extractParamMap(): ParamMap
  def explainParams(): String
}

// ParamMap is a regular final class (not a case class); build one through the companion
final class ParamMap extends Serializable
object ParamMap {
  def apply(paramPairs: ParamPair[_]*): ParamMap
  def empty: ParamMap
}

// A parameter carries its owner id, name, documentation string and a validity check
class Param[T](val parent: String, val name: String, val doc: String, val isValid: T => Boolean)
case class ParamPair[T](param: Param[T], value: T)

// Common parameter traits
trait HasInputCol extends Params
trait HasOutputCol extends Params
trait HasLabelCol extends Params
trait HasFeaturesCol extends Params
trait HasPredictionCol extends Params