MLlib provides comprehensive feature engineering support, including feature transformation, scaling, encoding, selection, and extraction. Every feature processor is either a stateless Transformer or an Estimator that is fit to produce a Model, and all of them can be composed as stages of the Pipeline API.
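As a minimal sketch of that pattern (the "features" input column and the trainingDF DataFrame are assumed for illustration): a Transformer such as Normalizer is applied directly, while an Estimator such as StandardScaler is first fit to obtain a Model, and both kinds can be chained in a Pipeline.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{Normalizer, StandardScaler}
// Transformer: stateless, transform() can be called directly
val normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures")
// Estimator: fit() learns column statistics and returns a StandardScalerModel
val scaler = new StandardScaler().setInputCol("normFeatures").setOutputCol("scaledFeatures")
// Both are PipelineStages and can be combined
val pipeline = new Pipeline().setStages(Array(normalizer, scaler))
// val pipelineModel = pipeline.fit(trainingDF)   // trainingDF is assumed to contain a "features" vector column
// val transformed = pipelineModel.transform(trainingDF)

The signatures below summarize the main feature-processing APIs; end-to-end examples follow after the listings.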
class VectorAssembler(override val uid: String) extends Transformer
with HasInputCols with HasOutputCol with HasHandleInvalid with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("vecAssembler"))
def setInputCols(value: Array[String]): VectorAssembler
def setOutputCol(value: String): VectorAssembler
def setHandleInvalid(value: String): VectorAssembler
override def transform(dataset: Dataset[_]): DataFrame
override def transformSchema(schema: StructType): StructType
override def copy(extra: ParamMap): VectorAssembler
}
class VectorSlicer(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("vectorSlicer"))
final val indices: IntArrayParam
final val names: StringArrayParam
def setIndices(value: Array[Int]): VectorSlicer
def setNames(value: Array[String]): VectorSlicer
def setInputCol(value: String): VectorSlicer
def setOutputCol(value: String): VectorSlicer
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): VectorSlicer
}
class VectorSizeHint(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("vectorSizeHint"))
final val size: IntParam
final val handleInvalid: Param[String]
def setSize(value: Int): VectorSizeHint
def setHandleInvalid(value: String): VectorSizeHint
def setInputCol(value: String): VectorSizeHint
def setOutputCol(value: String): VectorSizeHint
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): VectorSizeHint
}
class StandardScaler(override val uid: String) extends Estimator[StandardScalerModel]
with StandardScalerParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("stdScal"))
def setWithMean(value: Boolean): StandardScaler
def setWithStd(value: Boolean): StandardScaler
def setInputCol(value: String): StandardScaler
def setOutputCol(value: String): StandardScaler
override def fit(dataset: Dataset[_]): StandardScalerModel
override def copy(extra: ParamMap): StandardScaler
}
class StandardScalerModel(override val uid: String, val std: Vector, val mean: Vector)
extends Model[StandardScalerModel] with StandardScalerParams with MLWritable {
def setInputCol(value: String): StandardScalerModel
def setOutputCol(value: String): StandardScalerModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): StandardScalerModel
def write: MLWriter
}
class MinMaxScaler(override val uid: String) extends Estimator[MinMaxScalerModel]
with MinMaxScalerParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("minMaxScal"))
def setMin(value: Double): MinMaxScaler
def setMax(value: Double): MinMaxScaler
def setInputCol(value: String): MinMaxScaler
def setOutputCol(value: String): MinMaxScaler
override def fit(dataset: Dataset[_]): MinMaxScalerModel
override def copy(extra: ParamMap): MinMaxScaler
}
class MinMaxScalerModel(override val uid: String, val originalMin: Vector, val originalMax: Vector)
extends Model[MinMaxScalerModel] with MinMaxScalerParams with MLWritable {
def setInputCol(value: String): MinMaxScalerModel
def setOutputCol(value: String): MinMaxScalerModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): MinMaxScalerModel
def write: MLWriter
}
class MaxAbsScaler(override val uid: String) extends Estimator[MaxAbsScalerModel]
with MaxAbsScalerParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("maxAbsScal"))
def setInputCol(value: String): MaxAbsScaler
def setOutputCol(value: String): MaxAbsScaler
override def fit(dataset: Dataset[_]): MaxAbsScalerModel
override def copy(extra: ParamMap): MaxAbsScaler
}
class MaxAbsScalerModel(override val uid: String, val maxAbs: Vector)
extends Model[MaxAbsScalerModel] with MaxAbsScalerParams with MLWritable {
def setInputCol(value: String): MaxAbsScalerModel
def setOutputCol(value: String): MaxAbsScalerModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): MaxAbsScalerModel
def write: MLWriter
}
class Normalizer(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("normalizer"))
final val p: DoubleParam
def setP(value: Double): Normalizer
def setInputCol(value: String): Normalizer
def setOutputCol(value: String): Normalizer
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): Normalizer
}
class StringIndexer(override val uid: String) extends Estimator[StringIndexerModel]
with StringIndexerBase with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("strIdx"))
def setHandleInvalid(value: String): StringIndexer
def setStringOrderType(value: String): StringIndexer
def setInputCol(value: String): StringIndexer
def setOutputCol(value: String): StringIndexer
def setInputCols(value: Array[String]): StringIndexer
def setOutputCols(value: Array[String]): StringIndexer
override def fit(dataset: Dataset[_]): StringIndexerModel
override def copy(extra: ParamMap): StringIndexer
}
class StringIndexerModel(override val uid: String, val labelsArray: Array[Array[String]])
extends Model[StringIndexerModel] with StringIndexerBase with MLWritable {
def this(uid: String, labels: Array[String]) = this(uid, Array(labels))
def labels: Array[String] = labelsArray(0)
def setHandleInvalid(value: String): StringIndexerModel
def setInputCol(value: String): StringIndexerModel
def setOutputCol(value: String): StringIndexerModel
def setInputCols(value: Array[String]): StringIndexerModel
def setOutputCols(value: Array[String]): StringIndexerModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): StringIndexerModel
def write: MLWriter
}
class IndexToString(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("idxToStr"))
final val labels: StringArrayParam
def setLabels(value: Array[String]): IndexToString
def setInputCol(value: String): IndexToString
def setOutputCol(value: String): IndexToString
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): IndexToString
}
class OneHotEncoder(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with HasInputCols with HasOutputCols with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("oneHot"))
final val dropLast: BooleanParam
final val handleInvalid: Param[String]
def setDropLast(value: Boolean): OneHotEncoder
def setHandleInvalid(value: String): OneHotEncoder
def setInputCol(value: String): OneHotEncoder
def setOutputCol(value: String): OneHotEncoder
def setInputCols(value: Array[String]): OneHotEncoder
def setOutputCols(value: Array[String]): OneHotEncoder
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): OneHotEncoder
}
class VectorIndexer(override val uid: String) extends Estimator[VectorIndexerModel]
with VectorIndexerParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("vecIdx"))
def setMaxCategories(value: Int): VectorIndexer
def setHandleInvalid(value: String): VectorIndexer
def setInputCol(value: String): VectorIndexer
def setOutputCol(value: String): VectorIndexer
override def fit(dataset: Dataset[_]): VectorIndexerModel
override def copy(extra: ParamMap): VectorIndexer
}
class VectorIndexerModel(override val uid: String, val numFeatures: Int, val categoryMaps: Map[Int, Map[Double, Int]])
extends Model[VectorIndexerModel] with VectorIndexerParams with MLWritable {
def setInputCol(value: String): VectorIndexerModel
def setOutputCol(value: String): VectorIndexerModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): VectorIndexerModel
def write: MLWriter
}
class Bucketizer(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with HasInputCols with HasOutputCols with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("bucketizer"))
final val splits: DoubleArrayParam
final val splitsArray: Param[Array[Array[Double]]]
final val handleInvalid: Param[String]
def setSplits(value: Array[Double]): Bucketizer
def setSplitsArray(value: Array[Array[Double]]): Bucketizer
def setHandleInvalid(value: String): Bucketizer
def setInputCol(value: String): Bucketizer
def setOutputCol(value: String): Bucketizer
def setInputCols(value: Array[String]): Bucketizer
def setOutputCols(value: Array[String]): Bucketizer
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): Bucketizer
}
class QuantileDiscretizer(override val uid: String) extends Estimator[Bucketizer]
with QuantileDiscretizerBase with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("quantileDiscretizer"))
def setNumBuckets(value: Int): QuantileDiscretizer
def setNumBucketsArray(value: Array[Int]): QuantileDiscretizer
def setRelativeError(value: Double): QuantileDiscretizer
def setHandleInvalid(value: String): QuantileDiscretizer
def setInputCol(value: String): QuantileDiscretizer
def setOutputCol(value: String): QuantileDiscretizer
def setInputCols(value: Array[String]): QuantileDiscretizer
def setOutputCols(value: Array[String]): QuantileDiscretizer
override def fit(dataset: Dataset[_]): Bucketizer
override def copy(extra: ParamMap): QuantileDiscretizer
}
class Binarizer(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with HasInputCols with HasOutputCols with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("binarizer"))
final val threshold: DoubleParam
final val thresholds: DoubleArrayParam
def setThreshold(value: Double): Binarizer
def setThresholds(value: Array[Double]): Binarizer
def setInputCol(value: String): Binarizer
def setOutputCol(value: String): Binarizer
def setInputCols(value: Array[String]): Binarizer
def setOutputCols(value: Array[String]): Binarizer
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): Binarizer
}
class Tokenizer(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("tok"))
def setInputCol(value: String): Tokenizer
def setOutputCol(value: String): Tokenizer
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): Tokenizer
}
class RegexTokenizer(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("regexTok"))
final val minTokenLength: IntParam
final val gaps: BooleanParam
final val pattern: Param[String]
final val toLowercase: BooleanParam
def setMinTokenLength(value: Int): RegexTokenizer
def setGaps(value: Boolean): RegexTokenizer
def setPattern(value: String): RegexTokenizer
def setToLowercase(value: Boolean): RegexTokenizer
def setInputCol(value: String): RegexTokenizer
def setOutputCol(value: String): RegexTokenizer
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): RegexTokenizer
}
class StopWordsRemover(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("stopWords"))
final val stopWords: StringArrayParam
final val caseSensitive: BooleanParam
final val locale: Param[String]
def setStopWords(value: Array[String]): StopWordsRemover
def setCaseSensitive(value: Boolean): StopWordsRemover
def setLocale(value: String): StopWordsRemover
def setInputCol(value: String): StopWordsRemover
def setOutputCol(value: String): StopWordsRemover
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): StopWordsRemover
}
object StopWordsRemover extends DefaultParamsReadable[StopWordsRemover] {
def loadDefaultStopWords(language: String): Array[String]
}
class NGram(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("ngram"))
final val n: IntParam
def setN(value: Int): NGram
def setInputCol(value: String): NGram
def setOutputCol(value: String): NGram
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): NGram
}
class CountVectorizer(override val uid: String) extends Estimator[CountVectorizerModel]
with CountVectorizerParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("cntVec"))
def setVocabSize(value: Int): CountVectorizer
def setMinDF(value: Double): CountVectorizer
def setMaxDF(value: Double): CountVectorizer
def setMinTF(value: Double): CountVectorizer
def setBinary(value: Boolean): CountVectorizer
def setInputCol(value: String): CountVectorizer
def setOutputCol(value: String): CountVectorizer
override def fit(dataset: Dataset[_]): CountVectorizerModel
override def copy(extra: ParamMap): CountVectorizer
}
class CountVectorizerModel(override val uid: String, val vocabulary: Array[String])
extends Model[CountVectorizerModel] with CountVectorizerParams with MLWritable {
def setInputCol(value: String): CountVectorizerModel
def setOutputCol(value: String): CountVectorizerModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): CountVectorizerModel
def write: MLWriter
}
class HashingTF(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("hashingTF"))
final val numFeatures: IntParam
final val binary: BooleanParam
def setNumFeatures(value: Int): HashingTF
def setBinary(value: Boolean): HashingTF
def setInputCol(value: String): HashingTF
def setOutputCol(value: String): HashingTF
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): HashingTF
}
class IDF(override val uid: String) extends Estimator[IDFModel]
with IDFParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("idf"))
def setMinDocFreq(value: Int): IDF
def setInputCol(value: String): IDF
def setOutputCol(value: String): IDF
override def fit(dataset: Dataset[_]): IDFModel
override def copy(extra: ParamMap): IDF
}
class IDFModel(override val uid: String, val idf: Vector)
extends Model[IDFModel] with IDFParams with MLWritable {
def setInputCol(value: String): IDFModel
def setOutputCol(value: String): IDFModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): IDFModel
def write: MLWriter
}
class Word2Vec(override val uid: String) extends Estimator[Word2VecModel]
with Word2VecBase with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("w2v"))
def setVectorSize(value: Int): Word2Vec
def setMinCount(value: Int): Word2Vec
def setNumPartitions(value: Int): Word2Vec
def setStepSize(value: Double): Word2Vec
def setMaxIter(value: Int): Word2Vec
def setSeed(value: Long): Word2Vec
def setInputCol(value: String): Word2Vec
def setOutputCol(value: String): Word2Vec
def setWindowSize(value: Int): Word2Vec
def setMaxSentenceLength(value: Int): Word2Vec
override def fit(dataset: Dataset[_]): Word2VecModel
override def copy(extra: ParamMap): Word2Vec
}
class Word2VecModel(override val uid: String, private val wordVectors: Map[String, Array[Float]])
extends Model[Word2VecModel] with Word2VecBase with MLWritable {
def getVectors: DataFrame
def findSynonyms(word: String, num: Int): DataFrame
def findSynonymsArray(word: String, num: Int): Array[(String, Double)]
def setInputCol(value: String): Word2VecModel
def setOutputCol(value: String): Word2VecModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): Word2VecModel
def write: MLWriter
}
class PCA(override val uid: String) extends Estimator[PCAModel]
with PCAParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("pca"))
def setK(value: Int): PCA
def setInputCol(value: String): PCA
def setOutputCol(value: String): PCA
override def fit(dataset: Dataset[_]): PCAModel
override def copy(extra: ParamMap): PCA
}
class PCAModel(override val uid: String, val pc: Matrix, val explainedVariance: Vector)
extends Model[PCAModel] with PCAParams with MLWritable {
def setInputCol(value: String): PCAModel
def setOutputCol(value: String): PCAModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): PCAModel
def write: MLWriter
}
class ChiSqSelector(override val uid: String) extends Estimator[ChiSqSelectorModel]
with ChiSqSelectorParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("chiSqSelector"))
def setNumTopFeatures(value: Int): ChiSqSelector
def setPercentile(value: Double): ChiSqSelector
def setFpr(value: Double): ChiSqSelector
def setFdr(value: Double): ChiSqSelector
def setFwe(value: Double): ChiSqSelector
def setSelectorType(value: String): ChiSqSelector
def setFeaturesCol(value: String): ChiSqSelector
def setOutputCol(value: String): ChiSqSelector
def setLabelCol(value: String): ChiSqSelector
override def fit(dataset: Dataset[_]): ChiSqSelectorModel
override def copy(extra: ParamMap): ChiSqSelector
}
class ChiSqSelectorModel(override val uid: String, private val selector: org.apache.spark.mllib.feature.ChiSqSelectorModel)
extends Model[ChiSqSelectorModel] with ChiSqSelectorParams with MLWritable {
val selectedFeatures: Array[Int]
def setFeaturesCol(value: String): ChiSqSelectorModel
def setOutputCol(value: String): ChiSqSelectorModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): ChiSqSelectorModel
def write: MLWriter
}
class Imputer(override val uid: String) extends Estimator[ImputerModel]
with ImputerParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("imputer"))
def setInputCols(value: Array[String]): Imputer
def setOutputCols(value: Array[String]): Imputer
def setStrategy(value: String): Imputer
def setMissingValue(value: Double): Imputer
def setRelativeError(value: Double): Imputer
override def fit(dataset: Dataset[_]): ImputerModel
override def copy(extra: ParamMap): Imputer
}
class ImputerModel(override val uid: String, val surrogateDF: DataFrame)
extends Model[ImputerModel] with ImputerParams with MLWritable {
def setInputCols(value: Array[String]): ImputerModel
def setOutputCols(value: Array[String]): ImputerModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): ImputerModel
def write: MLWriter
}
class PolynomialExpansion(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("polyExpansion"))
final val degree: IntParam
def setDegree(value: Int): PolynomialExpansion
def setInputCol(value: String): PolynomialExpansion
def setOutputCol(value: String): PolynomialExpansion
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): PolynomialExpansion
}
class Interaction(override val uid: String) extends Transformer
with HasInputCols with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("interaction"))
def setInputCols(value: Array[String]): Interaction
def setOutputCol(value: String): Interaction
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): Interaction
}
class ElementwiseProduct(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("elemProd"))
final val scalingVec: Param[Vector]
def setScalingVec(value: Vector): ElementwiseProduct
def setInputCol(value: String): ElementwiseProduct
def setOutputCol(value: String): ElementwiseProduct
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): ElementwiseProduct
}
class DCT(override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("dct"))
final val inverse: BooleanParam
def setInverse(value: Boolean): DCT
def setInputCol(value: String): DCT
def setOutputCol(value: String): DCT
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): DCT
}
class SQLTransformer(override val uid: String) extends Transformer with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("sql"))
final val statement: Param[String]
def setStatement(value: String): SQLTransformer
def getStatement: String
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): SQLTransformer
}
class RFormula(override val uid: String) extends Estimator[RFormulaModel]
with RFormulaParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("rFormula"))
def setFormula(value: String): RFormula
def setFeaturesCol(value: String): RFormula
def setLabelCol(value: String): RFormula
def setHandleInvalid(value: String): RFormula
override def fit(dataset: Dataset[_]): RFormulaModel
override def copy(extra: ParamMap): RFormula
}
class RFormulaModel(override val uid: String, private val resolvedFormula: ResolvedRFormula,
val pipelineModel: PipelineModel)
extends Model[RFormulaModel] with RFormulaParams with MLWritable {
def setFeaturesCol(value: String): RFormulaModel
def setLabelCol(value: String): RFormulaModel
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): RFormulaModel
def write: MLWriter
}
class FeatureHasher(override val uid: String) extends Transformer
with HasInputCols with HasOutputCol with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("featureHasher"))
final val numFeatures: IntParam
final val categoricalCols: StringArrayParam
def setNumFeatures(value: Int): FeatureHasher
def setCategoricalCols(value: Array[String]): FeatureHasher
def setInputCols(value: Array[String]): FeatureHasher
def setOutputCol(value: String): FeatureHasher
override def transform(dataset: Dataset[_]): DataFrame
override def copy(extra: ParamMap): FeatureHasher
}

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
// Create sample data
val data = spark.createDataFrame(Seq(
(0, "male", 25, 50000.0, "java spark"),
(1, "female", 30, 60000.0, "python ml"),
(2, "male", 35, 70000.0, "scala spark")
)).toDF("id", "gender", "age", "salary", "skills")
// String indexing for categorical variables
val genderIndexer = new StringIndexer()
.setInputCol("gender")
.setOutputCol("genderIndex")
.setHandleInvalid("keep")
// One-hot encoding
val genderEncoder = new OneHotEncoder()
.setInputCol("genderIndex")
.setOutputCol("genderVec")
.setDropLast(true)
// Discretize continuous variables
val ageDiscretizer = new QuantileDiscretizer()
.setInputCol("age")
.setOutputCol("ageDiscrete")
.setNumBuckets(3)
// Scale salary (StandardScaler operates on a Vector column, assembled below)
val salaryScaler = new StandardScaler()
.setInputCol("salaryVec")
.setOutputCol("salaryScaled")
.setWithMean(true)
.setWithStd(true)
// Convert salary to vector first
val salaryAssembler = new VectorAssembler()
.setInputCols(Array("salary"))
.setOutputCol("salaryVec")
// Text processing for skills
val skillsTokenizer = new Tokenizer()
.setInputCol("skills")
.setOutputCol("skillsTokens")
val skillsHasher = new HashingTF()
.setInputCol("skillsTokens")
.setOutputCol("skillsFeatures")
.setNumFeatures(1000)
// Assemble all features
val featureAssembler = new VectorAssembler()
.setInputCols(Array("genderVec", "ageDiscrete", "salaryScaled", "skillsFeatures"))
.setOutputCol("features")
// Create pipeline
val pipeline = new Pipeline().setStages(Array(
genderIndexer, genderEncoder, ageDiscretizer,
salaryAssembler, salaryScaler,
skillsTokenizer, skillsHasher, featureAssembler
))
val model = pipeline.fit(data)
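// The fitted PipelineModel exposes its fitted stages in the order passed to
// setStages; as a quick sketch, stage 0 here is the StringIndexerModel carrying
// the learned label ordering for "gender"
val fittedGenderIndexer = model.stages(0).asInstanceOf[StringIndexerModel]
println(s"Gender labels: ${fittedGenderIndexer.labels.mkString(", ")}")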
val processedData = model.transform(data)
processedData.select("features").show(truncate = false)

import org.apache.spark.ml.feature._
// Sample text data
val textData = spark.createDataFrame(Seq(
(0, "Apache Spark is a fast and general engine for large-scale data processing"),
(1, "Spark runs on both standalone clusters and on Hadoop"),
(2, "Machine learning algorithms in MLlib are scalable")
)).toDF("id", "text")
// Tokenization
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val tokenized = tokenizer.transform(textData)
// Alternative: Regex tokenizer with custom pattern
val regexTokenizer = new RegexTokenizer()
.setInputCol("text")
.setOutputCol("words")
.setPattern("\\W+")
.setMinTokenLength(2)
.setToLowercase(true)
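// Applying the regex tokenizer works the same way as the basic Tokenizer
val regexTokenized = regexTokenizer.transform(textData)
regexTokenized.select("text", "words").show(truncate = false)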
// Remove stop words
val stopWordsRemover = new StopWordsRemover()
.setInputCol("words")
.setOutputCol("filteredWords")
.setCaseSensitive(false)
val filtered = stopWordsRemover.transform(tokenized)
// Create n-grams
val ngram = new NGram()
.setN(2)
.setInputCol("filteredWords")
.setOutputCol("ngrams")
val ngrammed = ngram.transform(filtered)
// Count vectorization
val countVectorizer = new CountVectorizer()
.setInputCol("filteredWords")
.setOutputCol("rawFeatures")
.setVocabSize(10000)
.setMinDF(1)
val countVectorizerModel = countVectorizer.fit(filtered)
val featurized = countVectorizerModel.transform(filtered)
// TF-IDF
val idf = new IDF()
.setInputCol("rawFeatures")
.setOutputCol("features")
val idfModel = idf.fit(featurized)
val tfidfFeatures = idfModel.transform(featurized)
tfidfFeatures.select("text", "features").show(truncate = false)

import org.apache.spark.ml.feature.Word2Vec
// Prepare sentence data
val sentences = spark.createDataFrame(Seq(
"I love Apache Spark".split(" ").toSeq,
"Spark is great for big data".split(" ").toSeq,
"Machine learning with Spark MLlib".split(" ").toSeq
).map(Tuple1.apply)).toDF("text")
// Train Word2Vec model
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("features")
.setVectorSize(100)
.setMinCount(1)
.setMaxIter(10)
.setStepSize(0.025)
.setWindowSize(5)
.setSeed(42)
val word2VecModel = word2Vec.fit(sentences)
// Transform sentences to vectors
val embeddings = word2VecModel.transform(sentences)
embeddings.select("text", "features").show(truncate = false)
// Find synonyms
val synonyms = word2VecModel.findSynonyms("Spark", 3)
synonyms.show()
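// findSynonymsArray returns the same results locally as an Array instead of a DataFrame
val synonymArray = word2VecModel.findSynonymsArray("Spark", 3)
synonymArray.foreach { case (word, similarity) => println(s"$word: $similarity") }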
// Get word vectors
val vectors = word2VecModel.getVectors
vectors.show()

import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.Vectors
// Sample data with features
val data = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(1, Vectors.dense(2.0, 1.1, 1.0)),
(2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
// Standard scaling (z-score normalization)
val standardScaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("standardScaled")
.setWithMean(true)
.setWithStd(true)
val standardScalerModel = standardScaler.fit(data)
val standardScaled = standardScalerModel.transform(data)
println("Standard Scaler - Mean and Std:")
println(s"Mean: ${standardScalerModel.mean}")
println(s"Std: ${standardScalerModel.std}")
// Min-Max scaling
val minMaxScaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("minMaxScaled")
.setMin(0.0)
.setMax(1.0)
val minMaxScalerModel = minMaxScaler.fit(data)
val minMaxScaled = minMaxScalerModel.transform(data)
// Max-Abs scaling
val maxAbsScaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("maxAbsScaled")
val maxAbsScalerModel = maxAbsScaler.fit(data)
val maxAbsScaled = maxAbsScalerModel.transform(data)
// L2 normalization
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normalized")
.setP(2.0)
val normalized = normalizer.transform(data)
// Show all transformations
val allTransformed = standardScaled
.join(minMaxScaled.select("id", "minMaxScaled"), "id")
.join(maxAbsScaled.select("id", "maxAbsScaled"), "id")
.join(normalized.select("id", "normalized"), "id")
allTransformed.show(truncate = false)

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
// High-dimensional data
val data = spark.createDataFrame(Seq(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
).map(Tuple1.apply)).toDF("features")
// PCA to reduce to 3 dimensions
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
val pcaModel = pca.fit(data)
val pcaData = pcaModel.transform(data)
println("Principal Components:")
// pc is stored column-major; each group of numRows values is one principal component
pcaModel.pc.toArray.grouped(pcaModel.pc.numRows).foreach { component =>
println(component.mkString(", "))
}
println("Explained Variance:")
println(pcaModel.explainedVariance)
pcaData.select("features", "pcaFeatures").show(truncate = false)

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
// Data with labels and features
val data = spark.createDataFrame(Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)).toDF("id", "features", "clicked")
// Chi-square feature selection
val selector = new ChiSqSelector()
.setNumTopFeatures(2)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val selectorModel = selector.fit(data)
val selectedData = selectorModel.transform(data)
println(s"Selected features: ${selectorModel.selectedFeatures.mkString(", ")}")
selectedData.show(truncate = false)
// Alternative selection methods
val percentileSelector = new ChiSqSelector()
.setSelectorType("percentile")
.setPercentile(0.5)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val fprSelector = new ChiSqSelector()
.setSelectorType("fpr")
.setFpr(0.05)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")import org.apache.spark.ml.feature.Imputer
// Data with missing values (represented as NaN)
val data = spark.createDataFrame(Seq(
(1.0, Double.NaN, 3.0),
(4.0, 5.0, Double.NaN),
(7.0, 8.0, 9.0)
)).toDF("a", "b", "c")
// Impute missing values with mean
val imputer = new Imputer()
.setInputCols(Array("a", "b", "c"))
.setOutputCols(Array("a_imputed", "b_imputed", "c_imputed"))
.setStrategy("mean") // "mean", "median", or "mode"
val imputerModel = imputer.fit(data)
val imputedData = imputerModel.transform(data)
imputedData.show()
// Check surrogate values used for imputation
imputerModel.surrogateDF.show()

import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.Vectors
// Polynomial expansion
val polyData = spark.createDataFrame(Seq(
Vectors.dense(2.0, 1.0),
Vectors.dense(0.0, 0.0),
Vectors.dense(3.0, -1.0)
).map(Tuple1.apply)).toDF("features")
val polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(2)
val polyExpanded = polyExpansion.transform(polyData)
polyExpanded.show(truncate = false)
// Feature interactions
val interactionData = spark.createDataFrame(Seq(
(1, "A", Vectors.dense(1.0, 2.0)),
(2, "B", Vectors.dense(3.0, 4.0))
)).toDF("id", "category", "features")
// Interaction expects numeric or vector columns; a string column such as
// "category" must be indexed/encoded before it can be interacted
val interaction = new Interaction()
.setInputCols(Array("id", "features"))
.setOutputCol("interactionFeatures")
val interacted = interaction.transform(interactionData)
interacted.show(truncate = false)
// Element-wise product (the scaling vector must match the feature dimension)
val elementProduct = new ElementwiseProduct()
.setScalingVec(Vectors.dense(0.5, 2.0))
.setInputCol("features")
.setOutputCol("scaledFeatures")
val scaled = elementProduct.transform(polyData)
scaled.show(truncate = false)
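// The remaining transformers from the listings (Bucketizer, Binarizer, SQLTransformer)
// follow the same pattern; a brief illustrative sketch, with column names, splits,
// and threshold chosen here as assumptions rather than taken from the examples above
import org.apache.spark.ml.feature.{Binarizer, Bucketizer, SQLTransformer}
val rawValues = spark.createDataFrame(Seq(
(0, -0.5), (1, 0.3), (2, 12.0), (3, 99.9)
)).toDF("id", "value")
// Bucketizer: map a continuous column into buckets defined by explicit split points
val bucketizer = new Bucketizer()
.setInputCol("value")
.setOutputCol("valueBucket")
.setSplits(Array(Double.NegativeInfinity, 0.0, 10.0, Double.PositiveInfinity))
// Binarizer: threshold a continuous column into 0.0 / 1.0
val binarizer = new Binarizer()
.setInputCol("value")
.setOutputCol("valueBinary")
.setThreshold(5.0)
// SQLTransformer: apply an arbitrary SQL statement, with __THIS__ standing for the input table
val sqlTransformer = new SQLTransformer()
.setStatement("SELECT *, value * 2 AS valueDoubled FROM __THIS__")
val bucketed = bucketizer.transform(rawValues)
val binarized = binarizer.transform(bucketed)
sqlTransformer.transform(binarized).show()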