# Data Preprocessing

Apache Flink ML provides data preprocessing utilities for feature scaling, transformation, and feature engineering to prepare data for machine learning algorithms. All preprocessing components follow the Transformer pattern and can be chained together into pipelines.

## Feature Scaling

### Standard Scaler

Standardizes features by removing the mean and scaling to unit variance (z-score normalization).

```scala { .api }
class StandardScaler extends Transformer[StandardScaler] with WithParameters {
  def setMean(mean: Boolean): StandardScaler
  def setStd(std: Boolean): StandardScaler
}

object StandardScaler {
  def apply(): StandardScaler

  // Parameters
  case object Mean extends Parameter[Boolean] {
    val defaultValue = Some(true)
  }

  case object Std extends Parameter[Boolean] {
    val defaultValue = Some(true)
  }
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.StandardScaler
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}

val data: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(1.0, 2.0, 3.0),
  DenseVector(4.0, 5.0, 6.0),
  DenseVector(7.0, 8.0, 9.0)
))

// Configure standard scaler
val scaler = StandardScaler()
  .setMean(true) // Center data (subtract mean)
  .setStd(true)  // Scale to unit variance

// Fit scaler to the data (learns per-feature statistics; fit returns Unit)
scaler.fit(data)

// Transform data with the fitted scaler
val scaledData = scaler.transform(data)

// Works with LabeledVector too
val labeledData: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(1.0, 2.0, 3.0)),
  LabeledVector(0.0, DenseVector(4.0, 5.0, 6.0))
))

val scaledLabeledData = scaler.transform(labeledData)
```

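The per-feature computation behind standardization can be sketched in plain Scala, independent of Flink. This is a minimal illustration using the population standard deviation, not the library's distributed implementation:

```scala
// Plain-Scala sketch of per-feature z-score standardization.
// Illustrative only: Flink ML computes these statistics during the
// distributed fit step; here we use the population standard deviation.
def standardize(rows: Seq[Array[Double]]): Seq[Array[Double]] = {
  val n = rows.length.toDouble
  val dims = rows.head.length
  val mean = Array.tabulate(dims)(j => rows.map(_(j)).sum / n)
  val std = Array.tabulate(dims) { j =>
    math.sqrt(rows.map(r => math.pow(r(j) - mean(j), 2)).sum / n)
  }
  rows.map { r =>
    Array.tabulate(dims)(j => if (std(j) == 0.0) 0.0 else (r(j) - mean(j)) / std(j))
  }
}

val scaled = standardize(Seq(Array(1.0, 2.0), Array(4.0, 5.0), Array(7.0, 8.0)))
// Each column of `scaled` now has mean 0 and unit variance
```

After fitting, every feature column is centered at zero, which is what makes downstream gradient-based learners converge more predictably.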
### Min-Max Scaler

Scales features to a user-specified range via a linear transformation (by default [0, 1]).

```scala { .api }
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters {
  def setMin(min: Double): MinMaxScaler
  def setMax(max: Double): MinMaxScaler
}

object MinMaxScaler {
  def apply(): MinMaxScaler

  // Parameters
  case object Min extends Parameter[Double] {
    val defaultValue = Some(0.0)
  }

  case object Max extends Parameter[Double] {
    val defaultValue = Some(1.0)
  }
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.{MinMaxScaler, StandardScaler}

val data: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(1.0, 2.0, 3.0),
  DenseVector(4.0, 5.0, 6.0),
  DenseVector(7.0, 8.0, 9.0)
))

// Configure min-max scaler
val scaler = MinMaxScaler()
  .setMin(-1.0) // Minimum value of the output range
  .setMax(1.0)  // Maximum value of the output range

// Fit and transform
scaler.fit(data)
val scaledData = scaler.transform(data)

// Chain with other transformers
val standardScaler = StandardScaler()
val pipeline = scaler.chainTransformer(standardScaler)
```

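For each feature, min-max scaling computes `x' = lo + (x - x_min) * (hi - lo) / (x_max - x_min)`. A plain-Scala sketch for a single feature column (illustrative only, not the Flink ML implementation):

```scala
// Plain-Scala sketch of min-max rescaling of one feature column to [lo, hi].
// Illustrative only; Flink ML learns x_min/x_max per feature in its fit step.
def minMaxScale(xs: Seq[Double], lo: Double, hi: Double): Seq[Double] = {
  val (mn, mx) = (xs.min, xs.max)
  xs.map { x =>
    if (mx == mn) lo // constant column: map every value to the lower bound
    else lo + (x - mn) * (hi - lo) / (mx - mn)
  }
}

minMaxScale(Seq(1.0, 4.0, 7.0), -1.0, 1.0) // Seq(-1.0, 0.0, 1.0)
```

Note the constant-column guard: without it the transformation would divide by zero whenever a feature takes a single value.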
## Feature Engineering

### Polynomial Features

Generates polynomial and interaction features from the input features.

```scala { .api }
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters {
  def setDegree(degree: Int): PolynomialFeatures
}

object PolynomialFeatures {
  def apply(): PolynomialFeatures

  // Parameters
  case object Degree extends Parameter[Int] {
    val defaultValue = Some(2)
  }
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.PolynomialFeatures

val data: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(2.0, 3.0), // Original features: [x1, x2]
  DenseVector(4.0, 5.0),
  DenseVector(6.0, 7.0)
))

// Configure polynomial features
val polyFeatures = PolynomialFeatures()
  .setDegree(2) // Degree 2: [x1^2, x1*x2, x2^2, x1, x2]

// Fit and transform - creates polynomial combinations
polyFeatures.fit(data)
val expandedData = polyFeatures.transform(data)

// Input [2.0, 3.0] becomes [4.0, 6.0, 9.0, 2.0, 3.0]
// Features: [x1^2, x1*x2, x2^2, x1, x2] (no bias term is added)

// Works with LabeledVector
val labeledData: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(2.0, 3.0)),
  LabeledVector(0.0, DenseVector(4.0, 5.0))
))

val expandedLabeledData = polyFeatures.transform(labeledData)
```

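The expansion itself is easy to state in plain Scala. Conventions differ between libraries (whether a bias term is included, and how monomials are ordered); the sketch below uses one common convention, with no bias term and highest-degree monomials first:

```scala
// Plain-Scala sketch: all monomials of degree 1..d over the input features,
// ordered highest degree first, without a bias term. Illustrative only.
def polyExpand(features: Array[Double], degree: Int): Array[Double] = {
  // All monomials of exact degree d using features at index >= from
  // (restricting the index avoids emitting x1*x2 and x2*x1 twice)
  def monomials(d: Int, from: Int): Seq[Double] =
    if (d == 0) Seq(1.0)
    else (from until features.length).flatMap { i =>
      monomials(d - 1, i).map(_ * features(i))
    }
  (degree to 1 by -1).flatMap(d => monomials(d, 0)).toArray
}

polyExpand(Array(2.0, 3.0), 2) // Array(4.0, 6.0, 9.0, 2.0, 3.0)
```

The output dimensionality grows combinatorially with the degree, so high degrees on wide feature vectors get expensive quickly.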
## Data Splitting

### Splitter

Utility for splitting datasets into training and testing sets. Unlike the scalers above, `Splitter` is a plain utility object rather than a `Transformer`.

```scala { .api }
object Splitter {
  // Randomly partitions the input according to the given fraction
  def randomSplit[T](
    input: DataSet[T],
    fraction: Double,
    precise: Boolean = false): Array[DataSet[T]]

  // Convenience wrapper returning named training/testing sets
  def trainTestSplit[T](
    input: DataSet[T],
    fraction: Double = 0.6,
    precise: Boolean = false): TrainTestDataSet[T]
}

case class TrainTestDataSet[T](training: DataSet[T], testing: DataSet[T])
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.Splitter

val data: DataSet[LabeledVector] = //... your dataset

// 80/20 train/test split
val trainTestData = Splitter.trainTestSplit(data, fraction = 0.8)
val trainingData = trainTestData.training
val testingData = trainTestData.testing
```

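The splitting logic itself can be sketched in plain Scala with a seeded random partition (illustrative only; Flink performs the split as a distributed operation over the DataSet, and the helper name below is made up for this sketch):

```scala
// Plain-Scala sketch of a seeded random train/test split. Illustrative only;
// with a fixed seed the split is reproducible across runs.
def simpleTrainTestSplit[T](data: Seq[T], trainFraction: Double, seed: Long): (Seq[T], Seq[T]) = {
  val rng = new scala.util.Random(seed)
  data.partition(_ => rng.nextDouble() < trainFraction)
}

val (train, test) = simpleTrainTestSplit(1 to 100, trainFraction = 0.8, seed = 42L)
// train.size + test.size == 100; sizes are ~80/20 in expectation
```

Because each element is assigned independently, the split sizes are only approximately 80/20; a precise split would require counting the data first, which is why distributed implementations often offer a "precise" mode as a separate, more expensive option.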
## Transformer Chaining

All preprocessing transformers can be chained together to create complex preprocessing pipelines.

```scala { .api }
trait Transformer[Self] extends WithParameters {
  def transform[Input, Output](input: DataSet[Input]): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}

case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](
  left: L,
  right: R
) extends Transformer[ChainedTransformer[L, R]]

case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](
  transformer: T,
  predictor: P
) extends Predictor[ChainedPredictor[T, P]]
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.{StandardScaler, MinMaxScaler, PolynomialFeatures}
import org.apache.flink.ml.classification.SVM

val data: DataSet[LabeledVector] = //... your training data

// Create preprocessing pipeline
val minMaxScaler = MinMaxScaler().setMin(0.0).setMax(1.0)
val polyFeatures = PolynomialFeatures().setDegree(2)
val standardScaler = StandardScaler()

// Chain transformers
val preprocessingPipeline = minMaxScaler
  .chainTransformer(polyFeatures)
  .chainTransformer(standardScaler)

// Chain with predictor
val svm = SVM().setIterations(100)
val completePipeline = preprocessingPipeline.chainPredictor(svm)

// Fit the entire pipeline (each stage is fitted on the output of the previous one)
completePipeline.fit(data)

// Make predictions (automatically applies all preprocessing steps)
val testData: DataSet[Vector] = //... test vectors
val predictions = completePipeline.predict(testData)
```

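The key behavior of chaining, that fitting a chain fits the left stage first and then fits the right stage on the left stage's output, can be sketched with simplified types. This is an illustration of the pattern only, not Flink ML's actual pipeline machinery, which dispatches through implicit fit/transform operations:

```scala
// Simplified sketch of transformer chaining. Illustrative types only.
trait SimpleTransformer[A] {
  def fit(data: Seq[A]): Unit
  def transform(data: Seq[A]): Seq[A]

  def chain(next: SimpleTransformer[A]): SimpleTransformer[A] = {
    val self = this
    new SimpleTransformer[A] {
      def fit(data: Seq[A]): Unit = {
        self.fit(data)
        next.fit(self.transform(data)) // right stage is fitted on preprocessed data
      }
      def transform(data: Seq[A]): Seq[A] =
        next.transform(self.transform(data))
    }
  }
}

// Example stages: mean-centering followed by doubling
class Center extends SimpleTransformer[Double] {
  private var mean = 0.0
  def fit(data: Seq[Double]): Unit = mean = data.sum / data.size
  def transform(data: Seq[Double]): Seq[Double] = data.map(_ - mean)
}
class Scale2 extends SimpleTransformer[Double] {
  def fit(data: Seq[Double]): Unit = ()
  def transform(data: Seq[Double]): Seq[Double] = data.map(_ * 2)
}

val chained = new Center().chain(new Scale2())
chained.fit(Seq(1.0, 2.0, 3.0))
chained.transform(Seq(1.0, 2.0, 3.0)) // Seq(-2.0, 0.0, 2.0)
```

Fitting the right stage on already-transformed data is what makes chained fitting equivalent to fitting the stages one by one against intermediate results.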
## Working with Different Data Types

The preprocessing transformers support multiple input data types through implicit type class operations.

### Supported Input Types

- `Vector`: Raw feature vectors
- `LabeledVector`: Feature vectors with labels for supervised learning
- `(LabeledVector, Double)`: Tuples for specialized operations

**Usage Examples:**

```scala
val scaler = StandardScaler()

// Fit once; the learned statistics are stored inside the transformer
val vectorData: DataSet[Vector] = //...
scaler.fit(vectorData)

// Transform different input types with the same fitted scaler
val scaledVectors = scaler.transform(vectorData)

val labeledData: DataSet[LabeledVector] = //...
val scaledLabeled = scaler.transform(labeledData)

// Transform tuples
val tupleData: DataSet[(LabeledVector, Double)] = //...
val scaledTuples = scaler.transform(tupleData)
```

## Custom Preprocessing

While the built-in preprocessors cover common use cases, you can create custom preprocessing by implementing the `Transformer` trait or using Flink's native DataSet transformations.

**Custom Transformation Example:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.StandardScaler

// Custom log transformation (non-positive entries are mapped to 0.0)
val logTransform: DataSet[Vector] => DataSet[Vector] = { data =>
  data.map { vector =>
    val newData = Array.tabulate(vector.size) { i =>
      if (vector(i) > 0) math.log(vector(i)) else 0.0
    }
    DenseVector(newData)
  }
}

val data: DataSet[Vector] = //... your data
val logTransformedData = logTransform(data)

// Chain with standard preprocessing
val scaler = StandardScaler()
scaler.fit(logTransformedData)
val scaledLogData = scaler.transform(logTransformedData)
```