or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md io-operations.md key-value-operations.md partitioning-shuffling.md rdd-operations.md shared-variables.md spark-context.md storage-persistence.md

io-operations.mddocs/

0

# Input/Output Operations

1

2

File I/O operations for reading from and writing to various data sources including text files, sequence files, and Hadoop-compatible formats.

3

4

## Capabilities

5

6

### Input Operations (SparkContext)

7

8

Methods for creating RDDs from various data sources.

9

10

```scala { .api }

11

// Text file operations

12

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

13

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

14

15

// Binary file operations

16

def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

17

def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]

18

19

// Object file operations

20

def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

21

22

// Hadoop InputFormat operations

23

def hadoopFile[K, V](

24

path: String,

25

inputFormatClass: Class[_ <: InputFormat[K, V]],

26

keyClass: Class[K],

27

valueClass: Class[V],

28

minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

29

30

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](

31

path: String,

32

fClass: Class[F],

33

kClass: Class[K],

34

vClass: Class[V],

35

conf: Configuration = hadoopConfiguration): RDD[(K, V)]

36

37

// Sequence file operations

38

def sequenceFile[K, V](

39

path: String,

40

keyClass: Class[K],

41

valueClass: Class[V],

42

minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

43

44

// Hadoop RDD operations

45

def hadoopRDD[K, V](

46

conf: JobConf,

47

inputFormatClass: Class[_ <: InputFormat[K, V]],

48

keyClass: Class[K],

49

valueClass: Class[V],

50

minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

51

52

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](

53

conf: Configuration = hadoopConfiguration,

54

fClass: Class[F],

55

kClass: Class[K],

56

vClass: Class[V]): RDD[(K, V)]

57

```

58

59

### Output Operations (RDD)

60

61

Methods for saving RDDs to various output formats.

62

63

```scala { .api }

64

// Basic output operations

65

def saveAsTextFile(path: String): Unit

66

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

67

def saveAsObjectFile(path: String): Unit

68

69

// Hadoop output operations (available on pair RDDs)

70

def saveAsHadoopFile[F <: OutputFormat[K, V]](

71

path: String,

72

keyClass: Class[K],

73

valueClass: Class[V],

74

outputFormatClass: Class[F],

75

codec: Option[Class[_ <: CompressionCodec]] = None): Unit

76

77

def saveAsHadoopFile[F <: OutputFormat[K, V]](

78

path: String,

79

keyClass: Class[K],

80

valueClass: Class[V],

81

outputFormatClass: Class[F],

82

conf: JobConf,

83

codec: Option[Class[_ <: CompressionCodec]] = None): Unit

84

85

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](

86

path: String,

87

keyClass: Class[K],

88

valueClass: Class[V],

89

outputFormatClass: Class[F],

90

conf: Configuration = self.context.hadoopConfiguration): Unit

91

92

// Sequence file output

93

def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

94

```

95

96

**Usage Examples:**

97

98

```scala

99

import org.apache.spark.{SparkContext, SparkConf}

100

import org.apache.hadoop.io.{LongWritable, Text}

101

import org.apache.hadoop.mapred.TextInputFormat

102

import org.apache.hadoop.io.compress.GzipCodec

103

104

val sc = new SparkContext(new SparkConf().setAppName("IO Examples").setMaster("local[*]"))

105

106

// Text file input

107

val textLines = sc.textFile("hdfs://input/data.txt")

108

val multipleFiles = sc.textFile("hdfs://input/*.txt") // Wildcard support

109

val localFile = sc.textFile("file:///local/path/data.txt") // Local filesystem

110

111

// Whole text files (useful for small files)

112

val wholeFiles = sc.wholeTextFiles("hdfs://input/small-files/")

113

// Returns RDD[(filename, content)]

114

115

// Binary files

116

val binaryData = sc.binaryFiles("hdfs://input/images/")

117

// Returns RDD[(filename, PortableDataStream)]

118

119

// Process binary data

120

val imageSizes = binaryData.map { case (filename, stream) =>

121

val bytes = stream.toArray()

122

(filename, bytes.length)

123

}

124

125

// Object files (for Spark-serialized objects)

126

val numbers = sc.parallelize(1 to 1000)

127

numbers.saveAsObjectFile("hdfs://output/numbers")

128

val loadedNumbers = sc.objectFile[Int]("hdfs://output/numbers")

129

130

// Sequence files

131

val keyValueData = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))

132

keyValueData.saveAsSequenceFile("hdfs://output/sequence")

133

val loadedSequence = sc.sequenceFile[String, String]("hdfs://output/sequence")

134

135

// Hadoop InputFormat

136

// Old-API (org.apache.hadoop.mapred) InputFormat read.
// The class-argument overload takes (path, inputFormatClass, keyClass, valueClass)
// and only two type parameters (key, value).
val hadoopData = sc.hadoopFile[LongWritable, Text](
  "hdfs://input/hadoop-format",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
).map { case (key, value) => (key.get(), value.toString) } // convert Writables to plain values

142

143

// Text output with compression

144

textLines

145

.filter(_.nonEmpty)

146

.saveAsTextFile("hdfs://output/filtered", classOf[GzipCodec])

147

```

148

149

### Specialized Input Formats

150

151

Built-in input formats for specific data types.

152

153

```scala { .api }

154

/**

155

* Input format for reading whole text files

156

*/

157

class WholeTextFileInputFormat extends FileInputFormat[String, String]

158

159

/**

160

* Input format for reading binary files as PortableDataStream

161

*/

162

class StreamInputFormat extends FileInputFormat[String, PortableDataStream]

163

164

/**

165

* Input format for fixed-length binary records

166

*/

167

class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] {

168

def setRecordLength(conf: Configuration, recordLength: Int): Unit

169

}

170

171

// PortableDataStream for binary data handling

172

class PortableDataStream(

173

val isDirectory: Boolean,

174

val path: String,

175

val length: Long,

176

val modificationTime: Long) extends Serializable {

177

178

def open(): DataInputStream

179

def toArray(): Array[Byte]

180

}

181

```

182

183

**Advanced I/O Examples:**

184

185

```scala

186

import org.apache.spark.input.{WholeTextFileInputFormat, FixedLengthBinaryInputFormat}

187

import org.apache.hadoop.conf.Configuration

188

import org.apache.hadoop.io.{BytesWritable, LongWritable}

189

190

// Fixed-length binary records

191

val conf = new Configuration()

192

FixedLengthBinaryInputFormat.setRecordLength(conf, 1024) // 1KB records

193

194

// Each record is keyed by the LongWritable the input format emits and carries
// the raw record bytes as a BytesWritable.
val binaryRecords = sc.newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
  "hdfs://input/binary-data",
  classOf[FixedLengthBinaryInputFormat],
  classOf[LongWritable],
  classOf[BytesWritable],
  conf
).map { case (offset, bytes) =>
  // copyBytes() returns a fresh array of exactly getLength bytes. getBytes would
  // expose the backing buffer, which can be longer than the record and is not
  // a private copy.
  (offset.get(), bytes.copyBytes())
}

203

204

// Process small text files efficiently

205

// wholeTextFiles is the public entry point for whole-file reads and yields
// (filename, content) pairs as plain Strings. WholeTextFileInputFormat itself is
// Spark-internal and keyed by Hadoop Text, so it cannot be used with
// newAPIHadoopFile[String, String, ...].
val smallTextFiles = sc.wholeTextFiles("hdfs://input/logs/")
  .map { case (filename, content) =>
    // Count the lines of this file that mention ERROR.
    val errors = content.split("\n").count(_.contains("ERROR"))
    (filename, errors)
  }

215

```

216

217

### Database I/O

218

219

JDBC support for reading from relational databases.

220

221

```scala { .api }

222

/**

223

* RDD for reading data from JDBC sources

224

*/

225

class JdbcRDD[T: ClassTag](

226

sc: SparkContext,

227

getConnection: () => Connection,

228

sql: String,

229

lowerBound: Long,

230

upperBound: Long,

231

numPartitions: Int,

232

mapRow: ResultSet => T) extends RDD[T]

233

234

// Constructor

235

def JdbcRDD[T: ClassTag](

236

sc: SparkContext,

237

getConnection: () => Connection,

238

sql: String,

239

lowerBound: Long,

240

upperBound: Long,

241

numPartitions: Int)(mapRow: ResultSet => T): JdbcRDD[T]

242

```

243

244

**Database Usage Examples:**

245

246

```scala

247

import org.apache.spark.rdd.JdbcRDD

248

import java.sql.{Connection, DriverManager, ResultSet}

249

250

// Database connection function

251

/**
 * Opens a new JDBC connection to the `mydb` MySQL database on localhost.
 *
 * @return a freshly opened connection; the caller is responsible for closing it
 */
def createConnection(): Connection = {
  // Load the driver class before asking DriverManager for a connection.
  Class.forName("com.mysql.jdbc.Driver")

  val url = "jdbc:mysql://localhost:3306/mydb"
  val user = "username"
  val secret = "password"
  DriverManager.getConnection(url, user, secret)
}

259

260

// Create JDBC RDD

261

val jdbcRDD = new JdbcRDD(

262

sc,

263

createConnection,

264

"SELECT id, name, age FROM users WHERE id >= ? AND id <= ?",

265

lowerBound = 1,

266

upperBound = 1000000,

267

numPartitions = 4,

268

mapRow = { resultSet =>

269

val id = resultSet.getLong("id")

270

val name = resultSet.getString("name")

271

val age = resultSet.getInt("age")

272

(id, name, age)

273

}

274

)

275

276

// Process database data

277

val adultUsers = jdbcRDD.filter(_._3 >= 18)

278

val userCount = adultUsers.count()

279

```

280

281

## Advanced I/O Patterns

282

283

### Multi-Source Data Loading

284

285

```scala

286

// Load and combine data from multiple sources

287

/**
 * Loads log, CSV and binary inputs and merges them into a single keyed RDD.
 *
 * Every element is `(id, attributes)`, where `attributes("type")` names the
 * source ("log", "csv" or "binary") and `attributes("data")` holds the payload.
 *
 * NOTE(review): `parseLogLine`, `extractIdFromFilename` and `processBinaryStream`
 * are helpers expected to be defined elsewhere; `record.id` and the extracted id
 * are assumed to be Strings so they fit the declared key type.
 *
 * @param sc the active SparkContext
 * @return the union of all three sources
 */
def loadMultiSourceData(sc: SparkContext): RDD[(String, Map[String, Any])] = {
  // RDD is invariant, so every source is given the same explicit element type.
  // Without `Map[String, Any](...)` each map literal infers a different value
  // type and the RDDs can neither be unioned nor returned.
  type Tagged = (String, Map[String, Any])

  // Text logs
  val logs: RDD[Tagged] = sc.textFile("hdfs://logs/*.log")
    .map(parseLogLine)
    .map(record => (record.id, Map[String, Any]("type" -> "log", "data" -> record)))

  // CSV files: rows with fewer than three columns are dropped
  val csvData: RDD[Tagged] = sc.textFile("hdfs://csv/*.csv")
    .map(_.split(","))
    .filter(_.length >= 3)
    .map(fields => (fields(0), Map[String, Any]("type" -> "csv", "data" -> fields)))

  // Binary data
  val binaryData: RDD[Tagged] = sc.binaryFiles("hdfs://binary/*")
    .map { case (filename, stream) =>
      val id = extractIdFromFilename(filename)
      val data = processBinaryStream(stream)
      (id, Map[String, Any]("type" -> "binary", "data" -> data))
    }

  // Combine all sources
  logs.union(csvData).union(binaryData)
}

310

311

// Usage

312

val combinedData = loadMultiSourceData(sc)

313

val groupedByType = combinedData.groupBy(_._2("type").toString)

314

```

315

316

### Incremental Data Processing

317

318

```scala

319

import java.text.SimpleDateFormat

320

import java.util.Date

321

322

// Process data incrementally based on modification time

323

/**
 * Parses the records of every file under `hdfs://incremental-data/` whose
 * modification time is later than `lastProcessedTime`.
 *
 * NOTE(review): `getFileModificationTime` and `parseRecord` are helpers defined
 * elsewhere; the unit of `lastProcessedTime` must match what
 * `getFileModificationTime` returns.
 *
 * @param sc                the active SparkContext
 * @param lastProcessedTime files modified at or before this time are skipped
 * @return the valid records parsed from the newer files
 */
def processIncrementalData(sc: SparkContext, lastProcessedTime: Long): RDD[ProcessedRecord] =
  sc.wholeTextFiles("hdfs://incremental-data/*")
    // Keep only files touched since the last run.
    .filter { case (filename, _) => getFileModificationTime(filename) > lastProcessedTime }
    // One record per non-empty line; invalid records are discarded.
    .flatMap { case (_, content) =>
      content.split("\n").filter(_.nonEmpty).map(parseRecord).filter(_.isValid)
    }

342

343

// Checkpoint processed time

344

/**
 * Records "now" as the latest processed time and returns it.
 *
 * NOTE(review): `saveProcessedTime` is defined elsewhere and is expected to
 * persist the value (HDFS, database, etc.).
 *
 * @return the timestamp that was saved, in milliseconds since the epoch
 */
def updateProcessedTime(): Long = {
  val now = System.currentTimeMillis()
  saveProcessedTime(now)
  now
}

350

```

351

352

### Streaming-Style File Processing

353

354

```scala

355

// Process files as they arrive (simulation of streaming)

356

/**
 * Polls `inputDir` forever, processing files that have not been seen before and
 * writing each batch to its own timestamped directory under `outputDir`.
 *
 * This method never returns normally.
 *
 * NOTE(review): `listHDFSFiles` and `processLine` are helpers defined elsewhere.
 *
 * @param sc        the active SparkContext
 * @param inputDir  directory to watch for new files
 * @param outputDir parent directory for the per-batch outputs
 */
def processFilesAsStream(sc: SparkContext, inputDir: String, outputDir: String): Unit = {
  @scala.annotation.tailrec
  def poll(alreadyProcessed: Set[String]): Unit = {
    // Anything listed now that was not handled in an earlier iteration is new.
    val unseen = listHDFSFiles(inputDir) -- alreadyProcessed

    val nowProcessed =
      if (unseen.nonEmpty) {
        println(s"Processing ${unseen.size} new files")

        // textFile accepts a comma-separated list of paths.
        val results = sc.textFile(unseen.mkString(","))
          .filter(_.nonEmpty)
          .map(processLine)
          .filter(_.isSuccess)

        // Each batch is written to its own timestamped directory.
        val timestamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date())
        results.saveAsTextFile(s"$outputDir/batch_$timestamp")

        alreadyProcessed ++ unseen
      } else {
        alreadyProcessed
      }

    // Wait before checking again
    Thread.sleep(30000) // 30 seconds
    poll(nowProcessed)
  }

  poll(Set.empty[String])
}

386

```

387

388

### Data Format Conversion Pipeline

389

390

```scala

391

// Convert between different data formats

392

/**
 * Small collection of format-conversion jobs built on the core RDD API.
 *
 * NOTE(review): `parseTextRecord`, `recordToAvroBytes` and `parseJSON` are
 * helpers defined elsewhere.
 *
 * @param sc the SparkContext used to read the inputs
 */
class DataFormatConverter(sc: SparkContext) {

  /**
   * Reads a three-column CSV file and writes it as a SequenceFile at
   * `outputPath + "_sequence"`.
   *
   * Parquet output would normally go through Spark SQL; this only shows the
   * concept with the core API. `saveAsSequenceFile` exists only on pair RDDs,
   * so each row is written as key = first column, value = "second,third".
   *
   * @param inputPath  CSV input path
   * @param outputPath base output path; "_sequence" is appended
   * @throws NumberFormatException at job run time if the third column of a row
   *                               is not a valid number
   */
  def csvToParquet(inputPath: String, outputPath: String): Unit = {
    val csvData = sc.textFile(inputPath)
      .map(_.split(","))
      .filter(_.length >= 3)
      // toDouble still validates and normalizes the numeric column.
      .map(fields => (fields(0), s"${fields(1)},${fields(2).toDouble}"))

    csvData.saveAsSequenceFile(outputPath + "_sequence")
  }

  /**
   * Parses text records, converts the valid ones with `recordToAvroBytes`
   * (simplified) and stores the result as a Spark object file.
   *
   * @param inputPath  text input path
   * @param outputPath object-file output path
   */
  def textToAvro(inputPath: String, outputPath: String): Unit = {
    val textData = sc.textFile(inputPath)
      .map(parseTextRecord)
      .filter(_.isValid)

    // Convert to Avro format (simplified)
    val avroData = textData.map(recordToAvroBytes)
    avroData.saveAsObjectFile(outputPath)
  }

  /**
   * Parses one JSON document per line and writes (id, json-text) pairs as a
   * SequenceFile. Lines that `parseJSON` cannot parse are skipped.
   *
   * @param inputPath  JSON-lines input path
   * @param outputPath SequenceFile output path
   */
  def jsonToSequenceFile(inputPath: String, outputPath: String): Unit = {
    val jsonData = sc.textFile(inputPath)
      .map(parseJSON)
      .filter(_.isDefined)
      .map(_.get) // safe: undefined results were filtered out on the previous line
      .map(json => (json.getString("id"), json.toString))

    jsonData.saveAsSequenceFile(outputPath)
  }
}

424

425

// Usage

426

val converter = new DataFormatConverter(sc)

427

converter.csvToParquet("hdfs://input/data.csv", "hdfs://output/data.parquet")

428

```

429

430

### Custom OutputFormat Example

431

432

```scala

433

import org.apache.hadoop.mapred.{OutputFormat, RecordWriter, JobConf}

434

import org.apache.hadoop.fs.FileSystem

435

import org.apache.hadoop.util.Progressable

436

437

// Custom output format for special requirements

438

/**
 * Old-API Hadoop OutputFormat that delegates all writing to
 * [[CustomRecordWriter]].
 */
class CustomOutputFormat extends OutputFormat[String, String] {

  /** Creates the writer for one output file; `progress` is not used. */
  override def getRecordWriter(
      fs: FileSystem,
      job: JobConf,
      name: String,
      progress: Progressable): RecordWriter[String, String] =
    new CustomRecordWriter(fs, job, name)

  /** Hook for validating the output specification; currently performs no checks. */
  override def checkOutputSpecs(fs: FileSystem, job: JobConf): Unit = {
    // Validation logic
    ()
  }
}

453

454

/**
 * Writes each record as one `key|value` line of UTF-8 text.
 *
 * @param fs   filesystem handed in by the framework; kept for signature
 *             compatibility, the filesystem actually used is derived from the
 *             output path
 * @param job  job configuration carrying the output directory
 * @param name file name of this task's output (for example `part-00000`)
 */
class CustomRecordWriter(fs: FileSystem, job: JobConf, name: String)
    extends RecordWriter[String, String] {

  // Resolve the file inside the job's (task-attempt) output directory. A bare
  // `new Path(name)` would be relative to the filesystem's working directory
  // and ignore the path passed to saveAsHadoopFile.
  private val outputPath: org.apache.hadoop.fs.Path =
    org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(job, name)

  private val outputStream = outputPath.getFileSystem(job).create(outputPath)

  /** Appends one `key|value` line. */
  override def write(key: String, value: String): Unit = {
    val record = s"$key|$value\n"
    // DataOutputStream.writeBytes keeps only the low byte of every char, which
    // corrupts non-ASCII text; encode explicitly instead.
    outputStream.write(record.getBytes(java.nio.charset.StandardCharsets.UTF_8))
  }

  /** Flushes and closes the underlying stream; `reporter` is not used. */
  override def close(reporter: org.apache.hadoop.mapred.Reporter): Unit =
    outputStream.close()
}

468

469

// Usage

470

val customData = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))

471

customData.saveAsHadoopFile[CustomOutputFormat](

472

"hdfs://output/custom",

473

classOf[String],

474

classOf[String],

475

classOf[CustomOutputFormat]

476

)

477

```

478

479

## Performance Optimization

480

481

### Efficient File Reading

482

483

```scala

484

// Optimize partition count for file reading

485

/**
 * Reads `path` as text with a partition count derived from the total input
 * size, aiming for roughly 64MB per partition.
 *
 * @param sc   the active SparkContext
 * @param path directory (or single file) to read; it is listed with
 *             `listStatus`, so glob patterns are not expanded here
 * @return the lines of the input
 */
def optimizeFileReading(sc: SparkContext, path: String): RDD[String] = {
  val targetBytesPerPartition = 64L * 1024 * 1024 // 64MB

  // Use the filesystem that owns `path`; FileSystem.get(conf) would return the
  // default filesystem even when `path` points at a different one.
  val inputPath = new Path(path)
  val fs = inputPath.getFileSystem(sc.hadoopConfiguration)
  val totalSize = fs.listStatus(inputPath).map(_.getLen).sum

  // Round up so a trailing partial block gets its own partition, keep at least
  // one partition, and clamp before narrowing to Int.
  val wanted = (totalSize + targetBytesPerPartition - 1) / targetBytesPerPartition
  val optimalPartitions = math.min(math.max(1L, wanted), Int.MaxValue.toLong).toInt

  sc.textFile(path, minPartitions = optimalPartitions)
}

497

498

// Coalesce small files

499

/**
 * Saves `rdd` as text, first reducing it to `targetPartitions` partitions when
 * it currently has more than that. An RDD that already has `targetPartitions`
 * or fewer is written unchanged.
 *
 * @param rdd              data to write
 * @param outputPath       destination directory
 * @param targetPartitions upper bound on the number of output partitions
 */
def coalescedOutput[T](rdd: RDD[T], outputPath: String, targetPartitions: Int): Unit = {
  val toWrite =
    if (rdd.getNumPartitions > targetPartitions) rdd.coalesce(targetPartitions)
    else rdd

  toWrite.saveAsTextFile(outputPath)
}

508

```

509

510

### Compression Strategy

511

512

```scala

513

// LzopCodec is not part of org.apache.hadoop.io.compress (it ships in the
// separate hadoop-lzo library), so the splittable codec used here is BZip2Codec.
import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, GzipCodec, SnappyCodec}

/**
 * Saves `rdd` as text, choosing the compression codec from the kind of data.
 *
 * @param rdd        data to write
 * @param outputPath destination directory
 * @param dataType   "logs", "intermediate" or "streaming"; any other value
 *                   falls back to gzip
 */
def saveWithOptimalCompression[T](rdd: RDD[T], outputPath: String, dataType: String): Unit = {
  // The explicit type keeps the match from inferring an awkward existential.
  val codec: Class[_ <: CompressionCodec] = dataType match {
    case "logs" => classOf[GzipCodec] // High compression for archival
    case "intermediate" => classOf[SnappyCodec] // Fast compression for temp data
    case "streaming" => classOf[BZip2Codec] // Splittable compression
    case _ => classOf[GzipCodec] // Default
  }

  rdd.saveAsTextFile(outputPath, codec)
}

526

527

// Monitor compression ratios

528

/**
 * Compares the total size of the files directly under two directories.
 *
 * Prints the space saving as a percentage and returns the raw ratio
 * `compressed / original` (smaller is better). Relies on a `sc` SparkContext
 * being in scope.
 *
 * @param originalPath   directory holding the uncompressed output
 * @param compressedPath directory holding the compressed output
 * @return compressed size divided by original size
 */
def analyzeCompressionRatio(originalPath: String, compressedPath: String): Double = {
  val fs = FileSystem.get(sc.hadoopConfiguration)

  // Sum of the lengths reported by a single (non-recursive) directory listing.
  def totalBytes(dir: String): Long =
    fs.listStatus(new Path(dir)).map(_.getLen).sum

  val originalSize = totalBytes(originalPath)
  val compressedSize = totalBytes(compressedPath)

  val ratio = compressedSize.toDouble / originalSize
  println(s"Compression ratio: ${(1 - ratio) * 100}%")
  ratio
}

540

```

541

542

## Best Practices

543

544

### File Organization

545

546

```scala

547

// Organize output by date partitions

548

/**
 * Saves `rdd` as text under `basePath/yyyy/MM/dd`, using today's date at the
 * moment of the call.
 *
 * @param rdd      data to write
 * @param basePath root directory under which the dated directory is created
 */
def saveWithDatePartitioning[T](rdd: RDD[T], basePath: String): Unit = {
  val datePath = new SimpleDateFormat("yyyy/MM/dd").format(new Date())
  rdd.saveAsTextFile(s"$basePath/$datePath")
}

554

555

/**
 * Hash-partitions the pairs into 10 partitions by key and writes each record
 * prefixed with the index of the partition it landed in.
 *
 * @param rdd        key/value pairs to write
 * @param outputPath destination directory
 */
def savePartitionedByKey(rdd: RDD[(String, String)], outputPath: String): Unit = {
  val hashed = rdd.partitionBy(new HashPartitioner(10))

  // Tag every record with its partition index.
  val labelled = hashed.mapPartitionsWithIndex { (index, partition) =>
    partition.map(record => s"partition_$index: $record")
  }

  labelled.saveAsTextFile(outputPath)
}

563

```

564

565

### Error Handling in I/O

566

567

```scala

568

// Robust file processing with error handling

569

/**
 * Parses every line of `inputPath`, logging and skipping lines that fail to
 * parse instead of failing the whole job.
 *
 * NOTE(review): `parseRecord` and `logError` are helpers defined elsewhere.
 *
 * @param sc        the active SparkContext
 * @param inputPath text input to parse
 * @return the successfully parsed records
 */
def robustFileProcessing(sc: SparkContext, inputPath: String): RDD[ProcessedRecord] =
  sc.textFile(inputPath).mapPartitionsWithIndex { (partitionId, lines) =>
    // lineNumber is the zero-based position within this partition, not within the file.
    lines.zipWithIndex.flatMap { case (line, lineNumber) =>
      // flatMap drops the None results, so no further isDefined/get unwrapping is needed.
      try Some(parseRecord(line))
      catch {
        // NonFatal lets fatal JVM errors and interrupts propagate.
        case scala.util.control.NonFatal(e) =>
          logError(s"Partition $partitionId, Line $lineNumber: ${e.getMessage}")
          None
      }
    }
  }

585

586

// Validate output before final save

587

/**
 * Filters `rdd` with `validator` and saves the surviving records as text,
 * refusing to write anything when fewer than 90% of the records are valid.
 *
 * @param rdd        records to validate
 * @param outputPath destination directory
 * @param validator  returns true for records that should be kept
 * @throws RuntimeException if less than 90% of the records pass `validator`
 */
def validateAndSave[T](rdd: RDD[T], outputPath: String, validator: T => Boolean): Unit = {
  val accepted = rdd.filter(validator)
  val total = rdd.count()
  val kept = accepted.count()

  // Abort before writing when too much of the input was rejected.
  val enoughValid = kept >= total * 0.9
  if (!enoughValid) {
    throw new RuntimeException(s"Too many invalid records: $kept/$total valid")
  }

  accepted.saveAsTextFile(outputPath)
  println(s"Saved $kept valid records out of $total total")
}

599

```