Spark ML local library providing linear algebra and statistical utilities for local machine learning operations, without requiring a distributed Spark cluster.
Statistical distribution implementations for probability computations and machine learning algorithms. Supports multivariate distributions with advanced numerical stability features.
Implementation of multivariate Gaussian (Normal) distribution with support for singular covariance matrices through pseudo-inverse computations.
/**
 * Multivariate Gaussian (Normal) Distribution.
 * Handles singular covariance matrices by computing the density in a
 * reduced-dimensional subspace.
 *
 * @param mean Mean vector of the distribution
 * @param cov Covariance matrix of the distribution
 */
class MultivariateGaussian(val mean: Vector, val cov: Matrix) extends Serializable {
/**
* Computes probability density function at given point
* @param x Point to evaluate density at
* @return Probability density value
*/
def pdf(x: Vector): Double
/**
* Computes log probability density function at given point
* @param x Point to evaluate log density at
* @return Log probability density value
*/
def logpdf(x: Vector): Double
}

Usage Examples:
import org.apache.spark.ml.linalg.{Vectors, Matrices}
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
// Create a 2D Gaussian distribution
val mean = Vectors.dense(0.0, 0.0)
val cov = Matrices.dense(2, 2, Array(
1.0, 0.5, // First column: [1.0, 0.5]
0.5, 1.0 // Second column: [0.5, 1.0]
))
val gaussian = new MultivariateGaussian(mean, cov)
// Evaluate probability density
val point1 = Vectors.dense(0.0, 0.0) // At the mean
val density1 = gaussian.pdf(point1)
println(s"Density at mean: $density1")
val point2 = Vectors.dense(1.0, 1.0) // Away from mean
val density2 = gaussian.pdf(point2)
println(s"Density at (1,1): $density2")
// Evaluate log probability density (more numerically stable)
val logDensity1 = gaussian.logpdf(point1)
val logDensity2 = gaussian.logpdf(point2)
println(s"Log density at mean: $logDensity1")
println(s"Log density at (1,1): $logDensity2")

A higher-dimensional example:

import org.apache.spark.ml.linalg.{Vectors, Matrices}
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
// Create 5-dimensional Gaussian
val dim = 5
val mean = Vectors.zeros(dim)
// Create diagonal covariance matrix
val covValues = Array.tabulate(dim * dim) { i =>
val row = i / dim
val col = i % dim
if (row == col) 1.0 else 0.0 // Identity matrix
}
val cov = Matrices.dense(dim, dim, covValues)
val gaussian = new MultivariateGaussian(mean, cov)
// Evaluate multiple points
val points = Array(
Vectors.dense(0.0, 0.0, 0.0, 0.0, 0.0), // Origin
Vectors.dense(1.0, 0.0, 0.0, 0.0, 0.0), // Unit vector in first dimension
Vectors.dense(1.0, 1.0, 1.0, 1.0, 1.0) // All-ones vector
)
points.zipWithIndex.foreach { case (point, i) =>
val density = gaussian.pdf(point)
val logDensity = gaussian.logpdf(point)
println(s"Point $i: density = $density, log density = $logDensity")
}

The implementation handles singular (non-invertible) covariance matrices using pseudo-inverse computation:
import org.apache.spark.ml.linalg.{Vectors, Matrices}
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
// Create a singular covariance matrix (rank deficient)
val mean = Vectors.dense(0.0, 0.0, 0.0)
val singularCov = Matrices.dense(3, 3, Array(
1.0, 1.0, 1.0, // First column
1.0, 1.0, 1.0, // Second column (identical to first)
1.0, 1.0, 1.0 // Third column (identical to first)
))
// This will work despite the singular covariance matrix
val singularGaussian = new MultivariateGaussian(mean, singularCov)
val testPoint = Vectors.dense(1.0, 1.0, 1.0)
val density = singularGaussian.pdf(testPoint)
val logDensity = singularGaussian.logpdf(testPoint)
println(s"Density with singular covariance: $density")
println(s"Log density with singular covariance: $logDensity")

An example with correlated dimensions:

import org.apache.spark.ml.linalg.{Vectors, Matrices}
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
// Create correlated 3D Gaussian
val mean = Vectors.dense(1.0, 2.0, 3.0)
// Covariance matrix with correlations
val cov = Matrices.dense(3, 3, Array(
2.0, 0.8, 0.3, // First column: var(x1)=2.0, cov(x1,x2)=0.8, cov(x1,x3)=0.3
0.8, 1.5, 0.6, // Second column: cov(x1,x2)=0.8, var(x2)=1.5, cov(x2,x3)=0.6
0.3, 0.6, 1.0  // Third column: cov(x1,x3)=0.3, cov(x2,x3)=0.6, var(x3)=1.0
))
val correlatedGaussian = new MultivariateGaussian(mean, cov)
// Sample different points and compare densities
val points = Array(
mean, // Should have highest density
Vectors.dense(1.0, 2.0, 3.5), // Close to mean
Vectors.dense(0.0, 0.0, 0.0), // Far from mean
Vectors.dense(2.0, 3.0, 4.0) // Offset from the mean by (1, 1, 1)
)
println("Evaluating correlated Gaussian:")
points.zipWithIndex.foreach { case (point, i) =>
val density = correlatedGaussian.pdf(point)
val logDensity = correlatedGaussian.logpdf(point)
println(f"Point $i: density = $density%.6f, log density = $logDensity%.6f")
}

The MultivariateGaussian implementation includes several numerical stability features, including a logpdf method that remains stable in high dimensions:

// The implementation automatically handles numerical issues
val mean = Vectors.dense(0.0, 0.0)
val poorlyConditioned = Matrices.dense(2, 2, Array(
1e12, 1e12, // Very large values
1e12, 1e12 + 1e-6 // Nearly singular
))
val stableGaussian = new MultivariateGaussian(mean, poorlyConditioned)
// These operations will be numerically stable
val point = Vectors.dense(1e6, 1e6)
val stableDensity = stableGaussian.pdf(point)
val stableLogDensity = stableGaussian.logpdf(point) // Preferred for stability

The multivariate Gaussian PDF is given by:
pdf(x) = (2π)^(-k/2) * |Σ|^(-1/2) * exp(-1/2 * (x-μ)^T * Σ^(-1) * (x-μ))

Where:
- k is the dimensionality
- μ is the mean vector
- Σ is the covariance matrix
- |Σ| is the determinant of the covariance matrix

For singular covariance matrices, the implementation computes the pseudo-determinant and uses the pseudo-inverse in a reduced-dimensional subspace.
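For intuition, the formula can be checked in plain Scala (no Spark dependency) for the special case of a diagonal covariance, where the determinant reduces to a product of variances and the quadratic form to a per-dimension sum. The `GaussianFormulaDemo` object below is an illustrative sketch, not part of the Spark API; it also shows why logpdf is preferred in high dimensions, where pdf underflows to zero:

```scala
// Illustrative sketch (assumed helper, not part of Spark): evaluates the
// multivariate Gaussian density for a diagonal covariance matrix.
object GaussianFormulaDemo {
  // pdf(x) = (2π)^(-k/2) * |Σ|^(-1/2) * exp(-1/2 * quadratic form);
  // for diagonal Σ, |Σ| is the product of the variances and the
  // quadratic form is a per-dimension sum.
  def diagPdf(x: Array[Double], mu: Array[Double], variances: Array[Double]): Double = {
    val k = x.length
    val det = variances.product
    val quad = (0 until k).map(i => math.pow(x(i) - mu(i), 2) / variances(i)).sum
    math.pow(2 * math.Pi, -k / 2.0) / math.sqrt(det) * math.exp(-0.5 * quad)
  }

  // The same density computed in log space: sums replace products,
  // so the result stays finite even when pdf would underflow.
  def diagLogPdf(x: Array[Double], mu: Array[Double], variances: Array[Double]): Double = {
    val k = x.length
    val logDet = variances.map(math.log).sum
    val quad = (0 until k).map(i => math.pow(x(i) - mu(i), 2) / variances(i)).sum
    -0.5 * (k * math.log(2 * math.Pi) + logDet + quad)
  }

  def main(args: Array[String]): Unit = {
    // 2-D standard normal at its mean: density is (2π)^(-1) ≈ 0.1591549
    val d2 = diagPdf(Array(0.0, 0.0), Array(0.0, 0.0), Array(1.0, 1.0))
    println(f"2-D density at mean: $d2%.7f")

    // 1000-D standard normal at its mean: pdf underflows, logpdf stays finite
    val dim = 1000
    val zeros = Array.fill(dim)(0.0)
    val ones = Array.fill(dim)(1.0)
    println(s"1000-D pdf:    ${diagPdf(zeros, zeros, ones)}")    // prints 0.0
    println(s"1000-D logpdf: ${diagLogPdf(zeros, zeros, ones)}") // ≈ -918.94
  }
}
```

This is the same underflow behavior that motivates using the library's logpdf over pdf for high-dimensional data.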
import org.apache.spark.ml.linalg.{Vector, Matrix}
class MultivariateGaussian(val mean: Vector, val cov: Matrix) extends Serializable {
require(cov.numCols == cov.numRows, "Covariance matrix must be square")
require(mean.size == cov.numCols, "Mean vector length must match covariance matrix size")
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-mllib-local-2-11