
Machine Learning Library (MLlib)

MLlib is Spark's scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, and underlying optimization primitives.

Core Data Types

Vector

Mathematical vector for representing features:

package org.apache.spark.mllib.linalg

abstract class Vector extends Serializable {
  def size: Int
  def toArray: Array[Double]
  def apply(i: Int): Double
}

Vectors Factory:

object Vectors {
  def dense(values: Array[Double]): Vector
  def dense(firstValue: Double, otherValues: Double*): Vector
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
  def sparse(size: Int, elements: Seq[(Int, Double)]): Vector
  def zeros(size: Int): Vector
}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Dense vector
val denseVec = Vectors.dense(1.0, 2.0, 3.0, 0.0, 0.0)

// Sparse vector (size=5, indices=[0,2], values=[1.0,3.0])
val sparseVec = Vectors.sparse(5, Array(0, 2), Array(1.0, 3.0))

// Alternative sparse creation
val sparseVec2 = Vectors.sparse(5, Seq((0, 1.0), (2, 3.0)))

// Vector operations
val size = denseVec.size           // 5
val element = denseVec(2)          // 3.0  
val array = denseVec.toArray       // Array(1.0, 2.0, 3.0, 0.0, 0.0)
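
The Vectors factory also provides numeric helpers for comparing vectors; Vectors.norm and Vectors.sqdist are available in recent MLlib versions. A short sketch using the vectors above:

// p-norm of a vector (here the L2 / Euclidean norm)
val l2Norm = Vectors.norm(denseVec, 2.0)        // sqrt(1 + 4 + 9) = ~3.742

// Squared Euclidean distance between two vectors
val dist = Vectors.sqdist(denseVec, sparseVec)  // (2.0 - 0.0)^2 = 4.0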

Matrix

Mathematical matrix for linear algebra operations:

abstract class Matrix extends Serializable {
  def numRows: Int
  def numCols: Int
  def toArray: Array[Double]
  def apply(i: Int, j: Int): Double
}

Matrices Factory:

object Matrices {
  def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix
  def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values: Array[Double]): Matrix
  def eye(n: Int): Matrix
  def zeros(numRows: Int, numCols: Int): Matrix
}
import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Dense matrix (2x3, column-major order)
val denseMatrix = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
// Matrix:
// 1.0  3.0  5.0
// 2.0  4.0  6.0

// Identity matrix
val identity = Matrices.eye(3)

// Zero matrix
val zeros = Matrices.zeros(2, 3)
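
Sparse matrices use compressed sparse column (CSC) storage: column j's entries occupy positions colPtrs(j) until colPtrs(j+1) of rowIndices and values. A small sketch of the sparse factory shown above:

// 3x2 sparse matrix: column 0 holds 9.0 at row 0, column 1 holds 6.0 at row 2
val sparseMatrix = Matrices.sparse(
  3, 2,
  Array(0, 1, 2),   // colPtrs: one start offset per column, plus the final end offset
  Array(0, 2),      // rowIndices
  Array(9.0, 6.0)   // values
)
// Matrix:
// 9.0  0.0
// 0.0  0.0
// 0.0  6.0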

LabeledPoint

Data point with label and features for supervised learning:

case class LabeledPoint(label: Double, features: Vector) {
  override def toString: String = s"($label,$features)"
}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Create labeled points
val positive = LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0))
val negative = LabeledPoint(0.0, Vectors.dense(-1.0, -2.0, -3.0))

// For regression
val regressionPoint = LabeledPoint(3.5, Vectors.dense(1.0, 2.0))

// Create training data
val trainingData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
))
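
Labeled training data is often stored in LIBSVM format. MLUtils can load such files directly into an RDD[LabeledPoint]; a minimal sketch (the path below is a placeholder):

import org.apache.spark.mllib.util.MLUtils

// Each line: <label> <index1>:<value1> <index2>:<value2> ...
val libsvmData = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")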

Rating

User-item rating for collaborative filtering:

case class Rating(user: Int, product: Int, rating: Double)
import org.apache.spark.mllib.recommendation.Rating

// Create ratings
val ratings = sc.parallelize(Seq(
  Rating(1, 101, 5.0),
  Rating(1, 102, 3.0),
  Rating(2, 101, 4.0),
  Rating(2, 103, 2.0)
))

Classification

Logistic Regression

LogisticRegressionWithSGD: Train logistic regression using stochastic gradient descent

object LogisticRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LogisticRegressionModel
}

class LogisticRegressionModel(override val weights: Vector, override val intercept: Double) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
  def clearThreshold(): LogisticRegressionModel
  def setThreshold(threshold: Double): LogisticRegressionModel
}
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Prepare training data
val trainingData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
  LabeledPoint(0.0, Vectors.dense(-1.0, -2.0)),
  LabeledPoint(1.0, Vectors.dense(1.5, 1.8)),
  LabeledPoint(0.0, Vectors.dense(-1.5, -1.8))
))

// Train model
val model = LogisticRegressionWithSGD.train(trainingData, numIterations = 100)

// Make predictions
val testData = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0),
  Vectors.dense(-1.0, -1.0)
))

val predictions = model.predict(testData)
predictions.collect()  // Array(1.0, 0.0)

// Single prediction
val singlePrediction = model.predict(Vectors.dense(0.5, 0.5))

// Set classification threshold
val calibratedModel = model.setThreshold(0.3)
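
Clearing the threshold makes predict return the raw score (the class-1 probability for logistic regression) instead of a hard 0/1 label, which is what threshold-sensitive metrics such as ROC expect:

// Return raw probabilities instead of 0/1 labels
model.clearThreshold()
val rawScore = model.predict(Vectors.dense(0.5, 0.5))  // e.g. 0.73 rather than 1.0

// Restore hard 0/1 predictions
model.setThreshold(0.5)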

Support Vector Machines

SVMWithSGD: Train SVM using stochastic gradient descent

object SVMWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double): SVMModel
}

class SVMModel(override val weights: Vector, override val intercept: Double) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
import org.apache.spark.mllib.classification.{SVMWithSGD, SVMModel}

// Train SVM model
val svmModel = SVMWithSGD.train(
  input = trainingData,
  numIterations = 100,
  stepSize = 1.0,
  regParam = 0.01
)

// Make predictions
val svmPredictions = svmModel.predict(testData)

Naive Bayes

NaiveBayes: Train multinomial Naive Bayes classifier

object NaiveBayes {
  def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel
}

class NaiveBayesModel(val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}

// Training data for text classification (bag of words)
val textData = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 1.0)), // spam
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 0.0)), // ham
  LabeledPoint(0.0, Vectors.dense(1.0, 1.0, 0.0)), // spam
  LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 1.0))  // ham
))

// Train Naive Bayes model
val nbModel = NaiveBayes.train(textData, lambda = 1.0)

// Make predictions (test vectors must match the training dimensionality, here 3 features)
val nbTestData = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 0.0),
  Vectors.dense(0.0, 1.0, 1.0)
))
val textPredictions = nbModel.predict(nbTestData)
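
A quick sanity check is to compare predicted and true labels; a minimal sketch, reusing the training set as a stand-in test set:

// Fraction of points whose predicted label matches the true label
val nbAccuracy = textData
  .map(p => (nbModel.predict(p.features), p.label))
  .filter { case (pred, label) => pred == label }
  .count().toDouble / textData.count()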

Regression

Linear Regression

LinearRegressionWithSGD: Train linear regression using SGD

object LinearRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LinearRegressionModel
}

class LinearRegressionModel(override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel with RegressionModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
import org.apache.spark.mllib.regression.{LinearRegressionWithSGD, LinearRegressionModel, LabeledPoint}

// Regression training data
val regressionData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0)),
  LabeledPoint(2.0, Vectors.dense(2.0)),
  LabeledPoint(3.0, Vectors.dense(3.0)),
  LabeledPoint(4.0, Vectors.dense(4.0))
))

// Train linear regression
val lrModel = LinearRegressionWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01
)

// Make predictions
val regressionTestData = sc.parallelize(Seq(
  Vectors.dense(1.5),
  Vectors.dense(2.5)
))

val regressionPredictions = lrModel.predict(regressionTestData)
regressionPredictions.collect()  // Array(~1.5, ~2.5)
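
Regression fit is commonly summarized by the mean squared error over labeled data; a minimal sketch on the training set:

// Mean squared error: average of (label - prediction)^2
val valuesAndPreds = regressionData.map { point =>
  (point.label, lrModel.predict(point.features))
}
val mse = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()
println(s"Training MSE: $mse")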

Ridge Regression

RidgeRegressionWithSGD: Ridge regression with L2 regularization

object RidgeRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): RidgeRegressionModel
}

Lasso Regression

LassoWithSGD: Lasso regression with L1 regularization

object LassoWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): LassoModel
}
import org.apache.spark.mllib.regression.{RidgeRegressionWithSGD, LassoWithSGD}

// Ridge regression with regularization
val ridgeModel = RidgeRegressionWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01,
  regParam = 0.1
)

// Lasso regression with L1 regularization
val lassoModel = LassoWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01,
  regParam = 0.1
)
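
L1 regularization tends to drive some coefficients exactly to zero, while L2 only shrinks them. Inspecting the fitted weights makes the difference visible:

// Compare fitted coefficients; Lasso (L1) may zero some of them out
println(s"Ridge weights: ${ridgeModel.weights}, intercept: ${ridgeModel.intercept}")
println(s"Lasso weights: ${lassoModel.weights}, intercept: ${lassoModel.intercept}")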

Clustering

K-Means

KMeans: K-means clustering algorithm

class KMeans private (private var k: Int, private var maxIterations: Int, private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, private var epsilon: Double) extends Serializable {
  def setK(k: Int): KMeans
  def setMaxIterations(maxIterations: Int): KMeans
  def setRuns(runs: Int): KMeans
  def setInitializationMode(initializationMode: String): KMeans
  def setInitializationSteps(initializationSteps: Int): KMeans
  def setEpsilon(epsilon: Double): KMeans
  def run(data: RDD[Vector]): KMeansModel
}

object KMeans {
  def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, initializationMode: String): KMeansModel
}

class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable {
  def predict(point: Vector): Int
  def predict(points: RDD[Vector]): RDD[Int]
  def computeCost(data: RDD[Vector]): Double
}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

// Prepare clustering data
val clusteringData = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0),
  Vectors.dense(1.0, 2.0),
  Vectors.dense(2.0, 1.0),
  Vectors.dense(9.0, 8.0),
  Vectors.dense(8.0, 9.0),
  Vectors.dense(9.0, 9.0)
))

// Train K-means model
val kmeansModel = KMeans.train(
  data = clusteringData,
  k = 2,              // Number of clusters
  maxIterations = 20
)

// Get cluster centers
val centers = kmeansModel.clusterCenters
centers.foreach(println)

// Make predictions
val clusterPredictions = kmeansModel.predict(clusteringData)
clusterPredictions.collect()  // e.g. Array(0, 0, 0, 1, 1, 1); cluster IDs are arbitrary

// Compute cost (sum of squared distances to centroids)
val cost = kmeansModel.computeCost(clusteringData)

// Advanced K-means with custom parameters
val advancedKMeans = new KMeans()
  .setK(3)
  .setMaxIterations(50)
  .setRuns(10)                    // Multiple runs for better results
  .setInitializationMode("k-means||")
  .setEpsilon(1e-4)

val advancedModel = advancedKMeans.run(clusteringData)
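
computeCost reports the within-cluster sum of squared errors (WSSSE), a common criterion for choosing k via the elbow method; a sketch:

// Evaluate WSSSE for several values of k and look for the "elbow"
(2 to 5).foreach { k =>
  val m = KMeans.train(clusteringData, k, 20)
  println(s"k = $k, WSSSE = ${m.computeCost(clusteringData)}")
}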

Collaborative Filtering

Alternating Least Squares (ALS)

ALS: Matrix factorization for collaborative filtering

object ALS {
  def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double): MatrixFactorizationModel
}

class MatrixFactorizationModel(val rank: Int, val userFeatures: RDD[(Int, Array[Double])], val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
  def predict(user: Int, product: Int): Double
  def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
  def recommendProducts(user: Int, num: Int): Array[Rating]
  def recommendUsers(product: Int, num: Int): Array[Rating]
}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

// Create ratings data
val ratings = sc.parallelize(Seq(
  Rating(1, 1, 5.0),
  Rating(1, 2, 1.0),
  Rating(1, 3, 5.0),
  Rating(2, 1, 1.0),
  Rating(2, 2, 5.0),
  Rating(2, 3, 1.0),
  Rating(3, 1, 5.0),
  Rating(3, 2, 1.0),
  Rating(3, 3, 5.0)
))

// Train collaborative filtering model
val alsModel = ALS.train(
  ratings = ratings,
  rank = 10,        // Number of latent factors
  iterations = 10,   // Number of iterations
  lambda = 0.01     // Regularization parameter
)

// Predict rating for user-item pair
val prediction = alsModel.predict(1, 2)

// Predict ratings for multiple user-item pairs
val userProducts = sc.parallelize(Seq((1, 1), (2, 2), (3, 3)))
val predictions = alsModel.predict(userProducts)

// Recommend products for a user
val recommendations = alsModel.recommendProducts(1, 5)
recommendations.foreach { rating =>
  println(s"Product ${rating.product}: ${rating.rating}")
}

// Recommend users for a product
val userRecommendations = alsModel.recommendUsers(1, 3)

// For implicit feedback data
val implicitModel = ALS.trainImplicit(
  ratings = ratings,
  rank = 10,
  iterations = 10,
  lambda = 0.01,
  alpha = 0.1      // Confidence parameter
)
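
A standard check of the factorization is the mean squared error of the reconstructed ratings; a sketch following the usual MLlib evaluation pattern:

// Reconstruct each observed rating and measure the mean squared error
val observedPairs = ratings.map { case Rating(user, product, _) => (user, product) }
val predicted = alsModel.predict(observedPairs)
  .map { case Rating(user, product, rating) => ((user, product), rating) }
val actual = ratings.map { case Rating(user, product, rating) => ((user, product), rating) }
val mse = actual.join(predicted)
  .map { case (_, (a, p)) => math.pow(a - p, 2) }
  .mean()
println(s"Mean Squared Error = $mse")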

Statistics

Summary Statistics

Statistics: Statistical functions for RDDs

object Statistics {
  def colStats(rdd: RDD[Vector]): MultivariateStatisticalSummary
  def corr(x: RDD[Double], y: RDD[Double], method: String = "pearson"): Double
  def corr(X: RDD[Vector], method: String = "pearson"): Matrix
  def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult
  def chiSqTest(observed: Matrix): ChiSqTestResult
  def chiSqTest(observed: RDD[LabeledPoint]): Array[ChiSqTestResult]
}

trait MultivariateStatisticalSummary {
  def mean: Vector
  def variance: Vector
  def count: Long  
  def numNonzeros: Vector
  def max: Vector
  def min: Vector
}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrix}

// Sample data for statistics
val observations = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 9.0)
))

// Compute column statistics
val summary = Statistics.colStats(observations)
println(s"Mean: ${summary.mean}")           // [4.0, 5.0, 6.0]
println(s"Variance: ${summary.variance}")   // [9.0, 9.0, 9.0]
println(s"Count: ${summary.count}")         // 3
println(s"Max: ${summary.max}")             // [7.0, 8.0, 9.0]
println(s"Min: ${summary.min}")             // [1.0, 2.0, 3.0]

// Correlation between two RDD[Double]
val x = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
val y = sc.parallelize(Array(2.0, 4.0, 6.0, 8.0))
val correlation = Statistics.corr(x, y, "pearson")  // 1.0 (perfect positive correlation)

// Correlation matrix for RDD[Vector]
val correlationMatrix = Statistics.corr(observations, "pearson")

// Chi-squared test
val observed = Vectors.dense(1.0, 2.0, 3.0)
val expected = Vectors.dense(1.5, 1.5, 3.0)
val chiSqResult = Statistics.chiSqTest(observed, expected)

println(s"Chi-squared statistic: ${chiSqResult.statistic}")
println(s"P-value: ${chiSqResult.pValue}")
println(s"Degrees of freedom: ${chiSqResult.degreesOfFreedom}")

Model Evaluation

Binary Classification Metrics

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Predictions and labels (prediction score, true label)
val scoreAndLabels = sc.parallelize(Seq(
  (0.9, 1.0), (0.8, 1.0), (0.7, 1.0),
  (0.6, 0.0), (0.5, 1.0), (0.4, 0.0),
  (0.3, 0.0), (0.2, 0.0), (0.1, 0.0)
))

val binaryMetrics = new BinaryClassificationMetrics(scoreAndLabels)

// Area under ROC curve
val areaUnderROC = binaryMetrics.areaUnderROC()
println(s"Area under ROC: $areaUnderROC")

// Area under Precision-Recall curve
val areaUnderPR = binaryMetrics.areaUnderPR()
println(s"Area under PR: $areaUnderPR")

Multi-class Classification Metrics

import org.apache.spark.mllib.evaluation.MulticlassMetrics

// Predictions and labels (predicted class, true class)
val predictionAndLabels = sc.parallelize(Seq(
  (0.0, 0.0), (1.0, 1.0), (2.0, 2.0),
  (0.0, 0.0), (1.0, 2.0), (2.0, 1.0)
))

val multiMetrics = new MulticlassMetrics(predictionAndLabels)

// Overall statistics
val accuracy = multiMetrics.accuracy
val weightedPrecision = multiMetrics.weightedPrecision  
val weightedRecall = multiMetrics.weightedRecall
val weightedFMeasure = multiMetrics.weightedFMeasure

// Per-class metrics
val labels = multiMetrics.labels
labels.foreach { label =>
  println(s"Class $label precision: ${multiMetrics.precision(label)}")
  println(s"Class $label recall: ${multiMetrics.recall(label)}")
  println(s"Class $label F1-score: ${multiMetrics.fMeasure(label)}")
}

// Confusion matrix
val confusionMatrix = multiMetrics.confusionMatrix
println(s"Confusion matrix:\n$confusionMatrix")

Pipeline Example

Complete machine learning pipeline:

import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// 1. Load and prepare data
val rawData = sc.textFile("data.csv")
val parsedData = rawData.map { line =>
  val parts = line.split(',')
  val label = parts(0).toDouble
  val features = Vectors.dense(parts.tail.map(_.toDouble))
  LabeledPoint(label, features)
}

// 2. Split data
val Array(training, test) = parsedData.randomSplit(Array(0.7, 0.3), seed = 11L)
training.cache()

// 3. Train model
val model = LogisticRegressionWithSGD.train(training, numIterations = 100)

// 4. Compute raw scores on the test set
// (clear the threshold so predict returns scores rather than hard 0/1 labels,
//  which makes the ROC curve meaningful)
model.clearThreshold()
val scoreAndLabel = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// 5. Evaluate model
val metrics = new BinaryClassificationMetrics(scoreAndLabel)
val auROC = metrics.areaUnderROC()

println(s"Area under ROC: $auROC")

// 6. Save model
model.save(sc, "myModel")

// 7. Load model later
val loadedModel = LogisticRegressionModel.load(sc, "myModel")

This guide covers the essential machine learning capabilities of Spark's MLlib for building scalable ML applications.
