CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-parent-2-13

Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.

Pending
Overview
Eval results
Files

docs/machine-learning.md

Machine Learning

Apache Spark provides scalable machine learning capabilities through two APIs: the RDD-based MLlib API (in maintenance mode) and the DataFrame-based ML API (primary API). The ML API provides high-level abstractions for building machine learning pipelines.

Package Information

Machine learning functionality is available through:

// ML API (DataFrame-based, primary API)
import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.regression._
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.recommendation._
import org.apache.spark.ml.evaluation._

// MLlib API (RDD-based, maintenance mode)
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg._

Basic Usage

// End-to-end pipeline example: index a categorical column, assemble a
// feature vector, fit a logistic regression, and score it by area under ROC.
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val spark = SparkSession.builder().appName("ML Example").getOrCreate()

// Load data (inferSchema makes an extra pass over the file to guess types)
val data = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/data.csv")

// Feature engineering: map string categories to numeric indices
val stringIndexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

// Concatenate raw columns into the single vector column models expect
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "categoryIndex"))
  .setOutputCol("features")

// Model
val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")

// Pipeline: stages run in declaration order; fit() trains each estimator
// on the output of the previous stages
val pipeline = new Pipeline()
  .setStages(Array(stringIndexer, assembler, lr))

// Train (fixed seed makes the 80/20 split reproducible)
val Array(trainData, testData) = data.randomSplit(Array(0.8, 0.2), seed = 1234)
val model = pipeline.fit(trainData)

// Evaluate (BinaryClassificationEvaluator defaults to areaUnderROC)
val predictions = model.transform(testData)
val evaluator = new BinaryClassificationEvaluator()
val auc = evaluator.evaluate(predictions)

println(s"AUC: $auc")

Capabilities

ML Pipeline API

The primary machine learning API built on DataFrames, providing high-level abstractions for creating ML workflows.

Pipeline Components

// Base type for every pipeline stage; stages carry typed parameters and can
// validate/transform the schema before any data is processed.
abstract class PipelineStage extends Params {
  def copy(extra: ParamMap): PipelineStage
  def transformSchema(schema: StructType): StructType
  def params: Array[Param[_]]
}

// An algorithm that learns from data: fit() consumes a Dataset and produces
// a fitted Model. Overloads allow supplying parameter overrides at fit time.
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  // One model per ParamMap, e.g. for grid search
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M]
  def fit(dataset: Dataset[_], paramMap: ParamMap): M
  def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): M
}

// A stage that maps one DataFrame to another without learning (e.g. adding
// a prediction or feature column).
abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
  def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame
  def transform(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DataFrame
}

// A fitted Estimator; models are themselves Transformers.
abstract class Model[M <: Model[M]] extends Transformer

// An ordered sequence of stages treated as a single Estimator.
class Pipeline(val uid: String) extends Estimator[PipelineModel] {
  def this() = this(Identifiable.randomUID("pipeline"))
  
  def setStages(value: Array[PipelineStage]): Pipeline
  def getStages: Array[PipelineStage]
  // Fits estimator stages in order, threading each stage's output forward
  def fit(dataset: Dataset[_]): PipelineModel
}

// The fitted pipeline: every stage is now a Transformer applied in sequence.
class PipelineModel(override val uid: String, val stages: Array[Transformer]) extends Model[PipelineModel] {
  def transform(dataset: Dataset[_]): DataFrame
}

Feature Engineering

// Vector operations
class VectorAssembler(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("vecAssembler"))
  
  def setInputCols(value: Array[String]): VectorAssembler
  def setOutputCol(value: String): VectorAssembler
  def getInputCols: Array[String]
  def getOutputCol: String
  def transform(dataset: Dataset[_]): DataFrame
}

// String indexing
class StringIndexer(override val uid: String) extends Estimator[StringIndexerModel] {
  def this() = this(Identifiable.randomUID("strIdx"))
  
  def setInputCol(value: String): StringIndexer
  def setOutputCol(value: String): StringIndexer
  def setHandleInvalid(value: String): StringIndexer
  def setStringOrderType(value: String): StringIndexer
  def fit(dataset: Dataset[_]): StringIndexerModel
}

class StringIndexerModel(override val uid: String, val labels: Array[String]) extends Model[StringIndexerModel] {
  def setInputCol(value: String): StringIndexerModel
  def setOutputCol(value: String): StringIndexerModel
  def setHandleInvalid(value: String): StringIndexerModel
  def transform(dataset: Dataset[_]): DataFrame
}

// One-hot encoding
class OneHotEncoder(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("oneHot"))
  
  def setInputCols(value: Array[String]): OneHotEncoder
  def setOutputCols(value: Array[String]): OneHotEncoder
  def setHandleInvalid(value: String): OneHotEncoder
  def setDropLast(value: Boolean): OneHotEncoder
  def transform(dataset: Dataset[_]): DataFrame
}

// Scaling
class StandardScaler(override val uid: String) extends Estimator[StandardScalerModel] {
  def this() = this(Identifiable.randomUID("stdScal"))
  
  def setInputCol(value: String): StandardScaler
  def setOutputCol(value: String): StandardScaler
  def setWithMean(value: Boolean): StandardScaler
  def setWithStd(value: Boolean): StandardScaler
  def fit(dataset: Dataset[_]): StandardScalerModel
}

class StandardScalerModel(override val uid: String, val std: Vector, val mean: Vector) extends Model[StandardScalerModel] {
  def setInputCol(value: String): StandardScalerModel
  def setOutputCol(value: String): StandardScalerModel
  def transform(dataset: Dataset[_]): DataFrame
}

// Text processing
class Tokenizer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("tok"))
  
  def setInputCol(value: String): Tokenizer
  def setOutputCol(value: String): Tokenizer
  def transform(dataset: Dataset[_]): DataFrame
}

class HashingTF(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("hashingTF"))
  
  def setInputCol(value: String): HashingTF
  def setOutputCol(value: String): HashingTF
  def setNumFeatures(value: Int): HashingTF
  def setBinary(value: Boolean): HashingTF
  def transform(dataset: Dataset[_]): DataFrame
}

class IDF(override val uid: String) extends Estimator[IDFModel] {
  def this() = this(Identifiable.randomUID("idf"))
  
  def setInputCol(value: String): IDF
  def setOutputCol(value: String): IDF
  def setMinDocFreq(value: Int): IDF
  def fit(dataset: Dataset[_]): IDFModel
}

Classification

// Logistic Regression
class LogisticRegression(override val uid: String) extends Classifier[Vector, LogisticRegression, LogisticRegressionModel] {
  def this() = this(Identifiable.randomUID("logreg"))
  
  def setRegParam(value: Double): LogisticRegression
  def setElasticNetParam(value: Double): LogisticRegression
  def setMaxIter(value: Int): LogisticRegression
  def setTol(value: Double): LogisticRegression
  def setFitIntercept(value: Boolean): LogisticRegression
  def setStandardization(value: Boolean): LogisticRegression
  def setThreshold(value: Double): LogisticRegression
  def setThresholds(value: Array[Double]): LogisticRegression
  def setWeightCol(value: String): LogisticRegression
  def fit(dataset: Dataset[_]): LogisticRegressionModel
}

class LogisticRegressionModel(override val uid: String, val coefficients: Vector, val intercept: Double) 
  extends ClassificationModel[Vector, LogisticRegressionModel] {
  def setThreshold(value: Double): LogisticRegressionModel
  def setThresholds(value: Array[Double]): LogisticRegressionModel
  def summary: LogisticRegressionTrainingSummary
  def evaluate(dataset: Dataset[_]): LogisticRegressionSummary
}

// Random Forest
class RandomForestClassifier(override val uid: String) extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel] {
  def this() = this(Identifiable.randomUID("rfc"))
  
  def setNumTrees(value: Int): RandomForestClassifier
  def setMaxDepth(value: Int): RandomForestClassifier
  def setMaxBins(value: Int): RandomForestClassifier
  def setMinInstancesPerNode(value: Int): RandomForestClassifier
  def setMinInfoGain(value: Double): RandomForestClassifier
  def setSubsamplingRate(value: Double): RandomForestClassifier
  def setFeatureSubsetStrategy(value: String): RandomForestClassifier
  def setSeed(value: Long): RandomForestClassifier
  def fit(dataset: Dataset[_]): RandomForestClassificationModel
}

class RandomForestClassificationModel(override val uid: String, val trees: Array[DecisionTreeClassificationModel], val numFeatures: Int)
  extends ClassificationModel[Vector, RandomForestClassificationModel] {
  def featureImportances: Vector
  def totalNumNodes: Int
  def toDebugString: String
}

// Gradient Boosted Trees
class GBTClassifier(override val uid: String) extends Classifier[Vector, GBTClassifier, GBTClassificationModel] {
  def this() = this(Identifiable.randomUID("gbtc"))
  
  def setMaxIter(value: Int): GBTClassifier
  def setMaxDepth(value: Int): GBTClassifier
  def setMaxBins(value: Int): GBTClassifier
  def setMinInstancesPerNode(value: Int): GBTClassifier
  def setMinInfoGain(value: Double): GBTClassifier
  def setSubsamplingRate(value: Double): GBTClassifier
  def setStepSize(value: Double): GBTClassifier
  def setFeatureSubsetStrategy(value: String): GBTClassifier
  def setSeed(value: Long): GBTClassifier
  def fit(dataset: Dataset[_]): GBTClassificationModel
}

// Naive Bayes
class NaiveBayes(override val uid: String) extends Classifier[Vector, NaiveBayes, NaiveBayesModel] {
  def this() = this(Identifiable.randomUID("nb"))
  
  def setSmoothing(value: Double): NaiveBayes
  def setModelType(value: String): NaiveBayes
  def setWeightCol(value: String): NaiveBayes
  def fit(dataset: Dataset[_]): NaiveBayesModel
}

Regression

// Linear Regression
class LinearRegression(override val uid: String) extends Regressor[Vector, LinearRegression, LinearRegressionModel] {
  def this() = this(Identifiable.randomUID("linReg"))
  
  def setRegParam(value: Double): LinearRegression
  def setElasticNetParam(value: Double): LinearRegression
  def setMaxIter(value: Int): LinearRegression
  def setTol(value: Double): LinearRegression
  def setFitIntercept(value: Boolean): LinearRegression
  def setStandardization(value: Boolean): LinearRegression
  def setSolver(value: String): LinearRegression
  def setWeightCol(value: String): LinearRegression
  def fit(dataset: Dataset[_]): LinearRegressionModel
}

class LinearRegressionModel(override val uid: String, val coefficients: Vector, val intercept: Double)
  extends RegressionModel[Vector, LinearRegressionModel] {
  def summary: LinearRegressionTrainingSummary
  def evaluate(dataset: Dataset[_]): LinearRegressionSummary
}

// Random Forest Regression
class RandomForestRegressor(override val uid: String) extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel] {
  def this() = this(Identifiable.randomUID("rfr"))
  
  def setNumTrees(value: Int): RandomForestRegressor
  def setMaxDepth(value: Int): RandomForestRegressor
  def setMaxBins(value: Int): RandomForestRegressor
  def setMinInstancesPerNode(value: Int): RandomForestRegressor
  def setMinInfoGain(value: Double): RandomForestRegressor
  def setSubsamplingRate(value: Double): RandomForestRegressor
  def setFeatureSubsetStrategy(value: String): RandomForestRegressor
  def setSeed(value: Long): RandomForestRegressor
  def fit(dataset: Dataset[_]): RandomForestRegressionModel
}

// Gradient Boosted Trees Regression
class GBTRegressor(override val uid: String) extends Regressor[Vector, GBTRegressor, GBTRegressionModel] {
  def this() = this(Identifiable.randomUID("gbtr"))
  
  def setMaxIter(value: Int): GBTRegressor
  def setMaxDepth(value: Int): GBTRegressor
  def setMaxBins(value: Int): GBTRegressor
  def setMinInstancesPerNode(value: Int): GBTRegressor
  def setMinInfoGain(value: Double): GBTRegressor
  def setSubsamplingRate(value: Double): GBTRegressor
  def setStepSize(value: Double): GBTRegressor
  def setFeatureSubsetStrategy(value: String): GBTRegressor
  def setSeed(value: Long): GBTRegressor
  def fit(dataset: Dataset[_]): GBTRegressionModel
}

Clustering

// K-Means
class KMeans(override val uid: String) extends Estimator[KMeansModel] {
  def this() = this(Identifiable.randomUID("kmeans"))
  
  def setK(value: Int): KMeans  
  def setMaxIter(value: Int): KMeans
  def setTol(value: Double): KMeans
  def setInitMode(value: String): KMeans
  def setInitSteps(value: Int): KMeans
  def setSeed(value: Long): KMeans
  def setFeaturesCol(value: String): KMeans
  def setPredictionCol(value: String): KMeans
  def fit(dataset: Dataset[_]): KMeansModel
}

class KMeansModel(override val uid: String, val clusterCenters: Array[Vector]) extends Model[KMeansModel] {
  def setPredictionCol(value: String): KMeansModel
  def setFeaturesCol(value: String): KMeansModel
  def transform(dataset: Dataset[_]): DataFrame
  def computeCost(dataset: Dataset[_]): Double
  def summary: KMeansSummary
}

// Gaussian Mixture Model
class GaussianMixture(override val uid: String) extends Estimator[GaussianMixtureModel] {
  def this() = this(Identifiable.randomUID("GaussianMixture"))
  
  def setK(value: Int): GaussianMixture
  def setMaxIter(value: Int): GaussianMixture  
  def setTol(value: Double): GaussianMixture
  def setSeed(value: Long): GaussianMixture
  def setFeaturesCol(value: String): GaussianMixture
  def setPredictionCol(value: String): GaussianMixture
  def setProbabilityCol(value: String): GaussianMixture
  def fit(dataset: Dataset[_]): GaussianMixtureModel
}

class GaussianMixtureModel(override val uid: String, val weights: Array[Double], val gaussians: Array[MultivariateGaussian])
  extends Model[GaussianMixtureModel] {
  def setFeaturesCol(value: String): GaussianMixtureModel
  def setPredictionCol(value: String): GaussianMixtureModel
  def setProbabilityCol(value: String): GaussianMixtureModel
  def transform(dataset: Dataset[_]): DataFrame
  def summary: GaussianMixtureSummary
}

Recommendation

// Alternating Least Squares (ALS)
class ALS(override val uid: String) extends Estimator[ALSModel] {
  def this() = this(Identifiable.randomUID("als"))
  
  def setRank(value: Int): ALS
  def setNumUserBlocks(value: Int): ALS
  def setNumItemBlocks(value: Int): ALS
  def setMaxIter(value: Int): ALS
  def setRegParam(value: Double): ALS
  def setAlpha(value: Double): ALS
  def setColdStartStrategy(value: String): ALS
  def setUserCol(value: String): ALS
  def setItemCol(value: String): ALS
  def setRatingCol(value: String): ALS
  def setPredictionCol(value: String): ALS
  def setImplicitPrefs(value: Boolean): ALS
  def setNonnegative(value: Boolean): ALS
  def setSeed(value: Long): ALS
  def fit(dataset: Dataset[_]): ALSModel
}

class ALSModel(override val uid: String, val rank: Int, val userFactors: DataFrame, val itemFactors: DataFrame)
  extends Model[ALSModel] {
  def setColdStartStrategy(value: String): ALSModel
  def setUserCol(value: String): ALSModel
  def setItemCol(value: String): ALSModel  
  def setPredictionCol(value: String): ALSModel
  def transform(dataset: Dataset[_]): DataFrame
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
}

Model Evaluation

// Binary Classification Evaluator
class BinaryClassificationEvaluator(override val uid: String) extends Evaluator {
  def this() = this(Identifiable.randomUID("binEval"))
  
  def setRawPredictionCol(value: String): BinaryClassificationEvaluator
  def setLabelCol(value: String): BinaryClassificationEvaluator
  def setMetricName(value: String): BinaryClassificationEvaluator
  def setWeightCol(value: String): BinaryClassificationEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

// Multiclass Classification Evaluator  
class MulticlassClassificationEvaluator(override val uid: String) extends Evaluator {
  def this() = this(Identifiable.randomUID("mcEval"))
  
  def setPredictionCol(value: String): MulticlassClassificationEvaluator
  def setLabelCol(value: String): MulticlassClassificationEvaluator
  def setMetricName(value: String): MulticlassClassificationEvaluator
  def setWeightCol(value: String): MulticlassClassificationEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

// Regression Evaluator
class RegressionEvaluator(override val uid: String) extends Evaluator {
  def this() = this(Identifiable.randomUID("regEval"))
  
  def setPredictionCol(value: String): RegressionEvaluator
  def setLabelCol(value: String): RegressionEvaluator
  def setMetricName(value: String): RegressionEvaluator
  def setWeightCol(value: String): RegressionEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

// Clustering Evaluator
class ClusteringEvaluator(override val uid: String) extends Evaluator {
  def this() = this(Identifiable.randomUID("cluEval"))
  
  def setPredictionCol(value: String): ClusteringEvaluator
  def setFeaturesCol(value: String): ClusteringEvaluator
  def setMetricName(value: String): ClusteringEvaluator
  def setWeightCol(value: String): ClusteringEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

Model Selection and Tuning

// Cross Validation
class CrossValidator(override val uid: String) extends Estimator[CrossValidatorModel] {
  def this() = this(Identifiable.randomUID("cv"))
  
  def setEstimator(value: Estimator[_]): CrossValidator
  def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
  def setEvaluator(value: Evaluator): CrossValidator
  def setNumFolds(value: Int): CrossValidator
  def setSeed(value: Long): CrossValidator
  def setParallelism(value: Int): CrossValidator
  def setCollectSubModels(value: Boolean): CrossValidator
  def fit(dataset: Dataset[_]): CrossValidatorModel
}

class CrossValidatorModel(override val uid: String, val bestModel: Model[_], val avgMetrics: Array[Double])
  extends Model[CrossValidatorModel] {
  def setNumFolds(value: Int): CrossValidatorModel
  def transform(dataset: Dataset[_]): DataFrame
}

// Train Validation Split
class TrainValidationSplit(override val uid: String) extends Estimator[TrainValidationSplitModel] {
  def this() = this(Identifiable.randomUID("tvs"))
  
  def setEstimator(value: Estimator[_]): TrainValidationSplit
  def setEstimatorParamMaps(value: Array[ParamMap]): TrainValidationSplit
  def setEvaluator(value: Evaluator): TrainValidationSplit
  def setTrainRatio(value: Double): TrainValidationSplit
  def setSeed(value: Long): TrainValidationSplit
  def setParallelism(value: Int): TrainValidationSplit  
  def setCollectSubModels(value: Boolean): TrainValidationSplit
  def fit(dataset: Dataset[_]): TrainValidationSplitModel
}

// Parameter Grid Builder
class ParamGridBuilder {
  def addGrid[T](param: Param[T], values: Array[T]): ParamGridBuilder
  def baseOn(paramMap: ParamMap): ParamGridBuilder
  def baseOn(paramPair: ParamPair[_], paramPairs: ParamPair[_]*): ParamGridBuilder
  def build(): Array[ParamMap]
}

MLlib RDD-based API (Legacy)

The original RDD-based machine learning library, now in maintenance mode.

Linear Algebra

// Vectors
trait Vector extends Serializable {
  def size: Int
  def toArray: Array[Double]
  def apply(i: Int): Double
  def copy: Vector
  def foreachActive(f: (Int, Double) => Unit): Unit
  def numActives: Int
  def numNonzeros: Int
}

object Vectors {
  def dense(values: Array[Double]): Vector
  def dense(firstValue: Double, otherValues: Double*): Vector
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
  def sparse(size: Int, elements: Seq[(Int, Double)]): Vector
  def zeros(size: Int): Vector
  def norm(vector: Vector, p: Double): Double
  def sqdist(v1: Vector, v2: Vector): Double
}

class DenseVector(val values: Array[Double]) extends Vector
class SparseVector(override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector

// Matrices
trait Matrix extends Serializable {
  def numRows: Int
  def numCols: Int
  def toArray: Array[Double]
  def apply(i: Int, j: Int): Double
  def copy: Matrix  
  def transpose: Matrix
  def foreachActive(f: (Int, Int, Double) => Unit): Unit
}

object Matrices {
  def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix
  def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values: Array[Double]): Matrix
  def zeros(numRows: Int, numCols: Int): Matrix
  def eye(n: Int): Matrix
}

Usage example for MLlib:

// Legacy RDD-based regression example.
// NOTE(review): LinearRegressionWithSGD was deprecated in Spark 2.0 and
// removed in Spark 3.0; since this package targets Scala 2.13 (Spark 3.x),
// this snippet likely will not compile as written — confirm, and prefer
// org.apache.spark.ml.regression.LinearRegression for new code.
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.mllib.evaluation.RegressionMetrics

// Create labeled points ("sc" is assumed to be an existing SparkContext)
// Expected line format: "<label>,<f1> <f2> ... <fn>"
val data = sc.textFile("data.txt").map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}

// Split data (cache the training RDD since SGD iterates over it repeatedly)
val splits = data.randomSplit(Array(0.8, 0.2), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Train model (the tiny step size keeps unscaled-feature SGD from diverging)
val numIterations = 100
val stepSize = 0.00000001
val model = LinearRegressionWithSGD.train(training, numIterations, stepSize)

// Evaluate: pair each observation's label with its prediction
val valuesAndPreds = test.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

val metrics = new RegressionMetrics(valuesAndPreds)
println(s"RMSE = ${metrics.rootMeanSquaredError}")

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13

docs

core-engine.md

graph-processing.md

index.md

machine-learning.md

sql-dataframes.md

stream-processing.md

tile.json