or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

algorithms.md, distance-metrics.md, index.md, linear-algebra.md, optimization.md, outlier-detection.md, pipeline.md, preprocessing.md
tile.json

docs/algorithms.md

Machine Learning Algorithms

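Apache Flink ML provides implementations of popular machine learning algorithms optimized for distributed processing on Apache Flink. All algorithms are configured through the parameter system. The classification, regression, nearest-neighbor, and recommendation algorithms are Predictors: train an instance with `fit`, then call `predict` (and evaluation helpers such as `squaredResidualSum` or `empiricalRisk`) on that same instance. Stochastic Outlier Selection is a Transformer and is applied with `transform`.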

Classification

Support Vector Machine (SVM)

Soft-margin Support Vector Machine using the CoCoA (Communication-efficient Distributed Dual Coordinate Ascent) algorithm for distributed training.

/** Soft-margin support vector machine trained with the CoCoA (communication-efficient
  * distributed dual coordinate ascent) method.
  *
  * Every setter returns an `SVM`, so configuration calls can be chained.
  */
class SVM extends Predictor[SVM] with WithParameters {
  /** Number of data blocks the training data is split into for distributed training. */
  def setBlocks(blocks: Int): SVM
  /** Maximum number of global (outer) iterations. */
  def setIterations(iterations: Int): SVM  
  /** Local iterations run on each block per global iteration. */
  def setLocalIterations(localIterations: Int): SVM
  /** Regularization constant. */
  def setRegularization(regularization: Double): SVM
  /** Step size parameter (`SVM.Stepsize`). */
  def setStepsize(stepsize: Double): SVM
  /** Random seed; fix it for reproducible training. */
  def setSeed(seed: Long): SVM
  /** Decision threshold used to turn decision-function values into binary predictions.
    * NOTE(review): presumably stored as the `SVM.ThresholdValue` parameter — confirm.
    */
  def setThreshold(threshold: Double): SVM
  /** When true, predictions are decision-function values instead of binary predictions. */
  def setOutputDecisionFunction(outputDecisionFunction: Boolean): SVM
}

object SVM {
  /** Creates an SVM with every parameter at its default value. */
  def apply(): SVM

  // ---- Parameters ----

  /** Number of blocks the input is split into; each block runs the local dual coordinate
    * ascent solver. `None` means the parallelism of the input DataSet is used.
    */
  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = None
  }

  /** Maximum number of global (outer) iterations. */
  case object Iterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  /** Maximum number of local solver iterations per block in each global iteration. */
  case object LocalIterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  /** Regularization constant; larger values shrink the 2-norm of the weight vector. */
  case object Regularization extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(1.0)
  }

  /** Initial step size for the weight-vector updates. */
  case object Stepsize extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(1.0)
  }

  /** Seed for the local solver's random choices. The default is drawn once, when this
    * object is initialized, so runs are not reproducible unless `setSeed` is called.
    */
  case object Seed extends Parameter[Long] {
    val defaultValue: Option[Long] = Some(scala.util.Random.nextLong())
  }

  /** Decision-function value above which an example is labeled positive.
    * Ignored when `OutputDecisionFunction` is true.
    */
  case object ThresholdValue extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(0.0)
  }

  /** When true, predict returns raw decision-function values instead of class labels. */
  case object OutputDecisionFunction extends Parameter[Boolean] {
    val defaultValue: Option[Boolean] = Some(false)
  }
}

Usage Example:

import org.apache.flink.api.scala._
import org.apache.flink.ml.RichExecutionEnvironment // adds readLibSVM to ExecutionEnvironment
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.Vector // FlinkML vector, not scala.Vector

val env = ExecutionEnvironment.getExecutionEnvironment
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

// Configure SVM
val svm = SVM()
  .setBlocks(10)                     // Number of data blocks for distributed training
  .setIterations(100)                // Maximum number of global (outer) iterations
  .setLocalIterations(10)            // Local dual-coordinate-ascent iterations per block per global iteration
  .setRegularization(0.001)          // Regularization constant
  .setStepsize(0.1)                  // Initial step size for the weight-vector updates
  .setSeed(42)                       // Random seed for reproducibility
  .setThreshold(0.5)                 // Decision-function cutoff used to assign the class label
  .setOutputDecisionFunction(false)  // false: emit class labels (uses the threshold); true: emit raw decision-function values

// Train: fit() stores the learned model inside `svm` itself (it returns Unit)
svm.fit(trainingData)

// Make predictions with the same, now-trained, instance
val testData: DataSet[Vector] = env.readLibSVM("test.libsvm").map(_.vector)
val predictions: DataSet[(Vector, Double)] = svm.predict(testData)

Regression

Multiple Linear Regression

Ordinary Least Squares regression using gradient descent optimization for distributed learning.

/** Ordinary least squares linear regression, fitted with gradient descent.
  * Every setter returns the regression, so configuration calls can be chained.
  */
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters {
  /** Maximum number of gradient-descent iterations. */
  def setIterations(iterations: Int): MultipleLinearRegression
  /** Step size for the gradient-descent updates. */
  def setStepsize(stepsize: Double): MultipleLinearRegression
  /** Convergence threshold for training.
    * NOTE(review): the exact stopping criterion is not shown here — confirm against the library.
    */
  def setConvergenceThreshold(convergenceThreshold: Double): MultipleLinearRegression
  /** Learning-rate schedule applied across iterations (see `LearningRateMethod`). */
  def setLearningRateMethod(learningRateMethod: LearningRateMethod): MultipleLinearRegression
  /** Residual sum of squares of the trained model on `data`, for model evaluation. */
  def squaredResidualSum(data: DataSet[LabeledVector]): DataSet[Double]
}

object MultipleLinearRegression {
  /** Creates a regression with every parameter at its default value. */
  def apply(): MultipleLinearRegression

  // ---- Parameters ----

  /** Maximum number of gradient-descent iterations. */
  case object Iterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  /** Initial step size for gradient descent. */
  case object Stepsize extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(0.1)
  }

  /** Relative change of the squared-residual sum below which training stops early.
    * `None` (the default) disables early stopping, so all `Iterations` are run.
    */
  case object ConvergenceThreshold extends Parameter[Double] {
    val defaultValue: Option[Double] = None
  }

  /** Learning-rate schedule used to compute the effective step size per iteration. */
  case object LearningRateMethodValue extends Parameter[LearningRateMethod] {
    val defaultValue: Option[LearningRateMethod] = Some(LearningRateMethod.Default)
  }
}

// Learning rate scheduling methods

/** Schedules that control how the effective learning rate evolves across iterations.
  *
  * NOTE(review): confirm these variants against the upstream
  * `org.apache.flink.ml.optimization.LearningRateMethod` before relying on them.
  */
sealed trait LearningRateMethod

/** The available schedules; pass one to `setLearningRateMethod`. */
object LearningRateMethod {

  /** The library's default schedule. */
  case object Default extends LearningRateMethod

  /** Schedule named "Inverse" (presumably decays with the iteration count — confirm). */
  case object Inverse extends LearningRateMethod

  /** Schedule named "InverseSquareRoot" (presumably decays with its square root — confirm). */
  case object InverseSquareRoot extends LearningRateMethod
}

Usage Example:

import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.optimization.LearningRateMethod

// Reuses `trainingData: DataSet[LabeledVector]` and `testData: DataSet[Vector]`
// from the SVM example.
val regression = MultipleLinearRegression()
  .setIterations(200)
  .setStepsize(0.01)
  .setConvergenceThreshold(1e-8)
  .setLearningRateMethod(LearningRateMethod.Inverse)

// fit() trains `regression` in place (it returns Unit); predict with the same instance
regression.fit(trainingData)
val predictions = regression.predict(testData)

// Calculate residual sum of squares for model evaluation
val residualSum = regression.squaredResidualSum(trainingData)

Nearest Neighbor

K-Nearest Neighbors (KNN)

Implements k-nearest neighbor join for finding the k closest points in the training set for each test point, with support for various distance metrics and optimizations.

/** k-nearest-neighbor join: for each test vector, finds the k closest vectors in the
  * training set. Every setter returns a `KNN`, so configuration calls can be chained.
  */
class KNN extends Predictor[KNN] with WithParameters {
  /** Number of nearest neighbors to find per test point. */
  def setK(k: Int): KNN
  /** Distance metric used to rank neighbors. */
  def setDistanceMetric(distanceMetric: DistanceMetric): KNN
  /** Number of blocks the data is split into for distributed processing. */
  def setBlocks(blocks: Int): KNN
  /** Enables the QuadTree optimization (intended for Euclidean distance). */
  def setUseQuadTree(useQuadTree: Boolean): KNN
  /** Cross hint stating which input is small, e.g. `CrossHint.FIRST_IS_SMALL` for a small training set. */
  def setSizeHint(sizeHint: CrossHint): KNN
}

object KNN {
  /** Creates a KNN with every parameter at its default value. */
  def apply(): KNN

  // ---- Parameters ----

  /** Number of nearest neighbors returned per test point. */
  case object K extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(5)
  }

  /** Distance used to rank neighbors; Euclidean by default. */
  case object DistanceMetric extends Parameter[DistanceMetric] {
    val defaultValue: Option[DistanceMetric] = Some(EuclideanDistanceMetric())
  }

  /** Number of blocks the data is split into. `None` leaves it unset.
    * NOTE(review): presumably the input parallelism is then used — confirm.
    */
  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = None
  }

  /** Whether to use the QuadTree optimization (intended for Euclidean distance).
    * `None` leaves the choice to the library.
    */
  case object UseQuadTree extends Parameter[Boolean] {
    val defaultValue: Option[Boolean] = None
  }

  /** Which side of the training/test cross is small. `None` gives no hint. */
  case object SizeHint extends Parameter[CrossHint] {
    val defaultValue: Option[CrossHint] = None
  }
}

Usage Example:

import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.Vector // FlinkML vector, not scala.Vector
import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
import org.apache.flink.ml.nn.KNN

val trainingData: DataSet[Vector] = ??? // placeholder: load your training vectors here
val testData: DataSet[Vector] = ???     // placeholder: load your test vectors here

val knn = KNN()
  .setK(10)                                     // Find 10 nearest neighbors
  .setBlocks(5)                                 // Split data into 5 blocks for distributed processing
  .setDistanceMetric(EuclideanDistanceMetric()) // Set the metric before enabling the QuadTree
  .setUseQuadTree(true)                         // QuadTree optimization (Euclidean distance only)
  .setSizeHint(CrossHint.FIRST_IS_SMALL)        // Optimize when the training set is small

// fit() stores the training set inside `knn` itself (it returns Unit)
knn.fit(trainingData)

// Find k-nearest neighbors for each test point
val neighbors: DataSet[(Vector, Array[Vector])] = knn.predict(testData)

Recommendation

Alternating Least Squares (ALS)

Matrix factorization algorithm for collaborative filtering and recommendation systems using alternating least squares optimization.

/** Matrix factorization for collaborative filtering, trained with alternating least squares.
  * Every setter returns an `ALS`, so configuration calls can be chained.
  */
class ALS extends Predictor[ALS] with WithParameters {
  /** Number of latent factors. */
  def setNumFactors(numFactors: Int): ALS
  /** Regularization parameter. */
  def setLambda(lambda: Double): ALS
  /** Number of ALS iterations. */
  def setIterations(iterations: Int): ALS
  /** Number of blocks for distributed computation. */
  def setBlocks(blocks: Int): ALS
  /** Random seed. */
  def setSeed(seed: Long): ALS
  /** Path used for temporary storage. */
  def setTemporaryPath(temporaryPath: String): ALS
  /** Empirical risk (reconstruction error) on `(user, item, rating)` triples. */
  def empiricalRisk(data: DataSet[(Int, Int, Double)]): DataSet[Double]
}

object ALS {
  /** Creates an ALS learner with every parameter at its default value. */
  def apply(): ALS

  // ---- Parameters ----

  /** Number of latent factors per user and per item. */
  case object NumFactors extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  /** Regularization factor. */
  case object Lambda extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(1.0)
  }

  /** Number of ALS iterations. */
  case object Iterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  /** Number of blocks the user and item matrices are split into.
    * `None` means the parallelism of the input DataSet is used.
    */
  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = None
  }

  /** Seed for the random initialization of the factor matrices. */
  case object Seed extends Parameter[Long] {
    val defaultValue: Option[Long] = Some(0L)
  }

  /** Optional directory where intermediate results are persisted.
    * `None` (the default) runs without writing intermediate results.
    */
  case object TemporaryPath extends Parameter[String] {
    val defaultValue: Option[String] = None
  }
}

// Data types for ALS
/** A single observed rating of `item` by `user`. */
case class Rating(user: Int, item: Int, rating: Double)
/** Latent-factor vector for one user or item, keyed by its id.
  * NOTE(review): `Vector` here is presumably `org.apache.flink.ml.math.Vector`
  * (`scala.Vector` would need a type argument) — confirm.
  */
case class Factors(id: Int, factors: Vector)
/** A trained factorization: the user factors and the item factors. */
case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])

Usage Example:

import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

// Rating data: (user_id, item_id, rating)
val ratings: DataSet[(Int, Int, Double)] = env.fromCollection(Seq(
  (1, 1, 5.0), (1, 2, 3.0), (1, 3, 4.0),
  (2, 1, 4.0), (2, 2, 2.0), (2, 4, 5.0),
  (3, 2, 3.0), (3, 3, 5.0), (3, 4, 4.0)
))

val als = ALS()
  .setNumFactors(50)            // Number of latent factors
  .setLambda(0.01)              // Regularization parameter
  .setIterations(20)            // Number of iterations
  .setBlocks(10)                // Number of blocks for distributed computation
  .setSeed(42)                  // Random seed
  .setTemporaryPath("/tmp/als") // Directory for intermediate results

// fit() computes the factorization and keeps it inside `als` (it returns Unit)
als.fit(ratings)

// Predict ratings for user-item pairs with the same, now-trained, instance
val userItemPairs: DataSet[(Int, Int)] = env.fromCollection(Seq((1, 4), (2, 3), (3, 1)))
val predictions = als.predict(userItemPairs)

// Calculate empirical risk (reconstruction error) on the training ratings
val risk = als.empiricalRisk(ratings)

Nearest Neighbors

k-Nearest Neighbors (k-NN)

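This section documents the same `KNN` class as the Nearest Neighbor section above. `KNN` is a k-nearest-neighbor join: for each test vector, it returns the k closest training vectors as `(Vector, Array[Vector])` pairs. It supports various distance metrics and a QuadTree optimization for Euclidean distance.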

/** k-nearest-neighbor join: for each test vector, finds the k closest training vectors.
  * Every setter returns a `KNN`, so configuration calls can be chained.
  */
class KNN extends Predictor[KNN] with WithParameters {
  /** Number of nearest neighbors to find per test point. */
  def setK(k: Int): KNN
  /** Distance metric used to rank neighbors. */
  def setDistanceMetric(distanceMetric: DistanceMetric): KNN
  /** Number of blocks the data is split into for distributed processing. */
  def setBlocks(blocks: Int): KNN
  /** Enables the QuadTree optimization (Euclidean distance only). */
  def setUseQuadTree(useQuadTree: Boolean): KNN
  /** Cross hint stating which input (training or test set) is small. */
  def setSizeHint(sizeHint: CrossHint): KNN
}

object KNN {
  /** Creates a KNN with every parameter at its default value. */
  def apply(): KNN

  // ---- Parameters ----

  /** Number of nearest neighbors returned per test point. */
  case object K extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(5)
  }

  /** Distance used to rank neighbors; Euclidean by default. */
  case object DistanceMetric extends Parameter[DistanceMetric] {
    val defaultValue: Option[DistanceMetric] = Some(EuclideanDistanceMetric())
  }

  /** Number of blocks the data is split into; `None` leaves it unset. */
  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = None
  }

  /** Whether to use the QuadTree optimization; `None` leaves the choice to the library. */
  case object UseQuadTree extends Parameter[Boolean] {
    val defaultValue: Option[Boolean] = None
  }

  /** Which side of the training/test cross is small; `None` gives no hint. */
  case object SizeHint extends Parameter[CrossHint] {
    val defaultValue: Option[CrossHint] = None
  }
}

Usage Example:

import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.ml.metrics.distances.ManhattanDistanceMetric
import org.apache.flink.ml.nn.KNN

// `trainingData` and `testData` are DataSet[Vector] values, as in the first KNN example.
val knn = KNN()
  .setK(3)                                      // Number of neighbors
  .setDistanceMetric(ManhattanDistanceMetric()) // Non-Euclidean distance metric
  .setBlocks(5)                                 // Number of data blocks
  .setUseQuadTree(false)                        // QuadTree optimization is for Euclidean distance only
  .setSizeHint(CrossHint.SECOND_IS_SMALL)       // The test set is the small side of the cross

// fit() stores the training set inside `knn` itself (it returns Unit)
knn.fit(trainingData)

// DataSet[(Vector, Array[Vector])]: each test vector with its k nearest training vectors
val predictions = knn.predict(testData)

Outlier Detection

Stochastic Outlier Selection

Probabilistic outlier detection algorithm for identifying anomalous data points.

/** Stochastic Outlier Selection: probabilistic outlier detection for identifying anomalous
  * data points. It is a Transformer, applied to data with `transform` to produce outlier scores.
  */
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters

Usage Example:

import org.apache.flink.ml.outlier.StochasticOutlierSelection

// Detector using the default parameter values.
val outlierDetector: StochasticOutlierSelection = StochasticOutlierSelection()

// Score the input; `data` is the input data set to analyze and must be defined beforehand.
val outlierScores = outlierDetector.transform(data)