# Machine Learning

Spark MLlib provides comprehensive machine learning capabilities including classification, regression, clustering, collaborative filtering, and feature engineering through a unified API based on DataFrames.

## ML Pipeline API

The ML Pipeline API provides a uniform set of high-level APIs built on top of DataFrames.

### Pipeline and PipelineModel

```scala { .api }
class Pipeline extends Estimator[PipelineModel] {
  def setStages(value: Array[PipelineStage]): Pipeline
  def getStages: Array[PipelineStage]
  def fit(dataset: Dataset[_]): PipelineModel
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): Pipeline
  def write: MLWriter
}

class PipelineModel extends Model[PipelineModel] {
  def stages: Array[Transformer]
  def transform(dataset: Dataset[_]): DataFrame
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): PipelineModel
  def write: MLWriter
}
```

### Base Classes

```scala { .api }
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): Estimator[M]
}

abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): Transformer
}

abstract class Model[M <: Model[M]] extends Transformer {
  def copy(extra: ParamMap): M
}

trait PipelineStage extends Params {
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): PipelineStage
}
```

### Usage Examples

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Create pipeline stages
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)

// Create and fit pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(trainingData)

// Transform new data
val predictions = model.transform(testData)
```

## Classification

### Logistic Regression

```scala { .api }
class LogisticRegression extends Classifier[Vector, LogisticRegression, LogisticRegressionModel] {
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setElasticNetParam(value: Double): this.type
  def setTol(value: Double): this.type
  def setFitIntercept(value: Boolean): this.type
  def setThreshold(value: Double): this.type
  def setThresholds(value: Array[Double]): this.type
  def setStandardization(value: Boolean): this.type
  def setWeightCol(value: String): this.type
  def setAggregationDepth(value: Int): this.type
  def setFamily(value: String): this.type
  def setLowerBoundsOnCoefficients(value: Matrix): this.type
  def setUpperBoundsOnCoefficients(value: Matrix): this.type
  def setLowerBoundsOnIntercepts(value: Vector): this.type
  def setUpperBoundsOnIntercepts(value: Vector): this.type
}

class LogisticRegressionModel extends ClassificationModel[Vector, LogisticRegressionModel] {
  def coefficients: Vector
  def intercept: Double
  def coefficientMatrix: Matrix
  def interceptVector: Vector
  def numClasses: Int
  def numFeatures: Int
  def summary: LogisticRegressionTrainingSummary
  def hasSummary: Boolean
  def evaluate(dataset: Dataset[_]): LogisticRegressionSummary
}
```

### Decision Tree Classifier

```scala { .api }
class DecisionTreeClassifier extends Classifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] {
  def setMaxDepth(value: Int): this.type
  def setMaxBins(value: Int): this.type
  def setMinInstancesPerNode(value: Int): this.type
  def setMinInfoGain(value: Double): this.type
  def setMaxMemoryInMB(value: Int): this.type
  def setCacheNodeIds(value: Boolean): this.type
  def setCheckpointInterval(value: Int): this.type
  def setImpurity(value: String): this.type
  def setSeed(value: Long): this.type
  def setWeightCol(value: String): this.type
}

class DecisionTreeClassificationModel extends ClassificationModel[Vector, DecisionTreeClassificationModel]
  with DecisionTreeModel with DecisionTreeClassifierParams {
  def rootNode: Node
  def numNodes: Int
  def depth: Int
  def featureImportances: Vector
  def toDebugString: String
}
```

### Random Forest Classifier

```scala { .api }
class RandomForestClassifier extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel] {
  def setNumTrees(value: Int): this.type
  def setMaxDepth(value: Int): this.type
  def setMaxBins(value: Int): this.type
  def setMinInstancesPerNode(value: Int): this.type
  def setMinInfoGain(value: Double): this.type
  def setSubsamplingRate(value: Double): this.type
  def setFeatureSubsetStrategy(value: String): this.type
  def setSeed(value: Long): this.type
  def setImpurity(value: String): this.type
}

class RandomForestClassificationModel extends ClassificationModel[Vector, RandomForestClassificationModel] {
  def trees: Array[DecisionTreeClassificationModel]
  def treeWeights: Array[Double]
  def numFeatures: Int
  def numClasses: Int
  def totalNumNodes: Int
  def featureImportances: Vector
  def toDebugString: String
}
```

### Usage Examples

```scala
import org.apache.spark.ml.classification.{LogisticRegression, DecisionTreeClassifier, RandomForestClassifier}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Logistic Regression
val lr = new LogisticRegression()
  .setMaxIter(20)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

val lrModel = lr.fit(trainingData)
val lrPredictions = lrModel.transform(testData)

// Decision Tree
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxDepth(5)
  .setMaxBins(32)

val dtModel = dt.fit(trainingData)

// Random Forest
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(20)
  .setMaxDepth(5)

val rfModel = rf.fit(trainingData)

// Evaluation
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")

val auc = evaluator.evaluate(lrPredictions)
```

## Regression

### Linear Regression

```scala { .api }
class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegressionModel] {
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setElasticNetParam(value: Double): this.type
  def setTol(value: Double): this.type
  def setFitIntercept(value: Boolean): this.type
  def setStandardization(value: Boolean): this.type
  def setSolver(value: String): this.type
  def setWeightCol(value: String): this.type
  def setAggregationDepth(value: Int): this.type
  def setLoss(value: String): this.type
  def setEpsilon(value: Double): this.type
}

class LinearRegressionModel extends RegressionModel[Vector, LinearRegressionModel] {
  def coefficients: Vector
  def intercept: Double
  def numFeatures: Int
  def summary: LinearRegressionTrainingSummary
  def hasSummary: Boolean
  def evaluate(dataset: Dataset[_]): LinearRegressionSummary
}
```

### Decision Tree Regressor

```scala { .api }
class DecisionTreeRegressor extends Regressor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel] {
  def setMaxDepth(value: Int): this.type
  def setMaxBins(value: Int): this.type
  def setMinInstancesPerNode(value: Int): this.type
  def setMinInfoGain(value: Double): this.type
  def setImpurity(value: String): this.type
  def setSeed(value: Long): this.type
  def setVarianceCol(value: String): this.type
}

class DecisionTreeRegressionModel extends RegressionModel[Vector, DecisionTreeRegressionModel]
  with DecisionTreeModel {
  def rootNode: Node
  def numNodes: Int
  def depth: Int
  def featureImportances: Vector
  def toDebugString: String
}
```

## Clustering

### K-Means

```scala { .api }
class KMeans extends Estimator[KMeansModel] with KMeansParams {
  def setK(value: Int): this.type
  def setInitMode(value: String): this.type
  def setInitSteps(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setTol(value: Double): this.type
  def setSeed(value: Long): this.type
  def setDistanceMeasure(value: String): this.type
  def setWeightCol(value: String): this.type
}

class KMeansModel extends Model[KMeansModel] with KMeansParams {
  def clusterCenters: Array[Vector]
  def k: Int
  def computeCost(dataset: Dataset[_]): Double
  def summary: KMeansSummary
  def hasSummary: Boolean
}
```

### Gaussian Mixture Model

```scala { .api }
class GaussianMixture extends Estimator[GaussianMixtureModel] {
  def setK(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setTol(value: Double): this.type
  def setSeed(value: Long): this.type
  def setAggregationDepth(value: Int): this.type
  def setWeightCol(value: String): this.type
}

class GaussianMixtureModel extends Model[GaussianMixtureModel] {
  def weights: Array[Double]
  def gaussians: Array[MultivariateGaussian]
  def k: Int
  def summary: GaussianMixtureSummary
  def hasSummary: Boolean
}
```

### Usage Examples

```scala
import org.apache.spark.ml.clustering.{KMeans, GaussianMixture}
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// K-means clustering
val kmeans = new KMeans()
  .setK(3)
  .setMaxIter(20)
  .setSeed(1L)

val kmeansModel = kmeans.fit(dataset)
val kmeansPredictions = kmeansModel.transform(dataset)

// Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(3)
  .setMaxIter(100)
  .setSeed(538009335L)

val gmmModel = gmm.fit(dataset)
val gmmPredictions = gmmModel.transform(dataset)

// Evaluation
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(kmeansPredictions)
```

## Feature Engineering

### Vector Assembler

```scala { .api }
class VectorAssembler extends Transformer with VectorAssemblerParams {
  def setInputCols(value: Array[String]): this.type
  def setOutputCol(value: String): this.type
  def setHandleInvalid(value: String): this.type
  def transform(dataset: Dataset[_]): DataFrame
}
```

### String Indexer

```scala { .api }
class StringIndexer extends Estimator[StringIndexerModel] {
  def setInputCol(value: String): this.type
  def setOutputCol(value: String): this.type
  def setHandleInvalid(value: String): this.type
  def setStringOrderType(value: String): this.type
}

class StringIndexerModel extends Model[StringIndexerModel] {
  def labels: Array[String]
  def labelsArray: Array[Array[String]]
  def transform(dataset: Dataset[_]): DataFrame
}
```

### One-Hot Encoder

```scala { .api }
class OneHotEncoder extends Estimator[OneHotEncoderModel] with OneHotEncoderParams {
  def setInputCols(value: Array[String]): this.type
  def setOutputCols(value: Array[String]): this.type
  def setDropLast(value: Boolean): this.type
  def setHandleInvalid(value: String): this.type
  def fit(dataset: Dataset[_]): OneHotEncoderModel
}

class OneHotEncoderModel extends Model[OneHotEncoderModel] {
  def categorySizes: Array[Int]
  def transform(dataset: Dataset[_]): DataFrame
}
```

### Standard Scaler

```scala { .api }
class StandardScaler extends Estimator[StandardScalerModel] {
  def setInputCol(value: String): this.type
  def setOutputCol(value: String): this.type
  def setWithMean(value: Boolean): this.type
  def setWithStd(value: Boolean): this.type
}

class StandardScalerModel extends Model[StandardScalerModel] {
  def mean: Vector
  def std: Vector
  def transform(dataset: Dataset[_]): DataFrame
}
```

### Usage Examples

```scala
import org.apache.spark.ml.feature._

// Vector Assembler
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val featuresDF = assembler.transform(df)

// String Indexer
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexerModel = indexer.fit(df)
val indexedDF = indexerModel.transform(df)

// One-Hot Encoder (an Estimator: fit first, then transform)
val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

val encoderModel = encoder.fit(indexedDF)
val encodedDF = encoderModel.transform(indexedDF)

// Standard Scaler
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

val scalerModel = scaler.fit(featuresDF)
val scaledDF = scalerModel.transform(featuresDF)
```

## Model Selection and Evaluation

### Cross Validator

```scala { .api }
class CrossValidator extends Estimator[CrossValidatorModel] {
  def setEstimator(value: Estimator[_]): this.type
  def setEstimatorParamMaps(value: Array[ParamMap]): this.type
  def setEvaluator(value: Evaluator): this.type
  def setNumFolds(value: Int): this.type
  def setSeed(value: Long): this.type
  def setParallelism(value: Int): this.type
  def setCollectSubModels(value: Boolean): this.type
  def setFoldCol(value: String): this.type
}

class CrossValidatorModel extends Model[CrossValidatorModel] {
  def bestModel: Model[_]
  def avgMetrics: Array[Double]
  def stdMetrics: Array[Double]
  def subModels: Array[Array[Model[_]]]
  def getEstimatorParamMaps: Array[ParamMap]
  def getEvaluator: Evaluator
}
```

### Parameter Grid Builder

```scala { .api }
class ParamGridBuilder {
  def addGrid[T](param: Param[T], values: Array[T]): this.type
  def addGrid[T](param: Param[T], values: java.util.List[T]): this.type
  def build(): Array[ParamMap]
}
```

### Evaluators

```scala { .api }
class BinaryClassificationEvaluator extends Evaluator {
  def setRawPredictionCol(value: String): this.type
  def setLabelCol(value: String): this.type
  def setMetricName(value: String): this.type
  def setWeightCol(value: String): this.type
  def evaluate(dataset: Dataset[_]): Double
  def isLargerBetter: Boolean
}

class MulticlassClassificationEvaluator extends Evaluator {
  def setPredictionCol(value: String): this.type
  def setLabelCol(value: String): this.type
  def setMetricName(value: String): this.type
  def setWeightCol(value: String): this.type
  def setBeta(value: Double): this.type
  def setEps(value: Double): this.type
  def evaluate(dataset: Dataset[_]): Double
}

class RegressionEvaluator extends Evaluator {
  def setPredictionCol(value: String): this.type
  def setLabelCol(value: String): this.type
  def setMetricName(value: String): this.type
  def setWeightCol(value: String): this.type
  def evaluate(dataset: Dataset[_]): Double
}
```

### Usage Examples

```scala
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Parameter grid
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept, Array(false, true))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

// Cross validator
val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
  .setParallelism(2)

val cvModel = cv.fit(trainingData)
val bestModel = cvModel.bestModel

// Predictions and evaluation
val predictions = cvModel.transform(testData)
val evaluator = new BinaryClassificationEvaluator()
val auc = evaluator.evaluate(predictions)
```

## Collaborative Filtering

### Alternating Least Squares (ALS)

```scala { .api }
class ALS extends Estimator[ALSModel] {
  def setRank(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setImplicitPrefs(value: Boolean): this.type
  def setAlpha(value: Double): this.type
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def setNonnegative(value: Boolean): this.type
  def setNumUserBlocks(value: Int): this.type
  def setNumItemBlocks(value: Int): this.type
  def setSeed(value: Long): this.type
  def setCheckpointInterval(value: Int): this.type
  def setStorageLevel(value: StorageLevel): this.type
  def setIntermediateStorageLevel(value: StorageLevel): this.type
  def setColdStartStrategy(value: String): this.type
  def setFinalStorageLevel(value: StorageLevel): this.type
  def setBlockSize(value: Int): this.type
}

class ALSModel extends Model[ALSModel] {
  def rank: Int
  def userFactors: DataFrame
  def itemFactors: DataFrame
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
  def transform(dataset: Dataset[_]): DataFrame
}
```

### Usage Examples

```scala
import org.apache.spark.ml.recommendation.ALS

// ALS model
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setColdStartStrategy("drop")

val model = als.fit(ratings)

// Generate recommendations
val userRecs = model.recommendForAllUsers(10)
val movieRecs = model.recommendForAllItems(10)

// Make predictions
val predictions = model.transform(testData)
```