0
# Machine Learning
1
2
Comprehensive machine learning library with algorithms for classification, regression, clustering, and collaborative filtering. Provides both high-level Pipeline API for building ML workflows and lower-level RDD-based APIs for advanced use cases.
3
4
## Capabilities
5
6
### Pipeline API
7
8
High-level API for building machine learning workflows with transformers and estimators.
9
10
```scala { .api }
11
/**
12
* Machine learning pipeline for chaining transformers and estimators
13
*/
14
class Pipeline extends Estimator[PipelineModel] {
15
/** Set pipeline stages */
16
def setStages(value: Array[PipelineStage]): Pipeline
17
/** Get pipeline stages */
18
def getStages: Array[PipelineStage]
19
/** Fit pipeline to data */
20
def fit(dataset: Dataset[_]): PipelineModel
21
}
22
23
/**
24
* Fitted pipeline ready for predictions
25
*/
26
class PipelineModel extends Transformer {
27
/** Transform dataset using fitted pipeline */
28
def transform(dataset: Dataset[_]): DataFrame
29
/** Get fitted stages */
30
def stages: Array[Transformer]
31
}
32
33
/**
34
* Base class for pipeline components
35
*/
36
abstract class PipelineStage {
37
def uid: String
38
def copy(extra: ParamMap): PipelineStage
39
}
40
41
/**
42
* Algorithm that can be fit on a DataFrame
43
*/
44
abstract class Estimator[M <: Model[M]] extends PipelineStage {
45
/** Fit model to dataset */
46
def fit(dataset: Dataset[_]): M
47
}
48
49
/**
50
* Algorithm that transforms one DataFrame into another
51
*/
52
abstract class Transformer extends PipelineStage {
53
/** Transform dataset */
54
def transform(dataset: Dataset[_]): DataFrame
55
}
56
57
/**
58
* Result of fitting an Estimator
59
*/
60
abstract class Model[M <: Model[M]] extends Transformer
61
```
62
63
**Usage Examples:**
64
65
```scala
66
import org.apache.spark.ml.Pipeline
67
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
68
import org.apache.spark.ml.classification.LogisticRegression
69
70
// Create pipeline stages
71
val indexer = new StringIndexer()
72
.setInputCol("category")
73
.setOutputCol("categoryIndex")
74
75
val assembler = new VectorAssembler()
76
.setInputCols(Array("age", "income", "categoryIndex"))
77
.setOutputCol("features")
78
79
val lr = new LogisticRegression()
80
.setFeaturesCol("features")
81
.setLabelCol("label")
82
83
// Create and fit pipeline
84
val pipeline = new Pipeline()
85
.setStages(Array(indexer, assembler, lr))
86
87
val model = pipeline.fit(trainingData)
88
val predictions = model.transform(testData)
89
```
90
91
### Classification
92
93
Algorithms for supervised learning with discrete target variables.
94
95
```scala { .api }
96
/**
97
* Logistic regression classifier
98
*/
99
class LogisticRegression extends Classifier[Vector, LogisticRegression, LogisticRegressionModel] {
100
def setFeaturesCol(value: String): LogisticRegression
101
def setLabelCol(value: String): LogisticRegression
102
def setPredictionCol(value: String): LogisticRegression
103
def setProbabilityCol(value: String): LogisticRegression
104
def setMaxIter(value: Int): LogisticRegression
105
def setRegParam(value: Double): LogisticRegression
106
def setElasticNetParam(value: Double): LogisticRegression
107
def setFamily(value: String): LogisticRegression
108
def fit(dataset: Dataset[_]): LogisticRegressionModel
109
}
110
111
/**
112
* Decision tree classifier
113
*/
114
class DecisionTreeClassifier extends Classifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] {
115
def setMaxDepth(value: Int): DecisionTreeClassifier
116
def setMinInstancesPerNode(value: Int): DecisionTreeClassifier
117
def setImpurity(value: String): DecisionTreeClassifier
118
def setMaxBins(value: Int): DecisionTreeClassifier
119
def setSeed(value: Long): DecisionTreeClassifier
120
def fit(dataset: Dataset[_]): DecisionTreeClassificationModel
121
}
122
123
/**
124
* Random forest classifier
125
*/
126
class RandomForestClassifier extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel] {
127
def setNumTrees(value: Int): RandomForestClassifier
128
def setMaxDepth(value: Int): RandomForestClassifier
129
def setSubsamplingRate(value: Double): RandomForestClassifier
130
def setFeatureSubsetStrategy(value: String): RandomForestClassifier
def setImpurity(value: String): RandomForestClassifier
def setMaxBins(value: Int): RandomForestClassifier
131
def fit(dataset: Dataset[_]): RandomForestClassificationModel
132
}
133
134
/**
135
* Gradient-boosted tree classifier
136
*/
137
class GBTClassifier extends Classifier[Vector, GBTClassifier, GBTClassificationModel] {
138
def setMaxIter(value: Int): GBTClassifier
139
def setStepSize(value: Double): GBTClassifier
140
def setMaxDepth(value: Int): GBTClassifier
141
def fit(dataset: Dataset[_]): GBTClassificationModel
142
}
143
144
/**
145
* Naive Bayes classifier
146
*/
147
class NaiveBayes extends Classifier[Vector, NaiveBayes, NaiveBayesModel] {
148
def setModelType(value: String): NaiveBayes
149
def setSmoothing(value: Double): NaiveBayes
150
def fit(dataset: Dataset[_]): NaiveBayesModel
151
}
152
153
/**
154
* Linear Support Vector Machine
155
*/
156
class LinearSVC extends Classifier[Vector, LinearSVC, LinearSVCModel] {
157
def setMaxIter(value: Int): LinearSVC
158
def setRegParam(value: Double): LinearSVC
159
def setTol(value: Double): LinearSVC
160
def fit(dataset: Dataset[_]): LinearSVCModel
161
}
162
```
163
164
**Usage Examples:**
165
166
```scala
167
import org.apache.spark.ml.classification._
168
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
169
170
// Logistic Regression
171
val lr = new LogisticRegression()
172
.setMaxIter(20)
173
.setRegParam(0.3)
174
.setElasticNetParam(0.8)
175
176
val lrModel = lr.fit(trainingData)
177
val lrPredictions = lrModel.transform(testData)
178
179
// Random Forest
180
val rf = new RandomForestClassifier()
181
.setNumTrees(100)
182
.setFeatureSubsetStrategy("auto")
183
.setImpurity("gini")
184
.setMaxDepth(4)
185
.setMaxBins(32)
186
187
val rfModel = rf.fit(trainingData)
188
val rfPredictions = rfModel.transform(testData)
189
190
// Evaluation
191
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

val accuracy = evaluator.evaluate(rfPredictions)
197
```
198
199
### Regression
200
201
Algorithms for supervised learning with continuous target variables.
202
203
```scala { .api }
204
/**
205
* Linear regression
206
*/
207
class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegressionModel] {
208
def setFeaturesCol(value: String): LinearRegression
209
def setLabelCol(value: String): LinearRegression
210
def setPredictionCol(value: String): LinearRegression
211
def setMaxIter(value: Int): LinearRegression
212
def setRegParam(value: Double): LinearRegression
213
def setElasticNetParam(value: Double): LinearRegression
214
def setTol(value: Double): LinearRegression
215
def setFitIntercept(value: Boolean): LinearRegression
216
def fit(dataset: Dataset[_]): LinearRegressionModel
217
}
218
219
/**
220
* Decision tree regressor
221
*/
222
class DecisionTreeRegressor extends Regressor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel] {
223
def setMaxDepth(value: Int): DecisionTreeRegressor
224
def setMinInstancesPerNode(value: Int): DecisionTreeRegressor
225
def setImpurity(value: String): DecisionTreeRegressor
226
def fit(dataset: Dataset[_]): DecisionTreeRegressionModel
227
}
228
229
/**
230
* Random forest regressor
231
*/
232
class RandomForestRegressor extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel] {
233
def setNumTrees(value: Int): RandomForestRegressor
234
def setMaxDepth(value: Int): RandomForestRegressor
235
def setSubsamplingRate(value: Double): RandomForestRegressor
236
def fit(dataset: Dataset[_]): RandomForestRegressionModel
237
}
238
239
/**
240
* Gradient-boosted tree regressor
241
*/
242
class GBTRegressor extends Regressor[Vector, GBTRegressor, GBTRegressionModel] {
243
def setMaxIter(value: Int): GBTRegressor
244
def setStepSize(value: Double): GBTRegressor
245
def setMaxDepth(value: Int): GBTRegressor
246
def fit(dataset: Dataset[_]): GBTRegressionModel
247
}
248
249
/**
250
* Generalized linear regression
251
*/
252
class GeneralizedLinearRegression extends Regressor[Vector, GeneralizedLinearRegression, GeneralizedLinearRegressionModel] {
253
def setFamily(value: String): GeneralizedLinearRegression
254
def setLink(value: String): GeneralizedLinearRegression
255
def setMaxIter(value: Int): GeneralizedLinearRegression
256
def setRegParam(value: Double): GeneralizedLinearRegression
257
def fit(dataset: Dataset[_]): GeneralizedLinearRegressionModel
258
}
259
```
260
261
### Clustering
262
263
Unsupervised learning algorithms for grouping data points.
264
265
```scala { .api }
266
/**
267
* K-means clustering
268
*/
269
class KMeans extends Estimator[KMeansModel] {
270
def setFeaturesCol(value: String): KMeans
271
def setPredictionCol(value: String): KMeans
272
def setK(value: Int): KMeans
273
def setMaxIter(value: Int): KMeans
274
def setTol(value: Double): KMeans
275
def setInitMode(value: String): KMeans
276
def setInitSteps(value: Int): KMeans
277
def setSeed(value: Long): KMeans
278
def fit(dataset: Dataset[_]): KMeansModel
279
}
280
281
/**
282
* Bisecting K-means clustering
283
*/
284
class BisectingKMeans extends Estimator[BisectingKMeansModel] {
285
def setK(value: Int): BisectingKMeans
286
def setMaxIter(value: Int): BisectingKMeans
287
def setSeed(value: Long): BisectingKMeans
288
def fit(dataset: Dataset[_]): BisectingKMeansModel
289
}
290
291
/**
292
* Gaussian Mixture Model
293
*/
294
class GaussianMixture extends Estimator[GaussianMixtureModel] {
295
def setK(value: Int): GaussianMixture
296
def setMaxIter(value: Int): GaussianMixture
297
def setTol(value: Double): GaussianMixture
298
def setSeed(value: Long): GaussianMixture
299
def fit(dataset: Dataset[_]): GaussianMixtureModel
300
}
301
302
/**
303
* Latent Dirichlet Allocation for topic modeling
304
*/
305
class LDA extends Estimator[LDAModel] {
306
def setK(value: Int): LDA
307
def setMaxIter(value: Int): LDA
308
def setTopicConcentration(value: Double): LDA
309
def setDocConcentration(value: Double): LDA
310
def setSeed(value: Long): LDA
311
def fit(dataset: Dataset[_]): LDAModel
312
}
313
```
314
315
**Usage Examples:**
316
317
```scala
318
import org.apache.spark.ml.clustering._
319
320
// K-Means
321
val kmeans = new KMeans()
322
.setK(3)
323
.setSeed(1L)
324
.setMaxIter(20)
325
326
val kmeansModel = kmeans.fit(dataset)
327
val predictions = kmeansModel.transform(dataset)
328
329
// Evaluate clustering by computing Within Set Sum of Squared Errors
// (computeCost is deprecated since Spark 2.4; prefer ClusteringEvaluator's silhouette metric)
val wssse = kmeansModel.computeCost(dataset)
331
println(s"Within Set Sum of Squared Errors = $wssse")
332
333
// Gaussian Mixture Model
334
val gmm = new GaussianMixture()
335
.setK(3)
336
.setSeed(538009335L)
337
338
val gmmModel = gmm.fit(dataset)
339
val gmmPredictions = gmmModel.transform(dataset)
340
```
341
342
### Feature Processing
343
344
Data preprocessing and feature engineering transformers.
345
346
```scala { .api }
347
/**
348
* Combine multiple columns into vector column
349
*/
350
class VectorAssembler extends Transformer {
351
def setInputCols(value: Array[String]): VectorAssembler
352
def setOutputCol(value: String): VectorAssembler
353
def setHandleInvalid(value: String): VectorAssembler
354
def transform(dataset: Dataset[_]): DataFrame
355
}
356
357
/**
358
* Standardize features by removing mean and scaling to unit variance
359
*/
360
class StandardScaler extends Estimator[StandardScalerModel] {
361
def setInputCol(value: String): StandardScaler
362
def setOutputCol(value: String): StandardScaler
363
def setWithMean(value: Boolean): StandardScaler
364
def setWithStd(value: Boolean): StandardScaler
365
def fit(dataset: Dataset[_]): StandardScalerModel
366
}
367
368
/**
369
* Scale features to given range
370
*/
371
class MinMaxScaler extends Estimator[MinMaxScalerModel] {
372
def setInputCol(value: String): MinMaxScaler
373
def setOutputCol(value: String): MinMaxScaler
374
def setMin(value: Double): MinMaxScaler
375
def setMax(value: Double): MinMaxScaler
376
def fit(dataset: Dataset[_]): MinMaxScalerModel
377
}
378
379
/**
380
* Map strings to indices
381
*/
382
class StringIndexer extends Estimator[StringIndexerModel] {
383
def setInputCol(value: String): StringIndexer
384
def setOutputCol(value: String): StringIndexer
385
def setHandleInvalid(value: String): StringIndexer
386
def setStringOrderType(value: String): StringIndexer
387
def fit(dataset: Dataset[_]): StringIndexerModel
388
}
389
390
/**
391
* One-hot encode categorical features
392
*/
393
class OneHotEncoder extends Estimator[OneHotEncoderModel] {
  def setInputCols(value: Array[String]): OneHotEncoder
  def setOutputCols(value: Array[String]): OneHotEncoder
  def setDropLast(value: Boolean): OneHotEncoder
  def setHandleInvalid(value: String): OneHotEncoder
  def fit(dataset: Dataset[_]): OneHotEncoderModel
}
400
401
/**
402
* Tokenize text into words
403
*/
404
class Tokenizer extends Transformer {
405
def setInputCol(value: String): Tokenizer
406
def setOutputCol(value: String): Tokenizer
407
def transform(dataset: Dataset[_]): DataFrame
408
}
409
410
/**
411
* Regular expression tokenizer
412
*/
413
class RegexTokenizer extends Transformer {
414
def setInputCol(value: String): RegexTokenizer
415
def setOutputCol(value: String): RegexTokenizer
416
def setPattern(value: String): RegexTokenizer
417
def setGaps(value: Boolean): RegexTokenizer
418
def setMinTokenLength(value: Int): RegexTokenizer
419
def transform(dataset: Dataset[_]): DataFrame
420
}
421
422
/**
423
* Hash text features to fixed-length vectors
424
*/
425
class HashingTF extends Transformer {
426
def setInputCol(value: String): HashingTF
427
def setOutputCol(value: String): HashingTF
428
def setNumFeatures(value: Int): HashingTF
429
def setBinary(value: Boolean): HashingTF
430
def transform(dataset: Dataset[_]): DataFrame
431
}
432
433
/**
434
* Compute Inverse Document Frequency
435
*/
436
class IDF extends Estimator[IDFModel] {
437
def setInputCol(value: String): IDF
438
def setOutputCol(value: String): IDF
439
def setMinDocFreq(value: Int): IDF
440
def fit(dataset: Dataset[_]): IDFModel
441
}
442
443
/**
444
* Principal Component Analysis
445
*/
446
class PCA extends Estimator[PCAModel] {
447
def setInputCol(value: String): PCA
448
def setOutputCol(value: String): PCA
449
def setK(value: Int): PCA
450
def fit(dataset: Dataset[_]): PCAModel
451
}
452
453
/**
454
* Chi-squared feature selection
455
*/
456
class ChiSqSelector extends Estimator[ChiSqSelectorModel] {
457
def setFeaturesCol(value: String): ChiSqSelector
458
def setOutputCol(value: String): ChiSqSelector
459
def setLabelCol(value: String): ChiSqSelector
460
def setNumTopFeatures(value: Int): ChiSqSelector
461
def setSelectorType(value: String): ChiSqSelector
462
def fit(dataset: Dataset[_]): ChiSqSelectorModel
463
}
464
```
465
466
**Usage Examples:**
467
468
```scala
469
import org.apache.spark.ml.feature._
470
471
// Feature assembly
472
val assembler = new VectorAssembler()
473
.setInputCols(Array("age", "income", "education"))
474
.setOutputCol("features")
475
476
val assembled = assembler.transform(df)
477
478
// String indexing and one-hot encoding
479
val indexer = new StringIndexer()
480
.setInputCol("category")
481
.setOutputCol("categoryIndex")
482
.fit(df)
483
484
val indexed = indexer.transform(df)
485
486
// Note: in Spark 3.x, OneHotEncoder is an Estimator and must be fit before transforming
val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

val encoded = encoder.fit(indexed).transform(indexed)
491
492
// Text processing pipeline
493
val tokenizer = new Tokenizer()
494
.setInputCol("text")
495
.setOutputCol("words")
496
497
val hashingTF = new HashingTF()
498
.setInputCol("words")
499
.setOutputCol("rawFeatures")
500
.setNumFeatures(10000)
501
502
val idf = new IDF()
503
.setInputCol("rawFeatures")
504
.setOutputCol("features")
505
506
val featurized = hashingTF.transform(tokenizer.transform(textDF))
val idfModel = idf.fit(featurized)
val rescaledData = idfModel.transform(featurized)
507
```
508
509
### Model Evaluation
510
511
Metrics and evaluators for assessing model performance.
512
513
```scala { .api }
514
/**
515
* Evaluator for multiclass classification
516
*/
517
class MulticlassClassificationEvaluator extends Evaluator {
518
def setLabelCol(value: String): MulticlassClassificationEvaluator
519
def setPredictionCol(value: String): MulticlassClassificationEvaluator
520
def setMetricName(value: String): MulticlassClassificationEvaluator
521
def evaluate(dataset: Dataset[_]): Double
522
}
523
524
/**
525
* Evaluator for binary classification
526
*/
527
class BinaryClassificationEvaluator extends Evaluator {
528
def setLabelCol(value: String): BinaryClassificationEvaluator
529
def setRawPredictionCol(value: String): BinaryClassificationEvaluator
530
def setMetricName(value: String): BinaryClassificationEvaluator
531
def evaluate(dataset: Dataset[_]): Double
532
}
533
534
/**
535
* Evaluator for regression
536
*/
537
class RegressionEvaluator extends Evaluator {
538
def setLabelCol(value: String): RegressionEvaluator
539
def setPredictionCol(value: String): RegressionEvaluator
540
def setMetricName(value: String): RegressionEvaluator
541
def evaluate(dataset: Dataset[_]): Double
542
}
543
544
/**
545
* Evaluator for clustering
546
*/
547
class ClusteringEvaluator extends Evaluator {
548
def setFeaturesCol(value: String): ClusteringEvaluator
549
def setPredictionCol(value: String): ClusteringEvaluator
550
def setMetricName(value: String): ClusteringEvaluator
551
def evaluate(dataset: Dataset[_]): Double
552
}
553
```
554
555
### Model Selection
556
557
Cross-validation and hyperparameter tuning utilities.
558
559
```scala { .api }
560
/**
561
* Cross-validator for model selection
562
*/
563
class CrossValidator extends Estimator[CrossValidatorModel] {
564
def setEstimator(value: Estimator[_]): CrossValidator
565
def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
566
def setEvaluator(value: Evaluator): CrossValidator
567
def setNumFolds(value: Int): CrossValidator
568
def setSeed(value: Long): CrossValidator
569
def fit(dataset: Dataset[_]): CrossValidatorModel
570
}
571
572
/**
573
* Train-validation split for model selection
574
*/
575
class TrainValidationSplit extends Estimator[TrainValidationSplitModel] {
576
def setEstimator(value: Estimator[_]): TrainValidationSplit
577
def setEstimatorParamMaps(value: Array[ParamMap]): TrainValidationSplit
578
def setEvaluator(value: Evaluator): TrainValidationSplit
579
def setTrainRatio(value: Double): TrainValidationSplit
580
def setSeed(value: Long): TrainValidationSplit
581
def fit(dataset: Dataset[_]): TrainValidationSplitModel
582
}
583
584
/**
585
* Parameter grid builder
586
*/
587
class ParamGridBuilder {
588
def addGrid[T](param: Param[T], values: Array[T]): ParamGridBuilder
589
def build(): Array[ParamMap]
590
}
591
```
592
593
**Usage Examples:**
594
595
```scala
596
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
597
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
598
599
// Parameter grid
600
val paramGrid = new ParamGridBuilder()
601
.addGrid(lr.regParam, Array(0.1, 0.01))
602
.addGrid(lr.maxIter, Array(10, 100))
603
.build()
604
605
// Cross validation
606
val cv = new CrossValidator()
607
.setEstimator(lr)
608
.setEvaluator(new BinaryClassificationEvaluator())
609
.setEstimatorParamMaps(paramGrid)
610
.setNumFolds(3)
611
.setSeed(1234L)
612
613
val cvModel = cv.fit(trainingData)
614
val bestModel = cvModel.bestModel
615
```
616
617
## Error Handling
618
619
Common ML exceptions:
620
621
- `IllegalArgumentException` - Invalid parameters or configurations
622
- `SparkException` - General Spark execution errors during ML operations
623
- `UnsupportedOperationException` - Unsupported operations on specific model types