# Machine Learning Library (MLlib)

MLlib is Spark's scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, and underlying optimization primitives.

## Core Data Types

### Vector

Mathematical vector for representing features:

```scala { .api }
package org.apache.spark.mllib.linalg

abstract class Vector extends Serializable {
  def size: Int
  def toArray: Array[Double]
  def apply(i: Int): Double
}
```

**Vectors Factory**:
```scala { .api }
object Vectors {
  def dense(values: Array[Double]): Vector
  def dense(firstValue: Double, otherValues: Double*): Vector
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
  def sparse(size: Int, elements: Seq[(Int, Double)]): Vector
  def zeros(size: Int): Vector
}
```

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Dense vector
val denseVec = Vectors.dense(1.0, 2.0, 3.0, 0.0, 0.0)

// Sparse vector (size=5, indices=[0,2], values=[1.0,3.0])
val sparseVec = Vectors.sparse(5, Array(0, 2), Array(1.0, 3.0))

// Alternative sparse creation
val sparseVec2 = Vectors.sparse(5, Seq((0, 1.0), (2, 3.0)))

// Vector operations
val size = denseVec.size // 5
val element = denseVec(2) // 3.0
val array = denseVec.toArray // Array(1.0, 2.0, 3.0, 0.0, 0.0)
```
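
The sparse form stores only the non-zero entries. As a rough illustration of what `(size, indices, values)` encodes, the plain-Scala sketch below expands those three arguments into a dense array. `sparseToDense` is a hypothetical helper written for this guide, not an MLlib function, and the snippet needs no Spark dependency.

```scala
// Hypothetical helper (not part of MLlib): expands a sparse description into a dense array.
def sparseToDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
  require(indices.length == values.length, "indices and values must have the same length")
  val dense = Array.fill(size)(0.0) // positions that are not listed stay at zero
  indices.zip(values).foreach { case (i, v) => dense(i) = v }
  dense
}

// Same arguments as the sparse vector example above
val expanded = sparseToDense(5, Array(0, 2), Array(1.0, 3.0))
println(expanded.mkString("[", ", ", "]")) // [1.0, 0.0, 3.0, 0.0, 0.0]
```

Only two of the five entries are stored explicitly, which is why the sparse form tends to pay off when most features are zero.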

### Matrix

Mathematical matrix for linear algebra operations:

```scala { .api }
abstract class Matrix extends Serializable {
  def numRows: Int
  def numCols: Int
  def toArray: Array[Double]
  def apply(i: Int, j: Int): Double
}
```

**Matrices Factory**:
```scala { .api }
object Matrices {
  def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix
  def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values: Array[Double]): Matrix
  def eye(n: Int): Matrix
  def zeros(numRows: Int, numCols: Int): Matrix
}
```

```scala
import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Dense matrix (2x3, column-major order)
val denseMatrix = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
// Matrix:
// 1.0 3.0 5.0
// 2.0 4.0 6.0

// Identity matrix
val identity = Matrices.eye(3)

// Zero matrix
val zeros = Matrices.zeros(2, 3)
```
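
Column-major order means the flat array is filled one column at a time, so entry `(i, j)` sits at index `j * numRows + i`. The sketch below is plain Scala with no Spark dependency; `entryAt` is a hypothetical helper used only to illustrate the layout.

```scala
// Hypothetical helper (not part of MLlib): reads entry (i, j) from a column-major array.
def entryAt(values: Array[Double], numRows: Int, i: Int, j: Int): Double =
  values(j * numRows + i)

val values = Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0) // same values as the 2x3 example above
val numRows = 2
val numCols = 3

// Rebuild the matrix row by row to make the layout visible
val rows = (0 until numRows).map { i =>
  (0 until numCols).map(j => entryAt(values, numRows, i, j))
}
rows.foreach(r => println(r.mkString(" ")))
// 1.0 3.0 5.0
// 2.0 4.0 6.0
```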

### LabeledPoint

Data point with label and features for supervised learning:

```scala { .api }
case class LabeledPoint(label: Double, features: Vector) {
  override def toString: String = s"($label,$features)"
}
```

```scala
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Create labeled points
val positive = LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0))
val negative = LabeledPoint(0.0, Vectors.dense(-1.0, -2.0, -3.0))

// For regression
val regressionPoint = LabeledPoint(3.5, Vectors.dense(1.0, 2.0))

// Create training data
val trainingData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
))
```

### Rating

User-item rating for collaborative filtering:

```scala { .api }
case class Rating(user: Int, product: Int, rating: Double)
```

```scala
import org.apache.spark.mllib.recommendation.Rating

// Create ratings
val ratings = sc.parallelize(Seq(
  Rating(1, 101, 5.0),
  Rating(1, 102, 3.0),
  Rating(2, 101, 4.0),
  Rating(2, 103, 2.0)
))
```

## Classification

### Logistic Regression

**LogisticRegressionWithSGD**: Train logistic regression using stochastic gradient descent
```scala { .api }
object LogisticRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LogisticRegressionModel
}

class LogisticRegressionModel(override val weights: Vector, override val intercept: Double) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
  def clearThreshold(): LogisticRegressionModel
  def setThreshold(threshold: Double): LogisticRegressionModel
}
```

```scala
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Prepare training data
val trainingData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
  LabeledPoint(0.0, Vectors.dense(-1.0, -2.0)),
  LabeledPoint(1.0, Vectors.dense(1.5, 1.8)),
  LabeledPoint(0.0, Vectors.dense(-1.5, -1.8))
))

// Train model
val model = LogisticRegressionWithSGD.train(trainingData, numIterations = 100)

// Make predictions
val testData = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0),
  Vectors.dense(-1.0, -1.0)
))

val predictions = model.predict(testData)
predictions.collect() // Array(1.0, 0.0)

// Single prediction
val singlePrediction = model.predict(Vectors.dense(0.5, 0.5))

// Set classification threshold (setThreshold updates the model in place and returns it)
val calibratedModel = model.setThreshold(0.3)
```
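
To make the role of the threshold concrete, here is a minimal plain-Scala sketch of how a binary logistic model turns features into a label: compute the margin, squash it with the sigmoid, then compare against the threshold. The weights and intercept are made-up numbers, and `score` and `classify` are hypothetical helpers rather than MLlib methods.

```scala
// Logistic (sigmoid) function: maps a raw margin to a value in (0, 1)
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// Made-up parameters standing in for a trained model
val weights = Array(0.8, 0.6)
val intercept = -0.5

// Raw score: sigmoid(w . x + b)
def score(features: Array[Double]): Double = {
  val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
  sigmoid(margin)
}

// With a threshold set, the score is turned into a 0/1 label
def classify(features: Array[Double], threshold: Double): Double =
  if (score(features) > threshold) 1.0 else 0.0

val x = Array(0.5, 0.5)
println(score(x))         // roughly 0.55 (margin is 0.2)
println(classify(x, 0.5)) // 1.0
println(classify(x, 0.6)) // 0.0
```

Lowering the threshold trades precision for recall. Working with the raw score, which is what `clearThreshold()` is for, helps when a downstream metric needs scores rather than labels.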

### Support Vector Machines

**SVMWithSGD**: Train SVM using stochastic gradient descent
```scala { .api }
object SVMWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double): SVMModel
}

class SVMModel(override val weights: Vector, override val intercept: Double) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
```

```scala
import org.apache.spark.mllib.classification.{SVMWithSGD, SVMModel}

// Train SVM model (reuses trainingData from the logistic regression example)
val svmModel = SVMWithSGD.train(
  input = trainingData,
  numIterations = 100,
  stepSize = 1.0,
  regParam = 0.01
)

// Make predictions (reuses testData from the logistic regression example)
val svmPredictions = svmModel.predict(testData)
```

### Naive Bayes

**NaiveBayes**: Train multinomial Naive Bayes classifier
```scala { .api }
object NaiveBayes {
  def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel
}

class NaiveBayesModel(val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
```

```scala
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Training data for text classification (bag of words)
val textData = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 1.0)), // spam
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 0.0)), // ham
  LabeledPoint(0.0, Vectors.dense(1.0, 1.0, 0.0)), // spam
  LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 1.0)) // ham
))

// Train Naive Bayes model
val nbModel = NaiveBayes.train(textData, lambda = 1.0)

// Test vectors need the same number of features as the training data (3 here),
// and multinomial Naive Bayes expects nonnegative feature values
val textTestData = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 0.0),
  Vectors.dense(0.0, 1.0, 1.0)
))

// Make predictions
val textPredictions = nbModel.predict(textTestData)
```

## Regression

### Linear Regression

**LinearRegressionWithSGD**: Train linear regression using SGD
```scala { .api }
object LinearRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LinearRegressionModel
}

class LinearRegressionModel(override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel with RegressionModel with Serializable {
  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}
```

```scala
import org.apache.spark.mllib.regression.{LinearRegressionWithSGD, LinearRegressionModel, LabeledPoint}
import org.apache.spark.mllib.linalg.Vectors

// Regression training data
val regressionData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0)),
  LabeledPoint(2.0, Vectors.dense(2.0)),
  LabeledPoint(3.0, Vectors.dense(3.0)),
  LabeledPoint(4.0, Vectors.dense(4.0))
))

// Train linear regression
val lrModel = LinearRegressionWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01
)

// Make predictions
val regressionTestData = sc.parallelize(Seq(
  Vectors.dense(1.5),
  Vectors.dense(2.5)
))

val regressionPredictions = lrModel.predict(regressionTestData)
// Approaches Array(1.5, 2.5) as SGD converges; actual values depend on stepSize and numIterations
regressionPredictions.collect()
```

### Ridge Regression

**RidgeRegressionWithSGD**: Ridge regression with L2 regularization
```scala { .api }
object RidgeRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): RidgeRegressionModel
}
```

### Lasso Regression

**LassoWithSGD**: Lasso regression with L1 regularization
```scala { .api }
object LassoWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double): LassoModel
}
```

```scala
import org.apache.spark.mllib.regression.{RidgeRegressionWithSGD, LassoWithSGD}

// Ridge regression with regularization
val ridgeModel = RidgeRegressionWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01,
  regParam = 0.1
)

// Lasso regression with L1 regularization
val lassoModel = LassoWithSGD.train(
  input = regressionData,
  numIterations = 100,
  stepSize = 0.01,
  regParam = 0.1
)
```

## Clustering

### K-Means

**KMeans**: K-means clustering algorithm
```scala { .api }
class KMeans private (private var k: Int, private var maxIterations: Int, private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, private var epsilon: Double) extends Serializable {
  def setK(k: Int): KMeans
  def setMaxIterations(maxIterations: Int): KMeans
  def setRuns(runs: Int): KMeans
  def setInitializationMode(initializationMode: String): KMeans
  def setInitializationSteps(initializationSteps: Int): KMeans
  def setEpsilon(epsilon: Double): KMeans
  def run(data: RDD[Vector]): KMeansModel
}

object KMeans {
  def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, initializationMode: String): KMeansModel
}

class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable {
  def predict(point: Vector): Int
  def predict(points: RDD[Vector]): RDD[Int]
  def computeCost(data: RDD[Vector]): Double
}
```

```scala
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Prepare clustering data
val clusteringData = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0),
  Vectors.dense(1.0, 2.0),
  Vectors.dense(2.0, 1.0),
  Vectors.dense(9.0, 8.0),
  Vectors.dense(8.0, 9.0),
  Vectors.dense(9.0, 9.0)
))

// Train K-means model
val kmeansModel = KMeans.train(
  data = clusteringData,
  k = 2, // Number of clusters
  maxIterations = 20
)

// Get cluster centers
val centers = kmeansModel.clusterCenters
centers.foreach(println)

// Make predictions
val clusterPredictions = kmeansModel.predict(clusteringData)
// e.g. Array(0, 0, 0, 1, 1, 1); cluster IDs are arbitrary, so 0 and 1 may be swapped
clusterPredictions.collect()

// Compute cost (sum of squared distances to centroids)
val cost = kmeansModel.computeCost(clusteringData)

// Advanced K-means with custom parameters
val advancedKMeans = new KMeans()
  .setK(3)
  .setMaxIterations(50)
  .setRuns(10) // Multiple runs for better results
  .setInitializationMode("k-means||")
  .setEpsilon(1e-4)

val advancedModel = advancedKMeans.run(clusteringData)
```
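
The cost mentioned above is the sum, over all points, of the squared distance to the nearest center. The plain-Scala sketch below recomputes it for the six example points under the assumption that the two centers are the means of the two obvious groups. It is an illustrative re-implementation with hypothetical helper names, not MLlib's code, and a trained model may land on slightly different centers.

```scala
type Point = Array[Double]

def squaredDistance(a: Point, b: Point): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

// Hypothetical helper: each point contributes its squared distance to the closest center
def clusteringCost(points: Seq[Point], centers: Seq[Point]): Double =
  points.map(p => centers.map(c => squaredDistance(p, c)).min).sum

val points: Seq[Point] = Seq(
  Array(1.0, 1.0), Array(1.0, 2.0), Array(2.0, 1.0),
  Array(9.0, 8.0), Array(8.0, 9.0), Array(9.0, 9.0)
)

// Assumed centers: the per-group means (4/3, 4/3) and (26/3, 26/3)
val assumedCenters: Seq[Point] = Seq(
  Array(4.0 / 3, 4.0 / 3),
  Array(26.0 / 3, 26.0 / 3)
)

val wssse = clusteringCost(points, assumedCenters)
println(wssse) // about 2.667 (8/3)
```

Comparing this value across different `k` is a common, if rough, way to pick the number of clusters.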

## Collaborative Filtering

### Alternating Least Squares (ALS)

**ALS**: Matrix factorization for collaborative filtering
```scala { .api }
object ALS {
  def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double): MatrixFactorizationModel
}

class MatrixFactorizationModel(val rank: Int, val userFeatures: RDD[(Int, Array[Double])], val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
  def predict(user: Int, product: Int): Double
  def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
  def recommendProducts(user: Int, num: Int): Array[Rating]
  def recommendUsers(product: Int, num: Int): Array[Rating]
}
```

```scala
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

// Create ratings data
val ratings = sc.parallelize(Seq(
  Rating(1, 1, 5.0),
  Rating(1, 2, 1.0),
  Rating(1, 3, 5.0),
  Rating(2, 1, 1.0),
  Rating(2, 2, 5.0),
  Rating(2, 3, 1.0),
  Rating(3, 1, 5.0),
  Rating(3, 2, 1.0),
  Rating(3, 3, 5.0)
))

// Train collaborative filtering model
val alsModel = ALS.train(
  ratings = ratings,
  rank = 10, // Number of latent factors
  iterations = 10, // Number of iterations
  lambda = 0.01 // Regularization parameter
)

// Predict rating for user-item pair
val prediction = alsModel.predict(1, 2)

// Predict ratings for multiple user-item pairs
val userProducts = sc.parallelize(Seq((1, 1), (2, 2), (3, 3)))
val predictions = alsModel.predict(userProducts)

// Recommend products for a user
val recommendations = alsModel.recommendProducts(1, 5)
recommendations.foreach { rating =>
  println(s"Product ${rating.product}: ${rating.rating}")
}

// Recommend users for a product
val userRecommendations = alsModel.recommendUsers(1, 3)

// For implicit feedback data
val implicitModel = ALS.trainImplicit(
  ratings = ratings,
  rank = 10,
  iterations = 10,
  lambda = 0.01,
  alpha = 0.1 // Confidence parameter
)
```
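
A matrix factorization model represents each user and each product as a vector of `rank` latent factors, and a predicted rating is the dot product of the two. The plain-Scala sketch below shows that idea with made-up rank-2 factors. The maps and helper functions are hypothetical stand-ins for `userFeatures` and `productFeatures`, not MLlib code.

```scala
// Made-up latent factors of rank 2; a trained model would learn these from the ratings
val userFactors: Map[Int, Array[Double]] = Map(
  1 -> Array(2.0, 0.5),
  2 -> Array(0.5, 2.0)
)
val productFactors: Map[Int, Array[Double]] = Map(
  1 -> Array(2.0, 1.0),
  2 -> Array(0.0, 2.5)
)

def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

// Predicted rating = user factors . product factors
def predictRating(user: Int, product: Int): Double =
  dot(userFactors(user), productFactors(product))

// Recommending products = scoring every product for the user and keeping the top `num`
def recommend(user: Int, num: Int): Seq[(Int, Double)] =
  productFactors.keys.toSeq
    .map(p => (p, predictRating(user, p)))
    .sortBy { case (_, score) => -score }
    .take(num)

println(predictRating(1, 1)) // 4.5
println(recommend(2, 1))     // product 2 ranks first for user 2, with score 5.0
```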

## Statistics

### Summary Statistics

**Statistics**: Statistical functions for RDDs
```scala { .api }
object Statistics {
  def colStats(rdd: RDD[Vector]): MultivariateStatisticalSummary
  def corr(x: RDD[Double], y: RDD[Double], method: String = "pearson"): Double
  def corr(X: RDD[Vector], method: String = "pearson"): Matrix
  def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult
  def chiSqTest(observed: Matrix): ChiSqTestResult
  def chiSqTest(observed: RDD[LabeledPoint]): Array[ChiSqTestResult]
}

trait MultivariateStatisticalSummary {
  def mean: Vector
  def variance: Vector
  def count: Long
  def numNonzeros: Vector
  def max: Vector
  def min: Vector
}
```

```scala
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrix}

// Sample data for statistics
val observations = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 9.0)
))

// Compute column statistics
val summary = Statistics.colStats(observations)
println(s"Mean: ${summary.mean}") // [4.0, 5.0, 6.0]
println(s"Variance: ${summary.variance}") // [9.0, 9.0, 9.0]
println(s"Count: ${summary.count}") // 3
println(s"Max: ${summary.max}") // [7.0, 8.0, 9.0]
println(s"Min: ${summary.min}") // [1.0, 2.0, 3.0]

// Correlation between two RDD[Double]
val x = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
val y = sc.parallelize(Array(2.0, 4.0, 6.0, 8.0))
val correlation = Statistics.corr(x, y, "pearson") // 1.0 (perfect positive correlation)

// Correlation matrix for RDD[Vector]
val correlationMatrix = Statistics.corr(observations, "pearson")

// Chi-squared test
val observed = Vectors.dense(1.0, 2.0, 3.0)
val expected = Vectors.dense(1.5, 1.5, 3.0)
val chiSqResult = Statistics.chiSqTest(observed, expected)

println(s"Chi-squared statistic: ${chiSqResult.statistic}")
println(s"P-value: ${chiSqResult.pValue}")
println(s"Degrees of freedom: ${chiSqResult.degreesOfFreedom}")
```
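
Two of the numbers in the example above can be checked by hand. The plain-Scala sketch below recomputes the variance of the first column, assuming the unbiased sample variance that divides by `n - 1`, and the Pearson chi-squared statistic for the observed and expected vectors. Variable names are illustrative, and no Spark dependency is needed.

```scala
// First column of the observations above
val column = Array(1.0, 4.0, 7.0)
val columnMean = column.sum / column.length

// Unbiased sample variance: squared deviations divided by n - 1
val columnVariance =
  column.map(v => (v - columnMean) * (v - columnMean)).sum / (column.length - 1)
println(columnVariance) // 9.0

// Pearson chi-squared statistic: sum of (observed - expected)^2 / expected
val observedCounts = Array(1.0, 2.0, 3.0)
val expectedCounts = Array(1.5, 1.5, 3.0)
val chiSqStatistic = observedCounts.zip(expectedCounts)
  .map { case (o, e) => (o - e) * (o - e) / e }
  .sum
val dof = observedCounts.length - 1 // goodness of fit: categories minus one

println(chiSqStatistic) // about 0.333
println(dof)            // 2
```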

## Model Evaluation

### Binary Classification Metrics

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Predictions and labels (prediction score, true label)
val scoreAndLabels = sc.parallelize(Seq(
  (0.9, 1.0), (0.8, 1.0), (0.7, 1.0),
  (0.6, 0.0), (0.5, 1.0), (0.4, 0.0),
  (0.3, 0.0), (0.2, 0.0), (0.1, 0.0)
))

val binaryMetrics = new BinaryClassificationMetrics(scoreAndLabels)

// Area under ROC curve
val areaUnderROC = binaryMetrics.areaUnderROC()
println(s"Area under ROC: $areaUnderROC")

// Area under Precision-Recall curve
val areaUnderPR = binaryMetrics.areaUnderPR()
println(s"Area under PR: $areaUnderPR")
```
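
One way to read the area under the ROC curve is as the probability that a randomly chosen positive example scores higher than a randomly chosen negative one. The plain-Scala sketch below computes that pairwise-ranking value for the same nine pairs. It is an illustration of the interpretation rather than MLlib's implementation, and it should agree with a curve-based area only when, as here, there are no tied scores.

```scala
// Same (score, label) pairs as the example above, held in a local Seq
val localScoreAndLabels = Seq(
  (0.9, 1.0), (0.8, 1.0), (0.7, 1.0),
  (0.6, 0.0), (0.5, 1.0), (0.4, 0.0),
  (0.3, 0.0), (0.2, 0.0), (0.1, 0.0)
)

val positiveScores = localScoreAndLabels.filter(_._2 == 1.0).map(_._1)
val negativeScores = localScoreAndLabels.filter(_._2 == 0.0).map(_._1)

// Each positive/negative pair counts 1 if ranked correctly, 0.5 on a tie, 0 otherwise
val pairOutcomes = for (p <- positiveScores; n <- negativeScores) yield {
  if (p > n) 1.0 else if (p == n) 0.5 else 0.0
}

val pairwiseAuc = pairOutcomes.sum / pairOutcomes.size
println(pairwiseAuc) // 0.95 (19 of 20 pairs are ranked correctly)
```

The single misranked pair is the positive scored 0.5 against the negative scored 0.6.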

### Multi-class Classification Metrics

```scala
import org.apache.spark.mllib.evaluation.MulticlassMetrics

// Predictions and labels (predicted class, true class)
val predictionAndLabels = sc.parallelize(Seq(
  (0.0, 0.0), (1.0, 1.0), (2.0, 2.0),
  (0.0, 0.0), (1.0, 2.0), (2.0, 1.0)
))

val multiMetrics = new MulticlassMetrics(predictionAndLabels)

// Overall statistics
val accuracy = multiMetrics.accuracy
val weightedPrecision = multiMetrics.weightedPrecision
val weightedRecall = multiMetrics.weightedRecall
val weightedFMeasure = multiMetrics.weightedFMeasure

// Per-class metrics
val labels = multiMetrics.labels
labels.foreach { label =>
  println(s"Class $label precision: ${multiMetrics.precision(label)}")
  println(s"Class $label recall: ${multiMetrics.recall(label)}")
  println(s"Class $label F1-score: ${multiMetrics.fMeasure(label)}")
}

// Confusion matrix
val confusionMatrix = multiMetrics.confusionMatrix
println(s"Confusion matrix:\n$confusionMatrix")
```
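
For a small dataset, the headline numbers can be recomputed directly from the (predicted, true) pairs. The plain-Scala sketch below does this for the six pairs above using the textbook definitions of accuracy, precision, and recall. The helper names are hypothetical, and the helpers assume each class is predicted and present at least once.

```scala
// Same (predicted class, true class) pairs as the example above
val pairs = Seq(
  (0.0, 0.0), (1.0, 1.0), (2.0, 2.0),
  (0.0, 0.0), (1.0, 2.0), (2.0, 1.0)
)

// Accuracy: fraction of pairs where the prediction equals the label
val localAccuracy = pairs.count { case (p, l) => p == l }.toDouble / pairs.size

// Precision: of everything predicted as `cls`, how much really is `cls`
def precisionOf(cls: Double): Double = {
  val predicted = pairs.filter(_._1 == cls)
  predicted.count { case (p, l) => p == l }.toDouble / predicted.size
}

// Recall: of everything that really is `cls`, how much was predicted as `cls`
def recallOf(cls: Double): Double = {
  val actual = pairs.filter(_._2 == cls)
  actual.count { case (p, l) => p == l }.toDouble / actual.size
}

println(localAccuracy)    // about 0.667 (4 of 6 correct)
println(precisionOf(0.0)) // 1.0
println(precisionOf(1.0)) // 0.5
println(recallOf(2.0))    // 0.5
```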

## Pipeline Example

Complete machine learning pipeline:

```scala
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// 1. Load and prepare data (first column is the label, the rest are features)
val rawData = sc.textFile("data.csv")
val parsedData = rawData.map { line =>
  val parts = line.split(',')
  val label = parts(0).toDouble
  val features = Vectors.dense(parts.tail.map(_.toDouble))
  LabeledPoint(label, features)
}

// 2. Split data
val Array(training, test) = parsedData.randomSplit(Array(0.7, 0.3), seed = 11L)
training.cache()

// 3. Train model
val model = LogisticRegressionWithSGD.train(training, numIterations = 100)

// Clear the threshold so predict returns raw scores instead of 0/1 labels;
// BinaryClassificationMetrics needs scores to trace the ROC curve
model.clearThreshold()

// 4. Score the test set as (score, label) pairs
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// 5. Evaluate model
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println(s"Area under ROC: $auROC")

// 6. Save model
model.save(sc, "myModel")

// 7. Load model later
val loadedModel = LogisticRegressionModel.load(sc, "myModel")
```

This guide covers the core MLlib building blocks for scalable ML applications: data types, classification, regression, clustering, collaborative filtering, statistics, and model evaluation.