# Clustering

MLlib provides comprehensive unsupervised learning algorithms for discovering patterns and groupings in data. All clustering algorithms follow the Estimator-Transformer pattern and support the Pipeline API.

## K-Means Clustering

### Estimator

```scala { .api }
class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMeansParams with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("kmeans"))

  def setK(value: Int): KMeans
  def setInitMode(value: String): KMeans
  def setInitSteps(value: Int): KMeans
  def setMaxIter(value: Int): KMeans
  def setTol(value: Double): KMeans
  def setSeed(value: Long): KMeans
  def setDistanceMeasure(value: String): KMeans

  override def fit(dataset: Dataset[_]): KMeansModel
  override def copy(extra: ParamMap): KMeans
}
```

### Model

```scala { .api }
class KMeansModel private[ml] (
    override val uid: String,
    private[clustering] val parentModel: MLlibKMeansModel)
  extends Model[KMeansModel] with KMeansParams with GeneralMLWritable {

  def clusterCenters: Array[Vector]
  lazy val summary: KMeansSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame

  @deprecated("This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator " +
    "instead. You can also get the cost on the training dataset in the summary.", "2.4.0")
  def computeCost(dataset: Dataset[_]): Double

  override def copy(extra: ParamMap): KMeansModel
  def write: MLWriter
}

class KMeansSummary private[clustering](predictions: DataFrame, predictionCol: String,
    featuresCol: String, val k: Int, val numIter: Int,
    val trainingCost: Double) extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def clusterSizes: Array[Long]
}
```

## Bisecting K-Means

### Estimator

```scala { .api }
class BisectingKMeans(override val uid: String) extends Estimator[BisectingKMeansModel]
  with BisectingKMeansParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("bisecting_kmeans"))

  def setK(value: Int): BisectingKMeans
  def setMaxIter(value: Int): BisectingKMeans
  def setSeed(value: Long): BisectingKMeans
  def setMinDivisibleClusterSize(value: Double): BisectingKMeans
  def setDistanceMeasure(value: String): BisectingKMeans

  override def fit(dataset: Dataset[_]): BisectingKMeansModel
  override def copy(extra: ParamMap): BisectingKMeans
}
```

### Model

```scala { .api }
class BisectingKMeansModel(override val uid: String, private val parentModel: MLlibBisectingKMeansModel)
  extends Model[BisectingKMeansModel] with BisectingKMeansParams with MLWritable {

  def clusterCenters: Array[Vector]
  lazy val summary: BisectingKMeansSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame
  def computeCost(dataset: Dataset[_]): Double
  override def copy(extra: ParamMap): BisectingKMeansModel
  def write: MLWriter
}

class BisectingKMeansSummary private[clustering](predictions: DataFrame, predictionCol: String,
    featuresCol: String, val k: Int, val numIter: Int,
    val trainingCost: Double)
  extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def clusterSizes: Array[Long]
}
```

## Gaussian Mixture Model

### Estimator

```scala { .api }
class GaussianMixture(override val uid: String) extends Estimator[GaussianMixtureModel]
  with GaussianMixtureParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("GaussianMixture"))

  def setK(value: Int): GaussianMixture
  def setMaxIter(value: Int): GaussianMixture
  def setTol(value: Double): GaussianMixture
  def setSeed(value: Long): GaussianMixture
  def setAggregationDepth(value: Int): GaussianMixture

  override def fit(dataset: Dataset[_]): GaussianMixtureModel
  override def copy(extra: ParamMap): GaussianMixture
}
```

### Model

```scala { .api }
class GaussianMixtureModel(override val uid: String, private val parentModel: MLlibGaussianMixtureModel)
  extends Model[GaussianMixtureModel] with GaussianMixtureParams with MLWritable {

  def weights: Array[Double]
  def gaussians: Array[MultivariateGaussian]
  lazy val summary: GaussianMixtureSummary
  def hasSummary: Boolean

  override def transform(dataset: Dataset[_]): DataFrame
  def predictProbability(features: Vector): Vector
  def logLikelihood(dataset: Dataset[_]): Double
  override def copy(extra: ParamMap): GaussianMixtureModel
  def write: MLWriter
}

class GaussianMixtureSummary private[clustering](predictions: DataFrame, predictionCol: String,
    featuresCol: String, val k: Int, val numIter: Int,
    val logLikelihood: Double, val logLikelihoodHistory: Array[Double])
  extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

  def cluster: DataFrame = predictions.select(predictionCol)
  def probability: DataFrame = predictions.select(probabilityCol)
  def probabilityCol: String
  def clusterSizes: Array[Long]
}
```

## Latent Dirichlet Allocation (LDA)

### Estimator

```scala { .api }
class LDA(override val uid: String) extends Estimator[LDAModel] with LDAParams with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("lda"))

  def setK(value: Int): LDA
  def setMaxIter(value: Int): LDA
  def setDocConcentration(value: Array[Double]): LDA
  def setTopicConcentration(value: Double): LDA
  def setSeed(value: Long): LDA
  def setCheckpointInterval(value: Int): LDA
  def setOptimizer(value: String): LDA
  def setLearningOffset(value: Double): LDA
  def setLearningDecay(value: Double): LDA
  def setSubsamplingRate(value: Double): LDA
  def setOptimizeDocConcentration(value: Boolean): LDA
  def setKeepLastCheckpoint(value: Boolean): LDA

  override def fit(dataset: Dataset[_]): LDAModel
  override def copy(extra: ParamMap): LDA
}
```

### Models

```scala { .api }
sealed abstract class LDAModel private[ml] extends Model[LDAModel] with LDAParams with MLWritable {
  def vocabSize: Int
  def topicsMatrix: Matrix
  def isDistributed: Boolean
  def logLikelihood(dataset: Dataset[_]): Double
  def logPerplexity(dataset: Dataset[_]): Double
  def describeTopics(maxTermsPerTopic: Int = 10): DataFrame
  def describeTopics(): DataFrame
  override def transform(dataset: Dataset[_]): DataFrame
  def write: MLWriter
}

class LocalLDAModel private[ml](override val uid: String, vocabSize: Int,
    private val model: MLlibLocalLDAModel, sqlContext: SQLContext)
  extends LDAModel {

  override def copy(extra: ParamMap): LocalLDAModel
  override def isDistributed: Boolean = false
  override def topicsMatrix: Matrix
}

class DistributedLDAModel private[ml](override val uid: String, vocabSize: Int,
    private val model: MLlibDistributedLDAModel, sqlContext: SQLContext,
    private val graph: Option[Graph[LDA.TopicCounts, LDA.TokenCount]])
  extends LDAModel {

  override def copy(extra: ParamMap): DistributedLDAModel
  override def isDistributed: Boolean = true
  override def topicsMatrix: Matrix
  def toLocal: LocalLDAModel
  def trainingLogLikelihood: Double
  def logPrior: Double
}
```

## Power Iteration Clustering

### Estimator

```scala { .api }
class PowerIterationClustering(override val uid: String) extends Estimator[PowerIterationClusteringModel]
  with PowerIterationClusteringParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("pic"))

  def setK(value: Int): PowerIterationClustering
  def setMaxIter(value: Int): PowerIterationClustering
  def setInitMode(value: String): PowerIterationClustering
  def setSrcCol(value: String): PowerIterationClustering
  def setDstCol(value: String): PowerIterationClustering
  def setWeightCol(value: String): PowerIterationClustering

  override def fit(dataset: Dataset[_]): PowerIterationClusteringModel
  override def copy(extra: ParamMap): PowerIterationClustering
}
```

### Model

```scala { .api }
class PowerIterationClusteringModel(override val uid: String, private val assignmentsDF: DataFrame)
  extends Model[PowerIterationClusteringModel] with PowerIterationClusteringParams with MLWritable {

  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): PowerIterationClusteringModel
  def write: MLWriter
}
```

## Base Clustering Summary

```scala { .api }
abstract class ClusteringSummary(predictions: DataFrame, predictionCol: String,
    featuresCol: String, numIter: Int) extends Serializable {

  def cluster: DataFrame
  def clusterSizes: Array[Long]
  def silhouette: Double
  def trainingCost: Double
}
```

## Usage Examples

### K-Means Clustering

```scala
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Prepare features
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val data = assembler.transform(rawData)

// Create K-means
val kmeans = new KMeans()
  .setK(3)
  .setSeed(42)
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setMaxIter(100)
  .setTol(1E-4)
  .setDistanceMeasure("euclidean")

// Train model
val model = kmeans.fit(data)

// Get cluster centers
val centers = model.clusterCenters
println("Cluster Centers:")
centers.zipWithIndex.foreach { case (center, i) =>
  println(s"Cluster $i: $center")
}

// Make predictions
val predictions = model.transform(data)
predictions.select("features", "cluster").show()

// Get training summary
val summary = model.summary
println(s"Training cost: ${summary.trainingCost}")
println(s"Number of iterations: ${summary.numIter}")
println(s"Cluster sizes: ${summary.clusterSizes.mkString(", ")}")

// Evaluate clustering
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setMetricName("silhouette")
  .setDistanceMeasure("squaredEuclidean")

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance: $silhouette")
```

### Gaussian Mixture Model

```scala
import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureModel}

val gmm = new GaussianMixture()
  .setK(3)
  .setSeed(42)
  .setFeaturesCol("features")
  .setPredictionCol("cluster")
  .setProbabilityCol("probability")
  .setMaxIter(100)
  .setTol(1E-4)

val gmmModel = gmm.fit(data)

// Get mixture weights and gaussians
val weights = gmmModel.weights
val gaussians = gmmModel.gaussians

println("Mixture weights:")
weights.zipWithIndex.foreach { case (weight, i) =>
  println(s"Component $i: $weight")
}

println("Gaussian parameters:")
gaussians.zipWithIndex.foreach { case (gaussian, i) =>
  println(s"Component $i:")
  println(s"  Mean: ${gaussian.mean}")
  println(s"  Covariance: ${gaussian.cov}")
}

// Make predictions with probabilities
val gmmPredictions = gmmModel.transform(data)
gmmPredictions.select("features", "cluster", "probability").show(truncate = false)

// Get model summary
val gmmSummary = gmmModel.summary
println(s"Log-likelihood: ${gmmSummary.logLikelihood}")
println(s"Log-likelihood history: ${gmmSummary.logLikelihoodHistory.mkString(", ")}")
```

### Bisecting K-Means

```scala
import org.apache.spark.ml.clustering.BisectingKMeans

val bkm = new BisectingKMeans()
  .setK(4)
  .setSeed(42)
  .setMaxIter(20)
  .setMinDivisibleClusterSize(1.0)
  .setDistanceMeasure("euclidean")

val bkmModel = bkm.fit(data)

// Get cluster centers
val bkmCenters = bkmModel.clusterCenters
println("Bisecting K-Means Cluster Centers:")
bkmCenters.zipWithIndex.foreach { case (center, i) =>
  println(s"Cluster $i: $center")
}

val bkmPredictions = bkmModel.transform(data)
val bkmSummary = bkmModel.summary
println(s"Bisecting K-Means training cost: ${bkmSummary.trainingCost}")
```
### Latent Dirichlet Allocation (Topic Modeling)

```scala
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.{CountVectorizer, Tokenizer, StopWordsRemover}

// Prepare text data for LDA
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val stopWordsRemover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered_words")

val countVectorizer = new CountVectorizer()
  .setInputCol("filtered_words")
  .setOutputCol("features")
  .setVocabSize(10000)
  .setMinDF(2)

// Process text data
val tokenized = tokenizer.transform(textData)
val filtered = stopWordsRemover.transform(tokenized)
val vectorized = countVectorizer.fit(filtered).transform(filtered)

// Train LDA model
val lda = new LDA()
  .setK(10)  // Number of topics
  .setMaxIter(100)
  .setSeed(42)
  .setFeaturesCol("features")
  .setOptimizer("online")  // or "em"
  .setLearningDecay(0.51)
  .setLearningOffset(1024)
  .setSubsamplingRate(0.05)
  .setOptimizeDocConcentration(true)

val ldaModel = lda.fit(vectorized)

// Describe topics
val topics = ldaModel.describeTopics(maxTermsPerTopic = 10)
topics.show(truncate = false)

// Get document-topic distributions
val docTopics = ldaModel.transform(vectorized)
docTopics.select("features", "topicDistribution").show(truncate = false)

// Model evaluation
val ll = ldaModel.logLikelihood(vectorized)
val lp = ldaModel.logPerplexity(vectorized)
println(s"Log likelihood: $ll")
println(s"Log perplexity: $lp")

// Convert to local model if distributed
if (ldaModel.isDistributed) {
  val localModel = ldaModel.asInstanceOf[org.apache.spark.ml.clustering.DistributedLDAModel].toLocal
  // Use local model for inference on new data
}
```

### Power Iteration Clustering

```scala
import org.apache.spark.ml.clustering.PowerIterationClustering

// Create graph data (source, destination, weight)
val edges = spark.createDataFrame(Seq(
  (0L, 1L, 0.9),
  (1L, 2L, 0.8),
  (2L, 3L, 0.7),
  (3L, 4L, 0.6),
  (4L, 0L, 0.5)
)).toDF("src", "dst", "weight")

val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(20)
  .setSrcCol("src")
  .setDstCol("dst")
  .setWeightCol("weight")

val picModel = pic.fit(edges)
val picResults = picModel.transform(edges)
picResults.select("src", "cluster").distinct().show()
```
### Clustering Pipeline with Feature Processing

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StandardScaler, PCA}
import org.apache.spark.ml.clustering.KMeans

// Create preprocessing pipeline
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(true)
  .setWithStd(true)

val pca = new PCA()
  .setInputCol("scaledFeatures")
  .setOutputCol("pcaFeatures")
  .setK(5)

val kmeans = new KMeans()
  .setFeaturesCol("pcaFeatures")
  .setPredictionCol("cluster")
  .setK(4)
  .setSeed(42)

// Create and fit pipeline
val pipeline = new Pipeline()
  .setStages(Array(assembler, scaler, pca, kmeans))

val pipelineModel = pipeline.fit(data)
val clusteringResults = pipelineModel.transform(data)

clusteringResults.select("pcaFeatures", "cluster").show()

// Extract final K-means model for analysis
val finalKMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
println(s"Final cluster centers: ${finalKMeansModel.clusterCenters.mkString("\n")}")
```

### Hyperparameter Tuning for Clustering

```scala
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.ClusteringEvaluator

val kmeans = new KMeans()
  .setSeed(42)
  .setFeaturesCol("features")

// Build parameter grid
val paramGrid = new ParamGridBuilder()
  .addGrid(kmeans.k, Array(2, 3, 4, 5, 6))
  .addGrid(kmeans.maxIter, Array(50, 100, 200))
  .build()

// Create evaluator
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setMetricName("silhouette")

// Create train-validation split
val tvs = new TrainValidationSplit()
  .setEstimator(kmeans)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)
  .setSeed(42)

// Train with validation
val tvsModel = tvs.fit(data)

// Get best model
val bestKMeansModel = tvsModel.bestModel.asInstanceOf[KMeansModel]
println(s"Best K: ${bestKMeansModel.getK}")
println(s"Best silhouette score: ${evaluator.evaluate(tvsModel.transform(data))}")
```