Storage and Persistence

Storage levels and caching mechanisms that optimize repeated access to RDDs, with configurable memory and disk usage strategies.

Capabilities

StorageLevel

Configuration for how RDDs are stored when persisted, controlling memory usage, disk usage, serialization, and replication.

/**
 * Storage level configuration for RDD persistence
 * @param useDisk whether to use disk storage
 * @param useMemory whether to use memory storage
 * @param useOffHeap whether to use off-heap memory
 * @param deserialized whether to store in deserialized format
 * @param replication number of replicas to maintain
 */
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int
) extends Externalizable {
  
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean 
  def deserialized: Boolean
  def replication: Int
  
  /** Create a copy with different replication factor */
  def clone(newReplication: Int): StorageLevel
  
  /** Check if this storage level is valid */
  def isValid: Boolean
}

object StorageLevel {
  /** Memory only, deserialized */
  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
  
  /** Memory only, serialized */
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
  
  /** Memory and disk spillover, deserialized */
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
  
  /** Memory and disk spillover, serialized */
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
  
  /** Disk only */
  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
  
  /** Memory only with 2x replication, deserialized */
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  
  /** Memory and disk with 2x replication, deserialized */
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  
  /** Off-heap storage (serialized; memory and disk also enabled) */
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  
  /** No storage (unpersist) */
  val NONE = new StorageLevel(false, false, false, false, 1)
  
  /** Apply method for creating custom storage levels */
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean = false,
    deserialized: Boolean = true,
    replication: Int = 1
  ): StorageLevel
}
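
Beyond the predefined constants, the apply method shown above can build a custom level. The snippet below is a minimal sketch (it assumes an already-created SparkContext named sc) defining a serialized, disk-backed level with 2x replication:

import org.apache.spark.storage.StorageLevel

// Serialized on-heap storage with disk spillover and two replicas
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)

val numbers = sc.parallelize(1 to 1000)
numbers.persist(customLevel)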

RDD Persistence Methods

Methods available on RDD for controlling persistence and caching behavior.

abstract class RDD[T: ClassTag] {
  /** Persist RDD with specified storage level */
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): this.type
  
  /** Cache RDD in memory (shortcut for persist(MEMORY_ONLY)) */
  def cache(): this.type
  
  /** Remove persisted RDD from storage */
  def unpersist(blocking: Boolean = false): this.type
  
  /** Mark RDD for checkpointing to reliable storage */
  def checkpoint(): Unit
  
  /** Check if RDD is checkpointed */
  def isCheckpointed: Boolean
  
  /** Get checkpoint file if available */
  def getCheckpointFile: Option[String]
  
  /** Get current storage level */
  def getStorageLevel: StorageLevel
  
  /** Check if RDD is cached */
  def isCached: Boolean = getStorageLevel != StorageLevel.NONE
}
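
Persistence is lazy: persist and cache only record the desired storage level, and the data is materialized the first time an action runs. A minimal sketch (assuming a live SparkContext sc and a hypothetical input file data.txt):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt").map(_.toUpperCase)

lines.persist(StorageLevel.MEMORY_AND_DISK) // only marks the RDD; nothing is stored yet
println(lines.getStorageLevel)              // the assigned (non-NONE) level
println(lines.isCached)                     // true as soon as a level is assigned

lines.count()                               // first action materializes and caches the partitions
lines.unpersist(blocking = true)            // synchronously remove the cached blocks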

BlockManager

Core storage management component handling data blocks across the cluster.

/**
 * Manager for reading and writing data blocks
 */
class BlockManager(
  executorId: String,
  rpcEnv: RpcEnv,
  master: BlockManagerMaster,
  serializerManager: SerializerManager,
  conf: SparkConf,
  memoryManager: MemoryManager,
  mapOutputTracker: MapOutputTracker,
  shuffleManager: ShuffleManager,
  blockTransferService: BlockTransferService,
  securityManager: SecurityManager,
  numUsableCores: Int
) extends BlockDataManager with BlockEvictionHandler with Logging {
  
  /** Initialize block manager */
  def initialize(appId: String): Unit
  
  /** Get local block data */
  def getBlockData(blockId: BlockId): ManagedBuffer
  
  /** Put block data */
  def putBlockData(
    blockId: BlockId,
    data: ManagedBuffer,
    level: StorageLevel,
    classTag: ClassTag[_]
  ): Boolean
  
  /** Get block from local storage or remote */
  def get[T: ClassTag](blockId: BlockId): Option[BlockResult]
  
  /** Put block into storage */
  def putSingle[T: ClassTag](
    blockId: BlockId,
    value: T,
    level: StorageLevel,
    tellMaster: Boolean = true
  ): Boolean
  
  /** Put iterator of values into storage */
  def putIterator[T: ClassTag](
    blockId: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    tellMaster: Boolean = true
  ): Boolean
  
  /** Remove block from storage */
  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit
  
  /** Get memory status */
  def memoryStatus: Map[BlockId, (StorageLevel, Long, Long)]
  
  /** Get disk usage */
  def diskBlockSize(blockId: BlockId): Long
}
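
BlockManager itself runs inside each executor and is not normally called from user code; a driver-side summary of what the block managers currently hold is available through SparkContext.getRDDStorageInfo. A sketch (assuming a live SparkContext sc):

val doubled = sc.parallelize(1 to 100000).map(_ * 2)
doubled.cache()
doubled.count() // materialize the blocks

sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}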

Block Identifiers

Type-safe identifiers for the different kinds of data blocks managed by the BlockManager.

/**
 * Base class for block identifiers
 */
sealed abstract class BlockId {
  def name: String
  def asRDDId: Option[RDDBlockId] = None
}

/** Block identifier for RDD blocks */
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = s"rdd_${rddId}_$splitIndex"
  override def asRDDId: Option[RDDBlockId] = Some(this)
}

/** Block identifier for shuffle blocks */
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
}

/** Block identifier for shuffle data blocks */
case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.data"
}

/** Block identifier for shuffle index blocks */  
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.index"
}

/** Block identifier for broadcast blocks */
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  override def name: String = s"broadcast_${broadcastId}${if (field.nonEmpty) "_" + field else ""}"
}

/** Block identifier for task result blocks */
case class TaskResultBlockId(taskId: Long) extends BlockId {
  override def name: String = s"taskresult_$taskId"
}

/** Block identifier for stream blocks */
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  override def name: String = s"input-${streamId}-$uniqueId"
}
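
Block identifiers are ordinary case classes, and the name field is what appears in logs and as on-disk block file names. A short sketch using the definitions above:

import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId}

val rddBlock = RDDBlockId(rddId = 5, splitIndex = 3)
println(rddBlock.name)    // rdd_5_3
println(rddBlock.asRDDId) // Some(RDDBlockId(5,3))

println(ShuffleBlockId(shuffleId = 2, mapId = 7L, reduceId = 1).name) // shuffle_2_7_1
println(BroadcastBlockId(broadcastId = 0, field = "piece0").name)     // broadcast_0_piece0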

Checkpointing

Mechanism for saving RDD data to reliable storage and truncating its lineage for fault tolerance.

/**
 * Checkpoint data management
 */
abstract class CheckpointData[T: ClassTag](rdd: RDD[T]) extends Serializable {
  /** Get checkpoint state */
  def cpState: CheckpointState
  
  /** Get checkpoint RDD */
  def checkpointRDD: Option[CheckpointRDD[T]]
  
  /** Materialize checkpoint */
  def checkpoint(): Unit
  
  /** Check if checkpointed */
  def isCheckpointed: Boolean
  
  /** Get checkpoint file */
  def getCheckpointDir: Option[String]
}

/**
 * Checkpoint states
 */
object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

/**
 * RDD representing checkpointed data
 */
abstract class CheckpointRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
  /** Get checkpoint directory */
  def getCheckpointDir: String
}

/**
 * Reliable checkpoint RDD (HDFS, S3, etc.)
 */
class ReliableCheckpointRDD[T: ClassTag](
  sc: SparkContext,
  checkpointPath: String,
  partitioner: Option[Partitioner] = None
) extends CheckpointRDD[T](sc)

/**
 * Local checkpoint RDD (local filesystem)
 */
class LocalCheckpointRDD[T: ClassTag](
  sc: SparkContext,
  checkpointPath: String,
  originalRDD: RDD[T],
  partitioner: Option[Partitioner] = None
) extends CheckpointRDD[T](sc)
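
Reliable checkpointing writes the RDD's data to the directory set with sc.setCheckpointDir and is only materialized by the next action; localCheckpoint() is a faster alternative that uses executor-local storage at the cost of fault tolerance. A minimal sketch (the path is illustrative):

sc.setCheckpointDir("hdfs:///checkpoints")

val reliable = sc.parallelize(1 to 1000).map(_ + 1)
reliable.checkpoint()                 // mark for reliable checkpointing
reliable.count()                      // action writes the checkpoint files
println(reliable.isCheckpointed)      // true
println(reliable.getCheckpointFile)   // Some(<path under the checkpoint dir>)

val local = sc.parallelize(1 to 1000).map(_ * 2)
local.localCheckpoint()               // executor-local, faster, not fault tolerant
local.count()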

Memory Management

Components managing memory allocation for caching and execution.

/**
 * Abstract memory manager for Spark
 */
abstract class MemoryManager(
  conf: SparkConf,
  numCores: Int,
  onHeapStorageMemory: Long,
  onHeapExecutionMemory: Long
) extends Logging {
  
  /** Maximum memory available for storage */
  def maxOnHeapStorageMemory: Long
  def maxOffHeapStorageMemory: Long
  
  /** Acquire memory for storage */
  def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode
  ): Boolean
  
  /** Acquire memory for execution */
  def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode
  ): Long
  
  /** Release storage memory */
  def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
  
  /** Release execution memory */
  def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode
  ): Unit
  
  /** Get current storage memory usage */
  def storageMemoryUsed: Long
  
  /** Get current execution memory usage */
  def executionMemoryUsed: Long
}

/**
 * Memory allocation modes
 */
object MemoryMode extends Enumeration {
  type MemoryMode = Value
  val ON_HEAP, OFF_HEAP = Value
}

/**
 * Unified memory manager (default in Spark 1.6+)
 */
class UnifiedMemoryManager(
  conf: SparkConf,
  maxHeapMemory: Long,
  onHeapStorageRegionSize: Long,
  numCores: Int
) extends MemoryManager(
  conf,
  numCores,
  onHeapStorageRegionSize,
  maxHeapMemory - onHeapStorageRegionSize
)
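
The storage/execution split managed by UnifiedMemoryManager is tuned through standard SparkConf settings rather than by instantiating the manager directly. The values below are illustrative defaults:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Memory Tuning Example")
  // Fraction of (heap - 300MB reserved) shared by execution and storage (default 0.6)
  .set("spark.memory.fraction", "0.6")
  // Portion of that region protected from eviction for storage (default 0.5)
  .set("spark.memory.storageFraction", "0.5")
  // Off-heap allocation, required for the OFF_HEAP storage level
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "1g")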

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Storage Example").setMaster("local[*]"))

// Create RDD
val data = sc.parallelize(1 to 1000000)
val processed = data.map(_ * 2).filter(_ > 100)

// Choose one persistence strategy; a storage level can only be assigned once
// (call unpersist() first to change it):
//   StorageLevel.MEMORY_ONLY      - cache in memory only, deserialized
//   StorageLevel.MEMORY_AND_DISK  - memory with disk spillover
//   StorageLevel.DISK_ONLY        - disk only storage
//   StorageLevel.MEMORY_ONLY_SER  - serialized in memory
processed.persist(StorageLevel.MEMORY_AND_DISK)

// The cache() shortcut is equivalent to persist(StorageLevel.MEMORY_ONLY)

// Use cached RDD multiple times
val count1 = processed.count()
val count2 = processed.filter(_ < 1000).count() // Reuses cache

// Checkpointing for fault tolerance
sc.setCheckpointDir("hdfs:///checkpoints")
processed.checkpoint()
processed.count() // Triggers checkpoint

// Check storage status
println(s"Storage level: ${processed.getStorageLevel}")
println(s"Is cached: ${processed.isCached}")
println(s"Is checkpointed: ${processed.isCheckpointed}")

// Cleanup
processed.unpersist()
sc.stop()

Performance Considerations

  • MEMORY_ONLY: Fastest access, but limited by available RAM; partitions that do not fit are recomputed on demand
  • MEMORY_AND_DISK: Good balance, with automatic spillover of partitions that do not fit in memory
  • DISK_ONLY: Slower access, but handles datasets larger than memory
  • Serialized formats: More compact in memory, but cost CPU time to deserialize on each access
  • Replication: Improves fault tolerance at the cost of extra storage space
  • Checkpointing: Truncates long lineage chains, reducing recomputation after failures (see the sketch below)
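
The sketch below combines several of these points for an iterative job: serialized memory-and-disk caching for a large dataset plus periodic checkpointing to keep the lineage short (the checkpoint path, iteration count, and data are illustrative, and a live SparkContext sc is assumed):

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///checkpoints")

var current = sc.parallelize(1 to 1000000).map(_.toLong)

for (i <- 1 to 30) {
  val previous = current
  current = current.map(_ + 1)
  current.persist(StorageLevel.MEMORY_AND_DISK_SER) // compact, spills to disk when needed
  if (i % 10 == 0) current.checkpoint()             // truncate the lineage every 10 iterations
  current.count()                                   // action materializes cache and checkpoint
  previous.unpersist()                              // release the previous iteration's blocks
}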