Collaborative filtering algorithms for building recommendation engines, including matrix factorization techniques optimized for large-scale user-item interaction datasets.

The core algorithm is matrix factorization using alternating least squares (ALS): the sparse user-item rating matrix is approximated by the product of low-rank user and item factor matrices, which ALS refines by alternately solving for one factor matrix while holding the other fixed.
/**
* Alternating Least Squares collaborative filtering
*/
class ALS extends Estimator[ALSModel] with ALSParams {
  def setRank(value: Int): this.type
  def setMaxIter(value: Int): this.type
  def setRegParam(value: Double): this.type
  def setImplicitPrefs(value: Boolean): this.type
  def setAlpha(value: Double): this.type
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def setNonnegative(value: Boolean): this.type
  def setCheckpointInterval(value: Int): this.type
  def setSeed(value: Long): this.type
  def setNumUserBlocks(value: Int): this.type
  def setNumItemBlocks(value: Int): this.type
  def setIntermediateStorageLevel(value: String): this.type
  def setFinalStorageLevel(value: String): this.type
  def setColdStartStrategy(value: String): this.type
  def setBlockSize(value: Int): this.type
}
class ALSModel extends Model[ALSModel] with ALSParams {
  def rank: Int
  def userFactors: DataFrame
  def itemFactors: DataFrame
  def recommendForAllUsers(numItems: Int): DataFrame
  def recommendForAllItems(numUsers: Int): DataFrame
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
}

Usage Example:
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Split the ratings DataFrame (columns: userId, movieId, rating) into training and test sets
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setColdStartStrategy("drop")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
userRecs.show()
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
movieRecs.show()

Extended functionality for handling various recommendation scenarios and optimization strategies.
/**
* ALS model with advanced recommendation capabilities
*/
class ALSModel extends Model[ALSModel] with ALSParams {
  /**
   * Recommend top items for the users in a given dataset
   */
  def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame
  /**
   * Recommend top users for the items in a given dataset
   */
  def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame
  /**
   * Transform a dataset by appending a prediction column
   */
  override def transform(dataset: Dataset[_]): DataFrame
  /**
   * Access to the latent factor matrices
   */
  def userFactors: DataFrame
  def itemFactors: DataFrame
  /**
   * Model persistence
   */
  override def write: MLWriter
}
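A short sketch of how the subset variants can be called, continuing the earlier usage example (model and ratings are assumed from above):

// Recommend 5 movies for a hand-picked subset of users
val someUsers = ratings.select("userId").distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(someUsers, 5)
userSubsetRecs.show(truncate = false)
// Recommend 5 users for a subset of movies
val someMovies = ratings.select("movieId").distinct().limit(3)
val movieSubsetRecs = model.recommendForItemSubset(someMovies, 5)
movieSubsetRecs.show(truncate = false)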
/**
* Rating data structure for recommendations
*/
case class Rating(user: Int, item: Int, rating: Float) extends Product with Serializable

Specialized configurations for handling implicit feedback data, where ratings represent confidence levels rather than explicit preferences.
/**
* ALS configuration for implicit feedback scenarios
*/
object ALSImplicit {
  /**
   * Create an ALS instance optimized for implicit feedback
   */
  def create(): ALS = {
    new ALS()
      .setImplicitPrefs(true)
      .setAlpha(1.0)        // Confidence scaling parameter
      .setNonnegative(true) // Ensure non-negative factors
  }
  /**
   * Preprocess implicit feedback data
   */
  def preprocessImplicitData(
      interactions: DataFrame,
      userCol: String = "userId",
      itemCol: String = "itemId",
      countCol: String = "count"
  ): DataFrame
}
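preprocessImplicitData is declared above without a body; one plausible implementation, assuming the interactions DataFrame arrives as one row per raw user-item event, is to aggregate events into per-pair counts:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.count

// Sketch: collapse raw interaction events into one confidence-weighted
// row per (user, item) pair; the count column then feeds setRatingCol.
def preprocessImplicitData(
    interactions: DataFrame,
    userCol: String = "userId",
    itemCol: String = "itemId",
    countCol: String = "count"): DataFrame =
  interactions
    .groupBy(userCol, itemCol)
    .agg(count("*").cast("float").as(countCol))

With implicit preferences enabled, ALS treats these counts as confidence weights, and the alpha parameter controls how quickly confidence grows with the count.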
Methods for handling new users or items that weren't present during training.

/**
 * Cold start handling strategies
 */
object ColdStartStrategy {
  val NAN = "nan"   // Return NaN for cold start cases
  val DROP = "drop" // Drop cold start cases from predictions
}
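These strategy strings are the values accepted by setColdStartStrategy, as used in the earlier example; with "nan", predictions for users or items unseen during training come back as NaN and would make metrics like RMSE undefined, which is why the example drops them:

// "drop" removes cold-start rows so evaluation metrics stay well-defined
als.setColdStartStrategy(ColdStartStrategy.DROP)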
/**
* Cold start recommendation utilities
*/
object ColdStartRecommender {
  /**
   * Generate recommendations for new users based on popular items
   */
  def recommendForNewUsers(
      model: ALSModel,
      popularItems: DataFrame,
      numRecommendations: Int
  ): DataFrame
  /**
   * Generate recommendations for new items based on similar items
   */
  def recommendForNewItems(
      model: ALSModel,
      itemFeatures: DataFrame,
      numRecommendations: Int
  ): DataFrame
}
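Both methods above are declarations only. A minimal sketch of the popularity fallback for new users, assuming popularItems carries illustrative itemId and popularity columns:

import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.desc

// Sketch: a brand-new user has no latent factors in the model, so fall
// back to the globally most popular items. The model parameter is unused
// here; it is kept only to match the signature above.
def recommendForNewUsers(
    model: ALSModel,
    popularItems: DataFrame, // assumed columns: itemId, popularity
    numRecommendations: Int): DataFrame =
  popularItems
    .orderBy(desc("popularity"))
    .limit(numRecommendations)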
Specialized evaluation metrics for recommendation systems.

/**
 * Recommendation-specific evaluation metrics
 */
class RecommendationEvaluator extends Evaluator {
  def setUserCol(value: String): this.type
  def setItemCol(value: String): this.type
  def setRatingCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def setMetricName(value: String): this.type
  def setK(value: Int): this.type
  def setColdStartStrategy(value: String): this.type
}
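A hypothetical usage of this evaluator, mirroring how Spark's built-in evaluators are configured (the metric name "ndcg" and the column names are illustrative):

val recEvaluator = new RecommendationEvaluator()
  .setMetricName("ndcg")
  .setK(10)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val ndcg = recEvaluator.evaluate(predictions)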
/**
 * Ranking-based evaluation for recommendations
 */
object RecommendationMetrics {
  /**
   * Calculate Mean Average Precision at K
   */
  def meanAveragePrecisionAtK(
      predictions: DataFrame,
      k: Int,
      userCol: String = "userId",
      itemCol: String = "itemId",
      ratingCol: String = "rating"
  ): Double
  /**
   * Calculate Normalized Discounted Cumulative Gain at K
   */
  def ndcgAtK(
      predictions: DataFrame,
      k: Int,
      userCol: String = "userId",
      itemCol: String = "itemId",
      ratingCol: String = "rating"
  ): Double
  /**
   * Calculate precision and recall at K
   */
  def precisionRecallAtK(
      predictions: DataFrame,
      k: Int,
      userCol: String = "userId",
      itemCol: String = "itemId",
      ratingCol: String = "rating"
  ): (Double, Double)
}
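These declarations mirror metrics that Spark already provides in the RDD-based RankingMetrics class; a short sketch of computing them directly, assuming recommended holds each user's top-K item ids and relevant the held-out ground truth:

import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.rdd.RDD

// recommended: RDD[(Int, Array[Int])] - top-K item ids per user (assumed)
// relevant:    RDD[(Int, Array[Int])] - relevant item ids per user (assumed)
val predictionAndLabels: RDD[(Array[Int], Array[Int])] =
  recommended.join(relevant).values
val metrics = new RankingMetrics(predictionAndLabels)
println(s"precision@10 = ${metrics.precisionAt(10)}")
println(s"NDCG@10      = ${metrics.ndcgAt(10)}")
println(s"MAP          = ${metrics.meanAveragePrecision}")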
Specialized parameter tuning strategies for collaborative filtering models.

/**
 * ALS hyperparameter tuning utilities
 */
object ALSTuning {
  /**
   * Create parameter grid for ALS tuning
   */
  def createParamGrid(
      als: ALS,
      ranks: Array[Int] = Array(10, 50, 100),
      regParams: Array[Double] = Array(0.01, 0.1, 1.0),
      alphas: Array[Double] = Array(1.0, 10.0, 40.0)
  ): Array[ParamMap]
  /**
   * Cross-validation for recommendation systems with temporal splitting
   */
  def temporalCrossValidation(
      als: ALS,
      data: DataFrame,
      evaluator: Evaluator,
      paramGrid: Array[ParamMap],
      timestampCol: String = "timestamp"
  ): CrossValidatorModel
}
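createParamGrid can be implemented directly with Spark's ParamGridBuilder; a minimal sketch using the default grids from the signature above:

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.tuning.ParamGridBuilder

// Sketch: expand rank, regParam, and alpha into a full cartesian grid.
def createParamGrid(
    als: ALS,
    ranks: Array[Int] = Array(10, 50, 100),
    regParams: Array[Double] = Array(0.01, 0.1, 1.0),
    alphas: Array[Double] = Array(1.0, 10.0, 40.0)): Array[ParamMap] =
  new ParamGridBuilder()
    .addGrid(als.rank, ranks)
    .addGrid(als.regParam, regParams)
    .addGrid(als.alpha, alphas)
    .build()

Note that alpha only matters when setImplicitPrefs(true); for explicit ratings the alpha grid just multiplies the number of effectively identical candidate models.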
Configuration options for optimizing ALS training on large-scale distributed datasets.

/**
 * ALS distributed training configuration
 */
object ALSDistributedTraining {
  /**
   * Optimize ALS for large-scale distributed training
   */
  def optimizeForScale(
      als: ALS,
      numUsers: Long,
      numItems: Long,
      numPartitions: Int
  ): ALS = {
    // Heuristic: cap the number of blocks at the partition count,
    // scaling with the square root of the entity counts
    val userBlocks = math.min(numPartitions, math.sqrt(numUsers.toDouble).toInt)
    val itemBlocks = math.min(numPartitions, math.sqrt(numItems.toDouble).toInt)
    als
      .setNumUserBlocks(userBlocks)
      .setNumItemBlocks(itemBlocks)
      .setIntermediateStorageLevel("MEMORY_AND_DISK")
      .setFinalStorageLevel("MEMORY_AND_DISK")
      .setCheckpointInterval(10)
  }
  /**
   * Memory-optimized configuration for resource-constrained environments
   */
  def optimizeForMemory(als: ALS): ALS = {
    als
      .setIntermediateStorageLevel("DISK_ONLY")
      .setFinalStorageLevel("DISK_ONLY")
      .setCheckpointInterval(5)
  }
}
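A brief usage sketch, reusing the training DataFrame from the first example; the user, item, and partition counts are illustrative placeholders:

// Configure block counts and storage levels before fitting at scale.
val scaledAls = ALSDistributedTraining.optimizeForScale(
  new ALS()
    .setUserCol("userId")
    .setItemCol("movieId")
    .setRatingCol("rating"),
  numUsers = 10000000L, // illustrative
  numItems = 500000L,   // illustrative
  numPartitions = 200
)
val scaledModel = scaledAls.fit(training)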
// Recommendation system imports
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.{DataFrame, Dataset}
// Parameter traits and grids
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared._
// Evaluation and tuning imports
import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
// Model persistence
import org.apache.spark.ml.util.MLWriter