or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

algorithms.md, distance-metrics.md, index.md, linear-algebra.md, optimization.md, outlier-detection.md, pipeline.md, preprocessing.md
tile.json

docs/linear-algebra.md

Linear Algebra

Apache Flink ML provides a comprehensive linear algebra framework with vector and matrix abstractions that support both dense and sparse representations, optimized for distributed computing.

Vectors

Vector Base Trait

All vector implementations extend the base Vector trait which provides common operations.

/** Common interface shared by every vector implementation (dense and sparse). */
trait Vector {
  // Number of elements in the vector.
  def size: Int
  // Reads the element at `index` (0-based, per the usage examples).
  def apply(index: Int): Double
  // Writes `value` at `index`; enables the `v(i) = x` assignment syntax.
  def update(index: Int, value: Double): Unit
  // Returns a copy of this vector (presumably independent of the original; confirm).
  def copy: Vector
  // Dot product of this vector with `other`.
  def dot(other: Vector): Double
  // Outer product of this vector with `other`, returned as a matrix.
  def outer(other: Vector): Matrix
  // L2 norm of the vector.
  def magnitude: Double
}

Dense Vector

Dense vectors store all values in a contiguous array, suitable for data where most elements are non-zero.

/** Vector that stores every element, zero or not, in the `data` array. */
case class DenseVector(data: Array[Double]) extends Vector {
  def size: Int
  def apply(index: Int): Double
  def update(index: Int, value: Double): Unit
  // Narrows the return type of Vector.copy to DenseVector.
  def copy: DenseVector
  def dot(other: Vector): Double
  def outer(other: Vector): Matrix
  def magnitude: Double
  // Converts this vector to the sparse representation.
  def toSparseVector: SparseVector
}

/** Factory methods for [[DenseVector]]. */
object DenseVector {
  // Builds a vector from the listed values, e.g. DenseVector(1.0, 2.0, 3.0, 4.0).
  def apply(values: Double*): DenseVector
  // All-zero vector of the given size, e.g. zeros(4) is [0.0, 0.0, 0.0, 0.0].
  def zeros(size: Int): DenseVector
  // 1.0 at `index` (0-based) and 0.0 elsewhere, e.g. eye(4, 2) is [0.0, 0.0, 1.0, 0.0].
  def eye(size: Int, index: Int): DenseVector
  // Element i is f(i), e.g. init(4, i => i * 2.0) is [0.0, 2.0, 4.0, 6.0].
  def init(size: Int, f: Int => Double): DenseVector
}

Usage Example:

import org.apache.flink.ml.math.DenseVector

// Create dense vectors
val v1 = DenseVector(1.0, 2.0, 3.0, 4.0)
val v2 = DenseVector.zeros(4)                    // [0.0, 0.0, 0.0, 0.0]
val v3 = DenseVector.eye(4, 2)                   // [0.0, 0.0, 1.0, 0.0]
val v4 = DenseVector.init(4, i => i * 2.0)       // [0.0, 2.0, 4.0, 6.0]

// Vector operations (evaluated on v1 before it is modified further down)
val dotProduct = v1.dot(v4)                      // Dot product: 1*0 + 2*2 + 3*4 + 4*6 = 40.0
val magnitude = v1.magnitude                     // L2 norm: sqrt(1 + 4 + 9 + 16) = sqrt(30), about 5.477
val outerProduct = v1.outer(v4)                  // Outer product matrix (4 x 4 for two 4-element vectors)

// Element access and modification (indices are 0-based)
val element = v1(2)                              // Get element at index 2 (3.0 here)
v1(2) = 5.0                                      // Set element at index 2; v1 becomes [1.0, 2.0, 5.0, 4.0]

// Convert to sparse
val sparse = v1.toSparseVector

Sparse Vector

Sparse vectors store only non-zero values with their indices, efficient for data with many zero elements.

/**
 * Vector that stores only its non-zero values together with their indices.
 * `size` is the full length of the vector; `indices` and `data` are presumably
 * parallel arrays (position k of `data` belongs to index `indices(k)`); confirm.
 */
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector {
  def apply(index: Int): Double
  def update(index: Int, value: Double): Unit
  // Narrows the return type of Vector.copy to SparseVector.
  def copy: SparseVector
  def dot(other: Vector): Double
  def outer(other: Vector): Matrix
  def magnitude: Double
  // Converts this vector to the dense representation.
  def toDenseVector: DenseVector
  // (index, value) pairs; the usage example uses this to walk the non-zero elements.
  def iterator: Iterator[(Int, Double)]
}

/** Factory methods for [[SparseVector]]. */
object SparseVector {
  // Builds a vector of length `size` from (index, value) pairs in coordinate (COO) format.
  def fromCOO(size: Int, coordinates: Array[(Int, Double)]): SparseVector
  // Same construction, with the (index, value) pairs supplied by an iterator.
  def fromCOO(size: Int, coordinates: Iterator[(Int, Double)]): SparseVector
}

Usage Example:

import org.apache.flink.ml.math.SparseVector

// Create sparse vector from coordinate format (COO): each pair is (index, value)
val coordinates = Array((0, 1.0), (2, 3.0), (5, 2.0))
val sparseVec = SparseVector.fromCOO(10, coordinates) // length 10, non-zero at indices 0, 2 and 5

// Sparse vector operations
val denseVec = sparseVec.toDenseVector
val magnitude = sparseVec.magnitude // L2 norm: sqrt(1 + 9 + 4) = sqrt(14), about 3.742

// Iterate over non-zero elements
for ((index, value) <- sparseVec) {
  println(s"Index $index: $value")
}

Matrices

Matrix Base Trait

All matrix implementations extend the base Matrix trait.

/** Common interface shared by every local matrix implementation. */
trait Matrix {
  // Number of rows.
  def numRows: Int
  // Number of columns.
  def numCols: Int
  // Reads the element at (`row`, `col`).
  def apply(row: Int, col: Int): Double
  // Writes `value` at (`row`, `col`); enables the `m(r, c) = x` assignment syntax.
  def update(row: Int, col: Int, value: Double): Unit
  // Returns a copy of this matrix (presumably independent of the original; confirm).
  def copy: Matrix
}

Dense Matrix

Dense matrices store all values in column-major order, suitable for small to medium-sized matrices.

/** Matrix that stores every element in `data`, laid out in column-major order. */
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix {
  def apply(row: Int, col: Int): Double
  def update(row: Int, col: Int, value: Double): Unit
  // Narrows the return type of Matrix.copy to DenseMatrix.
  def copy: DenseMatrix
  // Converts this matrix to the sparse representation.
  def toSparseMatrix: SparseMatrix
}

/** Factory methods for [[DenseMatrix]]. */
object DenseMatrix {
  // numRows x numCols matrix filled with zeros.
  def zeros(numRows: Int, numCols: Int): DenseMatrix
  // Square size x size matrix; the usage example binds the result to `identity`.
  def eye(size: Int): DenseMatrix
  // Builds a numRows x numCols matrix from the listed values.
  // NOTE(review): values are presumably given in column-major order like `data`; confirm.
  def apply(numRows: Int, numCols: Int)(values: Double*): DenseMatrix
}

Usage Example:

import org.apache.flink.ml.math.DenseMatrix

// Create dense matrices
val m1 = DenseMatrix(2, 3)(1.0, 2.0, 3.0, 4.0, 5.0, 6.0) // 2 rows x 3 columns, six values
val zeros = DenseMatrix.zeros(3, 3)                      // 3 x 3 matrix of zeros
val identity = DenseMatrix.eye(3)                        // 3 x 3

// Matrix operations
val element = m1(1, 2)                           // Get element at (1, 2)
m1(1, 2) = 10.0                                  // Set element at (1, 2); m1 is modified in place

// Convert to sparse
val sparse = m1.toSparseMatrix

Sparse Matrix

Sparse matrices use Compressed Sparse Column (CSC) format for efficient storage of matrices with many zero elements.

/**
 * Matrix stored in Compressed Sparse Column (CSC) format.
 * `rowIndices`, `colPtrs` and `data` are the three CSC arrays; their exact layout
 * is not described here. NOTE(review): presumably the conventional CSC layout; confirm.
 */
class SparseMatrix(
  val numRows: Int,
  val numCols: Int,
  val rowIndices: Array[Int],
  val colPtrs: Array[Int],
  val data: Array[Double]
) extends Matrix {
  def apply(row: Int, col: Int): Double
  def update(row: Int, col: Int, value: Double): Unit
  // Narrows the return type of Matrix.copy to SparseMatrix.
  def copy: SparseMatrix
  // Converts this matrix to the dense representation.
  def toDenseMatrix: DenseMatrix
}

/** Factory methods for [[SparseMatrix]]. */
object SparseMatrix {
  // Builds a numRows x numCols matrix from coordinate-format (COO) triples.
  // Each triple is presumably (row, column, value); confirm the order.
  def fromCOO(
    numRows: Int,
    numCols: Int,
    coordinates: Array[(Int, Int, Double)]
  ): SparseMatrix
}

Usage Example:

import org.apache.flink.ml.math.SparseMatrix

// Create sparse matrix from coordinate format
// (three entries for a 3 x 3 matrix; each triple is presumably (row, column, value))
val coordinates = Array((0, 0, 1.0), (1, 2, 3.0), (2, 1, 2.0))
val sparseMatrix = SparseMatrix.fromCOO(3, 3, coordinates)

// Convert to dense
val denseMatrix = sparseMatrix.toDenseMatrix

Distributed Linear Algebra

For large-scale operations, Flink ML provides distributed matrix operations.

Distributed Matrix

/** Base trait for distributed matrices; it exposes only the matrix dimensions. */
trait DistributedMatrix {
  // Number of rows.
  def numRows: Int
  // Number of columns.
  def numCols: Int
}

Distributed Row Matrix

/**
 * Distributed matrix whose contents are held as a DataSet of [[IndexedRow]]s,
 * with the overall dimensions supplied explicitly by the caller.
 */
class DistributedRowMatrix(
  val data: DataSet[IndexedRow],
  val numRows: Int,
  val numCols: Int
) extends DistributedMatrix {
  // The matrix as a DataSet of coordinate-format triples (presumably (row, column, value); confirm).
  def toCOO(): DataSet[(Int, Int, Double)]
  // Converts to a local (non-distributed) sparse matrix.
  def toLocalSparseMatrix(): SparseMatrix
  // Converts to a local (non-distributed) dense matrix.
  def toLocalDenseMatrix(): DenseMatrix
  // Returns a new matrix: this + other.
  def add(other: DistributedRowMatrix): DistributedRowMatrix
  // Returns a new matrix: this - other.
  def subtract(other: DistributedRowMatrix): DistributedRowMatrix
}

/** One matrix row: its row index plus the row's values as a vector. */
case class IndexedRow(rowIndex: Int, values: Vector)

/** Factory methods for [[DistributedRowMatrix]]. */
object DistributedRowMatrix {
  // Builds a numRows x numCols distributed matrix from a DataSet of coordinate-format triples.
  def fromCOO(
    data: DataSet[(Int, Int, Double)],
    numRows: Int,
    numCols: Int
  ): DistributedRowMatrix
}

Usage Example:

import org.apache.flink.api.scala._
import org.apache.flink.ml.math.distributed.{DistributedRowMatrix, IndexedRow}
import org.apache.flink.ml.math.DenseVector

// Execution environment that owns the DataSets created below
val env = ExecutionEnvironment.getExecutionEnvironment

// Create distributed matrix from rows (each IndexedRow carries its own row index)
val rows: DataSet[IndexedRow] = env.fromCollection(Seq(
  IndexedRow(0, DenseVector(1.0, 2.0, 3.0)),
  IndexedRow(1, DenseVector(4.0, 5.0, 6.0)),
  IndexedRow(2, DenseVector(7.0, 8.0, 9.0))
))

// The dimensions (3 rows, 3 columns) are passed explicitly
val distributedMatrix = new DistributedRowMatrix(rows, 3, 3)

// Matrix operations
val cooFormat = distributedMatrix.toCOO()
val localSparse = distributedMatrix.toLocalSparseMatrix()
val localDense = distributedMatrix.toLocalDenseMatrix()

// Arithmetic operations
// Another DistributedRowMatrix; built from the same rows here so the example is self-contained
val matrix2 = new DistributedRowMatrix(rows, 3, 3)
val sum = distributedMatrix.add(matrix2)
val difference = distributedMatrix.subtract(matrix2)

BLAS Operations

Basic Linear Algebra Subprograms (BLAS) for efficient low-level operations.

/** Low-level BLAS-style routines on vectors and matrices. Methods returning Unit update an argument in place. */
object BLAS {
  // y = a * x + y (y is updated in place).
  def axpy(a: Double, x: Vector, y: Vector): Unit
  // Dot product of x and y.
  def dot(x: Vector, y: Vector): Double
  // NOTE(review): presumably copies the contents of x into y; confirm the direction.
  def copy(x: Vector, y: Vector): Unit
  // x = a * x (x is updated in place).
  def scal(a: Double, x: Vector): Unit
  // NOTE(review): presumably the standard BLAS symmetric rank-1 update of A using alpha and x; confirm.
  def syr(alpha: Double, x: Vector, A: Matrix): Unit
}

Usage Example:

import org.apache.flink.ml.math.{BLAS, DenseVector}

val x = DenseVector(1.0, 2.0, 3.0)
val y = DenseVector(4.0, 5.0, 6.0)

// y = a * x + y (AXPY operation); y is updated in place and becomes [6.0, 9.0, 12.0]
BLAS.axpy(2.0, x, y)

// Dot product (uses the updated y): 1*6 + 2*9 + 3*12 = 60.0
val dotProduct = BLAS.dot(x, y)

// Scale vector: x = a * x; x is updated in place and becomes [0.5, 1.0, 1.5]
BLAS.scal(0.5, x)

Breeze Integration

Integration with the Breeze linear algebra library for interoperability.

/**
 * Implicit converter instances between Flink ML and Breeze vector/matrix types.
 * NOTE(review): BDM and BDV are presumably aliases for Breeze's dense matrix and
 * dense vector types; BreezeMatrixConverter and MatrixConverter are referenced here
 * but their definitions are not shown in this document.
 */
object Breeze {
  // Flink ML Matrix -> Breeze.
  implicit val Matrix2BreezeConverter: BreezeMatrixConverter[Matrix]
  // Breeze BDM[Double] -> Flink ML Matrix.
  implicit val Breeze2MatrixConverter: MatrixConverter[BDM[Double]]
  // Flink ML Vector -> Breeze.
  implicit val Vector2BreezeConverter: BreezeVectorConverter[Vector]
  // Breeze BDV[Double] -> Flink ML Vector.
  implicit val Breeze2VectorConverter: VectorConverter[BDV[Double]]
}

/** Type class: converts a value of type T into a Breeze BDV[Double]. */
trait BreezeVectorConverter[T] {
  def convert(vector: T): BDV[Double]
}

/** Type class: converts a value of type T into a Flink ML Vector. */
trait VectorConverter[T] {
  def convert(vector: T): Vector
}

Vector and Matrix Utilities

Package Object Extensions

The org.apache.flink.ml.math package object provides implicit classes for enhanced functionality.

/** Makes any Vector usable as an Iterable of (index, value) pairs, e.g. in a for loop. */
implicit class RichVector(vector: Vector) extends Iterable[(Int, Double)] {
  // (index, value) pairs.
  def iterator: Iterator[(Int, Double)]
  // The values only, without their indices.
  def valueIterator: Iterator[Double]
}

/** Makes any Matrix usable as an Iterable of (row, column, value) triples, e.g. in a for loop. */
implicit class RichMatrix(matrix: Matrix) extends Iterable[(Int, Int, Double)] {
  // (row, column, value) triples.
  def iterator: Iterator[(Int, Int, Double)]
  // The values only, without their coordinates.
  def valueIterator: Iterator[Double]
}

// Converts a vector to an array of its values.
def vector2Array(vector: Vector): Array[Double]

Usage Example:

// Brings DenseVector, DenseMatrix, vector2Array and the implicit Rich* classes into scope
import org.apache.flink.ml.math._

val vector = DenseVector(1.0, 2.0, 3.0, 4.0)
val matrix = DenseMatrix(2, 2)(1.0, 2.0, 3.0, 4.0)

// Iterate over vector elements as (index, value) pairs
for ((index, value) <- vector) {
  println(s"vector($index) = $value")
}

// Iterate over matrix elements as (row, column, value) triples
for ((row, col, value) <- matrix) {
  println(s"matrix($row, $col) = $value")
}

// Convert vector to array
val array = vector2Array(vector)

Vector Builder

Type class pattern for building vectors from different data types.

/** Type class describing how to build a Vector from a value of type T. */
trait VectorBuilder[T] {
  // Builds a Vector from `data`.
  def build(data: T): Vector
}

/** Implicit VectorBuilder instances, resolved automatically from the companion object. */
object VectorBuilder {
  // Builds a Vector from an Array[Double].
  implicit val arrayVectorBuilder: VectorBuilder[Array[Double]]
  // Builds a Vector from a Seq[Double].
  implicit val seqVectorBuilder: VectorBuilder[Seq[Double]]
}

Usage Example:

// Vector must be imported explicitly: an unqualified `Vector` otherwise resolves to
// Scala's generic collection type Vector[A], and the signature below would not compile.
import org.apache.flink.ml.math.{Vector, VectorBuilder}

/**
 * Builds a Flink ML Vector from any input type that has an implicit VectorBuilder.
 *
 * @param data    the source data
 * @param builder the VectorBuilder instance for T, resolved implicitly
 * @return the Vector produced by `builder.build(data)`
 */
def createVector[T](data: T)(implicit builder: VectorBuilder[T]): Vector = {
  builder.build(data)
}

val vectorFromArray = createVector(Array(1.0, 2.0, 3.0)) // uses VectorBuilder[Array[Double]]
val vectorFromSeq = createVector(Seq(4.0, 5.0, 6.0))     // uses VectorBuilder[Seq[Double]]