# Frequent Pattern Mining

Pattern discovery algorithms for finding recurring patterns in transactional and sequential data. MLlib provides FP-Growth for frequent itemset mining and PrefixSpan for sequential pattern mining, which support use cases such as market basket analysis, recommendation, and behavioral pattern discovery.

## Capabilities

### FP-Growth Algorithm

A parallel FP-Growth implementation for mining frequent itemsets. It follows the PFP (parallel FP-growth) algorithm of Li et al., which splits the mining work into independent item groups so that FP-trees can be grown in parallel across partitions. Items within a single transaction must be unique.

```scala { .api }
// Package: org.apache.spark.ml.fpm

/**
 * FPGrowth - parallel FP-growth algorithm for frequent itemset mining.
 * Discovers frequent itemsets in transactional data using FP-trees.
 */
class FPGrowth extends Estimator[FPGrowthModel] with FPGrowthParams with DefaultParamsWritable {
  def setMinSupport(value: Double): this.type
  def setNumPartitions(value: Int): this.type
  def setMinConfidence(value: Double): this.type
  def setItemsCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def fit(dataset: Dataset[_]): FPGrowthModel
}

/**
 * FPGrowthModel - model fitted by FPGrowth, holding the frequent itemsets.
 * Provides association rule generation and itemset-based predictions.
 */
class FPGrowthModel extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {
  // Columns: "items" (array of items) and "freq" (long)
  val freqItemsets: DataFrame
  def setMinConfidence(value: Double): this.type
  def setItemsCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  // Rules with confidence >= minConfidence; columns "antecedent", "consequent",
  // "confidence", "lift" (plus "support" in Spark 3.0+)
  def associationRules: DataFrame
  // Adds predictionCol: consequents of every rule whose antecedent is a subset
  // of the row's items, excluding items already present in the row
  def transform(dataset: Dataset[_]): DataFrame
}

/**
 * FPGrowthParams - parameters shared by FPGrowth and FPGrowthModel.
 * Defines the configuration options for frequent pattern mining.
 */
trait FPGrowthParams extends Params with HasPredictionCol {
  val itemsCol: Param[String]
  val minSupport: DoubleParam
  val numPartitions: IntParam
  val minConfidence: DoubleParam
  def getItemsCol: String
  def getMinSupport: Double
  def getNumPartitions: Int
  def getMinConfidence: Double
}
```
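
To make the `minSupport` semantics concrete, the sketch below counts itemset support by brute force over a handful of baskets. It is not the PFP algorithm Spark uses, only a reference computation for tiny inputs, and it assumes the absolute threshold is `ceil(minSupport * numberOfTransactions)`, which is how we understand the RDD-based implementation to derive it. `BruteForceItemsets` and its methods are hypothetical names.

```scala
// Brute-force frequent itemset counting for small data; NOT the PFP algorithm.
object BruteForceItemsets {
  // Assumed rule: absolute threshold = ceil(minSupport * numTransactions)
  def minCount(minSupport: Double, numTransactions: Long): Long =
    math.ceil(minSupport * numTransactions).toLong

  // Uses every non-empty subset of every basket as a candidate, then keeps the
  // candidates contained in at least minCount baskets. Exponential in basket size.
  def frequentItemsets(baskets: Seq[Set[String]], minSupport: Double): Map[Set[String], Long] = {
    val threshold = minCount(minSupport, baskets.size.toLong)
    val candidates: Set[Set[String]] =
      baskets.flatMap(_.subsets().filter(_.nonEmpty)).toSet
    candidates.iterator
      .map(c => c -> baskets.count(b => c.subsetOf(b)).toLong)
      .filter { case (_, n) => n >= threshold }
      .toMap
  }
}
```

With the five baskets from the FP-Growth usage example and `minSupport = 0.5`, the threshold is 3 and the sketch keeps only {milk}, {eggs}, {bread}, {milk, bread} and {eggs, bread}. Enumerating every subset is exactly the cost that FP-trees avoid.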

### PrefixSpan Algorithm

A sequential pattern mining algorithm that discovers frequent subsequences in sequence data. Each input row is a sequence of itemsets (an array of arrays), and a pattern is frequent when the fraction of input sequences containing it reaches `minSupport`.

```scala { .api }
// Package: org.apache.spark.ml.fpm

/**
 * PrefixSpan - parallel PrefixSpan algorithm for sequential pattern mining.
 * Discovers frequent sequential patterns using prefix-projected pattern growth.
 * Not an Estimator: there is no fit/model step; call findFrequentSequentialPatterns directly.
 */
class PrefixSpan extends Params {
  val minSupport: DoubleParam
  val maxPatternLength: IntParam
  val maxLocalProjDBSize: LongParam
  val sequenceCol: Param[String]
  def setMinSupport(value: Double): this.type
  def setMaxPatternLength(value: Int): this.type
  def setMaxLocalProjDBSize(value: Long): this.type
  def setSequenceCol(value: String): this.type
  def getMinSupport: Double
  def getMaxPatternLength: Int
  def getMaxLocalProjDBSize: Long
  def getSequenceCol: String
  // Returns a DataFrame with columns "sequence" (array of itemset arrays) and "freq" (long)
  def findFrequentSequentialPatterns(dataset: Dataset[_]): DataFrame
}
```
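
PrefixSpan counts a pattern's support as the number of input sequences that contain it, where a sequence contains a pattern if each itemset of the pattern is a subset of a distinct itemset of the sequence, in the same order. The sketch below checks that containment directly with a greedy left-to-right scan; PrefixSpan itself avoids rescanning the data by growing patterns over prefix-projected databases. `SequenceContainment` is an illustrative name, not part of Spark.

```scala
// Direct containment check behind sequential-pattern support (illustrative only).
object SequenceContainment {
  type Pattern = Seq[Set[String]]

  // True if every itemset of `pattern` is a subset of some itemset of `sequence`,
  // with the matched itemsets strictly increasing in position. Matching each
  // pattern itemset at its earliest possible position is sufficient.
  def contains(sequence: Pattern, pattern: Pattern): Boolean = {
    var pos = 0
    pattern.forall { itemset =>
      while (pos < sequence.length && !itemset.subsetOf(sequence(pos))) pos += 1
      val found = pos < sequence.length
      pos += 1 // the next pattern itemset must match strictly later
      found
    }
  }

  // Support = number of input sequences that contain the pattern.
  def support(db: Seq[Pattern], pattern: Pattern): Int =
    db.count(s => contains(s, pattern))
}
```

For the three sequences used in the RDD-based example, the pattern `Seq(Set("a"), Set("c"))` is contained in all three, while `Seq(Set("b", "c"))` is contained in only two, because the last sequence has `b` and `c` in separate itemsets.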

### RDD-based Legacy APIs

The original RDD-based API in `org.apache.spark.mllib.fpm`, which has been in maintenance mode since Spark 2.0; the DataFrame-based `FPGrowth` above delegates to this implementation internally. Use it when you need typed results such as `FreqItemset` and `Rule`, or the standalone `AssociationRules` generator.

```scala { .api }
// JavaIterable and jl.Iterable both denote java.lang.Iterable

/**
 * FPGrowth (RDD-based) - legacy RDD-based FP-Growth implementation
 * Mines frequent itemsets directly from an RDD of transactions
 */
class org.apache.spark.mllib.fpm.FPGrowth extends Logging with Serializable {
  def setMinSupport(minSupport: Double): this.type
  def setNumPartitions(numPartitions: Int): this.type
  def getMinSupport: Double
  def getNumPartitions: Int
  // Each Array[Item] is one transaction; items within a transaction must be unique
  def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item]
  def run[Item, Basket <: JavaIterable[Item]](data: JavaRDD[Basket]): FPGrowthModel[Item]
}

/**
 * FPGrowthModel (RDD-based) - model containing frequent itemsets
 * Produced by the RDD-based FPGrowth
 */
class org.apache.spark.mllib.fpm.FPGrowthModel[Item: ClassTag] extends Saveable with Serializable {
  val freqItemsets: RDD[FreqItemset[Item]]
  val itemSupport: Map[Item, Double]
  def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]]
  def save(sc: SparkContext, path: String): Unit
}

/**
 * PrefixSpan (RDD-based) - legacy RDD-based PrefixSpan implementation
 * Mines sequential patterns directly from an RDD of sequences
 */
class org.apache.spark.mllib.fpm.PrefixSpan extends Logging with Serializable {
  def setMinSupport(minSupport: Double): this.type
  def setMaxPatternLength(maxPatternLength: Int): this.type
  def setMaxLocalProjDBSize(maxLocalProjDBSize: Long): this.type
  def getMinSupport: Double
  def getMaxPatternLength: Int
  def getMaxLocalProjDBSize: Long
  // Each Array[Array[Item]] is one sequence of itemsets
  def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): PrefixSpanModel[Item]
  def run[Item, Itemset <: jl.Iterable[Item], Sequence <: jl.Iterable[Itemset]](data: JavaRDD[Sequence]): PrefixSpanModel[Item]
}

/**
 * PrefixSpanModel (RDD-based) - model containing frequent sequential patterns
 * Produced by the RDD-based PrefixSpan
 */
class org.apache.spark.mllib.fpm.PrefixSpanModel[Item] extends Saveable with Serializable {
  val freqSequences: RDD[PrefixSpan.FreqSequence[Item]]
  def save(sc: SparkContext, path: String): Unit
}
```

### Association Rules Mining

The RDD-based `AssociationRules` class generates rules with a single-item consequent from frequent itemsets and keeps those whose confidence is at least `minConfidence`. Lift is computed only when per-item supports are passed to `run`; otherwise `Rule.lift` is `None`.

```scala { .api }
// Package: org.apache.spark.mllib.fpm

/**
 * AssociationRules - generates association rules from frequent itemsets
 * Produces rules with single-item consequents and their statistical measures
 */
class AssociationRules extends Logging with Serializable {
  def setMinConfidence(minConfidence: Double): this.type
  // Rules without lift (Rule.lift is None)
  def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]]
  // Rules with lift, computed from the supplied per-item supports
  def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]], itemSupport: scala.collection.Map[Item, Double]): RDD[Rule[Item]]
}

/**
 * Rule - association rule with antecedent, consequent, and statistical measures
 * Exposes confidence and, when item supports are known, lift
 */
class AssociationRules.Rule[Item] extends Serializable {
  val antecedent: Array[Item]
  val consequent: Array[Item]
  def confidence: Double    // freq(antecedent and consequent together) / freq(antecedent)
  def lift: Option[Double]  // confidence / support(consequent)
  def javaAntecedent: java.util.List[Item]
  def javaConsequent: java.util.List[Item]
}
```
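
Both metrics can be recomputed from itemset counts: confidence(X => y) = count(X with y) / count(X), and lift = confidence / support(y), where support(y) is the fraction of transactions containing y. A lift above 1 means y occurs together with X more often than it would by chance. The sketch below is a plain-Scala rederivation under these definitions, generating only single-item consequents; it is not Spark's code, it assumes every subset of a frequent itemset is present in the input map, and `RuleMetrics` is a hypothetical name.

```scala
// Recomputes rule confidence and lift from itemset counts (illustrative only).
object RuleMetrics {
  final case class Rule(antecedent: Set[String], consequent: String, confidence: Double, lift: Double)

  // For each frequent itemset S with |S| >= 2 and each y in S, emits X => y with X = S - y.
  // Assumes `freq` holds the count of every subset of every itemset it contains.
  def rules(freq: Map[Set[String], Long], numTransactions: Long, minConfidence: Double): Seq[Rule] =
    freq.toSeq
      .filter { case (itemset, _) => itemset.size >= 2 }
      .flatMap { case (itemset, unionCount) =>
        itemset.toSeq.map { y =>
          val antecedent = itemset - y
          val confidence = unionCount.toDouble / freq(antecedent)
          val consequentSupport = freq(Set(y)).toDouble / numTransactions
          Rule(antecedent, y, confidence, confidence / consequentSupport)
        }
      }
      .filter(_.confidence >= minConfidence)
}
```

For instance, with 10 transactions, count({a}) = 6, count({b}) = 4 and count({a, b}) = 3, the rule {b} => a has confidence 0.75 and lift 0.75 / 0.6 = 1.25, while {a} => b has confidence 0.5.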

### Data Structures and Types

Result types returned by the RDD-based API, and the validators used to range-check parameters of the DataFrame-based classes.

```scala { .api }
/**
 * FreqItemset (org.apache.spark.mllib.fpm.FPGrowth.FreqItemset) - frequent itemset with its count
 * Basic unit of itemset mining results
 */
class FPGrowth.FreqItemset[Item] extends Serializable {
  val items: Array[Item]
  val freq: Long  // number of transactions containing all of `items`
  def javaItems: java.util.List[Item]
}

/**
 * FreqSequence (org.apache.spark.mllib.fpm.PrefixSpan.FreqSequence) - frequent sequence with its count
 * Basic unit of sequential pattern mining results
 */
class PrefixSpan.FreqSequence[Item] extends Serializable {
  val sequence: Array[Array[Item]]  // ordered itemsets
  val freq: Long                    // number of input sequences containing the pattern
  def javaSequence: java.util.List[java.util.List[Item]]
}

/**
 * ParamValidators (org.apache.spark.ml.param) - factory methods for parameter validation functions
 * Used for range checks such as minSupport in [0, 1] or maxPatternLength > 0
 */
object ParamValidators {
  def inRange[T](lowerBound: Double, upperBound: Double): T => Boolean  // both bounds inclusive
  def gtEq[T](lowerBound: Double): T => Boolean
  def gt[T](lowerBound: Double): T => Boolean
}
```

## Usage Examples

### FP-Growth Frequent Itemset Mining

```scala
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Assumes an active SparkSession named `spark` (as in spark-shell)

// Create sample transaction data; items within a transaction must be unique
val transactions = Seq(
  Row(Array("milk", "eggs", "bread", "cheese")),
  Row(Array("eggs", "bread")),
  Row(Array("milk", "bread")),
  Row(Array("eggs", "bread", "butter")),
  Row(Array("milk", "eggs", "bread", "butter"))
)

val schema = StructType(Array(StructField("items", ArrayType(StringType), true)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(transactions), schema)

// Configure and fit the FPGrowth model
val fpGrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.5)     // itemset must appear in at least half of the transactions
  .setMinConfidence(0.6)  // minimum confidence for association rules

val model = fpGrowth.fit(df)

// Frequent itemsets ("items", "freq"); for this data: {bread}, {eggs}, {milk},
// {eggs, bread}, {milk, bread} (row and item order may vary)
println("Frequent Itemsets:")
model.freqItemsets.show(false)

// Association rules ("antecedent", "consequent", "confidence", "lift", ...)
println("Association Rules:")
model.associationRules.show(false)

// Predictions: items suggested by applicable rules, excluding items already in the basket
val predictions = model.transform(df)
predictions.select("items", "prediction").show(false)
```
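
The `prediction` column follows a simple rule: a rule applies to a basket when its antecedent is a subset of the basket, and the prediction is the union of the applicable rules' consequents minus the items the basket already contains. A minimal plain-Scala sketch of that logic, using a hypothetical `RulePredictor` object rather than Spark's implementation:

```scala
// Illustrative re-implementation of the FPGrowthModel prediction rule.
object RulePredictor {
  final case class Rule(antecedent: Set[String], consequent: Set[String])

  // Collect consequents of all rules whose antecedent is contained in the basket,
  // then drop the items the basket already has.
  def predict(basket: Set[String], rules: Seq[Rule]): Set[String] = {
    val suggested: Set[String] =
      rules.filter(r => r.antecedent.subsetOf(basket)).flatMap(_.consequent).toSet
    suggested.diff(basket)
  }
}
```

For example, with the rules milk => bread, bread => milk, eggs => bread and bread => eggs (hand-computed from the five example baskets at `minConfidence = 0.6`, not captured Spark output), `predict(Set("eggs", "bread"), rules)` returns `Set("milk")`.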

### Market Basket Analysis with Association Rules

```scala
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.functions._

// Assumes an active SparkSession named `spark`; implicits enable .toDF on local Seqs
import spark.implicits._

// Load retail transaction data (one row per purchased product)
val rawTransactions = spark.read
  .option("header", "true")
  .csv("retail_transactions.csv")

// Transform to the required format: one array of products per transaction.
// collect_set drops repeated products, since FPGrowth rejects transactions with duplicate items.
val transactions = rawTransactions
  .groupBy("transaction_id")
  .agg(collect_set("product_name").as("items"))
  .select("items")

// Mine frequent itemsets and rules
val fpGrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.01)    // 1% minimum support
  .setMinConfidence(0.5)  // 50% confidence threshold

val model = fpGrowth.fit(transactions)

// Analyze frequent itemsets by size
val itemsetStats = model.freqItemsets
  .withColumn("itemset_size", size(col("items")))
  .groupBy("itemset_size")
  .agg(count("*").as("count"), avg("freq").as("avg_frequency"))

itemsetStats.orderBy("itemset_size").show()

// Keep high-confidence rules with a positive association (lift > 1.0)
val strongRules = model.associationRules
  .filter(col("confidence") >= 0.7 && col("lift") > 1.0)
  .orderBy(desc("confidence"), desc("lift"))

println("Strong Association Rules:")
strongRules.show(20, false)

// Generate recommendations for new baskets
val newBaskets = Seq(
  Seq("milk", "bread"),
  Seq("eggs", "butter")
).toDF("items")

val recommendations = model.transform(newBaskets)
recommendations.show(false)
```
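
Grouping line items with `collect_set` rather than `collect_list` matters because FP-Growth requires the items in each transaction to be unique. The sketch below mirrors that grouping step on local data; `BasketBuilder` and its methods are hypothetical helpers for illustration, not Spark APIs.

```scala
// Local analogue of groupBy("transaction_id") + collect_set("product_name").
object BasketBuilder {
  // Groups (transactionId, product) line items into baskets of distinct products.
  def baskets(lineItems: Seq[(String, String)]): Map[String, Set[String]] =
    lineItems.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).toSet }

  // True when a transaction repeats an item, which FP-Growth would reject.
  def hasDuplicates(transaction: Seq[String]): Boolean =
    transaction.distinct.size != transaction.size
}
```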

### PrefixSpan Sequential Pattern Mining

```scala
import org.apache.spark.ml.fpm.PrefixSpan
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Create sample sequence data (web clickstreams); each page visit is a one-item itemset
val sequences = Seq(
  Row(Array(Array("home"), Array("product_A"), Array("cart"), Array("checkout"))),
  Row(Array(Array("home"), Array("search"), Array("product_B"), Array("product_A"), Array("cart"))),
  Row(Array(Array("home"), Array("product_A"), Array("product_B"), Array("cart"), Array("checkout"))),
  Row(Array(Array("search"), Array("product_A"), Array("cart")))
)

val schema = StructType(Array(
  StructField("sequence", ArrayType(ArrayType(StringType)), true)
))
val df = spark.createDataFrame(spark.sparkContext.parallelize(sequences), schema)

// Configure PrefixSpan
val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)      // pattern must occur in at least half of the sequences
  .setMaxPatternLength(5)  // at most 5 items per pattern
  .setSequenceCol("sequence")

// Find frequent sequential patterns (columns "sequence" and "freq")
val frequentPatterns = prefixSpan.findFrequentSequentialPatterns(df)

println("Frequent Sequential Patterns:")
frequentPatterns.show(false)

// Pattern length here = number of itemsets (equal to the item count, since every itemset has one item)
val patternStats = frequentPatterns
  .withColumn("pattern_length", size(col("sequence")))
  .groupBy("pattern_length")
  .agg(count("*").as("pattern_count"), avg("freq").as("avg_frequency"))

patternStats.orderBy("pattern_length").show()
```

### Customer Journey Analysis

```scala
import org.apache.spark.ml.fpm.PrefixSpan
import org.apache.spark.sql.functions._

// Load customer journey data (one row per event)
val rawJourneys = spark.read
  .option("header", "true")
  .csv("customer_journeys.csv")

// Transform to sequence format: one [page_type, action] itemset per event, ordered by time.
// sort_array orders the structs by their first field (timestamp), read here as a string,
// so this assumes a lexicographically sortable format such as ISO-8601.
val journeys = rawJourneys
  .groupBy("customer_id")
  .agg(collect_list(
    struct(col("timestamp"), col("page_type"), col("action"))
  ).as("events"))
  .select(
    col("customer_id"),
    expr("transform(sort_array(events), e -> array(e.page_type, e.action))").as("sequence")
  )

// Mine frequent customer journey patterns
val prefixSpan = new PrefixSpan()
  .setMinSupport(0.05)     // 5% minimum support
  .setMaxPatternLength(8)  // at most 8 items per pattern
  .setSequenceCol("sequence")

val journeyPatterns = prefixSpan.findFrequentSequentialPatterns(journeys)

// Conversion patterns: the last itemset contains both 'checkout' and 'purchase'.
// Items inside an itemset are unordered, so test membership rather than positions.
val conversionPatterns = journeyPatterns
  .filter(expr("array_contains(element_at(sequence, -1), 'checkout') AND array_contains(element_at(sequence, -1), 'purchase')"))
  .orderBy(desc("freq"))

println("Frequent Conversion Patterns:")
conversionPatterns.show(10, false)

// Patterns that include 'cart' but no 'purchase' step. Patterns are subsequences,
// so customers matching one may still have purchased at another point.
val abandonmentPatterns = journeyPatterns
  .filter(expr("array_contains(flatten(sequence), 'cart') AND NOT array_contains(flatten(sequence), 'purchase')"))
  .orderBy(desc("freq"))

println("Common Abandonment Patterns:")
abandonmentPatterns.show(10, false)
```
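
The DataFrame pipeline above turns each customer's events into a time-ordered sequence of `[page_type, action]` itemsets. Below is a local sketch of the same reshaping, plus a conversion check that inspects the last itemset by membership rather than by position. It assumes ISO-8601 timestamp strings, which sort chronologically as text; `JourneyBuilder` and its members are hypothetical names.

```scala
// Local analogue of collect_list + sort_array + transform (illustrative only).
object JourneyBuilder {
  final case class Event(customerId: String, timestamp: String, pageType: String, action: String)

  // One sequence per customer: events sorted by timestamp string, each event
  // becoming the itemset Set(pageType, action).
  def sequences(events: Seq[Event]): Map[String, Seq[Set[String]]] =
    events.groupBy(_.customerId).map { case (id, evs) =>
      id -> evs.sortBy(_.timestamp).map(e => Set(e.pageType, e.action))
    }

  // A pattern counts as a conversion if its last itemset contains "purchase".
  def isConversion(pattern: Seq[Set[String]]): Boolean =
    pattern.lastOption.exists(_.contains("purchase"))
}
```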

### RDD-based Advanced Mining

```scala
import org.apache.spark.mllib.fpm.{FPGrowth => MLlibFPGrowth, PrefixSpan => MLlibPrefixSpan}
import org.apache.spark.rdd.RDD

// RDD-based FP-Growth: each transaction is an Array of unique items
val transactionRDD: RDD[Array[String]] = spark.sparkContext.parallelize(Seq(
  Array("a", "b", "c"),
  Array("a", "c", "d"),
  Array("b", "e"),
  Array("a", "b", "c", "e")
))

val mllibFP = new MLlibFPGrowth()
  .setMinSupport(0.5)
  .setNumPartitions(4)

val fpModel = mllibFP.run(transactionRDD)

// Extract frequent itemsets
val frequentItemsets = fpModel.freqItemsets.collect()
frequentItemsets.foreach(itemset =>
  println(s"${itemset.items.mkString(",")} : ${itemset.freq}")
)

// Generate association rules with confidence >= 0.8
val rules = fpModel.generateAssociationRules(0.8).collect()
rules.foreach(rule =>
  println(s"${rule.antecedent.mkString(",")} => ${rule.consequent.mkString(",")} (conf: ${rule.confidence})")
)

// RDD-based PrefixSpan: each sequence is an Array of itemsets (Arrays of items)
val sequenceRDD: RDD[Array[Array[String]]] = spark.sparkContext.parallelize(Seq(
  Array(Array("a"), Array("a", "b", "c"), Array("a", "c")),
  Array(Array("a", "d"), Array("c"), Array("b", "c")),
  Array(Array("a"), Array("b"), Array("c"))
))

val mllibPS = new MLlibPrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(4)

val psModel = mllibPS.run(sequenceRDD)

// Extract frequent sequences, printing itemsets separated by "->"
val frequentSequences = psModel.freqSequences.collect()
frequentSequences.foreach(fs =>
  println(s"${fs.sequence.map(_.mkString(",")).mkString(" -> ")} : ${fs.freq}")
)
```

### Performance Optimization

```scala
import org.apache.spark.ml.fpm.{FPGrowth, PrefixSpan}

// Assumes DataFrames `transactions` (array column "items") and
// `sequenceData` (array-of-arrays column "sequence") are already defined.

// FP-Growth configuration for a large dataset
val largeFPGrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.001)   // finds rarer itemsets, at a much higher time and memory cost
  .setNumPartitions(200)  // spread FP-tree mining across more partitions
  .setMinConfidence(0.1)  // affects only rule generation, not itemset mining

// Cache inputs that are reused across several fits (e.g., parameter sweeps)
val cachedTransactions = transactions.cache()
val model = largeFPGrowth.fit(cachedTransactions)

// PrefixSpan configuration for long sequences
val largePrefixSpan = new PrefixSpan()
  .setMinSupport(0.001)
  .setMaxPatternLength(15)           // allow longer patterns
  .setMaxLocalProjDBSize(50000000L)  // larger projected DBs are mined locally; needs more executor memory
  .setSequenceCol("sequence")

val cachedSequences = sequenceData.cache()
val patterns = largePrefixSpan.findFrequentSequentialPatterns(cachedSequences)
```

## Algorithm Parameters

### FP-Growth Configuration
- **minSupport**: Minimum fraction of transactions that must contain an itemset, in [0.0, 1.0]; default 0.3
- **minConfidence**: Minimum confidence for association rules, in [0.0, 1.0]; default 0.8. It affects only rule generation, not itemset mining
- **numPartitions**: Number of partitions used by parallel FP-Growth (≥ 1); if unset, the input dataset's partition count is used
- **itemsCol**: Input column containing item arrays; default "items"
- **predictionCol**: Output column for predictions; default "prediction"

### PrefixSpan Configuration
- **minSupport**: Minimum fraction of sequences that must contain a pattern (≥ 0.0); default 0.1
- **maxPatternLength**: Maximum pattern length, counted as the number of items in the pattern (> 0); default 10
- **maxLocalProjDBSize**: Maximum number of items (including internal delimiters) a projected database may hold before it is mined locally instead of by further distributed iterations (> 0); default 32000000
- **sequenceCol**: Input column containing sequences (arrays of itemset arrays); default "sequence". Rows with a null sequence are ignored

### Performance Considerations
- **Memory Usage**: FP-Growth builds FP-trees in memory; ensure each partition has enough memory for the trees of the item groups assigned to it
- **Partitioning**: Partitioning strongly affects performance; watch for data skew across item groups
- **Caching**: Cache input datasets that are reused across several runs, such as parameter tuning
- **Checkpointing**: FP-Growth and PrefixSpan are not iterative optimizers and do not rely on checkpointing; cost is driven mainly by minSupport and, for PrefixSpan, maxPatternLength
- **Local Processing**: PrefixSpan switches to local processing once a projected database is smaller than maxLocalProjDBSize

### Applications
- **Market Basket Analysis**: Discover purchasing patterns and product associations
- **Recommendation Systems**: Generate item recommendations based on frequent patterns
- **Web Usage Mining**: Analyze clickstream patterns and user navigation sequences
- **Bioinformatics**: Find frequent subsequences in DNA/protein sequences
- **Network Security**: Detect recurring attack patterns in security logs
- **Supply Chain**: Optimize inventory and identify purchasing dependencies