The RDD-based MLlib API (org.apache.spark.mllib) is Spark's original machine learning library. Although it is in maintenance mode and the DataFrame-based API (org.apache.spark.ml) is recommended for new development, it still provides useful functionality and remains widely used in existing systems.
case class LabeledPoint(label: Double, features: org.apache.spark.mllib.linalg.Vector) {
override def toString: String
}
object LabeledPoint {
def parse(s: String): LabeledPoint
}
case class Rating(user: Int, product: Int, rating: Double) {
override def toString: String
}
trait Vector extends Serializable {
def size: Int
def toArray: Array[Double]
def apply(i: Int): Double
def copy: Vector
def foreachActive(f: (Int, Double) => Unit): Unit
def numActives: Int
def numNonzeros: Int
def toDense: DenseVector
def toSparse: SparseVector
def compressed: Vector
}
class DenseVector(val values: Array[Double]) extends Vector
class SparseVector(override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector
object Vectors {
def dense(values: Array[Double]): DenseVector
def sparse(size: Int, indices: Array[Int], values: Array[Double]): SparseVector
def norm(vector: Vector, p: Double): Double
def sqdist(v1: Vector, v2: Vector): Double
}
trait Matrix extends Serializable {
def numRows: Int
def numCols: Int
def toArray: Array[Double]
def apply(i: Int, j: Int): Double
def transpose: Matrix
def multiply(y: DenseMatrix): DenseMatrix
def multiply(y: DenseVector): DenseVector
}
class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix
class SparseMatrix(override val numRows: Int, override val numCols: Int,
val colPtrs: Array[Int], val rowIndices: Array[Int], val values: Array[Double]) extends Matrix
object Matrices {
def dense(numRows: Int, numCols: Int, values: Array[Double]): DenseMatrix
def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int],
rowIndices: Array[Int], values: Array[Double]): SparseMatrix
}
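As a quick, illustrative sketch (the values are arbitrary), the local linear algebra types above compose like this:
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

val dense = Vectors.dense(1.0, 0.0, 3.0)
val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))  // same values, sparse form
println(Vectors.norm(dense, 2.0))                             // L2 norm
println(Vectors.sqdist(dense, sparse))                        // 0.0: the two vectors are equal
val point = LabeledPoint(1.0, sparse)                         // a labeled example for the learners below

// 2x2 matrix stored column-major: [[1.0 3.0], [2.0 4.0]]
val m = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))
println(m.transpose)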
class LogisticRegressionModel(override val weights: Vector, override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
override def toString: String
}
class LogisticRegressionWithLBFGS extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
def setNumClasses(numClasses: Int): this.type
def run(input: RDD[LabeledPoint]): LogisticRegressionModel
}
object LogisticRegressionWithSGD {
def train(input: RDD[LabeledPoint]): LogisticRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int): LogisticRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LogisticRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
miniBatchFraction: Double): LogisticRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
miniBatchFraction: Double, initialWeights: Vector): LogisticRegressionModel
}
class SVMModel(override val weights: Vector, override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
def setThreshold(threshold: Double): SVMModel
def clearThreshold(): SVMModel
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}
object SVMWithSGD {
def train(input: RDD[LabeledPoint]): SVMModel
def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): SVMModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double): SVMModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double, miniBatchFraction: Double): SVMModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double, miniBatchFraction: Double, initialWeights: Vector): SVMModel
}
class NaiveBayesModel(val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]],
val modelType: String) extends ClassificationModel with Serializable {
def predict(testData: RDD[Vector]): RDD[Double]
def predict(testData: Vector): Double
}
object NaiveBayes {
def train(input: RDD[LabeledPoint]): NaiveBayesModel
def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel
def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel
}
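A minimal Naive Bayes sketch on made-up counts (multinomial model with Laplace smoothing lambda = 1.0), assuming an active SparkContext sc as in the examples later in this section:
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val nbData = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 2.0))
))
val nbModel = NaiveBayes.train(nbData, lambda = 1.0, modelType = "multinomial")
println(nbModel.predict(Vectors.dense(0.0, 3.0)))  // expected label: 1.0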
class LinearRegressionModel(override val weights: Vector, override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}
object LinearRegressionWithSGD {
def train(input: RDD[LabeledPoint]): LinearRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int): LinearRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LinearRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
miniBatchFraction: Double): LinearRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
miniBatchFraction: Double, initialWeights: Vector): LinearRegressionModel
}
class RidgeRegressionModel(override val weights: Vector, override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}
object RidgeRegressionWithSGD {
def train(input: RDD[LabeledPoint]): RidgeRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int): RidgeRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): RidgeRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double): RidgeRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double, miniBatchFraction: Double): RidgeRegressionModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double, miniBatchFraction: Double, initialWeights: Vector): RidgeRegressionModel
}
class LassoModel(override val weights: Vector, override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}
object LassoWithSGD {
def train(input: RDD[LabeledPoint]): LassoModel
def train(input: RDD[LabeledPoint], numIterations: Int): LassoModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LassoModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double): LassoModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double, miniBatchFraction: Double): LassoModel
def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
regParam: Double, miniBatchFraction: Double, initialWeights: Vector): LassoModel
}
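A brief sketch of the SGD-based regressors on a tiny synthetic dataset (y = 2x, generated purely for illustration); the step size and regularization values are arbitrary:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LassoWithSGD, LinearRegressionWithSGD, RidgeRegressionWithSGD}

val regData = sc.parallelize((1 to 100).map { i =>
  val x = i / 100.0
  LabeledPoint(2.0 * x, Vectors.dense(x))
})
val numIterations = 100
val stepSize = 0.1
val linModel = LinearRegressionWithSGD.train(regData, numIterations, stepSize)
val ridgeModel = RidgeRegressionWithSGD.train(regData, numIterations, stepSize, 0.01)
val lassoModel = LassoWithSGD.train(regData, numIterations, stepSize, 0.01)
val trainingMse = regData.map { p =>
  val err = linModel.predict(p.features) - p.label
  err * err
}.mean()
println(s"Linear regression training MSE: $trainingMse")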
class KMeansModel(val clusterCenters: Array[Vector]) extends Saveable with Serializable {
def predict(point: Vector): Int
def predict(points: RDD[Vector]): RDD[Int]
def computeCost(data: RDD[Vector]): Double
def k: Int
}
object KMeans {
def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int,
initializationMode: String): KMeansModel
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int,
initializationMode: String, seed: Long): KMeansModel
}
class GaussianMixtureModel(val weights: Array[Double], val gaussians: Array[MultivariateGaussian])
extends Serializable {
def predict(point: Vector): Int
def predict(points: RDD[Vector]): RDD[Int]
def predictSoft(point: Vector): Array[Double]
def predictSoft(points: RDD[Vector]): RDD[Array[Double]]
def k: Int
}
class GaussianMixture extends Serializable {
def setK(k: Int): GaussianMixture
def setMaxIterations(maxIterations: Int): GaussianMixture
def setConvergenceTol(convergenceTol: Double): GaussianMixture
def setSeed(seed: Long): GaussianMixture
def run(data: RDD[Vector]): GaussianMixtureModel
}
class PowerIterationClusteringModel(val k: Int,
val assignments: RDD[PowerIterationClustering.Assignment]) extends Serializable
class PowerIterationClustering(private var k: Int, private var maxIterations: Int) extends Serializable {
def setK(k: Int): PowerIterationClustering
def setMaxIterations(maxIterations: Int): PowerIterationClustering
def setInitializationMode(initializationMode: String): PowerIterationClustering
def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel
}
object PowerIterationClustering {
case class Assignment(id: Long, cluster: Int)
}
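A small power iteration clustering sketch on a made-up similarity graph (two tight triangles joined by one weak edge):
import org.apache.spark.mllib.clustering.PowerIterationClustering

val similarities = sc.parallelize(Seq(
  (0L, 1L, 1.0), (0L, 2L, 1.0), (1L, 2L, 1.0),  // first group
  (3L, 4L, 1.0), (3L, 5L, 1.0), (4L, 5L, 1.0),  // second group
  (2L, 3L, 0.1)                                  // weak link between the groups
))
val picModel = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(20)
  .run(similarities)
picModel.assignments.collect().foreach { a =>
  println(s"Vertex ${a.id} -> cluster ${a.cluster}")
}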
class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable {
def predict(features: Vector): Double
def predict(features: RDD[Vector]): RDD[Double]
def depth: Int
def numNodes: Int
}
object DecisionTree {
def train(input: RDD[LabeledPoint], strategy: Strategy): DecisionTreeModel
def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
maxDepth: Int): DecisionTreeModel
def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
maxDepth: Int, numClasses: Int): DecisionTreeModel
def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
maxDepth: Int, numClasses: Int, maxBins: Int,
quantileCalculationStrategy: QuantileStrategy,
categoricalFeaturesInfo: Map[Int, Int]): DecisionTreeModel
}
class RandomForestModel(val algo: Algo, val trees: Array[DecisionTreeModel]) extends Serializable {
def predict(features: Vector): Double
def predict(features: RDD[Vector]): RDD[Double]
def totalNumNodes: Int
}
object RandomForest {
def trainClassifier(input: RDD[LabeledPoint], strategy: Strategy,
numTrees: Int, featureSubsetStrategy: String,
seed: Int): RandomForestModel
def trainRegressor(input: RDD[LabeledPoint], strategy: Strategy,
numTrees: Int, featureSubsetStrategy: String,
seed: Int): RandomForestModel
}
class GradientBoostedTreesModel(val algo: Algo, val trees: Array[DecisionTreeModel],
val treeWeights: Array[Double]) extends Serializable {
def predict(features: Vector): Double
def predict(features: RDD[Vector]): RDD[Double]
def totalNumNodes: Int
}
object GradientBoostedTrees {
def train(input: RDD[LabeledPoint], boostingStrategy: BoostingStrategy): GradientBoostedTreesModel
}
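A short decision tree sketch; it uses the trainClassifier convenience method (not shown in the listing above) with illustrative parameters, and reuses the sample LibSVM file that ships with Spark:
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils

val treeData = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
val Array(treeTraining, treeTest) = treeData.randomSplit(Array(0.7, 0.3))
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()  // empty map: treat all features as continuous
// positional arguments: impurity = "gini", maxDepth = 5, maxBins = 32
val treeModel = DecisionTree.trainClassifier(treeTraining, numClasses, categoricalFeaturesInfo, "gini", 5, 32)
val testErr = treeTest.map { p =>
  if (treeModel.predict(p.features) != p.label) 1.0 else 0.0
}.mean()
println(s"Test error: $testErr, depth: ${treeModel.depth}, nodes: ${treeModel.numNodes}")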
class MatrixFactorizationModel(val rank: Int, val userFeatures: RDD[(Int, Array[Double])],
val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
def predict(user: Int, product: Int): Double
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
def recommendProducts(user: Int, num: Int): Array[Rating]
def recommendUsers(product: Int, num: Int): Array[Rating]
}
object ALS {
def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double,
alpha: Double): MatrixFactorizationModel
}
class HashingTF(val numFeatures: Int) extends Serializable {
def this() = this(1048576)
def transform(document: Iterable[_]): Vector
def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector]
def indexOf(term: Any): Int
}
class IDFModel(val idf: Vector) extends Serializable {
def transform(dataset: RDD[Vector]): RDD[Vector]
def transform(dataset: Vector): Vector
}
class IDF(val minDocFreq: Int) extends Serializable {
def this() = this(0)
def fit(dataset: RDD[Vector]): IDFModel
}
class Word2VecModel(private val wordVectors: Map[String, Array[Float]]) extends Serializable {
def transform(word: String): Vector
def findSynonyms(word: String, num: Int): Array[(String, Double)]
def findSynonyms(vector: Vector, num: Int): Array[(String, Double)]
def getVectors: Map[String, Array[Float]]
}
class Word2Vec extends Serializable {
def setVectorSize(vectorSize: Int): Word2Vec
def setLearningRate(learningRate: Double): Word2Vec
def setNumPartitions(numPartitions: Int): Word2Vec
def setNumIterations(numIterations: Int): Word2Vec
def setSeed(seed: Long): Word2Vec
def setMinCount(minCount: Int): Word2Vec
def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel
}
class StandardScalerModel(val std: Vector, val mean: Vector) extends Serializable {
def this(std: Vector) = this(std, null)
def transform(vector: Vector): Vector
def transform(vectors: RDD[Vector]): RDD[Vector]
}
class StandardScaler(withMean: Boolean, withStd: Boolean) extends Serializable {
def this() = this(false, true)
def fit(data: RDD[Vector]): StandardScalerModel
}
class Normalizer(val p: Double) extends Serializable {
def this() = this(2.0)
def transform(vector: Vector): Vector
def transform(data: RDD[Vector]): RDD[Vector]
}
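A quick sketch of unit-norm scaling with Normalizer (the sample values are arbitrary); p defaults to 2.0:
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.linalg.Vectors

val rawVectors = sc.parallelize(Seq(
  Vectors.dense(3.0, 4.0),
  Vectors.dense(0.0, 5.0)
))
val normalizer = new Normalizer()                  // L2 normalization
val unitVectors = normalizer.transform(rawVectors)
unitVectors.collect().foreach(println)             // each output vector has unit L2 norm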
trait MultivariateStatisticalSummary {
def mean: Vector
def variance: Vector
def count: Long
def numNonzeros: Vector
def max: Vector
def min: Vector
def normL1: Vector
def normL2: Vector
}
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {
def add(sample: Vector): MultivariateOnlineSummarizer
def merge(other: MultivariateOnlineSummarizer): MultivariateOnlineSummarizer
}
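A small sketch of building column statistics incrementally with add/merge, roughly what Statistics.colStats does under the hood; the sample rows are made up:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0),
  Vectors.dense(5.0, 6.0)
))
// add folds one sample into a per-partition summarizer; merge combines partitions
val summarizer = rows.aggregate(new MultivariateOnlineSummarizer)(
  (summary, v) => summary.add(v),
  (s1, s2) => s1.merge(s2)
)
println(s"mean = ${summarizer.mean}, variance = ${summarizer.variance}, count = ${summarizer.count}")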
object Statistics {
def colStats(X: RDD[Vector]): MultivariateStatisticalSummary
def corr(x: RDD[Double], y: RDD[Double]): Double
def corr(x: RDD[Double], y: RDD[Double], method: String): Double
def corr(X: RDD[Vector]): Matrix
def corr(X: RDD[Vector], method: String): Matrix
def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult
def chiSqTest(observed: Matrix): ChiSqTestResult
def kolmogorovSmirnovTest(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult
def kolmogorovSmirnovTest(data: RDD[Double], distName: String, params: Double*): KolmogorovSmirnovTestResult
}

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
// Load and parse data
val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
// Split data into training and test sets
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
// Train a logistic regression model (LogisticRegressionWithLBFGS has no static train; use run)
val lrModel = new LogisticRegressionWithLBFGS().run(training)
// Train an SVM model
val svmModel = SVMWithSGD.train(training, numIterations = 100)
// Clear the default thresholds so predict returns raw scores rather than 0/1 labels,
// which is what BinaryClassificationMetrics expects for a meaningful ROC/AUC
lrModel.clearThreshold()
svmModel.clearThreshold()
// Make predictions
val lrPredictionAndLabels = test.map { point =>
val prediction = lrModel.predict(point.features)
(prediction, point.label)
}
val svmPredictionAndLabels = test.map { point =>
val prediction = svmModel.predict(point.features)
(prediction, point.label)
}
// Evaluate models
val lrMetrics = new BinaryClassificationMetrics(lrPredictionAndLabels)
val svmMetrics = new BinaryClassificationMetrics(svmPredictionAndLabels)
println(s"Logistic Regression AUC: ${lrMetrics.areaUnderROC()}")
println(s"SVM AUC: ${svmMetrics.areaUnderROC()}")import org.apache.spark.mllib.clustering.{KMeans, GaussianMixture}
import org.apache.spark.mllib.linalg.Vectors
// Create sample data
val data = sc.parallelize(Array(
Vectors.dense(0.0, 0.0),
Vectors.dense(1.0, 1.0),
Vectors.dense(9.0, 8.0),
Vectors.dense(8.0, 9.0)
))
// K-means clustering
val numClusters = 2
val numIterations = 20
val kMeansModel = KMeans.train(data, numClusters, numIterations)
println("K-means cluster centers:")
kMeansModel.clusterCenters.foreach(println)
// Predict clusters
val predictions = kMeansModel.predict(data)
data.zip(predictions).collect().foreach { case (point, cluster) =>
println(s"Point $point belongs to cluster $cluster")
}
// Gaussian Mixture Model
val gmm = new GaussianMixture()
.setK(2)
.setMaxIterations(20)
val gmmModel = gmm.run(data)
println("GMM weights:")
gmmModel.weights.foreach(println)
println("GMM gaussians:")
gmmModel.gaussians.foreach(g => println(s"mu = ${g.mu}, sigma = ${g.sigma}"))
// Soft predictions (probabilities)
val softPredictions = gmmModel.predictSoft(data)
softPredictions.collect().foreach(probs => println(probs.mkString(", ")))

import org.apache.spark.mllib.recommendation.{ALS, Rating}
// Sample ratings data
val ratings = sc.parallelize(Array(
Rating(1, 1, 3.0),
Rating(1, 2, 4.0),
Rating(1, 3, 2.0),
Rating(2, 1, 4.0),
Rating(2, 2, 2.0),
Rating(2, 3, 5.0),
Rating(3, 1, 2.0),
Rating(3, 2, 3.0),
Rating(3, 3, 4.0)
))
// Train collaborative filtering model
val rank = 10
val numIterations = 10
val lambda = 0.01
val alsModel = ALS.train(ratings, rank, numIterations, lambda)
// Make predictions
val usersProducts = ratings.map { case Rating(user, product, rate) =>
(user, product)
}
val predictions = alsModel.predict(usersProducts).map { case Rating(user, product, rate) =>
((user, product), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
((user, product), rate)
}.join(predictions)
// Calculate RMSE
val mse = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
val rmse = math.sqrt(mse)
println(s"Mean Squared Error = $mse")
println(s"Root Mean Squared Error = $rmse")
// Recommend products for users
val userProducts = alsModel.recommendProducts(1, 3)
println(s"Recommendations for user 1:")
userProducts.foreach(println)

import org.apache.spark.mllib.feature.{HashingTF, IDF, Word2Vec}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
// Sample documents
val documents: RDD[Seq[String]] = sc.parallelize(Seq(
"spark is great".split(" ").toSeq,
"machine learning with spark".split(" ").toSeq,
"apache spark mllib".split(" ").toSeq
))
// TF-IDF feature extraction
val hashingTF = new HashingTF(1000)
val tf: RDD[Vector] = hashingTF.transform(documents)
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
println("TF-IDF vectors:")
tfidf.collect().foreach(println)
// Word2Vec embeddings
val word2vec = new Word2Vec()
.setVectorSize(100)
.setMinCount(1)
val word2vecModel = word2vec.fit(documents)
// Find synonyms
val synonyms = word2vecModel.findSynonyms("spark", 5)
synonyms.foreach { case (word, similarity) =>
println(s"$word: $similarity")
}
// Transform words to vectors
val sparkVector = word2vecModel.transform("spark")
println(s"Vector for 'spark': $sparkVector")import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg.Vectors
// Sample data
val observations = sc.parallelize(Seq(
Vectors.dense(1.0, 10.0, 100.0),
Vectors.dense(2.0, 20.0, 200.0),
Vectors.dense(3.0, 30.0, 300.0)
))
// Summary statistics
val summary = Statistics.colStats(observations)
println(s"Mean: ${summary.mean}")
println(s"Variance: ${summary.variance}")
println(s"Min: ${summary.min}")
println(s"Max: ${summary.max}")
println(s"Count: ${summary.count}")
// Correlation matrix
val correlMatrix = Statistics.corr(observations, "pearson")
println(s"Correlation matrix:\n$correlMatrix")
// Correlation between two RDDs
val x: RDD[Double] = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
val y: RDD[Double] = sc.parallelize(Array(2.0, 4.0, 6.0, 8.0))
val correlation = Statistics.corr(x, y, "pearson")
println(s"Correlation between x and y: $correlation")
// Chi-squared test
val observed = Vectors.dense(1.0, 2.0, 3.0)
val expected = Vectors.dense(1.5, 1.5, 2.0)
val chiSqTestResult = Statistics.chiSqTest(observed, expected)
println(s"Chi-squared test result: $chiSqTestResult")import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.StandardScaler
// Create labeled points from raw data
def createLabeledPoints(rawData: RDD[String]): RDD[LabeledPoint] = {
rawData.map { line =>
val parts = line.split(',')
val label = parts.last.toDouble
val features = Vectors.dense(parts.init.map(_.toDouble))
LabeledPoint(label, features)
}
}
// Feature scaling
def scaleFeatures(data: RDD[LabeledPoint]): (RDD[LabeledPoint], StandardScalerModel) = {
val features = data.map(_.features)
val scaler = new StandardScaler(withMean = true, withStd = true)
val scalerModel = scaler.fit(features)
val scaledData = data.map { point =>
LabeledPoint(point.label, scalerModel.transform(point.features))
}
(scaledData, scalerModel)
}
// Cross-validation for RDD-based models
def crossValidate[M](data: RDD[LabeledPoint],
trainFunc: RDD[LabeledPoint] => M,
predictFunc: (M, Vector) => Double,
k: Int = 5): Array[Double] = {
val folds = data.randomSplit(Array.fill(k)(1.0 / k), seed = 42)
(0 until k).map { i =>
val validation = folds(i)
val training = sc.union(folds.zipWithIndex.filter(_._2 != i).map(_._1))
val model = trainFunc(training)
val predictions = validation.map { point =>
val prediction = predictFunc(model, point.features)
(prediction, point.label)
}
// Simple thresholded accuracy; for a regression model you would compute RMSE here instead
val accuracy = predictions.map { case (pred, label) =>
if (math.abs(pred - label) < 0.5) 1.0 else 0.0
}.mean()
accuracy
}.toArray
}
// Usage example
val rawData = sc.textFile("path/to/data.csv")
val labeledData = createLabeledPoints(rawData)
val (scaledData, scalerModel) = scaleFeatures(labeledData)
// Cross-validate a logistic regression model (trained via run, since
// LogisticRegressionWithLBFGS does not provide a static train method)
val cvScores = crossValidate(
scaledData,
(training: RDD[LabeledPoint]) => new LogisticRegressionWithLBFGS().run(training),
(model: LogisticRegressionModel, features: Vector) => model.predict(features)
)
println(s"Cross-validation scores: ${cvScores.mkString(", ")}")
println(s"Average accuracy: ${cvScores.sum / cvScores.length}")import org.apache.spark.ml.{Pipeline, classification => newML}
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler => NewStandardScaler}
import org.apache.spark.sql.DataFrame
// Convert RDD[LabeledPoint] to a DataFrame usable by the DataFrame-based API
def rddToDataFrame(data: RDD[LabeledPoint]): DataFrame = {
import spark.implicits._
data.map { point =>
(point.label, point.features.asML)  // convert each mllib vector to an ml vector
}.toDF("label", "features")
}
// Convert mllib.linalg.Vector to ml.linalg.Vector
def convertVector(oldVector: org.apache.spark.mllib.linalg.Vector): org.apache.spark.ml.linalg.Vector = {
oldVector.asML
}
// Migration example
val rddData: RDD[LabeledPoint] = // ... your RDD data
val dataFrame = rddToDataFrame(rddData)
// Use the new DataFrame-based API; the converted DataFrame already contains a
// "features" vector column, so no VectorAssembler stage is needed here (that stage
// is only required when starting from raw numeric columns)
val scaler = new NewStandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
val lr = new newML.LogisticRegression()
.setFeaturesCol("scaledFeatures")
.setLabelCol("label")
val pipeline = new Pipeline()
.setStages(Array(scaler, lr))
val model = pipeline.fit(dataFrame)
val predictions = model.transform(dataFrame)
predictions.select("label", "prediction", "probability").show()