CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-ml-2-12

Machine learning library for Apache Flink providing scalable ML algorithms including classification (SVM), regression (multiple linear regression), and recommendation (ALS) optimized for distributed stream and batch processing

Overview
Eval results
Files

pipeline.mddocs/

Pipeline Framework

Apache Flink ML provides a pipeline framework that enables building complex machine learning workflows by chaining transformers and predictors. The framework follows a similar design to scikit-learn with Estimator, Predictor, and Transformer abstractions.

Core Pipeline Components

Estimator

Base trait for components that can be fitted to data to learn parameters.

trait Estimator[Self] extends WithParameters {
  def fit[Training](training: DataSet[Training])(implicit fitOperation: FitOperation[Self, Training]): Self
}

All machine learning algorithms in Flink ML extend Estimator to provide the fit method for training.

Predictor

Extends Estimator to add prediction capabilities for supervised learning algorithms.

trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing](
    testing: DataSet[Testing]
  )(implicit predictOperation: PredictDataSetOperation[Self, Testing, Prediction]): DataSet[Prediction]
  
  def evaluate[Testing, Prediction](
    testing: DataSet[Testing]
  )(implicit evaluateOperation: EvaluateDataSetOperation[Self, Testing, Prediction]): DataSet[Prediction]
}

Usage Example:

import org.apache.flink.ml.classification.SVM

val svm: SVM = SVM()
  .setIterations(100)
  .setRegularization(0.01)

// Fit the predictor
val trainedModel = svm.fit(trainingData)

// Make predictions
val predictions = trainedModel.predict(testData)

// Evaluate model performance
val evaluationResults = trainedModel.evaluate(testData)

Transformer

Base trait for unsupervised learning components that transform data without learning from labels.

trait Transformer[Self] extends WithParameters {
  def transform[Input](
    input: DataSet[Input]
  )(implicit transformOperation: TransformDataSetOperation[Self, Input, Output]): DataSet[Output]
  
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}

Usage Example:

import org.apache.flink.ml.preprocessing.StandardScaler

val scaler: StandardScaler = StandardScaler()
  .setMean(true)
  .setStd(true)

// Fit the transformer (learn mean and std)
val fittedScaler = scaler.fit(trainingData)

// Transform data
val scaledData = fittedScaler.transform(trainingData)

Chained Components

Chained Transformer

Combines multiple transformers into a single pipeline component.

case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](
  left: L,
  right: R
) extends Transformer[ChainedTransformer[L, R]] {
  def transform[Input](input: DataSet[Input]): DataSet[Output]
}

Chained Predictor

Combines transformers with a final predictor for end-to-end ML pipelines.

case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](
  transformer: T,
  predictor: P
) extends Predictor[ChainedPredictor[T, P]] {
  def fit[Training](training: DataSet[Training]): ChainedPredictor[T, P]
  def predict[Testing](testing: DataSet[Testing]): DataSet[Prediction]
}

Building Pipelines

Simple Transformer Chain

import org.apache.flink.ml.preprocessing.{StandardScaler, MinMaxScaler, PolynomialFeatures}

val scaler1 = MinMaxScaler().setMin(0.0).setMax(1.0)
val polyFeatures = PolynomialFeatures().setDegree(2)
val scaler2 = StandardScaler()

// Chain transformers
val preprocessingPipeline = scaler1
  .chainTransformer(polyFeatures)
  .chainTransformer(scaler2)

// Fit and transform
val fittedPipeline = preprocessingPipeline.fit(trainingData)
val transformedData = fittedPipeline.transform(trainingData)

Complete ML Pipeline

import org.apache.flink.ml.preprocessing.StandardScaler
import org.apache.flink.ml.classification.SVM

val scaler = StandardScaler()
val svm = SVM().setIterations(100).setRegularization(0.01)

// Create end-to-end pipeline
val mlPipeline = scaler.chainPredictor(svm)

// Fit entire pipeline
val trainedPipeline = mlPipeline.fit(trainingData)

// Make predictions (automatically applies scaling then SVM)
val predictions = trainedPipeline.predict(testData)

Complex Multi-Stage Pipeline

import org.apache.flink.ml.preprocessing.{StandardScaler, PolynomialFeatures}
import org.apache.flink.ml.regression.MultipleLinearRegression

// Multi-stage preprocessing
val minMaxScaler = MinMaxScaler().setMin(-1.0).setMax(1.0)
val polyFeatures = PolynomialFeatures().setDegree(3)
val standardScaler = StandardScaler()

// Regression model
val regression = MultipleLinearRegression()
  .setIterations(200)
  .setStepsize(0.01)

// Build complex pipeline
val complexPipeline = minMaxScaler
  .chainTransformer(polyFeatures)
  .chainTransformer(standardScaler)
  .chainPredictor(regression)

// Train pipeline
val trainedComplexPipeline = complexPipeline.fit(trainingData)

// Use pipeline
val predictions = trainedComplexPipeline.predict(testData)

Type Class Operations

The pipeline framework uses type classes to provide flexible operations for different data types.

Fit Operation

trait FitOperation[Self, Training] {
  def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Self
}

Transform Operations

trait TransformOperation[Instance, Model, Input, Output] {
  def transform(
    instance: Instance,
    model: Model,
    input: Input
  ): Output
}

trait TransformDataSetOperation[Instance, Input, Output] {
  def transformDataSet(
    instance: Instance,
    transformParameters: ParameterMap,
    input: DataSet[Input]
  ): DataSet[Output]
}

Predict Operations

trait PredictOperation[Instance, Model, Testing, Prediction] {
  def predict(instance: Instance, model: Model, testing: Testing): Prediction
}

trait PredictDataSetOperation[Instance, Testing, Prediction] {
  def predictDataSet(
    instance: Instance,
    predictParameters: ParameterMap,
    testing: DataSet[Testing]
  ): DataSet[Prediction]
}

Evaluate Operation

trait EvaluateDataSetOperation[Instance, Testing, Prediction] {
  def evaluateDataSet(
    instance: Instance,
    evaluateParameters: ParameterMap,
    testing: DataSet[Testing]
  ): DataSet[Prediction]
}

Parameter Management in Pipelines

Pipelines preserve and merge parameters from all components.

// Configure individual components
val scaler = StandardScaler()
  .setMean(true)
  .setStd(false)

val svm = SVM()
  .setIterations(50)
  .setRegularization(0.1)

// Create pipeline - parameters are preserved
val pipeline = scaler.chainPredictor(svm)

// Access combined parameters
val allParameters = pipeline.parameters

// You can still modify parameters of the chained pipeline
val modifiedPipeline = pipeline.set(SVM.Iterations, 100)

Custom Pipeline Components

You can create custom transformers and predictors by implementing the respective traits.

Custom Transformer Example

import org.apache.flink.ml.pipeline.Transformer
import org.apache.flink.ml.common.WithParameters

class LogTransformer extends Transformer[LogTransformer] with WithParameters {
  def transform[Input](input: DataSet[Input]): DataSet[Output] = {
    // Implementation depends on implicit TransformDataSetOperation
    transformDataSet(this, parameters, input)
  }
}

object LogTransformer {
  def apply(): LogTransformer = new LogTransformer()
  
  // Define implicit operations
  implicit val logTransformVectors = new TransformDataSetOperation[LogTransformer, Vector, Vector] {
    def transformDataSet(
      instance: LogTransformer,
      transformParameters: ParameterMap,
      input: DataSet[Vector]
    ): DataSet[Vector] = {
      input.map(vector => {
        val logData = vector.toArray.map(x => if (x > 0) math.log(x) else 0.0)
        DenseVector(logData)
      })
    }
  }
}

// Usage
val logTransform = LogTransformer()
val scaler = StandardScaler()
val pipeline = logTransform.chainTransformer(scaler)

Pipeline Persistence

While the core framework doesn't provide built-in persistence, you can save pipeline parameters and recreate pipelines:

// Save pipeline configuration
val scaler = StandardScaler().setMean(true).setStd(true)
val svm = SVM().setIterations(100).setRegularization(0.01)
val pipeline = scaler.chainPredictor(svm)

// Extract parameters for serialization
val scalerParams = scaler.parameters
val svmParams = svm.parameters

// Recreate pipeline later
val recreatedScaler = StandardScaler().setParameters(scalerParams)
val recreatedSVM = SVM().setParameters(svmParams)
val recreatedPipeline = recreatedScaler.chainPredictor(recreatedSVM)

Error Handling in Pipelines

Pipeline components should handle errors gracefully:

try {
  val trainedPipeline = pipeline.fit(trainingData)
  val predictions = trainedPipeline.predict(testData)
} catch {
  case e: IllegalArgumentException => 
    println(s"Invalid parameters: ${e.getMessage}")
  case e: RuntimeException =>
    println(s"Runtime error in pipeline: ${e.getMessage}")
}

Best Practices

  1. Chain transformers before predictors: Always apply data transformations before supervised learning
  2. Fit on training data only: Only use training data to fit transformers and predictors
  3. Preserve test data integrity: Apply the same fitted transformations to test data
  4. Parameter validation: Validate parameters before fitting pipelines
  5. Memory management: Consider data persistence for large datasets in complex pipelines
// Good practice example
val pipeline = StandardScaler()
  .chainTransformer(PolynomialFeatures().setDegree(2))
  .chainPredictor(SVM().setIterations(100))

// Fit on training data only
val trainedPipeline = pipeline.fit(trainingData)

// Apply to test data
val predictions = trainedPipeline.predict(testData)

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-ml-2-12

docs

algorithms.md

distance-metrics.md

index.md

linear-algebra.md

optimization.md

outlier-detection.md

pipeline.md

preprocessing.md

tile.json