# Machine Learning Library (MLlib)

MLlib is Spark's scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, and underlying optimization primitives.

## Core Data Types

### Vector

Mathematical vector for representing features:

```scala { .api }
package org.apache.spark.mllib.linalg

abstract class Vector extends Serializable {
  def size: Int
  def toArray: Array[Double]
  def apply(i: Int): Double
}
```

**Vectors Factory**:

```scala { .api }
object Vectors {
  def dense(values: Array[Double]): Vector
  def dense(firstValue: Double, otherValues: Double*): Vector
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
  def sparse(size: Int, elements: Seq[(Int, Double)]): Vector
  def zeros(size: Int): Vector
}
```

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Dense vector
val denseVec = Vectors.dense(1.0, 2.0, 3.0, 0.0, 0.0)

// Sparse vector (size=5, indices=[0,2], values=[1.0,3.0])
val sparseVec = Vectors.sparse(5, Array(0, 2), Array(1.0, 3.0))

// Alternative sparse creation
val sparseVec2 = Vectors.sparse(5, Seq((0, 1.0), (2, 3.0)))

// Vector operations
val size = denseVec.size     // 5
val element = denseVec(2)    // 3.0
val array = denseVec.toArray // Array(1.0, 2.0, 3.0, 0.0, 0.0)
```

### Matrix

Mathematical matrix for linear algebra operations:

```scala { .api }
abstract class Matrix extends Serializable {
  def numRows: Int
  def numCols: Int
  def toArray: Array[Double]
  def apply(i: Int, j: Int): Double
}
```

**Matrices Factory**:

```scala { .api }
object Matrices {
  def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix
  def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values: Array[Double]): Matrix
  def eye(n: Int): Matrix
  def zeros(numRows: Int, numCols: Int): Matrix
}
```

```scala
import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Dense matrix (2x3, column-major order)
val denseMatrix = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
// Matrix:
// 1.0  3.0  5.0
// 2.0  4.0  6.0

// Identity matrix
val identity = Matrices.eye(3)

// Zero matrix
val zeros = Matrices.zeros(2, 3)
```

### LabeledPoint

Data point with label and features for supervised learning:

```scala { .api }
case class LabeledPoint(label: Double, features: Vector) {
  override def toString: String = s"($label,$features)"
}
```

```scala
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Create labeled points
val positive = LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0))
val negative = LabeledPoint(0.0, Vectors.dense(-1.0, -2.0, -3.0))

// For regression
val regressionPoint = LabeledPoint(3.5, Vectors.dense(1.0, 2.0))

// Create training data
val trainingData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
))
```

### Rating

User-item rating for collaborative filtering:

```scala { .api }
case class Rating(user: Int, product: Int, rating: Double)
```

```scala
import org.apache.spark.mllib.recommendation.Rating

// Create ratings
val ratings = sc.parallelize(Seq(
  Rating(1, 101, 5.0),
  Rating(1, 102, 3.0),
  Rating(2, 101, 4.0),
  Rating(2, 103, 2.0)
))
```
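In practice, `LabeledPoint`s are rarely built inline; MLlib can read them from LIBSVM-format text via `MLUtils.loadLibSVMFile`. A minimal sketch — the file path here is a hypothetical placeholder:

```scala
import org.apache.spark.mllib.util.MLUtils

// Each line: <label> <index1>:<value1> <index2>:<value2> ...
// "data/sample_libsvm_data.txt" is a placeholder path for illustration
val loadedData = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

// Labels arrive as Double, features as sparse vectors sized to the max feature index
loadedData.take(1).foreach(println)
```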
## Classification

### Logistic Regression

**LogisticRegressionWithSGD**: Train logistic regression using stochastic gradient descent

```scala { .api }
object LogisticRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LogisticRegressionModel
}

class LogisticRegressionModel(override val weights: Vector, override val intercept: Double) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
  def clearThreshold(): LogisticRegressionModel
  def setThreshold(threshold: Double): LogisticRegressionModel
}
```

```scala
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Prepare training data
val trainingData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
  LabeledPoint(0.0, Vectors.dense(-1.0, -2.0)),
  LabeledPoint(1.0, Vectors.dense(1.5, 1.8)),
  LabeledPoint(0.0, Vectors.dense(-1.5, -1.8))
))

// Train model
val model = LogisticRegressionWithSGD.train(trainingData, numIterations = 100)

// Make predictions
val testData = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0),
  Vectors.dense(-1.0, -1.0)
))

val predictions = model.predict(testData)
predictions.collect() // Array(1.0, 0.0)

// Single prediction
val singlePrediction = model.predict(Vectors.dense(0.5, 0.5))

// Set classification threshold
val calibratedModel = model.setThreshold(0.3)
```

### Support Vector Machines

**SVMWithSGD**: Train SVM using stochastic gradient descent

```scala { .api }
object SVMWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double): SVMModel
}

class SVMModel(override val weights: Vector, override val intercept: Double) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
```

```scala
import org.apache.spark.mllib.classification.{SVMWithSGD, SVMModel}

// Train SVM model
val svmModel = SVMWithSGD.train(
  input = trainingData,
  numIterations = 100,
  stepSize = 1.0,
  regParam = 0.01
)

// Make predictions
val svmPredictions = svmModel.predict(testData)
```

### Naive Bayes

**NaiveBayes**: Train multinomial Naive Bayes classifier

```scala { .api }
object NaiveBayes {
  def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel
}

class NaiveBayesModel(val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
```

```scala
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}

// Training data for text classification (bag of words)
val textData = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 1.0)), // spam
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 0.0)), // ham
  LabeledPoint(0.0, Vectors.dense(1.0, 1.0, 0.0)), // spam
  LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 1.0))  // ham
))

// Train Naive Bayes model
val nbModel = NaiveBayes.train(textData, lambda = 1.0)

// Make predictions
val textPredictions = nbModel.predict(testData)
```
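All of these classifiers return hard label predictions, so a quick sanity check is training accuracy. A minimal sketch, reusing the logistic regression `model` and `trainingData` defined above (training accuracy is optimistic; prefer a held-out split, as in the pipeline example at the end, for real evaluation):

```scala
// Predicted vs. true label for every training point
val labelAndPred = trainingData.map { point =>
  (point.label, model.predict(point.features))
}

// Fraction of points classified correctly
val trainAccuracy =
  labelAndPred.filter { case (label, pred) => label == pred }.count().toDouble / trainingData.count()
println(s"Training accuracy: $trainAccuracy")
```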
## Regression

### Linear Regression

**LinearRegressionWithSGD**: Train linear regression using SGD

```scala { .api }
object LinearRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LinearRegressionModel
}

class LinearRegressionModel(override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel with RegressionModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
```

```scala
import org.apache.spark.mllib.regression.{LinearRegressionWithSGD, LinearRegressionModel, LabeledPoint}

// Regression training data
val regressionData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0)),
  LabeledPoint(2.0, Vectors.dense(2.0)),
  LabeledPoint(3.0, Vectors.dense(3.0)),
  LabeledPoint(4.0, Vectors.dense(4.0))
))

// Train linear regression
val lrModel = LinearRegressionWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01
)

// Make predictions
val regressionTestData = sc.parallelize(Seq(
  Vectors.dense(1.5),
  Vectors.dense(2.5)
))

val regressionPredictions = lrModel.predict(regressionTestData)
regressionPredictions.collect() // approximately Array(1.5, 2.5)
```

### Ridge Regression

**RidgeRegressionWithSGD**: Ridge regression with L2 regularization

```scala { .api }
object RidgeRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): RidgeRegressionModel
}
```

### Lasso Regression

**LassoWithSGD**: Lasso regression with L1 regularization

```scala { .api }
object LassoWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): LassoModel
}
```

```scala
import org.apache.spark.mllib.regression.{RidgeRegressionWithSGD, LassoWithSGD}

// Ridge regression with L2 regularization
val ridgeModel = RidgeRegressionWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01,
  regParam = 0.1
)

// Lasso regression with L1 regularization
val lassoModel = LassoWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01,
  regParam = 0.1
)
```
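To quantify how well any of these regressors fits, MLlib's `RegressionMetrics` computes standard error measures from (prediction, observation) pairs. A minimal sketch, reusing `lrModel` and `regressionData` from above:

```scala
import org.apache.spark.mllib.evaluation.RegressionMetrics

// (prediction, observation) pairs over the training data
val predictionAndObservation = regressionData.map { point =>
  (lrModel.predict(point.features), point.label)
}

val regMetrics = new RegressionMetrics(predictionAndObservation)
println(s"MSE:  ${regMetrics.meanSquaredError}")
println(s"RMSE: ${regMetrics.rootMeanSquaredError}")
println(s"R^2:  ${regMetrics.r2}")
```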
## Clustering

### K-Means

**KMeans**: K-means clustering algorithm

```scala { .api }
class KMeans private (
    private var k: Int,
    private var maxIterations: Int,
    private var runs: Int,
    private var initializationMode: String,
    private var initializationSteps: Int,
    private var epsilon: Double) extends Serializable {
  def setK(k: Int): KMeans
  def setMaxIterations(maxIterations: Int): KMeans
  def setRuns(runs: Int): KMeans
  def setInitializationMode(initializationMode: String): KMeans
  def setInitializationSteps(initializationSteps: Int): KMeans
  def setEpsilon(epsilon: Double): KMeans
  def run(data: RDD[Vector]): KMeansModel
}

object KMeans {
  def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, initializationMode: String): KMeansModel
}

class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable {
  def predict(point: Vector): Int
  def predict(points: RDD[Vector]): RDD[Int]
  def computeCost(data: RDD[Vector]): Double
}
```

```scala
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

// Prepare clustering data
val clusteringData = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0),
  Vectors.dense(1.0, 2.0),
  Vectors.dense(2.0, 1.0),
  Vectors.dense(9.0, 8.0),
  Vectors.dense(8.0, 9.0),
  Vectors.dense(9.0, 9.0)
))

// Train K-means model
val kmeansModel = KMeans.train(
  data = clusteringData,
  k = 2,             // Number of clusters
  maxIterations = 20
)

// Get cluster centers
val centers = kmeansModel.clusterCenters
centers.foreach(println)

// Make predictions (cluster indices are arbitrary but consistent)
val clusterPredictions = kmeansModel.predict(clusteringData)
clusterPredictions.collect() // e.g. Array(0, 0, 0, 1, 1, 1)

// Compute cost (sum of squared distances to centroids)
val cost = kmeansModel.computeCost(clusteringData)

// Advanced K-means with custom parameters
val advancedKMeans = new KMeans()
  .setK(3)
  .setMaxIterations(50)
  .setRuns(10) // Multiple runs for better results
  .setInitializationMode("k-means||")
  .setEpsilon(1e-4)

val advancedModel = advancedKMeans.run(clusteringData)
```
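Since `computeCost` returns the within-cluster sum of squared errors, it can guide the choice of `k`. A minimal elbow-style sweep, reusing `clusteringData` from above (the candidate range is illustrative):

```scala
// Lower cost means tighter clusters; look for the "elbow" where
// adding another cluster stops reducing the cost substantially
(2 to 5).foreach { k =>
  val candidate = KMeans.train(clusteringData, k, 20)
  println(s"k=$k cost=${candidate.computeCost(clusteringData)}")
}
```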
## Collaborative Filtering

### Alternating Least Squares (ALS)

**ALS**: Matrix factorization for collaborative filtering

```scala { .api }
object ALS {
  def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double): MatrixFactorizationModel
}

class MatrixFactorizationModel(val rank: Int, val userFeatures: RDD[(Int, Array[Double])], val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
  def predict(user: Int, product: Int): Double
  def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
  def recommendProducts(user: Int, num: Int): Array[Rating]
  def recommendUsers(product: Int, num: Int): Array[Rating]
}
```

```scala
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

// Create ratings data
val ratings = sc.parallelize(Seq(
  Rating(1, 1, 5.0),
  Rating(1, 2, 1.0),
  Rating(1, 3, 5.0),
  Rating(2, 1, 1.0),
  Rating(2, 2, 5.0),
  Rating(2, 3, 1.0),
  Rating(3, 1, 5.0),
  Rating(3, 2, 1.0),
  Rating(3, 3, 5.0)
))

// Train collaborative filtering model
val alsModel = ALS.train(
  ratings = ratings,
  rank = 10,       // Number of latent factors
  iterations = 10, // Number of iterations
  lambda = 0.01    // Regularization parameter
)

// Predict rating for user-item pair
val prediction = alsModel.predict(1, 2)

// Predict ratings for multiple user-item pairs
val userProducts = sc.parallelize(Seq((1, 1), (2, 2), (3, 3)))
val predictions = alsModel.predict(userProducts)

// Recommend products for a user
val recommendations = alsModel.recommendProducts(1, 5)
recommendations.foreach { rating =>
  println(s"Product ${rating.product}: ${rating.rating}")
}

// Recommend users for a product
val userRecommendations = alsModel.recommendUsers(1, 3)

// For implicit feedback data
val implicitModel = ALS.trainImplicit(
  ratings = ratings,
  rank = 10,
  iterations = 10,
  lambda = 0.01,
  alpha = 0.1 // Confidence parameter
)
```
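Rating predictions can be scored like a regression problem. A minimal sketch that joins predicted ratings against the observed ones to compute mean squared error, reusing `ratings` and `alsModel` from above:

```scala
// Re-score every (user, product) pair seen in training
val userProductPairs = ratings.map { case Rating(user, product, _) => (user, product) }

val predictedRatings = alsModel.predict(userProductPairs)
  .map { case Rating(user, product, rating) => ((user, product), rating) }

val actualRatings = ratings.map { case Rating(user, product, rating) => ((user, product), rating) }

// Join on (user, product) and average the squared errors
val mse = actualRatings.join(predictedRatings)
  .map { case (_, (actual, predicted)) => math.pow(actual - predicted, 2) }
  .mean()

println(s"Training MSE: $mse")
```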
## Statistics

### Summary Statistics

**Statistics**: Statistical functions for RDDs

```scala { .api }
object Statistics {
  def colStats(rdd: RDD[Vector]): MultivariateStatisticalSummary
  def corr(x: RDD[Double], y: RDD[Double], method: String = "pearson"): Double
  def corr(X: RDD[Vector], method: String = "pearson"): Matrix
  def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult
  def chiSqTest(observed: Matrix): ChiSqTestResult
  def chiSqTest(observed: RDD[LabeledPoint]): Array[ChiSqTestResult]
}

trait MultivariateStatisticalSummary {
  def mean: Vector
  def variance: Vector
  def count: Long
  def numNonzeros: Vector
  def max: Vector
  def min: Vector
}
```

```scala
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrix}

// Sample data for statistics
val observations = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 9.0)
))

// Compute column statistics
val summary = Statistics.colStats(observations)
println(s"Mean: ${summary.mean}")         // [4.0, 5.0, 6.0]
println(s"Variance: ${summary.variance}") // [9.0, 9.0, 9.0]
println(s"Count: ${summary.count}")       // 3
println(s"Max: ${summary.max}")           // [7.0, 8.0, 9.0]
println(s"Min: ${summary.min}")           // [1.0, 2.0, 3.0]

// Correlation between two RDD[Double]
val x = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
val y = sc.parallelize(Array(2.0, 4.0, 6.0, 8.0))
val correlation = Statistics.corr(x, y, "pearson") // 1.0 (perfect positive correlation)

// Correlation matrix for RDD[Vector]
val correlationMatrix = Statistics.corr(observations, "pearson")

// Chi-squared test
val observed = Vectors.dense(1.0, 2.0, 3.0)
val expected = Vectors.dense(1.5, 1.5, 3.0)
val chiSqResult = Statistics.chiSqTest(observed, expected)

println(s"Chi-squared statistic: ${chiSqResult.statistic}")
println(s"P-value: ${chiSqResult.pValue}")
println(s"Degrees of freedom: ${chiSqResult.degreesOfFreedom}")
```

## Model Evaluation

### Binary Classification Metrics

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Predictions and labels (prediction score, true label)
val scoreAndLabels = sc.parallelize(Seq(
  (0.9, 1.0), (0.8, 1.0), (0.7, 1.0),
  (0.6, 0.0), (0.5, 1.0), (0.4, 0.0),
  (0.3, 0.0), (0.2, 0.0), (0.1, 0.0)
))

val binaryMetrics = new BinaryClassificationMetrics(scoreAndLabels)

// Area under ROC curve
val areaUnderROC = binaryMetrics.areaUnderROC()
println(s"Area under ROC: $areaUnderROC")

// Area under Precision-Recall curve
val areaUnderPR = binaryMetrics.areaUnderPR()
println(s"Area under PR: $areaUnderPR")
```

### Multi-class Classification Metrics

```scala
import org.apache.spark.mllib.evaluation.MulticlassMetrics

// Predictions and labels (predicted class, true class)
val predictionAndLabels = sc.parallelize(Seq(
  (0.0, 0.0), (1.0, 1.0), (2.0, 2.0),
  (0.0, 0.0), (1.0, 2.0), (2.0, 1.0)
))

val multiMetrics = new MulticlassMetrics(predictionAndLabels)

// Overall statistics
val accuracy = multiMetrics.accuracy
val weightedPrecision = multiMetrics.weightedPrecision
val weightedRecall = multiMetrics.weightedRecall
val weightedFMeasure = multiMetrics.weightedFMeasure

// Per-class metrics
val labels = multiMetrics.labels
labels.foreach { label =>
  println(s"Class $label precision: ${multiMetrics.precision(label)}")
  println(s"Class $label recall: ${multiMetrics.recall(label)}")
  println(s"Class $label F1-score: ${multiMetrics.fMeasure(label)}")
}

// Confusion matrix
val confusionMatrix = multiMetrics.confusionMatrix
println(s"Confusion matrix:\n$confusionMatrix")
```

## Pipeline Example

Complete machine learning pipeline:

```scala
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// 1. Load and prepare data (label in the first CSV column, features in the rest)
val rawData = sc.textFile("data.csv")
val parsedData = rawData.map { line =>
  val parts = line.split(',')
  val label = parts(0).toDouble
  val features = Vectors.dense(parts.tail.map(_.toDouble))
  LabeledPoint(label, features)
}

// 2. Split data
val Array(training, test) = parsedData.randomSplit(Array(0.7, 0.3), seed = 11L)
training.cache()

// 3. Train model
val model = LogisticRegressionWithSGD.train(training, numIterations = 100)

// 4. Make predictions
val predictionAndLabel = test.map { point =>
  val prediction = model.predict(point.features)
  (prediction, point.label)
}

// 5. Evaluate model
val metrics = new BinaryClassificationMetrics(predictionAndLabel)
val auROC = metrics.areaUnderROC()

println(s"Area under ROC: $auROC")

// 6. Save model
model.save(sc, "myModel")

// 7. Load model later
val loadedModel = LogisticRegressionModel.load(sc, "myModel")
```

This guide covers the essential machine learning capabilities of Spark's MLlib for building scalable ML applications.