0
# Pipeline and Hyperparameter Tuning
1
2
MLlib's `org.apache.spark.ml` pipeline API is a framework for building complex ML workflows and automating hyperparameter optimization. It provides pipeline abstractions for chaining transformers and estimators, along with tools for model selection and hyperparameter tuning.
3
4
## Capabilities
5
6
### Pipeline Framework
7
8
```scala { .api }
9
/**
 * Pipeline - an estimator that chains multiple pipeline stages into a single workflow.
 *
 * Enables complex ML workflows by combining transformers and estimators.
 * Fitting a Pipeline produces a [[PipelineModel]].
 */
class Pipeline extends Estimator[PipelineModel] with MLWritable {
  /** Sets the ordered stages (transformers and estimators) of this pipeline. */
  def setStages(value: Array[PipelineStage]): this.type

  /** Returns the stages currently configured on this pipeline. */
  def getStages: Array[PipelineStage]
}
17
18
/**
 * PipelineModel - a fitted pipeline containing trained stages.
 *
 * Applies its transformers and fitted models in sequence.
 */
class PipelineModel extends Model[PipelineModel] with MLWritable {
  /** Fitted stages, in pipeline order. */
  def stages: Array[Transformer]

  // NOTE(review): `numStages` could not be confirmed against the Scala API docs;
  // `stages.length` gives the same information - verify before relying on this member.
  def numStages: Int
}
26
27
/**
 * PipelineStage - base class for pipeline components.
 *
 * All transformers and estimators inherit from this class.
 */
abstract class PipelineStage extends Params with Logging {
  /** Checks that the stage can run on `schema` and derives the output schema. */
  def transformSchema(schema: StructType): StructType

  /** Creates a copy of this stage with the `extra` params applied. */
  def copy(extra: ParamMap): PipelineStage
}
35
```
36
37
### Cross-Validation
38
39
```scala { .api }
40
/**
 * CrossValidator - K-fold cross-validation for model selection.
 *
 * Fits the estimator once per fold for every ParamMap, scores each fit with the
 * evaluator, and selects the best-scoring hyperparameters.
 */
class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorParams with MLWritable {
  /** Estimator (often a Pipeline) to tune. */
  def setEstimator(value: Estimator[_]): this.type

  /** Candidate parameter combinations, typically built with ParamGridBuilder. */
  def setEstimatorParamMaps(value: Array[ParamMap]): this.type

  /** Evaluator used to score each fitted model on the held-out fold. */
  def setEvaluator(value: Evaluator): this.type

  /** Number of folds (default: 3). */
  def setNumFolds(value: Int): this.type

  /** Seed for the random fold assignment. */
  def setSeed(value: Long): this.type

  /** Number of models to fit concurrently (default: 1, i.e. serial). */
  def setParallelism(value: Int): this.type

  /** Whether to keep every per-fold model for later inspection (default: false). */
  def setCollectSubModels(value: Boolean): this.type

  /** Column holding a user-specified fold index, used instead of random splitting. */
  def setFoldCol(value: String): this.type
}
54
55
/**
 * CrossValidatorModel - fitted cross-validation model with best parameters.
 *
 * Contains the best model selected through cross-validation.
 */
class CrossValidatorModel extends Model[CrossValidatorModel] with CrossValidatorParams with MLWritable {
  /** Best model found by cross-validation. */
  def bestModel: Model[_]

  /** Metric averaged over folds, one entry per ParamMap in estimatorParamMaps order. */
  def avgMetrics: Array[Double]

  // NOTE(review): stdMetrics is documented for the PySpark model; confirm that the
  // Scala class exposes it before relying on this member.
  def stdMetrics: Array[Double]

  /** Sub-models indexed by fold, then by ParamMap; requires collectSubModels to have been enabled. */
  def subModels: Array[Array[Model[_]]]

  /** Number of folds used. (`numFolds` itself is the IntParam inherited from CrossValidatorParams.) */
  def getNumFolds: Int
}
66
```
67
68
### Train-Validation Split
69
70
```scala { .api }
71
/**
 * TrainValidationSplit - simple train/validation split for model selection.
 *
 * Splits the data once into training and validation sets and evaluates every
 * ParamMap a single time, which is cheaper than K-fold cross-validation.
 */
class TrainValidationSplit extends Estimator[TrainValidationSplitModel] with TrainValidationSplitParams with MLWritable {
  /** Estimator (often a Pipeline) to tune. */
  def setEstimator(value: Estimator[_]): this.type

  /** Candidate parameter combinations, typically built with ParamGridBuilder. */
  def setEstimatorParamMaps(value: Array[ParamMap]): this.type

  /** Evaluator used to score each fitted model on the validation set. */
  def setEvaluator(value: Evaluator): this.type

  /** Fraction of the data used for training, in (0, 1) (default: 0.75). */
  def setTrainRatio(value: Double): this.type

  /** Seed for the random train/validation split. */
  def setSeed(value: Long): this.type

  /** Number of models to fit concurrently (default: 1, i.e. serial). */
  def setParallelism(value: Int): this.type

  /** Whether to keep every fitted model for later inspection (default: false). */
  def setCollectSubModels(value: Boolean): this.type
}
84
85
/**
 * TrainValidationSplitModel - fitted train-validation model with best parameters.
 *
 * Contains the best model selected through train-validation split.
 */
class TrainValidationSplitModel extends Model[TrainValidationSplitModel] with TrainValidationSplitParams with MLWritable {
  /** Best model found on the validation set. */
  def bestModel: Model[_]

  /** Validation metric for each ParamMap, in estimatorParamMaps order. */
  def validationMetrics: Array[Double]

  /** One fitted model per ParamMap; requires collectSubModels to have been enabled. */
  def subModels: Array[Model[_]]

  /** Training fraction used. (`trainRatio` itself is the DoubleParam inherited from TrainValidationSplitParams.) */
  def getTrainRatio: Double
}
95
```
96
97
### Parameter Grid Builder
98
99
```scala { .api }
100
/**
 * ParamGridBuilder - constructs parameter grids for hyperparameter tuning.
 *
 * Builds the Cartesian product of the values registered for each parameter.
 */
class ParamGridBuilder {
  /** Adds a parameter with the candidate values to try; an `Array[T]` is accepted as an `Iterable[T]`. */
  def addGrid[T](param: Param[T], values: Iterable[T]): this.type

  // Typed overloads that avoid boxing and are convenient to call from Java.
  def addGrid(param: DoubleParam, values: Array[Double]): this.type
  def addGrid(param: IntParam, values: Array[Int]): this.type
  def addGrid(param: LongParam, values: Array[Long]): this.type
  def addGrid(param: FloatParam, values: Array[Float]): this.type

  /** Adds a boolean parameter with both `true` and `false` as candidates. */
  def addGrid(param: BooleanParam): this.type

  /** Fixes the given parameters to a single value in every generated ParamMap. */
  def baseOn(paramMap: ParamMap): this.type
  def baseOn(paramPairs: ParamPair[_]*): this.type

  /** Returns one ParamMap per combination of the registered values. */
  def build(): Array[ParamMap]
}
111
```
112
113
### Parameter System
114
115
```scala { .api }
116
/**
 * ParamMap - map of parameters to their values.
 *
 * Stores parameter-value pairs for ML components.
 */
class ParamMap extends Serializable {
  /** Stores `value` for `param`, replacing any existing entry. */
  def put[T](param: Param[T], value: T): this.type

  /** Stores every given pair, replacing existing entries. */
  def put(paramPairs: ParamPair[_]*): this.type

  /** Returns the value for `param`, or `None` when it is not set. */
  def get[T](param: Param[T]): Option[T]

  /** Returns the value for `param`; throws NoSuchElementException when it is not set. */
  def apply[T](param: Param[T]): T

  /** Whether an entry exists for `param`. */
  def contains(param: Param[_]): Boolean

  /** Removes the entry for `param` and returns the removed value, if there was one. */
  def remove[T](param: Param[T]): Option[T]

  /** Creates a copy of this map. */
  def copy: ParamMap

  /** All entries as parameter-value pairs. */
  def toSeq: Seq[ParamPair[_]]

  /** Number of entries. */
  def size: Int
}
131
132
/**
 * Param - parameter definition with type information.
 *
 * Defines a named, documented parameter for an ML component together with its
 * validation rule.
 */
class Param[T] extends Serializable {
  /** UID of the component that owns this parameter. */
  def parent: String

  /** Parameter name. */
  def name: String

  /** Human-readable description. */
  def doc: String

  /** Whether `value` is acceptable for this parameter. */
  def isValid(value: T): Boolean

  /** Pairs this parameter with a value, e.g. `setDefault(customParam -> 1.0)`. */
  def ->(value: T): ParamPair[T]

  def toString: String
}
143
144
/**
 * ParamPair - parameter-value pair.
 *
 * Associates a parameter with its value; usually created with `param -> value`.
 */
case class ParamPair[T](param: Param[T], value: T)
149
```
150
151
## Usage Examples
152
153
### Basic Pipeline
154
155
```scala
156
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// `training` and `test` are DataFrames supplied by the caller. `training` needs a
// "text" column and a "label" column (LogisticRegression's default label column);
// `test` needs at least "text".

// Configure ML pipeline stages
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)

// Create pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit pipeline
val model: PipelineModel = pipeline.fit(training)

// Make predictions
val predictions = model.transform(test)
183
```
184
185
### Cross-Validation with Parameter Grid
186
187
```scala
188
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Reuses `pipeline`, `hashingTF`, `lr`, `training` and `test` from the basic pipeline example.

// Create parameter grid (3 x 2 x 2 = 12 combinations)
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.maxIter, Array(10, 20))
  .build()

// Create evaluator
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")

// Create cross-validator
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
  .setParallelism(2)

// Run cross-validation
val cvModel = cv.fit(training)

// Get best model. `bestModel` is typed Model[_], so it has to be narrowed; the
// estimator was a Pipeline, so the fitted model is a PipelineModel.
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]

// Make predictions with best model
val cvPredictions = bestModel.transform(test)

// Access cross-validation metrics (one averaged value per ParamMap, in grid order)
println(s"Average metrics: ${cvModel.avgMetrics.mkString(", ")}")
// areaUnderROC is larger-is-better, so the best score is the maximum
println(s"Best metric: ${cvModel.avgMetrics.max}")
224
```
225
226
### Train-Validation Split
227
228
```scala
229
import org.apache.spark.ml.tuning.TrainValidationSplit

// Reuses `pipeline`, `evaluator`, `paramGrid` and `training` from the previous examples.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)
  .setParallelism(2)

// Named `tvsModel` so it does not shadow the fitted PipelineModel `model`
// that the persistence and stage-access examples rely on.
val tvsModel = trainValidationSplit.fit(training)

println(s"Validation metrics: ${tvsModel.validationMetrics.mkString(", ")}")
println(s"Best validation metric: ${tvsModel.validationMetrics.max}")
242
```
243
244
### Complex Pipeline with Feature Engineering
245
246
```scala
247
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Reuses `evaluator` and `training` from the cross-validation example. `training` is
// expected to carry "text", "category", "numericFeatures" (a vector column) and "label".

// Text processing stages
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val stopWordsRemover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered")

val word2Vec = new Word2Vec()
  .setInputCol("filtered")
  .setOutputCol("textFeatures")
  .setVectorSize(100)
  .setMinCount(0)

// Categorical feature processing
val stringIndexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val oneHotEncoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

// Numerical feature processing
val scaler = new StandardScaler()
  .setInputCol("numericFeatures")
  .setOutputCol("scaledNumeric")
  .setWithMean(true)
  .setWithStd(true)

// Feature assembly
val assembler = new VectorAssembler()
  .setInputCols(Array("textFeatures", "categoryVec", "scaledNumeric"))
  .setOutputCol("features")

// Classifier
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

// Complete pipeline
val complexPipeline = new Pipeline()
  .setStages(Array(
    tokenizer, stopWordsRemover, word2Vec,
    stringIndexer, oneHotEncoder,
    scaler, assembler, rf
  ))

// Parameter grid for complex pipeline (3 x 3 x 2 = 18 combinations)
val complexParamGrid = new ParamGridBuilder()
  .addGrid(word2Vec.vectorSize, Array(50, 100, 200))
  .addGrid(rf.numTrees, Array(10, 20, 30))
  .addGrid(rf.maxDepth, Array(5, 10))
  .build()

val complexCV = new CrossValidator()
  .setEstimator(complexPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(complexParamGrid)
  .setNumFolds(3)

val complexModel = complexCV.fit(training)
312
```
313
314
### Model Persistence
315
316
```scala
317
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.tuning.CrossValidatorModel

// `model` is the fitted PipelineModel and `cvModel` the fitted CrossValidatorModel
// from the earlier examples. A model must be loaded with the same class that saved it.

// Save pipeline model
model.write.overwrite().save("path/to/model")

// Load pipeline model
val loadedModel = PipelineModel.load("path/to/model")

// Save cross-validation model
cvModel.write.overwrite().save("path/to/cv-model")

// Load cross-validation model
val loadedCVModel = CrossValidatorModel.load("path/to/cv-model")
328
```
329
330
### Accessing Pipeline Stages
331
332
```scala
333
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.feature.HashingTF

// Get stages from fitted pipeline. `model` is the PipelineModel returned by
// `pipeline.fit`, so no cast is needed.
val pipelineModel: PipelineModel = model
val stages = pipelineModel.stages

// Access specific stage (e.g., the trained classifier). Matching on the type avoids
// an unchecked cast that would throw if the pipeline layout changes.
val trainedClassifier = stages.collectFirst { case lrModel: LogisticRegressionModel => lrModel }
trainedClassifier.foreach { lrModel =>
  println(s"Coefficients: ${lrModel.coefficients}")
}

// Access feature transformer. HashingTF is a stateless transformer, so the fitted
// pipeline holds the HashingTF stage itself rather than a separate model class.
val hashingTFStage = stages.collectFirst { case tf: HashingTF => tf }
hashingTFStage.foreach { tf =>
  println(s"Number of features: ${tf.getNumFeatures}")
}
344
```
345
346
### Custom Parameter Validation
347
348
```scala
349
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

/**
 * Minimal estimator showing a custom parameter with validation.
 * Setting `customParam` to a value that fails the validator is rejected.
 */
class CustomEstimator(override val uid: String) extends Estimator[CustomModel] {
  // No-arg constructor; every Params instance needs a unique uid.
  def this() = this(Identifiable.randomUID("customEstimator"))

  // Custom parameter with validation: only values strictly greater than 0 are accepted
  final val customParam: DoubleParam = new DoubleParam(this, "customParam",
    "custom parameter", ParamValidators.gt(0.0))

  // Set default value
  setDefault(customParam -> 1.0)

  def setCustomParam(value: Double): this.type = set(customParam, value)
  def getCustomParam: Double = $(customParam)

  // Placeholder training step: a real estimator would learn from `dataset` here.
  override def fit(dataset: Dataset[_]): CustomModel =
    copyValues(new CustomModel(uid).setParent(this))

  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): CustomEstimator = defaultCopy(extra)
}

/** Placeholder model produced by [[CustomEstimator]]; returns its input unchanged. */
class CustomModel(override val uid: String) extends Model[CustomModel] {
  override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF()

  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): CustomModel = defaultCopy(extra)
}
362
```
363
364
## Best Practices
365
366
### Pipeline Design
367
1. **Modular stages**: Break complex transformations into separate pipeline stages
368
2. **Parameter organization**: Use consistent parameter naming across stages
369
3. **Schema validation**: Ensure schema compatibility between stages
370
4. **Error handling**: Add validation for input data formats
371
372
### Hyperparameter Tuning
373
1. **Parameter ranges**: Use logarithmically spaced values for regularization parameters (e.g. `Array(0.001, 0.01, 0.1)`)
374
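2. **Cross-validation folds**: Use 5-10 folds for reliable estimates when compute allows; the default and the examples above use 3 to keep runtime down, since every extra fold refits each parameter combination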
375
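3. **Parallelism**: `setParallelism(n)` controls how many models are fit concurrently (default 1, i.e. serial); raise it only as far as the cluster's available cores and memory allow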
376
4. **Evaluation metrics**: Choose metrics appropriate for the problem type
377
378
### Performance Optimization
379
1. **Caching**: Cache intermediate datasets when reused
380
2. **Checkpointing**: Use checkpointing for long pipelines
381
3. **Resource allocation**: Tune Spark configuration for ML workloads
382
4. **Feature selection**: Remove irrelevant features to improve performance