MLlib provides collaborative filtering for building recommendation systems using matrix factorization. The primary algorithm is Alternating Least Squares (ALS), which supports both explicit and implicit feedback.
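In standard notation, ALS learns a rank-dimensional user factor x_u and item factor y_i for every user u and item i by alternately solving regularized least-squares problems for the objective

\min_{X,\,Y} \sum_{(u,i)\ \mathrm{observed}} \bigl(r_{ui} - x_u^{\top} y_i\bigr)^2 + \lambda \Bigl(\sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2\Bigr)

where lambda corresponds to regParam (up to Spark's internal scaling of the regularization term by the number of ratings per user or item). With setImplicitPrefs(true), the ratings are instead treated as confidence weights c_{ui} = 1 + alpha * r_{ui} on binary preferences, following the Hu, Koren, and Volinsky implicit-feedback formulation.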
class ALS(override val uid: String) extends Estimator[ALSModel] with ALSParams with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("als"))
def setRank(value: Int): ALS
def setMaxIter(value: Int): ALS
def setRegParam(value: Double): ALS
def setImplicitPrefs(value: Boolean): ALS
def setAlpha(value: Double): ALS
def setNonnegative(value: Boolean): ALS
def setCheckpointInterval(value: Int): ALS
def setSeed(value: Long): ALS
def setUserCol(value: String): ALS
def setItemCol(value: String): ALS
def setRatingCol(value: String): ALS
def setPredictionCol(value: String): ALS
def setColdStartStrategy(value: String): ALS
def setNumUserBlocks(value: Int): ALS
def setNumItemBlocks(value: Int): ALS
def setIntermediateStorageLevel(value: String): ALS
def setFinalStorageLevel(value: String): ALS
def setBlockSize(value: Int): ALS
override def fit(dataset: Dataset[_]): ALSModel
override def copy(extra: ParamMap): ALS
}

class ALSModel(override val uid: String, val rank: Int, val userFactors: DataFrame, val itemFactors: DataFrame)
extends Model[ALSModel] with ALSModelParams with MLWritable {
def setUserCol(value: String): ALSModel
def setItemCol(value: String): ALSModel
def setPredictionCol(value: String): ALSModel
def setColdStartStrategy(value: String): ALSModel
override def transform(dataset: Dataset[_]): DataFrame
def recommendForAllUsers(numItems: Int): DataFrame
def recommendForAllItems(numUsers: Int): DataFrame
def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
override def copy(extra: ParamMap): ALSModel
def write: MLWriter
}

trait ALSParams extends ALSModelParams with HasMaxIter with HasRegParam
with HasCheckpointInterval with HasSeed {
final val rank: IntParam
final val numUserBlocks: IntParam
final val numItemBlocks: IntParam
final val implicitPrefs: BooleanParam
final val alpha: DoubleParam
final val ratingCol: Param[String]
final val nonnegative: BooleanParam
final val intermediateStorageLevel: Param[String]
final val finalStorageLevel: Param[String]
def getRank: Int
def getNumUserBlocks: Int
def getNumItemBlocks: Int
def getImplicitPrefs: Boolean
def getAlpha: Double
def getRatingCol: String
def getNonnegative: Boolean
def getIntermediateStorageLevel: String
def getFinalStorageLevel: String
}

trait ALSModelParams extends Params with HasPredictionCol with HasBlockSize {
final val userCol: Param[String]
final val itemCol: Param[String]
final val coldStartStrategy: Param[String]
def getUserCol: String
def getItemCol: String
def getColdStartStrategy: String
}

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions._
// Sample ratings data (user, item, rating)
val ratings = spark.createDataFrame(Seq(
(1, 1, 5.0),
(1, 2, 4.0),
(1, 3, 1.0),
(2, 1, 3.0),
(2, 2, 2.0),
(2, 4, 4.0),
(3, 2, 5.0),
(3, 3, 4.0),
(3, 4, 3.0)
)).toDF("user", "item", "rating")
// Create ALS model for explicit feedback
val als = new ALS()
.setMaxIter(20)
.setRegParam(0.1)
.setRank(10)
.setUserCol("user")
.setItemCol("item")
.setRatingCol("rating")
.setColdStartStrategy("drop") // Handle new users/items
.setSeed(42)
// Split data for evaluation
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42)
// Train model
val model = als.fit(training)
// Make predictions on test set
val predictions = model.transform(test)
predictions.show()
// Evaluate model using RMSE
import org.apache.spark.ml.evaluation.RegressionEvaluator
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")// Implicit feedback data (user, item, confidence/count)
val implicitData = spark.createDataFrame(Seq(
(1, 101, 3.0), // User 1 interacted with item 101, 3 times
(1, 102, 5.0), // User 1 interacted with item 102, 5 times
(1, 103, 1.0),
(2, 101, 2.0),
(2, 104, 4.0),
(3, 102, 6.0),
(3, 103, 2.0),
(3, 104, 1.0)
)).toDF("user", "item", "confidence")
// ALS for implicit feedback
val implicitALS = new ALS()
.setMaxIter(15)
.setRegParam(0.01)
.setRank(5)
.setImplicitPrefs(true) // Enable implicit feedback mode
.setAlpha(1.0) // Confidence scaling parameter
.setUserCol("user")
.setItemCol("item")
.setRatingCol("confidence")
.setColdStartStrategy("drop")
.setSeed(42)
val implicitModel = implicitALS.fit(implicitData)
// Generate recommendations for all users
val userRecs = implicitModel.recommendForAllUsers(3) // Top 3 items per user
userRecs.show(truncate = false)
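The recommendations column is an array of (item, rating) structs; for joins or serving it is often flattened into one row per user-item pair. A minimal sketch (flatRecs is just an illustrative name):

// Flatten the nested recommendations column into one row per (user, item) pair
val flatRecs = userRecs
  .select(col("user"), explode(col("recommendations")).alias("rec"))
  .select(
    col("user"),
    col("rec.item").alias("item"),
    col("rec.rating").alias("score"))
flatRecs.show()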
// Generate recommendations for all items
val itemRecs = implicitModel.recommendForAllItems(2) // Top 2 users per item
itemRecs.show(truncate = false)

// Generate recommendations for specific users
val specificUsers = spark.createDataFrame(Seq(
Tuple1(1), Tuple1(2)
)).toDF("user")
val userSubsetRecs = implicitModel.recommendForUserSubset(specificUsers, 5)
println("Recommendations for specific users:")
userSubsetRecs.show(truncate = false)
// Generate recommendations for specific items
val specificItems = spark.createDataFrame(Seq(
Tuple1(101), Tuple1(102)
)).toDF("item")
val itemSubsetRecs = implicitModel.recommendForItemSubset(specificItems, 3)
println("Recommendations for specific items:")
itemSubsetRecs.show(truncate = false)
// Access learned factors
println("User factors:")
model.userFactors.show(5)
println("Item factors:")
model.itemFactors.show(5)
// Model parameters
println(s"Model rank: ${model.rank}")
println(s"User factors count: ${model.userFactors.count()}")
println(s"Item factors count: ${model.itemFactors.count()}")import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
// Create parameter grid for ALS
val alsForTuning = new ALS()
.setUserCol("user")
.setItemCol("item")
.setRatingCol("rating")
.setColdStartStrategy("drop")
.setSeed(42)
val paramGrid = new ParamGridBuilder()
.addGrid(alsForTuning.rank, Array(5, 10, 15))
.addGrid(alsForTuning.regParam, Array(0.01, 0.1, 1.0))
.addGrid(alsForTuning.maxIter, Array(10, 15, 20))
.build()
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val cv = new CrossValidator()
.setEstimator(alsForTuning)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
.setParallelism(2)
// Note: Cross-validation for ALS can be expensive
// Consider using TrainValidationSplit for faster tuning
import org.apache.spark.ml.tuning.TrainValidationSplit
val tvs = new TrainValidationSplit()
.setEstimator(alsForTuning)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.8)
.setSeed(42)
val bestModel = tvs.fit(training)
val bestALSModel = bestModel.bestModel.asInstanceOf[ALSModel]
println(s"Best rank: ${bestALSModel.rank}")
println(s"Best RMSE: ${bestModel.validationMetrics.min}")// Data with new users and items not seen during training

// Data with new users and items not seen during training
val testWithNewUsers = spark.createDataFrame(Seq(
(999, 1, 0.0), // New user 999
(1, 999, 0.0), // New item 999
(1, 2, 4.0) // Known user-item pair
)).toDF("user", "item", "rating")
// Different cold start strategies
val alsWithDrop = new ALS()
.setColdStartStrategy("drop") // Drop rows with new users/items
.setUserCol("user")
.setItemCol("item")
.setRatingCol("rating")
val alsWithNaN = new ALS()
.setColdStartStrategy("nan") // Predict NaN for new users/items
.setUserCol("user")
.setItemCol("item")
.setRatingCol("rating")
val modelDrop = alsWithDrop.fit(training)
val modelNaN = alsWithNaN.fit(training)
println("Predictions with 'drop' strategy:")
val predictionsDrop = modelDrop.transform(testWithNewUsers)
predictionsDrop.show()
println("Predictions with 'nan' strategy:")
val predictionsNaN = modelNaN.transform(testWithNewUsers)
predictionsNaN.show()
// Handle NaN predictions manually
val predictionsHandled = predictionsNaN
.withColumn("handled_prediction",
when(col("prediction").isNaN, 0.0) // Default to 0 for unknown
.otherwise(col("prediction")))
predictionsHandled.show()

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// Precision@K and Recall@K for implicit feedback
def evaluateRecommendations(predictions: DataFrame,
actualRatings: DataFrame,
k: Int = 10,
relevanceThreshold: Double = 3.0): Unit = {
// Get top-K recommendations per user
val topK = predictions
.filter(col("prediction") >= relevanceThreshold)
.withColumn("rank", row_number().over(
Window.partitionBy("user").orderBy(desc("prediction"))))
.filter(col("rank") <= k)
// Get actual relevant items per user
val actualRelevant = actualRatings
.filter(col("rating") >= relevanceThreshold)
.select("user", "item")
// Calculate precision and recall per user
// (tag actual relevant pairs so that matches survive the left join)
val actualTagged = actualRelevant.withColumn("is_relevant", lit(1))
val metrics = topK
  .join(actualTagged, Seq("user", "item"), "left")
  .groupBy("user")
  .agg(
    count("*").alias("recommended_count"),
    sum(when(col("is_relevant").isNotNull, 1).otherwise(0)).alias("relevant_recommended")
  )
  .join(
    actualRelevant.groupBy("user").agg(count("*").alias("actual_relevant_count")),
    "user"
  )
  .withColumn("precision", col("relevant_recommended") / col("recommended_count"))
  .withColumn("recall", col("relevant_recommended") / col("actual_relevant_count"))
  .withColumn("f1", (lit(2) * col("precision") * col("recall")) / (col("precision") + col("recall")))
println(s"Evaluation Metrics (K=$k, threshold=$relevanceThreshold):")
metrics.select(
mean("precision").alias("avg_precision"),
mean("recall").alias("avg_recall"),
mean("f1").alias("avg_f1")
).show()
}
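A possible invocation against the explicit-feedback model and test split from earlier (the K and threshold values are arbitrary):

// Rank-based evaluation of the explicit-feedback model on the test split
evaluateRecommendations(model.transform(test), test, k = 5, relevanceThreshold = 3.0)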
// Coverage metrics
def calculateCoverage(recommendations: DataFrame, totalItems: Long): Double = {
// Explode the nested recommendations array to count distinct recommended items
val recommendedItems = recommendations
  .select(explode(col("recommendations")).alias("rec"))
  .select("rec.item")
  .distinct()
  .count()
recommendedItems.toDouble / totalItems
}
// Diversity metrics (intra-list diversity)
def calculateDiversity(model: ALSModel): DataFrame = {
val itemFeatures = model.itemFactors
.select("id", "features")
.rdd
.map { row =>
val id = row.getAs[Int]("id")
// itemFactors stores each latent vector as array<float>, not a Vector type
val features = row.getAs[Seq[Float]]("features").toArray
(id, features)
}
.collectAsMap()
// Calculate pairwise cosine similarities
// Implementation would depend on specific diversity requirements
spark.emptyDataFrame // Placeholder
}
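One way to fill in the placeholder, as a driver-side sketch that assumes the item catalog is small enough to collect the factors (cosineSimilarity and intraListDiversity are illustrative helper names, not MLlib APIs):

// Average pairwise dissimilarity (1 - cosine similarity) of one recommendation list,
// computed on the driver from collected item factors. Illustrative sketch only.
def cosineSimilarity(a: Array[Float], b: Array[Float]): Double = {
  val dot = a.zip(b).map { case (x, y) => x.toDouble * y }.sum
  val normA = math.sqrt(a.map(x => x.toDouble * x).sum)
  val normB = math.sqrt(b.map(x => x.toDouble * x).sum)
  if (normA == 0.0 || normB == 0.0) 0.0 else dot / (normA * normB)
}

def intraListDiversity(items: Seq[Int], factors: collection.Map[Int, Array[Float]]): Double = {
  val dissimilarities = for {
    i <- items.indices
    j <- (i + 1) until items.size
    fi <- factors.get(items(i))
    fj <- factors.get(items(j))
  } yield 1.0 - cosineSimilarity(fi, fj)
  if (dissimilarities.isEmpty) 0.0 else dissimilarities.sum / dissimilarities.size
}

Averaging intraListDiversity over each user's recommended items then yields a per-user or global diversity score.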
// Usage
val userRecs = model.recommendForAllUsers(10)
val totalItems = ratings.select("item").distinct().count()
val coverage = calculateCoverage(userRecs, totalItems)
println(s"Catalog coverage: ${coverage * 100}%")import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer
// Production pipeline with string indexing for user/item IDs
val userIndexer = new StringIndexer()
.setInputCol("user_id")
.setOutputCol("user")
.setHandleInvalid("keep")
val itemIndexer = new StringIndexer()
.setInputCol("item_id")
.setOutputCol("item")
.setHandleInvalid("keep")
val als = new ALS()
.setMaxIter(10)
.setRegParam(0.1)
.setRank(10)
.setUserCol("user")
.setItemCol("item")
.setRatingCol("rating")
.setColdStartStrategy("drop")
val pipeline = new Pipeline()
.setStages(Array(userIndexer, itemIndexer, als))
// Fit pipeline (trainingData is assumed to contain string user_id/item_id columns and a rating column)
val pipelineModel = pipeline.fit(trainingData)
// Generate batch recommendations
val batchRecs = pipelineModel
.stages.last.asInstanceOf[ALSModel]
.recommendForAllUsers(20)
// Save recommendations for serving
batchRecs
.write
.mode("overwrite")
.option("path", "recommendations/batch")
.saveAsTable("user_recommendations")
// Real-time prediction function
def predictRating(userId: String, itemId: String): Double = {
val input = spark.createDataFrame(Seq(
(userId, itemId, 0.0) // Rating doesn't matter for prediction
)).toDF("user_id", "item_id", "rating")
val prediction = pipelineModel.transform(input)
prediction.select("prediction").first().getDouble(0)
}
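With coldStartStrategy("drop"), an unseen user or item produces no output row, so the first() call above would throw for cold-start requests. A defensive variant (a sketch; predictRatingOpt is an illustrative name) returns an Option instead:

// Safer variant: returns None when the cold-start strategy drops the row
def predictRatingOpt(userId: String, itemId: String): Option[Double] = {
  val input = spark.createDataFrame(Seq(
    (userId, itemId, 0.0) // Rating value is ignored at prediction time
  )).toDF("user_id", "item_id", "rating")
  pipelineModel.transform(input)
    .select("prediction")
    .collect()
    .headOption
    .map(_.getFloat(0).toDouble)
}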
// Model persistence
pipelineModel.write.overwrite().save("models/als_pipeline")
// Load model for serving
val loadedModel = PipelineModel.load("models/als_pipeline")

// Optimized ALS for large-scale data
val optimizedALS = new ALS()
.setRank(50)
.setMaxIter(20)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setAlpha(1.0)
.setUserCol("user")
.setItemCol("item")
.setRatingCol("rating")
// Performance optimizations
.setNumUserBlocks(100) // Increase for more users
.setNumItemBlocks(100) // Increase for more items
.setBlockSize(4096) // Larger blocks for better performance
.setCheckpointInterval(10) // Checkpoint every 10 iterations
// Storage optimization
.setIntermediateStorageLevel("MEMORY_AND_DISK")
.setFinalStorageLevel("MEMORY_AND_DISK")
// Constraints
.setNonnegative(true) // Non-negative matrix factorization
.setSeed(42)
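Note that setCheckpointInterval only takes effect when a checkpoint directory has been configured on the SparkContext; a minimal sketch (the directory path is a placeholder):

// Checkpointing requires a checkpoint directory; the path below is a placeholder
spark.sparkContext.setCheckpointDir("/tmp/spark-als-checkpoints")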
// largeDataset is assumed to be a large ratings DataFrame with user, item, and rating columns
val optimizedModel = optimizedALS.fit(largeDataset)
// Monitor training progress
println(s"Model trained with rank: ${optimizedModel.rank}")
println(s"Number of user factors: ${optimizedModel.userFactors.count()}")
println(s"Number of item factors: ${optimizedModel.itemFactors.count()}")