# Storage and Persistence

Spark provides fine-grained control over RDD caching and persistence strategies across memory and disk, enabling optimization for different data access patterns and cluster configurations.

## StorageLevel

`StorageLevel` defines how and where RDDs are stored when persisted.

```scala { .api }
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int) {

  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  def clone(): StorageLevel
  def isValid: Boolean
  def toInt: Int
  def writeExternal(out: ObjectOutput): Unit
  def readExternal(in: ObjectInput): Unit
}
```
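
To make the accessors above concrete, the following sketch prints the flags behind one predefined level. The `StorageLevelFlags` object and its `describe` helper are hypothetical names introduced for illustration; only the `StorageLevel` accessors come from Spark.

```scala
import org.apache.spark.storage.StorageLevel

object StorageLevelFlags {
  // Render the five properties that fully determine a storage level
  def describe(level: StorageLevel): String =
    s"disk=${level.useDisk}, memory=${level.useMemory}, offHeap=${level.useOffHeap}, " +
      s"deserialized=${level.deserialized}, replication=${level.replication}"

  def main(args: Array[String]): Unit = {
    // Serialized, memory first with disk fallback, two replicas
    println(describe(StorageLevel.MEMORY_AND_DISK_SER_2))
  }
}
```

Reading the flags this way can be a quick check on what a level name actually implies before choosing it.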

### Predefined Storage Levels

```scala { .api }
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel

  def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean, replication: Int): StorageLevel
  def fromString(s: String): StorageLevel
}
```
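
The two factory methods listed above can be exercised as in the sketch below. The `StorageLevelFactories` object name and the replication factor of 3 are illustrative assumptions, not recommendations.

```scala
import scala.util.Try
import org.apache.spark.storage.StorageLevel

object StorageLevelFactories {
  // Look up a predefined level by name, for example a value read from a config file
  val parsed: StorageLevel = StorageLevel.fromString("MEMORY_AND_DISK_SER")

  // Build a level that has no predefined constant: serialized, memory plus disk, 3 replicas
  val custom: StorageLevel = StorageLevel(
    useDisk = true,
    useMemory = true,
    useOffHeap = false,
    deserialized = false,
    replication = 3)

  // Unknown names are rejected with an IllegalArgumentException
  val invalid: Try[StorageLevel] = Try(StorageLevel.fromString("NOT_A_LEVEL"))

  def main(args: Array[String]): Unit = {
    println(parsed == StorageLevel.MEMORY_AND_DISK_SER)
    println(custom.replication)
    println(invalid.isFailure)
  }
}
```

Wrapping `fromString` in `Try` is one way to validate user-supplied level names before they reach `persist`.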

### Storage Level Characteristics

| Storage Level | Memory | Disk | Serialized | Replication | Use Case |
|---------------|--------|------|------------|-------------|----------|
| `MEMORY_ONLY` | Yes | No | No | 1 | Fast access, sufficient memory |
| `MEMORY_ONLY_SER` | Yes | No | Yes | 1 | Memory-constrained, CPU available |
| `MEMORY_AND_DISK` | Yes | Yes | No | 1 | Fallback to disk when memory full |
| `MEMORY_AND_DISK_SER` | Yes | Yes | Yes | 1 | Memory-constrained, CPU available, fallback to disk |
| `DISK_ONLY` | No | Yes | Yes | 1 | Large datasets, infrequent access |
| `MEMORY_ONLY_2` | Yes | No | No | 2 | Fast access + fault tolerance |
| `MEMORY_AND_DISK_2` | Yes | Yes | No | 2 | Balanced performance + fault tolerance |
| `OFF_HEAP` | Off-heap | Yes (spill, Spark 2.0+) | Yes | 1 | Avoid GC overhead; requires off-heap memory to be enabled |

## Persistence Operations

### Basic Persistence

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Persistence Example").setMaster("local[*]"))

// Create an expensive RDD (complexTransformation and complexFilter are placeholders)
val expensiveRDD = sc.textFile("large-dataset.txt")
  .filter(_.nonEmpty)
  .map(complexTransformation)
  .filter(complexFilter)

// Cache in memory (shorthand for persist(StorageLevel.MEMORY_ONLY))
val cachedRDD = expensiveRDD.cache()

// Alternatively, persist with an explicit storage level. A storage level can be
// assigned only once per RDD, so use either cache() or persist(level), not both:
// val persistedRDD = expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Use the cached RDD multiple times: the first action computes and caches it,
// later actions read the cached partitions
val count1 = cachedRDD.count()
val count2 = cachedRDD.filter(_.contains("error")).count()
val sample = cachedRDD.take(10)

// Check storage level
println(s"Storage level: ${cachedRDD.getStorageLevel}")

// Remove from cache when no longer needed
cachedRDD.unpersist()
```

### Persistence Strategies by Use Case

```scala
// High-frequency access, sufficient memory
val frequentlyUsedRDD = inputRDD
  .map(preprocessData)
  .persist(StorageLevel.MEMORY_ONLY)

// Large dataset with memory constraints
val largeRDD = inputRDD
  .flatMap(expandData)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Critical data requiring fault tolerance
val criticalRDD = inputRDD
  .map(importantTransformation)
  .persist(StorageLevel.MEMORY_AND_DISK_2)

// Infrequently accessed large dataset
val archivalRDD = inputRDD
  .map(heavyProcessing)
  .persist(StorageLevel.DISK_ONLY)

// Avoiding GC pressure for long-lived RDDs
// (requires off-heap memory to be enabled in the Spark configuration)
val longLivedRDD = inputRDD
  .map(createLargeObjects)
  .persist(StorageLevel.OFF_HEAP)
```
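
`StorageLevel.OFF_HEAP` only works when off-heap memory is switched on for the application. A minimal configuration sketch follows; the `OffHeapConf` object name, the application name, and the `2g` size are assumptions chosen for illustration.

```scala
import org.apache.spark.SparkConf

object OffHeapConf {
  // Both settings are needed: the flag enables off-heap use, the size caps it.
  // The 2g value is illustrative and should be sized for the actual workload.
  def build(): SparkConf = new SparkConf()
    .setAppName("Off-heap Example")
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "2g")
}
```

Off-heap memory is allocated in addition to the JVM heap, so container or executor memory limits may need to account for it.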

## Checkpointing

Checkpointing saves RDD data to reliable storage (such as HDFS). This truncates the lineage, so recovery after a failure reads the checkpoint instead of recomputing the full chain of transformations.

### Basic Checkpointing

```scala
// Set checkpoint directory (must be fault-tolerant storage)
sc.setCheckpointDir("hdfs://namenode:port/checkpoints")

val longLineageRDD = sc.textFile("input.txt")
  .map(transformation1)
  .filter(filter1)
  .map(transformation2)
  .filter(filter2)
  .map(transformation3)
  .filter(filter3)
  .map(transformation4)

// Mark for checkpointing (must be called before any action runs on this RDD)
longLineageRDD.checkpoint()

// Trigger checkpointing with an action
val count = longLineageRDD.count()

// Verify checkpointing
println(s"Is checkpointed: ${longLineageRDD.isCheckpointed}")
println(s"Checkpoint file: ${longLineageRDD.getCheckpointFile}") // an Option[String]

// The RDD lineage is now truncated - subsequent failures will recover from checkpoint
val furtherProcessed = longLineageRDD.map(additionalTransformation)
```

### Local Checkpointing

```scala
// Local checkpointing truncates lineage using executor-local storage instead of
// the checkpoint directory: faster, but the data is lost if an executor fails
val rddToCheckpoint = expensiveComputationRDD.localCheckpoint()

// Trigger checkpointing
rddToCheckpoint.count()

// Use the checkpointed RDD
val results = rddToCheckpoint.map(finalTransformation).collect()
```

### Strategic Checkpointing

```scala
// Checkpoint at strategic points in long pipelines
// (assumes sc.setCheckpointDir(...) has already been called)
val preprocessed = sc.textFile("massive-dataset.txt")
  .map(parseRecord)
  .filter(isValid)
  .map(enrichWithLookup)
  .filter(passesQualityCheck)

// Checkpoint after expensive preprocessing.
// checkpoint() returns Unit, so call it on the RDD instead of assigning its result.
preprocessed.checkpoint()
preprocessed.count() // Materialize checkpoint

// Continue pipeline from checkpoint
val aggregated = preprocessed
  .map(extractFeatures) // must produce key-value pairs for groupByKey
  .groupByKey()
  .mapValues(aggregate)

// Checkpoint again before final processing
aggregated.checkpoint()
aggregated.count()

val finalResults = aggregated
  .map(finalTransformation)
  .collect()
```

## Advanced Persistence Patterns

### Conditional Persistence

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Pick a storage level from the estimated data size relative to available memory
def smartPersist[T](rdd: RDD[T], estimatedSize: Long, memoryAvailable: Long): RDD[T] = {
  val storageLevel = if (estimatedSize < memoryAvailable * 0.7) {
    StorageLevel.MEMORY_ONLY
  } else if (estimatedSize < memoryAvailable * 1.5) {
    StorageLevel.MEMORY_AND_DISK_SER
  } else {
    StorageLevel.DISK_ONLY
  }

  rdd.persist(storageLevel)
}

// Usage (averageRecordSizeBytes and availableMemoryBytes are supplied by the caller).
// Note that count() runs the full computation once before anything is persisted.
val processedRDD = inputRDD.map(expensiveTransformation)
val estimatedSize = processedRDD.count() * averageRecordSizeBytes
val smartPersistedRDD = smartPersist(processedRDD, estimatedSize, availableMemoryBytes)
```

### Multi-level Caching Strategy

```scala
import scala.collection.mutable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

class CacheManager(sc: SparkContext) {
  // LinkedHashMap keeps insertion order, so the head is always the oldest entry
  private val cachedRDDs = mutable.LinkedHashMap[String, RDD[_]]()

  def cacheWithEviction[T](name: String, rdd: RDD[T], level: StorageLevel): RDD[T] = {
    // Evict old cache if memory is getting full
    if (getMemoryUsage() > 0.8) {
      evictLeastRecentlyUsed()
    }

    val cached = rdd.persist(level)
    cachedRDDs(name) = cached
    cached
  }

  def uncache(name: String): Unit = {
    cachedRDDs.get(name).foreach(_.unpersist())
    cachedRDDs.remove(name)
  }

  private def getMemoryUsage(): Double = {
    // Each value is (max memory for caching, remaining memory for caching)
    val memoryStatus = sc.getExecutorMemoryStatus
    val totalMemory = memoryStatus.values.map(_._1).sum
    val usedMemory = memoryStatus.values.map(m => m._1 - m._2).sum
    if (totalMemory == 0) 0.0 else usedMemory.toDouble / totalMemory
  }

  private def evictLeastRecentlyUsed(): Unit = {
    // Approximates LRU by evicting the oldest cached entry; true LRU would
    // also need to track when each RDD was last accessed
    cachedRDDs.headOption.foreach { case (name, rdd) =>
      rdd.unpersist()
      cachedRDDs.remove(name)
    }
  }
}
```

### Temperature-based Storage

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object DataTemperature extends Enumeration {
  type DataTemperature = Value
  val HOT, WARM, COLD = Value
}

import DataTemperature._

class TemperatureAwareStorage(sc: SparkContext) {
  def persist[T](rdd: RDD[T], temperature: DataTemperature): RDD[T] = {
    val storageLevel = temperature match {
      case HOT  => StorageLevel.MEMORY_ONLY         // Frequently accessed
      case WARM => StorageLevel.MEMORY_AND_DISK_SER // Occasionally accessed
      case COLD => StorageLevel.DISK_ONLY           // Rarely accessed
    }
    rdd.persist(storageLevel)
  }
}

// Usage
val storage = new TemperatureAwareStorage(sc)

val hotData = storage.persist(frequentlyUsedRDD, HOT)
val warmData = storage.persist(occasionallyUsedRDD, WARM)
val coldData = storage.persist(archivalRDD, COLD)
```

## Performance Optimization

### Memory Fraction Configuration

```scala
// Legacy static memory manager. On Spark 1.6 to 2.x these settings apply only
// when spark.memory.useLegacyMode is true; legacy mode was removed in Spark 3.0.
val conf = new SparkConf()
  .setAppName("Storage Optimization")
  .set("spark.storage.memoryFraction", "0.6")              // 60% of heap for storage (legacy)
  .set("spark.storage.unrollFraction", "0.2")              // 20% of storage memory for unrolling (legacy)
  .set("spark.storage.memoryMapThreshold", "2m")           // Memory-map disk blocks of 2 MB or more
  .set("spark.storage.blockManagerSlaveTimeoutMs", "120s") // Block manager timeout

// Unified memory manager (default since Spark 1.6)
val modernConf = new SparkConf()
  .set("spark.memory.useLegacyMode", "false")  // Default; the setting no longer exists in Spark 3.0+
  .set("spark.memory.storageFraction", "0.5")  // Share of the unified region where cached blocks are immune to eviction
```
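
The arithmetic behind the unified memory settings can be sketched in plain Scala. This assumes the documented behaviour that 300 MiB of heap is reserved and that `spark.memory.fraction` (default 0.6 since Spark 2.0) sizes the shared region; the `UnifiedMemoryEstimate` object and the 4 GiB heap are hypothetical, and real executors report slightly different numbers because the JVM's usable heap is smaller than `-Xmx`.

```scala
object UnifiedMemoryEstimate {
  // Heap set aside by Spark before any fraction is applied
  val ReservedBytes: Long = 300L * 1024 * 1024

  // Region shared by execution and storage: (heap - reserved) * spark.memory.fraction
  def unifiedRegionBytes(heapBytes: Long, memoryFraction: Double): Long =
    ((heapBytes - ReservedBytes) * memoryFraction).toLong

  // Part of that region in which cached blocks cannot be evicted by execution:
  // unified region * spark.memory.storageFraction
  def protectedStorageBytes(heapBytes: Long, memoryFraction: Double, storageFraction: Double): Long =
    (unifiedRegionBytes(heapBytes, memoryFraction) * storageFraction).toLong

  def main(args: Array[String]): Unit = {
    val mib = 1024L * 1024
    val heap = 4096L * mib // hypothetical 4 GiB executor heap
    println(s"Unified region: ${unifiedRegionBytes(heap, 0.6) / mib} MiB")
    println(s"Protected storage: ${protectedStorageBytes(heap, 0.6, 0.5) / mib} MiB")
  }
}
```

Storage can borrow beyond the protected share when execution is idle, so the second figure is a floor for cached data rather than a cap.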

### Serialization Optimization

```scala
// Use Kryo serialization for better performance
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.unsafe", "true")
  .set("spark.kryoserializer.buffer.max", "1g")

// Register classes for better Kryo performance
conf.registerKryoClasses(Array(
  classOf[MyCustomClass],
  classOf[AnotherClass]
))

val sc = new SparkContext(conf)

// Prefer serialized storage for large objects
val serializedRDD = largeObjectRDD.persist(StorageLevel.MEMORY_ONLY_SER)
```

### Block Size Optimization

```scala
// Block manager networking and timeouts.
// Property names have changed between Spark releases; check each one against
// the configuration reference for the version in use.
val conf = new SparkConf()
  .set("spark.blockManager.port", "0") // 0 binds to a random free port
  .set("spark.storage.blockManagerHeartbeatMs", "10s")
  .set("spark.storage.getBlockTimeoutMs", "60s")

// For streaming workloads
val streamingConf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", "128k")   // Smaller threshold
  .set("spark.storage.unrollMemoryThreshold", "1m")  // Smaller unroll buffer

// For batch workloads with large blocks
val batchConf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", "8m")     // Larger threshold
  .set("spark.storage.unrollMemoryThreshold", "16m") // Larger unroll buffer
```

## Monitoring and Debugging

### Storage Information

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Check RDD storage information
def printStorageInfo(rdd: RDD[_]): Unit = {
  println(s"RDD ${rdd.id} Storage Info:")
  println(s"  Name: ${rdd.name}")
  println(s"  Storage Level: ${rdd.getStorageLevel}")
  println(s"  Is Cached: ${rdd.getStorageLevel != StorageLevel.NONE}")
  println(s"  Is Checkpointed: ${rdd.isCheckpointed}")
  println(s"  Partitions: ${rdd.partitions.length}")

  if (rdd.isCheckpointed) {
    println(s"  Checkpoint File: ${rdd.getCheckpointFile}")
  }
}

// Check storage status across cluster.
// Keys identify block managers as host:port; values are
// (max memory for caching, remaining memory for caching) in bytes.
def printClusterStorageStatus(sc: SparkContext): Unit = {
  val status = sc.getExecutorMemoryStatus

  println("Cluster Storage Status:")
  status.foreach { case (executorId, (maxMemory, remainingMemory)) =>
    val usedMemory = maxMemory - remainingMemory
    val usagePercent = (usedMemory.toDouble / maxMemory) * 100

    println(f"  Executor $executorId:")
    println(f"    Total Memory: ${maxMemory / (1024 * 1024)}%,d MB")
    println(f"    Used Memory: ${usedMemory / (1024 * 1024)}%,d MB")
    println(f"    Usage: $usagePercent%.1f%%")
  }
}
```

### Cache Management Utilities

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

class CacheMonitor(sc: SparkContext) {
  def getCachedRDDs(): Array[(Int, String, StorageLevel)] = {
    sc.getPersistentRDDs.values.map { rdd =>
      (rdd.id, rdd.name, rdd.getStorageLevel) // name is null unless setName was called
    }.toArray
  }

  def clearAllCache(): Unit = {
    sc.getPersistentRDDs.values.foreach(_.unpersist())
  }

  def clearCacheByName(namePattern: String): Int = {
    val regex = namePattern.r
    var cleared = 0

    sc.getPersistentRDDs.values.foreach { rdd =>
      if (rdd.name != null && regex.findFirstIn(rdd.name).isDefined) {
        rdd.unpersist()
        cleared += 1
      }
    }
    cleared
  }

  def getStorageStats(): Map[StorageLevel, Int] = {
    // map (not mapValues) yields a strict Map on both Scala 2.12 and 2.13
    sc.getPersistentRDDs.values
      .groupBy(_.getStorageLevel)
      .map { case (level, rdds) => level -> rdds.size }
  }
}

// Usage
val monitor = new CacheMonitor(sc)

// Before processing
println("Cached RDDs before processing:")
monitor.getCachedRDDs().foreach { case (id, name, level) =>
  println(s"  RDD $id ($name): $level")
}

// Process data with caching
val results = processDataWithCaching()

// After processing
println("\nStorage statistics:")
monitor.getStorageStats().foreach { case (level, count) =>
  println(s"  $level: $count RDDs")
}

// Cleanup
monitor.clearAllCache()
```

## Best Practices

### When to Persist
- RDD is used multiple times in the application
- RDD is expensive to compute (complex transformations, joins)
- RDD has many dependencies (long lineage)
- Recomputing lost partitions after a failure would take significant time

### When to Checkpoint
- Very long RDD lineages (for example, lineages that grow with every iteration of a loop)
- Wide transformations that are expensive to recompute
- Before expensive iterative algorithms
- When data must survive the loss of executors that hold cached partitions (to share data across Spark applications, write it out explicitly instead, for example with `saveAsObjectFile`)
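
The iterative case in the list above can be sketched as a loop that checkpoints every few iterations so the lineage never grows unbounded. The `PeriodicCheckpoint` object, the `interval` parameter, the temporary checkpoint directory, and the toy `+ 1` step are assumptions for illustration; a real job would point `setCheckpointDir` at fault-tolerant storage.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PeriodicCheckpoint {
  // Applies `step` `iterations` times, checkpointing every `interval` iterations
  def run(start: RDD[Long], iterations: Int, interval: Int)(step: RDD[Long] => RDD[Long]): RDD[Long] = {
    var current = start
    for (i <- 1 to iterations) {
      current = step(current)
      if (i % interval == 0) {
        current.persist()    // keep the data so writing the checkpoint does not recompute it
        current.checkpoint() // must be called before the first action on this RDD
        current.count()      // action that materializes the checkpoint
      }
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("Periodic Checkpoint").setMaster("local[2]"))
    try {
      // A local temp directory is only suitable for a single-machine demo
      sc.setCheckpointDir(Files.createTempDirectory("checkpoints").toString)
      val result = run(sc.parallelize(1L to 100L), iterations = 20, interval = 5)(_.map(_ + 1))
      println(s"Is checkpointed: ${result.isCheckpointed}")
      println(s"Sum: ${result.sum()}")
    } finally {
      sc.stop()
    }
  }
}
```

This sketch leaves every persisted intermediate RDD cached; a longer-running job would likely call `unpersist()` on the previous checkpointed RDD once the next one has been materialized.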

### Storage Level Selection
- **MEMORY_ONLY**: Fast access, sufficient memory, objects don't need serialization
- **MEMORY_ONLY_SER**: Memory limited, CPU available, objects benefit from serialization
- **MEMORY_AND_DISK**: Balanced approach, fallback for memory pressure
- **MEMORY_AND_DISK_SER**: Memory limited, CPU available for serialization, disk as fallback
- **DISK_ONLY**: Very large datasets, infrequent access
- **_2 variants**: When fault tolerance is critical and cluster is unreliable

### Performance Tips
```scala
// Good practices
val optimizedRDD = inputRDD
  .filter(isRelevant)                        // Filter early to reduce data size
  .map(lightweightTransform)                 // Apply cheap transformations first
  .map(expensiveTransform)                   // Run the expensive operation on the reduced data
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist the result that later actions reuse

// Avoid anti-patterns
val inefficientRDD = inputRDD
  .persist(StorageLevel.MEMORY_ONLY)         // Persisting before filtering caches data that is then discarded
  .filter(isRelevant)                        // Filtering first would reduce memory pressure
  .map(heavyTransformation)                  // Result is not persisted, so every action recomputes it
```