Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.
—
Apache Spark provides scalable machine learning capabilities through two APIs: the RDD-based MLlib API (in maintenance mode) and the DataFrame-based ML API (primary API). The ML API provides high-level abstractions for building machine learning pipelines.
Machine learning functionality is available through:
// ML API (DataFrame-based, primary API)
import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.regression._
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.recommendation._
import org.apache.spark.ml.evaluation._
// MLlib API (RDD-based, maintenance mode)
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg._
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// Minimal end-to-end spark.ml workflow: load a CSV, index a categorical column,
// assemble a feature vector, fit a logistic-regression pipeline, and evaluate AUC.
val spark = SparkSession.builder().appName("ML Example").getOrCreate()
// Load data
val data = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("path/to/data.csv")
// Feature engineering
// StringIndexer is an Estimator: it learns a label -> index mapping for "category" at fit time.
val stringIndexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
// VectorAssembler packs the raw columns plus the learned index into a single "features" vector.
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "categoryIndex"))
.setOutputCol("features")
// Model
val lr = new LogisticRegression()
.setFeaturesCol("features")
.setLabelCol("label")
// Pipeline
// Stages run in order: indexer -> assembler -> classifier.
val pipeline = new Pipeline()
.setStages(Array(stringIndexer, assembler, lr))
// Train
// Fixed seed makes the 80/20 split reproducible across runs.
val Array(trainData, testData) = data.randomSplit(Array(0.8, 0.2), seed = 1234)
val model = pipeline.fit(trainData)
// Evaluate
val predictions = model.transform(testData)
// NOTE(review): default metric of BinaryClassificationEvaluator is areaUnderROC — confirm for target Spark version.
val evaluator = new BinaryClassificationEvaluator()
val auc = evaluator.evaluate(predictions)
println(s"AUC: $auc")

The primary machine learning API built on DataFrames, providing high-level abstractions for creating ML workflows.
// Core abstractions of the DataFrame-based spark.ml Pipeline API.
// A PipelineStage is either an Estimator (fits data to produce a Model)
// or a Transformer (maps one DataFrame to another).
abstract class PipelineStage extends Params {
def copy(extra: ParamMap): PipelineStage
// Validates and derives the output schema from an input schema without touching data.
def transformSchema(schema: StructType): StructType
def params: Array[Param[_]]
}
// An algorithm that learns from a Dataset and returns a fitted Model of type M.
abstract class Estimator[M <: Model[M]] extends PipelineStage {
def fit(dataset: Dataset[_]): M
// Overloads allow fitting with one or more ParamMap overrides in a single call.
def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M]
def fit(dataset: Dataset[_], paramMap: ParamMap): M
def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): M
}
// A data-to-data mapping; transform returns a new DataFrame (the input is not mutated).
abstract class Transformer extends PipelineStage {
def transform(dataset: Dataset[_]): DataFrame
def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame
def transform(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DataFrame
}
// A fitted Estimator; a Model is itself a Transformer (F-bounded on its own type).
abstract class Model[M <: Model[M]] extends Transformer
// An ordered sequence of PipelineStages executed as a single Estimator.
// fit() runs stages in order, fitting each Estimator and collecting the
// resulting Transformers into a PipelineModel.
class Pipeline(val uid: String) extends Estimator[PipelineModel] {
def this() = this(Identifiable.randomUID("pipeline"))
def setStages(value: Array[PipelineStage]): Pipeline
def getStages: Array[PipelineStage]
def fit(dataset: Dataset[_]): PipelineModel
}
// The fitted form of a Pipeline; transform() applies each stage's Transformer in order.
class PipelineModel(override val uid: String, val stages: Array[Transformer]) extends Model[PipelineModel] {
def transform(dataset: Dataset[_]): DataFrame
}
// Vector operations
// Combines multiple input columns into a single feature-vector output column.
class VectorAssembler(override val uid: String) extends Transformer {
def this() = this(Identifiable.randomUID("vecAssembler"))
def setInputCols(value: Array[String]): VectorAssembler
def setOutputCol(value: String): VectorAssembler
def getInputCols: Array[String]
def getOutputCol: String
def transform(dataset: Dataset[_]): DataFrame
}
// String indexing
// Estimator that learns a mapping from string labels to numeric indices.
class StringIndexer(override val uid: String) extends Estimator[StringIndexerModel] {
def this() = this(Identifiable.randomUID("strIdx"))
def setInputCol(value: String): StringIndexer
def setOutputCol(value: String): StringIndexer
// Policy for labels unseen at fit time — presumably "error"/"skip"/"keep"; verify against Spark docs.
def setHandleInvalid(value: String): StringIndexer
def setStringOrderType(value: String): StringIndexer
def fit(dataset: Dataset[_]): StringIndexerModel
}
// Fitted label mapping; `labels` holds the learned vocabulary in index order.
class StringIndexerModel(override val uid: String, val labels: Array[String]) extends Model[StringIndexerModel] {
def setInputCol(value: String): StringIndexerModel
def setOutputCol(value: String): StringIndexerModel
def setHandleInvalid(value: String): StringIndexerModel
def transform(dataset: Dataset[_]): DataFrame
}
// One-hot encoding
// Maps category index columns to binary (one-hot) vector columns.
// NOTE(review): in Spark 3.x OneHotEncoder is an Estimator[OneHotEncoderModel];
// it is listed here as a Transformer — verify against the targeted Spark version.
class OneHotEncoder(override val uid: String) extends Transformer {
def this() = this(Identifiable.randomUID("oneHot"))
def setInputCols(value: Array[String]): OneHotEncoder
def setOutputCols(value: Array[String]): OneHotEncoder
def setHandleInvalid(value: String): OneHotEncoder
// When true, the last category is dropped so the encoding has no redundant column.
def setDropLast(value: Boolean): OneHotEncoder
def transform(dataset: Dataset[_]): DataFrame
}
// Scaling
// Estimator that learns per-feature std/mean statistics for normalization.
class StandardScaler(override val uid: String) extends Estimator[StandardScalerModel] {
def this() = this(Identifiable.randomUID("stdScal"))
def setInputCol(value: String): StandardScaler
def setOutputCol(value: String): StandardScaler
// Whether to center the data by the mean and/or scale by standard deviation.
def setWithMean(value: Boolean): StandardScaler
def setWithStd(value: Boolean): StandardScaler
def fit(dataset: Dataset[_]): StandardScalerModel
}
// Fitted scaler; exposes the learned per-feature `std` and `mean` vectors.
class StandardScalerModel(override val uid: String, val std: Vector, val mean: Vector) extends Model[StandardScalerModel] {
def setInputCol(value: String): StandardScalerModel
def setOutputCol(value: String): StandardScalerModel
def transform(dataset: Dataset[_]): DataFrame
}
// Text processing
// Splits a text column into a sequence of tokens.
class Tokenizer(override val uid: String) extends Transformer {
def this() = this(Identifiable.randomUID("tok"))
def setInputCol(value: String): Tokenizer
def setOutputCol(value: String): Tokenizer
def transform(dataset: Dataset[_]): DataFrame
}
// Maps term sequences to fixed-length term-frequency vectors using the hashing trick.
class HashingTF(override val uid: String) extends Transformer {
def this() = this(Identifiable.randomUID("hashingTF"))
def setInputCol(value: String): HashingTF
def setOutputCol(value: String): HashingTF
// Size of the hashed feature space; larger values reduce hash collisions.
def setNumFeatures(value: Int): HashingTF
def setBinary(value: Boolean): HashingTF
def transform(dataset: Dataset[_]): DataFrame
}
// Estimator that computes inverse-document-frequency weights over a corpus of TF vectors.
class IDF(override val uid: String) extends Estimator[IDFModel] {
def this() = this(Identifiable.randomUID("idf"))
def setInputCol(value: String): IDF
def setOutputCol(value: String): IDF
def setMinDocFreq(value: Int): IDF
def fit(dataset: Dataset[_]): IDFModel
}
// Logistic Regression
// Logistic-regression classifier with elastic-net regularization parameters.
class LogisticRegression(override val uid: String) extends Classifier[Vector, LogisticRegression, LogisticRegressionModel] {
def this() = this(Identifiable.randomUID("logreg"))
def setRegParam(value: Double): LogisticRegression
// Elastic-net mixing parameter — presumably 0 = L2, 1 = L1; verify against Spark docs.
def setElasticNetParam(value: Double): LogisticRegression
def setMaxIter(value: Int): LogisticRegression
// Convergence tolerance for the iterative solver.
def setTol(value: Double): LogisticRegression
def setFitIntercept(value: Boolean): LogisticRegression
def setStandardization(value: Boolean): LogisticRegression
// Binary decision threshold; setThresholds is the multiclass variant.
def setThreshold(value: Double): LogisticRegression
def setThresholds(value: Array[Double]): LogisticRegression
def setWeightCol(value: String): LogisticRegression
def fit(dataset: Dataset[_]): LogisticRegressionModel
}
// Fitted model exposing the learned coefficient vector and intercept.
class LogisticRegressionModel(override val uid: String, val coefficients: Vector, val intercept: Double)
extends ClassificationModel[Vector, LogisticRegressionModel] {
def setThreshold(value: Double): LogisticRegressionModel
def setThresholds(value: Array[Double]): LogisticRegressionModel
// Training summary (metrics gathered during fit); evaluate() scores a new dataset.
def summary: LogisticRegressionTrainingSummary
def evaluate(dataset: Dataset[_]): LogisticRegressionSummary
}
// Random Forest
// Ensemble classifier built from multiple decision trees.
class RandomForestClassifier(override val uid: String) extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel] {
def this() = this(Identifiable.randomUID("rfc"))
def setNumTrees(value: Int): RandomForestClassifier
def setMaxDepth(value: Int): RandomForestClassifier
// Max number of bins used when discretizing continuous features.
def setMaxBins(value: Int): RandomForestClassifier
def setMinInstancesPerNode(value: Int): RandomForestClassifier
def setMinInfoGain(value: Double): RandomForestClassifier
// Fraction of the training data sampled for each tree.
def setSubsamplingRate(value: Double): RandomForestClassifier
def setFeatureSubsetStrategy(value: String): RandomForestClassifier
def setSeed(value: Long): RandomForestClassifier
def fit(dataset: Dataset[_]): RandomForestClassificationModel
}
// Fitted forest; exposes the individual trees and diagnostic accessors.
class RandomForestClassificationModel(override val uid: String, val trees: Array[DecisionTreeClassificationModel], val numFeatures: Int)
extends ClassificationModel[Vector, RandomForestClassificationModel] {
// Estimate of each feature's contribution to the model's predictions.
def featureImportances: Vector
def totalNumNodes: Int
def toDebugString: String
}
// Gradient Boosted Trees
// Boosted-tree classifier; trees are fit sequentially (maxIter = number of trees).
class GBTClassifier(override val uid: String) extends Classifier[Vector, GBTClassifier, GBTClassificationModel] {
def this() = this(Identifiable.randomUID("gbtc"))
def setMaxIter(value: Int): GBTClassifier
def setMaxDepth(value: Int): GBTClassifier
def setMaxBins(value: Int): GBTClassifier
def setMinInstancesPerNode(value: Int): GBTClassifier
def setMinInfoGain(value: Double): GBTClassifier
def setSubsamplingRate(value: Double): GBTClassifier
// Learning rate applied to each tree's contribution.
def setStepSize(value: Double): GBTClassifier
def setFeatureSubsetStrategy(value: String): GBTClassifier
def setSeed(value: Long): GBTClassifier
def fit(dataset: Dataset[_]): GBTClassificationModel
}
// Naive Bayes
// Naive Bayes classifier; modelType selects the distribution variant (e.g. multinomial).
class NaiveBayes(override val uid: String) extends Classifier[Vector, NaiveBayes, NaiveBayesModel] {
def this() = this(Identifiable.randomUID("nb"))
// Additive (Laplace) smoothing parameter.
def setSmoothing(value: Double): NaiveBayes
def setModelType(value: String): NaiveBayes
def setWeightCol(value: String): NaiveBayes
def fit(dataset: Dataset[_]): NaiveBayesModel
}
// Linear Regression
// Linear regression with elastic-net regularization and selectable solver.
class LinearRegression(override val uid: String) extends Regressor[Vector, LinearRegression, LinearRegressionModel] {
def this() = this(Identifiable.randomUID("linReg"))
def setRegParam(value: Double): LinearRegression
def setElasticNetParam(value: Double): LinearRegression
def setMaxIter(value: Int): LinearRegression
def setTol(value: Double): LinearRegression
def setFitIntercept(value: Boolean): LinearRegression
def setStandardization(value: Boolean): LinearRegression
// Optimization algorithm — presumably "l-bfgs"/"normal"/"auto"; verify against Spark docs.
def setSolver(value: String): LinearRegression
def setWeightCol(value: String): LinearRegression
def fit(dataset: Dataset[_]): LinearRegressionModel
}
// Fitted model exposing learned coefficients/intercept plus training/evaluation summaries.
class LinearRegressionModel(override val uid: String, val coefficients: Vector, val intercept: Double)
extends RegressionModel[Vector, LinearRegressionModel] {
def summary: LinearRegressionTrainingSummary
def evaluate(dataset: Dataset[_]): LinearRegressionSummary
}
// Random Forest Regression
// Regression counterpart of RandomForestClassifier; same tree hyperparameters.
class RandomForestRegressor(override val uid: String) extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel] {
def this() = this(Identifiable.randomUID("rfr"))
def setNumTrees(value: Int): RandomForestRegressor
def setMaxDepth(value: Int): RandomForestRegressor
def setMaxBins(value: Int): RandomForestRegressor
def setMinInstancesPerNode(value: Int): RandomForestRegressor
def setMinInfoGain(value: Double): RandomForestRegressor
def setSubsamplingRate(value: Double): RandomForestRegressor
def setFeatureSubsetStrategy(value: String): RandomForestRegressor
def setSeed(value: Long): RandomForestRegressor
def fit(dataset: Dataset[_]): RandomForestRegressionModel
}
// Gradient Boosted Trees Regression
// Regression counterpart of GBTClassifier; maxIter is the number of boosting stages.
class GBTRegressor(override val uid: String) extends Regressor[Vector, GBTRegressor, GBTRegressionModel] {
def this() = this(Identifiable.randomUID("gbtr"))
def setMaxIter(value: Int): GBTRegressor
def setMaxDepth(value: Int): GBTRegressor
def setMaxBins(value: Int): GBTRegressor
def setMinInstancesPerNode(value: Int): GBTRegressor
def setMinInfoGain(value: Double): GBTRegressor
def setSubsamplingRate(value: Double): GBTRegressor
// Learning rate applied to each tree's contribution.
def setStepSize(value: Double): GBTRegressor
def setFeatureSubsetStrategy(value: String): GBTRegressor
def setSeed(value: Long): GBTRegressor
def fit(dataset: Dataset[_]): GBTRegressionModel
}
// K-Means
// K-means clustering estimator; k is the number of clusters.
class KMeans(override val uid: String) extends Estimator[KMeansModel] {
def this() = this(Identifiable.randomUID("kmeans"))
def setK(value: Int): KMeans
def setMaxIter(value: Int): KMeans
def setTol(value: Double): KMeans
// Center-initialization mode — presumably "random" or "k-means||"; verify against Spark docs.
def setInitMode(value: String): KMeans
def setInitSteps(value: Int): KMeans
def setSeed(value: Long): KMeans
def setFeaturesCol(value: String): KMeans
def setPredictionCol(value: String): KMeans
def fit(dataset: Dataset[_]): KMeansModel
}
// Fitted clustering; exposes the learned cluster centers.
class KMeansModel(override val uid: String, val clusterCenters: Array[Vector]) extends Model[KMeansModel] {
def setPredictionCol(value: String): KMeansModel
def setFeaturesCol(value: String): KMeansModel
def transform(dataset: Dataset[_]): DataFrame
// NOTE(review): computeCost is deprecated in newer Spark in favor of ClusteringEvaluator — confirm version.
def computeCost(dataset: Dataset[_]): Double
def summary: KMeansSummary
}
// Gaussian Mixture Model
// Soft-clustering estimator fitting a mixture of k multivariate Gaussians.
class GaussianMixture(override val uid: String) extends Estimator[GaussianMixtureModel] {
def this() = this(Identifiable.randomUID("GaussianMixture"))
def setK(value: Int): GaussianMixture
def setMaxIter(value: Int): GaussianMixture
def setTol(value: Double): GaussianMixture
def setSeed(value: Long): GaussianMixture
def setFeaturesCol(value: String): GaussianMixture
def setPredictionCol(value: String): GaussianMixture
// Output column holding per-cluster membership probabilities.
def setProbabilityCol(value: String): GaussianMixture
def fit(dataset: Dataset[_]): GaussianMixtureModel
}
// Fitted mixture; `weights` are the mixture proportions, `gaussians` the component distributions.
class GaussianMixtureModel(override val uid: String, val weights: Array[Double], val gaussians: Array[MultivariateGaussian])
extends Model[GaussianMixtureModel] {
def setFeaturesCol(value: String): GaussianMixtureModel
def setPredictionCol(value: String): GaussianMixtureModel
def setProbabilityCol(value: String): GaussianMixtureModel
def transform(dataset: Dataset[_]): DataFrame
def summary: GaussianMixtureSummary
}
// Alternating Least Squares (ALS)
// Collaborative-filtering recommender trained with alternating least squares.
class ALS(override val uid: String) extends Estimator[ALSModel] {
def this() = this(Identifiable.randomUID("als"))
// Dimensionality of the latent user/item factor vectors.
def setRank(value: Int): ALS
def setNumUserBlocks(value: Int): ALS
def setNumItemBlocks(value: Int): ALS
def setMaxIter(value: Int): ALS
def setRegParam(value: Double): ALS
// Confidence scaling used with implicit-preference data.
def setAlpha(value: Double): ALS
// How to handle users/items unseen at training time — presumably "nan" or "drop"; verify.
def setColdStartStrategy(value: String): ALS
def setUserCol(value: String): ALS
def setItemCol(value: String): ALS
def setRatingCol(value: String): ALS
def setPredictionCol(value: String): ALS
def setImplicitPrefs(value: Boolean): ALS
def setNonnegative(value: Boolean): ALS
def setSeed(value: Long): ALS
def fit(dataset: Dataset[_]): ALSModel
}
// Fitted factorization; userFactors/itemFactors hold the learned latent vectors.
class ALSModel(override val uid: String, val rank: Int, val userFactors: DataFrame, val itemFactors: DataFrame)
extends Model[ALSModel] {
def setColdStartStrategy(value: String): ALSModel
def setUserCol(value: String): ALSModel
def setItemCol(value: String): ALSModel
def setPredictionCol(value: String): ALSModel
def transform(dataset: Dataset[_]): DataFrame
// Batch top-N recommendation helpers for all users/items or a subset.
def recommendForAllUsers(numItems: Int): DataFrame
def recommendForAllItems(numUsers: Int): DataFrame
def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
}
// Binary Classification Evaluator
// Evaluators score a DataFrame of predictions with a single Double metric,
// selected via setMetricName.
class BinaryClassificationEvaluator(override val uid: String) extends Evaluator {
def this() = this(Identifiable.randomUID("binEval"))
def setRawPredictionCol(value: String): BinaryClassificationEvaluator
def setLabelCol(value: String): BinaryClassificationEvaluator
// Metric — presumably "areaUnderROC" or "areaUnderPR"; verify against Spark docs.
def setMetricName(value: String): BinaryClassificationEvaluator
def setWeightCol(value: String): BinaryClassificationEvaluator
def evaluate(dataset: Dataset[_]): Double
}
// Multiclass Classification Evaluator
class MulticlassClassificationEvaluator(override val uid: String) extends Evaluator {
def this() = this(Identifiable.randomUID("mcEval"))
def setPredictionCol(value: String): MulticlassClassificationEvaluator
def setLabelCol(value: String): MulticlassClassificationEvaluator
def setMetricName(value: String): MulticlassClassificationEvaluator
def setWeightCol(value: String): MulticlassClassificationEvaluator
def evaluate(dataset: Dataset[_]): Double
}
// Regression Evaluator
class RegressionEvaluator(override val uid: String) extends Evaluator {
def this() = this(Identifiable.randomUID("regEval"))
def setPredictionCol(value: String): RegressionEvaluator
def setLabelCol(value: String): RegressionEvaluator
def setMetricName(value: String): RegressionEvaluator
def setWeightCol(value: String): RegressionEvaluator
def evaluate(dataset: Dataset[_]): Double
}
// Clustering Evaluator
class ClusteringEvaluator(override val uid: String) extends Evaluator {
def this() = this(Identifiable.randomUID("cluEval"))
def setPredictionCol(value: String): ClusteringEvaluator
def setFeaturesCol(value: String): ClusteringEvaluator
def setMetricName(value: String): ClusteringEvaluator
def setWeightCol(value: String): ClusteringEvaluator
def evaluate(dataset: Dataset[_]): Double
}
// Cross Validation
// Model selection via k-fold cross-validation: fits the wrapped estimator for
// every ParamMap in the grid on each fold, scores with the evaluator, and keeps
// the best-performing configuration.
class CrossValidator(override val uid: String) extends Estimator[CrossValidatorModel] {
def this() = this(Identifiable.randomUID("cv"))
def setEstimator(value: Estimator[_]): CrossValidator
def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
def setEvaluator(value: Evaluator): CrossValidator
def setNumFolds(value: Int): CrossValidator
def setSeed(value: Long): CrossValidator
// Number of models fitted concurrently during the search.
def setParallelism(value: Int): CrossValidator
def setCollectSubModels(value: Boolean): CrossValidator
def fit(dataset: Dataset[_]): CrossValidatorModel
}
// Result of cross-validation: the winning model plus per-ParamMap average metrics.
class CrossValidatorModel(override val uid: String, val bestModel: Model[_], val avgMetrics: Array[Double])
extends Model[CrossValidatorModel] {
def setNumFolds(value: Int): CrossValidatorModel
def transform(dataset: Dataset[_]): DataFrame
}
// Train Validation Split
// Cheaper alternative to CrossValidator: a single train/validation split
// controlled by trainRatio instead of k folds.
class TrainValidationSplit(override val uid: String) extends Estimator[TrainValidationSplitModel] {
def this() = this(Identifiable.randomUID("tvs"))
def setEstimator(value: Estimator[_]): TrainValidationSplit
def setEstimatorParamMaps(value: Array[ParamMap]): TrainValidationSplit
def setEvaluator(value: Evaluator): TrainValidationSplit
def setTrainRatio(value: Double): TrainValidationSplit
def setSeed(value: Long): TrainValidationSplit
def setParallelism(value: Int): TrainValidationSplit
def setCollectSubModels(value: Boolean): TrainValidationSplit
def fit(dataset: Dataset[_]): TrainValidationSplitModel
}
// Parameter Grid Builder
// Builder producing the Cartesian product of parameter values as an
// Array[ParamMap] for CrossValidator / TrainValidationSplit grid search.
class ParamGridBuilder {
// Adds a set of candidate values for one parameter.
def addGrid[T](param: Param[T], values: Array[T]): ParamGridBuilder
// Fixes parameters that are held constant across the whole grid.
def baseOn(paramMap: ParamMap): ParamGridBuilder
def baseOn(paramPair: ParamPair[_], paramPairs: ParamPair[_]*): ParamGridBuilder
def build(): Array[ParamMap]
}

The original RDD-based machine learning library, now in maintenance mode.
// Vectors
// Local (non-distributed) linear-algebra vector abstraction used by MLlib.
trait Vector extends Serializable {
def size: Int
def toArray: Array[Double]
// Element access by index.
def apply(i: Int): Double
def copy: Vector
// Invokes f(index, value) for each explicitly stored (active) entry.
def foreachActive(f: (Int, Double) => Unit): Unit
def numActives: Int
def numNonzeros: Int
}
// Factory methods for dense and sparse vectors plus basic norms/distances.
object Vectors {
def dense(values: Array[Double]): Vector
def dense(firstValue: Double, otherValues: Double*): Vector
def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
def sparse(size: Int, elements: Seq[(Int, Double)]): Vector
def zeros(size: Int): Vector
// p-norm of the vector.
def norm(vector: Vector, p: Double): Double
// Squared Euclidean distance between v1 and v2.
def sqdist(v1: Vector, v2: Vector): Double
}
// Dense storage: one Double per element.
class DenseVector(val values: Array[Double]) extends Vector
// Sparse storage: parallel index/value arrays for nonzero entries.
class SparseVector(override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector
// Matrices
// Local matrix abstraction; apply(i, j) reads the (row, column) entry.
trait Matrix extends Serializable {
def numRows: Int
def numCols: Int
def toArray: Array[Double]
def apply(i: Int, j: Int): Double
def copy: Matrix
def transpose: Matrix
// Invokes f(row, col, value) for each explicitly stored entry.
def foreachActive(f: (Int, Int, Double) => Unit): Unit
}
// Factory methods for local matrices.
object Matrices {
def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix
// Sparse constructor takes CSC-style column pointers and row indices — TODO confirm layout.
def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values: Array[Double]): Matrix
def zeros(numRows: Int, numCols: Int): Matrix
// n x n identity matrix.
def eye(n: Int): Matrix
}

Usage example for MLlib:
// RDD-based MLlib example: parse labeled points from text, split, train a
// linear model with SGD, and compute RMSE on the held-out set.
// NOTE(review): assumes a SparkContext `sc` is already in scope, and
// LinearRegressionWithSGD is deprecated/removed in newer Spark releases — verify target version.
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.mllib.evaluation.RegressionMetrics
// Create labeled points
// Expected line format: "<label>,<v1> <v2> ..." — label, comma, space-separated features.
val data = sc.textFile("data.txt").map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}
// Split data
val splits = data.randomSplit(Array(0.8, 0.2), seed = 11L)
// Cache the training RDD since SGD iterates over it many times.
val training = splits(0).cache()
val test = splits(1)
// Train model
val numIterations = 100
val stepSize = 0.00000001
val model = LinearRegressionWithSGD.train(training, numIterations, stepSize)
// Evaluate
// Pair each test point's true label with the model's prediction.
val valuesAndPreds = test.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val metrics = new RegressionMetrics(valuesAndPreds)
println(s"RMSE = ${metrics.rootMeanSquaredError}")

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13