# Core Framework

The MLlib core framework provides the foundational abstractions for building machine learning pipelines using the DataFrame-based API. It follows the Estimator-Transformer pattern with type-safe parameter management.

## Pipeline Architecture

### Base Classes

```scala { .api }
abstract class PipelineStage extends Params with Logging {
  val uid: String
  def copy(extra: ParamMap): PipelineStage
  def transformSchema(schema: StructType): StructType
}

abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMap: ParamMap): M
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Array[M]
}

abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
}

abstract class Model[M <: Model[M]] extends Transformer {
  val parent: Estimator[M]
  def copy(extra: ParamMap): M
}
```

### Pipeline Components

```scala { .api }
class Pipeline(val uid: String) extends Estimator[PipelineModel] with MLWritable {
  def this() = this(Identifiable.randomUID("pipeline"))

  final val stages: Param[Array[PipelineStage]]

  def setStages(value: Array[PipelineStage]): Pipeline
  def getStages: Array[PipelineStage]
  def fit(dataset: Dataset[_]): PipelineModel
  def copy(extra: ParamMap): Pipeline
  def transformSchema(schema: StructType): StructType
  def write: MLWriter
}

class PipelineModel(override val uid: String, val stages: Array[Transformer])
  extends Model[PipelineModel] with MLWritable with Logging {

  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  override def copy(extra: ParamMap): PipelineModel
  def write: MLWriter
}
```
## Parameter System

### Parameter Definition

```scala { .api }
trait Param[T] extends Serializable {
  def name: String
  def doc: String
  def parent: String
  def defaultValue: Option[T]
  def isValid(value: T): Boolean
  def encode(value: T): String
  def decode(encoded: String): T
}

class IntParam(parent: Params, name: String, doc: String, isValid: Int => Boolean)
  extends Param[Int](parent, name, doc)

class DoubleParam(parent: Params, name: String, doc: String, isValid: Double => Boolean)
  extends Param[Double](parent, name, doc)

class BooleanParam(parent: Params, name: String, doc: String)
  extends Param[Boolean](parent, name, doc)

class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array[String] => Boolean)
  extends Param[Array[String]](parent, name, doc)
```

### Parameter Management

```scala { .api }
trait Params extends Identifiable with Serializable {
  def copy(extra: ParamMap): Params

  final def set(param: Param[_], value: Any): Params.this.type
  final def set[T](param: Param[T], value: T): Params.this.type
  final def set(paramPair: ParamPair[_]): Params.this.type
  final def setDefault(paramPairs: ParamPair[_]*): Params.this.type
  final def setDefault[T](param: Param[T], value: T): Params.this.type

  final def get[T](param: Param[T]): Option[T]
  final def getOrDefault[T](param: Param[T]): T
  final def $(param: Param[_]): Any
  final def isSet(param: Param[_]): Boolean
  final def isDefined(param: Param[_]): Boolean
  final def hasDefault[T](param: Param[T]): Boolean
  final def getDefault[T](param: Param[T]): Option[T]

  def params: Array[Param[_]]
  def explainParam(param: Param[_]): String
  def explainParams(): String
  final def extractParamMap(): ParamMap
  final def extractParamMap(extra: ParamMap): ParamMap
}

class ParamMap extends Serializable {
  def put[T](param: Param[T], value: T): ParamMap
  def put(paramPair: ParamPair[_]): ParamMap
  def put(paramPairs: ParamPair[_]*): ParamMap
  def get[T](param: Param[T]): Option[T]
  def apply[T](param: Param[T]): T
  def contains(param: Param[_]): Boolean
  def remove[T](param: Param[T]): ParamMap
  def filter(f: ParamPair[_] => Boolean): ParamMap
  def copy: ParamMap
  def toSeq: Seq[ParamPair[_]]
  def size: Int
}

case class ParamPair[T](param: Param[T], value: T)
```
## Shared Parameters

### Common Parameter Traits

```scala { .api }
trait HasFeaturesCol extends Params {
  final val featuresCol: Param[String]
  final def getFeaturesCol: String
  def setFeaturesCol(value: String): this.type
}

trait HasLabelCol extends Params {
  final val labelCol: Param[String]
  final def getLabelCol: String
  def setLabelCol(value: String): this.type
}

trait HasPredictionCol extends Params {
  final val predictionCol: Param[String]
  final def getPredictionCol: String
  def setPredictionCol(value: String): this.type
}

trait HasRawPredictionCol extends Params {
  final val rawPredictionCol: Param[String]
  final def getRawPredictionCol: String
  def setRawPredictionCol(value: String): this.type
}

trait HasProbabilityCol extends Params {
  final val probabilityCol: Param[String]
  final def getProbabilityCol: String
  def setProbabilityCol(value: String): this.type
}

trait HasWeightCol extends Params {
  final val weightCol: Param[String]
  final def getWeightCol: String
  def setWeightCol(value: String): this.type
}
```

### Algorithm Parameters

```scala { .api }
trait HasRegParam extends Params {
  final val regParam: DoubleParam
  final def getRegParam: Double
  def setRegParam(value: Double): this.type
}

trait HasMaxIter extends Params {
  final val maxIter: IntParam
  final def getMaxIter: Int
  def setMaxIter(value: Int): this.type
}

trait HasTol extends Params {
  final val tol: DoubleParam
  final def getTol: Double
  def setTol(value: Double): this.type
}

trait HasStepSize extends Params {
  final val stepSize: DoubleParam
  final def getStepSize: Double
  def setStepSize(value: Double): this.type
}

trait HasSeed extends Params {
  final val seed: LongParam
  final def getSeed: Long
  def setSeed(value: Long): this.type
}

trait HasElasticNetParam extends Params {
  final val elasticNetParam: DoubleParam
  final def getElasticNetParam: Double
  def setElasticNetParam(value: Double): this.type
}

trait HasFitIntercept extends Params {
  final val fitIntercept: BooleanParam
  final def getFitIntercept: Boolean
  def setFitIntercept(value: Boolean): this.type
}

trait HasStandardization extends Params {
  final val standardization: BooleanParam
  final def getStandardization: Boolean
  def setStandardization(value: Boolean): this.type
}

trait HasThreshold extends Params {
  final val threshold: DoubleParam
  final def getThreshold: Double
  def setThreshold(value: Double): this.type
}

trait HasThresholds extends Params {
  final val thresholds: DoubleArrayParam
  final def getThresholds: Array[Double]
  def setThresholds(value: Array[Double]): this.type
}
```
## Supervised Learning Base Classes

### Predictor Framework

```scala { .api }
abstract class Predictor[FeaturesType, Learner <: Predictor[FeaturesType, Learner, M], M <: PredictionModel[FeaturesType, M]]
  extends Estimator[M] with PredictorParams {

  def train(dataset: Dataset[_]): M
  override def fit(dataset: Dataset[_]): M
  def copy(extra: ParamMap): Learner
  override def transformSchema(schema: StructType): StructType
}

abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, M]]
  extends Model[M] with PredictorParams {

  val numFeatures: Int
  val parent: Estimator[M]

  def predict(features: FeaturesType): Double
  def predictRaw(features: FeaturesType): Vector

  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): M
}
```

### Classification Framework

```scala { .api }
abstract class Classifier[FeaturesType, E <: Classifier[FeaturesType, E, M], M <: ClassificationModel[FeaturesType, M]]
  extends Predictor[FeaturesType, E, M] with HasRawPredictionCol with HasProbabilityCol {

  def setRawPredictionCol(value: String): E
  def setProbabilityCol(value: String): E
}

abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[FeaturesType, M]]
  extends PredictionModel[FeaturesType, M] with HasRawPredictionCol with HasProbabilityCol {

  val numClasses: Int

  override def predictRaw(features: FeaturesType): Vector
  def raw2probabilityInPlace(rawPrediction: Vector): Vector
  def probability2predictionInPlace(probability: Vector): Vector

  def setRawPredictionCol(value: String): M
  def setProbabilityCol(value: String): M
}

abstract class ProbabilisticClassifier[FeaturesType, E <: ProbabilisticClassifier[FeaturesType, E, M], M <: ProbabilisticClassificationModel[FeaturesType, M]]
  extends Classifier[FeaturesType, E, M] {
}

abstract class ProbabilisticClassificationModel[FeaturesType, M <: ProbabilisticClassificationModel[FeaturesType, M]]
  extends ClassificationModel[FeaturesType, M] {

  def predictProbability(features: FeaturesType): Vector
}
```

### Regression Framework

```scala { .api }
abstract class Regressor[FeaturesType, E <: Regressor[FeaturesType, E, M], M <: RegressionModel[FeaturesType, M]]
  extends Predictor[FeaturesType, E, M] {
}

abstract class RegressionModel[FeaturesType, M <: RegressionModel[FeaturesType, M]]
  extends PredictionModel[FeaturesType, M] {
}
```
## Persistence Framework

### MLWritable and MLReadable

```scala { .api }
trait MLWritable {
  def write: MLWriter
  def save(path: String): Unit
}

trait MLReadable[T] {
  def read: MLReader[T]
  def load(path: String): T
}

abstract class MLWriter {
  def save(path: String): Unit
  def overwrite(): MLWriter
  def option(key: String, value: String): MLWriter
  def session(sparkSession: SparkSession): MLWriter
}

abstract class MLReader[T] {
  def load(path: String): T
  def option(key: String, value: String): MLReader[T]
  def session(sparkSession: SparkSession): MLReader[T]
}
```

### Default Persistence

```scala { .api }
trait DefaultParamsWritable extends MLWritable {
  def write: MLWriter = new DefaultParamsWriter(this)
}

trait DefaultParamsReadable[T] extends MLReadable[T] {
  def read: MLReader[T] = new DefaultParamsReader[T]
}

class DefaultParamsWriter(instance: Params) extends MLWriter {
  override def save(path: String): Unit
}

class DefaultParamsReader[T] extends MLReader[T] {
  override def load(path: String): T
}
```
## Identifiable Trait

```scala { .api }
trait Identifiable {
  val uid: String
}

object Identifiable {
  def randomUID(prefix: String): String
}
```
## Usage Examples

### Creating a Simple Pipeline

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.classification.LogisticRegression

// Create pipeline stages
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(false)
  .setWithStd(true)

val lr = new LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("label")
  .setPredictionCol("prediction")

// Create and fit pipeline
val pipeline = new Pipeline().setStages(Array(assembler, scaler, lr))
val model = pipeline.fit(trainingData)

// Make predictions
val predictions = model.transform(testData)
```

### Parameter Management

```scala
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()

// Set parameters individually
lr.setMaxIter(100)
  .setRegParam(0.01)
  .setElasticNetParam(0.5)

// Use ParamMap for batch parameter setting
val paramMap = ParamMap(
  lr.maxIter -> 50,
  lr.regParam -> 0.1,
  lr.elasticNetParam -> 0.0
)

val lr2 = lr.copy(paramMap)

// Extract current parameters
val currentParams = lr.extractParamMap()
println(lr.explainParams())
```

### Custom Parameter Validation

```scala
import org.apache.spark.ml.param.{Param, Params, DoubleParam}
import org.apache.spark.ml.util.Identifiable

class MyEstimator(override val uid: String) extends Estimator[MyModel] with Params {
  def this() = this(Identifiable.randomUID("myEstimator"))

  // Custom parameter with validation
  final val customParam: DoubleParam = new DoubleParam(this, "customParam",
    "custom parameter (must be positive)", (x: Double) => x > 0.0)

  def setCustomParam(value: Double): this.type = set(customParam, value)
  def getCustomParam: Double = $(customParam)

  setDefault(customParam -> 1.0)

  override def fit(dataset: Dataset[_]): MyModel = {
    // Training logic using getCustomParam
    new MyModel(uid)
  }

  override def copy(extra: ParamMap): MyEstimator = defaultCopy(extra)
  override def transformSchema(schema: StructType): StructType = schema
  def params: Array[Param[_]] = Array(customParam)
}
```