Machine learning library for Apache Flink providing scalable ML algorithms, including classification (SVM), regression (multiple linear regression), and recommendation (ALS), implemented on Flink's DataSet API for distributed batch processing
npx @tessl/cli install tessl/maven-org-apache-flink--flink-ml_2-12@1.8.0

Apache Flink ML is a machine learning library that provides highly optimized implementations of popular ML algorithms designed to scale to datasets that vastly exceed single-machine memory capacity. It is built on Apache Flink's DataSet API, so training and prediction run as distributed Flink batch jobs.
Add the dependency to pom.xml (Maven) or build.sbt (SBT).

Maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml_2.12</artifactId>
<version>1.8.3</version>
</dependency>

SBT:
libraryDependencies += "org.apache.flink" % "flink-ml_2.12" % "1.8.3"

import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{Vector, DenseVector, SparseVector}
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.outlier.StochasticOutlierSelection

import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}

// Create the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Load training data in libSVM format (readLibSVM is added by the implicits in org.apache.flink.ml._)
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

// Create and configure the SVM classifier
val svm = SVM()
  .setBlocks(10)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)

// Train the model: fit returns Unit and stores the learned weights in `svm` itself
svm.fit(trainingData)

// Make predictions; the test vectors must have the same dimension as the training vectors
val testData: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(Array(1.0, 2.0, 3.0)),
  DenseVector(Array(4.0, 5.0, 6.0))
))

// Each prediction is a (vector, predicted label) pair
val predictions = svm.predict(testData)
predictions.print()

Apache Flink ML is built around several key architectural components:
Core machine learning algorithms including classification, regression, and recommendation systems, all optimized for distributed processing.
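For intuition about what the trained models compute at prediction time, the sketch below covers three cases:

- It scores a vector with a linear model as w · x + b, the quantity a linear regression returns.
- It thresholds that score into a +1/-1 label, the way a linear SVM classifier typically does.
- It predicts a rating as the dot product of a user factor vector and an item factor vector, which is the matrix-factorization model that ALS fits.

This is plain single-machine Scala. `PredictionSketch`, `LinearModel`, `score`, `classify`, and `alsRating` are hypothetical names, not Flink ML classes. The default threshold of 0.0 is an assumption of the sketch.

```scala
object PredictionSketch {
  // Hypothetical stand-in for a trained linear model: one weight per feature plus an intercept.
  final case class LinearModel(weights: Array[Double], intercept: Double) {
    // Raw score w . x + b; a regression model would return this value directly.
    def score(x: Array[Double]): Double = {
      require(x.length == weights.length, "feature dimension must match the model")
      weights.zip(x).map { case (w, xi) => w * xi }.sum + intercept
    }

    // Binary label: +1.0 if the score exceeds the threshold, otherwise -1.0.
    def classify(x: Array[Double], threshold: Double = 0.0): Double =
      if (score(x) > threshold) 1.0 else -1.0
  }

  // Matrix-factorization rating: dot product of user and item latent factor vectors.
  def alsRating(userFactors: Array[Double], itemFactors: Array[Double]): Double = {
    require(userFactors.length == itemFactors.length, "factor dimensions must match")
    userFactors.zip(itemFactors).map { case (u, v) => u * v }.sum
  }

  def main(args: Array[String]): Unit = {
    val model = LinearModel(Array(1.0, -2.0), intercept = 0.5)
    println(model.score(Array(3.0, 1.0)))                 // 1.5
    println(model.classify(Array(0.0, 1.0)))              // -1.0
    println(alsRating(Array(0.5, 1.0), Array(2.0, 3.0)))  // 4.0
  }
}
```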
// Classification
class SVM extends Predictor[SVM] with WithParameters
class KNN extends Predictor[KNN] with WithParameters
// Regression
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters
// Recommendation
class ALS extends Predictor[ALS] with WithParameters

Comprehensive linear algebra framework with vectors and matrices supporting both dense and sparse representations.
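A dense vector stores every coordinate. A sparse vector stores only (index, value) pairs, so operations such as a dot product can skip the zeros.

The following plain-Scala sketch mirrors the field layout of `DenseVector(data)` and `SparseVector(size, indices, data)`. However, `VectorSketch`, `Dense`, and `Sparse` are hypothetical stand-ins rather than the Flink ML implementations. The sparse indices are assumed to be distinct and 0-based.

```scala
object VectorSketch {
  // Hypothetical dense representation: every coordinate is stored.
  final case class Dense(data: Array[Double]) {
    def size: Int = data.length
    def dot(other: Dense): Double = {
      require(size == other.size, "vectors must have the same size")
      data.indices.map(i => data(i) * other.data(i)).sum
    }
    def magnitude: Double = math.sqrt(dot(this))
  }

  // Hypothetical sparse representation: only non-zero entries, with distinct 0-based indices.
  final case class Sparse(size: Int, indices: Array[Int], data: Array[Double]) {
    require(indices.length == data.length, "one value per index")

    // Dot product with a dense vector touches only the stored entries.
    def dot(other: Dense): Double = {
      require(size == other.size, "vectors must have the same size")
      indices.indices.map(k => data(k) * other.data(indices(k))).sum
    }

    def magnitude: Double = math.sqrt(data.map(v => v * v).sum)

    // Expand to a dense vector, filling the missing entries with zeros.
    def toDense: Dense = {
      val out = new Array[Double](size)
      indices.indices.foreach(k => out(indices(k)) = data(k))
      Dense(out)
    }
  }

  def main(args: Array[String]): Unit = {
    val d = Dense(Array(1.0, 2.0, 3.0, 4.0))
    val s = Sparse(4, Array(0, 3), Array(3.0, 4.0))
    println(s.dot(d))          // 3*1 + 4*4 = 19.0
    println(s.magnitude)       // 5.0
    println(s.toDense.dot(d))  // 19.0, same result as the sparse path
  }
}
```

The sparse dot product does work proportional to the number of stored entries rather than to `size`, which is what makes the sparse form worthwhile for high-dimensional, mostly-zero feature vectors.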
trait Vector {
def size: Int
def apply(index: Int): Double
def update(index: Int, value: Double): Unit
def dot(other: Vector): Double
def magnitude: Double
}
trait Matrix {
def numRows: Int
def numCols: Int
def apply(row: Int, col: Int): Double
def update(row: Int, col: Int, value: Double): Unit
}

Data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms.
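Both scalers apply simple per-feature formulas. This sketch assumes the usual definitions:

- Min-max scaling maps a column linearly onto a target range, commonly [0, 1].
- Standardization subtracts the column mean and divides by the standard deviation.

Under those definitions, a single-column, single-machine sketch looks like this. `ScalingSketch`, `minMax`, and `standardize` are hypothetical names. The use of the population standard deviation and the handling of constant columns are choices of this sketch, not documented Flink ML behavior. The real transformers compute their statistics over a distributed `DataSet` of vectors.

```scala
object ScalingSketch {
  // Min-max scaling of one feature column onto [lo, hi] (defaults to [0, 1] here).
  // A constant column (zero range) is mapped to lo in this sketch.
  def minMax(col: Seq[Double], lo: Double = 0.0, hi: Double = 1.0): Seq[Double] = {
    val (mn, mx) = (col.min, col.max)
    val range = mx - mn
    col.map(x => if (range == 0.0) lo else (x - mn) / range * (hi - lo) + lo)
  }

  // Standardization of one feature column to mean 0 and standard deviation 1,
  // using the population standard deviation (divide by n).
  // A constant column (zero variance) is only centered in this sketch.
  def standardize(col: Seq[Double]): Seq[Double] = {
    val n = col.size.toDouble
    val mean = col.sum / n
    val std = math.sqrt(col.map(x => (x - mean) * (x - mean)).sum / n)
    col.map(x => if (std == 0.0) x - mean else (x - mean) / std)
  }

  def main(args: Array[String]): Unit = {
    val feature = Seq(2.0, 4.0, 6.0, 8.0)
    println(minMax(feature))
    println(standardize(feature))
  }
}
```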
class StandardScaler extends Transformer[StandardScaler] with WithParameters
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters

Algorithms for identifying anomalous data points in datasets using probabilistic and statistical methods.
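Stochastic Outlier Selection (SOS) is affinity-based and works in three steps:

1. Each point spreads a unit of "binding probability" b_ij over the other points.
2. The Gaussian bandwidth of each point is tuned so that this distribution reaches a chosen perplexity.
3. A point's outlier probability is the chance that no other point binds to it: p(x_j) = product over i != j of (1 - b_ij).

The following single-machine sketch follows that description, using squared Euclidean dissimilarities and bisection on the precision beta = 1 / (2 sigma^2). `SosSketch`, `bindingRow`, `outlierProbabilities`, and their parameters are hypothetical. Tolerances, iteration limits, and other details may differ from Flink ML's distributed `StochasticOutlierSelection`.

```scala
object SosSketch {
  // Squared Euclidean distance between two points.
  def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(k => (a(k) - b(k)) * (a(k) - b(k))).sum

  // Binding probabilities of point i. The precision beta = 1 / (2 sigma^2) is found by
  // bisection so that the entropy of the row (in nats) equals log(perplexity).
  def bindingRow(d2: Array[Double], i: Int, perplexity: Double,
                 maxIter: Int = 200, tol: Double = 1e-12): Array[Double] = {
    val target = math.log(perplexity)
    var lo = 0.0
    var hi = Double.PositiveInfinity
    var beta = 1.0
    var row = new Array[Double](0)
    var iter = 0
    var done = false
    while (!done) {
      // Gaussian affinities to every other point (a point has no affinity to itself).
      val a = d2.indices.map(j => if (j == i) 0.0 else math.exp(-d2(j) * beta)).toArray
      val total = math.max(a.sum, Double.MinPositiveValue) // guard against underflow
      row = a.map(_ / total)
      val entropy = -row.filter(_ > 0.0).map(p => p * math.log(p)).sum
      iter += 1
      if (math.abs(entropy - target) < tol || iter >= maxIter) done = true
      else if (entropy > target) { // too spread out: increase the precision
        lo = beta
        beta = if (hi == Double.PositiveInfinity) beta * 2.0 else (beta + hi) / 2.0
      } else {                     // too concentrated: decrease the precision
        hi = beta
        beta = (beta + lo) / 2.0
      }
    }
    row
  }

  // Outlier probability of each point: p(x_j) = product over i != j of (1 - b_ij).
  def outlierProbabilities(points: Array[Array[Double]], perplexity: Double): Array[Double] = {
    val n = points.length
    require(perplexity > 1.0 && perplexity < n - 1, "perplexity must lie in (1, n - 1)")
    val d2 = Array.tabulate(n, n)((i, j) => sqDist(points(i), points(j)))
    val b = Array.tabulate(n)(i => bindingRow(d2(i), i, perplexity))
    Array.tabulate(n)(j => (0 until n).filter(_ != j).map(i => 1.0 - b(i)(j)).product)
  }

  def main(args: Array[String]): Unit = {
    // Four clustered points and one far away.
    val points = Array(Array(0.0, 0.0), Array(0.0, 1.0), Array(1.0, 0.0),
                       Array(1.0, 1.0), Array(10.0, 10.0))
    outlierProbabilities(points, perplexity = 3.0).zipWithIndex.foreach {
      case (p, i) => println(f"point $i: $p%.4f")
    }
  }
}
```

The distant point receives almost no binding probability from the cluster, so its outlier probability ends up close to 1. Each clustered point is strongly bound by its neighbours, so its probability stays low.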
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters

Collection of distance metrics for measuring the distance (dissimilarity) between vectors, used by algorithms such as k-NN.
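Each metric reduces two equal-length vectors to a single number. This sketch uses the standard definitions:

- Euclidean: the square root of the summed squared differences.
- Manhattan: the sum of absolute differences.
- Cosine distance: one minus the cosine similarity.

`DistanceSketch` and its `Metric` trait are hypothetical local stand-ins for `DistanceMetric`. They operate on `Array[Double]` instead of Flink ML's `Vector`.

```scala
object DistanceSketch {
  // Hypothetical local version of the distance-metric contract.
  trait Metric { def distance(a: Array[Double], b: Array[Double]): Double }

  private def checked(a: Array[Double], b: Array[Double]): Unit =
    require(a.length == b.length, "vectors must have the same size")

  // sqrt(sum_i (a_i - b_i)^2)
  object Euclidean extends Metric {
    def distance(a: Array[Double], b: Array[Double]): Double = {
      checked(a, b)
      math.sqrt(a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum)
    }
  }

  // sum_i |a_i - b_i|
  object Manhattan extends Metric {
    def distance(a: Array[Double], b: Array[Double]): Double = {
      checked(a, b)
      a.indices.map(i => math.abs(a(i) - b(i))).sum
    }
  }

  // 1 - (a . b) / (|a| |b|); undefined when either vector is all zeros
  object Cosine extends Metric {
    def distance(a: Array[Double], b: Array[Double]): Double = {
      checked(a, b)
      val dot = a.indices.map(i => a(i) * b(i)).sum
      val norms = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
      require(norms > 0.0, "cosine distance is undefined for zero vectors")
      1.0 - dot / norms
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Array(0.0, 0.0)
    val b = Array(3.0, 4.0)
    println(Euclidean.distance(a, b))                           // 5.0
    println(Manhattan.distance(a, b))                           // 7.0
    println(Cosine.distance(Array(1.0, 0.0), Array(0.0, 1.0)))  // 1.0
  }
}
```

Cosine distance ignores vector length, so (1, 1) and (2, 2) are at distance (approximately) zero. Euclidean and Manhattan distances grow with the difference in magnitude.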
trait DistanceMetric {
def distance(a: Vector, b: Vector): Double
}
class EuclideanDistanceMetric extends DistanceMetric
class ManhattanDistanceMetric extends DistanceMetric
class CosineDistanceMetric extends DistanceMetric

Modular pipeline system for chaining transformers and predictors, enabling complex ML workflows.
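The fit-then-transform contract behind chaining can be sketched locally. This plain-Scala analogue uses a single-machine `Seq[Double]` instead of a `DataSet`, and all of its names are hypothetical: `PipelineSketch`, `LocalTransformer`, `Chained`, `Centerer`, and `MaxAbsScaler`.

The sketch fits each stage on the output of the previous stage. As we understand it, this is how a chained transformer is trained in Flink ML, but the code is not the library's implementation.

```scala
object PipelineSketch {
  // Hypothetical analogue of the Estimator/Transformer contract: fit stores the learned
  // state inside the object (returning Unit), transform applies it.
  trait LocalTransformer {
    def fit(data: Seq[Double]): Unit
    def transform(data: Seq[Double]): Seq[Double]
    // Chaining returns a new transformer that runs `this` first, then `next`.
    def chain(next: LocalTransformer): LocalTransformer = new Chained(this, next)
  }

  // Fitting a chain fits the head, pushes the data through it, then fits the tail on the
  // intermediate result, so every stage learns from the data it will actually receive.
  final class Chained(head: LocalTransformer, tail: LocalTransformer) extends LocalTransformer {
    def fit(data: Seq[Double]): Unit = {
      head.fit(data)
      tail.fit(head.transform(data))
    }
    def transform(data: Seq[Double]): Seq[Double] = tail.transform(head.transform(data))
  }

  // Subtracts the mean seen during fit.
  final class Centerer extends LocalTransformer {
    private var mean = 0.0
    def fit(data: Seq[Double]): Unit = mean = data.sum / data.size
    def transform(data: Seq[Double]): Seq[Double] = data.map(_ - mean)
  }

  // Divides by the largest absolute value seen during fit.
  final class MaxAbsScaler extends LocalTransformer {
    private var scale = 1.0
    def fit(data: Seq[Double]): Unit = scale = data.map(x => math.abs(x)).max
    def transform(data: Seq[Double]): Seq[Double] = data.map(_ / scale)
  }

  def main(args: Array[String]): Unit = {
    val pipeline = new Centerer().chain(new MaxAbsScaler())
    val training = Seq(1.0, 2.0, 3.0, 4.0, 5.0)
    pipeline.fit(training)
    println(pipeline.transform(training)) // List(-1.0, -0.5, 0.0, 0.5, 1.0)
  }
}
```

Here the scaler is fitted on the already-centered values (maximum absolute value 2.0), not on the raw input (maximum 5.0). That is why the order of fitting matters.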
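trait Estimator[Self] extends WithParameters {
  // fit returns Unit: the learned state (e.g. model weights) is stored in the estimator instance
  def fit[Training](training: DataSet[Training], fitParameters: ParameterMap = ParameterMap.Empty)(
      implicit fitOperation: FitOperation[Self, Training]): Unit
}
trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing, Prediction](testing: DataSet[Testing], predictParameters: ParameterMap = ParameterMap.Empty)(
      implicit predictor: PredictDataSetOperation[Self, Testing, Prediction]): DataSet[Prediction]
}
// Transformers are estimators too: e.g. StandardScaler learns column statistics in fit
trait Transformer[Self <: Transformer[Self]] extends Estimator[Self] {
  def transform[Input, Output](input: DataSet[Input], transformParameters: ParameterMap = ParameterMap.Empty)(
      implicit transformOperation: TransformDataSetOperation[Self, Input, Output]): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}

Flexible optimization framework with a gradient descent solver and pluggable loss functions for training ML models.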
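The solver's contract (labeled data and initial weights in, a weight vector out) can be illustrated with a few lines of full-batch gradient descent on squared loss for a linear model. In this plain-Scala sketch, `GradientDescentSketch`, `Labeled`, `Weights`, `optimize`, `stepsize`, and `iterations` are hypothetical names. Flink ML's `GradientDescent` also supports pluggable loss functions and further settings (such as regularization) that are not reproduced here.

```scala
object GradientDescentSketch {
  // Local stand-ins mirroring the LabeledVector / WeightVector shapes (dense arrays here).
  final case class Labeled(label: Double, features: Array[Double])
  final case class Weights(weights: Array[Double], intercept: Double)

  private def predict(w: Weights, x: Array[Double]): Double =
    w.weights.indices.map(i => w.weights(i) * x(i)).sum + w.intercept

  // Full-batch gradient descent on the mean of 0.5 * (w . x + b - y)^2.
  def optimize(data: Seq[Labeled], initial: Weights, stepsize: Double, iterations: Int): Weights = {
    val n = data.size.toDouble
    var w = initial
    for (_ <- 1 to iterations) {
      val gradW = new Array[Double](w.weights.length)
      var gradB = 0.0
      for (p <- data) {
        // Derivative of the loss with respect to the prediction.
        val residual = predict(w, p.features) - p.label
        for (i <- gradW.indices) gradW(i) += residual * p.features(i) / n
        gradB += residual / n
      }
      // Step against the averaged gradient for both the weights and the intercept.
      w = Weights(w.weights.indices.map(i => w.weights(i) - stepsize * gradW(i)).toArray,
                  w.intercept - stepsize * gradB)
    }
    w
  }

  def main(args: Array[String]): Unit = {
    // Noise-free points on the line y = 2x + 1.
    val data = Seq(0.0, 1.0, 2.0, 3.0).map(x => Labeled(2.0 * x + 1.0, Array(x)))
    val fitted = optimize(data, Weights(Array(0.0), 0.0), stepsize = 0.1, iterations = 2000)
    println(f"weight = ${fitted.weights(0)}%.4f, intercept = ${fitted.intercept}%.4f")
  }
}
```

With a constant step size of 0.1 and 2,000 iterations, the weight and intercept approach 2 and 1 on this data. A step size that is too large for the data's scale makes the iteration diverge instead.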
class GradientDescent extends IterativeSolver {
def optimize(
data: DataSet[LabeledVector],
initialWeights: Option[DataSet[WeightVector]]
): DataSet[WeightVector]
}
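trait LossFunction {
  def loss(dataPoint: LabeledVector, weights: WeightVector): Double
  // The gradient covers both the weights and the intercept, so it is returned as a WeightVector
  def gradient(dataPoint: LabeledVector, weights: WeightVector): WeightVector
}

// Data structures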
case class LabeledVector(label: Double, vector: Vector)
case class WeightVector(weights: Vector, intercept: Double)
// Vector implementations
case class DenseVector(data: Array[Double]) extends Vector
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector
// Matrix implementations
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix
class SparseMatrix(numRows: Int, numCols: Int, rowIndices: Array[Int], colPtrs: Array[Int], data: Array[Double]) extends Matrix
// Parameter system
trait Parameter[T] {
def defaultValue: Option[T]
}
class ParameterMap {
def add[T](parameter: Parameter[T], value: T): ParameterMap
def get[T](parameter: Parameter[T]): Option[T]
}
trait WithParameters {
def parameters: ParameterMap
}

object MLUtils {
def readLibSVM(env: ExecutionEnvironment, filePath: String): DataSet[LabeledVector]
def writeLibSVM(filePath: String, labeledVectors: DataSet[LabeledVector]): DataSink[String]
}
// Implicit extensions
implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) {
def readLibSVM(path: String): DataSet[LabeledVector]
}
implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) {
def writeAsLibSVM(path: String): DataSink[String]
}
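The libSVM text format stores one example per line as `label index:value index:value ...`, with 1-based feature indices. The sketch below parses and re-formats such lines in plain Scala:

- On parsing, it converts indices to 0-based.
- On formatting, it converts them back to 1-based.
- It ignores anything after `#`, as some readers do.

To our understanding, `MLUtils.readLibSVM` performs the same 1-based to 0-based conversion, but treat that as an assumption. `LibSvmSketch`, `Parsed`, `parseLine`, and `formatLine` are hypothetical names.

```scala
object LibSvmSketch {
  // One parsed line: label plus 0-based sparse feature indices and their values.
  final case class Parsed(label: Double, indices: Array[Int], values: Array[Double])

  // Parses "label index:value index:value ..." with 1-based indices.
  // Returns None for blank or comment-only lines.
  def parseLine(line: String): Option[Parsed] = {
    val content = line.takeWhile(_ != '#').trim
    if (content.isEmpty) None
    else {
      val tokens = content.split("\\s+")
      val label = tokens.head.toDouble
      val pairs = tokens.tail.map { token =>
        val parts = token.split(':')
        require(parts.length == 2, s"expected index:value, got '$token'")
        (parts(0).toInt - 1, parts(1).toDouble) // convert the 1-based index to 0-based
      }
      Some(Parsed(label, pairs.map(_._1), pairs.map(_._2)))
    }
  }

  // Inverse direction: write the label and 0-based features back as a 1-based libSVM line.
  def formatLine(p: Parsed): String =
    (p.label.toString +: p.indices.zip(p.values).map { case (i, v) => s"${i + 1}:$v" }).mkString(" ")

  def main(args: Array[String]): Unit = {
    val parsed = parseLine("1.0 1:0.5 3:2.0 # a comment")
    parsed.foreach(p => println(formatLine(p))) // 1.0 1:0.5 3:2.0
  }
}
```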