Machine learning library for Apache Flink providing scalable ML algorithms including classification (SVM), regression (multiple linear regression), and recommendation (ALS) optimized for distributed stream and batch processing
Apache Flink ML provides a comprehensive collection of distance metrics for measuring similarity between vectors. These metrics are used by algorithms like k-Nearest Neighbors and can be used standalone for similarity computations.
All distance metrics implement the DistanceMetric trait.
trait DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}

Standard Euclidean distance (L2 norm) - the straight-line distance between two points.
class EuclideanDistanceMetric extends DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}
object EuclideanDistanceMetric {
  def apply(): EuclideanDistanceMetric
}

Formula: √(Σ(aᵢ - bᵢ)²)
Usage Example:
import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
import org.apache.flink.ml.math.DenseVector
val euclidean = EuclideanDistanceMetric()
val v1 = DenseVector(1.0, 2.0, 3.0)
val v2 = DenseVector(4.0, 5.0, 6.0)
val distance = euclidean.distance(v1, v2) // Returns: 5.196152422706632

Squared Euclidean distance - faster than standard Euclidean because it skips the square root; sufficient when only relative distances matter.
class SquaredEuclideanDistanceMetric extends DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}
object SquaredEuclideanDistanceMetric {
  def apply(): SquaredEuclideanDistanceMetric
}

Formula: Σ(aᵢ - bᵢ)²
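Because the square root is monotonic, ranking candidates by squared Euclidean distance gives the same order as ranking them by Euclidean distance. The following is a minimal sketch of that property, not Flink ML code: it uses plain `Array[Double]` in place of Flink's `Vector`, and the object and method names are hypothetical.

```scala
object SquaredVsEuclideanSketch {
  // Assumption: plain arrays stand in for Flink's Vector type.
  type Vec = Array[Double]

  def squaredEuclidean(a: Vec, b: Vec): Double = {
    require(a.length == b.length, "Vectors must have the same size")
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum
  }

  def euclidean(a: Vec, b: Vec): Double =
    math.sqrt(squaredEuclidean(a, b))

  // Indices of the candidates, ordered from nearest to farthest under `dist`.
  def rank(query: Vec, candidates: Seq[Vec], dist: (Vec, Vec) => Double): Seq[Int] =
    candidates.indices.sortWith((i, j) => dist(query, candidates(i)) < dist(query, candidates(j)))

  def main(args: Array[String]): Unit = {
    val query = Array(1.0, 2.0, 3.0)
    val candidates = Seq(Array(4.0, 5.0, 6.0), Array(1.0, 2.0, 4.0), Array(0.0, 0.0, 0.0))
    // Both rankings list the candidates in the same order.
    println(rank(query, candidates, squaredEuclidean))
    println(rank(query, candidates, euclidean))
  }
}
```

The square root only matters when the actual distance value is needed, for example when comparing against a threshold expressed in the original units.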
Usage Example:
import org.apache.flink.ml.metrics.distances.SquaredEuclideanDistanceMetric
val squaredEuclidean = SquaredEuclideanDistanceMetric()
val distance = squaredEuclidean.distance(v1, v2) // Returns: 27.0

Manhattan distance (L1 norm) - sum of absolute differences, also known as taxicab distance.
class ManhattanDistanceMetric extends DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}
object ManhattanDistanceMetric {
  def apply(): ManhattanDistanceMetric
}

Formula: Σ|aᵢ - bᵢ|
Usage Example:
import org.apache.flink.ml.metrics.distances.ManhattanDistanceMetric
val manhattan = ManhattanDistanceMetric()
val distance = manhattan.distance(v1, v2) // Returns: 9.0

Cosine distance - measures the angle between vectors, independent of magnitude.
class CosineDistanceMetric extends DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}
object CosineDistanceMetric {
  def apply(): CosineDistanceMetric
}

Formula: 1 - (a·b)/(||a|| × ||b||)
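To make the "independent of magnitude" property concrete, here is a small sketch that evaluates the formula directly on plain `Array[Double]` values (an assumption; Flink's `Vector` is not used) and shows that scaling one vector leaves the result unchanged up to floating-point rounding. The names are hypothetical.

```scala
object CosineSketch {
  // Assumption: plain arrays stand in for Flink's Vector type.
  type Vec = Array[Double]

  def dot(a: Vec, b: Vec): Double = {
    require(a.length == b.length, "Vectors must have the same size")
    a.zip(b).map { case (x, y) => x * y }.sum
  }

  def norm(a: Vec): Double = math.sqrt(dot(a, a))

  // 1 - (a·b)/(||a|| × ||b||); yields NaN if either vector is all zeros.
  def cosineDistance(a: Vec, b: Vec): Double =
    1.0 - dot(a, b) / (norm(a) * norm(b))

  def main(args: Array[String]): Unit = {
    val v1 = Array(1.0, 2.0, 3.0)
    val v2 = Array(4.0, 5.0, 6.0)
    val v2Scaled = v2.map(_ * 10.0) // same direction, ten times the length
    println(cosineDistance(v1, v2))       // roughly 0.025
    println(cosineDistance(v1, v2Scaled)) // the same value, up to rounding
  }
}
```

The sketch does not guard against zero vectors; how the library treats that case is not covered here.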
Usage Example:
import org.apache.flink.ml.metrics.distances.CosineDistanceMetric
val cosine = CosineDistanceMetric()
val distance = cosine.distance(v1, v2) // Returns cosine distance (0 = identical direction)

Chebyshev distance (L∞ norm) - maximum absolute difference across all dimensions.
class ChebyshevDistanceMetric extends DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}
object ChebyshevDistanceMetric {
  def apply(): ChebyshevDistanceMetric
}

Formula: max|aᵢ - bᵢ|
Usage Example:
import org.apache.flink.ml.metrics.distances.ChebyshevDistanceMetric
val chebyshev = ChebyshevDistanceMetric()
val distance = chebyshev.distance(v1, v2) // Returns: 3.0

Generalized Minkowski distance (Lp norm) - parameterized distance metric that includes other metrics as special cases.
class MinkowskiDistanceMetric(p: Double) extends DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}
object MinkowskiDistanceMetric {
  def apply(p: Double): MinkowskiDistanceMetric
}

Formula: (Σ|aᵢ - bᵢ|ᵖ)^(1/p)
Special Cases:
- p = 1: Manhattan distance
- p = 2: Euclidean distance
- p → ∞: approaches Chebyshev distance
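For the sample vectors used on this page, p = 1 and p = 2 reproduce the Manhattan and Euclidean results, and a large p approaches the Chebyshev result. The sketch below checks this numerically by evaluating the formula on plain `Array[Double]` values; it is an illustration under that assumption, not the library's implementation, and the names are hypothetical.

```scala
object MinkowskiSketch {
  // Assumption: plain arrays stand in for Flink's Vector type.
  type Vec = Array[Double]

  // (Σ|aᵢ - bᵢ|ᵖ)^(1/p); this sketch only accepts p >= 1.
  def minkowski(a: Vec, b: Vec, p: Double): Double = {
    require(a.length == b.length, "Vectors must have the same size")
    require(p >= 1.0, "This sketch only handles p >= 1")
    val sum = a.zip(b).map { case (x, y) => math.pow(math.abs(x - y), p) }.sum
    math.pow(sum, 1.0 / p)
  }

  def main(args: Array[String]): Unit = {
    val v1 = Array(1.0, 2.0, 3.0)
    val v2 = Array(4.0, 5.0, 6.0)
    println(minkowski(v1, v2, 1.0))   // matches the Manhattan result, 9.0
    println(minkowski(v1, v2, 2.0))   // matches the Euclidean result, about 5.196
    println(minkowski(v1, v2, 200.0)) // close to the Chebyshev result, 3.0
  }
}
```

Very large exponents can overflow a `Double` for widely separated vectors, so in practice the Chebyshev metric is the safer choice for the limiting case.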
Usage Example:
import org.apache.flink.ml.metrics.distances.MinkowskiDistanceMetric
val minkowski3 = MinkowskiDistanceMetric(3.0) // L3 norm
val minkowski1 = MinkowskiDistanceMetric(1.0) // Equivalent to Manhattan
val minkowski2 = MinkowskiDistanceMetric(2.0) // Equivalent to Euclidean
val distance = minkowski3.distance(v1, v2)

Tanimoto distance (Jaccard distance) - measures similarity for binary or non-negative vectors.
class TanimotoDistanceMetric extends DistanceMetric {
  def distance(a: Vector, b: Vector): Double
}
object TanimotoDistanceMetric {
  def apply(): TanimotoDistanceMetric
}

Formula: 1 - (a·b)/(||a||² + ||b||² - a·b)
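As a worked example of the formula, the sketch below evaluates it on plain `Array[Double]` values (an assumption; Flink's `Vector` is not used, and the names are hypothetical). For 0/1 vectors the dot product counts shared positions and the denominator counts positions set in either vector, so the result coincides with the Jaccard distance on sets.

```scala
object TanimotoSketch {
  // Assumption: plain arrays stand in for Flink's Vector type.
  type Vec = Array[Double]

  def dot(a: Vec, b: Vec): Double = {
    require(a.length == b.length, "Vectors must have the same size")
    a.zip(b).map { case (x, y) => x * y }.sum
  }

  // 1 - (a·b)/(||a||² + ||b||² - a·b); yields NaN if both vectors are all zeros.
  def tanimotoDistance(a: Vec, b: Vec): Double = {
    val ab = dot(a, b)
    1.0 - ab / (dot(a, a) + dot(b, b) - ab)
  }

  def main(args: Array[String]): Unit = {
    // a·b = 32, ||a||² = 14, ||b||² = 77, so the distance is 1 - 32/59.
    println(tanimotoDistance(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)))
    // Binary case: 2 shared positions out of 4 set in either vector gives 0.5.
    println(tanimotoDistance(Array(1.0, 1.0, 0.0, 1.0), Array(1.0, 0.0, 1.0, 1.0)))
  }
}
```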
Usage Example:
import org.apache.flink.ml.metrics.distances.TanimotoDistanceMetric
val tanimoto = TanimotoDistanceMetric()
val distance = tanimoto.distance(v1, v2)

Distance metrics are commonly used with machine learning algorithms, particularly k-Nearest Neighbors.
Example with k-NN:
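import org.apache.flink.api.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.nn.KNN
import org.apache.flink.ml.metrics.distances.ManhattanDistanceMetric

val trainingData: DataSet[Vector] = //... your training data
val testData: DataSet[Vector] = //... your test data
val knn = KNN()
  .setK(5)
  .setDistanceMetric(ManhattanDistanceMetric()) // Use Manhattan distance
  .setBlocks(10)
// fit trains the estimator in place; predict is then called on the same instance
knn.fit(trainingData)
val predictions = knn.predict(testData)

Different distance metrics suit different types of data and applications: for example, cosine distance ignores vector magnitude, Tanimoto distance targets binary or non-negative vectors, and squared Euclidean distance is enough when only relative distances matter.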
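To illustrate how the choice of metric can change a result, the sketch below finds the nearest neighbour of a query point under Euclidean and cosine distance and gets different answers. It is a standalone illustration on plain `Array[Double]` values with hypothetical names, not Flink ML's k-NN implementation.

```scala
object MetricChoiceSketch {
  // Assumption: plain arrays stand in for Flink's Vector type.
  type Vec = Array[Double]

  def euclidean(a: Vec, b: Vec): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  def cosine(a: Vec, b: Vec): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val norms = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
    1.0 - dot / norms
  }

  // Index of the candidate closest to `query` under `dist` (brute-force scan).
  def nearest(query: Vec, candidates: Seq[Vec], dist: (Vec, Vec) => Double): Int =
    candidates.indices.reduceLeft { (best, i) =>
      if (dist(query, candidates(i)) < dist(query, candidates(best))) i else best
    }

  def main(args: Array[String]): Unit = {
    val query = Array(1.0, 1.0)
    val candidates = Seq(
      Array(10.0, 10.0), // index 0: same direction as the query, but far away
      Array(2.0, 0.0)    // index 1: close to the query, but a different direction
    )
    println(nearest(query, candidates, euclidean)) // index 1 is nearest
    println(nearest(query, candidates, cosine))    // index 0 is nearest
  }
}
```

Euclidean distance favours the point that is physically close, while cosine distance favours the point with the same orientation regardless of length.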
You can implement custom distance metrics by extending the DistanceMetric trait:
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.metrics.distances.DistanceMetric
import org.apache.flink.ml.nn.KNN

class CustomDistanceMetric extends DistanceMetric {
  def distance(a: Vector, b: Vector): Double = {
    // Implement your custom distance calculation
    require(a.size == b.size, "Vectors must have the same size")
    var sum = 0.0
    for (i <- 0 until a.size) {
      val diff = a(i) - b(i)
      sum += math.pow(math.abs(diff), 1.5) // Example: L1.5 norm
    }
    math.pow(sum, 1.0 / 1.5)
  }
}

// Use with algorithms
val customMetric = new CustomDistanceMetric()
val knn = KNN().setDistanceMetric(customMetric)

Choose the appropriate distance metric based on your data characteristics and computational requirements.
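Before plugging a custom metric into an algorithm, it can help to spot-check the usual metric properties (zero self-distance, symmetry, triangle inequality) on a few sample vectors. The sketch below does this for the L1.5 calculation from the custom example, rewritten for plain `Array[Double]` values; the helper names are hypothetical, and passing on a handful of samples is a sanity check rather than a proof.

```scala
object MetricPropertyCheck {
  // Assumption: plain arrays stand in for Flink's Vector type.
  type Vec = Array[Double]

  // Same L1.5 calculation as the custom metric example, on plain arrays.
  def l15(a: Vec, b: Vec): Double = {
    require(a.length == b.length, "Vectors must have the same size")
    val sum = a.zip(b).map { case (x, y) => math.pow(math.abs(x - y), 1.5) }.sum
    math.pow(sum, 1.0 / 1.5)
  }

  // True if `dist` satisfies the three properties on every combination of samples.
  def looksLikeMetric(dist: (Vec, Vec) => Double, samples: Seq[Vec], eps: Double = 1e-9): Boolean = {
    val zeroSelfDistance = samples.forall(x => math.abs(dist(x, x)) <= eps)
    val symmetric = samples.forall(x => samples.forall(y =>
      math.abs(dist(x, y) - dist(y, x)) <= eps))
    val triangle = samples.forall(x => samples.forall(y => samples.forall(z =>
      dist(x, z) <= dist(x, y) + dist(y, z) + eps)))
    zeroSelfDistance && symmetric && triangle
  }

  def main(args: Array[String]): Unit = {
    val samples = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0), Array(0.0, -1.0, 2.0))
    println(looksLikeMetric(l15, samples))
  }
}
```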