# Apache Spark MLlib

Apache Spark MLlib is a scalable machine learning library built on top of Apache Spark's distributed computing framework. It provides unified APIs for classification, regression, clustering, collaborative filtering, dimensionality reduction, and feature processing on large datasets.

## Package Information

- **Package Name**: spark-mllib_2.12
- **Package Type**: maven
- **Language**: Scala (with Java, Python, R APIs)
- **Version**: 2.4.8
- **Installation**: Add to your `build.sbt`: `libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.8"` (`%%` appends the project's Scala binary version, e.g. `_2.12`)

## Core Imports

```scala
// DataFrame-based API (recommended)
import org.apache.spark.ml._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.regression._
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.linalg.{Vector, Vectors, Matrix, Matrices}

// RDD-based API (legacy)
// These wildcards clash with the ml.* ones above on names such as KMeans,
// StandardScaler, and LogisticRegressionModel; import from only one API per file.
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.linalg.{Vector => MLlibVector, Vectors => MLlibVectors}

// Spark SQL and core
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkContext, SparkConf}
```

## Basic Usage

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler

// Create (or reuse) a Spark session
val spark = SparkSession.builder()
  .appName("MLlib Example")
  .getOrCreate()

import spark.implicits._

// Small in-memory dataset with three raw numeric columns and a binary label.
// (A libsvm file such as data/sample_libsvm_data.txt already loads as
// "label" + "features" columns and needs no VectorAssembler.)
val data = Seq(
  (1.0, 0.1, 2.3, 1.0),
  (0.0, 1.5, 0.2, 0.0),
  (1.2, 0.3, 1.9, 1.0),
  (0.1, 1.1, 0.4, 0.0)
).toDF("feature1", "feature2", "feature3", "label")

// Combine the raw columns into a single "features" vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

// Create an elastic-net regularized logistic regression classifier
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Create and fit the pipeline
val pipeline = new Pipeline().setStages(Array(assembler, lr))
val model = pipeline.fit(data)

// Make predictions (here on the training data, for illustration)
val predictions = model.transform(data)
predictions.select("features", "label", "probability", "prediction").show()
```

## Architecture

MLlib provides two API layers:

### DataFrame-based API (ml package) - Recommended

Modern, high-level API built on Spark DataFrames with:

- **Pipeline Framework**: Chain transformers and estimators
- **Parameter Management**: Type-safe parameter handling
- **ML Persistence**: Save and load pipelines and models
- **Integration**: Operates directly on DataFrame columns alongside Spark SQL

### RDD-based API (mllib package) - Legacy

Lower-level API built on Spark RDDs:

- **Direct Algorithm Access**: Training methods on RDDs
- **Distributed Linear Algebra**: Distributed matrix types such as `RowMatrix` and `BlockMatrix`
- **Maintenance Mode**: Receives bug fixes only; new features are added to the `ml` package

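
Because each layer defines its own `Vector` type, code that mixes the two has to convert explicitly. A minimal sketch, assuming Spark 2.x or later with `spark-mllib` on the classpath. It uses `asML` on the legacy vector and `Vectors.fromML` on the legacy companion object. The vector values are arbitrary.

```scala
// Both layers define Vector/Vectors, so alias them to keep the two apart
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}

// A vector built with the legacy RDD-based API (spark.mllib)
val oldVec: OldVector = OldVectors.dense(1.0, 0.0, 3.0)

// Convert to the DataFrame-based API type (spark.ml) ...
val newVec: MLVector = oldVec.asML

// ... and back again
val roundTrip: OldVector = OldVectors.fromML(newVec)

println(newVec) // [1.0,0.0,3.0]
```

For whole DataFrames, `MLUtils.convertVectorColumnsToML` in `org.apache.spark.mllib.util` applies the same conversion column by column.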
## Capabilities

### Core Framework

Pipeline-based machine learning with standardized interfaces for building and deploying ML workflows. An `Estimator` learns from a `Dataset` and produces a `Model` (itself a `Transformer`), a `Transformer` maps one DataFrame to another, and a `Pipeline` chains these stages into a single estimator.

```scala { .api }
abstract class PipelineStage extends Params with Logging {
  def copy(extra: ParamMap): PipelineStage
  def transformSchema(schema: StructType): StructType
}

abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMap: ParamMap): M
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M]
}

abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
}

class Pipeline(val uid: String) extends Estimator[PipelineModel] {
  def setStages(value: Array[_ <: PipelineStage]): Pipeline
  def getStages: Array[PipelineStage]
  def fit(dataset: Dataset[_]): PipelineModel
}
```

[Core Framework](./core-framework.md)

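
To make the `Estimator`/`Transformer`/`Pipeline` contract concrete, here is a minimal sketch for Spark 2.4 (in `spark-shell` or a Scala script with `spark-mllib` on the classpath). The `local[*]` master, the toy text rows, and the `numFeatures` value are assumptions chosen only so the example runs locally. `Tokenizer`, `HashingTF`, and `ParamMap` are standard `spark.ml` classes that are not listed above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.param.ParamMap

// local[*] lets the sketch run outside spark-shell; drop it when submitting to a cluster
val spark = SparkSession.builder().master("local[*]").appName("PipelineSketch").getOrCreate()

// Toy labelled text (illustrative values only)
val training = spark.createDataFrame(Seq(
  ("spark ml pipeline", 1.0),
  ("hadoop mapreduce job", 0.0),
  ("spark dataframe api", 1.0),
  ("legacy batch job", 0.0)
)).toDF("text", "label")

// Transformers: split text into words, then hash the words into a feature vector
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features").setNumFeatures(1000)

// Estimator: learns a LogisticRegressionModel from "features" and "label"
val lr = new LogisticRegression().setMaxIter(10)

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// fit() applies the transformers in order and fits the estimator on their output
val model: PipelineModel = pipeline.fit(training)

// In the fitted PipelineModel the estimator stage has been replaced by its Model
val lrModel = model.stages.last.asInstanceOf[LogisticRegressionModel]

// A ParamMap overrides parameters for one fit without mutating the original stages
val model2 = pipeline.fit(training, ParamMap(lr.maxIter -> 5))
val lrModel2 = model2.stages.last.asInstanceOf[LogisticRegressionModel]

model.transform(training).select("text", "probability", "prediction").show(false)
```

Passing a `ParamMap` to `fit` leaves `lr` itself unchanged, which is what lets tuning utilities try many settings against a single pipeline definition.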
### Classification

Supervised learning algorithms for predicting categorical labels, including logistic regression, decision trees, random forests, gradient-boosted trees, naive Bayes, multilayer perceptron (neural network) classifiers, and linear support vector machines.

```scala { .api }
class LogisticRegression(override val uid: String)
  extends ProbabilisticClassifier[Vector, LogisticRegression, LogisticRegressionModel]
  with LogisticRegressionParams with DefaultParamsWritable {

  def setRegParam(value: Double): LogisticRegression
  def setElasticNetParam(value: Double): LogisticRegression
  def setMaxIter(value: Int): LogisticRegression
  def setTol(value: Double): LogisticRegression
  def setFitIntercept(value: Boolean): LogisticRegression
  def setThreshold(value: Double): LogisticRegression
  def setThresholds(value: Array[Double]): LogisticRegression
}

class RandomForestClassifier(override val uid: String)
  extends ProbabilisticClassifier[Vector, RandomForestClassifier, RandomForestClassificationModel]
  with RandomForestClassifierParams with DefaultParamsWritable {

  def setNumTrees(value: Int): RandomForestClassifier
  def setMaxDepth(value: Int): RandomForestClassifier
  def setMinInstancesPerNode(value: Int): RandomForestClassifier
  def setFeatureSubsetStrategy(value: String): RandomForestClassifier
}
```

[Classification](./classification.md)

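
A sketch of training the `RandomForestClassifier` listed above on a tiny in-memory dataset, assuming Spark 2.4 and a local master. The data, tree count, depth, and seed are illustrative assumptions, not recommended settings. `createDataFrame` on a `Seq` of tuples is used so the example needs no input files.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.linalg.Vectors

val spark = SparkSession.builder().master("local[*]").appName("RandomForestSketch").getOrCreate()

// Toy data: label 1.0 when the first feature is large (illustrative, not a benchmark)
val train = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(0.1, 1.0)),
  (0.0, Vectors.dense(0.2, 0.8)),
  (0.0, Vectors.dense(0.3, 1.2)),
  (1.0, Vectors.dense(2.1, 0.9)),
  (1.0, Vectors.dense(2.5, 1.1)),
  (1.0, Vectors.dense(2.2, 1.0))
)).toDF("label", "features")

val rf = new RandomForestClassifier()
  .setNumTrees(20)
  .setMaxDepth(3)
  .setFeatureSubsetStrategy("auto") // number of features considered per split
  .setSeed(42L)                     // makes bootstrap sampling reproducible

val rfModel = rf.fit(train)

// One importance score per input feature
println(rfModel.featureImportances)
rfModel.transform(train).select("features", "probability", "prediction").show()
```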
### Regression

Supervised learning algorithms for predicting continuous values, including linear regression, generalized linear models, decision trees, random forests, gradient-boosted trees, and survival regression.

```scala { .api }
class LinearRegression(override val uid: String)
  extends Regressor[Vector, LinearRegression, LinearRegressionModel]
  with LinearRegressionParams with DefaultParamsWritable {

  def setRegParam(value: Double): LinearRegression
  def setElasticNetParam(value: Double): LinearRegression
  def setMaxIter(value: Int): LinearRegression
  def setTol(value: Double): LinearRegression
  def setFitIntercept(value: Boolean): LinearRegression
  def setSolver(value: String): LinearRegression
}

class RandomForestRegressor(override val uid: String)
  extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel]
  with RandomForestRegressorParams with DefaultParamsWritable {

  def setNumTrees(value: Int): RandomForestRegressor
  def setMaxDepth(value: Int): RandomForestRegressor
  def setMinInstancesPerNode(value: Int): RandomForestRegressor
}
```

[Regression](./regression.md)

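
The sketch below fits `LinearRegression` to four points that lie exactly on y = 2x + 1, so the learned coefficient and intercept should come out very close to 2 and 1. It assumes Spark 2.4 with a local master. `setSolver("normal")` selects the normal-equation (weighted least squares) solver, and `summary` exposes training metrics such as RMSE and R².

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression

val spark = SparkSession.builder().master("local[*]").appName("LinearRegressionSketch").getOrCreate()

// Noise-free points on y = 2x + 1 (illustrative values)
val train = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0)),
  (3.0, Vectors.dense(1.0)),
  (5.0, Vectors.dense(2.0)),
  (7.0, Vectors.dense(3.0))
)).toDF("label", "features")

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.0)     // no regularization
  .setSolver("normal")  // closed-form weighted least squares

val lrModel = lr.fit(train)
println(s"coefficients=${lrModel.coefficients} intercept=${lrModel.intercept}")

// The training summary holds fit diagnostics computed on the training data
val summary = lrModel.summary
println(s"RMSE=${summary.rootMeanSquaredError} r2=${summary.r2}")
```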
### Clustering

Unsupervised learning algorithms for discovering data patterns, including K-means, bisecting K-means, Gaussian mixture models, latent Dirichlet allocation, and power iteration clustering.

```scala { .api }
class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMeansParams with DefaultParamsWritable {
  def setK(value: Int): KMeans
  def setMaxIter(value: Int): KMeans
  def setTol(value: Double): KMeans
  def setInitMode(value: String): KMeans
  def setInitSteps(value: Int): KMeans
  def setSeed(value: Long): KMeans
}

class GaussianMixture(override val uid: String) extends Estimator[GaussianMixtureModel]
  with GaussianMixtureParams with DefaultParamsWritable {

  def setK(value: Int): GaussianMixture
  def setMaxIter(value: Int): GaussianMixture
  def setTol(value: Double): GaussianMixture
  def setSeed(value: Long): GaussianMixture
}
```

[Clustering](./clustering.md)

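
A minimal K-means sketch, assuming Spark 2.4 and a local master. The two well-separated groups of points are made up so the expected grouping is obvious, and the seed only fixes the default k-means|| initialization.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors

val spark = SparkSession.builder().master("local[*]").appName("KMeansSketch").getOrCreate()

// Two well-separated groups of 2-D points (illustrative values)
val data = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(0.0, 0.0)),
  Tuple1(Vectors.dense(0.1, 0.1)),
  Tuple1(Vectors.dense(9.0, 9.0)),
  Tuple1(Vectors.dense(9.1, 9.1))
)).toDF("features")

val kmeans = new KMeans()
  .setK(2)
  .setMaxIter(20)
  .setSeed(1L) // fixes the k-means|| initialization for reproducibility

val model = kmeans.fit(data)
model.clusterCenters.foreach(println)

// transform() adds an integer "prediction" column holding each point's cluster index
val assignments = model.transform(data).select("prediction").collect().map(_.getInt(0))
```

Cluster indices are arbitrary labels, so only which points share an index is meaningful, not the index values themselves.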
### Feature Processing

Comprehensive feature engineering, including scaling, normalization, encoding, selection, extraction, and transformation, for preparing data for machine learning.

```scala { .api }
class VectorAssembler(override val uid: String) extends Transformer
  with HasInputCols with HasOutputCol with DefaultParamsWritable {

  def setInputCols(value: Array[String]): VectorAssembler
  def setOutputCol(value: String): VectorAssembler
  def transform(dataset: Dataset[_]): DataFrame
}

class StandardScaler(override val uid: String) extends Estimator[StandardScalerModel]
  with StandardScalerParams with DefaultParamsWritable {

  def setWithMean(value: Boolean): StandardScaler
  def setWithStd(value: Boolean): StandardScaler
  def fit(dataset: Dataset[_]): StandardScalerModel
}
```

[Feature Processing](./feature-processing.md)

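
`StandardScaler` is an estimator: `fit` computes per-feature statistics and returns a model that applies them. A sketch assuming Spark 2.4 with a local master and made-up data. Spark uses the sample (n - 1) standard deviation, so the columns `1, 2, 3` and `10, 20, 30` both scale to `-1, 0, 1`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.linalg.{Vector, Vectors}

val spark = SparkSession.builder().master("local[*]").appName("StandardScalerSketch").getOrCreate()

// Two features on very different scales (illustrative values)
val df = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 10.0)),
  (1, Vectors.dense(2.0, 20.0)),
  (2, Vectors.dense(3.0, 30.0))
)).toDF("id", "features")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(true) // subtract the column mean (output becomes dense)
  .setWithStd(true)  // divide by the column's sample standard deviation

// fit() computes the statistics; the fitted model is a Transformer
val scalerModel = scaler.fit(df)
println(s"mean=${scalerModel.mean} std=${scalerModel.std}")

// Collect the scaled vectors in id order
val scaled = scalerModel.transform(df)
  .orderBy("id")
  .select("scaledFeatures")
  .collect()
  .map(_.getAs[Vector](0))
```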
### Model Evaluation

Evaluation metrics and model selection tools, including evaluators for binary classification, multiclass classification, regression, and clustering, plus hyperparameter tuning via cross-validation (`CrossValidator`) and train-validation split (`TrainValidationSplit`).

```scala { .api }
class BinaryClassificationEvaluator(override val uid: String) extends Evaluator
  with HasLabelCol with HasRawPredictionCol with DefaultParamsWritable {

  def setMetricName(value: String): BinaryClassificationEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

class CrossValidator(override val uid: String) extends Estimator[CrossValidatorModel]
  with CrossValidatorParams with MLWritable {

  def setEstimator(value: Estimator[_]): CrossValidator
  def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
  def setEvaluator(value: Evaluator): CrossValidator
  def setNumFolds(value: Int): CrossValidator
}
```

[Model Evaluation](./evaluation.md)

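
A sketch of combining `CrossValidator` with `BinaryClassificationEvaluator`, assuming Spark 2.4 and a local master. `ParamGridBuilder` (also in `org.apache.spark.ml.tuning`) builds the `Array[ParamMap]` that `setEstimatorParamMaps` expects. The synthetic dataset, grid values, fold count, and seed are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val spark = SparkSession.builder().master("local[*]").appName("CrossValidatorSketch").getOrCreate()

// Synthetic, separable data: the first feature tracks the label (illustrative only)
val data = spark.createDataFrame((0 until 60).map { i =>
  val label = (i % 2).toDouble
  (label, Vectors.dense(label * 2.0 + (i % 5) * 0.1, (i % 7) * 0.1))
}).toDF("label", "features")

val lr = new LogisticRegression().setMaxIter(20)

// 2 x 2 grid = 4 candidate parameter settings
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5))
  .build()

val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new BinaryClassificationEvaluator().setMetricName("areaUnderROC"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
  .setSeed(7L)

// Fits every candidate on each fold, then refits the best one on all the data
val cvModel = cv.fit(data)

// One averaged metric per ParamMap, in the same order as paramGrid
println(cvModel.avgMetrics.mkString(", "))
val best = cvModel.bestModel
```

Cost grows as (grid size) × (number of folds) fits, so `TrainValidationSplit`, which evaluates each candidate once on a single held-out split, is the cheaper alternative for large grids.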
### Recommendation

Collaborative filtering algorithms for building recommendation systems using alternating least squares (ALS) matrix factorization, with support for both explicit ratings and implicit feedback.

```scala { .api }
class ALS(override val uid: String) extends Estimator[ALSModel] with ALSParams with DefaultParamsWritable {
  def setRank(value: Int): ALS
  def setMaxIter(value: Int): ALS
  def setRegParam(value: Double): ALS
  def setAlpha(value: Double): ALS
  def setImplicitPrefs(value: Boolean): ALS
  def setNonnegative(value: Boolean): ALS
}
```

[Recommendation](./recommendation.md)

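
A minimal explicit-feedback ALS sketch, assuming Spark 2.2 or later (for `setColdStartStrategy` and `recommendForAllUsers`) and a local master. The ratings, rank, and regularization values are made up. The column names are set explicitly through `setUserCol`, `setItemCol`, and `setRatingCol`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.recommendation.ALS

val spark = SparkSession.builder().master("local[*]").appName("AlsSketch").getOrCreate()

// Explicit ratings as (userId, movieId, rating); values are illustrative
val ratings = spark.createDataFrame(Seq(
  (0, 0, 4.0), (0, 1, 2.0),
  (1, 0, 5.0), (1, 2, 1.0),
  (2, 1, 3.0), (2, 2, 4.0)
)).toDF("userId", "movieId", "rating")

val als = new ALS()
  .setRank(2)                    // number of latent factors
  .setMaxIter(5)
  .setRegParam(0.1)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setColdStartStrategy("drop")  // drop NaN predictions for users/items unseen in training
  .setSeed(0L)

val model = als.fit(ratings)

// Top-2 items per user: userId plus an array of (movieId, rating) structs
val top2 = model.recommendForAllUsers(2)
top2.show(false)
```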
### Frequent Pattern Mining

Algorithms for discovering frequent patterns and association rules in transactional data, including FP-Growth and PrefixSpan.

```scala { .api }
class FPGrowth(override val uid: String) extends Estimator[FPGrowthModel]
  with FPGrowthParams with DefaultParamsWritable {

  def setMinSupport(value: Double): FPGrowth
  def setNumPartitions(value: Int): FPGrowth
  def setMinConfidence(value: Double): FPGrowth
}
```

[Frequent Pattern Mining](./frequent-pattern-mining.md)

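
A sketch of `FPGrowth` on four toy transactions, assuming Spark 2.2 or later and a local master. `setItemsCol` names the array column holding each transaction's items, which must be unique within a transaction. With `minSupport = 0.5` over 4 transactions, an itemset must occur in at least 2 of them. A rule X ⇒ Y is kept when count(X ∪ Y) / count(X) ≥ `minConfidence`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.fpm.FPGrowth

val spark = SparkSession.builder().master("local[*]").appName("FPGrowthSketch").getOrCreate()

// Each row is one transaction (illustrative basket data)
val transactions = spark.createDataFrame(Seq(
  Tuple1(Array("bread", "milk")),
  Tuple1(Array("bread", "butter")),
  Tuple1(Array("bread", "milk", "butter")),
  Tuple1(Array("milk"))
)).toDF("items")

val fpgrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.5)    // itemset must appear in >= 50% of transactions
  .setMinConfidence(0.6) // keep rules with confidence >= 0.6

val model = fpgrowth.fit(transactions)

// Frequent itemsets: {bread}, {milk}, {butter}, {bread, milk}, {bread, butter}
model.freqItemsets.show(false)

// Rules: bread => milk, milk => bread, bread => butter, butter => bread
model.associationRules.show(false)

// transform() adds predicted consequents for each transaction
model.transform(transactions).show(false)
```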
### Linear Algebra

Local dense and sparse vectors and matrices used throughout the DataFrame-based API (`org.apache.spark.ml.linalg`). Distributed matrix types (`RowMatrix`, `IndexedRowMatrix`, `CoordinateMatrix`, `BlockMatrix`) are provided by the RDD-based `org.apache.spark.mllib.linalg.distributed` package.

```scala { .api }
sealed trait Vector extends Serializable {
  def size: Int
  def toArray: Array[Double]
  def apply(i: Int): Double
  def copy: Vector
  def foreachActive(f: (Int, Double) => Unit): Unit
}

object Vectors {
  def dense(values: Array[Double]): Vector
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
  def zeros(size: Int): Vector
  def norm(vector: Vector, p: Double): Double
}
```

[Linear Algebra](./linear-algebra.md)

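
The local types need no `SparkSession`, so they can be tried directly. A sketch assuming only `spark-mllib` (or `spark-mllib-local`) on the classpath. Note that `Matrices.dense` reads its value array in column-major order.

```scala
import org.apache.spark.ml.linalg.{Matrices, Vector, Vectors}

// The same 4-element vector [1.0, 0.0, 0.0, 3.0] in dense and sparse form
val dense: Vector = Vectors.dense(1.0, 0.0, 0.0, 3.0)
val sparse: Vector = Vectors.sparse(4, Array(0, 3), Array(1.0, 3.0))

// Equality compares logical contents, not the storage format
println(dense == sparse) // true

// L1 and L2 norms
println(Vectors.norm(sparse, 1.0)) // 4.0
println(Vectors.norm(sparse, 2.0)) // sqrt(10), about 3.162

// foreachActive visits only the stored entries of a sparse vector
sparse.foreachActive((i, v) => println(s"index $i -> $v"))

// Column-major 2x2 matrix [[1, 3], [2, 4]] multiplied by [1, 1]
val m = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))
println(m.multiply(Vectors.dense(1.0, 1.0))) // [4.0,6.0]
```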
### RDD-based API (Legacy)

Original MLlib API built on RDDs, providing direct algorithm access and distributed linear algebra operations.

```scala { .api }
// Classification: LogisticRegressionWithLBFGS is a class that is configured, then run
class LogisticRegressionWithLBFGS extends GeneralizedLinearAlgorithm[LogisticRegressionModel] {
  def setNumClasses(numClasses: Int): LogisticRegressionWithLBFGS
  def run(input: RDD[LabeledPoint]): LogisticRegressionModel
}

// Clustering
object KMeans {
  def train(data: RDD[org.apache.spark.mllib.linalg.Vector],
            k: Int, maxIterations: Int): KMeansModel
}
```

[RDD-based API](./rdd-api.md)

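
A sketch of the legacy RDD workflow, assuming Spark 2.x with a local master. Note that these classes come from `org.apache.spark.mllib`, so `LabeledPoint` and `Vectors` here are the legacy types, not the `ml.linalg` ones. The four points are made up and linearly separable.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val spark = SparkSession.builder().master("local[*]").appName("RddApiSketch").getOrCreate()
val sc: SparkContext = spark.sparkContext

// Labelled points for binary classification (illustrative values)
val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(0.0, 1.0)),
  LabeledPoint(0.0, Vectors.dense(0.5, 1.5)),
  LabeledPoint(1.0, Vectors.dense(3.0, 0.5)),
  LabeledPoint(1.0, Vectors.dense(3.5, 0.0))
))

// Configure the LBFGS trainer, then call run() on the RDD
val lrModel = new LogisticRegressionWithLBFGS().setNumClasses(2).run(training)
val predicted = lrModel.predict(Vectors.dense(3.2, 0.2))

// KMeans.train is a helper on the companion object; cache the input since it is reused
val points = training.map(_.features).cache()
val kmModel = KMeans.train(points, 2, 20)
println(kmModel.clusterCenters.mkString(", "))
```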
## Types

```scala { .api }
// Core parameter types
class Param[T] extends Serializable {
  def name: String
  def doc: String
  def parent: String
}

class ParamMap extends Serializable {
  def put[T](param: Param[T], value: T): ParamMap
  def get[T](param: Param[T]): Option[T]
  def apply[T](param: Param[T]): T
}

case class ParamPair[T](param: Param[T], value: T)

// Model summary members (simplified; concrete summaries such as
// LinearRegressionSummary expose these fields)
abstract class TrainingSummary extends Serializable {
  def predictions: DataFrame
  def predictionCol: String
  def labelCol: String
  def featuresCol: String
}

// Linear algebra types (ml.linalg)
sealed trait Vector extends Serializable
class DenseVector(val values: Array[Double]) extends Vector
class SparseVector(val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector

sealed trait Matrix extends Serializable
class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double],
                  val isTransposed: Boolean = false) extends Matrix
class SparseMatrix(val numRows: Int, val numCols: Int, val colPtrs: Array[Int],
                   val rowIndices: Array[Int], val values: Array[Double],
                   val isTransposed: Boolean = false) extends Matrix
```
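
How `Param`, `ParamPair`, and `ParamMap` fit together can be seen without a `SparkSession`. A sketch assuming `spark-mllib` on the classpath. `LogisticRegression` is used only as a convenient `Params` holder, and the parameter values are arbitrary.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

val lr = new LogisticRegression().setMaxIter(10)

// Build a ParamMap from ParamPairs (param -> value); put() overwrites earlier values
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)

println(paramMap(lr.maxIter))   // 30
println(paramMap.get(lr.tol))   // None: tol was never put into this map

// copy(extra) returns a new estimator with the map applied; lr itself is unchanged
val lr2 = lr.copy(paramMap)
println(lr2.getMaxIter)         // 30
println(lr.getMaxIter)          // 10

// Every Param carries its parent's uid, its name, and its documentation
println(s"${lr.regParam.parent} ${lr.regParam.name}: ${lr.regParam.doc}")
```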