
# Caching and Persistence

Caching and persistence are crucial optimization techniques in Spark. They allow you to store intermediate RDD results in memory and/or on disk to avoid recomputation, dramatically improving performance for iterative algorithms and interactive analysis.

## Storage Levels

Spark provides various storage levels that control how and where RDDs are cached.

### StorageLevel Class

```scala { .api }
class StorageLevel(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1
) extends Externalizable
```

### Predefined Storage Levels

```scala { .api }
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
}
```

#### Storage Level Breakdown

| Level | Uses Disk | Uses Memory | Serialized | Replication |
|-------|-----------|-------------|------------|-------------|
| `NONE` | ✗ | ✗ | ✗ | 1 |
| `DISK_ONLY` | ✓ | ✗ | ✓ | 1 |
| `DISK_ONLY_2` | ✓ | ✗ | ✓ | 2 |
| `MEMORY_ONLY` | ✗ | ✓ | ✗ | 1 |
| `MEMORY_ONLY_2` | ✗ | ✓ | ✗ | 2 |
| `MEMORY_ONLY_SER` | ✗ | ✓ | ✓ | 1 |
| `MEMORY_ONLY_SER_2` | ✗ | ✓ | ✓ | 2 |
| `MEMORY_AND_DISK` | ✓ | ✓ | ✗ | 1 |
| `MEMORY_AND_DISK_2` | ✓ | ✓ | ✗ | 2 |
| `MEMORY_AND_DISK_SER` | ✓ | ✓ | ✓ | 1 |
| `MEMORY_AND_DISK_SER_2` | ✓ | ✓ | ✓ | 2 |
| `OFF_HEAP` | ✗ | ✗* | ✓ | 1 |

*OFF_HEAP uses off-heap memory (e.g., Tachyon)
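
To make the flag combinations concrete, here is a plain-Scala model of the table. This is an illustrative sketch only, not Spark's `StorageLevel` implementation; the `Level` class and its `description` method are hypothetical:

```scala
// Hypothetical model of the storage-level flags (not Spark's implementation).
case class Level(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                 deserialized: Boolean, replication: Int = 1) {
  // Summarize the flags the same way the table above does.
  def description: String = Seq(
    if (useDisk) Some("disk") else None,
    if (useMemory) Some("memory") else None,
    if (useOffHeap) Some("off-heap") else None,
    Some(if (deserialized) "deserialized" else "serialized"),
    Some(s"${replication}x replicated")
  ).flatten.mkString(", ")
}

// MEMORY_AND_DISK_SER_2 from the table: disk + memory, serialized, 2 replicas
val memoryAndDiskSer2 = Level(useDisk = true, useMemory = true,
                              useOffHeap = false, deserialized = false,
                              replication = 2)
println(memoryAndDiskSer2.description) // disk, memory, serialized, 2x replicated
```

Reading a level as "which media, serialized or not, how many replicas" makes the predefined names easier to remember.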

## Basic Persistence Operations

### cache() Method

```scala { .api }
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
```

The simplest way to cache an RDD in memory:

```scala
val data = sc.textFile("large-dataset.txt")
val words = data.flatMap(_.split(" "))
  .map(_.toLowerCase)
  .filter(_.length > 3)

// Cache the filtered results
val cachedWords = words.cache()

// Multiple actions will reuse the cached data
val count = cachedWords.count()
val distinctCount = cachedWords.distinct().count()
val sample = cachedWords.sample(false, 0.1).collect()
```

### persist() Method

```scala { .api }
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(newLevel: StorageLevel): RDD[T]
```

More flexible caching with custom storage levels:

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.textFile("huge-dataset.txt")
val processed = data.map(expensiveProcessing)

// Different persistence strategies (alternatives -- an RDD's storage level
// cannot be changed once set without calling unpersist() first):
processed.persist(StorageLevel.MEMORY_ONLY)        // Fast access; partitions that don't fit are recomputed
processed.persist(StorageLevel.MEMORY_AND_DISK)    // Spill to disk when memory is full
processed.persist(StorageLevel.MEMORY_ONLY_SER)    // Serialize to save memory
processed.persist(StorageLevel.DISK_ONLY)          // Store only on disk
processed.persist(StorageLevel.MEMORY_AND_DISK_2)  // Replicate for fault tolerance
```

### unpersist() Method

```scala { .api }
def unpersist(blocking: Boolean = true): RDD[T]
```

Remove an RDD from the cache and free its storage:

```scala
val cachedData = data.cache()

// Use cached data
val result1 = cachedData.count()
val result2 = cachedData.filter(_ > 100).count()

// Remove from cache when no longer needed
cachedData.unpersist()

// Or unpersist asynchronously
cachedData.unpersist(blocking = false)
```

## Storage Level Properties and Queries

### getStorageLevel

```scala { .api }
def getStorageLevel: StorageLevel
```

Check the current storage level:

```scala
val rdd = sc.parallelize(1 to 1000)
println(s"Default storage level: ${rdd.getStorageLevel}") // NONE

val cached = rdd.cache()
println(s"After cache(): ${cached.getStorageLevel}") // MEMORY_ONLY

// Use a fresh RDD here: the level of an already-cached RDD cannot be changed
val persisted = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"Custom persistence: ${persisted.getStorageLevel}") // MEMORY_AND_DISK_SER
```

### StorageLevel Properties

```scala { .api }
class StorageLevel {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}
```

```scala
val level = StorageLevel.MEMORY_AND_DISK_SER_2

println(s"Uses disk: ${level.useDisk}")         // true
println(s"Uses memory: ${level.useMemory}")     // true
println(s"Deserialized: ${level.deserialized}") // false (serialized)
println(s"Replication: ${level.replication}")   // 2
```

## Choosing Storage Levels

### Memory-Only Levels

**MEMORY_ONLY** - Best performance, no serialization overhead

```scala
// Use when:
// - RDD fits comfortably in memory
// - Access speed matters most (no deserialization cost)
// - Objects are not too expensive to reconstruct
val fastAccess = rdd.persist(StorageLevel.MEMORY_ONLY)
```

**MEMORY_ONLY_SER** - Compact storage, slower access

```scala
// Use when:
// - Memory is limited but the RDD is important to cache
// - Serialized form is much more compact than the object form
// - CPU cost of deserialization is acceptable
val compactCache = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
```

### Memory and Disk Levels

**MEMORY_AND_DISK** - Balanced performance and reliability

```scala
// Use when:
// - RDD might not fit entirely in memory
// - Recomputation is expensive
// - Spilling to disk beats recomputing from lineage
val balanced = rdd.persist(StorageLevel.MEMORY_AND_DISK)
```

**MEMORY_AND_DISK_SER** - Space-efficient with disk fallback

```scala
// Use when:
// - Memory is very limited
// - Serialization cost is acceptable
// - Disk I/O is reasonably fast
val efficient = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
```

### Disk-Only Levels

**DISK_ONLY** - Reliable but slower

```scala
// Use when:
// - Memory is scarce
// - RDD is accessed infrequently
// - Recomputation is very expensive
val reliable = rdd.persist(StorageLevel.DISK_ONLY)
```

### Replicated Levels

**_2 variants** - Fault tolerance with replication

```scala
// Use when:
// - Fast recovery after executor loss is critical
// - Node failures are likely in the cluster
// - RDD recomputation is very expensive
val faultTolerant = rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
```
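
The heuristics above can be condensed into a rough decision rule. The sketch below is illustrative only (`suggestLevel` is a hypothetical helper, not a Spark API), and it deliberately ignores edge cases like `DISK_ONLY` and `OFF_HEAP`:

```scala
// Rough decision helper encoding the guidance above (hypothetical, not a Spark API).
def suggestLevel(fitsInMemory: Boolean,
                 memoryTight: Boolean,
                 needFastRecovery: Boolean): String = {
  val base =
    if (fitsInMemory && !memoryTight) "MEMORY_ONLY"  // fastest access
    else if (fitsInMemory) "MEMORY_ONLY_SER"         // trade CPU for space
    else if (memoryTight) "MEMORY_AND_DISK_SER"      // compact, spills to disk
    else "MEMORY_AND_DISK"                           // spills to disk
  if (needFastRecovery) base + "_2" else base        // replicate for fault tolerance
}

println(suggestLevel(fitsInMemory = true, memoryTight = false, needFastRecovery = false))
// MEMORY_ONLY
```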

## Advanced Persistence Patterns

### Selective Persistence

Cache only the RDDs that will be reused multiple times:

```scala
val rawData = sc.textFile("input.txt")
val cleaned = rawData.filter(_.nonEmpty).map(_.trim)

// Don't cache - used only once
val parsed = cleaned.map(parseLine)

// Cache this - used multiple times
val validated = parsed.filter(isValid).cache()

// Multiple actions on the cached RDD
val errors = validated.filter(hasError).count()
val summary = validated.map(extractSummary).collect()
validated.map(formatOutput).saveAsTextFile("output")
```

### Iterative Algorithm Pattern

```scala
var current = sc.textFile("initial-data.txt").cache()

for (i <- 1 to 10) {
  // Keep a handle on the previous iteration
  val previous = current

  // Compute the new iteration and cache it
  current = current.map(iterativeFunction).cache()

  // Force computation, then unpersist the old data
  current.count()
  previous.unpersist()

  println(s"Iteration $i completed")
}
```

### Multi-Level Caching Strategy

```scala
val rawData = sc.textFile("large-dataset.txt")

// Level 1: Cache frequently accessed base data
val baseData = rawData.filter(isRelevant).cache()

// Level 2: Cache intermediate expensive computations
val features = baseData.map(extractFeatures)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Level 3: Cache final results with replication for reliability
val results = features.map(expensiveMLModel)
  .persist(StorageLevel.MEMORY_AND_DISK_2)
```

## Monitoring and Management

### SparkContext Storage Information

```scala { .api }
def getRDDStorageInfo: Array[RDDInfo]
def getPersistentRDDs: Map[Int, RDD[_]]
def getExecutorStorageStatus: Array[StorageStatus]
```

```scala
// Get information about cached RDDs
val storageInfo = sc.getRDDStorageInfo
storageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}):")
  println(s"  Memory size: ${info.memSize} bytes")
  println(s"  Disk size: ${info.diskSize} bytes")
  println(s"  Storage level: ${info.storageLevel}")
}

// Get all persistent RDDs
val persistentRDDs = sc.getPersistentRDDs
persistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: ${rdd.name} - ${rdd.getStorageLevel}")
}

// Check executor storage status
val executorStatus = sc.getExecutorStorageStatus
executorStatus.foreach { status =>
  println(s"Executor ${status.blockManagerId.executorId}:")
  println(s"  Max memory: ${status.maxMem} bytes")
  println(s"  Used memory: ${status.memUsed} bytes")
  println(s"  Remaining: ${status.memRemaining} bytes")
}
```

### RDD Naming for Monitoring

```scala { .api }
def setName(name: String): RDD[T]
def name: String
```

```scala
val data = sc.textFile("input.txt")
  .setName("Input Data")
  .cache()

val processed = data.map(process)
  .setName("Processed Data")
  .persist(StorageLevel.MEMORY_AND_DISK)

// Names will appear in the Spark UI for easier monitoring
```

## Checkpointing

Checkpointing provides fault tolerance by saving RDD data to reliable storage.

### Setting Checkpoint Directory

```scala { .api }
def setCheckpointDir(directory: String): Unit
```

```scala
// Set the checkpoint directory (must be reliable storage such as HDFS)
sc.setCheckpointDir("hdfs://namenode/checkpoints")
```

### Checkpointing RDDs

```scala { .api }
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
```

```scala
val data = sc.textFile("input.txt")
val processed = data.map(expensiveOperation).filter(isValid)

// Mark for checkpointing
processed.checkpoint()

// Checkpointing happens when the first action runs
val count = processed.count() // Triggers the checkpoint

// Check whether it was checkpointed
if (processed.isCheckpointed) {
  println(s"Checkpointed to: ${processed.getCheckpointFile.get}")
}
```

### Checkpoint vs Persistence

```scala
// Persistence: keeps the lineage; cached data can be lost on failure and recomputed
val persisted = rdd.cache()

// Checkpointing: truncates the lineage; data survives failures
rdd.checkpoint()

// Best practice: combine both for performance and reliability
val optimized = rdd.cache()
optimized.checkpoint()
optimized.count() // Triggers both the cache and the checkpoint
```

## Performance Guidelines

### When to Cache

1. **Multiple Actions**: RDD used in more than one action
2. **Iterative Algorithms**: machine learning, graph algorithms
3. **Interactive Analysis**: notebooks, the Spark shell
4. **Expensive Computations**: complex transformations
5. **Data Reduction**: after significant filtering

```scala
// Good candidate for caching
val filtered = largeDataset
  .filter(expensiveCondition)  // Reduces data size significantly
  .map(complexTransformation)  // Expensive computation

filtered.cache()

// Multiple uses justify caching
val stats = filtered.map(extractStats).collect()
val sample = filtered.sample(false, 0.1).collect()
filtered.saveAsTextFile("output")
```

### When Not to Cache

1. **Single Use**: RDD used only once
2. **Large Datasets**: bigger than available memory
3. **Simple Operations**: cheap to recompute
4. **Sequential Processing**: linear data pipeline with no reuse

```scala
// Don't cache - used only once
val result = sc.textFile("input.txt")
  .map(_.toUpperCase)
  .filter(_.startsWith("A"))
  .count()
```

### Memory Management Best Practices

```scala
// 1. Unpersist when done
val temp = rdd.cache()
processData(temp)
temp.unpersist()

// 2. Use appropriate storage levels
// (each of these would be a different RDD in practice; a single RDD
// can hold only one storage level at a time)
val frequentData = rdd.persist(StorageLevel.MEMORY_ONLY)
val occasionalData = rdd.persist(StorageLevel.MEMORY_AND_DISK)
val backupData = rdd.persist(StorageLevel.DISK_ONLY)

// 3. Monitor memory usage
val memUsed = sc.getExecutorStorageStatus.map(_.memUsed).sum
val memTotal = sc.getExecutorStorageStatus.map(_.maxMem).sum
println(f"Memory utilization: ${memUsed.toDouble / memTotal * 100}%.1f%%")
```
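
The utilization arithmetic in step 3 can be factored into a small pure helper so it can be unit-tested away from a live cluster (the helper name is ours, not a Spark API):

```scala
// Pure helper for the utilization calculation above (not a Spark API).
def utilizationPercent(used: Long, max: Long): Double =
  if (max <= 0) 0.0 else used.toDouble / max * 100

println(f"Memory utilization: ${utilizationPercent(512L, 2048L)}%.1f%%")
// Memory utilization: 25.0%
```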

This guide covers RDD caching and persistence in Apache Spark, enabling you to optimize performance through deliberate data-storage strategies.