# Outlier Detection

Apache Flink ML provides outlier detection algorithms for identifying anomalous data points in datasets. These algorithms follow the Transformer pattern and can be integrated into ML pipelines.

## Stochastic Outlier Selection

### Stochastic Outlier Selection (SOS)

Implements the Stochastic Outlier Selection algorithm, which computes an outlier probability for each data point from the affinities between points. SOS treats outlier detection as a probabilistic problem rather than a binary classification.

```scala { .api }
class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] with WithParameters {
  def setPerplexity(perplexity: Double): StochasticOutlierSelection
  def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection
  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection
}

object StochasticOutlierSelection {
  def apply(): StochasticOutlierSelection

  // Parameters
  case object Perplexity extends Parameter[Double] {
    val defaultValue = Some(30.0)
  }

  case object ErrorTolerance extends Parameter[Double] {
    val defaultValue = Some(1e-20)
  }

  case object MaxIterations extends Parameter[Int] {
    val defaultValue = Some(10)
  }
}
```

**Algorithm Description:**

For each data point, SOS computes the probability that the point is an outlier. It does this by:

1. Computing an affinity matrix from pairwise dissimilarities, using a Gaussian kernel with an adaptive bandwidth for each point
2. Choosing each point's bandwidth so that its affinity distribution has the target perplexity, which controls the effective number of neighbors
3. Normalizing the affinities into binding probabilities; a point's outlier probability is the probability that no other point binds to it

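In the notation of the SOS technical report, the three steps can be summarized as follows, where $x_i$ is the $i$-th point, $\sigma_i$ its kernel bandwidth, $h$ the perplexity, and $C_O$ the outlier class. This is a summary for orientation; the report listed under References is the authoritative statement, and FlinkML's implementation may differ in numerical details.

```math
\begin{aligned}
d_{ij} &= \lVert x_i - x_j \rVert \\
a_{ij} &= \exp\!\left(-\frac{d_{ij}^2}{2\sigma_i^2}\right) \quad (j \ne i), \qquad a_{ii} = 0 \\
b_{ij} &= \frac{a_{ij}}{\sum_{k \ne i} a_{ik}} \\
h &= 2^{H(b_i)}, \qquad H(b_i) = -\sum_{j \ne i} b_{ij} \log_2 b_{ij} \\
p(x_j \in C_O) &= \prod_{i \ne j} \left(1 - b_{ij}\right)
\end{aligned}
```

Each $\sigma_i$ has no closed form and is found numerically; `ErrorTolerance` and `MaxIterations` bound that approximation.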

The perplexity parameter can be interpreted like k in k-nearest-neighbor algorithms, except that it defines a soft neighborhood rather than a hard cutoff, so it need not be an integer.

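To make the three steps concrete, here is a minimal local sketch in plain Scala (no Flink). It follows the technical report's formulation: squared Euclidean distances, a Gaussian kernel whose precision `beta` = 1/(2σ²) is found per point by binary search so that the perplexity matches, then binding and outlier probabilities. `SosSketch`, its helpers, and the `tol`/`maxIter` defaults are hypothetical; this is not FlinkML's distributed implementation, so its scores will not match FlinkML's exactly.

```scala
// Hypothetical local sketch of SOS (not a FlinkML API), following the technical report.
object SosSketch {
  type Point = Array[Double]

  // Squared Euclidean distance between two points.
  private def sqDist(x: Point, y: Point): Double =
    x.indices.map(k => (x(k) - y(k)) * (x(k) - y(k))).sum

  // Binding probabilities of one point to all others for precision beta = 1 / (2 * sigma^2).
  // Subtracting the smallest distance avoids underflow and cancels out after normalization.
  private def bindingRow(d2: Array[Double], beta: Double): Array[Double] = {
    val shift = d2.min
    val a = d2.map(d => math.exp(-beta * (d - shift)))
    val total = a.sum
    a.map(_ / total)
  }

  // Shannon entropy in nats; exp(entropy in nats) equals 2^(entropy in bits), the perplexity.
  private def entropy(b: Array[Double]): Double =
    b.filter(_ > 0).map(p => -p * math.log(p)).sum

  // Binary search on beta until the row's perplexity matches the target.
  private def fitRow(d2: Array[Double], perplexity: Double,
                     tol: Double, maxIter: Int): Array[Double] = {
    val target = math.log(perplexity)
    var beta = 1.0
    var lo = 0.0
    var hi = Double.PositiveInfinity
    var b = bindingRow(d2, beta)
    var diff = entropy(b) - target
    var iter = 0
    while (math.abs(diff) > tol && iter < maxIter) {
      if (diff > 0) { // distribution too flat: sharpen the kernel
        lo = beta
        beta = if (hi == Double.PositiveInfinity) beta * 2 else (beta + hi) / 2
      } else {        // distribution too peaked: widen the kernel
        hi = beta
        beta = (beta + lo) / 2
      }
      b = bindingRow(d2, beta)
      diff = entropy(b) - target
      iter += 1
    }
    b
  }

  /** Outlier probability of every point: p_j = product over i != j of (1 - b_ij). */
  def outlierProbabilities(xs: IndexedSeq[Point], perplexity: Double,
                           tol: Double = 1e-5, maxIter: Int = 100): IndexedSeq[Double] = {
    val n = xs.length
    require(perplexity >= 1 && perplexity <= n - 1, "perplexity must be in [1, n - 1]")
    val others = (0 until n).map(i => (0 until n).filter(_ != i))
    // binding(i)(j) = b_ij, defined for every j != i
    val binding = (0 until n).map { i =>
      val d2 = others(i).map(j => sqDist(xs(i), xs(j))).toArray
      others(i).zip(fitRow(d2, perplexity, tol, maxIter).toIndexedSeq).toMap
    }
    (0 until n).map(j => others(j).map(i => 1.0 - binding(i)(j)).product)
  }
}
```

On the five points from the usage example with perplexity 3.0, the isolated point (5, 8) should receive a probability well above 0.9, while each of the four clustered points stays below 0.5, because every one of them has at least two close neighbors that bind to it strongly.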

**Usage Example:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.outlier.StochasticOutlierSelection

val env = ExecutionEnvironment.getExecutionEnvironment

// Create sample data with an outlier; each label identifies its point in the output
val data: DataSet[LabeledVector] = env.fromCollection(Seq(
  LabeledVector(0.0, DenseVector(1.0, 1.0)), // Normal point
  LabeledVector(1.0, DenseVector(2.0, 1.0)), // Normal point
  LabeledVector(2.0, DenseVector(1.0, 2.0)), // Normal point
  LabeledVector(3.0, DenseVector(2.0, 2.0)), // Normal point
  LabeledVector(4.0, DenseVector(5.0, 8.0))  // Outlier point
))

// Configure SOS algorithm
val sos = StochasticOutlierSelection()
  .setPerplexity(3.0)       // Effective number of neighbors; must be in [1, n - 1]
  .setErrorTolerance(1e-10) // Tolerance of the per-point affinity approximation
  .setMaxIterations(1000)   // Maximum iterations of that approximation

// Transform data to get outlier probabilities, keyed by label
val outlierScores: DataSet[(Int, Double)] = sos.transform(data)

// Collect results (this triggers execution)
val results = outlierScores.collect()

// Higher scores indicate more likely outliers
// The point labeled 4 (the outlier) should have a high score (~0.99)
results.foreach { case (id, score) =>
  println(s"Point $id has outlier probability: $score")
}

// Filter outliers based on threshold (lazy: add a sink such as print() to run it)
val threshold = 0.5
val outliers = outlierScores.filter(_._2 > threshold)
```

**Working with Different Data Types:**

SOS transforms `DataSet[LabeledVector]` inputs. Each point's label, converted to `Int`, identifies the point in the output, so labels should be unique per point:

```scala
// Works with labeled vectors
val labeledData: DataSet[LabeledVector] = ??? // your data
val outlierScores: DataSet[(Int, Double)] = sos.transform(labeledData)

// The output is (Int, Double) where:
// - Int: the input point's label converted to Int (its identifier)
// - Double: outlier probability score (0.0 to 1.0)
```
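
**Integration with Preprocessing:**

SOS can be used as a preprocessing step to filter outliers out of a training set. Because SOS identifies points by their labels, replace the class labels with unique ids before scoring, then join the scores back to the original points:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides zipWithIndex on DataSet
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.outlier.StochasticOutlierSelection
import org.apache.flink.ml.preprocessing.StandardScaler

val data: DataSet[LabeledVector] = ??? // your training data

// First scale the data (fit() returns Unit; the scaler keeps the fitted statistics)
val scaler = StandardScaler()
scaler.fit(data)
val scaledData = scaler.transform(data)

// SOS keys its output by label.toInt, so swap class labels for unique ids
val indexed: DataSet[(Long, LabeledVector)] = scaledData.zipWithIndex
val withIds = indexed.map { case (id, lv) => LabeledVector(id.toDouble, lv.vector) }

// Detect outliers
val sos = StochasticOutlierSelection().setPerplexity(10)
val outlierScores: DataSet[(Int, Double)] = sos.transform(withIds)

// Keep only points with score < 0.8, restoring their original labels
val cleanData: DataSet[LabeledVector] = indexed
  .join(outlierScores).where(_._1.toInt).equalTo(_._1)
  .filter(_._2._2 < 0.8) // Remove outliers
  .map(_._1._2)          // Extract the original (scaled) labeled vector

// Train classifier on clean data (fit() returns Unit; the model lives in svm)
val svm = SVM().setIterations(100)
svm.fit(cleanData)
```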

**Parameters:**

- **Perplexity**: Controls the effective number of neighbors; higher values consider more neighbors. Must be between 1 and n-1, where n is the number of data points. Default: 30.0

- **ErrorTolerance**: Accepted error tolerance when approximating each point's affinities to match the target perplexity. Higher values trade accuracy for speed. Default: 1e-20

- **MaxIterations**: Maximum number of iterations used to approximate each point's affinities. Default: 10

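Because the perplexity must lie in [1, n-1], the default of 30.0 is out of range for data sets with 30 or fewer points, such as the five-point usage example. A minimal sketch of a helper that keeps a requested value in range; `PerplexityUtil.clampPerplexity` is hypothetical and not a FlinkML API:

```scala
// Hypothetical helper (not part of FlinkML): clamps a requested perplexity
// into the valid range [1, n - 1] for a data set with n points.
object PerplexityUtil {
  def clampPerplexity(requested: Double, numPoints: Long): Double = {
    require(numPoints >= 2, "SOS needs at least two points")
    math.min(math.max(requested, 1.0), (numPoints - 1).toDouble)
  }
}
```

It could be applied as `sos.setPerplexity(PerplexityUtil.clampPerplexity(30.0, data.count()))`; note that `count()` on a `DataSet` runs a separate Flink job.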

**Performance Considerations:**

- SOS has O(n²) complexity, where n is the number of data points, because it computes the dissimilarity between every pair of points
- For large datasets, consider sampling (for example with `sample` from `org.apache.flink.api.scala.utils`) or using other outlier detection methods
- Each point's affinities are approximated iteratively, bounded by MaxIterations, which adds to the cost
- Flink distributes the work across parallel tasks automatically, but parallelism does not reduce the quadratic total amount of work

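The quadratic cost is easy to underestimate. As a rough back-of-the-envelope helper (hypothetical, not part of FlinkML, and not a measurement of its actual memory use), the following counts the bytes needed to hold one 8-byte `Double` for every ordered pair of distinct points:

```scala
// Hypothetical estimate: one 8-byte Double per ordered pair (i, j) with i != j.
object PairwiseCost {
  def pairwiseBytes(n: Long): Long = n * (n - 1) * 8L
}
```

For 100,000 points this is 79,999,200,000 bytes, roughly 80 GB, before any per-record overhead.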

**References:**

J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University, Tilburg, the Netherlands, 2012.

More information: https://github.com/jeroenjanssens/sos