Storage Management

RDD storage level management system for controlling persistence strategies across memory, disk, and off-heap storage with configurable replication and serialization options.

Capabilities

StorageLevel Class

Main class for configuring RDD persistence behavior with fine-grained control over storage tiers and replication.

/**
 * Flags for controlling the storage of an RDD.
 * Records whether to use memory, disk, or off-heap storage, whether to keep
 * the data deserialized, and the replication factor.
 * @param useDisk Whether to store RDD partitions on disk
 * @param useMemory Whether to store RDD partitions in memory
 * @param useOffHeap Whether to use off-heap memory storage
 * @param deserialized Whether to keep data in deserialized form
 * @param replication Number of replicas to maintain (default 1)
 */
@DeveloperApi
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean, 
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int = 1
) extends Externalizable {

  /** Returns whether this storage level uses disk storage */
  def useDisk: Boolean
  
  /** Returns whether this storage level uses memory storage */
  def useMemory: Boolean
  
  /** Returns whether this storage level uses off-heap memory */
  def useOffHeap: Boolean
  
  /** Returns whether data is stored in deserialized form */
  def deserialized: Boolean
  
  /** Returns the replication factor */
  def replication: Int
  
  /** Returns the memory mode (ON_HEAP or OFF_HEAP) */
  private[spark] def memoryMode: MemoryMode
  
  /** Validates that the storage level configuration is valid */
  def isValid: Boolean
  
  /** Returns human-readable description of the storage configuration */
  def description: String
  
  /** Creates a copy of this storage level */
  override def clone(): StorageLevel
  
  /** Converts storage level to integer representation for serialization */
  def toInt: Int
}

Usage Examples:

import org.apache.spark.storage.StorageLevel

// Using predefined storage levels
val memoryOnly = StorageLevel.MEMORY_ONLY
val memoryAndDiskSer = StorageLevel.MEMORY_AND_DISK_SER

// Custom storage level creation
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true, 
  deserialized = false,
  replication = 2
)

// Checking storage level properties (memoryMode is private[spark],
// so user code inspects useOffHeap instead)
if (customLevel.useMemory) {
  println(s"Uses memory, off-heap: ${customLevel.useOffHeap}")
}
println(s"Configuration: ${customLevel.description}")

// Applying to RDDs (conceptual - would be used in broader Spark context)
// rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

StorageLevel Companion Object

Factory methods and predefined constants for common storage configurations.

object StorageLevel {
  /** Predefined storage levels for common use cases */
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel  
  val DISK_ONLY_2: StorageLevel
  val DISK_ONLY_3: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel

  /** Create StorageLevel from string representation */
  @DeveloperApi
  def fromString(s: String): StorageLevel
  
  /** Create StorageLevel with full parameter control */
  @DeveloperApi
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean, 
    deserialized: Boolean,
    replication: Int
  ): StorageLevel
  
  /** Create StorageLevel without off-heap parameter */
  @DeveloperApi  
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    deserialized: Boolean,
    replication: Int = 1
  ): StorageLevel
  
  /** Create StorageLevel from integer flags and replication */
  @DeveloperApi
  def apply(flags: Int, replication: Int): StorageLevel
  
  /** Create StorageLevel from ObjectInput stream */
  @DeveloperApi
  def apply(in: ObjectInput): StorageLevel
}

Usage Examples:

import org.apache.spark.storage.StorageLevel

// Using predefined levels
val diskOnly = StorageLevel.DISK_ONLY
val memoryAndDiskSer = StorageLevel.MEMORY_AND_DISK_SER_2

// Creating custom levels
val highReplication = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = true,
  replication = 3
)

val serializedMemory = StorageLevel(
  useDisk = false,
  useMemory = true, 
  deserialized = false,
  replication = 2
)

// String-based creation
val fromString = StorageLevel.fromString("MEMORY_AND_DISK")

// Validation
if (highReplication.isValid) {
  println(s"Valid configuration: ${highReplication.description}")
}
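
toInt packs the boolean flags into a compact integer; together with the replication factor it fully determines a level. A minimal sketch of the round trip through the apply(flags, replication) factory:

import org.apache.spark.storage.StorageLevel

// Round-trip a level through its integer flag form plus replication
val original = StorageLevel.MEMORY_AND_DISK_SER
val restored = StorageLevel(original.toInt, original.replication)
assert(restored == original)  // equality compares flags and replication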

Storage Level Patterns

Common storage configuration patterns for different use cases.

// Performance-oriented: Fast access, memory preferred
val performanceLevel = StorageLevel.MEMORY_ONLY

// Reliability-oriented: Fault tolerance with replication  
val reliableLevel = StorageLevel.MEMORY_AND_DISK_2

// Memory-efficient: Serialized storage to reduce memory footprint
val efficientLevel = StorageLevel.MEMORY_ONLY_SER

// Large dataset: Disk storage with memory caching
val largeDataLevel = StorageLevel.MEMORY_AND_DISK

// Fault-tolerant large dataset: Disk + memory with replication
val faultTolerantLevel = StorageLevel.MEMORY_AND_DISK_SER_2

// Off-heap storage: Avoid GC pressure
val offHeapLevel = StorageLevel.OFF_HEAP

// No persistence: Recompute on demand
val noPersistence = StorageLevel.NONE
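
These patterns take effect when a level is passed to RDD.persist. A minimal sketch, assuming an active SparkContext named sc:

// Assumes an active SparkContext `sc`
val rdd = sc.parallelize(1 to 1000000).map(_ * 2)
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.count()      // first action materializes the cached partitions
rdd.unpersist()  // release the storage when no longer needed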

Memory Mode Integration

Storage levels integrate with Spark's memory management system.

/**
 * Memory storage mode enumeration
 * Controls whether data is stored on-heap or off-heap
 */
@Private
public enum MemoryMode {
  /** Standard JVM heap memory */
  ON_HEAP,
  
  /** Off-heap memory (e.g., using sun.misc.Unsafe) */
  OFF_HEAP
}

Usage Examples:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.memory.MemoryMode

val onHeapLevel = StorageLevel.MEMORY_ONLY
val offHeapLevel = StorageLevel.OFF_HEAP

// memoryMode is private[spark], so this match only compiles from
// Spark-internal code; user code can infer the mode from useOffHeap
onHeapLevel.memoryMode match {
  case MemoryMode.ON_HEAP => println("Using JVM heap memory")
  case MemoryMode.OFF_HEAP => println("Using off-heap memory")
}

// Off-heap storage reduces GC pressure but requires off-heap memory to be
// enabled and sized (spark.memory.offHeap.enabled, spark.memory.offHeap.size)
val offHeapConfig = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = true,
  deserialized = false,  // Off-heap is always serialized
  replication = 1
)

Storage Strategy Selection

Performance Considerations

  • MEMORY_ONLY: Fastest access; partitions that do not fit are recomputed on demand
  • MEMORY_ONLY_SER: Smaller memory footprint, with CPU overhead for serialization
  • MEMORY_AND_DISK: Spills partitions to disk automatically when memory is full
  • DISK_ONLY: Slowest access, but capacity is limited only by disk space
  • OFF_HEAP: Avoids GC pressure; requires off-heap memory to be enabled and sized
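
As a rough illustration of these trade-offs, a hypothetical helper (not part of Spark) that maps two workload questions onto a level:

// Hypothetical helper: pick a level from rough workload characteristics
def chooseLevel(fitsInMemory: Boolean, cheapToRecompute: Boolean): StorageLevel =
  if (fitsInMemory) StorageLevel.MEMORY_ONLY                // fastest path
  else if (cheapToRecompute) StorageLevel.MEMORY_ONLY_SER   // compact; evicted partitions recomputed
  else StorageLevel.MEMORY_AND_DISK_SER                     // spill to disk instead of recomputing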

Fault Tolerance Considerations

  • Replication Factor: Higher replication increases fault tolerance but consumes proportionally more storage
  • Disk Storage: Survives memory pressure; blocks lost with a failed node are recomputed from lineage
  • Memory + Disk: Balances performance and reliability
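
The _2 constants are simply the replication = 2 variants of their base levels. A sketch showing the equivalence via the full apply factory:

// MEMORY_AND_DISK_2 is MEMORY_AND_DISK with a second replica
val twoReplicas = StorageLevel(
  useDisk = true, useMemory = true, useOffHeap = false,
  deserialized = true, replication = 2
)
assert(twoReplicas == StorageLevel.MEMORY_AND_DISK_2)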

Memory Management Considerations

  • Deserialized: Faster access but higher memory usage
  • Serialized: Compact storage but CPU overhead for access
  • Off-heap: Avoids GC impact but requires serialized format
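
To see the first two trade-offs directly, the same memory-only tier can be built in either representation (a minimal sketch):

// Same tier, different in-memory representation
val fastAccess = StorageLevel(useDisk = false, useMemory = true,
  deserialized = true)   // equivalent to MEMORY_ONLY
val compact = StorageLevel(useDisk = false, useMemory = true,
  deserialized = false)  // equivalent to MEMORY_ONLY_SER
println(fastAccess.description)
println(compact.description)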

Selection Guidelines

// Interactive/iterative workloads - prioritize speed
val interactiveLevel = StorageLevel.MEMORY_ONLY

// Production ETL - balance performance and reliability  
val etlLevel = StorageLevel.MEMORY_AND_DISK_SER

// Large reference datasets - compact serialized storage with a second replica
val referenceLevel = StorageLevel.MEMORY_ONLY_SER_2

// Temporary intermediate results - fast but can recompute
val temporaryLevel = StorageLevel.MEMORY_ONLY

// Critical datasets - maximum fault tolerance
val criticalLevel = StorageLevel.MEMORY_AND_DISK_2