
# Storage and Caching

## StorageLevel

Storage levels define how RDDs are persisted across memory, disk, and off-heap storage with various serialization and replication options.

```scala { .api }
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int
) {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int

  def clone(): StorageLevel
  def writeExternal(out: ObjectOutput): Unit
  def readExternal(in: ObjectInput): Unit
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false, 1)
  val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
```

## RDD Persistence Methods

Methods available on all RDD types for controlling persistence behavior.

```scala { .api }
// From RDD[T]
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]
def cache(): RDD[T]
def unpersist(blocking: Boolean = false): RDD[T]
def getStorageLevel: StorageLevel
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
```
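For instance, `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`. A minimal sketch, assuming an active `SparkContext` named `sc`:

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)

// cache() is equivalent to persist(StorageLevel.MEMORY_ONLY)
rdd.cache()
assert(rdd.getStorageLevel == StorageLevel.MEMORY_ONLY)

// A storage level cannot be changed while the RDD is persisted;
// unpersist first, then persist with a different level.
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_AND_DISK)
```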

## BlockManager

Spark's internal storage management system; a small set of public methods is exposed for advanced usage.

```scala { .api }
class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    master: BlockManagerMaster,
    serializerManager: SerializerManager,
    conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int
) {
  // Public methods for advanced users
  def putSingle[T: ClassTag](blockId: BlockId, value: T, level: StorageLevel, tellMaster: Boolean = true): Boolean
  def putIterator[T: ClassTag](blockId: BlockId, values: Iterator[T], level: StorageLevel, tellMaster: Boolean = true): Boolean
  def getLocalValues(blockId: BlockId): Option[BlockResult]
  def getOrElseUpdate[T](blockId: BlockId, level: StorageLevel, classTag: ClassTag[T], makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]]
  def remove(blockId: BlockId, tellMaster: Boolean = true): Boolean
}
```
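Direct `BlockManager` use is rare and the interface is internal, so treat the following as a hedged sketch only: it assumes `SparkEnv.get` is available in the current JVM, uses the methods listed above, and `TestBlockId` with the name `"example-block"` is chosen purely for illustration.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

// Obtain the JVM-local BlockManager (driver or executor side).
val blockManager = SparkEnv.get.blockManager

val blockId = TestBlockId("example-block")

// Store a single value under an explicit storage level.
blockManager.putSingle(blockId, "some value", StorageLevel.MEMORY_ONLY)

// Read it back from local storage, if present.
blockManager.getLocalValues(blockId).foreach { result =>
  result.data.foreach(println)
}

// Remove the block and notify the BlockManagerMaster.
blockManager.remove(blockId)
```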

## Storage Configuration

Key configuration properties for storage and caching behavior.

### Memory Management

- `spark.storage.memoryFraction` - Fraction of JVM heap space used for the storage cache (deprecated in favor of unified memory management)
- `spark.storage.safetyFraction` - Safety fraction that prevents the storage region from consuming the entire heap
- `spark.storage.unrollFraction` - Fraction of storage memory used for unrolling blocks
- `spark.storage.replication.proactive` - Enable proactive block replication
- `spark.storage.blockManagerTimeoutIntervalMs` - Timeout for block manager operations

### Disk Storage

- `spark.storage.diskStore.subDirectories` - Number of subdirectories for disk storage
- `spark.storage.localDirs` - Directories for storing blocks on disk
- `spark.storage.localDirs.fallback` - Fallback directory if `localDirs` is not available

### Off-Heap Storage

- `spark.memory.offHeap.enabled` - Enable off-heap storage
- `spark.memory.offHeap.size` - Size of the off-heap storage region
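As an illustration, off-heap storage can be enabled when building the `SparkConf`; the application name and the `2g` size below are example values, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Off-heap storage must be explicitly enabled and sized before
// StorageLevel.OFF_HEAP can be used.
val conf = new SparkConf()
  .setAppName("offheap-example")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g") // example size; tune per workload

val sc = new SparkContext(conf)
```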

## Usage Examples

### Basic Persistence

```scala
val expensiveData = sc.textFile("large-dataset.txt")
  .map(processExpensiveOperation)
  .filter(complexFilter)

// Cache for multiple uses
expensiveData.cache()

// Use multiple times (only computed once)
val count = expensiveData.count()
val sample = expensiveData.take(10)
val stats = expensiveData.map(_.length).stats()

// Clean up when done
expensiveData.unpersist()
```

### Storage Level Selection

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000)

// Note: an RDD's storage level can only be assigned once;
// each call below illustrates an alternative choice.

// Memory only (fastest access, but limited by available memory)
data.persist(StorageLevel.MEMORY_ONLY)

// Memory and disk (spills to disk if memory is full)
data.persist(StorageLevel.MEMORY_AND_DISK)

// Serialized storage (more memory efficient, slower access)
data.persist(StorageLevel.MEMORY_ONLY_SER)

// With replication for fault tolerance
data.persist(StorageLevel.MEMORY_AND_DISK_2)

// Off-heap storage (requires off-heap memory configuration)
data.persist(StorageLevel.OFF_HEAP)
```

### Checkpointing

```scala
// Set checkpoint directory (must be fault-tolerant storage like HDFS)
sc.setCheckpointDir("hdfs://namenode:port/checkpoint")

val iterativeData = sc.textFile("input.txt")
  .map(parseData)
  .filter(isValid)

// Checkpoint to break long lineage chains
iterativeData.checkpoint()

// Force materialization to complete the checkpoint
iterativeData.count()

// Now safe to use in iterative algorithms
var current = iterativeData
for (i <- 1 to 10) {
  current = current.map(iterativeFunction)
  if (i % 3 == 0) {
    current.checkpoint()
    current.count() // Force checkpoint
  }
}
```

### Advanced Persistence Patterns

```scala
// Pattern 1: Conditional persistence based on estimated data size
val rdd = sc.textFile("data.txt").map(transform)
// Estimate total size from the average length of a 1000-record sample
val estimatedSize = rdd.take(1000).map(_.length).sum * (rdd.count() / 1000.0)

val persistedRDD = if (estimatedSize > 1000000) {
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
} else {
  rdd.persist(StorageLevel.MEMORY_ONLY)
}

// Pattern 2: Layered persistence for different access patterns
// (assumes clean produces key-value pairs, so groupByKey is available)
val rawData = sc.textFile("data.txt")
val cleanedData = rawData.map(clean).persist(StorageLevel.MEMORY_AND_DISK)
val aggregatedData = cleanedData.groupByKey().persist(StorageLevel.MEMORY_ONLY)

// Use cleaned data for multiple transformations
val result1 = cleanedData.filter(condition1).collect()
val result2 = cleanedData.filter(condition2).collect()

// Use aggregated data for summary statistics
val summary1 = aggregatedData.mapValues(_.size).collect()
val summary2 = aggregatedData.mapValues(_.sum).collect()
```

### Monitoring Storage Usage

```scala
// Get the current storage level
val level = rdd.getStorageLevel
println(s"Storage level: $level")

// Check if the RDD is persisted
if (level != StorageLevel.NONE) {
  println("RDD is persisted")
}

// Monitor through the Spark UI
// Access http://driver:4040 to see the Storage tab with:
// - RDD storage levels
// - Memory usage
// - Disk usage
// - Partition information
```

### Storage Level Guidelines

**MEMORY_ONLY**

- Use for: Small to medium datasets that fit in memory
- Best for: Frequently accessed data with fast transformations
- Avoid if: Dataset is larger than available memory

**MEMORY_AND_DISK**

- Use for: Large datasets with frequent access
- Best for: Iterative algorithms, multiple actions on the same RDD
- Trade-off: Some partitions may spill to disk

**MEMORY_ONLY_SER**

- Use for: Memory-constrained environments
- Best for: Large objects that compress well
- Trade-off: CPU overhead for serialization/deserialization

**DISK_ONLY**

- Use for: Very large datasets, limited memory
- Best for: One-time processing of massive datasets
- Trade-off: Slower access due to disk I/O

**Replication (`_2` variants)**

- Use for: Critical data requiring fault tolerance
- Best for: Long-running applications in unreliable environments
- Trade-off: Double the storage cost, plus network overhead

**OFF_HEAP**

- Use for: Very large datasets with configured off-heap storage
- Best for: Reducing GC pressure in memory-intensive applications
- Requires: `spark.memory.offHeap.enabled=true`