# Storage and Persistence

Spark provides fine-grained control over how RDDs are cached and persisted across memory, disk, and off-heap storage, so caching can be tuned to different data access patterns and cluster configurations.

## StorageLevel

A `StorageLevel` describes how and where an RDD's partitions are stored when the RDD is persisted: in memory, on disk, or off-heap; as deserialized objects or serialized bytes; and with how many replicas.

```scala { .api }
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int) {

  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  def clone(): StorageLevel
  def isValid: Boolean
  def toInt: Int
  def writeExternal(out: ObjectOutput): Unit
  def readExternal(in: ObjectInput): Unit
}
```

### Predefined Storage Levels

```scala { .api }
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel

  def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean, replication: Int): StorageLevel
  def fromString(s: String): StorageLevel
}
```
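The factory methods build a level either from its flags or from its constant name, and `StorageLevel` equality compares the five flags, so a level built either way matches its predefined constant. A minimal sketch, assuming `spark-core` is on the classpath (no `SparkContext` is required); the object name is illustrative:

```scala
import org.apache.spark.storage.StorageLevel

object StorageLevelFactoryDemo {
  // Disk + memory, serialized, two replicas, built from the five flags
  val custom: StorageLevel = StorageLevel(
    useDisk = true,
    useMemory = true,
    useOffHeap = false,
    deserialized = false,
    replication = 2)

  // The same level looked up by name, e.g. from a configuration value
  val parsed: StorageLevel = StorageLevel.fromString("MEMORY_AND_DISK_SER_2")

  def main(args: Array[String]): Unit = {
    println(custom == StorageLevel.MEMORY_AND_DISK_SER_2) // true
    println(parsed == custom)                             // true
  }
}
```

Parsing by name is convenient when the level comes from a job parameter; `fromString` throws `IllegalArgumentException` for unknown names.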

### Storage Level Characteristics

| Storage Level | Memory | Disk | Serialized | Replication | Use Case |
|---------------|--------|------|------------|-------------|----------|
| `MEMORY_ONLY` | Yes | No | No | 1 | Fast access when the data fits in memory |
| `MEMORY_ONLY_SER` | Yes | No | Yes | 1 | Memory-constrained, spare CPU for deserialization |
| `MEMORY_AND_DISK` | Yes | Yes | No | 1 | Fall back to disk when memory is full |
| `MEMORY_AND_DISK_SER` | Yes | Yes | Yes | 1 | Memory-constrained, spill to disk instead of recomputing |
| `DISK_ONLY` | No | Yes | Yes | 1 | Large datasets, infrequent access |
| `MEMORY_ONLY_2` | Yes | No | No | 2 | Fast access + fault tolerance |
| `MEMORY_AND_DISK_2` | Yes | Yes | No | 2 | Balanced performance + fault tolerance |
| `OFF_HEAP` | Off-heap | Yes | Yes | 1 | Avoid GC overhead (off-heap memory must be enabled) |

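Each column of the table can be read back from the corresponding `StorageLevel` flags, which is a quick way to check what a level does in the Spark version you run. A sketch that prints one row per level, assuming `spark-core` on the classpath; the helper `describe` is hypothetical:

```scala
import org.apache.spark.storage.StorageLevel

object StorageLevelTable {
  // Hypothetical helper: render the table's columns from a level's flags
  def describe(name: String, level: StorageLevel): String = {
    val memory = if (level.useOffHeap) "Off-heap" else if (level.useMemory) "Yes" else "No"
    val disk = if (level.useDisk) "Yes" else "No"
    val serialized = if (level.deserialized) "No" else "Yes"
    s"$name | $memory | $disk | $serialized | ${level.replication}"
  }

  def main(args: Array[String]): Unit = {
    val levels = Seq(
      "MEMORY_ONLY" -> StorageLevel.MEMORY_ONLY,
      "MEMORY_ONLY_SER" -> StorageLevel.MEMORY_ONLY_SER,
      "MEMORY_AND_DISK" -> StorageLevel.MEMORY_AND_DISK,
      "MEMORY_AND_DISK_SER" -> StorageLevel.MEMORY_AND_DISK_SER,
      "DISK_ONLY" -> StorageLevel.DISK_ONLY,
      "MEMORY_ONLY_2" -> StorageLevel.MEMORY_ONLY_2,
      "MEMORY_AND_DISK_2" -> StorageLevel.MEMORY_AND_DISK_2,
      "OFF_HEAP" -> StorageLevel.OFF_HEAP)
    levels.foreach { case (name, level) => println(describe(name, level)) }
  }
}
```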
## Persistence Operations

### Basic Persistence

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Persistence Example").setMaster("local[*]"))

// Placeholder transformations; substitute your own logic
def complexTransformation(line: String): String = line.trim
def complexFilter(line: String): Boolean = line.length > 3

// Create an expensive RDD
val expensiveRDD = sc.textFile("large-dataset.txt")
  .filter(_.nonEmpty)
  .map(complexTransformation)
  .filter(complexFilter)

// Cache in memory (shorthand for persist(StorageLevel.MEMORY_ONLY))
val cachedRDD = expensiveRDD.cache()

// Explicit persistence with a custom storage level. A level can be assigned only once
// per RDD: calling persist(MEMORY_AND_DISK_SER) on expensiveRDD now would throw
// UnsupportedOperationException, so this persists a separately defined RDD.
val persistedRDD = sc.textFile("large-dataset.txt")
  .filter(_.nonEmpty)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Reuse the cached RDD: the first action computes and caches the partitions,
// later actions read them from the cache (under MEMORY_ONLY, partitions that
// do not fit in memory are recomputed when needed)
val count1 = cachedRDD.count()
val count2 = cachedRDD.filter(_.contains("error")).count()
val sample = cachedRDD.take(10)

// Check storage level
println(s"Storage level: ${cachedRDD.getStorageLevel}")

// Remove from cache when no longer needed
cachedRDD.unpersist()
```
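Because an RDD's storage level is fixed once assigned, switching to a different level requires an explicit `unpersist()` first. A minimal local-mode sketch, assuming `spark-core` on the classpath; the object name and sample data are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.util.Try

object ChangeStorageLevel {
  // Returns (whether the direct level change was rejected, the final storage level)
  def run(): (Boolean, StorageLevel) = {
    val sc = new SparkContext(new SparkConf().setAppName("change-level").setMaster("local[1]"))
    try {
      val rdd = sc.parallelize(1 to 1000).map(_ * 2).cache()
      rdd.count() // materialize the MEMORY_ONLY cache

      // Assigning a different level while one is already set is rejected
      val rejected = Try(rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)).isFailure

      // Drop the cached blocks and reset the level, then assign the new one
      rdd.unpersist(blocking = true)
      rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
      (rejected, rdd.getStorageLevel)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (rejected, level) = run()
    println(s"persist with a new level rejected: $rejected")
    println(s"level after unpersist + persist: $level")
  }
}
```

Re-persisting with the same level is allowed; only a change of level triggers the exception.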

### Persistence Strategies by Use Case

```scala
// inputRDD and the transformation functions below are placeholders. Each call
// persists a different derived RDD, so each one can use its own storage level.

// High-frequency access, sufficient memory
val frequentlyUsedRDD = inputRDD
  .map(preprocessData)
  .persist(StorageLevel.MEMORY_ONLY)

// Large dataset with memory constraints
val largeRDD = inputRDD
  .flatMap(expandData)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Critical data requiring fault tolerance (each partition stored on two nodes)
val criticalRDD = inputRDD
  .map(importantTransformation)
  .persist(StorageLevel.MEMORY_AND_DISK_2)

// Infrequently accessed large dataset
val archivalRDD = inputRDD
  .map(heavyProcessing)
  .persist(StorageLevel.DISK_ONLY)

// Avoiding GC pressure for long-lived RDDs. OFF_HEAP requires
// spark.memory.offHeap.enabled=true and a positive spark.memory.offHeap.size.
val longLivedRDD = inputRDD
  .map(createLargeObjects)
  .persist(StorageLevel.OFF_HEAP)
```
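The `OFF_HEAP` level stores blocks in Spark's off-heap memory pool, which is disabled by default. A configuration sketch, assuming `spark-core` on the classpath; the 2g size is an arbitrary example value, not a recommendation:

```scala
import org.apache.spark.SparkConf

object OffHeapConf {
  // Enable the off-heap pool used by StorageLevel.OFF_HEAP.
  // The pool lives outside the JVM heap; 2g is only an example size.
  val conf: SparkConf = new SparkConf()
    .setAppName("Off-heap caching")
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "2g")

  def main(args: Array[String]): Unit = {
    println(conf.get("spark.memory.offHeap.enabled")) // true
    println(conf.get("spark.memory.offHeap.size"))    // 2g
  }
}
```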

## Checkpointing

Checkpointing writes an RDD's partitions to files in a reliable file system (such as HDFS) and truncates the RDD's lineage, so lost partitions are later recovered by reading those files instead of replaying the whole chain of transformations.

### Basic Checkpointing

```scala
// Set the checkpoint directory; on a cluster it must be a fault-tolerant,
// shared file system such as HDFS (namenode:port is a placeholder)
sc.setCheckpointDir("hdfs://namenode:port/checkpoints")

// transformation1..4, filter1..3, and additionalTransformation are placeholders
val longLineageRDD = sc.textFile("input.txt")
  .map(transformation1)
  .filter(filter1)
  .map(transformation2)
  .filter(filter2)
  .map(transformation3)
  .filter(filter3)
  .map(transformation4)

// Cache first: writing the checkpoint runs a separate job that would otherwise
// recompute the whole lineage a second time
longLineageRDD.cache()

// Mark for checkpointing; this must be called before any job has run on the RDD
longLineageRDD.checkpoint()

// The first action computes the RDD and then writes the checkpoint files
val count = longLineageRDD.count()

// Verify checkpointing (getCheckpointFile returns an Option[String])
println(s"Is checkpointed: ${longLineageRDD.isCheckpointed}")
println(s"Checkpoint file: ${longLineageRDD.getCheckpointFile.getOrElse("none")}")

// The lineage is now truncated: lost partitions are reloaded from the checkpoint files
val furtherProcessed = longLineageRDD.map(additionalTransformation)
```
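The effect on lineage can be observed by walking `dependencies` before and after the checkpoint is written. A local-mode sketch, assuming `spark-core` on the classpath and a temporary directory as the checkpoint location (adequate only in local mode); `lineageDepth` is a hypothetical helper:

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CheckpointLineage {
  // Length of the longest dependency chain ending at this RDD (the RDD itself counts as 1)
  def lineageDepth(rdd: RDD[_]): Int =
    1 + rdd.dependencies.map(dep => lineageDepth(dep.rdd)).foldLeft(0)((a, b) => math.max(a, b))

  // Returns (depth before the first action, depth after checkpointing, isCheckpointed)
  def run(): (Int, Int, Boolean) = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-lineage").setMaster("local[1]"))
    try {
      // A local temp directory is enough in local mode; clusters need HDFS or similar
      sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)

      // parallelize followed by five maps: six RDDs in the chain
      var rdd: RDD[Int] = sc.parallelize(1 to 100)
      for (_ <- 1 to 5) rdd = rdd.map(_ + 1)

      rdd.cache()
      rdd.checkpoint()
      val before = lineageDepth(rdd)

      rdd.count() // computes and caches the RDD, then writes the checkpoint files
      println(rdd.toDebugString)

      (before, lineageDepth(rdd), rdd.isCheckpointed)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after, checkpointed) = run()
    println(s"lineage depth before: $before, after: $after, checkpointed: $checkpointed")
  }
}
```

After the action, the RDD depends only on the RDD that reads the checkpoint files, so its depth drops to two regardless of how long the original chain was.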

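### Local Checkpointing

```scala
// Local checkpointing stores the data in the executors' block managers
// (MEMORY_AND_DISK by default) instead of a reliable file system. It is faster
// than checkpoint(), but if an executor is lost, its checkpointed partitions
// cannot be recomputed because the lineage has been truncated.
// expensiveComputationRDD and finalTransformation are placeholders.
val rddToCheckpoint = expensiveComputationRDD.localCheckpoint()

// Trigger checkpointing
rddToCheckpoint.count()

// Use the checkpointed RDD
val results = rddToCheckpoint.map(finalTransformation).collect()
```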
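Calling `localCheckpoint()` on an RDD that has no storage level yet persists it at `MEMORY_AND_DISK`, and no checkpoint directory is needed. A local-mode sketch that confirms this, assuming `spark-core` on the classpath; the object name is illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object LocalCheckpointDemo {
  // Returns (storage level after localCheckpoint, isCheckpointed after an action, record count)
  def run(): (StorageLevel, Boolean, Long) = {
    val sc = new SparkContext(new SparkConf().setAppName("local-checkpoint").setMaster("local[1]"))
    try {
      val rdd = sc.parallelize(1 to 1000).map(x => x * x)

      // No setCheckpointDir call: the blocks stay in the executors' block managers
      rdd.localCheckpoint()
      val count = rdd.count() // materializes the local checkpoint

      (rdd.getStorageLevel, rdd.isCheckpointed, count)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (level, checkpointed, count) = run()
    println(s"level: $level, checkpointed: $checkpointed, count: $count")
  }
}
```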

### Strategic Checkpointing

```scala
// Checkpoint at strategic points in long pipelines.
// parseRecord, isValid, enrichWithLookup, passesQualityCheck, extractFeatures
// (which must return key-value pairs), aggregate, and finalTransformation are placeholders.
val preprocessed = sc.textFile("massive-dataset.txt")
  .map(parseRecord)
  .filter(isValid)
  .map(enrichWithLookup)
  .filter(passesQualityCheck)

// Checkpoint after expensive preprocessing. checkpoint() returns Unit,
// so mark the RDD and keep using the same reference.
preprocessed.cache()
preprocessed.checkpoint()
preprocessed.count() // Materialize checkpoint

// Continue the pipeline from the checkpoint
val aggregated = preprocessed
  .map(extractFeatures)
  .groupByKey()
  .mapValues(aggregate)

// Checkpoint again before final processing
aggregated.cache()
aggregated.checkpoint()
aggregated.count()

val finalResults = aggregated
  .map(finalTransformation)
  .collect()
```
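Iterative jobs are where lineage grows without bound, because each iteration's RDD is derived from the previous one. A common pattern is to checkpoint every few iterations; the sketch below does so every 5 iterations (an arbitrary interval chosen for illustration) in local mode with a temporary checkpoint directory, assuming `spark-core` on the classpath:

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PeriodicCheckpoint {
  // Adds 1 to every element per iteration and checkpoints every `checkpointEvery` iterations
  def run(iterations: Int, checkpointEvery: Int): Long = {
    val sc = new SparkContext(new SparkConf().setAppName("periodic-checkpoint").setMaster("local[2]"))
    try {
      // A local temp directory is enough in local mode; clusters need HDFS or similar
      sc.setCheckpointDir(Files.createTempDirectory("spark-iter-ckpt").toString)

      var current: RDD[Long] = sc.parallelize(1L to 100L).cache()
      for (i <- 1 to iterations) {
        val next = current.map(_ + 1).cache()
        if (i % checkpointEvery == 0) next.checkpoint() // mark before next's first action
        next.count()        // materialize (and, when marked, write the checkpoint)
        current.unpersist() // the previous iteration's cached blocks are no longer needed
        current = next
      }
      current.reduce(_ + _)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(run(iterations = 12, checkpointEvery = 5)) // 6250
}
```

Caching each iteration keeps the checkpoint write from recomputing it, and unpersisting the previous iteration keeps only one generation in memory.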

## Advanced Persistence Patterns

### Conditional Persistence

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Choose a storage level from an estimated size. The 0.7 and 1.5 thresholds
// are heuristics; tune them for your cluster.
def smartPersist[T](rdd: RDD[T], estimatedSize: Long, memoryAvailable: Long): RDD[T] = {
  val storageLevel = if (estimatedSize < memoryAvailable * 0.7) {
    StorageLevel.MEMORY_ONLY
  } else if (estimatedSize < memoryAvailable * 1.5) {
    StorageLevel.MEMORY_AND_DISK_SER
  } else {
    StorageLevel.DISK_ONLY
  }

  rdd.persist(storageLevel)
}

// Usage (inputRDD, expensiveTransformation, averageRecordSizeBytes, and
// availableMemoryBytes are placeholders). count() computes processedRDD once
// before it is persisted, so the first action after smartPersist computes it again.
val processedRDD = inputRDD.map(expensiveTransformation)
val estimatedSize = processedRDD.count() * averageRecordSizeBytes
val smartPersistedRDD = smartPersist(processedRDD, estimatedSize, availableMemoryBytes)
```
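Instead of assuming an average record size, `estimatedSize` can be derived by measuring a few records on the driver. The sketch below uses Spark's `org.apache.spark.util.SizeEstimator` (a developer API) on records from `take`; the helper name `estimateSizeBytes` and the default sample of 100 records are assumptions, and the result is only a rough estimate of the deserialized in-memory size:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator

object SampleSizeEstimate {
  // Hypothetical helper: estimate the in-memory size of an RDD from the first
  // `sampleSize` records. take() scans only as many partitions as it needs, so this
  // is cheap, but the sample may not be representative of skewed data.
  def estimateSizeBytes[T](rdd: RDD[T], recordCount: Long, sampleSize: Int = 100): Long = {
    val sample = rdd.take(sampleSize)
    if (sample.isEmpty) 0L
    else {
      val totalSampleBytes = sample.map(r => SizeEstimator.estimate(r.asInstanceOf[AnyRef])).sum
      val averageBytes = totalSampleBytes.toDouble / sample.length
      (averageBytes * recordCount).toLong
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("size-estimate").setMaster("local[1]"))
    try {
      val processed = sc.parallelize(1 to 10000).map(i => s"record-$i")
      val estimate = estimateSizeBytes(processed, processed.count())
      println(s"Estimated in-memory size: $estimate bytes")
    } finally {
      sc.stop()
    }
  }
}
```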

### Multi-level Caching Strategy

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable

class CacheManager(sc: SparkContext) {
  // LinkedHashMap keeps insertion order, so the head is the oldest cached RDD
  private val cachedRDDs = mutable.LinkedHashMap[String, RDD[_]]()

  def cacheWithEviction[T](name: String, rdd: RDD[T], level: StorageLevel): RDD[T] = {
    // Evict the oldest cached RDD if cluster storage memory is more than 80% used
    if (getMemoryUsage() > 0.8) {
      evictOldest()
    }

    val cached = rdd.persist(level)
    cachedRDDs(name) = cached
    cached
  }

  def uncache(name: String): Unit = {
    cachedRDDs.remove(name).foreach(_.unpersist())
  }

  // Fraction of storage memory in use. getExecutorMemoryStatus maps each block
  // manager to (max memory available for caching, remaining memory for caching).
  private def getMemoryUsage(): Double = {
    val memoryStatus = sc.getExecutorMemoryStatus
    val totalMemory = memoryStatus.values.map(_._1).sum
    val usedMemory = memoryStatus.values.map(m => m._1 - m._2).sum
    if (totalMemory == 0) 0.0 else usedMemory.toDouble / totalMemory
  }

  // Evicts in insertion order (FIFO). A least-recently-used policy would also
  // need to record each access to a cached RDD.
  private def evictOldest(): Unit = {
    cachedRDDs.headOption.foreach { case (name, rdd) =>
      rdd.unpersist()
      cachedRDDs.remove(name)
    }
  }
}
```
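A least-recently-used policy has to record every access, not just insertion. A pure-Scala sketch of that bookkeeping using `java.util.LinkedHashMap` in access order; `LruRegistry` is a hypothetical class, and its `onEvict` callback is where a cache manager would call `unpersist()` on the evicted RDD:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import scala.collection.mutable.ListBuffer

// Hypothetical registry: keeps at most maxEntries entries and evicts the least recently used one
class LruRegistry[K, V](maxEntries: Int, onEvict: (K, V) => Unit) {
  // accessOrder = true: iteration runs from least to most recently accessed entry
  private val entries = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = {
      val evict = size() > maxEntries
      if (evict) onEvict(eldest.getKey, eldest.getValue) // e.g. call unpersist() here
      evict
    }
  }

  def put(key: K, value: V): Unit = entries.put(key, value)

  // get() counts as an access and moves the entry to the most-recent end
  def get(key: K): Option[V] = Option(entries.get(key))

  // Keys from least to most recently used
  def keys: List[K] = {
    val buffer = ListBuffer.empty[K]
    val it = entries.keySet.iterator()
    while (it.hasNext) buffer += it.next()
    buffer.toList
  }
}

object LruRegistryDemo {
  def main(args: Array[String]): Unit = {
    val evicted = ListBuffer.empty[String]
    val registry = new LruRegistry[String, String](2, (name, _) => evicted += name)
    registry.put("users", "rdd-1")
    registry.put("orders", "rdd-2")
    registry.get("users")           // "users" becomes the most recently used
    registry.put("events", "rdd-3") // exceeds 2 entries: evicts "orders"
    println(evicted.toList)         // List(orders)
    println(registry.keys)          // List(users, events)
  }
}
```

Capping the number of entries is a simplification; a Spark-aware version could instead evict while `getExecutorMemoryStatus` reports high usage.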

### Temperature-based Storage

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object DataTemperature extends Enumeration {
  type DataTemperature = Value
  val HOT, WARM, COLD = Value
}

import DataTemperature._

class TemperatureAwareStorage(sc: SparkContext) {
  def persist[T](rdd: RDD[T], temperature: DataTemperature): RDD[T] = {
    val storageLevel = temperature match {
      case HOT  => StorageLevel.MEMORY_ONLY         // Frequently accessed
      case WARM => StorageLevel.MEMORY_AND_DISK_SER // Occasionally accessed
      case COLD => StorageLevel.DISK_ONLY           // Rarely accessed
    }
    rdd.persist(storageLevel)
  }
}

// Usage (the three RDDs are placeholders; none may already be
// persisted at a different storage level)
val storage = new TemperatureAwareStorage(sc)

val hotData = storage.persist(frequentlyUsedRDD, HOT)
val warmData = storage.persist(occasionallyUsedRDD, WARM)
val coldData = storage.persist(archivalRDD, COLD)
```

## Performance Optimization

### Memory Fraction Configuration

```scala
// The first two keys are read only by the legacy static memory manager
// (the default before Spark 1.6; in 1.6 to 2.x only with spark.memory.useLegacyMode=true)
val conf = new SparkConf()
  .setAppName("Storage Optimization")
  .set("spark.storage.memoryFraction", "0.6")       // 60% of the heap for storage (legacy)
  .set("spark.storage.unrollMemoryFraction", "0.2") // 20% of storage memory for unrolling (legacy)
  .set("spark.storage.memoryMapThreshold", "2m")    // Memory-map disk blocks larger than 2 MB when reading them
  .set("spark.storage.blockManagerSlaveTimeoutMs", "120s") // Block manager timeout

// Unified memory manager (the default since Spark 1.6)
val modernConf = new SparkConf()
  .set("spark.memory.fraction", "0.6")        // Share of (heap - 300 MB) for execution and storage
  .set("spark.memory.storageFraction", "0.5") // Share of that region where cached blocks are safe from eviction
```
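Under the unified memory manager, the protected storage region is a fraction of a fraction. The sketch below does that arithmetic, assuming Spark's fixed 300 MB of reserved memory, the default fractions, and an example 4096 MB heap (in practice the heap is what the JVM reports, slightly below `-Xmx`); `UnifiedMemoryMath` is a hypothetical name:

```scala
object UnifiedMemoryMath {
  // Spark reserves 300 MB of the heap before applying spark.memory.fraction
  val ReservedMb: Double = 300.0

  // Returns (unified execution + storage region, storage share protected from eviction), in MB
  def regions(heapMb: Double, memoryFraction: Double = 0.6, storageFraction: Double = 0.5): (Double, Double) = {
    val unified = (heapMb - ReservedMb) * memoryFraction
    (unified, unified * storageFraction)
  }

  def main(args: Array[String]): Unit = {
    val (unified, storage) = regions(heapMb = 4096)
    println(f"unified: $unified%.1f MB, protected storage: $storage%.1f MB") // unified: 2277.6 MB, protected storage: 1138.8 MB
  }
}
```

Storage can grow beyond the protected share while execution memory is idle; execution can evict cached blocks only down to that share.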

### Serialization Optimization

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Placeholder application classes
case class MyCustomClass(id: Long, payload: String)
case class AnotherClass(name: String, score: Double)

// Use Kryo for data cached in serialized form and for shuffles
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.unsafe", "true")             // Unsafe-based Kryo I/O, often faster
  .set("spark.kryoserializer.buffer.max", "1g") // Must exceed the largest serialized object

// Register classes so Kryo writes a compact ID instead of the full class name
conf.registerKryoClasses(Array(
  classOf[MyCustomClass],
  classOf[AnotherClass]
))

val sc = new SparkContext(conf)

// Prefer serialized storage for large objects (largeObjectRDD is a placeholder)
val serializedRDD = largeObjectRDD.persist(StorageLevel.MEMORY_ONLY_SER)
```
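Setting `spark.kryo.registrationRequired=true` turns a missing registration into an error instead of a silent fallback to full class names, which is a simple way to check that the registration list is complete. A sketch that exercises `KryoSerializer` directly (a developer API) without starting a `SparkContext`, assuming `spark-core` on the classpath; `Reading` and `Unregistered` are hypothetical example classes:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import scala.util.Try

// Hypothetical record types, defined at the top level so Kryo can instantiate them
case class Reading(sensor: String, value: Double)
case class Unregistered(note: String)

object KryoRegistrationCheck {
  // Fail fast on any class that was not registered with Kryo
  val conf: SparkConf = new SparkConf()
    .set("spark.kryo.registrationRequired", "true")
    .registerKryoClasses(Array(classOf[Reading]))

  private val serializer = new KryoSerializer(conf)

  // Serialize and deserialize a registered class
  def roundTrip(r: Reading): Reading = {
    val instance = serializer.newInstance()
    instance.deserialize[Reading](instance.serialize(r))
  }

  // Serializing a class missing from the registration list throws
  def unregisteredFails: Boolean =
    Try(serializer.newInstance().serialize(Unregistered("x"))).isFailure

  def main(args: Array[String]): Unit = {
    println(roundTrip(Reading("sensor-1", 21.5))) // Reading(sensor-1,21.5)
    println(unregisteredFails)                    // true
  }
}
```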

### Block Size Optimization

```scala
// Block manager networking and timeouts
val conf = new SparkConf()
  .set("spark.blockManager.port", "0")            // 0 = pick a random free port
  .set("spark.executor.heartbeatInterval", "10s") // Executor heartbeats to the driver
  .set("spark.network.timeout", "120s")           // Default timeout for network interactions

// For streaming workloads
val streamingConf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", "128k")  // Memory-map disk blocks above 128 KB
  .set("spark.storage.unrollMemoryThreshold", "1m") // Initial memory requested to unroll a block

// For batch workloads with large blocks
val batchConf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", "8m")    // Memory-map only disk blocks above 8 MB
  .set("spark.storage.unrollMemoryThreshold", "16m") // Larger initial unroll request
```

## Monitoring and Debugging

### Storage Information

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Check RDD storage information
def printStorageInfo(rdd: RDD[_]): Unit = {
  println(s"RDD ${rdd.id} Storage Info:")
  println(s"  Name: ${rdd.name}")
  println(s"  Storage Level: ${rdd.getStorageLevel}")
  // A level other than NONE means the RDD is marked for persistence,
  // not that its partitions have already been computed and cached
  println(s"  Marked for persistence: ${rdd.getStorageLevel != StorageLevel.NONE}")
  println(s"  Is Checkpointed: ${rdd.isCheckpointed}")
  println(s"  Partitions: ${rdd.partitions.length}")

  rdd.getCheckpointFile.foreach(file => println(s"  Checkpoint File: $file"))
}

// Check storage memory across the cluster. getExecutorMemoryStatus is keyed by
// block manager address (host:port, including the driver) and reports memory
// available for caching, not total JVM memory.
def printClusterStorageStatus(sc: SparkContext): Unit = {
  val status = sc.getExecutorMemoryStatus

  println("Cluster Storage Status:")
  status.foreach { case (blockManager, (maxMemory, remainingMemory)) =>
    val usedMemory = maxMemory - remainingMemory
    val usagePercent = (usedMemory.toDouble / maxMemory) * 100

    println(s"  Block manager $blockManager:")
    println(f"    Max Storage Memory: ${maxMemory / (1024 * 1024)}%,d MB")
    println(f"    Used Storage Memory: ${usedMemory / (1024 * 1024)}%,d MB")
    println(f"    Usage: $usagePercent%.1f%%")
  }
}
```

### Cache Management Utilities

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

class CacheMonitor(sc: SparkContext) {
  // getPersistentRDDs lists every RDD marked for persistence, keyed by RDD id
  def getCachedRDDs(): Array[(Int, String, StorageLevel)] = {
    sc.getPersistentRDDs.values.map { rdd =>
      (rdd.id, rdd.name, rdd.getStorageLevel)
    }.toArray
  }

  def clearAllCache(): Unit = {
    sc.getPersistentRDDs.values.foreach(_.unpersist())
  }

  // Unpersist RDDs whose name (assigned with setName) matches the regex
  def clearCacheByName(namePattern: String): Int = {
    val regex = namePattern.r
    var cleared = 0

    sc.getPersistentRDDs.values.foreach { rdd =>
      if (rdd.name != null && regex.findFirstIn(rdd.name).isDefined) {
        rdd.unpersist()
        cleared += 1
      }
    }
    cleared
  }

  // Number of persisted RDDs per storage level
  def getStorageStats(): Map[StorageLevel, Int] = {
    sc.getPersistentRDDs.values
      .groupBy(_.getStorageLevel)
      .map { case (level, rdds) => level -> rdds.size } // mapValues returns a view in Scala 2.13
  }
}

// Usage
val monitor = new CacheMonitor(sc)

// Before processing
println("Cached RDDs before processing:")
monitor.getCachedRDDs().foreach { case (id, name, level) =>
  println(s"  RDD $id ($name): $level")
}

// Process data with caching (processDataWithCaching is a placeholder)
val results = processDataWithCaching()

// After processing
println("\nStorage statistics:")
monitor.getStorageStats().foreach { case (level, count) =>
  println(s"  $level: $count RDDs")
}

// Cleanup
monitor.clearAllCache()
```
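Name-based cleanup only finds RDDs that were labeled with `setName`. A local-mode sketch showing naming, regex-based unpersisting, and the effect on `sc.getPersistentRDDs`, assuming `spark-core` on the classpath; the RDD names are made up:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object NamedCacheDemo {
  // Returns (names persisted before cleanup, names persisted after, RDDs still holding a level)
  def run(): (Set[String], Set[String], Int) = {
    val sc = new SparkContext(new SparkConf().setAppName("named-cache").setMaster("local[1]"))
    try {
      // Keep references: SparkContext tracks persisted RDDs weakly, so an RDD that is
      // garbage-collected may drop out of getPersistentRDDs
      val rdds = Seq("users-raw", "users-clean", "orders").map { name =>
        sc.parallelize(1 to 100).setName(name).cache()
      }
      rdds.foreach(_.count())

      def persistedNames: Set[String] = sc.getPersistentRDDs.values.map(_.name).toSet
      val before = persistedNames

      // Unpersist every RDD whose name starts with "users-"
      val pattern = "^users-".r
      sc.getPersistentRDDs.values
        .filter(rdd => rdd.name != null && pattern.findFirstIn(rdd.name).isDefined)
        .foreach(_.unpersist())

      (before, persistedNames, rdds.count(_.getStorageLevel != StorageLevel.NONE))
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after, stillPersisted) = run()
    println(s"before: $before, after: $after, still persisted: $stillPersisted")
  }
}
```

Names also appear in the Storage tab of the Spark UI, which makes them worth setting even without name-based cleanup.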

## Best Practices

### When to Persist
- The RDD is used more than once in the application
- The RDD is expensive to compute (complex transformations, joins)
- The RDD has many dependencies (long lineage)
- Recomputing lost partitions after a failure would be costly (a replicated `_2` level keeps a second copy on another node)

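### When to Checkpoint
- Very long RDD lineages (more than about 10 chained transformations is a rough rule of thumb)
- Wide transformations (shuffles) that are expensive to recompute
- Before and periodically during expensive iterative algorithms, since each iteration extends the lineage
- Not for sharing RDDs across Spark applications: there is no public API for loading reliable checkpoint files into another application, so write shared data out explicitly (for example with `saveAsObjectFile`)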

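RDD checkpoint files are not meant to be loaded by a different application, so data that must outlive the application that computed it is usually written out explicitly. A local-mode sketch using `saveAsObjectFile` and `SparkContext.objectFile` (Java serialization, readable by any application with the same classes on its classpath), assuming `spark-core` on the classpath; the two sequential `SparkContext`s stand in for two separate applications, and the temporary path is illustrative:

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object ShareBetweenApps {
  // Returns the sum of the squares 1..100 as read back by the "consumer"
  def run(): Long = {
    // saveAsObjectFile refuses to overwrite, so target a path that does not exist yet
    val path = Files.createTempDirectory("spark-share").resolve("squares").toString

    // "Producer" application: write the data with Java serialization
    val producer = new SparkContext(new SparkConf().setAppName("producer").setMaster("local[1]"))
    try producer.parallelize(1 to 100).map(x => x.toLong * x).saveAsObjectFile(path)
    finally producer.stop()

    // "Consumer" application: a separate SparkContext reads the files back
    val consumer = new SparkContext(new SparkConf().setAppName("consumer").setMaster("local[1]"))
    try consumer.objectFile[Long](path).reduce(_ + _)
    finally consumer.stop()
  }

  def main(args: Array[String]): Unit = println(run()) // 338350
}
```

For data read by non-JVM tools or across Spark versions, a self-describing format such as Parquet is usually a safer choice than Java-serialized object files.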
### Storage Level Selection
- **MEMORY_ONLY**: Fastest access when the data fits in memory; objects are stored deserialized
- **MEMORY_ONLY_SER**: Memory is limited and CPU is available; serialized data is more compact but must be deserialized on every access
- **MEMORY_AND_DISK**: Balanced approach; partitions that do not fit in memory spill to disk instead of being recomputed
- **MEMORY_AND_DISK_SER**: Memory is tight; compact serialized storage with disk spill
- **DISK_ONLY**: Very large datasets, infrequent access
- **`_2` variants**: Fault tolerance is critical and the cluster is unreliable; each partition is stored on two nodes

### Performance Tips
```scala
// Good practices (inputRDD and the functions are placeholders)
val optimizedRDD = inputRDD
  .filter(isRelevant)                        // Filter early to reduce data size
  .map(lightweightTransform)                 // Apply cheap transformations first
  .map(expensiveTransform)                   // Expensive work runs on the reduced data
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist the result that later actions reuse

// Avoid anti-patterns
val inefficientRDD = inputRDD
  .persist(StorageLevel.MEMORY_ONLY) // Caches the full, unfiltered input
  .filter(isRelevant)                // Filtering first would shrink what is cached
  .map(heavyTransformation)          // Not persisted: recomputed by every action on inefficientRDD
```
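Where the `persist` call sits decides which work is reused across actions. A local-mode sketch that counts calls to a stand-in "expensive" function with a `LongAccumulator`, comparing an RDD persisted after the expensive map with one that is not persisted; it assumes `spark-core` on the classpath, the names are made up, and the counts are exact here only because no task is retried:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistPlacement {
  // Counts calls to a stand-in "expensive" function over two actions on the same RDD
  def callsAcrossTwoActions(persistResult: Boolean): Long = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-placement").setMaster("local[2]"))
    try {
      val calls = sc.longAccumulator("expensiveCalls")
      val result = sc.parallelize(1 to 100)
        .filter(_ % 2 == 0)               // filter first: 50 records remain
        .map { x => calls.add(1); x * x } // stand-in for expensive work
      if (persistResult) result.persist(StorageLevel.MEMORY_ONLY)

      result.count() // first action computes the map (and caches it if persisted)
      result.sum()   // second action recomputes the map unless it was persisted
      calls.sum
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"persisted after the expensive map: ${callsAcrossTwoActions(persistResult = true)} calls")
    println(s"not persisted: ${callsAcrossTwoActions(persistResult = false)} calls")
  }
}
```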