MLlib provides a comprehensive set of unsupervised learning algorithms for discovering patterns and groupings in unlabeled data. All clustering algorithms follow the Estimator/Transformer pattern and integrate with the Pipeline API.
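Every estimator below is used the same way: configure the estimator, call fit to obtain a model, then call transform to append cluster assignments. A minimal sketch of the shared workflow (assuming df is a DataFrame with a "features" vector column):

import org.apache.spark.ml.clustering.KMeans

// Estimator -> Model -> DataFrame with a "prediction" column appended
// (assumes `df` has a "features" vector column)
val estimator = new KMeans().setK(2).setFeaturesCol("features")
val model = estimator.fit(df)
val assigned = model.transform(df)
assigned.groupBy("prediction").count().show()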
class KMeans(override val uid: String) extends Estimator[KMeansModel]
  with KMeansParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("kmeans"))

  def setK(value: Int): KMeans
  def setInitMode(value: String): KMeans
  def setInitSteps(value: Int): KMeans
  def setMaxIter(value: Int): KMeans
  def setTol(value: Double): KMeans
  def setSeed(value: Long): KMeans
  def setDistanceMeasure(value: String): KMeans

  override def fit(dataset: Dataset[_]): KMeansModel
  override def copy(extra: ParamMap): KMeans
}

class KMeansModel private[ml] (
    override val uid: String,
    private[clustering] val parentModel: MLlibKMeansModel)
  extends Model[KMeansModel] with KMeansParams with GeneralMLWritable {

  def clusterCenters: Array[Vector]
  lazy val summary: KMeansSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame

  @deprecated("This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator " +
    "instead. You can also get the cost on the training dataset in the summary.", "2.4.0")
  def computeCost(dataset: Dataset[_]): Double

  override def copy(extra: ParamMap): KMeansModel
  def write: MLWriter
}
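Because KMeansModel mixes in GeneralMLWritable and its companion object provides a reader, fitted models can be persisted and reloaded. A brief sketch, assuming model is a fitted KMeansModel and the path is writable:

// Save a fitted model and load it back; the path is an arbitrary example
model.write.overwrite().save("/tmp/kmeans-model")
val restored = KMeansModel.load("/tmp/kmeans-model")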
class KMeansSummary private[clustering] (
    predictions: DataFrame,
    predictionCol: String,
    featuresCol: String,
    val k: Int,
    val numIter: Int,
    val trainingCost: Double)
  extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def clusterSizes: Array[Long]
}

class BisectingKMeans(override val uid: String) extends Estimator[BisectingKMeansModel]
  with BisectingKMeansParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("bisecting_kmeans"))
def setK(value: Int): BisectingKMeans
def setMaxIter(value: Int): BisectingKMeans
def setSeed(value: Long): BisectingKMeans
def setMinDivisibleClusterSize(value: Double): BisectingKMeans
def setDistanceMeasure(value: String): BisectingKMeans
override def fit(dataset: Dataset[_]): BisectingKMeansModel
override def copy(extra: ParamMap): BisectingKMeans
}class BisectingKMeansModel(override val uid: String, private val parentModel: MLlibBisectingKMeansModel)
extends Model[BisectingKMeansModel] with BisectingKMeansParams with MLWritable {
  def clusterCenters: Array[Vector]
  lazy val summary: BisectingKMeansSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame
  def computeCost(dataset: Dataset[_]): Double

  override def copy(extra: ParamMap): BisectingKMeansModel
  def write: MLWriter
}
class BisectingKMeansSummary private[clustering] (
    predictions: DataFrame,
    predictionCol: String,
    featuresCol: String,
    val k: Int,
    val numIter: Int,
    val trainingCost: Double)
  extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def clusterSizes: Array[Long]
}

class GaussianMixture(override val uid: String) extends Estimator[GaussianMixtureModel]
  with GaussianMixtureParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("GaussianMixture"))
def setK(value: Int): GaussianMixture
def setMaxIter(value: Int): GaussianMixture
def setTol(value: Double): GaussianMixture
def setSeed(value: Long): GaussianMixture
def setAggregationDepth(value: Int): GaussianMixture
override def fit(dataset: Dataset[_]): GaussianMixtureModel
override def copy(extra: ParamMap): GaussianMixture
}class GaussianMixtureModel(override val uid: String, private val parentModel: MLlibGaussianMixtureModel)
extends Model[GaussianMixtureModel] with GaussianMixtureParams with MLWritable {
  def weights: Array[Double]
  def gaussians: Array[MultivariateGaussian]
  lazy val summary: GaussianMixtureSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame
  def predictProbability(features: Vector): Vector
  def logLikelihood(dataset: Dataset[_]): Double

  override def copy(extra: ParamMap): GaussianMixtureModel
  def write: MLWriter
}
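The predictProbability method listed above scores a single vector, returning one posterior probability per mixture component. A sketch, assuming a fitted gmmModel and that the method is public in your Spark version (it is private in some releases):

import org.apache.spark.ml.linalg.Vectors

// Soft assignment for one 3-dimensional point (assumes a fitted `gmmModel`)
val probs = gmmModel.predictProbability(Vectors.dense(1.0, 2.0, 3.0))
println(probs)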
class GaussianMixtureSummary private[clustering] (
    predictions: DataFrame,
    predictionCol: String,
    featuresCol: String,
    val k: Int,
    val numIter: Int,
    val logLikelihood: Double,
    val logLikelihoodHistory: Array[Double])
  extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def probability: DataFrame = predictions.select(probabilityCol)
  def probabilityCol: String
  def clusterSizes: Array[Long]
}

class LDA(override val uid: String) extends Estimator[LDAModel]
  with LDAParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("lda"))
def setK(value: Int): LDA
def setMaxIter(value: Int): LDA
def setDocConcentration(value: Array[Double]): LDA
def setTopicConcentration(value: Double): LDA
def setSeed(value: Long): LDA
def setCheckpointInterval(value: Int): LDA
def setOptimizer(value: String): LDA
def setLearningOffset(value: Double): LDA
def setLearningDecay(value: Double): LDA
def setSubsamplingRate(value: Double): LDA
def setOptimizeDocConcentration(value: Boolean): LDA
def setKeepLastCheckpoint(value: Boolean): LDA
override def fit(dataset: Dataset[_]): LDAModel
override def copy(extra: ParamMap): LDA
}sealed abstract class LDAModel private[ml] extends Model[LDAModel] with LDAParams with MLWritable {
  def vocabSize: Int
  def topicsMatrix: Matrix
  def isDistributed: Boolean

  def logLikelihood(dataset: Dataset[_]): Double
  def logPerplexity(dataset: Dataset[_]): Double
  def describeTopics(maxTermsPerTopic: Int): DataFrame
  def describeTopics(): DataFrame

  override def transform(dataset: Dataset[_]): DataFrame
  def write: MLWriter
}
class LocalLDAModel private[ml] (
    override val uid: String,
    vocabSize: Int,
    private val model: MLlibLocalLDAModel,
    sqlContext: SQLContext)
  extends LDAModel {

  override def copy(extra: ParamMap): LocalLDAModel
  override def isDistributed: Boolean = false
  override def topicsMatrix: Matrix
}
class DistributedLDAModel private[ml] (
    override val uid: String,
    vocabSize: Int,
    private val model: MLlibDistributedLDAModel,
    sqlContext: SQLContext,
    private val graph: Option[Graph[LDA.TopicCounts, LDA.TokenCount]])
  extends LDAModel {

  override def copy(extra: ParamMap): DistributedLDAModel
  override def isDistributed: Boolean = true
  override def topicsMatrix: Matrix

  def toLocal: LocalLDAModel
  def trainingLogLikelihood: Double
  def logPrior: Double
}

class PowerIterationClustering(override val uid: String)
  extends Estimator[PowerIterationClusteringModel]
  with PowerIterationClusteringParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("pic"))
def setK(value: Int): PowerIterationClustering
def setMaxIter(value: Int): PowerIterationClustering
def setInitMode(value: String): PowerIterationClustering
def setSrcCol(value: String): PowerIterationClustering
def setDstCol(value: String): PowerIterationClustering
def setWeightCol(value: String): PowerIterationClustering
override def fit(dataset: Dataset[_]): PowerIterationClusteringModel
override def copy(extra: ParamMap): PowerIterationClustering
}class PowerIterationClusteringModel(override val uid: String, private val assignmentsDF: DataFrame)
extends Model[PowerIterationClusteringModel] with PowerIterationClusteringParams with MLWritable {
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): PowerIterationClusteringModel
  def write: MLWriter
}

abstract class ClusteringSummary(
    predictions: DataFrame,
    predictionCol: String,
    featuresCol: String,
    numIter: Int) extends Serializable {

  def cluster: DataFrame
  def clusterSizes: Array[Long]
}

import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Prepare features
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")
val data = assembler.transform(rawData)
// Create K-means
val kmeans = new KMeans()
  .setK(3)
  .setSeed(42)
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setMaxIter(100)
  .setTol(1e-4)
  .setDistanceMeasure("euclidean")
// Train model
val model = kmeans.fit(data)
// Get cluster centers
val centers = model.clusterCenters
println("Cluster Centers:")
centers.zipWithIndex.foreach { case (center, i) =>
  println(s"Cluster $i: $center")
}
// Make predictions
val predictions = model.transform(data)
predictions.select("features", "cluster").show()
// Get training summary
val summary = model.summary
println(s"Training cost: ${summary.trainingCost}")
println(s"Number of iterations: ${summary.numIter}")
println(s"Cluster sizes: ${summary.clusterSizes.mkString(", ")}")
// Evaluate clustering
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setMetricName("silhouette")
  .setDistanceMeasure("squaredEuclidean")
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance: $silhouette")import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureModel}
val gmm = new GaussianMixture()
  .setK(3)
  .setSeed(42)
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setProbabilityCol("probability")
  .setMaxIter(100)
  .setTol(1e-4)
val gmmModel = gmm.fit(data)
// Get mixture weights and gaussians
val weights = gmmModel.weights
val gaussians = gmmModel.gaussians
println("Mixture weights:")
weights.zipWithIndex.foreach { case (weight, i) =>
  println(s"Component $i: $weight")
}
println("Gaussian parameters:")
gaussians.zipWithIndex.foreach { case (gaussian, i) =>
  println(s"Component $i:")
  println(s"  Mean: ${gaussian.mean}")
  println(s"  Covariance: ${gaussian.cov}")
}
// Make predictions with probabilities
val gmmPredictions = gmmModel.transform(data)
gmmPredictions.select("features", "cluster", "probability").show(truncate = false)
// Get model summary
val gmmSummary = gmmModel.summary
println(s"Log-likelihood: ${gmmSummary.logLikelihood}")
println(s"Log-likelihood history: ${gmmSummary.logLikelihoodHistory.mkString(", ")}")import org.apache.spark.ml.clustering.BisectingKMeans
val bkm = new BisectingKMeans()
  .setK(4)
  .setSeed(42)
  .setMaxIter(20)
  .setMinDivisibleClusterSize(1.0)
  .setDistanceMeasure("euclidean")
val bkmModel = bkm.fit(data)
// Get cluster centers
val bkmCenters = bkmModel.clusterCenters
println("Bisecting K-Means Cluster Centers:")
bkmCenters.zipWithIndex.foreach { case (center, i) =>
  println(s"Cluster $i: $center")
}
val bkmPredictions = bkmModel.transform(data)
val bkmSummary = bkmModel.summary
println(s"Bisecting K-Means training cost: ${bkmSummary.trainingCost}")import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.{CountVectorizer, Tokenizer, StopWordsRemover}
// Prepare text data for LDA
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val stopWordsRemover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered_words")
val countVectorizer = new CountVectorizer()
  .setInputCol("filtered_words")
  .setOutputCol("features")
  .setVocabSize(10000)
  .setMinDF(2)
// Process text data
val tokenized = tokenizer.transform(textData)
val filtered = stopWordsRemover.transform(tokenized)
// Keep the fitted model so its vocabulary can map term indices back to words
val cvModel = countVectorizer.fit(filtered)
val vectorized = cvModel.transform(filtered)
// Train LDA model
val lda = new LDA()
  .setK(10) // number of topics
  .setMaxIter(100)
  .setSeed(42)
  .setFeaturesCol("features")
  .setOptimizer("online") // or "em"
  .setLearningDecay(0.51)
  .setLearningOffset(1024)
  .setSubsamplingRate(0.05)
  .setOptimizeDocConcentration(true)
val ldaModel = lda.fit(vectorized)
// Describe topics
val topics = ldaModel.describeTopics(maxTermsPerTopic = 10)
topics.show(truncate = false)
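describeTopics reports term indices rather than words; the vocabulary of the fitted CountVectorizerModel (cvModel above) maps them back:

// Map term indices back to vocabulary words for readable topics
val vocab = cvModel.vocabulary
topics.select("topic", "termIndices").collect().foreach { row =>
  val topic = row.getInt(0)
  val terms = row.getSeq[Int](1).map(vocab(_))
  println(s"Topic $topic: ${terms.mkString(", ")}")
}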
// Get document-topic distributions
val docTopics = ldaModel.transform(vectorized)
docTopics.select("features", "topicDistribution").show(truncate = false)
// Model evaluation
val ll = ldaModel.logLikelihood(vectorized)
val lp = ldaModel.logPerplexity(vectorized)
println(s"Log likelihood: $ll")
println(s"Log perplexity: $lp")
// Convert to local model if distributed
if (ldaModel.isDistributed) {
  val localModel = ldaModel.asInstanceOf[org.apache.spark.ml.clustering.DistributedLDAModel].toLocal
  // Use local model for inference on new data
}

import org.apache.spark.ml.clustering.PowerIterationClustering
// Create graph data (source, destination, weight)
val edges = spark.createDataFrame(Seq(
  (0L, 1L, 0.9),
  (1L, 2L, 0.8),
  (2L, 3L, 0.7),
  (3L, 4L, 0.6),
  (4L, 0L, 0.5)
)).toDF("src", "dst", "weight")
val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(20)
  .setSrcCol("src")
  .setDstCol("dst")
  .setWeightCol("weight")
val picModel = pic.fit(edges)
val picResults = picModel.transform(edges)
picResults.select("src", "cluster").distinct().show()import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StandardScaler, PCA}
import org.apache.spark.ml.clustering.KMeans
// Create preprocessing pipeline
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(true)
  .setWithStd(true)
val pca = new PCA()
  .setInputCol("scaledFeatures")
  .setOutputCol("pcaFeatures")
  .setK(5)
val kmeans = new KMeans()
  .setFeaturesCol("pcaFeatures")
  .setPredictionCol("cluster")
  .setK(4)
  .setSeed(42)
// Create and fit pipeline
val pipeline = new Pipeline()
  .setStages(Array(assembler, scaler, pca, kmeans))
// Fit on the raw data: the assembler stage creates the "features" column,
// so fitting on the already-assembled `data` would fail with a column conflict
val pipelineModel = pipeline.fit(rawData)
val clusteringResults = pipelineModel.transform(rawData)
clusteringResults.select("pcaFeatures", "cluster").show()
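Any fitted stage can be pulled out of the PipelineModel the same way; for example, the PCA stage exposes the variance explained by each principal component:

import org.apache.spark.ml.feature.PCAModel

// Stage order above: assembler (0), scaler (1), pca (2), kmeans (3)
val pcaModel = pipelineModel.stages(2).asInstanceOf[PCAModel]
println(s"Explained variance: ${pcaModel.explainedVariance}")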
// Extract final K-means model for analysis
val finalKMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
println(s"Final cluster centers: ${finalKMeansModel.clusterCenters.mkString("\n")}")import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val kmeans = new KMeans()
  .setSeed(42)
  .setFeaturesCol("features")
// Build parameter grid
val paramGrid = new ParamGridBuilder()
  .addGrid(kmeans.k, Array(2, 3, 4, 5, 6))
  .addGrid(kmeans.maxIter, Array(50, 100, 200))
  .build()
// Create evaluator
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setMetricName("silhouette")
// Create train-validation split
val tvs = new TrainValidationSplit()
  .setEstimator(kmeans)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)
  .setSeed(42)
// Train with validation
val tvsModel = tvs.fit(data)
// Get best model
val bestKMeansModel = tvsModel.bestModel.asInstanceOf[KMeansModel]
println(s"Best K: ${bestKMeansModel.getK}")
println(s"Best silhouette score: ${evaluator.evaluate(tvsModel.transform(data))}")