Storage and Caching

StorageLevel

Storage levels define how RDDs are persisted across memory, disk, and off-heap storage with various serialization and replication options.

class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int
) {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  
  def clone(): StorageLevel
  def writeExternal(out: ObjectOutput): Unit
  def readExternal(in: ObjectInput): Unit
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false, 1)
  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
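
The boolean flags map directly onto the accessors of the class above; inspecting a predefined level in the Spark shell shows how they combine:

import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK_SER
level.useMemory     // true
level.useDisk       // true
level.useOffHeap    // false
level.deserialized  // false: partitions are kept as serialized bytes
level.replication   // 1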

RDD Persistence Methods

Methods available on every RDD for controlling persistence and checkpointing behavior.

// From RDD[T]
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]  
def cache(): RDD[T]
def unpersist(blocking: Boolean = false): RDD[T]
def getStorageLevel: StorageLevel
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
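
A brief illustration of how these methods relate, assuming an existing SparkContext named sc: cache() is shorthand for persist() with the default MEMORY_ONLY level, and getStorageLevel reflects whatever level was assigned.

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("data.txt")

rdd.cache()   // same as rdd.persist(StorageLevel.MEMORY_ONLY)
assert(rdd.getStorageLevel == StorageLevel.MEMORY_ONLY)

rdd.unpersist(blocking = true)   // wait until the blocks are actually removed
assert(rdd.getStorageLevel == StorageLevel.NONE)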

BlockManager

Internal storage management system. It is not part of the stable public API, but a few methods are reachable for advanced use cases (see the sketch after the class outline).

class BlockManager(
  executorId: String,
  rpcEnv: RpcEnv,
  master: BlockManagerMaster,
  serializerManager: SerializerManager,
  conf: SparkConf,
  memoryManager: MemoryManager,
  mapOutputTracker: MapOutputTracker,
  shuffleManager: ShuffleManager,
  blockTransferService: BlockTransferService,
  securityManager: SecurityManager,
  numUsableCores: Int
) {
  // Public methods for advanced users
  def putSingle[T: ClassTag](blockId: BlockId, value: T, level: StorageLevel, tellMaster: Boolean = true): Boolean
  def putIterator[T: ClassTag](blockId: BlockId, values: Iterator[T], level: StorageLevel, tellMaster: Boolean = true): Boolean
  def getLocalValues(blockId: BlockId): Option[BlockResult]
  def getOrElseUpdate[T](blockId: BlockId, level: StorageLevel, classTag: ClassTag[T], makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]]
  def remove(blockId: BlockId, tellMaster: Boolean = true): Boolean
}
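
Direct use is rarely necessary and relies on developer-level APIs that may change between Spark releases. The sketch below assumes access through SparkEnv.get.blockManager and uses TestBlockId purely for illustration:

import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

val bm = SparkEnv.get.blockManager       // BlockManager of the current driver or executor
val id = TestBlockId("example-block")    // illustrative BlockId; real code would use e.g. RDDBlockId

bm.putSingle(id, "hello", StorageLevel.MEMORY_ONLY)           // store a single value locally
bm.getLocalValues(id).foreach(r => println(r.data.next()))    // read it back from the local store
bm.remove(id)                                                 // drop the block and tell the master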

Storage Configuration

Key configuration properties for storage and caching behavior.

Memory Management

  • spark.storage.memoryFraction - Fraction of JVM heap space used for storage cache (deprecated in favor of unified memory management)
  • spark.storage.safetyFraction - Safety fraction to prevent storage region from using entire heap
  • spark.storage.unrollFraction - Fraction of storage memory used for unrolling blocks
  • spark.storage.replication.proactive - Enable proactive block replication
  • spark.storage.blockManagerTimeoutIntervalMs - Timeout for block manager operations

Disk Storage

  • spark.storage.diskStore.subDirectories - Number of subdirectories for disk storage
  • spark.storage.localDirs - Directories for storing blocks on disk
  • spark.storage.localDirs.fallback - Fallback directory if localDirs not available

Off-Heap Storage

  • spark.memory.offHeap.enabled - Enable off-heap storage
  • spark.memory.offHeap.size - Size of the off-heap storage region (see the example below)
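
Both properties must be set together before the SparkContext is created. A minimal sketch (the 2g size is only an example):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("offheap-caching-example")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")   // must be > 0 when off-heap is enabled

val sc = new SparkContext(conf)

// With off-heap memory configured, StorageLevel.OFF_HEAP becomes usable
sc.parallelize(1 to 1000000).persist(StorageLevel.OFF_HEAP)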

Usage Examples

Basic Persistence

val expensiveData = sc.textFile("large-dataset.txt")
  .map(processExpensiveOperation)
  .filter(complexFilter)

// Cache for multiple uses
expensiveData.cache()

// Use multiple times (only computed once)
val count = expensiveData.count()
val sample = expensiveData.take(10)
val stats = expensiveData.map(_.length).stats()

// Clean up when done
expensiveData.unpersist()

Storage Level Selection

import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000)

// An RDD's storage level can only be assigned once; calling persist with a
// different level on an already-persisted RDD throws an exception. Treat the
// calls below as alternatives, and unpersist() first if you need to switch.

// Memory only (fastest access, but limited by available memory)
data.persist(StorageLevel.MEMORY_ONLY)

// Memory and disk (spills partitions to disk when memory is full)
data.persist(StorageLevel.MEMORY_AND_DISK)

// Serialized storage (more memory efficient, slower access)
data.persist(StorageLevel.MEMORY_ONLY_SER)

// With replication for fault tolerance
data.persist(StorageLevel.MEMORY_AND_DISK_2)

// Off-heap storage (requires off-heap memory configuration)
data.persist(StorageLevel.OFF_HEAP)

Checkpointing

// Set checkpoint directory (must be fault-tolerant storage like HDFS)
sc.setCheckpointDir("hdfs://namenode:port/checkpoint")

val iterativeData = sc.textFile("input.txt")
  .map(parseData)
  .filter(isValid)

// Checkpoint to break long lineage chains. Persisting first is recommended,
// since writing the checkpoint otherwise recomputes the RDD.
iterativeData.cache()
iterativeData.checkpoint()

// Force materialization to complete checkpoint
iterativeData.count()

// Now safe to use in iterative algorithms
var current = iterativeData
for (i <- 1 to 10) {
  current = current.map(iterativeFunction)
  if (i % 3 == 0) {
    current.checkpoint()
    current.count() // Force checkpoint
  }
}
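
Once the materializing action has run, the checkpoint can be confirmed with the accessors listed in the persistence methods section:

println(iterativeData.isCheckpointed)     // true once the checkpoint has been written
println(iterativeData.getCheckpointFile)  // Some(<path under the checkpoint directory>)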

Advanced Persistence Patterns

// Pattern 1: Conditional persistence based on data size
val rdd = sc.textFile("data.txt").map(transform)

// Rough size estimate in characters (assumes transform yields strings):
// sample the first 1000 records and extrapolate from the total record count.
// Note that both take() and count() trigger jobs before anything is persisted.
val estimatedSize = rdd.take(1000).map(_.length).sum * (rdd.count() / 1000.0)

val persistedRDD = if (estimatedSize > 1000000) {
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
} else {
  rdd.persist(StorageLevel.MEMORY_ONLY)
}

// Pattern 2: Layered persistence for different access patterns
val rawData = sc.textFile("data.txt")
val cleanedData = rawData.map(clean).persist(StorageLevel.MEMORY_AND_DISK)
// groupByKey requires key-value pairs, so clean must emit (key, value) tuples
val aggregatedData = cleanedData.groupByKey().persist(StorageLevel.MEMORY_ONLY)

// Use cleaned data for multiple transformations
val result1 = cleanedData.filter(condition1).collect()
val result2 = cleanedData.filter(condition2).collect()

// Use aggregated data for summary statistics
val summary1 = aggregatedData.mapValues(_.size).collect()
val summary2 = aggregatedData.mapValues(_.sum).collect() // assumes numeric values

Monitoring Storage Usage

// Get current storage level
val level = rdd.getStorageLevel
println(s"Storage level: $level")

// Check if RDD is cached
if (level != StorageLevel.NONE) {
  println("RDD is persisted")
}

// Monitor through Spark UI
// Access http://driver:4040 to see Storage tab with:
// - RDD storage levels
// - Memory usage
// - Disk usage  
// - Partition information
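
For programmatic monitoring, SparkContext also exposes getRDDStorageInfo (a developer API) with the same information as the Storage tab; a small sketch:

rdd.setName("expensive-data")   // optional, makes the RDD easier to identify

sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: level=${info.storageLevel}, " +
    s"cachedPartitions=${info.numCachedPartitions}, " +
    s"memory=${info.memSize} B, disk=${info.diskSize} B")
}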

Storage Level Guidelines

MEMORY_ONLY

  • Use for: Small to medium datasets that fit in memory
  • Best for: Frequently accessed data with fast transformations
  • Avoid if: Dataset is larger than available memory

MEMORY_AND_DISK

  • Use for: Large datasets with frequent access
  • Best for: Iterative algorithms, multiple actions on same RDD
  • Trade-off: Some operations may spill to disk

MEMORY_ONLY_SER

  • Use for: Memory-constrained environments
  • Best for: Large objects that compress well
  • Trade-off: CPU overhead for serialization/deserialization, which an efficient serializer such as Kryo reduces (see the sketch below)
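
The serialization cost can be reduced by configuring Kryo instead of Java serialization; a minimal sketch, where MyRecord stands in for your own classes:

import org.apache.spark.SparkConf

case class MyRecord(id: Long, text: String)   // illustrative record type

val conf = new SparkConf()
  .setAppName("serialized-cache-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyRecord]))   // optional registration improves Kryo efficiency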

DISK_ONLY

  • Use for: Very large datasets, limited memory
  • Best for: One-time processing of massive datasets
  • Trade-off: Slower access due to disk I/O

Replication (_2 variants)

  • Use for: Critical data requiring fault tolerance
  • Best for: Long-running applications in unreliable environments
  • Trade-off: Double storage cost, network overhead

OFF_HEAP

  • Use for: Very large datasets with configured off-heap storage
  • Best for: Reducing GC pressure in memory-intensive applications
  • Requirements: spark.memory.offHeap.enabled=true and a positive spark.memory.offHeap.size