or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

classification.md clustering.md core-framework.md evaluation.md feature-processing.md frequent-pattern-mining.md index.md linear-algebra.md rdd-api.md recommendation.md regression.md
tile.json

docs/recommendation.md

Recommendation

MLlib provides collaborative filtering algorithms for building recommendation systems using matrix factorization techniques. The primary algorithm is Alternating Least Squares (ALS) for both explicit and implicit feedback scenarios.

Alternating Least Squares (ALS)

Estimator

/**
 * Estimator for Alternating Least Squares matrix factorization.
 *
 * Every setter returns the `ALS` instance, so configuration calls can be chained;
 * `fit` produces an `ALSModel`.
 *
 * @param uid unique identifier of this estimator instance
 */
class ALS(override val uid: String) extends Estimator[ALSModel] with ALSParams with DefaultParamsWritable {
  /** Creates an estimator with a random uid prefixed with "als". */
  def this() = this(Identifiable.randomUID("als"))

  // Factorization and optimization settings
  def setRank(value: Int): ALS
  def setMaxIter(value: Int): ALS
  def setRegParam(value: Double): ALS
  def setImplicitPrefs(value: Boolean): ALS
  def setAlpha(value: Double): ALS
  def setNonnegative(value: Boolean): ALS
  def setCheckpointInterval(value: Int): ALS
  def setSeed(value: Long): ALS

  // Input / output column names
  def setUserCol(value: String): ALS
  def setItemCol(value: String): ALS
  def setRatingCol(value: String): ALS
  def setPredictionCol(value: String): ALS
  def setColdStartStrategy(value: String): ALS

  // Partitioning and storage settings
  def setNumUserBlocks(value: Int): ALS
  def setNumItemBlocks(value: Int): ALS
  // Storage levels are passed by name (e.g. "MEMORY_AND_DISK"): the corresponding
  // params on ALSParams are Param[String] and their getters return String.
  def setIntermediateStorageLevel(value: String): ALS
  def setFinalStorageLevel(value: String): ALS
  def setBlockSize(value: Int): ALS

  /** Trains on `dataset` and returns the fitted model. */
  override def fit(dataset: Dataset[_]): ALSModel
  override def copy(extra: ParamMap): ALS
}

Model

/**
 * Fitted model returned by `ALS.fit`.
 *
 * @param uid         unique identifier of this model instance
 * @param rank        rank of the factorization, exposed as a plain `Int`
 * @param userFactors DataFrame holding the learned user factors
 * @param itemFactors DataFrame holding the learned item factors
 */
class ALSModel(override val uid: String, val rank: Int, val userFactors: DataFrame, val itemFactors: DataFrame)
  extends Model[ALSModel] with ALSModelParams with MLWritable {

  // Column / cold-start setters; each returns the model so calls can be chained.
  def setUserCol(value: String): ALSModel
  def setItemCol(value: String): ALSModel
  def setPredictionCol(value: String): ALSModel
  def setColdStartStrategy(value: String): ALSModel

  // Scores the rows of `dataset` and returns the result as a DataFrame.
  override def transform(dataset: Dataset[_]): DataFrame
  // Top `numItems` items for every user / top `numUsers` users for every item.
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  // Same as above, restricted to the users / items listed in `dataset`.
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
  override def copy(extra: ParamMap): ALSModel
  // Writer used to persist the model.
  def write: MLWriter
}

Parameter Traits

/**
 * Parameters shared by the `ALS` estimator and the fitted `ALSModel`.
 *
 * This is the base trait. The model must not inherit the training-only params:
 * `ALSModel` exposes `rank` as a plain `Int`, which would clash with a
 * `rank: IntParam` member, and `coldStartStrategy` may only be declared
 * `final` once in the hierarchy.
 */
trait ALSModelParams extends Params with HasPredictionCol {

  final val userCol: Param[String]
  final val itemCol: Param[String]
  final val coldStartStrategy: Param[String]
  final val blockSize: IntParam

  def getUserCol: String
  def getItemCol: String
  def getColdStartStrategy: String
  def getBlockSize: Int
}

/**
 * Training parameters of the `ALS` estimator, layered on top of the
 * parameters it shares with the model.
 */
trait ALSParams extends ALSModelParams with HasMaxIter with HasRegParam
  with HasCheckpointInterval with HasSeed {

  final val rank: IntParam
  final val numUserBlocks: IntParam
  final val numItemBlocks: IntParam
  final val implicitPrefs: BooleanParam
  final val alpha: DoubleParam
  final val ratingCol: Param[String]
  final val nonnegative: BooleanParam
  // Storage levels are configured by name, hence Param[String].
  final val intermediateStorageLevel: Param[String]
  final val finalStorageLevel: Param[String]

  def getRank: Int
  def getNumUserBlocks: Int
  def getNumItemBlocks: Int
  def getImplicitPrefs: Boolean
  def getAlpha: Double
  def getRatingCol: String
  def getNonnegative: Boolean
  def getIntermediateStorageLevel: String
  def getFinalStorageLevel: String
}

Usage Examples

Basic Collaborative Filtering with Explicit Ratings

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Explicit-feedback walkthrough: build a small ratings table, train ALS on an
// 80/20 split and score the held-out rows.
val ratings = {
  // Each tuple is (user, item, rating).
  val ratingRows = Seq(
    (1, 1, 5.0), (1, 2, 4.0), (1, 3, 1.0),
    (2, 1, 3.0), (2, 2, 2.0), (2, 4, 4.0),
    (3, 2, 5.0), (3, 3, 4.0), (3, 4, 3.0)
  )
  spark.createDataFrame(ratingRows).toDF("user", "item", "rating")
}

// ALS estimator for explicit ratings; the "drop" cold-start strategy handles
// users/items that were not part of the training data.
val als = new ALS()
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  .setRank(10)
  .setMaxIter(20)
  .setRegParam(0.1)
  .setColdStartStrategy("drop")
  .setSeed(42)

// Seeded split so the training/test partition is reproducible.
val (training, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42) match {
  case Array(trainPart, testPart) => (trainPart, testPart)
}

// Fit on the training part, then predict the held-out part.
val model = als.fit(training)
val predictions = model.transform(test)
predictions.show()

// Evaluate model using RMSE
import org.apache.spark.ml.evaluation.RegressionEvaluator

// RMSE evaluator comparing the "rating" label with the "prediction" column.
val evaluator = {
  val rmseEvaluator = new RegressionEvaluator()
  rmseEvaluator
    .setPredictionCol("prediction")
    .setLabelCol("rating")
    .setMetricName("rmse")
}

// Score the predictions and report the metric.
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = ${rmse}")

Implicit Feedback Recommendation

// Implicit-feedback walkthrough: interaction counts act as the confidence signal.
val implicitData = {
  // Each tuple is (user, item, confidence/count), e.g. user 1 interacted
  // with item 101 three times and with item 102 five times.
  val interactions = Seq(
    (1, 101, 3.0), (1, 102, 5.0), (1, 103, 1.0),
    (2, 101, 2.0), (2, 104, 4.0),
    (3, 102, 6.0), (3, 103, 2.0), (3, 104, 1.0)
  )
  spark.createDataFrame(interactions).toDF("user", "item", "confidence")
}

// ALS in implicit-preference mode; alpha is the confidence scaling parameter.
val implicitALS = new ALS()
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("confidence")
  .setImplicitPrefs(true)
  .setAlpha(1.0)
  .setRank(5)
  .setMaxIter(15)
  .setRegParam(0.01)
  .setColdStartStrategy("drop")
  .setSeed(42)

val implicitModel = implicitALS.fit(implicitData)

// Top 3 items for every user and top 2 users for every item.
val userRecs = implicitModel.recommendForAllUsers(3)
val itemRecs = implicitModel.recommendForAllItems(2)

userRecs.show(truncate = false)
itemRecs.show(truncate = false)

Advanced Recommendation Scenarios

// Generate recommendations for specific users.
// Scala has no `(1,)` one-element tuple literal; Tuple1 wraps each id so that
// createDataFrame receives a Seq of single-field Products.
val specificUsers = spark.createDataFrame(Seq(
  Tuple1(1), Tuple1(2)
)).toDF("user")

val userSubsetRecs = implicitModel.recommendForUserSubset(specificUsers, 5)
println("Recommendations for specific users:")
userSubsetRecs.show(truncate = false)

// Generate recommendations for specific items (same Tuple1 wrapping as above)
val specificItems = spark.createDataFrame(Seq(
  Tuple1(101), Tuple1(102)
)).toDF("item")

val itemSubsetRecs = implicitModel.recommendForItemSubset(specificItems, 3)
println("Recommendations for specific items:")
itemSubsetRecs.show(truncate = false)

// Access learned factors of `model`, the explicit-feedback model fitted earlier
println("User factors:")
model.userFactors.show(5)

println("Item factors:")
model.itemFactors.show(5)

// Model parameters
println(s"Model rank: ${model.rank}")
println(s"User factors count: ${model.userFactors.count()}")
println(s"Item factors count: ${model.itemFactors.count()}")

Hyperparameter Tuning for Recommendations

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Base estimator whose rank / regParam / maxIter are searched over below.
val alsForTuning = new ALS()
  .setSeed(42)
  .setColdStartStrategy("drop")
  .setRatingCol("rating")
  .setItemCol("item")
  .setUserCol("user")

// Candidate values per hyperparameter (3 x 3 x 3 combinations).
val paramGrid = {
  val rankCandidates = Array(5, 10, 15)
  val regParamCandidates = Array(0.01, 0.1, 1.0)
  val maxIterCandidates = Array(10, 15, 20)
  new ParamGridBuilder()
    .addGrid(alsForTuning.rank, rankCandidates)
    .addGrid(alsForTuning.regParam, regParamCandidates)
    .addGrid(alsForTuning.maxIter, maxIterCandidates)
    .build()
}

// RMSE between the "rating" label and the "prediction" column.
val evaluator = new RegressionEvaluator()
  .setPredictionCol("prediction")
  .setLabelCol("rating")
  .setMetricName("rmse")

// 3-fold cross-validation over the grid, evaluating two candidates in parallel.
val cv = new CrossValidator()
  .setNumFolds(3)
  .setParallelism(2)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setEstimator(alsForTuning)

// Note: Cross-validation for ALS can be expensive
// Consider using TrainValidationSplit for faster tuning
import org.apache.spark.ml.tuning.TrainValidationSplit

// Single train/validation split (80/20) over the same grid and evaluator.
val tvs = new TrainValidationSplit()
  .setSeed(42)
  .setTrainRatio(0.8)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setEstimator(alsForTuning)

// Fit every candidate on the training data and keep the winner.
val bestModel = tvs.fit(training)
val bestALSModel = bestModel.bestModel.asInstanceOf[ALSModel]

// The smallest validation metric is reported as the best RMSE.
val bestRmse = bestModel.validationMetrics.min
println(s"Best rank: ${bestALSModel.rank}")
println(s"Best RMSE: ${bestRmse}")

Cold Start Problem Handling

// Scoring rows that include a user and an item that were absent from training.
val testWithNewUsers = {
  val rows = Seq(
    (999, 1, 0.0), // new user 999
    (1, 999, 0.0), // new item 999
    (1, 2, 4.0)    // known user-item pair
  )
  spark.createDataFrame(rows).toDF("user", "item", "rating")
}

// Builds an otherwise identical estimator for the given cold-start strategy.
def alsWithColdStart(strategy: String): ALS =
  new ALS()
    .setUserCol("user")
    .setItemCol("item")
    .setRatingCol("rating")
    .setColdStartStrategy(strategy)

val alsWithDrop = alsWithColdStart("drop") // drop rows with new users/items
val alsWithNaN = alsWithColdStart("nan")   // predict NaN for new users/items

val modelDrop = alsWithDrop.fit(training)
val modelNaN = alsWithNaN.fit(training)

println("Predictions with 'drop' strategy:")
val predictionsDrop = modelDrop.transform(testWithNewUsers)
predictionsDrop.show()

println("Predictions with 'nan' strategy:")
val predictionsNaN = modelNaN.transform(testWithNewUsers)
predictionsNaN.show()

// Replace NaN predictions with a default of 0 in a separate column.
val handledPrediction =
  when(isnan(col("prediction")), 0.0).otherwise(col("prediction"))
val predictionsHandled = predictionsNaN.withColumn("handled_prediction", handledPrediction)

predictionsHandled.show()

Evaluation Metrics for Recommendations

import org.apache.spark.sql.functions._

/**
 * Prints average Precision@K, Recall@K and F1 over users.
 *
 * @param predictions        rows with "user", "item" and "prediction" columns
 * @param actualRatings      rows with "user", "item" and "rating" columns
 * @param k                  maximum number of recommendations kept per user
 * @param relevanceThreshold minimum prediction / rating for an item to count
 *                           as recommended / relevant
 */
def evaluateRecommendations(predictions: DataFrame,
                          actualRatings: DataFrame,
                          k: Int = 10,
                          relevanceThreshold: Double = 3.0): Unit = {
  require(k > 0, s"k must be positive, got $k")

  // Top-K recommendations per user, ranked by predicted score
  val topK = predictions
    .filter(col("prediction") >= relevanceThreshold)
    .withColumn("rank", row_number().over(
      Window.partitionBy("user").orderBy(desc("prediction"))))
    .filter(col("rank") <= k)
    .select("user", "item")

  // Actual relevant (user, item) pairs. The explicit marker column survives the
  // join below, whereas the right-hand join key does not after a USING join.
  val actualRelevant = actualRatings
    .filter(col("rating") >= relevanceThreshold)
    .select("user", "item")
    .distinct()
    .withColumn("is_relevant", lit(1))

  // Number of relevant items per user. The column is renamed inside agg():
  // Dataset.alias only names the Dataset and would leave the column as "count".
  val relevantPerUser = actualRelevant
    .groupBy("user")
    .agg(count("*").alias("actual_relevant_count"))

  // Users without any relevant item are dropped by the inner join, since
  // recall is undefined for them.
  val metrics = topK
    .join(actualRelevant, Seq("user", "item"), "left")
    .groupBy("user")
    .agg(
      count("*").alias("recommended_count"),
      sum(coalesce(col("is_relevant"), lit(0))).alias("relevant_recommended")
    )
    .join(relevantPerUser, "user")
    .withColumn("precision", col("relevant_recommended") / col("recommended_count"))
    .withColumn("recall", col("relevant_recommended") / col("actual_relevant_count"))
    // F1 is defined as 0 when precision and recall are both 0, so that such
    // users still count towards the average instead of being skipped.
    .withColumn("f1",
      when(col("precision") + col("recall") > 0,
        (lit(2) * col("precision") * col("recall")) / (col("precision") + col("recall")))
        .otherwise(0.0))

  println(s"Evaluation Metrics (K=$k, threshold=$relevanceThreshold):")
  metrics.select(
    mean("precision").alias("avg_precision"),
    mean("recall").alias("avg_recall"),
    mean("f1").alias("avg_f1")
  ).show()
}

/**
 * Catalog coverage: fraction of the catalog that appears in at least one
 * recommendation list.
 *
 * @param recommendations rows with a "recommendations" array column whose
 *                        elements carry an "item" field
 * @param totalItems      number of distinct items in the catalog
 * @return distinct recommended items divided by `totalItems`; 0.0 when the
 *         catalog is empty
 */
def calculateCoverage(recommendations: DataFrame, totalItems: Long): Double = {
  if (totalItems <= 0L) {
    // Avoid dividing by zero for an empty catalog.
    0.0
  } else {
    // "recommendations.item" is an array per row; explode it so that distinct()
    // counts individual items rather than distinct recommendation lists.
    val recommendedItems = recommendations
      .select(explode(col("recommendations.item")).alias("item"))
      .distinct()
      .count()

    recommendedItems.toDouble / totalItems
  }
}

/**
 * Diversity metrics (intra-list diversity). Skeleton only: the item factors
 * are collected, but the similarity computation is left to the caller's
 * requirements and an empty DataFrame is returned.
 *
 * @param model fitted ALS model whose item factors are read
 * @return placeholder empty DataFrame
 */
def calculateDiversity(model: ALSModel): DataFrame = {
  // ALS stores each factor as an array of floats, not as an ML vector, so the
  // column is read with getSeq[Float]; positions follow the select() below.
  val itemFeatures = model.itemFactors
    .select("id", "features")
    .rdd
    .map { row =>
      val id = row.getInt(0)
      val features = row.getSeq[Float](1)
      (id, features)
    }
    .collectAsMap()

  // Calculate pairwise cosine similarities over `itemFeatures`
  // Implementation would depend on specific diversity requirements
  spark.emptyDataFrame  // Placeholder
}

// Usage: coverage of the top-10 lists against the distinct items in `ratings`.
val userRecs = model.recommendForAllUsers(10)
val totalItems = ratings.select("item").dropDuplicates().count()
val coverage = calculateCoverage(userRecs, totalItems)
val coveragePercent = coverage * 100
println(s"Catalog coverage: ${coveragePercent}%")

Production Recommendation Pipeline

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer

// Production pipeline: string ids are indexed into the columns ALS trains on.

// Indexer that keeps labels it has not seen before instead of failing on them.
def keepInvalidIndexer(inputCol: String, outputCol: String): StringIndexer =
  new StringIndexer()
    .setInputCol(inputCol)
    .setOutputCol(outputCol)
    .setHandleInvalid("keep")

val userIndexer = keepInvalidIndexer("user_id", "user")
val itemIndexer = keepInvalidIndexer("item_id", "item")

val als = new ALS()
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  .setRank(10)
  .setMaxIter(10)
  .setRegParam(0.1)
  .setColdStartStrategy("drop")

// Stage order matters: both indexers run before ALS.
val pipeline = new Pipeline()
  .setStages(Array(userIndexer, itemIndexer, als))

// Fit pipeline
val pipelineModel = pipeline.fit(trainingData)

// Batch recommendations: the fitted ALS model is the last pipeline stage.
val fittedAls = pipelineModel.stages.last.asInstanceOf[ALSModel]
val batchRecs = fittedAls.recommendForAllUsers(20)

// Persist the top-20 lists as a table for serving.
val recsWriter = batchRecs.write
  .option("path", "recommendations/batch")
  .mode("overwrite")
recsWriter.saveAsTable("user_recommendations")

/**
 * Predicts the rating of one (user, item) pair with the fitted pipeline.
 *
 * @param userId raw user id, as fed to the "user_id" indexer
 * @param itemId raw item id, as fed to the "item_id" indexer
 * @return the predicted rating, or `Double.NaN` when the pipeline yields no
 *         row for the pair (e.g. it was removed by the "drop" cold-start strategy)
 */
def predictRating(userId: String, itemId: String): Double = {
  val input = spark.createDataFrame(Seq(
    (userId, itemId, 0.0)  // Rating doesn't matter for prediction
  )).toDF("user_id", "item_id", "rating")

  pipelineModel.transform(input)
    // ALS emits a float prediction; cast so that getDouble is valid.
    .select(col("prediction").cast("double"))
    // head(1) returns an empty array instead of throwing when no row is left.
    .head(1)
    .headOption
    .map(_.getDouble(0))
    .getOrElse(Double.NaN)
}

// Persist the fitted pipeline (replacing any previous copy), then load it
// back for serving from the same location.
val pipelinePath = "models/als_pipeline"
pipelineModel.write.overwrite().save(pipelinePath)

val loadedModel = PipelineModel.load(pipelinePath)

Advanced ALS Configuration

import org.apache.spark.storage.StorageLevel

// Optimized ALS for large-scale data
val optimizedALS = new ALS()
  .setRank(50)
  .setMaxIter(20)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setAlpha(1.0)
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  // Performance optimizations
  .setNumUserBlocks(100)     // Increase for more users
  .setNumItemBlocks(100)     // Increase for more items
  .setBlockSize(4096)        // Larger blocks for better performance
  .setCheckpointInterval(10) // Checkpoint every 10 iterations
  // Storage optimization: levels are given by name, because the
  // intermediateStorageLevel / finalStorageLevel params are Param[String]
  .setIntermediateStorageLevel("MEMORY_AND_DISK")
  .setFinalStorageLevel("MEMORY_AND_DISK")
  // Constraints
  .setNonnegative(true)      // Non-negative matrix factorization
  .setSeed(42)

val optimizedModel = optimizedALS.fit(largeDataset)

// Monitor training progress
println(s"Model trained with rank: ${optimizedModel.rank}")
println(s"Number of user factors: ${optimizedModel.userFactors.count()}")
println(s"Number of item factors: ${optimizedModel.itemFactors.count()}")