or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

classification.md, clustering.md, evaluation.md, feature-processing.md, frequent-pattern-mining.md, index.md, linear-algebra.md, pipeline.md, recommendation.md, regression.md, statistics.md

docs/pipeline.md

0

# Pipeline and Hyperparameter Tuning

1

2

Framework for building multi-stage ML workflows and automating hyperparameter optimization. MLlib provides pipeline abstractions for chaining feature transformers and estimators into a single workflow, along with model-selection tools (`CrossValidator`, `TrainValidationSplit`, `ParamGridBuilder`) for hyperparameter tuning.

3

4

## Capabilities

5

6

### Pipeline Framework

7

8

```scala { .api }

9

/**

10

* Pipeline - chains multiple pipeline stages into a single workflow

11

* Enables complex ML workflows by combining transformers and estimators

12

*/

13

class Pipeline extends Estimator[PipelineModel] {

14

def setStages(value: Array[PipelineStage]): this.type

15

def getStages: Array[PipelineStage]

16

}

17

18

/**

19

* PipelineModel - a fitted pipeline containing trained stages

20

* Applies transformations and models in sequence

21

*/

22

class PipelineModel extends Model[PipelineModel] {

23

def stages: Array[Transformer]

24

def numStages: Int

25

}

26

27

/**

28

* PipelineStage - base class for pipeline components

29

* All transformers and estimators inherit from this class

30

*/

31

abstract class PipelineStage extends Params with Logging {

32

def transformSchema(schema: StructType): StructType

33

def copy(extra: ParamMap): PipelineStage

34

}

35

```

36

37

### Cross-Validation

38

39

```scala { .api }

40

/**

41

* CrossValidator - K-fold cross-validation for model selection

42

* Performs cross-validation to select best hyperparameters

43

*/

44

class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorParams with MLWritable {

45

def setEstimator(value: Estimator[_]): this.type

46

def setEstimatorParamMaps(value: Array[ParamMap]): this.type

47

def setEvaluator(value: Evaluator): this.type

48

def setNumFolds(value: Int): this.type

49

def setSeed(value: Long): this.type

50

def setParallelism(value: Int): this.type

51

def setCollectSubModels(value: Boolean): this.type

52

def setFoldCol(value: String): this.type

53

}

54

55

/**

56

* CrossValidatorModel - fitted cross-validation model with best parameters

57

* Contains the best model selected through cross-validation

58

*/

59

class CrossValidatorModel extends Model[CrossValidatorModel] with CrossValidatorParams with MLWritable {

60

def bestModel: Model[_]

61

def avgMetrics: Array[Double]

62

def stdMetrics: Array[Double]

63

def subModels: Array[Array[Model[_]]]

64

def getNumFolds: Int

65

}

66

```

67

68

### Train-Validation Split

69

70

```scala { .api }

71

/**

72

* TrainValidationSplit - simple train/validation split for model selection

73

* Splits data into training and validation sets for hyperparameter tuning

74

*/

75

class TrainValidationSplit extends Estimator[TrainValidationSplitModel] with TrainValidationSplitParams with MLWritable {

76

def setEstimator(value: Estimator[_]): this.type

77

def setEstimatorParamMaps(value: Array[ParamMap]): this.type

78

def setEvaluator(value: Evaluator): this.type

79

def setTrainRatio(value: Double): this.type

80

def setSeed(value: Long): this.type

81

def setParallelism(value: Int): this.type

82

def setCollectSubModels(value: Boolean): this.type

83

}

84

85

/**

86

* TrainValidationSplitModel - fitted train-validation model with best parameters

87

* Contains the best model selected through train-validation split

88

*/

89

class TrainValidationSplitModel extends Model[TrainValidationSplitModel] with TrainValidationSplitParams with MLWritable {

90

def bestModel: Model[_]

91

def validationMetrics: Array[Double]

92

def subModels: Array[Model[_]]

93

def getTrainRatio: Double

94

}

95

```

96

97

### Parameter Grid Builder

98

99

```scala { .api }

100

/**

101

* ParamGridBuilder - constructs parameter grids for hyperparameter tuning

102

* Builds Cartesian product of parameter values for grid search

103

*/

104

class ParamGridBuilder {

105

def addGrid[T](param: Param[T], values: Iterable[T]): this.type

106

def addGrid(param: DoubleParam, values: Array[Double]): this.type  // also IntParam, FloatParam, LongParam overloads; addGrid(param: BooleanParam) adds true/false

107

def baseOn(paramMap: ParamMap): this.type

108

def baseOn(paramPairs: ParamPair[_]*): this.type

109

def build(): Array[ParamMap]

110

}

111

```

112

113

### Parameter System

114

115

```scala { .api }

116

/**

117

* ParamMap - map of parameters to their values

118

* Stores parameter-value pairs for ML components

119

*/

120

class ParamMap extends Serializable {

121

def put[T](param: Param[T], value: T): this.type

122

def put(paramPairs: ParamPair[_]*): this.type

123

def get[T](param: Param[T]): Option[T]

124

def apply[T](param: Param[T]): T

125

def contains(param: Param[_]): Boolean

126

def remove[T](param: Param[T]): Option[T]

127

def copy: ParamMap

128

def toSeq: Seq[ParamPair[_]]

129

def size: Int

130

}

131

132

/**

133

* Param - parameter definition with type information

134

* Defines a parameter for ML components with validation

135

*/

136

class Param[T] extends Serializable {

137

def parent: String

138

def name: String

139

def doc: String

140

def isValid(value: T): Boolean

141

def toString: String

142

}

143

144

/**

145

* ParamPair - parameter-value pair

146

* Associates a parameter with its value

147

*/

148

case class ParamPair[T](param: Param[T], value: T)

149

```

150

151

## Usage Examples

152

153

### Basic Pipeline

154

155

```scala

156

import org.apache.spark.ml.{Pipeline, PipelineModel}

157

import org.apache.spark.ml.classification.LogisticRegression

158

import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

159

160

// Configure ML pipeline stages

161

val tokenizer = new Tokenizer()

162

.setInputCol("text")

163

.setOutputCol("words")

164

165

val hashingTF = new HashingTF()

166

.setNumFeatures(1000)

167

.setInputCol(tokenizer.getOutputCol)

168

.setOutputCol("features")

169

170

val lr = new LogisticRegression()

171

.setMaxIter(10)

172

.setRegParam(0.001)

173

174

// Create pipeline

175

val pipeline = new Pipeline()

176

.setStages(Array(tokenizer, hashingTF, lr))

177

178

// Fit pipeline

179

val model = pipeline.fit(training)

180

181

// Make predictions

182

val predictions = model.transform(test)

183

```

184

185

### Cross-Validation with Parameter Grid

186

187

```scala

188

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

189

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

190

191

// Create parameter grid

192

val paramGrid = new ParamGridBuilder()

193

.addGrid(hashingTF.numFeatures, Array(10, 100, 1000))

194

.addGrid(lr.regParam, Array(0.1, 0.01))

195

.addGrid(lr.maxIter, Array(10, 20))

196

.build()

197

198

// Create evaluator

199

val evaluator = new BinaryClassificationEvaluator()

200

.setLabelCol("label")

201

.setRawPredictionCol("rawPrediction")

202

.setMetricName("areaUnderROC")

203

204

// Create cross-validator

205

val cv = new CrossValidator()

206

.setEstimator(pipeline)

207

.setEvaluator(evaluator)

208

.setEstimatorParamMaps(paramGrid)

209

.setNumFolds(3)

210

.setParallelism(2)

211

212

// Run cross-validation

213

val cvModel = cv.fit(training)

214

215

// Get best model

216

val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]

217

218

// Make predictions with best model

219

val predictions = bestModel.transform(test)

220

221

// Access cross-validation metrics

222

println(s"Average metrics: ${cvModel.avgMetrics.mkString(", ")}")

223

println(s"Best metric: ${cvModel.avgMetrics.max}")

224

```

225

226

### Train-Validation Split

227

228

```scala

229

import org.apache.spark.ml.tuning.TrainValidationSplit

230

231

val trainValidationSplit = new TrainValidationSplit()

232

.setEstimator(pipeline)

233

.setEvaluator(evaluator)

234

.setEstimatorParamMaps(paramGrid)

235

.setTrainRatio(0.8)

236

.setParallelism(2)

237

238

val model = trainValidationSplit.fit(training)

239

240

println(s"Validation metrics: ${model.validationMetrics.mkString(", ")}")

241

println(s"Best validation metric: ${model.validationMetrics.max}")

242

```

243

244

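### Complex Pipeline with Feature Engineering

In addition to the imports shown, this example needs `import org.apache.spark.ml.classification.RandomForestClassifier`. It also reuses `evaluator` and `training` from the cross-validation example. The `numericFeatures` input column must already be a vector column, because `StandardScaler` operates on vectors.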

245

246

```scala

247

import org.apache.spark.ml.feature._

248

249

// Text processing stages

250

val tokenizer = new Tokenizer()

251

.setInputCol("text")

252

.setOutputCol("words")

253

254

val stopWordsRemover = new StopWordsRemover()

255

.setInputCol("words")

256

.setOutputCol("filtered")

257

258

val word2Vec = new Word2Vec()

259

.setInputCol("filtered")

260

.setOutputCol("textFeatures")

261

.setVectorSize(100)

262

.setMinCount(0)

263

264

// Categorical feature processing

265

val stringIndexer = new StringIndexer()

266

.setInputCol("category")

267

.setOutputCol("categoryIndex")

268

269

val oneHotEncoder = new OneHotEncoder()

270

.setInputCols(Array("categoryIndex"))

271

.setOutputCols(Array("categoryVec"))

272

273

// Numerical feature processing

274

val scaler = new StandardScaler()

275

.setInputCol("numericFeatures")

276

.setOutputCol("scaledNumeric")

277

.setWithMean(true)

278

.setWithStd(true)

279

280

// Feature assembly

281

val assembler = new VectorAssembler()

282

.setInputCols(Array("textFeatures", "categoryVec", "scaledNumeric"))

283

.setOutputCol("features")

284

285

// Classifier

286

val rf = new RandomForestClassifier()

287

.setLabelCol("label")

288

.setFeaturesCol("features")

289

290

// Complete pipeline

291

val complexPipeline = new Pipeline()

292

.setStages(Array(

293

tokenizer, stopWordsRemover, word2Vec,

294

stringIndexer, oneHotEncoder,

295

scaler, assembler, rf

296

))

297

298

// Parameter grid for complex pipeline

299

val complexParamGrid = new ParamGridBuilder()

300

.addGrid(word2Vec.vectorSize, Array(50, 100, 200))

301

.addGrid(rf.numTrees, Array(10, 20, 30))

302

.addGrid(rf.maxDepth, Array(5, 10))

303

.build()

304

305

val complexCV = new CrossValidator()

306

.setEstimator(complexPipeline)

307

.setEvaluator(evaluator)

308

.setEstimatorParamMaps(complexParamGrid)

309

.setNumFolds(3)

310

311

val complexModel = complexCV.fit(training)

312

```

313

314

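### Model Persistence

Here `model` is the `PipelineModel` fitted in the Basic Pipeline example. The Train-Validation Split example rebinds `model`, and a saved `TrainValidationSplitModel` must be read back with `TrainValidationSplitModel.load`, not `PipelineModel.load`. Loading the cross-validation model also requires `import org.apache.spark.ml.tuning.CrossValidatorModel`.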

315

316

```scala

317

// Save pipeline model

318

model.write.overwrite().save("path/to/model")

319

320

// Load pipeline model

321

val loadedModel = PipelineModel.load("path/to/model")

322

323

// Save cross-validation model

324

cvModel.write.overwrite().save("path/to/cv-model")

325

326

// Load cross-validation model

327

val loadedCVModel = CrossValidatorModel.load("path/to/cv-model")

328

```

329

330

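### Accessing Pipeline Stages

This example assumes `model` is the `PipelineModel` fitted in the Basic Pipeline example, whose stages are `tokenizer`, `hashingTF`, and `lr`. It also requires `import org.apache.spark.ml.classification.LogisticRegressionModel`. In a fitted `PipelineModel`, transformer stages such as `HashingTF` appear unchanged, while estimator stages are replaced by their fitted models.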

331

332

```scala

333

// Get stages from fitted pipeline

334

val pipelineModel = model.asInstanceOf[PipelineModel]

335

val stages = pipelineModel.stages

336

337

// Access specific stage (e.g., the trained classifier)

338

val trainedClassifier = stages.last.asInstanceOf[LogisticRegressionModel]

339

println(s"Coefficients: ${trainedClassifier.coefficients}")

340

341

// Access feature transformer

342

val hashingTFModel = stages(1).asInstanceOf[HashingTF]

343

println(s"Number of features: ${hashingTFModel.getNumFeatures}")

344

```

345

346

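### Custom Parameter Validation

The snippet below shows only the parameter-declaration pattern. A concrete `Estimator` must also define `uid`, `fit`, `copy`, and `transformSchema`, and `CustomModel` is assumed to be defined elsewhere. The built-in `ParamValidators.gt(0.0)` can be used instead of the inline validation function.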

347

348

```scala

349

import org.apache.spark.ml.param._

350

351

// Custom parameter with validation

352

class CustomEstimator extends Estimator[CustomModel] {

353

final val customParam: DoubleParam = new DoubleParam(this, "customParam",

354

"custom parameter", (value: Double) => value > 0.0)

355

356

def setCustomParam(value: Double): this.type = set(customParam, value)

357

def getCustomParam: Double = $(customParam)

358

359

// Set default value

360

setDefault(customParam -> 1.0)

361

}

362

```

363

364

## Best Practices

365

366

### Pipeline Design

367

1. **Modular stages**: Break complex transformations into separate pipeline stages

368

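2. **Parameter organization**: Use consistent input/output column names across stages, so that each stage's output column (e.g. `setOutputCol("words")`) is the next stage's input column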

369

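3. **Schema validation**: Ensure schema compatibility between stages. `Pipeline.fit` first runs each stage's `transformSchema`, so many missing-column and type mismatches are reported before any stage is trained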

370

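4. **Error handling**: Validate input data formats. On stages that support it (e.g. `StringIndexer`, `OneHotEncoder`, `VectorAssembler`), set `handleInvalid` to control how invalid or unseen values are handled; the default, `"error"`, fails the job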

371

372

### Hyperparameter Tuning

373

1. **Parameter ranges**: Use logarithmic ranges for regularization parameters

374

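2. **Cross-validation folds**: Use 5-10 folds for reliable estimates (the examples above use 3 to keep runtime short). `CrossValidator` trains one model per fold per parameter combination, so cost grows with `numFolds` × grid size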

375

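3. **Parallelism**: `setParallelism` controls how many models are trained concurrently. Choose it based on available cluster resources, since larger values do not always improve performance; the Spark tuning guide suggests a value up to 10 is sufficient for most clusters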

376

4. **Evaluation metrics**: Choose metrics appropriate for the problem type

377

378

### Performance Optimization

379

1. **Caching**: Cache intermediate datasets when reused

380

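2. **Checkpointing**: For iterative estimators that support it (e.g. `ALS`), set a checkpoint directory with `SparkContext.setCheckpointDir` and tune `checkpointInterval` to truncate long lineages. `checkpointInterval` is ignored when no checkpoint directory is set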

381

3. **Resource allocation**: Tune Spark configuration for ML workloads

382

4. **Feature selection**: Remove irrelevant features to improve performance