# Recommendation

MLlib provides collaborative filtering algorithms for building recommendation systems using matrix factorization techniques. The primary algorithm is Alternating Least Squares (ALS) for both explicit and implicit feedback scenarios.

## Alternating Least Squares (ALS)

### Estimator
```scala { .api }
// ALS estimator: learns user and item latent-factor matrices by
// alternating least squares and produces an ALSModel.
class ALS(override val uid: String) extends Estimator[ALSModel] with ALSParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("als"))

  // Model structure and optimization
  def setRank(value: Int): ALS
  def setMaxIter(value: Int): ALS
  def setRegParam(value: Double): ALS
  def setImplicitPrefs(value: Boolean): ALS
  def setAlpha(value: Double): ALS
  def setNonnegative(value: Boolean): ALS
  def setCheckpointInterval(value: Int): ALS
  def setSeed(value: Long): ALS

  // Input/output columns
  def setUserCol(value: String): ALS
  def setItemCol(value: String): ALS
  def setRatingCol(value: String): ALS
  def setPredictionCol(value: String): ALS
  def setColdStartStrategy(value: String): ALS

  // Partitioning and storage tuning
  def setNumUserBlocks(value: Int): ALS
  def setNumItemBlocks(value: Int): ALS
  // Storage levels are configured by NAME (e.g. "MEMORY_AND_DISK"),
  // matching the Param[String] declarations in ALSParams.
  def setIntermediateStorageLevel(value: String): ALS
  def setFinalStorageLevel(value: String): ALS
  def setBlockSize(value: Int): ALS

  override def fit(dataset: Dataset[_]): ALSModel
  override def copy(extra: ParamMap): ALS
}
```

### Model

```scala { .api }
// Fitted ALS model: holds the learned factor matrices.
// userFactors / itemFactors are DataFrames with columns "id" and "features".
class ALSModel(override val uid: String, val rank: Int, val userFactors: DataFrame, val itemFactors: DataFrame)
  extends Model[ALSModel] with ALSModelParams with MLWritable {

  def setUserCol(value: String): ALSModel
  def setItemCol(value: String): ALSModel
  def setPredictionCol(value: String): ALSModel
  def setColdStartStrategy(value: String): ALSModel

  // Adds a prediction column for each (user, item) row in the input.
  override def transform(dataset: Dataset[_]): DataFrame

  // Batch top-N recommendation generation.
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame

  override def copy(extra: ParamMap): ALSModel
  // Provided by MLWritable, hence override.
  override def write: MLWriter
}
```

### Parameters Traits

```scala { .api }
// Parameters shared by the fitted model (and, via ALSParams, the estimator).
// NOTE: ALSModelParams is the PARENT of ALSParams — the reverse would force
// redeclaring final vals such as coldStartStrategy, which does not compile.
trait ALSModelParams extends Params with HasPredictionCol {
  final val userCol: Param[String]
  final val itemCol: Param[String]
  final val coldStartStrategy: Param[String]
  final val blockSize: IntParam

  def getUserCol: String
  def getItemCol: String
  def getColdStartStrategy: String
  def getBlockSize: Int
}

// Training-time parameters for the ALS estimator; inherits the prediction /
// user / item column and cold-start settings from ALSModelParams.
trait ALSParams extends ALSModelParams with HasMaxIter with HasRegParam
  with HasCheckpointInterval with HasSeed {

  final val rank: IntParam
  final val numUserBlocks: IntParam
  final val numItemBlocks: IntParam
  final val implicitPrefs: BooleanParam
  final val alpha: DoubleParam
  final val ratingCol: Param[String]
  final val nonnegative: BooleanParam
  // Storage levels are held as level NAMES, hence String params.
  final val intermediateStorageLevel: Param[String]
  final val finalStorageLevel: Param[String]

  def getRank: Int
  def getNumUserBlocks: Int
  def getNumItemBlocks: Int
  def getImplicitPrefs: Boolean
  def getAlpha: Double
  def getRatingCol: String
  def getNonnegative: Boolean
  def getIntermediateStorageLevel: String
  def getFinalStorageLevel: String
}
```

## Usage Examples

### Basic Collaborative Filtering with Explicit Ratings

```scala
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions._

// Sample ratings data (user, item, rating)
val ratings = spark.createDataFrame(Seq(
  (1, 1, 5.0),
  (1, 2, 4.0),
  (1, 3, 1.0),
  (2, 1, 3.0),
  (2, 2, 2.0),
  (2, 4, 4.0),
  (3, 2, 5.0),
  (3, 3, 4.0),
  (3, 4, 3.0)
)).toDF("user", "item", "rating")

// Create ALS model for explicit feedback
val als = new ALS()
  .setMaxIter(20)
  .setRegParam(0.1)
  .setRank(10)
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  .setColdStartStrategy("drop") // Handle new users/items
  .setSeed(42)

// Split data for evaluation
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42)

// Train model
val model = als.fit(training)

// Make predictions on test set
val predictions = model.transform(test)
predictions.show()

// Evaluate model using RMSE
import org.apache.spark.ml.evaluation.RegressionEvaluator

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")

val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
```

### Implicit Feedback Recommendation

```scala
// Implicit feedback data (user, item, confidence/count)
val implicitData = spark.createDataFrame(Seq(
  (1, 101, 3.0), // User 1 interacted with item 101, 3 times
  (1, 102, 5.0), // User 1 interacted with item 102, 5 times
  (1, 103, 1.0),
  (2, 101, 2.0),
  (2, 104, 4.0),
  (3, 102, 6.0),
  (3, 103, 2.0),
  (3, 104, 1.0)
)).toDF("user", "item", "confidence")

// ALS for implicit feedback
val implicitALS = new ALS()
  .setMaxIter(15)
  .setRegParam(0.01)
  .setRank(5)
  .setImplicitPrefs(true) // Enable implicit feedback mode
  .setAlpha(1.0)          // Confidence scaling parameter
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("confidence")
  .setColdStartStrategy("drop")
  .setSeed(42)

val implicitModel = implicitALS.fit(implicitData)

// Generate recommendations for all users
val userRecs = implicitModel.recommendForAllUsers(3) // Top 3 items per user
userRecs.show(truncate = false)

// Generate recommendations for all items
val itemRecs = implicitModel.recommendForAllItems(2) // Top 2 users per item
itemRecs.show(truncate = false)
```

### Advanced Recommendation Scenarios

```scala
// Generate recommendations for specific users.
// Single-column rows must be Tuple1 — "(1,)" is not valid Scala syntax.
val specificUsers = spark.createDataFrame(Seq(
  Tuple1(1), Tuple1(2)
)).toDF("user")

val userSubsetRecs = implicitModel.recommendForUserSubset(specificUsers, 5)
println("Recommendations for specific users:")
userSubsetRecs.show(truncate = false)

// Generate recommendations for specific items
val specificItems = spark.createDataFrame(Seq(
  Tuple1(101), Tuple1(102)
)).toDF("item")

val itemSubsetRecs = implicitModel.recommendForItemSubset(specificItems, 3)
println("Recommendations for specific items:")
itemSubsetRecs.show(truncate = false)

// Access learned factors
println("User factors:")
model.userFactors.show(5)

println("Item factors:")
model.itemFactors.show(5)

// Model parameters
println(s"Model rank: ${model.rank}")
println(s"User factors count: ${model.userFactors.count()}")
println(s"Item factors count: ${model.itemFactors.count()}")
```

### Hyperparameter Tuning for Recommendations

```scala
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Create parameter grid for ALS
val alsForTuning = new ALS()
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  .setColdStartStrategy("drop")
  .setSeed(42)

// 3 x 3 x 3 = 27 parameter combinations
val paramGrid = new ParamGridBuilder()
  .addGrid(alsForTuning.rank, Array(5, 10, 15))
  .addGrid(alsForTuning.regParam, Array(0.01, 0.1, 1.0))
  .addGrid(alsForTuning.maxIter, Array(10, 15, 20))
  .build()

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")

val cv = new CrossValidator()
  .setEstimator(alsForTuning)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
  .setParallelism(2)

// Note: Cross-validation for ALS can be expensive
// Consider using TrainValidationSplit for faster tuning
import org.apache.spark.ml.tuning.TrainValidationSplit

val tvs = new TrainValidationSplit()
  .setEstimator(alsForTuning)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)
  .setSeed(42)

val bestModel = tvs.fit(training)
val bestALSModel = bestModel.bestModel.asInstanceOf[ALSModel]

println(s"Best rank: ${bestALSModel.rank}")
// RMSE: lower is better, hence .min
println(s"Best RMSE: ${bestModel.validationMetrics.min}")
```

### Cold Start Problem Handling

```scala
// Data with new users and items not seen during training
val testWithNewUsers = spark.createDataFrame(Seq(
  (999, 1, 0.0), // New user 999
  (1, 999, 0.0), // New item 999
  (1, 2, 4.0)    // Known user-item pair
)).toDF("user", "item", "rating")

// Different cold start strategies
val alsWithDrop = new ALS()
  .setColdStartStrategy("drop") // Drop rows with new users/items
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")

val alsWithNaN = new ALS()
  .setColdStartStrategy("nan") // Predict NaN for new users/items
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")

val modelDrop = alsWithDrop.fit(training)
val modelNaN = alsWithNaN.fit(training)

println("Predictions with 'drop' strategy:")
val predictionsDrop = modelDrop.transform(testWithNewUsers)
predictionsDrop.show()

println("Predictions with 'nan' strategy:")
val predictionsNaN = modelNaN.transform(testWithNewUsers)
predictionsNaN.show()

// Handle NaN predictions manually
val predictionsHandled = predictionsNaN
  .withColumn("handled_prediction",
    when(col("prediction").isNaN, 0.0) // Default to 0 for unknown
      .otherwise(col("prediction")))

predictionsHandled.show()
```

### Evaluation Metrics for Recommendations

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window // required for Window.partitionBy below

// Precision@K and Recall@K over top-K recommendation lists.
def evaluateRecommendations(predictions: DataFrame,
                            actualRatings: DataFrame,
                            k: Int = 10,
                            relevanceThreshold: Double = 3.0): Unit = {

  // Top-K predicted items per user, ranked by predicted score.
  val topK = predictions
    .filter(col("prediction") >= relevanceThreshold)
    .withColumn("rank", row_number().over(
      Window.partitionBy("user").orderBy(desc("prediction"))))
    .filter(col("rank") <= k)

  // Ground-truth relevant items per user.
  val actualRelevant = actualRatings
    .filter(col("rating") >= relevanceThreshold)
    .select("user", "item")

  // Tag ground-truth rows BEFORE the join: after a USING-column left join the
  // join columns themselves are never null, so checking item.isNotNull would
  // always be true; a marker column detects real hits.
  val relevantTagged = actualRelevant.withColumn("is_hit", lit(1))

  val metrics = topK
    .join(relevantTagged, Seq("user", "item"), "left")
    .groupBy("user")
    .agg(
      count("*").alias("recommended_count"),
      sum(coalesce(col("is_hit"), lit(0))).alias("relevant_recommended")
    )
    .join(
      // agg(count(...).alias(...)) names the COLUMN; DataFrame.alias would
      // only name the sub-query and leave the column named "count".
      actualRelevant.groupBy("user").agg(count("*").alias("actual_relevant_count")),
      "user"
    )
    .withColumn("precision", col("relevant_recommended") / col("recommended_count"))
    .withColumn("recall", col("relevant_recommended") / col("actual_relevant_count"))
    .withColumn("f1", (2 * col("precision") * col("recall")) / (col("precision") + col("recall")))

  println(s"Evaluation Metrics (K=$k, threshold=$relevanceThreshold):")
  metrics.select(
    mean("precision").alias("avg_precision"),
    mean("recall").alias("avg_recall"),
    mean("f1").alias("avg_f1")
  ).show()
}

// Fraction of the catalog appearing in at least one recommendation list.
def calculateCoverage(recommendations: DataFrame, totalItems: Long): Double = {
  // recommendForAllUsers returns an array-of-struct "recommendations" column;
  // explode it so distinct() counts individual items rather than whole arrays.
  val recommendedItems = recommendations
    .select(explode(col("recommendations")).alias("rec"))
    .select("rec.item")
    .distinct()
    .count()

  recommendedItems.toDouble / totalItems
}

// Diversity metrics (intra-list diversity) — skeleton only.
def calculateDiversity(model: ALSModel): DataFrame = {
  val itemFeatures = model.itemFactors
    .select("id", "features")
    .rdd
    .map { row =>
      val id = row.getAs[Int]("id")
      // Factor vectors are stored as float arrays, not ml.linalg vectors.
      val features = row.getAs[Seq[Float]]("features")
      (id, features)
    }
    .collectAsMap()

  // Calculate pairwise cosine similarities over itemFeatures here;
  // implementation would depend on specific diversity requirements.
  spark.emptyDataFrame // Placeholder
}

// Usage
val userRecs = model.recommendForAllUsers(10)
val totalItems = ratings.select("item").distinct().count()
val coverage = calculateCoverage(userRecs, totalItems)
println(s"Catalog coverage: ${coverage * 100}%")
```

### Production Recommendation Pipeline

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel} // PipelineModel needed for loading below
import org.apache.spark.ml.feature.StringIndexer

// Production pipeline with string indexing for user/item IDs
val userIndexer = new StringIndexer()
  .setInputCol("user_id")
  .setOutputCol("user")
  .setHandleInvalid("keep")

val itemIndexer = new StringIndexer()
  .setInputCol("item_id")
  .setOutputCol("item")
  .setHandleInvalid("keep")

val als = new ALS()
  .setMaxIter(10)
  .setRegParam(0.1)
  .setRank(10)
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  .setColdStartStrategy("drop")

val pipeline = new Pipeline()
  .setStages(Array(userIndexer, itemIndexer, als))

// Fit pipeline
val pipelineModel = pipeline.fit(trainingData)

// Generate batch recommendations from the fitted ALS stage (last in pipeline)
val batchRecs = pipelineModel
  .stages.last.asInstanceOf[ALSModel]
  .recommendForAllUsers(20)

// Save recommendations for serving
batchRecs
  .write
  .mode("overwrite")
  .option("path", "recommendations/batch")
  .saveAsTable("user_recommendations")

// Real-time prediction function
def predictRating(userId: String, itemId: String): Double = {
  val input = spark.createDataFrame(Seq(
    (userId, itemId, 0.0) // Rating doesn't matter for prediction
  )).toDF("user_id", "item_id", "rating")

  val prediction = pipelineModel.transform(input)
  // ALS writes the prediction column as FloatType; read it as Float and
  // widen — getDouble(0) would throw a ClassCastException.
  prediction.select("prediction").first().getFloat(0).toDouble
}

// Model persistence
pipelineModel.write.overwrite().save("models/als_pipeline")

// Load model for serving
val loadedModel = PipelineModel.load("models/als_pipeline")
```

### Advanced ALS Configuration

```scala
// Optimized ALS for large-scale data
val optimizedALS = new ALS()
  .setRank(50)
  .setMaxIter(20)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setAlpha(1.0)
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  // Performance optimizations
  .setNumUserBlocks(100)     // Increase for more users
  .setNumItemBlocks(100)     // Increase for more items
  .setBlockSize(4096)        // Larger blocks for better performance
  .setCheckpointInterval(10) // Checkpoint every 10 iterations
  // Storage optimization: the storage-level params are Param[String], so the
  // level is passed by NAME rather than as a StorageLevel object
  .setIntermediateStorageLevel("MEMORY_AND_DISK")
  .setFinalStorageLevel("MEMORY_AND_DISK")
  // Constraints
  .setNonnegative(true)      // Non-negative matrix factorization
  .setSeed(42)

val optimizedModel = optimizedALS.fit(largeDataset)

// Monitor training progress
println(s"Model trained with rank: ${optimizedModel.rank}")
println(s"Number of user factors: ${optimizedModel.userFactors.count()}")
println(s"Number of item factors: ${optimizedModel.itemFactors.count()}")
```