# Storage and Persistence

Memory management and persistence strategies for optimizing RDD storage across cluster nodes, including the available storage levels and caching mechanisms.

## Capabilities

### StorageLevel

Defines how RDDs should be stored in memory and/or on disk across the cluster.
```scala { .api }
/**
 * Storage level for an RDD. Determines how the RDD should be stored in memory and/or on disk.
 * Storage levels can specify whether to use disk, memory, off-heap memory, serialization,
 * and replication.
 */
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1) extends Externalizable {

  /** Whether to use disk storage */
  def useDisk: Boolean = _useDisk

  /** Whether to use memory storage */
  def useMemory: Boolean = _useMemory

  /** Whether to use off-heap memory */
  def useOffHeap: Boolean = _useOffHeap

  /** Whether to store RDD partitions as deserialized objects */
  def deserialized: Boolean = _deserialized

  /** Number of replicas to store */
  def replication: Int = _replication

  /** Create a copy of this storage level with a different replication factor */
  def clone(newReplication: Int): StorageLevel
}

/**
 * Storage level constants for common storage strategies
 */
object StorageLevel {

  /** No persistence - RDD partitions are not stored */
  val NONE = new StorageLevel(false, false, false, false)

  /** Store RDD partitions only on disk */
  val DISK_ONLY = new StorageLevel(true, false, false, false)

  /** Store RDD partitions on disk with 2x replication */
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

  /** Store RDD partitions only in memory as deserialized Java objects */
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)

  /** Store RDD partitions in memory with 2x replication */
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

  /** Store RDD partitions in memory as serialized Java objects (more space-efficient) */
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

  /** Store RDD partitions in memory as serialized objects with 2x replication */
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

  /** Store RDD partitions in memory, spilling to disk if they do not fit */
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

  /** Store RDD partitions in memory and on disk with 2x replication */
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

  /** Store RDD partitions in memory as serialized objects, spilling to disk if needed */
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

  /** Store RDD partitions in memory and on disk as serialized objects with 2x replication */
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

  /** Store RDD partitions in off-heap memory (requires off-heap memory to be configured) */
  val OFF_HEAP = new StorageLevel(false, false, true, false)
}
```

**Usage Examples:**

```scala
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(conf)
val data = sc.parallelize(1 to 1000000)

// Note: a storage level can be assigned to an RDD only once; the calls below
// show the available alternatives rather than a sequence to run as-is.

// Cache in memory (equivalent to .cache())
data.persist(StorageLevel.MEMORY_ONLY)

// Cache in memory with serialization (more memory-efficient)
data.persist(StorageLevel.MEMORY_ONLY_SER)

// Memory with disk fallback
data.persist(StorageLevel.MEMORY_AND_DISK)

// Disk-only storage
data.persist(StorageLevel.DISK_ONLY)

// High availability with replication
data.persist(StorageLevel.MEMORY_AND_DISK_2)

// Custom storage level (the constructor is private; use the companion apply method)
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 3
)
data.persist(customLevel)
```

### RDD Persistence Methods

Core methods for controlling RDD persistence and caching.

```scala { .api }
/**
 * Persistence methods available on all RDDs
 */
abstract class RDD[T: ClassTag] extends Serializable {

  /** Persist this RDD with the default storage level (MEMORY_ONLY) */
  def cache(): this.type = persist()

  /** Persist this RDD with the specified storage level */
  def persist(newLevel: StorageLevel): this.type

  /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk */
  def unpersist(blocking: Boolean = true): this.type

  /** Get the current storage level of this RDD */
  def getStorageLevel: StorageLevel

  /** Mark this RDD for checkpointing */
  def checkpoint(): Unit

  /** Return whether this RDD has been checkpointed or not */
  def isCheckpointed: Boolean

  /** Get the name of the directory to which this RDD was checkpointed */
  def getCheckpointFile: Option[String]

  /** Mark this RDD for local checkpointing using Spark's existing caching layer */
  def localCheckpoint(): this.type
}
```

**Usage Examples:**

```scala
val expensiveRDD = sc.textFile("large-dataset.txt")
  .map(heavyProcessingFunction)
  .filter(_.nonEmpty)

// Cache for multiple uses
expensiveRDD.cache()

// Use the cached RDD multiple times
val count1 = expensiveRDD.count()
val count2 = expensiveRDD.filter(_.contains("error")).count()

// Check the storage level
println(s"Storage level: ${expensiveRDD.getStorageLevel}")

// Remove from the cache when done
expensiveRDD.unpersist()

// Checkpoint for fault tolerance
sc.setCheckpointDir("hdfs://path/to/checkpoint")
expensiveRDD.checkpoint()
expensiveRDD.count() // Trigger checkpoint creation
```

### Storage Management in SparkContext

Methods for managing storage and checkpointing across the application.

```scala { .api }
class SparkContext(config: SparkConf) extends Logging {

  /** Set the directory under which RDDs are going to be checkpointed */
  def setCheckpointDir(directory: String): Unit

  /** Return the directory where checkpoints are stored, if one is set */
  def getCheckpointDir: Option[String]
}
```
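
The checkpoint directory behaves like a set-then-read application setting: `getCheckpointDir` returns `None` until `setCheckpointDir` has been called. A minimal Spark-free sketch of that contract (`CheckpointConfig` is a hypothetical stand-in for illustration, not a Spark class):

```scala
// Hypothetical stand-in modelling SparkContext's checkpoint-directory contract:
// getCheckpointDir is None until setCheckpointDir is called.
class CheckpointConfig {
  private var dir: Option[String] = None

  def setCheckpointDir(directory: String): Unit = {
    dir = Some(directory)
  }

  def getCheckpointDir: Option[String] = dir
}

val cfg = new CheckpointConfig
println(cfg.getCheckpointDir) // None - nothing configured yet
cfg.setCheckpointDir("hdfs://cluster/checkpoints/myapp")
println(cfg.getCheckpointDir) // Some(hdfs://cluster/checkpoints/myapp)
```

In a real application the directory should point at reliable, shared storage (such as HDFS) so checkpoint files survive executor loss.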

### BlockManager Integration

How Spark represents storage blocks internally.

```scala { .api }
/**
 * Storage information for RDD blocks
 */
case class RDDInfo(
    id: Int,
    name: String,
    numPartitions: Int,
    storageLevel: StorageLevel,
    numCachedPartitions: Int,
    memSize: Long,
    diskSize: Long) {

  /** Whether this RDD is currently persisted */
  def isCached: Boolean = numCachedPartitions > 0
}

/**
 * Information about memory usage for storage
 */
case class MemoryStatus(maxMemory: Long, memoryUsed: Long, memoryRemaining: Long) {
  def memoryFraction: Double = memoryUsed.toDouble / maxMemory
}

/**
 * Information about a storage block
 */
case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
  def isCached: Boolean = memSize > 0 || diskSize > 0
}
```
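
Because these are plain data holders, their derived metrics are easy to check outside Spark. A self-contained sketch (the classes are re-declared locally for illustration; in Spark they live in `org.apache.spark.storage`, and `storageLevel` is a `StorageLevel` rather than the `String` used here to stay dependency-free):

```scala
// Local re-declarations for illustration only.
case class MemoryStatus(maxMemory: Long, memoryUsed: Long, memoryRemaining: Long) {
  def memoryFraction: Double = memoryUsed.toDouble / maxMemory
}

case class BlockStatus(storageLevel: String, memSize: Long, diskSize: Long) {
  def isCached: Boolean = memSize > 0 || diskSize > 0
}

// 256 MB used out of a 1 GB storage pool
val status = MemoryStatus(
  maxMemory = 1024L * 1024 * 1024,
  memoryUsed = 256L * 1024 * 1024,
  memoryRemaining = 768L * 1024 * 1024)
println(status.memoryFraction) // 0.25

// A block held only on disk still counts as cached
val diskOnlyBlock = BlockStatus("DISK_ONLY", memSize = 0, diskSize = 4096)
println(diskOnlyBlock.isCached) // true
```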

## Storage Strategy Guidelines

### Choosing the Right Storage Level

**MEMORY_ONLY (default for cache())**
- Best for: small to medium RDDs that fit in memory
- Fastest access, but partitions that don't fit in memory are recomputed on each use
- Use when: the RDD will be accessed multiple times and fits in cluster memory

```scala
// Good for small, frequently accessed RDDs
val smallLookupTable = sc.parallelize(lookupData).cache()
```

**MEMORY_ONLY_SER**
- Best for: large RDDs where memory is constrained
- Slower than MEMORY_ONLY (CPU cost of deserialization) but uses substantially less memory
- Use when: memory is limited but you want to avoid disk I/O

```scala
// Memory-efficient caching for large datasets
val largeRDD = sc.textFile("huge-file.txt")
  .map(expensiveTransformation)
  .persist(StorageLevel.MEMORY_ONLY_SER)
```

**MEMORY_AND_DISK**
- Best for: large RDDs that may not fit entirely in memory
- Degrades gracefully: partitions that don't fit in memory are spilled to disk rather than recomputed
- Use when: you want the speed of memory with a disk fallback

```scala
// Balanced approach when memory requirements are uncertain
val uncertainSizeRDD = sc.textFile("variable-size-data/*")
  .persist(StorageLevel.MEMORY_AND_DISK)
```

**DISK_ONLY**
- Best for: very large RDDs where memory is needed for other operations
- Slowest access, but avoids recomputation for large datasets
- Use when: memory is needed for computation rather than storage

```scala
// Store intermediate results on disk
val intermediateResults = complexProcessing(inputRDD)
  .persist(StorageLevel.DISK_ONLY)
```

**Replicated Levels (`_2` suffix)**
- Best for: critical RDDs where fault tolerance is important
- Doubles the storage cost but provides redundancy, so losing an executor doesn't force recomputation
- Use when: the recomputation cost is very high

```scala
// High availability for critical data
val criticalRDD = sc.textFile("important-data.txt")
  .map(veryExpensiveComputation)
  .persist(StorageLevel.MEMORY_AND_DISK_2)
```
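
The guidance above can be condensed into a small decision helper. This is an illustrative sketch only (the heuristic and parameter names are assumptions, not a Spark API); it returns the name of one of the `StorageLevel` constants listed earlier:

```scala
// Illustrative heuristic condensing the guidelines above; not a Spark API.
// Returns the name of a StorageLevel constant.
def chooseStorageLevel(
    fitsDeserialized: Boolean, // fits in memory as plain Java objects
    fitsSerialized: Boolean,   // fits in memory once serialized
    replicate: Boolean): String = {
  val base =
    if (fitsDeserialized) "MEMORY_ONLY"
    else if (fitsSerialized) "MEMORY_ONLY_SER"
    else "MEMORY_AND_DISK_SER"
  // Add 2x replication only when recomputation is costly enough to justify it
  if (replicate) base + "_2" else base
}

println(chooseStorageLevel(fitsDeserialized = false, fitsSerialized = true,
  replicate = false)) // MEMORY_ONLY_SER
```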

### Performance Optimization Patterns

**Iterative Algorithms**

```scala
var currentRDD = initialData.cache()

for (i <- 1 to numIterations) {
  val nextRDD = currentRDD.map(iterationFunction).cache()
  nextRDD.count()        // Materialize before dropping the previous iteration;
  currentRDD.unpersist() // unpersisting first would force a recompute
  currentRDD = nextRDD
}
```

**Multi-Stage Pipelines**

```scala
// Cache intermediate stages that branch into multiple computations
val cleaned = rawData.filter(isValid).cache()

val analysis1 = cleaned.map(analysisFunction1).collect()
val analysis2 = cleaned.map(analysisFunction2).collect()

cleaned.unpersist() // Clean up when done
```

**Memory Management**

```scala
// Keep multiple persisted RDDs memory-efficient with serialized storage
val rdd1 = data.map(transform1).persist(StorageLevel.MEMORY_ONLY_SER)
val rdd2 = rdd1.map(transform2).persist(StorageLevel.MEMORY_ONLY_SER)

// Use the RDDs
processResults(rdd2)

// Clean up in reverse order of creation
rdd2.unpersist()
rdd1.unpersist()
```

### Checkpointing Strategies

**When to Use Checkpointing**
- Long lineage chains that would be expensive to recompute after a failure
- Iterative algorithms where intermediate state is valuable
- After expensive operations that you don't want to repeat

Unlike caching, checkpointing truncates the lineage: once the checkpoint is written, recovery reads from the checkpoint files instead of replaying the RDD's ancestry.

```scala
// Set the checkpoint directory once per application
sc.setCheckpointDir("hdfs://cluster/checkpoints/myapp")

// Checkpoint expensive transformations
val expensiveResult = rawData
  .map(veryExpensiveFunction)
  .filter(complexPredicate)

expensiveResult.checkpoint() // Mark for checkpointing
expensiveResult.count()      // Trigger the checkpoint

// Now expensiveResult can be reused without recomputing its lineage
val analysis = expensiveResult.map(analysisFunction).collect()
```
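
Whether checkpointing pays off is roughly a break-even calculation: pay the checkpoint write cost once to avoid re-running the lineage on every failure. A back-of-the-envelope sketch (the cost model and numbers are assumptions for illustration; Spark does not compute this for you):

```scala
// Rough break-even model: one-time checkpoint cost vs. recomputing the
// lineage on each expected failure. Costs are in arbitrary time units.
def checkpointPaysOff(
    lineageRecomputeCost: Double, // cost to rebuild the RDD from scratch
    checkpointWriteCost: Double,  // one-time cost to write checkpoint files
    expectedFailures: Double): Boolean =
  expectedFailures * lineageRecomputeCost > checkpointWriteCost

// A 100-unit lineage expected to fail twice easily justifies a 30-unit checkpoint
println(checkpointPaysOff(100.0, 30.0, 2.0)) // true
```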

**Local vs. Reliable Checkpointing**

```scala
// Local checkpointing: writes to executor-local storage (faster, but data is
// lost if an executor dies)
rdd.localCheckpoint()

// Reliable checkpointing: writes to the configured checkpoint directory
// (slower but fault-tolerant); requires sc.setCheckpointDir to have been called
rdd.checkpoint()
```

## Monitoring Storage Usage

### Spark UI Storage Tab
- Monitor cached RDDs and their memory/disk usage
- See which RDDs are taking up the most space
- Identify RDDs that should be unpersisted

### Programmatic Monitoring

```scala
// Get per-executor storage status (deprecated in newer Spark versions, which
// expose similar information via the status tracker and REST API)
val storageStatus = sc.getExecutorStorageStatus
storageStatus.foreach { status =>
  println(s"Executor ${status.blockManagerId}: ${status.memUsed} / ${status.maxMem}")
}

// Check a specific RDD's storage level
def checkRDDStorage(rdd: RDD[_]): Unit = {
  println(s"RDD ${rdd.id} storage level: ${rdd.getStorageLevel}")
  println(s"RDD ${rdd.id} is persisted: ${rdd.getStorageLevel != StorageLevel.NONE}")
}
```

### Memory Tuning Parameters

Key Spark configuration properties for storage tuning:

```scala
val conf = new SparkConf()
  // Fraction of heap space used for storage (legacy memory model; default: 0.6)
  .set("spark.storage.memoryFraction", "0.7")

  // Fraction of the storage memory considered safe to use (legacy; default: 0.9)
  .set("spark.storage.safetyFraction", "0.6")

  // Whether to compress serialized RDD partitions (default: false)
  .set("spark.rdd.compress", "true")

  // Whether to compress broadcast variables (default: true)
  .set("spark.broadcast.compress", "true")

  // How often the serializer resets its internal object cache (default: 100)
  .set("spark.serializer.objectStreamReset", "100")
```
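
Under the legacy memory model these fractions multiply: usable storage memory ≈ executor heap × `memoryFraction` × `safetyFraction`. A quick arithmetic sketch (the heap size is an arbitrary example value, not a Spark default):

```scala
// Legacy-model arithmetic: storage memory = heap * memoryFraction * safetyFraction.
def usableStorageBytes(heapBytes: Long, memoryFraction: Double, safetyFraction: Double): Long =
  (heapBytes * memoryFraction * safetyFraction).toLong

// With the example settings above (0.7 * 0.6), about 42% of an 8 GB heap
// would be usable for storage
println(usableStorageBytes(8L * 1024 * 1024 * 1024, 0.7, 0.6))
```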

## Common Storage Anti-patterns

### Over-caching

```scala
// Bad: caching everything, even RDDs used only once
val rdd1 = data.map(f1).cache() // Only used once
val rdd2 = rdd1.map(f2).cache() // Only used once
val result = rdd2.collect()

// Good: cache only what is reused; let single-use RDDs stay lazy
val rdd1 = data.map(f1)
val rdd2 = rdd1.map(f2)
val result = rdd2.collect()
```

### Not Unpersisting

```scala
// Bad: never cleaning up
def processData(): Unit = {
  val cached = data.map(expensiveFunc).cache()
  cached.collect()
  // cached stays resident until the context shuts down or the blocks are evicted
}

// Good: clean up when done
def processData(): Unit = {
  val cached = data.map(expensiveFunc).cache()
  try {
    cached.collect()
  } finally {
    cached.unpersist()
  }
}
```

### Wrong Storage Level

```scala
// Bad: default MEMORY_ONLY for a huge dataset - partitions that don't fit
// are dropped and recomputed on each use, causing thrashing
val huge = sc.textFile("100GB-file.txt").cache()

// Good: pick a level that matches the data size
val huge = sc.textFile("100GB-file.txt")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
```

### Caching Too Early

```scala
// Bad: caching before filtering
val cached = allData.cache()
val filtered = cached.filter(rarePredicate) // Most of the cached data is filtered out

// Good: filter first, then cache only what survives
val filtered = allData.filter(rarePredicate).cache()
```