Frequent Pattern Mining

MLlib provides algorithms for discovering frequent itemsets and sequential patterns in transactional data. These algorithms are useful for market basket analysis, web usage mining, and deriving association rules.

FP-Growth Algorithm

Estimator

class FPGrowth(override val uid: String) extends Estimator[FPGrowthModel]
  with FPGrowthParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("fpgrowth"))

  def setItemsCol(value: String): FPGrowth
  def setMinSupport(value: Double): FPGrowth
  def setNumPartitions(value: Int): FPGrowth
  def setMinConfidence(value: Double): FPGrowth
  def setPredictionCol(value: String): FPGrowth

  override def fit(dataset: Dataset[_]): FPGrowthModel
  override def copy(extra: ParamMap): FPGrowth
}

Model

class FPGrowthModel(override val uid: String, private val parentModel: MLlibFPGrowthModel[String])
  extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {

  def freqItemsets: DataFrame
  def associationRules: DataFrame
  def numItemsets: Long

  def setItemsCol(value: String): FPGrowthModel
  def setMinConfidence(value: Double): FPGrowthModel
  def setPredictionCol(value: String): FPGrowthModel

  override def transform(dataset: Dataset[_]): DataFrame
  override def copy(extra: ParamMap): FPGrowthModel
  def write: MLWriter
}

Parameters

trait FPGrowthParams extends Params {
  final val itemsCol: Param[String]
  final val minSupport: DoubleParam
  final val numPartitions: IntParam
  final val minConfidence: DoubleParam
  final val predictionCol: Param[String]

  def getItemsCol: String
  def getMinSupport: Double
  def getNumPartitions: Int
  def getMinConfidence: Double
  def getPredictionCol: String
}
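
minSupport is a fraction of the input transactions rather than an absolute count. A minimal sketch of how that fraction maps to a count threshold, assuming the ceiling rule used by MLlib's FP-Growth (count = ceil(minSupport × n)):

import org.apache.spark.ml.fpm.FPGrowth

// Assumption: an itemset is frequent if it appears in at least ceil(minSupport * n)
// of the n input transactions (the ceiling rule used by MLlib's FP-Growth)
val fp = new FPGrowth().setItemsCol("items").setMinSupport(0.3)
val n = 10L  // e.g. the ten transactions in the usage examples below
val minCount = math.ceil(fp.getMinSupport * n).toLong
println(s"Itemsets must appear in at least $minCount of $n transactions")  // 3 of 10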

PrefixSpan Algorithm

Estimator

class PrefixSpan(override val uid: String) extends Estimator[PrefixSpanModel]
  with PrefixSpanParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("prefixspan"))

  def setSequenceCol(value: String): PrefixSpan
  def setMinSupport(value: Double): PrefixSpan
  def setMaxPatternLength(value: Int): PrefixSpan
  def setMaxLocalProjDBSize(value: Long): PrefixSpan

  override def fit(dataset: Dataset[_]): PrefixSpanModel
  override def copy(extra: ParamMap): PrefixSpan
}

Model

class PrefixSpanModel(override val uid: String, private val parentModel: MLlibPrefixSpanModel[String])
  extends Model[PrefixSpanModel] with PrefixSpanParams with MLWritable {

  def freqSequences: DataFrame

  def setSequenceCol(value: String): PrefixSpanModel

  override def transform(dataset: Dataset[_]): DataFrame
  override def copy(extra: ParamMap): PrefixSpanModel
  def write: MLWriter
}

Parameters

trait PrefixSpanParams extends Params {
  final val sequenceCol: Param[String]
  final val minSupport: DoubleParam
  final val maxPatternLength: IntParam
  final val maxLocalProjDBSize: LongParam

  def getSequenceCol: String
  def getMinSupport: Double
  def getMaxPatternLength: Int
  def getMaxLocalProjDBSize: Long
}

Usage Examples

Market Basket Analysis with FP-Growth

import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.functions._

// Sample transaction data - each row contains items in a transaction
val transactions = spark.createDataFrame(Seq(
  (1, Array("bread", "milk", "butter")),
  (2, Array("bread", "milk")),
  (3, Array("bread", "butter", "jam")),
  (4, Array("milk", "butter", "eggs")),
  (5, Array("bread", "milk", "butter", "eggs")),
  (6, Array("bread", "jam")),
  (7, Array("milk", "eggs")),
  (8, Array("bread", "butter")),
  (9, Array("milk", "butter", "jam")),
  (10, Array("bread", "milk", "jam"))
)).toDF("id", "items")

// Create FP-Growth model
val fpGrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.3)        // Minimum support threshold (30%)
  .setMinConfidence(0.6)     // Minimum confidence for association rules
  .setNumPartitions(10)      // Number of partitions for parallel processing

// Fit the model
val model = fpGrowth.fit(transactions)

// Display frequent itemsets
println("Frequent Itemsets:")
model.freqItemsets.show(truncate = false)

// Display association rules
println("Association Rules:")
model.associationRules.show(truncate = false)

// Get model statistics
println(s"Number of frequent itemsets: ${model.numItemsets}")

// Make predictions (find associated items for given items)
val testData = spark.createDataFrame(Seq(
  (Array("bread", "milk"),),
  (Array("butter"),)
)).toDF("items")

val predictions = model.transform(testData)
println("Predictions (items that frequently appear together):")
predictions.show(truncate = false)
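
Because FPGrowthModel mixes in MLWritable, the fitted model can be persisted and reloaded. A minimal sketch (the path is illustrative, and loading assumes the usual MLReadable companion object):

import org.apache.spark.ml.fpm.FPGrowthModel

// Persist the fitted model and read it back later
model.write.overwrite().save("/tmp/fpgrowth-basket-model")
val restored = FPGrowthModel.load("/tmp/fpgrowth-basket-model")
restored.freqItemsets.show(truncate = false)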

Advanced FP-Growth Analysis

// Analyze frequent itemsets by size
val itemsetsBySize = model.freqItemsets
  .withColumn("itemset_size", size(col("items")))
  .groupBy("itemset_size")
  .agg(
    count("*").alias("num_itemsets"),
    avg("freq").alias("avg_support"),
    max("freq").alias("max_support"),
    min("freq").alias("min_support")
  )
  .orderBy("itemset_size")

println("Frequent Itemsets Analysis:")
itemsetsBySize.show()

// Find itemsets containing specific items
val itemsetsWithBread = model.freqItemsets
  .filter(array_contains(col("items"), "bread"))
  .orderBy(desc("freq"))

println("Itemsets containing 'bread':")
itemsetsWithBread.show(truncate = false)

// Analyze association rules; lift = confidence / support(consequent), where the
// consequent support is obtained by joining against the frequent itemsets
// (freq is a count, so divide by the total number of transactions)
val numTransactions = transactions.count()
val consequentSupport = model.freqItemsets
  .select(col("items").alias("consequent"),
    (col("freq") / numTransactions).alias("consequent_support"))

val ruleAnalysis = model.associationRules
  .join(consequentSupport, "consequent")
  .withColumn("antecedent_size", size(col("antecedent")))
  .withColumn("consequent_size", size(col("consequent")))
  .withColumn("lift", col("confidence") / col("consequent_support"))

println("Association Rules Analysis:")
ruleAnalysis
  .select("antecedent", "consequent", "confidence", "lift", "antecedent_size", "consequent_size")
  .show(truncate = false)

// Filter high-confidence, high-lift rules
val strongRules = ruleAnalysis
  .filter(col("confidence") > 0.8)
  .filter(col("lift") > 1.0)
  .orderBy(desc("confidence"))

println("Strong Association Rules (confidence > 0.8, lift > 1.0):")
strongRules.show(truncate = false)

Sequential Pattern Mining with PrefixSpan

import org.apache.spark.ml.fpm.PrefixSpan

// Sequential data - each row contains a sequence of itemsets (transactions over time)
val sequences = spark.createDataFrame(Seq(
  (1, Array(Array("a"), Array("a", "b", "c"), Array("a", "c"), Array("d"), Array("c", "f"))),
  (2, Array(Array("a", "d"), Array("c"), Array("b", "c"), Array("a", "e"))),
  (3, Array(Array("e", "f"), Array("a", "b"), Array("d", "f"), Array("c"), Array("b"))),
  (4, Array(Array("e"), Array("g"), Array("a", "f"), Array("c"), Array("b"), Array("c")))
)).toDF("id", "sequence")

// Create PrefixSpan model
val prefixSpan = new PrefixSpan()
  .setSequenceCol("sequence")
  .setMinSupport(0.5)          // Minimum support threshold (50%)
  .setMaxPatternLength(5)      // Maximum length of patterns to find
  .setMaxLocalProjDBSize(32000000L)  // Memory optimization parameter

// Fit the model
val psModel = prefixSpan.fit(sequences)

// Display frequent sequential patterns
println("Frequent Sequential Patterns:")
psModel.freqSequences.show(truncate = false)

// Analyze patterns by length
val patternAnalysis = psModel.freqSequences
  .withColumn("pattern_length", size(col("sequence")))
  .groupBy("pattern_length")
  .agg(
    count("*").alias("num_patterns"),
    avg("freq").alias("avg_support"),
    max("freq").alias("max_support")
  )
  .orderBy("pattern_length")

println("Sequential Patterns Analysis:")
patternAnalysis.show()

// Find patterns containing specific items
val patternsWithA = psModel.freqSequences
  .filter(array_contains(flatten(col("sequence")), "a"))
  .orderBy(desc("freq"))

println("Patterns containing item 'a':")
patternsWithA.show(truncate = false)

Web Usage Pattern Mining

// Web clickstream data - user sessions with page visits
val clickstreams = spark.createDataFrame(Seq(
  (1, Array("home", "products", "cart", "checkout")),
  (2, Array("home", "about", "contact")),
  (3, Array("home", "products", "product_detail", "cart")),
  (4, Array("search", "products", "product_detail", "reviews", "cart", "checkout")),
  (5, Array("home", "blog", "products", "cart")),
  (6, Array("home", "products", "product_detail", "reviews")),
  (7, Array("search", "products", "compare", "cart", "checkout")),
  (8, Array("home", "promotions", "products", "cart"))
)).toDF("session_id", "page_sequence")

// Frequent page visit patterns
val webPatternMiner = new FPGrowth()
  .setItemsCol("page_sequence")
  .setMinSupport(0.25)
  .setMinConfidence(0.5)

val webPatternModel = webPatternMiner.fit(clickstreams)

println("Frequent Page Visit Patterns:")
webPatternModel.freqItemsets.show(truncate = false)

println("Page Navigation Rules:")
webPatternModel.associationRules.show(truncate = false)

// Sequential navigation patterns
val navPatternMiner = new PrefixSpan()
  .setSequenceCol("page_sequence")
  .setMinSupport(0.3)
  .setMaxPatternLength(4)

// Convert page sequences to the required format (array of arrays)
val navSequences = clickstreams
  .withColumn("nav_sequence", 
    transform(col("page_sequence"), page => array(page)))

val navPatternModel = navPatternMiner.fit(navSequences.select("session_id", "nav_sequence"))

println("Sequential Navigation Patterns:")
navPatternModel.freqSequences.show(truncate = false)

Product Recommendation Based on Frequent Patterns

// Use frequent patterns for product recommendations
def recommendProducts(frequentItemsets: DataFrame, 
                     currentBasket: Array[String], 
                     topK: Int = 5): DataFrame = {
  
  val currentItems = currentBasket.toSet
  
  // Find itemsets that contain all items in current basket
  val relevantItemsets = frequentItemsets
    .filter { row =>
      val itemset = row.getAs[Seq[String]]("items").toSet
      currentItems.subsetOf(itemset)
    }
    .withColumn("additional_items", 
      expr(s"filter(items, x -> !array_contains(array(${currentItems.map(s => s"'$s'").mkString(", ")}), x))"))
    .filter(size(col("additional_items")) > 0)
    .select(explode(col("additional_items")).alias("recommended_item"), col("freq"))
    .groupBy("recommended_item")
    .agg(sum("freq").alias("total_support"))
    .orderBy(desc("total_support"))
    .limit(topK)
  
  relevantItemsets
}

// Example usage
val currentBasket = Array("bread", "milk")
val recommendations = recommendProducts(model.freqItemsets, currentBasket, 3)

println(s"Recommendations for basket: ${currentBasket.mkString(", ")}")
recommendations.show()

Performance Optimization and Tuning

// Large-scale frequent pattern mining optimization
val largeFPGrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.01)        // Lower support for more patterns (be careful with memory)
  .setMinConfidence(0.5)
  .setNumPartitions(200)      // Increase partitions for large datasets

// Monitor training time and memory usage
// (largeTransactions: a large DataFrame with an "items" array column, not shown here)
val startTime = System.currentTimeMillis()
val largeModel = largeFPGrowth.fit(largeTransactions)
val endTime = System.currentTimeMillis()

println(s"Training time: ${(endTime - startTime) / 1000.0} seconds")
println(s"Number of frequent itemsets: ${largeModel.numItemsets}")

// Cache frequent itemsets for multiple analyses
val cachedItemsets = largeModel.freqItemsets.cache()
cachedItemsets.count() // Trigger caching

// Analyze different confidence thresholds
val confidenceThresholds = Array(0.3, 0.5, 0.7, 0.9)
confidenceThresholds.foreach { threshold =>
  val rules = largeModel.associationRules.filter(col("confidence") >= threshold)
  println(s"Rules with confidence >= $threshold: ${rules.count()}")
}

// Sequential pattern mining optimization
val optimizedPrefixSpan = new PrefixSpan()
  .setSequenceCol("sequence")
  .setMinSupport(0.1)
  .setMaxPatternLength(3)        // Limit pattern length for performance
  .setMaxLocalProjDBSize(64000000L)  // Adjust based on available memory

// Batch processing for very large datasets
// Note: Dataset.offset requires Spark 3.4+, and the input should have a deterministic
// ordering for the slices to be stable. Itemsets mined per batch are only frequent
// within that batch, not globally.
def processLargeDatasetInBatches(dataset: DataFrame, batchSize: Int = 10000): Unit = {
  val totalRows = dataset.count()
  val numBatches = math.ceil(totalRows.toDouble / batchSize).toInt

  (0 until numBatches).foreach { batchNum =>
    val offset = batchNum * batchSize
    val batch = dataset.offset(offset).limit(batchSize)

    val batchModel = fpGrowth.fit(batch)
    println(s"Batch $batchNum: ${batchModel.numItemsets} frequent itemsets")

    // Process or save batch results
    batchModel.freqItemsets.write
      .mode("append")
      .parquet(s"frequent_patterns/batch_$batchNum")
  }
}

Rule Quality Metrics and Filtering

import org.apache.spark.sql.functions._

// Calculate additional rule quality metrics.
// numTransactions is the number of transactions the model was fit on; it is needed
// to turn the itemset counts in "freq" into relative supports.
def enhanceAssociationRules(rules: DataFrame,
                            itemsets: DataFrame,
                            numTransactions: Long): DataFrame = {

  // Map each frequent itemset (as a set of items) to its relative support
  val supportMap = itemsets
    .select("items", "freq")
    .rdd
    .map(row => (row.getAs[Seq[String]]("items").toSet,
      row.getAs[Long]("freq").toDouble / numTransactions))
    .collectAsMap()

  val broadcastSupport = spark.sparkContext.broadcast(supportMap)

  // Look up the support of an itemset in the broadcast map (0.0 if it is not frequent)
  val supportOf = udf { items: Seq[String] =>
    broadcastSupport.value.getOrElse(items.toSet, 0.0)
  }

  rules
    .withColumn("antecedentSupport", supportOf(col("antecedent")))
    .withColumn("consequentSupport", supportOf(col("consequent")))
    // lift = confidence / support(consequent)
    .withColumn("lift", col("confidence") / col("consequentSupport"))
    // conviction = (1 - support(consequent)) / (1 - confidence)
    .withColumn("conviction",
      (lit(1.0) - col("consequentSupport")) / (lit(1.0) - col("confidence")))
    // leverage = support(antecedent ∪ consequent) - support(antecedent) × support(consequent)
    .withColumn("leverage",
      col("antecedentSupport") * col("confidence") -
        col("antecedentSupport") * col("consequentSupport"))
}
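
Usage sketch, reusing the model and transactions from the market basket example above:

// Enrich the mined rules with lift, conviction, and leverage
val enhancedRules = enhanceAssociationRules(
  model.associationRules, model.freqItemsets, transactions.count())

enhancedRules
  .select("antecedent", "consequent", "confidence", "lift", "conviction", "leverage")
  .show(truncate = false)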

// Filter rules by multiple criteria
val qualityRules = enhancedRules
  .filter(col("confidence") > 0.6)
  .filter(col("lift") > 1.2)  // Lift > 1 indicates positive correlation
  .filter(size(col("consequent")) === 1)  // Single-item consequents only
  .orderBy(desc("confidence"), desc("lift"))

println("High-Quality Association Rules:")
qualityRules.show(truncate = false)

// Statistical significance testing (Chi-square test)
def filterSignificantRules(rules: DataFrame, alpha: Double = 0.05): DataFrame = {
  // Implementation would require chi-square test calculation
  // This is a placeholder for the concept
  rules.filter(col("confidence") > 0.5)  // Simplified filter
}
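
A possible sketch of that test, assuming the antecedentSupport and consequentSupport columns produced by enhanceAssociationRules above; 3.841 is the 0.05 critical value of the chi-square distribution with one degree of freedom:

// Chi-square statistic of independence for a rule A -> C, from relative supports:
// chi2 = N * (supp(AC) - supp(A)*supp(C))^2 / (supp(A)*supp(C)*(1-supp(A))*(1-supp(C)))
def chiSquareFilter(enhancedRules: DataFrame,
                    numTransactions: Long,
                    criticalValue: Double = 3.841): DataFrame = {
  val suppA  = col("antecedentSupport")
  val suppC  = col("consequentSupport")
  val suppAC = col("antecedentSupport") * col("confidence")  // support(A ∪ C)

  enhancedRules
    .withColumn("chiSquare",
      lit(numTransactions.toDouble) *
        pow(suppAC - suppA * suppC, 2.0) /
        (suppA * suppC * (lit(1.0) - suppA) * (lit(1.0) - suppC)))
    .filter(col("chiSquare") > criticalValue)
}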

// Rule pruning - remove redundant rules
def pruneRedundantRules(rules: DataFrame): DataFrame = {
  // Remove rules where antecedent is a superset of another rule with same consequent
  // and higher or equal confidence
  rules  // Simplified - full implementation would require complex logic
}
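
A minimal self-join sketch of that pruning criterion (a hypothetical helper; it assumes the rule set is small enough that a per-consequent join is affordable):

def pruneRedundantRulesSketch(rules: DataFrame): DataFrame = {
  val r1 = rules.select(col("antecedent").alias("ant1"),
    col("consequent").alias("cons"), col("confidence").alias("conf1"))
  val r2 = rules.select(col("antecedent").alias("ant2"),
    col("consequent").alias("cons"), col("confidence").alias("conf2"))

  // A rule is redundant if a strictly more general rule (antecedent is a proper
  // subset) with the same consequent has at least the same confidence
  val redundant = r1.join(r2, "cons")
    .filter(size(col("ant2")) < size(col("ant1")))
    .filter(size(array_except(col("ant2"), col("ant1"))) === 0)  // ant2 ⊆ ant1
    .filter(col("conf2") >= col("conf1"))
    .select(col("ant1").alias("antecedent"), col("cons").alias("consequent"))
    .distinct()

  rules.join(redundant, Seq("antecedent", "consequent"), "left_anti")
}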