Apache Spark - Unified analytics engine for large-scale data processing
Comprehensive machine learning library with algorithms for classification, regression, clustering, and collaborative filtering. Provides both high-level Pipeline API for building ML workflows and lower-level RDD-based APIs for advanced use cases.
High-level API for building machine learning workflows with transformers and estimators.
/**
* Machine learning pipeline for chaining transformers and estimators
*/
class Pipeline extends Estimator[PipelineModel] {
/** Set pipeline stages */
def setStages(value: Array[PipelineStage]): Pipeline
/** Get pipeline stages */
def getStages: Array[PipelineStage]
/** Fit pipeline to data */
def fit(dataset: Dataset[_]): PipelineModel
}
/**
* Fitted pipeline ready for predictions
*/
class PipelineModel extends Transformer {
/** Transform dataset using fitted pipeline */
def transform(dataset: Dataset[_]): DataFrame
/** Get fitted stages */
def stages: Array[Transformer]
}
/**
* Base class for pipeline components
*/
abstract class PipelineStage {
def uid: String
def copy(extra: ParamMap): PipelineStage
}
/**
* Algorithm that can be fit on a DataFrame
*/
abstract class Estimator[M <: Model[M]] extends PipelineStage {
/** Fit model to dataset */
def fit(dataset: Dataset[_]): M
}
/**
* Algorithm that transforms one DataFrame into another
*/
abstract class Transformer extends PipelineStage {
/** Transform dataset */
def transform(dataset: Dataset[_]): DataFrame
}
/**
* Result of fitting an Estimator
*/
abstract class Model[M <: Model[M]] extends Transformer
Usage Examples:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.LogisticRegression
// Create pipeline stages
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
val assembler = new VectorAssembler()
.setInputCols(Array("age", "income", "categoryIndex"))
.setOutputCol("features")
val lr = new LogisticRegression()
.setFeaturesCol("features")
.setLabelCol("label")
// Create and fit pipeline
val pipeline = new Pipeline()
.setStages(Array(indexer, assembler, lr))
val model = pipeline.fit(trainingData)
val predictions = model.transform(testData)
Algorithms for supervised learning with discrete target variables.
/**
* Logistic regression classifier
*/
class LogisticRegression extends Classifier[Vector, LogisticRegression, LogisticRegressionModel] {
def setFeaturesCol(value: String): LogisticRegression
def setLabelCol(value: String): LogisticRegression
def setPredictionCol(value: String): LogisticRegression
def setProbabilityCol(value: String): LogisticRegression
def setMaxIter(value: Int): LogisticRegression
def setRegParam(value: Double): LogisticRegression
def setElasticNetParam(value: Double): LogisticRegression
def setFamily(value: String): LogisticRegression
def fit(dataset: Dataset[_]): LogisticRegressionModel
}
/**
* Decision tree classifier
*/
class DecisionTreeClassifier extends Classifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] {
def setMaxDepth(value: Int): DecisionTreeClassifier
def setMinInstancesPerNode(value: Int): DecisionTreeClassifier
def setImpurity(value: String): DecisionTreeClassifier
def setMaxBins(value: Int): DecisionTreeClassifier
def setSeed(value: Long): DecisionTreeClassifier
def fit(dataset: Dataset[_]): DecisionTreeClassificationModel
}
/**
* Random forest classifier
*/
class RandomForestClassifier extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel] {
def setNumTrees(value: Int): RandomForestClassifier
def setMaxDepth(value: Int): RandomForestClassifier
def setSubsamplingRate(value: Double): RandomForestClassifier
def setFeatureSubsetStrategy(value: String): RandomForestClassifier
def setImpurity(value: String): RandomForestClassifier
def setMaxBins(value: Int): RandomForestClassifier
def fit(dataset: Dataset[_]): RandomForestClassificationModel
}
/**
* Gradient-boosted tree classifier
*/
class GBTClassifier extends Classifier[Vector, GBTClassifier, GBTClassificationModel] {
def setMaxIter(value: Int): GBTClassifier
def setStepSize(value: Double): GBTClassifier
def setMaxDepth(value: Int): GBTClassifier
def fit(dataset: Dataset[_]): GBTClassificationModel
}
/**
* Naive Bayes classifier
*/
class NaiveBayes extends Classifier[Vector, NaiveBayes, NaiveBayesModel] {
def setModelType(value: String): NaiveBayes
def setSmoothing(value: Double): NaiveBayes
def fit(dataset: Dataset[_]): NaiveBayesModel
}
/**
* Linear Support Vector Machine
*/
class LinearSVC extends Classifier[Vector, LinearSVC, LinearSVCModel] {
def setMaxIter(value: Int): LinearSVC
def setRegParam(value: Double): LinearSVC
def setTol(value: Double): LinearSVC
def fit(dataset: Dataset[_]): LinearSVCModel
}
Usage Examples:
import org.apache.spark.ml.classification._
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// Logistic Regression
val lr = new LogisticRegression()
.setMaxIter(20)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val lrModel = lr.fit(trainingData)
val lrPredictions = lrModel.transform(testData)
// Random Forest
val rf = new RandomForestClassifier()
.setNumTrees(100)
.setFeatureSubsetStrategy("auto")
.setImpurity("gini")
.setMaxDepth(4)
.setMaxBins(32)
val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)
// Evaluation
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(rfPredictions)
Algorithms for supervised learning with continuous target variables.
/**
* Linear regression
*/
class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegressionModel] {
def setFeaturesCol(value: String): LinearRegression
def setLabelCol(value: String): LinearRegression
def setPredictionCol(value: String): LinearRegression
def setMaxIter(value: Int): LinearRegression
def setRegParam(value: Double): LinearRegression
def setElasticNetParam(value: Double): LinearRegression
def setTol(value: Double): LinearRegression
def setFitIntercept(value: Boolean): LinearRegression
def fit(dataset: Dataset[_]): LinearRegressionModel
}
/**
* Decision tree regressor
*/
class DecisionTreeRegressor extends Regressor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel] {
def setMaxDepth(value: Int): DecisionTreeRegressor
def setMinInstancesPerNode(value: Int): DecisionTreeRegressor
def setImpurity(value: String): DecisionTreeRegressor
def fit(dataset: Dataset[_]): DecisionTreeRegressionModel
}
/**
* Random forest regressor
*/
class RandomForestRegressor extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel] {
def setNumTrees(value: Int): RandomForestRegressor
def setMaxDepth(value: Int): RandomForestRegressor
def setSubsamplingRate(value: Double): RandomForestRegressor
def fit(dataset: Dataset[_]): RandomForestRegressionModel
}
/**
* Gradient-boosted tree regressor
*/
class GBTRegressor extends Regressor[Vector, GBTRegressor, GBTRegressionModel] {
def setMaxIter(value: Int): GBTRegressor
def setStepSize(value: Double): GBTRegressor
def setMaxDepth(value: Int): GBTRegressor
def fit(dataset: Dataset[_]): GBTRegressionModel
}
/**
* Generalized linear regression
*/
class GeneralizedLinearRegression extends Regressor[Vector, GeneralizedLinearRegression, GeneralizedLinearRegressionModel] {
def setFamily(value: String): GeneralizedLinearRegression
def setLink(value: String): GeneralizedLinearRegression
def setMaxIter(value: Int): GeneralizedLinearRegression
def setRegParam(value: Double): GeneralizedLinearRegression
def fit(dataset: Dataset[_]): GeneralizedLinearRegressionModel
}
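Usage Examples:
The regression estimators follow the same fit/transform pattern as the classifiers. A minimal sketch, assuming trainingData and testData DataFrames with "features" and "label" columns:
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Linear regression with elastic-net regularization
val lir = new LinearRegression()
.setMaxIter(100)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val lirModel = lir.fit(trainingData)
val lirPredictions = lirModel.transform(testData)
// Root mean squared error on the held-out set
val rmse = new RegressionEvaluator()
.setMetricName("rmse")
.evaluate(lirPredictions)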
Unsupervised learning algorithms for grouping data points.
/**
* K-means clustering
*/
class KMeans extends Estimator[KMeansModel] {
def setFeaturesCol(value: String): KMeans
def setPredictionCol(value: String): KMeans
def setK(value: Int): KMeans
def setMaxIter(value: Int): KMeans
def setTol(value: Double): KMeans
def setInitMode(value: String): KMeans
def setInitSteps(value: Int): KMeans
def setSeed(value: Long): KMeans
def fit(dataset: Dataset[_]): KMeansModel
}
/**
* Bisecting K-means clustering
*/
class BisectingKMeans extends Estimator[BisectingKMeansModel] {
def setK(value: Int): BisectingKMeans
def setMaxIter(value: Int): BisectingKMeans
def setSeed(value: Long): BisectingKMeans
def fit(dataset: Dataset[_]): BisectingKMeansModel
}
/**
* Gaussian Mixture Model
*/
class GaussianMixture extends Estimator[GaussianMixtureModel] {
def setK(value: Int): GaussianMixture
def setMaxIter(value: Int): GaussianMixture
def setTol(value: Double): GaussianMixture
def setSeed(value: Long): GaussianMixture
def fit(dataset: Dataset[_]): GaussianMixtureModel
}
/**
* Latent Dirichlet Allocation for topic modeling
*/
class LDA extends Estimator[LDAModel] {
def setK(value: Int): LDA
def setMaxIter(value: Int): LDA
def setTopicConcentration(value: Double): LDA
def setDocConcentration(value: Double): LDA
def setSeed(value: Long): LDA
def fit(dataset: Dataset[_]): LDAModel
}
Usage Examples:
import org.apache.spark.ml.clustering._
// K-Means
val kmeans = new KMeans()
.setK(3)
.setSeed(1L)
.setMaxIter(20)
val kmeansModel = kmeans.fit(dataset)
val predictions = kmeansModel.transform(dataset)
// Evaluate clustering by computing the training cost (sum of squared distances);
// KMeansModel.computeCost is deprecated and removed in recent Spark versions
val wssse = kmeansModel.summary.trainingCost
println(s"Within Set Sum of Squared Errors = $wssse")
// Gaussian Mixture Model
val gmm = new GaussianMixture()
.setK(3)
.setSeed(538009335L)
val gmmModel = gmm.fit(dataset)
val gmmPredictions = gmmModel.transform(dataset)
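LDA expects a column of term-count vectors (for example, the output of HashingTF or a count vectorizer). A hedged sketch, where corpus is an assumed DataFrame with such a "features" column:
// Topic modeling with LDA; corpus is a hypothetical DataFrame of count vectors
val lda = new LDA()
.setK(10)
.setMaxIter(10)
.setSeed(1L)
val ldaModel = lda.fit(corpus)
// Top-weighted terms per topic, and per-document topic mixtures
val topics = ldaModel.describeTopics(5)
val topicMixtures = ldaModel.transform(corpus)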
Data preprocessing and feature engineering transformers.
/**
* Combine multiple columns into vector column
*/
class VectorAssembler extends Transformer {
def setInputCols(value: Array[String]): VectorAssembler
def setOutputCol(value: String): VectorAssembler
def setHandleInvalid(value: String): VectorAssembler
def transform(dataset: Dataset[_]): DataFrame
}
/**
* Standardize features by removing mean and scaling to unit variance
*/
class StandardScaler extends Estimator[StandardScalerModel] {
def setInputCol(value: String): StandardScaler
def setOutputCol(value: String): StandardScaler
def setWithMean(value: Boolean): StandardScaler
def setWithStd(value: Boolean): StandardScaler
def fit(dataset: Dataset[_]): StandardScalerModel
}
/**
* Scale features to given range
*/
class MinMaxScaler extends Estimator[MinMaxScalerModel] {
def setInputCol(value: String): MinMaxScaler
def setOutputCol(value: String): MinMaxScaler
def setMin(value: Double): MinMaxScaler
def setMax(value: Double): MinMaxScaler
def fit(dataset: Dataset[_]): MinMaxScalerModel
}
/**
* Map strings to indices
*/
class StringIndexer extends Estimator[StringIndexerModel] {
def setInputCol(value: String): StringIndexer
def setOutputCol(value: String): StringIndexer
def setHandleInvalid(value: String): StringIndexer
def setStringOrderType(value: String): StringIndexer
def fit(dataset: Dataset[_]): StringIndexerModel
}
/**
* One-hot encode categorical features
*/
class OneHotEncoder extends Estimator[OneHotEncoderModel] {
def setInputCols(value: Array[String]): OneHotEncoder
def setOutputCols(value: Array[String]): OneHotEncoder
def setDropLast(value: Boolean): OneHotEncoder
def setHandleInvalid(value: String): OneHotEncoder
def fit(dataset: Dataset[_]): OneHotEncoderModel
}
/**
* Tokenize text into words
*/
class Tokenizer extends Transformer {
def setInputCol(value: String): Tokenizer
def setOutputCol(value: String): Tokenizer
def transform(dataset: Dataset[_]): DataFrame
}
/**
* Regular expression tokenizer
*/
class RegexTokenizer extends Transformer {
def setInputCol(value: String): RegexTokenizer
def setOutputCol(value: String): RegexTokenizer
def setPattern(value: String): RegexTokenizer
def setGaps(value: Boolean): RegexTokenizer
def setMinTokenLength(value: Int): RegexTokenizer
def transform(dataset: Dataset[_]): DataFrame
}
/**
* Hash text features to fixed-length vectors
*/
class HashingTF extends Transformer {
def setInputCol(value: String): HashingTF
def setOutputCol(value: String): HashingTF
def setNumFeatures(value: Int): HashingTF
def setBinary(value: Boolean): HashingTF
def transform(dataset: Dataset[_]): DataFrame
}
/**
* Compute Inverse Document Frequency
*/
class IDF extends Estimator[IDFModel] {
def setInputCol(value: String): IDF
def setOutputCol(value: String): IDF
def setMinDocFreq(value: Int): IDF
def fit(dataset: Dataset[_]): IDFModel
}
/**
* Principal Component Analysis
*/
class PCA extends Estimator[PCAModel] {
def setInputCol(value: String): PCA
def setOutputCol(value: String): PCA
def setK(value: Int): PCA
def fit(dataset: Dataset[_]): PCAModel
}
/**
* Chi-squared feature selection
*/
class ChiSqSelector extends Estimator[ChiSqSelectorModel] {
def setFeaturesCol(value: String): ChiSqSelector
def setOutputCol(value: String): ChiSqSelector
def setLabelCol(value: String): ChiSqSelector
def setNumTopFeatures(value: Int): ChiSqSelector
def setSelectorType(value: String): ChiSqSelector
def fit(dataset: Dataset[_]): ChiSqSelectorModel
}
Usage Examples:
import org.apache.spark.ml.feature._
// Feature assembly
val assembler = new VectorAssembler()
.setInputCols(Array("age", "income", "education"))
.setOutputCol("features")
val assembled = assembler.transform(df)
// String indexing and one-hot encoding
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val encoder = new OneHotEncoder()
.setInputCols(Array("categoryIndex"))
.setOutputCols(Array("categoryVec"))
val encoded = encoder.fit(indexed).transform(indexed)
// Text processing pipeline
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(10000)
val idf = new IDF()
.setInputCol("rawFeatures")
.setOutputCol("features")
val idfModel = idf.fit(hashingTF.transform(tokenizer.transform(textDF)))
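The scaler and PCA estimators above follow the same fit-then-transform pattern. A brief sketch, reusing the assembled DataFrame from the VectorAssembler step:
// Standardize features, then project onto the top two principal components
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithMean(true)
.setWithStd(true)
val scaled = scaler.fit(assembled).transform(assembled)
val pca = new PCA()
.setInputCol("scaledFeatures")
.setOutputCol("pcaFeatures")
.setK(2)
val reduced = pca.fit(scaled).transform(scaled)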
Metrics and evaluators for assessing model performance.
/**
* Evaluator for multiclass classification
*/
class MulticlassClassificationEvaluator extends Evaluator {
def setLabelCol(value: String): MulticlassClassificationEvaluator
def setPredictionCol(value: String): MulticlassClassificationEvaluator
def setMetricName(value: String): MulticlassClassificationEvaluator
def evaluate(dataset: Dataset[_]): Double
}
/**
* Evaluator for binary classification
*/
class BinaryClassificationEvaluator extends Evaluator {
def setLabelCol(value: String): BinaryClassificationEvaluator
def setRawPredictionCol(value: String): BinaryClassificationEvaluator
def setMetricName(value: String): BinaryClassificationEvaluator
def evaluate(dataset: Dataset[_]): Double
}
/**
* Evaluator for regression
*/
class RegressionEvaluator extends Evaluator {
def setLabelCol(value: String): RegressionEvaluator
def setPredictionCol(value: String): RegressionEvaluator
def setMetricName(value: String): RegressionEvaluator
def evaluate(dataset: Dataset[_]): Double
}
/**
* Evaluator for clustering
*/
class ClusteringEvaluator extends Evaluator {
def setFeaturesCol(value: String): ClusteringEvaluator
def setPredictionCol(value: String): ClusteringEvaluator
def setMetricName(value: String): ClusteringEvaluator
def evaluate(dataset: Dataset[_]): Double
}
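Usage Examples:
Every evaluator exposes the same evaluate(dataset) entry point. A short sketch, where lirPredictions and kmeansPredictions stand in for prediction DataFrames produced by the models shown earlier:
import org.apache.spark.ml.evaluation.{RegressionEvaluator, ClusteringEvaluator}
// RMSE for regression predictions
val rmse = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
.evaluate(lirPredictions)
// Silhouette score for clustering output (range [-1, 1]; higher is better)
val silhouette = new ClusteringEvaluator()
.setFeaturesCol("features")
.setPredictionCol("prediction")
.evaluate(kmeansPredictions)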
Cross-validation and hyperparameter tuning utilities.
/**
* Cross-validator for model selection
*/
class CrossValidator extends Estimator[CrossValidatorModel] {
def setEstimator(value: Estimator[_]): CrossValidator
def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
def setEvaluator(value: Evaluator): CrossValidator
def setNumFolds(value: Int): CrossValidator
def setSeed(value: Long): CrossValidator
def fit(dataset: Dataset[_]): CrossValidatorModel
}
/**
* Train-validation split for model selection
*/
class TrainValidationSplit extends Estimator[TrainValidationSplitModel] {
def setEstimator(value: Estimator[_]): TrainValidationSplit
def setEstimatorParamMaps(value: Array[ParamMap]): TrainValidationSplit
def setEvaluator(value: Evaluator): TrainValidationSplit
def setTrainRatio(value: Double): TrainValidationSplit
def setSeed(value: Long): TrainValidationSplit
def fit(dataset: Dataset[_]): TrainValidationSplitModel
}
/**
* Parameter grid builder
*/
class ParamGridBuilder {
def addGrid[T](param: Param[T], values: Array[T]): ParamGridBuilder
def build(): Array[ParamMap]
}
Usage Examples:
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// Parameter grid
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.maxIter, Array(10, 100))
.build()
// Cross validation
val cv = new CrossValidator()
.setEstimator(lr)
.setEvaluator(new BinaryClassificationEvaluator())
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
.setSeed(1234L)
val cvModel = cv.fit(trainingData)
val bestModel = cvModel.bestModel
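TrainValidationSplit checks each parameter combination against a single train/validation split, which is cheaper than k-fold cross-validation when data is plentiful. A sketch reusing the grid and evaluator above:
import org.apache.spark.ml.tuning.TrainValidationSplit
// 75% of the data trains each candidate; the remaining 25% scores it
val tvs = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(new BinaryClassificationEvaluator())
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
.setSeed(1234L)
val tvsModel = tvs.fit(trainingData)
val tvsBest = tvsModel.bestModel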
Common ML exceptions:
IllegalArgumentException - Invalid parameters or configurations
SparkException - General Spark execution errors during ML operations
UnsupportedOperationException - Unsupported operations on specific model types
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12