Storage levels control how RDDs are persisted: in memory, on disk, or off-heap, in serialized or deserialized form, and with optional replication.
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int
) {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  def clone(): StorageLevel
  def writeExternal(out: ObjectOutput): Unit
  def readExternal(in: ObjectInput): Unit
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false, 1)
  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
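Custom levels can also be built through the companion object's public apply and fromString helpers; a minimal sketch (values chosen for illustration):

import org.apache.spark.storage.StorageLevel

// Disk + memory, serialized, replicated twice: same flags as MEMORY_AND_DISK_SER_2.
val custom = StorageLevel(true, true, false, false, 2)

// Levels can also be resolved by name, e.g. when read from configuration.
val byName = StorageLevel.fromString("MEMORY_AND_DISK_SER_2")

assert(custom == byName)
println(s"disk=${custom.useDisk} memory=${custom.useMemory} replication=${custom.replication}")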
Methods available on all RDD types for controlling persistence behavior.

// From RDD[T]
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]
def cache(): RDD[T]
def unpersist(blocking: Boolean = false): RDD[T]
def getStorageLevel: StorageLevel
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
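A brief sketch of the lifecycle these methods provide, assuming a running SparkContext `sc` (the checkpoint path is illustrative):

sc.setCheckpointDir("/tmp/spark-checkpoints") // use fault-tolerant storage in production

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.cache()                    // shorthand for persist(StorageLevel.MEMORY_ONLY)
rdd.checkpoint()               // only marks the RDD; nothing is written yet
rdd.count()                    // first action materializes the cache and the checkpoint

println(rdd.getStorageLevel)   // the level assigned above
println(rdd.isCheckpointed)    // true once the checkpoint write has completed
println(rdd.getCheckpointFile) // Some(<path>) after checkpointing
rdd.unpersist()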
Internal storage management system (some public interfaces available for advanced usage).

class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    master: BlockManagerMaster,
    serializerManager: SerializerManager,
    conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int
) {
  // Public methods for advanced users
  def putSingle[T: ClassTag](blockId: BlockId, value: T, level: StorageLevel, tellMaster: Boolean = true): Boolean
  def putIterator[T: ClassTag](blockId: BlockId, values: Iterator[T], level: StorageLevel, tellMaster: Boolean = true): Boolean
  def getLocalValues(blockId: BlockId): Option[BlockResult]
  def getOrElseUpdate[T](blockId: BlockId, level: StorageLevel, classTag: ClassTag[T], makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]]
  def remove(blockId: BlockId, tellMaster: Boolean = true): Boolean
}
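BlockManager is internal and its interface varies across Spark versions; purely as an illustrative sketch of the methods listed above, accessed through SparkEnv:

import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

// Internal API: signatures may change between releases.
val blockManager = SparkEnv.get.blockManager

val id = TestBlockId("example-block") // illustrative block id
blockManager.putSingle(id, Array(1, 2, 3), StorageLevel.MEMORY_ONLY)

blockManager.getLocalValues(id) match {
  case Some(result) => println(result.data.next()) // the stored value
  case None         => println("block not stored locally")
}

blockManager.remove(id)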
Key configuration properties for storage and caching behavior.

spark.storage.memoryFraction - Fraction of JVM heap space used for storage cache (deprecated in favor of unified memory management)
spark.storage.safetyFraction - Safety fraction to prevent storage region from using entire heap
spark.storage.unrollFraction - Fraction of storage memory used for unrolling blocks
spark.storage.replication.proactive - Enable proactive block replication
spark.storage.blockManagerTimeoutIntervalMs - Timeout for block manager operations
spark.storage.diskStore.subDirectories - Number of subdirectories for disk storage
spark.storage.localDirs - Directories for storing blocks on disk
spark.storage.localDirs.fallback - Fallback directory if localDirs not available
spark.memory.offHeap.enabled - Enable off-heap storage
spark.memory.offHeap.size - Size of off-heap storage region
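For instance, the off-heap properties must be set before StorageLevel.OFF_HEAP can be used; a minimal sketch with illustrative values:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("storage-config-example")
  .set("spark.memory.offHeap.enabled", "true")   // allow off-heap block storage
  .set("spark.memory.offHeap.size", "2g")        // illustrative region size
  .set("spark.storage.replication.proactive", "true")

val sc = new SparkContext(conf)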
Example: caching an expensive RDD for repeated use.

val expensiveData = sc.textFile("large-dataset.txt")
  .map(processExpensiveOperation)
  .filter(complexFilter)
// Cache for multiple uses
expensiveData.cache()
// Use multiple times (only computed once)
val count = expensiveData.count()
val sample = expensiveData.take(10)
val stats = expensiveData.map(_.length).stats()
// Clean up when done
expensiveData.unpersist()

Example: selecting a storage level explicitly.

import org.apache.spark.storage.StorageLevel
val data = sc.parallelize(1 to 1000000)

// Note: an RDD's storage level can be set only once; call unpersist()
// before each new persist() if running all of the lines below together.

// Memory only (fastest access, but limited by memory)
data.persist(StorageLevel.MEMORY_ONLY)
// Memory and disk (spills to disk if memory full)
data.persist(StorageLevel.MEMORY_AND_DISK)
// Serialized storage (more memory efficient, slower access)
data.persist(StorageLevel.MEMORY_ONLY_SER)
// With replication for fault tolerance
data.persist(StorageLevel.MEMORY_AND_DISK_2)
// Off-heap storage (requires off-heap memory configuration)
data.persist(StorageLevel.OFF_HEAP)

Example: checkpointing to break long lineage chains.

// Set checkpoint directory (must be fault-tolerant storage like HDFS)
sc.setCheckpointDir("hdfs://namenode:port/checkpoint")
val iterativeData = sc.textFile("input.txt")
  .map(parseData)
  .filter(isValid)
// Checkpoint to break long lineage chains
iterativeData.checkpoint()
// Force materialization to complete checkpoint
iterativeData.count()
// Now safe to use in iterative algorithms
var current = iterativeData
for (i <- 1 to 10) {
  current = current.map(iterativeFunction)
  if (i % 3 == 0) {
    current.checkpoint()
    current.count() // Force checkpoint
  }
}

Example: common persistence patterns.

// Pattern 1: Conditional persistence based on data size
val rdd = sc.textFile("data.txt").map(transform)
val estimatedSize = rdd.take(1000).map(_.length).sum * (rdd.count() / 1000.0) // rough estimate: average sampled length times element count
val persistedRDD = if (estimatedSize > 1000000) {
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
} else {
  rdd.persist(StorageLevel.MEMORY_ONLY)
}
// Pattern 2: Layered persistence for different access patterns
val rawData = sc.textFile("data.txt")
val cleanedData = rawData.map(clean).persist(StorageLevel.MEMORY_AND_DISK) // assumes clean emits (key, value) pairs
val aggregatedData = cleanedData.groupByKey().persist(StorageLevel.MEMORY_ONLY)
// Use cleaned data for multiple transformations
val result1 = cleanedData.filter(condition1).collect()
val result2 = cleanedData.filter(condition2).collect()
// Use aggregated data for summary statistics
val summary1 = aggregatedData.mapValues(_.size).collect()
val summary2 = aggregatedData.mapValues(_.sum).collect()

Example: inspecting and monitoring persistence.

// Get current storage level
val level = rdd.getStorageLevel
println(s"Storage level: $level")
// Check if RDD is cached
if (level != StorageLevel.NONE) {
println("RDD is persisted")
}
// Monitor through Spark UI
// Access http://driver:4040 to see Storage tab with:
// - RDD storage levels
// - Memory usage
// - Disk usage
// - Partition information
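Persistence can also be inspected programmatically; a short sketch using SparkContext.getPersistentRDDs (an RDD's name is null unless assigned with setName):

// All RDDs currently marked as persisted, keyed by RDD id.
sc.getPersistentRDDs.foreach { case (id, persisted) =>
  println(s"RDD $id (${persisted.name}) -> ${persisted.getStorageLevel.description}")
}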
Guidelines for choosing a storage level.

MEMORY_ONLY - Default and fastest; use when the data fits comfortably in memory
MEMORY_AND_DISK - Use when recomputing partitions costs more than reading them back from disk
MEMORY_ONLY_SER - Use to fit more data in memory at the cost of extra CPU for serialization
DISK_ONLY - Use for datasets too large to hold in memory at all
Replication (_2 variants) - Use when fast recovery from node failure matters more than memory usage
OFF_HEAP - Use to reduce GC pressure; requires the spark.memory.offHeap.* configuration
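As one way to encode these guidelines, a hypothetical helper (chooseStorageLevel is not a Spark API):

import org.apache.spark.storage.StorageLevel

// Hypothetical helper mapping the rules of thumb above to a level.
def chooseStorageLevel(fitsInMemory: Boolean,
                       cheapToRecompute: Boolean,
                       needFastRecovery: Boolean): StorageLevel =
  (fitsInMemory, cheapToRecompute, needFastRecovery) match {
    case (true,  _,     false) => StorageLevel.MEMORY_ONLY
    case (true,  _,     true)  => StorageLevel.MEMORY_ONLY_2
    case (false, true,  _)     => StorageLevel.MEMORY_ONLY_SER
    case (false, false, false) => StorageLevel.MEMORY_AND_DISK
    case (false, false, true)  => StorageLevel.MEMORY_AND_DISK_2
  }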