or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

classification.md, clustering.md, evaluation-tuning.md, feature-engineering.md, index.md, linear-algebra.md, pipeline-components.md, recommendation.md, regression.md
tile.json

docs/recommendation.md

Recommendation

Collaborative filtering algorithms for building recommendation engines, including matrix factorization techniques optimized for large-scale user-item interaction datasets.

Capabilities

Alternating Least Squares (ALS)

Matrix factorization algorithm using alternating least squares optimization for collaborative filtering recommendation systems.

/**
 * Alternating Least Squares collaborative filtering
 *
 * API sketch: method bodies are omitted. Each fluent setter returns
 * `this.type` so calls can be chained before `fit` (see the usage
 * example below).
 */
class ALS extends Estimator[ALSModel] with ALSParams {
  // Factorization hyperparameters
  def setRank(value: Int): this.type        // latent factor dimension
  def setMaxIter(value: Int): this.type     // maximum number of iterations
  def setRegParam(value: Double): this.type // regularization parameter
  // Implicit-feedback configuration
  def setImplicitPrefs(value: Boolean): this.type // switch to implicit-preference mode
  def setAlpha(value: Double): this.type          // confidence scaling parameter (implicit mode)
  // Input/output column names
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  // Solver and execution controls
  def setNonnegative(value: Boolean): this.type    // constrain factors to be non-negative
  def setCheckpointInterval(value: Int): this.type
  def setSeed(value: Long): this.type              // RNG seed for reproducibility
  def setNumUserBlocks(value: Int): this.type      // partitioning of the user side
  def setNumItemBlocks(value: Int): this.type      // partitioning of the item side
  def setIntermediateRDDStorageLevel(value: StorageLevel): this.type
  def setFinalRDDStorageLevel(value: StorageLevel): this.type
  def setColdStartStrategy(value: String): this.type // "nan" or "drop" (see ColdStartStrategy)
  def setBlockSize(value: Int): this.type
}

/**
 * Fitted model produced by `ALS.fit` (API sketch; bodies omitted).
 */
class ALSModel extends Model[ALSModel] with ALSParams {
  def rank: Int              // latent factor dimension
  def userFactors: DataFrame // learned user factor matrix
  def itemFactors: DataFrame // learned item factor matrix
  def recommendForAllUsers(numItems: Int): DataFrame // top-`numItems` items per user
  def recommendForAllItems(numUsers: Int): DataFrame // top-`numUsers` users per item
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame // top items for the given users only
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame // top users for the given items only
}

Usage Example:

import org.apache.spark.ml.recommendation.ALS
// Needed for the evaluation step below — the original example used
// RegressionEvaluator without importing it.
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Prepare training data from ratings (assumes a `ratings` DataFrame with
// userId/movieId/rating columns is already in scope)
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setColdStartStrategy("drop") // drop users/items unseen during training

val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")

val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
userRecs.show()

// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
movieRecs.show()

Advanced ALS Features

Extended functionality for handling various recommendation scenarios and optimization strategies.

/**
 * ALS model with advanced recommendation capabilities
 *
 * NOTE(review): this re-declares `ALSModel` from the Capabilities
 * section above with additional methods — read it as an extended API
 * sketch rather than a second concrete class.
 */
class ALSModel extends Model[ALSModel] with ALSParams {
  /**
   * Recommend items for a specific user
   *
   * @param users    dataset identifying the target users — schema not shown here; verify against caller
   * @param numItems number of item recommendations per user
   */
  def recommendForUsers(users: Dataset[_], numItems: Int): DataFrame
  
  /**
   * Recommend users for specific items
   *
   * @param items    dataset identifying the target items
   * @param numUsers number of user recommendations per item
   */
  def recommendForItems(items: Dataset[_], numUsers: Int): DataFrame
  
  /**
   * Transform dataset with predictions
   */
  override def transform(dataset: Dataset[_]): DataFrame
  
  /**
   * Access to latent factor matrices
   */
  def userFactors: DataFrame
  def itemFactors: DataFrame
  
  /**
   * Model persistence
   */
  override def write: MLWriter
}

/**
 * Rating data structure for recommendations.
 *
 * The explicit `extends Product with Serializable` was dropped: case
 * classes already mix in both automatically, so it was pure noise.
 *
 * @param user   integer user id
 * @param item   integer item id
 * @param rating observed rating value (stored as Float)
 */
case class Rating(user: Int, item: Int, rating: Float)

Implicit Feedback Handling

Specialized configurations for handling implicit feedback data where ratings represent confidence rather than explicit preferences.

/**
 * Helpers for configuring ALS on implicit-feedback data.
 */
object ALSImplicit {
  /**
   * Create ALS instance optimized for implicit feedback.
   *
   * Equivalent to chaining the setters fluently; each setter returns
   * the estimator itself, so the sequential calls below configure the
   * same single instance.
   */
  def create(): ALS = {
    val estimator = new ALS()
    estimator.setImplicitPrefs(true)
    estimator.setAlpha(1.0)        // Confidence scaling parameter
    estimator.setNonnegative(true) // Ensure non-negative factors
    estimator
  }
  
  /**
   * Preprocess implicit feedback data
   *
   * Signature-only sketch; implementation not shown here.
   */
  def preprocessImplicitData(
    interactions: DataFrame,
    userCol: String = "userId",
    itemCol: String = "itemId",
    countCol: String = "count"
  ): DataFrame
}

Cold Start Strategies

Methods for handling new users or items that weren't present during training.

/**
 * Cold start handling strategies.
 *
 * String constants passed to `ALS.setColdStartStrategy`.
 * NOTE(review): naming is inconsistent (`NaN` vs `DROP`); renaming
 * either would break existing callers, so both are kept as-is.
 */
object ColdStartStrategy {
  /** Drop cold start cases from predictions. */
  val DROP = "drop"

  /** Return NaN for cold start cases. */
  val NaN = "nan"
}

/**
 * Cold start recommendation utilities
 *
 * API sketch (bodies omitted): helpers for users/items unseen at
 * training time, complementing the "nan"/"drop" strategies above.
 */
object ColdStartRecommender {
  /**
   * Generate recommendations for new users based on popular items
   *
   * @param model              fitted ALS model
   * @param popularItems       fallback items to recommend — schema not shown here; verify against caller
   * @param numRecommendations number of recommendations to produce per user
   */
  def recommendForNewUsers(
    model: ALSModel,
    popularItems: DataFrame,
    numRecommendations: Int
  ): DataFrame
  
  /**
   * Generate recommendations for new items based on similar items
   *
   * @param model              fitted ALS model
   * @param itemFeatures       item feature data used for similarity — schema not shown here; verify
   * @param numRecommendations number of recommendations to produce per item
   */
  def recommendForNewItems(
    model: ALSModel,
    itemFeatures: DataFrame,
    numRecommendations: Int
  ): DataFrame
}

Recommendation Evaluation

Specialized evaluation metrics for recommendation systems.

/**
 * Recommendation-specific evaluation metrics
 *
 * API sketch (bodies omitted): an Evaluator configured via fluent
 * setters, in the style of RegressionEvaluator used above.
 */
class RecommendationEvaluator extends Evaluator {
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def setMetricName(value: String): this.type        // metric to compute — allowed values not shown here
  def setK(value: Int): this.type                    // cutoff for top-K metrics — presumably; verify
  def setColdStartStrategy(value: String): this.type // "nan" or "drop"
}

/**
 * Ranking-based evaluation for recommendations
 *
 * API sketch (bodies omitted). Each method reads the predictions
 * DataFrame via the supplied user/item/rating column names.
 */
object RecommendationMetrics {
  /**
   * Calculate Mean Average Precision at K
   *
   * @return MAP@K score — aggregation details not shown here
   */
  def meanAveragePrecisionAtK(
    predictions: DataFrame,
    k: Int,
    userCol: String = "userId",
    itemCol: String = "itemId",
    ratingCol: String = "rating"
  ): Double
  
  /**
   * Calculate Normalized Discounted Cumulative Gain at K
   *
   * @return NDCG@K score
   */
  def ndcgAtK(
    predictions: DataFrame,
    k: Int,
    userCol: String = "userId",
    itemCol: String = "itemId",
    ratingCol: String = "rating"
  ): Double
  
  /**
   * Calculate precision and recall at K
   *
   * @return pair of (precision@K, recall@K)
   */
  def precisionRecallAtK(
    predictions: DataFrame,
    k: Int,
    userCol: String = "userId",
    itemCol: String = "itemId",
    ratingCol: String = "rating"
  ): (Double, Double)
}

Hyperparameter Tuning for ALS

Specialized parameter tuning strategies for collaborative filtering models.

/**
 * ALS hyperparameter tuning utilities
 *
 * API sketch (bodies omitted).
 */
object ALSTuning {
  /**
   * Create parameter grid for ALS tuning
   *
   * Defaults enumerate common search values for rank, regularization
   * and (implicit-mode) alpha; 3 x 3 x 3 = 27 candidate combinations.
   */
  def createParamGrid(
    als: ALS,
    ranks: Array[Int] = Array(10, 50, 100),
    regParams: Array[Double] = Array(0.01, 0.1, 1.0),
    alphas: Array[Double] = Array(1.0, 10.0, 40.0)
  ): Array[ParamMap]
  
  /**
   * Cross-validation for recommendation systems with temporal splitting
   *
   * Presumably splits folds by `timestampCol` so training data precedes
   * evaluation data — behavior not shown here; verify implementation.
   */
  def temporalCrossValidation(
    als: ALS,
    data: DataFrame,
    evaluator: Evaluator,
    paramGrid: Array[ParamMap],
    timestampCol: String = "timestamp"
  ): CrossValidatorModel
}

Distributed Training Optimization

Configuration options for optimizing ALS training on large-scale distributed datasets.

/**
 * ALS distributed training configuration
 */
object ALSDistributedTraining {
  /**
   * Optimize ALS for large-scale distributed training.
   *
   * Block counts grow with the square root of the entity count but are
   * capped by the available partitions, and clamped to at least 1 so
   * degenerate inputs (zero users/items, or a non-positive partition
   * count) can no longer produce an invalid block count of 0.
   *
   * @param als           estimator to configure (mutated and returned)
   * @param numUsers      total distinct users in the training data
   * @param numItems      total distinct items in the training data
   * @param numPartitions parallelism available in the cluster
   * @return the same estimator, configured for scale
   */
  def optimizeForScale(
    als: ALS,
    numUsers: Long,
    numItems: Long,
    numPartitions: Int
  ): ALS = {
    // max(1, ...) guards against sqrt(0).toInt == 0 and numPartitions <= 0
    val userBlocks = math.max(1, math.min(numPartitions, math.sqrt(numUsers.toDouble).toInt))
    val itemBlocks = math.max(1, math.min(numPartitions, math.sqrt(numItems.toDouble).toInt))
    
    als
      .setNumUserBlocks(userBlocks)
      .setNumItemBlocks(itemBlocks)
      // Spill to disk rather than recompute when memory is tight
      .setIntermediateRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
      .setFinalRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
      .setCheckpointInterval(10)
  }
  
  /**
   * Memory-optimized configuration for resource-constrained environments.
   *
   * Keeps intermediate and final RDDs on disk only and checkpoints more
   * frequently, trading speed for a smaller memory footprint.
   */
  def optimizeForMemory(als: ALS): ALS = {
    als
      .setIntermediateRDDStorageLevel(StorageLevel.DISK_ONLY)
      .setFinalRDDStorageLevel(StorageLevel.DISK_ONLY)
      .setCheckpointInterval(5)
  }
}

Types

// Recommendation system imports
import org.apache.spark.ml.recommendation._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.storage.StorageLevel

// ALS-specific types
import org.apache.spark.ml.recommendation.{ALS, ALSModel}

// Rating data structure
case class Rating(user: Int, item: Int, rating: Float)

// Parameter traits
import org.apache.spark.ml.param.shared._

// Evaluation imports
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}