or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

algorithms.md, distance-metrics.md, index.md, linear-algebra.md, optimization.md, outlier-detection.md, pipeline.md, preprocessing.md

docs/algorithms.md

0

# Machine Learning Algorithms

1

2

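Apache Flink ML provides implementations of popular machine learning algorithms optimized for distributed processing on Apache Flink. Most of the algorithms below are `Predictor`s that follow Flink ML's estimator–predictor pattern: `fit` trains the instance in place (it returns `Unit`), and `predict` is then called on that same instance. Stochastic Outlier Selection is a `Transformer` and is applied with `transform`. All algorithms are configured through Flink ML's parameter system: typed `Parameter` objects with default values, set via the `set…` methods.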

3

4

## Classification

5

6

### Support Vector Machine (SVM)

7

8

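Soft-margin Support Vector Machine using the CoCoA (Communication-efficient Distributed Dual Coordinate Ascent) algorithm for distributed training. It is a binary classifier. By default, `predict` returns `1.0` or `-1.0` depending on whether the decision-function value exceeds the threshold. Set `OutputDecisionFunction` to `true` to get the raw decision-function value instead.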

9

10

```scala { .api }

11

class SVM extends Predictor[SVM] with WithParameters {

12

def setBlocks(blocks: Int): SVM

13

def setIterations(iterations: Int): SVM

14

def setLocalIterations(localIterations: Int): SVM

15

def setRegularization(regularization: Double): SVM

16

def setStepsize(stepsize: Double): SVM

17

def setSeed(seed: Long): SVM

18

def setThreshold(threshold: Double): SVM

19

def setOutputDecisionFunction(outputDecisionFunction: Boolean): SVM

20

}

21

22

object SVM {

23

def apply(): SVM

24

25

// Parameters

26

case object Blocks extends Parameter[Int] {

27

val defaultValue = Some(1)

28

}

29

30

case object Iterations extends Parameter[Int] {

31

val defaultValue = Some(10)

32

}

33

34

case object LocalIterations extends Parameter[Int] {

35

val defaultValue = Some(10)

36

}

37

38

case object Regularization extends Parameter[Double] {

39

val defaultValue = Some(0.1)

40

}

41

42

case object Stepsize extends Parameter[Double] {

43

val defaultValue = Some(1.0)

44

}

45

46

case object Seed extends Parameter[Long] {

47

val defaultValue = Some(Random.nextLong())

48

}

49

50

case object ThresholdValue extends Parameter[Double] {

51

val defaultValue = Some(0.0)

52

}

53

54

case object OutputDecisionFunction extends Parameter[Boolean] {

55

val defaultValue = Some(false)

56

}

57

}

58

```

59

60

**Usage Example:**

61

62

```scala

63

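import org.apache.flink.ml.classification.SVM

64

import org.apache.flink.ml.common.LabeledVector

65

import org.apache.flink.api.scala._

import org.apache.flink.ml._ // brings env.readLibSVM into scope

import org.apache.flink.ml.math.Vector // Flink ML vector type, not scala.collection.immutable.Vector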

66

67

val env = ExecutionEnvironment.getExecutionEnvironment

68

val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

69

70

// Configure SVM

71

val svm = SVM()

72

.setBlocks(10) // Number of data blocks for distributed training

73

.setIterations(100) // Maximum number of iterations

74

.setLocalIterations(10) // Local iterations per block per global iteration

75

.setRegularization(0.001) // Regularization parameter

76

.setStepsize(0.1) // Initial step size for the weight-vector updates

77

.setSeed(42) // Random seed for reproducibility

78

.setThreshold(0.5) // Decision threshold (ignored when the decision function is output)

79

.setOutputDecisionFunction(true) // Output decision function values instead of binary predictions

80

81

// Train the model

82

svm.fit(trainingData) // fit returns Unit; the trained weights are stored in `svm`

83

84

// Make predictions

85

val testData: DataSet[Vector] = //... test vectors

86

val predictions = svm.predict(testData)

87

```

88

89

## Regression

90

91

### Multiple Linear Regression

92

93

Ordinary Least Squares regression using gradient descent optimization for distributed learning.

94

95

```scala { .api }

96

class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters {

97

def setIterations(iterations: Int): MultipleLinearRegression

98

def setStepsize(stepsize: Double): MultipleLinearRegression

99

def setConvergenceThreshold(convergenceThreshold: Double): MultipleLinearRegression

100

def setLearningRateMethod(learningRateMethod: LearningRateMethod): MultipleLinearRegression

101

def squaredResidualSum(data: DataSet[LabeledVector]): DataSet[Double]

102

}

103

104

object MultipleLinearRegression {

105

def apply(): MultipleLinearRegression

106

107

// Parameters

108

case object Iterations extends Parameter[Int] {

109

val defaultValue = Some(10)

110

}

111

112

case object Stepsize extends Parameter[Double] {

113

val defaultValue = Some(0.1)

114

}

115

116

case object ConvergenceThreshold extends Parameter[Double] {

117

val defaultValue = Some(1e-6)

118

}

119

120

case object LearningRateMethodValue extends Parameter[LearningRateMethod] {

121

val defaultValue = Some(LearningRateMethod.Default)

122

}

123

}

124

125

// Learning rate scheduling methods

126

sealed trait LearningRateMethod

127

object LearningRateMethod {

128

case object Default extends LearningRateMethod

129

case object Inverse extends LearningRateMethod

130

case object InverseSquareRoot extends LearningRateMethod

131

}

132

```

133

134

**Usage Example:**

135

136

```scala

137

import org.apache.flink.ml.regression.MultipleLinearRegression

138

import org.apache.flink.ml.optimization.LearningRateMethod

139

140

val regression = MultipleLinearRegression()

141

.setIterations(200)

142

.setStepsize(0.01)

143

.setConvergenceThreshold(1e-8)

144

.setLearningRateMethod(LearningRateMethod.Inverse)

145

146

regression.fit(trainingData) // fit returns Unit; the learned weights are stored in `regression`

147

val predictions = regression.predict(testData)

148

149

// Calculate residual sum of squares for model evaluation

150

val residualSum = regression.squaredResidualSum(trainingData)

151

```

152

153

## Nearest Neighbor

154

155

### K-Nearest Neighbors (KNN)

156

157

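Implements a k-nearest-neighbor join: for each test point, it finds the k closest points in the training set. It supports pluggable distance metrics and an optional QuadTree index (Euclidean and squared Euclidean distances only). A `CrossHint` size hint can be supplied for the underlying cross operation.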

158

159

```scala { .api }

160

class KNN extends Predictor[KNN] with WithParameters {

161

def setK(k: Int): KNN

162

def setDistanceMetric(distanceMetric: DistanceMetric): KNN

163

def setBlocks(blocks: Int): KNN

164

def setUseQuadTree(useQuadTree: Boolean): KNN

165

def setSizeHint(sizeHint: CrossHint): KNN

166

}

167

168

object KNN {

169

def apply(): KNN

170

171

// Parameters

172

case object K extends Parameter[Int] {

173

val defaultValue = Some(5)

174

}

175

176

case object DistanceMetric extends Parameter[DistanceMetric] {

177

val defaultValue = Some(EuclideanDistanceMetric())

178

}

179

180

case object Blocks extends Parameter[Int] {

181

val defaultValue = None

182

}

183

184

case object UseQuadTree extends Parameter[Boolean] {

185

val defaultValue = None

186

}

187

188

case object SizeHint extends Parameter[CrossHint] {

189

val defaultValue = None

190

}

191

}

192

```

193

194

**Usage Example:**

195

196

```scala

197

import org.apache.flink.ml.nn.KNN

198

import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric

199

import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint

200

201

val trainingData: DataSet[Vector] = //... training vectors

202

val testData: DataSet[Vector] = //... test vectors

203

204

val knn = KNN()

205

.setK(10) // Find 10 nearest neighbors

206

.setBlocks(5) // Split data into 5 blocks for distributed processing

207

.setDistanceMetric(EuclideanDistanceMetric()) // Use Euclidean distance

208

.setUseQuadTree(true) // Use quadtree optimization (for Euclidean distance)

209

.setSizeHint(CrossHint.FIRST_IS_SMALL) // Optimize when training set is small

210

211

// Train the model

212

knn.fit(trainingData) // fit returns Unit; the fitted state is kept in `knn`

213

214

// Find k-nearest neighbors for each test point

215

val neighbors: DataSet[(Vector, Array[Vector])] = knn.predict(testData)

216

```

217

218

## Recommendation

219

220

### Alternating Least Squares (ALS)

221

222

Matrix factorization algorithm for collaborative filtering and recommendation systems using alternating least squares optimization.

223

224

```scala { .api }

225

class ALS extends Predictor[ALS] with WithParameters {

226

def setNumFactors(numFactors: Int): ALS

227

def setLambda(lambda: Double): ALS

228

def setIterations(iterations: Int): ALS

229

def setBlocks(blocks: Int): ALS

230

def setSeed(seed: Long): ALS

231

def setTemporaryPath(temporaryPath: String): ALS

232

def empiricalRisk(data: DataSet[(Int, Int, Double)]): DataSet[Double]

233

}

234

235

object ALS {

236

def apply(): ALS

237

238

// Parameters

239

case object NumFactors extends Parameter[Int] {

240

val defaultValue = Some(10)

241

}

242

243

case object Lambda extends Parameter[Double] {

244

val defaultValue = Some(1.0)

245

}

246

247

case object Iterations extends Parameter[Int] {

248

val defaultValue = Some(10)

249

}

250

251

case object Blocks extends Parameter[Int] {

252

val defaultValue = Some(1)

253

}

254

255

case object Seed extends Parameter[Long] {

256

val defaultValue = Some(0L)

257

}

258

259

case object TemporaryPath extends Parameter[String] {

260

val defaultValue = Some(System.getProperty("java.io.tmpdir"))

261

}

262

}

263

264

// Data types for ALS

265

case class Rating(user: Int, item: Int, rating: Double)

266

case class Factors(id: Int, factors: Vector)

267

case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])

268

```

269

270

**Usage Example:**

271

272

```scala

273

import org.apache.flink.ml.recommendation.ALS

274

275

// Rating data: (user_id, item_id, rating)

276

val ratings: DataSet[(Int, Int, Double)] = env.fromCollection(Seq(

277

(1, 1, 5.0), (1, 2, 3.0), (1, 3, 4.0),

278

(2, 1, 4.0), (2, 2, 2.0), (2, 4, 5.0),

279

(3, 2, 3.0), (3, 3, 5.0), (3, 4, 4.0)

280

))

281

282

val als = ALS()

283

.setNumFactors(50) // Number of latent factors

284

.setLambda(0.01) // Regularization parameter

285

.setIterations(20) // Number of iterations

286

.setBlocks(10) // Number of blocks for distributed computation

287

.setSeed(42) // Random seed

288

.setTemporaryPath("/tmp/als") // Temporary storage path

289

290

als.fit(ratings) // fit returns Unit; the learned factors are stored in `als`

291

292

// Predict ratings for user-item pairs

293

val userItemPairs: DataSet[(Int, Int)] = env.fromCollection(Seq((1, 4), (2, 3), (3, 1)))

294

val predictions = als.predict(userItemPairs)

295

296

// Calculate empirical risk (reconstruction error)

297

val risk = als.empiricalRisk(ratings)

298

```

299

300

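## Nearest Neighbors (alternative configuration)

301

302

### k-Nearest Neighbors (k-NN)

303

304

This section documents the same `KNN` predictor (`org.apache.flink.ml.nn.KNN`) described in the *Nearest Neighbor* section above. `KNN` performs a k-nearest-neighbor join: for each test vector, it returns the k closest training vectors, which can then be used for classification or regression. Any distance metric can be used, but the QuadTree optimization is only available for Euclidean and squared Euclidean distances.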

305

306

```scala { .api }

307

class KNN extends Predictor[KNN] with WithParameters {

308

def setK(k: Int): KNN

309

def setDistanceMetric(distanceMetric: DistanceMetric): KNN

310

def setBlocks(blocks: Int): KNN

311

def setUseQuadTree(useQuadTree: Boolean): KNN

312

def setSizeHint(sizeHint: CrossHint): KNN

313

}

314

315

object KNN {

316

def apply(): KNN

317

318

// Parameters

319

case object K extends Parameter[Int] {

320

val defaultValue = Some(5)

321

}

322

323

case object DistanceMetric extends Parameter[DistanceMetric] {

324

val defaultValue = Some(EuclideanDistanceMetric())

325

}

326

327

case object Blocks extends Parameter[Int] {

328

val defaultValue = None

329

}

330

331

case object UseQuadTree extends Parameter[Boolean] {

332

val defaultValue = None

333

}

334

335

case object SizeHint extends Parameter[CrossHint] {

336

val defaultValue = None

337

}

338

}

339

```

340

341

**Usage Example:**

342

343

```scala

344

import org.apache.flink.ml.nn.KNN

345

import org.apache.flink.ml.metrics.distances.ManhattanDistanceMetric

import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint

346

347

val knn = KNN()

348

.setK(3) // Number of neighbors

349

.setDistanceMetric(ManhattanDistanceMetric()) // Distance metric

350

.setBlocks(5) // Number of data blocks

351

.setUseQuadTree(false) // QuadTree is only supported for Euclidean and squared Euclidean distances

352

.setSizeHint(CrossHint.SECOND_IS_SMALL) // Hint that the test set is small relative to the training set

353

354

knn.fit(trainingData) // fit returns Unit; the fitted state is kept in `knn`

355

val predictions: DataSet[(Vector, Array[Vector])] = knn.predict(testData)

356

```

357

358

## Outlier Detection

359

360

### Stochastic Outlier Selection

361

362

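Unsupervised, probabilistic outlier detection algorithm that assigns each data point a probability of being an outlier. Unlike the algorithms above, it is a `Transformer`: there is no `fit` step, and results are obtained by calling `transform`.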

363

364

```scala { .api }

365

class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters

366

```

367

368

**Usage Example:**

369

370

```scala

371

import org.apache.flink.ml.outlier.StochasticOutlierSelection

372

373

val outlierDetector = StochasticOutlierSelection()

374

val outlierScores = outlierDetector.transform(data)

375

```