Apache Flink ML provides implementations of popular machine learning algorithms optimized for distributed processing on Apache Flink. The supervised and recommendation algorithms below follow the Estimator/Predictor pattern (StochasticOutlierSelection is a Transformer instead), and all of them are configured through the Flink ML parameter system.
Soft-margin Support Vector Machine using the CoCoA (Communication-efficient Distributed Dual Coordinate Ascent) algorithm for distributed training.
/**
 * Soft-margin SVM estimator/predictor.
 *
 * Every setter returns an `SVM`, so configuration calls can be chained fluently.
 */
class SVM extends Predictor[SVM] with WithParameters {

  /** Number of data blocks used for distributed training. */
  def setBlocks(blocks: Int): SVM

  /** Maximum number of (global) iterations. */
  def setIterations(iterations: Int): SVM

  /** Local iterations executed per block in each global iteration. */
  def setLocalIterations(localIterations: Int): SVM

  /** Regularization parameter. */
  def setRegularization(regularization: Double): SVM

  /** Step size of the updates. */
  def setStepsize(stepsize: Double): SVM

  /** Random seed, for reproducible training. */
  def setSeed(seed: Long): SVM

  /** Decision threshold used when emitting binary predictions. */
  def setThreshold(threshold: Double): SVM

  /** When true, raw decision-function values are output instead of binary predictions. */
  def setOutputDecisionFunction(outputDecisionFunction: Boolean): SVM
}
/** Companion of [[SVM]]: a factory plus the parameter keys (with defaults) behind its setters. */
object SVM {

  /** Creates a new SVM; parameters that are not set presumably fall back to the defaults below. */
  def apply(): SVM

  // Parameters

  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(1)
  }

  case object Iterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  case object LocalIterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  case object Regularization extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(0.1)
  }

  case object Stepsize extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(1.0)
  }

  case object Seed extends Parameter[Long] {
    // Drawn once, when this object is initialized, from an unseeded generator: the default
    // therefore differs between JVM runs. Call setSeed for reproducible training.
    val defaultValue: Option[Long] = Some(Random.nextLong())
  }

  case object ThresholdValue extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(0.0)
  }

  case object OutputDecisionFunction extends Parameter[Boolean] {
    val defaultValue: Option[Boolean] = Some(false)
  }
}

Usage Example:
import org.apache.flink.api.scala._
import org.apache.flink.ml._ // implicit RichExecutionEnvironment: provides env.readLibSVM
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.Vector // Flink ML vector, not scala.collection.immutable.Vector

val env = ExecutionEnvironment.getExecutionEnvironment
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

// Configure SVM
val svm = SVM()
  .setBlocks(10) // Number of data blocks for distributed training
  .setIterations(100) // Maximum number of iterations
  .setLocalIterations(10) // Local iterations per block per global iteration
  .setRegularization(0.001) // Regularization parameter
  .setStepsize(0.1) // Step size of the updates (CoCoA dual coordinate ascent, not gradient descent)
  .setSeed(42) // Random seed for reproducibility
  .setThreshold(0.5) // Decision threshold; only relevant for binary predictions
  .setOutputDecisionFunction(true) // Output decision function values instead of binary predictions

// Train the model: fit() returns Unit and keeps the learned model inside `svm` itself
svm.fit(trainingData)

// Make predictions with the same, now trained, instance
val testData: DataSet[Vector] = ??? // ... test vectors
val predictions = svm.predict(testData)

Ordinary Least Squares regression using gradient descent optimization for distributed learning.
/**
 * Ordinary least-squares regression estimator/predictor.
 *
 * Every setter returns the regression, so configuration calls can be chained fluently.
 */
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters {

  /** Maximum number of optimization iterations. */
  def setIterations(iterations: Int): MultipleLinearRegression

  /** Step size of the optimizer. */
  def setStepsize(stepsize: Double): MultipleLinearRegression

  /** Convergence threshold for the optimizer. */
  def setConvergenceThreshold(convergenceThreshold: Double): MultipleLinearRegression

  /** Learning-rate scheduling method (see [[LearningRateMethod]]). */
  def setLearningRateMethod(learningRateMethod: LearningRateMethod): MultipleLinearRegression

  /** Residual sum of squares over `data`, for evaluating the trained model. */
  def squaredResidualSum(data: DataSet[LabeledVector]): DataSet[Double]
}
/** Companion of [[MultipleLinearRegression]]: a factory plus its parameter keys and defaults. */
object MultipleLinearRegression {

  /** Creates a new regression estimator. */
  def apply(): MultipleLinearRegression

  // Parameters

  case object Iterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  case object Stepsize extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(0.1)
  }

  case object ConvergenceThreshold extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(1e-6)
  }

  case object LearningRateMethodValue extends Parameter[LearningRateMethod] {
    val defaultValue: Option[LearningRateMethod] = Some(LearningRateMethod.Default)
  }
}
// Learning rate scheduling methods
// NOTE(review): verify this case list against org.apache.flink.ml.optimization.LearningRateMethod
// for the Flink version being documented; the names may not match the released API.

/** Closed set of learning-rate schedules accepted by `setLearningRateMethod`. */
sealed trait LearningRateMethod

object LearningRateMethod {
  case object Default extends LearningRateMethod
  case object Inverse extends LearningRateMethod
  case object InverseSquareRoot extends LearningRateMethod
}

Usage Example:
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.optimization.LearningRateMethod

// trainingData: DataSet[LabeledVector] and testData: DataSet[Vector] as in the SVM example
val regression = MultipleLinearRegression()
  .setIterations(200)
  .setStepsize(0.01)
  .setConvergenceThreshold(1e-8)
  .setLearningRateMethod(LearningRateMethod.Inverse)

// fit() returns Unit; the learned weights are kept inside `regression`
regression.fit(trainingData)
val predictions = regression.predict(testData)

// Calculate residual sum of squares for model evaluation
val residualSum = regression.squaredResidualSum(trainingData)

Implements k-nearest neighbor join for finding the k closest points in the training set for each test point, with support for various distance metrics and optimizations.
/**
 * k-nearest-neighbour join: for each test point, finds the k closest training points.
 *
 * Every setter returns the KNN, so configuration calls can be chained fluently.
 */
class KNN extends Predictor[KNN] with WithParameters {

  /** Number of nearest neighbours to find. */
  def setK(k: Int): KNN

  /** Distance metric used to compare points. */
  def setDistanceMetric(distanceMetric: DistanceMetric): KNN

  /** Number of blocks the data is split into for distributed processing. */
  def setBlocks(blocks: Int): KNN

  /** Enables the quadtree optimization (intended for Euclidean distance). */
  def setUseQuadTree(useQuadTree: Boolean): KNN

  /** Cross-product size hint, e.g. `CrossHint.FIRST_IS_SMALL` when the training set is small. */
  def setSizeHint(sizeHint: CrossHint): KNN
}
/** Companion of [[KNN]]: a factory plus its parameter keys and defaults. */
object KNN {

  /** Creates a new KNN estimator. */
  def apply(): KNN

  // Parameters

  case object K extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(5)
  }

  case object DistanceMetric extends Parameter[DistanceMetric] {
    val defaultValue: Option[DistanceMetric] = Some(EuclideanDistanceMetric())
  }

  // The three parameters below have no default value (None). Presumably the implementation
  // chooses a value at runtime when they are unset. TODO confirm.
  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = None
  }

  case object UseQuadTree extends Parameter[Boolean] {
    val defaultValue: Option[Boolean] = None
  }

  case object SizeHint extends Parameter[CrossHint] {
    val defaultValue: Option[CrossHint] = None
  }
}

Usage Example:
import org.apache.flink.api.scala._
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.math.Vector // Flink ML vector, not scala.collection.immutable.Vector
import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint

val trainingData: DataSet[Vector] = ??? // ... training vectors
val testData: DataSet[Vector] = ??? // ... test vectors

val knn = KNN()
  .setK(10) // Find 10 nearest neighbors
  .setBlocks(5) // Split data into 5 blocks for distributed processing
  .setDistanceMetric(EuclideanDistanceMetric()) // Use Euclidean distance
  .setUseQuadTree(true) // Use quadtree optimization (for Euclidean distance)
  .setSizeHint(CrossHint.FIRST_IS_SMALL) // Optimize when training set is small

// Train the model: fit() returns Unit and keeps the training set inside `knn`
knn.fit(trainingData)

// Find k-nearest neighbors for each test point
val neighbors: DataSet[(Vector, Array[Vector])] = knn.predict(testData)

Matrix factorization algorithm for collaborative filtering and recommendation systems using alternating least squares optimization.
/**
 * Alternating-least-squares matrix factorization for collaborative filtering.
 *
 * Every setter returns the ALS, so configuration calls can be chained fluently.
 */
class ALS extends Predictor[ALS] with WithParameters {

  /** Number of latent factors. */
  def setNumFactors(numFactors: Int): ALS

  /** Regularization parameter. */
  def setLambda(lambda: Double): ALS

  /** Number of iterations. */
  def setIterations(iterations: Int): ALS

  /** Number of blocks used for distributed computation. */
  def setBlocks(blocks: Int): ALS

  /** Random seed. */
  def setSeed(seed: Long): ALS

  /** Path used for temporary storage. */
  def setTemporaryPath(temporaryPath: String): ALS

  /** Empirical risk (reconstruction error) on `(user, item, rating)` triples. */
  def empiricalRisk(data: DataSet[(Int, Int, Double)]): DataSet[Double]
}
/** Companion of [[ALS]]: a factory plus its parameter keys and defaults. */
object ALS {

  /** Creates a new ALS estimator. */
  def apply(): ALS

  // Parameters

  case object NumFactors extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  case object Lambda extends Parameter[Double] {
    val defaultValue: Option[Double] = Some(1.0)
  }

  case object Iterations extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(10)
  }

  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(1)
  }

  case object Seed extends Parameter[Long] {
    val defaultValue: Option[Long] = Some(0L)
  }

  case object TemporaryPath extends Parameter[String] {
    // Read from the JVM's java.io.tmpdir system property when this object is initialized
    val defaultValue: Option[String] = Some(System.getProperty("java.io.tmpdir"))
  }
}
// Data types for ALS

/** One observed rating of `item` by `user`. */
case class Rating(user: Int, item: Int, rating: Double)

/** Latent factor vector of the user or item identified by `id`. */
case class Factors(id: Int, factors: Vector)

/** A learned factorization: one data set of user factors and one of item factors. */
case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])

Usage Example:
import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

// Rating data: (user_id, item_id, rating)
val ratings: DataSet[(Int, Int, Double)] = env.fromCollection(Seq(
  (1, 1, 5.0), (1, 2, 3.0), (1, 3, 4.0),
  (2, 1, 4.0), (2, 2, 2.0), (2, 4, 5.0),
  (3, 2, 3.0), (3, 3, 5.0), (3, 4, 4.0)
))

val als = ALS()
  .setNumFactors(50) // Number of latent factors
  .setLambda(0.01) // Regularization parameter
  .setIterations(20) // Number of iterations
  .setBlocks(10) // Number of blocks for distributed computation
  .setSeed(42) // Random seed
  .setTemporaryPath("/tmp/als") // Temporary storage path

// fit() returns Unit; the learned user/item factors are kept inside `als`
als.fit(ratings)

// Predict ratings for user-item pairs
val userItemPairs: DataSet[(Int, Int)] = env.fromCollection(Seq((1, 4), (2, 3), (3, 1)))
val predictions = als.predict(userItemPairs)

// Calculate empirical risk (reconstruction error)
val risk = als.empiricalRisk(ratings)

k-Nearest Neighbors algorithm for classification and regression with support for various distance metrics and QuadTree optimization.
/**
 * k-nearest-neighbour estimator from `org.apache.flink.ml.nn`.
 *
 * This is the same API as the KNN section earlier in this document, so the size hint is a
 * `CrossHint`, not an integer. Every setter returns the KNN, so calls can be chained.
 */
class KNN extends Predictor[KNN] with WithParameters {

  /** Number of nearest neighbours to find. */
  def setK(k: Int): KNN

  /** Distance metric used to compare points. */
  def setDistanceMetric(distanceMetric: DistanceMetric): KNN

  /** Number of data blocks. */
  def setBlocks(blocks: Int): KNN

  /** Enables the QuadTree optimization (intended for Euclidean distance). */
  def setUseQuadTree(useQuadTree: Boolean): KNN

  /** Which side of the training x test cross product is small, e.g. `CrossHint.FIRST_IS_SMALL`. */
  def setSizeHint(sizeHint: CrossHint): KNN
}
/**
 * Companion of [[KNN]]: a factory plus its parameter keys and defaults.
 *
 * Aligned with the earlier KNN section of this document: SizeHint is a `CrossHint`, and
 * Blocks, UseQuadTree and SizeHint have no default value.
 */
object KNN {

  /** Creates a new KNN estimator. */
  def apply(): KNN

  // Parameters

  case object K extends Parameter[Int] {
    val defaultValue: Option[Int] = Some(5)
  }

  case object DistanceMetric extends Parameter[DistanceMetric] {
    val defaultValue: Option[DistanceMetric] = Some(EuclideanDistanceMetric())
  }

  // No default value (None). Presumably the implementation decides at runtime when unset. TODO confirm.
  case object Blocks extends Parameter[Int] {
    val defaultValue: Option[Int] = None
  }

  case object UseQuadTree extends Parameter[Boolean] {
    val defaultValue: Option[Boolean] = None
  }

  case object SizeHint extends Parameter[CrossHint] {
    val defaultValue: Option[CrossHint] = None
  }
}

Usage Example:
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.metrics.distances.ManhattanDistanceMetric
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint

val knn = KNN()
  .setK(3) // Number of neighbors
  .setDistanceMetric(ManhattanDistanceMetric()) // Distance metric
  .setBlocks(5) // Number of data blocks
  .setUseQuadTree(false) // QuadTree optimization targets Euclidean distance; keep it off for Manhattan
  .setSizeHint(CrossHint.FIRST_IS_SMALL) // Size hint is a CrossHint: here the training set is small

// fit() returns Unit; the training set is kept inside `knn`
knn.fit(trainingData)
val predictions = knn.predict(testData)

Probabilistic outlier detection algorithm for identifying anomalous data points.
/** Outlier detector exposed as a Transformer (applied via `transform`) rather than a Predictor. */
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters

Usage Example:
import org.apache.flink.ml.outlier.StochasticOutlierSelection

// `data` is the input DataSet to score; it is assumed to be defined elsewhere
val sos = StochasticOutlierSelection()
val outlierScores = sos.transform(data)