# Input/Output Operations

File I/O operations for reading from and writing to various data sources, including text files, sequence files, and Hadoop-compatible formats.

## Capabilities

### Input Operations (SparkContext)

Methods for creating RDDs from various data sources.

```scala { .api }
11
// Text file operations
12
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
13
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
14
15
// Binary file operations
16
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
17
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]
18
19
// Object file operations
20
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
21
22
// Hadoop InputFormat operations
23
def hadoopFile[K, V](
24
path: String,
25
inputFormatClass: Class[_ <: InputFormat[K, V]],
26
keyClass: Class[K],
27
valueClass: Class[V],
28
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
29
30
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
31
path: String,
32
fClass: Class[F],
33
kClass: Class[K],
34
vClass: Class[V],
35
conf: Configuration = hadoopConfiguration): RDD[(K, V)]
36
37
// Sequence file operations
38
def sequenceFile[K, V](
39
path: String,
40
keyClass: Class[K],
41
valueClass: Class[V],
42
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
43
44
// Hadoop RDD operations
45
def hadoopRDD[K, V](
46
conf: JobConf,
47
inputFormatClass: Class[_ <: InputFormat[K, V]],
48
keyClass: Class[K],
49
valueClass: Class[V],
50
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
51
52
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
53
conf: Configuration = hadoopConfiguration,
54
fClass: Class[F],
55
kClass: Class[K],
56
vClass: Class[V]): RDD[(K, V)]
57
```
58
59
### Output Operations (RDD)
60
61
Methods for saving RDDs to various output formats.
62
63
```scala { .api }
64
// Basic output operations
65
def saveAsTextFile(path: String): Unit
66
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
67
def saveAsObjectFile(path: String): Unit
68
69
// Hadoop output operations (available on pair RDDs)
70
def saveAsHadoopFile[F <: OutputFormat[K, V]](
71
path: String,
72
keyClass: Class[K],
73
valueClass: Class[V],
74
outputFormatClass: Class[F],
75
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
76
77
def saveAsHadoopFile[F <: OutputFormat[K, V]](
78
path: String,
79
keyClass: Class[K],
80
valueClass: Class[V],
81
outputFormatClass: Class[F],
82
conf: JobConf,
83
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
84
85
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
86
path: String,
87
keyClass: Class[K],
88
valueClass: Class[V],
89
outputFormatClass: Class[F],
90
conf: Configuration = self.context.hadoopConfiguration): Unit
91
92
// Sequence file output
93
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
94
```
95
96
**Usage Examples:**
97
98
```scala
99
import org.apache.spark.{SparkContext, SparkConf}
100
import org.apache.hadoop.io.{LongWritable, Text}
101
import org.apache.hadoop.mapred.TextInputFormat
102
import org.apache.hadoop.io.compress.GzipCodec
103
104
val sc = new SparkContext(new SparkConf().setAppName("IO Examples").setMaster("local[*]"))

// Text file input
val textLines = sc.textFile("hdfs://input/data.txt")
val multipleFiles = sc.textFile("hdfs://input/*.txt") // Wildcard support
val localFile = sc.textFile("file:///local/path/data.txt") // Local filesystem

// Whole text files (useful for small files)
// Returns RDD[(filename, content)]
val wholeFiles = sc.wholeTextFiles("hdfs://input/small-files/")

// Binary files
// Returns RDD[(filename, PortableDataStream)]
val binaryData = sc.binaryFiles("hdfs://input/images/")

// Process binary data
val imageSizes = binaryData.map { case (filename, stream) =>
  val bytes = stream.toArray()
  (filename, bytes.length)
}

// Object files (for Spark-serialized objects)
val numbers = sc.parallelize(1 to 1000)
numbers.saveAsObjectFile("hdfs://output/numbers")
val loadedNumbers = sc.objectFile[Int]("hdfs://output/numbers")

// Sequence files
val keyValueData = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))
keyValueData.saveAsSequenceFile("hdfs://output/sequence")
val loadedSequence = sc.sequenceFile[String, String]("hdfs://output/sequence")

// Hadoop InputFormat
// When the key, value and InputFormat types are given as type arguments,
// hadoopFile takes only the path; the classes are resolved from the type
// arguments. (The class-argument overload expects
// path, inputFormatClass, keyClass, valueClass in that order.)
val hadoopData = sc.hadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://input/hadoop-format"
).map { case (key, value) => (key.get(), value.toString) }

// Text output with compression
textLines
  .filter(_.nonEmpty)
  .saveAsTextFile("hdfs://output/filtered", classOf[GzipCodec])
147
```
148
149
### Specialized Input Formats
150
151
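Built-in input formats for specific data types. These classes are internal to Spark (`private[spark]`); applications normally reach them through `SparkContext.wholeTextFiles`, `SparkContext.binaryFiles`, and `SparkContext.binaryRecords` rather than referencing them directly.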
152
153
```scala { .api }
154
/**
155
* Input format for reading whole text files
156
*/
157
class WholeTextFileInputFormat extends FileInputFormat[String, String]
158
159
/**
160
* Input format for reading binary files as PortableDataStream
161
*/
162
class StreamInputFormat extends FileInputFormat[String, PortableDataStream]
163
164
/**
165
* Input format for fixed-length binary records
166
*/
167
class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] {
168
def setRecordLength(conf: Configuration, recordLength: Int): Unit
169
}
170
171
// PortableDataStream for binary data handling
172
class PortableDataStream(
173
val isDirectory: Boolean,
174
val path: String,
175
val length: Long,
176
val modificationTime: Long) extends Serializable {
177
178
def open(): DataInputStream
179
def toArray(): Array[Byte]
180
}
181
```
182
183
**Advanced I/O Examples:**
184
185
```scala
186
// Fixed-length binary records
// FixedLengthBinaryInputFormat is private[spark] and exposes no setRecordLength
// helper, so it cannot be used from application code. SparkContext.binaryRecords
// is the public entry point: it takes the record length directly and returns
// one Array[Byte] per record.
val binaryRecords = sc.binaryRecords("hdfs://input/binary-data", recordLength = 1024) // 1KB records

// Process small text files efficiently
// WholeTextFileInputFormat is likewise private[spark]; wholeTextFiles is its
// public wrapper and yields (filename, content) pairs as Strings.
val smallTextFiles = sc.wholeTextFiles("hdfs://input/logs/")
  .map { case (filename, content) =>
    val lines = content.split("\n")
    val errors = lines.count(_.contains("ERROR"))
    (filename, errors)
  }
215
```
216
217
### Database I/O
218
219
JDBC support for reading from relational databases.
220
221
```scala { .api }
222
/**
223
* RDD for reading data from JDBC sources
224
*/
225
class JdbcRDD[T: ClassTag](
226
sc: SparkContext,
227
getConnection: () => Connection,
228
sql: String,
229
lowerBound: Long,
230
upperBound: Long,
231
numPartitions: Int,
232
mapRow: ResultSet => T) extends RDD[T]
233
234
// Constructor
235
def JdbcRDD[T: ClassTag](
236
sc: SparkContext,
237
getConnection: () => Connection,
238
sql: String,
239
lowerBound: Long,
240
upperBound: Long,
241
numPartitions: Int)(mapRow: ResultSet => T): JdbcRDD[T]
242
```
243
244
**Database Usage Examples:**
245
246
```scala
247
import org.apache.spark.rdd.JdbcRDD
248
import java.sql.{Connection, DriverManager, ResultSet}
249
250
// Database connection function
251
/** Opens a new JDBC connection to the example `mydb` MySQL database. */
def createConnection(): Connection = {
  // Load the driver class explicitly before asking DriverManager for a connection.
  Class.forName("com.mysql.jdbc.Driver")

  val url = "jdbc:mysql://localhost:3306/mydb"
  val user = "username"
  val secret = "password"
  DriverManager.getConnection(url, user, secret)
}
259
260
// Create JDBC RDD
261
// The query must contain exactly two `?` placeholders; JdbcRDD binds them to
// the lower and upper id bound of each partition.
val jdbcRDD = new JdbcRDD(
  sc,
  createConnection,
  "SELECT id, name, age FROM users WHERE id >= ? AND id <= ?",
  lowerBound = 1,
  upperBound = 1000000,
  numPartitions = 4,
  // Convert each row into an (id, name, age) tuple
  mapRow = (row: ResultSet) => (row.getLong("id"), row.getString("name"), row.getInt("age"))
)

// Process database data
val adultUsers = jdbcRDD.filter { case (_, _, age) => age >= 18 }
val userCount = adultUsers.count()
279
```
280
281
## Advanced I/O Patterns
282
283
### Multi-Source Data Loading
284
285
```scala
286
// Load and combine data from multiple sources
287
/**
 * Loads log, CSV and binary inputs and merges them into a single keyed RDD.
 *
 * Each element is `(id, attributes)`, where `attributes` carries a "type" tag
 * ("log", "csv" or "binary") and the source-specific payload under "data".
 * `parseLogLine`, `extractIdFromFilename` and `processBinaryStream` are
 * application-supplied helpers that are not defined in this snippet.
 */
def loadMultiSourceData(sc: SparkContext): RDD[(String, Map[String, Any])] = {
  // RDD is invariant in its element type, so every source is pinned to the same
  // Map[String, Any] value type; otherwise each Map literal would infer its own
  // widened value type and the unions below would not type-check.

  // Text logs
  val logs: RDD[(String, Map[String, Any])] = sc.textFile("hdfs://logs/*.log")
    .map(parseLogLine)
    .map(record => (record.id, Map[String, Any]("type" -> "log", "data" -> record)))

  // CSV files (rows with fewer than three columns are dropped)
  val csvData: RDD[(String, Map[String, Any])] = sc.textFile("hdfs://csv/*.csv")
    .map(_.split(","))
    .filter(_.length >= 3)
    .map(fields => (fields(0), Map[String, Any]("type" -> "csv", "data" -> fields)))

  // Binary data
  val binaryData: RDD[(String, Map[String, Any])] = sc.binaryFiles("hdfs://binary/*")
    .map { case (filename, stream) =>
      val id = extractIdFromFilename(filename)
      val data = processBinaryStream(stream)
      (id, Map[String, Any]("type" -> "binary", "data" -> data))
    }

  // Combine all sources
  logs.union(csvData).union(binaryData)
}
310
311
// Usage
312
val combinedData = loadMultiSourceData(sc)
// Group the records by the "type" tag stored in each record's attribute map
val groupedByType = combinedData.groupBy { case (_, attributes) => attributes("type").toString }
314
```
315
316
### Incremental Data Processing
317
318
```scala
319
import java.text.SimpleDateFormat
320
import java.util.Date
321
322
// Process data incrementally based on modification time
323
/**
 * Parses the records of every file under `hdfs://incremental-data/` whose
 * modification time is later than `lastProcessedTime`.
 *
 * `getFileModificationTime` and `parseRecord` are application-supplied helpers
 * that are not defined in this snippet.
 */
def processIncrementalData(sc: SparkContext, lastProcessedTime: Long): RDD[ProcessedRecord] =
  sc.wholeTextFiles("hdfs://incremental-data/*")
    // Filter files modified after last processed time
    .filter { case (filename, _) => getFileModificationTime(filename) > lastProcessedTime }
    // Process new files: one record per non-empty line, keeping only valid ones
    .flatMap { case (_, content) =>
      content
        .split("\n")
        .filter(_.nonEmpty)
        .map(parseRecord)
        .filter(_.isValid)
    }
342
343
// Checkpoint processed time
344
/**
 * Records the current wall-clock time as the new checkpoint and returns it.
 * `saveProcessedTime` is an application-supplied helper not defined in this
 * snippet; it is expected to write to persistent storage (HDFS, database, etc.).
 */
def updateProcessedTime(): Long = {
  val now = System.currentTimeMillis()
  saveProcessedTime(now)
  now
}
350
```
351
352
### Streaming-Style File Processing
353
354
```scala
355
// Process files as they arrive (simulation of streaming)
356
/**
 * Polls `inputDir` forever, processing files that have not been seen before
 * and writing each batch to a timestamped directory under `outputDir`.
 *
 * This method never returns. `listHDFSFiles` and `processLine` are
 * application-supplied helpers that are not defined in this snippet.
 */
def processFilesAsStream(sc: SparkContext, inputDir: String, outputDir: String): Unit = {

  @scala.annotation.tailrec
  def poll(processedFiles: Set[String]): Unit = {
    // List current files and keep only the ones not handled yet
    val pending = listHDFSFiles(inputDir) -- processedFiles

    val nowProcessed =
      if (pending.nonEmpty) {
        println(s"Processing ${pending.size} new files")

        // Process new files (textFile accepts a comma-separated list of paths)
        val processed = sc.textFile(pending.mkString(","))
          .filter(_.nonEmpty)
          .map(processLine)
          .filter(_.isSuccess)

        // Save results with timestamp
        val timestamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date())
        processed.saveAsTextFile(s"$outputDir/batch_$timestamp")

        // Remember the files of this batch
        processedFiles ++ pending
      } else {
        processedFiles
      }

    // Wait before checking again
    Thread.sleep(30000) // 30 seconds
    poll(nowProcessed)
  }

  poll(Set.empty[String])
}
386
```
387
388
### Data Format Conversion Pipeline
389
390
```scala
391
// Convert between different data formats
392
/**
 * Example conversions between text-based inputs and Hadoop/Spark file formats.
 *
 * `parseTextRecord`, `recordToAvroBytes` and `parseJSON` are application-supplied
 * helpers that are not defined in this snippet.
 */
class DataFormatConverter(sc: SparkContext) {

  /**
   * Reads CSV rows with at least three columns and writes them as a sequence
   * file at `outputPath + "_sequence"`, keyed by the first column.
   *
   * Would typically use Spark SQL for Parquet, but showing concept.
   */
  def csvToParquet(inputPath: String, outputPath: String): Unit = {
    val csvData = sc.textFile(inputPath)
      .map(_.split(","))
      .filter(_.length >= 3)
      // saveAsSequenceFile is only available on key/value RDDs, so the second
      // and third columns are folded into the value instead of emitting a
      // 3-tuple. The third column is still parsed as a Double, so a
      // non-numeric value fails the job rather than being written out.
      .map(fields => (fields(0), s"${fields(1)},${fields(2).toDouble}"))

    csvData.saveAsSequenceFile(outputPath + "_sequence")
  }

  /** Parses text records, keeps the valid ones and stores their encoded form as an object file. */
  def textToAvro(inputPath: String, outputPath: String): Unit = {
    val textData = sc.textFile(inputPath)
      .map(parseTextRecord)
      .filter(_.isValid)

    // Convert to Avro format (simplified)
    val avroData = textData.map(recordToAvroBytes)
    avroData.saveAsObjectFile(outputPath)
  }

  /** Writes every successfully parsed JSON line as an (id, json-text) sequence-file entry. */
  def jsonToSequenceFile(inputPath: String, outputPath: String): Unit = {
    val jsonData = sc.textFile(inputPath)
      // flatMap drops the lines parseJSON could not parse (None) and unwraps
      // the rest, avoiding an explicit Option.get.
      .flatMap(line => parseJSON(line))
      .map(json => (json.getString("id"), json.toString))

    jsonData.saveAsSequenceFile(outputPath)
  }
}

// Usage
val converter = new DataFormatConverter(sc)
converter.csvToParquet("hdfs://input/data.csv", "hdfs://output/data.parquet")
428
```
429
430
### Custom OutputFormat Example
431
432
```scala
433
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.{OutputFormat, RecordWriter, JobConf}
import org.apache.hadoop.mapred.FileOutputFormat
import org.apache.hadoop.util.Progressable
436
437
// Custom output format for special requirements
438
/** Old-API (`mapred`) output format that delegates all writing to [[CustomRecordWriter]]. */
class CustomOutputFormat extends OutputFormat[String, String] {

  /** Creates the writer for one output file; `progress` is not used. */
  override def getRecordWriter(
      fs: FileSystem,
      job: JobConf,
      name: String,
      progress: Progressable): RecordWriter[String, String] =
    new CustomRecordWriter(fs, job, name)

  /** Hook for validating the output location before the job runs; currently a no-op. */
  override def checkOutputSpecs(fs: FileSystem, job: JobConf): Unit = {
    // Validation logic
  }
}
453
454
/**
 * Writes each record as a `key|value` line, encoded as UTF-8.
 *
 * @param fs   filesystem used to create the output file
 * @param job  job configuration; used to resolve the task's output location
 * @param name file name of this task's output, as passed to `getRecordWriter`
 */
class CustomRecordWriter(fs: FileSystem, job: JobConf, name: String)
    extends RecordWriter[String, String] {

  // Resolve `name` against the job's output directory. A bare `new Path(name)`
  // would be relative to the filesystem's working directory and so would
  // ignore the path given to saveAsHadoopFile.
  private val outputStream = fs.create(FileOutputFormat.getTaskOutputPath(job, name))

  override def write(key: String, value: String): Unit = {
    val record = s"$key|$value\n"
    // DataOutputStream.writeBytes keeps only the low byte of every char, which
    // corrupts non-ASCII text; write explicitly encoded bytes instead.
    outputStream.write(record.getBytes(StandardCharsets.UTF_8))
  }

  override def close(reporter: org.apache.hadoop.mapred.Reporter): Unit = {
    outputStream.close()
  }
}
468
469
// Usage
470
val customData = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))

// The overload that takes the key, value and OutputFormat classes has no type
// parameter; the `saveAsHadoopFile[F]` forms accept only the path (and an
// optional codec), so the two styles cannot be combined.
customData.saveAsHadoopFile(
  "hdfs://output/custom",
  classOf[String],
  classOf[String],
  classOf[CustomOutputFormat]
)
477
```
478
479
## Performance Optimization
480
481
### Efficient File Reading
482
483
```scala
484
// Optimize partition count for file reading
485
/**
 * Reads `path` as text, asking for roughly one partition per 64MB of input.
 *
 * The size is the sum of the lengths of the entries directly under `path`
 * (no recursion into sub-directories). At least one partition is requested.
 */
def optimizeFileReading(sc: SparkContext, path: String): RDD[String] = {
  val targetPartitionBytes = 64L * 1024 * 1024 // 64MB

  // Resolve the filesystem from the path itself: FileSystem.get(conf) returns
  // the default filesystem, which rejects paths that live on a different
  // scheme or authority.
  val inputPath = new Path(path)
  val fs = inputPath.getFileSystem(sc.hadoopConfiguration)
  val totalSize = fs.listStatus(inputPath).map(_.getLen).sum

  // Round up so a trailing partial chunk still gets its own partition, and
  // clamp before narrowing to Int.
  val wanted = (totalSize + targetPartitionBytes - 1) / targetPartitionBytes
  val optimalPartitions = math.min(math.max(1L, wanted), Int.MaxValue.toLong).toInt

  sc.textFile(path, minPartitions = optimalPartitions)
}
497
498
// Coalesce small files
499
/**
 * Saves `rdd` as text at `outputPath`, first coalescing it down to
 * `targetPartitions` when it currently has more partitions than that.
 * An RDD that already has `targetPartitions` or fewer is written as-is.
 */
def coalescedOutput[T](rdd: RDD[T], outputPath: String, targetPartitions: Int): Unit = {
  val needsCoalesce = rdd.getNumPartitions > targetPartitions
  val toWrite = if (needsCoalesce) rdd.coalesce(targetPartitions) else rdd
  toWrite.saveAsTextFile(outputPath)
}
508
```
509
510
### Compression Strategy
511
512
```scala
513
import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, GzipCodec, SnappyCodec}

/**
 * Saves `rdd` as compressed text, choosing the codec from `dataType`.
 *
 * Recognised values are "logs", "intermediate" and "streaming"; anything else
 * falls back to Gzip. LZO codecs are not shipped in
 * `org.apache.hadoop.io.compress`, so the splittable case uses BZip2, which is.
 */
def saveWithOptimalCompression[T](rdd: RDD[T], outputPath: String, dataType: String): Unit = {
  val codec: Class[_ <: CompressionCodec] = dataType match {
    case "logs"         => classOf[GzipCodec]   // High compression for archival
    case "intermediate" => classOf[SnappyCodec] // Fast compression for temp data
    case "streaming"    => classOf[BZip2Codec]  // Splittable compression
    case _              => classOf[GzipCodec]   // Default
  }

  rdd.saveAsTextFile(outputPath, codec)
}
526
527
// Monitor compression ratios
528
/**
 * Returns `compressedSize / originalSize` for the entries directly under the
 * two paths and prints the corresponding space saving as a percentage.
 *
 * Relies on a `sc` SparkContext being in scope. If the original path is empty
 * the division is by zero and the result is NaN or Infinity.
 */
def analyzeCompressionRatio(originalPath: String, compressedPath: String): Double = {
  // Sum the lengths under `dir`, using the filesystem that owns that path
  // rather than the default one, so paths on another scheme still resolve.
  def directorySize(dir: String): Long = {
    val location = new Path(dir)
    location.getFileSystem(sc.hadoopConfiguration)
      .listStatus(location)
      .map(_.getLen)
      .sum
  }

  val originalSize = directorySize(originalPath)
  val compressedSize = directorySize(compressedPath)

  val ratio = compressedSize.toDouble / originalSize
  // The printed figure is the saving (1 - ratio); the returned value is the raw ratio.
  println(s"Compression ratio: ${(1 - ratio) * 100}%")
  ratio
}
540
```
541
542
## Best Practices
543
544
### File Organization
545
546
```scala
547
// Organize output by date partitions
548
/**
 * Saves `rdd` as text under `basePath/yyyy/MM/dd`, using today's date as
 * formatted by `SimpleDateFormat` in the JVM's default time zone.
 */
def saveWithDatePartitioning[T](rdd: RDD[T], basePath: String): Unit = {
  val dateFormat = new SimpleDateFormat("yyyy/MM/dd")
  val datePath = dateFormat.format(new Date())
  rdd.saveAsTextFile(s"$basePath/$datePath")
}
554
555
// Partition output by key ranges
556
/**
 * Hash-partitions `rdd` by key into 10 partitions and saves it as text, with
 * every record prefixed by the index of the partition it landed in.
 */
def savePartitionedByKey(rdd: RDD[(String, String)], outputPath: String): Unit = {
  val partitioned = rdd.partitionBy(new HashPartitioner(10))

  // Tag each record with the index of the partition that holds it
  val labelled = partitioned.mapPartitionsWithIndex { (partitionIndex, records) =>
    records.map(record => s"partition_$partitionIndex: $record")
  }

  labelled.saveAsTextFile(outputPath)
}
563
```
564
565
### Error Handling in I/O
566
567
```scala
568
// Robust file processing with error handling
569
/**
 * Parses every line of `inputPath`, logging and skipping the lines that fail
 * to parse instead of failing the whole job.
 *
 * `parseRecord` and `logError` are application-supplied helpers that are not
 * defined in this snippet. The logged line number is the line's index within
 * its partition, not within the file.
 */
def robustFileProcessing(sc: SparkContext, inputPath: String): RDD[ProcessedRecord] = {
  import scala.util.control.NonFatal

  sc.textFile(inputPath)
    .mapPartitionsWithIndex { (partitionId, lines) =>
      // flatMap over an Option keeps the parsed records and drops the failures,
      // so no further isDefined/get step is needed (or valid) afterwards.
      lines.zipWithIndex.flatMap { case (line, lineNumber) =>
        try {
          Some(parseRecord(line))
        } catch {
          // NonFatal lets InterruptedException and JVM-level errors propagate.
          case NonFatal(e) =>
            logError(s"Partition $partitionId, Line $lineNumber: ${e.getMessage}")
            None
        }
      }
    }
}
585
586
// Validate output before final save
587
/**
 * Saves the records of `rdd` that satisfy `validator`, refusing to write
 * anything when fewer than 90% of the records are valid.
 *
 * Runs three actions (two counts and the save), each of which evaluates `rdd`.
 *
 * @throws RuntimeException when the valid share is below 90%
 */
def validateAndSave[T](rdd: RDD[T], outputPath: String, validator: T => Boolean): Unit = {
  val validatedRDD = rdd.filter(validator)
  val originalCount = rdd.count()
  val validCount = validatedRDD.count()

  // Less than 90% valid
  val tooManyInvalid = validCount < originalCount * 0.9
  if (tooManyInvalid)
    throw new RuntimeException(s"Too many invalid records: $validCount/$originalCount valid")

  validatedRDD.saveAsTextFile(outputPath)
  println(s"Saved $validCount valid records out of $originalCount total")
}
599
```