Outlier Detection

Apache Flink ML provides outlier detection algorithms for identifying anomalous data points in datasets. These algorithms follow the Transformer pattern and can be integrated into ML pipelines.

Stochastic Outlier Selection (SOS)

Implements the Stochastic Outlier Selection algorithm, which computes an outlier probability for each data point using a stochastic, affinity-based approach. SOS treats outlier detection as a probabilistic problem rather than a binary classification.

class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters {
  def setPerplexity(perplexity: Double): StochasticOutlierSelection
  def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection
  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection
}

object StochasticOutlierSelection {
  def apply(): StochasticOutlierSelection
  
  // Parameters
  case object Perplexity extends Parameter[Double] {
    val defaultValue = Some(30.0)
  }
  
  case object ErrorTolerance extends Parameter[Double] {
    val defaultValue = Some(1e-20)
  }
  
  case object MaxIterations extends Parameter[Int] {
    val defaultValue = Some(5000)
  }
}
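
Because the parameters are exposed as case objects in the companion object, they can also be supplied per call through a ParameterMap instead of the setters. The following is a minimal sketch, assuming Flink ML's standard ParameterMap (org.apache.flink.ml.common.ParameterMap), which transform accepts as an optional second argument; the data values here are illustrative only:

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{LabeledVector, ParameterMap}
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.outlier.StochasticOutlierSelection

val env = ExecutionEnvironment.getExecutionEnvironment

val data = env.fromCollection(Seq(
  LabeledVector(0.0, DenseVector(1.0, 1.0)),
  LabeledVector(1.0, DenseVector(1.1, 0.9)),
  LabeledVector(2.0, DenseVector(9.0, 9.0))
))

// Override parameters for this transform call only; the setter-based defaults stay untouched
val params = ParameterMap()
  .add(StochasticOutlierSelection.Perplexity, 2.0)
  .add(StochasticOutlierSelection.MaxIterations, 2000)

val sos = StochasticOutlierSelection()
val scores: DataSet[(Int, Double)] = sos.transform(data, params)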

Algorithm Description:

The SOS algorithm computes for each data point a probability of being an outlier. It does this by:

  1. Computing an affinity matrix using Gaussian kernels with adaptive bandwidths
  2. Using perplexity to control the effective number of neighbors for each point
  3. Calculating binding probabilities and outlier probabilities based on the affinity structure

The perplexity parameter can be interpreted similarly to k in k-nearest-neighbor algorithms, but it defines a soft neighborhood rather than a hard cutoff.
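
In symbols, a sketch of this computation (following the Janssens et al. report cited in the References; the notation here is ours, not taken from the Flink ML sources): for input points $x_i$ with pairwise dissimilarities $d_{ij}$,

$$a_{ij} = \exp\!\left(-\frac{d_{ij}^2}{2\sigma_i^2}\right), \qquad a_{ii} = 0$$

$$b_{ij} = \frac{a_{ij}}{\sum_{k \neq i} a_{ik}}, \qquad p(x_i \text{ is an outlier}) = \prod_{j \neq i} \left(1 - b_{ji}\right)$$

The bandwidth $\sigma_i$ is chosen per point so that its binding distribution has the configured perplexity, $2^{H(b_i)} = \text{Perplexity}$ where $H(b_i) = -\sum_{j \neq i} b_{ij} \log_2 b_{ij}$; ErrorTolerance and MaxIterations bound this per-point search.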

Usage Example:

import org.apache.flink.ml.outlier.StochasticOutlierSelection
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Create sample data with an outlier
val data: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(0.0, DenseVector(1.0, 1.0)),    // Normal point
  LabeledVector(1.0, DenseVector(2.0, 1.0)),    // Normal point
  LabeledVector(2.0, DenseVector(1.0, 2.0)),    // Normal point
  LabeledVector(3.0, DenseVector(2.0, 2.0)),    // Normal point
  LabeledVector(4.0, DenseVector(5.0, 8.0))     // Outlier point
))

// Configure SOS algorithm
val sos = StochasticOutlierSelection()
  .setPerplexity(3.0)           // Effective number of neighbors
  .setErrorTolerance(1e-10)     // Computational precision
  .setMaxIterations(1000)       // Maximum optimization iterations

// Transform data to get outlier probabilities
val outlierScores: DataSet[(Int, Double)] = sos.transform(data)

// Collect results
val results = outlierScores.collect()

// Higher scores indicate more likely outliers
// Point with index 4 (the outlier) should have a high score (~0.99)
results.foreach { case (index, score) =>
  println(s"Point $index has outlier probability: $score")
}

// Filter outliers based on threshold
val threshold = 0.5
val outliers = outlierScores.filter(_._2 > threshold)
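
DataSets are evaluated lazily, so collecting the filtered set triggers execution of the Flink job. A small sketch continuing the example above (reusing the outliers and results values already defined):

// Collecting the filtered DataSet triggers execution of the Flink job
val flagged: Seq[(Int, Double)] = outliers.collect()

println(s"Flagged ${flagged.size} of ${results.size} points as outliers")
flagged.foreach { case (index, score) =>
  println(f"Point $index exceeds the threshold with probability $score%.4f")
}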

Working with Different Data Types:

SOS supports transformation of LabeledVector datasets:

// Works with labeled vectors
val labeledData: DataSet[LabeledVector] = //... your data
val outlierScores = sos.transform(labeledData)

// The output is (Int, Double) where:
// - Int: index of the original data point
// - Double: outlier probability score (0.0 to 1.0)
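
Because the scores form a plain (Int, Double) DataSet, ordinary Scala collection operations can be applied after collecting them, for example to rank points by outlier probability (a small sketch continuing the snippet above):

// Rank points by outlier probability (highest first) and inspect the top suspects
val rankedSuspects: Seq[(Int, Double)] = outlierScores
  .collect()
  .sortBy { case (_, score) => -score }
  .take(3)

rankedSuspects.foreach { case (index, score) =>
  println(s"Suspect point $index with outlier probability $score")
}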

Integration with Preprocessing:

SOS can be integrated into preprocessing pipelines to filter outliers before training:

import org.apache.flink.ml.preprocessing.StandardScaler
import org.apache.flink.ml.classification.SVM
import org.apache.flink.api.scala.utils._

val data: DataSet[LabeledVector] = //... your training data

// First scale the data (fit trains the scaler in place; transform is a separate call)
val scaler = StandardScaler()
scaler.fit(data)
val scaledData = scaler.transform(data)

// Assign a consecutive index to each scaled point and run SOS with that index as
// the LabeledVector label, so the (index, score) output can be joined back reliably
val indexed: DataSet[(Long, LabeledVector)] = scaledData.zipWithIndex

val sos = StochasticOutlierSelection().setPerplexity(10.0)
val outlierScores = sos.transform(
  indexed.map { case (id, lv) => LabeledVector(id.toDouble, lv.vector) })

// Filter out outliers (keeping only points with score < 0.8)
val cleanData: DataSet[LabeledVector] = indexed
  .join(outlierScores).where(_._1.toInt).equalTo(_._1)
  .filter(_._2._2 < 0.8)  // Remove likely outliers
  .map(_._1._2)           // Extract the original LabeledVector

// Train classifier on the cleaned data (fit returns Unit; use svm.predict afterwards)
val svm = SVM().setIterations(100)
svm.fit(cleanData)

Parameters:

  • Perplexity: Controls the effective number of neighbors. Must be between 1 and n-1 where n is the number of data points. Higher values consider more neighbors. Default: 30.0

  • ErrorTolerance: Accepted error tolerance when computing perplexity. Higher values trade accuracy for speed. Default: 1e-20

  • MaxIterations: Maximum number of iterations for the optimization algorithm. Default: 5000

Performance Considerations:

  • SOS has O(n²) complexity where n is the number of data points
  • For large datasets, consider sampling or using other outlier detection methods (see the sketch after this list)
  • The algorithm is computationally intensive due to the iterative optimization process
  • Parallelization is handled automatically by Flink's distributed processing
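
For the large-dataset case mentioned in the list above, one option is to score only a random sample of the data. A minimal sketch, assuming Flink's DataSetUtils sampling helper (org.apache.flink.api.scala.utils); the fraction and seed are illustrative:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.outlier.StochasticOutlierSelection

val largeData: DataSet[LabeledVector] = //... your large dataset

// Score a ~10% random sample instead of the full dataset
val sampled = largeData.sample(withReplacement = false, fraction = 0.1, seed = 42L)

val sampleScores = StochasticOutlierSelection()
  .setPerplexity(30.0)
  .transform(sampled)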

References:

J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University, Tilburg, the Netherlands, 2012.

More information: https://github.com/jeroenjanssens/sos