# Storage and Persistence

Spark's storage and persistence capabilities allow RDDs to be cached in memory or persisted to disk with configurable storage levels, enabling significant performance improvements for iterative algorithms and interactive analytics.

## RDD Persistence Methods

### Basic Persistence Operations

```scala { .api }
abstract class RDD[T] {
  def persist(newLevel: StorageLevel): this.type
  def persist(): this.type // Uses MEMORY_ONLY
  def cache(): this.type   // Alias for persist() with MEMORY_ONLY
  def unpersist(blocking: Boolean = true): this.type
  def getStorageLevel: StorageLevel
}
```

### Checkpointing

```scala { .api }
abstract class RDD[T] {
  def checkpoint(): Unit
  def localCheckpoint(): this.type
  def isCheckpointed: Boolean
  def getCheckpointFile: Option[String]
}
```

## StorageLevel

Defines how RDDs are stored, combining memory, disk, off-heap, serialization, and replication options.

### Constructor

```scala { .api }
class StorageLevel(
  useDisk: Boolean,
  useMemory: Boolean,
  useOffHeap: Boolean,
  deserialized: Boolean,
  replication: Int
)
```

Note that the constructor is private to Spark; instances are obtained from the predefined levels below or from the `StorageLevel.apply` factory on the companion object.

### Properties

```scala { .api }
class StorageLevel {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  def isValid: Boolean
  def clone: StorageLevel
}
```

### Predefined Storage Levels

```scala { .api }
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}
```

## RDDInfo

Information about an RDD's storage status and statistics.

```scala { .api }
case class RDDInfo(
  id: Int,
  name: String,
  numPartitions: Int,
  storageLevel: StorageLevel,
  numCachedPartitions: Int,
  memSize: Long,
  diskSize: Long,
  externalBlockStoreSize: Long
) {
  def isCached: Boolean
}
```

## Storage Management Methods

### SparkContext Storage Information

```scala { .api }
class SparkContext {
  def getPersistentRDDs: Map[Int, RDD[_]]
  def getRDDStorageInfo: Array[RDDInfo]
  def getExecutorMemoryStatus: Map[String, (Long, Long)]
}
```

## Storage Level Details

### Memory-Only Storage

- **MEMORY_ONLY**: Store the RDD as deserialized Java objects in the JVM heap
- **MEMORY_ONLY_SER**: Store the RDD as serialized Java objects (more space-efficient, more CPU to read)
- **MEMORY_ONLY_2**: Store the RDD as deserialized objects with 2x replication
- **MEMORY_ONLY_SER_2**: Store the RDD as serialized objects with 2x replication

### Disk-Only Storage

- **DISK_ONLY**: Store RDD partitions only on disk
- **DISK_ONLY_2**: Store RDD partitions on disk with 2x replication

### Memory and Disk Storage

- **MEMORY_AND_DISK**: Store the RDD in memory, spill partitions to disk when memory is insufficient
- **MEMORY_AND_DISK_SER**: Store the RDD in memory as serialized objects, spill to disk when needed
- **MEMORY_AND_DISK_2**: Memory-and-disk storage with 2x replication
- **MEMORY_AND_DISK_SER_2**: Memory-and-disk serialized storage with 2x replication

### Off-Heap Storage

- **OFF_HEAP**: Store the RDD in off-heap memory (requires off-heap memory to be enabled and sized; see Configuration Properties)

### No Storage

- **NONE**: Do not persist the RDD
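
The naming convention above is systematic: a base location, an optional `_SER` suffix for serialized in-memory storage, and an optional replication suffix. A purely illustrative Scala sketch of that mapping (this models the names only; it is not Spark's `StorageLevel` implementation):

```scala
// Illustrative model of the level-name convention above; not Spark code.
case class LevelFlags(useDisk: Boolean, useMemory: Boolean,
                      deserialized: Boolean, replication: Int)

def levelName(f: LevelFlags): String = {
  val base = (f.useMemory, f.useDisk) match {
    case (true, true)   => "MEMORY_AND_DISK"
    case (true, false)  => "MEMORY_ONLY"
    case (false, true)  => "DISK_ONLY"
    case (false, false) => "NONE"
  }
  // Disk storage is always serialized, so only in-memory levels get _SER
  val ser = if (f.useMemory && !f.deserialized) "_SER" else ""
  val rep = if (f.replication > 1) s"_${f.replication}" else ""
  base + ser + rep
}

// levelName(LevelFlags(useDisk = true, useMemory = true,
//                      deserialized = false, replication = 2))
// yields "MEMORY_AND_DISK_SER_2"
```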

## Usage Examples

### Basic Persistence

```scala
val data = sc.parallelize(1 to 1000000)

// Cache in memory (most common)
val cachedData = data.cache()

// Equivalent to cache(): both use MEMORY_ONLY
val persistedData = data.persist()
val memoryOnlyData = data.persist(StorageLevel.MEMORY_ONLY)

// Use the cached RDD multiple times
val sum1 = cachedData.sum()
val sum2 = cachedData.map(_ * 2).sum() // Reads from the cache

// Remove from cache when done
cachedData.unpersist()
```

### Different Storage Levels

```scala
// An RDD's storage level can only be assigned once, so each example
// below uses a fresh RDD.
def makeLargeRDD() = sc.parallelize(1 to 10000000)

// Memory with spillover to disk
val spillableRDD = makeLargeRDD().persist(StorageLevel.MEMORY_AND_DISK)

// Serialized storage (less memory usage, more CPU)
val serializedRDD = makeLargeRDD().persist(StorageLevel.MEMORY_ONLY_SER)

// Disk-only storage
val diskRDD = makeLargeRDD().persist(StorageLevel.DISK_ONLY)

// High availability with replication
val replicatedRDD = makeLargeRDD().persist(StorageLevel.MEMORY_AND_DISK_2)
```

### Off-Heap Storage

```scala
// Requires spark.memory.offHeap.enabled=true and spark.memory.offHeap.size > 0
val offHeapRDD = sc.parallelize(1 to 10000000).persist(StorageLevel.OFF_HEAP)
```

### Iterative Algorithms

```scala
// K-means clustering example
var centroids = sc.parallelize(initialCentroids).cache()
val points = sc.textFile("points.txt")
  .map(parsePoint)
  .cache() // Cache the input data: it is scanned on every iteration

for (i <- 1 to maxIterations) {
  // Bring the (small) centroid set to the driver first: an RDD cannot
  // be referenced inside another RDD's closure.
  val centroidArray = centroids.collect()

  val closest = points
    .map(p => (closestCentroid(p, centroidArray), p))
    .groupByKey()

  val newCentroids = closest.map { case (centroidIndex, clusterPoints) =>
    averagePoints(clusterPoints)
  }

  // Update centroids, swapping the cached RDD
  centroids.unpersist()
  centroids = newCentroids.cache()
}

// Clean up
points.unpersist()
centroids.unpersist()
```

### Monitoring Storage Usage

```scala
val rdd = sc.parallelize(1 to 1000).cache()
rdd.count() // Trigger caching

// Get storage information
val storageInfo = sc.getRDDStorageInfo
storageInfo.foreach { info =>
  println(s"RDD ${info.id}: ${info.name}")
  println(s"  Cached partitions: ${info.numCachedPartitions}/${info.numPartitions}")
  println(s"  Memory size: ${info.memSize} bytes")
  println(s"  Disk size: ${info.diskSize} bytes")
  println(s"  Storage level: ${info.storageLevel}")
}

// Get executor memory status: (max memory, remaining memory) per executor
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMem, remainingMem)) =>
  val usedMem = maxMem - remainingMem
  println(s"Executor $executorId: $usedMem/$maxMem bytes used")
}

// Get all persistent RDDs
val persistentRDDs = sc.getPersistentRDDs
println(s"Number of persistent RDDs: ${persistentRDDs.size}")
```

### Checkpointing

```scala
// Set the checkpoint directory (required before checkpointing)
sc.setCheckpointDir("hdfs://path/to/checkpoint")

val rdd = sc.parallelize(1 to 1000)
  .map(_ * 2)
  .filter(_ > 100)

// Mark for checkpointing
rdd.checkpoint()

// An action triggers both the computation and the checkpoint
val result = rdd.count()

// Check if checkpointed
if (rdd.isCheckpointed) {
  println(s"RDD checkpointed to: ${rdd.getCheckpointFile.getOrElse("unknown")}")
}

// Local checkpointing (faster, but not fault-tolerant across executor loss)
val localRDD = sc.parallelize(1 to 1000)
localRDD.localCheckpoint()
localRDD.count() // Also lazy: an action materializes the checkpoint
```

### Custom Storage Levels

```scala
// Create a custom storage level via the StorageLevel factory
// (the class constructor is private to Spark)
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false, // Serialized
  replication = 3       // Triple replication
)

val rdd = sc.parallelize(criticalData).persist(customLevel)
```

## Performance Considerations

### When to Cache

- **Iterative algorithms**: Cache datasets that are accessed multiple times
- **Interactive analysis**: Cache datasets for exploratory queries
- **Branching workflows**: Cache shared datasets before multiple transformations
- **Expensive computations**: Cache results of costly operations

### When Not to Cache

- **One-time use**: Don't cache RDDs accessed only once
- **Memory pressure**: Avoid caching when memory is limited
- **Large datasets**: Consider serialized or disk storage for large datasets
- **Simple computations**: Don't cache if recomputation is cheaper than storage
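
For the "recomputation is cheaper" case, a rough rule: caching pays off when the compute time saved across repeated uses exceeds the cost of building the cache. A back-of-envelope sketch (the function and its inputs are illustrative assumptions, not a Spark API):

```scala
// Hypothetical back-of-envelope check: caching saves (uses - 1)
// recomputations, at the price of writing the cache once.
def cachingPaysOff(computeSecs: Double, uses: Int, cacheWriteSecs: Double): Boolean =
  computeSecs * (uses - 1) > cacheWriteSecs

// e.g. a 60s computation used 3 times vs a 10s cache write:
// 60 * 2 = 120s saved > 10s spent, so caching pays off
```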

### Storage Level Selection

#### Choose MEMORY_ONLY when:

- The dataset fits comfortably in memory
- Fast access is critical
- CPU spent on deserialization is not a concern

#### Choose MEMORY_ONLY_SER when:

- Memory is limited but the dataset should stay in memory
- Network/disk I/O should be minimized
- CPU spent on serialization/deserialization is acceptable

#### Choose MEMORY_AND_DISK when:

- The dataset might not fit in memory
- Avoiding recomputation matters more than raw access speed
- A balance between memory and disk performance is needed

#### Choose DISK_ONLY when:

- The dataset is too large for memory
- Memory should be reserved for other operations
- Disk I/O performance is acceptable

#### Choose replication levels (2 or higher) when:

- High availability is required
- The cluster has frequent node failures
- The cost of recomputation is very high
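
The decision guide above can be condensed into a small helper. This is purely illustrative (the predicate names are assumptions for this sketch, not Spark parameters); it returns the name of a predefined level:

```scala
// Hypothetical condensation of the selection guide above; returns a
// predefined StorageLevel name as a String.
def pickLevel(fitsInMemory: Boolean,
              memoryTight: Boolean,
              replicate: Boolean): String = {
  val base =
    if (!fitsInMemory && memoryTight) "DISK_ONLY"
    else if (!fitsInMemory) "MEMORY_AND_DISK"
    else if (memoryTight) "MEMORY_ONLY_SER"
    else "MEMORY_ONLY"
  if (replicate) base + "_2" else base
}
```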

### Memory Management

```scala
// Estimate an RDD's memory footprint before caching a large dataset
// (a rough estimate: assumes records are similar in size)
import org.apache.spark.util.SizeEstimator
val estimatedSize = SizeEstimator.estimate(rdd.first()) * rdd.count()
println(s"Estimated RDD size: $estimatedSize bytes")

// Configure memory fractions for storage:
//   spark.memory.fraction = 0.6        (fraction of heap for execution + storage)
//   spark.memory.storageFraction = 0.5 (fraction of the above for storage)

// Monitor and tune garbage collection:
//   Use G1GC for large heaps: -XX:+UseG1GC
//   Tune region size:         -XX:G1HeapRegionSize=32m
```

## Configuration Properties

### Memory Management

- `spark.memory.fraction` - Fraction of heap space for execution and storage (default: 0.6)
- `spark.memory.storageFraction` - Fraction of that memory protected for caching (default: 0.5)
- `spark.memory.offHeap.enabled` - Enable off-heap memory (default: false)
- `spark.memory.offHeap.size` - Off-heap memory size (default: 0)

### Storage Behavior

- `spark.rdd.compress` - Compress serialized RDD partitions (default: false)
- `spark.serializer` - Serializer for cached objects (KryoSerializer recommended)
- `spark.storage.memoryFraction` - (Legacy) Fraction of JVM heap for RDD storage
- `spark.storage.unrollFraction` - (Legacy) Fraction of storage memory for unrolling blocks

### Checkpointing

- `spark.checkpoint.compress` - Compress checkpoint files (default: false)
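
These properties can be set in `spark-defaults.conf`, on the `spark-submit` command line, or programmatically before the context is created. A configuration sketch using `SparkConf` (the specific values are illustrative, not recommendations):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values; size off-heap memory to your workload
val conf = new SparkConf()
  .setAppName("storage-tuning-example")
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g") // required when off-heap is enabled
  .set("spark.rdd.compress", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.checkpoint.compress", "true")

val sc = new SparkContext(conf)
```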

## Best Practices

### Caching Strategy

1. **Cache after expensive operations** but before multiple uses
2. **Use appropriate storage levels** based on memory availability and access patterns
3. **Monitor storage usage** and adjust the caching strategy accordingly
4. **Unpersist RDDs** when no longer needed to free memory
5. **Consider serialization overhead** when choosing between serialized and deserialized storage

### Memory Optimization

1. **Use Kryo serialization** for better performance with serialized storage
2. **Tune memory allocation** between storage and execution
3. **Monitor GC overhead** and tune garbage collection settings
4. **Consider off-heap storage** for very large datasets
5. **Profile memory usage** with tools like the Spark UI and OS monitoring

### Fault Tolerance

1. **Use replication** for critical datasets that are expensive to recompute
2. **Set up checkpointing** for long lineages or iterative algorithms
3. **Balance fault tolerance** with storage overhead and performance
4. **Choose appropriate checkpoint intervals** to avoid excessive overhead
5. **Monitor checkpoint performance** and adjust directory and compression settings

## Important Notes

- **Caching is lazy** - an RDD is not cached until an action is executed
- **Storage levels are immutable** - a level cannot be changed after it is assigned
- **Unpersist can block** - with `blocking = true` (the default shown above), `unpersist` waits until all blocks are removed; pass `blocking = false` to unpersist asynchronously
- **Memory pressure triggers eviction** - least recently used RDD partitions are removed first
- **Checkpointing truncates lineage** - provides fault tolerance for long computation chains
- **Off-heap storage requires configuration** - off-heap memory must be enabled and sized
- **Replication multiplies storage overhead** - weigh availability against resource cost