Machine learning library for Apache Flink providing scalable ML algorithms, including classification (SVM), regression (multiple linear regression), recommendation (ALS), nearest-neighbor search (KNN), and outlier detection (Stochastic Outlier Selection), optimized for distributed stream and batch processing
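Apache Flink ML provides implementations of popular machine learning algorithms optimized for distributed processing on Apache Flink. The supervised algorithms follow the Estimator-Predictor pattern (StochasticOutlierSelection is a Transformer instead). All algorithms are configured through the shared parameter system (`Parameter` objects and `WithParameters`).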
Soft-margin Support Vector Machine using the CoCoA (Communication-efficient Distributed Dual Coordinate Ascent) algorithm for distributed training.
/** Soft-margin Support Vector Machine estimator and predictor (trained with CoCoA).
  *
  * Every setter returns an `SVM`, which is what lets the usage example chain them.
  * NOTE(review): each setter presumably writes the matching parameter object in the
  * companion (e.g. `setThreshold` -> `ThresholdValue`); defaults are listed there.
  */
class SVM extends Predictor[SVM] with WithParameters {
// Number of blocks the training data is split into for distributed training.
def setBlocks(blocks: Int): SVM
// Maximum number of global (outer) iterations.
def setIterations(iterations: Int): SVM
// Number of local iterations performed per block within each global iteration.
def setLocalIterations(localIterations: Int): SVM
// Regularization constant of the SVM objective.
def setRegularization(regularization: Double): SVM
// Step size applied to the weight-vector updates.
def setStepsize(stepsize: Double): SVM
// Seed for the random number generator, for reproducible training runs.
def setSeed(seed: Long): SVM
// Threshold on the decision function used to turn scores into class labels.
def setThreshold(threshold: Double): SVM
// If true, predictions are raw decision-function values instead of class labels.
def setOutputDecisionFunction(outputDecisionFunction: Boolean): SVM
}
/** Companion of [[SVM]]: factory method plus the parameter keys and their defaults. */
object SVM {
// Creates a new SVM estimator.
def apply(): SVM
// Parameters
// Number of data blocks; default 1.
case object Blocks extends Parameter[Int] {
val defaultValue = Some(1)
}
// Maximum number of global iterations; default 10.
case object Iterations extends Parameter[Int] {
val defaultValue = Some(10)
}
// Local iterations per block per global iteration; default 10.
case object LocalIterations extends Parameter[Int] {
val defaultValue = Some(10)
}
// Regularization constant; default 0.1.
case object Regularization extends Parameter[Double] {
val defaultValue = Some(0.1)
}
// Step size of the weight-vector updates; default 1.0.
case object Stepsize extends Parameter[Double] {
val defaultValue = Some(1.0)
}
// Random seed. The default is drawn once, when this object is initialized, so training
// is only reproducible across runs if a seed is set explicitly.
// NOTE(review): `Random` is assumed to be scala.util.Random; its import is not shown here.
case object Seed extends Parameter[Long] {
val defaultValue = Some(Random.nextLong())
}
// Decision-function threshold (set via setThreshold); default 0.0.
case object ThresholdValue extends Parameter[Double] {
val defaultValue = Some(0.0)
}
// Whether predictions are raw decision-function values; default false (class labels).
case object OutputDecisionFunction extends Parameter[Boolean] {
val defaultValue = Some(false)
}
}Usage Example:
import org.apache.flink.api.scala._
import org.apache.flink.ml.RichExecutionEnvironment // provides env.readLibSVM
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.Vector
val env = ExecutionEnvironment.getExecutionEnvironment
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")
// Configure SVM
val svm = SVM()
.setBlocks(10) // Number of data blocks for distributed training
.setIterations(100) // Maximum number of outer (global) iterations
.setLocalIterations(10) // Local iterations per block per outer iteration
.setRegularization(0.001) // Regularization constant
.setStepsize(0.1) // Step size of the weight-vector updates (CoCoA is dual coordinate ascent, not gradient descent)
.setSeed(42) // Random seed for reproducibility
.setThreshold(0.5) // Decision-function value separating +1.0 from -1.0 predictions
.setOutputDecisionFunction(false) // true would return raw decision-function values and ignore the threshold
// Train: fit() returns Unit and stores the learned weights inside `svm` itself
svm.fit(trainingData)
// Make predictions with the trained estimator
val testData: DataSet[Vector] = env.readLibSVM("test.libsvm").map(_.vector)
val predictions: DataSet[(Vector, Double)] = svm.predict(testData)
Ordinary Least Squares regression using gradient descent optimization for distributed learning.
/** Ordinary-least-squares linear regression estimator and predictor.
  *
  * Setters return a `MultipleLinearRegression` so they can be chained (see the usage
  * example). NOTE(review): each setter presumably writes the matching parameter object
  * in the companion, where the defaults are listed.
  */
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters {
// Maximum number of optimization iterations.
def setIterations(iterations: Int): MultipleLinearRegression
// Initial step size of the optimizer.
def setStepsize(stepsize: Double): MultipleLinearRegression
// Convergence threshold; presumably training stops early once improvement drops below it — confirm.
def setConvergenceThreshold(convergenceThreshold: Double): MultipleLinearRegression
// Learning-rate schedule (one of the LearningRateMethod variants).
def setLearningRateMethod(learningRateMethod: LearningRateMethod): MultipleLinearRegression
// Sum of squared residuals over labeled data; the usage example calls it after fit()
// to evaluate the trained model.
def squaredResidualSum(data: DataSet[LabeledVector]): DataSet[Double]
}
/** Companion of [[MultipleLinearRegression]]: factory plus parameter keys and defaults. */
object MultipleLinearRegression {
// Creates a new regression estimator.
def apply(): MultipleLinearRegression
// Parameters
// Maximum number of iterations; default 10.
case object Iterations extends Parameter[Int] {
val defaultValue = Some(10)
}
// Initial step size; default 0.1.
case object Stepsize extends Parameter[Double] {
val defaultValue = Some(0.1)
}
// Convergence threshold; default 1e-6.
case object ConvergenceThreshold extends Parameter[Double] {
val defaultValue = Some(1e-6)
}
// Learning-rate schedule (set via setLearningRateMethod); default LearningRateMethod.Default.
case object LearningRateMethodValue extends Parameter[LearningRateMethod] {
val defaultValue = Some(LearningRateMethod.Default)
}
}
// Learning rate scheduling methods
// `sealed`: the three case objects below are the only variants, so matches on them are
// checked for exhaustiveness.
// NOTE(review): the exact decay formula of each variant is not shown here — confirm it
// against the optimization package before relying on it.
sealed trait LearningRateMethod
object LearningRateMethod {
// Default schedule; also the default of MultipleLinearRegression.LearningRateMethodValue.
case object Default extends LearningRateMethod
// Presumably decays the step size roughly as 1/t — confirm.
case object Inverse extends LearningRateMethod
// Presumably decays the step size roughly as 1/sqrt(t) — confirm.
case object InverseSquareRoot extends LearningRateMethod
}Usage Example:
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.optimization.LearningRateMethod
// Reuses `trainingData: DataSet[LabeledVector]` and `testData: DataSet[Vector]` from the SVM example.
val regression = MultipleLinearRegression()
.setIterations(200)
.setStepsize(0.01)
.setConvergenceThreshold(1e-8)
.setLearningRateMethod(LearningRateMethod.Inverse)
// fit() returns Unit; the learned weights are stored inside `regression`
regression.fit(trainingData)
val predictions = regression.predict(testData)
// Calculate residual sum of squares for model evaluation
val residualSum = regression.squaredResidualSum(trainingData)
Implements k-nearest neighbor join for finding the k closest points in the training set for each test point, with support for various distance metrics and optimizations.
/** k-nearest-neighbor join: for every test point, finds its k closest training points.
  *
  * Setters return a `KNN` so they can be chained. NOTE(review): each setter presumably
  * writes the matching parameter object in the companion, where defaults are listed.
  */
class KNN extends Predictor[KNN] with WithParameters {
// Number of neighbors to return for each test point.
def setK(k: Int): KNN
// Distance metric used to rank neighbors.
def setDistanceMetric(distanceMetric: DistanceMetric): KNN
// Number of blocks the data is split into for distributed processing.
def setBlocks(blocks: Int): KNN
// Whether to use the quadtree-based search (intended for Euclidean distance).
def setUseQuadTree(useQuadTree: Boolean): KNN
// Cross-strategy hint telling the optimizer which input is small.
def setSizeHint(sizeHint: CrossHint): KNN
}
/** Companion of [[KNN]]: factory plus parameter keys and their defaults. */
object KNN {
// Creates a new KNN estimator.
def apply(): KNN
// Parameters
// Number of neighbors; default 5.
case object K extends Parameter[Int] {
val defaultValue = Some(5)
}
// Distance metric; default Euclidean.
case object DistanceMetric extends Parameter[DistanceMetric] {
val defaultValue = Some(EuclideanDistanceMetric())
}
// Number of blocks; no default. Presumably it must be set before fit — confirm.
case object Blocks extends Parameter[Int] {
val defaultValue = None
}
// Quadtree usage; no default (left unset unless configured).
case object UseQuadTree extends Parameter[Boolean] {
val defaultValue = None
}
// Cross hint; no default (left unset unless configured).
case object SizeHint extends Parameter[CrossHint] {
val defaultValue = None
}
}Usage Example:
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.api.scala._
import org.apache.flink.ml.RichExecutionEnvironment // provides env.readLibSVM
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
import org.apache.flink.ml.nn.KNN
val env = ExecutionEnvironment.getExecutionEnvironment
// Any DataSet[Vector] works; here the feature vectors are taken from LibSVM files.
val trainingData: DataSet[Vector] = env.readLibSVM("training.libsvm").map(_.vector)
val testData: DataSet[Vector] = env.readLibSVM("test.libsvm").map(_.vector)
val knn = KNN()
.setK(10) // Find 10 nearest neighbors
.setBlocks(5) // Split data into 5 blocks for distributed processing
.setDistanceMetric(EuclideanDistanceMetric()) // Use Euclidean distance
.setUseQuadTree(true) // Quadtree search; intended for (squared) Euclidean distance only
.setSizeHint(CrossHint.FIRST_IS_SMALL) // Optimize when training set is small
// Train: fit() returns Unit and keeps the training set inside `knn`
knn.fit(trainingData)
// Find k-nearest neighbors for each test point
val neighbors: DataSet[(Vector, Array[Vector])] = knn.predict(testData)
Matrix factorization algorithm for collaborative filtering and recommendation systems using alternating least squares optimization.
/** Alternating-least-squares matrix factorization for collaborative filtering.
  *
  * Setters return an `ALS` so they can be chained. NOTE(review): each setter presumably
  * writes the matching parameter object in the companion, where defaults are listed.
  */
class ALS extends Predictor[ALS] with WithParameters {
// Number of latent factors per user/item.
def setNumFactors(numFactors: Int): ALS
// Regularization parameter.
def setLambda(lambda: Double): ALS
// Number of iterations.
def setIterations(iterations: Int): ALS
// Number of blocks for distributed computation.
def setBlocks(blocks: Int): ALS
// Random seed.
def setSeed(seed: Long): ALS
// Directory for temporary storage during training.
def setTemporaryPath(temporaryPath: String): ALS
// Empirical risk (reconstruction error) on (user_id, item_id, rating) triples; the usage
// example calls it after fit().
def empiricalRisk(data: DataSet[(Int, Int, Double)]): DataSet[Double]
}
/** Companion of [[ALS]]: factory plus parameter keys and their defaults. */
object ALS {
// Creates a new ALS estimator.
def apply(): ALS
// Parameters
// Number of latent factors; default 10.
case object NumFactors extends Parameter[Int] {
val defaultValue = Some(10)
}
// Regularization parameter; default 1.0.
case object Lambda extends Parameter[Double] {
val defaultValue = Some(1.0)
}
// Number of iterations; default 10.
case object Iterations extends Parameter[Int] {
val defaultValue = Some(10)
}
// Number of blocks; default 1.
case object Blocks extends Parameter[Int] {
val defaultValue = Some(1)
}
// Random seed; fixed default 0L, so runs are reproducible unless it is changed.
case object Seed extends Parameter[Long] {
val defaultValue = Some(0L)
}
// Temporary path; default is the JVM's java.io.tmpdir, read once when this object is initialized.
case object TemporaryPath extends Parameter[String] {
val defaultValue = Some(System.getProperty("java.io.tmpdir"))
}
}
// Data types for ALS
// One observed rating of `item` by `user`.
case class Rating(user: Int, item: Int, rating: Double)
// Latent factor vector for the user or item identified by `id`.
case class Factors(id: Int, factors: Vector)
// User and item factor sets (presumably the trained factorization — confirm).
case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])Usage Example:
import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS
val env = ExecutionEnvironment.getExecutionEnvironment
// Rating data: (user_id, item_id, rating)
val ratings: DataSet[(Int, Int, Double)] = env.fromCollection(Seq(
(1, 1, 5.0), (1, 2, 3.0), (1, 3, 4.0),
(2, 1, 4.0), (2, 2, 2.0), (2, 4, 5.0),
(3, 2, 3.0), (3, 3, 5.0), (3, 4, 4.0)
))
val als = ALS()
.setNumFactors(50) // Number of latent factors
.setLambda(0.01) // Regularization parameter
.setIterations(20) // Number of iterations
.setBlocks(10) // Number of blocks for distributed computation
.setSeed(42) // Random seed
.setTemporaryPath("/tmp/als") // Temporary storage path
// fit() returns Unit; the user and item factors are stored inside `als`
als.fit(ratings)
// Predict ratings for user-item pairs
val userItemPairs: DataSet[(Int, Int)] = env.fromCollection(Seq((1, 4), (2, 3), (3, 1)))
val predictions = als.predict(userItemPairs)
// Calculate empirical risk (reconstruction error)
val risk = als.empiricalRisk(ratings)
k-Nearest Neighbors search (the same KNN estimator as above, returning each test point with its k nearest training points), with support for various distance metrics and QuadTree optimization.
/** k-nearest-neighbor estimator (same API as the first KNN listing).
  *
  * `predict` returns each test point together with its k nearest training points.
  * Setters return a `KNN` so they can be chained.
  */
class KNN extends Predictor[KNN] with WithParameters {
// Number of neighbors to return for each test point.
def setK(k: Int): KNN
// Distance metric used to rank neighbors.
def setDistanceMetric(distanceMetric: DistanceMetric): KNN
// Number of blocks the data is split into for distributed processing.
def setBlocks(blocks: Int): KNN
// Whether to use the QuadTree search; intended for (squared) Euclidean distance only.
def setUseQuadTree(useQuadTree: Boolean): KNN
// Cross-strategy hint telling the optimizer which input is small. This is a CrossHint,
// not a dataset size.
def setSizeHint(sizeHint: CrossHint): KNN
}
object KNN {
// Creates a new KNN estimator.
def apply(): KNN
// Parameters
// Number of neighbors; default 5.
case object K extends Parameter[Int] {
val defaultValue = Some(5)
}
// Distance metric; default Euclidean.
case object DistanceMetric extends Parameter[DistanceMetric] {
val defaultValue = Some(EuclideanDistanceMetric())
}
// Number of blocks; no default.
case object Blocks extends Parameter[Int] {
val defaultValue = None
}
// QuadTree usage; no default.
case object UseQuadTree extends Parameter[Boolean] {
val defaultValue = None
}
// Cross hint; no default.
case object SizeHint extends Parameter[CrossHint] {
val defaultValue = None
}
}
Usage Example:
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.ml.metrics.distances.ManhattanDistanceMetric
import org.apache.flink.ml.nn.KNN
// Reuses `trainingData` and `testData` (both DataSet[Vector]) from the first KNN example.
val knn = KNN()
.setK(3) // Number of neighbors
.setDistanceMetric(ManhattanDistanceMetric()) // Distance metric
.setBlocks(5) // Number of data blocks
.setUseQuadTree(false) // QuadTree search only supports (squared) Euclidean distance
.setSizeHint(CrossHint.OPTIMIZER_CHOOSES) // No size knowledge: let the optimizer pick the cross strategy
// fit() returns Unit; the training set is kept inside `knn`
knn.fit(trainingData)
val predictions = knn.predict(testData)
Probabilistic outlier detection algorithm for identifying anomalous data points.
/** Stochastic Outlier Selection: a Transformer (not an Estimator/Predictor) that
  * scores data points for how anomalous they are.
  * NOTE(review): its parameters and input/output element types are not listed here.
  */
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters
Usage Example:
import org.apache.flink.ml.outlier.StochasticOutlierSelection
val outlierDetector = StochasticOutlierSelection()
// `data` is the input DataSet to score (its element type is not shown here — TODO confirm).
val outlierScores = outlierDetector.transform(data)
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-ml-2-12