or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

classification.md clustering.md core-framework.md evaluation.md feature-processing.md frequent-pattern-mining.md index.md linear-algebra.md rdd-api.md recommendation.md regression.md

docs/core-framework.md

0

# Core Framework

The MLlib core framework provides the foundational abstractions for building machine learning pipelines using the DataFrame-based API. It follows the Estimator-Transformer pattern with type-safe parameter management.

## Pipeline Architecture

### Base Classes

```scala { .api }
abstract class PipelineStage extends Params with Logging {
  val uid: String
  def copy(extra: ParamMap): PipelineStage
  def transformSchema(schema: StructType): StructType
}

abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMap: ParamMap): M
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Array[M]
}

abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
}

abstract class Model[M <: Model[M]] extends Transformer {
  val parent: Estimator[M]
  def copy(extra: ParamMap): M
}
```

### Pipeline Components

```scala { .api }
class Pipeline(val uid: String) extends Estimator[PipelineModel] with MLWritable {
  def this() = this(Identifiable.randomUID("pipeline"))

  final val stages: Param[Array[PipelineStage]]

  def setStages(value: Array[PipelineStage]): Pipeline
  def getStages: Array[PipelineStage]
  def fit(dataset: Dataset[_]): PipelineModel
  def copy(extra: ParamMap): Pipeline
  def transformSchema(schema: StructType): StructType
  def write: MLWriter
}

class PipelineModel(override val uid: String, val stages: Array[Transformer])
  extends Model[PipelineModel] with MLWritable with Logging {

  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): PipelineModel
  def write: MLWriter
}
```

## Parameter System

### Parameter Definition

```scala { .api }
trait Param[T] extends Serializable {
  def name: String
  def doc: String
  def parent: String
  def defaultValue: Option[T]
  def isValid(value: T): Boolean
  def encode(value: T): String
  def decode(encoded: String): T
}

class IntParam(parent: Params, name: String, doc: String, isValid: Int => Boolean)
  extends Param[Int](parent, name, doc)

class DoubleParam(parent: Params, name: String, doc: String, isValid: Double => Boolean)
  extends Param[Double](parent, name, doc)

class BooleanParam(parent: Params, name: String, doc: String)
  extends Param[Boolean](parent, name, doc)

class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array[String] => Boolean)
  extends Param[Array[String]](parent, name, doc)
```

### Parameter Management

```scala { .api }
trait Params extends Identifiable with Serializable {
  def copy(extra: ParamMap): Params

  final def set(param: Param[_], value: Any): Params.this.type
  final def set[T](param: Param[T], value: T): Params.this.type
  final def set(paramPair: ParamPair[_]): Params.this.type
  final def setDefault(paramPairs: ParamPair[_]*): Params.this.type
  final def setDefault[T](param: Param[T], value: T): Params.this.type

  final def get[T](param: Param[T]): Option[T]
  final def getOrDefault[T](param: Param[T]): T
  final def $(param: Param[_]): Any
  final def isSet(param: Param[_]): Boolean
  final def isDefined(param: Param[_]): Boolean
  final def hasDefault[T](param: Param[T]): Boolean
  final def getDefault[T](param: Param[T]): Option[T]

  def params: Array[Param[_]]
  def explainParam(param: Param[_]): String
  def explainParams(): String
  final def extractParamMap(): ParamMap
  final def extractParamMap(extra: ParamMap): ParamMap
}

class ParamMap extends Serializable {
  def put[T](param: Param[T], value: T): ParamMap
  def put(paramPair: ParamPair[_]): ParamMap
  def put(paramPairs: ParamPair[_]*): ParamMap
  def get[T](param: Param[T]): Option[T]
  def apply[T](param: Param[T]): T
  def contains(param: Param[_]): Boolean
  def remove[T](param: Param[T]): ParamMap
  def filter(f: ParamPair[_] => Boolean): ParamMap
  def copy: ParamMap
  def toSeq: Seq[ParamPair[_]]
  def size: Int
}

case class ParamPair[T](param: Param[T], value: T)
```

## Shared Parameters

### Common Parameter Traits

```scala { .api }
trait HasFeaturesCol extends Params {
  final val featuresCol: Param[String]
  final def getFeaturesCol: String
  def setFeaturesCol(value: String): this.type
}

trait HasLabelCol extends Params {
  final val labelCol: Param[String]
  final def getLabelCol: String
  def setLabelCol(value: String): this.type
}

trait HasPredictionCol extends Params {
  final val predictionCol: Param[String]
  final def getPredictionCol: String
  def setPredictionCol(value: String): this.type
}

trait HasRawPredictionCol extends Params {
  final val rawPredictionCol: Param[String]
  final def getRawPredictionCol: String
  def setRawPredictionCol(value: String): this.type
}

trait HasProbabilityCol extends Params {
  final val probabilityCol: Param[String]
  final def getProbabilityCol: String
  def setProbabilityCol(value: String): this.type
}

trait HasWeightCol extends Params {
  final val weightCol: Param[String]
  final def getWeightCol: String
  def setWeightCol(value: String): this.type
}
```

### Algorithm Parameters

```scala { .api }
trait HasRegParam extends Params {
  final val regParam: DoubleParam
  final def getRegParam: Double
  def setRegParam(value: Double): this.type
}

trait HasMaxIter extends Params {
  final val maxIter: IntParam
  final def getMaxIter: Int
  def setMaxIter(value: Int): this.type
}

trait HasTol extends Params {
  final val tol: DoubleParam
  final def getTol: Double
  def setTol(value: Double): this.type
}

trait HasStepSize extends Params {
  final val stepSize: DoubleParam
  final def getStepSize: Double
  def setStepSize(value: Double): this.type
}

trait HasSeed extends Params {
  final val seed: LongParam
  final def getSeed: Long
  def setSeed(value: Long): this.type
}

trait HasElasticNetParam extends Params {
  final val elasticNetParam: DoubleParam
  final def getElasticNetParam: Double
  def setElasticNetParam(value: Double): this.type
}

trait HasFitIntercept extends Params {
  final val fitIntercept: BooleanParam
  final def getFitIntercept: Boolean
  def setFitIntercept(value: Boolean): this.type
}

trait HasStandardization extends Params {
  final val standardization: BooleanParam
  final def getStandardization: Boolean
  def setStandardization(value: Boolean): this.type
}

trait HasThreshold extends Params {
  final val threshold: DoubleParam
  final def getThreshold: Double
  def setThreshold(value: Double): this.type
}

trait HasThresholds extends Params {
  final val thresholds: DoubleArrayParam
  final def getThresholds: Array[Double]
  def setThresholds(value: Array[Double]): this.type
}
```

## Supervised Learning Base Classes

### Predictor Framework

```scala { .api }
abstract class Predictor[FeaturesType, Learner <: Predictor[FeaturesType, Learner, M], M <: PredictionModel[FeaturesType, M]]
  extends Estimator[M] with PredictorParams {

  def train(dataset: Dataset[_]): M
  override def fit(dataset: Dataset[_]): M
  def copy(extra: ParamMap): Learner
  override def transformSchema(schema: StructType): StructType
}

abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, M]]
  extends Model[M] with PredictorParams {

  val numFeatures: Int
  val parent: Estimator[M]

  def predict(features: FeaturesType): Double
  def predictRaw(features: FeaturesType): Vector

  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): M
}
```

### Classification Framework

```scala { .api }
abstract class Classifier[FeaturesType, E <: Classifier[FeaturesType, E, M], M <: ClassificationModel[FeaturesType, M]]
  extends Predictor[FeaturesType, E, M] with HasRawPredictionCol with HasProbabilityCol {

  def setRawPredictionCol(value: String): E
  def setProbabilityCol(value: String): E
}

abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[FeaturesType, M]]
  extends PredictionModel[FeaturesType, M] with HasRawPredictionCol with HasProbabilityCol {

  val numClasses: Int

  override def predictRaw(features: FeaturesType): Vector
  def raw2probabilityInPlace(rawPrediction: Vector): Vector
  def probability2predictionInPlace(probability: Vector): Vector

  def setRawPredictionCol(value: String): M
  def setProbabilityCol(value: String): M
}

abstract class ProbabilisticClassifier[FeaturesType, E <: ProbabilisticClassifier[FeaturesType, E, M], M <: ProbabilisticClassificationModel[FeaturesType, M]]
  extends Classifier[FeaturesType, E, M] {
}

abstract class ProbabilisticClassificationModel[FeaturesType, M <: ProbabilisticClassificationModel[FeaturesType, M]]
  extends ClassificationModel[FeaturesType, M] {

  def predictProbability(features: FeaturesType): Vector
}
```

### Regression Framework

```scala { .api }
abstract class Regressor[FeaturesType, E <: Regressor[FeaturesType, E, M], M <: RegressionModel[FeaturesType, M]]
  extends Predictor[FeaturesType, E, M] {
}

abstract class RegressionModel[FeaturesType, M <: RegressionModel[FeaturesType, M]]
  extends PredictionModel[FeaturesType, M] {
}
```

## Persistence Framework

### MLWritable and MLReadable

```scala { .api }
trait MLWritable {
  def write: MLWriter
  def save(path: String): Unit
}

trait MLReadable[T] {
  def read: MLReader[T]
  def load(path: String): T
}

abstract class MLWriter {
  def save(path: String): Unit
  def overwrite(): MLWriter
  def option(key: String, value: String): MLWriter
  def session(sparkSession: SparkSession): MLWriter
}

abstract class MLReader[T] {
  def load(path: String): T
  def option(key: String, value: String): MLReader[T]
  def session(sparkSession: SparkSession): MLReader[T]
}
```

### Default Persistence

```scala { .api }
trait DefaultParamsWritable extends MLWritable {
  def write: MLWriter = new DefaultParamsWriter(this)
}

trait DefaultParamsReadable[T] extends MLReadable[T] {
  def read: MLReader[T] = new DefaultParamsReader[T]
}

class DefaultParamsWriter(instance: Params) extends MLWriter {
  override def save(path: String): Unit
}

class DefaultParamsReader[T] extends MLReader[T] {
  override def load(path: String): T
}
```

## Identifiable Trait

```scala { .api }
trait Identifiable {
  val uid: String
}

object Identifiable {
  def randomUID(prefix: String): String
}
```

## Usage Examples

### Creating a Simple Pipeline

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.classification.LogisticRegression

// Create pipeline stages
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(false)
  .setWithStd(true)

val lr = new LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("label")
  .setPredictionCol("prediction")

// Create and fit pipeline
val pipeline = new Pipeline().setStages(Array(assembler, scaler, lr))
val model = pipeline.fit(trainingData)

// Make predictions
val predictions = model.transform(testData)
```

### Parameter Management

```scala
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()

// Set parameters individually
lr.setMaxIter(100)
  .setRegParam(0.01)
  .setElasticNetParam(0.5)

// Use ParamMap for batch parameter setting
val paramMap = ParamMap(
  lr.maxIter -> 50,
  lr.regParam -> 0.1,
  lr.elasticNetParam -> 0.0
)

val lr2 = lr.copy(paramMap)

// Extract current parameters
val currentParams = lr.extractParamMap()
println(lr.explainParams())
```

### Custom Parameter Validation

```scala
import org.apache.spark.ml.param.{Param, Params, DoubleParam}
import org.apache.spark.ml.util.Identifiable

class MyEstimator(override val uid: String) extends Estimator[MyModel] with Params {
  def this() = this(Identifiable.randomUID("myEstimator"))

  // Custom parameter with validation
  final val customParam: DoubleParam = new DoubleParam(this, "customParam",
    "custom parameter (must be positive)", (x: Double) => x > 0.0)

  def setCustomParam(value: Double): this.type = set(customParam, value)
  def getCustomParam: Double = $(customParam)

  setDefault(customParam -> 1.0)

  override def fit(dataset: Dataset[_]): MyModel = {
    // Training logic using getCustomParam
    new MyModel(uid)
  }

  override def copy(extra: ParamMap): MyEstimator = defaultCopy(extra)
  override def transformSchema(schema: StructType): StructType = schema
  def params: Array[Param[_]] = Array(customParam)
}
```