# Linear Algebra

High-performance vector and matrix operations with support for dense and sparse representations, distributed matrices, and common linear algebra operations. MLlib provides optimized linear algebra primitives that form the foundation of its machine learning algorithms. Local vectors and matrices live in `org.apache.spark.ml.linalg`; the distributed matrices belong to the older RDD-based `org.apache.spark.mllib.linalg.distributed` package.

## Capabilities

### Vector Operations

```scala { .api }
/**
 * Vector - sealed base trait for dense and sparse numeric vectors.
 *
 * Implementations choose their own storage: [[DenseVector]] keeps every entry,
 * while [[SparseVector]] keeps only explicitly stored (index, value) pairs.
 * The members below are the operations every vector exposes.
 */
sealed trait Vector extends Serializable {
  // --- Shape and element access ---
  def size: Int
  def apply(i: Int): Double
  def toArray: Array[Double]
  def copy: Vector

  // --- Active (explicitly stored) and non-zero entries ---
  def foreachActive(f: (Int, Double) => Unit): Unit
  def numActives: Int
  def numNonzeros: Int

  // --- Storage conversions ---
  def toDense: DenseVector
  def toSparse: SparseVector
  def compressed: Vector

  // --- Numeric operations ---
  def argmax: Int
  def dot(other: Vector): Double
  // NOTE(review): `squared` and the instance method `norm(p)` may not be part of Spark's
  // public ml.linalg.Vector API; `Vectors.norm(vector, p)` is the factory-level norm — verify.
  def squared: Vector
  def norm(p: Double): Double
}

/**
 * DenseVector - stores every entry of the vector in a plain array.
 * A good fit when most entries are non-zero.
 *
 * @param values the vector's entries; position i holds element i
 */
class DenseVector(val values: Array[Double]) extends Vector {
  /** Number of entries, i.e. the length of the backing array. */
  override def size: Int = values.length

  /** Element at position `i`, read straight from the backing array. */
  override def apply(i: Int): Double = values.apply(i)

  /** Deep copy: the new vector owns its own clone of `values`. */
  override def copy: DenseVector = new DenseVector(values.clone())
}

/**
 * SparseVector - stores only the explicitly given (index, value) pairs.
 * A good fit when most entries are zero.
 *
 * @param size    logical length of the vector
 * @param indices positions of the stored entries; must be strictly increasing,
 *                because `apply` locates them with a binary search
 * @param values  stored values, aligned element-by-element with `indices`
 */
class SparseVector(
    val size: Int,
    val indices: Array[Int],
    val values: Array[Double]
) extends Vector {
  require(size >= 0, s"Vector size must be non-negative but got $size")
  // Each stored value needs exactly one index; a mismatch would make lookups read the wrong slot.
  require(
    indices.length == values.length,
    s"indices and values must have the same length, got ${indices.length} and ${values.length}")

  /** Number of explicitly stored entries (which may include explicit zeros). */
  def numActives: Int = values.length

  /** Number of stored entries whose value is not 0.0. */
  def numNonzeros: Int = values.count(_ != 0.0)

  /**
   * Element at position `i`; positions that are not stored read as 0.0.
   *
   * @throws IndexOutOfBoundsException if `i` is outside `[0, size)`
   */
  def apply(i: Int): Double = {
    // Without this check an out-of-range index is simply "not found" and silently reads as 0.0.
    if (i < 0 || i >= size) {
      throw new IndexOutOfBoundsException(s"Index $i out of bounds [0, $size)")
    }
    val idx = java.util.Arrays.binarySearch(indices, i)
    if (idx >= 0) values(idx) else 0.0
  }
}

/**
 * Vectors - factory and utility methods for [[Vector]] instances.
 */
object Vectors {
  // --- Dense constructors ---
  def dense(values: Array[Double]): DenseVector
  def dense(firstValue: Double, otherValues: Double*): DenseVector
  def zeros(size: Int): DenseVector

  // --- Sparse constructors ---
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): SparseVector
  def sparse(size: Int, elements: Seq[(Int, Double)]): SparseVector

  // --- Interop with other vector representations ---
  def fromBreeze(bv: BV[Double]): Vector
  def fromML(vector: newlinalg.Vector): Vector

  // --- Metrics ---
  def norm(vector: Vector, p: Double): Double
  def sqdist(v1: Vector, v2: Vector): Double
}
```

### Matrix Operations

```scala { .api }
/**
 * Matrix - sealed base trait for dense and sparse numeric matrices.
 * Entries are addressed as (row i, column j).
 */
sealed trait Matrix extends Serializable {
  // --- Shape and element access ---
  def numRows: Int
  def numCols: Int
  def apply(i: Int, j: Int): Double
  def toArray: Array[Double]
  def copy: Matrix
  def transpose: Matrix

  // --- Active (explicitly stored) and non-zero entries ---
  def foreachActive(f: (Int, Int, Double) => Unit): Unit
  def numActives: Int
  def numNonzeros: Int

  // --- Storage conversions ---
  def toDense: DenseMatrix
  def toSparse: SparseMatrix
  def compressed: Matrix

  // --- Iteration over columns / rows as vectors ---
  def colIter: Iterator[Vector]
  def rowIter: Iterator[Vector]
}

/**
 * DenseMatrix - stores every entry of the matrix in a single flat array.
 *
 * Storage is column-major by default. When `isTransposed` is true, the same array
 * is read row-major instead.
 *
 * @param numRows      number of rows
 * @param numCols      number of columns
 * @param values       all `numRows * numCols` entries in the layout described above
 * @param isTransposed whether `values` is laid out row-major instead of column-major
 */
class DenseMatrix(
    val numRows: Int,
    val numCols: Int,
    val values: Array[Double],
    val isTransposed: Boolean = false
) extends Matrix {
  // Compare as Long so that a huge numRows * numCols cannot overflow Int and pass by accident.
  require(
    values.length.toLong == numRows.toLong * numCols,
    s"values.length (${values.length}) must equal numRows * numCols (${numRows.toLong * numCols})")

  /**
   * Element at row `i`, column `j`.
   *
   * @throws IllegalArgumentException if `i` or `j` is out of range
   */
  def apply(i: Int, j: Int): Double = {
    // Without these checks an out-of-range row or column can silently land on a
    // different, valid element of the flat array.
    require(i >= 0 && i < numRows, s"Row index $i out of range [0, $numRows)")
    require(j >= 0 && j < numCols, s"Column index $j out of range [0, $numCols)")
    if (isTransposed) values(i * numCols + j) // row-major: row i starts at offset i * numCols
    else values(i + j * numRows)              // column-major: column j starts at offset j * numRows
  }

  /** Deep copy that keeps the storage layout, so every element (i, j) is unchanged in the copy. */
  def copy: DenseMatrix = new DenseMatrix(numRows, numCols, values.clone(), isTransposed)
}

/**
 * SparseMatrix - stores only explicitly given entries in compressed form.
 *
 * With `isTransposed = false` the layout is Compressed Sparse Column (CSC). The stored
 * entries of column c occupy positions `colPtrs(c) until colPtrs(c + 1)` of `rowIndices`
 * and `values`. With `isTransposed = true` the same arrays are read as Compressed Sparse
 * Row (CSR): `colPtrs` is then indexed by row and `rowIndices` holds column indices.
 * Within each column (or row), the stored indices must be strictly increasing, because
 * `apply` binary-searches them.
 *
 * @param numRows      number of rows
 * @param numCols      number of columns
 * @param colPtrs      start offset of each column (each row when transposed), plus a final end offset
 * @param rowIndices   row index (column index when transposed) of each stored entry
 * @param values       stored values, aligned element-by-element with `rowIndices`
 * @param isTransposed whether the arrays describe CSR instead of CSC
 */
class SparseMatrix(
    val numRows: Int,
    val numCols: Int,
    val colPtrs: Array[Int],
    val rowIndices: Array[Int],
    val values: Array[Double],
    val isTransposed: Boolean = false
) extends Matrix {
  require(
    rowIndices.length == values.length,
    s"rowIndices and values must have the same length, got ${rowIndices.length} and ${values.length}")
  // apply() reads colPtrs(k + 1) for every column (row, when transposed) k, so one extra slot is needed.
  require(
    colPtrs.length == (if (isTransposed) numRows else numCols) + 1,
    s"colPtrs must have one entry per ${if (isTransposed) "row" else "column"} plus one, " +
      s"got ${colPtrs.length}")

  /** Number of explicitly stored entries (which may include explicit zeros). */
  def numActives: Int = values.length

  /** Number of stored entries whose value is not 0.0. */
  def numNonzeros: Int = values.count(_ != 0.0)

  /**
   * Element at row `i`, column `j`; entries that are not stored read as 0.0.
   *
   * @throws IllegalArgumentException if `i` or `j` is out of range
   */
  def apply(i: Int, j: Int): Double = {
    // An out-of-range row would otherwise just be "not found" and silently read as 0.0.
    require(i >= 0 && i < numRows, s"Row index $i out of range [0, $numRows)")
    require(j >= 0 && j < numCols, s"Column index $j out of range [0, $numCols)")
    // In CSR mode the roles swap: colPtrs is indexed by row and the search key is the column.
    val (row, col) = if (isTransposed) (j, i) else (i, j)
    val start = colPtrs(col)
    val end = colPtrs(col + 1)
    val idx = java.util.Arrays.binarySearch(rowIndices, start, end, row)
    if (idx >= 0) values(idx) else 0.0
  }
}

/**
 * Matrices - factory and utility methods for [[Matrix]] instances.
 */
object Matrices {
  // --- Dense constructors ---
  def dense(numRows: Int, numCols: Int, values: Array[Double]): DenseMatrix
  def zeros(numRows: Int, numCols: Int): DenseMatrix
  def eye(n: Int): DenseMatrix
  def diag(vector: Vector): DenseMatrix

  // --- Random dense constructors (the caller supplies the generator) ---
  def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix
  def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix

  // --- Sparse constructors ---
  def sparse(
      numRows: Int,
      numCols: Int,
      colPtrs: Array[Int],
      rowIndices: Array[Int],
      values: Array[Double]): SparseMatrix
  def sparse(numRows: Int, numCols: Int, entries: Iterable[(Int, Int, Double)]): SparseMatrix

  // --- Interop and composition ---
  def fromBreeze(breeze: BM[Double]): Matrix
  def horzcat(matrices: Array[Matrix]): Matrix
  def vertcat(matrices: Array[Matrix]): Matrix
}
```

### SQL Data Types

```scala { .api }
/**
 * VectorUDT - Spark SQL user-defined type for [[Vector]] columns.
 * Lets DataFrames store and query vectors. A single struct schema is used for every
 * vector; the `type` byte presumably records dense vs. sparse — confirm against `serialize`.
 */
class VectorUDT extends UserDefinedType[Vector] {
  def userClass: Class[Vector] = classOf[Vector]

  /** Struct layout used to store a vector in a row; only `type` is non-nullable. */
  def sqlType: DataType = {
    val fields = Seq(
      StructField("type", ByteType, nullable = false),
      StructField("size", IntegerType, nullable = true),
      StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
      StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true))
    StructType(fields)
  }

  def serialize(vector: Vector): InternalRow
  def deserialize(datum: Any): Vector
}

/**
 * MatrixUDT - Spark SQL user-defined type for [[Matrix]] columns.
 * Lets DataFrames store and query matrices. A single struct schema is used for every
 * matrix; the `type` byte presumably records dense vs. sparse — confirm against `serialize`.
 */
class MatrixUDT extends UserDefinedType[Matrix] {
  def userClass: Class[Matrix] = classOf[Matrix]

  /**
   * Struct layout used to store a matrix in a row. Only the sparse index arrays
   * (`colPtrs`, `rowIndices`) are nullable; shape, values and the layout flag are required.
   */
  def sqlType: DataType = {
    val fields = Seq(
      StructField("type", ByteType, nullable = false),
      StructField("numRows", IntegerType, nullable = false),
      StructField("numCols", IntegerType, nullable = false),
      StructField("colPtrs", ArrayType(IntegerType, containsNull = false), nullable = true),
      StructField("rowIndices", ArrayType(IntegerType, containsNull = false), nullable = true),
      StructField("values", ArrayType(DoubleType, containsNull = false), nullable = false),
      StructField("isTransposed", BooleanType, nullable = false))
    StructType(fields)
  }

  def serialize(matrix: Matrix): InternalRow
  def deserialize(datum: Any): Matrix
}

/**
 * SQLDataTypes - ready-made SQL data types for the linear algebra UDTs.
 * Use these when declaring vector or matrix columns in a DataFrame schema.
 */
object SQLDataTypes {
  /** SQL data type of [[Vector]] columns. */
  val VectorType: VectorUDT = new VectorUDT

  /** SQL data type of [[Matrix]] columns. */
  val MatrixType: MatrixUDT = new MatrixUDT
}
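```

### Distributed Linear Algebra (Legacy RDD-based)

These classes live in `org.apache.spark.mllib.linalg.distributed` and hold the older `org.apache.spark.mllib.linalg` vectors and matrices (see the `OldVectors` import in the usage example further below). They do not hold the `org.apache.spark.ml.linalg` types shown above.

```scala { .api }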
/**
 * RowMatrix - distributed matrix whose rows are local vectors held in an RDD[Vector].
 * Unlike [[IndexedRowMatrix]], rows carry no explicit index.
 *
 * @param rows  the matrix rows
 * @param nRows number of rows; the default 0 presumably means "not yet known" — confirm
 * @param nCols number of columns; same convention as `nRows`
 */
class RowMatrix(
    val rows: RDD[Vector],
    private var nRows: Long = 0L,
    private var nCols: Int = 0
) extends DistributedMatrix {
  // --- Dimensions ---
  def numRows(): Long
  def numCols(): Int

  // --- Statistics ---
  def computeColumnSummaryStatistics(): MultivariateStatisticalSummary
  def computeCovariance(): Matrix
  def computeGramianMatrix(): Matrix

  // --- Decompositions ---
  def computePrincipalComponents(k: Int): Matrix
  def computeSVD(
      k: Int,
      computeU: Boolean = false,
      rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix]
  def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, Matrix]

  // --- Products and similarities ---
  def multiply(B: Matrix): RowMatrix
  def columnSimilarities(): CoordinateMatrix
}

/**
 * IndexedRowMatrix - distributed matrix whose rows carry explicit indices.
 * Each RDD element is an [[IndexedRow]]: a Long row index plus a local vector.
 *
 * @param rows  the indexed rows
 * @param nRows number of rows; the default 0 presumably means "not yet known" — confirm
 * @param nCols number of columns; same convention as `nRows`
 */
class IndexedRowMatrix(
    val rows: RDD[IndexedRow],
    private var nRows: Long = 0L,
    private var nCols: Int = 0
) extends DistributedMatrix {
  // --- Dimensions ---
  def numRows(): Long
  def numCols(): Int

  // --- Conversions to other distributed formats ---
  def toRowMatrix(): RowMatrix
  def toBlockMatrix(rowsPerBlock: Int = 1024, colsPerBlock: Int = 1024): BlockMatrix
  def toCoordinateMatrix(): CoordinateMatrix

  // --- Linear algebra ---
  def computeGramianMatrix(): Matrix
  def multiply(B: Matrix): IndexedRowMatrix
}

/**
 * CoordinateMatrix - distributed matrix stored as individual entries.
 * Each RDD element is a [[MatrixEntry]] (i, j, value), which suits very sparse data.
 *
 * @param entries the stored entries
 * @param nRows   number of rows; the default 0 presumably means "not yet known" — confirm
 * @param nCols   number of columns; same convention as `nRows`
 */
class CoordinateMatrix(
    val entries: RDD[MatrixEntry],
    private var nRows: Long = 0L,
    private var nCols: Long = 0L
) extends DistributedMatrix {
  // --- Dimensions (both Long, matching MatrixEntry's Long coordinates) ---
  def numRows(): Long
  def numCols(): Long

  // --- Conversions to other distributed formats ---
  def toRowMatrix(): RowMatrix
  def toIndexedRowMatrix(): IndexedRowMatrix
  def toBlockMatrix(rowsPerBlock: Int = 1024, colsPerBlock: Int = 1024): BlockMatrix

  // --- Linear algebra ---
  def transpose(): CoordinateMatrix
}

/**
 * BlockMatrix - distributed matrix partitioned into local sub-matrix blocks.
 * Each RDD element pairs an (Int, Int) block coordinate, presumably
 * (block row, block column), with that block's local [[Matrix]].
 *
 * @param blocks       the blocks
 * @param rowsPerBlock rows per block
 * @param colsPerBlock columns per block
 * @param nRows        number of rows; the default 0 presumably means "not yet known" — confirm
 * @param nCols        number of columns; same convention as `nRows`
 */
class BlockMatrix(
    val blocks: RDD[((Int, Int), Matrix)],
    val rowsPerBlock: Int,
    val colsPerBlock: Int,
    private var nRows: Long = 0L,
    private var nCols: Long = 0L
) extends DistributedMatrix with Logging {
  // --- Dimensions and block layout ---
  def numRows(): Long
  def numCols(): Long
  def numRowBlocks: Int
  def numColBlocks: Int
  def validate(): Unit

  // --- Caching ---
  def cache(): this.type
  def persist(storageLevel: StorageLevel): this.type

  // --- Conversions ---
  def toLocalMatrix(): Matrix
  def toIndexedRowMatrix(): IndexedRowMatrix
  def toCoordinateMatrix(): CoordinateMatrix

  // --- Arithmetic ---
  def transpose: BlockMatrix
  def add(other: BlockMatrix): BlockMatrix
  def subtract(other: BlockMatrix): BlockMatrix
  def multiply(other: BlockMatrix): BlockMatrix
  def simulateMultiply(other: BlockMatrix): BlockMatrix
}

// Supporting types

/** One row of an [[IndexedRowMatrix]]: its Long row index together with its local vector. */
case class IndexedRow(index: Long, vector: Vector)

/** One stored entry of a [[CoordinateMatrix]]: `value` at position (i, j). */
case class MatrixEntry(i: Long, j: Long, value: Double)
```

## Usage Examples

### Vector Operations

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Create dense vector
val denseVec = Vectors.dense(1.0, 2.0, 3.0)
println(s"Dense vector: $denseVec")

// Create sparse vector: size 5, stored entries at indices 0, 2 and 4
val sparseVec = Vectors.sparse(5, Array(0, 2, 4), Array(1.0, 3.0, 5.0))
println(s"Sparse vector: $sparseVec")

// Vector operations
val dotProduct = denseVec.dot(Vectors.dense(1.0, 1.0, 1.0))
val norm = Vectors.norm(denseVec, 2.0)
// Element-wise square, built explicitly from the values so it relies only on toArray
// and Vectors.dense rather than on a dedicated squaring method
val squared = Vectors.dense(denseVec.toArray.map(x => x * x))

println(s"Dot product: $dotProduct")
println(s"L2 norm: $norm")
println(s"Element-wise square: $squared")

// Convert between formats
val sparseToDense = sparseVec.toDense
val denseToSparse = denseVec.toSparse

// Vector element access
println(s"First element: ${denseVec(0)}")
println(s"Vector size: ${denseVec.size}")
```

### Matrix Operations

```scala
import org.apache.spark.ml.linalg.{Matrices, Matrix, Vectors}

// Create a 2x3 dense matrix. Values are given column-major, so the matrix is
//   1.0  3.0  5.0
//   2.0  4.0  6.0
val denseMat = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
println(s"Dense matrix:\n$denseMat")

// Create a 3x3 sparse matrix in CSC form: column c's single entry is at row c,
// so this is a diagonal matrix with 1.0, 2.0, 3.0 on the diagonal
val sparseMat = Matrices.sparse(3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1.0, 2.0, 3.0))
println(s"Sparse matrix:\n$sparseMat")

// Matrix operations
val transposed = denseMat.transpose
val element = denseMat(1, 0) // Row 1, Column 0 -> 2.0

println(s"Transposed:\n$transposed")
println(s"Element at (1,0): $element")

// Identity matrix
val identity = Matrices.eye(3)
println(s"Identity matrix:\n$identity")

// Matrix from diagonal vector (uses Vectors, imported above)
val diagMat = Matrices.diag(Vectors.dense(1.0, 2.0, 3.0))
println(s"Diagonal matrix:\n$diagMat")
```

### Using Vectors in DataFrames

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SQLDataTypes._
import spark.implicits._ // provides .toDF on local Seqs; `spark` is the active SparkSession

// Create DataFrame with vector column
val data = Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.sparse(3, Array(0, 2), Array(4.0, 5.0)))
).toDF("id", "features")

data.show(false)

// Query the vector column with SQL
data.createOrReplaceTempView("vectors")
spark.sql("SELECT id, features FROM vectors").show(false)

// Convert the vector column to a plain array column so individual elements can be
// used with ordinary DataFrame / SQL array operations
import org.apache.spark.ml.functions._

val withArrays = data
  .withColumn("featuresArray", vector_to_array(col("features")))
  .withColumn("firstFeature", col("featuresArray").getItem(0))
withArrays.show(false)
```

### Distributed Matrix Operations (Legacy)

```scala
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// Distributed RowMatrix: each RDD element is one row, as an old mllib vector
val rowValues = Seq(
  Array(1.0, 2.0, 3.0),
  Array(4.0, 5.0, 6.0),
  Array(7.0, 8.0, 9.0)
)
val rows = sc.parallelize(rowValues.map(values => OldVectors.dense(values)))
val rowMat = new RowMatrix(rows)

// Column statistics
val summary = rowMat.computeColumnSummaryStatistics()
println(s"Column means: ${summary.mean}")
println(s"Column variances: ${summary.variance}")

// Top 2 principal components
val pc = rowMat.computePrincipalComponents(2)
println(s"Principal components:\n$pc")

// SVD keeping k = 2 singular values, also computing U
val svd = rowMat.computeSVD(2, computeU = true)
println(s"Singular values: ${svd.s}")

// IndexedRowMatrix: every row carries an explicit Long index
val indexedRows = sc.parallelize(
  Seq(Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0)).zipWithIndex.map {
    case (values, idx) => IndexedRow(idx.toLong, OldVectors.dense(values))
  })

val indexedRowMat = new IndexedRowMatrix(indexedRows)
// Re-partition into 2x2 blocks
val blockMat = indexedRowMat.toBlockMatrix(2, 2)

println(s"Block matrix dimensions: ${blockMat.numRows()} x ${blockMat.numCols()}")
```

### Performance Considerations

```scala
import org.apache.spark.ml.linalg.{Matrices, Vectors}

// Sparse vectors for high-dimensional data with few non-zeros
val highDimSparse = Vectors.sparse(10000, Array(0, 5000, 9999), Array(1.0, 2.0, 3.0))

// Dense vectors for low-dimensional data or data with many non-zeros
val lowDimDense = Vectors.dense(Array.fill(100)(1.0))

// Memory-efficient 1000x1000 sparse matrix in CSC form with exactly one stored entry
// per column: colPtrs = 0, 1, ..., 1000, and each column's entry is in a random row
val sparseLargeMat = Matrices.sparse(1000, 1000,
  Array.range(0, 1001),                              // Column pointers
  Array.fill(1000)(scala.util.Random.nextInt(1000)), // Row indices
  Array.fill(1000)(scala.util.Random.nextDouble())   // Values
)

// Let Spark pick whichever of the dense / sparse encodings uses less storage
val compressed = highDimSparse.compressed
```

## Integration with ML Pipeline

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.LogisticRegression

// `rawData` is assumed to be an existing DataFrame with numeric columns
// feature1, feature2, feature3 and a label column named "label".
val inputColumns = Array("feature1", "feature2", "feature3")

// VectorAssembler packs the input columns into a single vector column
val assembler = new VectorAssembler().setInputCols(inputColumns).setOutputCol("features")
val vectorData = assembler.transform(rawData)

// ML algorithms consume the vector column directly
val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")
val model = lr.fit(vectorData)
```