0
# Linear Algebra
1
2
MLlib provides comprehensive linear algebra operations with support for dense and sparse vectors and matrices. The linear algebra package includes both local operations and distributed matrix computations for large-scale data processing.
3
4
## Vectors
5
6
### Vector Trait
7
8
```scala { .api }
9
/** Common interface shared by the dense and sparse local vector types listed below. */
trait Vector extends Serializable {

  // --- Shape and element access ---
  def size: Int
  def apply(i: Int): Double
  def toArray: Array[Double]
  def copy: Vector

  // --- Active entries (the ones visited by foreachActive) ---
  def foreachActive(f: (Int, Double) => Unit): Unit
  def numActives: Int
  def numNonzeros: Int

  // --- Conversions between representations ---
  def toDense: DenseVector
  def toSparse: SparseVector
  def compressed: Vector

  // --- Computations ---
  def argmax: Int
  def dot(other: Vector): Double
  def squared: Vector
}
24
```
25
26
### DenseVector
27
28
```scala { .api }
29
/** Vector backed by one array that holds a value for every position. */
class DenseVector(val values: Array[Double]) extends Vector {

  /** Varargs convenience constructor; collects the arguments into the backing array. */
  def this(values: Double*) = this(values.toArray)

  // --- Shape and element access ---
  override def size: Int = values.length
  override def apply(i: Int): Double = values(i)
  override def toArray: Array[Double] = values.clone()
  override def copy: DenseVector = new DenseVector(values.clone())

  // --- Active entries: every position is active, so numActives equals size ---
  override def foreachActive(f: (Int, Double) => Unit): Unit
  override def numActives: Int = size
  override def numNonzeros: Int

  // --- Conversions between representations ---
  override def toDense: DenseVector = this
  override def toSparse: SparseVector
  override def compressed: Vector = toSparse

  // --- Computations ---
  override def argmax: Int
  override def dot(other: Vector): Double
  override def squared: DenseVector
}
46
```
47
48
### SparseVector
49
50
```scala { .api }
51
/** Vector of length `size` that stores only the positions in `indices`, paired with `values`. */
class SparseVector(
    override val size: Int,
    val indices: Array[Int],
    val values: Array[Double]) extends Vector {

  // Constructor invariants: one value per stored index, and never more stored entries than `size`.
  require(indices.length == values.length)
  require(indices.length <= size)

  // --- Element access and copies ---
  override def apply(i: Int): Double
  override def toArray: Array[Double]
  override def copy: SparseVector = new SparseVector(size, indices.clone(), values.clone())

  // --- Active entries: only the stored values are active ---
  override def foreachActive(f: (Int, Double) => Unit): Unit
  override def numActives: Int = values.length
  override def numNonzeros: Int

  // --- Conversions between representations ---
  override def toDense: DenseVector
  override def toSparse: SparseVector = this
  override def compressed: SparseVector = this

  // --- Computations ---
  override def argmax: Int
  override def dot(other: Vector): Double
  override def squared: SparseVector
}
68
```
69
70
### Vectors Object
71
72
```scala { .api }
73
/** Factory and helper methods for [[Vector]] values. */
object Vectors {

  // --- Dense factories ---
  def dense(firstValue: Double, otherValues: Double*): DenseVector
  def dense(values: Array[Double]): DenseVector
  def zeros(size: Int): DenseVector

  // --- Sparse factories: parallel index/value arrays, or (index, value) pairs ---
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): SparseVector
  def sparse(size: Int, elements: Seq[(Int, Double)]): SparseVector

  // --- Interop and parsing ---
  def fromBreeze(breezeVector: BV[Double]): Vector
  def parseNumeric(s: String): Vector

  // --- Numeric helpers (p = 1.0 gives the L1 norm, p = 2.0 the Euclidean norm) ---
  def norm(vector: Vector, p: Double): Double
  def sqdist(v1: Vector, v2: Vector): Double
}
84
```
85
86
## Matrices
87
88
### Matrix Trait
89
90
```scala { .api }
91
/** Common interface shared by the dense and sparse local matrix types listed below. */
trait Matrix extends Serializable {

  // --- Shape ---
  def numRows: Int
  def numCols: Int

  /**
   * Storage-layout flag that both matrix implementations in this document expose
   * (SparseMatrix declares it with `override`, so it has to be a member of this trait).
   */
  val isTransposed: Boolean = false

  // --- Element access and copies ---
  def toArray: Array[Double]
  def apply(i: Int, j: Int): Double
  def copy: Matrix

  // --- Active entries (the ones visited by foreachActive) ---
  def foreachActive(f: (Int, Int, Double) => Unit): Unit
  def numActives: Int
  def numNonzeros: Int

  // --- Structural transformations ---
  def transpose: Matrix
  def toDense: DenseMatrix
  def toSparse: SparseMatrix

  // --- Products ---
  def multiply(y: DenseMatrix): DenseMatrix
  def multiply(y: DenseVector): DenseVector

  // --- Column / row traversal ---
  def colIter: Iterator[Vector]
  def rowIter: Iterator[Vector]
}
108
```
109
110
### DenseMatrix
111
112
```scala { .api }
113
/**
 * Matrix backed by one array holding `numRows * numCols` values.
 *
 * `isTransposed` is declared the same way as in the SparseMatrix listing (`override val`).
 * It carries no default value here because the three-argument auxiliary constructor
 * already supplies `false`; keeping both a default and that overload is redundant and
 * invites overload-resolution surprises.
 */
class DenseMatrix(
    val numRows: Int,
    val numCols: Int,
    val values: Array[Double],
    override val isTransposed: Boolean) extends Matrix {

  // The backing array must contain exactly one value per cell.
  require(values.length == numRows * numCols)

  /** Convenience constructor for the non-transposed layout. */
  def this(numRows: Int, numCols: Int, values: Array[Double]) = this(numRows, numCols, values, false)

  // --- Element access and copies ---
  override def toArray: Array[Double] = values.clone()
  override def apply(i: Int, j: Int): Double
  override def copy: DenseMatrix

  // --- Active entries: every cell is active ---
  override def foreachActive(f: (Int, Int, Double) => Unit): Unit
  override def numActives: Int = numRows * numCols
  override def numNonzeros: Int

  // --- Structural transformations ---
  override def transpose: DenseMatrix
  override def toDense: DenseMatrix = this
  override def toSparse: SparseMatrix

  // --- Products ---
  override def multiply(y: DenseMatrix): DenseMatrix
  override def multiply(y: DenseVector): DenseVector

  // --- Column / row traversal ---
  override def colIter: Iterator[DenseVector]
  override def rowIter: Iterator[DenseVector]

  // --- Dense-only helpers ---
  def map(f: Double => Double): DenseMatrix
  def update(i: Int, j: Int, value: Double): Unit
}
137
```
138
139
### SparseMatrix
140
141
```scala { .api }
142
/**
 * Matrix that stores only selected entries, described by three arrays:
 * `colPtrs` (one more element than there are columns), `rowIndices`, and `values`
 * (the latter two always have equal length).
 */
class SparseMatrix(
    override val numRows: Int,
    override val numCols: Int,
    val colPtrs: Array[Int],
    val rowIndices: Array[Int],
    val values: Array[Double],
    override val isTransposed: Boolean = false) extends Matrix {

  // Constructor invariants on the three storage arrays.
  require(colPtrs.length == numCols + 1)
  require(rowIndices.length == values.length)

  // --- Element access and copies ---
  override def toArray: Array[Double]
  override def apply(i: Int, j: Int): Double
  override def copy: SparseMatrix

  // --- Active entries: only the stored values are active ---
  override def foreachActive(f: (Int, Int, Double) => Unit): Unit
  override def numActives: Int = values.length
  override def numNonzeros: Int

  // --- Structural transformations ---
  override def transpose: SparseMatrix
  override def toDense: DenseMatrix
  override def toSparse: SparseMatrix = this

  // --- Products ---
  override def multiply(y: DenseMatrix): DenseMatrix
  override def multiply(y: DenseVector): DenseVector

  // --- Column / row traversal ---
  override def colIter: Iterator[SparseVector]
  override def rowIter: Iterator[SparseVector]
}
163
```
164
165
### Matrices Object
166
167
```scala { .api }
168
/** Factory and helper methods for [[Matrix]] values. */
object Matrices {

  // --- Dense factories ---
  def dense(numRows: Int, numCols: Int, values: Array[Double]): DenseMatrix
  def zeros(numRows: Int, numCols: Int): DenseMatrix
  def eye(n: Int): DenseMatrix
  def diag(vector: Vector): DenseMatrix

  // --- Sparse factories: explicit storage arrays, or (row, column, value) entries ---
  def sparse(
      numRows: Int,
      numCols: Int,
      colPtrs: Array[Int],
      rowIndices: Array[Int],
      values: Array[Double]): SparseMatrix
  def sparse(numRows: Int, numCols: Int, entries: Seq[(Int, Int, Double)]): SparseMatrix

  // --- Random matrices driven by a caller-supplied generator ---
  def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix
  def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix

  // --- Concatenation ---
  def horzcat(matrices: Array[Matrix]): Matrix
  def vertcat(matrices: Array[Matrix]): Matrix

  // --- Interop ---
  def fromBreeze(breeze: BM[Double]): Matrix
}
182
```
183
184
## BLAS Operations
185
186
```scala { .api }
187
/**
 * BLAS-style routines over the vector and matrix types above.
 * Every routine returning `Unit` reports its result by updating one of its arguments.
 *
 * NOTE(review): confirm that this object is accessible from user code in the Spark
 * version you target before relying on it outside of Spark itself.
 */
object BLAS {

  // ---------- Level 1: vector-vector ----------
  def dot(x: Vector, y: Vector): Double
  def nrm2(x: Vector): Double
  def scal(a: Double, x: Vector): Unit
  def copy(x: Vector, y: Vector): Unit
  def axpy(a: Double, x: Vector, y: Vector): Unit

  // ---------- Level 2: matrix-vector ----------
  def gemv(alpha: Double, A: Matrix, x: Vector, beta: Double, y: Vector): Unit
  def spmv(alpha: Double, A: Matrix, x: Vector, beta: Double, y: Vector): Unit
  def ger(alpha: Double, x: Vector, y: Vector, A: Matrix): Unit
  def spr(alpha: Double, x: Vector, A: DenseMatrix): Unit
  def syr(alpha: Double, x: Vector, A: DenseMatrix): Unit

  // ---------- Level 3: matrix-matrix ----------
  def gemm(alpha: Double, A: Matrix, B: Matrix, beta: Double, C: DenseMatrix): Unit
  def syrk(alpha: Double, A: Matrix, beta: Double, C: DenseMatrix): Unit
}
206
```
207
208
## SQL Integration
209
210
### User-Defined Types
211
212
```scala { .api }
213
/** User-defined SQL type that lets [[Vector]] values be stored in DataFrame columns. */
class VectorUDT extends UserDefinedType[Vector] {

  /**
   * Storage schema: a required `type` byte plus nullable `size`, `indices` and `values`
   * fields; the two array fields never contain null elements.
   */
  override def sqlType: DataType = {
    val fields = Seq(
      StructField("type", ByteType, nullable = false),
      StructField("size", IntegerType, nullable = true),
      StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
      StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)
    )
    StructType(fields)
  }

  override def userClass: Class[Vector] = classOf[Vector]

  // Conversions between Vector and its row representation
  override def serialize(obj: Vector): InternalRow
  override def deserialize(datum: Any): Vector
}
225
226
/** User-defined SQL type that lets [[Matrix]] values be stored in DataFrame columns. */
class MatrixUDT extends UserDefinedType[Matrix] {

  /**
   * Storage schema: required `type`, `numRows`, `numCols`, `values` and `isTransposed`
   * fields, plus nullable `colPtrs` and `rowIndices`; array fields never contain null elements.
   */
  override def sqlType: DataType = {
    val fields = Seq(
      StructField("type", ByteType, nullable = false),
      StructField("numRows", IntegerType, nullable = false),
      StructField("numCols", IntegerType, nullable = false),
      StructField("colPtrs", ArrayType(IntegerType, containsNull = false), nullable = true),
      StructField("rowIndices", ArrayType(IntegerType, containsNull = false), nullable = true),
      StructField("values", ArrayType(DoubleType, containsNull = false), nullable = false),
      StructField("isTransposed", BooleanType, nullable = false)
    )
    StructType(fields)
  }

  override def userClass: Class[Matrix] = classOf[Matrix]

  // Conversions between Matrix and its row representation
  override def serialize(obj: Matrix): InternalRow
  override def deserialize(datum: Any): Matrix
}
241
```
242
243
### SQL Data Types
244
245
```scala { .api }
246
/** Ready-made `DataType` instances for declaring vector and matrix columns in a schema. */
object SQLDataTypes {

  /** Data type for columns holding [[Vector]] values. */
  val VectorType: DataType = new VectorUDT

  /** Data type for columns holding [[Matrix]] values. */
  val MatrixType: DataType = new MatrixUDT
}
250
```
251
252
## Usage Examples
253
254
### Basic Vector Operations
255
256
```scala
257
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, SparseVector}
258
259
// Dense vectors: from individual values or from an existing array
val denseVec1 = Vectors.dense(1.0, 2.0, 3.0, 4.0)
val denseVec2 = Vectors.dense(Array(5.0, 6.0, 7.0, 8.0))

// Sparse vectors: from parallel index/value arrays or from (index, value) pairs
val sparseVec1 = Vectors.sparse(5, Array(0, 2, 4), Array(1.0, 3.0, 5.0))
val sparseVec2 = Vectors.sparse(5, Seq(1 -> 2.0, 3 -> 4.0))

// Printing, size, and element access
println(s"Dense vector: $denseVec1")
println(s"Sparse vector: $sparseVec1")
println(s"Vector size: ${denseVec1.size}")
println(s"Element access: ${denseVec1(1)}")

// Dot product of two vectors of equal length
val dotProduct = denseVec1.dot(denseVec2)
println(s"Dot product: $dotProduct")

// Norms: p = 1.0 is the L1 norm, p = 2.0 the L2 (Euclidean) norm
val norm1 = Vectors.norm(denseVec1, 1.0)
val norm2 = Vectors.norm(denseVec1, 2.0)
println(s"L1 norm: $norm1")
println(s"L2 norm: $norm2")

// Switching representation in both directions
val denseToSparse = denseVec1.toSparse
val sparseToDense = sparseVec1.toDense
println(s"Dense to sparse: $denseToSparse")
println(s"Sparse to dense: $sparseToDense")

// Visit only the active entries of the sparse vector
println("Active elements in sparse vector:")
sparseVec1.foreachActive { (index, value) =>
  println(s"  Index $index: $value")
}

// Simple statistics
println(s"Number of active elements: ${sparseVec1.numActives}")
println(s"Number of non-zero elements: ${sparseVec1.numNonzeros}")
println(s"Argmax index: ${denseVec1.argmax}")
299
```
300
301
### Matrix Operations
302
303
```scala
304
import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix, Vectors}

// Create dense matrices (values are listed column by column)
val denseMatrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
val identityMatrix = Matrices.eye(3)
val zeroMatrix = Matrices.zeros(2, 3)

// Create sparse matrix from column pointers, row indices and values
val sparseMatrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(9.0, 6.0, 8.0))

println(s"Dense matrix:\n${denseMatrix}")
println(s"Sparse matrix:\n${sparseMatrix}")

// Matrix properties
println(s"Matrix dimensions: ${denseMatrix.numRows} x ${denseMatrix.numCols}")
println(s"Number of non-zeros: ${sparseMatrix.numNonzeros}")

// Element access (row 1, column 0)
println(s"Matrix element (1,0): ${denseMatrix(1, 0)}")

// Matrix transpose: 3 x 2 becomes 2 x 3
val transposed = denseMatrix.transpose
println(s"Transposed matrix:\n${transposed}")

// Matrix-vector multiplication: (3 x 2) * (2) -> vector of length 3
val vector = Vectors.dense(1.0, 2.0)
val result = denseMatrix.multiply(vector)
println(s"Matrix-vector product: $result")

// Matrix-matrix multiplication: (3 x 2) * (2 x 2) -> 3 x 2.
// The left operand's column count must equal the right operand's row count, so the
// 3 x 2 matrix is used here; the 2 x 3 transpose cannot be multiplied by a 2 x 2 matrix.
val matrix2 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
val matrixProduct = denseMatrix.multiply(matrix2.toDense)
println(s"Matrix-matrix product:\n${matrixProduct}")

// Iterate over columns and rows
println("Matrix columns:")
denseMatrix.colIter.zipWithIndex.foreach { case (col, idx) =>
  println(s"  Column $idx: $col")
}

println("Matrix rows:")
denseMatrix.rowIter.zipWithIndex.foreach { case (row, idx) =>
  println(s"  Row $idx: $row")
}
348
```
349
350
### BLAS Operations
351
352
```scala
353
import org.apache.spark.ml.linalg.BLAS
354
import org.apache.spark.ml.linalg.{Vectors, Matrices}
355
356
// NOTE(review): confirm that BLAS is accessible from user code in the Spark version you target.

// ---------- Level 1: vector-vector ----------
val x = Vectors.dense(1.0, 2.0, 3.0).toDense
val y = Vectors.dense(4.0, 5.0, 6.0).toDense

// Dot product
val dot = BLAS.dot(x, y)
println(s"BLAS dot product: $dot")

// scal overwrites its vector argument, so work on a copy of x
val scaledX = x.copy.toDense
BLAS.scal(2.0, scaledX)
println(s"Scaled vector: $scaledX")

// axpy accumulates alpha * x into its last argument, so work on a copy of y
val axpyResult = y.copy.toDense
BLAS.axpy(0.5, x, axpyResult)
println(s"AXPY result: $axpyResult")

// Vector norm
val norm = BLAS.nrm2(x)
println(s"Vector norm: $norm")

// ---------- Level 2: matrix-vector ----------
val A = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)).toDense
val xVec = Vectors.dense(1.0, 2.0, 3.0).toDense
val yVec = Vectors.zeros(2).toDense

// gemv: yVec = alpha * A * xVec + beta * yVec, written into yVec
BLAS.gemv(1.0, A, xVec, 0.0, yVec)
println(s"GEMV result: $yVec")

// ---------- Level 3: matrix-matrix ----------
val B = Matrices.dense(3, 2, Array(1.0, 0.0, 0.0, 0.0, 1.0, 0.0)).toDense
val C = Matrices.zeros(2, 2).toDense

// gemm: C = alpha * A * B + beta * C, written into C
BLAS.gemm(1.0, A, B, 0.0, C)
println(s"GEMM result:\n${C}")
394
```
395
396
### Working with DataFrames
397
398
```scala
399
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.ml.linalg.{DenseVector, SQLDataTypes, Vector, Vectors}
import org.apache.spark.sql.functions._
// toDF needs the session implicits; assumes a SparkSession named `spark` is in scope
// (as in spark-shell) - adjust the name to your session.
import spark.implicits._

// Create DataFrame with vector columns
val data = Seq(
  (1, Vectors.dense(1.0, 2.0, 3.0)),
  (2, Vectors.sparse(3, Array(0, 2), Array(4.0, 5.0))),
  (3, Vectors.dense(6.0, 7.0, 8.0))
).toDF("id", "features")

data.printSchema()
data.show(truncate = false)

// A vector column is a user-defined type, not a SQL array, so getItem(...) and size(...)
// cannot be applied to it directly. Convert it to array<double> first.
val featureArray = vector_to_array(col("features"))
val withElements = data
  .withColumn("first_element", featureArray.getItem(0))
  .withColumn("vector_size", size(featureArray))

withElements.show()

// Vector aggregations
val vectorStats = data.agg(
  count("features").alias("count"),
  // Custom aggregations would require UDAFs
  first("features").alias("sample_vector")
)

vectorStats.show(truncate = false)

// Filter vectors by properties.
// Casting a vector column to string yields the vector's printed value, which never
// contains the class name, so the representation is tested on the value itself via a UDF.
val isDenseVector = udf { (v: Vector) =>
  v match {
    case _: DenseVector => true
    case _ => false
  }
}
val denseVectors = data.filter(isDenseVector(col("features")))

println("Dense vectors:")
denseVectors.show(truncate = false)
435
```
436
437
### Sparse Matrix Construction and Operations
438
439
```scala
440
import scala.util.Random
441
442
// Construct large sparse matrix efficiently
443
/**
 * Builds a random sparse matrix by visiting every cell and keeping it with probability
 * `density`; each kept cell receives a Gaussian-distributed value.
 *
 * @param numRows number of rows
 * @param numCols number of columns
 * @param density probability, per cell, that the cell gets an entry
 * @param seed    seed of the random generator; defaults to 42 (the previously hard-coded
 *                value) so existing callers keep getting the same matrix
 * @return the matrix assembled from the generated (row, column, value) entries
 */
def createRandomSparseMatrix(
    numRows: Int,
    numCols: Int,
    density: Double,
    seed: Long = 42L): SparseMatrix = {
  val random = new Random(seed)

  // For each cell the generator is consulted first for the keep/skip decision and then,
  // only for kept cells, for the value - the same draw order as a nested loop.
  val entries = for {
    i <- 0 until numRows
    j <- 0 until numCols
    if random.nextDouble() < density
  } yield (i, j, random.nextGaussian())

  // fromCOO builds the compressed column storage directly from coordinate entries.
  SparseMatrix.fromCOO(numRows, numCols, entries)
}
457
458
val largeSparseMatrix = createRandomSparseMatrix(1000, 500, 0.01)
// Total cell count as a Long so the product cannot overflow for large dimensions
val totalCells = largeSparseMatrix.numRows.toLong * largeSparseMatrix.numCols
println(s"Created sparse matrix: ${largeSparseMatrix.numRows} x ${largeSparseMatrix.numCols}")
println(s"Density: ${largeSparseMatrix.numNonzeros.toDouble / totalCells}")

// Sparse matrix operations: the vector length (500) must equal the matrix column count
val vector500 = Vectors.sparse(500, (0 until 50).toArray, Array.fill(50)(1.0))
val sparseResult = largeSparseMatrix.multiply(vector500)
println(s"Sparse matrix-vector multiplication result size: ${sparseResult.size}")

// Convert between sparse and dense (be careful with memory).
// A dense copy stores every cell, so its footprint depends on rows * columns,
// not on how many entries are non-zero.
val maxDenseCells = 1000000L // about 8 MB of doubles
if (totalCells <= maxDenseCells) {
  val dense = largeSparseMatrix.toDense
  println(s"Converted to dense: ${dense.numRows} x ${dense.numCols}")
}
472
```
473
474
### Advanced Linear Algebra Operations
475
476
```scala
477
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
478
import breeze.linalg._
479
480
// Convert between MLlib and Breeze for advanced operations
481
/**
 * Wraps an MLlib dense matrix as a Breeze dense matrix without copying the values
 * (both objects share `matrix.values`).
 *
 * The `isTransposed` flag is honoured: for a transposed matrix the array is first viewed
 * as a `numCols x numRows` Breeze matrix and that view is then transposed, so the result
 * always has `numRows` rows and `numCols` columns with matching elements.
 */
def mlibToBreeze(matrix: DenseMatrix): BDM[Double] = {
  if (!matrix.isTransposed) {
    new BDM(matrix.numRows, matrix.numCols, matrix.values)
  } else {
    new BDM(matrix.numCols, matrix.numRows, matrix.values).t
  }
}
484
485
/**
 * Copies a Breeze dense matrix into an MLlib dense matrix.
 *
 * `toArray` is used instead of the raw `data` field: a Breeze matrix may be a transposed
 * or sliced view whose backing array has a different layout (or more elements) than the
 * logical matrix, whereas `toArray` always yields exactly rows * cols values in
 * column-major order.
 */
def breezeToMllib(matrix: BDM[Double]): DenseMatrix = {
  Matrices.dense(matrix.rows, matrix.cols, matrix.toArray).toDense
}
488
489
val mlibMatrix = Matrices.dense(3, 3, Array(1, 2, 3, 4, 5, 6, 7, 8, 9)).toDense
val breezeMatrix = mlibToBreeze(mlibMatrix)

// Eigen decomposition via Breeze
val eigenDecomp = eig(breezeMatrix)
println(s"Eigenvalues: ${eigenDecomp.eigenvalues}")

// Singular value decomposition (result named so it does not shadow the imported `svd`)
val svdResult = breeze.linalg.svd(breezeMatrix)
println(s"Singular values: ${svdResult.S}")

// Matrix inverse: only attempted for square input; a singular matrix is reported, not rethrown
val isSquare = breezeMatrix.rows == breezeMatrix.cols
if (isSquare) {
  try {
    val mlibInverse = breezeToMllib(inv(breezeMatrix))
    println(s"Matrix inverse:\n${mlibInverse}")
  } catch {
    case _: breeze.linalg.MatrixSingularException =>
      println("Matrix is singular")
  }
}

// QR decomposition (result named so it does not shadow the imported `qr`)
val qrResult = breeze.linalg.qr(breezeMatrix)
val mlibQ = breezeToMllib(qrResult.q)
val mlibR = breezeToMllib(qrResult.r)
println(s"Q matrix:\n${mlibQ}")
println(s"R matrix:\n${mlibR}")
517
```
518
519
### Performance Considerations
520
521
```scala
522
import org.apache.spark.storage.StorageLevel
523
524
// Efficient vector operations for large datasets
525
/**
 * Adds three per-row statistics of the `features` vector column:
 * `vector_norm` (Euclidean norm), `max_element` and `min_element`.
 *
 * The vector column is first converted with `vector_to_array`. A vector column is a
 * user-defined type, so its internal `values` field cannot be addressed from SQL, and for
 * sparse vectors that field would omit the implicit zeros anyway. The converted array holds
 * every element, which makes the norm, max and min correct for both representations.
 *
 * @param vectors DataFrame with a vector column named `features`
 * @return the cached input extended with the three statistic columns
 */
def efficientVectorProcessing(vectors: DataFrame): DataFrame = {
  import org.apache.spark.ml.functions.vector_to_array
  import org.apache.spark.sql.functions.{aggregate, array_max, array_min, col, lit, sqrt}

  // Cache frequently accessed vector data
  val cachedVectors = vectors.cache()

  // Every element of the vector as array<double>
  val elements = vector_to_array(col("features"))

  // lit(0.0) is a double, so the accumulator type matches the element type
  val euclideanNorm =
    aggregate(elements, lit(0.0), (acc, x) => acc + x * x, acc => sqrt(acc))

  cachedVectors
    .withColumn("vector_norm", euclideanNorm)
    .withColumn("max_element", array_max(elements))
    .withColumn("min_element", array_min(elements))
}
540
541
// Memory-efficient sparse operations
542
/**
 * Skeleton for walking a batch of sparse vectors while touching only their active
 * entries; the per-value work is left as a placeholder.
 */
def processLargeSparseVectors(sparseVectors: Array[SparseVector]): Unit = {
  for (vec <- sparseVectors) {
    // Only active entries are visited, which avoids iterating over the full length
    vec.foreachActive { (index, value) =>
      // Ignore values too close to zero to matter (numerical stability)
      val isSignificant = math.abs(value) > 1e-10
      if (isSignificant) {
        // Process significant values
      }
    }
  }
}
553
554
// Batch matrix operations
555
/**
 * Transposes every matrix of the batch using a parallel collection and returns the
 * results as an array.
 */
def batchMatrixOperations(matrices: Array[DenseMatrix]): Array[DenseMatrix] = {
  // The per-matrix work (here: a transpose) runs on the parallel view of the input
  val parallelMatrices = matrices.par
  parallelMatrices.map(_.transpose).toArray
}
563
```