or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

classification.md, clustering.md, core-framework.md, evaluation.md, feature-processing.md, frequent-pattern-mining.md, index.md, linear-algebra.md, rdd-api.md, recommendation.md, regression.md

docs/clustering.md

0

# Clustering

1

2

MLlib provides comprehensive unsupervised learning algorithms for discovering patterns and groupings in data. All clustering algorithms follow the Estimator-Transformer pattern and support the Pipeline API.

3

4

## K-Means Clustering

5

6

### Estimator

7

8

```scala { .api }

9

class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMeansParams with DefaultParamsWritable {

10

def this() = this(Identifiable.randomUID("kmeans"))

11

12

def setK(value: Int): KMeans

13

def setInitMode(value: String): KMeans

14

def setInitSteps(value: Int): KMeans

15

def setMaxIter(value: Int): KMeans

16

def setTol(value: Double): KMeans

17

def setSeed(value: Long): KMeans

18

def setDistanceMeasure(value: String): KMeans

19

20

override def fit(dataset: Dataset[_]): KMeansModel

21

override def copy(extra: ParamMap): KMeans

22

}

23

```

24

25

### Model

26

27

```scala { .api }

28

class KMeansModel private[ml] (

29

override val uid: String,

30

private[clustering] val parentModel: MLlibKMeansModel)

31

extends Model[KMeansModel] with KMeansParams with GeneralMLWritable {

32

33

def clusterCenters: Array[Vector]

34

lazy val summary: KMeansSummary

35

def hasSummary: Boolean

36

37

override def transform(dataset: Dataset[_]): DataFrame

38

39

@deprecated("This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator " +

40

"instead. You can also get the cost on the training dataset in the summary.", "2.4.0")

41

def computeCost(dataset: Dataset[_]): Double

42

43

override def copy(extra: ParamMap): KMeansModel

44

def write: MLWriter

45

}

46

47

class KMeansSummary private[clustering](predictions: DataFrame, predictionCol: String,

48

featuresCol: String, val k: Int, val numIter: Int,

49

val trainingCost: Double) extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

50

51

def cluster: DataFrame = predictions.select(predictionCol)

52

def clusterSizes: Array[Long]

53

}

54

```

55

56

## Bisecting K-Means

57

58

### Estimator

59

60

```scala { .api }

61

class BisectingKMeans(override val uid: String) extends Estimator[BisectingKMeansModel]

62

with BisectingKMeansParams with DefaultParamsWritable {

63

64

def this() = this(Identifiable.randomUID("bisecting_kmeans"))

65

66

def setK(value: Int): BisectingKMeans

67

def setMaxIter(value: Int): BisectingKMeans

68

def setSeed(value: Long): BisectingKMeans

69

def setMinDivisibleClusterSize(value: Double): BisectingKMeans

70

def setDistanceMeasure(value: String): BisectingKMeans

71

72

override def fit(dataset: Dataset[_]): BisectingKMeansModel

73

override def copy(extra: ParamMap): BisectingKMeans

74

}

75

```

76

77

### Model

78

79

```scala { .api }

80

class BisectingKMeansModel(override val uid: String, private val parentModel: MLlibBisectingKMeansModel)

81

extends Model[BisectingKMeansModel] with BisectingKMeansParams with MLWritable {

82

83

def clusterCenters: Array[Vector]

84

lazy val summary: BisectingKMeansSummary

85

def hasSummary: Boolean

86

87

override def transform(dataset: Dataset[_]): DataFrame

88

def computeCost(dataset: Dataset[_]): Double

89

override def copy(extra: ParamMap): BisectingKMeansModel

90

def write: MLWriter

91

}

92

93

class BisectingKMeansSummary private[clustering](predictions: DataFrame, predictionCol: String,

94

featuresCol: String, val k: Int, val numIter: Int,

95

val trainingCost: Double)

96

extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

97

98

def cluster: DataFrame = predictions.select(predictionCol)

99

def clusterSizes: Array[Long]

100

}

101

```

102

103

## Gaussian Mixture Model

104

105

### Estimator

106

107

```scala { .api }

108

class GaussianMixture(override val uid: String) extends Estimator[GaussianMixtureModel]

109

with GaussianMixtureParams with DefaultParamsWritable {

110

111

def this() = this(Identifiable.randomUID("GaussianMixture"))

112

113

def setK(value: Int): GaussianMixture

114

def setMaxIter(value: Int): GaussianMixture

115

def setTol(value: Double): GaussianMixture

116

def setSeed(value: Long): GaussianMixture

117

def setAggregationDepth(value: Int): GaussianMixture

118

119

override def fit(dataset: Dataset[_]): GaussianMixtureModel

120

override def copy(extra: ParamMap): GaussianMixture

121

}

122

```

123

124

### Model

125

126

```scala { .api }

127

class GaussianMixtureModel(override val uid: String, private val parentModel: MLlibGaussianMixtureModel)

128

extends Model[GaussianMixtureModel] with GaussianMixtureParams with MLWritable {

129

130

def weights: Array[Double]

131

def gaussians: Array[MultivariateGaussian]

132

lazy val summary: GaussianMixtureSummary

133

def hasSummary: Boolean

134

135

override def transform(dataset: Dataset[_]): DataFrame

136

def predictProbability(features: Vector): Vector

137

def logLikelihood(dataset: Dataset[_]): Double

138

override def copy(extra: ParamMap): GaussianMixtureModel

139

def write: MLWriter

140

}

141

142

class GaussianMixtureSummary private[clustering](predictions: DataFrame, predictionCol: String,

143

featuresCol: String, val k: Int, val numIter: Int,

144

val logLikelihood: Double, val logLikelihoodHistory: Array[Double])

145

extends ClusteringSummary(predictions, predictionCol, featuresCol, numIter) {

146

147

def cluster: DataFrame = predictions.select(predictionCol)

148

def probability: DataFrame = predictions.select(probabilityCol)

149

def probabilityCol: String

150

def clusterSizes: Array[Long]

151

}

152

```

153

154

## Latent Dirichlet Allocation (LDA)

155

156

### Estimator

157

158

```scala { .api }

159

class LDA(override val uid: String) extends Estimator[LDAModel] with LDAParams with DefaultParamsWritable {

160

def this() = this(Identifiable.randomUID("lda"))

161

162

def setK(value: Int): LDA

163

def setMaxIter(value: Int): LDA

164

def setDocConcentration(value: Array[Double]): LDA

165

def setTopicConcentration(value: Double): LDA

166

def setSeed(value: Long): LDA

167

def setCheckpointInterval(value: Int): LDA

168

def setOptimizer(value: String): LDA

169

def setLearningOffset(value: Double): LDA

170

def setLearningDecay(value: Double): LDA

171

def setSubsamplingRate(value: Double): LDA

172

def setOptimizeDocConcentration(value: Boolean): LDA

173

def setKeepLastCheckpoint(value: Boolean): LDA

174

175

override def fit(dataset: Dataset[_]): LDAModel

176

override def copy(extra: ParamMap): LDA

177

}

178

```

179

180

### Models

181

182

```scala { .api }

183

sealed abstract class LDAModel private[ml] extends Model[LDAModel] with LDAParams with MLWritable {

184

def vocabSize: Int

185

def topicsMatrix: Matrix

186

def isDistributed: Boolean

187

def logLikelihood(dataset: Dataset[_]): Double

188

def logPerplexity(dataset: Dataset[_]): Double

189

def describeTopics(maxTermsPerTopic: Int = 10): DataFrame

190

def describeTopics(): DataFrame

191

override def transform(dataset: Dataset[_]): DataFrame

192

def write: MLWriter

193

}

194

195

class LocalLDAModel private[ml](override val uid: String, vocabSize: Int,

196

private val model: MLlibLocalLDAModel, sqlContext: SQLContext)

197

extends LDAModel {

198

199

override def copy(extra: ParamMap): LocalLDAModel

200

override def isDistributed: Boolean = false

201

override def topicsMatrix: Matrix

202

}

203

204

class DistributedLDAModel private[ml](override val uid: String, vocabSize: Int,

205

private val model: MLlibDistributedLDAModel, sqlContext: SQLContext,

206

private val graph: Option[Graph[LDA.TopicCounts, LDA.TokenCount]])

207

extends LDAModel {

208

209

override def copy(extra: ParamMap): DistributedLDAModel

210

override def isDistributed: Boolean = true

211

override def topicsMatrix: Matrix

212

def toLocal: LocalLDAModel

213

def trainingLogLikelihood: Double

214

def logPrior: Double

215

}

216

```

217

218

## Power Iteration Clustering

219

220

### Estimator

221

222

```scala { .api }

223

class PowerIterationClustering(override val uid: String) extends Estimator[PowerIterationClusteringModel]

224

with PowerIterationClusteringParams with DefaultParamsWritable {

225

226

def this() = this(Identifiable.randomUID("pic"))

227

228

def setK(value: Int): PowerIterationClustering

229

def setMaxIter(value: Int): PowerIterationClustering

230

def setInitMode(value: String): PowerIterationClustering

231

def setSrcCol(value: String): PowerIterationClustering

232

def setDstCol(value: String): PowerIterationClustering

233

def setWeightCol(value: String): PowerIterationClustering

234

235

override def fit(dataset: Dataset[_]): PowerIterationClusteringModel

236

override def copy(extra: ParamMap): PowerIterationClustering

237

}

238

```

239

240

### Model

241

242

```scala { .api }

243

class PowerIterationClusteringModel(override val uid: String, private val assignmentsDF: DataFrame)

244

extends Model[PowerIterationClusteringModel] with PowerIterationClusteringParams with MLWritable {

245

246

override def transform(dataset: Dataset[_]): DataFrame

247

override def transformSchema(schema: StructType): StructType

248

override def copy(extra: ParamMap): PowerIterationClusteringModel

249

def write: MLWriter

250

}

251

```

252

253

## Base Clustering Summary

254

255

```scala { .api }

256

abstract class ClusteringSummary(predictions: DataFrame, predictionCol: String,

257

featuresCol: String, numIter: Int) extends Serializable {

258

259

def cluster: DataFrame

260

def clusterSizes: Array[Long]

261

def silhouette: Double

262

def trainingCost: Double

263

}

264

```

265

266

## Usage Examples

267

268

### K-Means Clustering

269

270

```scala

271

import org.apache.spark.ml.clustering.{KMeans, KMeansModel}

272

import org.apache.spark.ml.feature.VectorAssembler

273

import org.apache.spark.ml.evaluation.ClusteringEvaluator

274

275

// Prepare features

276

val assembler = new VectorAssembler()

277

.setInputCols(Array("feature1", "feature2", "feature3"))

278

.setOutputCol("features")

279

280

val data = assembler.transform(rawData)

281

282

// Create K-means

283

val kmeans = new KMeans()

284

.setK(3)

285

.setSeed(42)

286

.setFeaturesCol("features")

287

.setPredictionCol("cluster")

288

.setMaxIter(100)

289

.setTol(1E-4)

290

.setDistanceMeasure("euclidean")

291

292

// Train model

293

val model = kmeans.fit(data)

294

295

// Get cluster centers

296

val centers = model.clusterCenters

297

println("Cluster Centers:")

298

centers.zipWithIndex.foreach { case (center, i) =>

299

println(s"Cluster $i: $center")

300

}

301

302

// Make predictions

303

val predictions = model.transform(data)

304

predictions.select("features", "cluster").show()

305

306

// Get training summary

307

val summary = model.summary

308

println(s"Training cost: ${summary.trainingCost}")

309

println(s"Number of iterations: ${summary.numIter}")

310

println(s"Cluster sizes: ${summary.clusterSizes.mkString(", ")}")

311

312

// Evaluate clustering

313

val evaluator = new ClusteringEvaluator()

314

.setFeaturesCol("features")

315

.setPredictionCol("cluster")

316

.setMetricName("silhouette")

317

.setDistanceMeasure("squaredEuclidean")

318

319

val silhouette = evaluator.evaluate(predictions)

320

println(s"Silhouette with squared euclidean distance: $silhouette")

321

```

322

323

### Gaussian Mixture Model

324

325

```scala

326

import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureModel}

327

328

val gmm = new GaussianMixture()

329

.setK(3)

330

.setSeed(42)

331

.setFeaturesCol("features")

332

.setPredictionCol("cluster")

333

.setProbabilityCol("probability")

334

.setMaxIter(100)

335

.setTol(1E-4)

336

337

val gmmModel = gmm.fit(data)

338

339

// Get mixture weights and gaussians

340

val weights = gmmModel.weights

341

val gaussians = gmmModel.gaussians

342

343

println("Mixture weights:")

344

weights.zipWithIndex.foreach { case (weight, i) =>

345

println(s"Component $i: $weight")

346

}

347

348

println("Gaussian parameters:")

349

gaussians.zipWithIndex.foreach { case (gaussian, i) =>

350

println(s"Component $i:")

351

println(s" Mean: ${gaussian.mean}")

352

println(s" Covariance: ${gaussian.cov}")

353

}

354

355

// Make predictions with probabilities

356

val gmmPredictions = gmmModel.transform(data)

357

gmmPredictions.select("features", "cluster", "probability").show(truncate = false)

358

359

// Get model summary

360

val gmmSummary = gmmModel.summary

361

println(s"Log-likelihood: ${gmmSummary.logLikelihood}")

362

println(s"Log-likelihood history: ${gmmSummary.logLikelihoodHistory.mkString(", ")}")

363

```

364

365

### Bisecting K-Means

366

367

```scala

368

import org.apache.spark.ml.clustering.BisectingKMeans

369

370

val bkm = new BisectingKMeans()

371

.setK(4)

372

.setSeed(42)

373

.setMaxIter(20)

374

.setMinDivisibleClusterSize(1.0)

375

.setDistanceMeasure("euclidean")

376

377

val bkmModel = bkm.fit(data)

378

379

// Get cluster centers

380

val bkmCenters = bkmModel.clusterCenters

381

println("Bisecting K-Means Cluster Centers:")

382

bkmCenters.zipWithIndex.foreach { case (center, i) =>

383

println(s"Cluster $i: $center")

384

}

385

386

val bkmPredictions = bkmModel.transform(data)

387

val bkmSummary = bkmModel.summary

388

println(s"Bisecting K-Means training cost: ${bkmSummary.trainingCost}")

389

```

390

391

### Latent Dirichlet Allocation (Topic Modeling)

392

393

```scala

394

import org.apache.spark.ml.clustering.LDA

395

import org.apache.spark.ml.feature.{CountVectorizer, Tokenizer, StopWordsRemover}

396

397

// Prepare text data for LDA

398

val tokenizer = new Tokenizer()

399

.setInputCol("text")

400

.setOutputCol("words")

401

402

val stopWordsRemover = new StopWordsRemover()

403

.setInputCol("words")

404

.setOutputCol("filtered_words")

405

406

val countVectorizer = new CountVectorizer()

407

.setInputCol("filtered_words")

408

.setOutputCol("features")

409

.setVocabSize(10000)

410

.setMinDF(2)

411

412

// Process text data

413

val tokenized = tokenizer.transform(textData)

414

val filtered = stopWordsRemover.transform(tokenized)

415

val vectorized = countVectorizer.fit(filtered).transform(filtered)

416

417

// Train LDA model

418

val lda = new LDA()

419

.setK(10) // Number of topics

420

.setMaxIter(100)

421

.setSeed(42)

422

.setFeaturesCol("features")

423

.setOptimizer("online") // or "em"

424

.setLearningDecay(0.51)

425

.setLearningOffset(1024)

426

.setSubsamplingRate(0.05)

427

.setOptimizeDocConcentration(true)

428

429

val ldaModel = lda.fit(vectorized)

430

431

// Describe topics

432

val topics = ldaModel.describeTopics(maxTermsPerTopic = 10)

433

topics.show(truncate = false)

434

435

// Get document-topic distributions

436

val docTopics = ldaModel.transform(vectorized)

437

docTopics.select("features", "topicDistribution").show(truncate = false)

438

439

// Model evaluation

440

val ll = ldaModel.logLikelihood(vectorized)

441

val lp = ldaModel.logPerplexity(vectorized)

442

println(s"Log likelihood: $ll")

443

println(s"Log perplexity: $lp")

444

445

// Convert to local model if distributed

446

if (ldaModel.isDistributed) {

447

val localModel = ldaModel.asInstanceOf[org.apache.spark.ml.clustering.DistributedLDAModel].toLocal

448

// Use local model for inference on new data

449

}

450

```

451

452

### Power Iteration Clustering

453

454

```scala

455

import org.apache.spark.ml.clustering.PowerIterationClustering

456

457

// Create graph data (source, destination, weight)

458

val edges = spark.createDataFrame(Seq(

459

(0L, 1L, 0.9),

460

(1L, 2L, 0.8),

461

(2L, 3L, 0.7),

462

(3L, 4L, 0.6),

463

(4L, 0L, 0.5)

464

)).toDF("src", "dst", "weight")

465

466

val pic = new PowerIterationClustering()

467

.setK(2)

468

.setMaxIter(20)

469

.setSrcCol("src")

470

.setDstCol("dst")

471

.setWeightCol("weight")

472

473

val picModel = pic.fit(edges)

474

val picResults = picModel.transform(edges)

475

picResults.select("src", "cluster").distinct().show()

476

```

477

478

### Clustering Pipeline with Feature Processing

479

480

```scala

481

import org.apache.spark.ml.Pipeline

482

import org.apache.spark.ml.feature.{StandardScaler, PCA}

483

import org.apache.spark.ml.clustering.KMeans

484

485

// Create preprocessing pipeline

486

val scaler = new StandardScaler()

487

.setInputCol("features")

488

.setOutputCol("scaledFeatures")

489

.setWithMean(true)

490

.setWithStd(true)

491

492

val pca = new PCA()

493

.setInputCol("scaledFeatures")

494

.setOutputCol("pcaFeatures")

495

.setK(5)

496

497

val kmeans = new KMeans()

498

.setFeaturesCol("pcaFeatures")

499

.setPredictionCol("cluster")

500

.setK(4)

501

.setSeed(42)

502

503

// Create and fit pipeline

504

val pipeline = new Pipeline()

505

.setStages(Array(assembler, scaler, pca, kmeans))

506

507

val pipelineModel = pipeline.fit(data)

508

val clusteringResults = pipelineModel.transform(data)

509

510

clusteringResults.select("pcaFeatures", "cluster").show()

511

512

// Extract final K-means model for analysis

513

val finalKMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]

514

println(s"Final cluster centers: ${finalKMeansModel.clusterCenters.mkString("\n")}")

515

```

516

517

### Hyperparameter Tuning for Clustering

518

519

```scala

520

import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

521

import org.apache.spark.ml.evaluation.ClusteringEvaluator

522

523

val kmeans = new KMeans()

524

.setSeed(42)

525

.setFeaturesCol("features")

526

527

// Build parameter grid

528

val paramGrid = new ParamGridBuilder()

529

.addGrid(kmeans.k, Array(2, 3, 4, 5, 6))

530

.addGrid(kmeans.maxIter, Array(50, 100, 200))

531

.build()

532

533

// Create evaluator

534

val evaluator = new ClusteringEvaluator()

535

.setFeaturesCol("features")

536

.setPredictionCol("prediction")

537

.setMetricName("silhouette")

538

539

// Create train-validation split

540

val tvs = new TrainValidationSplit()

541

.setEstimator(kmeans)

542

.setEvaluator(evaluator)

543

.setEstimatorParamMaps(paramGrid)

544

.setTrainRatio(0.8)

545

.setSeed(42)

546

547

// Train with validation

548

val tvsModel = tvs.fit(data)

549

550

// Get best model

551

val bestKMeansModel = tvsModel.bestModel.asInstanceOf[KMeansModel]

552

println(s"Best K: ${bestKMeansModel.getK}")

553

println(s"Best silhouette score: ${evaluator.evaluate(tvsModel.transform(data))}")

554

```