or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.mdexceptions.mdgraphx.mdindex.mdlogging.mdmllib.mdsql.mdstorage.mdstreaming.mdutils.md

storage.mddocs/

0

# Storage Configuration

1

2

Storage level configuration for controlling how RDDs are stored in memory, disk, and off-heap storage with replication support.

3

4

## StorageLevel Class

5

6

The `StorageLevel` class controls RDD storage behavior by specifying memory usage, disk usage, serialization, and replication settings.

7

8

```scala { .api }

9

import org.apache.spark.storage.StorageLevel

10

11

class StorageLevel private(

12

_useDisk: Boolean,

13

_useMemory: Boolean,

14

_useOffHeap: Boolean,

15

_deserialized: Boolean,

16

_replication: Int = 1

17

) extends Externalizable

18

```

19

20

### Properties

21

22

```scala { .api }

23

// Storage medium flags

24

def useDisk: Boolean // Whether to use disk storage

25

def useMemory: Boolean // Whether to use memory storage

26

def useOffHeap: Boolean // Whether to use off-heap storage

27

28

// Serialization settings

29

def deserialized: Boolean // Whether to store deserialized objects

30

31

// Replication settings

32

def replication: Int // Number of replicas (Spark requires replication < 40)

33

```

34

35

### Methods

36

37

```scala { .api }

38

// Create a copy of this storage level

39

def clone(): StorageLevel

40

41

// Check if storage level configuration is valid

42

def isValid: Boolean

43

44

// Convert to integer representation for serialization

45

def toInt: Int

46

47

// Human readable description

48

def description: String

49

```

50

51

#### Usage Examples

52

53

```scala { .api }

54

import org.apache.spark.storage.StorageLevel

55

56

// Examine storage level properties

57

val level = StorageLevel.MEMORY_AND_DISK_SER_2

58

59

println(s"Uses memory: ${level.useMemory}") // true

60

println(s"Uses disk: ${level.useDisk}") // true

61

println(s"Serialized: ${!level.deserialized}") // true

62

println(s"Replication: ${level.replication}") // 2

63

println(s"Description: ${level.description}") // "Disk Memory Serialized 2x Replicated"

64

65

// Validate storage level

66

if (level.isValid) {

67

println(s"Storage level is valid: ${level.toInt}")

68

}

69

70

// Clone with modifications (typically done internally)

71

val cloned = level.clone()

72

```

73

74

## Pre-defined Storage Levels

75

76

The `StorageLevel` companion object provides common storage configurations:

77

78

### Memory-Only Storage

79

80

```scala { .api }

81

import org.apache.spark.storage.StorageLevel

82

83

// Store in memory only, deserialized

84

val memoryOnly = StorageLevel.MEMORY_ONLY

85

// Properties: useMemory=true, useDisk=false, deserialized=true, replication=1

86

87

// Store in memory only, deserialized, 2x replicated

88

val memoryOnly2 = StorageLevel.MEMORY_ONLY_2

89

// Properties: useMemory=true, useDisk=false, deserialized=true, replication=2

90

91

// Store in memory only, serialized

92

val memoryOnlySer = StorageLevel.MEMORY_ONLY_SER

93

// Properties: useMemory=true, useDisk=false, deserialized=false, replication=1

94

95

// Store in memory only, serialized, 2x replicated

96

val memoryOnlySer2 = StorageLevel.MEMORY_ONLY_SER_2

97

// Properties: useMemory=true, useDisk=false, deserialized=false, replication=2

98

```

99

100

### Disk-Only Storage

101

102

```scala { .api }

103

// Store on disk only

104

val diskOnly = StorageLevel.DISK_ONLY

105

// Properties: useMemory=false, useDisk=true, deserialized=false, replication=1

106

107

// Store on disk only, 2x replicated

108

val diskOnly2 = StorageLevel.DISK_ONLY_2

109

// Properties: useMemory=false, useDisk=true, deserialized=false, replication=2

110

111

// Store on disk only, 3x replicated

112

val diskOnly3 = StorageLevel.DISK_ONLY_3

113

// Properties: useMemory=false, useDisk=true, deserialized=false, replication=3

114

```

115

116

### Memory and Disk Storage

117

118

```scala { .api }

119

// Store in memory, spill to disk, deserialized

120

val memoryAndDisk = StorageLevel.MEMORY_AND_DISK

121

// Properties: useMemory=true, useDisk=true, deserialized=true, replication=1

122

123

// Store in memory and disk, deserialized, 2x replicated

124

val memoryAndDisk2 = StorageLevel.MEMORY_AND_DISK_2

125

// Properties: useMemory=true, useDisk=true, deserialized=true, replication=2

126

127

// Store in memory, spill to disk, serialized

128

val memoryAndDiskSer = StorageLevel.MEMORY_AND_DISK_SER

129

// Properties: useMemory=true, useDisk=true, deserialized=false, replication=1

130

131

// Store in memory and disk, serialized, 2x replicated

132

val memoryAndDiskSer2 = StorageLevel.MEMORY_AND_DISK_SER_2

133

// Properties: useMemory=true, useDisk=true, deserialized=false, replication=2

134

```

135

136

### Special Storage Levels

137

138

```scala { .api }

139

// No storage (not cached)

140

val none = StorageLevel.NONE

141

// Properties: useMemory=false, useDisk=false, deserialized=false, replication=1

142

143

// Off-heap storage

144

val offHeap = StorageLevel.OFF_HEAP

145

// Properties: useDisk=true, useMemory=true, useOffHeap=true, deserialized=false, replication=1 (Spark 2.x+)

146

```

147

148

## Custom Storage Levels

149

150

Create custom storage levels using the companion object factory methods:

151

152

### Basic Factory Method

153

154

```scala { .api }

155

import org.apache.spark.storage.StorageLevel

156

157

// Create custom storage level

158

val customLevel = StorageLevel(

159

useDisk = true,

160

useMemory = true,

161

useOffHeap = false,

162

deserialized = false,

163

replication = 3

164

)

165

```

166

167

### Alternative Factory Methods

168

169

```scala { .api }

170

// With explicit parameters (no off-heap)

171

val level1 = StorageLevel(

172

useDisk = true,

173

useMemory = true,

174

deserialized = true,

175

replication = 2

176

)

177

178

// From integer flags and replication

179

val level2 = StorageLevel(flags = 12, replication = 2) // flags 12 = useDisk + useMemory, serialized

180

181

// From ObjectInput (for deserialization)

182

// val level3 = StorageLevel(objectInput)

183

```

184

185

### String Parsing

186

187

```scala { .api }

188

// Parse storage level from string name

189

val parsed1 = StorageLevel.fromString("MEMORY_AND_DISK")

190

val parsed2 = StorageLevel.fromString("MEMORY_ONLY_SER_2")

191

val parsed3 = StorageLevel.fromString("DISK_ONLY_3")

192

193

// Handle invalid strings

194

try {

195

val invalid = StorageLevel.fromString("INVALID_LEVEL")

196

} catch {

197

case ex: IllegalArgumentException =>

198

println(s"Invalid storage level name: ${ex.getMessage}")

199

}

200

```

201

202

## Usage Patterns

203

204

### RDD Caching

205

206

```scala { .api }

207

import org.apache.spark.SparkContext

208

import org.apache.spark.storage.StorageLevel

209

210

val sc = new SparkContext()

211

val rdd = sc.textFile("data.txt")

212

213

// Cache with default memory-only storage

214

val cachedRDD1 = rdd.cache()

215

216

// Cache with specific storage level

217

val cachedRDD2 = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

218

219

// Cache with custom storage level

220

val customLevel = StorageLevel(

221

useDisk = true,

222

useMemory = true,

223

deserialized = false,

224

replication = 2

225

)

226

val cachedRDD3 = rdd.persist(customLevel)

227

```

228

229

### DataFrame/Dataset Caching

230

231

```scala { .api }

232

import org.apache.spark.sql.SparkSession

233

import org.apache.spark.storage.StorageLevel

234

235

val spark = SparkSession.builder().getOrCreate()

236

val df = spark.read.parquet("data.parquet")

237

238

// Cache with default storage level

239

val cachedDF1 = df.cache()

240

241

// Cache with specific storage level

242

val cachedDF2 = df.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

243

```

244

245

### Streaming Applications

246

247

```scala { .api }

248

import org.apache.spark.streaming.StreamingContext

249

import org.apache.spark.storage.StorageLevel

250

251

val ssc = new StreamingContext(sc, batchDuration)

252

val stream = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_2)

253

254

// For high-throughput streams, consider serialized storage

255

val highThroughputStream = ssc.socketTextStream(

256

"localhost",

257

9999,

258

StorageLevel.MEMORY_AND_DISK_SER_2

259

)

260

```

261

262

## Performance Considerations

263

264

### Memory vs. Disk Trade-offs

265

266

```scala { .api }

267

// Fast access, high memory usage (good for iterative algorithms)

268

val fastAccess = StorageLevel.MEMORY_ONLY

269

270

// Balanced performance and memory usage

271

val balanced = StorageLevel.MEMORY_AND_DISK

272

273

// Lower memory usage, slower access (good for large datasets)

274

val memoryEfficient = StorageLevel.MEMORY_AND_DISK_SER

275

```

276

277

### Serialization Impact

278

279

```scala { .api }

280

// Deserialized: faster CPU, more memory usage

281

val cpuOptimized = StorageLevel.MEMORY_ONLY // deserialized=true

282

283

// Serialized: slower CPU, less memory usage

284

val memoryOptimized = StorageLevel.MEMORY_ONLY_SER // deserialized=false

285

286

// Choose based on object size and access patterns

287

val largeObjects = StorageLevel.MEMORY_AND_DISK_SER // Serialize large objects

288

val smallObjects = StorageLevel.MEMORY_AND_DISK // Keep small objects deserialized

289

```

290

291

### Replication Strategies

292

293

```scala { .api }

294

// Single replica: normal performance and storage

295

val standard = StorageLevel.MEMORY_AND_DISK // replication=1

296

297

// High availability: fault tolerance with performance cost

298

val faultTolerant = StorageLevel.MEMORY_AND_DISK_2 // replication=2

299

300

// Critical data: maximum fault tolerance

301

val critical = StorageLevel.DISK_ONLY_3 // replication=3

302

```

303

304

## Best Practices

305

306

### Choosing Storage Levels

307

308

1. **Iterative Algorithms**: Use `MEMORY_ONLY` or `MEMORY_AND_DISK` for datasets accessed multiple times

309

2. **Large Datasets**: Use serialized storage (`*_SER`) to reduce memory footprint

310

3. **Fault Tolerance**: Use replication (`*_2`) for critical intermediate results

311

4. **Memory Pressure**: Use `MEMORY_AND_DISK` to allow spilling to disk

312

313

### Configuration Examples

314

315

```scala { .api }

316

// Interactive analysis - fast access, willing to use memory

317

val interactive = StorageLevel.MEMORY_ONLY

318

319

// Batch processing - balance memory and reliability

320

val batchProcessing = StorageLevel.MEMORY_AND_DISK_SER

321

322

// Stream processing - handle backpressure and failures

323

val streaming = StorageLevel.MEMORY_AND_DISK_2

324

325

// Long-running applications - minimize memory usage

326

val longRunning = StorageLevel.MEMORY_AND_DISK_SER

327

328

// Critical data processing - maximum fault tolerance

329

val criticalData = StorageLevel.MEMORY_AND_DISK_SER_2

330

```

331

332

### Memory Management

333

334

```scala { .api }

335

import org.apache.spark.sql.SparkSession

336

337

val spark = SparkSession.builder().getOrCreate()

338

339

// Monitor storage levels in application

340

def monitorCaching(): Unit = {

341

val storageInfo = spark.sparkContext.statusTracker.getExecutorInfos

342

storageInfo.foreach { info =>

343

println(s"Executor ${info.executorId}: " +

344

s"Memory used: ${info.memoryUsed}, " +

345

s"Memory available: ${info.maxMemory}")

346

}

347

}

348

349

// Unpersist when no longer needed

350

def cleanupCache(rdd: org.apache.spark.rdd.RDD[_]): Unit = {

351

rdd.unpersist(blocking = false)

352

}

353

```

354

355

### Dynamic Storage Level Selection

356

357

```scala { .api }

358

def selectStorageLevel(

359

dataSize: Long,

360

availableMemory: Long,

361

accessFrequency: Int

362

): StorageLevel = {

363

364

val memoryRatio = dataSize.toDouble / availableMemory

365

366

(memoryRatio, accessFrequency) match {

367

// Small dataset, frequent access

368

case (ratio, freq) if ratio < 0.1 && freq > 5 =>

369

StorageLevel.MEMORY_ONLY

370

371

// Medium dataset, frequent access

372

case (ratio, freq) if ratio < 0.5 && freq > 3 =>

373

StorageLevel.MEMORY_AND_DISK

374

375

// Large dataset or infrequent access

376

case (ratio, freq) if ratio >= 0.5 || freq <= 3 =>

377

StorageLevel.MEMORY_AND_DISK_SER

378

379

// Default fallback

380

case _ =>

381

StorageLevel.MEMORY_AND_DISK

382

}

383

}

384

```

385

386

## Integration with Spark Components

387

388

### Catalyst Optimizer Integration

389

390

Storage levels are considered by the Catalyst optimizer when planning query execution:

391

392

```scala { .api }

393

import org.apache.spark.sql.SparkSession

394

import org.apache.spark.storage.StorageLevel

395

396

val spark = SparkSession.builder().getOrCreate()

397

val df = spark.read.parquet("large_table.parquet")

398

399

// Cache intermediate results for complex queries

400

val processedDF = df

401

.filter($"status" === "active")

402

.groupBy($"category")

403

.agg(sum($"amount"))

404

.persist(StorageLevel.MEMORY_AND_DISK_SER)

405

406

// Catalyst will recognize cached data in subsequent operations

407

val result1 = processedDF.filter($"sum(amount)" > 1000)

408

val result2 = processedDF.orderBy($"category")

409

```