MLlib provides algorithms for discovering frequent patterns and sequential patterns in transactional data. These algorithms are useful for market basket analysis, web usage mining, and discovering association rules.
class FPGrowth(override val uid: String) extends Estimator[FPGrowthModel]
  with FPGrowthParams with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("fpgrowth"))
  def setItemsCol(value: String): FPGrowth
  def setMinSupport(value: Double): FPGrowth
  def setNumPartitions(value: Int): FPGrowth
  def setMinConfidence(value: Double): FPGrowth
  def setPredictionCol(value: String): FPGrowth
  override def fit(dataset: Dataset[_]): FPGrowthModel
  override def copy(extra: ParamMap): FPGrowth
}

class FPGrowthModel(override val uid: String, private val parentModel: MLlibFPGrowthModel[String])
  extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {
  def freqItemsets: DataFrame
  def associationRules: DataFrame
  def numItemsets: Long
  def setItemsCol(value: String): FPGrowthModel
  def setMinConfidence(value: Double): FPGrowthModel
  def setPredictionCol(value: String): FPGrowthModel
  override def transform(dataset: Dataset[_]): DataFrame
  override def copy(extra: ParamMap): FPGrowthModel
  def write: MLWriter
}

trait FPGrowthParams extends Params {
  final val itemsCol: Param[String]
  final val minSupport: DoubleParam
  final val numPartitions: IntParam
  final val minConfidence: DoubleParam
  final val predictionCol: Param[String]
  def getItemsCol: String
  def getMinSupport: Double
  def getNumPartitions: Int
  def getMinConfidence: Double
  def getPredictionCol: String
}

class PrefixSpan(override val uid: String) extends Estimator[PrefixSpanModel]
  with PrefixSpanParams with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("prefixspan"))
  def setSequenceCol(value: String): PrefixSpan
  def setMinSupport(value: Double): PrefixSpan
  def setMaxPatternLength(value: Int): PrefixSpan
  def setMaxLocalProjDBSize(value: Long): PrefixSpan
  override def fit(dataset: Dataset[_]): PrefixSpanModel
  override def copy(extra: ParamMap): PrefixSpan
}

class PrefixSpanModel(override val uid: String, private val parentModel: MLlibPrefixSpanModel[String])
  extends Model[PrefixSpanModel] with PrefixSpanParams with MLWritable {
  def freqSequences: DataFrame
  def setSequenceCol(value: String): PrefixSpanModel
  override def transform(dataset: Dataset[_]): DataFrame
  override def copy(extra: ParamMap): PrefixSpanModel
  def write: MLWriter
}

trait PrefixSpanParams extends Params {
  final val sequenceCol: Param[String]
  final val minSupport: DoubleParam
  final val maxPatternLength: IntParam
  final val maxLocalProjDBSize: LongParam
  def getSequenceCol: String
  def getMinSupport: Double
  def getMaxPatternLength: Int
  def getMaxLocalProjDBSize: Long
}

import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.functions._
// Sample transaction data - each row contains items in a transaction
val transactions = spark.createDataFrame(Seq(
(1, Array("bread", "milk", "butter")),
(2, Array("bread", "milk")),
(3, Array("bread", "butter", "jam")),
(4, Array("milk", "butter", "eggs")),
(5, Array("bread", "milk", "butter", "eggs")),
(6, Array("bread", "jam")),
(7, Array("milk", "eggs")),
(8, Array("bread", "butter")),
(9, Array("milk", "butter", "jam")),
(10, Array("bread", "milk", "jam"))
)).toDF("id", "items")
// Create FP-Growth model
val fpGrowth = new FPGrowth()
.setItemsCol("items")
.setMinSupport(0.3) // Minimum support threshold (30%)
.setMinConfidence(0.6) // Minimum confidence for association rules
.setNumPartitions(10) // Number of partitions for parallel processing
// Fit the model
val model = fpGrowth.fit(transactions)
// Display frequent itemsets
println("Frequent Itemsets:")
model.freqItemsets.show(truncate = false)
// Display association rules
println("Association Rules:")
model.associationRules.show(truncate = false)
// Get model statistics
println(s"Number of frequent itemsets: ${model.numItemsets}")
// Make predictions (find associated items for given items)
val testData = spark.createDataFrame(Seq(
(Array("bread", "milk"),),
(Array("butter"),)
)).toDF("items")
val predictions = model.transform(testData)
println("Predictions (items that frequently appear together):")
predictions.show(truncate = false)
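// transform adds a "prediction" column listing rule consequents whose antecedents are
// contained in the input basket; exploding it yields one suggested item per row.
predictions
  .select(col("items"), explode(col("prediction")).alias("suggested_item"))
  .show(truncate = false)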
// Analyze frequent itemsets by size
val itemsetsBySize = model.freqItemsets
.withColumn("itemset_size", size(col("items")))
.groupBy("itemset_size")
.agg(
count("*").alias("num_itemsets"),
avg("freq").alias("avg_support"),
max("freq").alias("max_support"),
min("freq").alias("min_support")
)
.orderBy("itemset_size")
println("Frequent Itemsets Analysis:")
itemsetsBySize.show()
// Find itemsets containing specific items
val itemsetsWithBread = model.freqItemsets
.filter(array_contains(col("items"), "bread"))
.orderBy(desc("freq"))
println("Itemsets containing 'bread':")
itemsetsWithBread.show(truncate = false)
// Analyze association rules
val ruleAnalysis = model.associationRules
.withColumn("antecedent_size", size(col("antecedent")))
.withColumn("consequent_size", size(col("consequent")))
.withColumn("lift", col("confidence") /
// Calculate consequent support (would need to join with itemsets)
lit(0.5)) // Placeholder - in practice, calculate from frequent itemsets
println("Association Rules Analysis:")
ruleAnalysis
.select("antecedent", "consequent", "confidence", "lift", "antecedent_size", "consequent_size")
.show(truncate = false)
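// Lift can also be derived manually by joining each rule's consequent with its frequency
// in freqItemsets: lift = confidence / (consequent frequency / total transactions).
val totalTransactions = transactions.count()
val consequentSupport = model.freqItemsets
  .withColumnRenamed("items", "consequent")
  .withColumn("consequent_support", col("freq") / totalTransactions)
  .select("consequent", "consequent_support")
val rulesWithManualLift = model.associationRules
  .join(consequentSupport, "consequent")
  .withColumn("manual_lift", col("confidence") / col("consequent_support"))
rulesWithManualLift.select("antecedent", "consequent", "confidence", "manual_lift").show(truncate = false)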
// Filter high-confidence, high-lift rules
val strongRules = model.associationRules
.filter(col("confidence") > 0.8)
.orderBy(desc("confidence"))
println("Strong Association Rules (confidence > 0.8):")
strongRules.show(truncate = false)

import org.apache.spark.ml.fpm.PrefixSpan
// Sequential data - each row contains a sequence of itemsets (transactions over time)
val sequences = spark.createDataFrame(Seq(
(1, Array(Array("a"), Array("a", "b", "c"), Array("a", "c"), Array("d"), Array("c", "f"))),
(2, Array(Array("a", "d"), Array("c"), Array("b", "c"), Array("a", "e"))),
(3, Array(Array("e", "f"), Array("a", "b"), Array("d", "f"), Array("c"), Array("b"))),
(4, Array(Array("e"), Array("g"), Array("a", "f"), Array("c"), Array("b"), Array("c")))
)).toDF("id", "sequence")
// Create PrefixSpan model
val prefixSpan = new PrefixSpan()
.setSequenceCol("sequence")
.setMinSupport(0.5) // Minimum support threshold (50%)
.setMaxPatternLength(5) // Maximum length of patterns to find
.setMaxLocalProjDBSize(32000000L) // Memory optimization parameter
// Fit the model
val psModel = prefixSpan.fit(sequences)
// Display frequent sequential patterns
println("Frequent Sequential Patterns:")
psModel.freqSequences.show(truncate = false)
// Analyze patterns by length
val patternAnalysis = psModel.freqSequences
.withColumn("pattern_length", size(col("sequence")))
.groupBy("pattern_length")
.agg(
count("*").alias("num_patterns"),
avg("freq").alias("avg_support"),
max("freq").alias("max_support")
)
.orderBy("pattern_length")
println("Sequential Patterns Analysis:")
patternAnalysis.show()
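// Render each pattern in a readable "itemset -> itemset" form for easier inspection
// (uses the higher-order transform function, available in Spark 3.0+).
psModel.freqSequences
  .withColumn("pattern",
    concat_ws(" -> ", transform(col("sequence"), itemset => concat_ws(",", itemset))))
  .select("pattern", "freq")
  .orderBy(desc("freq"))
  .show(truncate = false)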
// Find patterns containing specific items
val patternsWithA = psModel.freqSequences
.filter(array_contains(flatten(col("sequence")), "a"))
.orderBy(desc("freq"))
println("Patterns containing item 'a':")
patternsWithA.show(truncate = false)

// Web clickstream data - user sessions with page visits
val clickstreams = spark.createDataFrame(Seq(
(1, Array("home", "products", "cart", "checkout")),
(2, Array("home", "about", "contact")),
(3, Array("home", "products", "product_detail", "cart")),
(4, Array("search", "products", "product_detail", "reviews", "cart", "checkout")),
(5, Array("home", "blog", "products", "cart")),
(6, Array("home", "products", "product_detail", "reviews")),
(7, Array("search", "products", "compare", "cart", "checkout")),
(8, Array("home", "promotions", "products", "cart"))
)).toDF("session_id", "page_sequence")
// Frequent page visit patterns
val webPatternMiner = new FPGrowth()
.setItemsCol("page_sequence")
.setMinSupport(0.25)
.setMinConfidence(0.5)
val webPatternModel = webPatternMiner.fit(clickstreams)
println("Frequent Page Visit Patterns:")
webPatternModel.freqItemsets.show(truncate = false)
println("Page Navigation Rules:")
webPatternModel.associationRules.show(truncate = false)
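// Rules whose consequent contains "checkout" highlight page combinations that tend to
// precede a completed purchase.
webPatternModel.associationRules
  .filter(array_contains(col("consequent"), "checkout"))
  .orderBy(desc("confidence"))
  .show(truncate = false)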
// Sequential navigation patterns
val navPatternMiner = new PrefixSpan()
.setSequenceCol("nav_sequence") // must match the array-of-arrays column created below
.setMinSupport(0.3)
.setMaxPatternLength(4)
// Convert page sequences to the required format (array of arrays)
val navSequences = clickstreams
.withColumn("nav_sequence",
transform(col("page_sequence"), page => array(page)))
val navPatternModel = navPatternMiner.fit(navSequences.select("session_id", "nav_sequence"))
println("Sequential Navigation Patterns:")
navPatternModel.freqSequences.show(truncate = false)

// Use frequent patterns for product recommendations
def recommendProducts(frequentItemsets: DataFrame,
currentBasket: Array[String],
topK: Int = 5): DataFrame = {
val currentItems = currentBasket.toSet
// Find itemsets that contain all items in current basket
val relevantItemsets = frequentItemsets
.filter { row =>
val itemset = row.getAs[Seq[String]]("items").toSet
currentItems.subsetOf(itemset)
}
.withColumn("additional_items",
expr(s"filter(items, x -> !array_contains(array(${currentItems.map(s => s"'$s'").mkString(", ")}), x))"))
.filter(size(col("additional_items")) > 0)
.select(explode(col("additional_items")).alias("recommended_item"), col("freq"))
.groupBy("recommended_item")
.agg(sum("freq").alias("total_support"))
.orderBy(desc("total_support"))
.limit(topK)
relevantItemsets
}
// Example usage
val currentBasket = Array("bread", "milk")
val recommendations = recommendProducts(model.freqItemsets, currentBasket, 3)
println(s"Recommendations for basket: ${currentBasket.mkString(", ")}")
recommendations.show()
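// Note that FPGrowthModel.transform implements a similar idea out of the box: it returns,
// per basket, the consequents of association rules whose antecedents the basket satisfies.
model.transform(spark.createDataFrame(Seq(Tuple1(currentBasket))).toDF("items"))
  .show(truncate = false)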
// Large-scale frequent pattern mining optimization
val largeFPGrowth = new FPGrowth()
.setItemsCol("items")
.setMinSupport(0.01) // Lower support for more patterns (be careful with memory)
.setMinConfidence(0.5)
.setNumPartitions(200) // Increase partitions for large datasets
// Monitor performance and memory usage
val startTime = System.currentTimeMillis()
val largeModel = largeFPGrowth.fit(largeTransactions) // assumes largeTransactions is a production-scale DataFrame with an "items" column
val endTime = System.currentTimeMillis()
println(s"Training time: ${(endTime - startTime) / 1000.0} seconds")
println(s"Number of frequent itemsets: ${largeModel.numItemsets}")
// Cache frequent itemsets for multiple analyses
val cachedItemsets = largeModel.freqItemsets.cache()
cachedItemsets.count() // Trigger caching
// Analyze different confidence thresholds
val confidenceThresholds = Array(0.3, 0.5, 0.7, 0.9)
confidenceThresholds.foreach { threshold =>
val rules = largeModel.associationRules.filter(col("confidence") >= threshold)
println(s"Rules with confidence >= $threshold: ${rules.count()}")
}
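// Release the cached itemsets once the threshold sweep is finished
cachedItemsets.unpersist()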
// Sequential pattern mining optimization
val optimizedPrefixSpan = new PrefixSpan()
.setSequenceCol("sequence")
.setMinSupport(0.1)
.setMaxPatternLength(3) // Limit pattern length for performance
.setMaxLocalProjDBSize(64000000L) // Adjust based on available memory
// Batch processing for very large datasets
def processLargeDatasetInBatches(dataset: DataFrame, batchSize: Int = 10000): Unit = {
val totalRows = dataset.count()
val numBatches = ((totalRows + batchSize - 1) / batchSize).toInt // ceiling division
(0 until numBatches).foreach { batchNum =>
val offset = batchNum * batchSize
val batch = dataset.offset(offset).limit(batchSize) // skip earlier batches first (Dataset.offset requires Spark 3.4+)
val batchModel = fpGrowth.fit(batch)
println(s"Batch $batchNum: ${batchModel.numItemsets} frequent itemsets")
// Process or save batch results
batchModel.freqItemsets.write
.mode("append")
.option("path", s"frequent_patterns/batch_$batchNum")
.save()
}
}

import org.apache.spark.sql.functions._
// Calculate additional rule quality metrics
def enhanceAssociationRules(rules: DataFrame,
                            itemsets: DataFrame,
                            numTransactions: Long): DataFrame = {
  // Map each frequent itemset to its absolute frequency
  val supportMap = itemsets
    .select("items", "freq")
    .rdd
    .map(row => (row.getAs[Seq[String]]("items").toSet, row.getAs[Long]("freq")))
    .collectAsMap()
  val broadcastSupport = spark.sparkContext.broadcast(supportMap)
  // Relative support of an itemset, looked up from the broadcast map
  val relSupport = udf((items: Seq[String]) =>
    broadcastSupport.value.getOrElse(items.toSet, 0L).toDouble / numTransactions)
  rules
    .withColumn("antecedent_support", relSupport(col("antecedent")))
    .withColumn("consequent_support", relSupport(col("consequent")))
    // support(antecedent ∪ consequent) = support(antecedent) × confidence
    .withColumn("rule_support", col("antecedent_support") * col("confidence"))
    .withColumn("lift", col("confidence") / col("consequent_support"))
    // conviction = (1 - support(consequent)) / (1 - confidence)
    .withColumn("conviction",
      (lit(1.0) - col("consequent_support")) / (lit(1.0) - col("confidence")))
    // leverage = support(antecedent ∪ consequent) - support(antecedent) × support(consequent)
    .withColumn("leverage",
      col("rule_support") - col("antecedent_support") * col("consequent_support"))
}
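// Example usage of the helper above with the basket model from earlier
// (the total transaction count is taken from the source data).
val enhancedRules = enhanceAssociationRules(model.associationRules, model.freqItemsets, transactions.count())
enhancedRules
  .select("antecedent", "consequent", "confidence", "lift", "conviction", "leverage")
  .show(truncate = false)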
// Filter rules by multiple criteria
val qualityRules = model.associationRules
.filter(col("confidence") > 0.6)
.filter(col("lift") > 1.2) // Lift > 1 indicates positive correlation
.filter(size(col("consequent")) === 1) // Single-item consequents only
.orderBy(desc("confidence"), desc("lift"))
println("High-Quality Association Rules:")
qualityRules.show(truncate = false)
// Statistical significance testing (Chi-square test)
def filterSignificantRules(rules: DataFrame, alpha: Double = 0.05): DataFrame = {
// Implementation would require chi-square test calculation
// This is a placeholder for the concept; a per-rule chi-square sketch follows below
rules.filter(col("confidence") > 0.5) // Simplified filter
}
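// A rough sketch of the per-rule chi-square statistic, assuming the support columns produced
// by enhanceAssociationRules above (antecedent_support, consequent_support, rule_support):
// chi2 = N * (P(A,C) - P(A)P(C))^2 / (P(A) * P(C) * (1 - P(A)) * (1 - P(C)))
def chiSquareStatistic(numTransactions: Long) =
  (lit(numTransactions.toDouble) *
    pow(col("rule_support") - col("antecedent_support") * col("consequent_support"), 2.0)) /
    (col("antecedent_support") * col("consequent_support") *
      (lit(1.0) - col("antecedent_support")) * (lit(1.0) - col("consequent_support")))
val rulesWithChi2 = enhancedRules.withColumn("chi_square", chiSquareStatistic(transactions.count()))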
// Rule pruning - remove redundant rules
def pruneRedundantRules(rules: DataFrame): DataFrame = {
// Remove rules where antecedent is a superset of another rule with same consequent
// and higher or equal confidence
rules // Simplified - one possible fuller implementation is sketched below
}
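// One possible pruning sketch: flag a rule as redundant when another rule with the same
// consequent has a strictly smaller antecedent that is a subset of this rule's antecedent
// and at least the same confidence (array_except requires Spark 2.4+).
def pruneRedundantRulesImpl(rules: DataFrame): DataFrame = {
  val candidates = rules.select(
    col("antecedent").alias("ant"), col("consequent").alias("cons"), col("confidence").alias("conf"))
  val others = rules.select(
    col("antecedent").alias("other_ant"), col("consequent").alias("other_cons"), col("confidence").alias("other_conf"))
  // Rules that have a simpler, at-least-as-confident counterpart
  val redundant = candidates.join(others,
    col("cons") === col("other_cons") &&
      size(col("other_ant")) < size(col("ant")) &&
      size(array_except(col("other_ant"), col("ant"))) === 0 &&
      col("other_conf") >= col("conf"),
    "left_semi")
  // Keep only the rules that were not flagged as redundant
  rules.join(redundant,
    col("antecedent") === col("ant") && col("consequent") === col("cons"),
    "left_anti")
}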