Apache Flink ML provides data preprocessing utilities for feature scaling, transformation, and engineering to prepare data for machine learning algorithms. All preprocessing components follow the Transformer pattern and can be chained together in pipelines.
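The workflow is the same for every component: configure a transformer, fit it to a DataSet (which computes and stores whatever statistics the step needs), then transform. A minimal sketch of that flow (StandardScaler stands in for any transformer; in Flink ML, fit returns Unit and keeps the learned state inside the transformer instance):

import org.apache.flink.api.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.preprocessing.StandardScaler

val trainingData: DataSet[Vector] = //... feature vectors
val scaler = StandardScaler()               // configure (defaults shown below)
scaler.fit(trainingData)                    // learn per-feature statistics; returns Unit
val scaled = scaler.transform(trainingData) // apply the fitted transformation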
StandardScaler scales a data set so that every feature has a user-specified mean and standard deviation (defaults 0.0 and 1.0, i.e., z-score normalization).
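Conceptually, each feature value is mapped to x' = ((x − μ) / σ) · std + mean, where μ and σ are learned from the data during fit and mean/std are the configured targets. A standalone sketch of that arithmetic (plain Scala, not the Flink API):

// One feature column and the statistics fit() would learn from it
val xs = Array(1.0, 4.0, 7.0)
val mu = xs.sum / xs.length                                             // 4.0
val sigma = math.sqrt(xs.map(x => (x - mu) * (x - mu)).sum / xs.length) // ≈ 2.449
// Configured targets (the Mean and Std parameters, defaults 0.0 and 1.0)
val (targetMean, targetStd) = (0.0, 1.0)
val scaled = xs.map(x => (x - mu) / sigma * targetStd + targetMean)
// scaled ≈ Array(-1.2247, 0.0, 1.2247)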
class StandardScaler extends Transformer[StandardScaler] with WithParameters {
  def setMean(mean: Double): StandardScaler
  def setStd(std: Double): StandardScaler
}

object StandardScaler {
  def apply(): StandardScaler

  // Parameters
  case object Mean extends Parameter[Double] {
    val defaultValue = Some(0.0)
  }
  case object Std extends Parameter[Double] {
    val defaultValue = Some(1.0)
  }
}

Usage Example:
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.StandardScaler

val env = ExecutionEnvironment.getExecutionEnvironment

val data: DataSet[Vector] = env.fromCollection(Seq[Vector](
  DenseVector(1.0, 2.0, 3.0),
  DenseVector(4.0, 5.0, 6.0),
  DenseVector(7.0, 8.0, 9.0)
))
// Configure standard scaler: target mean and standard deviation of the output
val scaler = StandardScaler()
  .setMean(0.0) // Center data around 0
  .setStd(1.0)  // Scale to unit standard deviation

// Fit scaler to data (computes the per-feature mean and standard deviation)
scaler.fit(data)

// Transform data
val scaledData = scaler.transform(data)
// Works with LabeledVector too
val labeledData: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(1.0, 2.0, 3.0)),
  LabeledVector(0.0, DenseVector(4.0, 5.0, 6.0))
))
val scaledLabeledData = scaler.transform(labeledData)

MinMaxScaler scales features to a specified output range [min, max] by a linear transformation.
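The rescaling is x' = (x − x_min) / (x_max − x_min) · (max − min) + min, with x_min and x_max learned per feature during fit. A standalone sketch of that arithmetic (plain Scala, not the Flink API):

// One feature column and the per-feature extremes fit() would learn
val xs = Array(1.0, 4.0, 7.0)
val (lo, hi) = (xs.min, xs.max)
// Configured output range (the Min and Max parameters)
val (targetMin, targetMax) = (-1.0, 1.0)
val rescaled = xs.map(x => (x - lo) / (hi - lo) * (targetMax - targetMin) + targetMin)
// rescaled == Array(-1.0, 0.0, 1.0)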
class MinMaxScaler extends Transformer[MinMaxScaler] with WithParameters {
  def setMin(min: Double): MinMaxScaler
  def setMax(max: Double): MinMaxScaler
}

object MinMaxScaler {
  def apply(): MinMaxScaler

  // Parameters
  case object Min extends Parameter[Double] {
    val defaultValue = Some(0.0)
  }
  case object Max extends Parameter[Double] {
    val defaultValue = Some(1.0)
  }
}

Usage Example:
import org.apache.flink.ml.preprocessing.MinMaxScaler

val data: DataSet[Vector] = env.fromCollection(Seq[Vector](
  DenseVector(1.0, 2.0, 3.0),
  DenseVector(4.0, 5.0, 6.0),
  DenseVector(7.0, 8.0, 9.0)
))

// Configure min-max scaler
val scaler = MinMaxScaler()
  .setMin(-1.0) // Minimum value of the output range
  .setMax(1.0)  // Maximum value of the output range

// Fit and transform
scaler.fit(data)
val scaledData = scaler.transform(data)

// Chain with other transformers
val standardScaler = StandardScaler()
val pipeline = scaler.chainTransformer(standardScaler)

PolynomialFeatures maps the input features into the polynomial feature space of a given degree, including interaction terms.
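For an input (x1, x2) and degree d, the expansion contains every monomial of degree 1 through d, ordered highest degree first (e.g., degree 3 yields x1³, x1²x2, x1x2², x2³, x1², x1x2, x2², x1, x2); note there is no bias term. A hand-rolled sketch of the degree-2 case for two features (illustration only, not the Flink implementation):

// Degree-2 monomials of (x1, x2), highest degree first, no bias term
def poly2(x1: Double, x2: Double): Array[Double] =
  Array(x1 * x1, x1 * x2, x2 * x2, x1, x2)

val expanded = poly2(2.0, 3.0) // Array(4.0, 6.0, 9.0, 2.0, 3.0)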
class PolynomialFeatures extends Transformer[PolynomialFeatures] with WithParameters {
  def setDegree(degree: Int): PolynomialFeatures
}

object PolynomialFeatures {
  def apply(): PolynomialFeatures

  // Parameters
  case object Degree extends Parameter[Int] {
    val defaultValue = Some(10)
  }
}

Usage Example:
import org.apache.flink.ml.preprocessing.PolynomialFeatures

val data: DataSet[Vector] = env.fromCollection(Seq[Vector](
  DenseVector(2.0, 3.0), // Original features: [x1, x2]
  DenseVector(4.0, 5.0),
  DenseVector(6.0, 7.0)
))

// Configure polynomial features
val polyFeatures = PolynomialFeatures()
  .setDegree(2) // Degree 2: [x1^2, x1*x2, x2^2, x1, x2]

// Fit (no statistics needed for this stateless step) and transform
polyFeatures.fit(data)
val expandedData = polyFeatures.transform(data)

// Input [2.0, 3.0] becomes [4.0, 6.0, 9.0, 2.0, 3.0]
// Features: [x1^2, x1*x2, x2^2, x1, x2] (highest degree first, no bias term)

// Works with LabeledVector
val labeledData: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(1.0, DenseVector(2.0, 3.0)),
  LabeledVector(0.0, DenseVector(4.0, 5.0))
))
val expandedLabeledData = polyFeatures.transform(labeledData)

Splitter is a utility for splitting a DataSet into training and testing sets.
Note: Splitter is a plain utility object, not a Transformer. Signatures below are abridged (TypeInformation/ClassTag context bounds and random-seed parameters omitted).

object Splitter {
  // Randomly splits the input into two data sets according to fraction
  def randomSplit[T](input: DataSet[T], fraction: Double, precise: Boolean = false): Array[DataSet[T]]

  // Splits the input into a training and a testing set
  def trainTestSplit[T](input: DataSet[T], fraction: Double = 0.6, precise: Boolean = false): TrainTestDataSet[T]

  // Splits the input into k folds for cross-validation
  def kFoldSplit[T](input: DataSet[T], kFolds: Int): Array[TrainTestDataSet[T]]
}

case class TrainTestDataSet[T](training: DataSet[T], testing: DataSet[T])

Usage Example:
import org.apache.flink.ml.preprocessing.Splitter

val data: DataSet[LabeledVector] = //... your dataset

// 80% training / 20% testing split
val trainTestData = Splitter.trainTestSplit(data, 0.8)
val trainingData = trainTestData.training
val testingData = trainTestData.testing

All preprocessing transformers can be chained together to create complex preprocessing pipelines.
trait Transformer[Self <: Transformer[Self]] extends WithParameters {
  // Abridged: the real signature also takes an optional ParameterMap and an
  // implicit operation instance that supplies the Output type (see below)
  def transform[Input, Output](input: DataSet[Input]): DataSet[Output]
  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]
  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}
case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](
  left: L,
  right: R
) extends Transformer[ChainedTransformer[L, R]]

case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](
  transformer: T,
  predictor: P
) extends Predictor[ChainedPredictor[T, P]]

Usage Example:
import org.apache.flink.ml.preprocessing.{StandardScaler, MinMaxScaler, PolynomialFeatures}
import org.apache.flink.ml.classification.SVM

val data: DataSet[LabeledVector] = //... your training data

// Create preprocessing pipeline
val minMaxScaler = MinMaxScaler().setMin(0.0).setMax(1.0)
val polyFeatures = PolynomialFeatures().setDegree(2)
val standardScaler = StandardScaler()

// Chain transformers
val preprocessingPipeline = minMaxScaler
  .chainTransformer(polyFeatures)
  .chainTransformer(standardScaler)

// Chain with predictor
val svm = SVM().setIterations(100)
val completePipeline = preprocessingPipeline.chainPredictor(svm)

// Fit the entire pipeline (each stage is fitted on the output of the previous one)
completePipeline.fit(data)

// Make predictions (automatically applies all preprocessing steps)
val testData: DataSet[Vector] = //... test vectors
val predictions = completePipeline.predict(testData)

The preprocessing transformers support multiple input data types through implicit type class operations:

- Vector: raw feature vectors
- LabeledVector: feature vectors with labels for supervised learning
- (Vector, Double): vector/label tuples for specialized operations
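Under the hood, fit and transform dispatch on implicit type-class instances (FitOperation and TransformDataSetOperation in org.apache.flink.ml.pipeline) resolved for the concrete (transformer, element type) pair. The shape is roughly as follows (abridged; default parameters and context bounds omitted):

// Abridged shape of the type class behind transform
trait TransformDataSetOperation[Instance, Input, Output] {
  def transformDataSet(instance: Instance, transformParameters: ParameterMap, input: DataSet[Input]): DataSet[Output]
}

// StandardScaler's companion supplies one instance per supported input type
// (Vector subtypes, LabeledVector, (Vector, Double) tuples), which is why the
// same scaler.transform(...) call type-checks against different DataSets.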
Usage Examples:
val vectorScaler = StandardScaler()
val labeledScaler = StandardScaler()

// Fit on vectors
val vectorData: DataSet[Vector] = //...
vectorScaler.fit(vectorData)

// Fit on labeled vectors
val labeledData: DataSet[LabeledVector] = //...
labeledScaler.fit(labeledData)

// Transform different types
val scaledVectors = vectorScaler.transform(vectorData)
val scaledLabeled = labeledScaler.transform(labeledData)

// Transform (vector, label) tuples
val tupleData: DataSet[(Vector, Double)] = //...
val scaledTuples = vectorScaler.transform(tupleData)

While the built-in preprocessors cover common use cases, you can create custom preprocessing by implementing the Transformer trait (see the sketch at the end of this section) or by using Flink's native DataSet transformations.
Custom Transformation Example:
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.preprocessing.StandardScaler

// Custom log transformation over dense vectors
val logTransform: DataSet[DenseVector] => DataSet[DenseVector] = { data =>
  data.map { vector =>
    DenseVector(vector.data.map(x => if (x > 0) math.log(x) else 0.0))
  }
}

val data: DataSet[DenseVector] = //... your data
val logTransformedData = logTransform(data)

// Chain with standard preprocessing (fit, then transform)
val scaler = StandardScaler()
scaler.fit(logTransformedData)
val scaledLogData = scaler.transform(logTransformedData)
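To plug a custom step into chainTransformer/chainPredictor pipelines, you can instead implement the Transformer trait itself. A hedged sketch, assuming the type-class signatures shown earlier (the class name LogTransformer is illustrative and not part of Flink ML, and the boilerplate is simplified):

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.ParameterMap
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}

// Illustrative stateless transformer
class LogTransformer extends Transformer[LogTransformer]

object LogTransformer {
  def apply(): LogTransformer = new LogTransformer

  // Stateless step: nothing to learn during fit
  implicit def fitNoOp[T]: FitOperation[LogTransformer, T] =
    new FitOperation[LogTransformer, T] {
      override def fit(instance: LogTransformer, fitParameters: ParameterMap, input: DataSet[T]): Unit = ()
    }

  // Element-wise log transform for dense vectors
  implicit val transformDenseVectors: TransformDataSetOperation[LogTransformer, DenseVector, DenseVector] =
    new TransformDataSetOperation[LogTransformer, DenseVector, DenseVector] {
      override def transformDataSet(
          instance: LogTransformer,
          transformParameters: ParameterMap,
          input: DataSet[DenseVector]): DataSet[DenseVector] = {
        input.map(v => DenseVector(v.data.map(x => if (x > 0) math.log(x) else 0.0)))
      }
    }
}

With those implicits in scope, LogTransformer().chainTransformer(StandardScaler()) composes like the built-in steps.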