Data storage levels and caching mechanisms for optimizing repeated access to RDDs with configurable memory and disk usage strategies.
Configuration for how RDDs are stored when persisted, controlling memory usage, disk usage, serialization, and replication.
/**
* Storage level configuration for RDD persistence
* @param useDisk whether to use disk storage
* @param useMemory whether to use memory storage
* @param useOffHeap whether to use off-heap memory
* @param deserialized whether to store in deserialized format
* @param replication number of replicas to maintain
*/
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int
) extends Externalizable {
def useDisk: Boolean
def useMemory: Boolean
def useOffHeap: Boolean
def deserialized: Boolean
def replication: Int
/** Create a copy with different replication factor */
def clone(newReplication: Int): StorageLevel
/** Check if this storage level is valid */
def isValid: Boolean
}
object StorageLevel {
/** Memory only, deserialized */
val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
/** Memory only, serialized */
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
/** Memory and disk spillover, deserialized */
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
/** Memory and disk spillover, serialized */
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
/** Disk only */
val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
/** Memory only with 2x replication, deserialized */
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
/** Memory and disk with 2x replication, deserialized */
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
/** Serialized storage in off-heap memory, with disk spillover */
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
/** No storage (unpersist) */
val NONE = new StorageLevel(false, false, false, false, 1)
/** Apply method for creating custom storage levels */
def apply(
useDisk: Boolean,
useMemory: Boolean,
useOffHeap: Boolean = false,
deserialized: Boolean = true,
replication: Int = 1
): StorageLevel
}
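The predefined constants cover the common cases; the apply factory builds custom combinations. A minimal sketch (the value name memAndDiskSer2 is illustrative only):
import org.apache.spark.storage.StorageLevel

// Serialized memory-and-disk storage replicated to two executors:
// the same flags as MEMORY_AND_DISK_SER, but with replication = 2.
val memAndDiskSer2 = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)
assert(memAndDiskSer2.isValid) // sanity check via the isValid method above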
Methods available on RDD for controlling persistence and caching behavior.
abstract class RDD[T: ClassTag] {
/** Persist RDD with specified storage level */
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): this.type
/** Cache RDD in memory (shortcut for persist(MEMORY_ONLY)) */
def cache(): this.type
/** Remove persisted RDD from storage */
def unpersist(blocking: Boolean = false): this.type
/** Mark RDD for checkpointing to reliable storage */
def checkpoint(): Unit
/** Check if RDD is checkpointed */
def isCheckpointed: Boolean
/** Get checkpoint file if available */
def getCheckpointFile: Option[String]
/** Get current storage level */
def getStorageLevel: StorageLevel
/** Check if RDD is cached */
def isCached: Boolean = getStorageLevel != StorageLevel.NONE
}
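persist() and cache() are lazy: they only mark the RDD, and blocks are materialized by the first action. A short sketch assuming an existing SparkContext sc and a hypothetical input file data.txt:
import org.apache.spark.storage.StorageLevel

val lengths = sc.textFile("data.txt").map(_.length)
lengths.persist(StorageLevel.MEMORY_AND_DISK_SER) // nothing is cached yet
val total = lengths.sum()                         // first action populates the cache
val longest = lengths.max()                       // served from the cached blocks
lengths.unpersist(blocking = true)                // block until the blocks are removed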
Core storage management component handling data blocks across the cluster.
/**
* Manager for reading and writing data blocks
*/
class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
master: BlockManagerMaster,
serializerManager: SerializerManager,
conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
securityManager: SecurityManager,
numUsableCores: Int
) extends BlockDataManager with BlockEvictionHandler with Logging {
/** Initialize block manager */
def initialize(appId: String): Unit
/** Get local block data */
def getBlockData(blockId: BlockId): ManagedBuffer
/** Put block data */
def putBlockData(
blockId: BlockId,
data: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]
): Boolean
/** Get block from local storage or remote */
def get[T: ClassTag](blockId: BlockId): Option[BlockResult]
/** Put block into storage */
def putSingle[T: ClassTag](
blockId: BlockId,
value: T,
level: StorageLevel,
tellMaster: Boolean = true
): Boolean
/** Put iterator of values into storage */
def putIterator[T: ClassTag](
blockId: BlockId,
values: Iterator[T],
level: StorageLevel,
tellMaster: Boolean = true
): Boolean
/** Remove block from storage */
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit
/** Get memory status */
def memoryStatus: Map[BlockId, (StorageLevel, Long, Long)]
/** Get disk usage */
def diskBlockSize(blockId: BlockId): Long
}
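BlockManager is internal: Spark calls it on your behalf whenever a partition is persisted or shuffled, and applications rarely touch it directly. A rough sketch of the put/get cycle using the signatures above; obtaining the instance through SparkEnv.get.blockManager is an unstable internal hook and is shown only for illustration:
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

val blockManager = SparkEnv.get.blockManager          // internal, version-dependent access
val blockId = RDDBlockId(rddId = 42, splitIndex = 0)  // hypothetical identifiers

// Store a single value and report the block to the BlockManagerMaster
blockManager.putSingle(blockId, "some cached value", StorageLevel.MEMORY_ONLY)

// Read it back, locally if possible, otherwise fetched from a remote executor
blockManager.get[String](blockId) match {
  case Some(result) => println(s"block found: $result")
  case None         => println("block not found")
}

// Drop the block and notify the master
blockManager.removeBlock(blockId)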
Type-safe identifiers for different types of data blocks.
/**
* Base class for block identifiers
*/
sealed abstract class BlockId {
def name: String
def asRDDId: Option[RDDBlockId] = None
}
/** Block identifier for RDD blocks */
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = s"rdd_${rddId}_$splitIndex"
override def asRDDId: Option[RDDBlockId] = Some(this)
}
/** Block identifier for shuffle blocks */
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
override def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
}
/** Block identifier for shuffle data blocks */
case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.data"
}
/** Block identifier for shuffle index blocks */
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.index"
}
/** Block identifier for broadcast blocks */
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
override def name: String = s"broadcast_${broadcastId}${if (field.nonEmpty) "_" + field else ""}"
}
/** Block identifier for task result blocks */
case class TaskResultBlockId(taskId: Long) extends BlockId {
override def name: String = s"taskresult_$taskId"
}
/** Block identifier for stream blocks */
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
override def name: String = s"input-${streamId}-$uniqueId"
}
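A quick illustration of how the identifiers above map to block names:
import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId, BroadcastBlockId}

val rddBlock = RDDBlockId(rddId = 3, splitIndex = 7)
println(rddBlock.name)        // "rdd_3_7"
println(rddBlock.asRDDId)     // Some(RDDBlockId(3,7))

val shuffleBlock = ShuffleBlockId(shuffleId = 1, mapId = 5L, reduceId = 2)
println(shuffleBlock.name)    // "shuffle_1_5_2"
println(shuffleBlock.asRDDId) // None: only RDD blocks carry an RDD id

val broadcastBlock = BroadcastBlockId(broadcastId = 9)
println(broadcastBlock.name)  // "broadcast_9"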
Mechanism for saving RDD lineage to reliable storage for fault tolerance.
/**
* Checkpoint data management
*/
abstract class CheckpointData[T: ClassTag](rdd: RDD[T]) extends Serializable {
/** Get checkpoint state */
def cpState: CheckpointState
/** Get checkpoint RDD */
def checkpointRDD: Option[CheckpointRDD[T]]
/** Materialize checkpoint */
def checkpoint(): Unit
/** Check if checkpointed */
def isCheckpointed: Boolean
/** Get checkpoint directory */
def getCheckpointDir: Option[String]
}
/**
* Checkpoint states
*/
object CheckpointState extends Enumeration {
type CheckpointState = Value
val Initialized, CheckpointingInProgress, Checkpointed = Value
}
/**
* RDD representing checkpointed data
*/
abstract class CheckpointRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
/** Get checkpoint directory */
def getCheckpointDir: String
}
/**
* Reliable checkpoint RDD (HDFS, S3, etc.)
*/
class ReliableCheckpointRDD[T: ClassTag](
sc: SparkContext,
checkpointPath: String,
partitioner: Option[Partitioner] = None
) extends CheckpointRDD[T](sc)
/**
* Local checkpoint RDD (local filesystem)
*/
class LocalCheckpointRDD[T: ClassTag](
sc: SparkContext,
checkpointPath: String,
originalRDD: RDD[T],
partitioner: Option[Partitioner] = None
) extends CheckpointRDD[T](sc)
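These classes are created internally; applications pick between them through the RDD API. A brief sketch assuming an existing SparkContext sc (localCheckpoint() is the standard RDD method backing LocalCheckpointRDD, not listed in the RDD section above):
// Reliable checkpointing: data is written to the configured directory
// (HDFS, S3, ...) and survives executor loss, at the cost of an extra write.
sc.setCheckpointDir("hdfs:///checkpoints") // example path
val reliable = sc.parallelize(1 to 1000).map(_ * 2)
reliable.checkpoint()
reliable.count() // the first action materializes the checkpoint

// Local checkpointing: faster, stores blocks on the executors instead,
// but the data is lost if an executor dies.
val local = sc.parallelize(1 to 1000).map(_ + 1)
local.localCheckpoint()
local.count()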
Components managing memory allocation for caching and execution.
/**
* Abstract memory manager for Spark
*/
abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
onHeapStorageMemory: Long,
onHeapExecutionMemory: Long
) extends Logging {
/** Maximum memory available for storage */
def maxOnHeapStorageMemory: Long
def maxOffHeapStorageMemory: Long
/** Acquire memory for storage */
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode
): Boolean
/** Acquire memory for execution */
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode
): Long
/** Release storage memory */
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
/** Release execution memory */
def releaseExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode
): Unit
/** Get current storage memory usage */
def storageMemoryUsed: Long
/** Get current execution memory usage */
def executionMemoryUsed: Long
}
/**
* Memory allocation modes
*/
object MemoryMode extends Enumeration {
type MemoryMode = Value
val ON_HEAP, OFF_HEAP = Value
}
/**
* Unified memory manager (default in Spark 1.6+)
*/
class UnifiedMemoryManager(
conf: SparkConf,
maxHeapMemory: Long,
onHeapStorageRegionSize: Long,
numCores: Int
) extends MemoryManager(
conf,
numCores,
onHeapStorageRegionSize,
maxHeapMemory - onHeapStorageRegionSize
)
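Memory managers are constructed internally from SparkConf settings rather than instantiated in user code. A sketch of the standard configuration keys that shape the unified memory manager (defaults vary by Spark version):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Memory tuning example")
  .set("spark.memory.fraction", "0.6")         // share of usable heap split between execution and storage
  .set("spark.memory.storageFraction", "0.5")  // portion of that region protected for cached blocks
  .set("spark.memory.offHeap.enabled", "true") // allow off-heap allocation (used by StorageLevel.OFF_HEAP)
  .set("spark.memory.offHeap.size", "512m")    // off-heap pool size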
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
val sc = new SparkContext(new SparkConf().setAppName("Storage Example").setMaster("local[*]"))
// Create RDD
val data = sc.parallelize(1 to 1000000)
val processed = data.map(_ * 2).filter(_ > 100)
// Different persistence strategies - an RDD can hold only one storage level at a time,
// so pick one (call unpersist() first if you need to change it):
processed.persist(StorageLevel.MEMORY_AND_DISK)    // Memory with disk spillover
// processed.persist(StorageLevel.MEMORY_ONLY)     // Deserialized, memory only
// processed.persist(StorageLevel.MEMORY_ONLY_SER) // Serialized in memory (compact, more CPU)
// processed.persist(StorageLevel.DISK_ONLY)       // Disk only storage
// processed.cache()                               // Shortcut for persist(StorageLevel.MEMORY_ONLY)
// Use cached RDD multiple times
val count1 = processed.count()
val count2 = processed.filter(_ < 1000).count() // Reuses cache
// Checkpointing for fault tolerance
sc.setCheckpointDir("hdfs:///checkpoints")
processed.checkpoint()
processed.count() // Triggers checkpoint
// Check storage status
println(s"Storage level: ${processed.getStorageLevel}")
println(s"Is cached: ${processed.isCached}")
println(s"Is checkpointed: ${processed.isCheckpointed}")
// Cleanup
processed.unpersist()
sc.stop()