tessl/maven-org-apache-flink--flink-ml_2-12

Machine learning library for Apache Flink providing scalable ML algorithms including classification (SVM), regression (multiple linear regression), and recommendation (ALS) optimized for distributed stream and batch processing

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: `mavenpkg:maven/org.apache.flink/flink-ml_2.12@1.8.x`

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-ml_2-12@1.8.0

# Apache Flink ML

Apache Flink ML is a machine learning library that provides distributed implementations of common ML algorithms, designed to scale to datasets that exceed the memory of a single machine. It is built on Apache Flink's distributed processing engine and, as the Architecture section describes, its algorithms run on Flink's DataSet API, so ML jobs inherit Flink's fault tolerance.

## Package Information

- **Package Name**: org.apache.flink:flink-ml_2.12
- **Package Type**: maven
- **Language**: Scala 2.12
- **Version**: 1.8.3
- **Installation**: Add the dependency to your `pom.xml` (Maven) or `build.sbt` (SBT)

Maven:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.12</artifactId>
  <version>1.8.3</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.flink" % "flink-ml_2.12" % "1.8.3"
```

## Core Imports

```scala
import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{Vector, DenseVector, SparseVector}
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.outlier.StochasticOutlierSelection
```

## Basic Usage

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
// Vector must be imported explicitly; otherwise the name resolves to scala.Vector
import org.apache.flink.ml.math.{DenseVector, Vector}

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Load data using libSVM format
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

// Create and configure SVM classifier
val svm = SVM()
  .setBlocks(10)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)

// Train the model; fit returns Unit and stores the learned weights in svm itself
svm.fit(trainingData)

// Make predictions
val testData: DataSet[Vector] = env.fromCollection(Seq[Vector](
  DenseVector(Array(1.0, 2.0, 3.0)),
  DenseVector(Array(4.0, 5.0, 6.0))
))

val predictions = svm.predict(testData)
predictions.print()
```

## Architecture

Apache Flink ML is built around several key architectural components:

- **Pipeline Framework**: Estimator, Predictor, and Transformer traits provide a scikit-learn-like API for building ML pipelines
- **Linear Algebra**: Complete vector and matrix abstractions with both dense and sparse implementations
- **Distributed Computing**: All algorithms are designed for Flink's DataSet API with optimizations for distributed processing
- **Type-Safe Parameters**: Parameter system using Scala's type system for algorithm configuration
- **Optimization Framework**: Pluggable optimization algorithms like Gradient Descent with configurable loss functions

## Capabilities

### Machine Learning Algorithms

Core machine learning algorithms including classification, regression, and recommendation systems, all optimized for distributed processing.

```scala { .api }
// Classification
class SVM extends Predictor[SVM] with WithParameters
class KNN extends Predictor[KNN] with WithParameters

// Regression
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters

// Recommendation
class ALS extends Predictor[ALS] with WithParameters
```

[Machine Learning Algorithms](./algorithms.md)

### Linear Algebra

Comprehensive linear algebra framework with vectors and matrices supporting both dense and sparse representations.

```scala { .api }
trait Vector {
  def size: Int
  def apply(index: Int): Double
  def update(index: Int, value: Double): Unit
  def dot(other: Vector): Double
  def magnitude: Double
}

trait Matrix {
  def numRows: Int
  def numCols: Int
  def apply(row: Int, col: Int): Double
  def update(row: Int, col: Int, value: Double): Unit
}
```

[Linear Algebra](./linear-algebra.md)
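
To make the dense/sparse distinction concrete, here is a minimal, dependency-free sketch of how a dot product can skip the zero entries of a sparse vector. `SimpleVector`, `SimpleDense`, `SimpleSparse`, `dot`, and `magnitude` are hypothetical stand-ins written for this illustration; they are not Flink ML's classes, which live in `org.apache.flink.ml.math`.

```scala
// Hypothetical stand-in types for illustration; not Flink ML's implementations.
sealed trait SimpleVector {
  def size: Int
  def apply(i: Int): Double
}

final case class SimpleDense(data: Array[Double]) extends SimpleVector {
  def size: Int = data.length
  def apply(i: Int): Double = data(i)
}

// Stores only the non-zero entries: indices(k) is the position of data(k).
final case class SimpleSparse(size: Int, indices: Array[Int], data: Array[Double])
    extends SimpleVector {
  def apply(i: Int): Double = {
    val k = indices.indexOf(i)
    if (k >= 0) data(k) else 0.0
  }
}

def dot(a: SimpleVector, b: SimpleVector): Double = {
  require(a.size == b.size, "vectors must have the same size")
  (a, b) match {
    // Only the stored entries of the sparse operand can contribute to the sum.
    case (s: SimpleSparse, other) =>
      s.indices.indices.map(k => s.data(k) * other(s.indices(k))).sum
    case (other, s: SimpleSparse) => dot(s, other)
    case _ => (0 until a.size).map(i => a(i) * b(i)).sum
  }
}

def magnitude(v: SimpleVector): Double = math.sqrt(dot(v, v))

val dense = SimpleDense(Array(1.0, 2.0, 3.0))
val sparse = SimpleSparse(3, Array(0, 2), Array(4.0, 5.0))
val product = dot(dense, sparse) // 1*4 + 3*5
val norm = magnitude(SimpleDense(Array(3.0, 4.0)))
```

The sparse case visits two stored entries instead of all three positions, which is the saving that matters for high-dimensional, mostly-zero feature vectors.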

### Data Preprocessing

Data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms.

```scala { .api }
class StandardScaler extends Transformer[StandardScaler] with WithParameters
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters
```

[Data Preprocessing](./preprocessing.md)
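
As a rough illustration of what feature scaling does to a single feature column, the following dependency-free sketch applies the usual z-score and min-max formulas. The helper names `standardize` and `minMaxScale` are made up for this example, and the use of the population standard deviation is an assumption; neither is taken from Flink ML's implementation.

```scala
// Dependency-free sketch of two common scaling rules; not Flink ML's implementation.
// Assumption: the population standard deviation (divide by n) is used.
def standardize(xs: Seq[Double]): Seq[Double] = {
  val mean = xs.sum / xs.size
  val std = math.sqrt(xs.map(x => math.pow(x - mean, 2)).sum / xs.size)
  // Guard against constant features, where the deviation is zero.
  if (std == 0.0) xs.map(_ => 0.0) else xs.map(x => (x - mean) / std)
}

// Linearly maps the observed range [min, max] onto [lo, hi].
def minMaxScale(xs: Seq[Double], lo: Double = 0.0, hi: Double = 1.0): Seq[Double] = {
  val (mn, mx) = (xs.min, xs.max)
  if (mx == mn) xs.map(_ => lo) else xs.map(x => lo + (x - mn) / (mx - mn) * (hi - lo))
}

val feature = Seq(2.0, 4.0, 6.0)
val zScores = standardize(feature)  // centered on 0 with unit deviation
val rescaled = minMaxScale(feature) // mapped onto [0, 1]
```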

### Outlier Detection

Algorithms for identifying anomalous data points in datasets using probabilistic and statistical methods.

```scala { .api }
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters
```

[Outlier Detection](./outlier-detection.md)

### Distance Metrics

Collection of distance metrics for measuring similarity between vectors, used by algorithms like k-NN.

```scala { .api }
trait DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}

class EuclideanDistanceMetric extends DistanceMetric
class ManhattanDistanceMetric extends DistanceMetric
class CosineDistanceMetric extends DistanceMetric
```

[Distance Metrics](./distance-metrics.md)
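
For reference, the three metrics named above correspond to standard formulas, sketched here on plain arrays without any Flink dependency. The function names are illustrative only, and treating cosine distance as one minus cosine similarity is an assumption about the convention rather than a statement about Flink ML's code.

```scala
// Dependency-free sketches of the standard formulas; not Flink ML's implementations.
def euclidean(a: Array[Double], b: Array[Double]): Double =
  math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

def manhattan(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => math.abs(x - y) }.sum

// Assumption: cosine distance is taken as 1 - cosine similarity.
def cosine(a: Array[Double], b: Array[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val norms = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(y => y * y).sum)
  1.0 - dot / norms
}

val origin = Array(0.0, 0.0)
val point = Array(3.0, 4.0)
val straightLine = euclidean(origin, point) // length of the direct path
val cityBlock = manhattan(origin, point)    // sum of per-axis differences
val orthogonal = cosine(Array(1.0, 0.0), Array(0.0, 1.0))
```

Euclidean and Manhattan distances depend on vector length, whereas the cosine variant depends only on direction, which is why it is often preferred for sparse, high-dimensional features.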

### Pipeline Framework

Modular pipeline system for chaining transformers and predictors, enabling complex ML workflows.

```scala { .api }
// Simplified signatures: the optional ParameterMap argument and the implicit
// operation parameters are omitted for brevity.
trait Estimator[Self] extends WithParameters {
  // Trains this estimator in place; no separate model object is returned
  def fit[Training](training: DataSet[Training]): Unit
}

trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing, Prediction](testing: DataSet[Testing]): DataSet[Prediction]
}

trait Transformer[Self <: Transformer[Self]] extends Estimator[Self] {
  def transform[Input, Output](input: DataSet[Input]): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}
```

[Pipeline Framework](./pipeline.md)
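
The chaining methods above can be combined roughly as follows. This is a sketch under stated assumptions: the tiny in-memory data sets are made up for illustration, the `org.apache.flink.ml.preprocessing` import path and the `setDegree` setter are recalled from the Flink ML 1.8 API rather than taken from this page, and the hyperparameter values are arbitrary.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression

val env = ExecutionEnvironment.getExecutionEnvironment

// Tiny in-memory data sets, made up purely for illustration.
val trainingData: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(1.0, 2.0)),
  LabeledVector(2.0, DenseVector(2.0, 1.0)),
  LabeledVector(3.0, DenseVector(3.0, 4.0))
))
val testingData: DataSet[Vector] = env.fromCollection(Seq[Vector](
  DenseVector(1.5, 2.5),
  DenseVector(2.5, 3.5)
))

// Two transformers followed by a predictor; the values are arbitrary.
val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures().setDegree(2)
val mlr = MultipleLinearRegression().setIterations(10).setStepsize(0.1)

// Chain the stages into a single pipeline.
val pipeline = scaler.chainTransformer(polyFeatures).chainPredictor(mlr)

// Fitting the pipeline fits every stage in order.
pipeline.fit(trainingData)

// Prediction applies the fitted transformers before the regression model.
val predictions = pipeline.predict(testingData)
predictions.print()
```

Because the whole chain is fitted and applied as one unit, the test data passes through the same scaling and feature expansion that were learned from the training data.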

### Optimization Framework

Flexible optimization framework with gradient descent solver and pluggable loss functions for training ML models.

```scala { .api }
class GradientDescent extends IterativeSolver {
  def optimize(
    data: DataSet[LabeledVector],
    initialWeights: Option[DataSet[WeightVector]]
  ): DataSet[WeightVector]
}

trait LossFunction {
  def loss(dataPoint: LabeledVector, weights: WeightVector): Double
  def gradient(dataPoint: LabeledVector, weights: WeightVector): Vector
}
```

[Optimization Framework](./optimization.md)
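
To show how a loss function and a gradient descent loop fit together, here is a deliberately simplified, single-machine sketch using squared loss on a linear model without intercept. `Point`, `loss`, `gradient`, and `gradientDescent` are hypothetical names for this illustration; the code does not reproduce Flink ML's distributed solver, its regularization, or its learning-rate handling.

```scala
// Hypothetical minimal types for illustration; not Flink ML's classes.
final case class Point(label: Double, features: Array[Double])

// Prediction error of the linear model w·x for one data point.
def residual(p: Point, w: Array[Double]): Double =
  w.zip(p.features).map { case (wi, xi) => wi * xi }.sum - p.label

// Squared loss: 0.5 * (w·x - y)^2
def loss(p: Point, w: Array[Double]): Double = {
  val r = residual(p, w)
  0.5 * r * r
}

// Gradient of the squared loss with respect to w: (w·x - y) * x
def gradient(p: Point, w: Array[Double]): Array[Double] = {
  val r = residual(p, w)
  p.features.map(_ * r)
}

def gradientDescent(
    data: Seq[Point],
    initial: Array[Double],
    stepsize: Double,
    iterations: Int): Array[Double] =
  (1 to iterations).foldLeft(initial) { (w, _) =>
    // Average the per-point gradients, then step against the gradient.
    val summed = data.map(gradient(_, w)).reduce { (g1, g2) =>
      g1.zip(g2).map { case (a, b) => a + b }
    }
    val averaged = summed.map(_ / data.size)
    w.zip(averaged).map { case (wi, gi) => wi - stepsize * gi }
  }

// Data generated from y = 2x, so the learned weight should approach 2.
val data = Seq(Point(2.0, Array(1.0)), Point(4.0, Array(2.0)), Point(6.0, Array(3.0)))
val weights = gradientDescent(data, Array(0.0), stepsize = 0.1, iterations = 100)
```

Swapping in a different `loss`/`gradient` pair changes the model being trained without touching the descent loop, which is the point of keeping the loss function pluggable.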

## Core Types

```scala { .api }
// Data structures
case class LabeledVector(label: Double, vector: Vector)
case class WeightVector(weights: Vector, intercept: Double)

// Vector implementations
case class DenseVector(data: Array[Double]) extends Vector
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector

// Matrix implementations
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix
class SparseMatrix(numRows: Int, numCols: Int, rowIndices: Array[Int], colPtrs: Array[Int], data: Array[Double]) extends Matrix

// Parameter system
trait Parameter[T] {
  def defaultValue: Option[T]
}

class ParameterMap {
  def add[T](parameter: Parameter[T], value: T): ParameterMap
  def get[T](parameter: Parameter[T]): Option[T]
}

trait WithParameters {
  def parameters: ParameterMap
}
```
```
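
The parameter system can be pictured with a small, dependency-free miniature. `Param`, `ParamMap`, `Iterations`, and `Stepsize` below are hypothetical names defined only for this sketch, and the fallback to a parameter's default value in `get` is an assumption about the intended behavior, not a quotation of Flink ML's code.

```scala
// Hypothetical miniature of a typed parameter map; not Flink ML's implementation.
trait Param[T] {
  def defaultValue: Option[T]
}

final class ParamMap(entries: Map[Param[_], Any] = Map.empty) {
  // Returns a new map, leaving the receiver unchanged.
  def add[T](param: Param[T], value: T): ParamMap =
    new ParamMap(entries.updated(param, value))

  // Assumption: fall back to the parameter's default when no value was set.
  def get[T](param: Param[T]): Option[T] =
    entries.get(param).map(_.asInstanceOf[T]).orElse(param.defaultValue)
}

// Each parameter is a singleton whose type argument fixes the value type.
case object Iterations extends Param[Int] {
  val defaultValue: Option[Int] = Some(10)
}
case object Stepsize extends Param[Double] {
  val defaultValue: Option[Double] = None
}

val params = new ParamMap().add(Iterations, 100)
val iterations = params.get(Iterations)       // explicitly set value
val stepsize = params.get(Stepsize)           // neither set nor defaulted
val fallback = new ParamMap().get(Iterations) // default value
```

Because `add` takes a `Param[T]` together with a `T`, passing a value of the wrong type, such as `add(Iterations, 0.5)`, is rejected at compile time.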

## Data I/O

```scala { .api }
object MLUtils {
  def readLibSVM(env: ExecutionEnvironment, filePath: String): DataSet[LabeledVector]
  def writeLibSVM(filePath: String, labeledVectors: DataSet[LabeledVector]): DataSink[String]
}

// Implicit extensions
implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) {
  def readLibSVM(path: String): DataSet[LabeledVector]
}

implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) {
  def writeAsLibSVM(path: String): DataSink[String]
}
```
```
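
For readers unfamiliar with the LibSVM text format these functions read and write, each line holds a label followed by sparse `index:value` pairs with 1-based indices. The dependency-free sketch below parses and re-serializes one such line; `parseLibSVMLine` and `toLibSVMLine` are hypothetical helpers for illustration, not part of Flink ML, and shifting to zero-based indices is an assumption about the in-memory convention.

```scala
// Dependency-free sketch of the LibSVM line format; not Flink ML's parser.
// Format: "<label> <index>:<value> <index>:<value> ...", with 1-based feature indices.
def parseLibSVMLine(line: String): (Double, Seq[(Int, Double)]) = {
  val tokens = line.trim.split("\\s+")
  val label = tokens.head.toDouble
  val features = tokens.tail.toList.map { token =>
    val Array(index, value) = token.split(":")
    // Assumption: shift to zero-based indices for in-memory use.
    (index.toInt - 1, value.toDouble)
  }
  (label, features)
}

// Inverse operation: shift indices back to 1-based and join with spaces.
def toLibSVMLine(label: Double, features: Seq[(Int, Double)]): String =
  (label.toString +: features.map { case (i, v) => s"${i + 1}:$v" }).mkString(" ")

val (label, features) = parseLibSVMLine("1.0 1:0.5 3:2.0")
val roundTrip = toLibSVMLine(label, features)
```

Features that do not appear on a line are implicitly zero, which is why the format pairs naturally with sparse vectors.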