# Feature Processing

MLlib provides comprehensive feature engineering capabilities including transformation, scaling, encoding, selection, and extraction. All feature processors follow the Estimator-Transformer pattern and integrate seamlessly with the Pipeline API.

## Vector Operations
5
6
### VectorAssembler
7
8
```scala { .api }
9
class VectorAssembler(override val uid: String) extends Transformer
10
with HasInputCols with HasOutputCol with HasHandleInvalid with DefaultParamsWritable {
11
12
def this() = this(Identifiable.randomUID("vecAssembler"))
13
14
def setInputCols(value: Array[String]): VectorAssembler
15
def setOutputCol(value: String): VectorAssembler
16
def setHandleInvalid(value: String): VectorAssembler
17
18
override def transform(dataset: Dataset[_]): DataFrame
19
override def transformSchema(schema: StructType): StructType
20
override def copy(extra: ParamMap): VectorAssembler
21
}
22
```
23
24
### VectorSlicer
25
26
```scala { .api }
27
class VectorSlicer(override val uid: String) extends Transformer
28
with HasInputCol with HasOutputCol with DefaultParamsWritable {
29
30
def this() = this(Identifiable.randomUID("vectorSlicer"))
31
32
final val indices: IntArrayParam
33
final val names: StringArrayParam
34
35
def setIndices(value: Array[Int]): VectorSlicer
36
def setNames(value: Array[String]): VectorSlicer
37
def setInputCol(value: String): VectorSlicer
38
def setOutputCol(value: String): VectorSlicer
39
40
override def transform(dataset: Dataset[_]): DataFrame
41
override def copy(extra: ParamMap): VectorSlicer
42
}
43
```
44
45
### VectorSizeHint
46
47
```scala { .api }
48
class VectorSizeHint(override val uid: String) extends Transformer
49
with HasInputCol with HasOutputCol with DefaultParamsWritable {
50
51
def this() = this(Identifiable.randomUID("vectorSizeHint"))
52
53
final val size: IntParam
54
final val handleInvalid: Param[String]
55
56
def setSize(value: Int): VectorSizeHint
57
def setHandleInvalid(value: String): VectorSizeHint
58
def setInputCol(value: String): VectorSizeHint
59
def setOutputCol(value: String): VectorSizeHint
60
61
override def transform(dataset: Dataset[_]): DataFrame
62
override def copy(extra: ParamMap): VectorSizeHint
63
}
64
```
65
66
## Scaling and Normalization
67
68
### StandardScaler
69
70
```scala { .api }
71
class StandardScaler(override val uid: String) extends Estimator[StandardScalerModel]
72
with StandardScalerParams with DefaultParamsWritable {
73
74
def this() = this(Identifiable.randomUID("stdScal"))
75
76
def setWithMean(value: Boolean): StandardScaler
77
def setWithStd(value: Boolean): StandardScaler
78
def setInputCol(value: String): StandardScaler
79
def setOutputCol(value: String): StandardScaler
80
81
override def fit(dataset: Dataset[_]): StandardScalerModel
82
override def copy(extra: ParamMap): StandardScaler
83
}
84
85
class StandardScalerModel(override val uid: String, val std: Vector, val mean: Vector)
86
extends Model[StandardScalerModel] with StandardScalerParams with MLWritable {
87
88
def setInputCol(value: String): StandardScalerModel
89
def setOutputCol(value: String): StandardScalerModel
90
91
override def transform(dataset: Dataset[_]): DataFrame
92
override def copy(extra: ParamMap): StandardScalerModel
93
def write: MLWriter
94
}
95
```
96
97
### MinMaxScaler
98
99
```scala { .api }
100
class MinMaxScaler(override val uid: String) extends Estimator[MinMaxScalerModel]
101
with MinMaxScalerParams with DefaultParamsWritable {
102
103
def this() = this(Identifiable.randomUID("minMaxScal"))
104
105
def setMin(value: Double): MinMaxScaler
106
def setMax(value: Double): MinMaxScaler
107
def setInputCol(value: String): MinMaxScaler
108
def setOutputCol(value: String): MinMaxScaler
109
110
override def fit(dataset: Dataset[_]): MinMaxScalerModel
111
override def copy(extra: ParamMap): MinMaxScaler
112
}
113
114
class MinMaxScalerModel(override val uid: String, val originalMin: Vector, val originalMax: Vector)
115
extends Model[MinMaxScalerModel] with MinMaxScalerParams with MLWritable {
116
117
def setInputCol(value: String): MinMaxScalerModel
118
def setOutputCol(value: String): MinMaxScalerModel
119
120
override def transform(dataset: Dataset[_]): DataFrame
121
override def copy(extra: ParamMap): MinMaxScalerModel
122
def write: MLWriter
123
}
124
```
125
126
### MaxAbsScaler
127
128
```scala { .api }
129
class MaxAbsScaler(override val uid: String) extends Estimator[MaxAbsScalerModel]
130
with MaxAbsScalerParams with DefaultParamsWritable {
131
132
def this() = this(Identifiable.randomUID("maxAbsScal"))
133
134
def setInputCol(value: String): MaxAbsScaler
135
def setOutputCol(value: String): MaxAbsScaler
136
137
override def fit(dataset: Dataset[_]): MaxAbsScalerModel
138
override def copy(extra: ParamMap): MaxAbsScaler
139
}
140
141
class MaxAbsScalerModel(override val uid: String, val maxAbs: Vector)
142
extends Model[MaxAbsScalerModel] with MaxAbsScalerParams with MLWritable {
143
144
def setInputCol(value: String): MaxAbsScalerModel
145
def setOutputCol(value: String): MaxAbsScalerModel
146
147
override def transform(dataset: Dataset[_]): DataFrame
148
override def copy(extra: ParamMap): MaxAbsScalerModel
149
def write: MLWriter
150
}
151
```
152
153
### Normalizer
154
155
```scala { .api }
156
class Normalizer(override val uid: String) extends Transformer
157
with HasInputCol with HasOutputCol with DefaultParamsWritable {
158
159
def this() = this(Identifiable.randomUID("normalizer"))
160
161
final val p: DoubleParam
162
163
def setP(value: Double): Normalizer
164
def setInputCol(value: String): Normalizer
165
def setOutputCol(value: String): Normalizer
166
167
override def transform(dataset: Dataset[_]): DataFrame
168
override def copy(extra: ParamMap): Normalizer
169
}
170
```
171
172
## Categorical Feature Processing
173
174
### StringIndexer
175
176
```scala { .api }
177
class StringIndexer(override val uid: String) extends Estimator[StringIndexerModel]
178
with StringIndexerBase with DefaultParamsWritable {
179
180
def this() = this(Identifiable.randomUID("strIdx"))
181
182
def setHandleInvalid(value: String): StringIndexer
183
def setStringOrderType(value: String): StringIndexer
184
def setInputCol(value: String): StringIndexer
185
def setOutputCol(value: String): StringIndexer
186
def setInputCols(value: Array[String]): StringIndexer
187
def setOutputCols(value: Array[String]): StringIndexer
188
189
override def fit(dataset: Dataset[_]): StringIndexerModel
190
override def copy(extra: ParamMap): StringIndexer
191
}
192
193
class StringIndexerModel(override val uid: String, val labelsArray: Array[Array[String]])
194
extends Model[StringIndexerModel] with StringIndexerBase with MLWritable {
195
196
def this(uid: String, labels: Array[String]) = this(uid, Array(labels))
197
def labels: Array[String] = labelsArray(0)
198
199
def setHandleInvalid(value: String): StringIndexerModel
200
def setInputCol(value: String): StringIndexerModel
201
def setOutputCol(value: String): StringIndexerModel
202
def setInputCols(value: Array[String]): StringIndexerModel
203
def setOutputCols(value: Array[String]): StringIndexerModel
204
205
override def transform(dataset: Dataset[_]): DataFrame
206
override def copy(extra: ParamMap): StringIndexerModel
207
def write: MLWriter
208
}
209
```
210
211
### IndexToString
212
213
```scala { .api }
214
class IndexToString(override val uid: String) extends Transformer
215
with HasInputCol with HasOutputCol with DefaultParamsWritable {
216
217
def this() = this(Identifiable.randomUID("idxToStr"))
218
219
final val labels: StringArrayParam
220
221
def setLabels(value: Array[String]): IndexToString
222
def setInputCol(value: String): IndexToString
223
def setOutputCol(value: String): IndexToString
224
225
override def transform(dataset: Dataset[_]): DataFrame
226
override def copy(extra: ParamMap): IndexToString
227
}
228
```
229
230
### OneHotEncoder

```scala { .api }
class OneHotEncoder(override val uid: String) extends Estimator[OneHotEncoderModel]
  with OneHotEncoderBase with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("oneHotEncoder"))

  final val dropLast: BooleanParam
  final val handleInvalid: Param[String]

  def setDropLast(value: Boolean): OneHotEncoder
  def setHandleInvalid(value: String): OneHotEncoder
  def setInputCol(value: String): OneHotEncoder
  def setOutputCol(value: String): OneHotEncoder
  def setInputCols(value: Array[String]): OneHotEncoder
  def setOutputCols(value: Array[String]): OneHotEncoder

  override def fit(dataset: Dataset[_]): OneHotEncoderModel
  override def copy(extra: ParamMap): OneHotEncoder
}

class OneHotEncoderModel(override val uid: String, val categorySizes: Array[Int])
  extends Model[OneHotEncoderModel] with OneHotEncoderBase with MLWritable {

  def setInputCol(value: String): OneHotEncoderModel
  def setOutputCol(value: String): OneHotEncoderModel
  def setInputCols(value: Array[String]): OneHotEncoderModel
  def setOutputCols(value: Array[String]): OneHotEncoderModel

  override def transform(dataset: Dataset[_]): DataFrame
  override def copy(extra: ParamMap): OneHotEncoderModel
  def write: MLWriter
}
```
252
253
### VectorIndexer
254
255
```scala { .api }
256
class VectorIndexer(override val uid: String) extends Estimator[VectorIndexerModel]
257
with VectorIndexerParams with DefaultParamsWritable {
258
259
def this() = this(Identifiable.randomUID("vecIdx"))
260
261
def setMaxCategories(value: Int): VectorIndexer
262
def setHandleInvalid(value: String): VectorIndexer
263
def setInputCol(value: String): VectorIndexer
264
def setOutputCol(value: String): VectorIndexer
265
266
override def fit(dataset: Dataset[_]): VectorIndexerModel
267
override def copy(extra: ParamMap): VectorIndexer
268
}
269
270
class VectorIndexerModel(override val uid: String, val numFeatures: Int, val categoryMaps: Map[Int, Map[Double, Int]])
271
extends Model[VectorIndexerModel] with VectorIndexerParams with MLWritable {
272
273
def setInputCol(value: String): VectorIndexerModel
274
def setOutputCol(value: String): VectorIndexerModel
275
276
override def transform(dataset: Dataset[_]): DataFrame
277
override def copy(extra: ParamMap): VectorIndexerModel
278
def write: MLWriter
279
}
280
```
281
282
## Discretization and Binning
283
284
### Bucketizer
285
286
```scala { .api }
287
class Bucketizer(override val uid: String) extends Transformer
288
with HasInputCol with HasOutputCol with HasInputCols with HasOutputCols with DefaultParamsWritable {
289
290
def this() = this(Identifiable.randomUID("bucketizer"))
291
292
final val splits: DoubleArrayParam
293
final val splitsArray: Param[Array[Array[Double]]]
294
final val handleInvalid: Param[String]
295
296
def setSplits(value: Array[Double]): Bucketizer
297
def setSplitsArray(value: Array[Array[Double]]): Bucketizer
298
def setHandleInvalid(value: String): Bucketizer
299
def setInputCol(value: String): Bucketizer
300
def setOutputCol(value: String): Bucketizer
301
def setInputCols(value: Array[String]): Bucketizer
302
def setOutputCols(value: Array[String]): Bucketizer
303
304
override def transform(dataset: Dataset[_]): DataFrame
305
override def copy(extra: ParamMap): Bucketizer
306
}
307
```
308
309
### QuantileDiscretizer
310
311
```scala { .api }
312
class QuantileDiscretizer(override val uid: String) extends Estimator[Bucketizer]
313
with QuantileDiscretizerBase with DefaultParamsWritable {
314
315
def this() = this(Identifiable.randomUID("quantileDiscretizer"))
316
317
def setNumBuckets(value: Int): QuantileDiscretizer
318
def setNumBucketsArray(value: Array[Int]): QuantileDiscretizer
319
def setRelativeError(value: Double): QuantileDiscretizer
320
def setHandleInvalid(value: String): QuantileDiscretizer
321
def setInputCol(value: String): QuantileDiscretizer
322
def setOutputCol(value: String): QuantileDiscretizer
323
def setInputCols(value: Array[String]): QuantileDiscretizer
324
def setOutputCols(value: Array[String]): QuantileDiscretizer
325
326
override def fit(dataset: Dataset[_]): Bucketizer
327
override def copy(extra: ParamMap): QuantileDiscretizer
328
}
329
```
330
331
### Binarizer
332
333
```scala { .api }
334
class Binarizer(override val uid: String) extends Transformer
335
with HasInputCol with HasOutputCol with HasInputCols with HasOutputCols with DefaultParamsWritable {
336
337
def this() = this(Identifiable.randomUID("binarizer"))
338
339
final val threshold: DoubleParam
340
final val thresholds: DoubleArrayParam
341
342
def setThreshold(value: Double): Binarizer
343
def setThresholds(value: Array[Double]): Binarizer
344
def setInputCol(value: String): Binarizer
345
def setOutputCol(value: String): Binarizer
346
def setInputCols(value: Array[String]): Binarizer
347
def setOutputCols(value: Array[String]): Binarizer
348
349
override def transform(dataset: Dataset[_]): DataFrame
350
override def copy(extra: ParamMap): Binarizer
351
}
352
```
353
354
## Text Processing
355
356
### Tokenizer
357
358
```scala { .api }
359
class Tokenizer(override val uid: String) extends Transformer
360
with HasInputCol with HasOutputCol with DefaultParamsWritable {
361
362
def this() = this(Identifiable.randomUID("tok"))
363
364
def setInputCol(value: String): Tokenizer
365
def setOutputCol(value: String): Tokenizer
366
367
override def transform(dataset: Dataset[_]): DataFrame
368
override def copy(extra: ParamMap): Tokenizer
369
}
370
```
371
372
### RegexTokenizer
373
374
```scala { .api }
375
class RegexTokenizer(override val uid: String) extends Transformer
376
with HasInputCol with HasOutputCol with DefaultParamsWritable {
377
378
def this() = this(Identifiable.randomUID("regexTok"))
379
380
final val minTokenLength: IntParam
381
final val gaps: BooleanParam
382
final val pattern: Param[String]
383
final val toLowercase: BooleanParam
384
385
def setMinTokenLength(value: Int): RegexTokenizer
386
def setGaps(value: Boolean): RegexTokenizer
387
def setPattern(value: String): RegexTokenizer
388
def setToLowercase(value: Boolean): RegexTokenizer
389
def setInputCol(value: String): RegexTokenizer
390
def setOutputCol(value: String): RegexTokenizer
391
392
override def transform(dataset: Dataset[_]): DataFrame
393
override def copy(extra: ParamMap): RegexTokenizer
394
}
395
```
396
397
### StopWordsRemover
398
399
```scala { .api }
400
class StopWordsRemover(override val uid: String) extends Transformer
401
with HasInputCol with HasOutputCol with DefaultParamsWritable {
402
403
def this() = this(Identifiable.randomUID("stopWords"))
404
405
final val stopWords: StringArrayParam
406
final val caseSensitive: BooleanParam
407
final val locale: Param[String]
408
409
def setStopWords(value: Array[String]): StopWordsRemover
410
def setCaseSensitive(value: Boolean): StopWordsRemover
411
def setLocale(value: String): StopWordsRemover
412
def setInputCol(value: String): StopWordsRemover
413
def setOutputCol(value: String): StopWordsRemover
414
415
override def transform(dataset: Dataset[_]): DataFrame
416
override def copy(extra: ParamMap): StopWordsRemover
417
}
418
419
object StopWordsRemover extends DefaultParamsReadable[StopWordsRemover] {
420
def loadDefaultStopWords(language: String): Array[String]
421
}
422
```
423
424
### NGram
425
426
```scala { .api }
427
class NGram(override val uid: String) extends Transformer
428
with HasInputCol with HasOutputCol with DefaultParamsWritable {
429
430
def this() = this(Identifiable.randomUID("ngram"))
431
432
final val n: IntParam
433
434
def setN(value: Int): NGram
435
def setInputCol(value: String): NGram
436
def setOutputCol(value: String): NGram
437
438
override def transform(dataset: Dataset[_]): DataFrame
439
override def copy(extra: ParamMap): NGram
440
}
441
```
442
443
## Text Vectorization
444
445
### CountVectorizer
446
447
```scala { .api }
448
class CountVectorizer(override val uid: String) extends Estimator[CountVectorizerModel]
449
with CountVectorizerParams with DefaultParamsWritable {
450
451
def this() = this(Identifiable.randomUID("cntVec"))
452
453
def setVocabSize(value: Int): CountVectorizer
454
def setMinDF(value: Double): CountVectorizer
455
def setMaxDF(value: Double): CountVectorizer
456
def setMinTF(value: Double): CountVectorizer
457
def setBinary(value: Boolean): CountVectorizer
458
def setInputCol(value: String): CountVectorizer
459
def setOutputCol(value: String): CountVectorizer
460
461
override def fit(dataset: Dataset[_]): CountVectorizerModel
462
override def copy(extra: ParamMap): CountVectorizer
463
}
464
465
class CountVectorizerModel(override val uid: String, val vocabulary: Array[String])
466
extends Model[CountVectorizerModel] with CountVectorizerParams with MLWritable {
467
468
def setInputCol(value: String): CountVectorizerModel
469
def setOutputCol(value: String): CountVectorizerModel
470
471
override def transform(dataset: Dataset[_]): DataFrame
472
override def copy(extra: ParamMap): CountVectorizerModel
473
def write: MLWriter
474
}
475
```
476
477
### HashingTF
478
479
```scala { .api }
480
class HashingTF(override val uid: String) extends Transformer
481
with HasInputCol with HasOutputCol with DefaultParamsWritable {
482
483
def this() = this(Identifiable.randomUID("hashingTF"))
484
485
final val numFeatures: IntParam
486
final val binary: BooleanParam
487
488
def setNumFeatures(value: Int): HashingTF
489
def setBinary(value: Boolean): HashingTF
490
def setInputCol(value: String): HashingTF
491
def setOutputCol(value: String): HashingTF
492
493
override def transform(dataset: Dataset[_]): DataFrame
494
override def copy(extra: ParamMap): HashingTF
495
}
496
```
497
498
### IDF
499
500
```scala { .api }
501
class IDF(override val uid: String) extends Estimator[IDFModel]
502
with IDFParams with DefaultParamsWritable {
503
504
def this() = this(Identifiable.randomUID("idf"))
505
506
def setMinDocFreq(value: Int): IDF
507
def setInputCol(value: String): IDF
508
def setOutputCol(value: String): IDF
509
510
override def fit(dataset: Dataset[_]): IDFModel
511
override def copy(extra: ParamMap): IDF
512
}
513
514
class IDFModel(override val uid: String, val idf: Vector)
515
extends Model[IDFModel] with IDFParams with MLWritable {
516
517
def setInputCol(value: String): IDFModel
518
def setOutputCol(value: String): IDFModel
519
520
override def transform(dataset: Dataset[_]): DataFrame
521
override def copy(extra: ParamMap): IDFModel
522
def write: MLWriter
523
}
524
```
525
526
### Word2Vec
527
528
```scala { .api }
529
class Word2Vec(override val uid: String) extends Estimator[Word2VecModel]
530
with Word2VecBase with DefaultParamsWritable {
531
532
def this() = this(Identifiable.randomUID("w2v"))
533
534
def setVectorSize(value: Int): Word2Vec
535
def setMinCount(value: Int): Word2Vec
536
def setNumPartitions(value: Int): Word2Vec
537
def setStepSize(value: Double): Word2Vec
538
def setMaxIter(value: Int): Word2Vec
539
def setSeed(value: Long): Word2Vec
540
def setInputCol(value: String): Word2Vec
541
def setOutputCol(value: String): Word2Vec
542
def setWindowSize(value: Int): Word2Vec
543
def setMaxSentenceLength(value: Int): Word2Vec
544
545
override def fit(dataset: Dataset[_]): Word2VecModel
546
override def copy(extra: ParamMap): Word2Vec
547
}
548
549
class Word2VecModel(override val uid: String, private val wordVectors: Map[String, Array[Float]])
550
extends Model[Word2VecModel] with Word2VecBase with MLWritable {
551
552
def getVectors: DataFrame
553
def findSynonyms(word: String, num: Int): DataFrame
554
def findSynonymsArray(word: String, num: Int): Array[(String, Double)]
555
556
def setInputCol(value: String): Word2VecModel
557
def setOutputCol(value: String): Word2VecModel
558
559
override def transform(dataset: Dataset[_]): DataFrame
560
override def copy(extra: ParamMap): Word2VecModel
561
def write: MLWriter
562
}
563
```
564
565
## Dimensionality Reduction
566
567
### PCA
568
569
```scala { .api }
570
class PCA(override val uid: String) extends Estimator[PCAModel]
571
with PCAParams with DefaultParamsWritable {
572
573
def this() = this(Identifiable.randomUID("pca"))
574
575
def setK(value: Int): PCA
576
def setInputCol(value: String): PCA
577
def setOutputCol(value: String): PCA
578
579
override def fit(dataset: Dataset[_]): PCAModel
580
override def copy(extra: ParamMap): PCA
581
}
582
583
class PCAModel(override val uid: String, val pc: Matrix, val explainedVariance: Vector)
584
extends Model[PCAModel] with PCAParams with MLWritable {
585
586
def setInputCol(value: String): PCAModel
587
def setOutputCol(value: String): PCAModel
588
589
override def transform(dataset: Dataset[_]): DataFrame
590
override def copy(extra: ParamMap): PCAModel
591
def write: MLWriter
592
}
593
```
594
595
## Feature Selection
596
597
### ChiSqSelector
598
599
```scala { .api }
600
class ChiSqSelector(override val uid: String) extends Estimator[ChiSqSelectorModel]
601
with ChiSqSelectorParams with DefaultParamsWritable {
602
603
def this() = this(Identifiable.randomUID("chiSqSelector"))
604
605
def setNumTopFeatures(value: Int): ChiSqSelector
606
def setPercentile(value: Double): ChiSqSelector
607
def setFpr(value: Double): ChiSqSelector
608
def setFdr(value: Double): ChiSqSelector
609
def setFwe(value: Double): ChiSqSelector
610
def setSelectorType(value: String): ChiSqSelector
611
def setFeaturesCol(value: String): ChiSqSelector
612
def setOutputCol(value: String): ChiSqSelector
613
def setLabelCol(value: String): ChiSqSelector
614
615
override def fit(dataset: Dataset[_]): ChiSqSelectorModel
616
override def copy(extra: ParamMap): ChiSqSelector
617
}
618
619
class ChiSqSelectorModel(override val uid: String, private val selector: org.apache.spark.mllib.feature.ChiSqSelectorModel)
620
extends Model[ChiSqSelectorModel] with ChiSqSelectorParams with MLWritable {
621
622
val selectedFeatures: Array[Int]
623
624
def setFeaturesCol(value: String): ChiSqSelectorModel
625
def setOutputCol(value: String): ChiSqSelectorModel
626
627
override def transform(dataset: Dataset[_]): DataFrame
628
override def copy(extra: ParamMap): ChiSqSelectorModel
629
def write: MLWriter
630
}
631
```
632
633
## Missing Value Imputation
634
635
### Imputer
636
637
```scala { .api }
638
class Imputer(override val uid: String) extends Estimator[ImputerModel]
639
with ImputerParams with DefaultParamsWritable {
640
641
def this() = this(Identifiable.randomUID("imputer"))
642
643
def setInputCols(value: Array[String]): Imputer
644
def setOutputCols(value: Array[String]): Imputer
645
def setStrategy(value: String): Imputer
646
def setMissingValue(value: Double): Imputer
647
def setRelativeError(value: Double): Imputer
648
649
override def fit(dataset: Dataset[_]): ImputerModel
650
override def copy(extra: ParamMap): Imputer
651
}
652
653
class ImputerModel(override val uid: String, val surrogateDF: DataFrame)
654
extends Model[ImputerModel] with ImputerParams with MLWritable {
655
656
def setInputCols(value: Array[String]): ImputerModel
657
def setOutputCols(value: Array[String]): ImputerModel
658
659
override def transform(dataset: Dataset[_]): DataFrame
660
override def copy(extra: ParamMap): ImputerModel
661
def write: MLWriter
662
}
663
```
664
665
## Advanced Feature Transformations
666
667
### PolynomialExpansion
668
669
```scala { .api }
670
class PolynomialExpansion(override val uid: String) extends Transformer
671
with HasInputCol with HasOutputCol with DefaultParamsWritable {
672
673
def this() = this(Identifiable.randomUID("polyExpansion"))
674
675
final val degree: IntParam
676
677
def setDegree(value: Int): PolynomialExpansion
678
def setInputCol(value: String): PolynomialExpansion
679
def setOutputCol(value: String): PolynomialExpansion
680
681
override def transform(dataset: Dataset[_]): DataFrame
682
override def copy(extra: ParamMap): PolynomialExpansion
683
}
684
```
685
686
### Interaction
687
688
```scala { .api }
689
class Interaction(override val uid: String) extends Transformer
690
with HasInputCols with HasOutputCol with DefaultParamsWritable {
691
692
def this() = this(Identifiable.randomUID("interaction"))
693
694
def setInputCols(value: Array[String]): Interaction
695
def setOutputCol(value: String): Interaction
696
697
override def transform(dataset: Dataset[_]): DataFrame
698
override def copy(extra: ParamMap): Interaction
699
}
700
```
701
702
### ElementwiseProduct
703
704
```scala { .api }
705
class ElementwiseProduct(override val uid: String) extends Transformer
706
with HasInputCol with HasOutputCol with DefaultParamsWritable {
707
708
def this() = this(Identifiable.randomUID("elemProd"))
709
710
final val scalingVec: Param[Vector]
711
712
def setScalingVec(value: Vector): ElementwiseProduct
713
def setInputCol(value: String): ElementwiseProduct
714
def setOutputCol(value: String): ElementwiseProduct
715
716
override def transform(dataset: Dataset[_]): DataFrame
717
override def copy(extra: ParamMap): ElementwiseProduct
718
}
719
```
720
721
### DCT (Discrete Cosine Transform)
722
723
```scala { .api }
724
class DCT(override val uid: String) extends Transformer
725
with HasInputCol with HasOutputCol with DefaultParamsWritable {
726
727
def this() = this(Identifiable.randomUID("dct"))
728
729
final val inverse: BooleanParam
730
731
def setInverse(value: Boolean): DCT
732
def setInputCol(value: String): DCT
733
def setOutputCol(value: String): DCT
734
735
override def transform(dataset: Dataset[_]): DataFrame
736
override def copy(extra: ParamMap): DCT
737
}
738
```
739
740
### SQLTransformer
741
742
```scala { .api }
743
class SQLTransformer(override val uid: String) extends Transformer with DefaultParamsWritable {
744
def this() = this(Identifiable.randomUID("sql"))
745
746
final val statement: Param[String]
747
748
def setStatement(value: String): SQLTransformer
749
def getStatement: String
750
751
override def transform(dataset: Dataset[_]): DataFrame
752
override def copy(extra: ParamMap): SQLTransformer
753
}
754
```
755
756
## RFormula
757
758
```scala { .api }
759
class RFormula(override val uid: String) extends Estimator[RFormulaModel]
760
with RFormulaParams with DefaultParamsWritable {
761
762
def this() = this(Identifiable.randomUID("rFormula"))
763
764
def setFormula(value: String): RFormula
765
def setFeaturesCol(value: String): RFormula
766
def setLabelCol(value: String): RFormula
767
def setHandleInvalid(value: String): RFormula
768
769
override def fit(dataset: Dataset[_]): RFormulaModel
770
override def copy(extra: ParamMap): RFormula
771
}
772
773
class RFormulaModel(override val uid: String, private val resolvedFormula: ResolvedRFormula,
774
val pipelineModel: PipelineModel)
775
extends Model[RFormulaModel] with RFormulaParams with MLWritable {
776
777
def setFeaturesCol(value: String): RFormulaModel
778
def setLabelCol(value: String): RFormulaModel
779
780
override def transform(dataset: Dataset[_]): DataFrame
781
override def copy(extra: ParamMap): RFormulaModel
782
def write: MLWriter
783
}
784
```
785
786
## Feature Hashing
787
788
### FeatureHasher
789
790
```scala { .api }
791
class FeatureHasher(override val uid: String) extends Transformer
792
with HasInputCols with HasOutputCol with DefaultParamsWritable {
793
794
def this() = this(Identifiable.randomUID("featureHasher"))
795
796
final val numFeatures: IntParam
797
final val categoricalCols: StringArrayParam
798
799
def setNumFeatures(value: Int): FeatureHasher
800
def setCategoricalCols(value: Array[String]): FeatureHasher
801
def setInputCols(value: Array[String]): FeatureHasher
802
def setOutputCol(value: String): FeatureHasher
803
804
override def transform(dataset: Dataset[_]): DataFrame
805
override def copy(extra: ParamMap): FeatureHasher
806
}
807
```
808
809
## Usage Examples
810
811
### Basic Feature Preprocessing Pipeline
812
813
```scala
814
import org.apache.spark.ml.Pipeline
815
import org.apache.spark.ml.feature._
816
817
// Create sample data
818
val data = spark.createDataFrame(Seq(
819
(0, "male", 25, 50000.0, Array("java", "spark")),
820
(1, "female", 30, 60000.0, Array("python", "ml")),
821
(2, "male", 35, 70000.0, Array("scala", "spark"))
822
)).toDF("id", "gender", "age", "salary", "skills")
823
824
// String indexing for categorical variables
825
val genderIndexer = new StringIndexer()
826
.setInputCol("gender")
827
.setOutputCol("genderIndex")
828
.setHandleInvalid("keep")
829
830
// One-hot encoding
831
val genderEncoder = new OneHotEncoder()
832
.setInputCol("genderIndex")
833
.setOutputCol("genderVec")
834
.setDropLast(true)
835
836
// Discretize continuous variables
837
val ageDiscretizer = new QuantileDiscretizer()
838
.setInputCol("age")
839
.setOutputCol("ageDiscrete")
840
.setNumBuckets(3)
841
842
// Convert the scalar salary column to a vector first
// (StandardScaler requires a Vector input column)
val salaryAssembler = new VectorAssembler()
  .setInputCols(Array("salary"))
  .setOutputCol("salaryVec")

// Scale salary (reads the assembled vector column, not the raw double)
val salaryScaler = new StandardScaler()
  .setInputCol("salaryVec")
  .setOutputCol("salaryScaled")
  .setWithMean(true)
  .setWithStd(true)
854
// Text processing for skills: the "skills" column already holds an array of
// tokens, so no Tokenizer is needed (Tokenizer expects a raw String column);
// HashingTF consumes the token array directly.
val skillsHasher = new HashingTF()
  .setInputCol("skills")
  .setOutputCol("skillsFeatures")
  .setNumFeatures(1000)
864
// Assemble all features
865
val featureAssembler = new VectorAssembler()
866
.setInputCols(Array("genderVec", "ageDiscrete", "salaryScaled", "skillsFeatures"))
867
.setOutputCol("features")
868
869
// Create pipeline
870
val pipeline = new Pipeline().setStages(Array(
871
genderIndexer, genderEncoder, ageDiscretizer,
872
salaryAssembler, salaryScaler,
873
skillsHasher, featureAssembler
874
))
875
876
val model = pipeline.fit(data)
877
val processedData = model.transform(data)
878
processedData.select("features").show(truncate = false)
879
```
880
881
### Text Feature Processing
882
883
```scala
884
import org.apache.spark.ml.feature._
885
886
// Sample text data
887
val textData = spark.createDataFrame(Seq(
888
(0, "Apache Spark is a fast and general engine for large-scale data processing"),
889
(1, "Spark runs on both standalone clusters and on Hadoop"),
890
(2, "Machine learning algorithms in MLlib are scalable")
891
)).toDF("id", "text")
892
893
// Tokenization
894
val tokenizer = new Tokenizer()
895
.setInputCol("text")
896
.setOutputCol("words")
897
898
val tokenized = tokenizer.transform(textData)
899
900
// Alternative: Regex tokenizer with custom pattern
901
val regexTokenizer = new RegexTokenizer()
902
.setInputCol("text")
903
.setOutputCol("words")
904
.setPattern("\\W+")
905
.setMinTokenLength(2)
906
.setToLowercase(true)
907
908
// Remove stop words
909
val stopWordsRemover = new StopWordsRemover()
910
.setInputCol("words")
911
.setOutputCol("filteredWords")
912
.setCaseSensitive(false)
913
914
val filtered = stopWordsRemover.transform(tokenized)
915
916
// Create n-grams
917
val ngram = new NGram()
918
.setN(2)
919
.setInputCol("filteredWords")
920
.setOutputCol("ngrams")
921
922
val ngrammed = ngram.transform(filtered)
923
924
// Count vectorization
925
val countVectorizer = new CountVectorizer()
926
.setInputCol("filteredWords")
927
.setOutputCol("rawFeatures")
928
.setVocabSize(10000)
929
.setMinDF(1)
930
931
val countVectorizerModel = countVectorizer.fit(filtered)
932
val featurized = countVectorizerModel.transform(filtered)
933
934
// TF-IDF
935
val idf = new IDF()
936
.setInputCol("rawFeatures")
937
.setOutputCol("features")
938
939
val idfModel = idf.fit(featurized)
940
val tfidfFeatures = idfModel.transform(featurized)
941
942
tfidfFeatures.select("text", "features").show(truncate = false)
943
```
944
945
### Word2Vec Embeddings
946
947
```scala
948
import org.apache.spark.ml.feature.Word2Vec
949
950
// Prepare sentence data: each row must be a Product, so wrap the
// token sequence of each sentence in a Tuple1
val sentences = spark.createDataFrame(Seq(
  "I love Apache Spark".split(" "),
  "Spark is great for big data".split(" "),
  "Machine learning with Spark MLlib".split(" ")
).map(Tuple1.apply)).toDF("text")
956
957
// Train Word2Vec model
958
val word2Vec = new Word2Vec()
959
.setInputCol("text")
960
.setOutputCol("features")
961
.setVectorSize(100)
962
.setMinCount(1)
963
.setMaxIter(10)
964
.setStepSize(0.025)
965
.setWindowSize(5)
966
.setSeed(42)
967
968
val word2VecModel = word2Vec.fit(sentences)
969
970
// Transform sentences to vectors
971
val embeddings = word2VecModel.transform(sentences)
972
embeddings.select("text", "features").show(truncate = false)
973
974
// Find synonyms
975
val synonyms = word2VecModel.findSynonyms("Spark", 3)
976
synonyms.show()
977
978
// Get word vectors
979
val vectors = word2VecModel.getVectors
980
vectors.show()
981
```
982
983
### Feature Scaling and Normalization

```scala
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.Vectors

// Sample data with features
val data = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

// Standard scaling (z-score normalization)
val standardScaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("standardScaled")
  .setWithMean(true)
  .setWithStd(true)

val standardScalerModel = standardScaler.fit(data)
val standardScaled = standardScalerModel.transform(data)

println("Standard Scaler - Mean and Std:")
println(s"Mean: ${standardScalerModel.mean}")
println(s"Std: ${standardScalerModel.std}")

// Min-Max scaling
val minMaxScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("minMaxScaled")
  .setMin(0.0)
  .setMax(1.0)

val minMaxScalerModel = minMaxScaler.fit(data)
val minMaxScaled = minMaxScalerModel.transform(data)

// Max-Abs scaling
val maxAbsScaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("maxAbsScaled")

val maxAbsScalerModel = maxAbsScaler.fit(data)
val maxAbsScaled = maxAbsScalerModel.transform(data)

// L2 normalization
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normalized")
  .setP(2.0)

val normalized = normalizer.transform(data)

// Show all transformations
val allTransformed = standardScaled
  .join(minMaxScaled.select("id", "minMaxScaled"), "id")
  .join(maxAbsScaled.select("id", "maxAbsScaled"), "id")
  .join(normalized.select("id", "normalized"), "id")

allTransformed.show(truncate = false)
```
1044
1045
### Principal Component Analysis

```scala
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

// High-dimensional data (Tuple1 wraps each vector for a one-column DataFrame)
val data = spark.createDataFrame(Seq(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
).map(Tuple1.apply)).toDF("features")

// PCA to reduce to 3 dimensions
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)

val pcaModel = pca.fit(data)
val pcaData = pcaModel.transform(data)

// pc is stored column-major; each group of numRows values is one principal component
println("Principal Components:")
pcaModel.pc.toArray.grouped(pcaModel.pc.numRows).foreach { component =>
  println(component.mkString(", "))
}

println("Explained Variance:")
println(pcaModel.explainedVariance)

pcaData.select("features", "pcaFeatures").show(truncate = false)
```
1077
1078
### Feature Selection with Chi-Square

```scala
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

// Data with labels and features
val data = spark.createDataFrame(Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)).toDF("id", "features", "clicked")

// Chi-square feature selection
val selector = new ChiSqSelector()
  .setNumTopFeatures(2)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val selectorModel = selector.fit(data)
val selectedData = selectorModel.transform(data)

println(s"Selected features: ${selectorModel.selectedFeatures.mkString(", ")}")
selectedData.show(truncate = false)

// Alternative selection methods
val percentileSelector = new ChiSqSelector()
  .setSelectorType("percentile")
  .setPercentile(0.5)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val fprSelector = new ChiSqSelector()
  .setSelectorType("fpr")
  .setFpr(0.05)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")
```
1119
1120
### Missing Value Imputation

```scala
import org.apache.spark.ml.feature.Imputer

// Data with missing values (represented as NaN)
val data = spark.createDataFrame(Seq(
  (1.0, Double.NaN, 3.0),
  (4.0, 5.0, Double.NaN),
  (7.0, 8.0, 9.0)
)).toDF("a", "b", "c")

// Impute missing values with mean
val imputer = new Imputer()
  .setInputCols(Array("a", "b", "c"))
  .setOutputCols(Array("a_imputed", "b_imputed", "c_imputed"))
  .setStrategy("mean") // "mean", "median", or "mode"

val imputerModel = imputer.fit(data)
val imputedData = imputerModel.transform(data)

imputedData.show()

// Check surrogate values used for imputation
imputerModel.surrogateDF.show()
```
1146
1147
### Advanced Feature Engineering

```scala
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.Vectors

// Polynomial expansion (Tuple1 wraps each vector for a one-column DataFrame)
val polyData = spark.createDataFrame(Seq(
  Vectors.dense(2.0, 1.0),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(3.0, -1.0)
).map(Tuple1.apply)).toDF("features")

val polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(2)

val polyExpanded = polyExpansion.transform(polyData)
polyExpanded.show(truncate = false)

// Feature interactions: input columns must be numeric or vector-typed
// (string columns must be encoded first, e.g. with StringIndexer + OneHotEncoder)
val interactionData = spark.createDataFrame(Seq(
  (1, 2.0, Vectors.dense(1.0, 2.0)),
  (2, 3.0, Vectors.dense(3.0, 4.0))
)).toDF("id", "weight", "features")

val interaction = new Interaction()
  .setInputCols(Array("id", "weight", "features"))
  .setOutputCol("interactionFeatures")

val interacted = interaction.transform(interactionData)
interacted.show(truncate = false)

// Element-wise product: the scaling vector length must match the feature vector length
val elementProduct = new ElementwiseProduct()
  .setScalingVec(Vectors.dense(0.0, 2.0))
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

val scaled = elementProduct.transform(polyData)
scaled.show(truncate = false)
```