Apache Spark MLlib is a scalable machine learning library that provides high-level APIs for common machine learning algorithms and utilities.
MLlib provides comprehensive linear algebra operations with support for dense and sparse vectors and matrices. The linear algebra package includes both local operations and distributed matrix computations for large-scale data processing.
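The types documented below are the local representations in org.apache.spark.ml.linalg; the distributed side lives in org.apache.spark.mllib.linalg.distributed, where matrices such as RowMatrix are backed by RDDs of the older mllib vectors. A minimal sketch of that distributed usage, assuming a SparkSession named spark is already in scope:

import org.apache.spark.mllib.linalg.{Vectors => RDDVectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Build a distributed row matrix from an RDD of vectors
val rowRDD = spark.sparkContext.parallelize(Seq(
  RDDVectors.dense(1.0, 2.0, 3.0),
  RDDVectors.dense(4.0, 5.0, 6.0)
))
val distributedMatrix = new RowMatrix(rowRDD)
println(s"Distributed matrix: ${distributedMatrix.numRows()} x ${distributedMatrix.numCols()}")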
trait Vector extends Serializable {
def size: Int
def toArray: Array[Double]
def apply(i: Int): Double
def copy: Vector
def foreachActive(f: (Int, Double) => Unit): Unit
def numActives: Int
def numNonzeros: Int
def compressed: Vector
def toDense: DenseVector
def toSparse: SparseVector
def argmax: Int
def dot(other: Vector): Double
def squared: Vector
}

class DenseVector(val values: Array[Double]) extends Vector {
def this(values: Double*) = this(values.toArray)
override def size: Int = values.length
override def toArray: Array[Double] = values.clone()
override def apply(i: Int): Double = values(i)
override def copy: DenseVector = new DenseVector(values.clone())
override def foreachActive(f: (Int, Double) => Unit): Unit
override def numActives: Int = size
override def numNonzeros: Int
override def compressed: Vector = toSparse
override def toDense: DenseVector = this
override def toSparse: SparseVector
override def argmax: Int
override def dot(other: Vector): Double
override def squared: DenseVector
}

class SparseVector(override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector {
require(indices.length == values.length)
require(indices.length <= size)
override def toArray: Array[Double]
override def apply(i: Int): Double
override def copy: SparseVector = new SparseVector(size, indices.clone(), values.clone())
override def foreachActive(f: (Int, Double) => Unit): Unit
override def numActives: Int = values.length
override def numNonzeros: Int
override def compressed: SparseVector = this
override def toDense: DenseVector
override def toSparse: SparseVector = this
override def argmax: Int
override def dot(other: Vector): Double
override def squared: SparseVector
}

object Vectors {
def dense(firstValue: Double, otherValues: Double*): DenseVector
def dense(values: Array[Double]): DenseVector
def sparse(size: Int, indices: Array[Int], values: Array[Double]): SparseVector
def sparse(size: Int, elements: Seq[(Int, Double)]): SparseVector
def zeros(size: Int): DenseVector
def fromBreeze(breezeVector: BV[Double]): Vector
def norm(vector: Vector, p: Double): Double
def sqdist(v1: Vector, v2: Vector): Double
def parseNumeric(s: String): Vector
}

trait Matrix extends Serializable {
def numRows: Int
def numCols: Int
def toArray: Array[Double]
def apply(i: Int, j: Int): Double
def copy: Matrix
def foreachActive(f: (Int, Int, Double) => Unit): Unit
def numActives: Int
def numNonzeros: Int
def transpose: Matrix
def toDense: DenseMatrix
def toSparse: SparseMatrix
def multiply(y: DenseMatrix): DenseMatrix
def multiply(y: DenseVector): DenseVector
def colIter: Iterator[Vector]
def rowIter: Iterator[Vector]
}

class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double],
val isTransposed: Boolean = false) extends Matrix {
require(values.length == numRows * numCols)
def this(numRows: Int, numCols: Int, values: Array[Double]) = this(numRows, numCols, values, false)
override def toArray: Array[Double] = values.clone()
override def apply(i: Int, j: Int): Double
override def copy: DenseMatrix
override def foreachActive(f: (Int, Int, Double) => Unit): Unit
override def numActives: Int = numRows * numCols
override def numNonzeros: Int
override def transpose: DenseMatrix
override def toDense: DenseMatrix = this
override def toSparse: SparseMatrix
override def multiply(y: DenseMatrix): DenseMatrix
override def multiply(y: DenseVector): DenseVector
override def colIter: Iterator[DenseVector]
override def rowIter: Iterator[DenseVector]
def map(f: Double => Double): DenseMatrix
def update(i: Int, j: Int, value: Double): Unit
}

class SparseMatrix(override val numRows: Int, override val numCols: Int,
val colPtrs: Array[Int], val rowIndices: Array[Int], val values: Array[Double],
override val isTransposed: Boolean = false) extends Matrix {
require(colPtrs.length == numCols + 1)
require(rowIndices.length == values.length)
override def toArray: Array[Double]
override def apply(i: Int, j: Int): Double
override def copy: SparseMatrix
override def foreachActive(f: (Int, Int, Double) => Unit): Unit
override def numActives: Int = values.length
override def numNonzeros: Int
override def transpose: SparseMatrix
override def toDense: DenseMatrix
override def toSparse: SparseMatrix = this
override def multiply(y: DenseMatrix): DenseMatrix
override def multiply(y: DenseVector): DenseVector
override def colIter: Iterator[SparseVector]
override def rowIter: Iterator[SparseVector]
}

object Matrices {
def dense(numRows: Int, numCols: Int, values: Array[Double]): DenseMatrix
def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int],
rowIndices: Array[Int], values: Array[Double]): SparseMatrix
def sparse(numRows: Int, numCols: Int, entries: Seq[(Int, Int, Double)]): SparseMatrix
def zeros(numRows: Int, numCols: Int): DenseMatrix
def eye(n: Int): DenseMatrix
def diag(vector: Vector): DenseMatrix
def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix
def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix
def horzcat(matrices: Array[Matrix]): Matrix
def vertcat(matrices: Array[Matrix]): Matrix
def fromBreeze(breeze: BM[Double]): Matrix
}

object BLAS {
// Level 1 BLAS operations
def dot(x: Vector, y: Vector): Double
def scal(a: Double, x: Vector): Unit
def copy(x: Vector, y: Vector): Unit
def axpy(a: Double, x: Vector, y: Vector): Unit
def nrm2(x: Vector): Double
// Level 2 BLAS operations
def gemv(alpha: Double, A: Matrix, x: Vector, beta: Double, y: Vector): Unit
def ger(alpha: Double, x: Vector, y: Vector, A: Matrix): Unit
def spmv(alpha: Double, A: Matrix, x: Vector, beta: Double, y: Vector): Unit
def spr(alpha: Double, x: Vector, A: DenseMatrix): Unit
def syr(alpha: Double, x: Vector, A: DenseMatrix): Unit
// Level 3 BLAS operations
def gemm(alpha: Double, A: Matrix, B: Matrix, beta: Double, C: DenseMatrix): Unit
def syrk(alpha: Double, A: Matrix, beta: Double, C: DenseMatrix): Unit
}

class VectorUDT extends UserDefinedType[Vector] {
override def sqlType: DataType = StructType(Array(
StructField("type", ByteType, nullable = false),
StructField("size", IntegerType, nullable = true),
StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)
))
override def serialize(obj: Vector): InternalRow
override def deserialize(datum: Any): Vector
override def userClass: Class[Vector] = classOf[Vector]
}
class MatrixUDT extends UserDefinedType[Matrix] {
override def sqlType: DataType = StructType(Array(
StructField("type", ByteType, nullable = false),
StructField("numRows", IntegerType, nullable = false),
StructField("numCols", IntegerType, nullable = false),
StructField("colPtrs", ArrayType(IntegerType, containsNull = false), nullable = true),
StructField("rowIndices", ArrayType(IntegerType, containsNull = false), nullable = true),
StructField("values", ArrayType(DoubleType, containsNull = false), nullable = false),
StructField("isTransposed", BooleanType, nullable = false)
))
override def serialize(obj: Matrix): InternalRow
override def deserialize(datum: Any): Matrix
override def userClass: Class[Matrix] = classOf[Matrix]
}

object SQLDataTypes {
val VectorType: DataType = new VectorUDT()
val MatrixType: DataType = new MatrixUDT()
}

import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, SparseVector}
// Create dense vectors
val denseVec1 = Vectors.dense(1.0, 2.0, 3.0, 4.0)
val denseVec2 = Vectors.dense(Array(5.0, 6.0, 7.0, 8.0))
// Create sparse vectors
val sparseVec1 = Vectors.sparse(5, Array(0, 2, 4), Array(1.0, 3.0, 5.0))
val sparseVec2 = Vectors.sparse(5, Seq((1, 2.0), (3, 4.0)))
// Vector operations
println(s"Dense vector: $denseVec1")
println(s"Sparse vector: $sparseVec1")
println(s"Vector size: ${denseVec1.size}")
println(s"Element access: ${denseVec1(1)}")
// Dot product
val dotProduct = denseVec1.dot(denseVec2)
println(s"Dot product: $dotProduct")
// Vector norms
val norm1 = Vectors.norm(denseVec1, 1.0) // L1 norm
val norm2 = Vectors.norm(denseVec1, 2.0) // L2 norm (Euclidean)
println(s"L1 norm: $norm1")
println(s"L2 norm: $norm2")
// Convert between dense and sparse
val denseToSparse = denseVec1.toSparse
val sparseToDense = sparseVec1.toDense
println(s"Dense to sparse: $denseToSparse")
println(s"Sparse to dense: $sparseToDense")
// Iterate over active elements
println("Active elements in sparse vector:")
sparseVec1.foreachActive { (index, value) =>
println(s" Index $index: $value")
}
// Vector statistics
println(s"Number of active elements: ${sparseVec1.numActives}")
println(s"Number of non-zero elements: ${sparseVec1.numNonzeros}")
println(s"Argmax index: ${denseVec1.argmax}")import org.apache.spark.ml.linalg.{Matrix, Matrices, DenseMatrix, SparseMatrix}
// Create dense matrices
val denseMatrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
val identityMatrix = Matrices.eye(3)
val zeroMatrix = Matrices.zeros(2, 3)
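// A diagonal matrix built from a vector (Matrices.diag from the listing above)
val diagMatrix = Matrices.diag(Vectors.dense(1.0, 2.0, 3.0))
println(s"Diagonal matrix:\n${diagMatrix}")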
// Create sparse matrix
val sparseMatrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(9.0, 6.0, 8.0))
println(s"Dense matrix:\n${denseMatrix}")
println(s"Sparse matrix:\n${sparseMatrix}")
// Matrix properties
println(s"Matrix dimensions: ${denseMatrix.numRows} x ${denseMatrix.numCols}")
println(s"Number of non-zeros: ${sparseMatrix.numNonzeros}")
// Element access
println(s"Matrix element (1,0): ${denseMatrix(1, 0)}")
// Matrix transpose
val transposed = denseMatrix.transpose
println(s"Transposed matrix:\n${transposed}")
// Matrix-vector multiplication
val vector = Vectors.dense(1.0, 2.0)
val result = denseMatrix.multiply(vector)
println(s"Matrix-vector product: $result")
// Matrix-matrix multiplication
val matrix2 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
val matrixProduct = denseMatrix.multiply(matrix2.toDense) // 3x2 times 2x2 identity
println(s"Matrix-matrix product:\n${matrixProduct}")
// Iterate over columns and rows
println("Matrix columns:")
denseMatrix.colIter.zipWithIndex.foreach { case (col, idx) =>
println(s" Column $idx: $col")
}
println("Matrix rows:")
denseMatrix.rowIter.zipWithIndex.foreach { case (row, idx) =>
println(s" Row $idx: $row")
}

import org.apache.spark.ml.linalg.BLAS
import org.apache.spark.ml.linalg.{Vectors, Matrices}
// Level 1 BLAS operations
val x = Vectors.dense(1.0, 2.0, 3.0).toDense
val y = Vectors.dense(4.0, 5.0, 6.0).toDense
// Dot product
val dot = BLAS.dot(x, y)
println(s"BLAS dot product: $dot")
// Scale vector (in-place)
val scaledX = x.copy.toDense
BLAS.scal(2.0, scaledX)
println(s"Scaled vector: $scaledX")
// Vector addition: y = alpha * x + y
val axpyResult = y.copy.toDense
BLAS.axpy(0.5, x, axpyResult)
println(s"AXPY result: $axpyResult")
// Vector norm
val norm = BLAS.nrm2(x)
println(s"Vector norm: $norm")
// Level 2 BLAS operations
val A = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)).toDense
val xVec = Vectors.dense(1.0, 2.0, 3.0).toDense
val yVec = Vectors.zeros(2).toDense
// General matrix-vector multiplication: y = alpha * A * x + beta * y
BLAS.gemv(1.0, A, xVec, 0.0, yVec)
println(s"GEMV result: $yVec")
// Level 3 BLAS operations
val B = Matrices.dense(3, 2, Array(1.0, 0.0, 0.0, 0.0, 1.0, 0.0)).toDense
val C = Matrices.zeros(2, 2).toDense
// General matrix-matrix multiplication: C = alpha * A * B + beta * C
BLAS.gemm(1.0, A, B, 0.0, C)
println(s"GEMM result:\n${C}")import org.apache.spark.sql.functions._
import org.apache.spark.ml.linalg.{SQLDataTypes, Vectors}
// Create DataFrame with vector columns
val data = Seq(
(1, Vectors.dense(1.0, 2.0, 3.0)),
(2, Vectors.sparse(3, Array(0, 2), Array(4.0, 5.0))),
(3, Vectors.dense(6.0, 7.0, 8.0))
).toDF("id", "features")
data.printSchema()
data.show(truncate = false)
// Extract vector elements with small UDFs (vector columns are UDTs, so array SQL functions do not apply directly)
val firstElement = udf((v: Vector) => v(0))
val vectorSize = udf((v: Vector) => v.size)
val withElements = data
  .withColumn("first_element", firstElement(col("features")))
  .withColumn("vector_size", vectorSize(col("features")))
withElements.show()
// Vector aggregations
val vectorStats = data.agg(
count("features").alias("count"),
// Custom aggregations would require UDAFs
first("features").alias("sample_vector")
)
vectorStats.show(truncate = false)
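// SQLDataTypes (from the listing above) exposes the vector UDT for explicit schema definitions
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
val explicitSchema = StructType(Array(
  StructField("id", IntegerType, nullable = false),
  StructField("features", SQLDataTypes.VectorType, nullable = false)
))
println(explicitSchema.treeString)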
// Filter vectors by representation (dense vs. sparse) with a small UDF
val isDense = udf((v: Vector) => v.isInstanceOf[DenseVector])
val denseVectors = data.filter(isDense(col("features")))
println("Dense vectors:")
denseVectors.show(truncate = false)

import scala.util.Random
// Construct large sparse matrix efficiently
def createRandomSparseMatrix(numRows: Int, numCols: Int, density: Double): SparseMatrix = {
val random = new Random(42)
val entries = scala.collection.mutable.ArrayBuffer[(Int, Int, Double)]()
for {
i <- 0 until numRows
j <- 0 until numCols
if random.nextDouble() < density
} {
entries += ((i, j, random.nextGaussian()))
}
Matrices.sparse(numRows, numCols, entries.toSeq).toSparse
}
val largeSparseMatrix = createRandomSparseMatrix(1000, 500, 0.01)
println(s"Created sparse matrix: ${largeSparseMatrix.numRows} x ${largeSparseMatrix.numCols}")
println(s"Density: ${largeSparseMatrix.numNonzeros.toDouble / (largeSparseMatrix.numRows * largeSparseMatrix.numCols)}")
// Sparse matrix operations
val sparseVector = Vectors.sparse(500, (0 until 50).toArray, Array.fill(50)(1.0))
val sparseResult = largeSparseMatrix.multiply(sparseVector.toDense)
println(s"Sparse matrix-vector multiplication result size: ${sparseResult.size}")
// Convert between sparse and dense (be careful with memory)
if (largeSparseMatrix.numNonzeros < 10000) { // Only if reasonably small
val dense = largeSparseMatrix.toDense
println(s"Converted to dense: ${dense.numRows} x ${dense.numCols}")
}

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
import breeze.linalg._
// Convert between MLlib and Breeze for advanced operations
def mllibToBreeze(matrix: DenseMatrix): BDM[Double] = {
  new BDM(matrix.numRows, matrix.numCols, matrix.values)
}
def breezeToMllib(matrix: BDM[Double]): DenseMatrix = {
  Matrices.dense(matrix.rows, matrix.cols, matrix.data).toDense
}
val mllibMatrix = Matrices.dense(3, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0)).toDense
val breezeMatrix = mllibToBreeze(mllibMatrix)
// Advanced operations using Breeze
val eigenDecomp = eig(breezeMatrix)
println(s"Eigenvalues: ${eigenDecomp.eigenvalues}")
val svd = breeze.linalg.svd(breezeMatrix)
println(s"Singular values: ${svd.S}")
// Matrix inverse (if square and invertible)
if (breezeMatrix.rows == breezeMatrix.cols) {
try {
val inverse = inv(breezeMatrix)
val mllibInverse = breezeToMllib(inverse)
println(s"Matrix inverse:\n${mllibInverse}")
} catch {
case _: breeze.linalg.MatrixSingularException =>
println("Matrix is singular")
}
}
// QR decomposition
val qr = breeze.linalg.qr(breezeMatrix)
val mllibQ = breezeToMllib(qr.q)
val mllibR = breezeToMllib(qr.r)
println(s"Q matrix:\n${mllibQ}")
println(s"R matrix:\n${mllibR}")

import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.functions.vector_to_array // requires Spark 3.0+
import org.apache.spark.storage.StorageLevel
// Efficient vector operations for large datasets
def efficientVectorProcessing(vectors: DataFrame): DataFrame = {
  // Cache frequently accessed vector data
  val cachedVectors = vectors.cache()
  // Vector columns are UDTs, so expose the values as a plain array column first,
  // then apply vectorized SQL higher-order functions
  val processedVectors = cachedVectors
    .withColumn("values_array", vector_to_array(col("features")))
    .withColumn("vector_norm",
      expr("aggregate(transform(values_array, x -> x * x), 0.0D, (acc, x) -> acc + x, acc -> sqrt(acc))"))
    .withColumn("max_element", expr("array_max(values_array)"))
    .withColumn("min_element", expr("array_min(values_array)"))
  processedVectors
}
// Memory-efficient sparse operations
def processLargeSparseVectors(sparseVectors: Array[SparseVector]): Unit = {
sparseVectors.foreach { vec =>
// Process only active elements to save computation
vec.foreachActive { (index, value) =>
// Efficient processing of non-zero elements only
if (math.abs(value) > 1e-10) { // Numerical stability
// Process significant values
}
}
}
}
// Batch matrix operations
def batchMatrixOperations(matrices: Array[DenseMatrix]): Array[DenseMatrix] = {
// Process matrices in parallel where possible
matrices.par.map { matrix =>
// Perform expensive operations in parallel
val result = matrix.transpose
result
}.toArray
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-mllib-2-12