# Machine Learning

Spark MLlib provides comprehensive machine learning capabilities including classification, regression, clustering, collaborative filtering, and feature engineering through a unified API based on DataFrames.

## ML Pipeline API

The ML Pipeline API provides a uniform set of high-level APIs built on top of DataFrames.

### Pipeline and PipelineModel

```scala { .api }
class Pipeline extends Estimator[PipelineModel] {
  def setStages(value: Array[PipelineStage]): Pipeline
  def getStages: Array[PipelineStage]
  def fit(dataset: Dataset[_]): PipelineModel
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): Pipeline
  def write: MLWriter
}

class PipelineModel extends Model[PipelineModel] {
  def stages: Array[Transformer]
  def transform(dataset: Dataset[_]): DataFrame
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): PipelineModel
  def write: MLWriter
}
```

### Base Classes

```scala { .api }
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): Estimator[M]
}

abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): Transformer
}

abstract class Model[M <: Model[M]] extends Transformer {
  def copy(extra: ParamMap): M
}

trait PipelineStage extends Params {
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): PipelineStage
}
```

### Usage Examples

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Create pipeline stages
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)

// Create and fit pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(trainingData)

// Transform new data
val predictions = model.transform(testData)
```

## Classification

### Logistic Regression

```scala { .api }
class LogisticRegression extends Classifier[Vector, LogisticRegression, LogisticRegressionModel] {
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setElasticNetParam(value: Double): this.type
  def setTol(value: Double): this.type
  def setFitIntercept(value: Boolean): this.type
  def setThreshold(value: Double): this.type
  def setThresholds(value: Array[Double]): this.type
  def setStandardization(value: Boolean): this.type
  def setWeightCol(value: String): this.type
  def setAggregationDepth(value: Int): this.type
  def setFamily(value: String): this.type
  def setLowerBoundsOnCoefficients(value: Matrix): this.type
  def setUpperBoundsOnCoefficients(value: Matrix): this.type
  def setLowerBoundsOnIntercepts(value: Vector): this.type
  def setUpperBoundsOnIntercepts(value: Vector): this.type
}

class LogisticRegressionModel extends ClassificationModel[Vector, LogisticRegressionModel] {
  def coefficients: Vector
  def intercept: Double
  def coefficientMatrix: Matrix
  def interceptVector: Vector
  def numClasses: Int
  def numFeatures: Int
  def summary: LogisticRegressionTrainingSummary
  def hasSummary: Boolean
  def evaluate(dataset: Dataset[_]): LogisticRegressionSummary
}
```

### Decision Tree Classifier

```scala { .api }
class DecisionTreeClassifier extends Classifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] {
  def setMaxDepth(value: Int): this.type
  def setMaxBins(value: Int): this.type
  def setMinInstancesPerNode(value: Int): this.type
  def setMinInfoGain(value: Double): this.type
  def setMaxMemoryInMB(value: Int): this.type
  def setCacheNodeIds(value: Boolean): this.type
  def setCheckpointInterval(value: Int): this.type
  def setImpurity(value: String): this.type
  def setSeed(value: Long): this.type
  def setWeightCol(value: String): this.type
}

class DecisionTreeClassificationModel extends ClassificationModel[Vector, DecisionTreeClassificationModel]
  with DecisionTreeModel with DecisionTreeClassifierParams {
  def rootNode: Node
  def numNodes: Int
  def depth: Int
  def featureImportances: Vector
  def toDebugString: String
}
```

### Random Forest Classifier

```scala { .api }
class RandomForestClassifier extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel] {
  def setNumTrees(value: Int): this.type
  def setMaxDepth(value: Int): this.type
  def setMaxBins(value: Int): this.type
  def setMinInstancesPerNode(value: Int): this.type
  def setMinInfoGain(value: Double): this.type
  def setSubsamplingRate(value: Double): this.type
  def setFeatureSubsetStrategy(value: String): this.type
  def setSeed(value: Long): this.type
  def setImpurity(value: String): this.type
}

class RandomForestClassificationModel extends ClassificationModel[Vector, RandomForestClassificationModel] {
  def trees: Array[DecisionTreeClassificationModel]
  def treeWeights: Array[Double]
  def numFeatures: Int
  def numClasses: Int
  def totalNumNodes: Int
  def featureImportances: Vector
  def toDebugString: String
}
```

### Usage Examples

```scala
import org.apache.spark.ml.classification.{LogisticRegression, DecisionTreeClassifier, RandomForestClassifier}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Logistic Regression
val lr = new LogisticRegression()
  .setMaxIter(20)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

val lrModel = lr.fit(trainingData)
val lrPredictions = lrModel.transform(testData)

// Decision Tree
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxDepth(5)
  .setMaxBins(32)

val dtModel = dt.fit(trainingData)

// Random Forest
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(20)
  .setMaxDepth(5)

val rfModel = rf.fit(trainingData)

// Evaluation
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")

val auc = evaluator.evaluate(lrPredictions)
```

## Regression

### Linear Regression

```scala { .api }
class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegressionModel] {
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setElasticNetParam(value: Double): this.type
  def setTol(value: Double): this.type
  def setFitIntercept(value: Boolean): this.type
  def setStandardization(value: Boolean): this.type
  def setSolver(value: String): this.type
  def setWeightCol(value: String): this.type
  def setAggregationDepth(value: Int): this.type
  def setLoss(value: String): this.type
  def setEpsilon(value: Double): this.type
}

class LinearRegressionModel extends RegressionModel[Vector, LinearRegressionModel] {
  def coefficients: Vector
  def intercept: Double
  def numFeatures: Int
  def summary: LinearRegressionTrainingSummary
  def hasSummary: Boolean
  def evaluate(dataset: Dataset[_]): LinearRegressionSummary
}
```

### Decision Tree Regressor

```scala { .api }
class DecisionTreeRegressor extends Regressor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel] {
  def setMaxDepth(value: Int): this.type
  def setMaxBins(value: Int): this.type
  def setMinInstancesPerNode(value: Int): this.type
  def setMinInfoGain(value: Double): this.type
  def setImpurity(value: String): this.type
  def setSeed(value: Long): this.type
  def setVarianceCol(value: String): this.type
}

class DecisionTreeRegressionModel extends RegressionModel[Vector, DecisionTreeRegressionModel]
  with DecisionTreeModel {
  def rootNode: Node
  def numNodes: Int
  def depth: Int
  def featureImportances: Vector
  def toDebugString: String
}
```

## Clustering

### K-Means

```scala { .api }
class KMeans extends Estimator[KMeansModel] with KMeansParams {
  def setK(value: Int): this.type
  def setInitMode(value: String): this.type
  def setInitSteps(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setTol(value: Double): this.type
  def setSeed(value: Long): this.type
  def setDistanceMeasure(value: String): this.type
  def setWeightCol(value: String): this.type
}

class KMeansModel extends Model[KMeansModel] with KMeansParams {
  def clusterCenters: Array[Vector]
  def k: Int
  def computeCost(dataset: Dataset[_]): Double
  def summary: KMeansSummary
  def hasSummary: Boolean
}
```

### Gaussian Mixture Model

```scala { .api }
class GaussianMixture extends Estimator[GaussianMixtureModel] {
  def setK(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setTol(value: Double): this.type
  def setSeed(value: Long): this.type
  def setAggregationDepth(value: Int): this.type
  def setWeightCol(value: String): this.type
}

class GaussianMixtureModel extends Model[GaussianMixtureModel] {
  def weights: Array[Double]
  def gaussians: Array[MultivariateGaussian]
  def k: Int
  def summary: GaussianMixtureSummary
  def hasSummary: Boolean
}
```

### Usage Examples

```scala
import org.apache.spark.ml.clustering.{KMeans, GaussianMixture}
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// K-means clustering
val kmeans = new KMeans()
  .setK(3)
  .setMaxIter(20)
  .setSeed(1L)

val kmeansModel = kmeans.fit(dataset)
val kmeansPredictions = kmeansModel.transform(dataset)

// Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(3)
  .setMaxIter(100)
  .setSeed(538009335L)

val gmmModel = gmm.fit(dataset)
val gmmPredictions = gmmModel.transform(dataset)

// Evaluation
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(kmeansPredictions)
```

## Feature Engineering

### Vector Assembler

```scala { .api }
class VectorAssembler extends Transformer with VectorAssemblerParams {
  def setInputCols(value: Array[String]): this.type
  def setOutputCol(value: String): this.type
  def setHandleInvalid(value: String): this.type
  def transform(dataset: Dataset[_]): DataFrame
}
```

### String Indexer

```scala { .api }
class StringIndexer extends Estimator[StringIndexerModel] {
  def setInputCol(value: String): this.type
  def setOutputCol(value: String): this.type
  def setHandleInvalid(value: String): this.type
  def setStringOrderType(value: String): this.type
}

class StringIndexerModel extends Model[StringIndexerModel] {
  def labels: Array[String]
  def labelsArray: Array[Array[String]]
  def transform(dataset: Dataset[_]): DataFrame
}
```

### One-Hot Encoder

```scala { .api }
class OneHotEncoder extends Transformer with OneHotEncoderParams {
  def setInputCols(value: Array[String]): this.type
  def setOutputCols(value: Array[String]): this.type
  def setDropLast(value: Boolean): this.type
  def setHandleInvalid(value: String): this.type
  def transform(dataset: Dataset[_]): DataFrame
}
```

### Standard Scaler

```scala { .api }
class StandardScaler extends Estimator[StandardScalerModel] {
  def setInputCol(value: String): this.type
  def setOutputCol(value: String): this.type
  def setWithMean(value: Boolean): this.type
  def setWithStd(value: Boolean): this.type
}

class StandardScalerModel extends Model[StandardScalerModel] {
  def mean: Vector
  def std: Vector
  def transform(dataset: Dataset[_]): DataFrame
}
```

### Usage Examples

```scala
import org.apache.spark.ml.feature._

// Vector Assembler
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val featuresDF = assembler.transform(df)

// String Indexer
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexerModel = indexer.fit(df)
val indexedDF = indexerModel.transform(df)

// One-Hot Encoder
val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

val encodedDF = encoder.transform(indexedDF)

// Standard Scaler
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

val scalerModel = scaler.fit(featuresDF)
val scaledDF = scalerModel.transform(featuresDF)
```

## Model Selection and Evaluation

### Cross Validator

```scala { .api }
class CrossValidator extends Estimator[CrossValidatorModel] {
  def setEstimator(value: Estimator[_]): this.type
  def setEstimatorParamMaps(value: Array[ParamMap]): this.type
  def setEvaluator(value: Evaluator): this.type
  def setNumFolds(value: Int): this.type
  def setSeed(value: Long): this.type
  def setParallelism(value: Int): this.type
  def setCollectSubModels(value: Boolean): this.type
  def setFoldCol(value: String): this.type
}

class CrossValidatorModel extends Model[CrossValidatorModel] {
  def bestModel: Model[_]
  def avgMetrics: Array[Double]
  def stdMetrics: Array[Double]
  def subModels: Array[Array[Model[_]]]
  def getEstimatorParamMaps: Array[ParamMap]
  def getEvaluator: Evaluator
}
```

### Parameter Grid Builder

```scala { .api }
class ParamGridBuilder {
  def addGrid[T](param: Param[T], values: Array[T]): this.type
  def addGrid[T](param: Param[T], values: java.util.List[T]): this.type
  def build(): Array[ParamMap]
}
```

### Evaluators

```scala { .api }
class BinaryClassificationEvaluator extends Evaluator {
  def setRawPredictionCol(value: String): this.type
  def setLabelCol(value: String): this.type
  def setMetricName(value: String): this.type
  def setWeightCol(value: String): this.type
  def evaluate(dataset: Dataset[_]): Double
  def isLargerBetter: Boolean
}

class MulticlassClassificationEvaluator extends Evaluator {
  def setPredictionCol(value: String): this.type
  def setLabelCol(value: String): this.type
  def setMetricName(value: String): this.type
  def setWeightCol(value: String): this.type
  def setBeta(value: Double): this.type
  def setEps(value: Double): this.type
  def evaluate(dataset: Dataset[_]): Double
}

class RegressionEvaluator extends Evaluator {
  def setPredictionCol(value: String): this.type
  def setLabelCol(value: String): this.type
  def setMetricName(value: String): this.type
  def setWeightCol(value: String): this.type
  def evaluate(dataset: Dataset[_]): Double
}
```

### Usage Examples

```scala
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Parameter grid
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept, Array(false, true))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

// Cross validator
val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
  .setParallelism(2)

val cvModel = cv.fit(trainingData)
val bestModel = cvModel.bestModel

// Predictions and evaluation
val predictions = cvModel.transform(testData)
val evaluator = new BinaryClassificationEvaluator()
val auc = evaluator.evaluate(predictions)
```

## Collaborative Filtering

### Alternating Least Squares (ALS)

```scala { .api }
class ALS extends Estimator[ALSModel] {
  def setRank(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setImplicitPrefs(value: Boolean): this.type
  def setAlpha(value: Double): this.type
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def setNonnegative(value: Boolean): this.type
  def setNumUserBlocks(value: Int): this.type
  def setNumItemBlocks(value: Int): this.type
  def setSeed(value: Long): this.type
  def setCheckpointInterval(value: Int): this.type
  def setStorageLevel(value: StorageLevel): this.type
  def setIntermediateStorageLevel(value: StorageLevel): this.type
  def setColdStartStrategy(value: String): this.type
  def setFinalStorageLevel(value: StorageLevel): this.type
  def setBlockSize(value: Int): this.type
}

class ALSModel extends Model[ALSModel] {
  def rank: Int
  def userFactors: DataFrame
  def itemFactors: DataFrame
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
  def transform(dataset: Dataset[_]): DataFrame
}
```

### Usage Examples

```scala
import org.apache.spark.ml.recommendation.ALS

// ALS model
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setColdStartStrategy("drop")

val model = als.fit(ratings)

// Generate recommendations
val userRecs = model.recommendForAllUsers(10)
val movieRecs = model.recommendForAllItems(10)

// Make predictions
val predictions = model.transform(testData)
```