tessl/maven-org-apache-flink--flink-ml_2-12

Machine learning library for Apache Flink providing scalable ML algorithms including classification (SVM), regression (multiple linear regression), and recommendation (ALS) optimized for distributed stream and batch processing

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-ml_2.12@1.8.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-ml_2-12@1.8.0

Apache Flink ML

Apache Flink ML (FlinkML) is a machine learning library for Apache Flink. It provides distributed implementations of common ML algorithms that scale to datasets larger than a single machine's memory. The algorithms in this release run on Flink's batch DataSet API and inherit the engine's fault tolerance.

Package Information

  • Package Name: org.apache.flink:flink-ml_2.12
  • Package Type: maven
  • Language: Scala (built for Scala 2.12)
  • Version: 1.8.3
  • Installation: Add the dependency to your pom.xml or build.sbt

Maven:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.12</artifactId>
  <version>1.8.3</version>
</dependency>

SBT:

libraryDependencies += "org.apache.flink" % "flink-ml_2.12" % "1.8.3"

Core Imports

import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{Vector, DenseVector, SparseVector}
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.outlier.StochasticOutlierSelection

Basic Usage

import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Load training data in libSVM format
// (readLibSVM is added to the environment by the org.apache.flink.ml._ implicits)
val trainingData: DataSet[LabeledVector] = env.readLibSVM("training.libsvm")

// Create and configure SVM classifier
val svm = SVM()
  .setBlocks(10)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)

// Train the model; fit returns Unit and stores the learned weights in svm
svm.fit(trainingData)

// Make predictions; test vectors need the same dimension as the training data
val testData: DataSet[Vector] = env.fromCollection(Seq(
  DenseVector(Array(1.0, 2.0, 3.0)),
  DenseVector(Array(4.0, 5.0, 6.0))
))

// Each result pairs an input vector with its predicted label
val predictions = svm.predict(testData)
predictions.print()

Architecture

Apache Flink ML is built around several key architectural components:

  • Pipeline Framework: Estimator, Predictor, and Transformer traits provide a scikit-learn-like API for building ML pipelines
  • Linear Algebra: Complete vector and matrix abstractions with both dense and sparse implementations
  • Distributed Computing: All algorithms are designed for Flink's DataSet API with optimizations for distributed processing
  • Type-Safe Parameters: Parameter system using Scala's type system for algorithm configuration
  • Optimization Framework: Pluggable optimization algorithms like Gradient Descent with configurable loss functions

Capabilities

Machine Learning Algorithms

Core machine learning algorithms including classification, regression, and recommendation systems, all optimized for distributed processing.

// Classification
class SVM extends Predictor[SVM] with WithParameters

// Nearest neighbors (exact k-nearest-neighbors join)
class KNN extends Predictor[KNN] with WithParameters

// Regression  
class MultipleLinearRegression extends Predictor[MultipleLinearRegression] with WithParameters

// Recommendation
class ALS extends Predictor[ALS] with WithParameters
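
The other predictors follow the same configure, fit, predict pattern as SVM. The following is a minimal sketch for ALS, assuming ratings are given as (user, item, rating) triples. The sample ratings and the parameter values are made up for illustration and are not tuned.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical ratings: (userId, itemId, rating)
val ratings: DataSet[(Int, Int, Double)] = env.fromElements(
  (1, 10, 4.0), (1, 11, 2.0), (2, 10, 5.0), (2, 12, 3.0), (3, 11, 1.0)
)

// Illustrative settings, not recommended defaults
val als = ALS()
  .setNumFactors(5)
  .setIterations(10)
  .setLambda(0.1)
  .setBlocks(2)

// fit trains in place and returns Unit
als.fit(ratings)

// Ask for predicted ratings of (user, item) pairs that were not rated
val unseen: DataSet[(Int, Int)] = env.fromElements((1, 12), (3, 10))
val predicted = als.predict(unseen) // elements are (userId, itemId, predictedRating)
predicted.print()
```

Both users and items in the query should appear in the training data, since the prediction is built from their learned factor vectors.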

Linear Algebra

Comprehensive linear algebra framework with vectors and matrices supporting both dense and sparse representations.

trait Vector {
  def size: Int
  def apply(index: Int): Double
  def update(index: Int, value: Double): Unit
  def dot(other: Vector): Double
  def magnitude: Double
}

trait Matrix {
  def numRows: Int
  def numCols: Int
  def apply(row: Int, col: Int): Double
  def update(row: Int, col: Int, value: Double): Unit
}
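
A short sketch of how these abstractions might be used locally, without a Flink job. It relies only on the constructors listed under Core Types; the values are arbitrary.

```scala
import org.apache.flink.ml.math.{DenseMatrix, DenseVector, SparseVector, Vector}

// Dense vector: every entry is stored
val dense: Vector = DenseVector(Array(1.0, 2.0, 3.0))

// Sparse vector of size 3 with non-zero entries at indices 0 and 2
val sparse: Vector = SparseVector(3, Array(0, 2), Array(1.0, 3.0))

// Dot product works across dense and sparse representations: 1.0 * 1.0 + 3.0 * 3.0
val product = dense.dot(sparse)

// Euclidean norm: sqrt(1 + 4 + 9)
val norm = dense.magnitude

// Element access and in-place update go through apply and update
dense(1) = 5.0
val second = dense(1)

// DenseMatrix stores its values in column-major order,
// so index 1 of the data array is row 1, column 0
val matrix = DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0))
val bottomLeft = matrix(1, 0)
```

Typing both values as Vector shows that callers can stay independent of the storage format; a sparse representation mainly pays off when most entries are zero.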

Data Preprocessing

Data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms.

class StandardScaler extends Transformer[StandardScaler] with WithParameters
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters
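
As an illustration, the sketch below standardizes a small set of feature vectors with StandardScaler. The input values are hypothetical, and the package name org.apache.flink.ml.preprocessing is assumed from the library layout rather than stated above.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.preprocessing.StandardScaler

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical feature vectors whose columns sit on very different scales
val features: DataSet[DenseVector] = env.fromElements(
  DenseVector(Array(1.0, 100.0)),
  DenseVector(Array(2.0, 300.0)),
  DenseVector(Array(3.0, 500.0))
)

// Target mean 0 and standard deviation 1 for every feature
val scaler = StandardScaler()
  .setMean(0.0)
  .setStd(1.0)

// fit learns the per-feature mean and standard deviation from the data
scaler.fit(features)

// transform rescales each vector using the learned statistics
val scaled = scaler.transform(features)
scaled.print()
```

Fitting on the training set and reusing the same scaler for test data keeps both sets on the same scale.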

Outlier Detection

Algorithms for identifying anomalous data points in datasets using probabilistic and statistical methods.

class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters
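
A minimal sketch of scoring points with Stochastic Outlier Selection. The data set and the perplexity value are illustrative assumptions; the label of each LabeledVector is used as the point's index in the output.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.outlier.StochasticOutlierSelection

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical points: four form a tight cluster, one lies far away
val points: DataSet[LabeledVector] = env.fromElements(
  LabeledVector(0.0, DenseVector(Array(1.0, 1.0))),
  LabeledVector(1.0, DenseVector(Array(2.0, 1.0))),
  LabeledVector(2.0, DenseVector(Array(1.0, 2.0))),
  LabeledVector(3.0, DenseVector(Array(2.0, 2.0))),
  LabeledVector(4.0, DenseVector(Array(5.0, 8.0)))
)

// Perplexity roughly controls how many neighbors influence each point
val sos = new StochasticOutlierSelection().setPerplexity(3)

// Each result is (index, outlier probability); higher means more anomalous
val scores = sos.transform(points)
scores.print()
```

The output is a probability per point rather than a hard label, so the caller chooses the threshold that separates outliers from inliers.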

Distance Metrics

Collection of distance metrics for measuring similarity between vectors, used by algorithms like k-NN.

trait DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}

class EuclideanDistanceMetric extends DistanceMetric
class ManhattanDistanceMetric extends DistanceMetric
class CosineDistanceMetric extends DistanceMetric
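
The metrics can be called directly on local vectors. The sketch below assumes they live in the org.apache.flink.ml.metrics.distances package and are created through their companion objects; the vectors are arbitrary examples.

```scala
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.metrics.distances.{CosineDistanceMetric, EuclideanDistanceMetric, ManhattanDistanceMetric}

val origin = DenseVector(Array(0.0, 0.0))
val point = DenseVector(Array(3.0, 4.0))

// Straight-line distance: sqrt(3^2 + 4^2)
val euclidean = EuclideanDistanceMetric().distance(origin, point)

// Sum of absolute coordinate differences: |3| + |4|
val manhattan = ManhattanDistanceMetric().distance(origin, point)

// Cosine distance is 1 minus the cosine of the angle between the vectors,
// so two orthogonal unit vectors are at distance 1
val xAxis = DenseVector(Array(1.0, 0.0))
val yAxis = DenseVector(Array(0.0, 1.0))
val cosine = CosineDistanceMetric().distance(xAxis, yAxis)
```

Cosine distance ignores vector length and compares direction only, which makes it a common choice for sparse, high-dimensional features.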

Pipeline Framework

Modular pipeline system for chaining transformers and predictors, enabling complex ML workflows.

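// Simplified signatures: the optional ParameterMap arguments and the implicit
// operation type classes taken by the real methods are omitted for brevity
trait Estimator[Self] extends WithParameters {
  // Trains in place; the fitted state is stored in the estimator itself
  def fit[Training](training: DataSet[Training]): Unit
}

trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing, Prediction](testing: DataSet[Testing]): DataSet[Prediction]
}

// Transformers are estimators too, which is why scalers can be fitted
trait Transformer[Self <: Transformer[Self]] extends Estimator[Self] {
  def transform[Input, Output](input: DataSet[Input]): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}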
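
Chaining can be sketched as follows: a scaler feeds a polynomial feature expansion, which feeds a linear regression. The training data, degree, and solver settings are hypothetical values chosen to keep the example small.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical training data following y = 2x
val trainingData: DataSet[LabeledVector] = env.fromElements(
  LabeledVector(2.0, DenseVector(Array(1.0))),
  LabeledVector(4.0, DenseVector(Array(2.0))),
  LabeledVector(6.0, DenseVector(Array(3.0)))
)

val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures().setDegree(2)
val mlr = MultipleLinearRegression()
  .setIterations(20)
  .setStepsize(0.1)

// scaler -> polynomial expansion -> regression
val pipeline = scaler.chainTransformer(polyFeatures).chainPredictor(mlr)

// Fitting the pipeline fits each stage in order on the transformed data
pipeline.fit(trainingData)

// Prediction pushes the input through the same transformers first
val testData: DataSet[DenseVector] = env.fromElements(DenseVector(Array(4.0)))
val predictions = pipeline.predict(testData)
predictions.print()
```

Because the chain is itself a predictor, test data goes through exactly the preprocessing that was learned during fitting.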

Optimization Framework

Flexible optimization framework with gradient descent solver and pluggable loss functions for training ML models.

class GradientDescent extends IterativeSolver {
  def optimize(
    data: DataSet[LabeledVector],
    initialWeights: Option[DataSet[WeightVector]]
  ): DataSet[WeightVector]
}

trait LossFunction {
  def loss(dataPoint: LabeledVector, weights: WeightVector): Double
  // The gradient covers both the weights and the intercept, so it is a WeightVector
  def gradient(dataPoint: LabeledVector, weights: WeightVector): WeightVector
}
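
The solver can also be used on its own. The sketch below assumes the helper names GenericLossFunction, SquaredLoss, and LinearPrediction from the org.apache.flink.ml.optimization package, which are not listed above; the data and solver settings are illustrative.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{LabeledVector, WeightVector}
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.optimization.{GenericLossFunction, GradientDescent, LinearPrediction, SquaredLoss}

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical data following y = 2x
val data: DataSet[LabeledVector] = env.fromElements(
  LabeledVector(2.0, DenseVector(Array(1.0))),
  LabeledVector(4.0, DenseVector(Array(2.0))),
  LabeledVector(6.0, DenseVector(Array(3.0)))
)

// Squared loss applied to a linear prediction function
val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

// Illustrative settings, not tuned values
val sgd = GradientDescent()
  .setLossFunction(lossFunction)
  .setIterations(50)
  .setStepsize(0.1)

// Passing None lets the solver create the initial weights itself
val weights: DataSet[WeightVector] = sgd.optimize(data, None)
weights.print()
```

Swapping the loss function changes the model being trained without touching the solver configuration.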

Core Types

// Data structures
case class LabeledVector(label: Double, vector: Vector)
case class WeightVector(weights: Vector, intercept: Double)

// Vector implementations
case class DenseVector(data: Array[Double]) extends Vector
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector

// Matrix implementations  
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix
class SparseMatrix(numRows: Int, numCols: Int, rowIndices: Array[Int], colPtrs: Array[Int], data: Array[Double]) extends Matrix

// Parameter system
trait Parameter[T] {
  def defaultValue: Option[T]
}

class ParameterMap {
  def add[T](parameter: Parameter[T], value: T): ParameterMap
  def get[T](parameter: Parameter[T]): Option[T]
}

trait WithParameters {
  def parameters: ParameterMap
}
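
A brief sketch of the parameter system in use. It assumes that each algorithm exposes its parameter keys on its companion object, as SVM.Iterations and SVM.Regularization do here; the values are arbitrary.

```scala
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.ParameterMap

// Parameters are typed keys; add checks the value type at compile time
val params = ParameterMap()
  .add(SVM.Iterations, 50)
  .add(SVM.Regularization, 0.01)

// get returns the stored value for a key that was set
val iterations: Option[Int] = params.get(SVM.Iterations)

// For a key that was not set, get falls back to the parameter's defaultValue
val stepsize: Option[Double] = params.get(SVM.Stepsize)

// Setter methods on an algorithm write to its own ParameterMap
val svm = SVM().setIterations(100)
val configured = svm.parameters.get(SVM.Iterations)
```

A map built this way can also be passed to a single call, for example svm.fit(trainingData, params) with a hypothetical trainingData set, to override the estimator's settings for that call only.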

Data I/O

object MLUtils {
  def readLibSVM(env: ExecutionEnvironment, filePath: String): DataSet[LabeledVector]
  def writeLibSVM(filePath: String, labeledVectors: DataSet[LabeledVector]): DataSink[String]
}

// Implicit extensions
implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) {
  def readLibSVM(path: String): DataSet[LabeledVector]
}

implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) {
  def writeAsLibSVM(path: String): DataSink[String]
}
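
The following sketch reads a libSVM file in both styles and writes it back out. To stay self-contained it first creates a small temporary input file; the file contents and paths are hypothetical.

```scala
import java.nio.file.Files

import org.apache.flink.api.scala._
import org.apache.flink.ml._
import org.apache.flink.ml.common.LabeledVector

val env = ExecutionEnvironment.getExecutionEnvironment

// Write a tiny libSVM file: "<label> <index>:<value> ...", indices start at 1
val input = Files.createTempFile("sample", ".libsvm")
Files.write(input, "1 1:0.5 3:1.2\n-1 2:0.7\n".getBytes("UTF-8"))

// Explicit form through the MLUtils object
val viaUtils: DataSet[LabeledVector] = MLUtils.readLibSVM(env, input.toString)

// Equivalent implicit form provided by RichExecutionEnvironment
val viaImplicit: DataSet[LabeledVector] = env.readLibSVM(input.toString)

// writeAsLibSVM only declares a sink; the job runs when execute is called
viaImplicit.writeAsLibSVM(input.toString + ".out")
env.execute("copy libSVM file")
```

The implicit forms require import org.apache.flink.ml._ to be in scope; the MLUtils calls work from the same import.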