Apache Flink ML

Apache Flink ML (FlinkML) is Apache Flink's machine learning library. It provides distributed implementations of common ML algorithms that scale to datasets larger than a single machine's memory. The algorithms in this release run on Flink's DataSet (batch) API and inherit the fault tolerance of Flink's distributed runtime.

Package Information

  • Package Name: org.apache.flink:flink-ml_2.12
  • Package Type: maven
  • Language: Scala 2.12
  • Version: 1.8.3
  • Installation: Add the dependency to your pom.xml or build.sbt

Maven:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.12</artifactId>
  <version>1.8.3</version>
</dependency>

SBT:

libraryDependencies += "org.apache.flink" % "flink-ml_2.12" % "1.8.3"
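
Code that uses the library usually also needs Flink's Scala API and a client runtime on the classpath. The following is a minimal build.sbt sketch under that assumption; the flink-scala and flink-clients entries and the exact Scala patch version are assumptions that do not come from this page, so adjust them to your setup.

```scala
// build.sbt (sketch; the Scala patch version is an assumption)
scalaVersion := "2.12.8"

val flinkVersion = "1.8.3"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix, giving flink-ml_2.12 and so on
  "org.apache.flink" %% "flink-scala"   % flinkVersion,
  "org.apache.flink" %% "flink-clients" % flinkVersion,
  "org.apache.flink" %% "flink-ml"      % flinkVersion
)
```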

Core Imports

import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{Vector, DenseVector, SparseVector}
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.outlier.StochasticOutlierSelection

Basic Usage

import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Load data using libSVM format
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

// Create and configure SVM classifier
val svm = SVM()
  .setBlocks(10)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)

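// Train the model; fit() returns Unit and stores the learned weights in the svm instance
svm.fit(trainingData)

// Make predictions
val testData: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(Array(1.0, 2.0, 3.0)),
  DenseVector(Array(4.0, 5.0, 6.0))
))

// Each result pairs an input vector with its predicted label
val predictions: DataSet[(Vector, Double)] = svm.predict(testData)
predictions.print()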
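
The example above depends on a libSVM file being present. As a variant that runs without any input file, the sketch below trains the same classifier on a small in-memory data set. The toy data points and parameter values are hypothetical, and the snippet is written in script style (for the Scala REPL or the body of a main method).

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical toy data: label +1.0 near (2, 2), label -1.0 near (-2, -2)
val training: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(2.0, 2.0)),
  LabeledVector(1.0, DenseVector(2.5, 1.5)),
  LabeledVector(-1.0, DenseVector(-2.0, -2.0)),
  LabeledVector(-1.0, DenseVector(-1.5, -2.5))
))

val svm = SVM()
  .setBlocks(1)       // one block is enough for a local toy run
  .setIterations(20)

// fit() trains in place; the fitted model lives inside svm
svm.fit(training)

val unlabeled: DataSet[Vector] = env.fromCollection(Seq[Vector](
  DenseVector(3.0, 3.0),
  DenseVector(-3.0, -3.0)
))

val predictions: DataSet[(Vector, Double)] = svm.predict(unlabeled)

// collect() runs the job and brings the (vector, label) pairs to the client
val results = predictions.collect()
results.foreach { case (vector, label) => println(s"$vector -> $label") }
```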

Architecture

Apache Flink ML is built around several key architectural components:

  • Pipeline Framework: Estimator, Predictor, and Transformer traits provide a scikit-learn-like API for building ML pipelines
  • Linear Algebra: Complete vector and matrix abstractions with both dense and sparse implementations
  • Distributed Computing: All algorithms are designed for Flink's DataSet API with optimizations for distributed processing
  • Type-Safe Parameters: Parameter system using Scala's type system for algorithm configuration
  • Optimization Framework: Pluggable optimization algorithms like Gradient Descent with configurable loss functions

Capabilities

Machine Learning Algorithms

Core machine learning algorithms covering classification, nearest-neighbor search, regression, and recommendation, all designed for distributed processing.

// Classification
class SVM extends Predictor[SVM] with WithParameters

// Nearest neighbors
class KNN extends Predictor[KNN] with WithParameters

// Regression  
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters

// Recommendation
class ALS extends Predictor[ALS] with WithParameters
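
All of these predictors follow the same fit-then-predict pattern. As an illustration, here is a minimal sketch with MultipleLinearRegression; the training points (roughly y = 2x) and the hyperparameter values are hypothetical and chosen only to keep the run small.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.regression.MultipleLinearRegression

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical data following y = 2 * x
val training: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(2.0, DenseVector(1.0)),
  LabeledVector(4.0, DenseVector(2.0)),
  LabeledVector(6.0, DenseVector(3.0))
))

val mlr = MultipleLinearRegression()
  .setIterations(100)
  .setStepsize(0.05)
  .setConvergenceThreshold(0.001)

// Training happens in place; the weights are kept inside mlr
mlr.fit(training)

val queries: DataSet[Vector] = env.fromCollection(Seq[Vector](DenseVector(4.0)))

// Each result is (input vector, predicted value)
val predictions: DataSet[(Vector, Double)] = mlr.predict(queries)
val predicted = predictions.collect()
predicted.foreach(println)
```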


Linear Algebra

Comprehensive linear algebra framework with vectors and matrices supporting both dense and sparse representations.

trait Vector {
  def size: Int
  def apply(index: Int): Double
  def update(index: Int, value: Double): Unit
  def dot(other: Vector): Double
  def magnitude: Double
}

trait Matrix {
  def numRows: Int
  def numCols: Int
  def apply(row: Int, col: Int): Double
  def update(row: Int, col: Int, value: Double): Unit
}
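
A short sketch of the Vector operations listed above, using the dense and sparse implementations from org.apache.flink.ml.math. The values are arbitrary and no Flink job is started; everything runs locally.

```scala
import org.apache.flink.ml.math.{DenseVector, SparseVector, Vector}

val dense: Vector = DenseVector(1.0, 0.0, 2.0)

// Sparse vector of size 3 with non-zero entries at indices 0 and 2
val sparse: Vector = SparseVector(3, Array(0, 2), Array(3.0, 4.0))

// Dense and sparse vectors can be combined: 1*3 + 0*0 + 2*4
val dotProduct: Double = dense.dot(sparse)

// Euclidean norm: sqrt(3^2 + 4^2)
val norm: Double = sparse.magnitude

// Syntactic sugar for dense.update(1, 5.0)
dense(1) = 5.0

println(s"dot = $dotProduct, norm = $norm, dense = $dense")
```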


Data Preprocessing

Data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms.

class StandardScaler extends Transformer[StandardScaler] with WithParameters
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters
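
Transformers are fitted first and then applied. The sketch below rescales two features with MinMaxScaler; the input vectors are hypothetical, and the target range of 0.0 to 1.0 is set explicitly for clarity.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.preprocessing.MinMaxScaler

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical features on very different scales
val features: DataSet[DenseVector] = env.fromCollection(Seq(
  DenseVector(1.0, 10.0),
  DenseVector(2.0, 20.0),
  DenseVector(3.0, 30.0)
))

val scaler = MinMaxScaler()
  .setMin(0.0)
  .setMax(1.0)

// fit() computes the per-feature minimum and maximum
scaler.fit(features)

// transform() maps every feature into the configured range
val scaled = scaler.transform(features).collect()
scaled.foreach(println)
```

StandardScaler follows the same pattern, with setMean and setStd in place of setMin and setMax.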


Outlier Detection

Algorithms for identifying anomalous data points in datasets using probabilistic and statistical methods.

class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters
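
A minimal usage sketch, assuming five hypothetical 2-D points of which the last is far from the rest. With LabeledVector input, the label serves as the point's index, and the result pairs each index with an outlier probability.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.outlier.StochasticOutlierSelection

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical points; the label is used as the point index
val points: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(0.0, DenseVector(1.0, 1.0)),
  LabeledVector(1.0, DenseVector(2.0, 1.0)),
  LabeledVector(2.0, DenseVector(1.0, 2.0)),
  LabeledVector(3.0, DenseVector(2.0, 2.0)),
  LabeledVector(4.0, DenseVector(5.0, 8.0)) // far away from the others
))

val sos = new StochasticOutlierSelection().setPerplexity(3.0)

// Result elements are (point index, outlier probability)
val scores = sos.transform(points).collect()
scores.sortBy(_._1).foreach { case (index, p) => println(s"point $index: $p") }
```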


Distance Metrics

Collection of distance metrics for measuring similarity between vectors, used by algorithms like k-NN.

trait DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}

class EuclideanDistanceMetric extends DistanceMetric
class ManhattanDistanceMetric extends DistanceMetric
class CosineDistanceMetric extends DistanceMetric
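
The metrics can be called directly on two vectors or passed to an algorithm. The sketch below assumes the metric classes live in org.apache.flink.ml.metrics.distances and that KNN exposes setK and setDistanceMetric, which matches the library as far as I know; the vectors are arbitrary.

```scala
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.metrics.distances.{EuclideanDistanceMetric, ManhattanDistanceMetric}
import org.apache.flink.ml.nn.KNN

val a = DenseVector(0.0, 0.0)
val b = DenseVector(3.0, 4.0)

// sqrt(3^2 + 4^2)
val euclidean: Double = EuclideanDistanceMetric().distance(a, b)

// |3| + |4|
val manhattan: Double = ManhattanDistanceMetric().distance(a, b)

println(s"euclidean = $euclidean, manhattan = $manhattan")

// Plugging a metric into k-NN (configuration only, no job is run here)
val knn = KNN()
  .setK(3)
  .setDistanceMetric(ManhattanDistanceMetric())
```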


Pipeline Framework

Modular pipeline system for chaining transformers and predictors, enabling complex ML workflows.

// Signatures are simplified: the optional ParameterMap argument and the
// implicit operation parameter of each method are omitted.
trait Estimator[Self] extends WithParameters {
  // Trains in place; the fitted state is stored in the estimator itself
  def fit[Training](training: DataSet[Training]): Unit
}

trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing, Prediction](testing: DataSet[Testing]): DataSet[Prediction]
}

trait Transformer[Self <: Transformer[Self]] extends Estimator[Self] {
  def transform[Input, Output](input: DataSet[Input]): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}
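
Chaining can be sketched as follows: a scaler feeds a polynomial feature expansion, which feeds a linear regression. The data and hyperparameters are hypothetical; fitting the chained predictor fits every stage in order.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical training data
val training: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(1.0)),
  LabeledVector(4.0, DenseVector(2.0)),
  LabeledVector(9.0, DenseVector(3.0)),
  LabeledVector(16.0, DenseVector(4.0))
))

val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures().setDegree(2)
val mlr = MultipleLinearRegression()
  .setIterations(50)
  .setStepsize(0.01)

// Transformer -> Transformer -> Predictor
val pipeline = scaler.chainTransformer(polyFeatures).chainPredictor(mlr)

// Fits the scaler, then the feature expansion, then the regression
pipeline.fit(training)

val queries: DataSet[Vector] = env.fromCollection(Seq[Vector](DenseVector(2.5)))

// Queries pass through the same transformers before prediction
val predicted = pipeline.predict(queries).collect()
predicted.foreach(println)
```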


Optimization Framework

Flexible optimization framework with gradient descent solver and pluggable loss functions for training ML models.

class GradientDescent extends IterativeSolver {
  def optimize(
    data: DataSet[LabeledVector],
    initialWeights: Option[DataSet[WeightVector]]
  ): DataSet[WeightVector]
}

trait LossFunction {
  def loss(dataPoint: LabeledVector, weights: WeightVector): Double
  def gradient(dataPoint: LabeledVector, weights: WeightVector): WeightVector
}
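
The solver can also be used on its own. The sketch below assumes that the optimization package provides GenericLossFunction, SquaredLoss, and LinearPrediction and that the solver has setLossFunction, setIterations, and setStepsize; these names are not listed on this page, so treat them as assumptions to check against your Flink version. The data is hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{LabeledVector, WeightVector}
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.optimization.{GenericLossFunction, GradientDescent, LinearPrediction, SquaredLoss}

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical data following y = 2 * x
val training: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(2.0, DenseVector(1.0)),
  LabeledVector(4.0, DenseVector(2.0)),
  LabeledVector(6.0, DenseVector(3.0))
))

// Assumed API: squared loss combined with a linear prediction function
val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

val sgd = GradientDescent()
  .setLossFunction(lossFunction)
  .setIterations(100)
  .setStepsize(0.05)

// None lets the solver start from its own initial weights
val solution: DataSet[WeightVector] = sgd.optimize(training, None)

val weights = solution.collect()
weights.foreach(w => println(s"weights = ${w.weights}, intercept = ${w.intercept}"))
```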


Core Types

// Data structures
case class LabeledVector(label: Double, vector: Vector)
case class WeightVector(weights: Vector, intercept: Double)

// Vector implementations
case class DenseVector(data: Array[Double]) extends Vector
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector

// Matrix implementations  
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix
class SparseMatrix(numRows: Int, numCols: Int, rowIndices: Array[Int], colPtrs: Array[Int], data: Array[Double]) extends Matrix

// Parameter system
trait Parameter[T] {
  def defaultValue: Option[T]
}

class ParameterMap {
  def add[T](parameter: Parameter[T], value: T): ParameterMap
  def get[T](parameter: Parameter[T]): Option[T]
}

trait WithParameters {
  def parameters: ParameterMap
}
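
To illustrate the parameter system, the sketch below builds a ParameterMap from two parameters that SVM defines in its companion object (SVM.Iterations and SVM.Regularization) and from a hypothetical custom parameter named Tolerance, which exists only in this example. An unset parameter falls back to its default value.

```scala
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.{Parameter, ParameterMap}

// Hypothetical custom parameter with a default value (not part of FlinkML)
case object Tolerance extends Parameter[Double] {
  val defaultValue: Option[Double] = Some(1e-3)
}

val params = ParameterMap()
  .add(SVM.Iterations, 50)
  .add(SVM.Regularization, 0.01)

// Explicitly set values are returned as stored
val iterations: Option[Int] = params.get(SVM.Iterations)
val regularization: Option[Double] = params.get(SVM.Regularization)

// Tolerance was never added, so get() returns its default
val tolerance: Option[Double] = params.get(Tolerance)

println(s"iterations = $iterations, regularization = $regularization, tolerance = $tolerance")
```

A map like this can be passed as the optional second argument of fit, predict, or transform to set values for a single call.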

Data I/O

object MLUtils {
  def readLibSVM(env: ExecutionEnvironment, filePath: String): DataSet[LabeledVector]
  def writeLibSVM(filePath: String, labeledVectors: DataSet[LabeledVector]): DataSink[String]
}

// Implicit extensions
implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) {
  def readLibSVM(path: String): DataSet[LabeledVector]
}

implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) {
  def writeAsLibSVM(path: String): DataSink[String]
}
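
A round-trip sketch using the implicit extensions above: write a small data set in libSVM format, then read it back. The temporary path and the data are hypothetical. Parallelism is set to 1 so that the sink writes a single file, and vectors read back are expected to be sparse.

```scala
import java.nio.file.Files

import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // single output file instead of a directory of part files

// Hypothetical temporary location; the file itself must not exist yet
val path = Files.createTempDirectory("flinkml-libsvm").resolve("data.libsvm").toString

val data: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(0.5, 1.5)),
  LabeledVector(-1.0, DenseVector(2.0, 3.0))
))

// writeAsLibSVM only registers a sink; execute() actually writes the file
data.writeAsLibSVM(path)
env.execute("write libSVM")

// readLibSVM comes from the implicit RichExecutionEnvironment in org.apache.flink.ml
val readBack = env.readLibSVM(path).collect()
readBack.foreach(println)
```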