or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

classification.mdclustering.mdcore-framework.mdevaluation.mdfeature-processing.mdfrequent-pattern-mining.mdindex.mdlinear-algebra.mdrdd-api.mdrecommendation.mdregression.md

rdd-api.mddocs/

0

# RDD-based API (Legacy)

1

2

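The MLlib RDD-based API (`org.apache.spark.mllib`) is Spark's original machine learning library, built on RDDs. As of Spark 2.0 it is in maintenance mode: it receives bug fixes but no new features. The DataFrame-based API (`org.apache.spark.ml`) is recommended for new development. The RDD-based API still provides useful functionality and remains in use in many existing systems. The signatures below are abridged to the most commonly used members.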

3

4

## Core Data Types

5

6

### LabeledPoint

7

8

```scala { .api }

// package org.apache.spark.mllib.regression

/** A labeled example: a numeric label plus its feature vector. */
case class LabeledPoint(label: Double, features: org.apache.spark.mllib.linalg.Vector) {
  /** Renders the point as `(label,features)`. */
  override def toString: String
}

object LabeledPoint {
  /** Parses a string produced by `LabeledPoint.toString` back into a LabeledPoint. */
  def parse(s: String): LabeledPoint
}

```

17

18

### Rating

19

20

```scala { .api }

// package org.apache.spark.mllib.recommendation

/** One observed (user, product, rating) triple, the input and output unit of ALS. */
case class Rating(user: Int, product: Int, rating: Double) {
  override def toString: String
}

```

25

26

## Linear Algebra (mllib.linalg)

27

28

### Vector

29

30

```scala { .api }

// package org.apache.spark.mllib.linalg

/** Local vector of doubles with 0-based integer indices, stored densely or sparsely. */
trait Vector extends Serializable {
  /** Logical length of the vector (including the implicit zeros of a sparse vector). */
  def size: Int
  def toArray: Array[Double]
  def apply(i: Int): Double
  def copy: Vector
  /** Applies `f` to every explicitly stored (index, value) pair. */
  def foreachActive(f: (Int, Double) => Unit): Unit
  /** Number of explicitly stored entries, regardless of their value. */
  def numActives: Int
  /** Number of entries whose value is non-zero. */
  def numNonzeros: Int
  def toDense: DenseVector
  def toSparse: SparseVector
  /** Returns the dense or sparse representation, whichever uses less storage. */
  def compressed: Vector
  /** Converts to the DataFrame-based API's `org.apache.spark.ml.linalg.Vector`. */
  def asML: org.apache.spark.ml.linalg.Vector
}

class DenseVector(val values: Array[Double]) extends Vector

/** `values(i)` is stored at position `indices(i)`; indices are expected to be strictly increasing. */
class SparseVector(override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector

object Vectors {
  /** Dense vector from an array. Note the static return type is `Vector`, not `DenseVector`. */
  def dense(values: Array[Double]): Vector
  /** Dense vector from individual values, e.g. `Vectors.dense(1.0, 0.0, 3.0)`. */
  def dense(firstValue: Double, otherValues: Double*): Vector
  /** Sparse vector of length `size`; the static return type is `Vector`. */
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
  /** p-norm of `vector` (p >= 1.0; use Double.PositiveInfinity for the max norm). */
  def norm(vector: Vector, p: Double): Double
  /** Squared Euclidean distance between two vectors of the same size. */
  def sqdist(v1: Vector, v2: Vector): Double
}

```

54

55

### Matrix

56

57

```scala { .api }

// package org.apache.spark.mllib.linalg

/** Local matrix of doubles. Dense storage is column-major. */
trait Matrix extends Serializable {
  def numRows: Int
  def numCols: Int
  /** All entries as a dense array in column-major order. */
  def toArray: Array[Double]
  /** Entry at row `i`, column `j`. */
  def apply(i: Int, j: Int): Double
  def transpose: Matrix
  def multiply(y: DenseMatrix): DenseMatrix
  def multiply(y: DenseVector): DenseVector
  def multiply(y: Vector): DenseVector
}

/** `values` holds the numRows * numCols entries in column-major order. */
class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix

/** Compressed sparse column (CSC) storage: `colPtrs` has numCols + 1 entries. */
class SparseMatrix(override val numRows: Int, override val numCols: Int,
                   val colPtrs: Array[Int], val rowIndices: Array[Int], val values: Array[Double]) extends Matrix

object Matrices {
  /** Column-major: `Matrices.dense(2, 2, Array(1.0, 3.0, 2.0, 4.0))` is [[1, 2], [3, 4]]. */
  def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix
  def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int],
             rowIndices: Array[Int], values: Array[Double]): Matrix
}

```

78

79

## Classification

80

81

### Logistic Regression

82

83

```scala { .api }

// package org.apache.spark.mllib.classification

/** Logistic regression model (binary or multinomial). */
class LogisticRegressionModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

  /** Score threshold separating positive from negative predictions (binary models only). */
  def setThreshold(threshold: Double): this.type
  /** Makes `predict` return raw scores (probabilities) instead of 0/1 labels, e.g. for ROC/AUC. */
  def clearThreshold(): this.type
  def predict(testData: Vector): Double
  def predict(testData: RDD[Vector]): RDD[Double]

  override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
  override def toString: String
}

/**
 * L-BFGS trainer. This is a class, not an object with static `train` methods:
 * {{{
 * val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(training)
 * }}}
 */
class LogisticRegressionWithLBFGS extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
  /** Number of classes (binary by default); labels must be in {0, 1, ..., numClasses - 1}. */
  def setNumClasses(numClasses: Int): this.type
  def setIntercept(addIntercept: Boolean): this.type
  /** Underlying optimizer; configure regularization via e.g. `optimizer.setRegParam(0.1)`. */
  val optimizer: LBFGS
  def run(input: RDD[LabeledPoint]): LogisticRegressionModel
  def run(input: RDD[LabeledPoint], initialWeights: Vector): LogisticRegressionModel
}

/**
 * SGD trainer. Deprecated since Spark 2.0.0: prefer LogisticRegressionWithLBFGS or
 * org.apache.spark.ml.classification.LogisticRegression. There is no single-argument `train`.
 */
object LogisticRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            miniBatchFraction: Double): LogisticRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            miniBatchFraction: Double, initialWeights: Vector): LogisticRegressionModel
}

```

108

109

### Support Vector Machines

110

111

```scala { .api }

// package org.apache.spark.mllib.classification

/** Linear SVM model; predicts 0.0/1.0 labels unless the threshold is cleared. */
class SVMModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

  /** Margin threshold separating positive from negative predictions. */
  def setThreshold(threshold: Double): this.type
  /** Makes `predict` return raw margins instead of 0/1 labels, e.g. for ROC/AUC. */
  def clearThreshold(): this.type
  def predict(testData: Vector): Double
  def predict(testData: RDD[Vector]): RDD[Double]

  override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}

/** SGD trainer for linear SVMs. Note there is no `train(input)` or `train(input, numIterations, stepSize)`. */
object SVMWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double, miniBatchFraction: Double): SVMModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double, miniBatchFraction: Double, initialWeights: Vector): SVMModel
}

```

132

133

### Naive Bayes

134

135

```scala { .api }

// package org.apache.spark.mllib.classification

/**
 * Naive Bayes model. `pi` holds log class priors and `theta` log class-conditional
 * probabilities; `modelType` is "multinomial" or "bernoulli".
 */
class NaiveBayesModel(val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]],
                      val modelType: String) extends ClassificationModel with Serializable {

  def predict(testData: RDD[Vector]): RDD[Double]
  def predict(testData: Vector): Double
}

/**
 * Trainer. `lambda` is the additive (Laplace) smoothing parameter. Feature values must be
 * nonnegative (and 0/1 for the "bernoulli" model type).
 */
object NaiveBayes {
  def train(input: RDD[LabeledPoint]): NaiveBayesModel
  def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel
  def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel
}

```

149

150

## Regression

151

152

### Linear Regression

153

154

```scala { .api }

// package org.apache.spark.mllib.regression

/** Ordinary least-squares linear model: prediction = weights · x + intercept. */
class LinearRegressionModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  def predict(testData: Vector): Double
  def predict(testData: RDD[Vector]): RDD[Double]

  override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}

/**
 * SGD trainer. Deprecated since Spark 2.0.0: prefer org.apache.spark.ml.regression.LinearRegression.
 * There is no single-argument `train`.
 */
object LinearRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            miniBatchFraction: Double): LinearRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            miniBatchFraction: Double, initialWeights: Vector): LinearRegressionModel
}

```

171

172

### Ridge Regression

173

174

```scala { .api }

// package org.apache.spark.mllib.regression

/** Linear model trained with L2 regularization. */
class RidgeRegressionModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  def predict(testData: Vector): Double
  def predict(testData: RDD[Vector]): RDD[Double]

  override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}

/**
 * SGD trainer. Deprecated since Spark 2.0.0: prefer org.apache.spark.ml.regression.LinearRegression
 * with elasticNetParam = 0.0. There is no `train(input)` or `train(input, numIterations, stepSize)`.
 */
object RidgeRegressionWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double, miniBatchFraction: Double): RidgeRegressionModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double, miniBatchFraction: Double, initialWeights: Vector): RidgeRegressionModel
}

```

193

194

### Lasso Regression

195

196

```scala { .api }

// package org.apache.spark.mllib.regression

/** Linear model trained with L1 regularization. */
class LassoModel(override val weights: Vector, override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  def predict(testData: Vector): Double
  def predict(testData: RDD[Vector]): RDD[Double]

  override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
}

/**
 * SGD trainer. Deprecated since Spark 2.0.0: prefer org.apache.spark.ml.regression.LinearRegression
 * with elasticNetParam = 1.0. There is no `train(input)` or `train(input, numIterations, stepSize)`.
 */
object LassoWithSGD {
  def train(input: RDD[LabeledPoint], numIterations: Int): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double, miniBatchFraction: Double): LassoModel
  def train(input: RDD[LabeledPoint], numIterations: Int, stepSize: Double,
            regParam: Double, miniBatchFraction: Double, initialWeights: Vector): LassoModel
}

```

215

216

## Clustering

217

218

### K-Means

219

220

```scala { .api }

// package org.apache.spark.mllib.clustering

/** Fitted k-means model holding one center per cluster. */
class KMeansModel(val clusterCenters: Array[Vector]) extends Saveable with Serializable {
  /** Index of the cluster center closest to `point`. */
  def predict(point: Vector): Int
  def predict(points: RDD[Vector]): RDD[Int]
  /** Sum of squared distances from each point to its nearest center. */
  def computeCost(data: RDD[Vector]): Double
  /** Number of clusters. */
  def k: Int
}

object KMeans {
  def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel
  /** `initializationMode` is "random" or "k-means||". */
  def train(data: RDD[Vector], k: Int, maxIterations: Int, initializationMode: String): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, initializationMode: String,
            seed: Long): KMeansModel

  // Overloads taking `runs: Int` are deprecated (the argument is ignored) and may be
  // absent in newer Spark versions; prefer the overloads above.
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int,
            initializationMode: String): KMeansModel
  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int,
            initializationMode: String, seed: Long): KMeansModel
}

```

237

238

### Gaussian Mixture Model

239

240

```scala { .api }

// package org.apache.spark.mllib.clustering
// MultivariateGaussian lives in org.apache.spark.mllib.stat.distribution (fields `mu`, `sigma`).

/** Mixture of Gaussians; `weights(i)` is the mixing weight of `gaussians(i)`. */
class GaussianMixtureModel(val weights: Array[Double], val gaussians: Array[MultivariateGaussian])
  extends Serializable {

  /** Index of the most likely component for `point`. */
  def predict(point: Vector): Int
  def predict(points: RDD[Vector]): RDD[Int]
  /** Membership probability of `point` for each component. */
  def predictSoft(point: Vector): Array[Double]
  def predictSoft(points: RDD[Vector]): RDD[Array[Double]]
  /** Number of components. */
  def k: Int
}

/** Expectation-maximization trainer; configure with the setters, then call `run`. */
class GaussianMixture extends Serializable {
  def setK(k: Int): GaussianMixture
  def setMaxIterations(maxIterations: Int): GaussianMixture
  def setConvergenceTol(convergenceTol: Double): GaussianMixture
  def setSeed(seed: Long): GaussianMixture
  def run(data: RDD[Vector]): GaussianMixtureModel
}

```

259

260

### Power Iteration Clustering

261

262

```scala { .api }

// package org.apache.spark.mllib.clustering

/**
 * Result of power iteration clustering: a cluster id for every vertex of the input graph.
 * There is no `predict` method — PIC only assigns the vertices it was run on.
 */
class PowerIterationClusteringModel(val k: Int,
                                    val assignments: RDD[PowerIterationClustering.Assignment])
  extends Serializable

object PowerIterationClustering {
  /** Cluster assignment of the vertex with id `id`. */
  case class Assignment(id: Long, cluster: Int)
}

// The full constructor is not public: create instances with `new PowerIterationClustering()`
// (k = 2, maxIterations = 100, "random" initialization) and configure them with the setters.
class PowerIterationClustering extends Serializable {
  def setK(k: Int): PowerIterationClustering
  def setMaxIterations(maxIterations: Int): PowerIterationClustering
  /** "random" or "degree". */
  def setInitializationMode(initializationMode: String): PowerIterationClustering
  /** `similarities` holds (srcId, dstId, similarity) triples; similarities must be nonnegative. */
  def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel
}

```

274

275

## Tree-Based Methods

276

277

### Decision Trees

278

279

```scala { .api }

// package org.apache.spark.mllib.tree
// Algo: Algo.Classification / Algo.Regression; Impurity: Gini, Entropy, Variance.

/** A trained decision tree rooted at `topNode`. */
class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable {
  def predict(features: Vector): Double
  def predict(features: RDD[Vector]): RDD[Double]
  def depth: Int
  def numNodes: Int
}

object DecisionTree {
  def train(input: RDD[LabeledPoint], strategy: Strategy): DecisionTreeModel
  def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
            maxDepth: Int): DecisionTreeModel
  def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
            maxDepth: Int, numClasses: Int): DecisionTreeModel
  /** `categoricalFeaturesInfo` maps a feature index to its number of categories. */
  def train(input: RDD[LabeledPoint], algo: Algo, impurity: Impurity,
            maxDepth: Int, numClasses: Int, maxBins: Int,
            quantileCalculationStrategy: QuantileStrategy,
            categoricalFeaturesInfo: Map[Int, Int]): DecisionTreeModel

  /** Convenience trainer; `impurity` is "gini" or "entropy". */
  def trainClassifier(input: RDD[LabeledPoint], numClasses: Int, categoricalFeaturesInfo: Map[Int, Int],
                      impurity: String, maxDepth: Int, maxBins: Int): DecisionTreeModel
  /** Convenience trainer; `impurity` is "variance". */
  def trainRegressor(input: RDD[LabeledPoint], categoricalFeaturesInfo: Map[Int, Int],
                     impurity: String, maxDepth: Int, maxBins: Int): DecisionTreeModel
}

```

299

300

### Random Forest

301

302

```scala { .api }

// package org.apache.spark.mllib.tree

/** Ensemble of decision trees combined by voting (classification) or averaging (regression). */
class RandomForestModel(val algo: Algo, val trees: Array[DecisionTreeModel]) extends Serializable {
  def predict(features: Vector): Double
  def predict(features: RDD[Vector]): RDD[Double]
  /** Total number of nodes over all trees. */
  def totalNumNodes: Int
}

/** `featureSubsetStrategy`: "auto", "all", "sqrt", "log2" or "onethird". */
object RandomForest {
  def trainClassifier(input: RDD[LabeledPoint], strategy: Strategy,
                      numTrees: Int, featureSubsetStrategy: String,
                      seed: Int): RandomForestModel
  def trainRegressor(input: RDD[LabeledPoint], strategy: Strategy,
                     numTrees: Int, featureSubsetStrategy: String,
                     seed: Int): RandomForestModel
}

```

318

319

### Gradient Boosted Trees

320

321

```scala { .api }

// package org.apache.spark.mllib.tree

/** Boosted ensemble; the prediction combines `trees` weighted by `treeWeights`. */
class GradientBoostedTreesModel(val algo: Algo, val trees: Array[DecisionTreeModel],
                                val treeWeights: Array[Double]) extends Serializable {
  def predict(features: Vector): Double
  def predict(features: RDD[Vector]): RDD[Double]
  /** Total number of nodes over all trees. */
  def totalNumNodes: Int
}

/** Start from e.g. `BoostingStrategy.defaultParams("Classification")` and adjust it. */
object GradientBoostedTrees {
  def train(input: RDD[LabeledPoint], boostingStrategy: BoostingStrategy): GradientBoostedTreesModel
}

```

333

334

## Recommendation

335

336

### Collaborative Filtering

337

338

```scala { .api }

// package org.apache.spark.mllib.recommendation

/** Low-rank factorization: each user and product id maps to a `rank`-length factor array. */
class MatrixFactorizationModel(val rank: Int, val userFeatures: RDD[(Int, Array[Double])],
                               val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {

  def predict(user: Int, product: Int): Double
  def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
  /** Top `num` products for `user`, most strongly recommended first. */
  def recommendProducts(user: Int, num: Int): Array[Rating]
  /** Top `num` users for `product`, most strongly recommended first. */
  def recommendUsers(product: Int, num: Int): Array[Rating]
}

/** Alternating least squares; `lambda` is the regularization parameter. */
object ALS {
  def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
  /** Implicit-feedback variant; `alpha` scales the confidence given to observations. */
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double,
                    alpha: Double): MatrixFactorizationModel
}

```

356

357

## Feature Extraction and Selection

358

359

### Text Feature Extraction

360

361

```scala { .api }

// package org.apache.spark.mllib.feature

/** Maps terms to term-frequency vectors via the hashing trick. */
class HashingTF(val numFeatures: Int) extends Serializable {
  def this() = this(1048576) // 2^20 buckets
  def transform(document: Iterable[_]): Vector
  /** Generic over the document type, so an RDD[Seq[String]] is accepted (RDD is invariant). */
  def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector]
  /** Bucket index a term hashes to. */
  def indexOf(term: Any): Int
}

/** Scales term frequencies by the fitted inverse document frequencies. */
class IDFModel(val idf: Vector) extends Serializable {
  def transform(dataset: RDD[Vector]): RDD[Vector]
  def transform(dataset: Vector): Vector
}

/** idf = log((m + 1) / (df + 1)) over m documents; terms with df < minDocFreq get idf 0. */
class IDF(val minDocFreq: Int) extends Serializable {
  def this() = this(0)
  def fit(dataset: RDD[Vector]): IDFModel
}

class Word2VecModel(private val wordVectors: Map[String, Array[Float]]) extends Serializable {
  /** Vector for `word`; throws if the word is not in the vocabulary. */
  def transform(word: String): Vector
  /** The `num` most similar words (excluding the query word) with their cosine similarity. */
  def findSynonyms(word: String, num: Int): Array[(String, Double)]
  def findSynonyms(vector: Vector, num: Int): Array[(String, Double)]
  def getVectors: Map[String, Array[Float]]
}

/** Skip-gram Word2Vec trainer; words seen fewer than `minCount` times are dropped. */
class Word2Vec extends Serializable {
  def setVectorSize(vectorSize: Int): Word2Vec
  def setLearningRate(learningRate: Double): Word2Vec
  def setNumPartitions(numPartitions: Int): Word2Vec
  def setNumIterations(numIterations: Int): Word2Vec
  def setSeed(seed: Long): Word2Vec
  def setMinCount(minCount: Int): Word2Vec
  def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel
}

```

396

397

### Numerical Feature Processing

398

399

```scala { .api }

// package org.apache.spark.mllib.feature

/** Fitted column statistics used to standardize vectors. */
class StandardScalerModel(val std: Vector, val mean: Vector) extends Serializable {
  def this(std: Vector) = this(std, null)
  def transform(vector: Vector): Vector
  def transform(vectors: RDD[Vector]): RDD[Vector]
}

/**
 * Column-wise standardization. Defaults: withMean = false, withStd = true.
 * withMean = true produces dense output, so take care with sparse input.
 */
class StandardScaler(withMean: Boolean, withStd: Boolean) extends Serializable {
  def this() = this(false, true)
  def fit(data: RDD[Vector]): StandardScalerModel
}

/** Scales each vector to unit p-norm (p = 2.0 by default). */
class Normalizer(val p: Double) extends Serializable {
  def this() = this(2.0)
  def transform(vector: Vector): Vector
  def transform(data: RDD[Vector]): RDD[Vector]
}

```

417

418

## Statistics

419

420

### Summary Statistics

421

422

```scala { .api }

// package org.apache.spark.mllib.stat

/** Column-wise summary statistics of a set of vectors. */
trait MultivariateStatisticalSummary {
  def mean: Vector
  def variance: Vector
  def count: Long
  def numNonzeros: Vector
  def max: Vector
  def min: Vector
  def normL1: Vector
  def normL2: Vector
}

/** Incrementally computed summary; `add` one sample at a time, `merge` partial results. */
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {
  def add(sample: Vector): MultivariateOnlineSummarizer
  def merge(other: MultivariateOnlineSummarizer): MultivariateOnlineSummarizer
}

object Statistics {
  def colStats(X: RDD[Vector]): MultivariateStatisticalSummary
  /** `method` is "pearson" (default) or "spearman". */
  def corr(x: RDD[Double], y: RDD[Double]): Double
  def corr(x: RDD[Double], y: RDD[Double], method: String): Double
  def corr(X: RDD[Vector]): Matrix
  def corr(X: RDD[Vector], method: String): Matrix
  /** Goodness-of-fit test against `expected` (or against a uniform distribution). */
  def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult
  def chiSqTest(observed: Vector): ChiSqTestResult
  /** Independence test on a contingency matrix. */
  def chiSqTest(observed: Matrix): ChiSqTestResult
  /** Independence test of every feature against the label. */
  def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult]
  // Only a 1-sample, 2-sided Kolmogorov-Smirnov test is provided (no two-sample variant).
  def kolmogorovSmirnovTest(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult
  /** e.g. `kolmogorovSmirnovTest(data, "norm", 0.0, 1.0)`. */
  def kolmogorovSmirnovTest(data: RDD[Double], distName: String, params: Double*): KolmogorovSmirnovTestResult
}

```

451

452

## Usage Examples

453

454

### Basic RDD-based Classification

455

456

```scala

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

// `sc` is the SparkContext provided by spark-shell.

// Load LIBSVM-formatted data as RDD[LabeledPoint]
val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

// Split into training (60%) and test (40%); the fixed seed makes the split reproducible
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Train logistic regression. LogisticRegressionWithLBFGS has no static `train` method:
// configure an instance and fit it with `run`.
val lrModel = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(training)

// Train a linear SVM
val svmModel = SVMWithSGD.train(training, numIterations = 100)

// Clear the thresholds so `predict` returns raw scores instead of 0/1 labels;
// BinaryClassificationMetrics needs scores to sweep the ROC curve.
lrModel.clearThreshold()
svmModel.clearThreshold()

// Pair each prediction score with its true label
val lrPredictionAndLabels = test.map { point =>
  (lrModel.predict(point.features), point.label)
}
val svmPredictionAndLabels = test.map { point =>
  (svmModel.predict(point.features), point.label)
}

// Evaluate models
val lrMetrics = new BinaryClassificationMetrics(lrPredictionAndLabels)
val svmMetrics = new BinaryClassificationMetrics(svmPredictionAndLabels)

println(s"Logistic Regression AUC: ${lrMetrics.areaUnderROC()}")
println(s"SVM AUC: ${svmMetrics.areaUnderROC()}")

```

495

496

### RDD-based Clustering

497

498

```scala

import org.apache.spark.mllib.clustering.{KMeans, GaussianMixture}
import org.apache.spark.mllib.linalg.Vectors

// Sample data: two well-separated groups of 2-D points.
// Cached because KMeans and GaussianMixture both make repeated passes over it.
val data = sc.parallelize(Array(
  Vectors.dense(0.0, 0.0),
  Vectors.dense(1.0, 1.0),
  Vectors.dense(9.0, 8.0),
  Vectors.dense(8.0, 9.0)
)).cache()

// K-means clustering
val numClusters = 2
val numIterations = 20
val kMeansModel = KMeans.train(data, numClusters, numIterations)

println("K-means cluster centers:")
kMeansModel.clusterCenters.foreach(println)

// Predict clusters. `predict` maps `data` element by element, so zipping pairs each point with its cluster.
val predictions = kMeansModel.predict(data)
data.zip(predictions).collect().foreach { case (point, cluster) =>
  println(s"Point $point belongs to cluster $cluster")
}

// Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(2)
  .setMaxIterations(20)

val gmmModel = gmm.run(data)

println("GMM weights:")
gmmModel.weights.foreach(println)

// Print each component's mean and covariance explicitly for readable output
println("GMM gaussians:")
gmmModel.gaussians.foreach { g =>
  println(s"mu = ${g.mu}, sigma =\n${g.sigma}")
}

// Soft predictions (per-component membership probabilities).
// Arrays print as JVM identities (e.g. "[D@1b2c3d"), so format them with mkString.
val softPredictions = gmmModel.predictSoft(data)
softPredictions.collect().foreach(probs => println(probs.mkString("[", ", ", "]")))

```

541

542

### Matrix Factorization for Recommendations

543

544

```scala

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Sample explicit ratings: Rating(user, product, rating)
val ratings = sc.parallelize(Array(
  Rating(1, 1, 3.0),
  Rating(1, 2, 4.0),
  Rating(1, 3, 2.0),
  Rating(2, 1, 4.0),
  Rating(2, 2, 2.0),
  Rating(2, 3, 5.0),
  Rating(3, 1, 2.0),
  Rating(3, 2, 3.0),
  Rating(3, 3, 4.0)
))

// Train collaborative filtering model
val rank = 10           // number of latent factors
val numIterations = 10
val lambda = 0.01       // regularization parameter

val alsModel = ALS.train(ratings, rank, numIterations, lambda)

// Predict every (user, product) pair that has a known rating
val usersProducts = ratings.map { case Rating(user, product, _) => (user, product) }

val predictions = alsModel.predict(usersProducts).map { case Rating(user, product, rate) =>
  ((user, product), rate)
}

// Join actual and predicted ratings on (user, product)
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)

// Error on the training ratings themselves: this measures fit, not generalization.
// Evaluate on held-out ratings to estimate real-world accuracy.
val mse = ratesAndPreds.map { case (_, (actual, predicted)) =>
  val err = actual - predicted
  err * err
}.mean()

val rmse = math.sqrt(mse)
println(s"Mean Squared Error = $mse")
println(s"Root Mean Squared Error = $rmse")

// Top-3 product recommendations for user 1
val userProducts = alsModel.recommendProducts(1, 3)
println("Recommendations for user 1:")
userProducts.foreach(println)

```

595

596

### Feature Extraction and Text Processing

597

598

```scala

import org.apache.spark.mllib.feature.{HashingTF, IDF, Word2Vec}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sample documents, each tokenized into words
val documents: RDD[Seq[String]] = sc.parallelize(Seq(
  "spark is great".split(" ").toSeq,
  "machine learning with spark".split(" ").toSeq,
  "apache spark mllib".split(" ").toSeq
))

// TF-IDF: hash terms into 1000 buckets, then reweight by inverse document frequency
val hashingTF = new HashingTF(1000)
val tf: RDD[Vector] = hashingTF.transform(documents)

// IDF.fit and idf.transform both read `tf`; cache it so it is computed only once
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

println("TF-IDF vectors:")
tfidf.collect().foreach(println)

// Word2Vec embeddings (minCount = 1 keeps every word of this tiny corpus)
val word2vec = new Word2Vec()
  .setVectorSize(100)
  .setMinCount(1)

val word2vecModel = word2vec.fit(documents)

// Find synonyms
val synonyms = word2vecModel.findSynonyms("spark", 5)
synonyms.foreach { case (word, similarity) =>
  println(s"$word: $similarity")
}

// Transform words to vectors
val sparkVector = word2vecModel.transform("spark")
println(s"Vector for 'spark': $sparkVector")

```

637

638

### Statistical Analysis

639

640

```scala

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

// Sample data: three observations of three variables
val observations = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0, 100.0),
  Vectors.dense(2.0, 20.0, 200.0),
  Vectors.dense(3.0, 30.0, 300.0)
))

// Column-wise summary statistics
val summary = Statistics.colStats(observations)
println(s"Mean: ${summary.mean}")
println(s"Variance: ${summary.variance}")
println(s"Min: ${summary.min}")
println(s"Max: ${summary.max}")
println(s"Count: ${summary.count}")

// Correlation matrix between columns ("pearson" or "spearman")
val correlMatrix = Statistics.corr(observations, "pearson")
println(s"Correlation matrix:\n$correlMatrix")

// Correlation between two RDDs (they must have the same number of partitions and elements)
val x: RDD[Double] = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
val y: RDD[Double] = sc.parallelize(Array(2.0, 4.0, 6.0, 8.0))

val correlation = Statistics.corr(x, y, "pearson")
println(s"Correlation between x and y: $correlation")

// Chi-squared goodness-of-fit test. The expected counts (sum 5.0) differ in total from
// the observed counts (sum 6.0); Spark rescales `expected` to the observed total.
val observed = Vectors.dense(1.0, 2.0, 3.0)
val expected = Vectors.dense(1.5, 1.5, 2.0)

val chiSqTestResult = Statistics.chiSqTest(observed, expected)
println(s"Chi-squared test result: $chiSqTestResult")

```

677

678

### Advanced RDD Operations for ML

679

680

```scala

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

/**
 * Parses comma-separated lines whose last field is the label and whose other fields are
 * numeric features. Blank lines are skipped; a header row is assumed NOT to be present.
 */
def createLabeledPoints(rawData: RDD[String]): RDD[LabeledPoint] = {
  rawData
    .filter(_.trim.nonEmpty) // an empty line would otherwise fail in "".toDouble
    .map { line =>
      val parts = line.split(',')
      val label = parts.last.toDouble
      val features = Vectors.dense(parts.init.map(_.toDouble))
      LabeledPoint(label, features)
    }
}

/**
 * Standardizes features to zero mean and unit variance. Returns the scaled data together
 * with the fitted scaler so the same transformation can be applied to new data.
 * Note: withMean = true produces dense vectors even for sparse input.
 */
def scaleFeatures(data: RDD[LabeledPoint]): (RDD[LabeledPoint], StandardScalerModel) = {
  val features = data.map(_.features)

  val scaler = new StandardScaler(withMean = true, withStd = true)
  val scalerModel = scaler.fit(features)

  val scaledData = data.map { point =>
    LabeledPoint(point.label, scalerModel.transform(point.features))
  }

  (scaledData, scalerModel)
}

/**
 * k-fold cross-validation for RDD-based models.
 *
 * @param trainFunc   fits a model on a training RDD
 * @param predictFunc predicts a label for one feature vector with a fitted model
 * @param k           number of folds (at least 2)
 * @return per-fold accuracy: the fraction of predictions within 0.5 of the label, i.e. exact
 *         matches for integer class labels. For regression, compute e.g. RMSE from the same
 *         (prediction, label) pairs instead.
 */
def crossValidate[M](data: RDD[LabeledPoint],
                     trainFunc: RDD[LabeledPoint] => M,
                     predictFunc: (M, Vector) => Double,
                     k: Int = 5): Array[Double] = {
  require(k >= 2, s"k must be at least 2 but got $k")

  // Random split into k roughly equal folds; fixed seed for reproducibility
  val folds = data.randomSplit(Array.fill(k)(1.0 / k), seed = 42L)

  (0 until k).map { i =>
    val validation = folds(i)
    // Training set = every other fold. Cached because iterative trainers make many passes.
    val training = data.sparkContext
      .union(folds.indices.filter(_ != i).map(folds(_)))
      .cache()

    val model = trainFunc(training)

    val accuracy = validation.map { point =>
      if (math.abs(predictFunc(model, point.features) - point.label) < 0.5) 1.0 else 0.0
    }.mean()

    training.unpersist()
    accuracy
  }.toArray
}

// Usage example (assumes a header-less CSV whose last column is a 0/1 label)
val rawData = sc.textFile("path/to/data.csv")
val labeledData = createLabeledPoints(rawData)
val (scaledData, scalerModel) = scaleFeatures(labeledData)
scaledData.cache() // reused by every fold

// Cross-validate a logistic regression model. LogisticRegressionWithLBFGS has no static
// `train`; each fold builds a fresh trainer and calls `run`.
val cvScores = crossValidate(
  scaledData,
  (rdd: RDD[LabeledPoint]) => new LogisticRegressionWithLBFGS().setNumClasses(2).run(rdd),
  (model: LogisticRegressionModel, features: Vector) => model.predict(features)
)

println(s"Cross-validation scores: ${cvScores.mkString(", ")}")
println(s"Average accuracy: ${cvScores.sum / cvScores.length}")

```

751

752

### Migration from RDD to DataFrame API

753

754

```scala

import org.apache.spark.ml.{Pipeline, classification => newML}
import org.apache.spark.ml.feature.{StandardScaler => NewStandardScaler}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

/**
 * Converts an RDD[LabeledPoint] to a DataFrame with "label" and "features" columns.
 * spark.ml estimators require org.apache.spark.ml.linalg vectors. Leaving the old mllib
 * vectors in place makes fitting fail with a VectorUDT type mismatch, so convert with `asML`.
 */
def rddToDataFrame(data: RDD[LabeledPoint]): DataFrame = {
  import spark.implicits._ // `spark` is the SparkSession provided by spark-shell

  data.map { point =>
    (point.label, point.features.asML)
  }.toDF("label", "features")
}

// Convert an old mllib.linalg.Vector to the new ml.linalg.Vector.
// (The reverse direction is org.apache.spark.mllib.linalg.Vectors.fromML.)
def convertVector(oldVector: org.apache.spark.mllib.linalg.Vector): org.apache.spark.ml.linalg.Vector =
  oldVector.asML

// Migration example
val rddData: RDD[LabeledPoint] = ??? // replace with your RDD data
val dataFrame = rddToDataFrame(rddData)

// The "features" column already holds assembled vectors, so no VectorAssembler stage is needed.
// Use VectorAssembler only when starting from separate numeric columns, and give it an output
// column that does not already exist in the DataFrame.
val scaler = new NewStandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

val lr = new newML.LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("label")

val pipeline = new Pipeline()
  .setStages(Array(scaler, lr))

val model = pipeline.fit(dataFrame)
val predictions = model.transform(dataFrame)

predictions.select("label", "prediction", "probability").show()

```