or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdio-operations.mdkey-value-operations.mdpartitioning-shuffling.mdrdd-operations.mdshared-variables.mdspark-context.mdstorage-persistence.md

storage-persistence.mddocs/

# Storage and Persistence

Storage levels and caching mechanisms for optimizing RDD persistence across memory and disk, with optional replication.

## Capabilities

### StorageLevel

Defines how RDD partitions are stored: whether they use memory, disk, or off-heap memory, whether in-memory data is kept deserialized, and how many replicas are kept.

```scala { .api }

import java.io.{Externalizable, ObjectInput, ObjectOutput}

/**
 * Storage level for RDD persistence, defining memory/disk usage and replication.
 *
 * Levels compare by value: two instances with the same flags and replication factor are
 * equal, so a level built with `StorageLevel(...)` matches the predefined constant that has
 * the same settings.
 *
 * The fields are `var`s only so that `readExternal` can populate an instance created through
 * the no-arg constructor during Java deserialization.
 */
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int) extends Externalizable {

  /** Public no-arg constructor required by `Externalizable`; `readExternal` fills in the fields. */
  def this() = this(false, true, false, false, 1)

  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication

  /** Bit-packed flags: disk = 8, memory = 4, off-heap = 2, deserialized = 1. */
  private def flags: Int =
    (if (_useDisk) 8 else 0) |
      (if (_useMemory) 4 else 0) |
      (if (_useOffHeap) 2 else 0) |
      (if (_deserialized) 1 else 0)

  /** Writes the packed flags as one byte, followed by the replication factor as an int. */
  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeByte(flags)
    out.writeInt(_replication)
  }

  /** Reads the layout produced by `writeExternal`. */
  override def readExternal(in: ObjectInput): Unit = {
    val packed = in.readByte()
    _useDisk = (packed & 8) != 0
    _useMemory = (packed & 4) != 0
    _useOffHeap = (packed & 2) != 0
    _deserialized = (packed & 1) != 0
    _replication = in.readInt()
  }

  override def equals(other: Any): Boolean = other match {
    case that: StorageLevel =>
      _useDisk == that.useDisk &&
        _useMemory == that.useMemory &&
        _useOffHeap == that.useOffHeap &&
        _deserialized == that.deserialized &&
        _replication == that.replication
    case _ => false
  }

  // Consistent with equals: built only from the compared fields.
  override def hashCode(): Int = flags * 41 + _replication

  override def toString: String = {
    "StorageLevel(%s, %s, %s, %s, %d)".format(
      _useDisk, _useMemory, _useOffHeap, _deserialized, _replication)
  }
}

object StorageLevel {
  // Predefined storage levels
  val NONE = new StorageLevel(false, false, false, false, 1)

  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

  // NOTE(review): Spark 2.0+ defines OFF_HEAP as (true, true, true, false, 1); the flags below
  // match the Spark 1.x definition. Confirm against the Spark version this page targets.
  val OFF_HEAP = new StorageLevel(false, false, true, false, 1)

  /** Create a new StorageLevel with custom settings. */
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int): StorageLevel = {
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
  }
}

```

### RDD Persistence Methods

Methods available on RDD for controlling persistence and caching behavior.

```scala { .api }

// RDD persistence methods (members of RDD[T])

/** Persist with the default storage level, MEMORY_ONLY. */
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

/**
 * Persist with an explicit storage level. Spark rejects changing the level of an RDD
 * that already has one assigned; re-persisting with the same level is allowed.
 */
def persist(storageLevel: StorageLevel): RDD[T]

/** Shorthand for `persist(StorageLevel.MEMORY_ONLY)`. */
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

/**
 * Remove the RDD's persisted blocks and reset its storage level.
 * NOTE(review): the default for `blocking` differs across Spark versions (Spark 3.0 changed
 * it to `false`). Confirm against the targeted version.
 */
def unpersist(blocking: Boolean = true): RDD[T]

// Query persistence status
def getStorageLevel: StorageLevel
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]

// Checkpointing methods
def checkpoint(): Unit
def localCheckpoint(): RDD[T]

```

**Usage Examples:**

```scala

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Storage Example").setMaster("local[*]"))

// "hdfs:///path" resolves against the default namenode; "hdfs://name" would treat `name` as a host.
val inputPath = "hdfs:///data/large-dataset.txt"
val data = sc.textFile(inputPath)

// Basic caching (MEMORY_ONLY)
val cachedData = data.cache()

// An RDD's storage level can be assigned only once: persisting `data` again with a different
// level throws UnsupportedOperationException. Each level below is therefore shown on its own RDD.
def loadInput() = sc.textFile(inputPath)

// Explicit persistence with different storage levels
val memoryOnlyData = loadInput().persist(StorageLevel.MEMORY_ONLY)
val memoryAndDiskData = loadInput().persist(StorageLevel.MEMORY_AND_DISK)
val diskOnlyData = loadInput().persist(StorageLevel.DISK_ONLY)
val serializedData = loadInput().persist(StorageLevel.MEMORY_ONLY_SER)

// Replication for fault tolerance
val replicatedData = loadInput().persist(StorageLevel.MEMORY_ONLY_2) // 2 replicas

// Off-heap storage (requires off-heap memory configuration,
// e.g. spark.memory.offHeap.enabled and spark.memory.offHeap.size)
val offHeapData = loadInput().persist(StorageLevel.OFF_HEAP)

// Custom storage level
val customStorage = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false, // serialized for memory efficiency
  replication = 2
)
val customPersistedData = loadInput().persist(customStorage)

// Use persisted data multiple times
val wordCounts = cachedData
  .flatMap(_.split("\\s+").filter(_.nonEmpty)) // skip empty tokens from repeated spaces
  .map((_, 1))
  .reduceByKey(_ + _)

val lineCount = cachedData.count()
val charCount = cachedData.map(_.length).sum()

// Clean up when done
cachedData.unpersist()
memoryAndDiskData.unpersist(blocking = false) // Non-blocking cleanup
Seq(memoryOnlyData, diskOnlyData, serializedData, replicatedData, offHeapData, customPersistedData)
  .foreach(_.unpersist())

```

### Checkpointing

Fault-tolerance mechanism that saves RDD data to reliable storage and truncates the RDD's lineage, so recovery reads the saved data instead of recomputing the full chain of transformations.

```scala { .api }

// SparkContext checkpointing methods

/** Set the directory (ideally on reliable storage such as HDFS) where RDD checkpoints are written. */
def setCheckpointDir(directory: String): Unit

/** The directory set by `setCheckpointDir`, if any. */
def getCheckpointDir: Option[String]

// RDD checkpointing methods

/**
 * Mark the RDD for reliable checkpointing; the data is written when the next action runs.
 * Must be called before any job has run on the RDD. Persisting the RDD first avoids
 * recomputing it when the checkpoint is written.
 */
def checkpoint(): Unit

/** Truncate lineage using executor-local storage: faster, but not fault tolerant. */
def localCheckpoint(): RDD[T]

/** Whether the RDD has been checkpointed and the checkpoint materialized. */
def isCheckpointed: Boolean

/** Location of the reliable checkpoint data, if one has been written. */
def getCheckpointFile: Option[String]

```

**Usage Examples:**

```scala

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Checkpoint Example"))

// Set checkpoint directory (should be on reliable storage like HDFS)
sc.setCheckpointDir("hdfs://namenode:8020/spark-checkpoints")

val data = sc.textFile("hdfs://namenode:8020/input-data.txt")

// Long chain of transformations
val processedData = data
  .filter(_.nonEmpty)
  .map(_.toLowerCase)
  .flatMap(_.split("\\s+").filter(_.nonEmpty)) // skip empty tokens from repeated spaces
  .map((_, 1))
  .reduceByKey(_ + _)
  .filter(_._2 > 5)
  .map(_.swap) // (count, word)
  .sortByKey(ascending = false)
  .map(_.swap) // back to (word, count)

// Persist before checkpointing: the checkpoint is written by a separate job, which would
// otherwise recompute the whole lineage a second time.
processedData.persist(StorageLevel.MEMORY_AND_DISK)

// Checkpoint to break lineage; must be called before any action runs on processedData
processedData.checkpoint()

// Trigger checkpointing (requires an action)
processedData.count()

// Verify checkpointing
println(s"Is checkpointed: ${processedData.isCheckpointed}")
println(s"Checkpoint file: ${processedData.getCheckpointFile}")

// Use checkpointed RDD
val topWords = processedData.take(100)
val totalUniqueWords = processedData.count()

// Local checkpointing: truncates lineage using executor storage instead of a reliable
// file system. Faster, but the data is lost if an executor fails.
val shortChain = data.map(_.toUpperCase)
val localCheckpointed = shortChain.localCheckpoint()
localCheckpointed.count() // Trigger local checkpointing

```

## Storage Optimization Strategies

### Memory Management

```scala

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Monitor storage usage
val largeDataset = sc.textFile("hdfs:///very-large-file.txt")

// Use serialized storage for memory efficiency
val efficientStorage = largeDataset.persist(StorageLevel.MEMORY_ONLY_SER)

/**
 * Print per-executor storage-memory usage reported by `SparkContext.statusTracker`.
 *
 * `SparkExecutorInfo` exposes `host`/`port` (not an executor id) and on-heap storage
 * counters (`usedOnHeapStorageMemory`, `totalOnHeapStorageMemory`, Spark 2.4+).
 */
def printStorageInfo(sc: SparkContext): Unit = {
  val bytesPerMb = 1024L * 1024L
  val statusTracker = sc.statusTracker

  statusTracker.getExecutorInfos.foreach { executor =>
    println(s"Executor ${executor.host}:${executor.port}: " +
      s"${executor.usedOnHeapStorageMemory / bytesPerMb}MB used, " +
      s"${executor.totalOnHeapStorageMemory / bytesPerMb}MB total")
  }
}

// Use the storage info
efficientStorage.count()
printStorageInfo(sc)

```

### Adaptive Storage Strategies

```scala

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
 * Pick a storage level from a rough size estimate and persist `rdd` with it.
 *
 * The budget compares the whole dataset against a single executor's storage share, so the
 * choice is conservative. Like any `persist`, this throws if `rdd` already has a different
 * storage level assigned.
 *
 * @param rdd           RDD to persist
 * @param estimatedSize estimated size of the RDD, in bytes
 * @return the same RDD, marked for persistence
 */
def adaptiveStorage[T](rdd: RDD[T], estimatedSize: Long): RDD[T] = {
  // Use the RDD's own context rather than a global `sc`, so the helper works anywhere.
  val conf = rdd.sparkContext.getConf
  // Share of heap used for execution + storage; honours spark.memory.fraction when set.
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  val executorMemory = conf.getSizeAsBytes("spark.executor.memory", "1g")
  val availableMemory = (executorMemory * memoryFraction).toLong

  val storageLevel = if (estimatedSize < availableMemory) {
    StorageLevel.MEMORY_ONLY
  } else if (estimatedSize < availableMemory * 2) {
    StorageLevel.MEMORY_ONLY_SER
  } else if (estimatedSize < availableMemory * 5) {
    StorageLevel.MEMORY_AND_DISK_SER
  } else {
    StorageLevel.DISK_ONLY
  }

  println(s"Using storage level: $storageLevel for estimated size: ${estimatedSize / (1024 * 1024)}MB")
  rdd.persist(storageLevel)
}

// Usage
val dataSize = estimateRDDSize(largeDataset) // Custom function to estimate size (not shown)
val optimizedRDD = adaptiveStorage(largeDataset, dataSize)

```

### Multi-Stage Processing with Checkpointing

```scala

import org.apache.spark.storage.StorageLevel

/**
 * Complex ETL pipeline with strategic checkpointing.
 *
 * Uses the application's `sc` (SparkContext) from the enclosing scope. `parseRecord`,
 * `enrichRecord`, `aggregateRecords`, `threshold` and the record members `isValid`,
 * `category` and `score` are application-specific placeholders not defined here.
 *
 * @param inputPath  text input to load
 * @param outputPath directory the top 1000 results are written to
 */
def processLargeDataset(inputPath: String, outputPath: String): Unit = {
  sc.setCheckpointDir("hdfs:///checkpoints/etl-pipeline")

  // Stage 1: Initial data loading and cleaning.
  // Trim first so whitespace-only lines and indented comment lines are filtered out too.
  val cleanedData = sc.textFile(inputPath)
    .map(_.trim)
    .filter(_.nonEmpty)
    .filter(!_.startsWith("#")) // Remove comments
    .cache() // Cache cleaned data for reuse

  // Stage 2: Complex transformations (lazy; nothing runs until an action)
  val transformedData = cleanedData
    .map(parseRecord) // Complex parsing
    .filter(_.isValid) // Remove invalid records
    .map(enrichRecord) // Add external data
    .groupBy(_.category)
    .mapValues(aggregateRecords) // Complex aggregation
    // Persist so the checkpoint job reuses these partitions instead of recomputing Stage 2.
    .persist(StorageLevel.MEMORY_AND_DISK)

  try {
    val recordCount = cleanedData.count()
    println(s"Loaded and cleaned $recordCount records")

    // Checkpoint after expensive transformations
    transformedData.checkpoint()
    val checkpointCount = transformedData.count() // Trigger checkpointing
    println(s"Checkpointed $checkpointCount transformed records")

    // Stage 3: Final processing using checkpointed data
    val finalResults = transformedData
      .filter(_._2.score > threshold)
      .sortBy(_._2.score, ascending = false)
      .take(1000)

    // Save results
    sc.parallelize(finalResults).saveAsTextFile(outputPath)
  } finally {
    // Release cached data even if a stage fails
    cleanedData.unpersist()
    transformedData.unpersist()
  }
}

```

### Performance Monitoring

```scala

import org.apache.spark.SparkContext
import org.apache.spark.storage.RDDInfo

/**
 * Print storage details (level, cached partitions, memory and disk size) for each RDD
 * reported by `SparkContext.getRDDStorageInfo`, in RDD-id order.
 */
def monitorRDDStorage(sc: SparkContext): Unit = {
  val bytesPerMb = 1024L * 1024L
  val rddInfos: Array[RDDInfo] = sc.getRDDStorageInfo

  println("RDD Storage Information:")
  println("=" * 50)

  if (rddInfos.isEmpty) println("No cached RDDs")

  rddInfos.sortBy(_.id).foreach { rddInfo =>
    println(s"RDD ID: ${rddInfo.id}")
    println(s"Name: ${rddInfo.name}")
    println(s"Storage Level: ${rddInfo.storageLevel}")
    println(s"Cached Partitions: ${rddInfo.numCachedPartitions}/${rddInfo.numPartitions}")
    println(s"Memory Size: ${rddInfo.memSize / bytesPerMb} MB")
    println(s"Disk Size: ${rddInfo.diskSize / bytesPerMb} MB")
    println("-" * 30)
  }
}

// Custom storage metrics

/** Snapshot of one RDD's storage footprint; `memoryUsed` and `diskUsed` are in bytes. */
final case class StorageMetrics(
    rddId: Int,
    memoryUsed: Long,
    diskUsed: Long,
    partitionsCached: Int,
    totalPartitions: Int
)

/**
 * Capture a `StorageMetrics` snapshot for every RDD reported by
 * `SparkContext.getRDDStorageInfo`.
 */
def collectStorageMetrics(sc: SparkContext): List[StorageMetrics] =
  sc.getRDDStorageInfo.iterator.map { rddInfo =>
    StorageMetrics(
      rddId = rddInfo.id,
      memoryUsed = rddInfo.memSize,
      diskUsed = rddInfo.diskSize,
      partitionsCached = rddInfo.numCachedPartitions,
      totalPartitions = rddInfo.numPartitions
    )
  }.toList

// Usage in application
val beforeMetrics = collectStorageMetrics(sc)
// ... perform operations ...
val afterMetrics = collectStorageMetrics(sc)

// Compare metrics (sizes are bytes; reported in whole MB)
def toMb(bytes: Long): Long = bytes / (1024 * 1024)

// Index the earlier snapshot by id: one map lookup per RDD instead of a linear `find`.
val beforeById = beforeMetrics.map(m => m.rddId -> m).toMap
val afterIds = afterMetrics.map(_.rddId).toSet

println("Storage usage changes:")
afterMetrics.foreach { after =>
  beforeById.get(after.rddId) match {
    case Some(before) =>
      val memoryChange = after.memoryUsed - before.memoryUsed
      val diskChange = after.diskUsed - before.diskUsed
      println(s"RDD ${after.rddId}: Memory: ${toMb(memoryChange)}MB, Disk: ${toMb(diskChange)}MB")
    case None =>
      println(s"New RDD ${after.rddId}: Memory: ${toMb(after.memoryUsed)}MB, Disk: ${toMb(after.diskUsed)}MB")
  }
}

// RDDs present in the first snapshot but missing from the second (unpersisted or fully evicted)
beforeMetrics.filterNot(m => afterIds.contains(m.rddId)).foreach { removed =>
  println(s"Removed RDD ${removed.rddId}: Memory: -${toMb(removed.memoryUsed)}MB, Disk: -${toMb(removed.diskUsed)}MB")
}

```

## Best Practices

### When to Use Different Storage Levels

```scala

import org.apache.spark.storage.StorageLevel

// 1. MEMORY_ONLY: Small datasets, frequently accessed
val smallFrequentData = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)

// 2. MEMORY_ONLY_SER: Larger datasets, CPU available for deserialization
val mediumData = sc.textFile("medium-file.txt").persist(StorageLevel.MEMORY_ONLY_SER)

// 3. MEMORY_AND_DISK: Important datasets, some memory pressure
val importantData = sc.textFile("important-file.txt").persist(StorageLevel.MEMORY_AND_DISK)

// 4. DISK_ONLY: Very large datasets, limited memory
val hugeData = sc.textFile("huge-file.txt").persist(StorageLevel.DISK_ONLY)

// 5. Replication levels: Critical data in fault-prone environments
val criticalData = sc.textFile("critical-file.txt").persist(StorageLevel.MEMORY_AND_DISK_2)

```

### Efficient Cache Management

```scala

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{RDDInfo, StorageLevel}

// Cache management pattern

/**
 * Tracks the RDDs it persists so they can be inspected and released together.
 * Uses an unsynchronized `var`, so it is meant to be used from a single thread.
 */
class CacheManager(sc: SparkContext) {
  private var cachedRDDs = List.empty[RDD[_]]

  /** Persist `rdd` at `storageLevel` and track it; an RDD already tracked is not added twice. */
  def cacheRDD[T](rdd: RDD[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RDD[T] = {
    val cachedRDD = rdd.persist(storageLevel)
    if (!cachedRDDs.exists(_.id == cachedRDD.id)) {
      cachedRDDs = cachedRDD :: cachedRDDs
    }
    cachedRDD
  }

  /** Unpersist every tracked RDD and forget them. */
  def unpersistAll(): Unit = {
    cachedRDDs.foreach(_.unpersist())
    cachedRDDs = List.empty
  }

  /** Storage info for the tracked RDDs that currently have cached data. */
  def getStorageInfo: List[RDDInfo] = {
    // Fetch the storage snapshot once instead of once per tracked RDD.
    val infoById = sc.getRDDStorageInfo.map(info => info.id -> info).toMap
    cachedRDDs.flatMap(rdd => infoById.get(rdd.id))
  }
}

import org.apache.spark.storage.StorageLevel

// Usage
val cacheManager = new CacheManager(sc)

val data1 = cacheManager.cacheRDD(sc.textFile("file1.txt"))
val data2 = cacheManager.cacheRDD(sc.textFile("file2.txt"), StorageLevel.MEMORY_ONLY_SER)

// Process data...

// Clean up all cached RDDs
cacheManager.unpersistAll()

```

### Checkpointing Strategy

```scala

import scala.collection.mutable
import org.apache.spark.rdd.RDD

/**
 * Intelligent checkpointing based on lineage depth.
 *
 * Marks `rdd` for reliable checkpointing when its longest dependency chain is deeper than
 * `maxLineageDepth`. The checkpoint is written when the next action runs on `rdd`. A
 * checkpoint directory must have been set with `SparkContext.setCheckpointDir`. Persisting
 * `rdd` beforehand avoids recomputing it when the checkpoint is written.
 *
 * @param rdd             RDD to inspect (and possibly mark for checkpointing)
 * @param maxLineageDepth longest dependency chain tolerated before checkpointing
 * @return the same RDD
 */
def smartCheckpoint[T](rdd: RDD[T], maxLineageDepth: Int = 10): RDD[T] = {
  // Length of the longest dependency chain from `root` to a source RDD (0 for a source).
  // Memoized by RDD id, so shared ancestors (joins, unions) are resolved once rather than once
  // per path. Iterative, so very deep lineages cannot overflow the call stack.
  def countLineageDepth(root: RDD[_]): Int = {
    val depthById = mutable.HashMap.empty[Int, Int]
    var pending: List[RDD[_]] = List(root)
    while (pending.nonEmpty) {
      val current = pending.head
      if (depthById.contains(current.id)) {
        pending = pending.tail // already resolved through another path
      } else {
        val parents = current.dependencies.map(_.rdd)
        val unresolved = parents.filterNot(parent => depthById.contains(parent.id))
        if (unresolved.isEmpty) {
          depthById(current.id) = parents.map(parent => depthById(parent.id) + 1).foldLeft(0)(math.max)
          pending = pending.tail
        } else {
          // Resolve the parents first, then revisit `current`
          pending = unresolved.toList ::: pending
        }
      }
    }
    depthById(root.id)
  }

  val depth = countLineageDepth(rdd)
  if (depth > maxLineageDepth) {
    rdd.checkpoint()
    println(s"Checkpointing RDD with lineage depth: $depth")
  }
  rdd
}

import org.apache.spark.storage.StorageLevel

// Usage in complex pipeline (inputData, transform1, filter1, transform2, transform3 and
// finalFilter are application-specific placeholders).
// The result is persisted so that, if smartCheckpoint marks it, the checkpoint job reuses the
// cached partitions instead of recomputing the lineage.
val result = inputData
  .map(transform1)
  .filter(filter1)
  .map(transform2)
  .groupByKey()
  .map(transform3) // This might have deep lineage
  .persist(StorageLevel.MEMORY_AND_DISK)

val checkpointedResult = smartCheckpoint(result)
checkpointedResult.count() // Trigger checkpointing if needed

// Continue processing with potentially checkpointed RDD
val finalResult = checkpointedResult
  .filter(finalFilter)
  .sortBy(_._2)

```