Clustering

MLlib provides a suite of unsupervised learning algorithms for discovering patterns and groupings in data. With the exception of PowerIterationClustering, which exposes assignClusters directly, the clustering algorithms follow the Estimator-Transformer pattern and support the Pipeline API.
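As a minimal sketch of that pattern (assuming trainingData is a DataFrame with a features vector column; complete end-to-end examples follow under Usage Examples):

import org.apache.spark.ml.clustering.KMeans

// An Estimator's fit() produces an immutable Model;
// the Model's transform() appends a prediction column to its input
val model = new KMeans().setK(3).fit(trainingData)
val clustered = model.transform(trainingData)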

K-Means Clustering

Estimator

class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMeansParams with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("kmeans"))

  def setK(value: Int): KMeans
  def setInitMode(value: String): KMeans
  def setInitSteps(value: Int): KMeans
  def setMaxIter(value: Int): KMeans
  def setTol(value: Double): KMeans
  def setSeed(value: Long): KMeans
  def setDistanceMeasure(value: String): KMeans

  override def fit(dataset: Dataset[_]): KMeansModel
  override def copy(extra: ParamMap): KMeans
}

Model

class KMeansModel private[ml] (
    override val uid: String, 
    private[clustering] val parentModel: MLlibKMeansModel)
  extends Model[KMeansModel] with KMeansParams with GeneralMLWritable {

  def clusterCenters: Array[Vector]
  lazy val summary: KMeansSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame
  
  @deprecated("This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator " +
    "instead. You can also get the cost on the training dataset in the summary.", "2.4.0")
  def computeCost(dataset: Dataset[_]): Double
  
  override def copy(extra: ParamMap): KMeansModel
  def write: MLWriter
}

class KMeansSummary private[clustering] (
    predictions: DataFrame,
    predictionCol: String,
    featuresCol: String,
    val k: Int,
    val numIter: Int,
    val trainingCost: Double)
  extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def clusterSizes: Array[Long]
}
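The DefaultParamsWritable and GeneralMLWritable mixins above give the estimator and model persistence. A minimal sketch, assuming model is a fitted KMeansModel and an illustrative path:

model.write.overwrite().save("/tmp/kmeans-model")
val restored = KMeansModel.load("/tmp/kmeans-model")

The same write/save and load pattern applies to the other clustering models below.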

Bisecting K-Means

Estimator

class BisectingKMeans(override val uid: String) extends Estimator[BisectingKMeansModel] 
  with BisectingKMeansParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("bisecting-kmeans"))

  def setK(value: Int): BisectingKMeans
  def setMaxIter(value: Int): BisectingKMeans
  def setSeed(value: Long): BisectingKMeans
  def setMinDivisibleClusterSize(value: Double): BisectingKMeans
  def setDistanceMeasure(value: String): BisectingKMeans

  override def fit(dataset: Dataset[_]): BisectingKMeansModel
  override def copy(extra: ParamMap): BisectingKMeans
}

Model

class BisectingKMeansModel(override val uid: String, private val parentModel: MLlibBisectingKMeansModel)
  extends Model[BisectingKMeansModel] with BisectingKMeansParams with MLWritable {

  def clusterCenters: Array[Vector]
  lazy val summary: BisectingKMeansSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame
  def computeCost(dataset: Dataset[_]): Double
  override def copy(extra: ParamMap): BisectingKMeansModel
  def write: MLWriter
}

class BisectingKMeansSummary private[clustering] (
    predictions: DataFrame,
    predictionCol: String,
    featuresCol: String,
    val k: Int,
    val numIter: Int,
    val trainingCost: Double)
  extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def clusterSizes: Array[Long]
}

Gaussian Mixture Model

Estimator

class GaussianMixture(override val uid: String) extends Estimator[GaussianMixtureModel]
  with GaussianMixtureParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("GaussianMixture"))

  def setK(value: Int): GaussianMixture
  def setMaxIter(value: Int): GaussianMixture
  def setTol(value: Double): GaussianMixture
  def setSeed(value: Long): GaussianMixture
  def setAggregationDepth(value: Int): GaussianMixture

  override def fit(dataset: Dataset[_]): GaussianMixtureModel
  override def copy(extra: ParamMap): GaussianMixture
}

Model

class GaussianMixtureModel(override val uid: String, private val parentModel: MLlibGaussianMixtureModel)
  extends Model[GaussianMixtureModel] with GaussianMixtureParams with MLWritable {

  def weights: Array[Double]
  def gaussians: Array[MultivariateGaussian]
  lazy val summary: GaussianMixtureSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame
  def predictProbability(features: Vector): Vector
  def logLikelihood(dataset: Dataset[_]): Double
  override def copy(extra: ParamMap): GaussianMixtureModel
  def write: MLWriter
}

class GaussianMixtureSummary private[clustering] (
    predictions: DataFrame,
    predictionCol: String,
    featuresCol: String,
    val k: Int,
    val numIter: Int,
    val logLikelihood: Double,
    val logLikelihoodHistory: Array[Double])
  extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def probabilityCol: String
  def probability: DataFrame = predictions.select(probabilityCol)
  def clusterSizes: Array[Long]
}

Latent Dirichlet Allocation (LDA)

Estimator

class LDA(override val uid: String) extends Estimator[LDAModel] with LDAParams with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("lda"))

  def setK(value: Int): LDA
  def setMaxIter(value: Int): LDA
  def setDocConcentration(value: Array[Double]): LDA
  def setTopicConcentration(value: Double): LDA
  def setSeed(value: Long): LDA
  def setCheckpointInterval(value: Int): LDA
  def setOptimizer(value: String): LDA
  def setLearningOffset(value: Double): LDA
  def setLearningDecay(value: Double): LDA
  def setSubsamplingRate(value: Double): LDA
  def setOptimizeDocConcentration(value: Boolean): LDA
  def setKeepLastCheckpoint(value: Boolean): LDA

  override def fit(dataset: Dataset[_]): LDAModel
  override def copy(extra: ParamMap): LDA
}

Models

sealed abstract class LDAModel private[ml] extends Model[LDAModel] with LDAParams with MLWritable {
  def vocabSize: Int
  def topicsMatrix: Matrix
  def isDistributed: Boolean
  def logLikelihood(dataset: Dataset[_]): Double
  def logPerplexity(dataset: Dataset[_]): Double
  def describeTopics(maxTermsPerTopic: Int): DataFrame
  def describeTopics(): DataFrame = describeTopics(10)
  override def transform(dataset: Dataset[_]): DataFrame
  def write: MLWriter
}

class LocalLDAModel private[ml] (
    override val uid: String,
    vocabSize: Int,
    private val model: MLlibLocalLDAModel,
    sqlContext: SQLContext)
  extends LDAModel {

  override def copy(extra: ParamMap): LocalLDAModel
  override def isDistributed: Boolean = false
  override def topicsMatrix: Matrix
}

class DistributedLDAModel private[ml] (
    override val uid: String,
    vocabSize: Int,
    private val model: MLlibDistributedLDAModel,
    sqlContext: SQLContext,
    private val graph: Option[Graph[LDA.TopicCounts, LDA.TokenCount]])
  extends LDAModel {

  override def copy(extra: ParamMap): DistributedLDAModel
  override def isDistributed: Boolean = true
  override def topicsMatrix: Matrix
  def toLocal: LocalLDAModel
  def trainingLogLikelihood: Double
  def logPrior: Double
}
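The accessors specific to DistributedLDAModel are only meaningful when LDA is trained with the "em" optimizer; the "online" optimizer yields a LocalLDAModel. A short sketch, assuming ldaModel is a fitted LDAModel:

ldaModel match {
  case dist: DistributedLDAModel =>
    println(s"Training log-likelihood: ${dist.trainingLogLikelihood}")
    println(s"Log prior: ${dist.logPrior}")
    val local = dist.toLocal // drops distributed training state for lightweight inference
  case _: LocalLDAModel =>
    // nothing to convert; the model is already local
}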

Power Iteration Clustering

Unlike the other algorithms here, PowerIterationClustering does not follow the Estimator-Model pattern: there is no fit or transform, and no separate model class. Instead, assignClusters consumes a DataFrame of weighted edges (src, dst, weight) and returns one row per vertex id with its cluster assignment.

class PowerIterationClustering(override val uid: String)
  extends PowerIterationClusteringParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("PowerIterationClustering"))

  def setK(value: Int): PowerIterationClustering
  def setMaxIter(value: Int): PowerIterationClustering
  def setInitMode(value: String): PowerIterationClustering
  def setSrcCol(value: String): PowerIterationClustering
  def setDstCol(value: String): PowerIterationClustering
  def setWeightCol(value: String): PowerIterationClustering

  def assignClusters(dataset: Dataset[_]): DataFrame
}

Base Clustering Summary

abstract class ClusteringSummary(
    predictions: DataFrame,
    predictionCol: String,
    featuresCol: String,
    numIter: Int) extends Serializable {

  def cluster: DataFrame = predictions.select(predictionCol)
  def clusterSizes: Array[Long]
}

Silhouette is not exposed on the summary; compute it with ClusteringEvaluator (see the K-Means example below). trainingCost appears only on KMeansSummary and BisectingKMeansSummary.

Usage Examples

K-Means Clustering

import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Prepare features
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val data = assembler.transform(rawData)

// Create K-means
val kmeans = new KMeans()
  .setK(3)
  .setSeed(42)
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setMaxIter(100)
  .setTol(1E-4)
  .setDistanceMeasure("euclidean")

// Train model
val model = kmeans.fit(data)

// Get cluster centers
val centers = model.clusterCenters
println("Cluster Centers:")
centers.zipWithIndex.foreach { case (center, i) =>
  println(s"Cluster $i: $center")
}

// Make predictions
val predictions = model.transform(data)
predictions.select("features", "cluster").show()

// Get training summary
val summary = model.summary
println(s"Training cost: ${summary.trainingCost}")
println(s"Number of iterations: ${summary.numIter}")
println(s"Cluster sizes: ${summary.clusterSizes.mkString(", ")}")

// Evaluate clustering
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setMetricName("silhouette")
  .setDistanceMeasure("squaredEuclidean")

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance: $silhouette")

Gaussian Mixture Model

import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureModel}

val gmm = new GaussianMixture()
  .setK(3)
  .setSeed(42)
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setProbabilityCol("probability")
  .setMaxIter(100)
  .setTol(1E-4)

val gmmModel = gmm.fit(data)

// Get mixture weights and gaussians
val weights = gmmModel.weights
val gaussians = gmmModel.gaussians

println("Mixture weights:")
weights.zipWithIndex.foreach { case (weight, i) =>
  println(s"Component $i: $weight")
}

println("Gaussian parameters:")
gaussians.zipWithIndex.foreach { case (gaussian, i) =>
  println(s"Component $i:")
  println(s"  Mean: ${gaussian.mean}")
  println(s"  Covariance: ${gaussian.cov}")
}

// Make predictions with probabilities
val gmmPredictions = gmmModel.transform(data)
gmmPredictions.select("features", "cluster", "probability").show(truncate = false)

// Get model summary
val gmmSummary = gmmModel.summary
println(s"Log-likelihood: ${gmmSummary.logLikelihood}")
println(s"Log-likelihood history: ${gmmSummary.logLikelihoodHistory.mkString(", ")}")

Bisecting K-Means

import org.apache.spark.ml.clustering.BisectingKMeans

val bkm = new BisectingKMeans()
  .setK(4)
  .setSeed(42)
  .setMaxIter(20)
  .setMinDivisibleClusterSize(1.0)
  .setDistanceMeasure("euclidean")

val bkmModel = bkm.fit(data)

// Get cluster centers
val bkmCenters = bkmModel.clusterCenters
println("Bisecting K-Means Cluster Centers:")
bkmCenters.zipWithIndex.foreach { case (center, i) =>
  println(s"Cluster $i: $center")
}

val bkmPredictions = bkmModel.transform(data)
val bkmSummary = bkmModel.summary
println(s"Bisecting K-Means training cost: ${bkmSummary.trainingCost}")

Latent Dirichlet Allocation (Topic Modeling)

import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.{CountVectorizer, Tokenizer, StopWordsRemover}

// Prepare text data for LDA
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val stopWordsRemover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered_words")

val countVectorizer = new CountVectorizer()
  .setInputCol("filtered_words")
  .setOutputCol("features")
  .setVocabSize(10000)
  .setMinDF(2)

// Process text data
val tokenized = tokenizer.transform(textData)
val filtered = stopWordsRemover.transform(tokenized)
val vectorized = countVectorizer.fit(filtered).transform(filtered)

// Train LDA model
val lda = new LDA()
  .setK(10)  // Number of topics
  .setMaxIter(100)
  .setSeed(42)
  .setFeaturesCol("features")
  .setOptimizer("online")  // or "em"
  .setLearningDecay(0.51)
  .setLearningOffset(1024)
  .setSubsamplingRate(0.05)
  .setOptimizeDocConcentration(true)

val ldaModel = lda.fit(vectorized)

// Describe topics
val topics = ldaModel.describeTopics(maxTermsPerTopic = 10)
topics.show(truncate = false)

// Get document-topic distributions
val docTopics = ldaModel.transform(vectorized)
docTopics.select("features", "topicDistribution").show(truncate = false)

// Model evaluation
val ll = ldaModel.logLikelihood(vectorized)
val lp = ldaModel.logPerplexity(vectorized)
println(s"Log likelihood: $ll")
println(s"Log perplexity: $lp")

// Convert to local model if distributed
if (ldaModel.isDistributed) {
  val localModel = ldaModel.asInstanceOf[org.apache.spark.ml.clustering.DistributedLDAModel].toLocal
  // Use local model for inference on new data
}
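describeTopics reports term indices into the CountVectorizer vocabulary, not words. One way to map them back is to keep the fitted CountVectorizerModel from the vectorization step instead of discarding it (a sketch):

import org.apache.spark.sql.functions.{col, udf}

val cvModel = countVectorizer.fit(filtered)
val vocab = cvModel.vocabulary

val indicesToTerms = udf { (indices: Seq[Int]) => indices.map(i => vocab(i)) }
ldaModel.describeTopics(10)
  .withColumn("terms", indicesToTerms(col("termIndices")))
  .select("topic", "terms")
  .show(truncate = false)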

Power Iteration Clustering

import org.apache.spark.ml.clustering.PowerIterationClustering

// Create graph data (source, destination, weight)
val edges = spark.createDataFrame(Seq(
  (0L, 1L, 0.9),
  (1L, 2L, 0.8),
  (2L, 3L, 0.7),
  (3L, 4L, 0.6),
  (4L, 0L, 0.5)
)).toDF("src", "dst", "weight")

val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(20)
  .setSrcCol("src")
  .setDstCol("dst")
  .setWeightCol("weight")

// assignClusters returns one row per vertex: (id, cluster)
val assignments = pic.assignClusters(edges)
assignments.show()

Clustering Pipeline with Feature Processing

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StandardScaler, PCA}
import org.apache.spark.ml.clustering.KMeans

// Create preprocessing pipeline
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(true)
  .setWithStd(true)

val pca = new PCA()
  .setInputCol("scaledFeatures")
  .setOutputCol("pcaFeatures")
  .setK(5)

val kmeans = new KMeans()
  .setFeaturesCol("pcaFeatures")
  .setPredictionCol("cluster")
  .setK(4)
  .setSeed(42)

// Create and fit the pipeline (assembler is the VectorAssembler from the K-Means example above)
val pipeline = new Pipeline()
  .setStages(Array(assembler, scaler, pca, kmeans))

val pipelineModel = pipeline.fit(rawData)
val clusteringResults = pipelineModel.transform(rawData)

clusteringResults.select("pcaFeatures", "cluster").show()

// Extract final K-means model for analysis
val finalKMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
println(s"Final cluster centers: ${finalKMeansModel.clusterCenters.mkString("\n")}")

Hyperparameter Tuning for Clustering

import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.ClusteringEvaluator

val kmeans = new KMeans()
  .setSeed(42)
  .setFeaturesCol("features")

// Build parameter grid
val paramGrid = new ParamGridBuilder()
  .addGrid(kmeans.k, Array(2, 3, 4, 5, 6))
  .addGrid(kmeans.maxIter, Array(50, 100, 200))
  .build()

// Create evaluator
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setMetricName("silhouette")

// Create train-validation split
val tvs = new TrainValidationSplit()
  .setEstimator(kmeans)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)
  .setSeed(42)

// Train with validation
val tvsModel = tvs.fit(data)

// Get best model
val bestKMeansModel = tvsModel.bestModel.asInstanceOf[KMeansModel]
println(s"Best K: ${bestKMeansModel.getK}")
println(s"Best silhouette score: ${evaluator.evaluate(tvsModel.transform(data))}")