
# Apache Flink ML

Apache Flink ML is a machine learning library that provides optimized, scalable implementations of popular ML algorithms, designed for datasets that exceed the memory of a single machine. It is built on Apache Flink's DataSet API, so training and prediction run as distributed Flink batch jobs with Flink's fault-tolerant execution.

## Package Information

- **Package Name**: org.apache.flink:flink-ml_2.12
- **Package Type**: maven
- **Language**: Scala (built for Scala 2.12)
- **Version**: 1.8.3
- **Installation**: Add the dependency to your `pom.xml` or `build.sbt`

Maven:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.12</artifactId>
  <version>1.8.3</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.flink" % "flink-ml_2.12" % "1.8.3"
```

## Core Imports

```scala
import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{Vector, DenseVector, SparseVector}
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.outlier.StochasticOutlierSelection
```

## Basic Usage

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Load training data in libSVM format
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

// Create and configure SVM classifier
val svm = SVM()
  .setBlocks(10)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)

// Train the model; fit() returns Unit and keeps the learned weights inside `svm`
svm.fit(trainingData)

// Make predictions (test vectors must have the same dimension as the training data)
val testData: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(Array(1.0, 2.0, 3.0)),
  DenseVector(Array(4.0, 5.0, 6.0))
))

// Each element pairs an input vector with its predicted label
val predictions: DataSet[(Vector, Double)] = svm.predict(testData)
predictions.print()
```

## Architecture

Apache Flink ML is built around several key architectural components:

- **Pipeline Framework**: Estimator, Predictor, and Transformer traits provide a scikit-learn-like API for building ML pipelines
- **Linear Algebra**: Vector and matrix abstractions with both dense and sparse implementations
- **Distributed Computing**: All algorithms are implemented on Flink's DataSet API and optimized for distributed processing
- **Type-Safe Parameters**: A parameter system that uses Scala's type system for algorithm configuration
- **Optimization Framework**: Pluggable optimization algorithms, such as gradient descent, with configurable loss functions

## Capabilities

### Machine Learning Algorithms

Core machine learning algorithms for classification, nearest-neighbor search, regression, and recommendation, all designed for distributed processing.

```scala { .api }
// Classification
class SVM extends Predictor[SVM] with WithParameters

// Nearest-neighbor search (k-NN join)
class KNN extends Predictor[KNN] with WithParameters

// Regression
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters

// Recommendation
class ALS extends Predictor[ALS] with WithParameters
```

[Machine Learning Algorithms](./algorithms.md)
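
As a sketch of the shared `fit`/`predict` workflow, the example below trains `MultipleLinearRegression` on a small synthetic dataset and scores two unseen points. The `y = 2x + 1` data, the object and method names, and the hyperparameter values are made up for illustration and are not tuned.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.regression.MultipleLinearRegression

// Hypothetical example object; the data follows y = 2x + 1 for illustration only
object RegressionExample {
  def predictions(): Seq[(Vector, Double)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Small synthetic training set with a single feature x in [0.1, 1.0]
    val training: DataSet[LabeledVector] = env.fromCollection(
      (1 to 10).map { i =>
        val x = i / 10.0
        LabeledVector(2.0 * x + 1.0, DenseVector(x))
      })

    val mlr = MultipleLinearRegression()
      .setIterations(10)
      .setStepsize(0.5)
      .setConvergenceThreshold(0.001)

    // fit() returns Unit; the learned weights are kept inside `mlr`
    mlr.fit(training)

    // Unlabeled points to score; predict() pairs each input with its prediction
    val test: DataSet[Vector] = env.fromCollection(Seq(DenseVector(1.1), DenseVector(1.2)))
    mlr.predict(test).collect()
  }

  def main(args: Array[String]): Unit =
    predictions().foreach { case (x, y) => println(s"$x -> $y") }
}
```

The same `fit`-then-`predict` pattern applies to the other predictors listed above, although their expected input and output types differ.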

### Linear Algebra

Comprehensive linear algebra framework with vectors and matrices supporting both dense and sparse representations.

```scala { .api }
trait Vector {
  def size: Int
  def apply(index: Int): Double
  def update(index: Int, value: Double): Unit
  def dot(other: Vector): Double
  def magnitude: Double
}

trait Matrix {
  def numRows: Int
  def numCols: Int
  def apply(row: Int, col: Int): Double
  def update(row: Int, col: Int, value: Double): Unit
}
```

[Linear Algebra](./linear-algebra.md)
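
The following sketch exercises the `Vector` and `Matrix` operations listed above on small, made-up values: a dot product between a dense and a sparse vector, a vector magnitude, and element access and update on a `DenseMatrix`. It assumes the `flink-ml_2.12` dependency is on the classpath; the object name is hypothetical.

```scala
import org.apache.flink.ml.math.{DenseMatrix, DenseVector, SparseVector, Vector}

// Hypothetical example object showing basic vector and matrix operations
object LinearAlgebraExample {
  // Dense vector (1, 2, 3)
  val dense: Vector = DenseVector(1.0, 2.0, 3.0)

  // Sparse vector of size 3 with non-zeros at indices 0 and 2, i.e. (4, 0, 5)
  val sparse: Vector = SparseVector(3, Array(0, 2), Array(4.0, 5.0))

  // Dot product between dense and sparse vectors: 1*4 + 2*0 + 3*5 = 19
  val dotProduct: Double = dense.dot(sparse)

  // Euclidean norm of (3, 4) is 5
  val norm: Double = DenseVector(3.0, 4.0).magnitude

  // 2x2 dense matrix; apply/update take (row, col) indices
  val matrix = DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0))
  matrix(1, 1) = 10.0 // sugar for matrix.update(1, 1, 10.0)
  val updatedEntry: Double = matrix(1, 1)

  def main(args: Array[String]): Unit = {
    println(s"dot = $dotProduct, norm = $norm, matrix(1, 1) = $updatedEntry")
    println(s"sparse(1) = ${sparse(1)}") // entries not stored in a sparse vector read as 0.0
  }
}
```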

### Data Preprocessing

Data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms.

```scala { .api }
class StandardScaler extends Transformer[StandardScaler] with WithParameters
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters
```

[Data Preprocessing](./preprocessing.md)
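
Scalers are fitted first, to collect per-feature statistics, and then applied with `transform`. The sketch below rescales three made-up two-feature vectors into the range [0, 1] with `MinMaxScaler`; the object and method names are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.MinMaxScaler

// Hypothetical example object; input values are made up for illustration
object MinMaxScalingExample {
  def scaled(): Seq[Vector] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val features: DataSet[Vector] = env.fromCollection(Seq(
      DenseVector(1.0, 10.0),
      DenseVector(2.0, 20.0),
      DenseVector(3.0, 40.0)))

    // Map every feature into [0.0, 1.0]
    val scaler = MinMaxScaler()
      .setMin(0.0)
      .setMax(1.0)

    // fit() computes the per-feature minimum and maximum
    scaler.fit(features)

    // transform() maps each feature with (x - min) / (max - min) into the target range
    scaler.transform(features).collect()
  }

  def main(args: Array[String]): Unit = scaled().foreach(println)
}
```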

### Outlier Detection

Outlier detection based on Stochastic Outlier Selection (SOS), a probabilistic method that assigns each data point an outlier probability.

```scala { .api }
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters
```

[Outlier Detection](./outlier-detection.md)
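
A minimal sketch of scoring points with `StochasticOutlierSelection`: five made-up 2-D points, four clustered together and one far away. Each `LabeledVector`'s label serves as the point's integer id, and the output pairs that id with an outlier probability; we assume the distant point receives the highest score. The perplexity value and object name are illustrative.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.outlier.StochasticOutlierSelection

// Hypothetical example object; the points are made up for illustration
object OutlierExample {
  def outlierScores(): Seq[(Int, Double)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // The label of each LabeledVector serves as the point's id
    val points: DataSet[LabeledVector] = env.fromCollection(Seq(
      LabeledVector(0.0, DenseVector(1.0, 1.0)),
      LabeledVector(1.0, DenseVector(2.0, 1.0)),
      LabeledVector(2.0, DenseVector(1.0, 2.0)),
      LabeledVector(3.0, DenseVector(2.0, 2.0)),
      LabeledVector(4.0, DenseVector(5.0, 8.0)))) // far from the other four points

    val sos = new StochasticOutlierSelection().setPerplexity(3.0)

    // Each result pairs a point id with its outlier probability
    sos.transform(points).collect()
  }

  def main(args: Array[String]): Unit =
    outlierScores().sortBy(_._1).foreach { case (id, p) => println(f"point $id%d: $p%.3f") }
}
```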

### Distance Metrics

A collection of distance metrics for measuring how far apart two vectors are, used by algorithms such as k-NN.

```scala { .api }
trait DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}

class EuclideanDistanceMetric extends DistanceMetric
class ManhattanDistanceMetric extends DistanceMetric
class CosineDistanceMetric extends DistanceMetric
```

[Distance Metrics](./distance-metrics.md)
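
The metrics are plain objects, so they can be evaluated directly on two vectors without running a Flink job. This sketch computes each of the three listed distances on small made-up vectors; the metric classes live in `org.apache.flink.ml.metrics.distances`, and the object name is hypothetical.

```scala
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.metrics.distances.{CosineDistanceMetric, EuclideanDistanceMetric, ManhattanDistanceMetric}

// Hypothetical example object comparing distance metrics on the same inputs
object DistanceMetricsExample {
  private val origin = DenseVector(0.0, 0.0)
  private val point = DenseVector(3.0, 4.0)

  // sqrt(3^2 + 4^2) = 5
  val euclidean: Double = new EuclideanDistanceMetric().distance(origin, point)

  // |3| + |4| = 7
  val manhattan: Double = new ManhattanDistanceMetric().distance(origin, point)

  // 1 - cosine similarity; orthogonal vectors have cosine 0, so the distance is 1
  val cosine: Double = new CosineDistanceMetric().distance(DenseVector(1.0, 0.0), DenseVector(0.0, 1.0))

  def main(args: Array[String]): Unit =
    println(s"euclidean = $euclidean, manhattan = $manhattan, cosine = $cosine")
}
```

A metric instance can then be handed to the k-NN implementation, for example through `KNN().setDistanceMetric(...)`.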

### Pipeline Framework

Modular pipeline system for chaining transformers and predictors, enabling complex ML workflows.

```scala { .api }
// Implicit FitOperation / PredictDataSetOperation / TransformDataSetOperation parameters are omitted
trait Estimator[Self] extends WithParameters {
  // Trains in place: the fitted model is stored in the estimator itself
  def fit[Training](training: DataSet[Training], fitParameters: ParameterMap = ParameterMap.Empty): Unit
}

trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing, Prediction](testing: DataSet[Testing], predictParameters: ParameterMap = ParameterMap.Empty): DataSet[Prediction]
}

trait Transformer[Self <: Transformer[Self]] extends Estimator[Self] {
  def transform[Input, Output](input: DataSet[Input], transformParameters: ParameterMap = ParameterMap.Empty): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}
```

[Pipeline Framework](./pipeline.md)
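
As a sketch of chaining, the pipeline below standardizes features, expands them with degree-2 polynomial terms, and feeds the result to `MultipleLinearRegression`. The chained pipeline is itself a predictor, so `fit` trains the stages in order and `predict` runs test vectors through the same transformations. The training data, polynomial degree, solver settings, and object name are illustrative assumptions.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression

// Hypothetical example object; the quadratic training data is made up
object PipelineExample {
  def predictions(): Seq[(Vector, Double)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Synthetic data following y = x^2 for x in [0.1, 1.0]
    val training: DataSet[LabeledVector] = env.fromCollection(
      (1 to 10).map { i =>
        val x = i / 10.0
        LabeledVector(x * x, DenseVector(x))
      })

    val scaler = StandardScaler()
    val polyFeatures = PolynomialFeatures().setDegree(2)
    val mlr = MultipleLinearRegression().setIterations(10).setStepsize(0.1)

    // scaler -> polyFeatures -> mlr, exposed as a single predictor
    val pipeline = scaler.chainTransformer(polyFeatures).chainPredictor(mlr)

    // Fits each stage on the output of the stages before it
    pipeline.fit(training)

    val test: DataSet[Vector] = env.fromCollection(Seq(DenseVector(0.25), DenseVector(0.75)))
    val predicted: DataSet[(Vector, Double)] = pipeline.predict(test)
    predicted.collect()
  }

  def main(args: Array[String]): Unit = predictions().foreach(println)
}
```

Because the transformers run before the final predictor, the vectors in the returned pairs are likely the transformed features rather than the raw test inputs.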

### Optimization Framework

Flexible optimization framework with a gradient descent solver and pluggable loss functions for training ML models.

```scala { .api }
class GradientDescent extends IterativeSolver {
  def optimize(
    data: DataSet[LabeledVector],
    initialWeights: Option[DataSet[WeightVector]]
  ): DataSet[WeightVector]
}

trait LossFunction {
  def loss(dataPoint: LabeledVector, weights: WeightVector): Double
  def gradient(dataPoint: LabeledVector, weights: WeightVector): WeightVector
}
```

[Optimization Framework](./optimization.md)
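
A minimal sketch of running the gradient descent solver directly, outside any predictor. It assumes the `GenericLossFunction`, `SquaredLoss`, and `LinearPrediction` building blocks from `org.apache.flink.ml.optimization` to form a least-squares loss, uses made-up data following `y = 3x`, and leaves the learning rate and regularization at the solver's defaults. The object and method names are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{LabeledVector, WeightVector}
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.optimization.{GenericLossFunction, GradientDescent, LinearPrediction, SquaredLoss}

// Hypothetical example object; the data follows y = 3x for illustration
object GradientDescentExample {
  def train(): Seq[WeightVector] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data: DataSet[LabeledVector] = env.fromCollection(
      (1 to 10).map { i =>
        val x = i / 10.0
        LabeledVector(3.0 * x, DenseVector(x))
      })

    // Least-squares loss: squared loss on top of a linear prediction w · x + b
    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    val sgd = new GradientDescent()
      .setLossFunction(lossFunction)
      .setIterations(50)

    // None: let the solver create its own initial weights
    sgd.optimize(data, None).collect()
  }

  def main(args: Array[String]): Unit =
    train().foreach(w => println(s"weights = ${w.weights}, intercept = ${w.intercept}"))
}
```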

## Core Types

```scala { .api }
// Data structures
case class LabeledVector(label: Double, vector: Vector)
case class WeightVector(weights: Vector, intercept: Double)

// Vector implementations
case class DenseVector(data: Array[Double]) extends Vector
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector

// Matrix implementations
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix
class SparseMatrix(numRows: Int, numCols: Int, rowIndices: Array[Int], colPtrs: Array[Int], data: Array[Double]) extends Matrix

// Parameter system
trait Parameter[T] {
  def defaultValue: Option[T]
}

class ParameterMap {
  def add[T](parameter: Parameter[T], value: T): ParameterMap
  def get[T](parameter: Parameter[T]): Option[T]
}

trait WithParameters {
  def parameters: ParameterMap
}
```
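
The sketch below puts several of these types together: a `LabeledVector` training example, a `WeightVector` holding a linear model, and the parameter system. It evaluates the linear decision value w · x + b by hand on made-up numbers and shows that a setter such as `setIterations` stores its value in the estimator's `ParameterMap` under the `SVM.Iterations` key. The object name and all values are hypothetical.

```scala
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.{LabeledVector, ParameterMap, WeightVector}
import org.apache.flink.ml.math.DenseVector

// Hypothetical example object; all numbers are made up
object CoreTypesExample {
  // A training example with label 1.0 and features (2, 1)
  val example = LabeledVector(1.0, DenseVector(2.0, 1.0))

  // A linear model with weights (0.5, -1.0) and intercept 0.25
  val model = WeightVector(DenseVector(0.5, -1.0), 0.25)

  // Decision value w · x + b = (0.5 * 2 - 1.0 * 1) + 0.25 = 0.25
  val score: Double = model.weights.dot(example.vector) + model.intercept

  // Setters write into the estimator's own ParameterMap
  val svm: SVM = SVM().setIterations(100)
  val configuredIterations: Option[Int] = svm.parameters.get(SVM.Iterations)

  // A separate ParameterMap, e.g. for overriding settings via svm.fit(data, fitParameters)
  val fitParameters: ParameterMap = ParameterMap().add(SVM.Iterations, 50)

  def main(args: Array[String]): Unit =
    println(s"score = $score, iterations = $configuredIterations")
}
```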

## Data I/O

```scala { .api }
object MLUtils {
  def readLibSVM(env: ExecutionEnvironment, filePath: String): DataSet[LabeledVector]
  def writeLibSVM(filePath: String, labeledVectors: DataSet[LabeledVector]): DataSink[String]
}

// Implicit extensions, brought into scope by `import org.apache.flink.ml._`
implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) {
  def readLibSVM(path: String): DataSet[LabeledVector]
}

implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) {
  def writeAsLibSVM(path: String): DataSink[String]
}
```
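
A sketch of a round trip through the libSVM helpers: write a small made-up `DataSet[LabeledVector]` with `writeAsLibSVM`, execute the job, and read the file back with `readLibSVM`. Sinks are lazy, so `env.execute` must run before the file exists; parallelism is set to 1 here on the assumption that a single output file is wanted rather than a directory of part files. The temporary path and object name are hypothetical.

```scala
import java.nio.file.Files

import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

// Hypothetical example object; data and file location are made up
object LibSVMRoundTrip {
  def roundTrip(): Seq[LabeledVector] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // produce one output file instead of a directory of part files

    // Fresh temp directory so the sink does not collide with an existing file
    val path = Files.createTempDirectory("flink-ml-libsvm").resolve("data.libsvm").toString

    val data: DataSet[LabeledVector] = env.fromCollection(Seq(
      LabeledVector(1.0, DenseVector(0.5, 2.0)),
      LabeledVector(-1.0, DenseVector(1.5, 0.25))))

    // Implicit extension from org.apache.flink.ml._; this only declares the sink
    data.writeAsLibSVM(path)
    env.execute("write libSVM")

    // Read the file back into LabeledVectors
    env.readLibSVM(path).collect()
  }

  def main(args: Array[String]): Unit = roundTrip().foreach(println)
}
```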