# Caching and Persistence

Caching and persistence are crucial optimization techniques in Spark. They allow you to store intermediate RDD results in memory and/or disk to avoid recomputation, dramatically improving performance for iterative algorithms and interactive analysis.
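To make the recomputation cost concrete, here is a minimal sketch (assuming an existing `SparkContext` named `sc` and a placeholder `input.txt`):

```scala
// Assumes a live SparkContext `sc` and a file "input.txt"
val parsed = sc.textFile("input.txt").map(_.split(",")) // lazy transformation

// Without caching, every action replays the full lineage:
parsed.count() // reads and parses the file
parsed.first() // reads and parses the file again

// With caching, the first action materializes the partitions,
// and later actions read them back instead of re-parsing:
val cached = parsed.cache()
cached.count() // triggers computation and caching
cached.first() // served from the cache
```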
## Storage Levels

Spark provides various storage levels that control how and where RDDs are cached.
### StorageLevel Class

```scala { .api }
class StorageLevel(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1
) extends Externalizable
```
### Predefined Storage Levels
21
22
```scala { .api }
23
object StorageLevel {
24
val NONE = new StorageLevel(false, false, false, false)
25
val DISK_ONLY = new StorageLevel(true, false, false, false)
26
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
27
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
28
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
29
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
30
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
31
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
32
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
33
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
34
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
35
val OFF_HEAP = new StorageLevel(false, false, true, false)
36
}
37
```
38
39
#### Storage Level Breakdown
40
41
| Level | Uses Disk | Uses Memory | Serialized | Replication |
42
|-------|-----------|-------------|------------|-------------|
43
| `NONE` | ✗ | ✗ | ✗ | 1 |
44
| `DISK_ONLY` | ✓ | ✗ | ✓ | 1 |
45
| `DISK_ONLY_2` | ✓ | ✗ | ✓ | 2 |
46
| `MEMORY_ONLY` | ✗ | ✓ | ✗ | 1 |
47
| `MEMORY_ONLY_2` | ✗ | ✓ | ✗ | 2 |
48
| `MEMORY_ONLY_SER` | ✗ | ✓ | ✓ | 1 |
49
| `MEMORY_ONLY_SER_2` | ✗ | ✓ | ✓ | 2 |
50
| `MEMORY_AND_DISK` | ✓ | ✓ | ✗ | 1 |
51
| `MEMORY_AND_DISK_2` | ✓ | ✓ | ✗ | 2 |
52
| `MEMORY_AND_DISK_SER` | ✓ | ✓ | ✓ | 1 |
53
| `MEMORY_AND_DISK_SER_2` | ✓ | ✓ | ✓ | 2 |
54
| `OFF_HEAP` | ✗ | ✗* | ✓ | 1 |
55
56
*OFF_HEAP uses off-heap memory (e.g., Tachyon)
57
58
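Beyond the predefined constants, a custom combination can be assembled through the `StorageLevel` companion object's factory methods. A sketch (argument order follows the constructor above; verify the factory overloads available in your Spark version):

```scala
import org.apache.spark.storage.StorageLevel

// useDisk, useMemory, useOffHeap, deserialized, replication:
// memory + disk, serialized, replicated to 3 nodes -- a combination
// with no predefined constant
val custom = StorageLevel(true, true, false, false, 3)

someRdd.persist(custom) // `someRdd` is a placeholder RDD
```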
## Basic Persistence Operations

### cache() Method

```scala { .api }
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
```

The simplest way to cache an RDD in memory:

```scala
val data = sc.textFile("large-dataset.txt")
val words = data.flatMap(_.split(" "))
  .map(_.toLowerCase)
  .filter(_.length > 3)

// Cache the filtered results
val cachedWords = words.cache()

// Multiple actions will reuse the cached data
val count = cachedWords.count()
val distinctCount = cachedWords.distinct().count()
val sample = cachedWords.sample(false, 0.1).collect()
```
### persist() Method

```scala { .api }
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(newLevel: StorageLevel): RDD[T]
```

More flexible caching with custom storage levels:

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.textFile("huge-dataset.txt")
val processed = data.map(expensiveProcessing)

// Alternative persistence strategies -- pick exactly one; an RDD's
// storage level cannot be changed once it has been assigned
processed.persist(StorageLevel.MEMORY_ONLY)        // Fastest access; partitions that don't fit are recomputed
processed.persist(StorageLevel.MEMORY_AND_DISK)    // Spill to disk when memory is full
processed.persist(StorageLevel.MEMORY_ONLY_SER)    // Serialize to save memory
processed.persist(StorageLevel.DISK_ONLY)          // Store only on disk
processed.persist(StorageLevel.MEMORY_AND_DISK_2)  // Replicate for fault tolerance
```
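The one-level-per-RDD rule can be verified directly. A sketch assuming a live `SparkContext` named `sc` (the exact exception type may differ across Spark versions, so treat the catch clause as an assumption):

```scala
import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 100)
nums.persist(StorageLevel.MEMORY_ONLY)

try {
  // Attempting to change the level of an already-persisted RDD fails
  nums.persist(StorageLevel.DISK_ONLY)
} catch {
  case e: UnsupportedOperationException =>
    println(s"Cannot re-persist: ${e.getMessage}")
}

// To change levels: unpersist() first, then persist() with the new level
nums.unpersist()
nums.persist(StorageLevel.DISK_ONLY)
```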
### unpersist() Method

```scala { .api }
def unpersist(blocking: Boolean = true): RDD[T]
```

Remove RDD from cache and free memory:

```scala
val data = sc.parallelize(1 to 10000)
val cachedData = data.cache()

// Use cached data
val result1 = cachedData.count()
val result2 = cachedData.filter(_ > 100).count()

// Remove from cache when no longer needed
cachedData.unpersist()

// Or unpersist asynchronously
cachedData.unpersist(blocking = false)
```
## Storage Level Properties and Queries

### getStorageLevel

```scala { .api }
def getStorageLevel: StorageLevel
```

Check current storage level:

```scala
val rdd = sc.parallelize(1 to 1000)
println(s"Default storage level: ${rdd.getStorageLevel}") // NONE

val cached = rdd.cache()
println(s"After cache(): ${cached.getStorageLevel}") // MEMORY_ONLY

// cache() marked `rdd` itself, so use a fresh RDD for a different level
val persisted = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"Custom persistence: ${persisted.getStorageLevel}") // MEMORY_AND_DISK_SER
```
### StorageLevel Properties

```scala { .api }
class StorageLevel {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}
```

```scala
val level = StorageLevel.MEMORY_AND_DISK_SER_2

println(s"Uses disk: ${level.useDisk}")         // true
println(s"Uses memory: ${level.useMemory}")     // true
println(s"Deserialized: ${level.deserialized}") // false (serialized)
println(s"Replication: ${level.replication}")   // 2
```
## Choosing Storage Levels

### Memory-Only Levels

**MEMORY_ONLY** - Best performance, no serialization overhead
```scala
// Use when:
// - RDD fits comfortably in memory
// - You want to avoid serialization/deserialization CPU cost
// - Objects are not too expensive to reconstruct
val fastAccess = rdd.persist(StorageLevel.MEMORY_ONLY)
```

**MEMORY_ONLY_SER** - Compact storage, slower access
```scala
// Use when:
// - Memory is limited but the RDD is important to cache
// - Serialized form is much smaller than the object form
// - CPU is fast relative to available memory
val compactCache = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
```
### Memory and Disk Levels

**MEMORY_AND_DISK** - Balanced performance and reliability
```scala
// Use when:
// - RDD might not fit entirely in memory
// - Recomputation is expensive
// - Fault tolerance is important
val balanced = rdd.persist(StorageLevel.MEMORY_AND_DISK)
```

**MEMORY_AND_DISK_SER** - Space-efficient with disk fallback
```scala
// Use when:
// - Memory is very limited
// - Serialization cost is acceptable
// - Disk I/O is reasonably fast
val efficient = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
```
### Disk-Only Levels

**DISK_ONLY** - Reliable but slower
```scala
// Use when:
// - Memory is scarce
// - RDD is accessed infrequently
// - Recomputation is very expensive
val reliable = rdd.persist(StorageLevel.DISK_ONLY)
```

### Replicated Levels

**_2 variants** - Fault tolerance with replication
```scala
// Use when:
// - Fault tolerance is critical
// - Node failures are common in the cluster
// - Recomputing the RDD is very expensive
val faultTolerant = rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
```
## Advanced Persistence Patterns

### Selective Persistence

Cache only the RDDs that will be reused multiple times:

```scala
val rawData = sc.textFile("input.txt")
val cleaned = rawData.filter(_.nonEmpty).map(_.trim)

// Don't cache - used only once
val parsed = cleaned.map(parseLine)

// Cache this - used multiple times
val validated = parsed.filter(isValid).cache()

// Multiple actions on the cached RDD
val errors = validated.filter(hasError).count()
val summary = validated.map(extractSummary).collect()
validated.map(formatOutput).saveAsTextFile("output") // returns Unit, so no val
```
### Iterative Algorithm Pattern

```scala
var current = sc.textFile("initial-data.txt").cache()

for (i <- 1 to 10) {
  // Keep a handle on the previous iteration
  val previous = current

  // Compute the new iteration and cache it
  current = current.map(iterativeFunction).cache()

  // Force computation, then unpersist the old data
  current.count()
  previous.unpersist()

  println(s"Iteration $i completed")
}
```
### Multi-Level Caching Strategy

```scala
val rawData = sc.textFile("large-dataset.txt")

// Level 1: Cache frequently accessed base data
val baseData = rawData.filter(isRelevant).cache()

// Level 2: Cache intermediate expensive computations
val features = baseData.map(extractFeatures)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Level 3: Cache final results with replication for reliability
val results = features.map(expensiveMLModel)
  .persist(StorageLevel.MEMORY_AND_DISK_2)
```
## Monitoring and Management

### SparkContext Storage Information

```scala { .api }
def getRDDStorageInfo: Array[RDDInfo]
def getPersistentRDDs: Map[Int, RDD[_]]
def getExecutorStorageStatus: Array[StorageStatus]
```

```scala
// Get information about cached RDDs
val storageInfo = sc.getRDDStorageInfo
storageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}):")
  println(s"  Memory size: ${info.memSize} bytes")
  println(s"  Disk size: ${info.diskSize} bytes")
  println(s"  Storage level: ${info.storageLevel}")
}

// Get all persistent RDDs
val persistentRDDs = sc.getPersistentRDDs
persistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: ${rdd.name} - ${rdd.getStorageLevel}")
}

// Check executor storage status
val executorStatus = sc.getExecutorStorageStatus
executorStatus.foreach { status =>
  println(s"Executor ${status.blockManagerId.executorId}:")
  println(s"  Max memory: ${status.maxMem} bytes")
  println(s"  Used memory: ${status.memUsed} bytes")
  println(s"  Remaining: ${status.memRemaining} bytes")
}
```
### RDD Naming for Monitoring

```scala { .api }
def setName(name: String): RDD[T]
def name: String
```

```scala
val data = sc.textFile("input.txt")
  .setName("Input Data")
  .cache()

val processed = data.map(process)
  .setName("Processed Data")
  .persist(StorageLevel.MEMORY_AND_DISK)

// Names will appear in the Spark UI for easier monitoring
```
## Checkpointing

Checkpointing provides fault tolerance by saving RDD data to reliable storage.

### Setting Checkpoint Directory

```scala { .api }
def setCheckpointDir(directory: String): Unit
```

```scala
// Set checkpoint directory (must be reliable storage like HDFS)
sc.setCheckpointDir("hdfs://namenode/checkpoints")
```
### Checkpointing RDDs

```scala { .api }
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
```

```scala
val data = sc.textFile("input.txt")
val processed = data.map(expensiveOperation).filter(isValid)

// Mark for checkpointing
processed.checkpoint()

// Checkpointing happens after the first action
val count = processed.count() // Triggers the checkpoint

// Check if checkpointed
if (processed.isCheckpointed) {
  println(s"Checkpointed to: ${processed.getCheckpointFile.get}")
}
```
### Checkpoint vs Persistence

```scala
// Persistence: keeps lineage, can be lost on failure
val persisted = rdd.cache()

// Checkpoint: truncates lineage, survives failures
rdd.checkpoint()

// Best practice: both for performance and reliability
val optimized = rdd.cache()
optimized.checkpoint()
optimized.count() // Trigger both cache and checkpoint
```
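The lineage truncation can be observed with `toDebugString`. A sketch assuming a live `SparkContext` named `sc` (the debug output format varies across Spark versions, so no exact output is shown):

```scala
sc.setCheckpointDir("hdfs://namenode/checkpoints") // any reliable storage

val data = sc.parallelize(1 to 1000).map(_ * 2).filter(_ % 3 == 0)
println(data.toDebugString) // full lineage: filter <- map <- parallelize

data.checkpoint()
data.count() // triggers the checkpoint

// The lineage now bottoms out at the checkpoint data instead of parallelize
println(data.toDebugString)
```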
## Performance Guidelines

### When to Cache

1. **Multiple Actions**: RDD used in multiple actions
2. **Iterative Algorithms**: Machine learning, graph algorithms
3. **Interactive Analysis**: Jupyter notebooks, Spark shell
4. **Expensive Computations**: Complex transformations
5. **Data Reduction**: After significant filtering
```scala
// Good candidate for caching
val filtered = largeDataset
  .filter(expensiveCondition)  // Reduces data size significantly
  .map(complexTransformation)  // Expensive computation

filtered.cache()

// Multiple uses justify caching
val stats = filtered.map(extractStats).collect()
val sample = filtered.sample(false, 0.1).collect()
filtered.saveAsTextFile("output") // returns Unit, so no val
```
### When Not to Cache

1. **Single Use**: RDD used only once
2. **Large Datasets**: Bigger than available memory
3. **Simple Operations**: Cheap to recompute
4. **Sequential Processing**: Linear data pipeline

```scala
// Don't cache - used only once
val result = sc.textFile("input.txt")
  .map(_.toUpperCase)
  .filter(_.startsWith("A"))
  .count()
```
### Memory Management Best Practices

```scala
// 1. Unpersist when done
val temp = rdd.cache()
processData(temp)
temp.unpersist()

// 2. Use appropriate storage levels (one level per RDD)
val frequentData = rdd1.persist(StorageLevel.MEMORY_ONLY)
val occasionalData = rdd2.persist(StorageLevel.MEMORY_AND_DISK)
val backupData = rdd3.persist(StorageLevel.DISK_ONLY)

// 3. Monitor memory usage
val memUsage = sc.getExecutorStorageStatus.map(_.memUsed).sum
val memTotal = sc.getExecutorStorageStatus.map(_.maxMem).sum
println(f"Memory utilization: ${memUsage.toDouble / memTotal * 100}%.1f%%")
```
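To decide between memory-based levels, it also helps to estimate how large an RDD will be once cached. One rough approach uses Spark's `SizeEstimator` utility on a small driver-side sample and extrapolates. A sketch only (`rdd` is a placeholder, and the extrapolation assumes roughly uniform record sizes):

```scala
import org.apache.spark.util.SizeEstimator

// Estimate bytes per record from a small sample, then scale up
val sample = rdd.take(1000)
val bytesPerRecord = SizeEstimator.estimate(sample) / math.max(sample.length, 1)
val estimatedTotalBytes = bytesPerRecord * rdd.count()

println(s"Estimated cached size: ${estimatedTotalBytes / (1024 * 1024)} MB")
```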
This comprehensive guide covers all aspects of RDD caching and persistence in Apache Spark, enabling you to optimize performance through intelligent data storage strategies.