# Pipeline Framework

Apache Flink ML provides a pipeline framework that enables building machine learning workflows by chaining transformers and, optionally, a trailing predictor. The framework follows a design similar to scikit-learn, built on the `Estimator`, `Predictor`, and `Transformer` abstractions.

## Core Pipeline Components

### Estimator

Base trait for components that can be fitted to data to learn parameters.
```scala { .api }
trait Estimator[Self] extends WithParameters {
  def fit[Training](
    training: DataSet[Training],
    fitParameters: ParameterMap = ParameterMap.Empty
  )(implicit fitOperation: FitOperation[Self, Training]): Unit
}
```
All machine learning algorithms in Flink ML extend `Estimator`. Note that `fit` trains the component in place: the learned model is stored inside the instance rather than returned.
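As a minimal usage sketch of the `Estimator` contract (the toy `LabeledVector` collection below is invented for illustration; real training data would come from a file or other source):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

val env = ExecutionEnvironment.getExecutionEnvironment

// Toy training set of labeled feature vectors
val trainingData: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(1.0, 2.0)),
  LabeledVector(-1.0, DenseVector(-1.0, -2.0))
))

// fit resolves an implicit FitOperation[SVM, LabeledVector] from SVM's companion object
val svm = SVM().setIterations(10)
svm.fit(trainingData)
```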
### Predictor

Extends `Estimator` to add prediction capabilities for supervised learning algorithms.
```scala { .api }
trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing, Prediction](
    testing: DataSet[Testing],
    predictParameters: ParameterMap = ParameterMap.Empty
  )(implicit predictor: PredictDataSetOperation[Self, Testing, Prediction]): DataSet[Prediction]

  def evaluate[Testing, Prediction](
    testing: DataSet[Testing],
    evaluateParameters: ParameterMap = ParameterMap.Empty
  )(implicit evaluator: EvaluateDataSetOperation[Self, Testing, Prediction]): DataSet[(Prediction, Prediction)]
}
```
**Usage Example:**

```scala
import org.apache.flink.ml.classification.SVM

val svm = SVM()
  .setIterations(100)
  .setRegularization(0.01)

// Fit the predictor; the trained model is stored inside the instance
svm.fit(trainingData)

// Make predictions
val predictions = svm.predict(testData)

// Evaluate model performance: yields (true label, prediction) pairs
val evaluationResults = svm.evaluate(testData)
```
### Transformer

Base trait for data transformation components. Transformers extend `Estimator` as well: some, such as scalers, must first be fitted to learn statistics (e.g. mean and standard deviation) before they can transform data.
```scala { .api }
trait Transformer[Self <: Transformer[Self]] extends Estimator[Self] {
  def transform[Input, Output](
    input: DataSet[Input],
    transformParameters: ParameterMap = ParameterMap.Empty
  )(implicit transformOperation: TransformDataSetOperation[Self, Input, Output]): DataSet[Output]

  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]

  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}
```
**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.StandardScaler

// Target mean 0.0 and standard deviation 1.0 (the defaults)
val scaler = StandardScaler()
  .setMean(0.0)
  .setStd(1.0)

// Fit the transformer (learns the data's mean and std)
scaler.fit(trainingData)

// Transform data
val scaledData = scaler.transform(trainingData)
```
## Chained Components

### Chained Transformer

Combines two transformers into a single pipeline component; longer chains are built by nesting.
```scala { .api }
case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](
  left: L,
  right: R
) extends Transformer[ChainedTransformer[L, R]] {
  // fit and transform are provided through implicit chained operations:
  // fitting trains `left`, transforms the data with it, then trains `right`
}
```
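Fitting and applying a chain is equivalent to running the stages by hand, which can be useful when debugging a pipeline. A sketch, assuming a `trainingData` DataSet as in the other examples:

```scala
import org.apache.flink.ml.preprocessing.{MinMaxScaler, StandardScaler}

val left = MinMaxScaler()
val right = StandardScaler()

// Chained version
val chain = left.chainTransformer(right)
chain.fit(trainingData)
val chainedResult = chain.transform(trainingData)

// Equivalent manual version: fit left, transform with it, then fit right on the result
left.fit(trainingData)
val intermediate = left.transform(trainingData)
right.fit(intermediate)
val manualResult = right.transform(intermediate)
```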
### Chained Predictor

Combines transformers with a final predictor for end-to-end ML pipelines.
```scala { .api }
case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](
  transformer: T,
  predictor: P
) extends Predictor[ChainedPredictor[T, P]] {
  // fit trains the transformer, transforms the training data, then trains the predictor;
  // predict applies the fitted transformer before invoking the predictor
}
```
## Building Pipelines

### Simple Transformer Chain
```scala
import org.apache.flink.ml.preprocessing.{MinMaxScaler, PolynomialFeatures, StandardScaler}

val scaler1 = MinMaxScaler().setMin(0.0).setMax(1.0)
val polyFeatures = PolynomialFeatures().setDegree(2)
val scaler2 = StandardScaler()

// Chain transformers
val preprocessingPipeline = scaler1
  .chainTransformer(polyFeatures)
  .chainTransformer(scaler2)

// Fit and transform
preprocessingPipeline.fit(trainingData)
val transformedData = preprocessingPipeline.transform(trainingData)
```
### Complete ML Pipeline
```scala
import org.apache.flink.ml.preprocessing.StandardScaler
import org.apache.flink.ml.classification.SVM

val scaler = StandardScaler()
val svm = SVM().setIterations(100).setRegularization(0.01)

// Create end-to-end pipeline
val mlPipeline = scaler.chainPredictor(svm)

// Fit entire pipeline
mlPipeline.fit(trainingData)

// Make predictions (automatically applies scaling, then the SVM)
val predictions = mlPipeline.predict(testData)
```
### Complex Multi-Stage Pipeline
```scala
import org.apache.flink.ml.preprocessing.{MinMaxScaler, PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression

// Multi-stage preprocessing
val minMaxScaler = MinMaxScaler().setMin(-1.0).setMax(1.0)
val polyFeatures = PolynomialFeatures().setDegree(3)
val standardScaler = StandardScaler()

// Regression model
val regression = MultipleLinearRegression()
  .setIterations(200)
  .setStepsize(0.01)

// Build complex pipeline
val complexPipeline = minMaxScaler
  .chainTransformer(polyFeatures)
  .chainTransformer(standardScaler)
  .chainPredictor(regression)

// Train pipeline
complexPipeline.fit(trainingData)

// Use pipeline
val predictions = complexPipeline.predict(testData)
```
## Type Class Operations

The pipeline framework uses type classes to provide flexible operations for different data types: `fit`, `transform`, and `predict` each delegate to an implicit operation resolved for the concrete component and input type.
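To illustrate the dispatch mechanism in isolation, here is a minimal, Flink-free sketch of the same pattern (the names `FitOp` and `MeanEstimator` are invented for this example): the generic `fit` method compiles only when an implicit operation exists for the concrete (instance, data) type pair.

```scala
// Simplified illustration of type-class dispatch (not Flink ML's actual code)
trait FitOp[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}

class MeanEstimator {
  var mean: Double = 0.0

  // Generic entry point; the compiler picks the matching FitOp at the call site
  def fit[T](input: Seq[T])(implicit op: FitOp[MeanEstimator, T]): Unit =
    op.fit(this, input)
}

object MeanEstimator {
  // Support fitting on Doubles; other input types would get their own instances
  implicit val fitDoubles: FitOp[MeanEstimator, Double] =
    (instance: MeanEstimator, input: Seq[Double]) => instance.mean = input.sum / input.size
}

val est = new MeanEstimator
est.fit(Seq(1.0, 2.0, 3.0)) // resolves FitOp[MeanEstimator, Double]
println(est.mean)           // 2.0
```

Calling `est.fit(Seq("a", "b"))` would fail to compile, because no `FitOp[MeanEstimator, String]` is in scope; this is how Flink ML rejects unsupported input types at compile time.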
### Fit Operation

```scala { .api }
trait FitOperation[Self, Training] {
  def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Unit
}
```
### Transform Operations
```scala { .api }
// Convenience operation: transforms one element at a time using a trained model
abstract class TransformOperation[Instance, Model, Input, Output] extends Serializable {
  def getModel(instance: Instance, transformParameters: ParameterMap): DataSet[Model]
  def transform(element: Input, model: Model): Output
}

// General operation: transforms an entire DataSet at once
trait TransformDataSetOperation[Instance, Input, Output] {
  def transformDataSet(
    instance: Instance,
    transformParameters: ParameterMap,
    input: DataSet[Input]
  ): DataSet[Output]
}
```
### Predict Operations
```scala { .api }
// Convenience operation: predicts one element at a time using a trained model
abstract class PredictOperation[Instance, Model, Testing, Prediction] extends Serializable {
  def getModel(instance: Instance, predictParameters: ParameterMap): DataSet[Model]
  def predict(value: Testing, model: Model): Prediction
}

// General operation: predicts on an entire DataSet at once
trait PredictDataSetOperation[Instance, Testing, Prediction] {
  def predictDataSet(
    instance: Instance,
    predictParameters: ParameterMap,
    testing: DataSet[Testing]
  ): DataSet[Prediction]
}
```
### Evaluate Operation
```scala { .api }
trait EvaluateDataSetOperation[Instance, Testing, Prediction] {
  def evaluateDataSet(
    instance: Instance,
    evaluateParameters: ParameterMap,
    testing: DataSet[Testing]
  ): DataSet[(Prediction, Prediction)] // (true value, predicted value) pairs
}
```
## Parameter Management in Pipelines

Each pipeline component keeps its own parameter map; parameters supplied at `fit`, `transform`, or `predict` time override the component's settings.
```scala
// Configure individual components
val scaler = StandardScaler()
  .setMean(0.0)
  .setStd(1.0)

val svm = SVM()
  .setIterations(50)
  .setRegularization(0.1)

// Create pipeline - each component keeps its configured parameters
val pipeline = scaler.chainPredictor(svm)

// Parameters supplied at fit time take precedence over the component settings
val fitParameters = ParameterMap().add(SVM.Iterations, 100)
pipeline.fit(trainingData, fitParameters)
```
## Custom Pipeline Components

You can create custom transformers and predictors by implementing the respective traits.

### Custom Transformer Example
```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.ParameterMap
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}

// The trait already provides transform; the behavior comes from implicit operations
class LogTransformer extends Transformer[LogTransformer]

object LogTransformer {
  def apply(): LogTransformer = new LogTransformer()

  // Stateless transformer: there is nothing to learn, so fitting is a no-op
  implicit val fitLogTransformer = new FitOperation[LogTransformer, Vector] {
    override def fit(
      instance: LogTransformer,
      fitParameters: ParameterMap,
      input: DataSet[Vector]
    ): Unit = {}
  }

  // Define the implicit transform operation for DataSet[Vector]
  implicit val logTransformVectors = new TransformDataSetOperation[LogTransformer, Vector, Vector] {
    override def transformDataSet(
      instance: LogTransformer,
      transformParameters: ParameterMap,
      input: DataSet[Vector]
    ): DataSet[Vector] = {
      input.map { vector =>
        val logData = Array.tabulate(vector.size) { i =>
          val x = vector(i)
          if (x > 0) math.log(x) else 0.0
        }
        DenseVector(logData): Vector
      }
    }
  }
}

// Usage
val logTransform = LogTransformer()
val scaler = StandardScaler()
val pipeline = logTransform.chainTransformer(scaler)
```
## Pipeline Persistence

While the core framework doesn't provide built-in persistence, you can save pipeline parameters and recreate pipelines:
```scala
// Save pipeline configuration
val scaler = StandardScaler().setMean(0.0).setStd(1.0)
val svm = SVM().setIterations(100).setRegularization(0.01)
val pipeline = scaler.chainPredictor(svm)

// Extract parameter maps for serialization
val scalerParams = scaler.parameters
val svmParams = svm.parameters

// Recreate the pipeline later and re-apply the saved parameters at fit time
val recreatedPipeline = StandardScaler().chainPredictor(SVM())
recreatedPipeline.fit(trainingData, scalerParams ++ svmParams)
```
## Error Handling in Pipelines

Pipeline components should handle errors gracefully. Keep in mind that DataSet programs execute lazily, so some failures only surface once the job actually runs:
```scala
try {
  pipeline.fit(trainingData)
  val predictions = pipeline.predict(testData)
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid parameters: ${e.getMessage}")
  case e: RuntimeException =>
    println(s"Runtime error in pipeline: ${e.getMessage}")
}
```
## Best Practices

1. **Chain transformers before predictors**: Always apply data transformations before supervised learning
2. **Fit on training data only**: Only use training data to fit transformers and predictors
3. **Preserve test data integrity**: Apply the same fitted transformations to test data
4. **Parameter validation**: Validate parameters before fitting pipelines
5. **Memory management**: Consider data persistence for large datasets in complex pipelines
```scala
// Good practice example
val pipeline = StandardScaler()
  .chainTransformer(PolynomialFeatures().setDegree(2))
  .chainPredictor(SVM().setIterations(100))

// Fit on training data only
pipeline.fit(trainingData)

// Apply to test data using the statistics learned from the training data
val predictions = pipeline.predict(testData)
```