or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

accumulators.mdapplication-context.mdbroadcast-variables.mdindex.mdjava-api.mdpartitioning.mdrdd-operations.mdserialization.mdstorage-persistence.md

storage-persistence.mddocs/

0

# Storage and Persistence

Storage levels and caching mechanisms that speed up repeated access to RDDs, with configurable memory, disk, serialization, and replication strategies.

3

4

## Capabilities

5

6

### StorageLevel

7

8

Configuration for how RDDs are stored when persisted, controlling memory usage, disk usage, serialization, and replication.

9

10

```scala { .api }

11

/**
 * Storage level configuration for RDD persistence.
 *
 * The constructor is private: instances come from the companion object, either one of
 * the predefined levels (e.g. `StorageLevel.MEMORY_ONLY`) or `StorageLevel(...)`.
 *
 * The fields are `var`s, presumably so that `Externalizable` deserialization can fill in
 * an instance after construction (via `readExternal`) — confirm against the implementation.
 *
 * @param _useDisk      whether to use disk storage
 * @param _useMemory    whether to use memory storage
 * @param _useOffHeap   whether to use off-heap memory
 * @param _deserialized whether to store data in deserialized form (`false` = serialized)
 * @param _replication  number of replicas to maintain
 */
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int
) extends Externalizable {

  /** Whether this level uses disk storage. */
  def useDisk: Boolean

  /** Whether this level uses memory storage. */
  def useMemory: Boolean

  /** Whether this level uses off-heap memory. */
  def useOffHeap: Boolean

  /** Whether data is stored deserialized (`true`) or serialized (`false`). */
  def deserialized: Boolean

  /** Number of replicas to maintain. */
  def replication: Int

  /**
   * Create a copy of this storage level with a different replication factor.
   *
   * @param newReplication replication factor of the returned level
   * @return a storage level with the same disk/memory/off-heap/deserialized flags as this
   *         one and `newReplication` replicas
   */
  def clone(newReplication: Int): StorageLevel

  /** Check whether this storage level is valid. */
  def isValid: Boolean
}

39

40

/**
 * Predefined storage levels and a factory for custom ones.
 *
 * Flag order in every constructor call:
 * `(useDisk, useMemory, useOffHeap, deserialized, replication)`.
 */
object StorageLevel {
  /** No storage (unpersist) */
  val NONE: StorageLevel = new StorageLevel(false, false, false, false, 1)

  /** Disk only */
  val DISK_ONLY: StorageLevel = new StorageLevel(true, false, false, false, 1)

  /** Disk only with 2x replication */
  val DISK_ONLY_2: StorageLevel = new StorageLevel(true, false, false, false, 2)

  /** Memory only, deserialized */
  val MEMORY_ONLY: StorageLevel = new StorageLevel(false, true, false, true, 1)

  /** Memory only with 2x replication, deserialized */
  val MEMORY_ONLY_2: StorageLevel = new StorageLevel(false, true, false, true, 2)

  /** Memory only, serialized */
  val MEMORY_ONLY_SER: StorageLevel = new StorageLevel(false, true, false, false, 1)

  /** Memory only with 2x replication, serialized */
  val MEMORY_ONLY_SER_2: StorageLevel = new StorageLevel(false, true, false, false, 2)

  /** Memory and disk spillover, deserialized */
  val MEMORY_AND_DISK: StorageLevel = new StorageLevel(true, true, false, true, 1)

  /** Memory and disk with 2x replication, deserialized */
  // `deserialized` must be `true` here to match MEMORY_AND_DISK (previously `false` by mistake).
  val MEMORY_AND_DISK_2: StorageLevel = new StorageLevel(true, true, false, true, 2)

  /** Memory and disk spillover, serialized */
  val MEMORY_AND_DISK_SER: StorageLevel = new StorageLevel(true, true, false, false, 1)

  /** Memory and disk with 2x replication, serialized */
  val MEMORY_AND_DISK_SER_2: StorageLevel = new StorageLevel(true, true, false, false, 2)

  /**
   * Off-heap memory with disk as well (`useDisk = true`), stored serialized.
   */
  val OFF_HEAP: StorageLevel = new StorageLevel(true, true, true, false, 1)

  /**
   * Create a custom storage level.
   *
   * @param useDisk      whether to use disk storage
   * @param useMemory    whether to use memory storage
   * @param useOffHeap   whether to use off-heap memory (default `false`)
   * @param deserialized whether to store data deserialized (default `true`)
   * @param replication  number of replicas to maintain (default `1`)
   * @return the requested storage level
   */
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean = false,
      deserialized: Boolean = true,
      replication: Int = 1
  ): StorageLevel
}

77

```

78

79

### RDD Persistence Methods

80

81

Methods available on RDD for controlling persistence and caching behavior.

82

83

```scala { .api }

84

/**
 * Persistence-related members of `RDD`; other RDD members are omitted from this listing.
 *
 * @tparam T element type of the RDD
 */
abstract class RDD[T: ClassTag] {

  /**
   * Persist this RDD with the given storage level.
   *
   * A storage level can only be assigned once: persisting an RDD that already has a
   * different level throws `UnsupportedOperationException`. Call `unpersist()` first to
   * change it.
   *
   * @param newLevel storage level to use (default `StorageLevel.MEMORY_ONLY`)
   * @return this RDD, for chaining
   */
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): this.type

  /** Cache this RDD in memory; shortcut for `persist(StorageLevel.MEMORY_ONLY)`. */
  def cache(): this.type

  /**
   * Mark this RDD as non-persistent and remove its blocks from memory and disk.
   *
   * @param blocking whether to block until all blocks are deleted (default `false`)
   * @return this RDD, for chaining
   */
  def unpersist(blocking: Boolean = false): this.type

  /**
   * Mark this RDD for checkpointing to reliable storage.
   *
   * The data is written under the directory set with `SparkContext.setCheckpointDir`
   * when a job next computes this RDD; references to its parent RDDs are then removed.
   * Call this before any job has been executed on the RDD. Persisting the RDD as well
   * is strongly recommended; otherwise writing the checkpoint recomputes it.
   */
  def checkpoint(): Unit

  /** Whether this RDD has been checkpointed. */
  def isCheckpointed: Boolean

  /** Location this RDD was checkpointed to, if it has been (reliably) checkpointed. */
  def getCheckpointFile: Option[String]

  /** Current storage level, `StorageLevel.NONE` if the RDD is not persisted. */
  def getStorageLevel: StorageLevel

  /** Whether this RDD is cached, i.e. its storage level is not `StorageLevel.NONE`. */
  def isCached: Boolean = getStorageLevel != StorageLevel.NONE
}

109

```

110

111

### BlockManager

112

113

Core storage management component handling data blocks across the cluster.

114

115

```scala { .api }

116

/**
 * Manager for reading and writing data blocks.
 *
 * Blocks are addressed by [[BlockId]] and stored according to a [[StorageLevel]].
 * NOTE(review): constructor parameters are collaborators wired in by Spark; their exact
 * roles are not specified in this listing.
 */
class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    master: BlockManagerMaster,
    serializerManager: SerializerManager,
    conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int
) extends BlockDataManager with BlockEvictionHandler with Logging {

  /**
   * Initialize the block manager.
   *
   * @param appId id of the application this block manager serves
   */
  def initialize(appId: String): Unit

  /** Get the data of a local block. */
  def getBlockData(blockId: BlockId): ManagedBuffer

  /**
   * Put block data.
   *
   * @param blockId  id of the block to store
   * @param data     block contents
   * @param level    storage level to store the block at
   * @param classTag class tag of the block's elements
   * @return presumably whether the block was stored — confirm
   */
  def putBlockData(
      blockId: BlockId,
      data: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]
  ): Boolean

  /**
   * Get a block from local storage or from a remote block manager.
   *
   * @return the block, or `None` (presumably when it cannot be found)
   */
  def get[T: ClassTag](blockId: BlockId): Option[BlockResult]

  /**
   * Put a single value into storage as a block.
   *
   * @param tellMaster presumably whether to report the new block status to the master
   *                   (default `true`)
   * @return presumably whether the put succeeded — confirm
   */
  def putSingle[T: ClassTag](
      blockId: BlockId,
      value: T,
      level: StorageLevel,
      tellMaster: Boolean = true
  ): Boolean

  /**
   * Put an iterator of values into storage as a block.
   *
   * @param tellMaster presumably whether to report the new block status to the master
   *                   (default `true`)
   * @return presumably whether the put succeeded — confirm
   */
  def putIterator[T: ClassTag](
      blockId: BlockId,
      values: Iterator[T],
      level: StorageLevel,
      tellMaster: Boolean = true
  ): Boolean

  /**
   * Remove a block from storage.
   *
   * @param tellMaster presumably whether to report the removal to the master (default `true`)
   */
  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit

  /**
   * Memory status per block.
   *
   * NOTE(review): the meaning of the two `Long` values (likely sizes in bytes) is not
   * specified here — confirm.
   */
  def memoryStatus: Map[BlockId, (StorageLevel, Long, Long)]

  /** On-disk size of a block (presumably in bytes). */
  def diskBlockSize(blockId: BlockId): Long
}

175

```

176

177

### Block Identifiers

178

179

Type-safe identifiers for different types of data blocks.

180

181

```scala { .api }

182

/**
 * Base class for block identifiers.
 *
 * Each subclass identifies one kind of block. [[name]] is the block's string form,
 * e.g. `rdd_3_7` or `shuffle_1_2_3.data`.
 */
sealed abstract class BlockId {
  /** String name of this block. */
  def name: String

  /** This id as an [[RDDBlockId]], or `None` if it identifies another kind of block. */
  def asRDDId: Option[RDDBlockId] = None

  /** Whether this id identifies an RDD block. */
  def isRDD: Boolean = this match {
    case _: RDDBlockId => true
    case _             => false
  }

  /** Whether this id identifies shuffle output (shuffle, shuffle-data or shuffle-index block). */
  def isShuffle: Boolean = this match {
    case _: ShuffleBlockId | _: ShuffleDataBlockId | _: ShuffleIndexBlockId => true
    case _                                                                 => false
  }

  /** Whether this id identifies a broadcast block. */
  def isBroadcast: Boolean = this match {
    case _: BroadcastBlockId => true
    case _                   => false
  }
}

/**
 * Block identifier for RDD blocks.
 *
 * @param rddId      id of the RDD
 * @param splitIndex index of the partition (split) within the RDD
 */
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = s"rdd_${rddId}_$splitIndex"
  override def asRDDId: Option[RDDBlockId] = Some(this)
}

/** Block identifier for shuffle blocks. */
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
}

/** Block identifier for shuffle data blocks (name ends in `.data`). */
case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.data"
}

/** Block identifier for shuffle index blocks (name ends in `.index`). */
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.index"
}

/**
 * Block identifier for broadcast blocks.
 *
 * @param broadcastId id of the broadcast variable
 * @param field       optional suffix; appended as `_field` when non-empty
 */
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  override def name: String = s"broadcast_${broadcastId}${if (field.nonEmpty) "_" + field else ""}"
}

/** Block identifier for task result blocks. */
case class TaskResultBlockId(taskId: Long) extends BlockId {
  override def name: String = s"taskresult_$taskId"
}

/** Block identifier for stream (input) blocks. */
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  override def name: String = s"input-${streamId}-$uniqueId"
}

225

```

226

227

### Checkpointing

228

229

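Mechanism for saving an RDD's data to reliable storage and truncating its lineage, so that recovery after a failure reads the saved data instead of recomputing a long dependency chain.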

230

231

```scala { .api }

232

/**
 * Checkpoint data management for a single RDD.
 *
 * @tparam T element type of the checkpointed RDD
 * @param rdd the RDD whose checkpoint this object manages
 */
abstract class CheckpointData[T: ClassTag](rdd: RDD[T]) extends Serializable {

  /** Current checkpoint state (see `CheckpointState`). */
  // Qualified alias so the type resolves without importing `CheckpointState._`.
  def cpState: CheckpointState.CheckpointState

  /** The RDD holding the checkpointed data, if a checkpoint has been materialized. */
  def checkpointRDD: Option[CheckpointRDD[T]]

  /** Materialize the checkpoint. */
  def checkpoint(): Unit

  /** Whether the RDD has been checkpointed. */
  def isCheckpointed: Boolean

  /** Checkpoint directory, if one is available. */
  def getCheckpointDir: Option[String]
}

251

252

/**
 * Lifecycle states of an RDD checkpoint, declared in progression order
 * (ids 0, 1, 2).
 */
object CheckpointState extends Enumeration {
  type CheckpointState = Value

  /** Initial state, before checkpointing starts. */
  val Initialized: Value = Value

  /** The checkpoint is being written. */
  val CheckpointingInProgress: Value = Value

  /** The checkpoint has been written. */
  val Checkpointed: Value = Value
}

259

260

/**
 * Base class for RDDs that represent checkpointed data.
 *
 * It is constructed with no parent dependencies (`Nil`), so its partitions come from the
 * checkpoint rather than from recomputing the original lineage.
 *
 * @param sc the SparkContext this RDD belongs to
 */
abstract class CheckpointRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
  /** Checkpoint directory. */
  def getCheckpointDir: String
}

/**
 * Reliable checkpoint RDD, backed by files in a fault-tolerant filesystem such as HDFS
 * or S3 (the kind written by `RDD.checkpoint()`).
 *
 * @param sc             the SparkContext this RDD belongs to
 * @param checkpointPath location of the checkpoint files
 * @param partitioner    presumably the original RDD's partitioner, preserved when
 *                       reading back (default `None`) — confirm
 */
class ReliableCheckpointRDD[T: ClassTag](
    sc: SparkContext,
    checkpointPath: String,
    partitioner: Option[Partitioner] = None
) extends CheckpointRDD[T](sc)

/**
 * Checkpoint RDD produced by local checkpointing (`RDD.localCheckpoint()`).
 *
 * Local checkpoints are kept in executor storage through Spark's caching layer rather than
 * written to a reliable filesystem. This trades fault tolerance for speed: lost executors
 * lose the checkpointed data.
 *
 * NOTE(review): Spark's own LocalCheckpointRDD is built from an RDD id and a partition
 * count; confirm what `checkpointPath` and `originalRDD` refer to in this API.
 *
 * @param sc          the SparkContext this RDD belongs to
 * @param originalRDD the RDD that was checkpointed
 * @param partitioner optional partitioner (default `None`)
 */
class LocalCheckpointRDD[T: ClassTag](
    sc: SparkContext,
    checkpointPath: String,
    originalRDD: RDD[T],
    partitioner: Option[Partitioner] = None
) extends CheckpointRDD[T](sc)

286

```

287

288

### Memory Management

289

290

Components managing memory allocation for caching and execution.

291

292

```scala { .api }

293

/**
 * Abstract memory manager for Spark.
 *
 * Tracks memory for two purposes:
 *  - storage: memory requested for blocks, keyed by [[BlockId]];
 *  - execution: memory requested by tasks, keyed by task attempt id.
 *
 * Every acquire/release call names the [[MemoryMode]] (on-heap or off-heap) it applies to.
 *
 * @param conf                  Spark configuration
 * @param numCores              number of cores available
 * @param onHeapStorageMemory   on-heap memory assigned to storage (bytes)
 * @param onHeapExecutionMemory on-heap memory assigned to execution (bytes)
 */
abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long
) extends Logging {

  /** Maximum on-heap memory available for storage (bytes). */
  def maxOnHeapStorageMemory: Long

  /** Maximum off-heap memory available for storage (bytes). */
  def maxOffHeapStorageMemory: Long

  /**
   * Acquire memory to store a block.
   *
   * @param blockId    block the memory is for
   * @param numBytes   number of bytes requested
   * @param memoryMode on-heap or off-heap
   * @return presumably `true` if the full request was granted — confirm
   */
  def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode
  ): Boolean

  /**
   * Acquire execution memory for a task.
   *
   * @param numBytes      number of bytes requested
   * @param taskAttemptId task attempt the memory is for
   * @param memoryMode    on-heap or off-heap
   * @return number of bytes actually granted (presumably may be less than `numBytes`)
   */
  def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode
  ): Long

  /** Release `numBytes` of storage memory in the given mode. */
  def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit

  /** Release `numBytes` of execution memory held by a task attempt in the given mode. */
  def releaseExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode
  ): Unit

  /** Current storage memory usage (bytes). */
  def storageMemoryUsed: Long

  /** Current execution memory usage (bytes). */
  def executionMemoryUsed: Long
}

337

338

/**
 * Memory allocation modes: on the JVM heap or off-heap.
 * Ids follow declaration order (ON_HEAP = 0, OFF_HEAP = 1).
 */
object MemoryMode extends Enumeration {
  type MemoryMode = Value

  /** Memory on the JVM heap. */
  val ON_HEAP: Value = Value

  /** Memory outside the JVM heap. */
  val OFF_HEAP: Value = Value
}

345

346

/**
 * Unified memory manager (default in Spark 1.6+).
 *
 * The on-heap space (`maxHeapMemory`) is split between the two pools handed to
 * [[MemoryManager]]:
 *  - storage receives `onHeapStorageRegionSize`;
 *  - execution receives the remainder, `maxHeapMemory - onHeapStorageRegionSize`.
 *
 * @param conf                    Spark configuration
 * @param maxHeapMemory           total on-heap memory shared by storage and execution
 * @param onHeapStorageRegionSize portion of `maxHeapMemory` given to storage
 * @param numCores                number of cores available
 */
class UnifiedMemoryManager(
    conf: SparkConf,
    maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int
) extends MemoryManager(
      conf = conf,
      numCores = numCores,
      onHeapStorageMemory = onHeapStorageRegionSize,
      onHeapExecutionMemory = maxHeapMemory - onHeapStorageRegionSize
    )

360

```

361

362

**Usage Examples:**

363

364

```scala

365

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("Storage Example"))

// Create RDD
val data = sc.parallelize(1 to 1000000)
val processed = data.map(_ * 2).filter(_ > 100)

// Different persistence strategies. An RDD's storage level can only be assigned
// once: calling persist() again with a *different* level throws
// UnsupportedOperationException. So pick one of these:
//   processed.persist(StorageLevel.MEMORY_ONLY)     // Cache in memory only
//   processed.persist(StorageLevel.MEMORY_AND_DISK) // Memory with disk spillover
//   processed.persist(StorageLevel.DISK_ONLY)       // Disk only storage
//   processed.persist(StorageLevel.MEMORY_ONLY_SER) // Serialized in memory
processed.persist(StorageLevel.MEMORY_AND_DISK)

// To switch strategies, unpersist first, then assign the new level.
// cache() is a shortcut equivalent to persist(StorageLevel.MEMORY_ONLY).
processed.unpersist(blocking = true)
processed.cache()

// Checkpointing for fault tolerance: set the directory and mark the RDD before
// any job runs on it. The first job below then writes the checkpoint.
sc.setCheckpointDir("hdfs:///checkpoints")
processed.checkpoint()

// Use cached RDD multiple times
val count1 = processed.count()                  // Materializes the cache and the checkpoint
val count2 = processed.filter(_ < 1000).count() // Reuses the cached data

// Check storage status
println(s"Storage level: ${processed.getStorageLevel}")
println(s"Is cached: ${processed.getStorageLevel != StorageLevel.NONE}")
println(s"Is checkpointed: ${processed.isCheckpointed}")

// Cleanup
processed.unpersist()
sc.stop()

400

```

401

402

## Performance Considerations

403

404

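- **MEMORY_ONLY**: Fastest access, but limited by available RAM; partitions that do not fit are not cached and are recomputed each time they are needed
- **MEMORY_AND_DISK**: A good balance; partitions that do not fit in memory spill to disk and are read back from there
- **DISK_ONLY**: Slower, but handles datasets much larger than memory
- **Serialized formats** (`*_SER`, `OFF_HEAP`): More space-efficient, but more CPU-intensive to read because data must be deserialized
- **Replication** (`*_2`): Improves fault tolerance at the cost of extra storage space
- **Checkpointing**: Truncates lineage, which helps RDDs with very long dependency chains (e.g. iterative algorithms)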