0
# Frequent Pattern Mining
1
2
MLlib provides algorithms for discovering frequent patterns and sequential patterns in transactional data. These algorithms are useful for market basket analysis, web usage mining, and discovering association rules.
3
4
## FP-Growth Algorithm
5
6
### Estimator
7
8
```scala { .api }
9
class FPGrowth(override val uid: String) extends Estimator[FPGrowthModel]
10
with FPGrowthParams with DefaultParamsWritable {
11
12
def this() = this(Identifiable.randomUID("fpgrowth"))
13
14
def setItemsCol(value: String): FPGrowth
15
def setMinSupport(value: Double): FPGrowth
16
def setNumPartitions(value: Int): FPGrowth
17
def setMinConfidence(value: Double): FPGrowth
18
def setPredictionCol(value: String): FPGrowth
19
20
override def fit(dataset: Dataset[_]): FPGrowthModel
21
override def copy(extra: ParamMap): FPGrowth
22
}
23
```
24
25
### Model
26
27
```scala { .api }
28
class FPGrowthModel(override val uid: String, private val parentModel: MLlibFPGrowthModel[String])
29
extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {
30
31
def freqItemsets: DataFrame
32
def associationRules: DataFrame
33
def numItemsets: Long
34
35
def setItemsCol(value: String): FPGrowthModel
36
def setMinConfidence(value: Double): FPGrowthModel
37
def setPredictionCol(value: String): FPGrowthModel
38
39
override def transform(dataset: Dataset[_]): DataFrame
40
override def copy(extra: ParamMap): FPGrowthModel
41
def write: MLWriter
42
}
43
```
44
45
### Parameters
46
47
```scala { .api }
48
trait FPGrowthParams extends Params {
49
final val itemsCol: Param[String]
50
final val minSupport: DoubleParam
51
final val numPartitions: IntParam
52
final val minConfidence: DoubleParam
53
final val predictionCol: Param[String]
54
55
def getItemsCol: String
56
def getMinSupport: Double
57
def getNumPartitions: Int
58
def getMinConfidence: Double
59
def getPredictionCol: String
60
}
61
```
62
63
## PrefixSpan Algorithm
64
65
### Estimator
66
67
```scala { .api }
68
class PrefixSpan(override val uid: String) extends Estimator[PrefixSpanModel]
69
with PrefixSpanParams with DefaultParamsWritable {
70
71
def this() = this(Identifiable.randomUID("prefixspan"))
72
73
def setSequenceCol(value: String): PrefixSpan
74
def setMinSupport(value: Double): PrefixSpan
75
def setMaxPatternLength(value: Int): PrefixSpan
76
def setMaxLocalProjDBSize(value: Long): PrefixSpan
77
78
override def fit(dataset: Dataset[_]): PrefixSpanModel
79
override def copy(extra: ParamMap): PrefixSpan
80
}
81
```
82
83
### Model
84
85
```scala { .api }
86
class PrefixSpanModel(override val uid: String, private val parentModel: MLlibPrefixSpanModel[String])
87
extends Model[PrefixSpanModel] with PrefixSpanParams with MLWritable {
88
89
def freqSequences: DataFrame
90
91
def setSequenceCol(value: String): PrefixSpanModel
92
93
override def transform(dataset: Dataset[_]): DataFrame
94
override def copy(extra: ParamMap): PrefixSpanModel
95
def write: MLWriter
96
}
97
```
98
99
### Parameters
100
101
```scala { .api }
102
trait PrefixSpanParams extends Params {
103
final val sequenceCol: Param[String]
104
final val minSupport: DoubleParam
105
final val maxPatternLength: IntParam
106
final val maxLocalProjDBSize: LongParam
107
108
def getSequenceCol: String
109
def getMinSupport: Double
110
def getMaxPatternLength: Int
111
def getMaxLocalProjDBSize: Long
112
}
113
```
114
115
## Usage Examples
116
117
### Market Basket Analysis with FP-Growth
118
119
```scala
120
import org.apache.spark.ml.fpm.FPGrowth
121
import org.apache.spark.sql.functions._
122
123
// Sample transaction data - each row contains items in a transaction
124
val transactions = spark.createDataFrame(Seq(
125
(1, Array("bread", "milk", "butter")),
126
(2, Array("bread", "milk")),
127
(3, Array("bread", "butter", "jam")),
128
(4, Array("milk", "butter", "eggs")),
129
(5, Array("bread", "milk", "butter", "eggs")),
130
(6, Array("bread", "jam")),
131
(7, Array("milk", "eggs")),
132
(8, Array("bread", "butter")),
133
(9, Array("milk", "butter", "jam")),
134
(10, Array("bread", "milk", "jam"))
135
)).toDF("id", "items")
136
137
// Create FP-Growth model
138
val fpGrowth = new FPGrowth()
139
.setItemsCol("items")
140
.setMinSupport(0.3) // Minimum support threshold (30%)
141
.setMinConfidence(0.6) // Minimum confidence for association rules
142
.setNumPartitions(10) // Number of partitions for parallel processing
143
144
// Fit the model
145
val model = fpGrowth.fit(transactions)
146
147
// Display frequent itemsets
148
println("Frequent Itemsets:")
149
model.freqItemsets.show(truncate = false)
150
151
// Display association rules
152
println("Association Rules:")
153
model.associationRules.show(truncate = false)
154
155
// Get model statistics
156
println(s"Number of frequent itemsets: ${model.numItemsets}")
157
158
// Make predictions (find associated items for given items)
159
val testData = spark.createDataFrame(Seq(
  Tuple1(Array("bread", "milk")), // single-column rows need Tuple1 — `(x,)` is not valid Scala
  Tuple1(Array("butter"))
)).toDF("items")
163
164
val predictions = model.transform(testData)
165
println("Predictions (items that frequently appear together):")
166
predictions.show(truncate = false)
167
```
168
169
### Advanced FP-Growth Analysis
170
171
```scala
172
// Analyze frequent itemsets by size
173
val itemsetsBySize = model.freqItemsets
174
.withColumn("itemset_size", size(col("items")))
175
.groupBy("itemset_size")
176
.agg(
177
count("*").alias("num_itemsets"),
178
avg("freq").alias("avg_support"),
179
max("freq").alias("max_support"),
180
min("freq").alias("min_support")
181
)
182
.orderBy("itemset_size")
183
184
println("Frequent Itemsets Analysis:")
185
itemsetsBySize.show()
186
187
// Find itemsets containing specific items
188
val itemsetsWithBread = model.freqItemsets
189
.filter(array_contains(col("items"), "bread"))
190
.orderBy(desc("freq"))
191
192
println("Itemsets containing 'bread':")
193
itemsetsWithBread.show(truncate = false)
194
195
// Analyze association rules
196
val ruleAnalysis = model.associationRules
197
.withColumn("antecedent_size", size(col("antecedent")))
198
.withColumn("consequent_size", size(col("consequent")))
199
.withColumn("lift", col("confidence") /
200
// Calculate consequent support (would need to join with itemsets)
201
lit(0.5)) // Placeholder - in practice, calculate from frequent itemsets
202
203
println("Association Rules Analysis:")
204
ruleAnalysis
205
.select("antecedent", "consequent", "confidence", "lift", "antecedent_size", "consequent_size")
206
.show(truncate = false)
207
208
// Filter high-confidence, high-lift rules
209
val strongRules = model.associationRules
210
.filter(col("confidence") > 0.8)
211
.orderBy(desc("confidence"))
212
213
println("Strong Association Rules (confidence > 0.8):")
214
strongRules.show(truncate = false)
215
```
216
217
### Sequential Pattern Mining with PrefixSpan
218
219
```scala
220
import org.apache.spark.ml.fpm.PrefixSpan
221
222
// Sequential data - each row contains a sequence of itemsets (transactions over time)
223
val sequences = spark.createDataFrame(Seq(
224
(1, Array(Array("a"), Array("a", "b", "c"), Array("a", "c"), Array("d"), Array("c", "f"))),
225
(2, Array(Array("a", "d"), Array("c"), Array("b", "c"), Array("a", "e"))),
226
(3, Array(Array("e", "f"), Array("a", "b"), Array("d", "f"), Array("c"), Array("b"))),
227
(4, Array(Array("e"), Array("g"), Array("a", "f"), Array("c"), Array("b"), Array("c")))
228
)).toDF("id", "sequence")
229
230
// Create PrefixSpan model
231
val prefixSpan = new PrefixSpan()
232
.setSequenceCol("sequence")
233
.setMinSupport(0.5) // Minimum support threshold (50%)
234
.setMaxPatternLength(5) // Maximum length of patterns to find
235
.setMaxLocalProjDBSize(32000000L) // Memory optimization parameter
236
237
// Fit the model
238
val psModel = prefixSpan.fit(sequences)
239
240
// Display frequent sequential patterns
241
println("Frequent Sequential Patterns:")
242
psModel.freqSequences.show(truncate = false)
243
244
// Analyze patterns by length
245
val patternAnalysis = psModel.freqSequences
246
.withColumn("pattern_length", size(col("sequence")))
247
.groupBy("pattern_length")
248
.agg(
249
count("*").alias("num_patterns"),
250
avg("freq").alias("avg_support"),
251
max("freq").alias("max_support")
252
)
253
.orderBy("pattern_length")
254
255
println("Sequential Patterns Analysis:")
256
patternAnalysis.show()
257
258
// Find patterns containing specific items
259
val patternsWithA = psModel.freqSequences
260
.filter(array_contains(flatten(col("sequence")), "a"))
261
.orderBy(desc("freq"))
262
263
println("Patterns containing item 'a':")
264
patternsWithA.show(truncate = false)
265
```
266
267
### Web Usage Pattern Mining
268
269
```scala
270
// Web clickstream data - user sessions with page visits
271
val clickstreams = spark.createDataFrame(Seq(
272
(1, Array("home", "products", "cart", "checkout")),
273
(2, Array("home", "about", "contact")),
274
(3, Array("home", "products", "product_detail", "cart")),
275
(4, Array("search", "products", "product_detail", "reviews", "cart", "checkout")),
276
(5, Array("home", "blog", "products", "cart")),
277
(6, Array("home", "products", "product_detail", "reviews")),
278
(7, Array("search", "products", "compare", "cart", "checkout")),
279
(8, Array("home", "promotions", "products", "cart"))
280
)).toDF("session_id", "page_sequence")
281
282
// Frequent page visit patterns
283
val webPatternMiner = new FPGrowth()
284
.setItemsCol("page_sequence")
285
.setMinSupport(0.25)
286
.setMinConfidence(0.5)
287
288
val webPatternModel = webPatternMiner.fit(clickstreams)
289
290
println("Frequent Page Visit Patterns:")
291
webPatternModel.freqItemsets.show(truncate = false)
292
293
println("Page Navigation Rules:")
294
webPatternModel.associationRules.show(truncate = false)
295
296
// Sequential navigation patterns
297
val navPatternMiner = new PrefixSpan()
298
.setSequenceCol("page_sequence")
299
.setMinSupport(0.3)
300
.setMaxPatternLength(4)
301
302
// Convert page sequences to the required format (array of arrays)
303
val navSequences = clickstreams
304
.withColumn("nav_sequence",
305
transform(col("page_sequence"), page => array(page)))
306
307
val navPatternModel = navPatternMiner.fit(navSequences.select("session_id", "nav_sequence"))
308
309
println("Sequential Navigation Patterns:")
310
navPatternModel.freqSequences.show(truncate = false)
311
```
312
313
### Product Recommendation Based on Frequent Patterns
314
315
```scala
316
// Use frequent patterns for product recommendations
317
def recommendProducts(frequentItemsets: DataFrame,
318
currentBasket: Array[String],
319
topK: Int = 5): DataFrame = {
320
321
val currentItems = currentBasket.toSet
322
323
// Find itemsets that contain all items in current basket
324
val relevantItemsets = frequentItemsets
325
.filter { row =>
326
val itemset = row.getAs[Seq[String]]("items").toSet
327
currentItems.subsetOf(itemset)
328
}
329
.withColumn("additional_items",
330
expr(s"filter(items, x -> !array_contains(array(${currentItems.map(s => s"'$s'").mkString(", ")}), x))"))
331
.filter(size(col("additional_items")) > 0)
332
.select(explode(col("additional_items")).alias("recommended_item"), col("freq"))
333
.groupBy("recommended_item")
334
.agg(sum("freq").alias("total_support"))
335
.orderBy(desc("total_support"))
336
.limit(topK)
337
338
relevantItemsets
339
}
340
341
// Example usage
342
val currentBasket = Array("bread", "milk")
343
val recommendations = recommendProducts(model.freqItemsets, currentBasket, 3)
344
345
println(s"Recommendations for basket: ${currentBasket.mkString(", ")}")
346
recommendations.show()
347
```
348
349
### Performance Optimization and Tuning
350
351
```scala
352
// Large-scale frequent pattern mining optimization
353
val largeFPGrowth = new FPGrowth()
354
.setItemsCol("items")
355
.setMinSupport(0.01) // Lower support for more patterns (be careful with memory)
356
.setMinConfidence(0.5)
357
.setNumPartitions(200) // Increase partitions for large datasets
358
359
// Monitor performance and memory usage
360
val startTime = System.currentTimeMillis()
361
val largeModel = largeFPGrowth.fit(largeTransactions)
362
val endTime = System.currentTimeMillis()
363
364
println(s"Training time: ${(endTime - startTime) / 1000.0} seconds")
365
println(s"Number of frequent itemsets: ${largeModel.numItemsets}")
366
367
// Cache frequent itemsets for multiple analyses
368
val cachedItemsets = largeModel.freqItemsets.cache()
369
cachedItemsets.count() // Trigger caching
370
371
// Analyze different confidence thresholds
372
val confidenceThresholds = Array(0.3, 0.5, 0.7, 0.9)
373
confidenceThresholds.foreach { threshold =>
374
val rules = largeModel.associationRules.filter(col("confidence") >= threshold)
375
println(s"Rules with confidence >= $threshold: ${rules.count()}")
376
}
377
378
// Sequential pattern mining optimization
379
val optimizedPrefixSpan = new PrefixSpan()
380
.setSequenceCol("sequence")
381
.setMinSupport(0.1)
382
.setMaxPatternLength(3) // Limit pattern length for performance
383
.setMaxLocalProjDBSize(64000000L) // Adjust based on available memory
384
385
// Batch processing for very large datasets
386
def processLargeDatasetInBatches(dataset: DataFrame, batchSize: Int = 10000): Unit = {
  val totalRows = dataset.count()
  // Ceiling division: avoids an extra empty batch when totalRows is an exact multiple of batchSize
  val numBatches = ((totalRows + batchSize - 1) / batchSize).toInt

  (0 until numBatches).foreach { batchNum =>
    val offset = batchNum * batchSize
    // offset must be applied BEFORE limit, otherwise every batch after the first is empty.
    // NOTE: without an explicit orderBy, offset/limit pagination is not guaranteed stable across jobs.
    val batch = dataset.offset(offset).limit(batchSize)

    val batchModel = largeFPGrowth.fit(batch)
    println(s"Batch $batchNum: ${batchModel.numItemsets} frequent itemsets")

    // Process or save batch results
    batchModel.freqItemsets.write
      .mode("append")
      .option("path", s"frequent_patterns/batch_$batchNum")
      .save()
  }
}
404
```
405
406
### Rule Quality Metrics and Filtering
407
408
```scala
409
import org.apache.spark.sql.functions._
410
411
// Calculate additional rule quality metrics
412
def enhanceAssociationRules(rules: DataFrame, itemsets: DataFrame): DataFrame = {
413
414
// Create a map of itemset support values
415
val supportMap = itemsets
416
.select("items", "freq")
417
.rdd
418
.map(row => (row.getAs[Seq[String]]("items").toSet, row.getAs[Long]("freq")))
419
.collectAsMap()
420
421
val broadcastSupport = spark.sparkContext.broadcast(supportMap)
422
423
rules.withColumn("lift",
424
col("confidence") /
425
// In practice, calculate consequent support from itemsets
426
lit(0.5)) // Placeholder
427
.withColumn("conviction",
428
(lit(1.0) - lit(0.5)) / (lit(1.0) - col("confidence"))) // Placeholder calculation
429
.withColumn("leverage",
430
// support(antecedent ∪ consequent) - support(antecedent) × support(consequent)
431
lit(0.0)) // Placeholder calculation
432
}
433
434
// Filter rules by multiple criteria
435
// NOTE: the "lift" column is part of the associationRules output in Spark 3.0+;
// on earlier versions, compute lift from the frequent itemsets (as shown above) before filtering on it.
val qualityRules = model.associationRules
  .filter(col("confidence") > 0.6)
  .filter(col("lift") > 1.2) // Lift > 1 indicates positive correlation
  .filter(size(col("consequent")) === 1) // Single-item consequents only
  .orderBy(desc("confidence"), desc("lift"))
440
441
println("High-Quality Association Rules:")
442
qualityRules.show(truncate = false)
443
444
// Statistical significance testing (Chi-square test)
445
def filterSignificantRules(rules: DataFrame, alpha: Double = 0.05): DataFrame = {
446
// Implementation would require chi-square test calculation
447
// This is a placeholder for the concept
448
rules.filter(col("confidence") > 0.5) // Simplified filter
449
}
450
451
// Rule pruning - remove redundant rules
452
def pruneRedundantRules(rules: DataFrame): DataFrame = {
453
// Remove rules where antecedent is a superset of another rule with same consequent
454
// and higher or equal confidence
455
rules // Simplified - full implementation would require complex logic
456
}
457
```