0
# Linear Algebra
1
2
Apache Flink ML provides a comprehensive linear algebra framework with vector and matrix abstractions that support both dense and sparse representations, optimized for distributed computing.
3
4
## Vectors
5
6
### Vector Base Trait
7
8
All vector implementations extend the base `Vector` trait, which defines the operations shared by every vector type.
9
10
```scala { .api }
11
trait Vector {
12
def size: Int
13
def apply(index: Int): Double
14
def update(index: Int, value: Double): Unit
15
def copy: Vector
16
def dot(other: Vector): Double
17
def outer(other: Vector): Matrix
18
def magnitude: Double
19
}
20
```
21
22
### Dense Vector
23
24
Dense vectors store every value in a contiguous array, which makes them suitable for data where most elements are non-zero.
25
26
```scala { .api }
27
case class DenseVector(data: Array[Double]) extends Vector {
28
def size: Int
29
def apply(index: Int): Double
30
def update(index: Int, value: Double): Unit
31
def copy: DenseVector
32
def dot(other: Vector): Double
33
def outer(other: Vector): Matrix
34
def magnitude: Double
35
def toSparseVector: SparseVector
36
}
37
38
object DenseVector {
39
def apply(values: Double*): DenseVector
40
def zeros(size: Int): DenseVector
41
def eye(size: Int, index: Int): DenseVector
42
def init(size: Int, f: Int => Double): DenseVector
43
}
44
```
45
46
**Usage Example:**
47
48
```scala
49
import org.apache.flink.ml.math.DenseVector
50
51
// Create dense vectors
52
val v1 = DenseVector(1.0, 2.0, 3.0, 4.0)
53
val v2 = DenseVector.zeros(4) // [0.0, 0.0, 0.0, 0.0]
54
val v3 = DenseVector.eye(4, 2) // [0.0, 0.0, 1.0, 0.0]
55
val v4 = DenseVector.init(4, i => i * 2.0) // [0.0, 2.0, 4.0, 6.0]
56
57
// Vector operations
58
val dotProduct = v1.dot(v4) // Compute dot product
59
val magnitude = v1.magnitude // Compute L2 norm
60
val outerProduct = v1.outer(v4) // Compute outer product matrix
61
62
// Element access and modification
63
val element = v1(2) // Get element at index 2
64
v1(2) = 5.0 // Set element at index 2
65
66
// Convert to sparse
67
val sparse = v1.toSparseVector
68
```
69
70
### Sparse Vector
71
72
Sparse vectors store only the non-zero values together with their indices, which makes them efficient for data with many zero elements.
73
74
```scala { .api }
75
case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) extends Vector {
76
def apply(index: Int): Double
77
def update(index: Int, value: Double): Unit
78
def copy: SparseVector
79
def dot(other: Vector): Double
80
def outer(other: Vector): Matrix
81
def magnitude: Double
82
def toDenseVector: DenseVector
83
def iterator: Iterator[(Int, Double)]
84
}
85
86
object SparseVector {
87
def fromCOO(size: Int, coordinates: Array[(Int, Double)]): SparseVector
88
def fromCOO(size: Int, coordinates: Iterator[(Int, Double)]): SparseVector
89
}
90
```
91
92
**Usage Example:**
93
94
```scala
95
import org.apache.flink.ml.math.SparseVector
96
97
// Create sparse vector from coordinate format (COO)
98
val coordinates = Array((0, 1.0), (2, 3.0), (5, 2.0))
99
val sparseVec = SparseVector.fromCOO(10, coordinates)
100
101
// Sparse vector operations
102
val denseVec = sparseVec.toDenseVector
103
val magnitude = sparseVec.magnitude
104
105
// Iterate over non-zero elements
106
for ((index, value) <- sparseVec) {
107
println(s"Index $index: $value")
108
}
109
```
110
111
## Matrices
112
113
### Matrix Base Trait
114
115
All matrix implementations extend the base `Matrix` trait.
116
117
```scala { .api }
118
trait Matrix {
119
def numRows: Int
120
def numCols: Int
121
def apply(row: Int, col: Int): Double
122
def update(row: Int, col: Int, value: Double): Unit
123
def copy: Matrix
124
}
125
```
126
127
### Dense Matrix
128
129
Dense matrices store every value in a single array in column-major order, which makes them suitable for small to medium-sized matrices.
130
131
```scala { .api }
132
case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) extends Matrix {
133
def apply(row: Int, col: Int): Double
134
def update(row: Int, col: Int, value: Double): Unit
135
def copy: DenseMatrix
136
def toSparseMatrix: SparseMatrix
137
}
138
139
object DenseMatrix {
140
def zeros(numRows: Int, numCols: Int): DenseMatrix
141
def eye(size: Int): DenseMatrix
142
def apply(numRows: Int, numCols: Int)(values: Double*): DenseMatrix
143
}
144
```
145
146
**Usage Example:**
147
148
```scala
149
import org.apache.flink.ml.math.DenseMatrix
150
151
// Create dense matrices
152
val m1 = DenseMatrix(2, 3)(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
153
val zeros = DenseMatrix.zeros(3, 3)
154
val identity = DenseMatrix.eye(3)
155
156
// Matrix operations
157
val element = m1(1, 2) // Get element at (1, 2)
158
m1(1, 2) = 10.0 // Set element at (1, 2)
159
160
// Convert to sparse
161
val sparse = m1.toSparseMatrix
162
```
163
164
### Sparse Matrix
165
166
Sparse matrices use Compressed Sparse Column (CSC) format for efficient storage of matrices with many zero elements.
167
168
```scala { .api }
169
class SparseMatrix(
170
val numRows: Int,
171
val numCols: Int,
172
val rowIndices: Array[Int],
173
val colPtrs: Array[Int],
174
val data: Array[Double]
175
) extends Matrix {
176
def apply(row: Int, col: Int): Double
177
def update(row: Int, col: Int, value: Double): Unit
178
def copy: SparseMatrix
179
def toDenseMatrix: DenseMatrix
180
}
181
182
object SparseMatrix {
183
def fromCOO(
184
numRows: Int,
185
numCols: Int,
186
coordinates: Array[(Int, Int, Double)]
187
): SparseMatrix
188
}
189
```
190
191
**Usage Example:**
192
193
```scala
194
import org.apache.flink.ml.math.SparseMatrix
195
196
// Create sparse matrix from coordinate format
197
val coordinates = Array((0, 0, 1.0), (1, 2, 3.0), (2, 1, 2.0))
198
val sparseMatrix = SparseMatrix.fromCOO(3, 3, coordinates)
199
200
// Convert to dense
201
val denseMatrix = sparseMatrix.toDenseMatrix
202
```
203
204
## Distributed Linear Algebra
205
206
For large-scale workloads, Flink ML provides distributed matrix types whose rows are stored in a Flink `DataSet`.
207
208
### Distributed Matrix
209
210
```scala { .api }
211
trait DistributedMatrix {
212
def numRows: Int
213
def numCols: Int
214
}
215
```
216
217
### Distributed Row Matrix
218
219
```scala { .api }
220
class DistributedRowMatrix(
221
val data: DataSet[IndexedRow],
222
val numRows: Int,
223
val numCols: Int
224
) extends DistributedMatrix {
225
def toCOO(): DataSet[(Int, Int, Double)]
226
def toLocalSparseMatrix(): SparseMatrix
227
def toLocalDenseMatrix(): DenseMatrix
228
def add(other: DistributedRowMatrix): DistributedRowMatrix
229
def subtract(other: DistributedRowMatrix): DistributedRowMatrix
230
}
231
232
case class IndexedRow(rowIndex: Int, values: Vector)
233
234
object DistributedRowMatrix {
235
def fromCOO(
236
data: DataSet[(Int, Int, Double)],
237
numRows: Int,
238
numCols: Int
239
): DistributedRowMatrix
240
}
241
```
242
243
**Usage Example:**
244
245
```scala
246
import org.apache.flink.ml.math.distributed.{DistributedRowMatrix, IndexedRow}
247
import org.apache.flink.ml.math.DenseVector
248
249
// Create distributed matrix from rows
250
val rows: DataSet[IndexedRow] = env.fromCollection(Seq(
251
IndexedRow(0, DenseVector(1.0, 2.0, 3.0)),
252
IndexedRow(1, DenseVector(4.0, 5.0, 6.0)),
253
IndexedRow(2, DenseVector(7.0, 8.0, 9.0))
254
))
255
256
val distributedMatrix = new DistributedRowMatrix(rows, 3, 3)
257
258
// Matrix operations
259
val cooFormat = distributedMatrix.toCOO()
260
val localSparse = distributedMatrix.toLocalSparseMatrix()
261
val localDense = distributedMatrix.toLocalDenseMatrix()
262
263
// Arithmetic operations
264
val matrix2: DistributedRowMatrix = ??? // placeholder: another DistributedRowMatrix
265
val sum = distributedMatrix.add(matrix2)
266
val difference = distributedMatrix.subtract(matrix2)
267
```
268
269
## BLAS Operations
270
271
The `BLAS` object provides Basic Linear Algebra Subprograms (BLAS) routines for efficient low-level vector and matrix operations.
272
273
```scala { .api }
274
object BLAS {
275
def axpy(a: Double, x: Vector, y: Vector): Unit
276
def dot(x: Vector, y: Vector): Double
277
def copy(x: Vector, y: Vector): Unit
278
def scal(a: Double, x: Vector): Unit
279
def syr(alpha: Double, x: Vector, A: Matrix): Unit
280
}
281
```
282
283
**Usage Example:**
284
285
```scala
286
import org.apache.flink.ml.math.{BLAS, DenseVector}
287
288
val x = DenseVector(1.0, 2.0, 3.0)
289
val y = DenseVector(4.0, 5.0, 6.0)
290
291
// y = a * x + y (AXPY operation)
292
BLAS.axpy(2.0, x, y)
293
294
// Dot product
295
val dotProduct = BLAS.dot(x, y)
296
297
// Scale vector: x = a * x
298
BLAS.scal(0.5, x)
299
```
300
301
## Breeze Integration
302
303
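The `Breeze` object provides implicit converters between Flink ML's `Vector`/`Matrix` types and their Breeze counterparts, for interoperability with the Breeze linear algebra library.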
304
305
```scala { .api }
306
object Breeze {
307
implicit val Matrix2BreezeConverter: BreezeMatrixConverter[Matrix]
308
implicit val Breeze2MatrixConverter: MatrixConverter[BDM[Double]]
309
implicit val Vector2BreezeConverter: BreezeVectorConverter[Vector]
310
implicit val Breeze2VectorConverter: VectorConverter[BDV[Double]]
311
}
312
313
trait BreezeVectorConverter[T] {
314
def convert(vector: T): BDV[Double]
315
}
316
317
trait VectorConverter[T] {
318
def convert(vector: T): Vector
319
}
320
```
321
322
## Vector and Matrix Utilities
323
324
### Package Object Extensions
325
326
The `org.apache.flink.ml.math` package object provides implicit classes that make vectors and matrices iterable, as well as a helper for converting a vector to an array.
327
328
```scala { .api }
329
implicit class RichVector(vector: Vector) extends Iterable[(Int, Double)] {
330
def iterator: Iterator[(Int, Double)]
331
def valueIterator: Iterator[Double]
332
}
333
334
implicit class RichMatrix(matrix: Matrix) extends Iterable[(Int, Int, Double)] {
335
def iterator: Iterator[(Int, Int, Double)]
336
def valueIterator: Iterator[Double]
337
}
338
339
def vector2Array(vector: Vector): Array[Double]
340
```
341
342
**Usage Example:**
343
344
```scala
345
import org.apache.flink.ml.math._ // brings RichVector, RichMatrix and vector2Array into scope

val vector = DenseVector(1.0, 2.0, 3.0, 4.0)
346
val matrix = DenseMatrix(2, 2)(1.0, 2.0, 3.0, 4.0)
347
348
// Iterate over vector elements
349
for ((index, value) <- vector) {
350
println(s"vector($index) = $value")
351
}
352
353
// Iterate over matrix elements
354
for ((row, col, value) <- matrix) {
355
println(s"matrix($row, $col) = $value")
356
}
357
358
// Convert vector to array
359
val array = vector2Array(vector)
360
```
361
362
## Vector Builder
363
364
`VectorBuilder` is a type class for building vectors from different data types.
365
366
```scala { .api }
367
trait VectorBuilder[T] {
368
def build(data: T): Vector
369
}
370
371
object VectorBuilder {
372
implicit val arrayVectorBuilder: VectorBuilder[Array[Double]]
373
implicit val seqVectorBuilder: VectorBuilder[Seq[Double]]
374
}
375
```
376
377
**Usage Example:**
378
379
```scala
380
import org.apache.flink.ml.math.VectorBuilder
381
382
def createVector[T](data: T)(implicit builder: VectorBuilder[T]): Vector = {
383
builder.build(data)
384
}
385
386
val vectorFromArray = createVector(Array(1.0, 2.0, 3.0))
387
val vectorFromSeq = createVector(Seq(4.0, 5.0, 6.0))
388
```