0
# Machine Learning Algorithms
1
2
Apache Flink ML provides implementations of popular machine learning algorithms optimized for distributed processing on Apache Flink. All algorithms follow the Estimator-Predictor pattern and support the parameter system for configuration.
3
4
## Classification
5
6
### Support Vector Machine (SVM)
7
8
Soft-margin Support Vector Machine using the CoCoA (Communication-efficient Distributed Dual Coordinate Ascent) algorithm for distributed training.
9
10
```scala { .api }
11
class SVM extends Predictor[SVM] with WithParameters {
12
def setBlocks(blocks: Int): SVM
13
def setIterations(iterations: Int): SVM
14
def setLocalIterations(localIterations: Int): SVM
15
def setRegularization(regularization: Double): SVM
16
def setStepsize(stepsize: Double): SVM
17
def setSeed(seed: Long): SVM
18
def setThreshold(threshold: Double): SVM
19
def setOutputDecisionFunction(outputDecisionFunction: Boolean): SVM
20
}
21
22
object SVM {
23
def apply(): SVM
24
25
// Parameters
26
case object Blocks extends Parameter[Int] {
27
val defaultValue: Option[Int] = None
28
}
29
30
case object Iterations extends Parameter[Int] {
31
val defaultValue = Some(10)
32
}
33
34
case object LocalIterations extends Parameter[Int] {
35
val defaultValue = Some(10)
36
}
37
38
case object Regularization extends Parameter[Double] {
39
val defaultValue = Some(1.0)
40
}
41
42
case object Stepsize extends Parameter[Double] {
43
val defaultValue = Some(1.0)
44
}
45
46
case object Seed extends Parameter[Long] {
47
val defaultValue = Some(Random.nextLong())
48
}
49
50
case object ThresholdValue extends Parameter[Double] {
51
val defaultValue = Some(0.0)
52
}
53
54
case object OutputDecisionFunction extends Parameter[Boolean] {
55
val defaultValue = Some(false)
56
}
57
}
58
```
59
60
**Usage Example:**
61
62
```scala
63
import org.apache.flink.ml.classification.SVM
64
import org.apache.flink.ml.common.LabeledVector
65
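import org.apache.flink.api.scala._
import org.apache.flink.ml.RichExecutionEnvironment // adds readLibSVM to ExecutionEnvironment
import org.apache.flink.ml.math.Vector // Flink ML vector type used for the test data below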
66
67
val env = ExecutionEnvironment.getExecutionEnvironment
68
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")
69
70
// Configure SVM
71
val svm = SVM()
72
.setBlocks(10) // Number of data blocks for distributed training
73
.setIterations(100) // Maximum number of iterations
74
.setLocalIterations(10) // Local iterations per block per global iteration
75
.setRegularization(0.001) // Regularization parameter
76
.setStepsize(0.1) // Initial step size for the weight vector updates
77
.setSeed(42) // Random seed for reproducibility
78
.setThreshold(0.5) // Decision value separating positive from negative labels (not applied when raw decision values are output)
79
.setOutputDecisionFunction(true) // Output decision function values instead of binary predictions
80
81
// Train the model
82
svm.fit(trainingData) // fit returns Unit; the estimator itself holds the trained state
83
84
// Make predictions
85
val testData: DataSet[Vector] = //... test vectors
86
val predictions = svm.predict(testData)
87
```
88
89
## Regression
90
91
### Multiple Linear Regression
92
93
Ordinary Least Squares regression using gradient descent optimization for distributed learning.
94
95
```scala { .api }
96
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters {
97
def setIterations(iterations: Int): MultipleLinearRegression
98
def setStepsize(stepsize: Double): MultipleLinearRegression
99
def setConvergenceThreshold(convergenceThreshold: Double): MultipleLinearRegression
100
def setLearningRateMethod(learningRateMethod: LearningRateMethod): MultipleLinearRegression
101
def squaredResidualSum(data: DataSet[LabeledVector]): DataSet[Double]
102
}
103
104
object MultipleLinearRegression {
105
def apply(): MultipleLinearRegression
106
107
// Parameters
108
case object Iterations extends Parameter[Int] {
109
val defaultValue = Some(10)
110
}
111
112
case object Stepsize extends Parameter[Double] {
113
val defaultValue = Some(0.1)
114
}
115
116
case object ConvergenceThreshold extends Parameter[Double] {
117
val defaultValue = Some(1e-6)
118
}
119
120
case object LearningRateMethodValue extends Parameter[LearningRateMethod] {
121
val defaultValue = Some(LearningRateMethod.Default)
122
}
123
}
124
125
// Learning rate scheduling methods
126
sealed trait LearningRateMethod
127
object LearningRateMethod {
128
case object Default extends LearningRateMethod
129
case object Inverse extends LearningRateMethod
130
case object InverseSquareRoot extends LearningRateMethod
131
}
132
```
133
134
**Usage Example:**
135
136
```scala
137
import org.apache.flink.ml.regression.MultipleLinearRegression
138
import org.apache.flink.ml.optimization.LearningRateMethod
139
140
val regression = MultipleLinearRegression()
141
.setIterations(200)
142
.setStepsize(0.01)
143
.setConvergenceThreshold(1e-8)
144
.setLearningRateMethod(LearningRateMethod.Inverse)
145
146
regression.fit(trainingData) // fit returns Unit; predict on the same estimator
147
val predictions = regression.predict(testData)
148
149
// Calculate residual sum of squares for model evaluation
150
val residualSum = regression.squaredResidualSum(trainingData)
151
```
152
153
## Nearest Neighbor
154
155
### K-Nearest Neighbors (KNN)
156
157
Implements k-nearest neighbor join for finding the k closest points in the training set for each test point, with support for various distance metrics and optimizations.
158
159
```scala { .api }
160
class KNN extends Predictor[KNN] with WithParameters {
161
def setK(k: Int): KNN
162
def setDistanceMetric(distanceMetric: DistanceMetric): KNN
163
def setBlocks(blocks: Int): KNN
164
def setUseQuadTree(useQuadTree: Boolean): KNN
165
def setSizeHint(sizeHint: CrossHint): KNN
166
}
167
168
object KNN {
169
def apply(): KNN
170
171
// Parameters
172
case object K extends Parameter[Int] {
173
val defaultValue = Some(5)
174
}
175
176
case object DistanceMetric extends Parameter[DistanceMetric] {
177
val defaultValue = Some(EuclideanDistanceMetric())
178
}
179
180
case object Blocks extends Parameter[Int] {
181
val defaultValue = None
182
}
183
184
case object UseQuadTree extends Parameter[Boolean] {
185
val defaultValue = None
186
}
187
188
case object SizeHint extends Parameter[CrossHint] {
189
val defaultValue = None
190
}
191
}
192
```
193
194
**Usage Example:**
195
196
```scala
197
import org.apache.flink.ml.nn.KNN
198
import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
199
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
200
201
val trainingData: DataSet[Vector] = //... training vectors
202
val testData: DataSet[Vector] = //... test vectors
203
204
val knn = KNN()
205
.setK(10) // Find 10 nearest neighbors
206
.setBlocks(5) // Split data into 5 blocks for distributed processing
207
.setDistanceMetric(EuclideanDistanceMetric()) // Use Euclidean distance
208
.setUseQuadTree(true) // Use quadtree optimization (for Euclidean distance)
209
.setSizeHint(CrossHint.FIRST_IS_SMALL) // Optimize when training set is small
210
211
// Train the model
212
knn.fit(trainingData) // fit returns Unit, so there is no separate model object
213
214
// Find k-nearest neighbors for each test point
215
val neighbors: DataSet[(Vector, Array[Vector])] = knn.predict(testData)
216
```
217
218
## Recommendation
219
220
### Alternating Least Squares (ALS)
221
222
Matrix factorization algorithm for collaborative filtering and recommendation systems using alternating least squares optimization.
223
224
```scala { .api }
225
class ALS extends Predictor[ALS] with WithParameters {
226
def setNumFactors(numFactors: Int): ALS
227
def setLambda(lambda: Double): ALS
228
def setIterations(iterations: Int): ALS
229
def setBlocks(blocks: Int): ALS
230
def setSeed(seed: Long): ALS
231
def setTemporaryPath(temporaryPath: String): ALS
232
def empiricalRisk(data: DataSet[(Int, Int, Double)]): DataSet[Double]
233
}
234
235
object ALS {
236
def apply(): ALS
237
238
// Parameters
239
case object NumFactors extends Parameter[Int] {
240
val defaultValue = Some(10)
241
}
242
243
case object Lambda extends Parameter[Double] {
244
val defaultValue = Some(1.0)
245
}
246
247
case object Iterations extends Parameter[Int] {
248
val defaultValue = Some(10)
249
}
250
251
case object Blocks extends Parameter[Int] {
252
val defaultValue = Some(1)
253
}
254
255
case object Seed extends Parameter[Long] {
256
val defaultValue = Some(0L)
257
}
258
259
case object TemporaryPath extends Parameter[String] {
260
val defaultValue = Some(System.getProperty("java.io.tmpdir"))
261
}
262
}
263
264
// Data types for ALS
265
case class Rating(user: Int, item: Int, rating: Double)
266
case class Factors(id: Int, factors: Vector)
267
case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])
268
```
269
270
**Usage Example:**
271
272
```scala
273
import org.apache.flink.ml.recommendation.ALS
274
275
// Rating data: (user_id, item_id, rating)
276
val ratings: DataSet[(Int, Int, Double)] = env.fromCollection(Seq(
277
(1, 1, 5.0), (1, 2, 3.0), (1, 3, 4.0),
278
(2, 1, 4.0), (2, 2, 2.0), (2, 4, 5.0),
279
(3, 2, 3.0), (3, 3, 5.0), (3, 4, 4.0)
280
))
281
282
val als = ALS()
283
.setNumFactors(50) // Number of latent factors
284
.setLambda(0.01) // Regularization parameter
285
.setIterations(20) // Number of iterations
286
.setBlocks(10) // Number of blocks for distributed computation
287
.setSeed(42) // Random seed
288
.setTemporaryPath("/tmp/als") // Temporary storage path
289
290
als.fit(ratings) // fit returns Unit; predict on the same estimator
291
292
// Predict ratings for user-item pairs
293
val userItemPairs: DataSet[(Int, Int)] = env.fromCollection(Seq((1, 4), (2, 3), (3, 1)))
294
val predictions = als.predict(userItemPairs)
295
296
// Calculate empirical risk (reconstruction error)
297
val risk = als.empiricalRisk(ratings)
298
```
299
300
## Nearest Neighbors
301
302
### k-Nearest Neighbors (k-NN)
303
304
k-Nearest Neighbors join: for each test point, finds its k closest points in the training set. Supports configurable distance metrics and an optional QuadTree optimization.
305
306
```scala { .api }
307
class KNN extends Predictor[KNN] with WithParameters {
308
def setK(k: Int): KNN
309
def setDistanceMetric(distanceMetric: DistanceMetric): KNN
310
def setBlocks(blocks: Int): KNN
311
def setUseQuadTree(useQuadTree: Boolean): KNN
312
def setSizeHint(sizeHint: CrossHint): KNN
313
}
314
315
object KNN {
316
def apply(): KNN
317
318
// Parameters
319
case object K extends Parameter[Int] {
320
val defaultValue = Some(5)
321
}
322
323
case object DistanceMetric extends Parameter[DistanceMetric] {
324
val defaultValue = Some(EuclideanDistanceMetric())
325
}
326
327
case object Blocks extends Parameter[Int] {
328
val defaultValue = None
329
}
330
331
case object UseQuadTree extends Parameter[Boolean] {
332
val defaultValue = None
333
}
334
335
case object SizeHint extends Parameter[CrossHint] {
336
val defaultValue = None
337
}
338
}
339
```
340
341
**Usage Example:**
342
343
```scala
344
import org.apache.flink.ml.nn.KNN
345
import org.apache.flink.ml.metrics.distances.ManhattanDistanceMetric
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
346
347
val knn = KNN()
348
.setK(3) // Number of neighbors
349
.setDistanceMetric(ManhattanDistanceMetric()) // Distance metric
350
.setBlocks(5) // Number of data blocks
351
.setUseQuadTree(false) // QuadTree search is only meant for (squared) Euclidean distance
352
.setSizeHint(CrossHint.SECOND_IS_SMALL) // Hint that the test set is the smaller input
353
354
knn.fit(trainingData) // fit returns Unit; predict on the same estimator
355
val predictions = knn.predict(testData)
356
```
357
358
## Outlier Detection
359
360
### Stochastic Outlier Selection
361
362
Probabilistic outlier detection algorithm for identifying anomalous data points.
363
364
```scala { .api }
365
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters
366
```
367
368
**Usage Example:**
369
370
```scala
371
import org.apache.flink.ml.outlier.StochasticOutlierSelection
372
373
val outlierDetector = StochasticOutlierSelection()
374
val outlierScores = outlierDetector.transform(data)
375
```