MLlib provides comprehensive linear algebra operations with support for dense and sparse vectors and matrices. The linear algebra package includes both local operations and distributed matrix computations for large-scale data processing.
/**
 * Local (single-machine) vector of `Double`s, indexed from 0 to `size - 1`.
 *
 * [[DenseVector]] stores every element; [[SparseVector]] stores only the entries listed in
 * its `indices`, every other position being an implicit zero.
 */
trait Vector extends Serializable {
  /** Number of elements, counting the implicit zeros of a sparse vector. */
  def size: Int
  /** All `size` elements as a dense array (DenseVector returns a copy of its storage). */
  def toArray: Array[Double]
  /** Element at index `i`. */
  def apply(i: Int): Double
  /** Independent copy of this vector. */
  def copy: Vector
  /** Calls `f(index, value)` for every explicitly stored entry. */
  def foreachActive(f: (Int, Double) => Unit): Unit
  /** Number of explicitly stored entries (all of them for a dense vector). */
  def numActives: Int
  /** Number of entries whose value is non-zero. */
  def numNonzeros: Int
  /** This vector in a compact representation; see the implementations for the rule used. */
  def compressed: Vector
  /** This vector in dense form. */
  def toDense: DenseVector
  /** This vector in sparse form. */
  def toSparse: SparseVector
  /** Index of the maximum element. */
  def argmax: Int
  /** Inner product with `other`. */
  def dot(other: Vector): Double
  /** NOTE(review): presumably the element-wise square; not in Apache Spark's API — TODO confirm. */
  def squared: Vector
}

/**
 * Vector backed by an array that holds every element.
 *
 * @param values the elements; the array is stored as-is, not copied
 */
class DenseVector(val values: Array[Double]) extends Vector {
  /** Convenience constructor taking the elements individually. */
  def this(values: Double*) = this(values.toArray)

  override def size: Int = values.length
  override def toArray: Array[Double] = values.clone()
  override def apply(i: Int): Double = values(i)
  override def copy: DenseVector = new DenseVector(values.clone())
  override def foreachActive(f: (Int, Double) => Unit): Unit
  // Every slot of a dense vector is stored, so all of them are active.
  override def numActives: Int = size
  override def numNonzeros: Int

  /**
   * Returns the sparse form only when it is expected to need less memory, otherwise `this`.
   *
   * Estimate: a dense vector costs ~8 bytes per element, a sparse one ~12 bytes per stored
   * entry (4-byte index + 8-byte value), so sparse pays off once 1.5 * nnz < size. The `+ 1.0`
   * keeps an all-zero vector of size 1 dense.
   */
  override def compressed: Vector = {
    val nnz = numNonzeros
    if (1.5 * (nnz + 1.0) < size) toSparse else this
  }

  override def toDense: DenseVector = this
  override def toSparse: SparseVector
  override def argmax: Int
  override def dot(other: Vector): Double
  override def squared: DenseVector
}

/**
 * Vector that stores only selected entries; positions not listed in `indices` are zero.
 *
 * @param size    logical length of the vector
 * @param indices positions of the stored entries, each in `[0, size)`
 * @param values  stored values, parallel to `indices`
 */
class SparseVector(override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector {
  require(indices.length == values.length,
    s"indices and values must have the same length, got ${indices.length} and ${values.length}.")
  require(indices.length <= size,
    s"Cannot store ${indices.length} entries in a vector of size $size.")
  // An out-of-range index can never be addressed through apply/toArray; reject it up front.
  require(indices.forall(i => i >= 0 && i < size),
    s"All indices must lie in [0, $size).")

  override def toArray: Array[Double]
  override def apply(i: Int): Double
  override def copy: SparseVector = new SparseVector(size, indices.clone(), values.clone())
  override def foreachActive(f: (Int, Double) => Unit): Unit
  // Only the explicitly stored entries are active (some of them may still be zero).
  override def numActives: Int = values.length
  override def numNonzeros: Int
  override def compressed: SparseVector = this
  override def toDense: DenseVector
  override def toSparse: SparseVector = this
  override def argmax: Int
  override def dot(other: Vector): Double
  override def squared: SparseVector
}

/** Factory and utility methods for [[Vector]]. */
object Vectors {
  /** Dense vector from individual values. */
  def dense(firstValue: Double, otherValues: Double*): DenseVector
  /** Dense vector wrapping `values`. */
  def dense(values: Array[Double]): DenseVector
  /** Sparse vector from parallel index/value arrays. */
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): SparseVector
  /** Sparse vector from `(index, value)` pairs. */
  def sparse(size: Int, elements: Seq[(Int, Double)]): SparseVector
  /** Dense vector of `size` zeros. */
  def zeros(size: Int): DenseVector
  /** NOTE(review): `private[ml]` in Apache Spark, so not callable from user code — TODO confirm. */
  def fromBreeze(breezeVector: BV[Double]): Vector
  /** p-norm of `vector` (p = 1.0 gives the L1 norm, p = 2.0 the Euclidean norm). */
  def norm(vector: Vector, p: Double): Double
  /** Presumably the squared Euclidean distance between `v1` and `v2` — TODO confirm. */
  def sqdist(v1: Vector, v2: Vector): Double
  /** Presumably parses the textual form produced by `toString` — TODO confirm format. */
  def parseNumeric(s: String): Vector
}

/** Local (single-machine) matrix of `Double`s. */
trait Matrix extends Serializable {
  def numRows: Int
  def numCols: Int
  /**
   * Whether the backing arrays describe the transpose of the stored layout (row-major for
   * dense, CSR for sparse). Concrete with a `false` default so implementations that never
   * transpose need not declare it.
   */
  def isTransposed: Boolean = false
  /** All elements as a dense array. */
  def toArray: Array[Double]
  /** Element at row `i`, column `j`. */
  def apply(i: Int, j: Int): Double
  def copy: Matrix
  /** Calls `f(row, col, value)` for every explicitly stored entry. */
  def foreachActive(f: (Int, Int, Double) => Unit): Unit
  /** Number of explicitly stored entries. */
  def numActives: Int
  /** Number of entries whose value is non-zero. */
  def numNonzeros: Int
  def transpose: Matrix
  def toDense: DenseMatrix
  def toSparse: SparseMatrix
  /** Product `this * y`; `y.numRows` must equal `numCols`. */
  def multiply(y: DenseMatrix): DenseMatrix
  /** Product `this * y`; `y.size` must equal `numCols`. */
  def multiply(y: DenseVector): DenseVector
  def colIter: Iterator[Vector]
  def rowIter: Iterator[Vector]
}

/**
 * Matrix storing every element in `values`, column-major unless `isTransposed`
 * (column-major layout as in Apache Spark — TODO confirm against `apply`).
 */
class DenseMatrix(
    val numRows: Int,
    val numCols: Int,
    val values: Array[Double],
    override val isTransposed: Boolean = false) extends Matrix {
  // Compare in Long so a large numRows * numCols cannot wrap around and pass spuriously.
  require(values.length.toLong == numRows.toLong * numCols,
    s"Expected ${numRows.toLong * numCols} values for a $numRows x $numCols matrix, got ${values.length}.")

  /** Non-transposed matrix; kept as an explicit overload for Java callers. */
  def this(numRows: Int, numCols: Int, values: Array[Double]) = this(numRows, numCols, values, false)

  override def toArray: Array[Double] = values.clone()
  override def apply(i: Int, j: Int): Double
  override def copy: DenseMatrix
  override def foreachActive(f: (Int, Int, Double) => Unit): Unit
  // Every cell of a dense matrix is stored.
  override def numActives: Int = numRows * numCols
  override def numNonzeros: Int
  override def transpose: DenseMatrix
  override def toDense: DenseMatrix = this
  override def toSparse: SparseMatrix
  override def multiply(y: DenseMatrix): DenseMatrix
  override def multiply(y: DenseVector): DenseVector
  override def colIter: Iterator[DenseVector]
  override def rowIter: Iterator[DenseVector]
  /** New matrix with `f` applied to every element. */
  def map(f: Double => Double): DenseMatrix
  /** Sets the element at row `i`, column `j`. */
  def update(i: Int, j: Int, value: Double): Unit
}

/**
 * Matrix in Compressed Sparse Column (CSC) form: the entries of column `j` are
 * `values(colPtrs(j) until colPtrs(j + 1))`, with their row indices at the same positions of
 * `rowIndices`. When `isTransposed` is true the arrays are read row-wise (CSR): `colPtrs`
 * then holds one pointer per row plus one, and `rowIndices` holds column indices.
 */
class SparseMatrix(
    override val numRows: Int,
    override val numCols: Int,
    val colPtrs: Array[Int],
    val rowIndices: Array[Int],
    val values: Array[Double],
    override val isTransposed: Boolean = false) extends Matrix {
  require(rowIndices.length == values.length,
    s"rowIndices and values must have the same length, got ${rowIndices.length} and ${values.length}.")
  // The pointer array is indexed by the compressed dimension: columns for CSC, rows for CSR.
  require(colPtrs.length == (if (isTransposed) numRows else numCols) + 1,
    s"Expected ${(if (isTransposed) numRows else numCols) + 1} pointers, got ${colPtrs.length}.")
  // The final pointer marks the end of the last column/row, i.e. the number of stored entries.
  require(colPtrs.last == values.length,
    s"The last pointer must equal the number of stored values (${values.length}), got ${colPtrs.last}.")

  override def toArray: Array[Double]
  override def apply(i: Int, j: Int): Double
  override def copy: SparseMatrix
  override def foreachActive(f: (Int, Int, Double) => Unit): Unit
  // Only the explicitly stored entries are active.
  override def numActives: Int = values.length
  override def numNonzeros: Int
  override def transpose: SparseMatrix
  override def toDense: DenseMatrix
  override def toSparse: SparseMatrix = this
  override def multiply(y: DenseMatrix): DenseMatrix
  override def multiply(y: DenseVector): DenseVector
  override def colIter: Iterator[SparseVector]
  override def rowIter: Iterator[SparseVector]
}

/** Factory methods for [[Matrix]]. */
object Matrices {
  /** Dense matrix over `values` (column-major as in Apache Spark — TODO confirm). */
  def dense(numRows: Int, numCols: Int, values: Array[Double]): DenseMatrix
  /** Sparse matrix from CSC arrays (see [[SparseMatrix]]). */
  def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int],
    rowIndices: Array[Int], values: Array[Double]): SparseMatrix
  /** Sparse matrix from `(row, col, value)` triples. */
  def sparse(numRows: Int, numCols: Int, entries: Seq[(Int, Int, Double)]): SparseMatrix
  def zeros(numRows: Int, numCols: Int): DenseMatrix
  /** `n x n` identity matrix. */
  def eye(n: Int): DenseMatrix
  /** Square matrix with `vector` on the diagonal. */
  def diag(vector: Vector): DenseMatrix
  /** Random entries drawn from `rng` (distribution not shown here — presumably uniform). */
  def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix
  /** Random entries drawn from `rng` (presumably standard normal — TODO confirm). */
  def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix
  /** Concatenates matrices side by side (equal row counts presumably required). */
  def horzcat(matrices: Array[Matrix]): Matrix
  /** Stacks matrices vertically (equal column counts presumably required). */
  def vertcat(matrices: Array[Matrix]): Matrix
  def fromBreeze(breeze: BM[Double]): Matrix
}

/**
 * BLAS-style routines operating in place on the output argument.
 * NOTE(review): declared `private[spark]` in Apache Spark, so user code outside the
 * `org.apache.spark` package cannot call it — TODO confirm for your Spark version.
 */
object BLAS {
  // Level 1 BLAS operations
  /** Inner product `x . y`. */
  def dot(x: Vector, y: Vector): Double
  /** In place: `x := a * x`. */
  def scal(a: Double, x: Vector): Unit
  /** Copies `x` into `y`. */
  def copy(x: Vector, y: Vector): Unit
  /** In place: `y := a * x + y`. */
  def axpy(a: Double, x: Vector, y: Vector): Unit
  /** Euclidean norm of `x`. */
  def nrm2(x: Vector): Double
  // Level 2 BLAS operations
  /** In place: `y := alpha * A * x + beta * y`. */
  def gemv(alpha: Double, A: Matrix, x: Vector, beta: Double, y: Vector): Unit
  /** Rank-1 update per standard BLAS naming (`A := alpha * x * y^T + A`) — TODO confirm. */
  def ger(alpha: Double, x: Vector, y: Vector, A: Matrix): Unit
  /** Symmetric (packed) matrix-vector product per BLAS naming — TODO confirm storage. */
  def spmv(alpha: Double, A: Matrix, x: Vector, beta: Double, y: Vector): Unit
  /** Packed symmetric rank-1 update per BLAS naming — TODO confirm storage. */
  def spr(alpha: Double, x: Vector, A: DenseMatrix): Unit
  /** Symmetric rank-1 update per BLAS naming (`A := alpha * x * x^T + A`) — TODO confirm. */
  def syr(alpha: Double, x: Vector, A: DenseMatrix): Unit
  // Level 3 BLAS operations
  /** In place: `C := alpha * A * B + beta * C`. */
  def gemm(alpha: Double, A: Matrix, B: Matrix, beta: Double, C: DenseMatrix): Unit
  /** Symmetric rank-k update per BLAS naming — TODO confirm. */
  def syrk(alpha: Double, A: Matrix, beta: Double, C: DenseMatrix): Unit
}

/**
 * Spark SQL user-defined type used to store a [[Vector]] in a DataFrame column.
 * Only `type` is non-nullable; `size`, `indices` and `values` are nullable, presumably
 * because dense and sparse vectors fill different fields — TODO confirm in `serialize`.
 */
class VectorUDT extends UserDefinedType[Vector] {
  override def sqlType: DataType = StructType(Array(
    StructField("type", ByteType, nullable = false),
    StructField("size", IntegerType, nullable = true),
    StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
    StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)
  ))
  override def serialize(obj: Vector): InternalRow
  override def deserialize(datum: Any): Vector
  override def userClass: Class[Vector] = classOf[Vector]
}
/**
 * Spark SQL user-defined type used to store a [[Matrix]] in a DataFrame column.
 * Dimensions, `values` and `isTransposed` are non-nullable; `colPtrs` and `rowIndices` are
 * nullable, presumably left null for dense matrices — TODO confirm in `serialize`.
 */
class MatrixUDT extends UserDefinedType[Matrix] {
  override def sqlType: DataType = StructType(Array(
    StructField("type", ByteType, nullable = false),
    StructField("numRows", IntegerType, nullable = false),
    StructField("numCols", IntegerType, nullable = false),
    StructField("colPtrs", ArrayType(IntegerType, containsNull = false), nullable = true),
    StructField("rowIndices", ArrayType(IntegerType, containsNull = false), nullable = true),
    StructField("values", ArrayType(DoubleType, containsNull = false), nullable = false),
    StructField("isTransposed", BooleanType, nullable = false)
  ))
  override def serialize(obj: Matrix): InternalRow
  override def deserialize(datum: Any): Matrix
  override def userClass: Class[Matrix] = classOf[Matrix]
}

/** Shared instances of the vector and matrix SQL types, e.g. for declaring schemas. */
object SQLDataTypes {
  val VectorType: DataType = new VectorUDT()
  val MatrixType: DataType = new MatrixUDT()
}

// ---------------------------------------------------------------------------
// Example: local vectors
// ---------------------------------------------------------------------------
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, SparseVector}

// Create dense vectors
val denseVec1 = Vectors.dense(1.0, 2.0, 3.0, 4.0)
val denseVec2 = Vectors.dense(Array(5.0, 6.0, 7.0, 8.0))
// Create sparse vectors: (size, indices, values) or (size, Seq(index -> value))
val sparseVec1 = Vectors.sparse(5, Array(0, 2, 4), Array(1.0, 3.0, 5.0))
val sparseVec2 = Vectors.sparse(5, Seq((1, 2.0), (3, 4.0)))
// Vector operations
println(s"Dense vector: $denseVec1")
println(s"Sparse vector: $sparseVec1")
println(s"Vector size: ${denseVec1.size}")
println(s"Element access: ${denseVec1(1)}")
// Dot product
val dotProduct = denseVec1.dot(denseVec2)
println(s"Dot product: $dotProduct")
// Vector norms
val norm1 = Vectors.norm(denseVec1, 1.0) // L1 norm
val norm2 = Vectors.norm(denseVec1, 2.0) // L2 norm (Euclidean)
println(s"L1 norm: $norm1")
println(s"L2 norm: $norm2")
// Convert between dense and sparse
val denseToSparse = denseVec1.toSparse
val sparseToDense = sparseVec1.toDense
println(s"Dense to sparse: $denseToSparse")
println(s"Sparse to dense: $sparseToDense")
// Iterate over active (explicitly stored) elements only
println("Active elements in sparse vector:")
sparseVec1.foreachActive { case (index, value) =>
  println(s" Index $index: $value")
}
// Vector statistics
println(s"Number of active elements: ${sparseVec1.numActives}")
println(s"Number of non-zero elements: ${sparseVec1.numNonzeros}")
println(s"Argmax index: ${denseVec1.argmax}")

// ---------------------------------------------------------------------------
// Example: local matrices
// ---------------------------------------------------------------------------
import org.apache.spark.ml.linalg.{Matrix, Matrices, DenseMatrix, SparseMatrix}

// Create dense matrices (3 x 2 here)
val denseMatrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
val identityMatrix = Matrices.eye(3)
val zeroMatrix = Matrices.zeros(2, 3)
// Create sparse matrix from CSC arrays: colPtrs, rowIndices, values
val sparseMatrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(9.0, 6.0, 8.0))
println(s"Dense matrix:\n${denseMatrix}")
println(s"Sparse matrix:\n${sparseMatrix}")
// Matrix properties
println(s"Matrix dimensions: ${denseMatrix.numRows} x ${denseMatrix.numCols}")
println(s"Number of non-zeros: ${sparseMatrix.numNonzeros}")
// Element access
println(s"Matrix element (1,0): ${denseMatrix(1, 0)}")
// Matrix transpose (2 x 3)
val transposed = denseMatrix.transpose
println(s"Transposed matrix:\n${transposed}")
// Matrix-vector multiplication: (3 x 2) * (2) -> (3)
val vector = Vectors.dense(1.0, 2.0)
val result = denseMatrix.multiply(vector)
println(s"Matrix-vector product: $result")
// Matrix-matrix multiplication: inner dimensions must agree, so multiply the 2 x 2 matrix by
// the 2 x 3 transpose. (A 2 x 3 by 2 x 2 product is undefined.)
val matrix2 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
val matrixProduct = matrix2.toDense.multiply(transposed.toDense)
println(s"Matrix-matrix product:\n${matrixProduct}")
// Iterate over columns and rows
println("Matrix columns:")
denseMatrix.colIter.zipWithIndex.foreach { case (col, idx) =>
  println(s" Column $idx: $col")
}
println("Matrix rows:")
denseMatrix.rowIter.zipWithIndex.foreach { case (row, idx) =>
  println(s" Row $idx: $row")
}

// ---------------------------------------------------------------------------
// Example: BLAS routines
// NOTE(review): `ml.linalg.BLAS` is `private[spark]` in Apache Spark, so this snippet only
// compiles from code inside an `org.apache.spark` package — TODO confirm for your version.
// ---------------------------------------------------------------------------
import org.apache.spark.ml.linalg.BLAS
import org.apache.spark.ml.linalg.{Vectors, Matrices}

// Level 1 BLAS operations
val x = Vectors.dense(1.0, 2.0, 3.0).toDense
val y = Vectors.dense(4.0, 5.0, 6.0).toDense
// Dot product
val dot = BLAS.dot(x, y)
println(s"BLAS dot product: $dot")
// Scale vector (in-place); work on a copy so `x` is untouched
val scaledX = x.copy.toDense
BLAS.scal(2.0, scaledX)
println(s"Scaled vector: $scaledX")
// Vector addition: y = alpha * x + y (in place on a copy of y)
val axpyResult = y.copy.toDense
BLAS.axpy(0.5, x, axpyResult)
println(s"AXPY result: $axpyResult")
// Vector norm
val norm = BLAS.nrm2(x)
println(s"Vector norm: $norm")
// Level 2 BLAS operations
val A = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)).toDense
val xVec = Vectors.dense(1.0, 2.0, 3.0).toDense
val yVec = Vectors.zeros(2).toDense
// General matrix-vector multiplication: y = alpha * A * x + beta * y
BLAS.gemv(1.0, A, xVec, 0.0, yVec)
println(s"GEMV result: $yVec")
// Level 3 BLAS operations
val B = Matrices.dense(3, 2, Array(1.0, 0.0, 0.0, 0.0, 1.0, 0.0)).toDense
val C = Matrices.zeros(2, 2).toDense
// General matrix-matrix multiplication: C = alpha * A * B + beta * C
BLAS.gemm(1.0, A, B, 0.0, C)
println(s"GEMM result:\n${C}")

// ---------------------------------------------------------------------------
// Example: vectors in DataFrames
// A vector column is a UDT, not a SQL array, so array operations (getItem, size) do not apply
// to it directly; convert it with vector_to_array (Spark 3.0+) first.
// ---------------------------------------------------------------------------
import org.apache.spark.sql.functions._
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.ml.linalg.{DenseVector, SQLDataTypes, Vector, Vectors}
// Assumes an active SparkSession named `spark` (as in spark-shell); its implicits supply toDF.
import spark.implicits._

// Create DataFrame with vector columns
val data = Seq(
  (1, Vectors.dense(1.0, 2.0, 3.0)),
  (2, Vectors.sparse(3, Array(0, 2), Array(4.0, 5.0))),
  (3, Vectors.dense(6.0, 7.0, 8.0))
).toDF("id", "features")
data.printSchema()
data.show(truncate = false)
// Extract vector elements via a dense array view (sparse vectors expand to their full size)
val withElements = data
  .withColumn("features_array", vector_to_array(col("features")))
  .withColumn("first_element", col("features_array").getItem(0))
  .withColumn("vector_size", size(col("features_array")))
  .drop("features_array")
withElements.show()
// Vector aggregations
val vectorStats = data.agg(
  count("features").alias("count"),
  // Custom aggregations would require UDAFs
  first("features").alias("sample_vector")
)
vectorStats.show(truncate = false)
// Filter vectors by representation. The string form of a vector is its values
// (e.g. "[1.0,2.0,3.0]"), never the class name, so test the deserialized type instead.
val isDenseVector = udf { (v: Vector) =>
  v match {
    case _: DenseVector => true
    case _ => false
  }
}
val denseVectors = data.filter(isDenseVector(col("features")))
println("Dense vectors:")
denseVectors.show(truncate = false)

// ---------------------------------------------------------------------------
// Example: large sparse matrices
// ---------------------------------------------------------------------------
import scala.util.Random
import org.apache.spark.ml.linalg.{Matrices, SparseMatrix, Vectors}

/**
 * Builds a `numRows x numCols` sparse matrix where each cell is independently non-zero with
 * probability `density`, holding a Gaussian sample.
 *
 * @param density fraction of cells to populate, in [0, 1]
 * @param seed    RNG seed; the default reproduces the original fixed seed of 42
 */
def createRandomSparseMatrix(numRows: Int, numCols: Int, density: Double, seed: Long = 42L): SparseMatrix = {
  require(density >= 0.0 && density <= 1.0, s"density must be in [0, 1], got $density")
  val random = new Random(seed)
  // Row-major scan; for each cell the density draw happens before the value draw, so a given
  // seed always yields the same matrix.
  val entries = for {
    i <- 0 until numRows
    j <- 0 until numCols
    if random.nextDouble() < density
  } yield (i, j, random.nextGaussian())
  Matrices.sparse(numRows, numCols, entries).toSparse
}

val largeSparseMatrix = createRandomSparseMatrix(1000, 500, 0.01)
println(s"Created sparse matrix: ${largeSparseMatrix.numRows} x ${largeSparseMatrix.numCols}")
// Compute the cell count in Double so large dimensions cannot overflow Int.
println(s"Density: ${largeSparseMatrix.numNonzeros.toDouble / (largeSparseMatrix.numRows.toDouble * largeSparseMatrix.numCols)}")
// Sparse matrix operations: the vector length must match the matrix's column count, and
// multiply takes a DenseVector, so densify the (small) operand.
val probeVector = Vectors.sparse(largeSparseMatrix.numCols, (0 until 50).toArray, Array.fill(50)(1.0))
val sparseResult = largeSparseMatrix.multiply(probeVector.toDense)
println(s"Sparse matrix-vector multiplication result size: ${sparseResult.size}")
// Convert between sparse and dense (be careful with memory): toDense allocates
// numRows * numCols doubles however few entries are non-zero, so bound the full size.
val maxDenseCells = 1000000L
if (largeSparseMatrix.numRows.toLong * largeSparseMatrix.numCols <= maxDenseCells) {
  val dense = largeSparseMatrix.toDense
  println(s"Converted to dense: ${dense.numRows} x ${dense.numCols}")
}

// ---------------------------------------------------------------------------
// Example: interop with Breeze
// Import only the Breeze names used: a wildcard `breeze.linalg._` would also import Breeze's
// DenseMatrix/DenseVector, which clash with the MLlib types of the same name.
// ---------------------------------------------------------------------------
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, eig, inv}
import org.apache.spark.ml.linalg.{DenseMatrix, Matrices}

/**
 * Views an MLlib dense matrix as a Breeze matrix. Shares `values` (no copy), so mutating the
 * result mutates the input.
 */
def mlibToBreeze(matrix: DenseMatrix): BDM[Double] =
  if (!matrix.isTransposed) {
    new BDM(matrix.numRows, matrix.numCols, matrix.values)
  } else {
    // A transposed MLlib matrix stores values row-major: read them as the column-major
    // numCols x numRows transpose, then flip back.
    new BDM(matrix.numCols, matrix.numRows, matrix.values).t
  }

/**
 * Copies a Breeze matrix into an MLlib dense matrix. `toArray` yields a compact column-major
 * array, which handles Breeze views with an offset, stride or transpose flag; reading the raw
 * `data` array does not.
 */
def breezeToMllib(matrix: BDM[Double]): DenseMatrix =
  Matrices.dense(matrix.rows, matrix.cols, matrix.toArray).toDense

val mlibMatrix = Matrices.dense(3, 3, Array(1, 2, 3, 4, 5, 6, 7, 8, 9)).toDense
val breezeMatrix = mlibToBreeze(mlibMatrix)
// Advanced operations using Breeze
val eigenDecomp = eig(breezeMatrix)
println(s"Eigenvalues: ${eigenDecomp.eigenvalues}")
val svdResult = breeze.linalg.svd(breezeMatrix)
println(s"Singular values: ${svdResult.S}")
// Matrix inverse (if square and invertible). NOTE(review): a numerically near-singular input
// (this sample matrix is singular in exact arithmetic) may not raise the exception, because
// rounding can leave a tiny non-zero pivot — check the result's magnitude as well.
if (breezeMatrix.rows == breezeMatrix.cols) {
  try {
    val inverse = inv(breezeMatrix)
    val mlibInverse = breezeToMllib(inverse)
    println(s"Matrix inverse:\n${mlibInverse}")
  } catch {
    case _: breeze.linalg.MatrixSingularException =>
      println("Matrix is singular")
  }
}
// QR decomposition
val qrResult = breeze.linalg.qr(breezeMatrix)
val mlibQ = breezeToMllib(qrResult.q)
val mlibR = breezeToMllib(qrResult.r)
println(s"Q matrix:\n${mlibQ}")
println(s"R matrix:\n${mlibR}")

// ---------------------------------------------------------------------------
// Example: performance-oriented patterns
// ---------------------------------------------------------------------------
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.ml.linalg.{DenseMatrix, SparseVector}

/**
 * Adds per-row `vector_norm` (Euclidean), `max_element` and `min_element` columns computed
 * from the `features` vector column. The input is cached; unpersisting is left to the caller.
 * Implicit zeros of sparse vectors count toward max/min because the vector is expanded first.
 */
def efficientVectorProcessing(vectors: DataFrame): DataFrame = {
  // Cache frequently accessed vector data
  val cachedVectors = vectors.cache()
  // A vector UDT column has no addressable `.values` field in SQL; expand it to an array.
  // The underscore-prefixed name makes a clash with a user column unlikely.
  val arrayCol = "__features_array"
  cachedVectors
    .withColumn(arrayCol, vector_to_array(col("features")))
    // 0D is a DOUBLE literal; a bare 0.0 is parsed as DECIMAL(1,1) and aggregate would reject
    // the DOUBLE-typed merge function.
    .withColumn("vector_norm",
      expr(s"aggregate(transform($arrayCol, x -> x * x), 0D, (acc, x) -> acc + x, acc -> sqrt(acc))"))
    .withColumn("max_element", expr(s"array_max($arrayCol)"))
    .withColumn("min_element", expr(s"array_min($arrayCol)"))
    .drop(arrayCol)
}

/**
 * Skeleton for per-entry work on sparse vectors: visits only explicitly stored entries and
 * skips those whose magnitude is at most 1e-10 (treated as zero). The body is a placeholder.
 */
def processLargeSparseVectors(sparseVectors: Array[SparseVector]): Unit = {
  sparseVectors.foreach { vec =>
    // Process only active elements to save computation
    vec.foreachActive { case (index, value) =>
      // Stored entries can still be (near-)zero; skip values below the tolerance.
      if (math.abs(value) > 1e-10) {
        // Process significant values
      }
    }
  }
}

/**
 * Transposes every matrix in `matrices`. Maps sequentially: `.par` is not part of the
 * Scala 2.13+ standard library (it lives in the separate scala-parallel-collections module).
 * Distribute with Spark if the per-matrix work becomes expensive.
 */
def batchMatrixOperations(matrices: Array[DenseMatrix]): Array[DenseMatrix] =
  matrices.map(_.transpose)