# Storage and Persistence

Spark's storage and persistence capabilities allow RDDs to be cached in memory or persisted to disk with configurable storage levels, enabling significant performance improvements for iterative algorithms and interactive analytics.

## RDD Persistence Methods

### Basic Persistence Operations

```scala { .api }
abstract class RDD[T] {
  def persist(newLevel: StorageLevel): this.type
  def persist(): this.type // uses MEMORY_ONLY
  def cache(): this.type   // alias for persist() with MEMORY_ONLY
  def unpersist(blocking: Boolean = true): this.type
  def getStorageLevel: StorageLevel
}
```

### Checkpointing

```scala { .api }
abstract class RDD[T] {
  def checkpoint(): Unit
  def localCheckpoint(): this.type
  def isCheckpointed: Boolean
  def getCheckpointFile: Option[String]
}
```

## StorageLevel

Defines how RDDs are stored, combining memory, disk, off-heap, serialization, and replication options.

### Constructor

```scala { .api }
class StorageLevel(
  useDisk: Boolean,
  useMemory: Boolean,
  useOffHeap: Boolean,
  deserialized: Boolean,
  replication: Int
)
```

### Properties

```scala { .api }
class StorageLevel {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  def isValid: Boolean
  def clone: StorageLevel
}
```

### Predefined Storage Levels

```scala { .api }
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}
```

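Each predefined constant is a fixed combination of the constructor flags. As a quick orientation (assuming the Spark core jar is on the classpath — `StorageLevel` needs no running context), the flags of a serialized memory-and-disk level can be inspected directly:

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK_SER = memory + disk, serialized, single copy
val lvl = StorageLevel.MEMORY_AND_DISK_SER
println(lvl.useMemory)    // true
println(lvl.useDisk)      // true
println(lvl.useOffHeap)   // false
println(lvl.deserialized) // false -> stored serialized
println(lvl.replication)  // 1
```
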
## RDDInfo

Information about RDD storage status and statistics. Note that `RDDInfo` is a plain class, not a case class:

```scala { .api }
class RDDInfo(
  val id: Int,
  val name: String,
  val numPartitions: Int,
  val storageLevel: StorageLevel,
  val numCachedPartitions: Int,
  val memSize: Long,
  val diskSize: Long,
  val externalBlockStoreSize: Long
) {
  def isCached: Boolean
}
```

## Storage Management Methods

### SparkContext Storage Information

```scala { .api }
class SparkContext {
  def getPersistentRDDs: Map[Int, RDD[_]]
  def getRDDStorageInfo: Array[RDDInfo]
  def getExecutorMemoryStatus: Map[String, (Long, Long)]
}
```

## Storage Level Details

### Memory-Only Storage

- **MEMORY_ONLY**: Store the RDD as deserialized Java objects on the JVM heap
- **MEMORY_ONLY_SER**: Store the RDD as serialized Java objects (more space-efficient)
- **MEMORY_ONLY_2**: Deserialized objects with 2x replication
- **MEMORY_ONLY_SER_2**: Serialized objects with 2x replication

### Disk-Only Storage

- **DISK_ONLY**: Store RDD partitions only on disk
- **DISK_ONLY_2**: Disk-only storage with 2x replication

### Memory and Disk Storage

- **MEMORY_AND_DISK**: Store the RDD in memory, spilling partitions to disk when memory is insufficient
- **MEMORY_AND_DISK_SER**: Store the RDD in memory as serialized objects, spilling to disk as needed
- **MEMORY_AND_DISK_2**: Memory-and-disk storage with 2x replication
- **MEMORY_AND_DISK_SER_2**: Serialized memory-and-disk storage with 2x replication

### Off-Heap Storage

- **OFF_HEAP**: Store the RDD in off-heap memory (requires off-heap memory to be enabled and sized)

### No Storage

- **NONE**: Do not persist the RDD

## Usage Examples

### Basic Persistence

```scala
val data = sc.parallelize(1 to 1000000)

// Cache in memory (most common)
val cachedData = data.cache()

// Equivalent to cache()
val persistedData = data.persist()
val memoryOnlyData = data.persist(StorageLevel.MEMORY_ONLY)

// Use the cached RDD multiple times
val sum1 = cachedData.sum()
val sum2 = cachedData.map(_ * 2).sum() // reuses the cached partitions

// Remove from cache when done
cachedData.unpersist()
```

### Different Storage Levels

```scala
// An RDD's storage level can only be assigned once, so each level
// below is applied to its own RDD

// Memory and disk with spillover
val spillableRDD = sc.parallelize(1 to 10000000).persist(StorageLevel.MEMORY_AND_DISK)

// Serialized storage (less memory usage, more CPU)
val serializedRDD = sc.parallelize(1 to 10000000).persist(StorageLevel.MEMORY_ONLY_SER)

// Disk-only storage
val diskRDD = sc.parallelize(1 to 10000000).persist(StorageLevel.DISK_ONLY)

// High availability with replication
val replicatedRDD = sc.parallelize(1 to 10000000).persist(StorageLevel.MEMORY_AND_DISK_2)
```

### Off-Heap Storage

```scala
// Requires spark.memory.offHeap.enabled=true and spark.memory.offHeap.size > 0
val offHeapRDD = sc.parallelize(1 to 10000000).persist(StorageLevel.OFF_HEAP)
```

### Iterative Algorithms

```scala
// K-means-style clustering: cache the data reused across iterations
val points = sc.textFile("points.txt")
  .map(parsePoint)
  .cache() // input data is scanned on every iteration

var centroids = sc.parallelize(initialCentroids).cache()

for (i <- 1 to maxIterations) {
  // Collect the (small) centroid set once per iteration
  val current = centroids.collect()

  val newCentroids = points
    .map(p => (closestCentroid(p, current), p))
    .groupByKey()
    .map { case (centroidIndex, clusterPoints) =>
      averagePoints(clusterPoints)
    }

  // Swap in the new centroids
  centroids.unpersist()
  centroids = newCentroids.cache()
}

// Clean up
points.unpersist()
centroids.unpersist()
```

### Monitoring Storage Usage

```scala
val rdd = sc.parallelize(1 to 1000).cache()
rdd.count() // trigger caching

// Get storage information for cached RDDs
val storageInfo = sc.getRDDStorageInfo
storageInfo.foreach { info =>
  println(s"RDD ${info.id}: ${info.name}")
  println(s"  Cached partitions: ${info.numCachedPartitions}/${info.numPartitions}")
  println(s"  Memory size: ${info.memSize} bytes")
  println(s"  Disk size: ${info.diskSize} bytes")
  println(s"  Storage level: ${info.storageLevel}")
}

// Get per-executor memory status
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMem, remainingMem)) =>
  val usedMem = maxMem - remainingMem
  println(s"Executor $executorId: ${usedMem}/${maxMem} bytes used")
}

// Get all persistent RDDs
val persistentRDDs = sc.getPersistentRDDs
println(s"Number of persistent RDDs: ${persistentRDDs.size}")
```

### Checkpointing

```scala
// Set the checkpoint directory (required before checkpointing)
sc.setCheckpointDir("hdfs://path/to/checkpoint")

val rdd = sc.parallelize(1 to 1000)
  .map(_ * 2)
  .filter(_ > 100)

// Mark for checkpointing
rdd.checkpoint()

// Trigger computation and checkpointing
val result = rdd.count()

// Check whether the checkpoint happened
if (rdd.isCheckpointed) {
  println(s"RDD checkpointed to: ${rdd.getCheckpointFile}")
}

// Local checkpointing (faster, but less fault-tolerant: data is
// written to executor storage rather than a reliable filesystem)
val localRDD = sc.parallelize(1 to 1000)
localRDD.localCheckpoint()
```

### Custom Storage Levels

```scala
// Build a custom level via the StorageLevel factory
// (the constructor itself is private to Spark)
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false, // serialized
  replication = 3       // triple replication
)

val rdd = sc.parallelize(criticalData).persist(customLevel)
```

## Performance Considerations

### When to Cache

- **Iterative algorithms**: Cache datasets that are accessed multiple times
- **Interactive analysis**: Cache datasets for exploratory queries
- **Branching workflows**: Cache shared datasets before multiple transformations
- **Expensive computations**: Cache results of costly operations

### When Not to Cache

- **One-time use**: Don't cache RDDs accessed only once
- **Memory pressure**: Avoid caching when memory is limited
- **Large datasets**: Consider serialized or disk storage for large datasets
- **Simple computations**: Don't cache if recomputation is cheaper than storage

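To illustrate the branching-workflow case, a sketch in which one parsed dataset feeds two independent actions; `parseEvent`, the `Event` fields, and the file path are all placeholders:

```scala
// Parse once, cache, then branch
case class Event(userId: String, sizeBytes: Long)

val events = sc.textFile("events.log")
  .map(parseEvent) // hypothetical parser: String => Event
  .cache()

// Branch 1: events per user
val perUser = events.map(e => (e.userId, 1L)).reduceByKey(_ + _).collect()

// Branch 2: total payload size
val totalBytes = events.map(_.sizeBytes).sum()

// Without cache(), each action would re-read and re-parse events.log
events.unpersist()
```
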
### Storage Level Selection

#### Choose MEMORY_ONLY when:

- The dataset fits comfortably in memory
- Fast access is critical
- CPU spent on deserialization is not a concern

#### Choose MEMORY_ONLY_SER when:

- Memory is limited but the dataset should stay in memory
- Network/disk I/O should be minimized
- CPU spent on serialization/deserialization is acceptable

#### Choose MEMORY_AND_DISK when:

- The dataset might not fit in memory
- Recomputing evicted partitions would be expensive
- A balance between memory and disk performance is needed

#### Choose DISK_ONLY when:

- The dataset is too large for memory
- Memory should be reserved for other operations
- Disk I/O performance is acceptable

#### Choose replicated levels (`_2`) when:

- High availability is required
- The cluster sees frequent node failures
- The cost of recomputation is very high

### Memory Management

```scala
import org.apache.spark.util.SizeEstimator

// Rough memory footprint check before caching a large dataset
// (assumes records are similar in size to the first one)
val estimatedSize = SizeEstimator.estimate(rdd.first()) * rdd.count()
println(s"Estimated RDD size: ${estimatedSize} bytes")

// Configure memory fractions for storage:
//   spark.memory.fraction = 0.6        (fraction of heap for execution + storage)
//   spark.memory.storageFraction = 0.5 (fraction of the above reserved for storage)

// Monitor and tune garbage collection:
//   use G1GC for large heaps: -XX:+UseG1GC
//   tune region size:         -XX:G1HeapRegionSize=32m
```

## Configuration Properties

### Memory Management

- `spark.memory.fraction` - Fraction of heap space for execution and storage (default: 0.6)
- `spark.memory.storageFraction` - Fraction of that memory reserved for caching (default: 0.5)
- `spark.memory.offHeap.enabled` - Enable off-heap memory (default: false)
- `spark.memory.offHeap.size` - Off-heap memory size in bytes (default: 0)

### Storage Behavior

- `spark.rdd.compress` - Compress serialized RDD partitions (default: false)
- `spark.serializer` - Serializer for cached objects (`KryoSerializer` recommended)
- `spark.storage.memoryFraction` - (Legacy) Fraction of JVM heap for RDD storage
- `spark.storage.unrollFraction` - (Legacy) Fraction of storage memory for unrolling blocks

### Checkpointing

- `spark.checkpoint.compress` - Compress checkpoint files (default: false)

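These properties are typically set on the `SparkConf` before the context is created; a minimal sketch (the values are illustrative, not recommendations):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("storage-tuning")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")           // compress serialized cached partitions
  .set("spark.memory.offHeap.enabled", "true") // required for StorageLevel.OFF_HEAP
  .set("spark.memory.offHeap.size", "1g")

val sc = new SparkContext(conf)
```
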
## Best Practices

### Caching Strategy

1. **Cache after expensive operations** but before multiple uses
2. **Use appropriate storage levels** based on memory availability and access patterns
3. **Monitor storage usage** and adjust the caching strategy accordingly
4. **Unpersist RDDs** when no longer needed to free memory
5. **Consider serialization overhead** when choosing between serialized and deserialized storage

### Memory Optimization

1. **Use Kryo serialization** for better performance with serialized storage
2. **Tune memory allocation** between storage and execution
3. **Monitor GC overhead** and tune garbage collection settings
4. **Consider off-heap storage** for very large datasets
5. **Profile memory usage** with the Spark UI and OS-level monitoring

### Fault Tolerance

1. **Use replication** for critical datasets that are expensive to recompute
2. **Set up checkpointing** for long lineages and iterative algorithms
3. **Balance fault tolerance** against storage overhead and performance
4. **Choose appropriate checkpoint intervals** to avoid excessive overhead
5. **Monitor checkpoint performance** and adjust directory and compression settings

## Important Notes

- **Caching is lazy** - the RDD is not materialized in the cache until an action is executed
- **Storage levels are immutable** - an RDD's level cannot be changed once assigned
- **Unpersist is blocking by default** (`blocking = true`) - pass `blocking = false` to remove blocks asynchronously
- **Memory pressure triggers eviction** - least-recently-used RDD partitions are removed first
- **Checkpointing truncates lineage** - provides fault tolerance for long computation chains
- **Off-heap storage requires configuration** - off-heap memory must be enabled and sized
- **Replication doubles storage overhead** - consider the trade-off between availability and resources
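
The laziness of caching can be observed directly: `cache()` assigns a storage level immediately, but `sc.getRDDStorageInfo` reports the RDD only after an action materializes it. A small sketch (assuming a fresh context with no other cached RDDs):

```scala
val rdd = sc.parallelize(1 to 1000).cache()

// The storage level is assigned right away...
println(rdd.getStorageLevel.useMemory) // true

// ...but nothing is cached until an action runs
println(sc.getRDDStorageInfo.isEmpty)  // true: no blocks cached yet

rdd.count() // the action materializes the cache
println(sc.getRDDStorageInfo.nonEmpty) // true: partitions now cached
```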