Spec Registry

Help your agents use open-source better.

Find usage specs for your project’s dependencies


maven-apache-spark

Description
Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author
tessl

How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

docs/caching-persistence.md

# Caching and Persistence

Caching and persistence are crucial optimization techniques in Spark. They allow you to store intermediate RDD results in memory and/or on disk to avoid recomputation, dramatically improving performance for iterative algorithms and interactive analysis.

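For example, a small sketch (assuming a live `SparkContext` named `sc` and the Spark 2.x `longAccumulator` API) makes the recomputation visible: without caching, the mapped function runs again for every action.

```scala
// Without caching, the map function re-runs for every action
val uncachedCalls = sc.longAccumulator("uncached map calls")
val uncached = sc.parallelize(1 to 1000).map { x => uncachedCalls.add(1); x * 2 }
uncached.count()
uncached.count()
println(uncachedCalls.value)   // 2000: the lineage was recomputed for the second action

// With caching, the second action reads the stored partitions instead
val cachedCalls = sc.longAccumulator("cached map calls")
val cached = sc.parallelize(1 to 1000).map { x => cachedCalls.add(1); x * 2 }.cache()
cached.count()
cached.count()
println(cachedCalls.value)     // 1000: the map function ran only once
```
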
## Storage Levels

Spark provides various storage levels that control how and where RDDs are cached.

### StorageLevel Class

```scala { .api }
class StorageLevel(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int = 1
) extends Externalizable
```

### Predefined Storage Levels

```scala { .api }
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
}
```

#### Storage Level Breakdown

| Level | Uses Disk | Uses Memory | Serialized | Replication |
|-------|-----------|-------------|------------|-------------|
| `NONE` | ✗ | ✗ | – | 1 |
| `DISK_ONLY` | ✓ | ✗ | ✓ | 1 |
| `DISK_ONLY_2` | ✓ | ✗ | ✓ | 2 |
| `MEMORY_ONLY` | ✗ | ✓ | ✗ | 1 |
| `MEMORY_ONLY_2` | ✗ | ✓ | ✗ | 2 |
| `MEMORY_ONLY_SER` | ✗ | ✓ | ✓ | 1 |
| `MEMORY_ONLY_SER_2` | ✗ | ✓ | ✓ | 2 |
| `MEMORY_AND_DISK` | ✓ | ✓ | ✗ | 1 |
| `MEMORY_AND_DISK_2` | ✓ | ✓ | ✗ | 2 |
| `MEMORY_AND_DISK_SER` | ✓ | ✓ | ✓ | 1 |
| `MEMORY_AND_DISK_SER_2` | ✓ | ✓ | ✓ | 2 |
| `OFF_HEAP` | ✗ | ✗* | ✓ | 1 |

\*`OFF_HEAP` uses off-heap memory (e.g., Tachyon)
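
Beyond the predefined constants, a custom level can be obtained from the `StorageLevel` companion object. A brief sketch (this assumes the `apply` and `fromString` factories available on the companion object in recent Spark versions):

```scala
import org.apache.spark.storage.StorageLevel

// Disk + memory, serialized, replicated to two nodes
// (positional flags: useDisk, useMemory, useOffHeap, deserialized, replication)
val custom = StorageLevel(true, true, false, false, 2)

// Or look a level up by name
val byName = StorageLevel.fromString("MEMORY_AND_DISK_SER_2")

println(custom)
println(byName)
```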

## Basic Persistence Operations

### cache() Method

```scala { .api }
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
```

The simplest way to cache an RDD in memory:

```scala
val data = sc.textFile("large-dataset.txt")
val words = data.flatMap(_.split(" "))
  .map(_.toLowerCase)
  .filter(_.length > 3)

// Cache the filtered results
val cachedWords = words.cache()

// Multiple actions will reuse the cached data
val count = cachedWords.count()
val distinctCount = cachedWords.distinct().count()
val sample = cachedWords.sample(false, 0.1).collect()
```

### persist() Method

```scala { .api }
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(newLevel: StorageLevel): RDD[T]
```

More flexible caching with custom storage levels:

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.textFile("huge-dataset.txt")
val processed = data.map(expensiveProcessing)

// Pick one strategy per RDD: a storage level can only be assigned once
processed.persist(StorageLevel.MEMORY_ONLY)          // Fast access; partitions that don't fit are recomputed on demand
// processed.persist(StorageLevel.MEMORY_AND_DISK)   // Spill to disk when memory is full
// processed.persist(StorageLevel.MEMORY_ONLY_SER)   // Serialize to save memory
// processed.persist(StorageLevel.DISK_ONLY)         // Store only on disk
// processed.persist(StorageLevel.MEMORY_AND_DISK_2) // Replicate for fault tolerance
```
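
Note that `persist()` and `cache()` are lazy: they only record the storage level, and the data is materialized by the first action. A minimal sketch (assuming a live `SparkContext` named `sc` and the monitoring API shown later in this guide):

```scala
import org.apache.spark.storage.StorageLevel

val marked = sc.parallelize(1 to 100000).persist(StorageLevel.MEMORY_ONLY)

// Report how many bytes of this RDD are currently cached in memory
def cachedBytes: Long =
  sc.getRDDStorageInfo.find(_.id == marked.id).map(_.memSize).getOrElse(0L)

println(s"Before action: $cachedBytes bytes")   // 0: nothing is cached yet
marked.count()                                  // first action materializes the cache
println(s"After action: $cachedBytes bytes")    // > 0
```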

### unpersist() Method

```scala { .api }
def unpersist(blocking: Boolean = true): RDD[T]
```

Remove an RDD from the cache and free its memory:

```scala
val data = sc.parallelize(1 to 10000)
val cachedData = data.cache()

// Use cached data
val result1 = cachedData.count()
val result2 = cachedData.filter(_ > 100).count()

// Remove from cache when no longer needed
cachedData.unpersist()

// Or unpersist asynchronously
cachedData.unpersist(blocking = false)
```

## Storage Level Properties and Queries

### getStorageLevel

```scala { .api }
def getStorageLevel: StorageLevel
```

Check the current storage level:

```scala
val rdd = sc.parallelize(1 to 1000)
println(s"Default storage level: ${rdd.getStorageLevel}")     // NONE

val cached = rdd.cache()
println(s"After cache(): ${cached.getStorageLevel}")          // MEMORY_ONLY

// A storage level can only be set once per RDD, so use a fresh RDD here
val other = sc.parallelize(1 to 1000)
val persisted = other.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"Custom persistence: ${persisted.getStorageLevel}")  // MEMORY_AND_DISK_SER
```

### StorageLevel Properties

```scala { .api }
class StorageLevel {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}
```

```scala
val level = StorageLevel.MEMORY_AND_DISK_SER_2

println(s"Uses disk: ${level.useDisk}")           // true
println(s"Uses memory: ${level.useMemory}")       // true
println(s"Deserialized: ${level.deserialized}")   // false (serialized)
println(s"Replication: ${level.replication}")     // 2
```

## Choosing Storage Levels

### Memory-Only Levels

**MEMORY_ONLY** - Best performance, no serialization overhead
```scala
// Use when:
// - RDD fits comfortably in memory
// - Avoiding serialization/deserialization overhead matters most
// - Objects are not too expensive to reconstruct
val fastAccess = rdd.persist(StorageLevel.MEMORY_ONLY)
```

**MEMORY_ONLY_SER** - Compact storage, slower access
```scala
// Use when:
// - Memory is limited but the RDD is important to cache
// - Objects carry significant per-object memory overhead
// - CPU is fast relative to available memory
val compactCache = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
```

### Memory and Disk Levels

**MEMORY_AND_DISK** - Balanced performance and reliability
```scala
// Use when:
// - RDD might not fit entirely in memory
// - Recomputation is expensive
// - Fault tolerance is important
val balanced = rdd.persist(StorageLevel.MEMORY_AND_DISK)
```

**MEMORY_AND_DISK_SER** - Space-efficient with disk fallback
```scala
// Use when:
// - Memory is very limited
// - Serialization cost is acceptable
// - Disk I/O is reasonably fast
val efficient = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
```

### Disk-Only Levels

**DISK_ONLY** - Reliable but slower
```scala
// Use when:
// - Memory is scarce
// - RDD is accessed infrequently
// - Recomputation is very expensive
val reliable = rdd.persist(StorageLevel.DISK_ONLY)
```

### Replicated Levels

**_2 variants** - Fault tolerance with replication
```scala
// Use when:
// - Fault tolerance is critical
// - Node failures are likely in the cluster
// - RDD recomputation is very expensive
val faultTolerant = rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
```
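
When the right level isn't obvious, the footprint of each option can be compared empirically using the monitoring API covered below. A rough sketch (the `footprint` helper is hypothetical, and the input path is the example file used earlier):

```scala
import org.apache.spark.storage.StorageLevel

// Cache the same dataset under a given level, force it, and report its total footprint
def footprint(level: StorageLevel): Long = {
  val rdd = sc.textFile("large-dataset.txt").map(_.split(" ").toList).persist(level)
  rdd.count()   // materialize the cache
  val size = sc.getRDDStorageInfo
    .find(_.id == rdd.id)
    .map(info => info.memSize + info.diskSize)
    .getOrElse(0L)
  rdd.unpersist()
  size
}

println(s"MEMORY_ONLY:     ${footprint(StorageLevel.MEMORY_ONLY)} bytes")
println(s"MEMORY_ONLY_SER: ${footprint(StorageLevel.MEMORY_ONLY_SER)} bytes")
```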

## Advanced Persistence Patterns

### Selective Persistence

Cache only the RDDs that will be reused multiple times:

```scala
val rawData = sc.textFile("input.txt")
val cleaned = rawData.filter(_.nonEmpty).map(_.trim)

// Don't cache - used only once
val parsed = cleaned.map(parseLine)

// Cache this - used multiple times
val validated = parsed.filter(isValid).cache()

// Multiple actions on the cached RDD
val errors = validated.filter(hasError).count()
val summary = validated.map(extractSummary).collect()
validated.map(formatOutput).saveAsTextFile("output")
```

### Iterative Algorithm Pattern

```scala
var current = sc.textFile("initial-data.txt").cache()

for (i <- 1 to 10) {
  // Keep a handle on the previous iteration so it can be unpersisted
  val previous = current

  // Compute the new iteration and cache it
  current = current.map(iterativeFunction).cache()

  // Force computation, then release the old data
  current.count()
  previous.unpersist()

  println(s"Iteration $i completed")
}
```

### Multi-Level Caching Strategy

```scala
val rawData = sc.textFile("large-dataset.txt")

// Level 1: Cache frequently accessed base data
val baseData = rawData.filter(isRelevant).cache()

// Level 2: Cache intermediate expensive computations
val features = baseData.map(extractFeatures)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Level 3: Cache final results with replication for reliability
val results = features.map(expensiveMLModel)
  .persist(StorageLevel.MEMORY_AND_DISK_2)
```

## Monitoring and Management

### SparkContext Storage Information

```scala { .api }
def getRDDStorageInfo: Array[RDDInfo]
def getPersistentRDDs: Map[Int, RDD[_]]
def getExecutorStorageStatus: Array[StorageStatus]
```

```scala
// Get information about cached RDDs
val storageInfo = sc.getRDDStorageInfo
storageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}):")
  println(s"  Memory size: ${info.memSize} bytes")
  println(s"  Disk size: ${info.diskSize} bytes")
  println(s"  Storage level: ${info.storageLevel}")
}

// Get all persistent RDDs
val persistentRDDs = sc.getPersistentRDDs
persistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: ${rdd.name} - ${rdd.getStorageLevel}")
}

// Check executor storage status
val executorStatus = sc.getExecutorStorageStatus
executorStatus.foreach { status =>
  println(s"Executor ${status.blockManagerId.executorId}:")
  println(s"  Max memory: ${status.maxMem} bytes")
  println(s"  Used memory: ${status.memUsed} bytes")
  println(s"  Remaining: ${status.memRemaining} bytes")
}
```

### RDD Naming for Monitoring

```scala { .api }
def setName(name: String): RDD[T]
def name: String
```

```scala
val data = sc.textFile("input.txt")
  .setName("Input Data")
  .cache()

val processed = data.map(process)
  .setName("Processed Data")
  .persist(StorageLevel.MEMORY_AND_DISK)

// Names will appear in the Spark UI for easier monitoring
```

## Checkpointing

Checkpointing provides fault tolerance by saving RDD data to reliable storage.

### Setting Checkpoint Directory

```scala { .api }
def setCheckpointDir(directory: String): Unit
```

```scala
// Set the checkpoint directory (must be reliable storage such as HDFS)
sc.setCheckpointDir("hdfs://namenode/checkpoints")
```

### Checkpointing RDDs

```scala { .api }
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
```

```scala
val data = sc.textFile("input.txt")
val processed = data.map(expensiveOperation).filter(isValid)

// Mark for checkpointing
processed.checkpoint()

// The checkpoint happens after the first action
val count = processed.count() // Triggers the checkpoint

// Check whether the RDD was checkpointed
if (processed.isCheckpointed) {
  println(s"Checkpointed to: ${processed.getCheckpointFile.get}")
}
```

### Checkpoint vs Persistence

```scala
// Persistence: keeps the lineage; cached blocks can be lost if an executor fails
val persisted = rdd.cache()

// Checkpoint: truncates the lineage; data survives failures
rdd.checkpoint()

// Best practice: combine both for performance and reliability
val optimized = rdd.cache()
optimized.checkpoint()
optimized.count() // Triggers both the cache and the checkpoint
```

Caching before checkpointing is recommended: writing the checkpoint re-runs the RDD's lineage, and the cache lets that pass read the already computed partitions instead of recomputing them.
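
To see the lineage truncation, `RDD.toDebugString` prints the dependency chain; a small sketch (reusing the checkpoint directory configured above) shows it before and after checkpointing:

```scala
val data = sc.parallelize(1 to 1000).map(_ * 2).filter(_ % 3 == 0)
println(data.toDebugString)   // full lineage: filter <- map <- parallelize

data.cache()                  // cache first so the checkpoint pass reads cached partitions
data.checkpoint()
data.count()                  // triggers both the cache and the checkpoint

println(data.toDebugString)   // lineage is now rooted at a checkpoint RDD
```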

## Performance Guidelines

### When to Cache

1. **Multiple Actions**: RDD used in multiple actions
2. **Iterative Algorithms**: Machine learning, graph algorithms
3. **Interactive Analysis**: Jupyter notebooks, Spark shell
4. **Expensive Computations**: Complex transformations
5. **Data Reduction**: After significant filtering

```scala
// Good candidate for caching
val filtered = largeDataset
  .filter(expensiveCondition)  // Reduces data size significantly
  .map(complexTransformation)  // Expensive computation

filtered.cache()

// Multiple uses justify caching
val stats = filtered.map(extractStats).collect()
val sample = filtered.sample(false, 0.1).collect()
filtered.saveAsTextFile("output")
```

### When Not to Cache

1. **Single Use**: RDD used only once
2. **Large Datasets**: Bigger than available memory
3. **Simple Operations**: Cheap to recompute
4. **Sequential Processing**: Linear data pipeline

```scala
// Don't cache - used only once
val result = sc.textFile("input.txt")
  .map(_.toUpperCase)
  .filter(_.startsWith("A"))
  .count()
```

### Memory Management Best Practices

```scala
// 1. Unpersist when done
val temp = rdd.cache()
processData(temp)
temp.unpersist()

// 2. Use the storage level appropriate to each dataset (one level per RDD)
val frequentData = hotRdd.persist(StorageLevel.MEMORY_ONLY)
val occasionalData = warmRdd.persist(StorageLevel.MEMORY_AND_DISK)
val backupData = coldRdd.persist(StorageLevel.DISK_ONLY)

// 3. Monitor memory usage
val memUsage = sc.getExecutorStorageStatus.map(_.memUsed).sum
val memTotal = sc.getExecutorStorageStatus.map(_.maxMem).sum
println(s"Memory utilization: ${memUsage.toDouble / memTotal * 100}%")
```

This guide covers the core RDD caching and persistence APIs in Apache Spark: storage levels, persist and unpersist, checkpointing, and monitoring, so you can optimize performance through deliberate data storage strategies.