
# Data Preprocessing

Apache Flink ML provides data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms. All preprocessing components follow the Transformer pattern and can be chained together in pipelines.

## Feature Scaling

### Standard Scaler

Standardizes features by removing the mean and scaling to unit variance (z-score normalization).

```scala { .api }
class StandardScaler extends Transformer[StandardScaler] with WithParameters {
  def setMean(mean: Boolean): StandardScaler
  def setStd(std: Boolean): StandardScaler
}

object StandardScaler {
  def apply(): StandardScaler

  // Parameters
  case object Mean extends Parameter[Boolean] {
    val defaultValue = Some(true)
  }

  case object Std extends Parameter[Boolean] {
    val defaultValue = Some(true)
  }
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.StandardScaler
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

val data: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(1.0, 2.0, 3.0),
  DenseVector(4.0, 5.0, 6.0),
  DenseVector(7.0, 8.0, 9.0)
))

// Configure standard scaler
val scaler = StandardScaler()
  .setMean(true) // Center data (subtract mean)
  .setStd(true)  // Scale to unit variance

// Fit scaler to data
val fittedScaler = scaler.fit(data)

// Transform data
val scaledData = fittedScaler.transform(data)

// Works with LabeledVector too
val labeledData: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(1.0, 2.0, 3.0)),
  LabeledVector(0.0, DenseVector(4.0, 5.0, 6.0))
))

val scaledLabeledData = fittedScaler.transform(labeledData)
```
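Under the hood, z-score normalization computes `(x - mean) / std` per dimension. A plain-Scala sketch of that arithmetic on the sample data above (illustrative only; it assumes population statistics, and the library's exact variance estimator may differ):

```scala
// Per-dimension z-score scaling (conceptual sketch, not the Flink ML code).
val rows = Seq(
  Array(1.0, 2.0, 3.0),
  Array(4.0, 5.0, 6.0),
  Array(7.0, 8.0, 9.0)
)

val n = rows.length
val dims = rows.head.length

// Per-dimension mean and (population) standard deviation
val mean = (0 until dims).map(j => rows.map(_(j)).sum / n)
val std = (0 until dims).map(j =>
  math.sqrt(rows.map(r => math.pow(r(j) - mean(j), 2)).sum / n))

// Standardize each row: subtract the mean, divide by the std
val scaled = rows.map(r => r.indices.map(j => (r(j) - mean(j)) / std(j)))

// First dimension becomes approximately [-1.2247, 0.0, 1.2247]
```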

### Min-Max Scaler

Scales features to a specified range by linear transformation.

```scala { .api }
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters {
  def setMin(min: Double): MinMaxScaler
  def setMax(max: Double): MinMaxScaler
}

object MinMaxScaler {
  def apply(): MinMaxScaler

  // Parameters
  case object Min extends Parameter[Double] {
    val defaultValue = Some(0.0)
  }

  case object Max extends Parameter[Double] {
    val defaultValue = Some(1.0)
  }
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.MinMaxScaler

val data: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(1.0, 2.0, 3.0),
  DenseVector(4.0, 5.0, 6.0),
  DenseVector(7.0, 8.0, 9.0)
))

// Configure min-max scaler
val scaler = MinMaxScaler()
  .setMin(-1.0) // Minimum value in output range
  .setMax(1.0)  // Maximum value in output range

// Fit and transform
val fittedScaler = scaler.fit(data)
val scaledData = fittedScaler.transform(data)

// Chain with other transformers
val standardScaler = StandardScaler()
val pipeline = scaler.chainTransformer(standardScaler)
```
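The linear transformation behind min-max scaling is `x' = (x - xMin) / (xMax - xMin) * (max - min) + min`, where `xMin`/`xMax` are learned from the data and `min`/`max` are the configured output range. A plain-Scala sketch on one dimension of the sample data above (illustrative, not the library implementation):

```scala
// Rescale a sequence of values into a target range [min, max].
def minMaxScale(xs: Seq[Double], min: Double, max: Double): Seq[Double] = {
  val (lo, hi) = (xs.min, xs.max)
  xs.map(x => (x - lo) / (hi - lo) * (max - min) + min)
}

// First dimension of the sample data: 1.0, 4.0, 7.0
val rescaled = minMaxScale(Seq(1.0, 4.0, 7.0), -1.0, 1.0)
// -> Seq(-1.0, 0.0, 1.0)
```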

## Feature Engineering

### Polynomial Features

Generates polynomial and interaction features from the input features.

```scala { .api }
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters {
  def setDegree(degree: Int): PolynomialFeatures
}

object PolynomialFeatures {
  def apply(): PolynomialFeatures

  // Parameters
  case object Degree extends Parameter[Int] {
    val defaultValue = Some(2)
  }
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.PolynomialFeatures

val data: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(2.0, 3.0), // Original features: [x1, x2]
  DenseVector(4.0, 5.0),
  DenseVector(6.0, 7.0)
))

// Configure polynomial features
val polyFeatures = PolynomialFeatures()
  .setDegree(2) // Degree 2: [1, x1, x2, x1^2, x1*x2, x2^2]

// Fit and transform - creates polynomial combinations
val fittedPoly = polyFeatures.fit(data)
val expandedData = fittedPoly.transform(data)

// Input [2.0, 3.0] becomes [1.0, 2.0, 3.0, 4.0, 6.0, 9.0]
// Features: [bias, x1, x2, x1^2, x1*x2, x2^2]

// Works with LabeledVector
val labeledData: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(2.0, 3.0)),
  LabeledVector(0.0, DenseVector(4.0, 5.0))
))

val expandedLabeledData = fittedPoly.transform(labeledData)
```
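The degree-2 expansion of a two-dimensional input can be sketched directly in plain Scala. The term ordering and bias term follow the layout documented above; other libraries order polynomial terms differently:

```scala
// Degree-2 polynomial expansion of [x1, x2] into
// [bias, x1, x2, x1^2, x1*x2, x2^2] (illustrative sketch).
def polyExpand2(x1: Double, x2: Double): Seq[Double] =
  Seq(1.0, x1, x2, x1 * x1, x1 * x2, x2 * x2)

val expanded = polyExpand2(2.0, 3.0)
// -> Seq(1.0, 2.0, 3.0, 4.0, 6.0, 9.0)
```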

## Data Splitting

### Splitter

Utility for splitting datasets into training and testing sets.

```scala { .api }
class Splitter extends Transformer[Splitter] with WithParameters {
  // Implementation details depend on the specific splitting strategy
}

object Splitter {
  def apply(): Splitter
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.Splitter

val data: DataSet[LabeledVector] = //... your dataset

val splitter = Splitter()
// Configure splitting parameters as needed

val splitData = splitter.transform(data)
```
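The API above leaves the splitting strategy open. Conceptually, a random train/test split keeps each record on the training side with probability equal to the training fraction. A plain-Scala sketch of the idea on an in-memory collection (the `trainFraction` and `seed` names are hypothetical and not part of the documented API):

```scala
import scala.util.Random

// Randomly partition records into train/test; a fixed seed makes the
// split reproducible (conceptual sketch, not the Flink ML Splitter).
def randomSplit[T](records: Seq[T], trainFraction: Double, seed: Long): (Seq[T], Seq[T]) = {
  val rng = new Random(seed)
  records.partition(_ => rng.nextDouble() < trainFraction)
}

val (train, test) = randomSplit(1 to 100, trainFraction = 0.8, seed = 42L)
// train and test together cover all 100 records, roughly 80/20
```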

## Transformer Chaining

All preprocessing transformers can be chained together to create complex preprocessing pipelines.

```scala { .api }
trait Transformer[Self] extends WithParameters {
  def transform[Input, Output](input: DataSet[Input]): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}

case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](
  left: L,
  right: R
) extends Transformer[ChainedTransformer[L, R]]

case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](
  transformer: T,
  predictor: P
) extends Predictor[ChainedPredictor[T, P]]
```

214

215

**Usage Example:**

216

217

```scala

218

import org.apache.flink.ml.preprocessing.{StandardScaler, MinMaxScaler, PolynomialFeatures}

219

import org.apache.flink.ml.classification.SVM

220

221

val data: DataSet[LabeledVector] = //... your training data

222

223

// Create preprocessing pipeline

224

val minMaxScaler = MinMaxScaler().setMin(0.0).setMax(1.0)

225

val polyFeatures = PolynomialFeatures().setDegree(2)

226

val standardScaler = StandardScaler()

227

228

// Chain transformers

229

val preprocessingPipeline = minMaxScaler

230

.chainTransformer(polyFeatures)

231

.chainTransformer(standardScaler)

232

233

// Chain with predictor

234

val svm = SVM().setIterations(100)

235

val completePipeline = preprocessingPipeline.chainPredictor(svm)

236

237

// Fit the entire pipeline

238

val trainedPipeline = completePipeline.fit(data)

239

240

// Make predictions (automatically applies all preprocessing steps)

241

val testData: DataSet[Vector] = //... test vectors

242

val predictions = trainedPipeline.predict(testData)

243

```
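At its core, a chained transformer's `transform` is left-to-right function composition: the right stage consumes the left stage's output. A minimal plain-Scala sketch of that composition idea (not the Flink ML classes themselves):

```scala
// Each fitted stage is a function from data to data; chaining composes
// them left-to-right (conceptual sketch of ChainedTransformer.transform).
val minMax: Seq[Double] => Seq[Double] = xs => {
  val (lo, hi) = (xs.min, xs.max)
  xs.map(x => (x - lo) / (hi - lo)) // rescale into [0, 1]
}
val center: Seq[Double] => Seq[Double] = xs => {
  val mean = xs.sum / xs.length
  xs.map(_ - mean) // subtract the mean
}

val pipeline = minMax.andThen(center)
pipeline(Seq(1.0, 4.0, 7.0)) // -> Seq(-0.5, 0.0, 0.5)
```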

## Working with Different Data Types

The preprocessing transformers support multiple input data types through implicit type class operations.

### Supported Input Types

- `Vector`: Raw feature vectors
- `LabeledVector`: Feature vectors with labels for supervised learning
- `(LabeledVector, Double)`: Tuples for specialized operations

**Usage Examples:**

```scala
val scaler = StandardScaler()

// Fit on vectors
val vectorData: DataSet[Vector] = //...
val fittedScaler1 = scaler.fit(vectorData)

// Fit on labeled vectors
val labeledData: DataSet[LabeledVector] = //...
val fittedScaler2 = scaler.fit(labeledData)

// Transform different types
val scaledVectors = fittedScaler1.transform(vectorData)
val scaledLabeled = fittedScaler2.transform(labeledData)

// Transform tuples
val tupleData: DataSet[(LabeledVector, Double)] = //...
val scaledTuples = fittedScaler2.transform(tupleData)
```
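The "implicit type class operations" mentioned above can be illustrated with a minimal Scala type class: one operation instance per supported input type, selected implicitly at the call site. The names below (`ScaleOp`, `LV`) are illustrative, not the actual Flink ML traits:

```scala
// Minimal type-class sketch: one ScaleOp instance per input type.
case class LV(label: Double, features: Array[Double])

trait ScaleOp[T] { def scale(t: T, factor: Double): T }

implicit val vectorOp: ScaleOp[Array[Double]] =
  (v, f) => v.map(_ * f)
implicit val labeledOp: ScaleOp[LV] =
  (lv, f) => lv.copy(features = lv.features.map(_ * f))

// One generic entry point works for any type with an instance in scope,
// which is how a single transform method can accept several input types.
def scale[T](t: T, factor: Double)(implicit op: ScaleOp[T]): T =
  op.scale(t, factor)

scale(Array(1.0, 2.0), 2.0)      // Array(2.0, 4.0)
scale(LV(1.0, Array(3.0)), 2.0)  // LV(1.0, Array(6.0))
```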

## Custom Preprocessing

While the built-in preprocessors cover common use cases, you can create custom preprocessing by implementing the `Transformer` trait or using Flink's native DataSet transformations.

**Custom Transformation Example:**

```scala
import org.apache.flink.api.scala._

// Custom log transformation
val logTransform: DataSet[Vector] => DataSet[Vector] = { data =>
  data.map(vector => {
    val newData = vector.toArray.map(x => if (x > 0) math.log(x) else 0.0)
    DenseVector(newData)
  })
}

val data: DataSet[Vector] = //... your data
val logTransformedData = logTransform(data)

// Chain with standard preprocessing
val scaler = StandardScaler()
val scaledLogData = scaler.fit(logTransformedData).transform(logTransformedData)
```