Storage and Persistence

Storage levels and caching mechanisms for persisting RDDs across memory and disk, with configurable serialization and replication.

Capabilities

StorageLevel

Defines how RDD partitions are stored, including memory usage, disk usage, and replication settings.

/**
 * Storage level for RDD persistence, defining memory/disk usage and replication
 */
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int) extends Externalizable {
  
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  
  override def toString: String = {
    "StorageLevel(%s, %s, %s, %s, %d)".format(
      _useDisk, _useMemory, _useOffHeap, _deserialized, _replication)
  }
}

object StorageLevel {
  // Predefined storage levels
  val NONE = new StorageLevel(false, false, false, false, 1)
  
  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  
  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  
  /** Create a new StorageLevel with custom settings */
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int): StorageLevel = {
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
  }
}
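
As the toString implementation above suggests, every predefined constant is just a particular combination of the five flags. A quick sketch, based only on the definitions shown here, that prints a few of them:

println(StorageLevel.MEMORY_ONLY)           // StorageLevel(false, true, false, true, 1)
println(StorageLevel.MEMORY_AND_DISK_SER_2) // StorageLevel(true, true, false, false, 2)
println(StorageLevel.DISK_ONLY)             // StorageLevel(true, false, false, false, 1)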

RDD Persistence Methods

Methods available on RDD for controlling persistence and caching behavior.

// RDD persistence methods
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(storageLevel: StorageLevel): RDD[T]
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def unpersist(blocking: Boolean = true): RDD[T]

// Query persistence status
def getStorageLevel: StorageLevel
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]

// Checkpointing methods
def checkpoint(): Unit
def localCheckpoint(): RDD[T]

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Storage Example").setMaster("local[*]"))

val data = sc.textFile("hdfs://large-dataset.txt")

// Basic caching (MEMORY_ONLY)
val cachedData = data.cache()

// Explicit persistence with different storage levels.
// An RDD's storage level can only be assigned once, so each example below
// persists its own freshly loaded RDD instead of re-persisting `data`.
def load() = sc.textFile("hdfs://large-dataset.txt")

val memoryOnlyData = load().persist(StorageLevel.MEMORY_ONLY)
val memoryAndDiskData = load().persist(StorageLevel.MEMORY_AND_DISK)
val diskOnlyData = load().persist(StorageLevel.DISK_ONLY)
val serializedData = load().persist(StorageLevel.MEMORY_ONLY_SER)

// Replication for fault tolerance
val replicatedData = load().persist(StorageLevel.MEMORY_ONLY_2) // 2 replicas

// Off-heap storage (requires off-heap memory configuration)
val offHeapData = load().persist(StorageLevel.OFF_HEAP)
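//   OFF_HEAP only takes effect when off-heap memory is enabled on the executors,
//   e.g. (illustrative values, not recommendations):
//   spark.memory.offHeap.enabled=true and spark.memory.offHeap.size=2g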

// Custom storage level
val customStorage = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false, // serialized for memory efficiency
  replication = 2
)
val customPersistedData = data.persist(customStorage)

// Use persisted data multiple times
val wordCounts = cachedData
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

val lineCount = cachedData.count()
val charCount = cachedData.map(_.length).sum()

// Clean up when done
cachedData.unpersist()
memoryAndDiskData.unpersist(blocking = false) // Non-blocking cleanup

Checkpointing

Fault-tolerance mechanism that saves RDD data to reliable storage for recovery.

// SparkContext checkpointing methods
def setCheckpointDir(directory: String): Unit
def getCheckpointDir: Option[String]

// RDD checkpointing methods  
def checkpoint(): Unit
def localCheckpoint(): RDD[T]
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Checkpoint Example"))

// Set checkpoint directory (should be on reliable storage like HDFS)
sc.setCheckpointDir("hdfs://namenode:port/spark-checkpoints")

val data = sc.textFile("hdfs://input-data.txt")

// Long chain of transformations
val processedData = data
  .filter(_.nonEmpty)
  .map(_.toLowerCase)
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .filter(_._2 > 5)
  .map(_.swap) // (count, word)
  .sortByKey(ascending = false)
  .map(_.swap) // back to (word, count)

// Checkpoint to truncate the lineage so recovery never has to recompute the full chain
processedData.checkpoint()

// Trigger checkpointing (requires an action)
processedData.count()

// Verify checkpointing
println(s"Is checkpointed: ${processedData.isCheckpointed}")
println(s"Checkpoint file: ${processedData.getCheckpointFile}")

// Use checkpointed RDD
val topWords = processedData.take(100)
val totalUniqueWords = processedData.count()

// Local checkpointing: faster because data stays on executor-local storage,
// but less fault-tolerant than a reliable checkpoint
val shortChain = data.map(_.toUpperCase)
val localCheckpointed = shortChain.localCheckpoint()
localCheckpointed.count() // Trigger local checkpointing
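
Spark's API documentation recommends persisting an RDD before checkpointing it: the checkpoint write runs as a separate job, and without a cache it recomputes the whole lineage. A minimal sketch of that ordering, reusing `data` from above (the transformation chain is illustrative):

import org.apache.spark.storage.StorageLevel

val expensiveRDD = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK) // keep computed partitions around
expensiveRDD.checkpoint()                          // mark for reliable checkpointing
expensiveRDD.count()                               // action fills the cache, then the checkpoint job reads from it

expensiveRDD.unpersist()                           // the checkpoint remains after the cache is dropped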

Storage Optimization Strategies

Memory Management

// Monitor storage usage
val largeDataset = sc.textFile("hdfs://very-large-file.txt")

// Use serialized storage for memory efficiency
val efficientStorage = largeDataset.persist(StorageLevel.MEMORY_ONLY_SER)

// Check storage memory usage programmatically
def printStorageInfo(sc: SparkContext): Unit = {
  // Map of executor address -> (max memory available for caching, remaining memory)
  sc.getExecutorMemoryStatus.foreach { case (executor, (maxMemory, remainingMemory)) =>
    println(s"Executor $executor: " +
      s"${(maxMemory - remainingMemory) / (1024 * 1024)}MB used, " +
      s"${maxMemory / (1024 * 1024)}MB total")
  }
}

// Use the storage info
efficientStorage.count()
printStorageInfo(sc)
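
How much memory is actually available for caching is governed by Spark's unified memory manager settings. A hedged configuration sketch; the values are illustrative, not recommendations:

import org.apache.spark.SparkConf

val tunedConf = new SparkConf()
  .setAppName("Storage Tuning Example")
  .set("spark.executor.memory", "4g")         // JVM heap per executor
  .set("spark.memory.fraction", "0.6")        // share of heap used for execution + storage
  .set("spark.memory.storageFraction", "0.5") // portion of that region protected from eviction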

Adaptive Storage Strategies

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def adaptiveStorage[T](rdd: RDD[T], estimatedSize: Long): RDD[T] = {
  val memoryFraction = 0.6 // Assume 60% of executor memory is available for storage
  val executorMemory = rdd.sparkContext.getConf.getSizeAsBytes("spark.executor.memory", "1g")
  val availableMemory = (executorMemory * memoryFraction).toLong
  
  val storageLevel = if (estimatedSize < availableMemory) {
    StorageLevel.MEMORY_ONLY
  } else if (estimatedSize < availableMemory * 2) {
    StorageLevel.MEMORY_ONLY_SER
  } else if (estimatedSize < availableMemory * 5) {
    StorageLevel.MEMORY_AND_DISK_SER
  } else {
    StorageLevel.DISK_ONLY
  }
  
  println(s"Using storage level: $storageLevel for estimated size: ${estimatedSize / (1024 * 1024)}MB")
  rdd.persist(storageLevel)
}

// Usage
val dataSize = estimateRDDSize(largeDataset) // Custom function to estimate size
val optimizedRDD = adaptiveStorage(largeDataset, dataSize)

Multi-Stage Processing with Checkpointing

// Complex ETL pipeline with strategic checkpointing.
// parseRecord, enrichRecord, aggregateRecords and threshold stand in for
// application-specific logic.
def processLargeDataset(inputPath: String, outputPath: String): Unit = {
  sc.setCheckpointDir("hdfs://checkpoints/etl-pipeline")
  
  // Stage 1: Initial data loading and cleaning
  val rawData = sc.textFile(inputPath)
  val cleanedData = rawData
    .filter(_.nonEmpty)
    .filter(!_.startsWith("#")) // Remove comments
    .map(_.trim)
    .cache() // Cache cleaned data for reuse
  
  val recordCount = cleanedData.count()
  println(s"Loaded and cleaned $recordCount records")
  
  // Stage 2: Complex transformations
  val transformedData = cleanedData
    .map(parseRecord) // Complex parsing
    .filter(_.isValid) // Remove invalid records
    .map(enrichRecord) // Add external data
    .groupBy(_.category)
    .mapValues(aggregateRecords) // Complex aggregation
  
  // Checkpoint after expensive transformations
  transformedData.checkpoint()
  val checkpointCount = transformedData.count() // Trigger checkpointing
  println(s"Checkpointed $checkpointCount transformed records")
  
  // Stage 3: Final processing using checkpointed data
  val finalResults = transformedData
    .filter(_._2.score > threshold)
    .sortBy(_._2.score, ascending = false)
    .take(1000)
  
  // Clean up cached data
  cleanedData.unpersist()
  
  // Save results
  sc.parallelize(finalResults).saveAsTextFile(outputPath)
}

Performance Monitoring

import org.apache.spark.storage.RDDInfo

def monitorRDDStorage(sc: SparkContext): Unit = {
  val rddInfos = sc.getRDDStorageInfo
  
  println("RDD Storage Information:")
  println("=" * 50)
  
  rddInfos.foreach { rddInfo =>
    println(s"RDD ID: ${rddInfo.id}")
    println(s"Name: ${rddInfo.name}")
    println(s"Storage Level: ${rddInfo.storageLevel}")
    println(s"Cached Partitions: ${rddInfo.numCachedPartitions}/${rddInfo.numPartitions}")
    println(s"Memory Size: ${rddInfo.memSize / (1024 * 1024)} MB")
    println(s"Disk Size: ${rddInfo.diskSize / (1024 * 1024)} MB")
    println("-" * 30)
  }
}
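
// Example usage (assuming some RDDs have already been persisted)
monitorRDDStorage(sc)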

// Custom storage metrics
case class StorageMetrics(
  rddId: Int,
  memoryUsed: Long,
  diskUsed: Long,
  partitionsCached: Int,
  totalPartitions: Int
)

def collectStorageMetrics(sc: SparkContext): List[StorageMetrics] = {
  sc.getRDDStorageInfo.map { rddInfo =>
    StorageMetrics(
      rddId = rddInfo.id,
      memoryUsed = rddInfo.memSize,
      diskUsed = rddInfo.diskSize,
      partitionsCached = rddInfo.numCachedPartitions,
      totalPartitions = rddInfo.numPartitions
    )
  }.toList
}

// Usage in application
val beforeMetrics = collectStorageMetrics(sc)
// ... perform operations ...
val afterMetrics = collectStorageMetrics(sc)

// Compare metrics
println("Storage usage changes:")
afterMetrics.foreach { after =>
  beforeMetrics.find(_.rddId == after.rddId) match {
    case Some(before) =>
      val memoryChange = after.memoryUsed - before.memoryUsed
      val diskChange = after.diskUsed - before.diskUsed
      println(s"RDD ${after.rddId}: Memory: ${memoryChange / (1024 * 1024)}MB, Disk: ${diskChange / (1024 * 1024)}MB")
    case None =>
      println(s"New RDD ${after.rddId}: Memory: ${after.memoryUsed / (1024 * 1024)}MB, Disk: ${after.diskUsed / (1024 * 1024)}MB")
  }
}

Best Practices

When to Use Different Storage Levels

// 1. MEMORY_ONLY: Small datasets, frequently accessed
val smallFrequentData = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)

// 2. MEMORY_ONLY_SER: Larger datasets, CPU available for deserialization
val mediumData = sc.textFile("medium-file.txt").persist(StorageLevel.MEMORY_ONLY_SER)

// 3. MEMORY_AND_DISK: Important datasets, some memory pressure
val importantData = sc.textFile("important-file.txt").persist(StorageLevel.MEMORY_AND_DISK)

// 4. DISK_ONLY: Very large datasets, limited memory
val hugeData = sc.textFile("huge-file.txt").persist(StorageLevel.DISK_ONLY)

// 5. Replication levels: Critical data in fault-prone environments
val criticalData = sc.textFile("critical-file.txt").persist(StorageLevel.MEMORY_AND_DISK_2)

Efficient Cache Management

// Cache management pattern
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{RDDInfo, StorageLevel}

class CacheManager(sc: SparkContext) {
  private var cachedRDDs = List.empty[RDD[_]]
  
  def cacheRDD[T](rdd: RDD[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RDD[T] = {
    val cachedRDD = rdd.persist(storageLevel)
    cachedRDDs = cachedRDD :: cachedRDDs
    cachedRDD
  }
  
  def unpersistAll(): Unit = {
    cachedRDDs.foreach(_.unpersist())
    cachedRDDs = List.empty
  }
  
  def getStorageInfo: List[RDDInfo] = {
    cachedRDDs.flatMap { rdd =>
      sc.getRDDStorageInfo.find(_.id == rdd.id)
    }
  }
}

// Usage
val cacheManager = new CacheManager(sc)

val data1 = cacheManager.cacheRDD(sc.textFile("file1.txt"))
val data2 = cacheManager.cacheRDD(sc.textFile("file2.txt"), StorageLevel.MEMORY_ONLY_SER)

// Process data...

// Clean up all cached RDDs
cacheManager.unpersistAll()

Checkpointing Strategy

// Intelligent checkpointing based on lineage depth
// (assumes sc.setCheckpointDir(...) has already been called)
import org.apache.spark.rdd.RDD

def smartCheckpoint[T](rdd: RDD[T], maxLineageDepth: Int = 10): RDD[T] = {
  // Depth of the longest dependency chain behind this RDD
  def countLineageDepth(rdd: RDD[_]): Int = {
    rdd.dependencies
      .map(dep => countLineageDepth(dep.rdd) + 1)
      .foldLeft(0)(math.max)
  }
  
  val depth = countLineageDepth(rdd)
  if (depth > maxLineageDepth) {
    rdd.checkpoint()
    println(s"Checkpointing RDD with lineage depth: $depth")
  }
  rdd
}

// Usage in complex pipeline
val result = inputData
  .map(transform1)
  .filter(filter1)
  .map(transform2)
  .groupByKey()
  .map(transform3) // This might have deep lineage
  
val checkpointedResult = smartCheckpoint(result)
checkpointedResult.count() // Trigger checkpointing if needed

// Continue processing with potentially checkpointed RDD
val finalResult = checkpointedResult
  .filter(finalFilter)
  .sortBy(_._2)