# Collaborative Filtering and Recommendation

Build recommendation systems with collaborative filtering. MLlib provides Alternating Least Squares (ALS), a matrix factorization algorithm commonly used to build recommendation engines.

## Capabilities

### Alternating Least Squares (ALS)

ALS factorizes the sparse user-item rating matrix into low-rank user and item factor matrices by alternating between them. With the item factors held fixed, each user's factor vector is the solution of a small regularized least-squares problem, and vice versa. These per-user (or per-item) solves are independent of one another, so each half-step parallelizes across the cluster.

```scala { .api }
/**
 * Alternating Least Squares (ALS) for collaborative filtering.
 * Factorizes the user-item rating matrix into user and item latent factor matrices.
 * Supports both explicit and implicit feedback datasets.
 */
class ALS extends Estimator[ALSModel] with ALSParams with DefaultParamsWritable {
  def setRank(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setImplicitPrefs(value: Boolean): this.type
  def setAlpha(value: Double): this.type
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def setNonnegative(value: Boolean): this.type
  def setNumUserBlocks(value: Int): this.type
  def setNumItemBlocks(value: Int): this.type
  def setCheckpointInterval(value: Int): this.type
  def setSeed(value: Long): this.type
  def setIntermediateStorageLevel(value: String): this.type
  def setFinalStorageLevel(value: String): this.type
  def setColdStartStrategy(value: String): this.type
  def setBlockSize(value: Int): this.type
}

/**
 * ALSModel - fitted ALS model holding the user and item factor matrices.
 * Provides methods for making recommendations and extracting latent factors.
 */
class ALSModel extends Model[ALSModel] with ALSModelParams with MLWritable {
  def rank: Int
  def userFactors: DataFrame
  def itemFactors: DataFrame
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
  def transform(dataset: Dataset[_]): DataFrame
}
```
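The following pure-Scala sketch makes the alternating updates concrete. It runs explicit-feedback ALS on a tiny in-memory ratings list. It is not MLlib's distributed implementation, which works on blocked RDDs. The names `Rating`, `solve`, `update`, `objective`, and `train` are hypothetical helpers written for this illustration. Each half-step solves one small ridge-regression system per user (or item). The regularization is scaled by that user's (or item's) number of ratings, following the MLlib guide's description.

```scala
// Minimal in-memory sketch of explicit-feedback ALS (hypothetical helpers, not MLlib code)
case class Rating(user: Int, item: Int, value: Double)

def dot(a: Array[Double], b: Array[Double]): Double =
  a.indices.map(k => a(k) * b(k)).sum

// Solve the k x k system A x = b with Gaussian elimination and partial pivoting
def solve(a0: Array[Array[Double]], b0: Array[Double]): Array[Double] = {
  val n = b0.length
  val a = a0.map(_.clone)
  val b = b0.clone
  for (col <- 0 until n) {
    val p = (col until n).maxBy(r => math.abs(a(r)(col)))
    val rowTmp = a(col); a(col) = a(p); a(p) = rowTmp
    val bTmp = b(col); b(col) = b(p); b(p) = bTmp
    for (r <- col + 1 until n) {
      val f = a(r)(col) / a(col)(col)
      for (c <- col until n) a(r)(c) -= f * a(col)(c)
      b(r) -= f * b(col)
    }
  }
  val x = new Array[Double](n)
  for (r <- n - 1 to 0 by -1) {
    val s = (r + 1 until n).map(c => a(r)(c) * x(c)).sum
    x(r) = (b(r) - s) / a(r)(r)
  }
  x
}

// Half-step: with the other side's factors fixed, solve for each id
//   (sum_j f_j f_j^T + lambda * n * Id) x = sum_j r_j f_j
// over its n observed (otherId, rating) pairs
def update(obsById: Map[Int, Seq[(Int, Double)]],
           fixed: Map[Int, Array[Double]],
           rank: Int,
           lambda: Double): Map[Int, Array[Double]] =
  obsById.map { case (id, obs) =>
    val a = Array.tabulate(rank, rank) { (p, q) =>
      obs.map { case (j, _) => fixed(j)(p) * fixed(j)(q) }.sum +
        (if (p == q) lambda * obs.size else 0.0)
    }
    val b = Array.tabulate(rank)(p => obs.map { case (j, r) => r * fixed(j)(p) }.sum)
    id -> solve(a, b)
  }

// Squared error on observed ratings plus count-weighted L2 penalty
def objective(ratings: Seq[Rating], u: Map[Int, Array[Double]],
              v: Map[Int, Array[Double]], lambda: Double): Double = {
  val sq = ratings.map(r => math.pow(r.value - dot(u(r.user), v(r.item)), 2)).sum
  val penU = ratings.groupBy(_.user).map { case (id, rs) => rs.size * dot(u(id), u(id)) }.sum
  val penI = ratings.groupBy(_.item).map { case (id, rs) => rs.size * dot(v(id), v(id)) }.sum
  sq + lambda * (penU + penI)
}

def train(ratings: Seq[Rating], rank: Int, lambda: Double, iters: Int, seed: Long = 42L)
    : (Map[Int, Array[Double]], Map[Int, Array[Double]]) = {
  val rnd = new scala.util.Random(seed)
  val byUser = ratings.groupBy(_.user).map { case (u, rs) => u -> rs.map(r => (r.item, r.value)) }
  val byItem = ratings.groupBy(_.item).map { case (i, rs) => i -> rs.map(r => (r.user, r.value)) }
  var userF: Map[Int, Array[Double]] = Map.empty
  var itemF: Map[Int, Array[Double]] =
    byItem.keys.map(i => i -> Array.fill(rank)(rnd.nextDouble())).toMap
  for (_ <- 1 to iters) {
    userF = update(byUser, itemF, rank, lambda) // item factors fixed, solve users
    itemF = update(byItem, userF, rank, lambda) // user factors fixed, solve items
  }
  (userF, itemF)
}

val data = Seq(
  Rating(0, 0, 5.0), Rating(0, 1, 3.0), Rating(1, 0, 4.0),
  Rating(1, 2, 1.0), Rating(2, 1, 2.0), Rating(2, 2, 5.0))
val (u1, v1) = train(data, rank = 2, lambda = 0.1, iters = 1)
val (u10, v10) = train(data, rank = 2, lambda = 0.1, iters = 10)
println(f"objective after 1 iteration:   ${objective(data, u1, v1, 0.1)}%.4f")
println(f"objective after 10 iterations: ${objective(data, u10, v10, 0.1)}%.4f")
```

Each half-step solves its block of variables exactly, so the objective cannot increase from one iteration to the next. This property is why ALS needs no learning rate.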
### Parameter Traits

In Spark these traits are `private[recommendation]`. They are listed here because they declare the `Param` fields and getters that `ALS` and `ALSModel` expose. `ALSModelParams` holds the parameters needed at prediction time, and `ALSParams` extends it with the training parameters.

```scala { .api }
/**
 * ALSModelParams - parameters shared by the ALS estimator and the fitted ALSModel:
 * input column names, cold-start handling, and (via HasBlockSize) the block size
 * used when stacking data into matrices
 */
trait ALSModelParams extends Params with HasPredictionCol with HasBlockSize {
  final val userCol: Param[String]
  final val itemCol: Param[String]
  final val coldStartStrategy: Param[String]

  def getUserCol: String
  def getItemCol: String
  def getColdStartStrategy: String
  // blockSize and getBlockSize are inherited from HasBlockSize
}

/**
 * ALSParams - training parameters for the ALS estimator
 * (inherits the column and cold-start parameters from ALSModelParams)
 */
trait ALSParams extends ALSModelParams with HasMaxIter with HasRegParam
    with HasCheckpointInterval with HasSeed {
  final val rank: IntParam
  final val ratingCol: Param[String]
  final val implicitPrefs: BooleanParam
  final val alpha: DoubleParam
  final val nonnegative: BooleanParam
  final val numUserBlocks: IntParam
  final val numItemBlocks: IntParam
  final val intermediateStorageLevel: Param[String]
  final val finalStorageLevel: Param[String]

  def getRank: Int
  def getRatingCol: String
  def getImplicitPrefs: Boolean
  def getAlpha: Double
  def getNonnegative: Boolean
  def getNumUserBlocks: Int
  def getNumItemBlocks: Int
  def getIntermediateStorageLevel: String
  def getFinalStorageLevel: String
}
```
## Usage Examples

### Basic Collaborative Filtering

```scala
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Assumes `spark` is an active SparkSession (as in spark-shell)
// Load ratings data (userId, movieId, rating, timestamp)
val ratings = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/ratings.csv")

// Split data into training and test sets
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the ALS model
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setRank(10)
  .setColdStartStrategy("drop") // drop NaN predictions for unseen users/items so RMSE is not NaN

val model = als.fit(training)

// Evaluate the model by computing RMSE on the test set
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")

val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
```
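`RegressionEvaluator` with `metricName = "rmse"` reports the square root of the mean squared difference between the label and prediction columns. The plain-Scala sketch below spells out that formula on (label, prediction) pairs. `rmse` is a hypothetical helper. The sketch assumes NaN predictions were already removed, which is what `setColdStartStrategy("drop")` ensures.

```scala
// RMSE = sqrt( (1/n) * sum_i (label_i - prediction_i)^2 )
def rmse(pairs: Seq[(Double, Double)]): Double = {
  require(pairs.nonEmpty, "need at least one (label, prediction) pair")
  val mse = pairs.map { case (label, pred) => math.pow(label - pred, 2) }.sum / pairs.size
  math.sqrt(mse)
}

// Hypothetical (rating, prediction) pairs, e.g. collected from a small predictions DataFrame
val pairs = Seq((4.0, 3.5), (3.0, 3.0), (5.0, 4.0))
println(rmse(pairs))
```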
### Implicit Feedback Recommendation

```scala
// For implicit feedback data (user interactions without explicit ratings).
// implicitData is assumed to be a DataFrame with columns userId, itemId, count.
val alsImplicit = new ALS()
  .setMaxIter(20)
  .setRegParam(0.1)
  .setImplicitPrefs(true)
  .setAlpha(1.0)           // scales the confidence given to observed interactions
  .setUserCol("userId")
  .setItemCol("itemId")
  .setRatingCol("count")   // interaction count, used as the implicit-feedback strength
  .setRank(50)

val implicitModel = alsImplicit.fit(implicitData)

// Generate the top 10 item recommendations for every user
val recommendations = implicitModel.recommendForAllUsers(10)
recommendations.show(false)
```
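With `implicitPrefs = true`, the rating column is not fitted directly. In the implicit-feedback formulation that MLlib's ALS follows (Hu, Koren and Volinsky), each observed value r becomes a binary preference p (1 if r > 0, else 0) and a confidence c = 1 + alpha·|r|. The model then fits p weighted by c. The sketch below shows only that transformation in plain Scala, with the solver omitted. `Interaction` and `toPreference` are hypothetical names.

```scala
// Hypothetical record type for one user-item interaction count
case class Interaction(user: Int, item: Int, count: Double)

// preference p = 1 if count > 0 (else 0); confidence c = 1 + alpha * |count|.
// User-item pairs with no interaction are treated as p = 0 with confidence 1.
def toPreference(x: Interaction, alpha: Double): (Double, Double) = {
  val preference = if (x.count > 0) 1.0 else 0.0
  val confidence = 1.0 + alpha * math.abs(x.count)
  (preference, confidence)
}

val interactions = Seq(Interaction(1, 10, 3.0), Interaction(1, 20, 0.0), Interaction(2, 10, 12.0))
interactions.foreach { x =>
  val (p, c) = toPreference(x, alpha = 1.0)
  println(s"user=${x.user} item=${x.item} count=${x.count} -> preference=$p confidence=$c")
}
```

Raising `alpha` widens the gap in confidence between frequent and rare interactions.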
### Generating Recommendations

```scala
import spark.implicits._ // enables Seq(...).toDF

// Recommend the top 10 items for every user.
// Each row holds the user id and a `recommendations` array of (item id, rating) structs.
val userRecs = model.recommendForAllUsers(10)
userRecs.show(false)

// Recommend the top 10 users for every item
val itemRecs = model.recommendForAllItems(10)
itemRecs.show(false)

// Recommend the top 5 items for specific users
val users = Seq(1, 2, 3).toDF("userId")
val userSubsetRecs = model.recommendForUserSubset(users, 5)
userSubsetRecs.show(false)

// Recommend the top 5 users for specific items
val items = Seq(1, 2, 3).toDF("movieId")
val itemSubsetRecs = model.recommendForItemSubset(items, 5)
itemSubsetRecs.show(false)
```
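Conceptually, `recommendForAllUsers(k)` scores every item for a user by the dot product of their factor vectors and keeps the k highest scores. MLlib does this blockwise across the cluster. The plain-Scala sketch below shows the idea for a single user, using in-memory factor maps. `topK` and the sample factors are hypothetical.

```scala
def dot(a: Array[Double], b: Array[Double]): Double =
  a.indices.map(k => a(k) * b(k)).sum

// The k items with the highest predicted score for one user, best first
// (ties broken by the smaller item id)
def topK(user: Array[Double], items: Map[Int, Array[Double]], k: Int): Seq[(Int, Double)] =
  items.toSeq
    .map { case (itemId, f) => itemId -> dot(user, f) }
    .sortWith((a, b) => a._2 > b._2 || (a._2 == b._2 && a._1 < b._1))
    .take(k)

// Hypothetical rank-2 factors
val itemFactors = Map(
  1 -> Array(1.0, 0.0),
  2 -> Array(0.0, 1.0),
  3 -> Array(0.5, 0.5),
  4 -> Array(-1.0, 0.2))
val userFactor = Array(2.0, 1.0)
println(topK(userFactor, itemFactors, 2))
```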
### Accessing Latent Factors

```scala
// Get user latent factors
val userFactors = model.userFactors
userFactors.show(5, false)

// Get item latent factors
val itemFactors = model.itemFactors
itemFactors.show(5, false)

// Both DataFrames have the schema: id (int), features (array<float>)
// Each features array has length equal to the model rank

println(s"Model rank (number of latent factors): ${model.rank}")
```
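A predicted rating for a (user, item) pair is the dot product of the user's and the item's `features` arrays. The minimal pure-Scala sketch below reproduces that computation. The factor values are hypothetical stand-ins for rows collected from `userFactors` and `itemFactors`, kept as `Array[Float]` to match the column type.

```scala
// Dot product of two factor arrays of equal length (the model rank)
def predict(userFeatures: Array[Float], itemFeatures: Array[Float]): Float = {
  require(userFeatures.length == itemFeatures.length, "factor arrays must have the same rank")
  userFeatures.zip(itemFeatures).map { case (u, v) => u * v }.sum
}

// Hypothetical rank-3 factors for one user and one item
val userFeatures = Array(0.5f, 1.0f, -0.25f)
val itemFeatures = Array(2.0f, 1.5f, 4.0f)
println(predict(userFeatures, itemFeatures))
```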
### Model Tuning with Cross-Validation

```scala
import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Create the parameter grid (3 x 3 x 3 = 27 combinations)
val paramGrid = new ParamGridBuilder()
  .addGrid(als.rank, Array(10, 20, 50))
  .addGrid(als.regParam, Array(0.01, 0.1, 1.0))
  .addGrid(als.maxIter, Array(5, 10, 20))
  .build()

// Create the cross-validator, reusing `als` and `evaluator` from the basic example.
// als uses coldStartStrategy "drop", so no fold produces a NaN RMSE.
val cv = new CrossValidator()
  .setEstimator(als)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// Run cross-validation (27 combinations x 3 folds = 81 ALS fits, plus a final refit)
val cvModel = cv.fit(training)

// Get the best model
val bestModel = cvModel.bestModel.asInstanceOf[ALSModel]
println(s"Best rank: ${bestModel.rank}")

// Make predictions with the best model
val bestPredictions = bestModel.transform(test)
val bestRmse = evaluator.evaluate(bestPredictions)
println(s"Best RMSE: $bestRmse")
```
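`ParamGridBuilder` takes the Cartesian product of the values passed to each `addGrid`, so tuning cost grows multiplicatively. `CrossValidator` fits every combination on every fold. It then refits the best combination once on the full input. The plain-Scala sketch below counts the fits for the grid above. The tuple representation is illustrative only; the real builder returns `Array[ParamMap]`.

```scala
val ranks = Seq(10, 20, 50)
val regParams = Seq(0.01, 0.1, 1.0)
val maxIters = Seq(5, 10, 20)

// Cartesian product of all listed values, one entry per parameter combination
val grid = for {
  rank <- ranks
  reg <- regParams
  iters <- maxIters
} yield (rank, reg, iters)

val numFolds = 3
// One fit per (combination, fold), plus one refit of the best combination
val totalFits = grid.size * numFolds + 1
println(s"${grid.size} combinations, $totalFits ALS fits")
```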
### Cold Start Strategy

```scala
// Handle the cold start problem (users/items in the input that were not seen during training)
val alsWithColdStart = new ALS()
  .setMaxIter(10)
  .setRegParam(0.1)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setColdStartStrategy("drop") // Options: "nan" (default), "drop"

val modelWithColdStart = alsWithColdStart.fit(training)

// "drop" removes rows whose prediction would be NaN (unknown user or item)
// "nan" keeps those rows and sets their prediction to NaN
// testWithNewUsers is assumed to contain some userIds that are absent from `training`
val predictionsWithColdStart = modelWithColdStart.transform(testWithNewUsers)
```
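The two strategies can be mimicked with plain collections: a prediction exists only when both the user and the item have learned factors. In the hypothetical sketch below, `predictAll` returns NaN for unknown IDs under `"nan"` and removes those rows under `"drop"`. It models the behavior described in the comments above, not MLlib's code.

```scala
def dot(a: Array[Double], b: Array[Double]): Double =
  a.indices.map(k => a(k) * b(k)).sum

// Score (user, item) pairs; unknown IDs yield NaN ("nan") or are removed ("drop")
def predictAll(
    pairs: Seq[(Int, Int)],
    userF: Map[Int, Array[Double]],
    itemF: Map[Int, Array[Double]],
    strategy: String): Seq[(Int, Int, Double)] = {
  val scored = pairs.map { case (u, i) =>
    val p = (for (uf <- userF.get(u); vf <- itemF.get(i)) yield dot(uf, vf))
      .getOrElse(Double.NaN)
    (u, i, p)
  }
  strategy match {
    case "nan"  => scored
    case "drop" => scored.filterNot { case (_, _, p) => p.isNaN }
    case other  => throw new IllegalArgumentException(s"unknown strategy: $other")
  }
}

val userF = Map(1 -> Array(1.0, 2.0))
val itemF = Map(10 -> Array(0.5, 0.5))
val pairs = Seq((1, 10), (2, 10), (1, 99)) // user 2 and item 99 were never seen
println(predictAll(pairs, userF, itemF, "nan"))
println(predictAll(pairs, userF, itemF, "drop"))
```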
### Performance Tuning

```scala
// Optimize for large datasets.
// Checkpointing only takes effect if a checkpoint directory has been set.
spark.sparkContext.setCheckpointDir("path/to/checkpoint/dir")

val alsOptimized = new ALS()
  .setMaxIter(10)
  .setRegParam(0.1)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setRank(100)
  .setNumUserBlocks(100)                          // more blocks = more parallelism over users (default 10)
  .setNumItemBlocks(100)                          // more blocks = more parallelism over items (default 10)
  .setIntermediateStorageLevel("MEMORY_AND_DISK") // storage level for intermediate RDDs (the default)
  .setFinalStorageLevel("MEMORY_AND_DISK")        // storage level for the final factor RDDs (the default)
  .setCheckpointInterval(10)                      // checkpoint every 10 iterations to truncate lineage

// largeRatings is assumed to be a large ratings DataFrame with the columns above
val optimizedModel = alsOptimized.fit(largeRatings)
```
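`numUserBlocks` and `numItemBlocks` set how many partitions the user and item factors are split into. This sketch assumes, as in current Spark versions, that IDs are assigned to blocks by a hash partitioner, which for an `Int` ID amounts to a non-negative `id mod numBlocks`. It reproduces that assignment in plain Scala to show how IDs spread across blocks. `nonNegativeMod` and `blockOf` are hypothetical helpers.

```scala
// Non-negative modulo, the arithmetic a hash partitioner applies to key.hashCode
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// For an Int ID, hashCode is the value itself
def blockOf(id: Int, numBlocks: Int): Int = nonNegativeMod(id.hashCode, numBlocks)

val userIds = 1 to 1000
val numUserBlocks = 100
val blockSizes = userIds.groupBy(blockOf(_, numUserBlocks)).map { case (b, ids) => b -> ids.size }
println(s"${blockSizes.size} blocks, sizes between ${blockSizes.values.min} and ${blockSizes.values.max}")
```

Under this assumption, consecutive IDs spread evenly. IDs that share a common stride, for example all multiples of 100 with `numUserBlocks = 100`, would all land in the same block.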
## Key Concepts

### Matrix Factorization
ALS decomposes the user-item rating matrix R into user factor matrix U and item factor matrix I such that R ≈ U × I^T, where each user and item is represented by a k-dimensional latent factor vector.
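The explicit-feedback model and the objective it minimizes can be written out as follows. Here $\mathbf{x}_u$ and $\mathbf{y}_i$ are the rows of $U$ and $I$ for user $u$ and item $i$. $\Omega$ is the set of observed ratings and $\Omega_u$ the items rated by user $u$. $n_u$ and $n_i$ are the numbers of ratings per user and per item, and $\lambda$ is `regParam`. $\mathrm{Id}_k$ denotes the $k \times k$ identity, named this way to avoid a clash with the item matrix $I$. The scaling of $\lambda$ by $n_u$ and $n_i$ follows the MLlib guide's description of its regularization.

```latex
\hat{r}_{ui} = \mathbf{x}_u^{\top} \mathbf{y}_i,
\qquad
\min_{U,\, I} \sum_{(u,i) \in \Omega} \left( r_{ui} - \mathbf{x}_u^{\top} \mathbf{y}_i \right)^2
  + \lambda \left( \sum_u n_u \lVert \mathbf{x}_u \rVert^2 + \sum_i n_i \lVert \mathbf{y}_i \rVert^2 \right)

% With I fixed, setting the gradient with respect to x_u to zero gives a k x k solve per user:
\mathbf{x}_u = \left( \sum_{i \in \Omega_u} \mathbf{y}_i \mathbf{y}_i^{\top} + \lambda n_u \, \mathrm{Id}_k \right)^{-1}
  \sum_{i \in \Omega_u} r_{ui} \, \mathbf{y}_i
```

The item update is symmetric: swap the roles of $\mathbf{x}$ and $\mathbf{y}$ and use $n_i$.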
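### Explicit vs Implicit Feedback

- **Explicit**: Direct ratings (1-5 stars, thumbs up/down)
- **Implicit**: Indirect signals (clicks, views, purchases, time spent)

### Cold Start Problem

New users or items with no interaction history are hard to recommend for, because ALS has learned no latent factors for them. In MLlib, `coldStartStrategy` controls only how `ALSModel.transform` handles such IDs. `"nan"` (the default) returns a NaN prediction, and `"drop"` removes those rows. Neither strategy produces recommendations for unseen users or items.

### Hyperparameters

- **rank**: Number of latent factors k per user and item; controls model complexity (default 10)
- **regParam**: Regularization parameter that penalizes large factor values to prevent overfitting (default 0.1)
- **alpha**: Confidence scaling for implicit feedback; larger values weight observed interactions more heavily (default 1.0)
- **maxIter**: Number of alternating iterations to run; ALS has no convergence tolerance, so it always runs this many (default 10)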