or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

classification.mdclustering.mdcore-framework.mdevaluation.mdfeature-processing.mdfrequent-pattern-mining.mdindex.mdlinear-algebra.mdrdd-api.mdrecommendation.mdregression.md

frequent-pattern-mining.mddocs/

0

# Frequent Pattern Mining

1

2

MLlib provides algorithms for discovering frequent patterns and sequential patterns in transactional data. These algorithms are useful for market basket analysis, web usage mining, and discovering association rules.

3

4

## FP-Growth Algorithm

5

6

### Estimator

7

8

```scala { .api }

9

class FPGrowth(override val uid: String) extends Estimator[FPGrowthModel]

10

with FPGrowthParams with DefaultParamsWritable {

11

12

def this() = this(Identifiable.randomUID("fpgrowth"))

13

14

def setItemsCol(value: String): FPGrowth

15

def setMinSupport(value: Double): FPGrowth

16

def setNumPartitions(value: Int): FPGrowth

17

def setMinConfidence(value: Double): FPGrowth

18

def setPredictionCol(value: String): FPGrowth

19

20

override def fit(dataset: Dataset[_]): FPGrowthModel

21

override def copy(extra: ParamMap): FPGrowth

22

}

23

```

24

25

### Model

26

27

```scala { .api }

28

class FPGrowthModel(override val uid: String, private val parentModel: MLlibFPGrowthModel[String])

29

extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {

30

31

def freqItemsets: DataFrame

32

def associationRules: DataFrame

33

def numItemsets: Long

34

35

def setItemsCol(value: String): FPGrowthModel

36

def setMinConfidence(value: Double): FPGrowthModel

37

def setPredictionCol(value: String): FPGrowthModel

38

39

override def transform(dataset: Dataset[_]): DataFrame

40

override def copy(extra: ParamMap): FPGrowthModel

41

def write: MLWriter

42

}

43

```

44

45

### Parameters

46

47

```scala { .api }

48

trait FPGrowthParams extends Params {

49

final val itemsCol: Param[String]

50

final val minSupport: DoubleParam

51

final val numPartitions: IntParam

52

final val minConfidence: DoubleParam

53

final val predictionCol: Param[String]

54

55

def getItemsCol: String

56

def getMinSupport: Double

57

def getNumPartitions: Int

58

def getMinConfidence: Double

59

def getPredictionCol: String

60

}

61

```

62

63

## PrefixSpan Algorithm

64

65

### Estimator

66

67

```scala { .api }

68

class PrefixSpan(override val uid: String) extends Estimator[PrefixSpanModel]

69

with PrefixSpanParams with DefaultParamsWritable {

70

71

def this() = this(Identifiable.randomUID("prefixspan"))

72

73

def setSequenceCol(value: String): PrefixSpan

74

def setMinSupport(value: Double): PrefixSpan

75

def setMaxPatternLength(value: Int): PrefixSpan

76

def setMaxLocalProjDBSize(value: Long): PrefixSpan

77

78

override def fit(dataset: Dataset[_]): PrefixSpanModel

79

override def copy(extra: ParamMap): PrefixSpan

80

}

81

```

82

83

### Model

84

85

```scala { .api }

86

class PrefixSpanModel(override val uid: String, private val parentModel: MLlibPrefixSpanModel[String])

87

extends Model[PrefixSpanModel] with PrefixSpanParams with MLWritable {

88

89

def freqSequences: DataFrame

90

91

def setSequenceCol(value: String): PrefixSpanModel

92

93

override def transform(dataset: Dataset[_]): DataFrame

94

override def copy(extra: ParamMap): PrefixSpanModel

95

def write: MLWriter

96

}

97

```

98

99

### Parameters

100

101

```scala { .api }

102

trait PrefixSpanParams extends Params {

103

final val sequenceCol: Param[String]

104

final val minSupport: DoubleParam

105

final val maxPatternLength: IntParam

106

final val maxLocalProjDBSize: LongParam

107

108

def getSequenceCol: String

109

def getMinSupport: Double

110

def getMaxPatternLength: Int

111

def getMaxLocalProjDBSize: Long

112

}

113

```

114

115

## Usage Examples

116

117

### Market Basket Analysis with FP-Growth

118

119

```scala

120

import org.apache.spark.ml.fpm.FPGrowth

121

import org.apache.spark.sql.functions._

122

123

// Sample transaction data - each row contains items in a transaction

124

val transactions = spark.createDataFrame(Seq(

125

(1, Array("bread", "milk", "butter")),

126

(2, Array("bread", "milk")),

127

(3, Array("bread", "butter", "jam")),

128

(4, Array("milk", "butter", "eggs")),

129

(5, Array("bread", "milk", "butter", "eggs")),

130

(6, Array("bread", "jam")),

131

(7, Array("milk", "eggs")),

132

(8, Array("bread", "butter")),

133

(9, Array("milk", "butter", "jam")),

134

(10, Array("bread", "milk", "jam"))

135

)).toDF("id", "items")

136

137

// Create FP-Growth model

138

val fpGrowth = new FPGrowth()

139

.setItemsCol("items")

140

.setMinSupport(0.3) // Minimum support threshold (30%)

141

.setMinConfidence(0.6) // Minimum confidence for association rules

142

.setNumPartitions(10) // Number of partitions for parallel processing

143

144

// Fit the model

145

val model = fpGrowth.fit(transactions)

146

147

// Display frequent itemsets

148

println("Frequent Itemsets:")

149

model.freqItemsets.show(truncate = false)

150

151

// Display association rules

152

println("Association Rules:")

153

model.associationRules.show(truncate = false)

154

155

// Get model statistics

156

println(s"Number of frequent itemsets: ${model.numItemsets}")

157

158

// Make predictions (find associated items for given items)

159

val testData = spark.createDataFrame(Seq(

160

Tuple1(Array("bread", "milk")),

161

Tuple1(Array("butter"))

162

)).toDF("items")

163

164

val predictions = model.transform(testData)

165

println("Predictions (items that frequently appear together):")

166

predictions.show(truncate = false)

167

```

168

169

### Advanced FP-Growth Analysis

170

171

```scala

172

// Analyze frequent itemsets by size

173

val itemsetsBySize = model.freqItemsets

174

.withColumn("itemset_size", size(col("items")))

175

.groupBy("itemset_size")

176

.agg(

177

count("*").alias("num_itemsets"),

178

avg("freq").alias("avg_support"),

179

max("freq").alias("max_support"),

180

min("freq").alias("min_support")

181

)

182

.orderBy("itemset_size")

183

184

println("Frequent Itemsets Analysis:")

185

itemsetsBySize.show()

186

187

// Find itemsets containing specific items

188

val itemsetsWithBread = model.freqItemsets

189

.filter(array_contains(col("items"), "bread"))

190

.orderBy(desc("freq"))

191

192

println("Itemsets containing 'bread':")

193

itemsetsWithBread.show(truncate = false)

194

195

// Analyze association rules

196

val ruleAnalysis = model.associationRules

197

.withColumn("antecedent_size", size(col("antecedent")))

198

.withColumn("consequent_size", size(col("consequent")))

199

.withColumn("lift", col("confidence") /

200

// Calculate consequent support (would need to join with itemsets)

201

lit(0.5)) // Placeholder — in practice, compute consequent support from freqItemsets (note: Spark 3.0+ already provides a built-in "lift" column in associationRules)

202

203

println("Association Rules Analysis:")

204

ruleAnalysis

205

.select("antecedent", "consequent", "confidence", "lift", "antecedent_size", "consequent_size")

206

.show(truncate = false)

207

208

// Filter high-confidence rules

209

val strongRules = model.associationRules

210

.filter(col("confidence") > 0.8)

211

.orderBy(desc("confidence"))

212

213

println("Strong Association Rules (confidence > 0.8):")

214

strongRules.show(truncate = false)

215

```

216

217

### Sequential Pattern Mining with PrefixSpan

218

219

```scala

220

import org.apache.spark.ml.fpm.PrefixSpan

221

222

// Sequential data - each row contains a sequence of itemsets (transactions over time)

223

val sequences = spark.createDataFrame(Seq(

224

(1, Array(Array("a"), Array("a", "b", "c"), Array("a", "c"), Array("d"), Array("c", "f"))),

225

(2, Array(Array("a", "d"), Array("c"), Array("b", "c"), Array("a", "e"))),

226

(3, Array(Array("e", "f"), Array("a", "b"), Array("d", "f"), Array("c"), Array("b"))),

227

(4, Array(Array("e"), Array("g"), Array("a", "f"), Array("c"), Array("b"), Array("c")))

228

)).toDF("id", "sequence")

229

230

// Create PrefixSpan model

231

val prefixSpan = new PrefixSpan()

232

.setSequenceCol("sequence")

233

.setMinSupport(0.5) // Minimum support threshold (50%)

234

.setMaxPatternLength(5) // Maximum length of patterns to find

235

.setMaxLocalProjDBSize(32000000L) // Memory optimization parameter

236

237

// Fit the model

238

val psModel = prefixSpan.fit(sequences)

239

240

// Display frequent sequential patterns

241

println("Frequent Sequential Patterns:")

242

psModel.freqSequences.show(truncate = false)

243

244

// Analyze patterns by length

245

val patternAnalysis = psModel.freqSequences

246

.withColumn("pattern_length", size(col("sequence")))

247

.groupBy("pattern_length")

248

.agg(

249

count("*").alias("num_patterns"),

250

avg("freq").alias("avg_support"),

251

max("freq").alias("max_support")

252

)

253

.orderBy("pattern_length")

254

255

println("Sequential Patterns Analysis:")

256

patternAnalysis.show()

257

258

// Find patterns containing specific items

259

val patternsWithA = psModel.freqSequences

260

.filter(array_contains(flatten(col("sequence")), "a"))

261

.orderBy(desc("freq"))

262

263

println("Patterns containing item 'a':")

264

patternsWithA.show(truncate = false)

265

```

266

267

### Web Usage Pattern Mining

268

269

```scala

270

// Web clickstream data - user sessions with page visits

271

val clickstreams = spark.createDataFrame(Seq(

272

(1, Array("home", "products", "cart", "checkout")),

273

(2, Array("home", "about", "contact")),

274

(3, Array("home", "products", "product_detail", "cart")),

275

(4, Array("search", "products", "product_detail", "reviews", "cart", "checkout")),

276

(5, Array("home", "blog", "products", "cart")),

277

(6, Array("home", "products", "product_detail", "reviews")),

278

(7, Array("search", "products", "compare", "cart", "checkout")),

279

(8, Array("home", "promotions", "products", "cart"))

280

)).toDF("session_id", "page_sequence")

281

282

// Frequent page visit patterns

283

val webPatternMiner = new FPGrowth()

284

.setItemsCol("page_sequence")

285

.setMinSupport(0.25)

286

.setMinConfidence(0.5)

287

288

val webPatternModel = webPatternMiner.fit(clickstreams)

289

290

println("Frequent Page Visit Patterns:")

291

webPatternModel.freqItemsets.show(truncate = false)

292

293

println("Page Navigation Rules:")

294

webPatternModel.associationRules.show(truncate = false)

295

296

// Sequential navigation patterns

297

val navPatternMiner = new PrefixSpan()

298

.setSequenceCol("nav_sequence") // must match the array-of-arrays column created below and passed to fit

299

.setMinSupport(0.3)

300

.setMaxPatternLength(4)

301

302

// Convert page sequences to the required format (array of arrays)

303

val navSequences = clickstreams

304

.withColumn("nav_sequence",

305

transform(col("page_sequence"), page => array(page)))

306

307

val navPatternModel = navPatternMiner.fit(navSequences.select("session_id", "nav_sequence"))

308

309

println("Sequential Navigation Patterns:")

310

navPatternModel.freqSequences.show(truncate = false)

311

```

312

313

### Product Recommendation Based on Frequent Patterns

314

315

```scala

316

// Use frequent patterns for product recommendations

317

def recommendProducts(frequentItemsets: DataFrame,

318

currentBasket: Array[String],

319

topK: Int = 5): DataFrame = {

320

321

val currentItems = currentBasket.toSet

322

323

// Find itemsets that contain all items in current basket

324

val relevantItemsets = frequentItemsets

325

.filter { row =>

326

val itemset = row.getAs[Seq[String]]("items").toSet

327

currentItems.subsetOf(itemset)

328

}

329

.withColumn("additional_items",

330

expr(s"filter(items, x -> !array_contains(array(${currentItems.map(s => s"'$s'").mkString(", ")}), x))"))

331

.filter(size(col("additional_items")) > 0)

332

.select(explode(col("additional_items")).alias("recommended_item"), col("freq"))

333

.groupBy("recommended_item")

334

.agg(sum("freq").alias("total_support"))

335

.orderBy(desc("total_support"))

336

.limit(topK)

337

338

relevantItemsets

339

}

340

341

// Example usage

342

val currentBasket = Array("bread", "milk")

343

val recommendations = recommendProducts(model.freqItemsets, currentBasket, 3)

344

345

println(s"Recommendations for basket: ${currentBasket.mkString(", ")}")

346

recommendations.show()

347

```

348

349

### Performance Optimization and Tuning

350

351

```scala

352

// Large-scale frequent pattern mining optimization

353

val largeFPGrowth = new FPGrowth()

354

.setItemsCol("items")

355

.setMinSupport(0.01) // Lower support for more patterns (be careful with memory)

356

.setMinConfidence(0.5)

357

.setNumPartitions(200) // Increase partitions for large datasets

358

359

// Monitor performance and memory usage

360

val startTime = System.currentTimeMillis()

361

val largeModel = largeFPGrowth.fit(largeTransactions)

362

val endTime = System.currentTimeMillis()

363

364

println(s"Training time: ${(endTime - startTime) / 1000.0} seconds")

365

println(s"Number of frequent itemsets: ${largeModel.numItemsets}")

366

367

// Cache frequent itemsets for multiple analyses

368

val cachedItemsets = largeModel.freqItemsets.cache()

369

cachedItemsets.count() // Trigger caching

370

371

// Analyze different confidence thresholds

372

val confidenceThresholds = Array(0.3, 0.5, 0.7, 0.9)

373

confidenceThresholds.foreach { threshold =>

374

val rules = largeModel.associationRules.filter(col("confidence") >= threshold)

375

println(s"Rules with confidence >= $threshold: ${rules.count()}")

376

}

377

378

// Sequential pattern mining optimization

379

val optimizedPrefixSpan = new PrefixSpan()

380

.setSequenceCol("sequence")

381

.setMinSupport(0.1)

382

.setMaxPatternLength(3) // Limit pattern length for performance

383

.setMaxLocalProjDBSize(64000000L) // Adjust based on available memory

384

385

// Batch processing for very large datasets

386

def processLargeDatasetInBatches(dataset: DataFrame, batchSize: Int = 10000): Unit = {

387

val totalRows = dataset.count()

388

val numBatches = ((totalRows + batchSize - 1) / batchSize).toInt // ceiling division avoids an extra empty batch

389

390

(0 until numBatches).foreach { batchNum =>

391

val offset = batchNum * batchSize

392

val batch = dataset.offset(offset).limit(batchSize) // offset must come before limit; Dataset.offset requires Spark 3.4+

393

394

val batchModel = fpGrowth.fit(batch)

395

println(s"Batch $batchNum: ${batchModel.numItemsets} frequent itemsets")

396

397

// Process or save batch results

398

batchModel.freqItemsets.write

399

.mode("append")

400

.option("path", s"frequent_patterns/batch_$batchNum")

401

.save()

402

}

403

}

404

```

405

406

### Rule Quality Metrics and Filtering

407

408

```scala

409

import org.apache.spark.sql.functions._

410

411

// Calculate additional rule quality metrics

412

def enhanceAssociationRules(rules: DataFrame, itemsets: DataFrame): DataFrame = {

413

414

// Create a map of itemset support values

415

val supportMap = itemsets

416

.select("items", "freq")

417

.rdd

418

.map(row => (row.getAs[Seq[String]]("items").toSet, row.getAs[Long]("freq")))

419

.collectAsMap()

420

421

val broadcastSupport = spark.sparkContext.broadcast(supportMap)

422

423

rules.withColumn("lift",

424

col("confidence") /

425

// In practice, calculate consequent support from itemsets

426

lit(0.5)) // Placeholder

427

.withColumn("conviction",

428

(lit(1.0) - lit(0.5)) / (lit(1.0) - col("confidence"))) // Placeholder calculation

429

.withColumn("leverage",

430

// support(antecedent ∪ consequent) - support(antecedent) × support(consequent)

431

lit(0.0)) // Placeholder calculation

432

}

433

434

// Filter rules by multiple criteria

435

val qualityRules = model.associationRules

436

.filter(col("confidence") > 0.6)

437

.filter(col("lift") > 1.2) // Lift > 1 indicates positive correlation

438

.filter(size(col("consequent")) === 1) // Single-item consequents only

439

.orderBy(desc("confidence"), desc("lift"))

440

441

println("High-Quality Association Rules:")

442

qualityRules.show(truncate = false)

443

444

// Statistical significance testing (Chi-square test)

445

def filterSignificantRules(rules: DataFrame, alpha: Double = 0.05): DataFrame = {

446

// Implementation would require chi-square test calculation

447

// This is a placeholder for the concept

448

rules.filter(col("confidence") > 0.5) // Simplified filter

449

}

450

451

// Rule pruning - remove redundant rules

452

def pruneRedundantRules(rules: DataFrame): DataFrame = {

453

// Remove rules where antecedent is a superset of another rule with same consequent

454

// and higher or equal confidence

455

rules // Simplified - full implementation would require complex logic

456

}

457

```