# Storage and Persistence

Storage levels and caching mechanisms for optimizing RDD persistence across memory and disk, with various replication strategies.

## Capabilities

### StorageLevel

Defines how RDD partitions are stored, including memory usage, disk usage, and replication settings.

```scala { .api }
/**
 * Storage level for RDD persistence: whether partitions are kept on disk, in on-heap
 * memory and/or off-heap memory, whether in-memory data is kept as deserialized objects,
 * and how many replicas of each partition are stored.
 *
 * Instances come from the predefined constants or [[StorageLevel.apply]]; the primary
 * constructor is private.
 */
class StorageLevel private(
    // Fields are vars only because Externalizable.readExternal must fill them in after
    // the no-arg constructor has run.
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {

  // replication is serialized as a single byte and folded into hashCode below.
  require(_replication < 40, "Replication restricted to be less than 40 for calculating hash codes")

  /** Public no-arg constructor required by `Externalizable` for deserialization. */
  def this() = this(false, true, false, false)

  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication

  /** A level is usable only if it keeps data somewhere and stores at least one copy. */
  def isValid: Boolean = (useMemory || useDisk) && replication > 0

  /** The four flags packed into bits 3..0: disk, memory, off-heap, deserialized. */
  def toInt: Int =
    (if (_useDisk) 8 else 0) |
      (if (_useMemory) 4 else 0) |
      (if (_useOffHeap) 2 else 0) |
      (if (_deserialized) 1 else 0)

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeByte(toInt)
    out.writeByte(_replication)
  }

  override def readExternal(in: ObjectInput): Unit = {
    val flags = in.readByte()
    _useDisk = (flags & 8) != 0
    _useMemory = (flags & 4) != 0
    _useOffHeap = (flags & 2) != 0
    _deserialized = (flags & 1) != 0
    _replication = in.readByte()
  }

  override def equals(other: Any): Boolean = other match {
    case that: StorageLevel =>
      that.useDisk == useDisk &&
        that.useMemory == useMemory &&
        that.useOffHeap == useOffHeap &&
        that.deserialized == deserialized &&
        that.replication == replication
    case _ => false
  }

  override def hashCode(): Int = toInt * 41 + replication

  override def toString: String = {
    "StorageLevel(%s, %s, %s, %s, %d)".format(
      _useDisk, _useMemory, _useOffHeap, _deserialized, _replication)
  }
}

object StorageLevel {
  // Predefined storage levels: (useDisk, useMemory, useOffHeap, deserialized, replication)
  val NONE: StorageLevel = new StorageLevel(false, false, false, false, 1)

  val DISK_ONLY: StorageLevel = new StorageLevel(true, false, false, false, 1)
  val DISK_ONLY_2: StorageLevel = new StorageLevel(true, false, false, false, 2)

  val MEMORY_ONLY: StorageLevel = new StorageLevel(false, true, false, true, 1)
  val MEMORY_ONLY_2: StorageLevel = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER: StorageLevel = new StorageLevel(false, true, false, false, 1)
  val MEMORY_ONLY_SER_2: StorageLevel = new StorageLevel(false, true, false, false, 2)

  val MEMORY_AND_DISK: StorageLevel = new StorageLevel(true, true, false, true, 1)
  val MEMORY_AND_DISK_2: StorageLevel = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER: StorageLevel = new StorageLevel(true, true, false, false, 1)
  val MEMORY_AND_DISK_SER_2: StorageLevel = new StorageLevel(true, true, false, false, 2)

  // Since Spark 2.0 off-heap blocks live in the (off-heap) memory store and may spill to
  // disk, so both useDisk and useMemory are set; the Spark 1.x (false, false, true, ...)
  // form is not a valid level (it stores data neither in memory nor on disk).
  val OFF_HEAP: StorageLevel = new StorageLevel(true, true, true, false, 1)

  /**
   * Create a new StorageLevel with custom settings.
   *
   * @param replication number of copies of each partition; defaults to 1
   */
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int = 1): StorageLevel = {
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
  }
}
```

### RDD Persistence Methods

Methods available on RDD for controlling persistence and caching behavior.

```scala { .api }
// ---- Persistence (methods on RDD[T]) ----

/** Persist at the default level; same as `persist(StorageLevel.MEMORY_ONLY)`. */
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

/**
 * Mark this RDD to be kept at `storageLevel` once it is computed, and return the same RDD.
 * An RDD's level can be assigned only once: asking for a different level later throws
 * `UnsupportedOperationException`.
 */
def persist(storageLevel: StorageLevel): RDD[T]

/** Alias for `persist()`, i.e. `MEMORY_ONLY`. */
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

/**
 * Drop this RDD's persisted blocks and reset its storage level.
 * `blocking` selects whether to wait until the blocks are removed. The default shown is
 * the Spark 2.x one; Spark 3.0+ defaults to `false`.
 */
def unpersist(blocking: Boolean = true): RDD[T]

// ---- Inspecting persistence state ----

/** Current storage level (`StorageLevel.NONE` if the RDD is not persisted). */
def getStorageLevel: StorageLevel

/** Whether this RDD's data has been checkpointed. */
def isCheckpointed: Boolean

/** Location of the reliable checkpoint data, if any. */
def getCheckpointFile: Option[String]

// ---- Checkpointing ----

/** Mark for reliable checkpointing; data is written when the next action runs. */
def checkpoint(): Unit

/** Truncate lineage using executor-local storage instead of the checkpoint directory. */
def localCheckpoint(): RDD[T]
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Storage Example").setMaster("local[*]"))

// "hdfs:///path" resolves against the default NameNode (fs.defaultFS);
// "hdfs://large-dataset.txt" would treat the file name as a host name.
val inputPath = "hdfs:///large-dataset.txt"
val data = sc.textFile(inputPath)

// Basic caching (MEMORY_ONLY). cache() marks `data` itself and returns that same RDD.
val cachedData = data.cache()

// Explicit persistence with different storage levels.
// An RDD's storage level can be assigned only once: persist() returns the same RDD, and a
// second persist() with a *different* level throws UnsupportedOperationException.
// Each level below is therefore applied to its own RDD rather than to `data` again.
// persist() is lazy: nothing is stored until an action computes the RDD.
val memoryOnlyData = sc.textFile(inputPath).persist(StorageLevel.MEMORY_ONLY)
val memoryAndDiskData = sc.textFile(inputPath).persist(StorageLevel.MEMORY_AND_DISK)
val diskOnlyData = sc.textFile(inputPath).persist(StorageLevel.DISK_ONLY)
val serializedData = sc.textFile(inputPath).persist(StorageLevel.MEMORY_ONLY_SER)

// Replication for fault tolerance
val replicatedData = sc.textFile(inputPath).persist(StorageLevel.MEMORY_ONLY_2) // 2 replicas

// Off-heap storage (requires spark.memory.offHeap.enabled=true and a positive
// spark.memory.offHeap.size)
val offHeapData = sc.textFile(inputPath).persist(StorageLevel.OFF_HEAP)

// Custom storage level
val customStorage = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false, // serialized for memory efficiency
  replication = 2
)
val customPersistedData = sc.textFile(inputPath).persist(customStorage)

// Use persisted data multiple times. Transformations are lazy, so run an action on
// wordCounts while cachedData is still cached.
val wordCounts = cachedData
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
val distinctWordCount = wordCounts.count()

val lineCount = cachedData.count()
val charCount = cachedData.map(_.length).sum()

// Clean up when done
cachedData.unpersist()
memoryAndDiskData.unpersist(blocking = false) // Non-blocking cleanup
Seq(memoryOnlyData, diskOnlyData, serializedData, replicatedData, offHeapData, customPersistedData)
  .foreach(_.unpersist(blocking = false))
```

### Checkpointing

Fault-tolerance mechanism that saves RDD data to reliable storage and truncates the RDD's lineage, so recovery does not require recomputing the full chain of transformations.

```scala { .api }
// ---- On SparkContext ----

/**
 * Set the directory that reliable checkpoints are written under. On a cluster this
 * should be on reliable shared storage such as HDFS.
 */
def setCheckpointDir(directory: String): Unit

/** The configured checkpoint directory, if one has been set. */
def getCheckpointDir: Option[String]

// ---- On RDD[T] ----

/** Mark for checkpointing; the data is written only once an action runs on the RDD. */
def checkpoint(): Unit

/** Lineage truncation backed by executor-local storage (faster, not fault-tolerant). */
def localCheckpoint(): RDD[T]

/** Whether the RDD has already been checkpointed. */
def isCheckpointed: Boolean

/** Where the reliable checkpoint data was written, if anywhere. */
def getCheckpointFile: Option[String]
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Checkpoint Example"))

// Set checkpoint directory (should be on reliable storage like HDFS).
// Replace namenode:8020 with your NameNode host and RPC port.
sc.setCheckpointDir("hdfs://namenode:8020/spark-checkpoints")

val data = sc.textFile("hdfs:///input-data.txt")

// Long chain of transformations
val processedData = data
  .filter(_.nonEmpty)
  .map(_.toLowerCase)
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .filter(_._2 > 5)
  .map(_.swap) // (count, word)
  .sortByKey(ascending = false)
  .map(_.swap) // back to (word, count)

// Checkpoint to break lineage and prevent recomputation. checkpoint() must be called
// before any job runs on this RDD. The checkpoint is written by a separate job after the
// first action, so persist first; otherwise the whole chain is computed twice.
processedData.cache()
processedData.checkpoint()

// Trigger checkpointing (requires an action)
processedData.count()

// Verify checkpointing
println(s"Is checkpointed: ${processedData.isCheckpointed}")
println(s"Checkpoint file: ${processedData.getCheckpointFile}")

// Use checkpointed RDD
val topWords = processedData.take(100)
val totalUniqueWords = processedData.count()

// Local checkpointing: truncates lineage using executor-local storage instead of the
// checkpoint directory. It is faster but not fault-tolerant, because the data is lost
// with the executor that holds it.
val shortChain = data.map(_.toUpperCase)
val localCheckpointed = shortChain.localCheckpoint()
localCheckpointed.count() // Trigger local checkpointing

processedData.unpersist()
```

## Storage Optimization Strategies

### Memory Management

```scala
// Monitor storage usage
val largeDataset = sc.textFile("hdfs:///very-large-file.txt")

// Use serialized storage for memory efficiency (smaller footprint, extra CPU to
// deserialize on access)
val efficientStorage = largeDataset.persist(StorageLevel.MEMORY_ONLY_SER)

/**
 * Print storage-memory usage for every entry returned by the status tracker.
 *
 * `SparkExecutorInfo` identifies an executor by host and port, and reports storage memory
 * separately for the on-heap and off-heap pools (Spark 2.2+); both pools are summed here.
 */
def printStorageInfo(sc: SparkContext): Unit = {
  def toMB(bytes: Long): Long = bytes / (1024 * 1024)

  sc.statusTracker.getExecutorInfos.foreach { executor =>
    val used = executor.usedOnHeapStorageMemory + executor.usedOffHeapStorageMemory
    val total = executor.totalOnHeapStorageMemory + executor.totalOffHeapStorageMemory
    println(s"Executor ${executor.host}:${executor.port}: " +
      s"${toMB(used)}MB used, " +
      s"${toMB(total)}MB total")
  }
}

// Use the storage info: run an action first so the cache is actually populated
efficientStorage.count()
printStorageInfo(sc)
```

### Adaptive Storage Strategies

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
 * Persist `rdd` with a storage level chosen from its estimated size relative to the storage
 * memory assumed to be available on one executor.
 *
 * @param rdd            RDD to persist
 * @param estimatedSize  estimated in-memory size of the RDD, in bytes
 * @param memoryFraction fraction of `spark.executor.memory` assumed usable for storage
 * @return the same RDD marked for persistence. If it already has a storage level it is
 *         returned unchanged, because Spark rejects changing an assigned level.
 */
def adaptiveStorage[T](rdd: RDD[T], estimatedSize: Long, memoryFraction: Double = 0.6): RDD[T] = {
  if (rdd.getStorageLevel != StorageLevel.NONE) {
    println(s"RDD ${rdd.id} is already persisted with ${rdd.getStorageLevel}; leaving it unchanged")
    rdd
  } else {
    // Read configuration from the RDD's own SparkContext instead of a global `sc`.
    val executorMemory = rdd.sparkContext.getConf.getSizeAsBytes("spark.executor.memory", "1g")
    val availableMemory = (executorMemory * memoryFraction).toLong

    val storageLevel =
      if (estimatedSize < availableMemory) StorageLevel.MEMORY_ONLY
      else if (estimatedSize < availableMemory * 2) StorageLevel.MEMORY_ONLY_SER
      else if (estimatedSize < availableMemory * 5) StorageLevel.MEMORY_AND_DISK_SER
      else StorageLevel.DISK_ONLY

    println(s"Using storage level: $storageLevel for estimated size: ${estimatedSize / (1024 * 1024)}MB")
    rdd.persist(storageLevel)
  }
}

// Usage: start from an RDD that has no storage level assigned yet
val rawDataset = sc.textFile("hdfs:///very-large-file.txt")
val dataSize = estimateRDDSize(rawDataset) // Custom function to estimate size
val optimizedRDD = adaptiveStorage(rawDataset, dataSize)
```

### Multi-Stage Processing with Checkpointing

```scala
/**
 * Complex ETL pipeline with strategic checkpointing.
 *
 * parseRecord, enrichRecord, aggregateRecords and threshold are application-specific
 * placeholders; `sc` is the application's SparkContext.
 */
def processLargeDataset(inputPath: String, outputPath: String): Unit = {
  // "hdfs:///..." resolves against the default NameNode; "hdfs://checkpoints/..." would
  // treat "checkpoints" as a host name.
  sc.setCheckpointDir("hdfs:///checkpoints/etl-pipeline")

  // Stage 1: Initial data loading and cleaning
  val rawData = sc.textFile(inputPath)
  val cleanedData = rawData
    .filter(_.nonEmpty)
    .filter(!_.startsWith("#")) // Remove comments
    .map(_.trim)
    .cache() // Cache cleaned data for reuse

  val recordCount = cleanedData.count()
  println(s"Loaded and cleaned $recordCount records")

  // Stage 2: Complex transformations
  val transformedData = cleanedData
    .map(parseRecord)            // Complex parsing
    .filter(_.isValid)           // Remove invalid records
    .map(enrichRecord)           // Add external data
    .groupBy(_.category)
    .mapValues(aggregateRecords) // Complex aggregation
    .cache() // Writing the checkpoint re-evaluates the RDD; caching keeps the aggregation to one run

  // Checkpoint after expensive transformations (must precede the first action on the RDD)
  transformedData.checkpoint()
  val checkpointCount = transformedData.count() // Trigger checkpointing
  println(s"Checkpointed $checkpointCount transformed records")

  // The checkpoint has cut transformedData's lineage, so the cleaned data is no longer needed
  cleanedData.unpersist()

  // Stage 3: Final processing using checkpointed data
  val finalResults = transformedData
    .filter(_._2.score > threshold)
    .sortBy(_._2.score, ascending = false)
    .take(1000)

  // finalResults is now a local array; release the cached intermediate RDD
  transformedData.unpersist()

  // Save results
  sc.parallelize(finalResults).saveAsTextFile(outputPath)
}
```

### Performance Monitoring

```scala
import org.apache.spark.storage.RDDInfo

/** Convert a byte count to whole mebibytes (truncating toward zero) for display. */
def toMB(bytes: Long): Long = bytes / (1024 * 1024)

/** Print a summary for each RDD reported by `sc.getRDDStorageInfo`. */
def monitorRDDStorage(sc: SparkContext): Unit = {
  val rddInfos: Array[RDDInfo] = sc.getRDDStorageInfo

  println("RDD Storage Information:")
  println("=" * 50)

  rddInfos.foreach { rddInfo =>
    println(s"RDD ID: ${rddInfo.id}")
    println(s"Name: ${rddInfo.name}")
    println(s"Storage Level: ${rddInfo.storageLevel}")
    println(s"Cached Partitions: ${rddInfo.numCachedPartitions}/${rddInfo.numPartitions}")
    println(s"Memory Size: ${toMB(rddInfo.memSize)} MB")
    println(s"Disk Size: ${toMB(rddInfo.diskSize)} MB")
    println("-" * 30)
  }
}

// Custom storage metrics (one snapshot row per RDD)
final case class StorageMetrics(
  rddId: Int,
  memoryUsed: Long,
  diskUsed: Long,
  partitionsCached: Int,
  totalPartitions: Int
)

/** Snapshot the storage figures of every RDD reported by `sc.getRDDStorageInfo`. */
def collectStorageMetrics(sc: SparkContext): List[StorageMetrics] =
  sc.getRDDStorageInfo.map { rddInfo =>
    StorageMetrics(
      rddId = rddInfo.id,
      memoryUsed = rddInfo.memSize,
      diskUsed = rddInfo.diskSize,
      partitionsCached = rddInfo.numCachedPartitions,
      totalPartitions = rddInfo.numPartitions
    )
  }.toList

// Usage in application
val beforeMetrics = collectStorageMetrics(sc)
// ... perform operations ...
val afterMetrics = collectStorageMetrics(sc)

// Compare metrics. Index the baseline by RDD id so each lookup is constant-time
// instead of a linear scan per RDD.
val beforeById = beforeMetrics.map(m => m.rddId -> m).toMap
val afterIds = afterMetrics.map(_.rddId).toSet

println("Storage usage changes:")
afterMetrics.foreach { after =>
  beforeById.get(after.rddId) match {
    case Some(before) =>
      val memoryChange = after.memoryUsed - before.memoryUsed
      val diskChange = after.diskUsed - before.diskUsed
      println(s"RDD ${after.rddId}: Memory: ${toMB(memoryChange)}MB, Disk: ${toMB(diskChange)}MB")
    case None =>
      println(s"New RDD ${after.rddId}: Memory: ${toMB(after.memoryUsed)}MB, Disk: ${toMB(after.diskUsed)}MB")
  }
}

// RDDs present in the first snapshot but no longer reported (e.g. unpersisted in between)
beforeMetrics.filterNot(m => afterIds.contains(m.rddId)).foreach { gone =>
  println(s"RDD ${gone.rddId} no longer reported: was Memory: ${toMB(gone.memoryUsed)}MB, Disk: ${toMB(gone.diskUsed)}MB")
}
```

## Best Practices

### When to Use Different Storage Levels

```scala
// 1. MEMORY_ONLY — small data that is read often; kept as deserialized objects
val smallFrequentData = sc
  .parallelize(1 to 1000)
  .persist(StorageLevel.MEMORY_ONLY)

// 2. MEMORY_ONLY_SER — larger data when there is spare CPU to deserialize on each access
val mediumData = sc
  .textFile("medium-file.txt")
  .persist(StorageLevel.MEMORY_ONLY_SER)

// 3. MEMORY_AND_DISK — data that is important to keep when memory is somewhat tight
val importantData = sc
  .textFile("important-file.txt")
  .persist(StorageLevel.MEMORY_AND_DISK)

// 4. DISK_ONLY — very large data with little memory to spare
val hugeData = sc
  .textFile("huge-file.txt")
  .persist(StorageLevel.DISK_ONLY)

// 5. Replicated (_2) levels — critical data in environments where executors are often lost
val criticalData = sc
  .textFile("critical-file.txt")
  .persist(StorageLevel.MEMORY_AND_DISK_2)
```

### Efficient Cache Management

```scala
/**
 * Cache management pattern: tracks every RDD persisted through this manager so they can
 * be inspected and released together.
 */
class CacheManager(sc: SparkContext) {
  // Most recently cached first. The list is replaced, never mutated in place.
  private var cachedRDDs = List.empty[RDD[_]]

  /**
   * Persist `rdd` at `storageLevel` and register it for later cleanup.
   * Registering the same RDD again does not add a duplicate entry.
   */
  def cacheRDD[T](rdd: RDD[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RDD[T] = {
    val cachedRDD = rdd.persist(storageLevel)
    // persist() returns the same RDD instance, so check by id before tracking it
    if (!cachedRDDs.exists(_.id == cachedRDD.id)) {
      cachedRDDs = cachedRDD :: cachedRDDs
    }
    cachedRDD
  }

  /** Unpersist every tracked RDD and forget them. */
  def unpersistAll(): Unit = {
    cachedRDDs.foreach(_.unpersist())
    cachedRDDs = List.empty
  }

  /** Storage info for the tracked RDDs that the context currently reports on. */
  def getStorageInfo: List[RDDInfo] = {
    // Query the context once instead of once per tracked RDD
    val infoById = sc.getRDDStorageInfo.map(info => info.id -> info).toMap
    cachedRDDs.flatMap(rdd => infoById.get(rdd.id))
  }
}

// Usage
val cacheManager = new CacheManager(sc)

val data1 = cacheManager.cacheRDD(sc.textFile("file1.txt"))
val data2 = cacheManager.cacheRDD(sc.textFile("file2.txt"), StorageLevel.MEMORY_ONLY_SER)

// Process data...

// Clean up all cached RDDs
cacheManager.unpersistAll()
```

### Checkpointing Strategy

```scala
/**
 * Checkpoint `rdd` if its lineage is deeper than `maxLineageDepth`.
 *
 * Lineage depth is the length of the longest dependency chain from `rdd` back to an RDD
 * with no dependencies. Call this before any action has run on `rdd`, with a checkpoint
 * directory already set; the checkpoint itself is written when the next action runs.
 *
 * @return the same RDD, possibly marked for checkpointing
 */
def smartCheckpoint[T](rdd: RDD[T], maxLineageDepth: Int = 10): RDD[T] = {
  // Memoized by RDD id. When branches share ancestors (joins or unions of RDDs derived
  // from the same parent), plain recursion revisits those ancestors once per path. That
  // cost grows exponentially when such sharing repeats along the lineage.
  val depthById = scala.collection.mutable.HashMap.empty[Int, Int]

  def countLineageDepth(current: RDD[_]): Int =
    depthById.getOrElse(current.id, {
      val depth = current.dependencies
        .map(dep => countLineageDepth(dep.rdd) + 1)
        .foldLeft(0)(math.max)
      depthById.update(current.id, depth)
      depth
    })

  val depth = countLineageDepth(rdd)
  if (depth > maxLineageDepth) {
    rdd.checkpoint()
    println(s"Checkpointing RDD with lineage depth: $depth")
  }
  rdd
}

// Usage in complex pipeline (inputData and the transform/filter functions are
// application-specific placeholders; sc.setCheckpointDir must already have been called)
val result = inputData
  .map(transform1)
  .filter(filter1)
  .map(transform2)
  .groupByKey()
  .map(transform3) // This might have deep lineage

val checkpointedResult = smartCheckpoint(result)
checkpointedResult.count() // Trigger checkpointing if needed

// Continue processing with potentially checkpointed RDD
val finalResult = checkpointedResult
  .filter(finalFilter)
  .sortBy(_._2)
```