File I/O operations for reading from and writing to a variety of data sources, including text files, binary files, sequence files, and other Hadoop-compatible formats.
Methods on SparkContext for creating RDDs from these sources.
// Text file operations
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
// Binary file operations
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]
// Object file operations
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
// Hadoop InputFormat operations
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)]
// Sequence file operations
def sequenceFile[K, V](
path: String,
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
// Hadoop RDD operations
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): RDD[(K, V)]
Methods for saving RDDs to various output formats.
// Basic output operations
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsObjectFile(path: String): Unit
// Hadoop output operations (available on pair RDDs)
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](
path: String,
codec: Class[_ <: CompressionCodec]): Unit
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String): Unit
def saveAsNewAPIHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = self.context.hadoopConfiguration): Unit
// Sequence file output
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.compress.GzipCodec
val sc = new SparkContext(new SparkConf().setAppName("IO Examples").setMaster("local[*]"))
// Text file input
val textLines = sc.textFile("hdfs://input/data.txt")
val multipleFiles = sc.textFile("hdfs://input/*.txt") // Wildcard support
val localFile = sc.textFile("file:///local/path/data.txt") // Local filesystem
// Whole text files (useful for small files)
val wholeFiles = sc.wholeTextFiles("hdfs://input/small-files/")
// Returns RDD[(filename, content)]
// Binary files
val binaryData = sc.binaryFiles("hdfs://input/images/")
// Returns RDD[(filename, PortableDataStream)]
// Process binary data
val imageSizes = binaryData.map { case (filename, stream) =>
val bytes = stream.toArray()
(filename, bytes.length)
}
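// binaryRecords is a simpler path when every record has the same fixed size; a minimal
// sketch assuming 512-byte records and an illustrative input path
val fixedRecords = sc.binaryRecords("hdfs://input/sensor-data", 512)
val nonEmptyRecordCount = fixedRecords.filter(_.exists(_ != 0)).count()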
// Object files (for Spark-serialized objects)
val numbers = sc.parallelize(1 to 1000)
numbers.saveAsObjectFile("hdfs://output/numbers")
val loadedNumbers = sc.objectFile[Int]("hdfs://output/numbers")
// Sequence files
val keyValueData = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))
keyValueData.saveAsSequenceFile("hdfs://output/sequence")
val loadedSequence = sc.sequenceFile[String, String]("hdfs://output/sequence")
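// Writing pairs with the new Hadoop API; a minimal sketch that converts the keys and
// values to Text and uses the built-in mapreduce TextOutputFormat (output path is illustrative)
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
keyValueData
  .map { case (k, v) => (new Text(k), new Text(v)) }
  .saveAsNewAPIHadoopFile[NewTextOutputFormat[Text, Text]]("hdfs://output/new-api")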
// Hadoop InputFormat
val hadoopData = sc.hadoopFile(
"hdfs://input/hadoop-format",
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text]
).map { case (key, value) => (key.get(), value.toString) }
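// hadoopRDD reads through a pre-built JobConf instead of a path argument; a minimal
// sketch reusing the same text input classes (the path and conf settings are illustrative)
import org.apache.hadoop.mapred.{FileInputFormat => OldFileInputFormat, JobConf}
val jobConf = new JobConf(sc.hadoopConfiguration)
OldFileInputFormat.setInputPaths(jobConf, "hdfs://input/hadoop-format")
val confBasedData = sc.hadoopRDD(jobConf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map { case (_, value) => value.toString }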
// Text output with compression
textLines
.filter(_.nonEmpty)
.saveAsTextFile("hdfs://output/filtered", classOf[GzipCodec])
Built-in input formats for specific data types.
/**
* Input format for reading whole text files
*/
class WholeTextFileInputFormat extends FileInputFormat[String, String]
/**
* Input format for reading binary files as PortableDataStream
*/
class StreamInputFormat extends FileInputFormat[String, PortableDataStream]
/**
* Input format for fixed-length binary records
*/
class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] {
def setRecordLength(conf: Configuration, recordLength: Int): Unit
}
// PortableDataStream for binary data handling
class PortableDataStream(
val isDirectory: Boolean,
val path: String,
val length: Long,
val modificationTime: Long) extends Serializable {
def open(): DataInputStream
def toArray(): Array[Byte]
}
Advanced I/O Examples:
import org.apache.spark.input.{WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, LongWritable}
// Fixed-length binary records
val conf = new Configuration()
FixedLengthBinaryInputFormat.setRecordLength(conf, 1024) // 1KB records
val binaryRecords = sc.newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
"hdfs://input/binary-data",
classOf[FixedLengthBinaryInputFormat],
classOf[LongWritable],
classOf[BytesWritable],
conf
).map { case (offset, bytes) =>
(offset.get(), bytes.copyBytes()) // copyBytes() returns a trimmed copy; Hadoop reuses the Writable buffer
}
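// PortableDataStream also supports lazy reads via open(), which avoids materializing whole
// files in memory; a sketch that reads only a 16-byte header per file (path and header size
// are illustrative)
val largeBinaries = sc.binaryFiles("hdfs://input/large-binaries/")
val fileHeaders = largeBinaries.map { case (filename, stream) =>
  val in = stream.open() // DataInputStream over the file contents
  try {
    val header = new Array[Byte](16)
    in.readFully(header)
    (filename, header)
  } finally {
    in.close()
  }
}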
// Process small text files efficiently
val smallTextFiles = sc.newAPIHadoopFile[String, String, WholeTextFileInputFormat](
"hdfs://input/logs/",
classOf[WholeTextFileInputFormat],
classOf[String],
classOf[String]
).map { case (filename, content) =>
val lines = content.split("\n")
val errors = lines.count(_.contains("ERROR"))
(filename, errors)
}
JDBC support for reading from relational databases.
/**
* RDD for reading data from JDBC sources
*/
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: ResultSet => T) extends RDD[T]
// Constructor
def JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int)(mapRow: ResultSet => T): JdbcRDD[T]
Database Usage Examples:
import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
// Database connection function
def createConnection(): Connection = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection(
"jdbc:mysql://localhost:3306/mydb",
"username",
"password"
)
}
// Create JDBC RDD
val jdbcRDD = new JdbcRDD(
sc,
createConnection,
"SELECT id, name, age FROM users WHERE id >= ? AND id <= ?",
lowerBound = 1,
upperBound = 1000000,
numPartitions = 4,
mapRow = { resultSet =>
val id = resultSet.getLong("id")
val name = resultSet.getString("name")
val age = resultSet.getInt("age")
(id, name, age)
}
)
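// Optional follow-up sketch: persist the raw extract as text for downstream jobs
// (the output path is illustrative)
jdbcRDD
  .map { case (id, name, age) => s"$id,$name,$age" }
  .saveAsTextFile("hdfs://output/users-extract")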
// Process database data
val adultUsers = jdbcRDD.filter(_._3 >= 18)
val userCount = adultUsers.count()
// Load and combine data from multiple sources
def loadMultiSourceData(sc: SparkContext): RDD[(String, Map[String, Any])] = {
// Text logs
val logs = sc.textFile("hdfs://logs/*.log")
.map(parseLogLine)
.map(record => (record.id, Map("type" -> "log", "data" -> record)))
// CSV files
val csvData = sc.textFile("hdfs://csv/*.csv")
.map(_.split(","))
.filter(_.length >= 3)
.map(fields => (fields(0), Map("type" -> "csv", "data" -> fields)))
// Binary data
val binaryData = sc.binaryFiles("hdfs://binary/*")
.map { case (filename, stream) =>
val id = extractIdFromFilename(filename)
val data = processBinaryStream(stream)
(id, Map("type" -> "binary", "data" -> data))
}
// Combine all sources
logs.union(csvData).union(binaryData)
}
// Usage
val combinedData = loadMultiSourceData(sc)
val groupedByType = combinedData.groupBy(_._2("type").toString)
import java.text.SimpleDateFormat
import java.util.Date
// Process data incrementally based on modification time
def processIncrementalData(sc: SparkContext, lastProcessedTime: Long): RDD[ProcessedRecord] = {
val files = sc.wholeTextFiles("hdfs://incremental-data/*")
// Filter files modified after last processed time
val newFiles = files.filter { case (filename, content) =>
val modTime = getFileModificationTime(filename)
modTime > lastProcessedTime
}
// Process new files
val processedData = newFiles.flatMap { case (filename, content) =>
content.split("\n")
.filter(_.nonEmpty)
.map(parseRecord)
.filter(_.isValid)
}
processedData
}
// Checkpoint processed time
def updateProcessedTime(): Long = {
val currentTime = System.currentTimeMillis()
// Save to persistent storage (HDFS, database, etc.)
saveProcessedTime(currentTime)
currentTime
}
// Process files as they arrive (simulation of streaming)
def processFilesAsStream(sc: SparkContext, inputDir: String, outputDir: String): Unit = {
var processedFiles = Set.empty[String]
while (true) {
// List current files
val currentFiles = listHDFSFiles(inputDir)
val newFiles = currentFiles -- processedFiles
if (newFiles.nonEmpty) {
println(s"Processing ${newFiles.size} new files")
// Process new files
val newData = sc.textFile(newFiles.mkString(","))
val processed = newData
.filter(_.nonEmpty)
.map(processLine)
.filter(_.isSuccess)
// Save results with timestamp
val timestamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date())
processed.saveAsTextFile(s"$outputDir/batch_$timestamp")
// Update processed files set
processedFiles ++= newFiles
}
// Wait before checking again
Thread.sleep(30000) // 30 seconds
}
}
// Convert between different data formats
class DataFormatConverter(sc: SparkContext) {
def csvToParquet(inputPath: String, outputPath: String): Unit = {
val csvData = sc.textFile(inputPath)
.map(_.split(","))
.filter(_.length >= 3)
.map(fields => (fields(0), fields(1), fields(2).toDouble))
// Would typically use Spark SQL for Parquet, but showing concept
csvData.saveAsSequenceFile(outputPath + "_sequence")
}
def textToAvro(inputPath: String, outputPath: String): Unit = {
val textData = sc.textFile(inputPath)
.map(parseTextRecord)
.filter(_.isValid)
// Convert to Avro format (simplified)
val avroData = textData.map(recordToAvroBytes)
avroData.saveAsObjectFile(outputPath)
}
def jsonToSequenceFile(inputPath: String, outputPath: String): Unit = {
val jsonData = sc.textFile(inputPath)
.map(parseJSON)
.filter(_.isDefined)
.map(_.get)
.map(json => (json.getString("id"), json.toString))
jsonData.saveAsSequenceFile(outputPath)
}
}
// Usage
val converter = new DataFormatConverter(sc)
converter.csvToParquet("hdfs://input/data.csv", "hdfs://output/data.parquet")
import org.apache.hadoop.mapred.{OutputFormat, RecordWriter, JobConf}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.util.Progressable
// Custom output format for special requirements
class CustomOutputFormat extends OutputFormat[String, String] {
override def getRecordWriter(
fs: FileSystem,
job: JobConf,
name: String,
progress: Progressable): RecordWriter[String, String] = {
new CustomRecordWriter(fs, job, name)
}
override def checkOutputSpecs(fs: FileSystem, job: JobConf): Unit = {
// Validation logic
}
}
class CustomRecordWriter(fs: FileSystem, job: JobConf, name: String)
extends RecordWriter[String, String] {
private val outputStream = fs.create(new Path(name))
override def write(key: String, value: String): Unit = {
val record = s"$key|$value\n"
outputStream.writeBytes(record)
}
override def close(reporter: org.apache.hadoop.mapred.Reporter): Unit = {
outputStream.close()
}
}
// Usage
val customData = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))
customData.saveAsHadoopFile(
"hdfs://output/custom",
classOf[String],
classOf[String],
classOf[CustomOutputFormat]
)
// Optimize partition count for file reading
def optimizeFileReading(sc: SparkContext, path: String): RDD[String] = {
// Get file size information
val fs = FileSystem.get(sc.hadoopConfiguration)
val fileStatus = fs.listStatus(new Path(path))
val totalSize = fileStatus.map(_.getLen).sum
// Calculate optimal partitions (64MB per partition)
val blockSize = 64 * 1024 * 1024 // 64MB
val optimalPartitions = math.max(1, totalSize / blockSize).toInt
sc.textFile(path, minPartitions = optimalPartitions)
}
// Coalesce small files
def coalescedOutput[T](rdd: RDD[T], outputPath: String, targetPartitions: Int): Unit = {
val currentPartitions = rdd.getNumPartitions
if (currentPartitions > targetPartitions) {
rdd.coalesce(targetPartitions).saveAsTextFile(outputPath)
} else {
rdd.saveAsTextFile(outputPath)
}
}
import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, SnappyCodec}
// Choose compression based on data characteristics
def saveWithOptimalCompression[T](rdd: RDD[T], outputPath: String, dataType: String): Unit = {
val codec = dataType match {
case "logs" => classOf[GzipCodec] // High compression for archival
case "intermediate" => classOf[SnappyCodec] // Fast compression for temp data
case "streaming" => classOf[BZip2Codec] // Splittable compression (built into Hadoop)
case _ => classOf[GzipCodec] // Default
}
rdd.saveAsTextFile(outputPath, codec)
}
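// Example invocation; a sketch that reuses the textLines RDD from the earlier examples
// with an illustrative output path
saveWithOptimalCompression(textLines.filter(_.contains("ERROR")), "hdfs://output/error-logs", "logs")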
// Monitor compression ratios
def analyzeCompressionRatio(originalPath: String, compressedPath: String): Double = {
val fs = FileSystem.get(sc.hadoopConfiguration)
val originalSize = fs.listStatus(new Path(originalPath))
.map(_.getLen).sum
val compressedSize = fs.listStatus(new Path(compressedPath))
.map(_.getLen).sum
val ratio = compressedSize.toDouble / originalSize
println(s"Compression ratio: ${(1 - ratio) * 100}%")
ratio
}
// Organize output by date partitions
def saveWithDatePartitioning[T](rdd: RDD[T], basePath: String): Unit = {
val today = new SimpleDateFormat("yyyy/MM/dd").format(new Date())
val outputPath = s"$basePath/$today"
rdd.saveAsTextFile(outputPath)
}
// Partition output by key ranges
import org.apache.spark.HashPartitioner
def savePartitionedByKey(rdd: RDD[(String, String)], outputPath: String): Unit = {
rdd.partitionBy(new HashPartitioner(10))
.mapPartitionsWithIndex { (index, partition) =>
partition.map(record => s"partition_$index: $record")
}
.saveAsTextFile(outputPath)
}
// Robust file processing with error handling
def robustFileProcessing(sc: SparkContext, inputPath: String): RDD[ProcessedRecord] = {
sc.textFile(inputPath)
.mapPartitionsWithIndex { (partitionId, lines) =>
lines.zipWithIndex.flatMap { case (line, lineNumber) =>
try {
Some(parseRecord(line))
} catch {
case e: Exception =>
logError(s"Partition $partitionId, Line $lineNumber: ${e.getMessage}")
None
}
}
}
}
// Validate output before final save
def validateAndSave[T](rdd: RDD[T], outputPath: String, validator: T => Boolean): Unit = {
val validatedRDD = rdd.filter(validator)
val originalCount = rdd.count()
val validCount = validatedRDD.count()
if (validCount < originalCount * 0.9) { // Less than 90% valid
throw new RuntimeException(s"Too many invalid records: $validCount/$originalCount valid")
}
validatedRDD.saveAsTextFile(outputPath)
println(s"Saved $validCount valid records out of $originalCount total")
}
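// Example invocation; a sketch using a simple non-empty-line validator and illustrative paths
val cleanedLines = sc.textFile("hdfs://input/raw").map(_.trim)
validateAndSave[String](cleanedLines, "hdfs://output/validated-lines", _.nonEmpty)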