maven-apache-spark

Describes: maven/org.apache.spark/spark-parent

- Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author: tessl

docs/caching-persistence.md
# Caching and Persistence

Caching and persistence are crucial optimization techniques in Spark. They let you store intermediate RDD results in memory, on disk, or both, avoiding recomputation and dramatically improving performance for iterative algorithms and interactive analysis.

## Storage Levels

Spark provides various storage levels that control how and where RDDs are cached.

### StorageLevel Class

```scala { .api }
class StorageLevel(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable
```

### Predefined Storage Levels

```scala { .api }
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
}
```

#### Storage Level Breakdown

| Level | Uses Disk | Uses Memory | Serialized | Replication |
|-------|-----------|-------------|------------|-------------|
| `NONE` | ✗ | ✗ | ✗ | 1 |
| `DISK_ONLY` | ✓ | ✗ | ✓ | 1 |
| `DISK_ONLY_2` | ✓ | ✗ | ✓ | 2 |
| `MEMORY_ONLY` | ✗ | ✓ | ✗ | 1 |
| `MEMORY_ONLY_2` | ✗ | ✓ | ✗ | 2 |
| `MEMORY_ONLY_SER` | ✗ | ✓ | ✓ | 1 |
| `MEMORY_ONLY_SER_2` | ✗ | ✓ | ✓ | 2 |
| `MEMORY_AND_DISK` | ✓ | ✓ | ✗ | 1 |
| `MEMORY_AND_DISK_2` | ✓ | ✓ | ✗ | 2 |
| `MEMORY_AND_DISK_SER` | ✓ | ✓ | ✓ | 1 |
| `MEMORY_AND_DISK_SER_2` | ✓ | ✓ | ✓ | 2 |
| `OFF_HEAP` | ✗ | ✗* | ✓ | 1 |

\*`OFF_HEAP` uses off-heap memory (e.g., Tachyon).

## Basic Persistence Operations

### cache() Method

```scala { .api }
def cache(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
```

The simplest way to cache an RDD in memory:

```scala
val data = sc.textFile("large-dataset.txt")
val words = data.flatMap(_.split(" "))
  .map(_.toLowerCase)
  .filter(_.length > 3)

// Cache the filtered results
val cachedWords = words.cache()

// Multiple actions will reuse the cached data
val count = cachedWords.count()
val distinctCount = cachedWords.distinct().count()
val sample = cachedWords.sample(false, 0.1).collect()
```

### persist() Method

```scala { .api }
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
def persist(newLevel: StorageLevel): RDD[T]
```

More flexible caching with custom storage levels:

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.textFile("huge-dataset.txt")
val processed = data.map(expensiveProcessing)

// Different persistence strategies (an RDD's storage level can only be assigned once, so choose one):
processed.persist(StorageLevel.MEMORY_ONLY)           // Fast access; partitions that don't fit are recomputed on demand
// processed.persist(StorageLevel.MEMORY_AND_DISK)    // Spill to disk when memory is full
// processed.persist(StorageLevel.MEMORY_ONLY_SER)    // Serialize to save memory
// processed.persist(StorageLevel.DISK_ONLY)          // Store only on disk
// processed.persist(StorageLevel.MEMORY_AND_DISK_2)  // Replicate for fault tolerance
```
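Persistence is lazy: calling `cache()` or `persist()` only marks the RDD; nothing is stored until the first action materializes it. A minimal sketch of this behavior, assuming an existing `SparkContext` named `sc` (the input path and field count are purely illustrative):

```scala
import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("events.log")  // hypothetical input path
val fields = lines.map(_.split("\t")).persist(StorageLevel.MEMORY_AND_DISK)

// Nothing is cached yet -- the first action computes the RDD and stores its partitions
val total = fields.count()

// Later actions read the cached partitions instead of re-reading and re-splitting the file
val wideRows = fields.filter(_.length > 10).count()
```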
### unpersist() Method

```scala { .api }
def unpersist(blocking: Boolean = true): RDD[T]
```

Remove an RDD from the cache and free its memory:

```scala
val cachedData = data.cache()

// Use cached data
val result1 = cachedData.count()
val result2 = cachedData.filter(_ > 100).count()

// Remove from cache when no longer needed
cachedData.unpersist()

// Or unpersist asynchronously
cachedData.unpersist(blocking = false)
```

## Storage Level Properties and Queries

### getStorageLevel

```scala { .api }
def getStorageLevel: StorageLevel
```

Check the current storage level:

```scala
val rdd = sc.parallelize(1 to 1000)
println(s"Default storage level: ${rdd.getStorageLevel}")  // NONE

val cached = rdd.cache()
println(s"After cache(): ${cached.getStorageLevel}")  // MEMORY_ONLY

// Use a fresh RDD here: a storage level can only be assigned once per RDD
val persisted = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"Custom persistence: ${persisted.getStorageLevel}")  // MEMORY_AND_DISK_SER
```

### StorageLevel Properties

```scala { .api }
class StorageLevel {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}
```

```scala
val level = StorageLevel.MEMORY_AND_DISK_SER_2

println(s"Uses disk: ${level.useDisk}")          // true
println(s"Uses memory: ${level.useMemory}")      // true
println(s"Deserialized: ${level.deserialized}")  // false (serialized)
println(s"Replication: ${level.replication}")    // 2
```

## Choosing Storage Levels

### Memory-Only Levels

**MEMORY_ONLY** - Best performance, no serialization overhead

```scala
// Use when:
// - The RDD fits comfortably in memory
// - Objects are not too expensive to reconstruct
// - Access speed is the priority
val fastAccess = rdd.persist(StorageLevel.MEMORY_ONLY)
```

**MEMORY_ONLY_SER** - Compact storage, slower access

```scala
// Use when:
// - Memory is limited but the RDD is important to cache
// - Deserialized objects carry significant memory overhead
// - Spare CPU is available to pay the serialization cost
val compactCache = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
```

### Memory and Disk Levels

**MEMORY_AND_DISK** - Balanced performance and reliability

```scala
// Use when:
// - The RDD might not fit entirely in memory
// - Recomputation is expensive
// - Spilling to disk is cheaper than recomputing
val balanced = rdd.persist(StorageLevel.MEMORY_AND_DISK)
```

**MEMORY_AND_DISK_SER** - Space-efficient with disk fallback

```scala
// Use when:
// - Memory is very limited
// - Serialization cost is acceptable
// - Disk I/O is reasonably fast
val efficient = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
```

### Disk-Only Levels

**DISK_ONLY** - Reliable but slower

```scala
// Use when:
// - Memory is scarce
// - The RDD is accessed infrequently
// - Recomputation is very expensive
val reliable = rdd.persist(StorageLevel.DISK_ONLY)
```

### Replicated Levels

**_2 variants** - Fault tolerance with replication

```scala
// Use when:
// - Fault tolerance is critical
// - The cluster experiences node failures
// - RDD recomputation is very expensive
val faultTolerant = rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
```
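The serialized levels above (`MEMORY_ONLY_SER`, `MEMORY_AND_DISK_SER`) trade CPU time for a smaller cache footprint, and how much space they save depends heavily on the serializer. A hedged sketch of enabling Kryo serialization when the application is configured (the application name is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Kryo usually produces much more compact serialized blocks than Java serialization,
// which directly shrinks MEMORY_ONLY_SER / MEMORY_AND_DISK_SER caches.
val conf = new SparkConf()
  .setAppName("cache-sizing-example")  // placeholder application name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sc = new SparkContext(conf)
```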
## Advanced Persistence Patterns

### Selective Persistence

Cache only the RDDs that will be reused multiple times:

```scala
val rawData = sc.textFile("input.txt")
val cleaned = rawData.filter(_.nonEmpty).map(_.trim)

// Don't cache - used only once
val parsed = cleaned.map(parseLine)

// Cache this - used multiple times
val validated = parsed.filter(isValid).cache()

// Multiple actions on cached RDD
val errors = validated.filter(hasError).count()
val summary = validated.map(extractSummary).collect()
validated.map(formatOutput).saveAsTextFile("output")
```

### Iterative Algorithm Pattern

```scala
var current = sc.textFile("initial-data.txt").cache()

for (i <- 1 to 10) {
  // Unpersist previous iteration
  val previous = current

  // Compute new iteration and cache it
  current = current.map(iterativeFunction).cache()

  // Force computation and unpersist old data
  current.count()
  previous.unpersist()

  println(s"Iteration $i completed")
}
```

### Multi-Level Caching Strategy

```scala
val rawData = sc.textFile("large-dataset.txt")

// Level 1: Cache frequently accessed base data
val baseData = rawData.filter(isRelevant).cache()

// Level 2: Cache intermediate expensive computations
val features = baseData.map(extractFeatures)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Level 3: Cache final results with replication for reliability
val results = features.map(expensiveMLModel)
  .persist(StorageLevel.MEMORY_AND_DISK_2)
```

## Monitoring and Management

### SparkContext Storage Information

```scala { .api }
def getRDDStorageInfo: Array[RDDInfo]
def getPersistentRDDs: Map[Int, RDD[_]]
def getExecutorStorageStatus: Array[StorageStatus]
```

```scala
// Get information about cached RDDs
val storageInfo = sc.getRDDStorageInfo
storageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}):")
  println(s"  Memory size: ${info.memSize} bytes")
  println(s"  Disk size: ${info.diskSize} bytes")
  println(s"  Storage level: ${info.storageLevel}")
}

// Get all persistent RDDs
val persistentRDDs = sc.getPersistentRDDs
persistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: ${rdd.name} - ${rdd.getStorageLevel}")
}

// Check executor storage status
val executorStatus = sc.getExecutorStorageStatus
executorStatus.foreach { status =>
  println(s"Executor ${status.blockManagerId.executorId}:")
  println(s"  Max memory: ${status.maxMem} bytes")
  println(s"  Used memory: ${status.memUsed} bytes")
  println(s"  Remaining: ${status.memRemaining} bytes")
}
```

### RDD Naming for Monitoring

```scala { .api }
def setName(name: String): RDD[T]
def name: String
```

```scala
val data = sc.textFile("input.txt")
  .setName("Input Data")
  .cache()

val processed = data.map(process)
  .setName("Processed Data")
  .persist(StorageLevel.MEMORY_AND_DISK)

// Names will appear in the Spark UI for easier monitoring
```

## Checkpointing

Checkpointing provides fault tolerance by saving RDD data to reliable storage.

### Setting Checkpoint Directory

```scala { .api }
def setCheckpointDir(directory: String): Unit
```

```scala
// Set checkpoint directory (must be reliable storage like HDFS)
sc.setCheckpointDir("hdfs://namenode/checkpoints")
```
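For local experiments (for example, in the Spark shell with a `local[*]` master), a directory on the local filesystem is usually sufficient; on a real cluster the directory should live on shared, fault-tolerant storage such as HDFS. A small sketch, with purely illustrative paths:

```scala
// Local development / testing only: checkpoints land on the driver machine's disk (illustrative path)
sc.setCheckpointDir("/tmp/spark-checkpoints")

// Cluster deployments should point at shared, fault-tolerant storage instead:
// sc.setCheckpointDir("hdfs://namenode/checkpoints")
```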
### Checkpointing RDDs

```scala { .api }
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
```

```scala
val data = sc.textFile("input.txt")
val processed = data.map(expensiveOperation).filter(isValid)

// Mark for checkpointing
processed.checkpoint()

// Checkpointing happens after the first action
val count = processed.count()  // Triggers the checkpoint

// Check if checkpointed
if (processed.isCheckpointed) {
  println(s"Checkpointed to: ${processed.getCheckpointFile.get}")
}
```

### Checkpoint vs Persistence

```scala
// Persistence: keeps lineage, can be lost on failure
val persisted = rdd.cache()

// Checkpoint: truncates lineage, survives failures
rdd.checkpoint()

// Best practice: combine both for performance and reliability
val optimized = rdd.cache()
optimized.checkpoint()
optimized.count()  // Triggers both caching and the checkpoint
```

## Performance Guidelines

### When to Cache

1. **Multiple Actions**: RDD used in multiple actions
2. **Iterative Algorithms**: Machine learning, graph algorithms
3. **Interactive Analysis**: Jupyter notebooks, Spark shell
4. **Expensive Computations**: Complex transformations
5. **Data Reduction**: After significant filtering

```scala
// Good candidate for caching
val filtered = largeDataset
  .filter(expensiveCondition)   // Reduces data size significantly
  .map(complexTransformation)   // Expensive computation

filtered.cache()

// Multiple uses justify caching
val stats = filtered.map(extractStats).collect()
val sample = filtered.sample(false, 0.1).collect()
filtered.saveAsTextFile("output")
```

### When Not to Cache

1. **Single Use**: RDD used only once
2. **Large Datasets**: Bigger than available memory
3. **Simple Operations**: Cheap to recompute
4. **Sequential Processing**: Linear data pipeline

```scala
// Don't cache - used only once
val result = sc.textFile("input.txt")
  .map(_.toUpperCase)
  .filter(_.startsWith("A"))
  .count()
```

### Memory Management Best Practices

```scala
// 1. Unpersist when done
val temp = rdd.cache()
processData(temp)
temp.unpersist()

// 2. Use an appropriate storage level per RDD (a level can only be assigned once per RDD)
val frequentData = hotRdd.persist(StorageLevel.MEMORY_ONLY)
val occasionalData = warmRdd.persist(StorageLevel.MEMORY_AND_DISK)
val backupData = coldRdd.persist(StorageLevel.DISK_ONLY)

// 3. Monitor memory usage
val memUsed = sc.getExecutorStorageStatus.map(_.memUsed).sum
val memTotal = sc.getExecutorStorageStatus.map(_.maxMem).sum
println(s"Memory utilization: ${memUsed.toDouble / memTotal * 100}%")
```

This guide covers RDD caching and persistence in Apache Spark, enabling you to optimize performance through deliberate data storage strategies.