Machine learning library for Apache Flink providing scalable ML algorithms, including classification (SVM), regression (multiple linear regression), and recommendation (ALS), implemented for distributed batch processing on Flink's DataSet API
npx @tessl/cli install tessl/maven-org-apache-flink--flink-ml_2-12@1.8.00
# Apache Flink ML

Apache Flink ML is a machine learning library that provides scalable implementations of popular ML algorithms, designed for datasets that exceed the memory of a single machine. The algorithms are built on Flink's batch DataSet API, so training and prediction run as distributed Flink jobs on Flink's fault-tolerant runtime.

## Package Information

- **Package Name**: org.apache.flink:flink-ml_2.12
- **Package Type**: maven
- **Language**: Scala (with Scala 2.12)
- **Version**: 1.8.3
- **Installation**: Add to your `pom.xml` or `build.sbt`

Maven:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.12</artifactId>
  <version>1.8.3</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.flink" % "flink-ml_2.12" % "1.8.3"
```

## Core Imports

```scala
// Brings the readLibSVM / writeAsLibSVM implicit extensions into scope
import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
// FlinkML's Vector shadows scala.Vector once imported
import org.apache.flink.ml.math.{Vector, DenseVector, SparseVector}
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.outlier.StochasticOutlierSelection
```

## Basic Usage

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Load data in libSVM format (SVM is a binary classifier with labels +1.0 / -1.0)
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

// Create and configure SVM classifier
val svm = SVM()
  .setBlocks(10)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)

// Train the model: fit returns Unit and stores the learned weights inside `svm`
svm.fit(trainingData)

// Test vectors must have the same dimension as the training features
val testData: DataSet[Vector] = env.fromCollection(Seq[Vector](
  DenseVector(Array(1.0, 2.0, 3.0)),
  DenseVector(Array(4.0, 5.0, 6.0))
))

// Predict on the trained estimator; each element is (input vector, predicted label)
val predictions = svm.predict(testData)
predictions.print()
```

## Architecture

Apache Flink ML is built around several key architectural components:

- **Pipeline Framework**: Estimator, Predictor, and Transformer traits provide a scikit-learn-like API for building ML pipelines
- **Linear Algebra**: Vector and matrix abstractions with both dense and sparse implementations
- **Distributed Computing**: All algorithms are implemented on Flink's batch DataSet API and run as distributed jobs
- **Type-Safe Parameters**: Parameter system using Scala's type system for algorithm configuration
- **Optimization Framework**: Pluggable optimization algorithms such as gradient descent with configurable loss functions

## Capabilities

### Machine Learning Algorithms

Core machine learning algorithms for classification, regression, nearest-neighbor search, and recommendation, all implemented for distributed batch processing.

```scala { .api }
// Classification
class SVM extends Predictor[SVM] with WithParameters

// Nearest-neighbor search (k-NN join)
class KNN extends Predictor[KNN] with WithParameters

// Regression
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters

// Recommendation
class ALS extends Predictor[ALS] with WithParameters
```

[Machine Learning Algorithms](./algorithms.md)
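
The predictors above share one fit/predict cycle. As a minimal sketch, the following trains `MultipleLinearRegression` on a tiny in-memory dataset; the data points and the `setIterations` / `setStepsize` values are illustrative assumptions, not tuned settings. Note that `fit` returns `Unit`, so predictions are made on the same `mlr` instance.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.regression.MultipleLinearRegression

val env = ExecutionEnvironment.getExecutionEnvironment

// Illustrative training data that follows y = 2 * x
val training: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(2.0, DenseVector(Array(1.0))),
  LabeledVector(4.0, DenseVector(Array(2.0))),
  LabeledVector(6.0, DenseVector(Array(3.0)))
))

// Illustrative solver settings
val mlr = MultipleLinearRegression()
  .setIterations(100)
  .setStepsize(0.1)

// fit returns Unit; the learned weights are kept inside mlr
mlr.fit(training)

// Predict an unseen point; each result is (input vector, predicted value)
val test: DataSet[Vector] = env.fromCollection(Seq[Vector](DenseVector(Array(4.0))))
val predictions = mlr.predict(test).collect()
predictions.foreach { case (x, y) => println(s"$x -> $y") }
```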

### Linear Algebra

Comprehensive linear algebra framework with vectors and matrices supporting both dense and sparse representations.

```scala { .api }
trait Vector {
  def size: Int
  def apply(index: Int): Double
  def update(index: Int, value: Double): Unit
  def dot(other: Vector): Double
  def magnitude: Double
}

trait Matrix {
  def numRows: Int
  def numCols: Int
  def apply(row: Int, col: Int): Double
  def update(row: Int, col: Int, value: Double): Unit
}
```

[Linear Algebra](./linear-algebra.md)
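
A short sketch of the vector and matrix types in use, built from the case-class constructors listed under Core Types. The values are arbitrary, and the comment on `DenseMatrix` assumes column-major storage, so only diagonal entries (which are layout-independent) are read back.

```scala
import org.apache.flink.ml.math.{DenseMatrix, DenseVector, SparseVector, Vector}

// Dense vector backed by a plain Array[Double]
val dense: Vector = DenseVector(Array(1.0, 2.0, 3.0))

// Sparse vector of size 3 storing only indices 0 and 2 (indices in ascending order)
val sparse: Vector = SparseVector(3, Array(0, 2), Array(4.0, 5.0))

// Dot product across dense and sparse representations: 1*4 + 3*5
val dot: Double = dense.dot(sparse)

// Euclidean norm: sqrt(1 + 4 + 9)
val norm: Double = dense.magnitude

// Entries that are not stored read as 0.0
val missing: Double = sparse(1)

// 2 x 2 dense matrix; `data` is assumed to be laid out column by column
val m = DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0))
println(s"dot=$dot norm=$norm missing=$missing m(1,1)=${m(1, 1)}")
```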

### Data Preprocessing

Data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms.

```scala { .api }
class StandardScaler extends Transformer[StandardScaler] with WithParameters
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters
```

[Data Preprocessing](./preprocessing.md)
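
A minimal sketch of fitting and applying `StandardScaler`, assuming its `setMean` / `setStd` setters choose the target mean and standard deviation. The feature values are invented for illustration; `fit` learns the per-feature statistics and `transform` rescales the same data.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.StandardScaler

val env = ExecutionEnvironment.getExecutionEnvironment

// Two features on very different scales (illustrative values)
val data: DataSet[Vector] = env.fromCollection(Seq[Vector](
  DenseVector(Array(1.0, 100.0)),
  DenseVector(Array(2.0, 200.0)),
  DenseVector(Array(3.0, 300.0))
))

// Target mean 0.0 and standard deviation 1.0 for every feature
val scaler = StandardScaler()
  .setMean(0.0)
  .setStd(1.0)

// fit computes the per-feature mean and standard deviation of `data`
scaler.fit(data)

// transform centers and rescales each feature toward the target mean and std
val scaled: Seq[Vector] = scaler.transform(data).collect()
scaled.foreach(println)
```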

### Outlier Detection

Algorithms for identifying anomalous data points in datasets using probabilistic and statistical methods.

```scala { .api }
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters
```

[Outlier Detection](./outlier-detection.md)
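
A minimal sketch of scoring points with `StochasticOutlierSelection`, assuming a `setPerplexity` setter and a `transform` that yields `(id, outlier probability)` pairs for `LabeledVector` input, with each label used as the point's id. The five points are a toy example in which the last one sits far from the others.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.outlier.StochasticOutlierSelection

val env = ExecutionEnvironment.getExecutionEnvironment

// The label serves as the point id; point 4 lies far from the small cluster
val points = env.fromCollection(Seq(
  LabeledVector(0.0, DenseVector(Array(1.0, 1.0))),
  LabeledVector(1.0, DenseVector(Array(2.0, 1.0))),
  LabeledVector(2.0, DenseVector(Array(1.0, 2.0))),
  LabeledVector(3.0, DenseVector(Array(2.0, 2.0))),
  LabeledVector(4.0, DenseVector(Array(5.0, 8.0)))
))

// Perplexity acts roughly like an effective number of neighbors
val sos = new StochasticOutlierSelection().setPerplexity(3)

// Each result is (id, outlier probability)
val scores = sos.transform(points).collect()
scores.sortBy(-_._2).foreach { case (id, p) => println(s"point $id: $p") }
```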

### Distance Metrics

Collection of distance metrics for measuring similarity between vectors, used by algorithms like k-NN.

```scala { .api }
trait DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}

class EuclideanDistanceMetric extends DistanceMetric
class ManhattanDistanceMetric extends DistanceMetric
class CosineDistanceMetric extends DistanceMetric
```

[Distance Metrics](./distance-metrics.md)
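
The metrics can also be called directly. A minimal sketch, assuming the classes live in `org.apache.flink.ml.metrics.distances` and have public no-argument constructors, and that cosine distance is defined as one minus the cosine similarity:

```scala
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.metrics.distances.{CosineDistanceMetric, EuclideanDistanceMetric, ManhattanDistanceMetric}

val origin = DenseVector(Array(0.0, 0.0))
val point = DenseVector(Array(3.0, 4.0))

// Straight-line distance: sqrt(3^2 + 4^2)
val euclidean = new EuclideanDistanceMetric().distance(origin, point)

// Sum of absolute coordinate differences: |3| + |4|
val manhattan = new ManhattanDistanceMetric().distance(origin, point)

// 1 - cos(angle); orthogonal vectors are at distance 1
val cosine = new CosineDistanceMetric().distance(
  DenseVector(Array(1.0, 0.0)),
  DenseVector(Array(0.0, 1.0)))

println(s"euclidean=$euclidean manhattan=$manhattan cosine=$cosine")
```

The same metric instances can be handed to `KNN().setDistanceMetric(...)` to change how neighbors are ranked.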
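
### Pipeline Framework

Modular pipeline system for chaining transformers and predictors, enabling complex ML workflows. Training happens in place: `fit` returns `Unit` and stores the learned state inside the estimator, so `predict` and `transform` are called on the same instance afterwards. The concrete fit, predict, and transform logic for each input type is supplied through implicit operation instances.

```scala { .api }
trait Estimator[Self] extends WithParameters {
  def fit[Training](
      training: DataSet[Training],
      fitParameters: ParameterMap = ParameterMap.Empty)
      (implicit fitOperation: FitOperation[Self, Training]): Unit
}

trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing, Prediction](
      testing: DataSet[Testing],
      predictParameters: ParameterMap = ParameterMap.Empty)
      (implicit predictor: PredictDataSetOperation[Self, Testing, Prediction]): DataSet[Prediction]
}

trait Transformer[Self <: Transformer[Self]] extends Estimator[Self] {
  def transform[Input, Output](
      input: DataSet[Input],
      transformParameters: ParameterMap = ParameterMap.Empty)
      (implicit transformOperation: TransformDataSetOperation[Self, Input, Output]): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}
```

[Pipeline Framework](./pipeline.md)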
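
A sketch of a chained pipeline, assuming `PolynomialFeatures().setDegree` and the `MultipleLinearRegression` setters used below; the data and parameter values are illustrative only. `chainTransformer` and `chainPredictor` are the methods listed in the trait summary.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression

val env = ExecutionEnvironment.getExecutionEnvironment

// Illustrative training data following y = x^2
val training: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(Array(1.0))),
  LabeledVector(4.0, DenseVector(Array(2.0))),
  LabeledVector(9.0, DenseVector(Array(3.0)))
))

// Scale features, expand them to degree-2 polynomials, then regress
val pipeline = StandardScaler()
  .chainTransformer(PolynomialFeatures().setDegree(2))
  .chainPredictor(MultipleLinearRegression().setIterations(50).setStepsize(0.1))

// A single fit call trains every stage in order
pipeline.fit(training)

// Test vectors pass through the fitted transformers before prediction
val test: DataSet[Vector] = env.fromCollection(Seq[Vector](DenseVector(Array(4.0))))
val predictions = pipeline.predict(test).collect()
predictions.foreach(println)
```

Because training happens in place, the same `pipeline` value serves for both `fit` and `predict`, with no separate model object to carry around.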

### Optimization Framework

Flexible optimization framework with a gradient descent solver and pluggable loss functions for training ML models.

```scala { .api }
class GradientDescent extends IterativeSolver {
  def optimize(
    data: DataSet[LabeledVector],
    initialWeights: Option[DataSet[WeightVector]]
  ): DataSet[WeightVector]
}

trait LossFunction {
  def loss(dataPoint: LabeledVector, weights: WeightVector): Double
  def gradient(dataPoint: LabeledVector, weights: WeightVector): Vector
}
```

[Optimization Framework](./optimization.md)
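
A minimal sketch of running the solver on its own, assuming the `GenericLossFunction`, `SquaredLoss`, and `LinearPrediction` helpers from `org.apache.flink.ml.optimization` and the `setLossFunction` / `setIterations` / `setStepsize` setters. The data and settings are illustrative; `None` is passed for `initialWeights` as in the signature above.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{LabeledVector, WeightVector}
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.optimization.{GenericLossFunction, GradientDescent, LinearPrediction, SquaredLoss}

val env = ExecutionEnvironment.getExecutionEnvironment

// Illustrative data that follows y = 2 * x
val data: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(2.0, DenseVector(Array(1.0))),
  LabeledVector(4.0, DenseVector(Array(2.0))),
  LabeledVector(6.0, DenseVector(Array(3.0)))
))

// Squared loss on a linear prediction w . x + b, i.e. least-squares regression
val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

val solver = GradientDescent()
  .setLossFunction(lossFunction)
  .setIterations(100)
  .setStepsize(0.1)

// None: let the solver create the initial weights itself
val weights: Seq[WeightVector] = solver.optimize(data, None).collect()
println(weights.head)
```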

## Core Types

```scala { .api }
// Data structures
case class LabeledVector(label: Double, vector: Vector)
case class WeightVector(weights: Vector, intercept: Double)

// Vector implementations
case class DenseVector(data: Array[Double]) extends Vector
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector

// Matrix implementations
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix
class SparseMatrix(numRows: Int, numCols: Int, rowIndices: Array[Int], colPtrs: Array[Int], data: Array[Double]) extends Matrix

// Parameter system
trait Parameter[T] {
  def defaultValue: Option[T]
}

class ParameterMap {
  def add[T](parameter: Parameter[T], value: T): ParameterMap
  def get[T](parameter: Parameter[T]): Option[T]
}

trait WithParameters {
  def parameters: ParameterMap
}
```
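
A small sketch of the parameter system. `MaxDepth` below is a hypothetical parameter (not part of FlinkML) declared by extending `Parameter[T]`; the sketch assumes `ParameterMap` has a no-argument constructor and that `get` falls back to the parameter's `defaultValue` when no value has been added.

```scala
import org.apache.flink.ml.common.{Parameter, ParameterMap}

// Hypothetical parameter, declared by extending Parameter[T]
case object MaxDepth extends Parameter[Int] {
  val defaultValue: Option[Int] = Some(5)
}

val params = new ParameterMap()

// Before a value is added, the lookup returns the default
val before = params.get(MaxDepth)

// Values are type-checked: add(MaxDepth, "deep") would be rejected at compile time
params.add(MaxDepth, 10)
val after = params.get(MaxDepth)

println(s"before=$before after=$after")
```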

## Data I/O

```scala { .api }
object MLUtils {
  def readLibSVM(env: ExecutionEnvironment, filePath: String): DataSet[LabeledVector]
  def writeLibSVM(filePath: String, labeledVectors: DataSet[LabeledVector]): DataSink[String]
}

// Implicit extensions
implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) {
  def readLibSVM(path: String): DataSet[LabeledVector]
}

implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) {
  def writeAsLibSVM(path: String): DataSink[String]
}
```
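
A round-trip sketch using the implicit extensions: write a `DataSet[LabeledVector]` in libSVM format, then read it back. The temporary path and the two sample points are assumptions for illustration.

```scala
import java.nio.file.Files
import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.SparseVector

val env = ExecutionEnvironment.getExecutionEnvironment

// Target a path inside a fresh temporary directory so the sink cannot collide with existing files
val path = Files.createTempDirectory("flinkml").resolve("points.libsvm").toString

val points: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, SparseVector(3, Array(0, 2), Array(0.5, 1.5))),
  LabeledVector(-1.0, SparseVector(3, Array(1), Array(2.0)))
))

// writeAsLibSVM only declares a sink; env.execute() actually runs the write
points.writeAsLibSVM(path)
env.execute("write libSVM")

// readLibSVM is the matching implicit extension on ExecutionEnvironment
val reloaded: Seq[LabeledVector] = env.readLibSVM(path).collect()
reloaded.foreach(println)
```

Sinks in the DataSet API are lazy, which is why `env.execute()` is needed before reading the files back; `collect()` then triggers a job of its own.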