# Storage Configuration

Storage level configuration for controlling how RDDs are stored in memory, on disk, and in off-heap storage, with optional replication.

## StorageLevel Class

The `StorageLevel` class controls RDD storage behavior by specifying memory usage, disk usage, serialization, and replication settings.

```scala { .api }
import org.apache.spark.storage.StorageLevel

class StorageLevel private(
    _useDisk: Boolean,
    _useMemory: Boolean,
    _useOffHeap: Boolean,
    _deserialized: Boolean,
    _replication: Int = 1
) extends Externalizable
```

### Properties

```scala { .api }
// Storage medium flags
def useDisk: Boolean      // Whether to use disk storage
def useMemory: Boolean    // Whether to use on-heap memory storage
def useOffHeap: Boolean   // Whether to use off-heap storage

// Serialization settings
def deserialized: Boolean // Whether to store objects in deserialized form

// Replication settings
def replication: Int      // Number of block replicas (default 1; must be less than 40)
```

### Methods

```scala { .api }
// Create a copy of this storage level
def clone(): StorageLevel

// Check whether this storage level configuration is valid
def isValid: Boolean

// Pack the boolean flags into an integer for serialization
def toInt: Int

// Human-readable description
def description: String
```

#### Usage Examples

```scala { .api }
import org.apache.spark.storage.StorageLevel

// Examine storage level properties
val level = StorageLevel.MEMORY_AND_DISK_SER_2

println(s"Uses memory: ${level.useMemory}")    // true
println(s"Uses disk: ${level.useDisk}")        // true
println(s"Serialized: ${!level.deserialized}") // true
println(s"Replication: ${level.replication}")  // 2
println(s"Description: ${level.description}")  // "Disk Memory Serialized 2x Replicated"

// Validate storage level
if (level.isValid) {
  println(s"Storage level is valid: ${level.toInt}")
}

// Clone the storage level (typically done internally)
val cloned = level.clone()
```

## Pre-defined Storage Levels

The `StorageLevel` companion object provides common storage configurations:

### Memory-Only Storage

```scala { .api }
import org.apache.spark.storage.StorageLevel

// Store in memory only, deserialized
val memoryOnly = StorageLevel.MEMORY_ONLY
// Properties: useMemory=true, useDisk=false, deserialized=true, replication=1

// Store in memory only, deserialized, 2x replicated
val memoryOnly2 = StorageLevel.MEMORY_ONLY_2
// Properties: useMemory=true, useDisk=false, deserialized=true, replication=2

// Store in memory only, serialized
val memoryOnlySer = StorageLevel.MEMORY_ONLY_SER
// Properties: useMemory=true, useDisk=false, deserialized=false, replication=1

// Store in memory only, serialized, 2x replicated
val memoryOnlySer2 = StorageLevel.MEMORY_ONLY_SER_2
// Properties: useMemory=true, useDisk=false, deserialized=false, replication=2
```

### Disk-Only Storage

```scala { .api }
// Store on disk only (data on disk is always serialized)
val diskOnly = StorageLevel.DISK_ONLY
// Properties: useMemory=false, useDisk=true, deserialized=false, replication=1

// Store on disk only, 2x replicated
val diskOnly2 = StorageLevel.DISK_ONLY_2
// Properties: useMemory=false, useDisk=true, deserialized=false, replication=2

// Store on disk only, 3x replicated
val diskOnly3 = StorageLevel.DISK_ONLY_3
// Properties: useMemory=false, useDisk=true, deserialized=false, replication=3
```

### Memory and Disk Storage

```scala { .api }
// Store in memory, spill to disk, deserialized
val memoryAndDisk = StorageLevel.MEMORY_AND_DISK
// Properties: useMemory=true, useDisk=true, deserialized=true, replication=1

// Store in memory, spill to disk, deserialized, 2x replicated
val memoryAndDisk2 = StorageLevel.MEMORY_AND_DISK_2
// Properties: useMemory=true, useDisk=true, deserialized=true, replication=2

// Store in memory, spill to disk, serialized
val memoryAndDiskSer = StorageLevel.MEMORY_AND_DISK_SER
// Properties: useMemory=true, useDisk=true, deserialized=false, replication=1

// Store in memory, spill to disk, serialized, 2x replicated
val memoryAndDiskSer2 = StorageLevel.MEMORY_AND_DISK_SER_2
// Properties: useMemory=true, useDisk=true, deserialized=false, replication=2
```

### Special Storage Levels

```scala { .api }
// No storage (not cached)
val none = StorageLevel.NONE
// Properties: useMemory=false, useDisk=false, deserialized=false, replication=1

// Off-heap storage (in current Spark versions this level also permits on-heap memory and disk)
val offHeap = StorageLevel.OFF_HEAP
// Properties: useMemory=true, useDisk=true, useOffHeap=true, deserialized=false, replication=1
```
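
`OFF_HEAP` only takes effect when off-heap memory is enabled in the Spark configuration. As a minimal sketch using the standard `spark.memory.offHeap.*` settings (the 2g size here is illustrative, not a recommendation):

```scala { .api }
import org.apache.spark.sql.SparkSession

// Off-heap storage requires both of these settings; the size is illustrative
val spark = SparkSession.builder()
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .getOrCreate()
```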

## Custom Storage Levels

Create custom storage levels using the companion object factory methods:

### Basic Factory Method

```scala { .api }
import org.apache.spark.storage.StorageLevel

// Create a custom storage level
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 3
)
```

### Alternative Factory Methods

```scala { .api }
// With explicit parameters (no off-heap)
val level1 = StorageLevel(
  useDisk = true,
  useMemory = true,
  deserialized = true,
  replication = 2
)

// From integer flags and replication (the flags pack the four boolean settings)
val level2 = StorageLevel(flags = 15, replication = 2)

// From ObjectInput (for deserialization)
// val level3 = StorageLevel(objectInput)
```
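
As a sketch of how the `flags` integer lines up with the boolean settings — this mirrors the bit layout used by `toInt` in current Spark sources (disk=8, memory=4, off-heap=2, deserialized=1), but verify against your Spark version before relying on it:

```scala { .api }
// Decode a flags integer into the four boolean settings
def decodeFlags(flags: Int): (Boolean, Boolean, Boolean, Boolean) = (
  (flags & 8) != 0, // useDisk
  (flags & 4) != 0, // useMemory
  (flags & 2) != 0, // useOffHeap
  (flags & 1) != 0  // deserialized
)

// decodeFlags(12) == (true, true, false, false), i.e. disk + memory, serialized, on-heap
```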

### String Parsing

```scala { .api }
// Parse a storage level from its string name
val parsed1 = StorageLevel.fromString("MEMORY_AND_DISK")
val parsed2 = StorageLevel.fromString("MEMORY_ONLY_SER_2")
val parsed3 = StorageLevel.fromString("DISK_ONLY_3")

// Handle invalid strings
try {
  val invalid = StorageLevel.fromString("INVALID_LEVEL")
} catch {
  case ex: IllegalArgumentException =>
    println(s"Invalid storage level name: ${ex.getMessage}")
}
```

## Usage Patterns

### RDD Caching

```scala { .api }
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("storage-example"))

// Note: an RDD's storage level can only be set once

// Cache with the default MEMORY_ONLY storage level
val cachedRDD1 = sc.textFile("data.txt").cache()

// Cache with a specific storage level
val cachedRDD2 = sc.textFile("data.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)

// Cache with a custom storage level
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  deserialized = false,
  replication = 2
)
val cachedRDD3 = sc.textFile("data.txt").persist(customLevel)
```

### DataFrame/Dataset Caching

```scala { .api }
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().getOrCreate()
val df = spark.read.parquet("data.parquet")

// Cache with the default storage level (MEMORY_AND_DISK for Datasets)
val cachedDF1 = df.cache()

// Cache with a specific storage level
val cachedDF2 = df.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
```

### Streaming Applications

```scala { .api }
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sc, Seconds(1))
val stream = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_2)

// For high-throughput streams, consider serialized storage
val highThroughputStream = ssc.socketTextStream(
  "localhost",
  9999,
  StorageLevel.MEMORY_AND_DISK_SER_2
)
```

## Performance Considerations

### Memory vs. Disk Trade-offs

```scala { .api }
// Fast access, high memory usage (good for iterative algorithms)
val fastAccess = StorageLevel.MEMORY_ONLY

// Balanced performance and memory usage
val balanced = StorageLevel.MEMORY_AND_DISK

// Lower memory usage, slower access (good for large datasets)
val memoryEfficient = StorageLevel.MEMORY_AND_DISK_SER
```
276
277
### Serialization Impact
278
279
```scala { .api }
280
// Deserialized: faster CPU, more memory usage
281
val cpuOptimized = StorageLevel.MEMORY_ONLY // deserialized=true
282
283
// Serialized: slower CPU, less memory usage
284
val memoryOptimized = StorageLevel.MEMORY_ONLY_SER // deserialized=false
285
286
// Choose based on object size and access patterns
287
val largeObjects = StorageLevel.MEMORY_AND_DISK_SER // Serialize large objects
288
val smallObjects = StorageLevel.MEMORY_AND_DISK // Keep small objects deserialized
289
```
290
291
### Replication Strategies
292
293
```scala { .api }
294
// Single replica: normal performance and storage
295
val standard = StorageLevel.MEMORY_AND_DISK // replication=1
296
297
// High availability: fault tolerance with performance cost
298
val faultTolerant = StorageLevel.MEMORY_AND_DISK_2 // replication=2
299
300
// Critical data: maximum fault tolerance
301
val critical = StorageLevel.DISK_ONLY_3 // replication=3
302
```

## Best Practices

### Choosing Storage Levels

1. **Iterative Algorithms**: Use `MEMORY_ONLY` or `MEMORY_AND_DISK` for datasets accessed multiple times
2. **Large Datasets**: Use serialized storage (`*_SER`) to reduce memory footprint
3. **Fault Tolerance**: Use replication (`*_2`) for critical intermediate results
4. **Memory Pressure**: Use `MEMORY_AND_DISK` to allow spilling to disk

### Configuration Examples

```scala { .api }
// Interactive analysis - fast access, willing to use memory
val interactive = StorageLevel.MEMORY_ONLY

// Batch processing - balance memory usage and reliability
val batchProcessing = StorageLevel.MEMORY_AND_DISK_SER

// Stream processing - handle backpressure and failures
val streaming = StorageLevel.MEMORY_AND_DISK_2

// Long-running applications - minimize memory usage
val longRunning = StorageLevel.MEMORY_AND_DISK_SER

// Critical data processing - maximum fault tolerance
val criticalData = StorageLevel.MEMORY_AND_DISK_SER_2
```
331
332
### Memory Management
333
334
```scala { .api }
335
import org.apache.spark.sql.SparkSession
336
337
val spark = SparkSession.builder().getOrCreate()
338
339
// Monitor storage levels in application
340
def monitorCaching(): Unit = {
341
val storageInfo = spark.sparkContext.statusTracker.getExecutorInfos
342
storageInfo.foreach { info =>
343
println(s"Executor ${info.executorId}: " +
344
s"Memory used: ${info.memoryUsed}, " +
345
s"Memory available: ${info.maxMemory}")
346
}
347
}
348
349
// Unpersist when no longer needed
350
def cleanupCache(rdd: org.apache.spark.rdd.RDD[_]): Unit = {
351
rdd.unpersist(blocking = false)
352
}
353
```

### Dynamic Storage Level Selection

```scala { .api }
import org.apache.spark.storage.StorageLevel

def selectStorageLevel(
    dataSize: Long,
    availableMemory: Long,
    accessFrequency: Int
): StorageLevel = {

  val memoryRatio = dataSize.toDouble / availableMemory

  (memoryRatio, accessFrequency) match {
    // Small dataset, frequent access
    case (ratio, freq) if ratio < 0.1 && freq > 5 =>
      StorageLevel.MEMORY_ONLY

    // Medium dataset, frequent access
    case (ratio, freq) if ratio < 0.5 && freq > 3 =>
      StorageLevel.MEMORY_AND_DISK

    // Large dataset or infrequent access
    case _ =>
      StorageLevel.MEMORY_AND_DISK_SER
  }
}
```

## Integration with Spark Components

### Catalyst Optimizer Integration

Cached data is taken into account during query planning: when a persisted DataFrame is referenced again, Spark substitutes the cached results instead of recomputing the plan:

```scala { .api }
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = spark.read.parquet("large_table.parquet")

// Cache intermediate results for complex queries
val processedDF = df
  .filter($"status" === "active")
  .groupBy($"category")
  .agg(sum($"amount"))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Subsequent operations reuse the cached data
val result1 = processedDF.filter($"sum(amount)" > 1000)
val result2 = processedDF.orderBy($"category")
```