or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.md · context-management.md · index.md · java-api.md · pair-rdd-operations.md · rdd-operations.md · storage-persistence.md

storage-persistence.mddocs/

0

# Storage and Persistence

1

2

Memory management and persistence strategies for optimizing RDD storage across cluster nodes, including various storage levels and caching mechanisms.

3

4

## Capabilities

5

6

### StorageLevel

7

8

Defines how RDDs should be stored in memory and/or disk across the cluster.

9

10

```scala { .api }

11

/**

12

* Storage level for an RDD. Determines how the RDD should be stored in memory and/or disk.

13

* Storage levels can specify whether to use disk, memory, off-heap memory, serialization,

14

* and replication.

15

*/

16

class StorageLevel private(

17

private var _useDisk: Boolean,

18

private var _useMemory: Boolean,

19

private var _useOffHeap: Boolean,

20

private var _deserialized: Boolean,

21

private var _replication: Int = 1) extends Externalizable {

22

23

/** Whether to use disk storage */

24

def useDisk: Boolean = _useDisk

25

26

/** Whether to use memory storage */

27

def useMemory: Boolean = _useMemory

28

29

/** Whether to use off-heap memory */

30

def useOffHeap: Boolean = _useOffHeap

31

32

/** Whether to store RDD as deserialized objects */

33

def deserialized: Boolean = _deserialized

34

35

/** Number of replicas to store */

36

def replication: Int = _replication

37

38

/** Create a copy with different replication level */

39

def clone(newReplication: Int): StorageLevel

40

}

41

42

/**

43

* Storage level constants for common storage strategies

44

*/

45

object StorageLevel {

46

47

/** No persistence - RDD partitions are not stored */

48

val NONE = new StorageLevel(false, false, false, false)

49

50

/** Store RDD partitions only on disk */

51

val DISK_ONLY = new StorageLevel(true, false, false, false)

52

53

/** Store RDD partitions on disk with 2x replication */

54

val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

55

56

/** Store RDD partitions only in memory as deserialized Java objects */

57

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

58

59

/** Store RDD partitions in memory with 2x replication */

60

val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

61

62

/** Store RDD partitions in memory as serialized Java objects (more space-efficient) */

63

val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

64

65

/** Store RDD partitions in memory as serialized objects with 2x replication */

66

val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

67

68

/** Store RDD partitions in memory, spill to disk if not enough memory */

69

val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

70

71

/** Store RDD partitions in memory and disk with 2x replication */

72

val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

73

74

/** Store RDD partitions in memory as serialized objects, spill to disk if needed */

75

val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

76

77

/** Store RDD partitions in memory and disk as serialized objects with 2x replication */

78

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

79

80

/** Store RDD partitions in off-heap memory (requires off-heap memory configured) */

81

val OFF_HEAP = new StorageLevel(false, false, true, false)

82

}

83

```

84

85

**Usage Examples:**

86

87

```scala

88

import org.apache.spark.storage.StorageLevel

89

90

val sc = new SparkContext(conf)

91

val data = sc.parallelize(1 to 1000000)

92

93

// Cache in memory (equivalent to .cache())

94

data.persist(StorageLevel.MEMORY_ONLY)

95

96

// Cache in memory with serialization (more memory efficient)

97

data.persist(StorageLevel.MEMORY_ONLY_SER)

98

99

// Memory with disk fallback

100

data.persist(StorageLevel.MEMORY_AND_DISK)

101

102

// Disk only storage

103

data.persist(StorageLevel.DISK_ONLY)

104

105

// High availability with replication

106

data.persist(StorageLevel.MEMORY_AND_DISK_2)

107

108

// Custom storage level

109

val customLevel = new StorageLevel(

110

useDisk = true,

111

useMemory = true,

112

useOffHeap = false,

113

deserialized = false,

114

replication = 3

115

)

116

data.persist(customLevel)

117

```

118

119

### RDD Persistence Methods

120

121

Core methods for controlling RDD persistence and caching.

122

123

```scala { .api }

124

/**

125

* Persistence methods available on all RDDs

126

*/

127

abstract class RDD[T: ClassTag] extends Serializable {

128

129

/** Persist this RDD with the default storage level (MEMORY_ONLY) */
def persist(): RDD.this.type

/** Persist this RDD with the default storage level (MEMORY_ONLY); alias for persist() */
def cache(): RDD.this.type = persist()

131

132

/** Persist this RDD with the specified storage level */

133

def persist(newLevel: StorageLevel): RDD.this.type

134

135

/** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk */

136

def unpersist(blocking: Boolean = true): RDD.this.type

137

138

/** Get the current storage level of this RDD */

139

def getStorageLevel: StorageLevel

140

141

/** Mark this RDD for checkpointing */

142

def checkpoint(): Unit

143

144

/** Return whether this RDD has been checkpointed or not */

145

def isCheckpointed: Boolean

146

147

/** Gets the name of the directory to which this RDD was checkpointed */

148

def getCheckpointFile: Option[String]

149

150

/** Mark this RDD for local checkpointing using Spark's existing caching layer */

151

def localCheckpoint(): RDD.this.type

152

}

153

```

154

155

**Usage Examples:**

156

157

```scala

158

val expensiveRDD = sc.textFile("large-dataset.txt")

159

.map(heavyProcessingFunction)

160

.filter(_.nonEmpty)

161

162

// Cache for multiple uses

163

expensiveRDD.cache()

164

165

// Use the cached RDD multiple times

166

val count1 = expensiveRDD.count()

167

val count2 = expensiveRDD.filter(_.contains("error")).count()

168

169

// Check storage level

170

println(s"Storage level: ${expensiveRDD.getStorageLevel}")

171

172

// Remove from cache when done

173

expensiveRDD.unpersist()

174

175

// Checkpoint for fault tolerance

176

sc.setCheckpointDir("hdfs://path/to/checkpoint")
expensiveRDD.cache() // Recommended: persist first, otherwise checkpointing recomputes the RDD
expensiveRDD.checkpoint()
expensiveRDD.count() // Trigger checkpoint creation

179

```

180

181

### Storage Management in SparkContext

182

183

Methods for managing storage and cache across the application.

184

185

```scala { .api }

186

class SparkContext(config: SparkConf) extends Logging {

187

188

/** Set the directory under which RDDs are going to be checkpointed */

189

def setCheckpointDir(directory: String): Unit

190

191

/** Return the directory where checkpoints are stored, if one is set */

192

def getCheckpointDir: Option[String]

193

}

194

```

195

196

### BlockManager Integration

197

198

Understanding how Spark manages storage blocks internally.

199

200

```scala { .api }

201

/**

202

* Storage information for RDD blocks

203

*/

204

case class RDDInfo(

205

id: Int,

206

name: String,

207

numPartitions: Int,

208

storageLevel: StorageLevel,

209

numCachedPartitions: Int,

210

memSize: Long,

211

diskSize: Long) {

212

213

/** Whether this RDD is currently persisted */

214

def isCached: Boolean = numCachedPartitions > 0

215

}

216

217

/**

218

* Information about memory usage for storage

219

*/

220

case class MemoryStatus(maxMemory: Long, memoryUsed: Long, memoryRemaining: Long) {

221

def memoryFraction: Double = memoryUsed.toDouble / maxMemory

222

}

223

224

/**

225

* Information about a storage block

226

*/

227

case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {

228

def isCached: Boolean = memSize > 0 || diskSize > 0

229

}

230

```

231

232

## Storage Strategy Guidelines

233

234

### Choosing the Right Storage Level

235

236

**MEMORY_ONLY (default cache())**

237

- Best for: Small to medium RDDs that fit in memory

238

- Fastest access but limited by memory capacity

239

- Use when: RDD will be accessed multiple times and fits in cluster memory

240

241

```scala

242

// Good for small, frequently accessed RDDs

243

val smallLookupTable = sc.parallelize(lookupData).cache()

244

```

245

246

**MEMORY_ONLY_SER**

247

- Best for: Large RDDs where memory is constrained

248

- Slower than MEMORY_ONLY but uses less memory

249

- Use when: Memory is limited but want to avoid disk I/O

250

251

```scala

252

// Memory-efficient caching for large datasets

253

val largeRDD = sc.textFile("huge-file.txt")

254

.map(expensiveTransformation)

255

.persist(StorageLevel.MEMORY_ONLY_SER)

256

```

257

258

**MEMORY_AND_DISK**

259

- Best for: Large RDDs that may not fit entirely in memory

260

- Graceful degradation from memory to disk

261

- Use when: Want speed of memory with disk fallback

262

263

```scala

264

// Balanced approach for uncertain memory requirements

265

val uncertainSizeRDD = sc.textFile("variable-size-data/*")

266

.persist(StorageLevel.MEMORY_AND_DISK)

267

```

268

269

**DISK_ONLY**

270

- Best for: Very large RDDs where memory is needed for other operations

271

- Slowest but most reliable for large datasets

272

- Use when: Memory is needed for computations, not storage

273

274

```scala

275

// Store intermediate results on disk

276

val intermediateResults = complexProcessing(inputRDD)

277

.persist(StorageLevel.DISK_ONLY)

278

```

279

280

**Replication Levels (_2 suffix)**

281

- Best for: Critical RDDs where fault tolerance is important

282

- Doubles storage cost but provides redundancy

283

- Use when: Recomputation cost is very high

284

285

```scala

286

// High availability for critical data

287

val criticalRDD = sc.textFile("important-data.txt")

288

.map(veryExpensiveComputation)

289

.persist(StorageLevel.MEMORY_AND_DISK_2)

290

```

291

292

### Performance Optimization Patterns

293

294

**Iterative Algorithms**

295

```scala

296

var currentRDD = initialData.cache()

297

298

for (i <- 1 to numIterations) {

299

val nextRDD = currentRDD.map(iterationFunction).cache()
nextRDD.count() // Materialize before unpersisting the parent, or nextRDD must recompute from scratch
currentRDD.unpersist() // Remove previous iteration

301

currentRDD = nextRDD

302

}

303

```

304

305

**Multi-Stage Pipelines**

306

```scala

307

// Cache intermediate stages that branch

308

val cleaned = rawData.filter(isValid).cache()

309

310

val analysis1 = cleaned.map(analysisFunction1).collect()

311

val analysis2 = cleaned.map(analysisFunction2).collect()

312

313

cleaned.unpersist() // Clean up when done

314

```

315

316

**Memory Management**

317

```scala

318

// Monitor and manage memory usage

319

val rdd1 = data.map(transform1).persist(StorageLevel.MEMORY_ONLY_SER)

320

val rdd2 = rdd1.map(transform2).persist(StorageLevel.MEMORY_ONLY_SER)

321

322

// Use the RDDs

323

processResults(rdd2)

324

325

// Clean up in reverse order

326

rdd2.unpersist()

327

rdd1.unpersist()

328

```

329

330

### Checkpointing Strategies

331

332

**When to Use Checkpointing**

333

- Long lineage chains that would be expensive to recompute

334

- Iterative algorithms where intermediate state is valuable

335

- After expensive operations that you don't want to repeat

336

337

```scala

338

// Set checkpoint directory once per application

339

sc.setCheckpointDir("hdfs://cluster/checkpoints/myapp")

340

341

// Checkpoint expensive transformations

342

val expensiveResult = rawData

343

.map(veryExpensiveFunction)

344

.filter(complexPredicate)

345

346

expensiveResult.checkpoint() // Mark for checkpointing

347

expensiveResult.count() // Trigger the checkpoint

348

349

// Now expensiveResult can be used without recomputing

350

val analysis = expensiveResult.map(analysisFunction).collect()

351

```

352

353

**Local vs Reliable Checkpointing**

354

```scala

355

// Local checkpointing (faster but less fault-tolerant)

356

rdd.localCheckpoint()

357

358

// Reliable checkpointing (slower but fault-tolerant)

359

rdd.checkpoint() // Requires checkpoint directory

360

```

361

362

## Monitoring Storage Usage

363

364

### Spark UI Storage Tab

365

- Monitor cached RDDs and their memory/disk usage

366

- See which RDDs are taking up the most space

367

- Identify RDDs that should be unpersisted

368

369

### Programmatic Monitoring

370

```scala

371

// Get storage status for all RDDs

372

val storageStatus = sc.getExecutorStorageStatus

373

storageStatus.foreach { status =>

374

println(s"Executor ${status.blockManagerId}: ${status.memoryUsed} / ${status.maxMemory}")

375

}

376

377

// Get specific RDD storage info (if available through custom monitoring)

378

def checkRDDStorage(rdd: RDD[_]): Unit = {

379

println(s"RDD ${rdd.id} storage level: ${rdd.getStorageLevel}")

380

println(s"RDD ${rdd.id} is cached: ${rdd.getStorageLevel != StorageLevel.NONE}")

381

}

382

```

383

384

### Memory Tuning Parameters

385

386

Key Spark configuration properties for storage tuning:

387

388

```scala

389

val conf = new SparkConf()

390

// Fraction of heap space used for storage (default: 0.6)

391

.set("spark.storage.memoryFraction", "0.7")

392

393

// Safety margin applied within storage memory to guard against OOM (legacy, default: 0.9)
.set("spark.storage.safetyFraction", "0.6")

395

396

// Whether to compress serialized RDD partitions, e.g. MEMORY_ONLY_SER (default: false)

397

.set("spark.rdd.compress", "true")

398

399

// Whether to compress broadcast variables (default: true)

400

.set("spark.broadcast.compress", "true")

401

402

// How often the serializer resets its object cache so referenced objects can be GC'd (default: 100)
.set("spark.serializer.objectStreamReset", "100")

404

```

405

406

## Common Storage Anti-patterns

407

408

### Over-caching

409

```scala

410

// Bad: Caching everything

411

val rdd1 = data.map(f1).cache() // Only used once

412

val rdd2 = rdd1.map(f2).cache() // Only used once

413

val result = rdd2.collect()

414

415

// Good: Cache only what's reused

416

val rdd1 = data.map(f1)

417

val rdd2 = rdd1.map(f2)

418

val result = rdd2.collect()

419

```

420

421

### Not Unpersisting

422

```scala

423

// Bad: Never cleaning up

424

def processData(): Unit = {

425

val cached = data.map(expensiveFunc).cache()

426

cached.collect()

427

// cached remains in memory forever

428

}

429

430

// Good: Clean up when done

431

def processData(): Unit = {

432

val cached = data.map(expensiveFunc).cache()

433

try {

434

cached.collect()

435

} finally {

436

cached.unpersist()

437

}

438

}

439

```

440

441

### Wrong Storage Level

442

```scala

443

// Bad: Using MEMORY_ONLY for large datasets

444

val huge = sc.textFile("100GB-file.txt").cache() // Won't fit: MEMORY_ONLY drops partitions and recomputes them on each use

445

446

// Good: Use appropriate level for data size

447

val huge = sc.textFile("100GB-file.txt")

448

.persist(StorageLevel.MEMORY_AND_DISK_SER)

449

```

450

451

### Caching Too Early

452

```scala

453

// Bad: Caching before filtering

454

val cached = allData.cache()

455

val filtered = cached.filter(rarePredicate) // Most data filtered out

456

457

// Good: Filter first, then cache

458

val filtered = allData.filter(rarePredicate).cache()

459

```