Apache Flink ML provides a comprehensive linear algebra framework with vector and matrix abstractions that support both dense and sparse representations, optimized for distributed computing.
All vector implementations extend the base Vector trait which provides common operations.
trait Vector {
def size: Int
def apply(index: Int): Double
def update(index: Int, value: Double): Unit
def copy: Vector
def dot(other: Vector): Double
def outer(other: Vector): Matrix
def magnitude: Double
}

Dense vectors store all values in a contiguous array, which suits data where most elements are non-zero.
/** Dense vector backed by the array `data`, which holds every element whether or not it is zero. */
case class DenseVector(data: Array[Double]) extends Vector {
/** Number of elements in the vector. */
def size: Int
/** Returns the element at position `index`. */
def apply(index: Int): Double
/** Sets the element at position `index` to `value`. */
def update(index: Int, value: Double): Unit
/** Returns a copy of this vector, typed as a DenseVector. */
def copy: DenseVector
/** Computes the dot product of this vector with `other`. */
def dot(other: Vector): Double
/** Computes the outer product of this vector with `other`, yielding a matrix. */
def outer(other: Vector): Matrix
/** Computes the L2 norm of this vector. */
def magnitude: Double
/** Converts this vector to its sparse representation. */
def toSparseVector: SparseVector
}
object DenseVector {
def apply(values: Double*): DenseVector
def zeros(size: Int): DenseVector
def eye(size: Int, index: Int): DenseVector
def init(size: Int, f: Int => Double): DenseVector
}

Usage Example:
import org.apache.flink.ml.math.DenseVector
// Create dense vectors
val v1 = DenseVector(1.0, 2.0, 3.0, 4.0)
val v2 = DenseVector.zeros(4) // [0.0, 0.0, 0.0, 0.0]
val v3 = DenseVector.eye(4, 2) // [0.0, 0.0, 1.0, 0.0]
val v4 = DenseVector.init(4, i => i * 2.0) // [0.0, 2.0, 4.0, 6.0]
// Vector operations
val dotProduct = v1.dot(v4) // Compute dot product
val magnitude = v1.magnitude // Compute L2 norm
val outerProduct = v1.outer(v4) // Compute outer product matrix
// Element access and modification
val element = v1(2) // Get element at index 2
v1(2) = 5.0 // Set element at index 2
// Convert to sparse
val sparse = v1.toSparseVector

Sparse vectors store only non-zero values together with their indices, which is efficient for data with many zero elements.
/**
 * Sparse vector that keeps only the non-zero values together with their indices.
 *
 * `size` is the logical length of the vector; `indices` and `data` hold the stored
 * positions and values (presumably as parallel arrays -- TODO confirm against the implementation).
 */
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector {
/** Returns the element at position `index`. */
def apply(index: Int): Double
/** Sets the element at position `index` to `value`. */
def update(index: Int, value: Double): Unit
/** Returns a copy of this vector, typed as a SparseVector. */
def copy: SparseVector
/** Computes the dot product of this vector with `other`. */
def dot(other: Vector): Double
/** Computes the outer product of this vector with `other`, yielding a matrix. */
def outer(other: Vector): Matrix
/** Computes the magnitude of this vector. */
def magnitude: Double
/** Converts this vector to its dense representation. */
def toDenseVector: DenseVector
/** Iterates over the non-zero elements as (index, value) pairs. */
def iterator: Iterator[(Int, Double)]
}
object SparseVector {
def fromCOO(size: Int, coordinates: Array[(Int, Double)]): SparseVector
def fromCOO(size: Int, coordinates: Iterator[(Int, Double)]): SparseVector
}

Usage Example:
import org.apache.flink.ml.math.SparseVector
// Create sparse vector from coordinate format (COO)
val coordinates = Array((0, 1.0), (2, 3.0), (5, 2.0))
val sparseVec = SparseVector.fromCOO(10, coordinates)
// Sparse vector operations
val denseVec = sparseVec.toDenseVector
val magnitude = sparseVec.magnitude
// Iterate over non-zero elements
for ((index, value) <- sparseVec) {
println(s"Index $index: $value")
}

All matrix implementations extend the base Matrix trait.
trait Matrix {
def numRows: Int
def numCols: Int
def apply(row: Int, col: Int): Double
def update(row: Int, col: Int, value: Double): Unit
def copy: Matrix
}

Dense matrices store all values in column-major order, which suits small to medium-sized matrices.
/**
 * Dense matrix of `numRows` x `numCols` values.
 *
 * All values are kept in `data` in column-major order.
 */
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix {
/** Returns the element at position (`row`, `col`). */
def apply(row: Int, col: Int): Double
/** Sets the element at position (`row`, `col`) to `value`. */
def update(row: Int, col: Int, value: Double): Unit
/** Returns a copy of this matrix, typed as a DenseMatrix. */
def copy: DenseMatrix
/** Converts this matrix to its sparse representation. */
def toSparseMatrix: SparseMatrix
}
object DenseMatrix {
def zeros(numRows: Int, numCols: Int): DenseMatrix
def eye(size: Int): DenseMatrix
def apply(numRows: Int, numCols: Int)(values: Double*): DenseMatrix
}

Usage Example:
import org.apache.flink.ml.math.DenseMatrix
// Create dense matrices
val m1 = DenseMatrix(2, 3)(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
val zeros = DenseMatrix.zeros(3, 3)
val identity = DenseMatrix.eye(3)
// Matrix operations
val element = m1(1, 2) // Get element at (1, 2)
m1(1, 2) = 10.0 // Set element at (1, 2)
// Convert to sparse
val sparse = m1.toSparseMatrix

Sparse matrices use the Compressed Sparse Column (CSC) format for efficient storage of matrices with many zero elements.
/**
 * Sparse matrix stored in Compressed Sparse Column (CSC) format.
 *
 * NOTE(review): `rowIndices`, `colPtrs` and `data` are presumably the usual CSC
 * row-index, column-pointer and value arrays -- confirm against the implementation.
 */
class SparseMatrix(
val numRows: Int,
val numCols: Int,
val rowIndices: Array[Int],
val colPtrs: Array[Int],
val data: Array[Double]
) extends Matrix {
/** Returns the element at position (`row`, `col`). */
def apply(row: Int, col: Int): Double
/** Sets the element at position (`row`, `col`) to `value`. */
def update(row: Int, col: Int, value: Double): Unit
/** Returns a copy of this matrix, typed as a SparseMatrix. */
def copy: SparseMatrix
/** Converts this matrix to its dense representation. */
def toDenseMatrix: DenseMatrix
}
object SparseMatrix {
def fromCOO(
numRows: Int,
numCols: Int,
coordinates: Array[(Int, Int, Double)]
): SparseMatrix
}

Usage Example:
import org.apache.flink.ml.math.SparseMatrix
// Create sparse matrix from coordinate format
val coordinates = Array((0, 0, 1.0), (1, 2, 3.0), (2, 1, 2.0))
val sparseMatrix = SparseMatrix.fromCOO(3, 3, coordinates)
// Convert to dense
val denseMatrix = sparseMatrix.toDenseMatrix

For large-scale operations, Flink ML provides distributed matrix operations.
trait DistributedMatrix {
def numRows: Int
def numCols: Int
}

class DistributedRowMatrix(
val data: DataSet[IndexedRow],
val numRows: Int,
val numCols: Int
) extends DistributedMatrix {
def toCOO(): DataSet[(Int, Int, Double)]
def toLocalSparseMatrix(): SparseMatrix
def toLocalDenseMatrix(): DenseMatrix
def add(other: DistributedRowMatrix): DistributedRowMatrix
def subtract(other: DistributedRowMatrix): DistributedRowMatrix
}
/** One row of a distributed row matrix: the row's index (`rowIndex`) paired with that row's values. */
case class IndexedRow(rowIndex: Int, values: Vector)
object DistributedRowMatrix {
def fromCOO(
data: DataSet[(Int, Int, Double)],
numRows: Int,
numCols: Int
): DistributedRowMatrix
}

Usage Example:
import org.apache.flink.ml.math.distributed.{DistributedRowMatrix, IndexedRow}
import org.apache.flink.ml.math.DenseVector
// Create distributed matrix from rows
val rows: DataSet[IndexedRow] = env.fromCollection(Seq(
IndexedRow(0, DenseVector(1.0, 2.0, 3.0)),
IndexedRow(1, DenseVector(4.0, 5.0, 6.0)),
IndexedRow(2, DenseVector(7.0, 8.0, 9.0))
))
val distributedMatrix = new DistributedRowMatrix(rows, 3, 3)
// Matrix operations
val cooFormat = distributedMatrix.toCOO()
val localSparse = distributedMatrix.toLocalSparseMatrix()
val localDense = distributedMatrix.toLocalDenseMatrix()
// Arithmetic operations
val matrix2 = //... another DistributedRowMatrix
val sum = distributedMatrix.add(matrix2)
val difference = distributedMatrix.subtract(matrix2)

Basic Linear Algebra Subprograms (BLAS) routines are provided for efficient low-level operations.
object BLAS {
def axpy(a: Double, x: Vector, y: Vector): Unit
def dot(x: Vector, y: Vector): Double
def copy(x: Vector, y: Vector): Unit
def scal(a: Double, x: Vector): Unit
def syr(alpha: Double, x: Vector, A: Matrix): Unit
}

Usage Example:
import org.apache.flink.ml.math.BLAS
val x = DenseVector(1.0, 2.0, 3.0)
val y = DenseVector(4.0, 5.0, 6.0)
// y = a * x + y (AXPY operation)
BLAS.axpy(2.0, x, y)
// Dot product
val dotProduct = BLAS.dot(x, y)
// Scale vector: x = a * x
BLAS.scal(0.5, x)

Integration with the Breeze linear algebra library is provided for interoperability.
/**
 * Implicit converter instances for interoperating with the Breeze linear algebra library.
 *
 * NOTE(review): `BDM` and `BDV` are presumably aliases for Breeze's dense matrix and
 * dense vector types -- confirm against the imports of the real source file.
 */
object Breeze {
/** Converter instance for values of type Matrix (name suggests Matrix -> Breeze). */
implicit val Matrix2BreezeConverter: BreezeMatrixConverter[Matrix]
/** Converter instance for values of type BDM[Double] (name suggests Breeze -> Matrix). */
implicit val Breeze2MatrixConverter: MatrixConverter[BDM[Double]]
/** Converter instance for values of type Vector (name suggests Vector -> Breeze). */
implicit val Vector2BreezeConverter: BreezeVectorConverter[Vector]
/** Converter instance for values of type BDV[Double] (name suggests Breeze -> Vector). */
implicit val Breeze2VectorConverter: VectorConverter[BDV[Double]]
}
/**
 * Type class for converting a value of type `T` into a Breeze `BDV[Double]`.
 *
 * @tparam T the source vector type
 */
trait BreezeVectorConverter[T] {
/** Converts `vector` into a `BDV[Double]`. */
def convert(vector: T): BDV[Double]
}
trait VectorConverter[T] {
def convert(vector: T): Vector
}

The org.apache.flink.ml.math package object provides implicit classes for enhanced functionality.
/**
 * Implicit wrapper that lets any Vector be used as an Iterable of (index, value) pairs,
 * e.g. in a `for ((index, value) <- vector)` loop.
 */
implicit class RichVector(vector: Vector) extends Iterable[(Int, Double)] {
/** Iterates over the vector's elements as (index, value) pairs. */
def iterator: Iterator[(Int, Double)]
/** Iterates over the vector's values only, without their indices. */
def valueIterator: Iterator[Double]
}
/**
 * Implicit wrapper that lets any Matrix be used as an Iterable of (row, col, value) triples,
 * e.g. in a `for ((row, col, value) <- matrix)` loop.
 */
implicit class RichMatrix(matrix: Matrix) extends Iterable[(Int, Int, Double)] {
/** Iterates over the matrix's elements as (row, col, value) triples. */
def iterator: Iterator[(Int, Int, Double)]
/** Iterates over the matrix's values only, without their coordinates. */
def valueIterator: Iterator[Double]
}
def vector2Array(vector: Vector): Array[Double]

Usage Example:
val vector = DenseVector(1.0, 2.0, 3.0, 4.0)
val matrix = DenseMatrix(2, 2)(1.0, 2.0, 3.0, 4.0)
// Iterate over vector elements
for ((index, value) <- vector) {
println(s"vector($index) = $value")
}
// Iterate over matrix elements
for ((row, col, value) <- matrix) {
println(s"matrix($row, $col) = $value")
}
// Convert vector to array
val array = vector2Array(vector)

A type class pattern is used for building vectors from different data types.
/**
 * Type class describing how to build a Vector from a value of type `T`.
 *
 * @tparam T the type of the source data
 */
trait VectorBuilder[T] {
/** Builds a Vector from `data`. */
def build(data: T): Vector
}
object VectorBuilder {
implicit val arrayVectorBuilder: VectorBuilder[Array[Double]]
implicit val seqVectorBuilder: VectorBuilder[Seq[Double]]
}

Usage Example:
import org.apache.flink.ml.math.VectorBuilder
/**
 * Builds a Vector from `data` by delegating to the VectorBuilder instance
 * that is implicitly available for `T`.
 *
 * @param data    the source value to turn into a vector
 * @param builder the type class instance that performs the conversion
 * @tparam T the type of the source data
 * @return the vector produced by `builder.build`
 */
def createVector[T](data: T)(implicit builder: VectorBuilder[T]): Vector =
  builder.build(data)
val vectorFromArray = createVector(Array(1.0, 2.0, 3.0))
val vectorFromSeq = createVector(Seq(4.0, 5.0, 6.0))