# Recommendation

Collaborative filtering algorithms for building recommendation engines, including matrix factorization techniques optimized for large-scale user-item interaction datasets.

## Capabilities

### Alternating Least Squares (ALS)

Matrix factorization algorithm using alternating least squares optimization for collaborative filtering recommendation systems.

```scala { .api }
/**
 * Alternating Least Squares collaborative filtering
 */
class ALS extends Estimator[ALSModel] with ALSParams {
  def setRank(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setImplicitPrefs(value: Boolean): this.type
  def setAlpha(value: Double): this.type
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def setNonnegative(value: Boolean): this.type
  def setCheckpointInterval(value: Int): this.type
  def setSeed(value: Long): this.type
  def setNumUserBlocks(value: Int): this.type
  def setNumItemBlocks(value: Int): this.type
  def setIntermediateRDDStorageLevel(value: StorageLevel): this.type
  def setFinalRDDStorageLevel(value: StorageLevel): this.type
  def setColdStartStrategy(value: String): this.type
  def setBlockSize(value: Int): this.type
}

class ALSModel extends Model[ALSModel] with ALSParams {
  def rank: Int
  def userFactors: DataFrame
  def itemFactors: DataFrame
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
}
```

**Usage Example:**

```scala
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Prepare training data from ratings
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setColdStartStrategy("drop")

val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")

val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
userRecs.show()

// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
movieRecs.show()
```

### Advanced ALS Features

Extended functionality for handling various recommendation scenarios and optimization strategies.

```scala { .api }
/**
 * ALS model with advanced recommendation capabilities
 */
class ALSModel extends Model[ALSModel] with ALSParams {
  /**
   * Recommend items for a subset of users
   */
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame

  /**
   * Recommend users for a subset of items
   */
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame

  /**
   * Transform dataset with predictions
   */
  override def transform(dataset: Dataset[_]): DataFrame

  /**
   * Access to latent factor matrices
   */
  def userFactors: DataFrame
  def itemFactors: DataFrame

  /**
   * Model persistence
   */
  override def write: MLWriter
}

/**
 * Rating data structure for recommendations
 */
case class Rating(user: Int, item: Int, rating: Float) extends Product with Serializable
```

### Implicit Feedback Handling

Specialized configurations for handling implicit feedback data where ratings represent confidence rather than explicit preferences.

```scala { .api }
/**
 * ALS configuration for implicit feedback scenarios
 */
object ALSImplicit {
  /**
   * Create ALS instance optimized for implicit feedback
   */
  def create(): ALS = {
    new ALS()
      .setImplicitPrefs(true)
      .setAlpha(1.0)        // Confidence scaling parameter
      .setNonnegative(true) // Ensure non-negative factors
  }

  /**
   * Preprocess implicit feedback data
   */
  def preprocessImplicitData(
    interactions: DataFrame,
    userCol: String = "userId",
    itemCol: String = "itemId",
    countCol: String = "count"
  ): DataFrame
}
```

### Cold Start Strategies

Methods for handling new users or items that weren't present during training.

```scala { .api }
/**
 * Cold start handling strategies
 */
object ColdStartStrategy {
  val NaN = "nan"   // Return NaN for cold start cases
  val DROP = "drop" // Drop cold start cases from predictions
}

/**
 * Cold start recommendation utilities
 */
object ColdStartRecommender {
  /**
   * Generate recommendations for new users based on popular items
   */
  def recommendForNewUsers(
    model: ALSModel,
    popularItems: DataFrame,
    numRecommendations: Int
  ): DataFrame

  /**
   * Generate recommendations for new items based on similar items
   */
  def recommendForNewItems(
    model: ALSModel,
    itemFeatures: DataFrame,
    numRecommendations: Int
  ): DataFrame
}
```

### Recommendation Evaluation

Specialized evaluation metrics for recommendation systems.

```scala { .api }
/**
 * Recommendation-specific evaluation metrics
 */
class RecommendationEvaluator extends Evaluator {
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def setMetricName(value: String): this.type
  def setK(value: Int): this.type
  def setColdStartStrategy(value: String): this.type
}

/**
 * Ranking-based evaluation for recommendations
 */
object RecommendationMetrics {
  /**
   * Calculate Mean Average Precision at K
   */
  def meanAveragePrecisionAtK(
    predictions: DataFrame,
    k: Int,
    userCol: String = "userId",
    itemCol: String = "itemId",
    ratingCol: String = "rating"
  ): Double

  /**
   * Calculate Normalized Discounted Cumulative Gain at K
   */
  def ndcgAtK(
    predictions: DataFrame,
    k: Int,
    userCol: String = "userId",
    itemCol: String = "itemId",
    ratingCol: String = "rating"
  ): Double

  /**
   * Calculate precision and recall at K
   */
  def precisionRecallAtK(
    predictions: DataFrame,
    k: Int,
    userCol: String = "userId",
    itemCol: String = "itemId",
    ratingCol: String = "rating"
  ): (Double, Double)
}
```

### Hyperparameter Tuning for ALS

Specialized parameter tuning strategies for collaborative filtering models.

```scala { .api }
/**
 * ALS hyperparameter tuning utilities
 */
object ALSTuning {
  /**
   * Create parameter grid for ALS tuning
   */
  def createParamGrid(
    als: ALS,
    ranks: Array[Int] = Array(10, 50, 100),
    regParams: Array[Double] = Array(0.01, 0.1, 1.0),
    alphas: Array[Double] = Array(1.0, 10.0, 40.0)
  ): Array[ParamMap]

  /**
   * Cross-validation for recommendation systems with temporal splitting
   */
  def temporalCrossValidation(
    als: ALS,
    data: DataFrame,
    evaluator: Evaluator,
    paramGrid: Array[ParamMap],
    timestampCol: String = "timestamp"
  ): CrossValidatorModel
}
```

### Distributed Training Optimization

Configuration options for optimizing ALS training on large-scale distributed datasets.

```scala { .api }
/**
 * ALS distributed training configuration
 */
object ALSDistributedTraining {
  /**
   * Optimize ALS for large-scale distributed training
   */
  def optimizeForScale(
    als: ALS,
    numUsers: Long,
    numItems: Long,
    numPartitions: Int
  ): ALS = {
    val userBlocks = math.min(numPartitions, math.sqrt(numUsers).toInt)
    val itemBlocks = math.min(numPartitions, math.sqrt(numItems).toInt)

    als
      .setNumUserBlocks(userBlocks)
      .setNumItemBlocks(itemBlocks)
      .setIntermediateRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
      .setFinalRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
      .setCheckpointInterval(10)
  }

  /**
   * Memory-optimized configuration for resource-constrained environments
   */
  def optimizeForMemory(als: ALS): ALS = {
    als
      .setIntermediateRDDStorageLevel(StorageLevel.DISK_ONLY)
      .setFinalRDDStorageLevel(StorageLevel.DISK_ONLY)
      .setCheckpointInterval(5)
  }
}
```

## Types

```scala { .api }
// Recommendation system imports
import org.apache.spark.ml.recommendation._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.storage.StorageLevel

// ALS-specific types
import org.apache.spark.ml.recommendation.{ALS, ALSModel}

// Rating data structure
case class Rating(user: Int, item: Int, rating: Float)

// Parameter traits
import org.apache.spark.ml.param.shared._

// Evaluation imports
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
```