0
# RDD-based API (Legacy)
1
2
The MLlib RDD-based API (`org.apache.spark.mllib`) is the original machine learning library built on Spark RDDs. While this API is in maintenance mode and the DataFrame-based API is recommended for new development, it still provides valuable functionality and is widely used in existing systems.
3
4
## Core Data Types
5
6
### LabeledPoint
7
8
```scala { .api }
9
case class LabeledPoint(label: Double, features: org.apache.spark.mllib.linalg.Vector) {
10
override def toString: String
11
}
12
13
object LabeledPoint {
14
def parse(s: String): LabeledPoint
15
}
16
```
17
18
### Rating
19
20
```scala { .api }
21
case class Rating(user: Int, product: Int, rating: Double) {
22
override def toString: String
23
}
24
```
25
26
## Linear Algebra (mllib.linalg)
27
28
### Vector
29
30
```scala { .api }
31
trait Vector extends Serializable {
32
def size: Int
33
def toArray: Array[Double]
34
def apply(i: Int): Double
35
def copy: Vector
36
def foreachActive(f: (Int, Double) => Unit): Unit
37
def numActives: Int
38
def numNonzeros: Int
39
def toDense: DenseVector
40
def toSparse: SparseVector
41
def compressed: Vector
42
}
43
44
class DenseVector(val values: Array[Double]) extends Vector
45
class SparseVector(override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector
46
47
object Vectors {
48
def dense(values: Array[Double]): DenseVector
49
def sparse(size: Int, indices: Array[Int], values: Array[Double]): SparseVector
50
def norm(vector: Vector, p: Double): Double
51
def sqdist(v1: Vector, v2: Vector): Double
52
}
53
```
54
55
### Matrix
56
57
```scala { .api }
58
trait Matrix extends Serializable {
59
def numRows: Int
60
def numCols: Int
61
def toArray: Array[Double]
62
def apply(i: Int, j: Int): Double
63
def transpose: Matrix
64
def multiply(y: DenseMatrix): DenseMatrix
65
def multiply(y: DenseVector): DenseVector
66
}
67
68
class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix
69
class SparseMatrix(override val numRows: Int, override val numCols: Int,
70
val colPtrs: Array[Int], val rowIndices: Array[Int], val values: Array[Double]) extends Matrix
71
72
object Matrices {
73
def dense(numRows: Int, numCols: Int, values: Array[Double]): DenseMatrix
74
def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int],
75
rowIndices: Array[Int], values: Array[Double]): SparseMatrix
76
}
77
```
78
79
## Classification
80
81
### Logistic Regression
82
83
```scala { .api }
84
class LogisticRegressionModel(override val weights: Vector, override val intercept: Double)
85
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
86
87
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
88
override def toString: String
89
}
90
91
// NOTE: unlike the *WithSGD trainers, LogisticRegressionWithLBFGS has no static
// `train(...)` helpers. Create an instance, configure it, then call `run`.
class LogisticRegressionWithLBFGS
  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

  // L-BFGS optimizer; tune iterations, corrections, tolerance and regularization here,
  // e.g. `optimizer.setNumIterations(100).setRegParam(0.01)`.
  val optimizer: LBFGS

  // Number of label classes; the default of 2 gives binary logistic regression.
  def setNumClasses(numClasses: Int): this.type

  // Whether to fit an intercept term (inherited from GeneralizedLinearAlgorithm).
  def setIntercept(addIntercept: Boolean): this.type

  def run(input: RDD[LabeledPoint]): LogisticRegressionModel
  def run(input: RDD[LabeledPoint], initialWeights: Vector): LogisticRegressionModel
}
97
98
object LogisticRegressionWithSGD {
99
def train(input: RDD[LabeledPoint]): LogisticRegressionModel
100
def train(input: RDD[LabeledPoint], numIterations: Int): LogisticRegressionModel
101
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LogisticRegressionModel
102
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
103
miniBatchFraction: Double): LogisticRegressionModel
104
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
105
miniBatchFraction: Double, initialWeights: Vector): LogisticRegressionModel
106
}
107
```
108
109
### Support Vector Machines
110
111
```scala { .api }
112
class SVMModel(override val weights: Vector, override val intercept: Double)
113
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
114
115
def setThreshold(threshold: Double): SVMModel
116
def clearThreshold(): SVMModel
117
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
118
}
119
120
object SVMWithSGD {
121
def train(input: RDD[LabeledPoint]): SVMModel
122
def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel
123
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): SVMModel
124
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
125
regParam: Double): SVMModel
126
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
127
regParam: Double, miniBatchFraction: Double): SVMModel
128
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
129
regParam: Double, miniBatchFraction: Double, initialWeights: Vector): SVMModel
130
}
131
```
132
133
### Naive Bayes
134
135
```scala { .api }
136
class NaiveBayesModel(val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]],
137
val modelType: String) extends ClassificationModel with Serializable {
138
139
def predict(testData: RDD[Vector]): RDD[Double]
140
def predict(testData: Vector): Double
141
}
142
143
object NaiveBayes {
144
def train(input: RDD[LabeledPoint]): NaiveBayesModel
145
def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel
146
def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel
147
}
148
```
149
150
## Regression
151
152
### Linear Regression
153
154
```scala { .api }
155
class LinearRegressionModel(override val weights: Vector, override val intercept: Double)
156
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
157
158
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
159
}
160
161
object LinearRegressionWithSGD {
162
def train(input: RDD[LabeledPoint]): LinearRegressionModel
163
def train(input: RDD[LabeledPoint], numIterations: Int): LinearRegressionModel
164
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LinearRegressionModel
165
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
166
miniBatchFraction: Double): LinearRegressionModel
167
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
168
miniBatchFraction: Double, initialWeights: Vector): LinearRegressionModel
169
}
170
```
171
172
### Ridge Regression
173
174
```scala { .api }
175
class RidgeRegressionModel(override val weights: Vector, override val intercept: Double)
176
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
177
178
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
179
}
180
181
object RidgeRegressionWithSGD {
182
def train(input: RDD[LabeledPoint]): RidgeRegressionModel
183
def train(input: RDD[LabeledPoint], numIterations: Int): RidgeRegressionModel
184
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): RidgeRegressionModel
185
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
186
regParam: Double): RidgeRegressionModel
187
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
188
regParam: Double, miniBatchFraction: Double): RidgeRegressionModel
189
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
190
regParam: Double, miniBatchFraction: Double, initialWeights: Vector): RidgeRegressionModel
191
}
192
```
193
194
### Lasso Regression
195
196
```scala { .api }
197
class LassoModel(override val weights: Vector, override val intercept: Double)
198
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
199
200
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
201
}
202
203
object LassoWithSGD {
204
def train(input: RDD[LabeledPoint]): LassoModel
205
def train(input: RDD[LabeledPoint], numIterations: Int): LassoModel
206
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LassoModel
207
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
208
regParam: Double): LassoModel
209
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
210
regParam: Double, miniBatchFraction: Double): LassoModel
211
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
212
regParam: Double, miniBatchFraction: Double, initialWeights: Vector): LassoModel
213
}
214
```
215
216
## Clustering
217
218
### K-Means
219
220
```scala { .api }
221
class KMeansModel(val clusterCenters: Array[Vector]) extends Saveable with Serializable {
222
def predict(point: Vector): Int
223
def predict(points: RDD[Vector]): RDD[Int]
224
def computeCost(data: RDD[Vector]): Double
225
def k: Int
226
}
227
228
object KMeans {
229
def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel
230
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel
231
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int,
232
initializationMode: String): KMeansModel
233
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int,
234
initializationMode: String, seed: Long): KMeansModel
235
}
236
```
237
238
### Gaussian Mixture Model
239
240
```scala { .api }
241
class GaussianMixtureModel(val weights: Array[Double], val gaussians: Array[MultivariateGaussian])
242
extends Serializable {
243
244
def predict(point: Vector): Int
245
def predict(points: RDD[Vector]): RDD[Int]
246
def predictSoft(point: Vector): Array[Double]
247
def predictSoft(points: RDD[Vector]): RDD[Array[Double]]
248
def k: Int
249
}
250
251
class GaussianMixture extends Serializable {
252
def setK(k: Int): GaussianMixture
253
def setMaxIterations(maxIterations: Int): GaussianMixture
254
def setConvergenceTol(convergenceTol: Double): GaussianMixture
255
def setSeed(seed: Long): GaussianMixture
256
def run(data: RDD[Vector]): GaussianMixtureModel
257
}
258
```
259
260
### Power Iteration Clustering
261
262
```scala { .api }
263
// Result of Power Iteration Clustering. PIC clusters the vertices of the input
// similarity graph, so the model only exposes the per-vertex `assignments`;
// there is no `predict` method for unseen points.
class PowerIterationClusteringModel(
    val k: Int,
    val assignments: RDD[PowerIterationClustering.Assignment]) extends Saveable with Serializable

// Constructed with default parameters (k = 2, maxIterations = 100,
// initialization mode "random"); use the setters to override them.
class PowerIterationClustering extends Serializable {
  def setK(k: Int): PowerIterationClustering
  def setMaxIterations(maxIterations: Int): PowerIterationClustering
  def setInitializationMode(initializationMode: String): PowerIterationClustering

  // `similarities` holds (srcId, dstId, similarity) edges of the affinity graph.
  def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel
}

object PowerIterationClustering {
  // Cluster assignment of a single vertex.
  case class Assignment(id: Long, cluster: Int)
}
273
```
274
275
## Tree-Based Methods
276
277
### Decision Trees
278
279
```scala { .api }
280
class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable {
281
def predict(features: Vector): Double
282
def predict(features: RDD[Vector]): RDD[Double]
283
def depth: Int
284
def numNodes: Int
285
}
286
287
object DecisionTree {
288
def train(input: RDD[LabeledPoint], strategy: Strategy): DecisionTreeModel
289
def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
290
maxDepth: Int): DecisionTreeModel
291
def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
292
maxDepth: Int, numClasses: Int): DecisionTreeModel
293
def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
294
maxDepth: Int, numClasses: Int, maxBins: Int,
295
quantileCalculationStrategy: QuantileStrategy,
296
categoricalFeaturesInfo: Map[Int, Int]): DecisionTreeModel
297
}
298
```
299
300
### Random Forest
301
302
```scala { .api }
303
class RandomForestModel(val algo: Algo, val trees: Array[DecisionTreeModel]) extends Serializable {
304
def predict(features: Vector): Double
305
def predict(features: RDD[Vector]): RDD[Double]
306
def totalNumNodes: Int
307
}
308
309
object RandomForest {
310
def trainClassifier(input: RDD[LabeledPoint], strategy: Strategy,
311
numTrees: Int, featureSubsetStrategy: String,
312
seed: Int): RandomForestModel
313
def trainRegressor(input: RDD[LabeledPoint], strategy: Strategy,
314
numTrees: Int, featureSubsetStrategy: String,
315
seed: Int): RandomForestModel
316
}
317
```
318
319
### Gradient Boosted Trees
320
321
```scala { .api }
322
class GradientBoostedTreesModel(val algo: Algo, val trees: Array[DecisionTreeModel],
323
val treeWeights: Array[Double]) extends Serializable {
324
def predict(features: Vector): Double
325
def predict(features: RDD[Vector]): RDD[Double]
326
def totalNumNodes: Int
327
}
328
329
object GradientBoostedTrees {
330
def train(input: RDD[LabeledPoint], boostingStrategy: BoostingStrategy): GradientBoostedTreesModel
331
}
332
```
333
334
## Recommendation
335
336
### Collaborative Filtering
337
338
```scala { .api }
339
class MatrixFactorizationModel(val rank: Int, val userFeatures: RDD[(Int, Array[Double])],
340
val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
341
342
def predict(user: Int, product: Int): Double
343
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
344
def recommendProducts(user: Int, num: Int): Array[Rating]
345
def recommendUsers(product: Int, num: Int): Array[Rating]
346
}
347
348
object ALS {
349
def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
350
def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
351
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
352
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double,
353
alpha: Double): MatrixFactorizationModel
354
}
355
```
356
357
## Feature Extraction and Selection
358
359
### Text Feature Extraction
360
361
```scala { .api }
362
class HashingTF(val numFeatures: Int) extends Serializable {
363
def this() = this(1048576)
364
def transform(document: Iterable[String]): Vector
365
def transform(document: RDD[Iterable[String]]): RDD[Vector]
366
def indexOf(term: Any): Int
367
}
368
369
class IDFModel(val idf: Vector) extends Serializable {
370
def transform(dataset: RDD[Vector]): RDD[Vector]
371
def transform(dataset: Vector): Vector
372
}
373
374
class IDF(val minDocFreq: Int) extends Serializable {
375
def this() = this(0)
376
def fit(dataset: RDD[Vector]): IDFModel
377
}
378
379
class Word2VecModel(private val wordVectors: Map[String, Array[Float]]) extends Serializable {
380
def transform(word: String): Vector
381
def findSynonyms(word: String, num: Int): Array[(String, Double)]
382
def findSynonyms(vector: Vector, num: Int): Array[(String, Double)]
383
def getVectors: Map[String, Array[Float]]
384
}
385
386
class Word2Vec extends Serializable {
387
def setVectorSize(vectorSize: Int): Word2Vec
388
def setLearningRate(learningRate: Double): Word2Vec
389
def setNumPartitions(numPartitions: Int): Word2Vec
390
def setNumIterations(numIterations: Int): Word2Vec
391
def setSeed(seed: Long): Word2Vec
392
def setMinCount(minCount: Int): Word2Vec
393
def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel
394
}
395
```
396
397
### Numerical Feature Processing
398
399
```scala { .api }
400
class StandardScalerModel(val std: Vector, val mean: Vector) extends Serializable {
401
def this(std: Vector) = this(std, null)
402
def transform(vector: Vector): Vector
403
def transform(vectors: RDD[Vector]): RDD[Vector]
404
}
405
406
class StandardScaler(withMean: Boolean, withStd: Boolean) extends Serializable {
407
def this() = this(false, true)
408
def fit(data: RDD[Vector]): StandardScalerModel
409
}
410
411
class Normalizer(val p: Double) extends Serializable {
412
def this() = this(2.0)
413
def transform(vector: Vector): Vector
414
def transform(data: RDD[Vector]): RDD[Vector]
415
}
416
```
417
418
## Statistics
419
420
### Summary Statistics
421
422
```scala { .api }
423
trait MultivariateStatisticalSummary {
424
def mean: Vector
425
def variance: Vector
426
def count: Long
427
def numNonzeros: Vector
428
def max: Vector
429
def min: Vector
430
def normL1: Vector
431
def normL2: Vector
432
}
433
434
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {
435
def add(sample: Vector): MultivariateOnlineSummarizer
436
def merge(other: MultivariateOnlineSummarizer): MultivariateOnlineSummarizer
437
}
438
439
object Statistics {
  // Column-wise summary statistics.
  def colStats(X: RDD[Vector]): MultivariateStatisticalSummary

  // Correlation between two series; `method` is "pearson" (default) or "spearman".
  def corr(x: RDD[Double], y: RDD[Double]): Double
  def corr(x: RDD[Double], y: RDD[Double], method: String): Double

  // Pairwise column correlation matrix.
  def corr(X: RDD[Vector]): Matrix
  def corr(X: RDD[Vector], method: String): Matrix

  // Pearson's chi-squared tests: goodness of fit (vector) and independence (matrix).
  def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult
  def chiSqTest(observed: Matrix): ChiSqTestResult

  // One-sample, two-sided Kolmogorov-Smirnov test only: compare a sample against a
  // user-supplied CDF or a named theoretical distribution (e.g. "norm" with mean and
  // standard deviation as `params`). There is no two-sample overload.
  def kolmogorovSmirnovTest(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult
  def kolmogorovSmirnovTest(data: RDD[Double], distName: String,
      params: Double*): KolmogorovSmirnovTestResult
}
450
```
451
452
## Usage Examples
453
454
### Basic RDD-based Classification
455
456
```scala
457
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

// Load and parse data
val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

// Split data into training and test sets
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Train logistic regression model.
// LogisticRegressionWithLBFGS has no static `train` helper: build an instance and call `run`.
val lrModel = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(training)

// Train SVM model
val svmModel = SVMWithSGD.train(training, numIterations = 100)

// Clear the decision thresholds so `predict` returns raw scores instead of 0/1 labels.
// BinaryClassificationMetrics needs continuous scores to build a meaningful ROC curve.
lrModel.clearThreshold()
svmModel.clearThreshold()

// Compute (score, label) pairs on the test set
val lrScoreAndLabels = test.map { point =>
  val score = lrModel.predict(point.features)
  (score, point.label)
}

val svmScoreAndLabels = test.map { point =>
  val score = svmModel.predict(point.features)
  (score, point.label)
}

// Evaluate models
val lrMetrics = new BinaryClassificationMetrics(lrScoreAndLabels)
val svmMetrics = new BinaryClassificationMetrics(svmScoreAndLabels)

println(s"Logistic Regression AUC: ${lrMetrics.areaUnderROC()}")
println(s"SVM AUC: ${svmMetrics.areaUnderROC()}")
494
```
495
496
### RDD-based Clustering
497
498
```scala
499
import org.apache.spark.mllib.clustering.{KMeans, GaussianMixture}
500
import org.apache.spark.mllib.linalg.Vectors
501
502
// Create sample data
503
val data = sc.parallelize(Array(
504
Vectors.dense(0.0, 0.0),
505
Vectors.dense(1.0, 1.0),
506
Vectors.dense(9.0, 8.0),
507
Vectors.dense(8.0, 9.0)
508
))
509
510
// K-means clustering
511
val numClusters = 2
512
val numIterations = 20
513
val kMeansModel = KMeans.train(data, numClusters, numIterations)
514
515
println("K-means cluster centers:")
516
kMeansModel.clusterCenters.foreach(println)
517
518
// Predict clusters
519
val predictions = kMeansModel.predict(data)
520
data.zip(predictions).collect().foreach { case (point, cluster) =>
521
println(s"Point $point belongs to cluster $cluster")
522
}
523
524
// Gaussian Mixture Model
525
val gmm = new GaussianMixture()
526
.setK(2)
527
.setMaxIterations(20)
528
529
val gmmModel = gmm.run(data)
530
531
println("GMM weights:")
532
gmmModel.weights.foreach(println)
533
534
println("GMM gaussians:")
535
gmmModel.gaussians.foreach(println)
536
537
// Soft predictions (probabilities)
538
val softPredictions = gmmModel.predictSoft(data)
539
softPredictions.collect().foreach(println)
540
```
541
542
### Matrix Factorization for Recommendations
543
544
```scala
545
import org.apache.spark.mllib.recommendation.{ALS, Rating}
546
547
// Sample ratings data
548
val ratings = sc.parallelize(Array(
549
Rating(1, 1, 3.0),
550
Rating(1, 2, 4.0),
551
Rating(1, 3, 2.0),
552
Rating(2, 1, 4.0),
553
Rating(2, 2, 2.0),
554
Rating(2, 3, 5.0),
555
Rating(3, 1, 2.0),
556
Rating(3, 2, 3.0),
557
Rating(3, 3, 4.0)
558
))
559
560
// Train collaborative filtering model
561
val rank = 10
562
val numIterations = 10
563
val lambda = 0.01
564
565
val alsModel = ALS.train(ratings, rank, numIterations, lambda)
566
567
// Make predictions
568
val usersProducts = ratings.map { case Rating(user, product, rate) =>
569
(user, product)
570
}
571
572
val predictions = alsModel.predict(usersProducts).map { case Rating(user, product, rate) =>
573
((user, product), rate)
574
}
575
576
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
577
((user, product), rate)
578
}.join(predictions)
579
580
// Calculate RMSE
581
val mse = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
582
val err = (r1 - r2)
583
err * err
584
}.mean()
585
586
val rmse = math.sqrt(mse)
587
println(s"Mean Squared Error = $mse")
588
println(s"Root Mean Squared Error = $rmse")
589
590
// Recommend products for users
591
val userProducts = alsModel.recommendProducts(1, 3)
592
println(s"Recommendations for user 1:")
593
userProducts.foreach(println)
594
```
595
596
### Feature Extraction and Text Processing
597
598
```scala
599
import org.apache.spark.mllib.feature.{HashingTF, IDF, Word2Vec}
import org.apache.spark.mllib.linalg.Vector
// RDD is referenced in the type annotations below and must be imported explicitly.
import org.apache.spark.rdd.RDD
601
602
// Sample documents
603
val documents: RDD[Seq[String]] = sc.parallelize(Seq(
604
"spark is great".split(" ").toSeq,
605
"machine learning with spark".split(" ").toSeq,
606
"apache spark mllib".split(" ").toSeq
607
))
608
609
// TF-IDF feature extraction
610
val hashingTF = new HashingTF(1000)
611
val tf: RDD[Vector] = hashingTF.transform(documents)
612
613
tf.cache()
614
val idf = new IDF().fit(tf)
615
val tfidf: RDD[Vector] = idf.transform(tf)
616
617
println("TF-IDF vectors:")
618
tfidf.collect().foreach(println)
619
620
// Word2Vec embeddings
621
val word2vec = new Word2Vec()
622
.setVectorSize(100)
623
.setMinCount(1)
624
625
val word2vecModel = word2vec.fit(documents)
626
627
// Find synonyms
628
val synonyms = word2vecModel.findSynonyms("spark", 5)
629
synonyms.foreach { case (word, similarity) =>
630
println(s"$word: $similarity")
631
}
632
633
// Transform words to vectors
634
val sparkVector = word2vecModel.transform("spark")
635
println(s"Vector for 'spark': $sparkVector")
636
```
637
638
### Statistical Analysis
639
640
```scala
641
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
// RDD is referenced in the type annotations below and must be imported explicitly.
import org.apache.spark.rdd.RDD
643
644
// Sample data
645
val observations = sc.parallelize(Seq(
646
Vectors.dense(1.0, 10.0, 100.0),
647
Vectors.dense(2.0, 20.0, 200.0),
648
Vectors.dense(3.0, 30.0, 300.0)
649
))
650
651
// Summary statistics
652
val summary = Statistics.colStats(observations)
653
println(s"Mean: ${summary.mean}")
654
println(s"Variance: ${summary.variance}")
655
println(s"Min: ${summary.min}")
656
println(s"Max: ${summary.max}")
657
println(s"Count: ${summary.count}")
658
659
// Correlation matrix
660
val correlMatrix = Statistics.corr(observations, "pearson")
661
println(s"Correlation matrix:\n$correlMatrix")
662
663
// Correlation between two RDDs
664
val x: RDD[Double] = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
665
val y: RDD[Double] = sc.parallelize(Array(2.0, 4.0, 6.0, 8.0))
666
667
val correlation = Statistics.corr(x, y, "pearson")
668
println(s"Correlation between x and y: $correlation")
669
670
// Chi-squared test
671
val observed = Vectors.dense(1.0, 2.0, 3.0)
672
val expected = Vectors.dense(1.5, 1.5, 2.0)
673
674
val chiSqTestResult = Statistics.chiSqTest(observed, expected)
675
println(s"Chi-squared test result: $chiSqTestResult")
676
```
677
678
### Advanced RDD Operations for ML
679
680
```scala
681
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
683
684
// Create labeled points from raw data
685
/**
 * Parses comma-separated numeric lines into labeled points.
 *
 * The last field of each line is the label; all preceding fields form a dense
 * feature vector.
 *
 * @param rawData one CSV record per element
 * @return one [[LabeledPoint]] per input line
 */
def createLabeledPoints(rawData: RDD[String]): RDD[LabeledPoint] =
  rawData.map(_.split(',')).map { fields =>
    val label = fields.last.toDouble
    LabeledPoint(label, Vectors.dense(fields.init.map(_.toDouble)))
  }
693
694
// Feature scaling
695
/**
 * Standardizes the feature vectors of a labeled data set.
 *
 * A scaler with both centering and unit-variance scaling enabled is fitted on the
 * features of `data`, then applied to every point; labels are passed through unchanged.
 *
 * @param data labeled points whose features should be scaled
 * @return the rescaled data set together with the fitted scaler model, so the
 *         same transformation can be applied to other data later
 */
def scaleFeatures(data: RDD[LabeledPoint]): (RDD[LabeledPoint], StandardScalerModel) = {
  val scalerModel = new StandardScaler(withMean = true, withStd = true).fit(data.map(_.features))
  val scaledData = data.map(p => LabeledPoint(p.label, scalerModel.transform(p.features)))
  (scaledData, scalerModel)
}
707
708
// Cross-validation for RDD-based models
709
/**
 * Simple k-fold cross-validation for RDD-based models.
 *
 * The data is split into `k` random folds of roughly equal size (fixed seed, so the
 * split is reproducible). Each fold is used once for validation while the model is
 * trained on the union of the remaining folds.
 *
 * The score of a fold is the fraction of validation points whose prediction lies
 * within 0.5 of the label. For 0/1/2/... class labels this is the accuracy; it is
 * not an RMSE, so use a different metric for regression models.
 *
 * @param data        full labeled data set
 * @param trainFunc   trains a model on a training subset
 * @param predictFunc predicts a label for one feature vector with a trained model
 * @param k           number of folds, at least 2
 * @return one score per fold, in fold order
 */
def crossValidate[M](data: RDD[LabeledPoint],
                     trainFunc: RDD[LabeledPoint] => M,
                     predictFunc: (M, Vector) => Double,
                     k: Int = 5): Array[Double] = {
  // With fewer than two folds the training set would be empty.
  require(k >= 2, s"k-fold cross-validation needs at least 2 folds, got k = $k")

  val folds = data.randomSplit(Array.fill(k)(1.0 / k), seed = 42)

  (0 until k).map { i =>
    val validation = folds(i)
    // Use the data's own SparkContext instead of relying on a global `sc` being in scope.
    val trainingFolds = folds.zipWithIndex.collect { case (fold, idx) if idx != i => fold }
    val training = data.sparkContext.union(trainingFolds)

    val model = trainFunc(training)

    val predictions = validation.map { point =>
      (predictFunc(model, point.features), point.label)
    }

    // Fraction of predictions within 0.5 of the true label (accuracy for class labels).
    predictions.map { case (pred, label) =>
      if (math.abs(pred - label) < 0.5) 1.0 else 0.0
    }.mean()
  }.toArray
}
735
736
// Usage example
737
val rawData = sc.textFile("path/to/data.csv")
val labeledData = createLabeledPoints(rawData)
val (scaledData, scalerModel) = scaleFeatures(labeledData)

// Cross-validate a logistic regression model.
// LogisticRegressionWithLBFGS has no static `train` method, so the training
// function builds an instance and calls `run`.
val cvScores = crossValidate[LogisticRegressionModel](
  scaledData,
  (train: RDD[LabeledPoint]) => new LogisticRegressionWithLBFGS().run(train),
  (model: LogisticRegressionModel, features: Vector) => model.predict(features)
)

println(s"Cross-validation scores: ${cvScores.mkString(", ")}")
println(s"Average accuracy: ${cvScores.sum / cvScores.length}")
750
```
751
752
### Migration from RDD to DataFrame API
753
754
```scala
755
import org.apache.spark.ml.{Pipeline, classification => newML}
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler => NewStandardScaler}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
758
759
// Convert RDD[LabeledPoint] to DataFrame
760
/**
 * Converts an RDD of MLlib labeled points into a DataFrame with a `label` column
 * and a `features` column.
 *
 * The features are converted to `org.apache.spark.ml.linalg.Vector` with `asML`,
 * because DataFrame-based (`spark.ml`) estimators and transformers do not accept
 * the legacy `org.apache.spark.mllib.linalg.Vector` column type.
 *
 * Requires a `SparkSession` named `spark` in scope for the implicit encoders.
 */
def rddToDataFrame(data: RDD[LabeledPoint]): DataFrame = {
  import spark.implicits._

  data
    .map(point => (point.label, point.features.asML))
    .toDF("label", "features")
}
767
768
// Convert mllib.linalg.Vector to ml.linalg.Vector
769
/**
 * Converts a legacy `mllib` vector into the `ml` vector type used by the
 * DataFrame-based API.
 *
 * `asML` is the mllib -> ml direction. The reverse conversion (ml -> mllib) is
 * `org.apache.spark.mllib.linalg.Vectors.fromML`.
 */
def convertVector(oldVector: org.apache.spark.mllib.linalg.Vector): org.apache.spark.ml.linalg.Vector = {
  oldVector.asML
}
772
773
// Migration example
774
val rddData: RDD[LabeledPoint] = ??? // TODO: supply your RDD data
val dataFrame = rddToDataFrame(rddData)

// Use new DataFrame-based API.
// `dataFrame` already has an assembled `features` vector column, so no VectorAssembler
// stage is used here. A VectorAssembler is only needed when starting from separate
// numeric columns (e.g. "feature1", "feature2", ...), and its output column must not
// clash with an existing column name.
val scaler = new NewStandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

val lr = new newML.LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("label")

val pipeline = new Pipeline()
  .setStages(Array(scaler, lr))

val model = pipeline.fit(dataFrame)
val predictions = model.transform(dataFrame)

predictions.select("label", "prediction", "probability").show()
797
```