Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Spark provides extensive support for reading and writing data across a variety of storage systems, including local filesystems, HDFS, object stores, and databases. This document covers the data source APIs available in Spark 1.0.0.
textFile: Read text files line by line
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

// Local filesystem
val localFile = sc.textFile("file:///path/to/local/file.txt")
// HDFS
val hdfsFile = sc.textFile("hdfs://namenode:port/path/to/file.txt")
// Multiple files with wildcards
val multipleFiles = sc.textFile("hdfs://path/to/files/*.txt")
// Specify minimum partitions
val partitionedFile = sc.textFile("hdfs://path/to/large/file.txt", 8)
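Note that minPartitions is a lower bound, not an exact count: Hadoop's split logic may produce more partitions than requested. A small sketch (the file path is illustrative) to inspect what was actually created:

```scala
// minPartitions is only a hint; check the real partition count after loading
val rdd = sc.textFile("hdfs://path/to/large/file.txt", 8)
println(s"Actual partitions: ${rdd.partitions.length}") // at least 8, possibly more
```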
// Compressed files (automatically detected)
val gzipFile = sc.textFile("hdfs://path/to/file.txt.gz")

wholeTextFiles: Read entire files as key-value pairs
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

// Read directory of files - returns (filename, file_content) pairs
val filesRDD = sc.wholeTextFiles("hdfs://path/to/directory/")
filesRDD.foreach { case (filename, content) =>
  println(s"File: $filename")
  println(s"Size: ${content.length} characters")
  println(s"Lines: ${content.count(_ == '\n') + 1}")
}
// Process each file separately
val processedFiles = filesRDD.mapValues { content =>
  content.split("\n").filter(_.nonEmpty).length
}

saveAsTextFile: Save RDD as text files
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

val data = sc.parallelize(Array("line1", "line2", "line3"))
// Save as text files (writes a directory of part-NNNNN files; fails if the path already exists)
data.saveAsTextFile("hdfs://path/to/output")
// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed-output", classOf[GzipCodec])
// Other compression codecs
import org.apache.hadoop.io.compress.{BZip2Codec, DefaultCodec, SnappyCodec}
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])

Object files are Spark's native binary format, based on Java serialization.
objectFile: Load RDD of objects
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

// Save and load custom objects
case class Person(name: String, age: Int)
val people = sc.parallelize(Array(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))
// Save as object file
people.saveAsObjectFile("hdfs://path/to/people")
// Load back
val loadedPeople: RDD[Person] = sc.objectFile[Person]("hdfs://path/to/people")

saveAsObjectFile: Save RDD as serialized objects
def saveAsObjectFile(path: String): Unit

val complexData = sc.parallelize(Array(
  Map("name" -> "Alice", "scores" -> List(85, 92, 78)),
  Map("name" -> "Bob", "scores" -> List(91, 87, 94))
))
complexData.saveAsObjectFile("hdfs://path/to/complex-data")

Spark integrates with Hadoop's input and output formats for reading various file types.
Reading SequenceFiles:
def sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]

import org.apache.hadoop.io.{IntWritable, Text}
val seqFile = sc.sequenceFile[IntWritable, Text](
  "hdfs://path/to/sequencefile",
  classOf[IntWritable],
  classOf[Text]
)
// Convert Writable types to Scala types
val converted = seqFile.map { case (key, value) =>
  (key.get(), value.toString)
}

Saving SequenceFiles:
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

// For RDDs of types that can be converted to Writable
val pairs = sc.parallelize(Array((1, "apple"), (2, "banana"), (3, "orange")))
// Convert to Writable types
val writablePairs = pairs.map { case (k, v) =>
  (new IntWritable(k), new Text(v))
}
writablePairs.saveAsSequenceFile("hdfs://path/to/output")
// With compression
writablePairs.saveAsSequenceFile("hdfs://path/to/compressed", Some(classOf[GzipCodec]))

hadoopFile: Read files with custom InputFormat
def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]

import org.apache.hadoop.mapred.{TextInputFormat, FileInputFormat}
import org.apache.hadoop.io.{LongWritable, Text}
// Read with TextInputFormat (returns the byte offset of each line and the line content)
val textWithLineNumbers = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
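As with sequence files, the Hadoop Writable types are usually converted to plain Scala types right away; a short sketch building on textWithLineNumbers from above:

```scala
// Convert (LongWritable, Text) pairs into plain (Long, String) pairs
val plainLines = textWithLineNumbers.map { case (offset, line) =>
  (offset.get, line.toString)
}
```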
// Custom InputFormat example (must implement getSplits and getRecordReader)
class CustomInputFormat extends InputFormat[Text, Text] {
  // Implementation here
}
val customFile = sc.hadoopFile[Text, Text](
  "hdfs://path/to/custom/format",
  classOf[CustomInputFormat],
  classOf[Text],
  classOf[Text]
)

newAPIHadoopFile: Use new Hadoop API
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration
): RDD[(K, V)]

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
val newAPIFile = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)

Access and modify Hadoop configuration:
def hadoopConfiguration: Configuration

val hadoopConf = sc.hadoopConfiguration
// Configure S3 access
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")
hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")
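Hardcoding credentials is shown for brevity only. One alternative, sketched here, is to read them from the standard AWS environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are assumed to be set in the driver's environment):

```scala
// Illustrative: pull S3 credentials from the environment instead of source code
sys.env.get("AWS_ACCESS_KEY_ID").foreach(hadoopConf.set("fs.s3a.access.key", _))
sys.env.get("AWS_SECRET_ACCESS_KEY").foreach(hadoopConf.set("fs.s3a.secret.key", _))
```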
// Configure compression
hadoopConf.set("mapreduce.map.output.compress", "true")
hadoopConf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
// Now read from S3
val s3Data = sc.textFile("s3a://bucket-name/path/to/file")

saveAsHadoopFile: Save with old Hadoop API
def saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  codec: Class[_ <: CompressionCodec]
): Unit
// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit

import org.apache.hadoop.mapred.{TextOutputFormat, SequenceFileOutputFormat}
import org.apache.hadoop.io.{IntWritable, Text}
val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))
val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }
// Save as text with custom format
writablePairs.saveAsHadoopFile[TextOutputFormat[IntWritable, Text]](
  "hdfs://path/to/output",
  classOf[IntWritable],
  classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]]
)

saveAsNewAPIHadoopFile: Save with new Hadoop API
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  conf: Configuration = context.hadoopConfiguration
): Unit

saveAsHadoopDataset: Save using JobConf
def saveAsHadoopDataset(conf: JobConf): Unit

import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.hadoop.fs.Path
val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[IntWritable, Text]])
jobConf.setOutputKeyClass(classOf[IntWritable])
jobConf.setOutputValueClass(classOf[Text])
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"))
writablePairs.saveAsHadoopDataset(jobConf)

While Spark 1.0.0 doesn't have built-in JDBC DataFrames, you can read from databases using plain JDBC inside a transformation:
import java.sql.{Connection, DriverManager, ResultSet}
// Custom function to read from database
def readFromDatabase(url: String, query: String): RDD[String] = {
  sc.parallelize(Seq(query)).mapPartitions { queries =>
    val connection = DriverManager.getConnection(url)
    val statement = connection.createStatement()
    queries.flatMap { q =>
      val resultSet = statement.executeQuery(q)
      val results = scala.collection.mutable.ListBuffer[String]()
      while (resultSet.next()) {
        // Extract data from the ResultSet
        results += resultSet.getString(1) // Assuming a single column
      }
      resultSet.close()
      statement.close()
      connection.close()
      results
    }
  }
}
val dbData = readFromDatabase("jdbc:mysql://localhost:3306/mydb", "SELECT * FROM users")

Create custom data sources by implementing InputFormat:
import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}
class CustomInputFormat extends InputFormat[LongWritable, Text] {
  def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] = {
    // Create input splits
    Array[InputSplit]()
  }
  def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, Text] = {
    // Return record reader (null here is only a placeholder)
    null
  }
}
// Use custom format
val customData = sc.hadoopFile[LongWritable, Text](
  "path",
  classOf[CustomInputFormat],
  classOf[LongWritable],
  classOf[Text]
)

// Configure S3 access
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")
// Read from S3
val s3Data = sc.textFile("s3a://my-bucket/path/to/data.txt")
// Write to S3
data.saveAsTextFile("s3a://my-bucket/output/")

// Configure Azure access
hadoopConf.set("fs.azure.account.key.mystorageaccount.blob.core.windows.net", "ACCOUNT_KEY")
// Read from Azure
val azureData = sc.textFile("wasb://container@mystorageaccount.blob.core.windows.net/path/to/file")

import org.apache.hadoop.io.compress.{
  GzipCodec,    // .gz files
  BZip2Codec,   // .bz2 files
  SnappyCodec,  // .snappy files
  DefaultCodec  // Default compression (.deflate)
}
// LZO (.lzo) support requires the separate hadoop-lzo package (com.hadoop.compression.lzo.LzopCodec)

Spark automatically detects compression based on file extension:
// Automatically decompressed
val gzipData = sc.textFile("hdfs://path/to/file.txt.gz")
val bzip2Data = sc.textFile("hdfs://path/to/file.txt.bz2")
val snappyData = sc.textFile("hdfs://path/to/file.txt.snappy")
// Mixed compression in directory
val mixedData = sc.textFile("hdfs://path/to/directory/*") // Handles multiple formats

val data = sc.parallelize(Array("line1", "line2", "line3"))
// Save with different compression
data.saveAsTextFile("output-gzip", classOf[GzipCodec])
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])

// Control number of partitions when reading
val data = sc.textFile("large-file.txt", minPartitions = 100)
// Repartition after reading if needed
val repartitioned = data.repartition(50)

// For small files, use wholeTextFiles and then repartition
val smallFiles = sc.wholeTextFiles("hdfs://path/to/small-files/")
  .values          // Extract just the content
  .repartition(10) // Reduce number of partitions

val frequentlyUsed = sc.textFile("hdfs://path/to/data")
  .filter(_.contains("important"))
  .cache() // Cache in memory
// Multiple actions on cached data
val count1 = frequentlyUsed.count()
val count2 = frequentlyUsed.filter(_.length > 100).count()

// Validate file existence before reading
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
val path = new Path("hdfs://path/to/file")
if (fs.exists(path)) {
  val data = sc.textFile(path.toString)
  // Process data
} else {
  println(s"File not found: $path")
}
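Beyond logging, it often helps to count rejected records so data quality is visible after a job finishes. A sketch using a Spark accumulator, where parseRecord stands in for a hypothetical parsing function of your own:

```scala
// Count malformed lines with an accumulator (Spark 1.x API)
val badLines = sc.accumulator(0L)
val parsed = sc.textFile("data.txt").flatMap { line =>
  try Some(parseRecord(line)) // parseRecord is a placeholder for your parser
  catch { case _: Exception => badLines += 1L; None }
}
parsed.count() // Force evaluation before reading the accumulator
println(s"Rejected ${badLines.value} malformed lines")
```

Accumulator values are only reliable after an action has run, which is why count() precedes the read of badLines.value.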
// Handle malformed data (processLine is a placeholder for your own parsing logic)
val safeData = sc.textFile("data.txt").mapPartitions { lines =>
  lines.flatMap { line =>
    try {
      Some(processLine(line))
    } catch {
      case e: Exception =>
        println(s"Error processing line: $line, Error: ${e.getMessage}")
        None
    }
  }
}

This comprehensive coverage of data sources provides the foundation for reading and writing data in various formats and storage systems with Apache Spark.
Install with Tessl CLI
npx tessl i tessl/maven-apache-spark