# Storage and Persistence

Storage levels and caching mechanisms for optimizing repeated access to RDDs, with configurable memory, disk, serialization, and replication strategies.

## Capabilities

### StorageLevel

Configuration for how RDDs are stored when persisted, controlling memory usage, disk usage, serialization, and replication.

```scala { .api }
/**
 * Storage level configuration for RDD persistence
 * @param useDisk whether to use disk storage
 * @param useMemory whether to use memory storage
 * @param useOffHeap whether to use off-heap memory
 * @param deserialized whether to store in deserialized format
 * @param replication number of replicas to maintain
 */
class StorageLevel private (
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int
) extends Externalizable {

  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int

  /** Create a copy with a different replication factor */
  def clone(newReplication: Int): StorageLevel

  /** Check if this storage level is valid */
  def isValid: Boolean
}

object StorageLevel {
  /** Memory only, deserialized */
  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)

  /** Memory only, serialized */
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)

  /** Memory and disk spillover, deserialized */
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)

  /** Memory and disk spillover, serialized */
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)

  /** Disk only */
  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)

  /** Memory only with 2x replication, deserialized */
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

  /** Memory and disk with 2x replication, deserialized */
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

  /** Off-heap memory, serialized, with memory and disk fallback */
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

  /** No storage (unpersist) */
  val NONE = new StorageLevel(false, false, false, false, 1)

  /** Factory method for creating custom storage levels */
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean = false,
    deserialized: Boolean = true,
    replication: Int = 1
  ): StorageLevel
}
```
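
As a quick illustration of the `apply` factory, the sketch below builds a custom level (serialized in memory, disk spillover, 2x replication). A minimal sketch, assuming the factory signature shown above:

```scala
import org.apache.spark.storage.StorageLevel

// Custom level: spill to disk, keep serialized in memory, replicate twice
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)

assert(customLevel.isValid)
println(customLevel.replication) // 2
```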

### RDD Persistence Methods

Methods available on RDD for controlling persistence and caching behavior.

```scala { .api }
abstract class RDD[T: ClassTag] {
  /** Persist this RDD with the specified storage level */
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): this.type

  /** Cache this RDD in memory; shortcut for persist(StorageLevel.MEMORY_ONLY) */
  def cache(): this.type

  /** Remove this RDD from persistent storage, optionally blocking until done */
  def unpersist(blocking: Boolean = false): this.type

  /** Mark this RDD for checkpointing to reliable storage */
  def checkpoint(): Unit

  /** Check whether this RDD is checkpointed */
  def isCheckpointed: Boolean

  /** Get the checkpoint file, if available */
  def getCheckpointFile: Option[String]

  /** Get the current storage level */
  def getStorageLevel: StorageLevel

  /** Check whether this RDD is cached */
  def isCached: Boolean = getStorageLevel != StorageLevel.NONE
}
```
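
One behavioral detail worth noting: Spark assigns a storage level to an RDD at most once, and calling `persist` again with a different level throws an `UnsupportedOperationException`. A small guard like the helper below sidesteps that; the `persistIfUnset` name is ours, not part of the API:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: assign a storage level only if none is set yet.
def persistIfUnset[T](rdd: RDD[T], level: StorageLevel): RDD[T] =
  if (rdd.getStorageLevel == StorageLevel.NONE) rdd.persist(level)
  else rdd // already persisted; changing the level would throw
```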

### BlockManager

Core storage management component handling data blocks across the cluster.

```scala { .api }
/**
 * Manager for reading and writing data blocks
 */
class BlockManager(
  executorId: String,
  rpcEnv: RpcEnv,
  master: BlockManagerMaster,
  serializerManager: SerializerManager,
  conf: SparkConf,
  memoryManager: MemoryManager,
  mapOutputTracker: MapOutputTracker,
  shuffleManager: ShuffleManager,
  blockTransferService: BlockTransferService,
  securityManager: SecurityManager,
  numUsableCores: Int
) extends BlockDataManager with BlockEvictionHandler with Logging {

  /** Initialize the block manager for the given application */
  def initialize(appId: String): Unit

  /** Get local block data */
  def getBlockData(blockId: BlockId): ManagedBuffer

  /** Put block data received from elsewhere */
  def putBlockData(
    blockId: BlockId,
    data: ManagedBuffer,
    level: StorageLevel,
    classTag: ClassTag[_]
  ): Boolean

  /** Get a block from local storage, falling back to remote fetch */
  def get[T: ClassTag](blockId: BlockId): Option[BlockResult]

  /** Put a single value into storage */
  def putSingle[T: ClassTag](
    blockId: BlockId,
    value: T,
    level: StorageLevel,
    tellMaster: Boolean = true
  ): Boolean

  /** Put an iterator of values into storage */
  def putIterator[T: ClassTag](
    blockId: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    tellMaster: Boolean = true
  ): Boolean

  /** Remove a block from storage */
  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit

  /** Get memory status: (storage level, memory size, disk size) per block */
  def memoryStatus: Map[BlockId, (StorageLevel, Long, Long)]

  /** Get the on-disk size of a block */
  def diskBlockSize(blockId: BlockId): Long
}
```
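
`BlockManager` is internal; applications normally reach it only indirectly through `persist` and `cache`. Purely for illustration, and assuming a Spark version where these internal calls still match the signatures above, a block can be stored and read back through `SparkEnv`:

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

// Internal API: the local BlockManager of the current driver/executor.
val bm = SparkEnv.get.blockManager

// Store a single value and read it back from local storage.
val id = TestBlockId("demo-block")
bm.putSingle(id, "hello", StorageLevel.MEMORY_ONLY)
val value = bm.get[String](id).map(_.data.next())
println(value) // Some(hello)

bm.removeBlock(id)
```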

### Block Identifiers

Type-safe identifiers for the different kinds of data blocks.

```scala { .api }
/**
 * Base class for block identifiers
 */
sealed abstract class BlockId {
  def name: String
  def asRDDId: Option[RDDBlockId] = None
}

/** Block identifier for RDD blocks */
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = s"rdd_${rddId}_$splitIndex"
  override def asRDDId: Option[RDDBlockId] = Some(this)
}

/** Block identifier for shuffle blocks */
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
}

/** Block identifier for shuffle data blocks */
case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.data"
}

/** Block identifier for shuffle index blocks */
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.index"
}

/** Block identifier for broadcast blocks */
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  override def name: String = s"broadcast_${broadcastId}${if (field.nonEmpty) "_" + field else ""}"
}

/** Block identifier for task result blocks */
case class TaskResultBlockId(taskId: Long) extends BlockId {
  override def name: String = s"taskresult_$taskId"
}

/** Block identifier for stream blocks */
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  override def name: String = s"input-${streamId}-$uniqueId"
}
```
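
The `name` of a block is what appears in executor logs and in on-disk file names, which makes these classes useful when correlating log lines with specific RDD partitions. For example:

```scala
import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId}

println(RDDBlockId(7, 3).name)               // rdd_7_3: partition 3 of RDD 7
println(ShuffleBlockId(2, 12L, 4).name)      // shuffle_2_12_4: map output 12, reducer 4
println(BroadcastBlockId(5L, "piece0").name) // broadcast_5_piece0
```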

### Checkpointing

Mechanism for saving RDD data to reliable storage, truncating the lineage chain for fault tolerance.

```scala { .api }
/**
 * Checkpoint data management
 */
abstract class CheckpointData[T: ClassTag](rdd: RDD[T]) extends Serializable {
  /** Get the checkpoint state */
  def cpState: CheckpointState

  /** Get the checkpoint RDD, if materialized */
  def checkpointRDD: Option[CheckpointRDD[T]]

  /** Materialize the checkpoint */
  def checkpoint(): Unit

  /** Check whether the checkpoint has been materialized */
  def isCheckpointed: Boolean

  /** Get the checkpoint directory */
  def getCheckpointDir: Option[String]
}

/**
 * Checkpoint states
 */
object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

/**
 * RDD representing checkpointed data
 */
abstract class CheckpointRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
  /** Get the checkpoint directory */
  def getCheckpointDir: String
}

/**
 * Reliable checkpoint RDD (HDFS, S3, etc.)
 */
class ReliableCheckpointRDD[T: ClassTag](
  sc: SparkContext,
  checkpointPath: String,
  partitioner: Option[Partitioner] = None
) extends CheckpointRDD[T](sc)

/**
 * Local checkpoint RDD (executor-local storage)
 */
class LocalCheckpointRDD[T: ClassTag](
  sc: SparkContext,
  checkpointPath: String,
  originalRDD: RDD[T],
  partitioner: Option[Partitioner] = None
) extends CheckpointRDD[T](sc)
```
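
These classes are created by Spark itself; user code triggers them through the RDD API. A short sketch contrasting reliable and local checkpointing (the HDFS path is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Checkpoint Example"))
val rdd = sc.parallelize(1 to 1000).map(_ * 2)

// Reliable checkpoint: written under the configured fault-tolerant directory.
sc.setCheckpointDir("hdfs:///tmp/checkpoints") // placeholder path
rdd.persist(StorageLevel.MEMORY_ONLY) // avoids recomputation while checkpointing
rdd.checkpoint()                      // mark before running any action
rdd.count()                           // first action materializes the checkpoint

// Local checkpoint: faster, stored on executors, but not fault tolerant.
val local = sc.parallelize(1 to 1000).localCheckpoint()
local.count()
```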

### Memory Management

Components managing memory allocation for caching and execution.

```scala { .api }
/**
 * Abstract memory manager for Spark
 */
abstract class MemoryManager(
  conf: SparkConf,
  numCores: Int,
  onHeapStorageMemory: Long,
  onHeapExecutionMemory: Long
) extends Logging {

  /** Maximum memory available for storage */
  def maxOnHeapStorageMemory: Long
  def maxOffHeapStorageMemory: Long

  /** Acquire memory for storing a block */
  def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode
  ): Boolean

  /** Acquire memory for execution; returns the number of bytes granted */
  def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode
  ): Long

  /** Release storage memory */
  def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit

  /** Release execution memory */
  def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode
  ): Unit

  /** Current storage memory usage */
  def storageMemoryUsed: Long

  /** Current execution memory usage */
  def executionMemoryUsed: Long
}

/**
 * Memory allocation modes
 */
object MemoryMode extends Enumeration {
  type MemoryMode = Value
  val ON_HEAP, OFF_HEAP = Value
}

/**
 * Unified memory manager (the default since Spark 1.6)
 */
class UnifiedMemoryManager(
  conf: SparkConf,
  maxHeapMemory: Long,
  onHeapStorageRegionSize: Long,
  numCores: Int
) extends MemoryManager(
  conf,
  numCores,
  onHeapStorageRegionSize,
  maxHeapMemory - onHeapStorageRegionSize
)
```
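
The unified model splits the heap using two documented settings, `spark.memory.fraction` (default 0.6) and `spark.memory.storageFraction` (default 0.5), after subtracting a fixed 300 MB reservation. The sketch below reproduces that arithmetic for a hypothetical 4 GB executor heap; the constants mirror current defaults and may differ across Spark versions:

```scala
// Back-of-the-envelope sizing of the unified memory regions.
val heapBytes       = 4L * 1024 * 1024 * 1024 // executor heap: 4 GB
val reservedBytes   = 300L * 1024 * 1024      // fixed reservation
val memoryFraction  = 0.6                     // spark.memory.fraction
val storageFraction = 0.5                     // spark.memory.storageFraction

val usable    = heapBytes - reservedBytes          // heap minus reservation
val unified   = (usable * memoryFraction).toLong   // shared storage+execution pool
val storage   = (unified * storageFraction).toLong // storage region size
val execution = unified - storage                  // execution region size

println(f"unified=${unified / 1e9}%.2f GB storage=${storage / 1e9}%.2f GB")
```

Storage and execution can borrow from each other within the unified pool; `spark.memory.storageFraction` only sets the region below which cached blocks are protected from eviction by execution.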

**Usage Examples:**

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Storage Example"))

// Create an RDD
val data = sc.parallelize(1 to 1000000)
val processed = data.map(_ * 2).filter(_ > 100)

// Pick ONE persistence strategy: a storage level can be assigned only once
// per RDD, and re-persisting with a different level throws an exception.
processed.persist(StorageLevel.MEMORY_AND_DISK) // memory with disk spillover
// Alternatives:
//   StorageLevel.MEMORY_ONLY     -- memory only, fastest
//   StorageLevel.MEMORY_ONLY_SER -- serialized in memory, more compact
//   StorageLevel.DISK_ONLY       -- disk only
// processed.cache() is equivalent to persist(StorageLevel.MEMORY_ONLY)

// Checkpointing for fault tolerance: set the directory and mark the RDD
// before running any action on it.
sc.setCheckpointDir("hdfs:///checkpoints")
processed.checkpoint()

// Use the cached RDD multiple times; the first action also materializes
// the checkpoint.
val count1 = processed.count()
val count2 = processed.filter(_ < 1000).count() // reuses the cache

// Check storage status
println(s"Storage level: ${processed.getStorageLevel}")
println(s"Is cached: ${processed.isCached}")
println(s"Is checkpointed: ${processed.isCheckpointed}")

// Cleanup
processed.unpersist()
sc.stop()
```

## Performance Considerations

- **MEMORY_ONLY**: fastest access, but limited by available RAM; partitions that don't fit are recomputed on demand
- **MEMORY_AND_DISK**: good balance; partitions that don't fit in memory spill to disk automatically
- **DISK_ONLY**: slowest access, but handles datasets larger than memory
- **Serialized formats** (`*_SER`): more compact in memory, but pay a CPU cost to deserialize on each access; see the Kryo sketch below
- **Replication** (`*_2`): faster recovery after executor loss, at twice the storage cost
- **Checkpointing**: truncates long lineage chains, bounding recomputation cost on failure
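
Serialized caching pairs well with Kryo, which typically produces noticeably smaller buffers than Java serialization. A minimal sketch using the standard `spark.serializer` setting:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("Serialized Caching")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// Serialized cache: smaller memory footprint, CPU cost on each access.
val rdd = sc.parallelize(1 to 1000000).map(i => (i, i.toString))
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
println(rdd.count())
```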