RDD-based API (Legacy)

The MLlib RDD-based API (org.apache.spark.mllib) is the original machine learning library built on Spark RDDs. It is in maintenance mode, and the DataFrame-based API (org.apache.spark.ml) is recommended for new development, but it still provides valuable functionality and remains widely used in existing systems.

Core Data Types

LabeledPoint

case class LabeledPoint(label: Double, features: org.apache.spark.mllib.linalg.Vector) {
  override def toString: String
}

object LabeledPoint {
  def parse(s: String): LabeledPoint
}
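
A minimal sketch of constructing labeled points with dense and sparse features; parse accepts the same string format that toString produces:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Positive example with a dense feature vector
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Negative example with a sparse vector: size 3, non-zeros at indices 0 and 2
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

// Round-trip through the toString format
val parsed = LabeledPoint.parse("(1.0,[1.0,0.0,3.0])")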

Rating

case class Rating(user: Int, product: Int, rating: Double) {
  override def toString: String
}

Linear Algebra (mllib.linalg)

Vector

trait Vector extends Serializable {
  def size: Int
  def toArray: Array[Double]
  def apply(i: Int): Double
  def copy: Vector
  def foreachActive(f: (Int, Double) => Unit): Unit
  def numActives: Int
  def numNonzeros: Int
  def toDense: DenseVector
  def toSparse: SparseVector
  def compressed: Vector
}

class DenseVector(val values: Array[Double]) extends Vector
class SparseVector(override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector

object Vectors {
  def dense(values: Array[Double]): DenseVector
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): SparseVector
  def norm(vector: Vector, p: Double): Double
  def sqdist(v1: Vector, v2: Vector): Double
}
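
A brief sketch of the Vectors factory: dense and sparse forms of the same vector hold identical contents, and the helper functions accept either:

import org.apache.spark.mllib.linalg.Vectors

val dv = Vectors.dense(Array(1.0, 0.0, 3.0))
val sv = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))  // same vector, sparse form

println(Vectors.sqdist(dv, sv))  // 0.0 -- identical contents
println(Vectors.norm(dv, 2.0))   // L2 norm: sqrt(10) ≈ 3.162
println(sv.toDense)              // [1.0,0.0,3.0]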

Matrix

trait Matrix extends Serializable {
  def numRows: Int
  def numCols: Int
  def toArray: Array[Double]
  def apply(i: Int, j: Int): Double
  def transpose: Matrix
  def multiply(y: DenseMatrix): DenseMatrix
  def multiply(y: DenseVector): DenseVector
}

class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix
class SparseMatrix(override val numRows: Int, override val numCols: Int,
                   val colPtrs: Array[Int], val rowIndices: Array[Int], val values: Array[Double]) extends Matrix

object Matrices {
  def dense(numRows: Int, numCols: Int, values: Array[Double]): DenseMatrix
  def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], 
            rowIndices: Array[Int], values: Array[Double]): SparseMatrix
}
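
Note that values are stored in column-major order. A small sketch:

import org.apache.spark.mllib.linalg.{DenseVector, Matrices}

// Column-major values, so this is the 2x3 matrix
//   [1.0  3.0  5.0]
//   [2.0  4.0  6.0]
val dm = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))

println(dm(0, 1))  // 3.0

// Matrix-vector product: [1+3+5, 2+4+6] = [9.0, 12.0]
println(dm.multiply(new DenseVector(Array(1.0, 1.0, 1.0))))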

Classification

Logistic Regression

class LogisticRegressionModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

  def setThreshold(threshold: Double): LogisticRegressionModel
  def clearThreshold(): LogisticRegressionModel
  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
  override def toString: String
}

class LogisticRegressionWithLBFGS extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
  def setNumClasses(numClasses: Int): this.type
  def run(input: RDD[LabeledPoint]): LogisticRegressionModel
}

object LogisticRegressionWithSGD {
  def train(input: RDD[LabeledPoint]): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, 
           miniBatchFraction: Double): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           miniBatchFraction: Double, initialWeights: Vector): LogisticRegressionModel
}

Support Vector Machines

class SVMModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

  def setThreshold(threshold: Double): SVMModel
  def clearThreshold(): SVMModel
  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}

object SVMWithSGD {
  def train(input: RDD[LabeledPoint]): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double, miniBatchFraction: Double): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double, miniBatchFraction: Double, initialWeights: Vector): SVMModel
}
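
By default an SVMModel thresholds its raw margin at 0.0 and predicts 0/1 labels. A short sketch (assuming training: RDD[LabeledPoint] is already loaded) that clears the threshold so predict returns raw scores, which gives BinaryClassificationMetrics a meaningful ROC curve:

import org.apache.spark.mllib.classification.SVMWithSGD

val model = SVMWithSGD.train(training, 100)
model.clearThreshold()  // predict now returns raw margins instead of 0/1

val scoreAndLabels = training.map { point =>
  (model.predict(point.features), point.label)
}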

Naive Bayes

class NaiveBayesModel(val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]],
                     val modelType: String) extends ClassificationModel with Serializable {

  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}

object NaiveBayes {
  def train(input: RDD[LabeledPoint]): NaiveBayesModel
  def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel
  def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel
}
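
A minimal training sketch (assuming training and test RDDs of LabeledPoint with non-negative features, which the multinomial model requires):

import org.apache.spark.mllib.classification.NaiveBayes

// lambda = 1.0 is additive (Laplace) smoothing;
// modelType is "multinomial" or "bernoulli"
val nbModel = NaiveBayes.train(training, 1.0, "multinomial")

val accuracy = test.map { p =>
  if (nbModel.predict(p.features) == p.label) 1.0 else 0.0
}.mean()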

Regression

Linear Regression

class LinearRegressionModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}

object LinearRegressionWithSGD {
  def train(input: RDD[LabeledPoint]): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           miniBatchFraction: Double): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           miniBatchFraction: Double, initialWeights: Vector): LinearRegressionModel
}
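
A small end-to-end sketch on synthetic data (assuming an active SparkContext sc); SGD is sensitive to stepSize, so scaling features usually matters on real data:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// Tiny synthetic dataset where y ≈ 2 * x
val points = sc.parallelize(Seq(
  LabeledPoint(2.0, Vectors.dense(1.0)),
  LabeledPoint(4.0, Vectors.dense(2.0)),
  LabeledPoint(6.0, Vectors.dense(3.0))
))

// numIterations = 100, stepSize = 0.1
val model = LinearRegressionWithSGD.train(points, 100, 0.1)

// Mean squared error on the training points
val mse = points.map { p =>
  val err = model.predict(p.features) - p.label
  err * err
}.mean()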

Ridge Regression

class RidgeRegressionModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}

object RidgeRegressionWithSGD {
  def train(input: RDD[LabeledPoint]): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double, miniBatchFraction: Double): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double, miniBatchFraction: Double, initialWeights: Vector): RidgeRegressionModel
}

Lasso Regression

class LassoModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}

object LassoWithSGD {
  def train(input: RDD[LabeledPoint]): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double, miniBatchFraction: Double): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
           regParam: Double, miniBatchFraction: Double, initialWeights: Vector): LassoModel
}

Clustering

K-Means

class KMeansModel(val clusterCenters: Array[Vector]) extends Saveable with Serializable {
  def predict(point: Vector): Int
  def predict(points: RDD[Vector]): RDD[Int]
  def computeCost(data: RDD[Vector]): Double
  def k: Int
}

object KMeans {
  def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int,
           initializationMode: String): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int,
           initializationMode: String, seed: Long): KMeansModel
}

Gaussian Mixture Model

class GaussianMixtureModel(val weights: Array[Double], val gaussians: Array[MultivariateGaussian])
  extends Serializable {

  def predict(point: Vector): Int
  def predict(points: RDD[Vector]): RDD[Int]
  def predictSoft(point: Vector): Array[Double]
  def predictSoft(points: RDD[Vector]): RDD[Array[Double]]
  def k: Int
}

class GaussianMixture extends Serializable {
  def setK(k: Int): GaussianMixture
  def setMaxIterations(maxIterations: Int): GaussianMixture
  def setConvergenceTol(convergenceTol: Double): GaussianMixture
  def setSeed(seed: Long): GaussianMixture
  def run(data: RDD[Vector]): GaussianMixtureModel
}

Power Iteration Clustering

class PowerIterationClusteringModel(val k: Int, val assignments: RDD[(Long, Int)]) extends Serializable

class PowerIterationClustering(private var k: Int, private var maxIterations: Int) extends Serializable {
  def setK(k: Int): PowerIterationClustering
  def setMaxIterations(maxIterations: Int): PowerIterationClustering
  def setInitializationMode(initializationMode: String): PowerIterationClustering
  def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel
}

Tree-Based Methods

Decision Trees

class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable {
  def predict(features: Vector): Double
  def predict(features: RDD[Vector]): RDD[Double]
  def depth: Int
  def numNodes: Int
}

object DecisionTree {
  def train(input: RDD[LabeledPoint], strategy: Strategy): DecisionTreeModel
  def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
           maxDepth: Int): DecisionTreeModel
  def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
           maxDepth: Int, numClasses: Int): DecisionTreeModel
  def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
           maxDepth: Int, numClasses: Int, maxBins: Int,
           quantileCalculationStrategy: QuantileStrategy,
           categoricalFeaturesInfo: Map[Int, Int]): DecisionTreeModel
}
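
A short classification sketch (assuming training: RDD[LabeledPoint]), using the algo/impurity overload:

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity.Gini

// Binary classification, Gini impurity, trees at most 5 levels deep
val treeModel = DecisionTree.train(training, Algo.Classification, Gini, 5, 2)

println(s"Learned a tree of depth ${treeModel.depth} with ${treeModel.numNodes} nodes")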

Random Forest

class RandomForestModel(val algo: Algo, val trees: Array[DecisionTreeModel]) extends Serializable {
  def predict(features: Vector): Double
  def predict(features: RDD[Vector]): RDD[Double]
  def totalNumNodes: Int
}

object RandomForest {
  def trainClassifier(input: RDD[LabeledPoint], strategy: Strategy,
                     numTrees: Int, featureSubsetStrategy: String,
                     seed: Int): RandomForestModel
  def trainRegressor(input: RDD[LabeledPoint], strategy: Strategy,
                    numTrees: Int, featureSubsetStrategy: String,
                    seed: Int): RandomForestModel
}
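
A sketch using the default classification Strategy (assuming training: RDD[LabeledPoint]); "auto" lets the forest pick how many features to consider at each split:

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy

val strategy = Strategy.defaultStrategy("Classification")
val rfModel = RandomForest.trainClassifier(
  training, strategy, numTrees = 20, featureSubsetStrategy = "auto", seed = 42)

println(s"${rfModel.trees.length} trees, ${rfModel.totalNumNodes} nodes in total")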

Gradient Boosted Trees

class GradientBoostedTreesModel(val algo: Algo, val trees: Array[DecisionTreeModel], 
                               val treeWeights: Array[Double]) extends Serializable {
  def predict(features: Vector): Double
  def predict(features: RDD[Vector]): RDD[Double]
  def totalNumNodes: Int
}

object GradientBoostedTrees {
  def train(input: RDD[LabeledPoint], boostingStrategy: BoostingStrategy): GradientBoostedTreesModel
}
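
BoostingStrategy carries both the boosting parameters and the per-tree Strategy. A minimal sketch (assuming training: RDD[LabeledPoint]):

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy

val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 10         // number of boosting rounds (trees)
boostingStrategy.treeStrategy.maxDepth = 3  // keep the weak learners shallow

val gbtModel = GradientBoostedTrees.train(training, boostingStrategy)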

Recommendation

Collaborative Filtering

class MatrixFactorizationModel(val rank: Int, val userFeatures: RDD[(Int, Array[Double])],
                              val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
  
  def predict(user: Int, product: Int): Double
  def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
  def recommendProducts(user: Int, num: Int): Array[Rating]
  def recommendUsers(product: Int, num: Int): Array[Rating]
}

object ALS {
  def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double,
                   alpha: Double): MatrixFactorizationModel
}

Feature Extraction and Selection

Text Feature Extraction

class HashingTF(val numFeatures: Int) extends Serializable {
  def this() = this(1048576)
  def transform(document: Iterable[String]): Vector
  def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector]
  def indexOf(term: Any): Int
}

class IDFModel(val idf: Vector) extends Serializable {
  def transform(dataset: RDD[Vector]): RDD[Vector]
  def transform(dataset: Vector): Vector
}

class IDF(val minDocFreq: Int) extends Serializable {
  def this() = this(0)
  def fit(dataset: RDD[Vector]): IDFModel
}

class Word2VecModel(private val wordVectors: Map[String, Array[Float]]) extends Serializable {
  def transform(word: String): Vector
  def findSynonyms(word: String, num: Int): Array[(String, Double)]
  def findSynonyms(vector: Vector, num: Int): Array[(String, Double)]
  def getVectors: Map[String, Array[Float]]
}

class Word2Vec extends Serializable {
  def setVectorSize(vectorSize: Int): Word2Vec
  def setLearningRate(learningRate: Double): Word2Vec
  def setNumPartitions(numPartitions: Int): Word2Vec
  def setNumIterations(numIterations: Int): Word2Vec
  def setSeed(seed: Long): Word2Vec
  def setMinCount(minCount: Int): Word2Vec
  def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel
}

Numerical Feature Processing

class StandardScalerModel(val std: Vector, val mean: Vector) extends Serializable {
  def this(std: Vector) = this(std, null)
  def transform(vector: Vector): Vector
  def transform(vectors: RDD[Vector]): RDD[Vector]
}

class StandardScaler(withMean: Boolean, withStd: Boolean) extends Serializable {
  def this() = this(false, true)
  def fit(data: RDD[Vector]): StandardScalerModel
}

class Normalizer(val p: Double) extends Serializable {
  def this() = this(2.0)
  def transform(vector: Vector): Vector
  def transform(data: RDD[Vector]): RDD[Vector]
}
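
A quick sketch of p-norm normalization; each vector is rescaled to unit p-norm:

import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.linalg.Vectors

val v = Vectors.dense(3.0, 4.0)

println(new Normalizer().transform(v))     // L2 (default): [0.6, 0.8]
println(new Normalizer(1.0).transform(v))  // L1: [3/7, 4/7] ≈ [0.4286, 0.5714]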

Statistics

Summary Statistics

trait MultivariateStatisticalSummary {
  def mean: Vector
  def variance: Vector
  def count: Long
  def numNonzeros: Vector
  def max: Vector
  def min: Vector
  def normL1: Vector
  def normL2: Vector
}

class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {
  def add(sample: Vector): MultivariateOnlineSummarizer
  def merge(other: MultivariateOnlineSummarizer): MultivariateOnlineSummarizer
}
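
The summarizer is built for distributed aggregation: add folds in one sample, merge combines partial summaries. A sketch of the usual treeAggregate pattern (assuming data: RDD[Vector]), roughly what Statistics.colStats does under the hood:

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
  seqOp = (agg, v) => agg.add(v),
  combOp = (a, b) => a.merge(b)
)

println(summary.mean)
println(summary.variance)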

object Statistics {
  def colStats(X: RDD[Vector]): MultivariateStatisticalSummary
  def corr(x: RDD[Double], y: RDD[Double]): Double
  def corr(x: RDD[Double], y: RDD[Double], method: String): Double
  def corr(X: RDD[Vector]): Matrix
  def corr(X: RDD[Vector], method: String): Matrix
  def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult
  def chiSqTest(observed: Matrix): ChiSqTestResult
  def kolmogorovSmirnovTest(sample: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult
  def kolmogorovSmirnovTest(sample: RDD[Double], distName: String, params: Double*): KolmogorovSmirnovTestResult
}

Usage Examples

Basic RDD-based Classification

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

// Load and parse data
val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

// Split data into training and test sets
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Train logistic regression model (LBFGS has no static train helpers;
// instantiate and call run)
val lrModel = new LogisticRegressionWithLBFGS().run(training)

// Train SVM model
val svmModel = SVMWithSGD.train(training, numIterations = 100)

// Make predictions
val lrPredictionAndLabels = test.map { point =>
  val prediction = lrModel.predict(point.features)
  (prediction, point.label)
}

val svmPredictionAndLabels = test.map { point =>
  val prediction = svmModel.predict(point.features)
  (prediction, point.label)
}

// Evaluate models
val lrMetrics = new BinaryClassificationMetrics(lrPredictionAndLabels)
val svmMetrics = new BinaryClassificationMetrics(svmPredictionAndLabels)

println(s"Logistic Regression AUC: ${lrMetrics.areaUnderROC()}")
println(s"SVM AUC: ${svmMetrics.areaUnderROC()}")

RDD-based Clustering

import org.apache.spark.mllib.clustering.{KMeans, GaussianMixture}
import org.apache.spark.mllib.linalg.Vectors

// Create sample data
val data = sc.parallelize(Array(
  Vectors.dense(0.0, 0.0),
  Vectors.dense(1.0, 1.0),
  Vectors.dense(9.0, 8.0),
  Vectors.dense(8.0, 9.0)
))

// K-means clustering
val numClusters = 2
val numIterations = 20
val kMeansModel = KMeans.train(data, numClusters, numIterations)

println("K-means cluster centers:")
kMeansModel.clusterCenters.foreach(println)

// Predict clusters
val predictions = kMeansModel.predict(data)
data.zip(predictions).collect().foreach { case (point, cluster) =>
  println(s"Point $point belongs to cluster $cluster")
}

// Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(2)
  .setMaxIterations(20)

val gmmModel = gmm.run(data)

println("GMM weights:")
gmmModel.weights.foreach(println)

println("GMM gaussians:")
gmmModel.gaussians.foreach(println)

// Soft predictions (per-cluster membership probabilities)
val softPredictions = gmmModel.predictSoft(data)
softPredictions.collect().foreach(p => println(p.mkString(", ")))

Matrix Factorization for Recommendations

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Sample ratings data
val ratings = sc.parallelize(Array(
  Rating(1, 1, 3.0),
  Rating(1, 2, 4.0),
  Rating(1, 3, 2.0),
  Rating(2, 1, 4.0),
  Rating(2, 2, 2.0),
  Rating(2, 3, 5.0),
  Rating(3, 1, 2.0),
  Rating(3, 2, 3.0),
  Rating(3, 3, 4.0)
))

// Train collaborative filtering model
val rank = 10
val numIterations = 10
val lambda = 0.01

val alsModel = ALS.train(ratings, rank, numIterations, lambda)

// Make predictions
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}

val predictions = alsModel.predict(usersProducts).map { case Rating(user, product, rate) =>
  ((user, product), rate)
}

val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)

// Calculate MSE and then RMSE
val mse = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()

val rmse = math.sqrt(mse)
println(s"Mean Squared Error = $mse")
println(s"Root Mean Squared Error = $rmse")

// Recommend top products for a user
val topForUser1 = alsModel.recommendProducts(1, 3)
println("Recommendations for user 1:")
topForUser1.foreach(println)

Feature Extraction and Text Processing

import org.apache.spark.mllib.feature.{HashingTF, IDF, Word2Vec}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sample documents
val documents: RDD[Seq[String]] = sc.parallelize(Seq(
  "spark is great".split(" ").toSeq,
  "machine learning with spark".split(" ").toSeq,
  "apache spark mllib".split(" ").toSeq
))

// TF-IDF feature extraction
val hashingTF = new HashingTF(1000)
val tf: RDD[Vector] = hashingTF.transform(documents)

tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

println("TF-IDF vectors:")
tfidf.collect().foreach(println)

// Word2Vec embeddings
val word2vec = new Word2Vec()
  .setVectorSize(100)
  .setMinCount(1)

val word2vecModel = word2vec.fit(documents)

// Find synonyms
val synonyms = word2vecModel.findSynonyms("spark", 5)
synonyms.foreach { case (word, similarity) =>
  println(s"$word: $similarity")
}

// Transform words to vectors
val sparkVector = word2vecModel.transform("spark")
println(s"Vector for 'spark': $sparkVector")

Statistical Analysis

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

// Sample data
val observations = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0, 100.0),
  Vectors.dense(2.0, 20.0, 200.0),
  Vectors.dense(3.0, 30.0, 300.0)
))

// Summary statistics
val summary = Statistics.colStats(observations)
println(s"Mean: ${summary.mean}")
println(s"Variance: ${summary.variance}")
println(s"Min: ${summary.min}")
println(s"Max: ${summary.max}")
println(s"Count: ${summary.count}")

// Correlation matrix
val correlMatrix = Statistics.corr(observations, "pearson")
println(s"Correlation matrix:\n$correlMatrix")

// Correlation between two RDDs
val x: RDD[Double] = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
val y: RDD[Double] = sc.parallelize(Array(2.0, 4.0, 6.0, 8.0))

val correlation = Statistics.corr(x, y, "pearson")
println(s"Correlation between x and y: $correlation")

// Chi-squared test
val observed = Vectors.dense(1.0, 2.0, 3.0)
val expected = Vectors.dense(1.5, 1.5, 2.0)

val chiSqTestResult = Statistics.chiSqTest(observed, expected)
println(s"Chi-squared test result: $chiSqTestResult")

Advanced RDD Operations for ML

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Create labeled points from raw data
def createLabeledPoints(rawData: RDD[String]): RDD[LabeledPoint] = {
  rawData.map { line =>
    val parts = line.split(',')
    val label = parts.last.toDouble
    val features = Vectors.dense(parts.init.map(_.toDouble))
    LabeledPoint(label, features)
  }
}

// Feature scaling
def scaleFeatures(data: RDD[LabeledPoint]): (RDD[LabeledPoint], StandardScalerModel) = {
  val features = data.map(_.features)
  
  val scaler = new StandardScaler(withMean = true, withStd = true)
  val scalerModel = scaler.fit(features)
  
  val scaledData = data.map { point =>
    LabeledPoint(point.label, scalerModel.transform(point.features))
  }
  
  (scaledData, scalerModel)
}

// Cross-validation for RDD-based models
def crossValidate[M](data: RDD[LabeledPoint], 
                    trainFunc: RDD[LabeledPoint] => M,
                    predictFunc: (M, Vector) => Double,
                    k: Int = 5): Array[Double] = {
  
  val folds = data.randomSplit(Array.fill(k)(1.0 / k), seed = 42)
  
  (0 until k).map { i =>
    val validation = folds(i)
    val training = sc.union(folds.zipWithIndex.filter(_._2 != i).map(_._1))
    
    val model = trainFunc(training)
    
    val predictions = validation.map { point =>
      val prediction = predictFunc(model, point.features)
      (prediction, point.label)
    }
    
    // Classification accuracy: a prediction counts as correct
    // when it rounds to the true label
    val accuracy = predictions.map { case (pred, label) =>
      if (math.abs(pred - label) < 0.5) 1.0 else 0.0
    }.mean()
    
    accuracy
  }.toArray
}

// Usage example
val rawData = sc.textFile("path/to/data.csv")
val labeledData = createLabeledPoints(rawData)
val (scaledData, scalerModel) = scaleFeatures(labeledData)

// Cross-validate a logistic regression model
val cvScores = crossValidate(
  scaledData,
  (data: RDD[LabeledPoint]) => new LogisticRegressionWithLBFGS().run(data),
  (model: LogisticRegressionModel, features: Vector) => model.predict(features)
)

println(s"Cross-validation scores: ${cvScores.mkString(", ")}")
println(s"Average accuracy: ${cvScores.sum / cvScores.length}")

Migration from RDD to DataFrame API

import org.apache.spark.ml.{Pipeline, classification => newML}
import org.apache.spark.ml.feature.{StandardScaler => NewStandardScaler}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Convert RDD[LabeledPoint] to a DataFrame with ml.linalg vectors
def rddToDataFrame(data: RDD[LabeledPoint]): DataFrame = {
  import spark.implicits._

  data.map { point =>
    (point.label, point.features.asML)  // ml transformers require ml.linalg vectors
  }.toDF("label", "features")
}

// Convert mllib.linalg.Vector to ml.linalg.Vector
def convertVector(oldVector: org.apache.spark.mllib.linalg.Vector): org.apache.spark.ml.linalg.Vector = {
  oldVector.asML
}

// Migration example
val rddData: RDD[LabeledPoint] = // ... your RDD data
val dataFrame = rddToDataFrame(rddData)

// Use the new DataFrame-based API; the converted DataFrame already
// carries "label" and "features" columns, so no VectorAssembler stage
// is needed here
val scaler = new NewStandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

val lr = new newML.LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("label")

val pipeline = new Pipeline()
  .setStages(Array(scaler, lr))

val model = pipeline.fit(dataFrame)
val predictions = model.transform(dataFrame)

predictions.select("label", "prediction", "probability").show()