# Storage Configuration

Storage level definitions for controlling RDD and Dataset persistence, including memory, disk, serialization, and replication strategies to optimize Spark application performance.

## Capabilities

### StorageLevel Class

Configuration class that defines how RDD and Dataset data should be stored, combining memory, disk, serialization, and replication options.

```scala { .api }
/**
 * Configuration for RDD/Dataset storage persistence
 * Note: Constructor is private - use StorageLevel object factory methods or predefined constants
 */
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int = 1
) extends Externalizable {

  /** Validates the storage level configuration */
  def isValid: Boolean

  /** Creates a copy of the storage level */
  override def clone(): StorageLevel

  /** Human-readable description of the storage strategy */
  def description: String

  /** String representation showing all configuration flags */
  override def toString: String

  // Properties for accessing configuration
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int

  /** Returns the memory mode (ON_HEAP or OFF_HEAP) */
  private[spark] def memoryMode: MemoryMode
}
```

**Usage Examples:**

```scala
import org.apache.spark.storage.StorageLevel

// Create a custom storage level via the companion object factory
// (the constructor is private, so `new StorageLevel(...)` will not compile)
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)

// Check storage properties
println(s"Uses disk: ${customLevel.useDisk}")
println(s"Uses memory: ${customLevel.useMemory}")
println(s"Replication factor: ${customLevel.replication}")
println(s"Description: ${customLevel.description}")

// Validate configuration
if (customLevel.isValid) {
  println("Storage level is valid")
}

// Use with RDD persistence
val rdd = spark.sparkContext.parallelize(1 to 1000)
rdd.persist(customLevel)
```

### Predefined Storage Levels

Companion object providing common storage level configurations for typical use cases.

```scala { .api }
object StorageLevel {
  /** No storage - data will be recomputed each time */
  val NONE: StorageLevel

  /** Store data on disk only */
  val DISK_ONLY: StorageLevel

  /** Store data on disk only with 2x replication */
  val DISK_ONLY_2: StorageLevel

  /** Store data on disk only with 3x replication */
  val DISK_ONLY_3: StorageLevel

  /** Store data in memory only, deserialized */
  val MEMORY_ONLY: StorageLevel

  /** Store data in memory only, deserialized, with 2x replication */
  val MEMORY_ONLY_2: StorageLevel

  /** Store data in memory only, serialized */
  val MEMORY_ONLY_SER: StorageLevel

  /** Store data in memory only, serialized, with 2x replication */
  val MEMORY_ONLY_SER_2: StorageLevel

  /** Store data in memory first, spill to disk if needed */
  val MEMORY_AND_DISK: StorageLevel

  /** Store data in memory first, spill to disk if needed, with 2x replication */
  val MEMORY_AND_DISK_2: StorageLevel

  /** Store data in memory first (serialized), spill to disk if needed */
  val MEMORY_AND_DISK_SER: StorageLevel

  /** Store data in memory first (serialized), spill to disk if needed, with 2x replication */
  val MEMORY_AND_DISK_SER_2: StorageLevel

  /** Store data in off-heap memory */
  val OFF_HEAP: StorageLevel
}
```

**Usage Examples:**

```scala
import org.apache.spark.storage.StorageLevel

// Using predefined storage levels
// (note: an RDD's storage level can only be assigned once;
//  call unpersist() before persisting with a different level)
val rdd = spark.sparkContext.parallelize(1 to 10000)

// Memory-only storage for fast access
rdd.persist(StorageLevel.MEMORY_ONLY)

// Memory and disk with serialization for space efficiency
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

// High availability with replication
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

// Disk-only for very large datasets
rdd.persist(StorageLevel.DISK_ONLY)

// StorageLevel.NONE is the level of an unpersisted RDD (no caching)
rdd.persist(StorageLevel.NONE)

// Check storage level properties
val level = StorageLevel.MEMORY_AND_DISK_SER_2
println(s"Uses memory: ${level.useMemory}") // true
println(s"Uses disk: ${level.useDisk}") // true
println(s"Serialized: ${!level.deserialized}") // true
println(s"Replication: ${level.replication}") // 2
```

### Factory Methods

Static factory methods for creating storage levels with different configurations.

```scala { .api }
object StorageLevel {
  /**
   * Creates storage level from string representation
   * @param s - String representation of storage level
   * @return Corresponding StorageLevel instance
   */
  def fromString(s: String): StorageLevel

  /**
   * Creates storage level with full configuration
   * @param useDisk - Whether to use disk storage
   * @param useMemory - Whether to use memory storage
   * @param useOffHeap - Whether to use off-heap memory
   * @param deserialized - Whether to store data deserialized
   * @param replication - Number of replicas
   * @return StorageLevel instance
   */
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int
  ): StorageLevel

  /**
   * Creates storage level with simplified configuration
   * @param useDisk - Whether to use disk storage
   * @param useMemory - Whether to use memory storage
   * @param deserialized - Whether to store data deserialized
   * @param replication - Number of replicas (default 1)
   * @return StorageLevel instance
   */
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    deserialized: Boolean,
    replication: Int = 1
  ): StorageLevel

  /**
   * Creates storage level from bit flags
   * @param flags - Bit flags representing storage options
   * @param replication - Number of replicas
   * @return StorageLevel instance
   */
  def apply(flags: Int, replication: Int): StorageLevel

  /**
   * Creates storage level from ObjectInput (for deserialization)
   * @param in - ObjectInput stream
   * @return StorageLevel instance
   */
  def apply(in: ObjectInput): StorageLevel
}
```

**Usage Examples:**

```scala
import org.apache.spark.storage.StorageLevel

// Create from string
val level1 = StorageLevel.fromString("MEMORY_AND_DISK_SER_2")

// Create with full parameters
val level2 = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)

// Create with simplified parameters
val level3 = StorageLevel(
  useDisk = false,
  useMemory = true,
  deserialized = true,
  replication = 3
)

// Create from bit flags (advanced usage)
// Flags: 8 = useDisk, 4 = useMemory, 2 = useOffHeap, 1 = deserialized.
// Note: off-heap (2) cannot be combined with deserialized (1).
val level4 = StorageLevel(0x0D, 1) // disk + memory, deserialized
```

## Storage Strategy Guidelines

### Memory-Only Strategies

Best for small to medium datasets that fit in cluster memory:

```scala
// Fast access, but data lost if executors fail
StorageLevel.MEMORY_ONLY

// Fast access with fault tolerance
StorageLevel.MEMORY_ONLY_2

// Space-efficient for large objects
StorageLevel.MEMORY_ONLY_SER
```

### Memory and Disk Strategies

Best for medium to large datasets with balanced performance:

```scala
// Good balance of speed and reliability
StorageLevel.MEMORY_AND_DISK

// Space-efficient with reliability
StorageLevel.MEMORY_AND_DISK_SER

// High availability for critical data
StorageLevel.MEMORY_AND_DISK_2
```

### Disk-Only Strategies

Best for very large datasets or when memory is constrained:

```scala
// Cheapest storage option
StorageLevel.DISK_ONLY

// Disk storage with fault tolerance
StorageLevel.DISK_ONLY_2
```

### Off-Heap Storage

Best for large datasets when heap pressure is a concern:

```scala
// Reduces GC pressure
StorageLevel.OFF_HEAP
```

## Performance Considerations

### Serialization Trade-offs

```scala
// Faster access but more memory usage
StorageLevel.MEMORY_ONLY // Deserialized

// Slower access but less memory usage
StorageLevel.MEMORY_ONLY_SER // Serialized
```

### Replication Trade-offs

```scala
// Faster computation restart on failure
StorageLevel.MEMORY_AND_DISK_2 // 2x replication

// More storage overhead but better availability
StorageLevel.DISK_ONLY_3 // 3x replication
```

### Usage with DataFrames and Datasets

```scala
import org.apache.spark.storage.StorageLevel

// DataFrame persistence
val df = spark.read.parquet("path/to/data")
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Dataset persistence
case class Person(name: String, age: Int)
val ds = spark.read.json("path/to/people").as[Person]
ds.persist(StorageLevel.MEMORY_ONLY_2)

// Check current storage level
println(s"DataFrame storage level: ${df.storageLevel}")
```

## MemoryMode Enum

Enumeration defining memory allocation modes for storage operations.

```java { .api }
/**
 * Memory allocation modes for Spark storage
 */
public enum MemoryMode {
  /** Store data in JVM heap memory */
  ON_HEAP,

  /** Store data in off-heap memory */
  OFF_HEAP
}
```

**Usage Examples:**

```scala
import org.apache.spark.memory.MemoryMode
import org.apache.spark.storage.StorageLevel

// Check memory mode of storage level
val level = StorageLevel.MEMORY_ONLY
val mode = level.memoryMode // Returns MemoryMode.ON_HEAP

val offHeapLevel = StorageLevel.OFF_HEAP
val offHeapMode = offHeapLevel.memoryMode // Returns MemoryMode.OFF_HEAP

// Memory mode affects performance characteristics
mode match {
  case MemoryMode.ON_HEAP =>
    println("Using JVM heap - subject to garbage collection")
  case MemoryMode.OFF_HEAP =>
    println("Using off-heap memory - reduced GC pressure")
}
```

## Type Definitions

```scala { .api }
// Storage configuration class
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int
) extends Externalizable {
  def isValid: Boolean
  override def clone(): StorageLevel
  def description: String
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  private[spark] def memoryMode: MemoryMode
}

// Predefined storage level constants
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val DISK_ONLY_3: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}
```

```java { .api }
// Memory allocation modes
public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}
```