Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Caching and persistence are crucial optimization techniques in Spark. They allow you to store intermediate RDD results in memory and/or disk to avoid recomputation, dramatically improving performance for iterative algorithms and interactive analysis.
Spark provides various storage levels that control how and where RDDs are cached.
```scala
class StorageLevel(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1
) extends Externalizable

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
}
```

| Level | Uses Disk | Uses Memory | Serialized | Replication |
|---|---|---|---|---|
| NONE | ✗ | ✗ | ✗ | 1 |
| DISK_ONLY | ✓ | ✗ | ✓ | 1 |
| DISK_ONLY_2 | ✓ | ✗ | ✓ | 2 |
| MEMORY_ONLY | ✗ | ✓ | ✗ | 1 |
| MEMORY_ONLY_2 | ✗ | ✓ | ✗ | 2 |
| MEMORY_ONLY_SER | ✗ | ✓ | ✓ | 1 |
| MEMORY_ONLY_SER_2 | ✗ | ✓ | ✓ | 2 |
| MEMORY_AND_DISK | ✓ | ✓ | ✗ | 1 |
| MEMORY_AND_DISK_2 | ✓ | ✓ | ✗ | 2 |
| MEMORY_AND_DISK_SER | ✓ | ✓ | ✓ | 1 |
| MEMORY_AND_DISK_SER_2 | ✓ | ✓ | ✓ | 2 |
| OFF_HEAP | ✗ | ✗* | ✓ | 1 |
*OFF_HEAP uses off-heap memory (e.g., Tachyon)
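The predefined constants cover most cases, but a custom combination of these flags can also be built. The sketch below assumes the `StorageLevel.apply` factory with parameter names matching the constructor above, and an existing RDD `rdd`:

```scala
import org.apache.spark.storage.StorageLevel

// Custom level: memory + disk, serialized, replicated twice -
// equivalent to the predefined MEMORY_AND_DISK_SER_2
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)
rdd.persist(customLevel)
```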
```scala
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
```

The simplest way to cache an RDD in memory:

```scala
val data = sc.textFile("large-dataset.txt")
val words = data.flatMap(_.split(" "))
  .map(_.toLowerCase)
  .filter(_.length > 3)

// Cache the filtered results
val cachedWords = words.cache()

// Multiple actions will reuse the cached data
val count = cachedWords.count()
val distinctCount = cachedWords.distinct().count()
val sample = cachedWords.sample(false, 0.1).collect()
```

```scala
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(newLevel: StorageLevel): RDD[T]
```

More flexible caching with custom storage levels:

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.textFile("huge-dataset.txt")
val processed = data.map(expensiveProcessing)

// Pick one strategy per RDD - a storage level can only be set once
processed.persist(StorageLevel.MEMORY_ONLY)          // Fastest access; partitions that don't fit are recomputed on demand
// processed.persist(StorageLevel.MEMORY_AND_DISK)   // Spill to disk when memory is full
// processed.persist(StorageLevel.MEMORY_ONLY_SER)   // Serialize to save memory
// processed.persist(StorageLevel.DISK_ONLY)         // Store only on disk
// processed.persist(StorageLevel.MEMORY_AND_DISK_2) // Replicate for fault tolerance
```

```scala
def unpersist(blocking: Boolean = true): RDD[T]
```

Remove an RDD from the cache and free its memory:
```scala
val cachedData = data.cache()

// Use the cached data
val result1 = cachedData.count()
val result2 = cachedData.filter(_.length > 100).count()

// Remove from cache when no longer needed
cachedData.unpersist()

// Or unpersist asynchronously
cachedData.unpersist(blocking = false)
```

```scala
def getStorageLevel: StorageLevel
```

Check the current storage level:
```scala
val rdd = sc.parallelize(1 to 1000)
println(s"Default storage level: ${rdd.getStorageLevel}") // NONE

val cached = rdd.cache()
println(s"After cache(): ${cached.getStorageLevel}") // MEMORY_ONLY

// A storage level can only be set once per RDD, so use a fresh RDD here
val persisted = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"Custom persistence: ${persisted.getStorageLevel}") // MEMORY_AND_DISK_SER
```

The flags behind each level are exposed as read-only properties:

```scala
class StorageLevel {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}
```

```scala
val level = StorageLevel.MEMORY_AND_DISK_SER_2
println(s"Uses disk: ${level.useDisk}")         // true
println(s"Uses memory: ${level.useMemory}")     // true
println(s"Deserialized: ${level.deserialized}") // false (serialized)
println(s"Replication: ${level.replication}")   // 2
```

MEMORY_ONLY - Best performance, no serialization overhead
```scala
// Use when:
// - RDD fits comfortably in memory
// - Fast CPU but limited memory bandwidth
// - Objects are not too expensive to reconstruct
val fastAccess = rdd.persist(StorageLevel.MEMORY_ONLY)
```

MEMORY_ONLY_SER - Compact storage, slower access
```scala
// Use when:
// - Memory is limited but the RDD is important to cache
// - Deserialized objects carry significant memory overhead
// - CPU is fast relative to memory bandwidth
val compactCache = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
```

MEMORY_AND_DISK - Balanced performance and reliability
```scala
// Use when:
// - RDD might not fit entirely in memory
// - Recomputation is expensive
// - Fault tolerance is important
val balanced = rdd.persist(StorageLevel.MEMORY_AND_DISK)
```

MEMORY_AND_DISK_SER - Space-efficient with disk fallback
```scala
// Use when:
// - Memory is very limited
// - Serialization cost is acceptable
// - Disk I/O is reasonably fast
val efficient = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
```

DISK_ONLY - Reliable but slower
```scala
// Use when:
// - Memory is scarce
// - RDD is accessed infrequently
// - Recomputation is very expensive
val reliable = rdd.persist(StorageLevel.DISK_ONLY)
```

_2 variants - Fault tolerance with replication
```scala
// Use when:
// - Fault tolerance is critical
// - The cluster experiences node failures
// - RDD recomputation is very expensive
val faultTolerant = rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
```

Cache only the RDDs that will be reused multiple times:
```scala
val rawData = sc.textFile("input.txt")
val cleaned = rawData.filter(_.nonEmpty).map(_.trim)

// Don't cache - used only once
val parsed = cleaned.map(parseLine)

// Cache this - used multiple times
val validated = parsed.filter(isValid).cache()

// Multiple actions on the cached RDD
val errors = validated.filter(hasError).count()
val summary = validated.map(extractSummary).collect()
validated.map(formatOutput).saveAsTextFile("output") // saveAsTextFile returns Unit
```

For iterative algorithms, unpersist the previous iteration once the new one has been materialized:

```scala
var current = sc.textFile("initial-data.txt").cache()
for (i <- 1 to 10) {
  // Keep a handle to the previous iteration
  val previous = current

  // Compute the new iteration and cache it
  current = current.map(iterativeFunction).cache()

  // Force computation, then release the old data
  current.count()
  previous.unpersist()
  println(s"Iteration $i completed")
}
```

A tiered strategy can match storage levels to how expensive each stage is to recompute:

```scala
val rawData = sc.textFile("large-dataset.txt")

// Level 1: Cache frequently accessed base data
val baseData = rawData.filter(isRelevant).cache()

// Level 2: Cache intermediate expensive computations
val features = baseData.map(extractFeatures)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Level 3: Cache final results with replication for reliability
val results = features.map(expensiveMLModel)
  .persist(StorageLevel.MEMORY_AND_DISK_2)
```

SparkContext exposes methods for monitoring cached RDDs:

```scala
def getRDDStorageInfo: Array[RDDInfo]
def getPersistentRDDs: Map[Int, RDD[_]]
def getExecutorStorageStatus: Array[StorageStatus]
```

```scala
// Get information about cached RDDs
val storageInfo = sc.getRDDStorageInfo
storageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}):")
  println(s"  Memory size: ${info.memSize} bytes")
  println(s"  Disk size: ${info.diskSize} bytes")
  println(s"  Storage level: ${info.storageLevel}")
}

// Get all persistent RDDs
val persistentRDDs = sc.getPersistentRDDs
persistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: ${rdd.name} - ${rdd.getStorageLevel}")
}

// Check executor storage status
val executorStatus = sc.getExecutorStorageStatus
executorStatus.foreach { status =>
  println(s"Executor ${status.blockManagerId.executorId}:")
  println(s"  Max memory: ${status.maxMem} bytes")
  println(s"  Used memory: ${status.memUsed} bytes")
  println(s"  Remaining: ${status.memRemaining} bytes")
}
```

```scala
def setName(name: String): RDD[T]
def name: String
```

Named RDDs are easier to identify when monitoring:

```scala
val data = sc.textFile("input.txt")
  .setName("Input Data")
  .cache()

val processed = data.map(process)
  .setName("Processed Data")
  .persist(StorageLevel.MEMORY_AND_DISK)
// Names will appear in the Spark UI for easier monitoring
```

Checkpointing provides fault tolerance by saving RDD data to reliable storage.
```scala
def setCheckpointDir(directory: String): Unit
```

```scala
// Set the checkpoint directory (must be reliable storage such as HDFS)
sc.setCheckpointDir("hdfs://namenode/checkpoints")
```

```scala
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
```

```scala
val data = sc.textFile("input.txt")
val processed = data.map(expensiveOperation).filter(isValid)

// Mark for checkpointing
processed.checkpoint()

// The checkpoint happens after the first action
val count = processed.count() // Triggers the checkpoint

// Check whether it was checkpointed
if (processed.isCheckpointed) {
  println(s"Checkpointed to: ${processed.getCheckpointFile.get}")
}
```

Persistence and checkpointing complement each other:

```scala
// Persistence: keeps the lineage; cached data can be lost on failure
val persisted = rdd.cache()

// Checkpointing: truncates the lineage; survives failures
rdd.checkpoint()

// Best practice: combine both for performance and reliability
val optimized = rdd.cache()
optimized.checkpoint()
optimized.count() // Triggers both the cache and the checkpoint
```

A good candidate for caching:
```scala
val filtered = largeDataset
  .filter(expensiveCondition)  // Reduces data size significantly
  .map(complexTransformation)  // Expensive computation
filtered.cache()

// Multiple uses justify caching
val stats = filtered.map(extractStats).collect()
val sample = filtered.sample(false, 0.1).collect()
filtered.saveAsTextFile("output") // saveAsTextFile returns Unit
```

Don't cache an RDD that is used only once:

```scala
val result = sc.textFile("input.txt")
  .map(_.toUpperCase)
  .filter(_.startsWith("A"))
  .count()
```

Memory-management guidelines:

```scala
// 1. Unpersist when done
val temp = rdd.cache()
processData(temp)
temp.unpersist()

// 2. Use appropriate storage levels (one per RDD - a level can only be set once)
val frequentData = rdd.persist(StorageLevel.MEMORY_ONLY)

// 3. Monitor memory usage
val memUsage = sc.getExecutorStorageStatus.map(_.memUsed).sum
val memTotal = sc.getExecutorStorageStatus.map(_.maxMem).sum
println(f"Memory utilization: ${memUsage.toDouble / memTotal * 100}%.1f%%")
```

This guide covers RDD caching and persistence in Apache Spark, enabling you to optimize performance through deliberate data storage strategies.
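As a closing sketch, the techniques above combine as follows. The file paths and the `parse`/`isValid` helpers are illustrative, not part of any Spark API:

```scala
import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs://namenode/checkpoints")

// Cache an expensive intermediate result that is reused,
// naming it so it is easy to find in the Spark UI
val records = sc.textFile("hdfs://namenode/data/events.txt")
  .map(parse)      // hypothetical parser
  .filter(isValid) // hypothetical validator
  .setName("Valid Events")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Checkpoint to truncate the lineage for downstream jobs
records.checkpoint()
records.count() // triggers both the persist and the checkpoint

// Reuse the cached data across several actions
val total = records.count()
val sampled = records.sample(false, 0.01).collect()

// Release the cache when finished
records.unpersist()
```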
Install with Tessl CLI:

```shell
npx tessl i tessl/maven-apache-spark
```