or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

classification.mdclustering.mdcore-framework.mdevaluation.mdfeature-processing.mdfrequent-pattern-mining.mdindex.mdlinear-algebra.mdrdd-api.mdrecommendation.mdregression.md

recommendation.mddocs/

0

# Recommendation

1

2

MLlib provides collaborative filtering algorithms for building recommendation systems using matrix factorization techniques. The primary algorithm is Alternating Least Squares (ALS) for both explicit and implicit feedback scenarios.

3

4

## Alternating Least Squares (ALS)

5

6

### Estimator

7

8

```scala { .api }

9

class ALS(override val uid: String) extends Estimator[ALSModel] with ALSParams with DefaultParamsWritable {

10

def this() = this(Identifiable.randomUID("als"))

11

12

def setRank(value: Int): ALS

13

def setMaxIter(value: Int): ALS

14

def setRegParam(value: Double): ALS

15

def setImplicitPrefs(value: Boolean): ALS

16

def setAlpha(value: Double): ALS

17

def setNonnegative(value: Boolean): ALS

18

def setCheckpointInterval(value: Int): ALS

19

def setSeed(value: Long): ALS

20

def setUserCol(value: String): ALS

21

def setItemCol(value: String): ALS

22

def setRatingCol(value: String): ALS

23

def setPredictionCol(value: String): ALS

24

def setColdStartStrategy(value: String): ALS

25

def setNumUserBlocks(value: Int): ALS

26

def setNumItemBlocks(value: Int): ALS

27

def setIntermediateStorageLevel(value: String): ALS

28

def setFinalStorageLevel(value: String): ALS

29

def setBlockSize(value: Int): ALS

30

31

override def fit(dataset: Dataset[_]): ALSModel

32

override def copy(extra: ParamMap): ALS

33

}

34

```

35

36

### Model

37

38

```scala { .api }

39

class ALSModel(override val uid: String, val rank: Int, val userFactors: DataFrame, val itemFactors: DataFrame)

40

extends Model[ALSModel] with ALSModelParams with MLWritable {

41

42

def setUserCol(value: String): ALSModel

43

def setItemCol(value: String): ALSModel

44

def setPredictionCol(value: String): ALSModel

45

def setColdStartStrategy(value: String): ALSModel

46

47

override def transform(dataset: Dataset[_]): DataFrame

48

def recommendForAllUsers(numItems: Int): DataFrame

49

def recommendForAllItems(numUsers: Int): DataFrame

50

def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame

51

def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame

52

override def copy(extra: ParamMap): ALSModel

53

def write: MLWriter

54

}

55

```

56

57

### Parameters Traits

58

59

```scala { .api }

60

trait ALSParams extends Params with HasMaxIter with HasRegParam with HasPredictionCol

61

with HasCheckpointInterval with HasSeed {

62

63

final val rank: IntParam

64

final val numUserBlocks: IntParam

65

final val numItemBlocks: IntParam

66

final val implicitPrefs: BooleanParam

67

final val alpha: DoubleParam

68

final val userCol: Param[String]

69

final val itemCol: Param[String]

70

final val ratingCol: Param[String]

71

final val nonnegative: BooleanParam

72

final val intermediateStorageLevel: Param[String]

73

final val finalStorageLevel: Param[String]

74

final val coldStartStrategy: Param[String]

75

final val blockSize: IntParam

76

77

def getRank: Int

78

def getNumUserBlocks: Int

79

def getNumItemBlocks: Int

80

def getImplicitPrefs: Boolean

81

def getAlpha: Double

82

def getUserCol: String

83

def getItemCol: String

84

def getRatingCol: String

85

def getNonnegative: Boolean

86

def getIntermediateStorageLevel: String

87

def getFinalStorageLevel: String

88

def getColdStartStrategy: String

89

def getBlockSize: Int

90

}

91

92

trait ALSModelParams extends Params with HasPredictionCol {

93

final val userCol: Param[String]
final val itemCol: Param[String]
final val coldStartStrategy: Param[String]

94

def getUserCol: String
def getItemCol: String
def getColdStartStrategy: String

95

}

96

```

97

98

## Usage Examples

99

100

### Basic Collaborative Filtering with Explicit Ratings

101

102

```scala

103

import org.apache.spark.ml.recommendation.ALS

104

import org.apache.spark.sql.functions._

105

106

// Sample ratings data (user, item, rating)

107

val ratings = spark.createDataFrame(Seq(

108

(1, 1, 5.0),

109

(1, 2, 4.0),

110

(1, 3, 1.0),

111

(2, 1, 3.0),

112

(2, 2, 2.0),

113

(2, 4, 4.0),

114

(3, 2, 5.0),

115

(3, 3, 4.0),

116

(3, 4, 3.0)

117

)).toDF("user", "item", "rating")

118

119

// Create ALS model for explicit feedback

120

val als = new ALS()

121

.setMaxIter(20)

122

.setRegParam(0.1)

123

.setRank(10)

124

.setUserCol("user")

125

.setItemCol("item")

126

.setRatingCol("rating")

127

.setColdStartStrategy("drop") // Handle new users/items

128

.setSeed(42)

129

130

// Split data for evaluation

131

val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42)

132

133

// Train model

134

val model = als.fit(training)

135

136

// Make predictions on test set

137

val predictions = model.transform(test)

138

predictions.show()

139

140

// Evaluate model using RMSE

141

import org.apache.spark.ml.evaluation.RegressionEvaluator

142

143

val evaluator = new RegressionEvaluator()

144

.setMetricName("rmse")

145

.setLabelCol("rating")

146

.setPredictionCol("prediction")

147

148

val rmse = evaluator.evaluate(predictions)

149

println(s"Root-mean-square error = $rmse")

150

```

151

152

### Implicit Feedback Recommendation

153

154

```scala

155

// Implicit feedback data (user, item, confidence/count)

156

val implicitData = spark.createDataFrame(Seq(

157

(1, 101, 3.0), // User 1 interacted with item 101, 3 times

158

(1, 102, 5.0), // User 1 interacted with item 102, 5 times

159

(1, 103, 1.0),

160

(2, 101, 2.0),

161

(2, 104, 4.0),

162

(3, 102, 6.0),

163

(3, 103, 2.0),

164

(3, 104, 1.0)

165

)).toDF("user", "item", "confidence")

166

167

// ALS for implicit feedback

168

val implicitALS = new ALS()

169

.setMaxIter(15)

170

.setRegParam(0.01)

171

.setRank(5)

172

.setImplicitPrefs(true) // Enable implicit feedback mode

173

.setAlpha(1.0) // Confidence scaling parameter

174

.setUserCol("user")

175

.setItemCol("item")

176

.setRatingCol("confidence")

177

.setColdStartStrategy("drop")

178

.setSeed(42)

179

180

val implicitModel = implicitALS.fit(implicitData)

181

182

// Generate recommendations for all users

183

val userRecs = implicitModel.recommendForAllUsers(3) // Top 3 items per user

184

userRecs.show(truncate = false)

185

186

// Generate recommendations for all items

187

val itemRecs = implicitModel.recommendForAllItems(2) // Top 2 users per item

188

itemRecs.show(truncate = false)

189

```

190

191

### Advanced Recommendation Scenarios

192

193

```scala

194

// Generate recommendations for specific users

195

val specificUsers = spark.createDataFrame(Seq(

196

Tuple1(1), Tuple1(2)

197

)).toDF("user")

198

199

val userSubsetRecs = implicitModel.recommendForUserSubset(specificUsers, 5)

200

println("Recommendations for specific users:")

201

userSubsetRecs.show(truncate = false)

202

203

// Generate recommendations for specific items

204

val specificItems = spark.createDataFrame(Seq(

205

Tuple1(101), Tuple1(102)

206

)).toDF("item")

207

208

val itemSubsetRecs = implicitModel.recommendForItemSubset(specificItems, 3)

209

println("Recommendations for specific items:")

210

itemSubsetRecs.show(truncate = false)

211

212

// Access learned factors

213

println("User factors:")

214

model.userFactors.show(5)

215

216

println("Item factors:")

217

model.itemFactors.show(5)

218

219

// Model parameters

220

println(s"Model rank: ${model.rank}")

221

println(s"User factors count: ${model.userFactors.count()}")

222

println(s"Item factors count: ${model.itemFactors.count()}")

223

```

224

225

### Hyperparameter Tuning for Recommendations

226

227

```scala

228

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

229

230

// Create parameter grid for ALS

231

val alsForTuning = new ALS()

232

.setUserCol("user")

233

.setItemCol("item")

234

.setRatingCol("rating")

235

.setColdStartStrategy("drop")

236

.setSeed(42)

237

238

val paramGrid = new ParamGridBuilder()

239

.addGrid(alsForTuning.rank, Array(5, 10, 15))

240

.addGrid(alsForTuning.regParam, Array(0.01, 0.1, 1.0))

241

.addGrid(alsForTuning.maxIter, Array(10, 15, 20))

242

.build()

243

244

val evaluator = new RegressionEvaluator()

245

.setMetricName("rmse")

246

.setLabelCol("rating")

247

.setPredictionCol("prediction")

248

249

val cv = new CrossValidator()

250

.setEstimator(alsForTuning)

251

.setEvaluator(evaluator)

252

.setEstimatorParamMaps(paramGrid)

253

.setNumFolds(3)

254

.setParallelism(2)

255

256

// Note: Cross-validation for ALS can be expensive

257

// Consider using TrainValidationSplit for faster tuning

258

import org.apache.spark.ml.tuning.TrainValidationSplit

259

260

val tvs = new TrainValidationSplit()

261

.setEstimator(alsForTuning)

262

.setEvaluator(evaluator)

263

.setEstimatorParamMaps(paramGrid)

264

.setTrainRatio(0.8)

265

.setSeed(42)

266

267

val bestModel = tvs.fit(training)

268

val bestALSModel = bestModel.bestModel.asInstanceOf[ALSModel]

269

270

println(s"Best rank: ${bestALSModel.rank}")

271

println(s"Best RMSE: ${bestModel.validationMetrics.min}")

272

```

273

274

### Cold Start Problem Handling

275

276

```scala

277

// Data with new users and items not seen during training

278

val testWithNewUsers = spark.createDataFrame(Seq(

279

(999, 1, 0.0), // New user 999

280

(1, 999, 0.0), // New item 999

281

(1, 2, 4.0) // Known user-item pair

282

)).toDF("user", "item", "rating")

283

284

// Different cold start strategies

285

val alsWithDrop = new ALS()

286

.setColdStartStrategy("drop") // Drop rows with new users/items

287

.setUserCol("user")

288

.setItemCol("item")

289

.setRatingCol("rating")

290

291

val alsWithNaN = new ALS()

292

.setColdStartStrategy("nan") // Predict NaN for new users/items

293

.setUserCol("user")

294

.setItemCol("item")

295

.setRatingCol("rating")

296

297

val modelDrop = alsWithDrop.fit(training)

298

val modelNaN = alsWithNaN.fit(training)

299

300

println("Predictions with 'drop' strategy:")

301

val predictionsDrop = modelDrop.transform(testWithNewUsers)

302

predictionsDrop.show()

303

304

println("Predictions with 'nan' strategy:")

305

val predictionsNaN = modelNaN.transform(testWithNewUsers)

306

predictionsNaN.show()

307

308

// Handle NaN predictions manually

309

val predictionsHandled = predictionsNaN

310

.withColumn("handled_prediction",

311

when(col("prediction").isNaN, 0.0) // Default to 0 for unknown

312

.otherwise(col("prediction")))

313

314

predictionsHandled.show()

315

```

316

317

### Evaluation Metrics for Recommendations

318

319

```scala

320

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

321

322

// Precision@K and Recall@K for implicit feedback

323

def evaluateRecommendations(predictions: DataFrame,

324

actualRatings: DataFrame,

325

k: Int = 10,

326

relevanceThreshold: Double = 3.0): Unit = {

327

328

// Get top-K recommendations per user

329

val topK = predictions

330

.filter(col("prediction") >= relevanceThreshold)

331

.withColumn("rank", row_number().over(

332

Window.partitionBy("user").orderBy(desc("prediction"))))

333

.filter(col("rank") <= k)

334

335

// Get actual relevant items per user

336

val actualRelevant = actualRatings

337

.filter(col("rating") >= relevanceThreshold)

338

.select("user", "item")

339

340

// Calculate precision and recall per user

341

val metrics = topK

342

.join(actualRelevant.withColumn("is_relevant", lit(1)), Seq("user", "item"), "left")

343

.groupBy("user")

344

.agg(

345

count("*").alias("recommended_count"),

346

sum(when(col("is_relevant").isNotNull, 1).otherwise(0)).alias("relevant_recommended")

347

)

348

.join(

349

actualRelevant.groupBy("user").agg(count("*").alias("actual_relevant_count")),

350

"user"

351

)

352

.withColumn("precision", col("relevant_recommended") / col("recommended_count"))

353

.withColumn("recall", col("relevant_recommended") / col("actual_relevant_count"))

354

.withColumn("f1", (2 * col("precision") * col("recall")) / (col("precision") + col("recall")))

355

356

println(s"Evaluation Metrics (K=$k, threshold=$relevanceThreshold):")

357

metrics.select(

358

mean("precision").alias("avg_precision"),

359

mean("recall").alias("avg_recall"),

360

mean("f1").alias("avg_f1")

361

).show()

362

}

363

364

// Coverage metrics

365

def calculateCoverage(recommendations: DataFrame, totalItems: Long): Double = {

366

val recommendedItems = recommendations

367

.select(explode(col("recommendations")).alias("rec"))
.select("rec.item")

368

.distinct()

369

.count()

370

371

recommendedItems.toDouble / totalItems

372

}

373

374

// Diversity metrics (intra-list diversity)

375

def calculateDiversity(model: ALSModel): DataFrame = {

376

val itemFeatures = model.itemFactors

377

.select("id", "features")

378

.rdd

379

.map { row =>

380

val id = row.getAs[Int]("id")

381

val features = row.getAs[Seq[Float]]("features") // ALS stores factors as Array[Float], not DenseVector

382

(id, features)

383

}

384

.collectAsMap()

385

386

// Calculate pairwise cosine similarities

387

// Implementation would depend on specific diversity requirements

388

spark.emptyDataFrame // Placeholder

389

}

390

391

// Usage

392

val userRecs = model.recommendForAllUsers(10)

393

val totalItems = ratings.select("item").distinct().count()

394

val coverage = calculateCoverage(userRecs, totalItems)

395

println(s"Catalog coverage: ${coverage * 100}%")

396

```

397

398

### Production Recommendation Pipeline

399

400

```scala

401

import org.apache.spark.ml.{Pipeline, PipelineModel}

402

import org.apache.spark.ml.feature.StringIndexer

403

404

// Production pipeline with string indexing for user/item IDs

405

val userIndexer = new StringIndexer()

406

.setInputCol("user_id")

407

.setOutputCol("user")

408

.setHandleInvalid("keep")

409

410

val itemIndexer = new StringIndexer()

411

.setInputCol("item_id")

412

.setOutputCol("item")

413

.setHandleInvalid("keep")

414

415

val als = new ALS()

416

.setMaxIter(10)

417

.setRegParam(0.1)

418

.setRank(10)

419

.setUserCol("user")

420

.setItemCol("item")

421

.setRatingCol("rating")

422

.setColdStartStrategy("drop")

423

424

val pipeline = new Pipeline()

425

.setStages(Array(userIndexer, itemIndexer, als))

426

427

// Fit pipeline

428

val pipelineModel = pipeline.fit(trainingData)

429

430

// Generate batch recommendations

431

val batchRecs = pipelineModel

432

.stages.last.asInstanceOf[ALSModel]

433

.recommendForAllUsers(20)

434

435

// Save recommendations for serving

436

batchRecs

437

.write

438

.mode("overwrite")

439

.option("path", "recommendations/batch")

440

.saveAsTable("user_recommendations")

441

442

// Real-time prediction function

443

def predictRating(userId: String, itemId: String): Double = {

444

val input = spark.createDataFrame(Seq(

445

(userId, itemId, 0.0) // Rating doesn't matter for prediction

446

)).toDF("user_id", "item_id", "rating")

447

448

val prediction = pipelineModel.transform(input)

449

prediction.select("prediction").first().getDouble(0) // NOTE(review): with coldStartStrategy "drop", an unseen user/item yields an empty result and first() throws — guard against empty results in production

450

}

451

452

// Model persistence

453

pipelineModel.write.overwrite().save("models/als_pipeline")

454

455

// Load model for serving

456

val loadedModel = PipelineModel.load("models/als_pipeline")

457

```

458

459

### Advanced ALS Configuration

460

461

```scala

462

import org.apache.spark.storage.StorageLevel

463

464

// Optimized ALS for large-scale data

465

val optimizedALS = new ALS()

466

.setRank(50)

467

.setMaxIter(20)

468

.setRegParam(0.01)

469

.setImplicitPrefs(true)

470

.setAlpha(1.0)

471

.setUserCol("user")

472

.setItemCol("item")

473

.setRatingCol("rating")

474

475

// Performance optimizations

476

.setNumUserBlocks(100) // Increase for more users

477

.setNumItemBlocks(100) // Increase for more items

478

.setBlockSize(4096) // Larger blocks for better performance

479

.setCheckpointInterval(10) // Checkpoint every 10 iterations

480

481

// Storage optimization

482

.setIntermediateStorageLevel("MEMORY_AND_DISK")

483

.setFinalStorageLevel("MEMORY_AND_DISK")

484

485

// Constraints

486

.setNonnegative(true) // Non-negative matrix factorization

487

488

.setSeed(42)

489

490

val optimizedModel = optimizedALS.fit(largeDataset)

491

492

// Monitor training progress

493

println(s"Model trained with rank: ${optimizedModel.rank}")

494

println(s"Number of user factors: ${optimizedModel.userFactors.count()}")

495

println(s"Number of item factors: ${optimizedModel.itemFactors.count()}")

496

```