or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.md data-io-persistence.md index.md key-value-operations.md rdd-operations.md sparkcontext.md

data-io-persistence.mddocs/

0

# Data I/O and Persistence

1

2

Spark Core provides comprehensive capabilities for reading data from various sources, writing results to different formats, and managing RDD persistence across memory and disk storage systems.

3

4

## Capabilities

5

6

### Text File Operations

7

8

Read and write text-based data formats.

9

10

```scala { .api }

11

// Reading text files

12

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

13

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

14

15

// Writing text files (available on all RDDs)

16

def saveAsTextFile(path: String): Unit

17

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

18

```

19

20

**Usage Examples:**

21

```scala

22

// Read single or multiple text files

23

val lines = sc.textFile("hdfs://data/input.txt")

24

val multipleFiles = sc.textFile("hdfs://data/logs/*.log")

25

26

// Read entire small files as key-value pairs (file path, content)

27

val smallFiles = sc.wholeTextFiles("hdfs://data/documents/")

28

// Result: RDD[(String, String)] where key is the file's path, value is the full file content

29

30

// Write RDD to text files

31

val processed = lines.filter(_.contains("ERROR")).map(_.toUpperCase)

32

processed.saveAsTextFile("hdfs://data/output")

33

34

// Write with compression

35

processed.saveAsTextFile("hdfs://data/output-compressed",

36

classOf[org.apache.hadoop.io.compress.GzipCodec])

37

```

38

39

### Hadoop InputFormat Support

40

41

Read data using Hadoop InputFormat classes for integration with Hadoop ecosystem.

42

43

```scala { .api }

44

// Old Hadoop API (mapred package)

45

def hadoopRDD[K, V](

46

conf: JobConf,

47

inputFormat: Class[_ <: InputFormat[K, V]],

48

keyClass: Class[K],

49

valueClass: Class[V],

50

minPartitions: Int = defaultMinPartitions

51

): RDD[(K, V)]

52

53

def hadoopFile[K, V](

54

path: String,

55

inputFormat: Class[_ <: InputFormat[K, V]],

56

keyClass: Class[K],

57

valueClass: Class[V],

58

minPartitions: Int = defaultMinPartitions

59

): RDD[(K, V)]

60

61

// New Hadoop API (mapreduce package)

62

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](

63

conf: Configuration,

64

fClass: Class[F],

65

kClass: Class[K],

66

vClass: Class[V]

67

): RDD[(K, V)]

68

69

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](

70

path: String,

71

fClass: Class[F],

72

kClass: Class[K],

73

vClass: Class[V],

74

conf: Configuration = hadoopConfiguration

75

): RDD[(K, V)]

76

```

77

78

**Usage Examples:**

79

```scala

80

import org.apache.hadoop.io.{LongWritable, Text}

81

import org.apache.hadoop.mapred.TextInputFormat

82

import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

83

import org.apache.hadoop.conf.Configuration

84

85

// Using old Hadoop API

86

val hadoopRDD = sc.hadoopFile[LongWritable, Text](

87

"hdfs://data/input",

88

classOf[TextInputFormat],

89

classOf[LongWritable],

90

classOf[Text]

91

).map { case (offset, text) => text.toString }

92

93

// Using new Hadoop API with configuration

94

val conf = new Configuration()

95

conf.set("mapreduce.input.fileinputformat.split.maxsize", "134217728") // 128MB

96

val newApiRDD = sc.newAPIHadoopFile[LongWritable, Text, NewTextInputFormat](

97

"hdfs://data/input",

98

classOf[NewTextInputFormat],

99

classOf[LongWritable],

100

classOf[Text],

101

conf

102

)

103

```

104

105

### Sequence File Operations

106

107

Read and write Hadoop SequenceFiles for efficient binary data storage.

108

109

```scala { .api }

110

// Reading SequenceFiles

111

def sequenceFile[K, V](path: String,

112

keyClass: Class[K],

113

valueClass: Class[V],

114

minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

115

116

// Writing SequenceFiles (available on RDD[(K, V)])

117

def saveAsSequenceFile(path: String): Unit // Available via implicit conversion

118

119

// Hadoop OutputFormat operations

120

def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String,

121

keyClass: Class[_],

122

valueClass: Class[_],

123

outputFormatClass: Class[F],

124

codec: Class[_ <: CompressionCodec] = null): Unit

125

126

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String,

127

keyClass: Class[_],

128

valueClass: Class[_],

129

outputFormatClass: Class[F],

130

conf: Configuration = new Configuration): Unit

131

```

132

133

**Usage Examples:**

134

```scala

135

import org.apache.hadoop.io.{IntWritable, Text}

136

137

// Read SequenceFile

138

val seqData = sc.sequenceFile[IntWritable, Text]("hdfs://data/sequence")

139

val converted = seqData.map { case (key, value) =>

140

(key.get(), value.toString)

141

}

142

143

// Write as SequenceFile (requires key-value RDD)

144

val keyValueData = sc.parallelize(Seq((1, "first"), (2, "second"), (3, "third")))

145

val writableData = keyValueData.map { case (k, v) =>

146

(new IntWritable(k), new Text(v))

147

}

148

writableData.saveAsSequenceFile("hdfs://data/output-sequence")

149

```

150

151

### Object File Operations

152

153

Serialize and deserialize Scala objects using Java serialization.

154

155

```scala { .api }

156

// Reading object files

157

def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

158

159

// Writing object files (available on all RDDs)

160

def saveAsObjectFile(path: String): Unit

161

```

162

163

**Usage Examples:**

164

```scala

165

// Serialize complex objects

166

case class Person(name: String, age: Int, email: String)

167

val people = sc.parallelize(Seq(

168

Person("Alice", 25, "alice@example.com"),

169

Person("Bob", 30, "bob@example.com")

170

))

171

172

// Save as object file

173

people.saveAsObjectFile("hdfs://data/people-objects")

174

175

// Read back as objects

176

val loadedPeople = sc.objectFile[Person]("hdfs://data/people-objects")

177

```

178

179

### RDD Persistence

180

181

Control RDD caching and storage across memory and disk.

182

183

```scala { .api }

184

// Persistence methods

185

def persist(): RDD[T] // Default: MEMORY_ONLY

186

def persist(newLevel: StorageLevel): RDD[T]

187

def cache(): RDD[T] // Alias for persist(MEMORY_ONLY)

188

def unpersist(blocking: Boolean = false): RDD[T]

189

190

// Storage level inquiry

191

def getStorageLevel: StorageLevel

192

def isCheckpointed: Boolean

193

```

194

195

**Storage Levels:**

196

```scala { .api }

197

object StorageLevel {

198

val NONE: StorageLevel

199

val DISK_ONLY: StorageLevel

200

val DISK_ONLY_2: StorageLevel // Replicated

201

val MEMORY_ONLY: StorageLevel

202

val MEMORY_ONLY_2: StorageLevel // Replicated

203

val MEMORY_ONLY_SER: StorageLevel // Serialized

204

val MEMORY_ONLY_SER_2: StorageLevel // Serialized + Replicated

205

val MEMORY_AND_DISK: StorageLevel

206

val MEMORY_AND_DISK_2: StorageLevel // Replicated

207

val MEMORY_AND_DISK_SER: StorageLevel // Serialized

208

val MEMORY_AND_DISK_SER_2: StorageLevel // Serialized + Replicated

209

val OFF_HEAP: StorageLevel

210

}

211

```

212

213

**Usage Examples:**

214

```scala

215

val expensiveRDD = sc.textFile("hdfs://large-dataset")

216

.map(parseComplexRecord)

217

.filter(isValid)

218

219

// Cache in memory for reuse

220

expensiveRDD.cache()

221

222

// Use multiple times - only computed once

223

val count = expensiveRDD.count()

224

val sample = expensiveRDD.take(10)

225

val stats = expensiveRDD.map(_.value).stats()

226

227

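// Different storage levels for different use cases (alternatives: choose ONE per RDD; changing the level of an already-persisted RDD throws UnsupportedOperationException, so unpersist() first)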

228

val largeRDD = sc.parallelize(1 to 1000000)

229

230

// Memory only with replication for fault tolerance

231

largeRDD.persist(StorageLevel.MEMORY_ONLY_2)

232

233

// Memory and disk for large datasets that don't fit in memory

234

largeRDD.persist(StorageLevel.MEMORY_AND_DISK)

235

236

// Serialized storage to save memory (CPU overhead for serialization)

237

largeRDD.persist(StorageLevel.MEMORY_ONLY_SER)

238

239

// Clean up when done

240

expensiveRDD.unpersist()

241

largeRDD.unpersist()

242

```

243

244

### Checkpointing

245

246

Save an RDD's data to stable storage and truncate its lineage, so that recovery after a failure does not require recomputing a long chain of dependencies.

247

248

```scala { .api }

249

// Checkpointing operations

250

def checkpoint(): Unit

251

def isCheckpointed: Boolean

252

def getCheckpointFile: Option[String]

253

def localCheckpoint(): RDD[T]

254

255

// SparkContext checkpoint configuration

256

def setCheckpointDir(directory: String): Unit

257

```

258

259

**Usage Examples:**

260

```scala

261

// Set checkpoint directory (usually HDFS)

262

sc.setCheckpointDir("hdfs://checkpoints")

263

264

val iterativeRDD = sc.parallelize(1 to 100)

265

var current = iterativeRDD

266

267

// In iterative algorithms, checkpoint periodically to truncate lineage

268

for (i <- 1 to 10) {

269

current = current.map(iterativeTransformation)

270

271

if (i % 3 == 0) {

272

current.checkpoint() // Save to stable storage

273

current.count() // Trigger checkpoint

274

}

275

}

276

277

// Local checkpointing (faster but less fault-tolerant)

278

val tempRDD = someComplexComputation()

279

tempRDD.localCheckpoint() // Store in executor storage only

280

tempRDD.count() // Trigger checkpoint

281

```

282

283

### Database Connectivity

284

285

Read from JDBC data sources using JdbcRDD.

286

287

```scala { .api }

288

class JdbcRDD[T: ClassTag](

289

sc: SparkContext,

290

getConnection: () => Connection,

291

sql: String,

292

lowerBound: Long,

293

upperBound: Long,

294

numPartitions: Int,

295

mapRow: (ResultSet) => T

296

) extends RDD[T]

297

```

298

299

**Usage Example:**

300

```scala

301

import java.sql.{Connection, DriverManager, ResultSet}

302

303

def createConnection(): Connection = {

304

Class.forName("org.postgresql.Driver")

305

DriverManager.getConnection(

306

"jdbc:postgresql://localhost:5432/mydb",

307

"username",

308

"password"

309

)

310

}

311

312

val jdbcRDD = new JdbcRDD(

313

sc,

314

createConnection,

315

"SELECT id, name, age FROM users WHERE id >= ? AND id <= ?",

316

lowerBound = 1,

317

upperBound = 1000,

318

numPartitions = 4,

319

mapRow = { resultSet =>

320

(resultSet.getInt("id"), resultSet.getString("name"), resultSet.getInt("age"))

321

}

322

)

323

324

val users = jdbcRDD.collect()

325

```

326

327

### Custom InputFormat

328

329

Create RDDs from custom Hadoop InputFormat implementations.

330

331

```scala { .api }

332

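// For custom formats implementing the old-API (org.apache.hadoop.mapred) InputFormat interface; new-API (org.apache.hadoop.mapreduce) formats use newAPIHadoopFile / newAPIHadoopRDD instead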

333

def hadoopRDD[K, V](

334

conf: JobConf,

335

inputFormat: Class[_ <: InputFormat[K, V]],

336

keyClass: Class[K],

337

valueClass: Class[V]

338

): RDD[(K, V)]

339

```

340

341

**Usage Example:**

342

```scala

343

// Custom InputFormat for reading binary files

344

class BinaryFileInputFormat extends FileInputFormat[Text, BytesWritable] {

345

override def createRecordReader(split: InputSplit, context: TaskAttemptContext) = {

346

new BinaryFileRecordReader()

347

}

348

}

349

350

// Use custom format

351

val binaryFiles = sc.newAPIHadoopFile[Text, BytesWritable, BinaryFileInputFormat](

352

"hdfs://binary-data/",

353

classOf[BinaryFileInputFormat],

354

classOf[Text],

355

classOf[BytesWritable]

356

).map { case (filename, bytes) =>

357

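(filename.toString, bytes.copyBytes()) // copyBytes() returns exactly the valid bytes; getBytes exposes the reused, possibly padded backing buffer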

358

}

359

```

360

361

## Performance Considerations

362

363

### Storage Level Selection

364

365

```scala

366

// Choose appropriate storage level based on usage pattern

367

368

// Single use: No persistence

369

val oneTimeUse = data.filter(condition)

370

// Don't persist - just compute when needed

371

372

// Multiple actions on same RDD: Memory

373

val reusedRDD = data.map(expensiveFunction).cache()

374

375

// Large dataset, multiple uses: Memory + Disk

376

val largeReused = bigData.persist(StorageLevel.MEMORY_AND_DISK)

377

378

// Memory constrained: Serialized

379

val memoryConstrained = data.persist(StorageLevel.MEMORY_ONLY_SER)

380

381

// Critical data: Replicated

382

val critical = data.persist(StorageLevel.MEMORY_AND_DISK_2)

383

```

384

385

### Partitioning for I/O

386

387

```scala

388

// Control parallelism for I/O operations

389

val data = sc.textFile("hdfs://data", minPartitions = 100) // More parallelism

390

391

// Coalesce before writing to reduce output files

392

data.coalesce(10).saveAsTextFile("hdfs://output") // 10 output files instead of 100

393

```

394

395

## Types

396

397

```scala { .api }

398

// Storage configuration

399

class StorageLevel private (

400

private var _useDisk: Boolean,

401

private var _useMemory: Boolean,

402

private var _useOffHeap: Boolean,

403

private var _deserialized: Boolean,

404

private var _replication: Int

405

) extends Externalizable

406

407

// JDBC connectivity

408

class JdbcRDD[T: ClassTag](

409

sc: SparkContext,

410

getConnection: () => Connection,

411

sql: String,

412

lowerBound: Long,

413

upperBound: Long,

414

numPartitions: Int,

415

mapRow: (ResultSet) => T

416

) extends RDD[T]

417

418

// Hadoop integration types

419

import org.apache.hadoop.mapred.{InputFormat, JobConf}

420

import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

421

import org.apache.hadoop.conf.Configuration

422

import org.apache.hadoop.io.compress.CompressionCodec

423

```