
# Frequent Pattern Mining


Pattern discovery algorithms for finding recurring patterns in transactional and sequential data. MLlib provides FP-Growth for frequent itemset mining and PrefixSpan for sequential pattern mining, enabling market basket analysis, recommendation systems, and behavioral pattern discovery.


## Capabilities


### FP-Growth Algorithm


Parallel FP-Growth implementation for mining frequent itemsets using the divide-and-conquer approach described in Li et al.'s PFP algorithm.
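The following plain-Scala sketch (no Spark dependency) makes the divide-and-conquer step concrete. It is modeled on our reading of how `spark.mllib`'s `FPGrowth` shards its input. Frequent items are ranked by frequency, and each transaction is rewritten as sorted ranks. Each item group then receives at most one prefix of that transaction. Afterwards, each group can build its own FP-tree and mine only the itemsets whose least frequent item belongs to it (that local mining step is not shown). The names `PfpSharding`, `rankedFrequentItems`, and `conditionalTransactions` are hypothetical. Breaking frequency ties by item name is an assumption made only to keep the output deterministic.

```scala
// Sketch of PFP group-dependent sharding (hypothetical helper, not a Spark API).
object PfpSharding {

  /** Spark-style absolute threshold: ceil(minSupport * numTransactions). */
  def minCount(minSupport: Double, numTransactions: Long): Long =
    math.ceil(minSupport * numTransactions).toLong

  /** Frequent single items, most frequent first (ties broken by name, an assumption). */
  def rankedFrequentItems(transactions: Seq[Seq[String]], minSupport: Double): Vector[String] = {
    val threshold = minCount(minSupport, transactions.size.toLong)
    transactions
      .flatMap(_.distinct)
      .groupBy(identity)
      .map { case (item, hits) => item -> hits.size }
      .filter { case (_, count) => count >= threshold }
      .toVector
      .sortBy { case (item, count) => (-count, item) }
      .map { case (item, _) => item }
  }

  /** At most one conditional transaction per group for a single input transaction. */
  def conditionalTransactions(
      transaction: Seq[String],
      ranked: Vector[String],
      numGroups: Int): Map[Int, Vector[String]] = {
    val rankOf: Map[String, Int] = ranked.zipWithIndex.toMap
    // Keep frequent items only, as ranks sorted ascending (most frequent first).
    val ranks: Vector[Int] = transaction.flatMap(item => rankOf.get(item)).sorted.toVector
    var output = Map.empty[Int, Vector[String]]
    // Walk from the least frequent item back; the prefix ending at that item
    // goes to the item's group unless the group already received a prefix.
    for (i <- ranks.indices.reverse) {
      val group = ranks(i) % numGroups // HashPartitioner on a non-negative Int key
      if (!output.contains(group)) {
        output += (group -> ranks.take(i + 1).map(r => ranked(r)))
      }
    }
    output
  }
}
```

With the five baskets from the FP-Growth usage example and `minSupport = 0.5`, the minimum count is 3. Only `bread`, `eggs`, and `milk` reach it, so `cheese` and `butter` are never sent to any group.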

```scala { .api }
/**
 * FPGrowth - parallel FP-growth algorithm for frequent itemset mining
 * Discovers frequent itemsets in transactional data using FP-trees
 */
class FPGrowth extends Estimator[FPGrowthModel] with FPGrowthParams with DefaultParamsWritable {
  def setMinSupport(value: Double): this.type
  def setNumPartitions(value: Int): this.type
  def setMinConfidence(value: Double): this.type
  def setItemsCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def fit(dataset: Dataset[_]): FPGrowthModel
}

/**
 * FPGrowthModel - model fitted by FPGrowth containing frequent itemsets
 * Provides methods for association rule generation and itemset-based predictions
 */
class FPGrowthModel extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {
  val freqItemsets: DataFrame
  def setMinConfidence(value: Double): this.type
  def setItemsCol(value: String): this.type
  def setPredictionCol(value: String): this.type
  def associationRules: DataFrame
  def transform(dataset: Dataset[_]): DataFrame
}

/**
 * FPGrowthParams - parameters shared by FPGrowth and FPGrowthModel
 * Defines configuration options for frequent pattern mining
 */
trait FPGrowthParams extends Params with HasPredictionCol {
  val itemsCol: Param[String]
  val minSupport: DoubleParam
  val numPartitions: IntParam
  val minConfidence: DoubleParam
  def getItemsCol: String
  def getMinSupport: Double
  def getNumPartitions: Int
  def getMinConfidence: Double
}
```

### PrefixSpan Algorithm


Sequential pattern mining algorithm for discovering frequent subsequences in sequential datasets, where each sequence is an ordered list of itemsets.
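The prefix-projection idea is easiest to see in a small plain-Scala sketch. It takes each frequent item that can follow the current prefix, projects the database onto the suffixes after that item's first occurrence, and recurses. To keep it short, the sketch assumes every itemset holds exactly one item, as in the clickstream usage example. Spark's implementation does not require this. The sketch also omits Spark's integer encoding and its split between distributed and local processing. `PrefixSpanSketch` and `mine` are hypothetical names.

```scala
// Sketch of PrefixSpan pattern growth for single-item itemsets (not a Spark API).
object PrefixSpanSketch {

  /** Frequent sequential patterns with their absolute frequencies. */
  def mine(
      db: Vector[Vector[String]],
      minSupport: Double,
      maxPatternLength: Int): Map[Vector[String], Int] = {
    val minCount = math.ceil(minSupport * db.size).toInt

    def grow(prefix: Vector[String], projected: Vector[Vector[String]]): Map[Vector[String], Int] =
      if (prefix.size >= maxPatternLength) Map.empty
      else {
        // Support of an extension = number of projected suffixes containing the item.
        val counts: Map[String, Int] = projected
          .flatMap(_.distinct)
          .groupBy(identity)
          .map { case (item, hits) => item -> hits.size }
        counts.filter { case (_, c) => c >= minCount }.toSeq.flatMap { case (item, c) =>
          val pattern = prefix :+ item
          // Project onto what follows the first occurrence of `item` in each suffix.
          val nextProjected = projected.collect {
            case suffix if suffix.contains(item) => suffix.drop(suffix.indexOf(item) + 1)
          }
          grow(pattern, nextProjected) + (pattern -> c)
        }.toMap
      }

    grow(Vector.empty, db)
  }
}
```

The test data is the four clickstreams from the PrefixSpan usage example with `minSupport = 0.5`, which gives a minimum count of 2. On that data, `product_A` followed by `cart` occurs in all four sequences, and `home`, `product_A`, `cart` occurs in three.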

```scala { .api }
/**
 * PrefixSpan - parallel PrefixSpan algorithm for sequential pattern mining
 * Discovers frequent sequential patterns using prefix-projected pattern growth
 */
class PrefixSpan extends Params {
  val minSupport: DoubleParam
  val maxPatternLength: IntParam
  val maxLocalProjDBSize: LongParam
  val sequenceCol: Param[String]
  def setMinSupport(value: Double): this.type
  def setMaxPatternLength(value: Int): this.type
  def setMaxLocalProjDBSize(value: Long): this.type
  def setSequenceCol(value: String): this.type
  def getMinSupport: Double
  def getMaxPatternLength: Int
  def getMaxLocalProjDBSize: Long
  def getSequenceCol: String
  def findFrequentSequentialPatterns(dataset: Dataset[_]): DataFrame
}
```

### RDD-based Legacy APIs


Lower-level RDD-based implementations in `org.apache.spark.mllib.fpm`, for users who work directly with RDDs rather than DataFrames.

```scala { .api }
/**
 * FPGrowth (RDD-based) - legacy RDD-based FP-Growth implementation
 * Provides direct RDD operations for frequent itemset mining
 */
class org.apache.spark.mllib.fpm.FPGrowth extends Logging with Serializable {
  def setMinSupport(minSupport: Double): this.type
  def setNumPartitions(numPartitions: Int): this.type
  def getMinSupport: Double
  def getNumPartitions: Int
  def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item]
  def run[Item, Basket <: java.lang.Iterable[Item]](data: JavaRDD[Basket]): FPGrowthModel[Item]
}

/**
 * FPGrowthModel (RDD-based) - model containing frequent itemsets
 * Generated by RDD-based FPGrowth algorithm
 */
class org.apache.spark.mllib.fpm.FPGrowthModel[Item: ClassTag] extends Saveable with Serializable {
  val freqItemsets: RDD[FreqItemset[Item]]
  val itemSupport: Map[Item, Double]
  def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]]
  def save(sc: SparkContext, path: String): Unit
}

/**
 * PrefixSpan (RDD-based) - legacy RDD-based PrefixSpan implementation
 * Provides direct RDD operations for sequential pattern mining
 */
class org.apache.spark.mllib.fpm.PrefixSpan extends Logging with Serializable {
  def setMinSupport(minSupport: Double): this.type
  def setMaxPatternLength(maxPatternLength: Int): this.type
  def setMaxLocalProjDBSize(maxLocalProjDBSize: Long): this.type
  def getMinSupport: Double
  def getMaxPatternLength: Int
  def getMaxLocalProjDBSize: Long
  def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): PrefixSpanModel[Item]
  def run[Item, Itemset <: java.lang.Iterable[Item], Sequence <: java.lang.Iterable[Itemset]](data: JavaRDD[Sequence]): PrefixSpanModel[Item]
}

/**
 * PrefixSpanModel (RDD-based) - model containing frequent sequential patterns
 * Generated by RDD-based PrefixSpan algorithm
 */
class org.apache.spark.mllib.fpm.PrefixSpanModel[Item] extends Saveable with Serializable {
  val freqSequences: RDD[PrefixSpan.FreqSequence[Item]]
  def save(sc: SparkContext, path: String): Unit
}
```

### Association Rules Mining


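Generation of association rules from frequent itemsets in the RDD-based API, with confidence and lift metrics. In the DataFrame-based API, `FPGrowthModel.associationRules` exposes the same rules as a DataFrame.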
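Both metrics follow directly from itemset frequencies. Confidence is freq(antecedent ∪ consequent) / freq(antecedent). Lift divides confidence by the consequent's support, which is its frequency divided by the number of transactions. The plain-Scala sketch below generates single-item consequents, as `AssociationRules` does. `RuleSketch` and its `Rule` case class are hypothetical. This sketch always computes lift, whereas Spark's `Rule.lift` is an `Option` that stays empty when no item support is available.

```scala
// Sketch of single-consequent rule generation with confidence and lift (not a Spark API).
object RuleSketch {
  final case class Rule(antecedent: Set[String], consequent: String, confidence: Double, lift: Double)

  def rules(
      freqItemsets: Map[Set[String], Long],
      numTransactions: Long,
      minConfidence: Double): Seq[Rule] =
    for {
      (itemset, freqUnion) <- freqItemsets.toSeq
      if itemset.size >= 2
      consequent <- itemset.toSeq
      antecedent = itemset - consequent
      freqAntecedent <- freqItemsets.get(antecedent).toSeq
      freqConsequent <- freqItemsets.get(Set(consequent)).toSeq
      confidence = freqUnion.toDouble / freqAntecedent
      if confidence >= minConfidence
    } yield {
      // Lift = confidence / support(consequent)
      val consequentSupport = freqConsequent.toDouble / numTransactions
      Rule(antecedent, consequent, confidence, confidence / consequentSupport)
    }
}
```

The FP-Growth usage example has five baskets, and `bread` appears in all of them. As a result, every rule that data produces has a lift of exactly 1.0. Filtering on lift above 1.0 is therefore useful for keeping only associations stronger than chance.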

```scala { .api }
/**
 * AssociationRules - generates association rules from frequent itemsets
 * Computes rules with single-item consequents and statistical measures
 */
class AssociationRules extends Logging with Serializable {
  def setMinConfidence(minConfidence: Double): this.type
  def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]]
  def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]], itemSupport: scala.collection.Map[Item, Double]): RDD[Rule[Item]]
}

/**
 * Rule - association rule with antecedent, consequent, and statistical measures
 * Contains confidence, lift, and frequency statistics
 */
class AssociationRules.Rule[Item] extends Serializable {
  val antecedent: Array[Item]
  val consequent: Array[Item]
  def confidence: Double
  def lift: Option[Double]
  def javaAntecedent: java.util.List[Item]
  def javaConsequent: java.util.List[Item]
}
```

### Data Structures and Types


Core data structures used in frequent pattern mining algorithms.

```scala { .api }
/**
 * FreqItemset - frequent itemset with items and frequency count
 * Basic unit for itemset-based pattern mining results
 */
class FPGrowth.FreqItemset[Item] extends Serializable {
  val items: Array[Item]
  val freq: Long
  def javaItems: java.util.List[Item]
}

/**
 * FreqSequence - frequent sequence with pattern and frequency count
 * Basic unit for sequential pattern mining results
 */
class PrefixSpan.FreqSequence[Item] extends Serializable {
  val sequence: Array[Array[Item]]
  val freq: Long
  def javaSequence: java.util.List[java.util.List[Item]]
}

/**
 * ParamValidators - helper functions that validate parameter values
 * Provide range checks for algorithm parameters (bounds are Doubles)
 */
object ParamValidators {
  def inRange[T](lowerBound: Double, upperBound: Double): T => Boolean
  def gtEq[T](lowerBound: Double): T => Boolean
  def gt[T](lowerBound: Double): T => Boolean
}
```

## Usage Examples


### FP-Growth Frequent Itemset Mining

```scala
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Create sample transaction data
val transactions = Seq(
  Row(Array("milk", "eggs", "bread", "cheese")),
  Row(Array("eggs", "bread")),
  Row(Array("milk", "bread")),
  Row(Array("eggs", "bread", "butter")),
  Row(Array("milk", "eggs", "bread", "butter"))
)

val schema = StructType(Array(StructField("items", ArrayType(StringType), true)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(transactions), schema)

// Configure and fit FPGrowth model
val fpGrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.5)     // Minimum support threshold
  .setMinConfidence(0.6)  // Minimum confidence for association rules

val model = fpGrowth.fit(df)

// Display frequent itemsets
println("Frequent Itemsets:")
model.freqItemsets.show(false)

// Generate and display association rules
println("Association Rules:")
model.associationRules.show(false)

// Make predictions (recommend items for baskets)
val predictions = model.transform(df)
predictions.select("items", "prediction").show(false)

```
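The `prediction` column comes from the mined association rules, not from a separate scoring model. For each input basket, `FPGrowthModel.transform` collects the consequents of every rule whose antecedent is contained in the basket. It then drops items the basket already holds. The plain-Scala sketch below reproduces that logic, assuming rules are given as (antecedent, consequent) pairs. `PredictionSketch` is a hypothetical name. Spark additionally collects the rules to the driver and broadcasts them to the executors. The four rules in the test are the ones this example's data should yield at `minConfidence = 0.6`.

```scala
// Sketch of rule-based basket prediction (not a Spark API).
object PredictionSketch {
  def predict(basket: Seq[String], rules: Seq[(Seq[String], Seq[String])]): Seq[String] = {
    val itemset = basket.toSet
    rules
      .filter { case (antecedent, _) => antecedent.forall(itemset.contains) } // rule applies
      .flatMap { case (_, consequent) => consequent.filterNot(itemset.contains) } // new items only
      .distinct
  }
}
```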


### Market Basket Analysis with Association Rules

```scala
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes an active SparkSession named `spark`, as in spark-shell

// Load retail transaction data
val rawTransactions = spark.read
  .option("header", "true")
  .csv("retail_transactions.csv")

// Transform to required format (one array of items per transaction).
// FPGrowth rejects transactions with duplicate items, so collect a set.
val transactions = rawTransactions
  .groupBy("transaction_id")
  .agg(collect_set("product_name").as("items"))
  .select("items")

// Mine frequent itemsets and association rules
val fpGrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.01)    // 1% minimum support
  .setMinConfidence(0.5)  // 50% confidence threshold

val model = fpGrowth.fit(transactions)

// Analyze frequent itemsets by size
val itemsetStats = model.freqItemsets
  .withColumn("itemset_size", size(col("items")))
  .groupBy("itemset_size")
  .agg(count("*").as("count"), avg("freq").as("avg_frequency"))

itemsetStats.orderBy("itemset_size").show()

// Find high-confidence rules with lift > 1.0
val strongRules = model.associationRules
  .filter(col("confidence") >= 0.7 && col("lift") > 1.0)
  .orderBy(desc("confidence"), desc("lift"))

println("Strong Association Rules:")
strongRules.show(20, false)

// Generate recommendations for new baskets
val newBaskets = Seq(
  Seq("milk", "bread"),
  Seq("eggs", "butter")
).toDF("items")

val recommendations = model.transform(newBaskets)
recommendations.show(false)
```

### PrefixSpan Sequential Pattern Mining

```scala
import org.apache.spark.ml.fpm.PrefixSpan
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{avg, col, count, size}
import org.apache.spark.sql.types._

// Create sample sequence data (web clickstreams); each page view is a one-item itemset
val sequences = Seq(
  Row(Array(Array("home"), Array("product_A"), Array("cart"), Array("checkout"))),
  Row(Array(Array("home"), Array("search"), Array("product_B"), Array("product_A"), Array("cart"))),
  Row(Array(Array("home"), Array("product_A"), Array("product_B"), Array("cart"), Array("checkout"))),
  Row(Array(Array("search"), Array("product_A"), Array("cart")))
)

val schema = StructType(Array(
  StructField("sequence", ArrayType(ArrayType(StringType)), true)
))
val df = spark.createDataFrame(spark.sparkContext.parallelize(sequences), schema)

// Configure PrefixSpan algorithm
val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)      // 50% minimum support
  .setMaxPatternLength(5)  // Maximum pattern length
  .setSequenceCol("sequence")

// Find frequent sequential patterns
val frequentPatterns = prefixSpan.findFrequentSequentialPatterns(df)

println("Frequent Sequential Patterns:")
frequentPatterns.show(false)

// Analyze pattern lengths and frequencies
val patternStats = frequentPatterns
  .withColumn("pattern_length", size(col("sequence")))
  .groupBy("pattern_length")
  .agg(count("*").as("pattern_count"), avg("freq").as("avg_frequency"))

patternStats.orderBy("pattern_length").show()
```

### Customer Journey Analysis

```scala
import org.apache.spark.ml.fpm.PrefixSpan
import org.apache.spark.sql.functions._

// Load customer journey data (columns: customer_id, timestamp, page_type, action)
val rawJourneys = spark.read
  .option("header", "true")
  .csv("customer_journeys.csv")

// Transform to sequence format: sort_array orders the structs by their first
// field (timestamp, read as a string, so it must be in a sortable format such
// as ISO 8601); each event becomes a two-item itemset [page_type, action]
val journeys = rawJourneys
  .groupBy("customer_id")
  .agg(collect_list(
    struct(col("timestamp"), col("page_type"), col("action"))
  ).as("events"))
  .select(
    col("customer_id"),
    expr("transform(sort_array(events), e -> array(e.page_type, e.action))").as("sequence")
  )

// Mine frequent customer journey patterns
val prefixSpan = new PrefixSpan()
  .setMinSupport(0.05)     // 5% minimum support
  .setMaxPatternLength(8)  // Up to 8 items in total; each event contributes up to 2
  .setSequenceCol("sequence")

val journeyPatterns = prefixSpan.findFrequentSequentialPatterns(journeys)

// Identify conversion patterns: the last itemset contains both 'checkout' and 'purchase'.
// Items inside an itemset are unordered, so test membership rather than positions.
val conversionPatterns = journeyPatterns
  .filter(expr("array_contains(element_at(sequence, -1), 'checkout') AND array_contains(element_at(sequence, -1), 'purchase')"))
  .orderBy(desc("freq"))

println("Frequent Conversion Patterns:")
conversionPatterns.show(10, false)

// Find abandonment patterns: patterns containing 'cart' but no 'purchase'
// (patterns are subsequences, so this does not mean those customers never purchased)
val abandonmentPatterns = journeyPatterns
  .filter(expr("array_contains(flatten(sequence), 'cart') AND NOT array_contains(flatten(sequence), 'purchase')"))
  .orderBy(desc("freq"))

println("Common Abandonment Patterns:")
abandonmentPatterns.show(10, false)
```

### RDD-based Advanced Mining

```scala
import org.apache.spark.mllib.fpm.{FPGrowth => MLlibFPGrowth, PrefixSpan => MLlibPrefixSpan}
import org.apache.spark.rdd.RDD

// RDD-based FP-Growth (spark.mllib API); items within a transaction must be unique
val transactionRDD: RDD[Array[String]] = spark.sparkContext.parallelize(Seq(
  Array("a", "b", "c"),
  Array("a", "c", "d"),
  Array("b", "e"),
  Array("a", "b", "c", "e")
))

val mllibFP = new MLlibFPGrowth()
  .setMinSupport(0.5)
  .setNumPartitions(4)

val fpModel = mllibFP.run(transactionRDD)

// Extract frequent itemsets
val frequentItemsets = fpModel.freqItemsets.collect()
frequentItemsets.foreach(itemset =>
  println(s"${itemset.items.mkString(",")} : ${itemset.freq}")
)

// Generate association rules with minimum confidence 0.8
val rules = fpModel.generateAssociationRules(0.8).collect()
rules.foreach(rule =>
  println(s"${rule.antecedent.mkString(",")} => ${rule.consequent.mkString(",")} (conf: ${rule.confidence})")
)

// RDD-based PrefixSpan: each sequence is an array of itemsets
val sequenceRDD: RDD[Array[Array[String]]] = spark.sparkContext.parallelize(Seq(
  Array(Array("a"), Array("a", "b", "c"), Array("a", "c")),
  Array(Array("a", "d"), Array("c"), Array("b", "c")),
  Array(Array("a"), Array("b"), Array("c"))
))

val mllibPS = new MLlibPrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(4)

val psModel = mllibPS.run(sequenceRDD)

// Extract frequent sequences
val frequentSequences = psModel.freqSequences.collect()
frequentSequences.foreach(sequence =>
  println(s"${sequence.sequence.map(_.mkString(",")).mkString(" -> ")} : ${sequence.freq}")
)
```

### Performance Optimization

```scala
import org.apache.spark.ml.fpm.{FPGrowth, PrefixSpan}

// Assumes existing DataFrames `transactions` (with an "items" array column) and
// `sequences` (with a "sequence" column); `sequences` is a placeholder name

// FP-Growth configuration for large datasets
val largeFPGrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.001)   // Lower support finds rarer patterns but yields many more itemsets
  .setNumPartitions(200)  // More partitions spread the FP-tree groups across executors
  .setMinConfidence(0.1)  // Lower confidence threshold

// Optional: a checkpoint directory lets you truncate long input lineage with
// Dataset.checkpoint(); neither FPGrowth nor PrefixSpan checkpoints on its own
spark.sparkContext.setCheckpointDir("hdfs://path/to/checkpoint")

// Cache inputs that are reused across fits (e.g. during parameter tuning)
val cachedTransactions = transactions.cache()
val model = largeFPGrowth.fit(cachedTransactions)

// PrefixSpan configuration for long sequences
val largePrefixSpan = new PrefixSpan()
  .setMinSupport(0.001)
  .setMaxPatternLength(15)           // Longer patterns
  .setMaxLocalProjDBSize(50000000L)  // Larger local processing threshold
  .setSequenceCol("sequence")

val cachedSequences = sequences.cache()
val patterns = largePrefixSpan.findFrequentSequentialPatterns(cachedSequences)
```

## Algorithm Parameters


### FP-Growth Configuration

- **minSupport**: Minimum support threshold (0.0-1.0), default 0.3
- **minConfidence**: Minimum confidence for association rules (0.0-1.0), default 0.8
- **numPartitions**: Number of partitions for parallel processing, default uses input partitions
- **itemsCol**: Input column name containing item arrays, default "items"
- **predictionCol**: Output column for predictions, default "prediction"

### PrefixSpan Configuration

- **minSupport**: Minimum support threshold (≥0.0), default 0.1
- **maxPatternLength**: Maximum pattern length, counted in items (>0), default 10
- **maxLocalProjDBSize**: Maximum number of items (including internal delimiters) a projected database may hold before PrefixSpan processes it locally, default 32000000
- **sequenceCol**: Input column containing sequence arrays, default "sequence"

### Performance Considerations

- **Memory Usage**: FP-Growth builds FP-trees in memory; ensure sufficient memory per partition
- **Partitioning**: Proper partitioning is critical for performance; watch for data skew
- **Caching**: Cache input datasets when running several algorithms or tuning parameters on the same data
- **Checkpointing**: Neither algorithm checkpoints internally; checkpointing a prepared input with a long lineage can reduce recomputation
- **Local Processing**: PrefixSpan mines a projected database locally once it holds at most `maxLocalProjDBSize` items; larger ones go through another round of distributed prefix growth

### Applications

- **Market Basket Analysis**: Discover purchasing patterns and product associations
- **Recommendation Systems**: Generate item recommendations based on frequent patterns
- **Web Usage Mining**: Analyze clickstream patterns and user navigation sequences
- **Bioinformatics**: Find frequent subsequences in DNA/protein sequences
- **Network Security**: Detect recurring attack patterns in security logs
- **Supply Chain**: Optimize inventory and identify purchasing dependencies