RDD storage level management for controlling persistence strategies across memory, disk, and off-heap storage, with configurable replication and serialization options.
Main class for configuring RDD persistence behavior, with fine-grained control over storage tiers and replication.
/**
 * Flags for controlling the storage of an RDD.
 * Records whether to use memory, disk, or off-heap storage, whether to keep the data
 * serialized or deserialized, and how many replicas to maintain.
 * @param useDisk Whether to store RDD partitions on disk
 * @param useMemory Whether to store RDD partitions in memory
 * @param useOffHeap Whether to use off-heap memory storage
 * @param deserialized Whether to keep data in deserialized form
 * @param replication Number of replicas to maintain (default 1)
 */
@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1
) extends Externalizable {
  /** Returns whether this storage level uses disk storage */
  def useDisk: Boolean
  /** Returns whether this storage level uses memory storage */
  def useMemory: Boolean
  /** Returns whether this storage level uses off-heap memory */
  def useOffHeap: Boolean
  /** Returns whether data is stored in deserialized form */
  def deserialized: Boolean
  /** Returns the replication factor */
  def replication: Int
  /** Returns the memory mode (ON_HEAP or OFF_HEAP) */
  private[spark] def memoryMode: MemoryMode
  /** Returns whether this storage level is a valid configuration */
  def isValid: Boolean
  /** Returns a human-readable description of the storage configuration */
  def description: String
  /** Creates a copy of this storage level */
  override def clone(): StorageLevel
  /** Converts this storage level to its integer flag representation, used for serialization */
  def toInt: Int
}
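For intuition, the two derived members above can be read as simple functions of the flags. A hedged sketch of plausible semantics (these re-derivations are illustrative, not the actual implementation):
import org.apache.spark.memory.MemoryMode
// memoryMode plausibly follows the off-heap flag directly.
def memoryModeOf(useOffHeap: Boolean): MemoryMode =
  if (useOffHeap) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP
// A level is plausibly valid when it uses at least one storage tier
// and asks for a positive number of replicas.
def isValidLevel(useDisk: Boolean, useMemory: Boolean, replication: Int): Boolean =
  (useMemory || useDisk) && replication > 0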
Usage Examples:
import org.apache.spark.storage.StorageLevel
// Using predefined storage levels
val memoryOnly = StorageLevel.MEMORY_ONLY
val memoryAndDiskSer = StorageLevel.MEMORY_AND_DISK_SER
// Custom storage level creation
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  deserialized = false,
  replication = 2
)
// Checking storage level properties
if (customLevel.useMemory) {
  // memoryMode is private[spark]; shown here for illustration only
  println(s"Uses memory: ${customLevel.memoryMode}")
}
println(s"Configuration: ${customLevel.description}")
// Applying to RDDs (conceptual - would be used in a broader Spark context)
// rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
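The integer form produced by toInt (and consumed by the apply(flags, replication) factory below) packs the four boolean flags into the low bits of an Int. A minimal sketch of one plausible encoding, with bit positions following the flag declaration order (the encoding shown is illustrative):
// Hypothetical packing for toInt / apply(flags, replication): one bit per flag.
def packFlags(useDisk: Boolean, useMemory: Boolean,
              useOffHeap: Boolean, deserialized: Boolean): Int = {
  var ret = 0
  if (useDisk) ret |= 8
  if (useMemory) ret |= 4
  if (useOffHeap) ret |= 2
  if (deserialized) ret |= 1
  ret
}
// Under this encoding, MEMORY_AND_DISK (disk + memory + deserialized) is 8 | 4 | 1 = 13.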
Factory methods and predefined constants for common storage configurations.
object StorageLevel {
  /** Predefined storage levels for common use cases */
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val DISK_ONLY_3: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
  /** Creates a StorageLevel from its string representation */
  @DeveloperApi
  def fromString(s: String): StorageLevel
  /** Creates a StorageLevel with full parameter control */
  @DeveloperApi
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int
  ): StorageLevel
  /** Creates a StorageLevel without the off-heap parameter */
  @DeveloperApi
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      deserialized: Boolean,
      replication: Int = 1
  ): StorageLevel
  /** Creates a StorageLevel from integer flags and a replication factor */
  @DeveloperApi
  def apply(flags: Int, replication: Int): StorageLevel
  /** Reads a StorageLevel from an ObjectInput stream */
  @DeveloperApi
  def apply(in: ObjectInput): StorageLevel
}
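Because StorageLevel extends Externalizable, levels survive standard Java object serialization, which is what the apply(in: ObjectInput) factory reads back. A minimal round-trip sketch (no Spark cluster required):
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.storage.StorageLevel
// Write a level out and read it back; the flags and replication factor are preserved.
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(StorageLevel.MEMORY_AND_DISK_SER_2)
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val restored = in.readObject().asInstanceOf[StorageLevel]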
Usage Examples:
import org.apache.spark.storage.StorageLevel
// Using predefined levels
val diskOnly = StorageLevel.DISK_ONLY
val memoryAndDiskSer2 = StorageLevel.MEMORY_AND_DISK_SER_2
// Creating custom levels
val highReplication = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = true,
  replication = 3
)
val serializedMemory = StorageLevel(
  useDisk = false,
  useMemory = true,
  deserialized = false,
  replication = 2
)
// String-based creation
val fromString = StorageLevel.fromString("MEMORY_AND_DISK")
// Validation
if (highReplication.isValid) {
  println(s"Valid configuration: ${highReplication.description}")
}
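When the level name comes from user input or a job configuration, the parse is worth guarding. A small sketch, assuming fromString throws IllegalArgumentException for unrecognized names (the parseLevel helper is illustrative, not part of the API):
import scala.util.Try
import org.apache.spark.storage.StorageLevel
// Fall back to a safe default if the name does not match a predefined level.
def parseLevel(name: String): StorageLevel =
  Try(StorageLevel.fromString(name)).getOrElse(StorageLevel.MEMORY_AND_DISK)
parseLevel("MEMORY_ONLY_SER") // resolves to the predefined constant
parseLevel("NOT_A_LEVEL")     // falls back to MEMORY_AND_DISK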
Common storage configuration patterns for different use cases.
// Performance-oriented: fast access, memory only
val performanceLevel = StorageLevel.MEMORY_ONLY
// Reliability-oriented: fault tolerance via a second replica
val reliableLevel = StorageLevel.MEMORY_AND_DISK_2
// Memory-efficient: serialized storage to reduce the memory footprint
val efficientLevel = StorageLevel.MEMORY_ONLY_SER
// Large dataset: memory first, spilling to disk as needed
val largeDataLevel = StorageLevel.MEMORY_AND_DISK
// Fault-tolerant large dataset: serialized memory + disk, replicated twice
val faultTolerantLevel = StorageLevel.MEMORY_AND_DISK_SER_2
// Off-heap storage: avoid GC pressure
val offHeapLevel = StorageLevel.OFF_HEAP
// No persistence: recompute on demand
val noPersistence = StorageLevel.NONE
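Each predefined constant is simply a fixed flag combination. For intuition, MEMORY_AND_DISK_SER_2 should be equivalent to the explicit construction below (a sketch; the flag values are read off the naming convention):
import org.apache.spark.storage.StorageLevel
// _SER means serialized (deserialized = false); the _2 suffix means 2 replicas.
val explicit = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)
assert(explicit == StorageLevel.MEMORY_AND_DISK_SER_2)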
Storage levels integrate with Spark's memory management system.
/**
 * Memory storage mode enumeration.
 * Controls whether data is stored on-heap or off-heap.
 */
@Private
public enum MemoryMode {
  /** Standard JVM heap memory */
  ON_HEAP,
  /** Off-heap memory (e.g., using sun.misc.Unsafe) */
  OFF_HEAP
}
Usage Examples:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.memory.MemoryMode
val onHeapLevel = StorageLevel.MEMORY_ONLY
val offHeapLevel = StorageLevel.OFF_HEAP
// Check the memory mode (memoryMode is private[spark], so this pattern
// applies to Spark-internal code)
onHeapLevel.memoryMode match {
  case MemoryMode.ON_HEAP => println("Using JVM heap memory")
  case MemoryMode.OFF_HEAP => println("Using off-heap memory")
}
// Off-heap storage reduces GC pressure but requires cluster configuration
val offHeapConfig = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = true,
  deserialized = false, // off-heap data is always serialized
  replication = 1
)
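Off-heap levels only take effect when off-heap memory is enabled for the application. A minimal sketch using the standard configuration keys (the size shown is illustrative):
import org.apache.spark.SparkConf
// Without these settings, off-heap storage has no memory to allocate from.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g") // total off-heap memory per executor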
// Interactive/iterative workloads - prioritize speed
val interactiveLevel = StorageLevel.MEMORY_ONLY
// Production ETL - balance performance and reliability
val etlLevel = StorageLevel.MEMORY_AND_DISK_SER
// Large reference datasets - serialized for memory efficiency, replicated for availability
val referenceLevel = StorageLevel.MEMORY_ONLY_SER_2
// Temporary intermediate results - fast, and cheap to recompute if evicted
val temporaryLevel = StorageLevel.MEMORY_ONLY
// Critical datasets - maximum fault tolerance
val criticalLevel = StorageLevel.MEMORY_AND_DISK_2
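Tying the patterns back to a job, a minimal end-to-end sketch in local mode (the dataset and transformation are placeholders):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
val sc = new SparkContext(
  new SparkConf().setAppName("storage-level-demo").setMaster("local[*]"))
// Persist an intermediate RDD with the ETL-oriented level, reuse it twice,
// then release the cached blocks explicitly.
val doubled = sc.parallelize(1 to 1000000).map(_ * 2)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
println(doubled.count())
println(doubled.sum())
doubled.unpersist()
sc.stop()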