Storage levels and caching mechanisms for controlling RDD persistence across memory and disk, with optional replication.
StorageLevel defines how RDD partitions are stored: whether memory and/or disk is used, whether data is kept deserialized, and how many replicas are maintained.
/**
 * Storage level for RDD persistence, defining memory/disk usage and replication
 */
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int) extends Externalizable {

  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication

  override def toString: String = {
    "StorageLevel(%s, %s, %s, %s, %d)".format(
      _useDisk, _useMemory, _useOffHeap, _deserialized, _replication)
  }
}
object StorageLevel {
  // Predefined storage levels
  val NONE = new StorageLevel(false, false, false, false, 1)
  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false, 1)

  /** Create a new StorageLevel with custom settings */
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int): StorageLevel = {
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
  }
}
Methods available on RDD for controlling persistence and caching behavior.
// RDD persistence methods
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(storageLevel: StorageLevel): RDD[T]
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def unpersist(blocking: Boolean = true): RDD[T]
// Query persistence status
def getStorageLevel: StorageLevel
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
// Checkpointing methods
def checkpoint(): Unit
def localCheckpoint(): RDD[T]
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
val sc = new SparkContext(new SparkConf().setAppName("Storage Example").setMaster("local[*]"))
val data = sc.textFile("hdfs://large-dataset.txt")
// Basic caching (MEMORY_ONLY)
val cachedData = data.cache()
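// getStorageLevel reports the level currently assigned to an RDD; with the
// StorageLevel class shown above, the output for MEMORY_ONLY would be:
println(cachedData.getStorageLevel) // StorageLevel(false, true, false, true, 1)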
// Explicit persistence with different storage levels
// (shown against the same `data` RDD for brevity; in practice an RDD can only
// be assigned one storage level, so persisting it again at a different level
// throws an error unless it is unpersisted first)
val memoryOnlyData = data.persist(StorageLevel.MEMORY_ONLY)
val memoryAndDiskData = data.persist(StorageLevel.MEMORY_AND_DISK)
val diskOnlyData = data.persist(StorageLevel.DISK_ONLY)
val serializedData = data.persist(StorageLevel.MEMORY_ONLY_SER)
// Replication for fault tolerance
val replicatedData = data.persist(StorageLevel.MEMORY_ONLY_2) // 2 replicas
// Off-heap storage (requires off-heap memory configuration)
val offHeapData = data.persist(StorageLevel.OFF_HEAP)
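// Note: in recent Spark versions OFF_HEAP requires executors to have off-heap
// memory enabled; a minimal sketch of that configuration (2g is illustrative):
//   new SparkConf()
//     .set("spark.memory.offHeap.enabled", "true")
//     .set("spark.memory.offHeap.size", "2g")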
// Custom storage level
val customStorage = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false, // serialized for memory efficiency
  replication = 2
)
val customPersistedData = data.persist(customStorage)
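// The flags of a StorageLevel can be inspected through its accessors, e.g.:
println(customStorage.useDisk)      // true
println(customStorage.deserialized) // false
println(customStorage.replication)  // 2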
// Use persisted data multiple times
val wordCounts = cachedData
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
val lineCount = cachedData.count()
val charCount = cachedData.map(_.length).sum()
// Clean up when done
cachedData.unpersist()
memoryAndDiskData.unpersist(blocking = false) // Non-blocking cleanup
Checkpointing: a fault-tolerance mechanism that saves RDD data to reliable storage for recovery.
// SparkContext checkpointing methods
def setCheckpointDir(directory: String): Unit
def getCheckpointDir: Option[String]
// RDD checkpointing methods
def checkpoint(): Unit
def localCheckpoint(): RDD[T]
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
val sc = new SparkContext(new SparkConf().setAppName("Checkpoint Example"))
// Set checkpoint directory (should be on reliable storage like HDFS)
sc.setCheckpointDir("hdfs://namenode:port/spark-checkpoints")
val data = sc.textFile("hdfs://input-data.txt")
// Long chain of transformations
val processedData = data
  .filter(_.nonEmpty)
  .map(_.toLowerCase)
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .filter(_._2 > 5)
  .map(_.swap) // (count, word)
  .sortByKey(ascending = false)
  .map(_.swap) // back to (word, count)
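// Recommended: persist before checkpointing, since the checkpoint job recomputes
// the RDD; caching here avoids running the transformation chain twice
processedData.cache()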
// Checkpoint to break lineage and prevent recomputation
processedData.checkpoint()
// Trigger checkpointing (requires an action)
processedData.count()
// Verify checkpointing
println(s"Is checkpointed: ${processedData.isCheckpointed}")
println(s"Checkpoint file: ${processedData.getCheckpointFile}")
// Use checkpointed RDD
val topWords = processedData.take(100)
val totalUniqueWords = processedData.count()
// Local checkpointing: truncates lineage using executor-local storage
// (faster than checkpoint(), but not fault-tolerant against executor loss)
val shortChain = data.map(_.toUpperCase)
val localCheckpointed = shortChain.localCheckpoint()
localCheckpointed.count() // Trigger local checkpointing
// Monitor storage usage
val largeDataset = sc.textFile("hdfs://very-large-file.txt")
// Use serialized storage for memory efficiency
val efficientStorage = largeDataset.persist(StorageLevel.MEMORY_ONLY_SER)
// Check storage usage programmatically
def printStorageInfo(sc: SparkContext): Unit = {
  // SparkStatusTracker exposes per-executor information, including the amount
  // of storage memory currently used for cached blocks (cacheSize)
  sc.statusTracker.getExecutorInfos.foreach { executor =>
    println(s"Executor ${executor.host}:${executor.port}: " +
      s"${executor.cacheSize / (1024 * 1024)} MB used for cached blocks")
  }
}
// Use the storage info
efficientStorage.count()
printStorageInfo(sc)
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
def adaptiveStorage[T](rdd: RDD[T], estimatedSize: Long): RDD[T] = {
  val memoryFraction = 0.6 // Assume 60% of executor memory available for storage
  val executorMemory = sc.getConf.getSizeAsBytes("spark.executor.memory", "1g")
  val availableMemory = (executorMemory * memoryFraction).toLong
  val storageLevel = if (estimatedSize < availableMemory) {
    StorageLevel.MEMORY_ONLY
  } else if (estimatedSize < availableMemory * 2) {
    StorageLevel.MEMORY_ONLY_SER
  } else if (estimatedSize < availableMemory * 5) {
    StorageLevel.MEMORY_AND_DISK_SER
  } else {
    StorageLevel.DISK_ONLY
  }
  println(s"Using storage level: $storageLevel for estimated size: ${estimatedSize / (1024 * 1024)}MB")
  rdd.persist(storageLevel)
}
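// The usage below assumes an estimateRDDSize helper. A rough sketch of one
// possible approach (sample a small fraction, measure it with Spark's
// SizeEstimator, and extrapolate; an approximation, not an exact measurement):
import org.apache.spark.util.SizeEstimator
def estimateRDDSize[T](rdd: RDD[T], sampleFraction: Double = 0.01): Long = {
  val sample = rdd.sample(withReplacement = false, sampleFraction).collect()
  if (sample.isEmpty) 0L
  else (SizeEstimator.estimate(sample) / sampleFraction).toLong
}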
// Usage
val dataSize = estimateRDDSize(largeDataset) // Size-estimation helper sketched above
val optimizedRDD = adaptiveStorage(largeDataset, dataSize)
// Complex ETL pipeline with strategic checkpointing
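// The pipeline below assumes a few domain-specific types and helpers
// (parseRecord, enrichRecord, aggregateRecords, threshold). Hypothetical stubs
// are sketched here so the example is self-contained; real implementations are
// application-specific:
case class Record(category: String, value: Double, isValid: Boolean)
case class Aggregate(score: Double)
val threshold = 10.0
def parseRecord(line: String): Record = {
  val fields = line.split(",")
  val value = scala.util.Try(fields(1).toDouble).getOrElse(Double.NaN)
  Record(fields(0), value, isValid = !value.isNaN)
}
def enrichRecord(r: Record): Record = r // e.g. join in reference data here
def aggregateRecords(records: Iterable[Record]): Aggregate =
  Aggregate(records.map(_.value).sum)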
def processLargeDataset(inputPath: String, outputPath: String): Unit = {
  sc.setCheckpointDir("hdfs://checkpoints/etl-pipeline")

  // Stage 1: Initial data loading and cleaning
  val rawData = sc.textFile(inputPath)
  val cleanedData = rawData
    .filter(_.nonEmpty)
    .filter(!_.startsWith("#")) // Remove comments
    .map(_.trim)
    .cache() // Cache cleaned data for reuse
  val recordCount = cleanedData.count()
  println(s"Loaded and cleaned $recordCount records")

  // Stage 2: Complex transformations
  val transformedData = cleanedData
    .map(parseRecord) // Complex parsing
    .filter(_.isValid) // Remove invalid records
    .map(enrichRecord) // Add external data
    .groupBy(_.category)
    .mapValues(aggregateRecords) // Complex aggregation

  // Checkpoint after expensive transformations
  transformedData.checkpoint()
  val checkpointCount = transformedData.count() // Trigger checkpointing
  println(s"Checkpointed $checkpointCount transformed records")

  // Stage 3: Final processing using checkpointed data
  val finalResults = transformedData
    .filter(_._2.score > threshold)
    .sortBy(_._2.score, ascending = false)
    .take(1000)

  // Clean up cached data
  cleanedData.unpersist()

  // Save results
  sc.parallelize(finalResults).saveAsTextFile(outputPath)
}
import org.apache.spark.storage.RDDInfo
def monitorRDDStorage(sc: SparkContext): Unit = {
  val rddInfos = sc.getRDDStorageInfo
  println("RDD Storage Information:")
  println("=" * 50)
  rddInfos.foreach { rddInfo =>
    println(s"RDD ID: ${rddInfo.id}")
    println(s"Name: ${rddInfo.name}")
    println(s"Storage Level: ${rddInfo.storageLevel}")
    println(s"Cached Partitions: ${rddInfo.numCachedPartitions}/${rddInfo.numPartitions}")
    println(s"Memory Size: ${rddInfo.memSize / (1024 * 1024)} MB")
    println(s"Disk Size: ${rddInfo.diskSize / (1024 * 1024)} MB")
    println("-" * 30)
  }
}
// Custom storage metrics
case class StorageMetrics(
  rddId: Int,
  memoryUsed: Long,
  diskUsed: Long,
  partitionsCached: Int,
  totalPartitions: Int
)

def collectStorageMetrics(sc: SparkContext): List[StorageMetrics] = {
  sc.getRDDStorageInfo.map { rddInfo =>
    StorageMetrics(
      rddId = rddInfo.id,
      memoryUsed = rddInfo.memSize,
      diskUsed = rddInfo.diskSize,
      partitionsCached = rddInfo.numCachedPartitions,
      totalPartitions = rddInfo.numPartitions
    )
  }.toList
}
// Usage in application
val beforeMetrics = collectStorageMetrics(sc)
// ... perform operations ...
val afterMetrics = collectStorageMetrics(sc)
// Compare metrics
println("Storage usage changes:")
afterMetrics.foreach { after =>
  beforeMetrics.find(_.rddId == after.rddId) match {
    case Some(before) =>
      val memoryChange = after.memoryUsed - before.memoryUsed
      val diskChange = after.diskUsed - before.diskUsed
      println(s"RDD ${after.rddId}: Memory: ${memoryChange / (1024 * 1024)}MB, Disk: ${diskChange / (1024 * 1024)}MB")
    case None =>
      println(s"New RDD ${after.rddId}: Memory: ${after.memoryUsed / (1024 * 1024)}MB, Disk: ${after.diskUsed / (1024 * 1024)}MB")
  }
}
// 1. MEMORY_ONLY: Small datasets, frequently accessed
val smallFrequentData = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)
// 2. MEMORY_ONLY_SER: Larger datasets, CPU available for deserialization
val mediumData = sc.textFile("medium-file.txt").persist(StorageLevel.MEMORY_ONLY_SER)
// 3. MEMORY_AND_DISK: Important datasets, some memory pressure
val importantData = sc.textFile("important-file.txt").persist(StorageLevel.MEMORY_AND_DISK)
// 4. DISK_ONLY: Very large datasets, limited memory
val hugeData = sc.textFile("huge-file.txt").persist(StorageLevel.DISK_ONLY)
// 5. Replication levels: Critical data in fault-prone environments
val criticalData = sc.textFile("critical-file.txt").persist(StorageLevel.MEMORY_AND_DISK_2)
// Cache management pattern
class CacheManager(sc: SparkContext) {
  private var cachedRDDs = List.empty[RDD[_]]

  def cacheRDD[T](rdd: RDD[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RDD[T] = {
    val cachedRDD = rdd.persist(storageLevel)
    cachedRDDs = cachedRDD :: cachedRDDs
    cachedRDD
  }

  def unpersistAll(): Unit = {
    cachedRDDs.foreach(_.unpersist())
    cachedRDDs = List.empty
  }

  def getStorageInfo: List[RDDInfo] = {
    cachedRDDs.flatMap { rdd =>
      sc.getRDDStorageInfo.find(_.id == rdd.id)
    }
  }
}
// Usage
val cacheManager = new CacheManager(sc)
val data1 = cacheManager.cacheRDD(sc.textFile("file1.txt"))
val data2 = cacheManager.cacheRDD(sc.textFile("file2.txt"), StorageLevel.MEMORY_ONLY_SER)
// Process data...
// Clean up all cached RDDs
cacheManager.unpersistAll()
// Intelligent checkpointing based on lineage depth
def smartCheckpoint[T](rdd: RDD[T], maxLineageDepth: Int = 10): RDD[T] = {
  // Walk the dependency graph and return the length of the longest parent chain
  def countLineageDepth(rdd: RDD[_]): Int = {
    rdd.dependencies.map(dep => countLineageDepth(dep.rdd) + 1).foldLeft(0)(math.max)
  }
  val depth = countLineageDepth(rdd)
  if (depth > maxLineageDepth) {
    // Requires sc.setCheckpointDir(...) to have been set; the checkpoint is
    // materialized by the next action on this RDD
    rdd.checkpoint()
    println(s"Checkpointing RDD with lineage depth: $depth")
  }
  rdd
}
// Usage in a complex pipeline (inputData and the transform*/filter* functions
// are application-specific placeholders)
val result = inputData
  .map(transform1)
  .filter(filter1)
  .map(transform2)
  .groupByKey()
  .map(transform3) // This might have deep lineage
val checkpointedResult = smartCheckpoint(result)
checkpointedResult.count() // Trigger checkpointing if needed
// Continue processing with potentially checkpointed RDD
val finalResult = checkpointedResult
  .filter(finalFilter)
  .sortBy(_._2)