tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R

Caching and Persistence

Caching and persistence are crucial optimization techniques in Spark. They let you store intermediate RDD results in memory, on disk, or both, so they are not recomputed on every action, which dramatically improves performance for iterative algorithms and interactive analysis.
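The recomputation cost that caching avoids can be sketched with a pure-Scala model; `runPipeline` and its counter are illustrative, not Spark's API:

```scala
// Pure-Scala model of cache-vs-recompute (illustrative; not Spark's API).
// Without caching, every action re-runs the whole transformation chain;
// with caching, the chain runs once and later actions reuse the result.
def runPipeline(cacheResult: Boolean, actions: Int): Int = {
  var evaluations = 0
  def expensiveTransform(xs: Seq[Int]): Seq[Int] = {
    evaluations += 1                      // count how often the work is redone
    xs.map(_ * 2).filter(_ > 2)
  }
  val input = Seq(1, 2, 3, 4)
  if (cacheResult) {
    val cached = expensiveTransform(input)        // materialized once, like cache()
    (1 to actions).foreach(_ => cached.sum)       // each "action" reuses it
  } else {
    (1 to actions).foreach(_ => expensiveTransform(input).sum) // each "action" recomputes
  }
  evaluations
}

val uncachedRuns = runPipeline(cacheResult = false, actions = 3) // 3 evaluations
val cachedRuns   = runPipeline(cacheResult = true,  actions = 3) // 1 evaluation
```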

Storage Levels

Spark provides various storage levels that control how and where RDDs are cached.

StorageLevel Class

class StorageLevel(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,  
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int = 1
) extends Externalizable

Predefined Storage Levels

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
}

Storage Level Breakdown

| Level | Uses Disk | Uses Memory | Serialized | Replication |
|---|---|---|---|---|
| NONE | ✗ | ✗ | – | 1 |
| DISK_ONLY | ✓ | ✗ | ✓ | 1 |
| DISK_ONLY_2 | ✓ | ✗ | ✓ | 2 |
| MEMORY_ONLY | ✗ | ✓ | ✗ | 1 |
| MEMORY_ONLY_2 | ✗ | ✓ | ✗ | 2 |
| MEMORY_ONLY_SER | ✗ | ✓ | ✓ | 1 |
| MEMORY_ONLY_SER_2 | ✗ | ✓ | ✓ | 2 |
| MEMORY_AND_DISK | ✓ | ✓ | ✗ | 1 |
| MEMORY_AND_DISK_2 | ✓ | ✓ | ✗ | 2 |
| MEMORY_AND_DISK_SER | ✓ | ✓ | ✓ | 1 |
| MEMORY_AND_DISK_SER_2 | ✓ | ✓ | ✓ | 2 |
| OFF_HEAP | ✗ | ✗* | ✓ | 1 |

*OFF_HEAP uses off-heap memory (e.g., Tachyon)
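As a sketch of how the four constructor flags combine, the `Level` case class below is a stand-in that mirrors the flag order of the `StorageLevel` constructor shown above; it is illustrative, not Spark's class:

```scala
// Stand-in for StorageLevel (illustrative): same flag order as the
// constructor above, rendered as a readable label.
final case class Level(useDisk: Boolean, useMemory: Boolean,
                       useOffHeap: Boolean, deserialized: Boolean,
                       replication: Int = 1)

def describe(l: Level): String = {
  val media = Seq(
    if (l.useDisk) Some("disk") else None,
    if (l.useMemory) Some("memory") else None,
    if (l.useOffHeap) Some("off-heap") else None
  ).flatten
  val where = if (media.isEmpty) "not stored" else media.mkString(" + ")
  val form  = if (l.deserialized) "deserialized" else "serialized"
  s"$where, $form, ${l.replication}x"
}

// Same flags as MEMORY_AND_DISK_SER_2 above
val label = describe(Level(true, true, false, false, 2)) // "disk + memory, serialized, 2x"
```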

Basic Persistence Operations

cache() Method

def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

The simplest way to cache an RDD in memory:

val data = sc.textFile("large-dataset.txt")
val words = data.flatMap(_.split(" "))
  .map(_.toLowerCase)
  .filter(_.length > 3)

// Cache the filtered results
val cachedWords = words.cache()

// Multiple actions will reuse the cached data
val count = cachedWords.count()
val distinctCount = cachedWords.distinct().count()
val sample = cachedWords.sample(false, 0.1).collect()

persist() Method

def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(newLevel: StorageLevel): RDD[T]

More flexible caching with custom storage levels:

import org.apache.spark.storage.StorageLevel

val data = sc.textFile("huge-dataset.txt")
val processed = data.map(expensiveProcessing)

// Different persistence strategies (pick one; Spark does not allow
// changing an RDD's storage level once it has been set)
processed.persist(StorageLevel.MEMORY_ONLY)           // Fast access; partitions that don't fit are recomputed
processed.persist(StorageLevel.MEMORY_AND_DISK)       // Spill to disk when memory is full
processed.persist(StorageLevel.MEMORY_ONLY_SER)       // Serialize to save memory
processed.persist(StorageLevel.DISK_ONLY)             // Store only on disk
processed.persist(StorageLevel.MEMORY_AND_DISK_2)     // Replicate for fault tolerance

unpersist() Method

def unpersist(blocking: Boolean = true): RDD[T]

Remove RDD from cache and free memory:

val cachedData = data.cache()

// Use cached data
val result1 = cachedData.count()
val result2 = cachedData.filter(_ > 100).count()

// Remove from cache when no longer needed
cachedData.unpersist()

// Or unpersist asynchronously
cachedData.unpersist(blocking = false)

Storage Level Properties and Queries

getStorageLevel

def getStorageLevel: StorageLevel

Check current storage level:

val rdd = sc.parallelize(1 to 1000)
println(s"Default storage level: ${rdd.getStorageLevel}")  // NONE

val cached = rdd.cache()
println(s"After cache(): ${cached.getStorageLevel}")      // MEMORY_ONLY

// A storage level can only be assigned once per RDD, so use a fresh one
val other = sc.parallelize(1 to 1000)
val persisted = other.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"Custom persistence: ${persisted.getStorageLevel}") // MEMORY_AND_DISK_SER

StorageLevel Properties

class StorageLevel {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}

val level = StorageLevel.MEMORY_AND_DISK_SER_2

println(s"Uses disk: ${level.useDisk}")           // true
println(s"Uses memory: ${level.useMemory}")       // true
println(s"Deserialized: ${level.deserialized}")   // false (serialized)
println(s"Replication: ${level.replication}")     // 2

Choosing Storage Levels

Memory-Only Levels

MEMORY_ONLY - Best performance, no serialization overhead

// Use when:
// - RDD fits comfortably in memory
// - Access speed matters more than memory footprint
// - Objects are not too expensive to reconstruct if evicted
val fastAccess = rdd.persist(StorageLevel.MEMORY_ONLY)

MEMORY_ONLY_SER - Compact storage, slower access

// Use when:
// - Memory is limited but RDD is important to cache
// - Objects have significant serialization overhead
// - CPU is fast relative to memory bandwidth
val compactCache = rdd.persist(StorageLevel.MEMORY_ONLY_SER)

Memory and Disk Levels

MEMORY_AND_DISK - Balanced performance and reliability

// Use when:
// - RDD might not fit entirely in memory
// - Recomputation is expensive
// - Fault tolerance is important
val balanced = rdd.persist(StorageLevel.MEMORY_AND_DISK)

MEMORY_AND_DISK_SER - Space-efficient with disk fallback

// Use when:
// - Memory is very limited
// - Serialization cost is acceptable
// - Disk I/O is reasonably fast
val efficient = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

Disk-Only Levels

DISK_ONLY - Reliable but slower

// Use when:
// - Memory is scarce
// - RDD is accessed infrequently
// - Recomputation is very expensive
val reliable = rdd.persist(StorageLevel.DISK_ONLY)

Replicated Levels

_2 variants - Fault tolerance with replication

// Use when:
// - Fault tolerance is critical
// - Node failures are common in the cluster
// - Recomputing the RDD after a failure would be very expensive
val faultTolerant = rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
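The guidelines in this section can be condensed into a small decision helper; the function, its inputs, and the returned names below are illustrative, not a Spark API:

```scala
// Hypothetical decision helper condensing the storage-level guidelines.
def chooseLevel(fitsInMemory: Boolean,
                recomputeIsExpensive: Boolean,
                needReplication: Boolean): String = {
  val base =
    if (fitsInMemory) "MEMORY_ONLY"                   // fast, no serialization overhead
    else if (recomputeIsExpensive) "MEMORY_AND_DISK"  // spill rather than recompute
    else "MEMORY_ONLY_SER"                            // compact; cheap partitions recompute
  if (needReplication) base + "_2" else base          // _2 variants replicate blocks
}

val hotSmall  = chooseLevel(fitsInMemory = true,  recomputeIsExpensive = false, needReplication = false) // MEMORY_ONLY
val bigCostly = chooseLevel(fitsInMemory = false, recomputeIsExpensive = true,  needReplication = true)  // MEMORY_AND_DISK_2
```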

Advanced Persistence Patterns

Selective Persistence

Cache only the RDDs that will be reused multiple times:

val rawData = sc.textFile("input.txt")
val cleaned = rawData.filter(_.nonEmpty).map(_.trim)

// Don't cache - used only once
val parsed = cleaned.map(parseLine)

// Cache this - used multiple times
val validated = parsed.filter(isValid).cache()

// Multiple actions on cached RDD
val errors = validated.filter(hasError).count()
val summary = validated.map(extractSummary).collect()
validated.map(formatOutput).saveAsTextFile("output")

Iterative Algorithm Pattern

var current = sc.textFile("initial-data.txt").cache()

for (i <- 1 to 10) {
  // Keep a handle to the previous iteration so it can be unpersisted
  val previous = current
  
  // Compute new iteration and cache it
  current = current.map(iterativeFunction).cache()
  
  // Force computation and unpersist old data
  current.count()
  previous.unpersist()
  
  println(s"Iteration $i completed")
}

Multi-Level Caching Strategy

val rawData = sc.textFile("large-dataset.txt")

// Level 1: Cache frequently accessed base data
val baseData = rawData.filter(isRelevant).cache()

// Level 2: Cache intermediate expensive computations  
val features = baseData.map(extractFeatures)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Level 3: Cache final results with replication for reliability
val results = features.map(expensiveMLModel)
  .persist(StorageLevel.MEMORY_AND_DISK_2)

Monitoring and Management

SparkContext Storage Information

def getRDDStorageInfo: Array[RDDInfo]
def getPersistentRDDs: Map[Int, RDD[_]]
def getExecutorStorageStatus: Array[StorageStatus]

// Get information about cached RDDs
val storageInfo = sc.getRDDStorageInfo
storageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}): ")
  println(s"  Memory size: ${info.memSize} bytes")
  println(s"  Disk size: ${info.diskSize} bytes")
  println(s"  Storage level: ${info.storageLevel}")
}

// Get all persistent RDDs
val persistentRDDs = sc.getPersistentRDDs
persistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: ${rdd.name} - ${rdd.getStorageLevel}")
}

// Check executor storage status
val executorStatus = sc.getExecutorStorageStatus
executorStatus.foreach { status =>
  println(s"Executor ${status.blockManagerId.executorId}:")
  println(s"  Max memory: ${status.maxMem} bytes")
  println(s"  Used memory: ${status.memUsed} bytes")
  println(s"  Remaining: ${status.memRemaining} bytes")
}
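The per-executor aggregation in the loop above can be sketched without a cluster; `ExecutorMem` below is a stand-in for Spark's `StorageStatus`, with made-up numbers:

```scala
// Stand-in for Spark's StorageStatus (illustrative) showing the same
// per-executor memory aggregation as the loop above.
final case class ExecutorMem(maxMem: Long, memUsed: Long) {
  def memRemaining: Long = maxMem - memUsed
}

val executors = Seq(ExecutorMem(1000L, 400L), ExecutorMem(1000L, 250L))

val used  = executors.map(_.memUsed).sum          // 650
val total = executors.map(_.maxMem).sum           // 2000
val utilizationPct = used.toDouble * 100 / total  // 32.5
```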

RDD Naming for Monitoring

def setName(name: String): RDD[T]
def name: String

val data = sc.textFile("input.txt")
  .setName("Input Data")
  .cache()

val processed = data.map(process)
  .setName("Processed Data")
  .persist(StorageLevel.MEMORY_AND_DISK)

// Names will appear in Spark UI for easier monitoring

Checkpointing

Checkpointing provides fault tolerance by saving RDD data to reliable storage.

Setting Checkpoint Directory

def setCheckpointDir(directory: String): Unit

// Set checkpoint directory (must be reliable storage like HDFS)
sc.setCheckpointDir("hdfs://namenode/checkpoints")

Checkpointing RDDs

def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]

val data = sc.textFile("input.txt")
val processed = data.map(expensiveOperation).filter(isValid)

// Mark for checkpointing
processed.checkpoint()

// Checkpoint happens after first action
val count = processed.count()  // Triggers checkpoint

// Check if checkpointed
if (processed.isCheckpointed) {
  println(s"Checkpointed to: ${processed.getCheckpointFile.get}")
}

Checkpoint vs Persistence

// Persistence: keeps lineage, can be lost on failure
val persisted = rdd.cache()

// Checkpoint: truncates lineage, survives failures
rdd.checkpoint()

// Best practice: both for performance and reliability
val optimized = rdd.cache()
optimized.checkpoint()
optimized.count()  // Trigger both cache and checkpoint
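Why truncating lineage matters can be modeled in plain Scala; the toy lineage chain below is not Spark's internals, just an illustration of recovery cost:

```scala
// Toy lineage model (not Spark's internals): recovery after a failure
// replays the lineage, so its length is a proxy for recovery cost.
// Checkpointing saves the data and drops the parent chain.
final case class Node(name: String, parent: Option[Node])

def lineageLength(n: Node): Int =
  1 + n.parent.map(lineageLength).getOrElse(0)

// read -> map -> filter -> join
val read   = Node("read", None)
val mapped = Node("map", Some(read))
val filt   = Node("filter", Some(mapped))
val joined = Node("join", Some(filt))

def checkpointed(n: Node): Node = n.copy(parent = None) // lineage truncated

val before = lineageLength(joined)               // 4
val after  = lineageLength(checkpointed(joined)) // 1
```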

Performance Guidelines

When to Cache

  1. Multiple Actions: RDD used in multiple actions
  2. Iterative Algorithms: Machine learning, graph algorithms
  3. Interactive Analysis: Jupyter notebooks, Spark shell
  4. Expensive Computations: Complex transformations
  5. Data Reduction: After significant filtering

// Good candidate for caching
val filtered = largeDataset
  .filter(expensiveCondition)  // Reduces data size significantly
  .map(complexTransformation)  // Expensive computation

filtered.cache()

// Multiple uses justify caching
val stats = filtered.map(extractStats).collect()
val sample = filtered.sample(false, 0.1).collect()
filtered.saveAsTextFile("output")

When Not to Cache

  1. Single Use: RDD used only once
  2. Large Datasets: Bigger than available memory
  3. Simple Operations: Cheap to recompute
  4. Sequential Processing: Linear data pipeline

// Don't cache - used only once
val result = sc.textFile("input.txt")
  .map(_.toUpperCase)
  .filter(_.startsWith("A"))
  .count()

Memory Management Best Practices

// 1. Unpersist when done
val temp = rdd.cache()
processData(temp)
temp.unpersist()

// 2. Use appropriate storage levels (alternatives; an RDD's level can be set only once)
val frequentData = rdd.persist(StorageLevel.MEMORY_ONLY)
val occasionalData = rdd.persist(StorageLevel.MEMORY_AND_DISK)
val backupData = rdd.persist(StorageLevel.DISK_ONLY)

// 3. Monitor memory usage
val statuses = sc.getExecutorStorageStatus
val memUsage = statuses.map(_.memUsed).sum
val memTotal = statuses.map(_.maxMem).sum
println(f"Memory utilization: ${memUsage.toDouble / memTotal * 100}%.1f%%")

This guide covers RDD caching and persistence in Apache Spark, enabling you to optimize performance through deliberate data storage strategies.
