# Storage and Caching
## StorageLevel
Storage levels define how RDDs are persisted across memory, disk, and off-heap storage with various serialization and replication options.
```scala { .api }
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int
) {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int

  def clone(): StorageLevel
  def writeExternal(out: ObjectOutput): Unit
  def readExternal(in: ObjectInput): Unit
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false, 1)
  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
```
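
Each named constant is just a preset combination of the five constructor flags. A small sketch decoding one level and building a custom one through the companion's public `apply` factory (available in Spark, though the exact overload set varies by version):

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK_SER_2 decodes to: useDisk and useMemory set,
// not off-heap, serialized (deserialized = false), two replicas
val level = StorageLevel.MEMORY_AND_DISK_SER_2
assert(level.useDisk && level.useMemory && !level.useOffHeap)
assert(!level.deserialized && level.replication == 2)

// Custom combination: serialized disk-only storage with three replicas
// (flags in constructor order: useDisk, useMemory, useOffHeap, deserialized, replication)
val diskOnly3 = StorageLevel(true, false, false, false, 3)
```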
## RDD Persistence Methods
Methods available on all RDD types for controlling persistence behavior.
```scala { .api }
// From RDD[T]
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]
def cache(): RDD[T]
def unpersist(blocking: Boolean = false): RDD[T]
def getStorageLevel: StorageLevel
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
```
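
Note that `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`, and the no-argument `persist()` uses the same default:

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)
rdd.cache() // same as rdd.persist(StorageLevel.MEMORY_ONLY)
assert(rdd.getStorageLevel == StorageLevel.MEMORY_ONLY)
```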
## BlockManager
Internal storage management system (some public interfaces available for advanced usage).
```scala { .api }
class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    master: BlockManagerMaster,
    serializerManager: SerializerManager,
    conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int
) {
  // Public methods for advanced users
  def putSingle[T: ClassTag](blockId: BlockId, value: T, level: StorageLevel, tellMaster: Boolean = true): Boolean
  def putIterator[T: ClassTag](blockId: BlockId, values: Iterator[T], level: StorageLevel, tellMaster: Boolean = true): Boolean
  def getLocalValues(blockId: BlockId): Option[BlockResult]
  def getOrElseUpdate[T](blockId: BlockId, level: StorageLevel, classTag: ClassTag[T], makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]]
  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit
}
```
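
A sketch of direct block access, assuming code that runs where `SparkEnv.get` returns a live environment (an executor, or the driver in local mode). `TestBlockId` is one of the public `BlockId` subclasses; the block id and value here are purely illustrative:

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

val bm = SparkEnv.get.blockManager
val id = TestBlockId("example-block")

// Store a single value under an explicit block id
bm.putSingle(id, "some value", StorageLevel.MEMORY_ONLY)

// Read it back from local storage, if present
val readBack = bm.getLocalValues(id).map(_.data.next())
println(readBack) // Some(some value)

// Drop the block and notify the block manager master
bm.removeBlock(id)
```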
## Storage Configuration

Key configuration properties for storage and caching behavior.

### Memory Management
- `spark.storage.memoryFraction` - Fraction of JVM heap space used for storage cache (deprecated in favor of unified memory management)
- `spark.storage.safetyFraction` - Safety fraction to prevent storage region from using entire heap
- `spark.storage.unrollFraction` - Fraction of storage memory used for unrolling blocks
- `spark.storage.replication.proactive` - Enable proactive block replication
- `spark.storage.blockManagerTimeoutIntervalMs` - Timeout for block manager operations

### Disk Storage
- `spark.storage.diskStore.subDirectories` - Number of subdirectories for disk storage
- `spark.storage.localDirs` - Directories for storing blocks on disk
- `spark.storage.localDirs.fallback` - Fallback directory if localDirs not available

### Off-Heap Storage
- `spark.memory.offHeap.enabled` - Enable off-heap storage
- `spark.memory.offHeap.size` - Size of off-heap storage region (see the sketch below)
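
A minimal configuration sketch for the off-heap properties (app name and size are placeholders):

```scala
import org.apache.spark.SparkConf

// Both properties must be set together: OFF_HEAP persistence needs a
// positive off-heap size to have anywhere to put blocks.
val conf = new SparkConf()
  .setAppName("offheap-example")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")
```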
## Usage Examples
### Basic Persistence
```scala
val expensiveData = sc.textFile("large-dataset.txt")
  .map(processExpensiveOperation)
  .filter(complexFilter)

// Cache for multiple uses
expensiveData.cache()

// Use multiple times (only computed once)
val count = expensiveData.count()
val sample = expensiveData.take(10)
val stats = expensiveData.map(_.length).stats()

// Clean up when done
expensiveData.unpersist()
```
### Storage Level Selection
```scala
import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000)

// Note: an RDD's storage level can only be assigned once; call
// unpersist() before switching levels, as done between the examples below.

// Memory only (fastest access, but limited by memory)
data.persist(StorageLevel.MEMORY_ONLY)

// Memory and disk (spills to disk if memory full)
data.unpersist()
data.persist(StorageLevel.MEMORY_AND_DISK)

// Serialized storage (more memory efficient, slower access)
data.unpersist()
data.persist(StorageLevel.MEMORY_ONLY_SER)

// With replication for fault tolerance
data.unpersist()
data.persist(StorageLevel.MEMORY_AND_DISK_2)

// Off-heap storage (requires off-heap memory configuration)
data.unpersist()
data.persist(StorageLevel.OFF_HEAP)
```
### Checkpointing
```scala
// Set checkpoint directory (must be fault-tolerant storage like HDFS)
sc.setCheckpointDir("hdfs://namenode:port/checkpoint")

val iterativeData = sc.textFile("input.txt")
  .map(parseData)
  .filter(isValid)

// Checkpoint to break long lineage chains
iterativeData.checkpoint()

// Force materialization to complete the checkpoint
iterativeData.count()

// Now safe to use in iterative algorithms
var current = iterativeData
for (i <- 1 to 10) {
  current = current.map(iterativeFunction)
  if (i % 3 == 0) {
    current.checkpoint()
    current.count() // Force checkpoint
  }
}
```
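
Because checkpointing re-runs the RDD's lineage in a separate job, persisting the RDD before calling `checkpoint()` avoids computing it twice. The status methods from the persistence API confirm the result:

```scala
// After the count() that materialized the checkpoint above
println(iterativeData.isCheckpointed)    // true
println(iterativeData.getCheckpointFile) // Some(hdfs://.../checkpoint/...)
```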
### Advanced Persistence Patterns
```scala
// Pattern 1: Conditional persistence based on estimated data size
val rdd = sc.textFile("data.txt").map(transform)
// Rough estimate: average sampled record length times record count
val estimatedSize = rdd.take(1000).map(_.length).sum * (rdd.count() / 1000.0)

val persistedRDD = if (estimatedSize > 1000000) {
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
} else {
  rdd.persist(StorageLevel.MEMORY_ONLY)
}

// Pattern 2: Layered persistence for different access patterns
val rawData = sc.textFile("data.txt")
val cleanedData = rawData.map(clean).persist(StorageLevel.MEMORY_AND_DISK) // clean yields key/value pairs
val aggregatedData = cleanedData.groupByKey().persist(StorageLevel.MEMORY_ONLY)

// Use the cleaned data for multiple transformations
val result1 = cleanedData.filter(condition1).collect()
val result2 = cleanedData.filter(condition2).collect()

// Use the aggregated data for summary statistics
val summary1 = aggregatedData.mapValues(_.size).collect()
val summary2 = aggregatedData.mapValues(_.sum).collect()
```
### Monitoring Storage Usage
```scala
// Get the current storage level
val level = rdd.getStorageLevel
println(s"Storage level: $level")

// Check whether the RDD is marked for persistence
if (level != StorageLevel.NONE) {
  println("RDD is persisted")
}

// Monitor through the Spark UI: the Storage tab at http://<driver>:4040 shows
// - RDD storage levels
// - Memory usage
// - Disk usage
// - Partition information
```
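
The same information is available programmatically on the driver through `SparkContext.getPersistentRDDs`, which maps RDD ids to every RDD currently marked persistent:

```scala
sc.getPersistentRDDs.foreach { case (rddId, persisted) =>
  println(s"RDD $rddId: ${persisted.getStorageLevel.description}")
}
```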
### Storage Level Guidelines

**MEMORY_ONLY**
- Use for: Small to medium datasets that fit in memory
- Best for: Frequently accessed data with fast transformations
- Avoid if: Dataset is larger than available memory

**MEMORY_AND_DISK**
- Use for: Large datasets with frequent access
- Best for: Iterative algorithms, multiple actions on the same RDD
- Trade-off: Some partitions may spill to disk

**MEMORY_ONLY_SER**
- Use for: Memory-constrained environments
- Best for: Large objects that compress well
- Trade-off: CPU overhead for serialization/deserialization

**DISK_ONLY**
- Use for: Very large datasets, limited memory
- Best for: One-time processing of massive datasets
- Trade-off: Slower access due to disk I/O

**Replication (`_2` variants)**
- Use for: Critical data requiring fault tolerance
- Best for: Long-running applications in unreliable environments
- Trade-off: Double storage cost, network overhead

**OFF_HEAP**
- Use for: Very large datasets with configured off-heap storage
- Best for: Reducing GC pressure in memory-intensive applications
- Requires: `spark.memory.offHeap.enabled=true` and a positive `spark.memory.offHeap.size`
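
As a rough illustration of these guidelines, a hypothetical helper (the function, its parameters, and its choices are illustrative, not a Spark API):

```scala
import org.apache.spark.storage.StorageLevel

// Illustrative only: map coarse dataset traits to a storage level
// following the guidelines above.
def chooseLevel(
    fitsInMemory: Boolean,
    memoryConstrained: Boolean,
    needsFaultTolerance: Boolean): StorageLevel =
  if (!fitsInMemory) {
    if (needsFaultTolerance) StorageLevel.MEMORY_AND_DISK_2
    else StorageLevel.MEMORY_AND_DISK
  } else if (memoryConstrained) {
    StorageLevel.MEMORY_ONLY_SER
  } else if (needsFaultTolerance) {
    StorageLevel.MEMORY_ONLY_2
  } else {
    StorageLevel.MEMORY_ONLY
  }
```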