tessl/maven-org-apache-spark--spark-core-2-11

Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.

Storage and Persistence

Spark provides fine-grained control over RDD caching and persistence strategies across memory and disk, enabling optimization for different data access patterns and cluster configurations.

StorageLevel

StorageLevel defines how and where RDDs are stored when persisted.

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int) {
  
  def useDisk: Boolean
  def useMemory: Boolean  
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  def clone(): StorageLevel
  def isValid: Boolean
  def toInt: Int
  def writeExternal(out: ObjectOutput): Unit
  def readExternal(in: ObjectInput): Unit
}

Predefined Storage Levels

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
  
  def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean, replication: Int): StorageLevel
  def fromString(s: String): StorageLevel
}
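Predefined levels can be looked up by name or rebuilt from the factory method; a minimal sketch (assumes `spark-core` is on the classpath):

```scala
import org.apache.spark.storage.StorageLevel

// Look a level up by its constant name
// (fromString throws IllegalArgumentException for unknown names)
val byName = StorageLevel.fromString("MEMORY_AND_DISK_SER")

// Equivalent construction via apply:
// (useDisk, useMemory, useOffHeap, deserialized, replication)
val byFlags = StorageLevel(useDisk = true, useMemory = true,
  useOffHeap = false, deserialized = false, replication = 1)

assert(byName == byFlags)
```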

Storage Level Characteristics

| Storage Level | Memory | Disk | Serialized | Replication | Use Case |
|---|---|---|---|---|---|
| MEMORY_ONLY | Yes | No | No | 1 | Fast access, sufficient memory |
| MEMORY_ONLY_SER | Yes | No | Yes | 1 | Memory-constrained, CPU available |
| MEMORY_AND_DISK | Yes | Yes | No | 1 | Fallback to disk when memory is full |
| MEMORY_AND_DISK_SER | Yes | Yes | Yes | 1 | Memory and CPU constrained |
| DISK_ONLY | No | Yes | Yes | 1 | Large datasets, infrequent access |
| MEMORY_ONLY_2 | Yes | No | No | 2 | Fast access plus fault tolerance |
| MEMORY_AND_DISK_2 | Yes | Yes | No | 2 | Balanced performance plus fault tolerance |
| OFF_HEAP | Off-heap | No | Yes | 1 | Avoid GC overhead |

Persistence Operations

Basic Persistence

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Persistence Example").setMaster("local[*]"))

// Create an expensive RDD
val expensiveRDD = sc.textFile("large-dataset.txt")
  .filter(_.nonEmpty)
  .map(complexTransformation)
  .filter(complexFilter)

// Cache in memory (shorthand for MEMORY_ONLY)
val cachedRDD = expensiveRDD.cache()

// Explicit persistence with custom storage level
val persistedRDD = expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Use the cached RDD multiple times - computation happens only once
val count1 = cachedRDD.count()
val count2 = cachedRDD.filter(_.contains("error")).count()
val sample = cachedRDD.take(10)

// Check storage level
println(s"Storage level: ${cachedRDD.getStorageLevel}")

// Remove from cache when no longer needed
cachedRDD.unpersist()

Persistence Strategies by Use Case

// High-frequency access, sufficient memory
val frequentlyUsedRDD = inputRDD
  .map(preprocessData)
  .persist(StorageLevel.MEMORY_ONLY)

// Large dataset with memory constraints  
val largeRDD = inputRDD
  .flatMap(expandData)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Critical data requiring fault tolerance
val criticalRDD = inputRDD
  .map(importantTransformation)
  .persist(StorageLevel.MEMORY_AND_DISK_2)

// Infrequently accessed large dataset
val archivalRDD = inputRDD
  .map(heavyProcessing)
  .persist(StorageLevel.DISK_ONLY)

// Avoiding GC pressure for long-lived RDDs
val longLivedRDD = inputRDD
  .map(createLargeObjects)
  .persist(StorageLevel.OFF_HEAP)

Checkpointing

Checkpointing saves RDD data to reliable storage (like HDFS) to truncate lineage and improve fault tolerance.

Basic Checkpointing

// Set checkpoint directory (must be fault-tolerant storage)
sc.setCheckpointDir("hdfs://namenode:port/checkpoints")

val longLineageRDD = sc.textFile("input.txt")
  .map(transformation1)
  .filter(filter1)
  .map(transformation2)
  .filter(filter2)
  .map(transformation3)
  .filter(filter3)
  .map(transformation4)

// Mark for checkpointing (Spark recommends persisting first, because the
// separate checkpoint job would otherwise recompute the whole lineage)
longLineageRDD.persist(StorageLevel.MEMORY_AND_DISK)
longLineageRDD.checkpoint()

// Trigger checkpointing with an action
val count = longLineageRDD.count()

// Verify checkpointing
println(s"Is checkpointed: ${longLineageRDD.isCheckpointed}")
println(s"Checkpoint file: ${longLineageRDD.getCheckpointFile}")

// The RDD lineage is now truncated - subsequent failures will recover from checkpoint
val furtherProcessed = longLineageRDD.map(additionalTransformation)

Local Checkpointing

// Local checkpointing (persists to executor-local storage; faster than
// reliable checkpointing, but data is lost if an executor fails)
val rddToCheckpoint = expensiveComputationRDD.localCheckpoint()

// Trigger checkpointing
rddToCheckpoint.count()

// Use the checkpointed RDD
val results = rddToCheckpoint.map(finalTransformation).collect()

Strategic Checkpointing

// Checkpoint at strategic points in long pipelines
val pipeline = sc.textFile("massive-dataset.txt")
  .map(parseRecord)
  .filter(isValid)
  .map(enrichWithLookup)
  .filter(passesQualityCheck)

// Checkpoint after expensive preprocessing
// (checkpoint() returns Unit, so keep working with the same RDD)
pipeline.persist(StorageLevel.MEMORY_AND_DISK) // avoid recomputing during the checkpoint job
pipeline.checkpoint()
pipeline.count() // Materialize checkpoint

// Continue pipeline from checkpoint
val aggregated = pipeline
  .map(extractFeatures)
  .groupByKey()
  .mapValues(aggregate)

// Checkpoint again before final processing
aggregated.checkpoint()
aggregated.count()

val finalResults = aggregated
  .map(finalTransformation)
  .collect()

Advanced Persistence Patterns

Conditional Persistence

import org.apache.spark.rdd.RDD

def smartPersist[T](rdd: RDD[T], estimatedSize: Long, memoryAvailable: Long): RDD[T] = {
  val storageLevel = if (estimatedSize < memoryAvailable * 0.7) {
    StorageLevel.MEMORY_ONLY
  } else if (estimatedSize < memoryAvailable * 1.5) {
    StorageLevel.MEMORY_AND_DISK_SER
  } else {
    StorageLevel.DISK_ONLY
  }
  
  rdd.persist(storageLevel)
}

// Usage (note: count() computes the RDD once before it is persisted,
// so this sizing step itself has a cost)
val processedRDD = inputRDD.map(expensiveTransformation)
val estimatedSize = processedRDD.count() * averageRecordSizeBytes
val smartPersistedRDD = smartPersist(processedRDD, estimatedSize, availableMemoryBytes)

Multi-level Caching Strategy

import scala.collection.mutable

class CacheManager(sc: SparkContext) {
  private val cachedRDDs = mutable.Map[String, RDD[_]]()
  
  def cacheWithEviction[T](name: String, rdd: RDD[T], level: StorageLevel): RDD[T] = {
    // Evict old cache if memory is getting full
    if (getMemoryUsage() > 0.8) {
      evictLeastRecentlyUsed()
    }
    
    val cached = rdd.persist(level)
    cachedRDDs(name) = cached
    cached
  }
  
  def uncache(name: String): Unit = {
    cachedRDDs.get(name).foreach(_.unpersist())
    cachedRDDs.remove(name)
  }
  
  private def getMemoryUsage(): Double = {
    // Implementation to check memory usage
    val memoryStatus = sc.getExecutorMemoryStatus
    val totalMemory = memoryStatus.values.map(_._1).sum
    val usedMemory = memoryStatus.values.map(m => m._1 - m._2).sum
    usedMemory.toDouble / totalMemory
  }
  
  private def evictLeastRecentlyUsed(): Unit = {
    // Implementation to evict based on access patterns
    cachedRDDs.headOption.foreach { case (name, rdd) =>
      rdd.unpersist()
      cachedRDDs.remove(name)
    }
  }
}

Temperature-based Storage

object DataTemperature extends Enumeration {
  type DataTemperature = Value
  val HOT, WARM, COLD = Value
}

import DataTemperature._

class TemperatureAwareStorage(sc: SparkContext) {
  def persist[T](rdd: RDD[T], temperature: DataTemperature): RDD[T] = {
    val storageLevel = temperature match {
      case HOT  => StorageLevel.MEMORY_ONLY        // Frequently accessed
      case WARM => StorageLevel.MEMORY_AND_DISK_SER // Occasionally accessed  
      case COLD => StorageLevel.DISK_ONLY          // Rarely accessed
    }
    rdd.persist(storageLevel)
  }
}

// Usage
val storage = new TemperatureAwareStorage(sc)

val hotData = storage.persist(frequentlyUsedRDD, HOT)
val warmData = storage.persist(occasionallyUsedRDD, WARM)  
val coldData = storage.persist(archivalRDD, COLD)

Performance Optimization

Memory Fraction Configuration

// Configure storage memory fraction in SparkConf
val conf = new SparkConf()
  .setAppName("Storage Optimization")
  .set("spark.storage.memoryFraction", "0.6")         // 60% for storage (legacy)
  .set("spark.storage.unrollMemoryFraction", "0.2")   // 20% for unrolling (legacy)
  .set("spark.storage.memoryMapThreshold", "2m")      // Memory map files > 2MB
  .set("spark.storage.blockManagerSlaveTimeoutMs", "120s") // Block manager timeout

// Unified memory manager (default since Spark 1.6)
val modernConf = new SparkConf()
  .set("spark.memory.useLegacyMode", "false")
  .set("spark.memory.storageFraction", "0.5")         // 50% of the unified memory region for storage

Serialization Optimization

// Use Kryo serialization for better performance
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.unsafe", "true")
  .set("spark.kryoserializer.buffer.max", "1g")

// Register classes for better Kryo performance
conf.registerKryoClasses(Array(
  classOf[MyCustomClass],
  classOf[AnotherClass]
))

val sc = new SparkContext(conf)

// Prefer serialized storage for large objects
val serializedRDD = largeObjectRDD.persist(StorageLevel.MEMORY_ONLY_SER)

Block Size Optimization

// Configure block manager networking and timeouts
val conf = new SparkConf()
  .set("spark.storage.blockManagerPort", "0")
  .set("spark.storage.blockManagerHeartbeatMs", "10s")
  .set("spark.storage.getBlockTimeoutMs", "60s")

// For streaming workloads
val streamingConf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", "128k")    // Smaller threshold
  .set("spark.storage.unrollMemoryThreshold", "1m")  // Smaller unroll buffer

// For batch workloads with large blocks
val batchConf = new SparkConf() 
  .set("spark.storage.memoryMapThreshold", "8m")     // Larger threshold
  .set("spark.storage.unrollMemoryThreshold", "16m") // Larger unroll buffer

Monitoring and Debugging

Storage Information

// Check RDD storage information
def printStorageInfo(rdd: RDD[_]): Unit = {
  println(s"RDD ${rdd.id} Storage Info:")
  println(s"  Name: ${rdd.name}")
  println(s"  Storage Level: ${rdd.getStorageLevel}")
  println(s"  Is Cached: ${rdd.getStorageLevel != StorageLevel.NONE}")
  println(s"  Is Checkpointed: ${rdd.isCheckpointed}")
  println(s"  Partitions: ${rdd.partitions.length}")
  
  if (rdd.isCheckpointed) {
    println(s"  Checkpoint File: ${rdd.getCheckpointFile}")
  }
}

// Check storage status across cluster
def printClusterStorageStatus(sc: SparkContext): Unit = {
  val status = sc.getExecutorMemoryStatus
  
  println("Cluster Storage Status:")
  status.foreach { case (executorId, (maxMemory, remainingMemory)) =>
    val usedMemory = maxMemory - remainingMemory
    val usagePercent = (usedMemory.toDouble / maxMemory) * 100
    
    println(f"  Executor $executorId:")
    println(f"    Total Memory: ${maxMemory / (1024 * 1024)}%,d MB")
    println(f"    Used Memory: ${usedMemory / (1024 * 1024)}%,d MB")
    println(f"    Usage: $usagePercent%.1f%%")
  }
}

Cache Management Utilities

class CacheMonitor(sc: SparkContext) {
  def getCachedRDDs(): Array[(Int, String, StorageLevel)] = {
    sc.getPersistentRDDs.values.map { rdd =>
      (rdd.id, rdd.name, rdd.getStorageLevel)
    }.toArray
  }
  
  def clearAllCache(): Unit = {
    sc.getPersistentRDDs.values.foreach(_.unpersist())
  }
  
  def clearCacheByName(namePattern: String): Int = {
    val regex = namePattern.r
    var cleared = 0
    
    sc.getPersistentRDDs.values.foreach { rdd =>
      if (rdd.name != null && regex.findFirstIn(rdd.name).isDefined) {
        rdd.unpersist()
        cleared += 1
      }
    }
    cleared
  }
  
  def getStorageStats(): Map[StorageLevel, Int] = {
    sc.getPersistentRDDs.values
      .groupBy(_.getStorageLevel)
      .mapValues(_.size)
  }
}

// Usage
val monitor = new CacheMonitor(sc)

// Before processing
println("Cached RDDs before processing:")
monitor.getCachedRDDs().foreach { case (id, name, level) =>
  println(s"  RDD $id ($name): $level")
}

// Process data with caching
val results = processDataWithCaching()

// After processing
println("\nStorage statistics:")
monitor.getStorageStats().foreach { case (level, count) =>
  println(s"  $level: $count RDDs")
}

// Cleanup
monitor.clearAllCache()

Best Practices

When to Persist

  • RDD is used multiple times in the application
  • RDD has expensive computation (complex transformations, joins)
  • RDD has many dependencies (long lineage)
  • Recovery time would be significant

When to Checkpoint

  • Very long RDD lineages (> 10 transformations)
  • Wide transformations that are expensive to recompute
  • Before expensive iterative algorithms
  • When using RDDs across multiple Spark applications
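For iterative algorithms, a common pattern is to checkpoint every N iterations so the lineage stays bounded. A sketch under stated assumptions: `step`, the initial RDD, and the iteration counts are placeholders, and `sc.setCheckpointDir` must already have been called:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def iterateWithCheckpoints[T](initial: RDD[T], step: RDD[T] => RDD[T],
                              iterations: Int, checkpointEvery: Int = 10): RDD[T] = {
  var current = initial
  for (i <- 1 to iterations) {
    current = step(current)                         // each step extends the lineage
    if (i % checkpointEvery == 0) {
      current.persist(StorageLevel.MEMORY_AND_DISK) // avoid recomputation during the checkpoint job
      current.checkpoint()                          // truncate the lineage
      current.count()                               // action to materialize the checkpoint
    }
  }
  current
}
```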

Storage Level Selection

  • MEMORY_ONLY: Fast access, sufficient memory, objects don't need serialization
  • MEMORY_ONLY_SER: Memory limited, CPU available, objects benefit from serialization
  • MEMORY_AND_DISK: Balanced approach, fallback for memory pressure
  • MEMORY_AND_DISK_SER: Memory and CPU constrained, good balance
  • DISK_ONLY: Very large datasets, infrequent access
  • _2 variants: When fault tolerance is critical and cluster is unreliable
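The selection rules above can be encoded as a small helper; this is an illustration only, not part of Spark, and it deliberately collapses the guidance into three coarse flags:

```scala
// Illustration only: encodes the rules of thumb above as the name of a
// predefined level, suitable for passing to StorageLevel.fromString.
def suggestLevelName(fitsInMemory: Boolean,
                     cpuToSpare: Boolean,
                     needReplication: Boolean): String = {
  val base =
    if (fitsInMemory) "MEMORY_ONLY"            // plenty of memory: keep objects deserialized
    else if (cpuToSpare) "MEMORY_AND_DISK_SER" // trade CPU for a smaller footprint
    else "MEMORY_AND_DISK"                     // let Spark spill without extra serialization cost
  if (needReplication) base + "_2" else base   // _2 variants replicate each partition
}

println(suggestLevelName(fitsInMemory = true,  cpuToSpare = false, needReplication = false))
println(suggestLevelName(fitsInMemory = false, cpuToSpare = true,  needReplication = true))
```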

Performance Tips

// Good practices
val optimizedRDD = inputRDD
  .filter(isRelevant)           // Filter early to reduce data size
  .map(lightweightTransform)    // Apply cheap transformations first
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist before expensive operations
  .map(expensiveTransform)      // Apply expensive operations after persistence

// Avoid anti-patterns
val inefficientRDD = inputRDD
  .persist(StorageLevel.MEMORY_ONLY) // Persisting before filtering
  .filter(isRelevant)               // Could reduce memory pressure first
  .map(heavyTransformation)         // Heavy operation on larger dataset

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-core-2-11
