
tessl/maven-org-apache-flink--flink-ml-2-12

Machine learning library for Apache Flink providing scalable ML algorithms including classification (SVM), regression (multiple linear regression), and recommendation (ALS) optimized for distributed stream and batch processing


Machine Learning Algorithms

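Apache Flink ML provides implementations of popular machine learning algorithms for distributed processing on Apache Flink's DataSet API. The classification, regression, nearest-neighbor, and recommendation algorithms are Predictors, which are trained with fit and applied with predict; Stochastic Outlier Selection is a Transformer. All of them are configured through the shared parameter system.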
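
As a rough illustration of the parameter system mentioned above, the following plain-Scala sketch models a typed parameter with an optional default and a map that falls back to that default when no value was set. It does not depend on Flink. The local Parameter and ParameterMap types only imitate the pattern, and ParameterSketch, Iterations, and Blocks are hypothetical names, not the library's implementation.

```scala
// Simplified model of a typed parameter map; not Flink's own code.
object ParameterSketch {
  // A parameter carries its value type and an optional default.
  trait Parameter[T] { def defaultValue: Option[T] }

  // Immutable map from parameters to values.
  final class ParameterMap private (private val values: Map[Parameter[_], Any]) {
    // Returns a new map with the given value set.
    def add[T](parameter: Parameter[T], value: T): ParameterMap =
      new ParameterMap(values + (parameter -> value))

    // Looks up an explicit value first, then the parameter's default.
    def get[T](parameter: Parameter[T]): Option[T] =
      values.get(parameter).map(_.asInstanceOf[T]).orElse(parameter.defaultValue)
  }

  object ParameterMap {
    def apply(): ParameterMap = new ParameterMap(Map.empty)
  }

  // Hypothetical parameters: one with a default, one without.
  case object Iterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }
  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = None
  }
}
```

The setter methods in the listings below (setIterations, setBlocks, and so on) can be read as chainable shortcuts for adding such entries to the algorithm's own parameter map.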

Classification

Support Vector Machine (SVM)

Soft-margin Support Vector Machine using the CoCoA (Communication-efficient Distributed Dual Coordinate Ascent) algorithm for distributed training.
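
For orientation, the soft-margin objective in its usual primal form is sketched below. The symbols are assumptions rather than names from the API: x_i are the feature vectors, y_i in {-1, +1} the labels, n the number of training examples, and lambda the regularization constant, which presumably corresponds to setRegularization.

```latex
\min_{w \in \mathbb{R}^d} \; \frac{\lambda}{2}\,\lVert w \rVert^2
  + \frac{1}{n} \sum_{i=1}^{n} \max\bigl(0,\; 1 - y_i\, w^\top x_i\bigr)
```

The second term is the hinge loss: an example contributes nothing once it lies on the correct side of the margin, and a larger lambda favors a smaller weight vector.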

class SVM extends Predictor[SVM] with WithParameters {
  def setBlocks(blocks: Int): SVM
  def setIterations(iterations: Int): SVM  
  def setLocalIterations(localIterations: Int): SVM
  def setRegularization(regularization: Double): SVM
  def setStepsize(stepsize: Double): SVM
  def setSeed(seed: Long): SVM
  def setThreshold(threshold: Double): SVM
  def setOutputDecisionFunction(outputDecisionFunction: Boolean): SVM
}

object SVM {
  def apply(): SVM
  
  // Parameters
  case object Blocks extends Parameter[Int] {
    val defaultValue = None
  }
  
  case object Iterations extends Parameter[Int] {
    val defaultValue = Some(10)
  }
  
  case object LocalIterations extends Parameter[Int] {
    val defaultValue = Some(10)
  }
  
  case object Regularization extends Parameter[Double] {
    val defaultValue = Some(1.0)
  }
  
  case object Stepsize extends Parameter[Double] {
    val defaultValue = Some(1.0)
  }
  
  case object Seed extends Parameter[Long] {
    val defaultValue = Some(Random.nextLong())
  }
  
  case object ThresholdValue extends Parameter[Double] {
    val defaultValue = Some(0.0)
  }
  
  case object OutputDecisionFunction extends Parameter[Boolean] {
    val defaultValue = Some(false)
  }
}

Usage Example:

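import org.apache.flink.api.scala._
import org.apache.flink.ml.MLUtils
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.Vector

val env = ExecutionEnvironment.getExecutionEnvironment
// readLibSVM is provided by MLUtils and takes the environment as its first argument
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "training.libsvm")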

// Configure SVM
val svm = SVM()
  .setBlocks(10)                    // Number of data blocks for distributed training
  .setIterations(100)               // Maximum number of iterations
  .setLocalIterations(10)           // Local iterations per block per global iteration
  .setRegularization(0.001)         // Regularization parameter
  .setStepsize(0.1)                 // Initial step size for the weight vector updates
  .setSeed(42)                      // Random seed for reproducibility
  .setThreshold(0.5)                // Decision threshold
  .setOutputDecisionFunction(true)  // Output decision function values instead of binary predictions

// Train the model; fit returns Unit and stores the learned weights in svm
svm.fit(trainingData)

// Make predictions with the fitted estimator
val testData: DataSet[Vector] = //... test vectors
val predictions = svm.predict(testData)
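
The interplay between setThreshold and setOutputDecisionFunction can be sketched without Flink as follows. This is a minimal model of the behavior described in the comments above, assuming a linear decision function; SvmDecisionSketch and its method names are hypothetical.

```scala
// Plain-Scala sketch of a linear SVM decision rule; not Flink's own code.
object SvmDecisionSketch {
  // Raw decision value: dot product of weights and features.
  def decisionValue(weights: Array[Double], features: Array[Double]): Double = {
    require(weights.length == features.length, "dimension mismatch")
    weights.zip(features).map { case (w, x) => w * x }.sum
  }

  // Returns the raw value when outputDecisionFunction is true,
  // otherwise +1.0 above the threshold and -1.0 at or below it.
  def predict(
      weights: Array[Double],
      features: Array[Double],
      threshold: Double = 0.0,
      outputDecisionFunction: Boolean = false): Double = {
    val raw = decisionValue(weights, features)
    if (outputDecisionFunction) raw
    else if (raw > threshold) 1.0
    else -1.0
  }
}
```

Under this reading, the threshold only matters when binary labels are requested; with outputDecisionFunction enabled the raw value is returned and the threshold is not applied.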

Regression

Multiple Linear Regression

Ordinary Least Squares regression using gradient descent optimization for distributed learning.
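
As a sketch of what gradient descent on the least-squares objective involves, the sum of squared residuals, its gradient, and one update step can be written as below. The notation is an assumption for illustration: w is the weight vector (with the intercept absorbed through a constant feature), eta_t the effective step size in iteration t, and n the number of training examples. The exact scaling used by the library may differ.

```latex
S(w) = \sum_{i=1}^{n} \bigl(y_i - w^\top x_i\bigr)^2, \qquad
\nabla S(w) = -2 \sum_{i=1}^{n} \bigl(y_i - w^\top x_i\bigr)\, x_i, \qquad
w_{t+1} = w_t - \frac{\eta_t}{n}\, \nabla S(w_t)
```

The convergence threshold stops the iteration early once the relative change of S(w) between two iterations becomes small enough.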

class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters {
  def setIterations(iterations: Int): MultipleLinearRegression
  def setStepsize(stepsize: Double): MultipleLinearRegression
  def setConvergenceThreshold(convergenceThreshold: Double): MultipleLinearRegression
  def setLearningRateMethod(learningRateMethod: LearningRateMethodTrait): MultipleLinearRegression
  def squaredResidualSum(data: DataSet[LabeledVector]): DataSet[Double]
}

object MultipleLinearRegression {
  def apply(): MultipleLinearRegression
  
  // Parameters
  case object Iterations extends Parameter[Int] {
    val defaultValue = Some(10)
  }
  
  case object Stepsize extends Parameter[Double] {
    val defaultValue = Some(0.1)
  }
  
  case object ConvergenceThreshold extends Parameter[Double] {
    val defaultValue = Some(1e-6)
  }
  
  case object LearningRateMethodValue extends Parameter[LearningRateMethodTrait] {
    val defaultValue = Some(LearningRateMethod.Default)
  }
}

// Learning rate scheduling methods (package org.apache.flink.ml.optimization)
object LearningRateMethod {
  sealed trait LearningRateMethodTrait extends Serializable
  object Default extends LearningRateMethodTrait                        // stepsize / sqrt(iteration)
  object Constant extends LearningRateMethodTrait                       // fixed stepsize
  case class Bottou(optimalInit: Double) extends LearningRateMethodTrait
  case class InvScaling(decay: Double) extends LearningRateMethodTrait  // stepsize / iteration^decay
  case class Xu(decay: Double) extends LearningRateMethodTrait
}

Usage Example:

import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.optimization.LearningRateMethod

val regression = MultipleLinearRegression()
  .setIterations(200)
  .setStepsize(0.01)
  .setConvergenceThreshold(1e-8)
  .setLearningRateMethod(LearningRateMethod.InvScaling(0.5))

// fit returns Unit; the fitted weights are stored in regression
regression.fit(trainingData)
val predictions = regression.predict(testData)

// Calculate residual sum of squares for model evaluation
val residualSum = regression.squaredResidualSum(trainingData)
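
To make the evaluation metric concrete, the following plain-Scala sketch computes a residual sum of squares on an in-memory collection. It assumes a linear model with explicit weights and intercept; ResidualSketch and its parameter names are hypothetical, and the code is independent of Flink's distributed implementation.

```scala
// Plain-Scala sketch of the residual sum of squares; not Flink's own code.
object ResidualSketch {
  // Sums (label - prediction)^2 over all (features, label) pairs,
  // where prediction = intercept + weights . features.
  def squaredResidualSum(
      data: Seq[(Array[Double], Double)],
      weights: Array[Double],
      intercept: Double): Double =
    data.map { case (features, label) =>
      val prediction =
        intercept + weights.zip(features).map { case (w, x) => w * x }.sum
      val residual = label - prediction
      residual * residual
    }.sum
}
```

A value of zero means the model reproduces every label exactly; comparing the value on training and held-out data gives a first indication of overfitting.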

Nearest Neighbor

K-Nearest Neighbors (KNN)

Implements k-nearest neighbor join for finding the k closest points in the training set for each test point, with support for various distance metrics and optimizations.

class KNN extends Predictor[KNN] with WithParameters {
  def setK(k: Int): KNN
  def setDistanceMetric(distanceMetric: DistanceMetric): KNN
  def setBlocks(blocks: Int): KNN
  def setUseQuadTree(useQuadTree: Boolean): KNN
  def setSizeHint(sizeHint: CrossHint): KNN
}

object KNN {
  def apply(): KNN
  
  // Parameters
  case object K extends Parameter[Int] {
    val defaultValue = Some(5)
  }
  
  case object DistanceMetric extends Parameter[DistanceMetric] {
    val defaultValue = Some(EuclideanDistanceMetric())
  }
  
  case object Blocks extends Parameter[Int] {
    val defaultValue = None
  }
  
  case object UseQuadTree extends Parameter[Boolean] {
    val defaultValue = None
  }
  
  case object SizeHint extends Parameter[CrossHint] {
    val defaultValue = None
  }
}

Usage Example:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
import org.apache.flink.ml.nn.KNN

val trainingData: DataSet[Vector] = //... training vectors
val testData: DataSet[Vector] = //... test vectors

val knn = KNN()
  .setK(10)                                    // Find 10 nearest neighbors
  .setBlocks(5)                               // Split data into 5 blocks for distributed processing
  .setDistanceMetric(EuclideanDistanceMetric()) // Use Euclidean distance
  .setUseQuadTree(true)                       // Use quadtree optimization (for Euclidean distance)
  .setSizeHint(CrossHint.FIRST_IS_SMALL)      // Optimize when training set is small

// Train the model; fit returns Unit and stores the training set in knn
knn.fit(trainingData)

// Find k-nearest neighbors for each test point
val neighbors: DataSet[(Vector, Array[Vector])] = knn.predict(testData)
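
The result shape of the k-nearest neighbor join can be illustrated with a brute-force version on in-memory collections. This is a minimal sketch, assuming points are plain sequences of doubles; KnnSketch and its functions are hypothetical, and it omits the blocking and quadtree optimizations described above.

```scala
// Brute-force kNN join on local collections; not Flink's own code.
object KnnSketch {
  type Point = Seq[Double]

  // Euclidean (L2) distance between two points of equal dimension.
  def euclidean(a: Point, b: Point): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // Manhattan (L1) distance between two points of equal dimension.
  def manhattan(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => math.abs(x - y) }.sum

  // For each test point, returns the k training points with the
  // smallest distance, ordered from nearest to farthest.
  def knnJoin(
      training: Seq[Point],
      test: Seq[Point],
      k: Int,
      distance: (Point, Point) => Double = euclidean): Seq[(Point, Seq[Point])] =
    test.map { t =>
      t -> training.sortBy(p => distance(t, p)).take(k)
    }
}
```

Each output element pairs a test point with its neighbors, mirroring the (Vector, Array[Vector]) tuples in the usage example. The brute-force version compares every test point with every training point, which is the cost the block and size-hint settings aim to distribute.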

Recommendation

Alternating Least Squares (ALS)

Matrix factorization algorithm for collaborative filtering and recommendation systems using alternating least squares optimization.
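
One common way to state the factorization problem is shown below, using the weighted-lambda-regularization variant. The notation is an assumption for illustration: r_ij is the rating of item j by user i, Omega the set of observed ratings, u_i and v_j the latent factor vectors of length numFactors, and n_{u_i} and n_{v_j} the number of ratings of user i and item j. Whether the library weights the regularization term exactly this way is not confirmed by the listing below.

```latex
\min_{U, V} \sum_{(i,j) \in \Omega} \bigl(r_{ij} - u_i^\top v_j\bigr)^2
  + \lambda \Bigl( \sum_i n_{u_i} \lVert u_i \rVert^2
  + \sum_j n_{v_j} \lVert v_j \rVert^2 \Bigr)
```

With V fixed, the problem is an ordinary least-squares problem in each u_i, and vice versa; alternating between the two is what gives the algorithm its name.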

class ALS extends Predictor[ALS] with WithParameters {
  def setNumFactors(numFactors: Int): ALS
  def setLambda(lambda: Double): ALS
  def setIterations(iterations: Int): ALS
  def setBlocks(blocks: Int): ALS
  def setSeed(seed: Long): ALS
  def setTemporaryPath(temporaryPath: String): ALS
  def empiricalRisk(data: DataSet[(Int, Int, Double)]): DataSet[Double]
}

object ALS {
  def apply(): ALS
  
  // Parameters
  case object NumFactors extends Parameter[Int] {
    val defaultValue = Some(10)
  }
  
  case object Lambda extends Parameter[Double] {
    val defaultValue = Some(1.0)
  }
  
  case object Iterations extends Parameter[Int] {
    val defaultValue = Some(10)
  }
  
  case object Blocks extends Parameter[Int] {
    val defaultValue = None
  }
  
  case object Seed extends Parameter[Long] {
    val defaultValue = Some(0L)
  }
  
  case object TemporaryPath extends Parameter[String] {
    val defaultValue = None
  }
}

// Data types for ALS
case class Rating(user: Int, item: Int, rating: Double)
case class Factors(id: Int, factors: Vector)
case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])

Usage Example:

import org.apache.flink.ml.recommendation.ALS

// Rating data: (user_id, item_id, rating)
val ratings: DataSet[(Int, Int, Double)] = env.fromCollection(Seq(
  (1, 1, 5.0), (1, 2, 3.0), (1, 3, 4.0),
  (2, 1, 4.0), (2, 2, 2.0), (2, 4, 5.0),
  (3, 2, 3.0), (3, 3, 5.0), (3, 4, 4.0)
))

val als = ALS()
  .setNumFactors(50)           // Number of latent factors
  .setLambda(0.01)             // Regularization parameter
  .setIterations(20)           // Number of iterations
  .setBlocks(10)               // Number of blocks for distributed computation
  .setSeed(42)                 // Random seed
  .setTemporaryPath("/tmp/als") // Temporary storage path

// fit returns Unit; the learned factor matrices are stored in als
als.fit(ratings)

// Predict ratings for user-item pairs
val userItemPairs: DataSet[(Int, Int)] = env.fromCollection(Seq((1, 4), (2, 3), (3, 1)))
val predictions = als.predict(userItemPairs)

// Calculate empirical risk (reconstruction error)
val risk = als.empiricalRisk(ratings)
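
How a factorization turns into predictions can be sketched on local collections. The example assumes that a predicted rating is the dot product of the user and item factor vectors, and it computes only the squared-error part of the risk, leaving out any regularization term. AlsSketch and its functions are hypothetical names.

```scala
// Plain-Scala sketch of rating prediction from latent factors; not Flink's own code.
object AlsSketch {
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // Predicted rating for (user, item), or None if either has no factors,
  // for example because it never appeared in the training data.
  def predict(
      userFactors: Map[Int, Array[Double]],
      itemFactors: Map[Int, Array[Double]],
      user: Int,
      item: Int): Option[Double] =
    for {
      u <- userFactors.get(user)
      v <- itemFactors.get(item)
    } yield dot(u, v)

  // Sum of squared errors over the ratings that can be predicted.
  def squaredError(
      ratings: Seq[(Int, Int, Double)],
      userFactors: Map[Int, Array[Double]],
      itemFactors: Map[Int, Array[Double]]): Double =
    ratings.flatMap { case (user, item, rating) =>
      predict(userFactors, itemFactors, user, item)
        .map(p => (rating - p) * (rating - p))
    }.sum
}
```

The sketch makes the cold-start limitation visible: a user or item without a factor vector cannot be scored.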

Nearest Neighbors

k-Nearest Neighbors (k-NN)

The KNN estimator also accepts distance metrics other than Euclidean. Its setters and parameters are the same as listed under K-Nearest Neighbors (KNN) above: setSizeHint takes a CrossHint, not an Int, and the QuadTree optimization is intended for Euclidean distance, so it is switched off in the Manhattan example below.

Usage Example:

import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.ml.metrics.distances.ManhattanDistanceMetric
import org.apache.flink.ml.nn.KNN

val knn = KNN()
  .setK(3)                                      // Number of neighbors
  .setDistanceMetric(ManhattanDistanceMetric()) // Distance metric
  .setBlocks(5)                                 // Number of data blocks
  .setUseQuadTree(false)                        // QuadTree is intended for Euclidean distance
  .setSizeHint(CrossHint.SECOND_IS_SMALL)       // Optimize when the test set is small

// fit returns Unit; query the fitted estimator directly
knn.fit(trainingData)
val neighbors = knn.predict(testData)

Outlier Detection

Stochastic Outlier Selection

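Stochastic Outlier Selection (SOS) is an unsupervised, affinity-based algorithm that assigns each data point an outlier probability; points that few other points select as a neighbor receive a high score.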

class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters

Usage Example:

import org.apache.flink.ml.outlier.StochasticOutlierSelection

val outlierDetector = StochasticOutlierSelection()
val outlierScores = outlierDetector.transform(data)
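
The final step of Stochastic Outlier Selection, as described in the general literature on the algorithm, can be sketched in plain Scala. The sketch assumes the binding probabilities have already been computed from pairwise affinities: binding(i)(j) is the probability that point i selects point j as its neighbor. SosSketch is a hypothetical name, and the code is not taken from the library.

```scala
// Plain-Scala sketch of the SOS outlier probability; not Flink's own code.
object SosSketch {
  // binding(i)(j): probability that point i selects point j as a neighbor.
  // Each row is expected to sum to 1, with zeros on the diagonal.
  // The outlier probability of point j is the product over all i != j
  // of (1 - binding(i)(j)), the chance that no other point selects j.
  def outlierProbabilities(binding: Array[Array[Double]]): Array[Double] = {
    val n = binding.length
    Array.tabulate(n) { j =>
      (0 until n).filter(_ != j).map(i => 1.0 - binding(i)(j)).product
    }
  }
}
```

A point that no other point is likely to select ends up with a probability close to 1, which is why isolated points score as outliers.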
