# Pipeline Components

Core abstractions and utilities for building composable machine learning workflows with automated parameter management, model persistence, and metadata handling.

## Capabilities

### Core Pipeline Abstractions

Fundamental building blocks for creating machine learning pipelines with type-safe composition and parameter management.

```scala { .api }
/**
 * Base pipeline stage that can be part of an ML pipeline
 */
abstract class PipelineStage extends Params with Logging {
  def uid: String
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): PipelineStage
}

/**
 * Abstract estimator that fits models to data
 */
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M]
  def fit(dataset: Dataset[_], paramMap: ParamMap): M
  def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): M
}

/**
 * Abstract transformer that transforms datasets
 */
abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
  def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame
  def transform(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DataFrame
}

/**
 * Abstract model that extends transformer (fitted estimators)
 */
abstract class Model[M <: Model[M]] extends Transformer {
  def parent: Estimator[M]
  def hasParent: Boolean
  def setParent(parent: Estimator[M]): M
}
```

### Pipeline Construction

Composable pipeline classes for chaining multiple ML stages together.

```scala { .api }
/**
 * ML Pipeline for chaining multiple pipeline stages
 */
class Pipeline extends Estimator[PipelineModel] {
  def setStages(value: Array[_ <: PipelineStage]): this.type
  def getStages: Array[PipelineStage]
}

class PipelineModel extends Model[PipelineModel] {
  def stages: Array[Transformer]
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
}
```

**Usage Example:**

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Configure ML algorithms
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)

// Create pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents
val model = pipeline.fit(training)

// Make predictions on test documents
val predictions = model.transform(test)
```

### Parameter System

Comprehensive parameter management system with type safety and automatic validation.

```scala { .api }
/**
 * Base trait for components with parameters
 */
trait Params extends Identifiable with Serializable {
  def params: Array[Param[_]]
  def explainParam(param: Param[_]): String
  def explainParams(): String
  def isSet(param: Param[_]): Boolean
  def isDefined(param: Param[_]): Boolean
  def hasDefault(param: Param[_]): Boolean
  def getDefault[T](param: Param[T]): Option[T]
  def extractParamMap(): ParamMap
  def extractParamMap(extra: ParamMap): ParamMap
  def copy(extra: ParamMap): Params
  def set[T](param: Param[T], value: T): this.type
  def clear(param: Param[_]): this.type
  def get[T](param: Param[T]): Option[T]
  def getOrDefault[T](param: Param[T]): T
  def $[T](param: Param[T]): T
  def setDefault[T](param: Param[T], value: T): this.type
  def setDefault(paramPairs: ParamPair[_]*): this.type
  def hasParam(paramName: String): Boolean
  def getParam(paramName: String): Param[Any]
}

/**
 * Typed parameter definition
 */
class Param[T] extends Serializable {
  def name: String
  def doc: String
  def parent: String
  def toString: String
  def jsonEncode(value: T): String
  def jsonDecode(json: String): T
  def w(value: T): ParamPair[T]
  def ->(value: T): ParamPair[T]
}

/**
 * Parameter-value pairs
 */
case class ParamPair[T](param: Param[T], value: T)

/**
 * Set of parameter values
 */
class ParamMap extends Serializable {
  def put[T](param: Param[T], value: T): this.type
  def put(paramPairs: ParamPair[_]*): this.type
  def get[T](param: Param[T]): Option[T]
  def apply[T](param: Param[T]): T
  def contains(param: Param[_]): Boolean
  def remove[T](param: Param[T]): this.type
  def filter(f: ParamPair[_] => Boolean): ParamMap
  def copy: ParamMap
  def ++(other: ParamMap): ParamMap
  def toSeq: Seq[ParamPair[_]]
  def size: Int
}
```

### Model Persistence

MLlib's model saving and loading framework for production deployment.

```scala { .api }
/**
 * Trait for ML components that can be written to storage
 */
trait MLWritable {
  def write: MLWriter
  def save(path: String): Unit
}

/**
 * Abstract writer for ML components
 */
abstract class MLWriter extends Logging {
  def context: SparkContext
  def option(key: String, value: String): this.type
  def options(options: Map[String, String]): this.type
  def overwrite(): this.type
  def save(path: String): Unit
  protected def saveImpl(path: String): Unit
}

/**
 * Trait for ML components that can be read from storage
 */
trait MLReadable[T] {
  def read: MLReader[T]
  def load(path: String): T
}

/**
 * Abstract reader for ML components
 */
abstract class MLReader[T] extends Logging {
  def context: SparkContext
  def option(key: String, value: String): this.type
  def options(options: Map[String, String]): this.type
  def load(path: String): T
}

/**
 * Default implementations for parameter persistence
 */
trait DefaultParamsWritable extends MLWritable {
  self: Params =>
  override def write: MLWriter = new DefaultParamsWriter(this)
}

trait DefaultParamsReadable[T] extends MLReadable[T] {
  override def read: MLReader[T] = new DefaultParamsReader[T]
  override def load(path: String): T = super.load(path)
}
```

**Usage Example:**

```scala
// Save a model
val model = pipeline.fit(trainingData)
model.write.overwrite().save("/path/to/model")

// Load a model
val loadedModel = PipelineModel.load("/path/to/model")
val predictions = loadedModel.transform(testData)
```

### Component Identification

Unique identification system for ML pipeline components.

```scala { .api }
/**
 * Trait for objects with unique identifiers
 */
trait Identifiable {
  val uid: String
  def toString: String
}

/**
 * Utilities for generating unique identifiers
 */
object Identifiable {
  def randomUID(prefix: String): String
}
```

### Schema Utilities

Tools for working with DataFrame schemas in ML pipelines.

```scala { .api }
/**
 * Schema validation and manipulation utilities
 */
object SchemaUtils {
  def checkColumnType(
    schema: StructType,
    colName: String,
    dataType: DataType,
    msg: String = ""
  ): Unit

  def checkColumnTypes(
    schema: StructType,
    colName: String,
    dataTypes: Seq[DataType],
    msg: String = ""
  ): Unit

  def checkNumericType(
    schema: StructType,
    colName: String,
    msg: String = ""
  ): Unit

  def appendColumn(
    schema: StructType,
    colName: String,
    dataType: DataType,
    nullable: Boolean = false
  ): StructType

  def appendColumns(
    schema: StructType,
    cols: Seq[StructField]
  ): StructType
}
```

### Metadata Utilities

Tools for handling DataFrame column metadata in ML contexts.

```scala { .api }
/**
 * Metadata manipulation utilities for ML
 */
object MetadataUtils {
  /**
   * Gets the number of features from vector column metadata
   */
  def getNumFeatures(dataset: Dataset[_], vectorCol: String): Int

  /**
   * Gets feature names from vector column metadata
   */
  def getFeatureNames(dataset: Dataset[_], vectorCol: String): Option[Array[String]]

  /**
   * Gets categorical features metadata
   */
  def getCategoricalFeatures(dataset: Dataset[_], featuresCol: String): Set[Int]

  /**
   * Creates metadata for vector columns
   */
  def createVectorMetadata(
    numFeatures: Int,
    featureNames: Option[Array[String]] = None,
    categoricalFeatures: Set[Int] = Set.empty
  ): Metadata
}
```

### Advanced Pipeline Features

Extended pipeline capabilities for complex ML workflows.

```scala { .api }
/**
 * Pipeline utilities for advanced use cases
 */
object PipelineUtils {
  /**
   * Creates a pipeline from a sequence of stages
   */
  def createPipeline(stages: PipelineStage*): Pipeline

  /**
   * Validates pipeline stage compatibility
   */
  def validatePipeline(stages: Array[PipelineStage], inputSchema: StructType): Unit

  /**
   * Extracts the final estimator from a pipeline
   */
  def getFinalEstimator(pipeline: Pipeline): Option[Estimator[_]]

  /**
   * Gets all transformers before the final estimator
   */
  def getFeatureTransformers(pipeline: Pipeline): Array[Transformer]
}

/**
 * Pipeline model utilities
 */
object PipelineModelUtils {
  /**
   * Extracts the final model from a fitted pipeline
   */
  def getFinalModel(pipelineModel: PipelineModel): Option[Model[_]]

  /**
   * Gets all transformer stages except the final model
   */
  def getFeatureTransformers(pipelineModel: PipelineModel): Array[Transformer]

  /**
   * Creates a new pipeline model with replaced stages
   */
  def replaceStages(
    pipelineModel: PipelineModel,
    newStages: Array[Transformer]
  ): PipelineModel
}
```

### Cross-Validation Pipeline Integration

Integration between pipelines and cross-validation for comprehensive model selection.

```scala { .api }
/**
 * Pipeline-aware cross-validation
 */
class PipelineCrossValidator extends CrossValidator {
  /**
   * Gets the best pipeline from cross-validation results
   */
  def getBestPipeline: Pipeline

  /**
   * Gets feature importance from the best model if available
   */
  def getFeatureImportances: Option[Vector]

  /**
   * Extracts the best hyperparameters for each pipeline stage
   */
  def getBestParamsByStage: Map[String, ParamMap]
}
```

## Types

```scala { .api }
// Core pipeline imports
import org.apache.spark.ml._
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.{StructType, StructField, DataType, Metadata}

// Pipeline stage types
import org.apache.spark.ml.{
  Estimator,
  Transformer,
  Model,
  Pipeline,
  PipelineModel,
  PipelineStage
}

// Parameter system types
import org.apache.spark.ml.param.{
  Param,
  ParamMap,
  ParamPair,
  Params
}

// Persistence types
import org.apache.spark.ml.util.{
  MLWritable,
  MLWriter,
  MLReadable,
  MLReader,
  DefaultParamsWritable,
  DefaultParamsReadable
}

// Utility types
import org.apache.spark.ml.util.{
  Identifiable,
  SchemaUtils,
  MetadataUtils
}
```