Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
—
MLlib is Spark's scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, and underlying optimization primitives.
Mathematical vector for representing features:
package org.apache.spark.mllib.linalg
abstract class Vector extends Serializable {
def size: Int
def toArray: Array[Double]
def apply(i: Int): Double
}

Vectors Factory:
object Vectors {
def dense(values: Array[Double]): Vector
def dense(firstValue: Double, otherValues: Double*): Vector
def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
def sparse(size: Int, elements: Seq[(Int, Double)]): Vector
def zeros(size: Int): Vector
}

import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Dense vector
val denseVec = Vectors.dense(1.0, 2.0, 3.0, 0.0, 0.0)
// Sparse vector (size=5, indices=[0,2], values=[1.0,3.0])
val sparseVec = Vectors.sparse(5, Array(0, 2), Array(1.0, 3.0))
// Alternative sparse creation
val sparseVec2 = Vectors.sparse(5, Seq((0, 1.0), (2, 3.0)))
// Vector operations
val size = denseVec.size // 5
val element = denseVec(2) // 3.0
val array = denseVec.toArray // Array(1.0, 2.0, 3.0, 0.0, 0.0)

Mathematical matrix for linear algebra operations:
abstract class Matrix extends Serializable {
def numRows: Int
def numCols: Int
def toArray: Array[Double]
def apply(i: Int, j: Int): Double
}

Matrices Factory:
object Matrices {
def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix
def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values: Array[Double]): Matrix
def eye(n: Int): Matrix
def zeros(numRows: Int, numCols: Int): Matrix
}

import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Dense matrix (2x3, column-major order)
val denseMatrix = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
// Matrix:
// 1.0 3.0 5.0
// 2.0 4.0 6.0
// Identity matrix
val identity = Matrices.eye(3)
// Zero matrix
val zeros = Matrices.zeros(2, 3)

Data point with label and features for supervised learning:
case class LabeledPoint(label: Double, features: Vector) {
override def toString: String = s"($label,$features)"
}

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// Create labeled points
val positive = LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0))
val negative = LabeledPoint(0.0, Vectors.dense(-1.0, -2.0, -3.0))
// For regression
val regressionPoint = LabeledPoint(3.5, Vectors.dense(1.0, 2.0))
// Create training data
val trainingData = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
))

User-item rating for collaborative filtering:
case class Rating(user: Int, product: Int, rating: Double)

import org.apache.spark.mllib.recommendation.Rating
// Create ratings
val ratings = sc.parallelize(Seq(
Rating(1, 101, 5.0),
Rating(1, 102, 3.0),
Rating(2, 101, 4.0),
Rating(2, 103, 2.0)
))

LogisticRegressionWithSGD: Train logistic regression using stochastic gradient descent
object LogisticRegressionWithSGD {
def train(input: RDD[LabeledPoint], numIterations: Int): LogisticRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LogisticRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LogisticRegressionModel
}
class LogisticRegressionModel(override val weights: Vector, override val intercept: Double) extends ClassificationModel with Serializable {
def predict(testData: RDD[Vector]): RDD[Double]
def predict(testData: Vector): Double
def clearThreshold(): LogisticRegressionModel
def setThreshold(threshold: Double): LogisticRegressionModel
}

import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// Prepare training data
val trainingData = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
LabeledPoint(0.0, Vectors.dense(-1.0, -2.0)),
LabeledPoint(1.0, Vectors.dense(1.5, 1.8)),
LabeledPoint(0.0, Vectors.dense(-1.5, -1.8))
))
// Train model
val model = LogisticRegressionWithSGD.train(trainingData, numIterations = 100)
// Make predictions
val testData = sc.parallelize(Seq(
Vectors.dense(1.0, 1.0),
Vectors.dense(-1.0, -1.0)
))
val predictions = model.predict(testData)
predictions.collect() // Array(1.0, 0.0)
// Single prediction
val singlePrediction = model.predict(Vectors.dense(0.5, 0.5))
// Set classification threshold
val calibratedModel = model.setThreshold(0.3)

SVMWithSGD: Train SVM using stochastic gradient descent
object SVMWithSGD {
def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): SVMModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double): SVMModel
}
class SVMModel(override val weights: Vector, override val intercept: Double) extends ClassificationModel with Serializable {
def predict(testData: RDD[Vector]): RDD[Double]
def predict(testData: Vector): Double
}

import org.apache.spark.mllib.classification.{SVMWithSGD, SVMModel}
// Train SVM model
val svmModel = SVMWithSGD.train(
input = trainingData,
numIterations = 100,
stepSize = 1.0,
regParam = 0.01
)
// Make predictions
val svmPredictions = svmModel.predict(testData)

NaiveBayes: Train multinomial Naive Bayes classifier
object NaiveBayes {
def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel
}
class NaiveBayesModel(val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
def predict(testData: RDD[Vector]): RDD[Double]
def predict(testData: Vector): Double
}

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
// Training data for text classification (bag of words)
val textData = sc.parallelize(Seq(
LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 1.0)), // spam
LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 0.0)), // ham
LabeledPoint(0.0, Vectors.dense(1.0, 1.0, 0.0)), // spam
LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 1.0)) // ham
))
// Train Naive Bayes model
val nbModel = NaiveBayes.train(textData, lambda = 1.0)
// Make predictions
val textPredictions = nbModel.predict(testData)

LinearRegressionWithSGD: Train linear regression using SGD
object LinearRegressionWithSGD {
def train(input: RDD[LabeledPoint], numIterations: Int): LinearRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LinearRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LinearRegressionModel
}
class LinearRegressionModel(override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel with RegressionModel with Serializable {
def predict(testData: RDD[Vector]): RDD[Double]
def predict(testData: Vector): Double
}

import org.apache.spark.mllib.regression.{LinearRegressionWithSGD, LinearRegressionModel, LabeledPoint}
// Regression training data
val regressionData = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(1.0)),
LabeledPoint(2.0, Vectors.dense(2.0)),
LabeledPoint(3.0, Vectors.dense(3.0)),
LabeledPoint(4.0, Vectors.dense(4.0))
))
// Train linear regression
val lrModel = LinearRegressionWithSGD.train(
input = regressionData,
numIterations = 100,
stepSize = 0.01
)
// Make predictions
val regressionTestData = sc.parallelize(Seq(
Vectors.dense(1.5),
Vectors.dense(2.5)
))
val regressionPredictions = lrModel.predict(regressionTestData)
regressionPredictions.collect() // Array(~1.5, ~2.5)

RidgeRegressionWithSGD: Ridge regression with L2 regularization
object RidgeRegressionWithSGD {
def train(input: RDD[LabeledPoint], numIterations: Int): RidgeRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): RidgeRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): RidgeRegressionModel
}

LassoWithSGD: Lasso regression with L1 regularization
object LassoWithSGD {
def train(input: RDD[LabeledPoint], numIterations: Int): LassoModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LassoModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): LassoModel
}

import org.apache.spark.mllib.regression.{RidgeRegressionWithSGD, LassoWithSGD}
// Ridge regression with regularization
val ridgeModel = RidgeRegressionWithSGD.train(
input = regressionData,
numIterations = 100,
stepSize = 0.01,
regParam = 0.1
)
// Lasso regression with L1 regularization
val lassoModel = LassoWithSGD.train(
input = regressionData,
numIterations = 100,
stepSize = 0.01,
regParam = 0.1
))

KMeans: K-means clustering algorithm
class KMeans private (private var k: Int, private var maxIterations: Int, private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, private var epsilon: Double) extends Serializable {
def setK(k: Int): KMeans
def setMaxIterations(maxIterations: Int): KMeans
def setRuns(runs: Int): KMeans
def setInitializationMode(initializationMode: String): KMeans
def setInitializationSteps(initializationSteps: Int): KMeans
def setEpsilon(epsilon: Double): KMeans
def run(data: RDD[Vector]): KMeansModel
}
object KMeans {
def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, initializationMode: String): KMeansModel
}
class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable {
def predict(point: Vector): Int
def predict(points: RDD[Vector]): RDD[Int]
def computeCost(data: RDD[Vector]): Double
}

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
// Prepare clustering data
val clusteringData = sc.parallelize(Seq(
Vectors.dense(1.0, 1.0),
Vectors.dense(1.0, 2.0),
Vectors.dense(2.0, 1.0),
Vectors.dense(9.0, 8.0),
Vectors.dense(8.0, 9.0),
Vectors.dense(9.0, 9.0)
))
// Train K-means model
val kmeansModel = KMeans.train(
data = clusteringData,
k = 2, // Number of clusters
maxIterations = 20
)
// Get cluster centers
val centers = kmeansModel.clusterCenters
centers.foreach(println)
// Make predictions
val clusterPredictions = kmeansModel.predict(clusteringData)
clusterPredictions.collect() // Array(0, 0, 0, 1, 1, 1)
// Compute cost (sum of squared distances to centroids)
val cost = kmeansModel.computeCost(clusteringData)
// Advanced K-means with custom parameters
val advancedKMeans = new KMeans()
.setK(3)
.setMaxIterations(50)
.setRuns(10) // Multiple runs for better results
.setInitializationMode("k-means||")
.setEpsilon(1e-4)
val advancedModel = advancedKMeans.run(clusteringData)

ALS: Matrix factorization for collaborative filtering
object ALS {
def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double): MatrixFactorizationModel
}
class MatrixFactorizationModel(val rank: Int, val userFeatures: RDD[(Int, Array[Double])], val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
def predict(user: Int, product: Int): Double
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
def recommendProducts(user: Int, num: Int): Array[Rating]
def recommendUsers(product: Int, num: Int): Array[Rating]
}

import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
// Create ratings data
val ratings = sc.parallelize(Seq(
Rating(1, 1, 5.0),
Rating(1, 2, 1.0),
Rating(1, 3, 5.0),
Rating(2, 1, 1.0),
Rating(2, 2, 5.0),
Rating(2, 3, 1.0),
Rating(3, 1, 5.0),
Rating(3, 2, 1.0),
Rating(3, 3, 5.0)
))
// Train collaborative filtering model
val alsModel = ALS.train(
ratings = ratings,
rank = 10, // Number of latent factors
iterations = 10, // Number of iterations
lambda = 0.01 // Regularization parameter
)
// Predict rating for user-item pair
val prediction = alsModel.predict(1, 2)
// Predict ratings for multiple user-item pairs
val userProducts = sc.parallelize(Seq((1, 1), (2, 2), (3, 3)))
val predictions = alsModel.predict(userProducts)
// Recommend products for a user
val recommendations = alsModel.recommendProducts(1, 5)
recommendations.foreach { rating =>
println(s"Product ${rating.product}: ${rating.rating}")
}
// Recommend users for a product
val userRecommendations = alsModel.recommendUsers(1, 3)
// For implicit feedback data
val implicitModel = ALS.trainImplicit(
ratings = ratings,
rank = 10,
iterations = 10,
lambda = 0.01,
alpha = 0.1 // Confidence parameter
))

Statistics: Statistical functions for RDDs
object Statistics {
def colStats(rdd: RDD[Vector]): MultivariateStatisticalSummary
def corr(x: RDD[Double], y: RDD[Double], method: String = "pearson"): Double
def corr(X: RDD[Vector], method: String = "pearson"): Matrix
def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult
def chiSqTest(observed: Matrix): ChiSqTestResult
def chiSqTest(observed: RDD[LabeledPoint]): Array[ChiSqTestResult]
}
trait MultivariateStatisticalSummary {
def mean: Vector
def variance: Vector
def count: Long
def numNonzeros: Vector
def max: Vector
def min: Vector
}

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrix}
// Sample data for statistics
val observations = sc.parallelize(Seq(
Vectors.dense(1.0, 2.0, 3.0),
Vectors.dense(4.0, 5.0, 6.0),
Vectors.dense(7.0, 8.0, 9.0)
))
// Compute column statistics
val summary = Statistics.colStats(observations)
println(s"Mean: ${summary.mean}") // [4.0, 5.0, 6.0]
println(s"Variance: ${summary.variance}") // [9.0, 9.0, 9.0]
println(s"Count: ${summary.count}") // 3
println(s"Max: ${summary.max}") // [7.0, 8.0, 9.0]
println(s"Min: ${summary.min}") // [1.0, 2.0, 3.0]
// Correlation between two RDD[Double]
val x = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
val y = sc.parallelize(Array(2.0, 4.0, 6.0, 8.0))
val correlation = Statistics.corr(x, y, "pearson") // 1.0 (perfect positive correlation)
// Correlation matrix for RDD[Vector]
val correlationMatrix = Statistics.corr(observations, "pearson")
// Chi-squared test
val observed = Vectors.dense(1.0, 2.0, 3.0)
val expected = Vectors.dense(1.5, 1.5, 3.0)
val chiSqResult = Statistics.chiSqTest(observed, expected)
println(s"Chi-squared statistic: ${chiSqResult.statistic}")
println(s"P-value: ${chiSqResult.pValue}")
println(s"Degrees of freedom: ${chiSqResult.degreesOfFreedom}")

BinaryClassificationMetrics: Evaluate binary classification models

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
// Predictions and labels (prediction score, true label)
val scoreAndLabels = sc.parallelize(Seq(
(0.9, 1.0), (0.8, 1.0), (0.7, 1.0),
(0.6, 0.0), (0.5, 1.0), (0.4, 0.0),
(0.3, 0.0), (0.2, 0.0), (0.1, 0.0)
))
val binaryMetrics = new BinaryClassificationMetrics(scoreAndLabels)
// Area under ROC curve
val areaUnderROC = binaryMetrics.areaUnderROC()
println(s"Area under ROC: $areaUnderROC")
// Area under Precision-Recall curve
val areaUnderPR = binaryMetrics.areaUnderPR()
println(s"Area under PR: $areaUnderPR")

MulticlassMetrics: Evaluate multiclass classification models

import org.apache.spark.mllib.evaluation.MulticlassMetrics
// Predictions and labels (predicted class, true class)
val predictionAndLabels = sc.parallelize(Seq(
(0.0, 0.0), (1.0, 1.0), (2.0, 2.0),
(0.0, 0.0), (1.0, 2.0), (2.0, 1.0)
))
val multiMetrics = new MulticlassMetrics(predictionAndLabels)
// Overall statistics
val accuracy = multiMetrics.accuracy
val weightedPrecision = multiMetrics.weightedPrecision
val weightedRecall = multiMetrics.weightedRecall
val weightedFMeasure = multiMetrics.weightedFMeasure
// Per-class metrics
val labels = multiMetrics.labels
labels.foreach { label =>
println(s"Class $label precision: ${multiMetrics.precision(label)}")
println(s"Class $label recall: ${multiMetrics.recall(label)}")
println(s"Class $label F1-score: ${multiMetrics.fMeasure(label)}")
}
// Confusion matrix
val confusionMatrix = multiMetrics.confusionMatrix
println(s"Confusion matrix:\n$confusionMatrix")

Complete machine learning pipeline:
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// 1. Load and prepare data
val rawData = sc.textFile("data.csv")
val parsedData = rawData.map { line =>
val parts = line.split(',')
val label = parts(0).toDouble
val features = Vectors.dense(parts.tail.map(_.toDouble))
LabeledPoint(label, features)
}
// 2. Split data
val Array(training, test) = parsedData.randomSplit(Array(0.7, 0.3), seed = 11L)
training.cache()
// 3. Train model
val model = LogisticRegressionWithSGD.train(training, numIterations = 100)
// 4. Make predictions
val predictionAndLabel = test.map { point =>
val prediction = model.predict(point.features)
(prediction, point.label)
}
// 5. Evaluate model
val metrics = new BinaryClassificationMetrics(predictionAndLabel)
val auROC = metrics.areaUnderROC()
println(s"Area under ROC: $auROC")
// 6. Save model
model.save(sc, "myModel")
// 7. Load model later
val loadedModel = LogisticRegressionModel.load(sc, "myModel")

This comprehensive guide covers all the essential machine learning capabilities available in Spark's MLlib for building scalable ML applications.
Install with Tessl CLI
npx tessl i tessl/maven-apache-spark