Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.
—
Spark provides fine-grained control over RDD caching and persistence strategies across memory and disk, enabling optimization for different data access patterns and cluster configurations.
StorageLevel defines how and where RDDs are stored when persisted.
/**
 * StorageLevel defines how and where RDD blocks are stored when persisted:
 * on disk and/or in memory, on- or off-heap, serialized or deserialized,
 * and with how many replicas.
 */
class StorageLevel private (
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int) {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  def clone(): StorageLevel
  // A level is valid when it stores somewhere (memory or disk) and replication >= 1.
  def isValid: Boolean
  // Compact bit-flag encoding used together with `replication` for serialization.
  def toInt: Int
  def writeExternal(out: ObjectOutput): Unit
  def readExternal(in: ObjectInput): Unit
}

/** Predefined storage levels plus factory/parse methods. */
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
  def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean, replication: Int): StorageLevel
  // Parses names such as "MEMORY_ONLY" or "MEMORY_AND_DISK_SER_2".
  def fromString(s: String): StorageLevel
}

| Storage Level | Memory | Disk | Serialized | Replication | Use Case |
|---|---|---|---|---|---|
| MEMORY_ONLY | Yes | No | No | 1 | Fast access, sufficient memory |
| MEMORY_ONLY_SER | Yes | No | Yes | 1 | Memory-constrained, CPU available |
| MEMORY_AND_DISK | Yes | Yes | No | 1 | Fallback to disk when memory full |
| MEMORY_AND_DISK_SER | Yes | Yes | Yes | 1 | Memory + CPU constrained |
| DISK_ONLY | No | Yes | Yes | 1 | Large datasets, infrequent access |
| MEMORY_ONLY_2 | Yes | No | No | 2 | Fast access + fault tolerance |
| MEMORY_AND_DISK_2 | Yes | Yes | No | 2 | Balanced performance + fault tolerance |
| OFF_HEAP | Off-heap | No | Yes | 1 | Avoid GC overhead |
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Persistence Example").setMaster("local[*]"))

// Create an expensive RDD
val expensiveRDD = sc.textFile("large-dataset.txt")
  .filter(_.nonEmpty)
  .map(complexTransformation)
  .filter(complexFilter)

// Cache in memory (shorthand for persist(StorageLevel.MEMORY_ONLY))
val cachedRDD = expensiveRDD.cache()

// Explicit persistence with a custom storage level.
// NOTE: an RDD's storage level can only be assigned once - calling persist()
// with a different level on the already-cached `expensiveRDD` would throw
// UnsupportedOperationException. Build a separate lineage (or unpersist()
// first) to use a different level.
val persistedRDD = sc.textFile("large-dataset.txt")
  .filter(_.nonEmpty)
  .map(complexTransformation)
  .filter(complexFilter)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Use the cached RDD multiple times - computation happens only once
val count1 = cachedRDD.count()
val count2 = cachedRDD.filter(_.contains("error")).count()
val sample = cachedRDD.take(10)

// Check storage level
println(s"Storage level: ${cachedRDD.getStorageLevel}")

// Remove from cache when no longer needed
cachedRDD.unpersist()

// High-frequency access, sufficient memory
val frequentlyUsedRDD = inputRDD
  .map(preprocessData)
  .persist(StorageLevel.MEMORY_ONLY)

// Large dataset with memory constraints
val largeRDD = inputRDD
  .flatMap(expandData)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Critical data requiring fault tolerance (replicated to 2 nodes)
val criticalRDD = inputRDD
  .map(importantTransformation)
  .persist(StorageLevel.MEMORY_AND_DISK_2)

// Infrequently accessed large dataset
val archivalRDD = inputRDD
  .map(heavyProcessing)
  .persist(StorageLevel.DISK_ONLY)

// Avoiding GC pressure for long-lived RDDs
val longLivedRDD = inputRDD
  .map(createLargeObjects)
  .persist(StorageLevel.OFF_HEAP)

Checkpointing saves RDD data to reliable storage (like HDFS) to truncate lineage and improve fault tolerance.
// Set checkpoint directory (must be fault-tolerant storage)
sc.setCheckpointDir("hdfs://namenode:port/checkpoints")

val longLineageRDD = sc.textFile("input.txt")
  .map(transformation1)
  .filter(filter1)
  .map(transformation2)
  .filter(filter2)
  .map(transformation3)
  .filter(filter3)
  .map(transformation4)

// Mark for checkpointing (takes effect at the next action)
longLineageRDD.checkpoint()

// Trigger checkpointing with an action
val count = longLineageRDD.count()

// Verify checkpointing
println(s"Is checkpointed: ${longLineageRDD.isCheckpointed}")
println(s"Checkpoint file: ${longLineageRDD.getCheckpointFile}")

// The RDD lineage is now truncated - subsequent failures will recover from checkpoint
val furtherProcessed = longLineageRDD.map(additionalTransformation)

// Local checkpointing (persists to local executor disk)
// localCheckpoint() trades fault tolerance for speed: data is written to
// executor-local storage, so an executor loss makes the RDD unrecoverable.
val rddToCheckpoint = expensiveComputationRDD.localCheckpoint()

// Trigger checkpointing
rddToCheckpoint.count()

// Use the checkpointed RDD
val results = rddToCheckpoint.map(finalTransformation).collect()

// Checkpoint at strategic points in long pipelines
val pipeline = sc.textFile("massive-dataset.txt")
  .map(parseRecord)
  .filter(isValid)
  .map(enrichWithLookup)
  .filter(passesQualityCheck)

// Checkpoint after expensive preprocessing.
// NOTE: RDD.checkpoint() returns Unit - it marks the RDD in place rather
// than returning a new RDD - so keep using the same reference afterwards.
pipeline.checkpoint()
pipeline.count() // Materialize checkpoint

// Continue pipeline from checkpoint
val aggregated = pipeline
  .map(extractFeatures)
  .groupByKey()
  .mapValues(aggregate)

// Checkpoint again before final processing
aggregated.checkpoint()
aggregated.count() // Materialize second checkpoint

val finalResults = aggregated
  .map(finalTransformation)
  .collect()

/**
 * Persists `rdd` with a storage level chosen by comparing its estimated
 * size against the memory available for caching:
 *  - comfortably fits in memory (< 70% of it)  -> MEMORY_ONLY
 *  - roughly fits when serialized (< 150%)     -> MEMORY_AND_DISK_SER
 *  - otherwise                                 -> DISK_ONLY
 */
def smartPersist[T](rdd: RDD[T], estimatedSize: Long, memoryAvailable: Long): RDD[T] = {
  val storageLevel = if (estimatedSize < memoryAvailable * 0.7) {
    StorageLevel.MEMORY_ONLY
  } else if (estimatedSize < memoryAvailable * 1.5) {
    StorageLevel.MEMORY_AND_DISK_SER
  } else {
    StorageLevel.DISK_ONLY
  }
  rdd.persist(storageLevel)
}
// Usage
val processedRDD = inputRDD.map(expensiveTransformation)
// NOTE(review): count() materializes the whole RDD just to estimate its
// size - acceptable for an example, costly on a real pipeline.
val estimatedSize = processedRDD.count() * averageRecordSizeBytes
val smartPersistedRDD = smartPersist(processedRDD, estimatedSize, availableMemoryBytes)

class CacheManager(sc: SparkContext) {
private val cachedRDDs = mutable.Map[String, RDD[_]]()
/**
 * Persists `rdd` at `level` and registers it under `name`, first evicting
 * one cached entry when cluster storage memory usage exceeds 80%.
 */
def cacheWithEviction[T](name: String, rdd: RDD[T], level: StorageLevel): RDD[T] = {
  // Free up space before registering a new entry under memory pressure.
  val underPressure = getMemoryUsage() > 0.8
  if (underPressure) {
    evictLeastRecentlyUsed()
  }
  val persisted = rdd.persist(level)
  cachedRDDs(name) = persisted
  persisted
}
/** Unpersists and deregisters the RDD cached under `name`, if present. */
def uncache(name: String): Unit =
  cachedRDDs.remove(name).foreach(_.unpersist())
/**
 * Fraction of total executor storage memory currently in use, in [0, 1].
 * getExecutorMemoryStatus maps executor id -> (max memory, remaining memory).
 */
private def getMemoryUsage(): Double = {
  val memoryStatus = sc.getExecutorMemoryStatus
  val totalMemory = memoryStatus.values.map(_._1).sum
  val usedMemory = memoryStatus.values.map { case (max, remaining) => max - remaining }.sum
  // Guard against division by zero before any executor has registered.
  if (totalMemory == 0L) 0.0 else usedMemory.toDouble / totalMemory
}
// Evicts one cached entry to relieve memory pressure.
// NOTE(review): despite the name, this removes an *arbitrary* entry -
// mutable.Map iteration order is unspecified and access times are not
// tracked - so it is not a true LRU eviction. Track access order (e.g.
// with a LinkedHashMap) or rename to match actual behavior.
private def evictLeastRecentlyUsed(): Unit = {
// Implementation to evict based on access patterns
cachedRDDs.headOption.foreach { case (name, rdd) =>
rdd.unpersist()
cachedRDDs.remove(name)
}
}
}

/**
 * Coarse access-frequency categories used to pick a storage level.
 * (scala.Enumeration is kept for source compatibility; a sealed trait
 * hierarchy would give compiler-checked exhaustive matches instead.)
 */
object DataTemperature extends Enumeration {
  type DataTemperature = Value
  val HOT, WARM, COLD = Value
}
import DataTemperature._
/** Maps a data "temperature" (expected access frequency) to an RDD storage level. */
class TemperatureAwareStorage(sc: SparkContext) {
  /** Persists `rdd` with a level appropriate for how often it will be read. */
  def persist[T](rdd: RDD[T], temperature: DataTemperature): RDD[T] = {
    val level = temperature match {
      case HOT  => StorageLevel.MEMORY_ONLY          // Frequently accessed
      case WARM => StorageLevel.MEMORY_AND_DISK_SER  // Occasionally accessed
      case COLD => StorageLevel.DISK_ONLY            // Rarely accessed
    }
    rdd.persist(level)
  }
}
// Usage
val storage = new TemperatureAwareStorage(sc)
val hotData = storage.persist(frequentlyUsedRDD, HOT)
val warmData = storage.persist(occasionallyUsedRDD, WARM)
val coldData = storage.persist(archivalRDD, COLD)

// Configure storage memory fraction in SparkConf
val conf = new SparkConf()
  .setAppName("Storage Optimization")
  .set("spark.storage.memoryFraction", "0.6") // 60% for storage (legacy)
  .set("spark.storage.unrollMemoryFraction", "0.2") // 20% for unrolling (legacy)
  .set("spark.storage.memoryMapThreshold", "2m") // Memory map files > 2MB
  .set("spark.storage.blockManagerSlaveTimeoutMs", "120s") // Block manager timeout

// Unified memory manager (Spark 1.6+). The legacy fractions above are
// ignored unless spark.memory.useLegacyMode is set to true.
val modernConf = new SparkConf()
  .set("spark.memory.useLegacyMode", "false")
  .set("spark.memory.storageFraction", "0.5") // 50% of heap for storage

// Use Kryo serialization for better performance
// Named kryoConf to avoid redefining the `conf` declared in the snippet above.
val kryoConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.unsafe", "true")
  .set("spark.kryoserializer.buffer.max", "1g")

// Register classes for better Kryo performance
kryoConf.registerKryoClasses(Array(
  classOf[MyCustomClass],
  classOf[AnotherClass]
))

val sc = new SparkContext(kryoConf)

// Prefer serialized storage for large objects
val serializedRDD = largeObjectRDD.persist(StorageLevel.MEMORY_ONLY_SER)

// Configure block sizes for different access patterns
// Named blockManagerConf to avoid redefining `conf` from the earlier snippets.
val blockManagerConf = new SparkConf()
  .set("spark.storage.blockManagerPort", "0")
  .set("spark.storage.blockManagerHeartbeatMs", "10s")
  .set("spark.storage.getBlockTimeoutMs", "60s")

// For streaming workloads
val streamingConf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", "128k") // Smaller threshold
  .set("spark.storage.unrollMemoryThreshold", "1m") // Smaller unroll buffer

// For batch workloads with large blocks
val batchConf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", "8m") // Larger threshold
  .set("spark.storage.unrollMemoryThreshold", "16m") // Larger unroll buffer

// Check RDD storage information
/** Prints caching and checkpoint details for a single RDD. */
def printStorageInfo(rdd: RDD[_]): Unit = {
  println(s"RDD ${rdd.id} Storage Info:")
  // RDD.name is null unless setName() was called; avoid printing "null".
  println(s"  Name: ${Option(rdd.name).getOrElse("<unnamed>")}")
  println(s"  Storage Level: ${rdd.getStorageLevel}")
  println(s"  Is Cached: ${rdd.getStorageLevel != StorageLevel.NONE}")
  println(s"  Is Checkpointed: ${rdd.isCheckpointed}")
  println(s"  Partitions: ${rdd.partitions.length}")
  if (rdd.isCheckpointed) {
    println(s"  Checkpoint File: ${rdd.getCheckpointFile}")
  }
}
// Check storage status across cluster
/** Prints per-executor storage memory totals, usage, and usage percentage. */
def printClusterStorageStatus(sc: SparkContext): Unit = {
  // executor id -> (max memory, remaining memory)
  val status = sc.getExecutorMemoryStatus
  println("Cluster Storage Status:")
  status.foreach { case (executorId, (maxMemory, remainingMemory)) =>
    val usedMemory = maxMemory - remainingMemory
    // Guard against a zero-sized memory report to avoid division by zero.
    val usagePercent = if (maxMemory == 0L) 0.0 else (usedMemory.toDouble / maxMemory) * 100
    println(f"  Executor $executorId:")
    println(f"    Total Memory: ${maxMemory / (1024 * 1024)}%,d MB")
    println(f"    Used Memory: ${usedMemory / (1024 * 1024)}%,d MB")
    println(f"    Usage: $usagePercent%.1f%%")
  }
}

class CacheMonitor(sc: SparkContext) {
/** Snapshot of (id, name, storage level) for every currently persisted RDD. */
def getCachedRDDs(): Array[(Int, String, StorageLevel)] = {
  val persisted = sc.getPersistentRDDs.values
  persisted.map(r => (r.id, r.name, r.getStorageLevel)).toArray
}
/** Unpersists every RDD currently registered as persistent. */
def clearAllCache(): Unit = {
  for (rdd <- sc.getPersistentRDDs.values) rdd.unpersist()
}
/**
 * Unpersists every named, persisted RDD whose name matches `namePattern`
 * (as a regex found anywhere in the name). Returns how many were cleared.
 */
def clearCacheByName(namePattern: String): Int = {
  val pattern = namePattern.r
  // RDD.name is null unless explicitly set, so filter those out first.
  val matching = sc.getPersistentRDDs.values.filter { rdd =>
    rdd.name != null && pattern.findFirstIn(rdd.name).isDefined
  }
  matching.foreach(_.unpersist())
  matching.size
}
/** Number of persisted RDDs per storage level. */
def getStorageStats(): Map[StorageLevel, Int] = {
  // Build a strict Map: mapValues returns a lazy view that re-evaluates
  // on every access (and is deprecated in Scala 2.13).
  sc.getPersistentRDDs.values
    .groupBy(_.getStorageLevel)
    .map { case (level, rdds) => level -> rdds.size }
}
}
// Usage
val monitor = new CacheMonitor(sc)

// Before processing
println("Cached RDDs before processing:")
monitor.getCachedRDDs().foreach { case (id, name, level) =>
  println(s"  RDD $id ($name): $level")
}

// Process data with caching
val results = processDataWithCaching()

// After processing
println("\nStorage statistics:")
monitor.getStorageStats().foreach { case (level, count) =>
  println(s"  $level: $count RDDs")
}

// Cleanup
monitor.clearAllCache()

// Good practices
val optimizedRDD = inputRDD
  .filter(isRelevant) // Filter early to reduce data size
  .map(lightweightTransform) // Apply cheap transformations first
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist before expensive operations
  .map(expensiveTransform) // Apply expensive operations after persistence

// Avoid anti-patterns
val inefficientRDD = inputRDD
  .persist(StorageLevel.MEMORY_ONLY) // Persisting before filtering
  .filter(isRelevant) // Could reduce memory pressure first
  .map(heavyTransformation) // Heavy operation on larger dataset

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-11