Machine learning library for Apache Flink providing scalable ML algorithms, including classification (SVM), regression (multiple linear regression), and recommendation (ALS), implemented on Flink's DataSet API for distributed batch processing
Apache Flink ML is a machine learning library that provides optimized implementations of popular ML algorithms designed to scale to datasets that far exceed the memory of a single machine. It is built on the DataSet (batch) API of Apache Flink's distributed processing engine, so training and prediction run as parallel Flink jobs across a cluster.
Add the dependency to pom.xml (Maven) or build.sbt (SBT).
Maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml_2.12</artifactId>
<version>1.8.3</version>
</dependency>
SBT:
libraryDependencies += "org.apache.flink" % "flink-ml_2.12" % "1.8.3"
Core imports:
import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{Vector, DenseVector, SparseVector}
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.outlier.StochasticOutlierSelection
Basic usage (SVM classification):
import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Load data using libSVM format
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")
// Create and configure SVM classifier
val svm = SVM()
.setBlocks(10)
.setIterations(100)
.setRegularization(0.001)
.setStepsize(0.1)
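// Train the model: fit() returns Unit and stores the learned weights in `svm` itself
svm.fit(trainingData)
// Make predictions (test vectors must have the same dimensionality as the training features)
val testData: DataSet[Vector] = env.fromCollection(Seq(
DenseVector(Array(1.0, 2.0, 3.0)),
DenseVector(Array(4.0, 5.0, 6.0))
))
// Each result is a (vector, predicted label) pair
val predictions = svm.predict(testData)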
predictions.print()
Apache Flink ML is built around several key architectural components:
Core machine learning algorithms including classification, regression, and recommendation systems, all optimized for distributed processing.
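Both `SVM` and `MultipleLinearRegression` produce a linear model: a weight vector plus an intercept, like the `WeightVector` type listed further down. The plain-Scala sketch below shows how such a model could turn a feature vector into a prediction. It does not use Flink and is not Flink ML's own code. Regression returns the raw score w·x + b. A linear classifier compares that score with a threshold, which is 0.0 here as an assumption. The names `LinearModelSketch`, `Weights`, `score`, `regress`, and `classify` are hypothetical.

```scala
object LinearModelSketch {
  // Hypothetical stand-in for a weight vector plus intercept
  final case class Weights(weights: Array[Double], intercept: Double)

  // Raw linear score w · x + b
  def score(model: Weights, x: Array[Double]): Double = {
    require(model.weights.length == x.length, "dimension mismatch")
    model.weights.zip(x).map { case (w, xi) => w * xi }.sum + model.intercept
  }

  // Regression output: the score itself
  def regress(model: Weights, x: Array[Double]): Double = score(model, x)

  // Binary classification output: +1.0 when the score exceeds the threshold, otherwise -1.0
  def classify(model: Weights, x: Array[Double], threshold: Double = 0.0): Double =
    if (score(model, x) > threshold) 1.0 else -1.0
}
```

For example, with weights (2.0, -1.0) and intercept 0.5, the input (1.0, 1.0) scores 1.5 and is classified as +1.0.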
// Classification
class SVM extends Predictor[SVM] with WithParameters
// Nearest-neighbor search (exact k-NN join)
class KNN extends Predictor[KNN] with WithParameters
// Regression
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters
// Recommendation
class ALS extends Predictor[ALS] with WithParameters
Comprehensive linear algebra framework with vectors and matrices supporting both dense and sparse representations.
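The following plain-Scala sketch illustrates why both dense and sparse representations exist. It implements the dot product and magnitude operations from the `Vector` trait, but it is not the library's implementation. The hypothetical `Sparse` case class mirrors the `SparseVector(size, indices, data)` layout and assumes the indices are sorted in ascending order. With sparse inputs only the stored entries are visited, so the cost depends on the number of non-zeros rather than on the vector size. `VectorSketch` and its methods are hypothetical names.

```scala
object VectorSketch {
  // Dense dot product: multiply aligned entries and sum them
  def denseDot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "dimension mismatch")
    var sum = 0.0
    var i = 0
    while (i < a.length) { sum += a(i) * b(i); i += 1 }
    sum
  }

  // Sparse layout mirroring SparseVector(size, indices, data); indices assumed sorted ascending
  final case class Sparse(size: Int, indices: Array[Int], data: Array[Double])

  // Sparse-dense dot product: only the stored entries contribute
  def sparseDenseDot(s: Sparse, d: Array[Double]): Double = {
    require(s.size == d.length, "dimension mismatch")
    var sum = 0.0
    var k = 0
    while (k < s.indices.length) { sum += s.data(k) * d(s.indices(k)); k += 1 }
    sum
  }

  // Sparse-sparse dot product: merge the two sorted index arrays
  def sparseDot(a: Sparse, b: Sparse): Double = {
    require(a.size == b.size, "dimension mismatch")
    var sum = 0.0
    var i = 0
    var j = 0
    while (i < a.indices.length && j < b.indices.length) {
      if (a.indices(i) == b.indices(j)) { sum += a.data(i) * b.data(j); i += 1; j += 1 }
      else if (a.indices(i) < b.indices(j)) i += 1
      else j += 1
    }
    sum
  }

  // Euclidean (L2) norm of a dense vector
  def magnitude(a: Array[Double]): Double = math.sqrt(denseDot(a, a))
}
```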
trait Vector {
def size: Int
def apply(index: Int): Double
def update(index: Int, value: Double): Unit
def dot(other: Vector): Double
def magnitude: Double
}
trait Matrix {
def numRows: Int
def numCols: Int
def apply(row: Int, col: Int): Double
def update(row: Int, col: Int, value: Double): Unit
}
Data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms.
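The two scalers can be sketched on in-memory arrays under a few assumptions:

- Standardization centers each feature on zero mean and scales it to unit variance, using the population standard deviation.
- Min-max rescaling maps each feature linearly into a target range, [0, 1] by default.
- Constant features are mapped to 0.0 and to the lower bound, respectively. This is a choice of the sketch, not documented Flink ML behavior.

Conceptually, fitting a scaler corresponds to computing the per-feature statistics, and transforming corresponds to applying them. `ScalerSketch` and its methods are hypothetical names.

```scala
object ScalerSketch {
  // Per-feature mean and population standard deviation over all rows
  def meanStd(rows: Seq[Array[Double]]): (Array[Double], Array[Double]) = {
    val n = rows.length.toDouble
    val dim = rows.head.length
    val mean = Array.tabulate(dim)(j => rows.map(_(j)).sum / n)
    val std = Array.tabulate(dim) { j =>
      math.sqrt(rows.map(r => (r(j) - mean(j)) * (r(j) - mean(j))).sum / n)
    }
    (mean, std)
  }

  // Standardize each feature; a zero-variance feature maps to 0.0
  def standardScale(rows: Seq[Array[Double]]): Seq[Array[Double]] = {
    val (mean, std) = meanStd(rows)
    rows.map(r => Array.tabulate(r.length)(j => if (std(j) == 0.0) 0.0 else (r(j) - mean(j)) / std(j)))
  }

  // Rescale each feature linearly into [lo, hi]; a constant feature maps to lo
  def minMaxScale(rows: Seq[Array[Double]], lo: Double = 0.0, hi: Double = 1.0): Seq[Array[Double]] = {
    val dim = rows.head.length
    val mins = Array.tabulate(dim)(j => rows.map(_(j)).min)
    val maxs = Array.tabulate(dim)(j => rows.map(_(j)).max)
    rows.map { r =>
      Array.tabulate(dim) { j =>
        val range = maxs(j) - mins(j)
        if (range == 0.0) lo else (r(j) - mins(j)) / range * (hi - lo) + lo
      }
    }
  }
}
```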
class StandardScaler extends Transformer[StandardScaler] with WithParameters
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters
Algorithms for identifying anomalous data points in datasets using probabilistic and statistical methods.
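Stochastic outlier selection (SOS) scores each point by how unlikely it is that any other point would choose it as a neighbor. The sketch below is a small in-memory version of that idea under these assumptions:

- Affinities come from a Gaussian kernel on squared Euclidean distances.
- Each point's kernel precision is found by bisection so that its binding distribution has the requested perplexity.
- A point's outlier probability is the product of (1 - b(i)(j)) over all other points i.

`SosSketch` is a hypothetical name, and the tolerance and iteration limit are arbitrary. This is not Flink ML's distributed implementation.

```scala
object SosSketch {
  // Squared Euclidean distance between two points
  private def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(k => (a(k) - b(k)) * (a(k) - b(k))).sum

  // Row i holds binding probabilities b(i)(j) over the other points j, with the
  // kernel precision beta tuned by bisection so the row's perplexity matches `perplexity`
  def bindingMatrix(points: IndexedSeq[Array[Double]], perplexity: Double): Array[Array[Double]] = {
    val n = points.length
    require(perplexity >= 1.0 && perplexity <= n - 1, "perplexity must lie in [1, n - 1]")
    val target = math.log(perplexity) // desired entropy in nats
    Array.tabulate(n) { i =>
      val others = (0 until n).filter(_ != i)
      val d = others.map(j => sqDist(points(i), points(j)))
      val dMin = d.min
      val shifted = d.map(_ - dMin) // shifting cancels after normalization and avoids underflow
      var beta = 1.0
      var lo = 0.0
      var hi = Double.PositiveInfinity
      var probs: IndexedSeq[Double] = IndexedSeq.empty
      var iter = 0
      var done = false
      while (!done && iter < 100) {
        val a = shifted.map(x => math.exp(-beta * x))
        val sumA = a.sum
        probs = a.map(_ / sumA)
        val entropy = math.log(sumA) + beta * shifted.zip(probs).map { case (x, p) => x * p }.sum
        if (math.abs(entropy - target) < 1e-6) done = true
        else if (entropy > target) { lo = beta; beta = if (hi.isInfinite) beta * 2 else (beta + hi) / 2 }
        else { hi = beta; beta = (beta + lo) / 2 }
        iter += 1
      }
      val row = Array.fill(n)(0.0)
      others.zip(probs).foreach { case (j, p) => row(j) = p }
      row
    }
  }

  // Outlier probability of point j: the chance that no other point binds to it
  def outlierProbabilities(points: IndexedSeq[Array[Double]], perplexity: Double): Array[Double] = {
    val b = bindingMatrix(points, perplexity)
    Array.tabulate(points.length) { j =>
      points.indices.filter(_ != j).map(i => 1.0 - b(i)(j)).product
    }
  }
}
```

On the four corners of a unit square plus one distant point, with perplexity 2, the distant point receives an outlier probability close to 1. Each corner is strongly bound by its neighbors and scores much lower.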
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters
Collection of distance metrics for measuring similarity between vectors, used by algorithms like k-NN.
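Below is a plain-Scala sketch of the three metrics listed here, plus a brute-force k-nearest-neighbor lookup of the kind these metrics serve. The `Metric` trait, the metric object names, and `kNearest` are hypothetical stand-ins for `DistanceMetric` and its implementations. The lookup sorts the whole training set for every query, which is only reasonable for small in-memory data.

```scala
object DistanceSketch {
  // Hypothetical counterpart of DistanceMetric: a dissimilarity between equal-length vectors
  trait Metric {
    def distance(a: Array[Double], b: Array[Double]): Double
    protected def check(a: Array[Double], b: Array[Double]): Unit =
      require(a.length == b.length, "dimension mismatch")
  }

  object Euclidean extends Metric {
    def distance(a: Array[Double], b: Array[Double]): Double = {
      check(a, b)
      math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
    }
  }

  object Manhattan extends Metric {
    def distance(a: Array[Double], b: Array[Double]): Double = {
      check(a, b)
      a.zip(b).map { case (x, y) => math.abs(x - y) }.sum
    }
  }

  // Cosine distance = 1 - cosine similarity; undefined when either vector is zero
  object Cosine extends Metric {
    def distance(a: Array[Double], b: Array[Double]): Double = {
      check(a, b)
      val dot = a.zip(b).map { case (x, y) => x * y }.sum
      val norms = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
      require(norms > 0.0, "cosine distance is undefined for zero vectors")
      1.0 - dot / norms
    }
  }

  // Brute-force k nearest neighbors of `query` under the given metric
  def kNearest(train: Seq[Array[Double]], query: Array[Double], k: Int, metric: Metric): Seq[Array[Double]] =
    train.sortWith((x, y) => metric.distance(x, query) < metric.distance(y, query)).take(k)
}
```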
trait DistanceMetric {
def distance(a: Vector, b: Vector): Double
}
class EuclideanDistanceMetric extends DistanceMetric
class ManhattanDistanceMetric extends DistanceMetric
class CosineDistanceMetric extends DistanceMetric
Modular pipeline system for chaining transformers and predictors, enabling complex ML workflows.
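The chaining idea behind `chainTransformer` and `chainPredictor` can be sketched without Flink:

- Fitting a chained predictor first fits the transformer.
- It then fits the predictor on the transformed features.
- Prediction runs new inputs through the same transformation.

Everything below is hypothetical and only mirrors the shape of the API listed here. That includes `PipelineSketch`, `MeanCenterer`, `SignOfFirstFeature`, and the simplified traits, which work over `Seq` instead of `DataSet`.

```scala
object PipelineSketch {
  type Row = Array[Double]

  // A transformer learns statistics in fit and applies them in transform
  trait Transformer {
    def fit(data: Seq[Row]): Unit
    def transform(data: Seq[Row]): Seq[Row]
  }

  // A predictor learns from labeled rows and maps rows to predictions
  trait Predictor {
    def fit(data: Seq[(Row, Double)]): Unit
    def predict(data: Seq[Row]): Seq[(Row, Double)]
  }

  // Fits the transformer, then fits the predictor on transformed features;
  // predictions are paired with the transformed rows
  final class ChainedPredictor(t: Transformer, p: Predictor) extends Predictor {
    def fit(data: Seq[(Row, Double)]): Unit = {
      val features = data.map(_._1)
      t.fit(features)
      p.fit(t.transform(features).zip(data.map(_._2)))
    }
    def predict(data: Seq[Row]): Seq[(Row, Double)] = p.predict(t.transform(data))
  }

  // Example transformer: subtract the per-feature mean seen during fit
  final class MeanCenterer extends Transformer {
    private var mean: Row = Array.empty[Double]
    def fit(data: Seq[Row]): Unit = {
      val dim = data.head.length
      mean = Array.tabulate(dim)(j => data.map(_(j)).sum / data.length)
    }
    def transform(data: Seq[Row]): Seq[Row] =
      data.map(r => Array.tabulate(r.length)(j => r(j) - mean(j)))
  }

  // Example predictor: +1.0 when the first feature is positive, else -1.0 (nothing to learn)
  final class SignOfFirstFeature extends Predictor {
    def fit(data: Seq[(Row, Double)]): Unit = ()
    def predict(data: Seq[Row]): Seq[(Row, Double)] =
      data.map(r => (r, if (r(0) > 0) 1.0 else -1.0))
  }
}
```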
// Simplified signatures: the optional ParameterMap and implicit operation parameters are omitted
trait Estimator[Self] extends WithParameters {
def fit[Training](training: DataSet[Training]): Unit // stores the learned state in the estimator itself
}
trait Predictor[Self] extends Estimator[Self] {
def predict[Testing, Prediction](testing: DataSet[Testing]): DataSet[Prediction]
}
trait Transformer[Self <: Transformer[Self]] extends Estimator[Self] {
def transform[Input, Output](input: DataSet[Input]): DataSet[Output]
def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}
Flexible optimization framework with a gradient descent solver and pluggable loss functions for training ML models.
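To illustrate what a gradient descent solver does with a loss function, here is a full-batch sketch for linear regression with the squared loss 0.5 * (w·x + b - y)^2. It assumes a constant step size, no regularization, and averaging of the per-example gradients on each iteration. `GradientDescentSketch`, `Labeled`, and `Weights` are hypothetical stand-ins for `GradientDescent`, `LabeledVector`, and `WeightVector`, not the library's code.

```scala
object GradientDescentSketch {
  final case class Labeled(label: Double, features: Array[Double])
  final case class Weights(weights: Array[Double], intercept: Double)

  // Squared loss 0.5 * (w·x + b - y)^2 and its gradient with respect to (w, b)
  def lossAndGradient(p: Labeled, w: Weights): (Double, Weights) = {
    val prediction = w.weights.zip(p.features).map { case (wi, xi) => wi * xi }.sum + w.intercept
    val residual = prediction - p.label
    (0.5 * residual * residual, Weights(p.features.map(_ * residual), residual))
  }

  // Full-batch gradient descent with a constant step size
  def optimize(data: Seq[Labeled], initial: Weights, stepSize: Double, iterations: Int): Weights = {
    var w = initial
    val n = data.length.toDouble
    for (_ <- 1 to iterations) {
      val grads = data.map(p => lossAndGradient(p, w)._2)
      val avgW = Array.tabulate(w.weights.length)(j => grads.map(_.weights(j)).sum / n)
      val avgB = grads.map(_.intercept).sum / n
      w = Weights(
        Array.tabulate(w.weights.length)(j => w.weights(j) - stepSize * avgW(j)),
        w.intercept - stepSize * avgB)
    }
    w
  }
}
```

On points drawn exactly from y = 2x + 1, the weights converge toward 2.0 and the intercept toward 1.0.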
class GradientDescent extends IterativeSolver {
def optimize(
data: DataSet[LabeledVector],
initialWeights: Option[DataSet[WeightVector]]
): DataSet[WeightVector]
}
trait LossFunction {
def loss(dataPoint: LabeledVector, weights: WeightVector): Double
def gradient(dataPoint: LabeledVector, weights: WeightVector): WeightVector
}
// Data structures
case class LabeledVector(label: Double, vector: Vector)
case class WeightVector(weights: Vector, intercept: Double)
// Vector implementations
case class DenseVector(data: Array[Double]) extends Vector
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector
// Matrix implementations
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix
class SparseMatrix(numRows: Int, numCols: Int, rowIndices: Array[Int], colPtrs: Array[Int], data: Array[Double]) extends Matrix
// Parameter system
trait Parameter[T] {
def defaultValue: Option[T]
}
class ParameterMap {
def add[T](parameter: Parameter[T], value: T): ParameterMap
def get[T](parameter: Parameter[T]): Option[T]
}
trait WithParameters {
def parameters: ParameterMap
}
object MLUtils {
def readLibSVM(env: ExecutionEnvironment, filePath: String): DataSet[LabeledVector]
def writeLibSVM(filePath: String, labeledVectors: DataSet[LabeledVector]): DataSink[String]
}
// Implicit extensions
implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) {
def readLibSVM(path: String): DataSet[LabeledVector]
}
implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) {
def writeAsLibSVM(path: String): DataSink[String]
}
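The libSVM text format read by `readLibSVM` and written by `writeAsLibSVM` stores one example per line: a label followed by `index:value` pairs for the non-zero features. The sketch below parses and formats such lines in plain Scala under two assumptions:

- Indices are 1-based in the text, following the libSVM convention, and 0-based in memory.
- Text after `#` is a comment.

How Flink ML itself handles these details is not shown here. `LibSvmSketch` and its methods are hypothetical.

```scala
object LibSvmSketch {
  // A parsed line: label plus sparse (zero-based index, value) pairs
  final case class Entry(label: Double, indices: Array[Int], values: Array[Double])

  // Parse "label idx:val idx:val ..." with 1-based indices; text after '#' is ignored
  def parseLine(line: String): Option[Entry] = {
    val content = line.takeWhile(_ != '#').trim
    if (content.isEmpty) None
    else {
      val tokens = content.split("\\s+")
      val pairs = tokens.tail.map { t =>
        val Array(i, v) = t.split(":")
        (i.toInt - 1, v.toDouble)
      }
      Some(Entry(tokens.head.toDouble, pairs.map(_._1), pairs.map(_._2)))
    }
  }

  // Inverse of parseLine: write the label and 1-based "idx:val" pairs
  def formatLine(e: Entry): String =
    (e.label.toString +: e.indices.zip(e.values).map { case (i, v) => s"${i + 1}:$v" }).mkString(" ")
}
```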